Implement an Anthropic Claude streaming adapter
You will create an AnthropicStreamAdapter class in adapters/anthropic_stream.py wrapping anthropic.AsyncAnthropic. The adapter exposes stream_chat(messages, model, temperature) -> AsyncGenerator[StreamChunk, None] calling client.messages.stream() as an async context manager. You iterate over events handling message_start to extract message.id and message.model, content_block_delta to extract delta.text from TextDelta, and message_stop to capture usage with input_tokens and output_tokens. Each delta maps to a StreamChunk Pydantic model. You implement system prompt extraction separating system-role messages into the top-level system parameter. The adapter maps stop_reason values (end_turn, max_tokens, stop_sequence) to the unified FinishReason enum. Configuration reads ANTHROPIC_API_KEY via Settings and raises ProviderAuthError if missing.
Integrate Anthropic Claude Messages API streaming and convert MessageStreamEvent objects into provider-normalized SSE frames
Introduction
When you wire Anthropic's Messages API into the same streaming pipeline that already serves OpenAI and Gemini responses, the differences in event shape can quietly break your frontend — tokens stop appearing mid-response, the spinner spins forever, or the final usage payload arrives malformed and your billing telemetry under-counts every Claude call. The Anthropic SDK delivers a typed MessageStreamEvent taxonomy (message_start, content_block_delta, message_delta, message_stop) rather than the flat delta shape OpenAI sends, so a copy-paste of the OpenAI adapter does not work. By the end of this lesson you will be able to build an AnthropicStreamAdapter that consumes that taxonomy and reshapes it into the same StreamChunk and SSE frame contract your OpenAI and Gemini adapters already produce, so your FastAPI endpoint can serve Claude responses token-by-token without changing the frontend.
Key Terminology
- MessageStreamEvent: the discriminated-union event type yielded by anthropic.AsyncAnthropic().messages.stream(); each event has a type field (message_start, content_block_start, content_block_delta, content_block_stop, message_delta, message_stop) that determines its payload shape.
- ContentBlockDelta: the streaming event whose delta payload carries either a text_delta (with a text field) or an input_json_delta (for tool-use); your adapter forwards only text_delta payloads as StreamChunk tokens.
- StreamChunk: the provider-normalized dataclass your adapter yields (token, done, provider, optional finish_reason, optional input_tokens/output_tokens) so the FastAPI SSE serializer treats Anthropic, OpenAI, and Gemini output identically.
Concepts
Anthropic's Messages API follows a different streaming contract than the OpenAI-compatible providers you integrated in earlier goals. Where OpenAI emits a flat sequence of ChatCompletionChunk objects with a single delta field, Anthropic delivers a typed event stream where each Server-Sent Event carries a distinct type discriminator — message_start, content_block_start, content_block_delta, content_block_stop, message_delta, message_stop — each with its own payload shape. You cannot simply swap base URLs the way you did with Together.ai; you need a dedicated adapter that consumes Anthropic's event taxonomy, extracts text from ContentBlockDelta events, and reshapes them into the same unified StreamChunk your frontend already understands (see Code Walkthrough).
The SDK's AsyncMessageStream yields Python objects that map one-to-one with the SSE event types documented in Anthropic's streaming specification. Your adapter handles each event gracefully — extracting tokens from content_block_delta, capturing stop_reason and usage from message_delta, and propagating errors — while preserving the StreamChunk contract that normalizes output across all four providers in your system.
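To make the shape difference concrete, here is a minimal sketch that extracts text from each shape using plain dictionaries. The sample payloads are hand-written stand-ins for the wire format, not captured API output, and both helper names are hypothetical.

```python
from typing import Optional


def text_from_openai_chunk(chunk: dict) -> Optional[str]:
    """Flat shape: text sits under choices[0].delta.content on every chunk."""
    choices = chunk.get("choices") or []
    if not choices:
        return None
    return choices[0].get("delta", {}).get("content")


def text_from_anthropic_event(event: dict) -> Optional[str]:
    """Typed shape: only content_block_delta events with a text_delta carry text."""
    if event.get("type") != "content_block_delta":
        return None
    delta = event.get("delta", {})
    if delta.get("type") != "text_delta":
        return None
    return delta.get("text")


# Hand-written sample payloads (assumed values, for illustration only).
openai_chunk = {"choices": [{"index": 0, "delta": {"content": "Hel"}}]}
anthropic_events = [
    {"type": "message_start", "message": {"id": "msg_example", "model": "example-model"}},
    {"type": "content_block_start", "index": 0, "content_block": {"type": "text", "text": ""}},
    {"type": "content_block_delta", "index": 0, "delta": {"type": "text_delta", "text": "Hel"}},
    {"type": "content_block_delta", "index": 0, "delta": {"type": "text_delta", "text": "lo"}},
    {"type": "content_block_stop", "index": 0},
    {"type": "message_delta", "delta": {"stop_reason": "end_turn"}, "usage": {"output_tokens": 2}},
    {"type": "message_stop"},
]

# Only two of the seven Anthropic events yield text; the rest are lifecycle events.
texts = [t for e in anthropic_events if (t := text_from_anthropic_event(e)) is not None]
```

The OpenAI helper can be applied to every chunk, while the Anthropic helper has to reject five of the seven events before it finds text, which is why a dedicated adapter is needed.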
Handling edge cases in production
Three edge cases require attention when deploying the Anthropic streaming adapter alongside the OpenAI, Gemini, and Together.ai adapters you built in earlier goals.
Multiple content blocks. Anthropic's API can return multiple content blocks in a single message—for example, a text block followed by a tool-use block when function calling is enabled. Your adapter's type guard (event.delta.type == "text_delta") silently ignores non-text deltas, which is correct for pure chat streaming. If you later add tool-use support, extend the adapter to yield a different StreamChunk variant for input_json_delta events.
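If you do extend the adapter for tool use, one possible approach is to buffer input_json_delta fragments per content block and parse them once the stream ends. The sketch below works on simulated dictionary events and assumes the fragments arrive in a partial_json field keyed by the block's index; collect_blocks is a hypothetical helper, not part of the SDK.

```python
import json
from collections import defaultdict


def collect_blocks(events):
    """Return (text, tool_inputs); tool_inputs maps block index -> parsed JSON input."""
    text_parts = []
    json_parts = defaultdict(list)
    for event in events:
        if event.get("type") != "content_block_delta":
            continue  # lifecycle events carry no delta payload
        delta = event["delta"]
        if delta["type"] == "text_delta":
            text_parts.append(delta["text"])
        elif delta["type"] == "input_json_delta":
            # Fragments are not valid JSON on their own; buffer them per block.
            json_parts[event["index"]].append(delta["partial_json"])
        # Any other delta type is ignored in this sketch.
    tool_inputs = {}
    for index, parts in json_parts.items():
        joined = "".join(parts)
        if joined:
            tool_inputs[index] = json.loads(joined)
    return "".join(text_parts), tool_inputs


# Simulated stream: a text block (index 0) followed by a tool-use block (index 1).
sample_events = [
    {"type": "content_block_delta", "index": 0, "delta": {"type": "text_delta", "text": "Checking."}},
    {"type": "content_block_stop", "index": 0},
    {"type": "content_block_delta", "index": 1, "delta": {"type": "input_json_delta", "partial_json": '{"city": '}},
    {"type": "content_block_delta", "index": 1, "delta": {"type": "input_json_delta", "partial_json": '"Paris"}'}},
    {"type": "content_block_stop", "index": 1},
]

text, tool_inputs = collect_blocks(sample_events)
```

Keeping the JSON fragments out of the text path is the same type guard the chat-only adapter applies; the only addition is that the fragments are stored instead of dropped.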
Stop reason mapping. Anthropic uses "end_turn" where OpenAI uses "stop", and "max_tokens" where OpenAI uses "length". The StreamChunk in this lesson's listing carries the raw finish_reason string, so map it to a common enum before any downstream code compares finish reasons across providers. In the message_start event, stop_reason is still None because generation has not finished; your adapter avoids that placeholder by reading stop_reason only from the message_delta event, which carries the final value.
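A minimal sketch of such a mapping is shown below. The lesson names a unified FinishReason enum but does not list its members, so the member names and values here are assumptions, as is the choice to route unrecognized values to UNKNOWN rather than raising.

```python
from enum import Enum
from typing import Optional


class FinishReason(str, Enum):
    # Member names and values are assumed; align them with your own enum.
    STOP = "stop"
    LENGTH = "length"
    STOP_SEQUENCE = "stop_sequence"
    UNKNOWN = "unknown"


_ANTHROPIC_STOP_REASONS = {
    "end_turn": FinishReason.STOP,
    "max_tokens": FinishReason.LENGTH,
    "stop_sequence": FinishReason.STOP_SEQUENCE,
}


def map_anthropic_stop_reason(stop_reason: Optional[str]) -> Optional[FinishReason]:
    """Translate an Anthropic stop_reason into the unified enum.

    None is passed through so callers can tell "not finished yet" apart
    from a value this table does not recognize.
    """
    if stop_reason is None:
        return None
    return _ANTHROPIC_STOP_REASONS.get(stop_reason, FinishReason.UNKNOWN)
```

An explicit table keeps the translation in one place, and the UNKNOWN fallback means a stop reason added by the provider later degrades to a visible placeholder instead of an exception in the streaming path.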
Rate limiting and retries. The AsyncAnthropic client includes built-in retry logic with exponential backoff for 429 and 529 status codes. However, retries only apply to the initial connection—once streaming begins, a network interruption terminates the stream. Wrap your adapter call in a try/except block catching anthropic.APIConnectionError and yield an error StreamChunk so the frontend can display a reconnection prompt rather than silently hanging.
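The wrapping step can be sketched as a small async generator that sits between the adapter and the SSE serializer. To keep the example runnable without the SDK, it catches the built-in ConnectionError by default; in the real adapter you would pass retryable=(anthropic.APIConnectionError,). The guard_stream name and the "error" finish_reason value are assumptions, and StreamChunk is redeclared here in trimmed form only so the block stands alone.

```python
import asyncio
from dataclasses import dataclass
from typing import Optional


@dataclass
class StreamChunk:
    token: str
    done: bool
    provider: str = "anthropic"
    finish_reason: Optional[str] = None


async def guard_stream(source, retryable=(ConnectionError,)):
    """Forward chunks from source; on a connection failure emit one terminal error chunk."""
    try:
        async for chunk in source:
            yield chunk
    except retryable:
        # "error" is an assumed sentinel the frontend can map to a reconnect prompt.
        yield StreamChunk(token="", done=True, finish_reason="error")


async def _flaky_source():
    """Simulated adapter output that drops the connection after one token."""
    yield StreamChunk(token="Hi", done=False)
    raise ConnectionError("simulated network drop")


async def _collect(agen):
    return [chunk async for chunk in agen]


chunks = asyncio.run(_collect(guard_stream(_flaky_source())))
```

Because the error is converted into an ordinary terminal chunk, the SSE serializer needs no special case: the frontend still receives a done=True frame and can decide what to show.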
Code Walkthrough
Anthropic streaming event lifecycle
Before diving into implementation, study the full lifecycle of a single streaming request. Unlike OpenAI, which sends content in every chunk, Anthropic wraps content inside explicitly bounded content blocks. A single message can contain multiple content blocks (text, tool use, or thinking blocks when extended thinking is enabled), and each block has its own start/delta/stop cycle nested inside the outer message envelope.
- The Client calls stream_chat(messages, model, temperature) on the Adapter, which forwards a POST /v1/messages request to the Anthropic API with streaming enabled.
- The API responds with a message_start event (message metadata) followed by content_block_start (block index and type).
- Inside the loop, the API emits one content_block_delta per token (delta.text); the Adapter yields a non-terminal StreamChunk(token, done=False) per delta.
- After the last token, the API sends content_block_stop, then a single message_delta carrying the definitive stop_reason and cumulative usage counts, then message_stop.
- The Adapter yields one final StreamChunk(token="", done=True, usage) to signal completion downstream.
This lifecycle has three layers of structure. The outer layer is the message envelope (message_start → message_stop). The middle layer consists of one or more content blocks (content_block_start → content_block_stop). The inner layer contains the actual token deltas (content_block_delta). Your adapter only needs to forward content_block_delta events of type text_delta to the client, but it must track the outer lifecycle events to correctly signal completion and capture usage metadata.
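The nesting can be illustrated with a small walker over simulated events. This is a sketch for building intuition rather than adapter code: summarize_stream is a hypothetical helper, the events are hand-written dictionaries, and the check that a delta must fall inside an open block is an assumption about well-formed streams.

```python
def summarize_stream(events):
    """Walk the message, block, and delta layers and summarize what was seen."""
    summary = {"started": False, "stopped": False, "blocks": {}, "stop_reason": None}
    open_blocks = set()
    for event in events:
        kind = event["type"]
        if kind == "message_start":  # outer layer opens
            summary["started"] = True
        elif kind == "content_block_start":  # middle layer opens
            open_blocks.add(event["index"])
            summary["blocks"][event["index"]] = ""
        elif kind == "content_block_delta":  # inner layer: the actual text
            if event["index"] not in open_blocks:
                raise ValueError("delta received outside an open content block")
            if event["delta"]["type"] == "text_delta":
                summary["blocks"][event["index"]] += event["delta"]["text"]
        elif kind == "content_block_stop":  # middle layer closes
            open_blocks.discard(event["index"])
        elif kind == "message_delta":  # final metadata, still inside the envelope
            summary["stop_reason"] = event["delta"].get("stop_reason")
        elif kind == "message_stop":  # outer layer closes
            summary["stopped"] = True
    return summary


# Simulated message containing two text blocks.
events = [
    {"type": "message_start"},
    {"type": "content_block_start", "index": 0},
    {"type": "content_block_delta", "index": 0, "delta": {"type": "text_delta", "text": "Hel"}},
    {"type": "content_block_delta", "index": 0, "delta": {"type": "text_delta", "text": "lo"}},
    {"type": "content_block_stop", "index": 0},
    {"type": "content_block_start", "index": 1},
    {"type": "content_block_delta", "index": 1, "delta": {"type": "text_delta", "text": " world"}},
    {"type": "content_block_stop", "index": 1},
    {"type": "message_delta", "delta": {"stop_reason": "end_turn"}, "usage": {"output_tokens": 3}},
    {"type": "message_stop"},
]

summary = summarize_stream(events)
```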
Building the AnthropicStreamAdapter
The core adapter class wraps anthropic.AsyncAnthropic and exposes a single stream_chat async generator method that matches the interface established by your OpenAI and Gemini adapters. The class accepts an API key at construction time and uses client.messages.stream() as an async context manager, which handles connection lifecycle and automatic cleanup if the client disconnects mid-stream. The following implementation shows the AnthropicStreamAdapter class with its __init__ method for client instantiation and the stream_chat async generator that iterates over MessageStreamEvent objects, pattern-matches on event.type, extracts text from ContentBlockDelta payloads, and yields StreamChunk instances. Pay particular attention to how message_delta events are handled separately to capture stop_reason and usage before the final chunk.
Code snippet python
 1  import anthropic
 2  from dataclasses import dataclass
 3  from typing import AsyncGenerator, Optional
 4
 5  @dataclass
 6  class StreamChunk:
 7      token: str
 8      done: bool
 9      provider: str = "anthropic"
10      finish_reason: Optional[str] = None
11      input_tokens: Optional[int] = None
12      output_tokens: Optional[int] = None
13
14  class AnthropicStreamAdapter:
15      def __init__(self, api_key: str, default_model: str = "claude-sonnet-4-20250514"):
16          self._client = anthropic.AsyncAnthropic(api_key=api_key)
17          self._default_model = default_model
18
19      async def stream_chat(
20          self,
21          messages: list[dict],
22          model: str | None = None,
23          temperature: float = 0.7,
24          max_tokens: int = 1024,
25      ) -> AsyncGenerator[StreamChunk, None]:
26          resolved_model = model or self._default_model
27          stop_reason = None
28          usage_output = None
29
30          async with self._client.messages.stream(
31              model=resolved_model,
32              messages=messages,
33              temperature=temperature,
34              max_tokens=max_tokens,
35          ) as stream:
36              async for event in stream:
37                  if event.type == "content_block_delta":
38                      if event.delta.type == "text_delta":
39                          yield StreamChunk(
40                              token=event.delta.text,
41                              done=False,
42                          )
43                  elif event.type == "message_delta":
44                      stop_reason = event.delta.stop_reason
45                      usage_output = event.usage.output_tokens
46
47          yield StreamChunk(
48              token="",
49              done=True,
50              finish_reason=stop_reason,
51              output_tokens=usage_output,
52          )
- Lines 1-3: Import the anthropic SDK, Python's dataclass decorator, and the typing helpers; the SDK provides AsyncAnthropic for non-blocking HTTP operations
- Lines 5-12: Define StreamChunk as the provider-normalized output dataclass with provider defaulting to "anthropic" and optional fields for finish_reason and token usage that only populate on the final chunk
- Lines 14-17: Initialize AnthropicStreamAdapter with an API key and a default model string; AsyncAnthropic manages connection pooling and retry logic internally
- Lines 19-25: The stream_chat method signature matches the interface used by the OpenAI and Gemini adapters (messages, an optional model override, temperature, and max_tokens) and returns an AsyncGenerator that yields StreamChunk objects
- Lines 27-28: Two local variables track metadata from the message_delta event, which arrives before message_stop but after all content deltas; capturing them here lets you attach them to the final sentinel chunk
- Lines 30-35: The client.messages.stream() async context manager opens the HTTP connection and begins buffering events; using async with ensures the connection closes cleanly even if the consumer breaks out of the loop early (critical for FastAPI client disconnect detection)
- Lines 36-42: The inner loop iterates over MessageStreamEvent objects; the first branch checks for content_block_delta with a nested text_delta type guard, yielding each text token wrapped in a non-terminal StreamChunk
- Lines 43-45: The message_delta branch captures stop_reason (values like "end_turn", "max_tokens", or "stop_sequence") and the cumulative output token count from event.usage
- Lines 47-52: After the stream context manager exits, yield a final StreamChunk with done=True, propagating the captured stop_reason and usage data to the downstream SSE serializer
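The lesson overview also mentions two responsibilities that the listing above does not show: moving system-role messages into the top-level system parameter, and reading ANTHROPIC_API_KEY with a ProviderAuthError when it is missing. A minimal sketch of both follows. The helper names, the ProviderAuthError definition, and the use of os.environ in place of the Settings object are assumptions for illustration, and message content is assumed to be a plain string.

```python
import os
from typing import Mapping, Optional


class ProviderAuthError(Exception):
    """Assumed definition; the lesson names this error but does not show it."""


def split_system_prompt(messages):
    """Separate system-role messages from the chat turns.

    Returns (system, chat_messages), where system is the joined system text
    or None when no system message is present.
    """
    system_parts = []
    chat_messages = []
    for message in messages:
        if message.get("role") == "system":
            system_parts.append(message.get("content", ""))
        else:
            chat_messages.append(message)
    system = "\n\n".join(part for part in system_parts if part) or None
    return system, chat_messages


def load_anthropic_api_key(env: Optional[Mapping[str, str]] = None) -> str:
    """Read ANTHROPIC_API_KEY from env (defaults to os.environ) or raise."""
    source = os.environ if env is None else env
    key = source.get("ANTHROPIC_API_KEY", "").strip()
    if not key:
        raise ProviderAuthError("ANTHROPIC_API_KEY is not set")
    return key


system, chat_messages = split_system_prompt([
    {"role": "system", "content": "You are concise."},
    {"role": "user", "content": "Hello"},
])
```

With these helpers, stream_chat could call split_system_prompt(messages) first and pass system=system to client.messages.stream() only when a system prompt is present, sending chat_messages as the messages argument.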
Converting StreamChunks to W3C-compliant SSE frames
Your FastAPI endpoint needs to serialize each StreamChunk into the text/event-stream format that your frontend's fetch-based reader expects (the browser's EventSource API only issues GET requests, so it cannot call the POST endpoint defined here). The SSE specification requires each frame to contain a data: field and to end with a blank line, that is, two consecutive newline characters. The following function, format_sse_frame, takes a StreamChunk and produces a properly framed SSE string using JSON serialization. It integrates directly with FastAPI's StreamingResponse by being called inside the async generator that the response object consumes. This function also handles the terminal [DONE] sentinel that mirrors the OpenAI streaming convention your frontend already parses.
Code snippet python
 1  import json
 2  from typing import AsyncGenerator
 3  from fastapi import FastAPI, Request
 4  from fastapi.responses import StreamingResponse
 5  from adapters.anthropic_stream import AnthropicStreamAdapter, StreamChunk
 6
 7  app = FastAPI()
 8
 9  def format_sse_frame(chunk: StreamChunk) -> str:
10      if chunk.done:
11          payload = {"token": "", "done": True,
12                     "finish_reason": chunk.finish_reason,
13                     "usage": {"output_tokens": chunk.output_tokens}}
14          return f"data: {json.dumps(payload)}\n\ndata: [DONE]\n\n"
15      return f"data: {json.dumps({'token': chunk.token, 'done': False})}\n\n"
16
17  async def anthropic_event_generator(
18      request: Request,
19      adapter: AnthropicStreamAdapter,
20      messages: list[dict],
21      model: str | None,
22  ) -> AsyncGenerator[str, None]:
23      async for chunk in adapter.stream_chat(messages, model=model):
24          if await request.is_disconnected():
25              break
26          yield format_sse_frame(chunk)
27
28  @app.post("/v1/chat/stream/anthropic")
29  async def stream_anthropic(request: Request):
30      body = await request.json()
31      adapter = AnthropicStreamAdapter(api_key="sk-ant-...")  # placeholder key
32      return StreamingResponse(
33          anthropic_event_generator(
34              request, adapter, body["messages"], body.get("model")
35          ),
36          media_type="text/event-stream",
37          headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
38      )
- Lines 1-5: Import json for payload serialization, AsyncGenerator for the generator's return annotation, FastAPI and Request for the application framework, StreamingResponse for chunked HTTP delivery, and the adapter and StreamChunk from adapters/anthropic_stream.py
- Lines 9-15: format_sse_frame checks chunk.done to decide between a terminal frame (which includes finish_reason, usage metadata, and the [DONE] sentinel) and a regular token frame; every frame ends with \n\n per the W3C SSE specification, ensuring the browser's event parser correctly delineates messages
- Lines 17-26: anthropic_event_generator is the async generator that FastAPI's StreamingResponse consumes; it calls adapter.stream_chat to obtain StreamChunk objects and checks request.is_disconnected() on each iteration, breaking the loop cleanly if the client closes the connection so that the async with cleanup in the adapter's stream() context manager can run; model is typed str | None because the endpoint passes body.get("model"), which may be None
- Lines 28-38: The /v1/chat/stream/anthropic endpoint parses the request body, instantiates the adapter, and returns a StreamingResponse with media_type="text/event-stream"; the X-Accel-Buffering: no header prevents Nginx reverse proxies from buffering the stream, which would defeat token-by-token delivery
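One way to check the framing is to parse the frames back the way a client would. The sketch below is a deliberately small parser, assuming frames use only data: fields and \n line endings as in the format described above; a full SSE client also handles event:, id:, retry:, comment lines, and \r\n. The parse_sse_stream name and the sample body are hypothetical.

```python
import json


def parse_sse_stream(raw: str):
    """Split an SSE body on blank lines and decode each data: payload."""
    events = []
    for frame in raw.split("\n\n"):
        data_lines = []
        for line in frame.split("\n"):
            if not line.startswith("data:"):
                continue
            value = line[len("data:"):]
            if value.startswith(" "):
                value = value[1:]  # the format strips one leading space after the colon
            data_lines.append(value)
        if not data_lines:
            continue  # empty trailing segment after the final blank line
        data = "\n".join(data_lines)
        # The [DONE] sentinel is not JSON, so it is passed through as a string.
        events.append(data if data == "[DONE]" else json.loads(data))
    return events


# Hand-written body in the shape the serializer in this lesson produces.
raw_body = (
    'data: {"token": "Hel", "done": false}\n\n'
    'data: {"token": "lo", "done": false}\n\n'
    'data: {"token": "", "done": true, "finish_reason": "end_turn", '
    '"usage": {"output_tokens": 2}}\n\n'
    "data: [DONE]\n\n"
)

parsed = parse_sse_stream(raw_body)
tokens = "".join(e["token"] for e in parsed if isinstance(e, dict))
```

A round-trip check like this is useful in a unit test for the serializer: if a frame is missing its blank-line terminator, two payloads merge into one segment and json.loads fails loudly.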
Do's and Don'ts
Do's
- ✓ Do yield the final StreamChunk(token="", done=True, ...) outside the async with self._client.messages.stream() block. The usage.output_tokens and stop_reason you capture from message_delta are final only once the event loop has finished; yielding the terminal chunk from inside the loop (for example, directly in the message_delta branch) risks emitting it before Anthropic has sent message_stop, which can skew downstream billing telemetry and leave the SSE stream without a clean done=True frame as its last event.
- ✓ Do guard content_block_delta handling with event.delta.type == "text_delta" before forwarding event.delta.text to a StreamChunk. The same content_block_delta event type carries input_json_delta payloads for tool-use blocks, and passing those raw JSON fragments downstream as chat tokens will corrupt the transcript visible to the frontend.
- ✓ Do read stop_reason from event.delta.stop_reason and the output token count from event.usage.output_tokens on the message_delta event, storing both in local variables before the context manager exits. The raw message_stop event carries neither field, so reading them at the wrong lifecycle point yields None finish reasons and missing usage counts in every StreamChunk your downstream enum mapping and billing hooks receive.
Don'ts
- ✗ Don't copy the OpenAI adapter's flat delta.content extraction pattern into AnthropicStreamAdapter. Anthropic nests token text three levels deep (content_block_delta → delta → text), with an explicit event.delta.type == "text_delta" discriminator; a copy-paste that reads a delta attribute on every event will raise AttributeError on events such as message_start and content_block_start that do not carry that field.
- ✗ Don't yield a StreamChunk for message_start, content_block_start, content_block_stop, or message_stop events. Those events form the outer lifecycle envelope and carry no token text; forwarding them as chunks sends empty or None token strings to the frontend, causing the spinner to stall and injecting blank SSE frames that break SSE parsers expecting a data: payload with content.
- ✗ Don't compare finish_reason against "stop" or pass the raw value through; map Anthropic's "end_turn", "max_tokens", and "stop_sequence" explicitly. Anthropic never emits the OpenAI string "stop", so a direct equality check or a pass-through fallback will silently mismatch every downstream enum comparison and route "end_turn" responses to an unknown-finish-reason error branch in your provider-normalized SSE frame handler.