Implement an Anthropic Claude streaming adapter

You will create an AnthropicStreamAdapter class in adapters/anthropic_stream.py that wraps anthropic.AsyncAnthropic. The adapter exposes stream_chat(messages, model, temperature) -> AsyncGenerator[StreamChunk, None], which calls client.messages.stream() as an async context manager. You iterate over the events, handling message_start to extract message.id and message.model, content_block_delta to extract delta.text from a TextDelta, and message_delta to capture stop_reason and usage (input_tokens is reported on message_start, the cumulative output_tokens on message_delta). Each text delta maps to a StreamChunk. You implement system prompt extraction, separating system-role messages into the top-level system parameter. The adapter maps stop_reason values (end_turn, max_tokens, stop_sequence) to the unified FinishReason enum. Configuration reads ANTHROPIC_API_KEY via Settings and raises ProviderAuthError if it is missing.

Integrate Anthropic Claude Messages API streaming and convert MessageStreamEvent objects into provider-normalized SSE frames

Introduction

When you wire Anthropic's Messages API into the same streaming pipeline that already serves OpenAI and Gemini responses, the differences in event shape can quietly break your frontend — tokens stop appearing mid-response, the spinner spins forever, or the final usage payload arrives malformed and your billing telemetry under-counts every Claude call. The Anthropic SDK delivers a typed MessageStreamEvent taxonomy (message_start, content_block_delta, message_delta, message_stop) rather than the flat delta shape OpenAI sends, so a copy-paste of the OpenAI adapter does not work. By the end of this lesson you will be able to build an AnthropicStreamAdapter that consumes that taxonomy and reshapes it into the same StreamChunk and SSE frame contract your OpenAI and Gemini adapters already produce, so your FastAPI endpoint can serve Claude responses token-by-token without changing the frontend.

Key Terminology

  • MessageStreamEvent: the discriminated-union event type yielded by anthropic.AsyncAnthropic().messages.stream(); each event has a type field (message_start, content_block_start, content_block_delta, content_block_stop, message_delta, message_stop) that determines its payload shape.
  • ContentBlockDelta: the streaming event whose delta payload is itself typed, most commonly a text_delta (with a text field) or an input_json_delta (with a partial_json field, for tool use); your adapter forwards only text_delta payloads as StreamChunk tokens.
  • StreamChunk: the provider-normalized dataclass your adapter yields (token, done, provider, optional finish_reason, optional input_tokens/output_tokens) so the FastAPI SSE serializer treats Anthropic, OpenAI, and Gemini output identically.

Concepts

Anthropic's Messages API follows a different streaming contract than the OpenAI-compatible providers you integrated in earlier goals. Where OpenAI emits a flat sequence of ChatCompletionChunk objects with a single delta field, Anthropic delivers a typed event stream where each Server-Sent Event carries a distinct type discriminator — message_start, content_block_start, content_block_delta, content_block_stop, message_delta, message_stop — each with its own payload shape. You cannot simply swap base URLs the way you did with Together.ai; you need a dedicated adapter that consumes Anthropic's event taxonomy, extracts text from ContentBlockDelta events, and reshapes them into the same unified StreamChunk your frontend already understands (see Code Walkthrough).

The SDK's AsyncMessageStream yields Python objects that map one-to-one with the SSE event types documented in Anthropic's streaming specification. Your adapter handles each event gracefully — extracting tokens from content_block_delta, capturing stop_reason and usage from message_delta, and propagating errors — while preserving the StreamChunk contract that normalizes output across all four providers in your system.

Handling edge cases in production

Three edge cases require attention when deploying the Anthropic streaming adapter alongside the OpenAI, Gemini, and Together.ai adapters you built in earlier goals.

Multiple content blocks. Anthropic's API can return multiple content blocks in a single message—for example, a text block followed by a tool-use block when function calling is enabled. Your adapter's type guard (event.delta.type == "text_delta") silently ignores non-text deltas, which is correct for pure chat streaming. If you later add tool-use support, extend the adapter to yield a different StreamChunk variant for input_json_delta events.
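
One way to keep tool-use fragments out of the chat transcript is to route each delta by its type. The sketch below uses SimpleNamespace stand-ins instead of real SDK objects; route_delta and its "token" / "tool_json" labels are hypothetical names, while the text_delta and input_json_delta types and their text and partial_json fields follow Anthropic's documented delta shapes.

```python
from types import SimpleNamespace
from typing import Optional, Tuple


def route_delta(delta) -> Optional[Tuple[str, str]]:
    """Return (kind, payload) for deltas we understand, else None."""
    if delta.type == "text_delta":
        return ("token", delta.text)
    if delta.type == "input_json_delta":
        # Tool-use arguments arrive as partial JSON fragments, not chat text.
        return ("tool_json", delta.partial_json)
    return None  # any other delta type is ignored by a chat-only adapter


# Stand-ins for the delta objects carried by content_block_delta events.
text = SimpleNamespace(type="text_delta", text="Hello")
tool = SimpleNamespace(type="input_json_delta", partial_json='{"city": "Par')
other = SimpleNamespace(type="thinking_delta", thinking="...")

routed = [route_delta(d) for d in (text, tool, other)]
```

A chat-only adapter would forward only the "token" results; a tool-aware one could buffer the "tool_json" fragments per content block until the block stops.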

Stop reason mapping. Anthropic uses "end_turn" where OpenAI uses "stop", and "max_tokens" where OpenAI uses "length". The StreamChunk in this walkthrough carries the raw finish_reason string, so map it to a common enum before any code compares finish reasons across providers. stop_reason is null in the message_start event because the model has not finished generating; your adapter avoids reading that placeholder by taking stop_reason only from the message_delta event, which carries the definitive value.
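
A minimal sketch of such a mapping follows. The FinishReason member names and the map_stop_reason helper are assumptions made for illustration, not the lesson's actual enum; only the three Anthropic stop_reason strings come from the text above.

```python
from enum import Enum
from typing import Optional


class FinishReason(str, Enum):
    # Hypothetical unified values; adjust to whatever your other adapters emit.
    STOP = "stop"
    LENGTH = "length"
    STOP_SEQUENCE = "stop_sequence"
    UNKNOWN = "unknown"


_ANTHROPIC_STOP_REASONS = {
    "end_turn": FinishReason.STOP,
    "max_tokens": FinishReason.LENGTH,
    "stop_sequence": FinishReason.STOP_SEQUENCE,
}


def map_stop_reason(raw: Optional[str]) -> FinishReason:
    """Translate an Anthropic stop_reason into the unified enum."""
    if raw is None:
        return FinishReason.UNKNOWN
    # Unrecognized values fall back to UNKNOWN instead of leaking raw strings.
    return _ANTHROPIC_STOP_REASONS.get(raw, FinishReason.UNKNOWN)
```

Falling back to UNKNOWN rather than passing the raw string through keeps downstream comparisons closed over a known set of values.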

Rate limiting and retries. The AsyncAnthropic client retries failed requests automatically with exponential backoff: by default it retries connection errors and 408, 409, 429, and 5xx responses (including 529 overloaded) up to two times. However, retries apply only to establishing the request. Once streaming begins, a network interruption terminates the stream. Wrap your adapter call in a try/except block that catches anthropic.APIConnectionError and yield an error StreamChunk so the frontend can display a reconnection prompt rather than hang silently.
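
The wrapping pattern can be sketched without the SDK installed. Here with_error_chunk is a hypothetical helper, the trimmed StreamChunk is a stand-in for the adapter's dataclass, finish_reason="error" is an assumed convention, and the built-in ConnectionError stands in for anthropic.APIConnectionError, which you would pass as retryable in a real deployment.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator, Optional, Tuple, Type


@dataclass
class StreamChunk:  # trimmed stand-in for the adapter's StreamChunk
    token: str
    done: bool
    finish_reason: Optional[str] = None


async def with_error_chunk(
    chunks: AsyncIterator[StreamChunk],
    retryable: Tuple[Type[BaseException], ...] = (ConnectionError,),
) -> AsyncIterator[StreamChunk]:
    """Forward chunks; turn a mid-stream failure into a terminal error chunk."""
    try:
        async for chunk in chunks:
            yield chunk
    except retryable:
        # Assumed convention: "error" tells the frontend to offer a retry.
        yield StreamChunk(token="", done=True, finish_reason="error")


async def _flaky_stream() -> AsyncIterator[StreamChunk]:
    yield StreamChunk(token="Hel", done=False)
    raise ConnectionError("simulated network drop")


async def _collect() -> list:
    return [c async for c in with_error_chunk(_flaky_stream())]


result = asyncio.run(_collect())
```

Because the wrapper always ends with a done=True chunk, the SSE serializer can close the stream cleanly even when the upstream connection drops.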

Code Walkthrough

Anthropic streaming event lifecycle

Before diving into implementation, study the full lifecycle of a single streaming request. Unlike OpenAI, which sends content in every chunk, Anthropic wraps content inside explicitly bounded content blocks. A single message can contain multiple content blocks (text, tool use, or thinking blocks when extended thinking is enabled), and each block has its own start/delta/stop cycle nested inside the outer message envelope.

Diagram (sequence): one streaming request passing between the Client, the Adapter, and the Anthropic API.
  • The Client calls stream_chat(messages, model, temperature) on the Adapter, which forwards a POST /v1/messages request to the Anthropic API with streaming enabled.
  • The API responds with a message_start event (message metadata) followed by content_block_start (block index and type).
  • Inside the loop, the API emits one content_block_delta per token (delta.text); the Adapter yields a non-terminal StreamChunk(token, done=False) per delta.
  • After the last token, the API sends content_block_stop, then a single message_delta carrying the definitive stop_reason and cumulative usage counts, then message_stop.
  • The Adapter yields one final StreamChunk(token="", done=True, usage) to signal completion downstream.

The diagram reveals three layers of structure. The outer layer is the message envelope (message_start → message_stop). The middle layer consists of one or more content blocks (content_block_start → content_block_stop). The inner layer contains the actual token deltas (content_block_delta). Your adapter only needs to forward content_block_delta events of type text_delta to the client, but it must track the outer lifecycle events to correctly signal completion and capture usage metadata.
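
To make the three layers concrete, the sketch below replays a hand-written event sequence and separates token-bearing deltas from lifecycle markers. The events are SimpleNamespace stand-ins rather than real SDK objects, and replay is a hypothetical helper.

```python
from types import SimpleNamespace as NS

# Hand-written stand-ins for one streamed message with a single text block.
events = [
    NS(type="message_start"),
    NS(type="content_block_start", index=0),
    NS(type="content_block_delta", index=0,
       delta=NS(type="text_delta", text="Hi")),
    NS(type="content_block_delta", index=0,
       delta=NS(type="text_delta", text=" there")),
    NS(type="content_block_stop", index=0),
    NS(type="message_delta", delta=NS(stop_reason="end_turn"),
       usage=NS(output_tokens=2)),
    NS(type="message_stop"),
]


def replay(events):
    """Split a stream into forwarded tokens and skipped lifecycle events."""
    tokens, skipped = [], []
    for event in events:
        if event.type == "content_block_delta" and event.delta.type == "text_delta":
            tokens.append(event.delta.text)
        else:
            # Envelope and block markers carry no text for the client.
            skipped.append(event.type)
    return tokens, skipped


tokens, skipped = replay(events)
```

Only two of the seven events produce output for the client; the other five exist to bound the message and its content block.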

Building the AnthropicStreamAdapter

The core adapter class wraps anthropic.AsyncAnthropic and exposes a single stream_chat async generator method that matches the interface established by your OpenAI and Gemini adapters. The class accepts an API key at construction time and uses client.messages.stream() as an async context manager, which handles connection lifecycle and automatic cleanup if the client disconnects mid-stream. The following implementation shows the AnthropicStreamAdapter class with its __init__ method for client instantiation and the stream_chat async generator that iterates over MessageStreamEvent objects, pattern-matches on event.type, extracts text from ContentBlockDelta payloads, and yields StreamChunk instances. Pay particular attention to how message_delta events are handled separately to capture stop_reason and usage before the final chunk.

Code snippet python
    import anthropic
    from dataclasses import dataclass
    from typing import AsyncGenerator, Optional

    @dataclass
    class StreamChunk:
        token: str
        done: bool
        provider: str = "anthropic"
        finish_reason: Optional[str] = None
        input_tokens: Optional[int] = None
        output_tokens: Optional[int] = None

    class AnthropicStreamAdapter:
        def __init__(self, api_key: str, default_model: str = "claude-sonnet-4-20250514"):
            self._client = anthropic.AsyncAnthropic(api_key=api_key)
            self._default_model = default_model

        async def stream_chat(
            self,
            messages: list[dict],
            model: str | None = None,
            temperature: float = 0.7,
            max_tokens: int = 1024,
        ) -> AsyncGenerator[StreamChunk, None]:
            resolved_model = model or self._default_model
            stop_reason = None  # filled in by the message_delta event
            usage_output = None  # cumulative output token count

            async with self._client.messages.stream(
                model=resolved_model,
                messages=messages,
                temperature=temperature,
                max_tokens=max_tokens,
            ) as stream:
                async for event in stream:
                    if event.type == "content_block_delta":
                        if event.delta.type == "text_delta":  # skip non-text deltas
                            yield StreamChunk(
                                token=event.delta.text,
                                done=False,
                            )
                    elif event.type == "message_delta":
                        stop_reason = event.delta.stop_reason
                        usage_output = event.usage.output_tokens

            yield StreamChunk(  # terminal sentinel, emitted after the stream closes
                token="",
                done=True,
                finish_reason=stop_reason,
                output_tokens=usage_output,
            )
  • Lines 1-3: Import the anthropic SDK, Python's dataclass decorator, and the typing helpers; the SDK provides AsyncAnthropic for non-blocking HTTP operations
  • Lines 5-12: Define StreamChunk as the provider-normalized output dataclass with provider defaulting to "anthropic" and optional fields for finish_reason and token usage that only populate on the final chunk
  • Lines 15-17: Initialize AnthropicStreamAdapter with an API key and a default model string; AsyncAnthropic manages connection pooling and retry logic internally
  • Lines 19-25: The stream_chat method signature matches the interface used by the OpenAI and Gemini adapters (messages, optional model override, temperature, and max_tokens) and returns an AsyncGenerator that yields StreamChunk objects
  • Lines 27-28: Two local variables track metadata from the message_delta event, which arrives before message_stop but after all content deltas; capturing them here lets you attach them to the final sentinel chunk
  • Lines 30-35: The client.messages.stream() async context manager opens the HTTP connection and begins buffering events; using async with ensures the connection closes cleanly even if the consumer breaks out of the loop early (critical for FastAPI client disconnect detection)
  • Lines 36-42: The inner loop iterates over MessageStreamEvent objects; the first branch checks for content_block_delta with a nested text_delta type guard, yielding each text token wrapped in a non-terminal StreamChunk
  • Lines 43-45: The message_delta branch captures stop_reason (values like "end_turn", "max_tokens", or "stop_sequence") and the cumulative output token count from event.usage
  • Lines 47-52: After the stream context manager exits, yield a final StreamChunk with done=True, propagating the captured stop_reason and usage data to the downstream SSE serializer
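
The lesson overview also calls for separating system-role messages into the top-level system parameter, which stream_chat above does not yet do; it passes messages through unchanged. A minimal sketch of that step follows. split_system_messages is a hypothetical helper name, and joining multiple system messages with a blank line is an assumption rather than anything the API requires.

```python
from typing import Dict, List, Optional, Tuple

Message = Dict[str, str]


def split_system_messages(
    messages: List[Message],
) -> Tuple[Optional[str], List[Message]]:
    """Pull system-role messages out of an OpenAI-style message list."""
    system_parts = [m["content"] for m in messages if m["role"] == "system"]
    chat = [m for m in messages if m["role"] != "system"]
    # Assumption: several system messages are merged into one prompt.
    system = "\n\n".join(system_parts) if system_parts else None
    return system, chat


system, chat = split_system_messages([
    {"role": "system", "content": "Be brief."},
    {"role": "user", "content": "Hello"},
    {"role": "system", "content": "Answer in English."},
    {"role": "assistant", "content": "Hi!"},
])
```

Inside stream_chat you would then pass messages=chat to client.messages.stream(), adding system=system only when it is not None.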

Converting StreamChunks to W3C-compliant SSE frames

Your FastAPI endpoint needs to serialize each StreamChunk into the text/event-stream format that the browser's EventSource API or your frontend fetch-based reader expects. The SSE specification requires each frame to contain a data: field terminated by two newline characters. The following function, format_sse_frame, takes a StreamChunk and produces a properly framed SSE string using JSON serialization. It integrates directly with FastAPI's StreamingResponse by being called inside the async generator that the response object consumes. This function also handles the terminal [DONE] sentinel that mirrors the OpenAI streaming convention your frontend already parses.

Code snippet python
    import json
    from typing import AsyncGenerator, Optional

    from fastapi import FastAPI, Request
    from fastapi.responses import StreamingResponse

    from adapters.anthropic_stream import AnthropicStreamAdapter, StreamChunk

    app = FastAPI()

    def format_sse_frame(chunk: StreamChunk) -> str:
        if chunk.done:
            payload = {"token": "", "done": True,
                       "finish_reason": chunk.finish_reason,
                       "usage": {"output_tokens": chunk.output_tokens}}
            return f"data: {json.dumps(payload)}\n\ndata: [DONE]\n\n"
        return f"data: {json.dumps({'token': chunk.token, 'done': False})}\n\n"

    async def anthropic_event_generator(
        request: Request,
        adapter: AnthropicStreamAdapter,
        messages: list[dict],
        model: Optional[str],
    ) -> AsyncGenerator[str, None]:
        async for chunk in adapter.stream_chat(messages, model=model):
            if await request.is_disconnected():
                break  # stop forwarding once the client has gone away
            yield format_sse_frame(chunk)

    @app.post("/v1/chat/stream/anthropic")
    async def stream_anthropic(request: Request):
        body = await request.json()
        adapter = AnthropicStreamAdapter(api_key="sk-ant-...")
        return StreamingResponse(
            anthropic_event_generator(
                request, adapter, body["messages"], body.get("model")
            ),
            media_type="text/event-stream",
            headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
        )
  • Lines 1-7: Import json for payload serialization, the typing helpers used in the annotations, FastAPI and Request for the application framework, StreamingResponse for chunked HTTP delivery, and the adapter and StreamChunk from adapters/anthropic_stream.py
  • Lines 11-17: format_sse_frame checks chunk.done to decide between a terminal frame (which includes finish_reason, usage metadata, and the [DONE] sentinel) and a regular token frame; every frame ends with \n\n per the W3C SSE specification, ensuring the browser's event parser correctly delineates messages
  • Lines 19-28: anthropic_event_generator is the async generator that FastAPI's StreamingResponse consumes; it calls adapter.stream_chat to obtain StreamChunk objects and checks request.is_disconnected() on each iteration, breaking the loop if the client closes the connection; once the adapter's generator is closed, the async with block around stream() releases the upstream connection. The model parameter is optional because the request body may omit it, in which case the adapter falls back to its default model
  • Lines 30-40: The /v1/chat/stream/anthropic endpoint parses the request body, instantiates the adapter, and returns a StreamingResponse with media_type="text/event-stream"; the "sk-ant-..." key is a placeholder, and the X-Accel-Buffering: no header prevents Nginx reverse proxies from buffering the stream, which would defeat token-by-token delivery
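
The endpoint above hardcodes a placeholder key, whereas the lesson overview says configuration reads ANTHROPIC_API_KEY and raises ProviderAuthError when it is missing. The sketch below shows one way that check could look. It reads the environment directly as a stand-in for the Settings object, which is not shown in this lesson, and both ProviderAuthError's base class and load_anthropic_api_key are assumptions.

```python
import os
from typing import Mapping, Optional


class ProviderAuthError(RuntimeError):
    """Raised when a provider credential is missing (hypothetical definition)."""


def load_anthropic_api_key(env: Optional[Mapping[str, str]] = None) -> str:
    """Return the Anthropic API key or fail fast with ProviderAuthError."""
    env = os.environ if env is None else env
    key = env.get("ANTHROPIC_API_KEY", "").strip()
    if not key:
        raise ProviderAuthError("ANTHROPIC_API_KEY is not set")
    return key


# Passing a mapping keeps the example independent of the real environment.
key = load_anthropic_api_key({"ANTHROPIC_API_KEY": "test-key"})
```

Failing at startup or at adapter construction surfaces a missing credential before the first request, instead of as an authentication error in the middle of a stream.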

Do's and Don'ts

Do's

  1. Do yield the final StreamChunk(token="", done=True, ...) after the event loop has finished, outside the async with self._client.messages.stream() block. By then the message_delta event has been processed, so stop_reason and usage.output_tokens are populated, and the upstream connection has been released. Emitting the terminal chunk earlier, for example as soon as message_delta arrives, sends done=True before Anthropic has sent message_stop; if the stream then fails, downstream billing telemetry has already recorded a clean completion.
  2. Do guard content_block_delta handling with event.delta.type == "text_delta" before forwarding event.delta.text to a StreamChunk — the same content_block_delta event type carries input_json_delta payloads for tool-use blocks, and passing those raw JSON fragments downstream as chat tokens will corrupt the transcript visible to the frontend.
  3. Do read stop_reason from event.delta.stop_reason and output token count from event.usage.output_tokens on the message_delta event, storing both in local variables before the context manager exits — these fields exist only on message_delta, not on message_stop, so reading them at the wrong lifecycle point yields None finish reasons and zero-count usage in every StreamChunk your downstream enum mapping and billing hooks receive.
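
The metadata capture described in these points can be isolated in a small accumulator, sketched here with SimpleNamespace stand-ins for SDK events. UsageTracker is a hypothetical helper; the field paths it reads (message.usage.input_tokens on message_start, usage.output_tokens and delta.stop_reason on message_delta) follow Anthropic's documented event shapes.

```python
from dataclasses import dataclass
from types import SimpleNamespace as NS
from typing import Optional


@dataclass
class UsageTracker:
    input_tokens: Optional[int] = None
    output_tokens: Optional[int] = None
    stop_reason: Optional[str] = None

    def observe(self, event) -> None:
        """Record metadata from the two events that carry it."""
        if event.type == "message_start":
            self.input_tokens = event.message.usage.input_tokens
        elif event.type == "message_delta":
            # output_tokens on message_delta is cumulative for the message.
            self.output_tokens = event.usage.output_tokens
            self.stop_reason = event.delta.stop_reason


tracker = UsageTracker()
for event in [
    NS(type="message_start", message=NS(usage=NS(input_tokens=12))),
    NS(type="content_block_delta", delta=NS(type="text_delta", text="Hi")),
    NS(type="message_delta", delta=NS(stop_reason="end_turn"),
       usage=NS(output_tokens=5)),
    NS(type="message_stop"),
]:
    tracker.observe(event)
```

After the loop, the tracker holds everything the terminal StreamChunk needs, including the input_tokens field that the adapter in this lesson leaves unset.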

Don'ts

  1. Don't copy the OpenAI adapter's flat delta.content extraction pattern into AnthropicStreamAdapter. Anthropic wraps token text three levels deep (content_block_delta → delta → text), with an explicit event.delta.type == "text_delta" discriminator; a copy-paste that reads a top-level delta attribute raises AttributeError on every event that lacks one, such as message_start and content_block_start.
  2. Don't yield a StreamChunk for message_start, content_block_start, content_block_stop, or message_stop events — those events form the outer lifecycle envelope and carry no token text; forwarding them as chunks sends empty or None token strings to the frontend, causing the spinner to stall and injecting blank SSE frames that break SSE parsers expecting a data: payload with content.
  3. Don't assume finish_reason will be "stop"; map Anthropic's "end_turn", "max_tokens", and "stop_sequence" explicitly. Anthropic never emits the OpenAI string "stop", so an equality check against it, or a fallback that passes the raw value through, silently mismatches every downstream enum comparison and routes "end_turn" responses to the unknown-finish-reason branch of your provider-normalized SSE frame handler.
