Prerequisites

This chapter assumes you have completed the course onboarding module and have a working Python 3.11+ development environment with pip and venv configured. You should be comfortable writing async Python functions using async def and await, and have a basic understanding of HTTP request-response cycles. No prior experience with Server-Sent Events or streaming APIs is required—this chapter builds that knowledge from the ground up. Ensure you have active API keys for OpenAI, Google AI Studio, Anthropic, and Together.ai, as all four providers are used in the lab exercises.

Learning Goals

  1. Implement Server-Sent Events streaming over HTTP using FastAPI StreamingResponse with async generators for token-by-token chat delivery

    • Implement Server-Sent Events streaming over HTTP using FastAPI StreamingResponse with async generators for token-by-token chat delivery that maintains persistent connections and delivers incremental content to browser and programmatic clients alike
    • Understand the W3C Server-Sent Events specification including the text/event-stream content type, the data:, event:, id:, and retry: field syntax, and the mandatory double-newline (\n\n) frame terminator that separates logical messages on the wire. You will learn why SSE was chosen over WebSockets for unidirectional LLM streaming — SSE rides on plain HTTP/1.1 or HTTP/2, requires zero upgrade handshake, works through corporate proxies that block WebSocket upgrades, and automatically reconnects via the browser's built-in EventSource API using the Last-Event-ID header. Senior engineers must understand the framing at byte level: each SSE frame is a sequence of field:value lines terminated by a single newline, and frames are separated by a blank line. Malformed frames — such as omitting the trailing \n\n or injecting bare newlines inside the data: value — silently break client parsers, causing tokens to merge or vanish without any error.
    • Build a FastAPI StreamingResponse endpoint backed by a Python async generator function that yields SSE-formatted strings. You will wire up the /v1/chat/stream route using StreamingResponse(generator(), media_type="text/event-stream") and set the Cache-Control: no-cache, Connection: keep-alive, and X-Accel-Buffering: no headers that prevent Nginx, CloudFront, and other reverse proxies from buffering the stream. The async generator pattern lets you await each chunk from the upstream LLM provider, format it as an SSE data: line carrying a JSON payload, and yield it immediately so that time-to-first-token stays under 200 ms for a warm model. You will implement a finally block inside the generator to detect when the client disconnects mid-stream — FastAPI raises a CancelledError — so you can close the upstream HTTP connection and avoid paying for tokens the user will never see.
    • Design a provider-agnostic SSE envelope schema that normalizes deltas from OpenAI, Gemini, Anthropic, and Together.ai into a single JSON structure with fields provider, model, delta_text, finish_reason, and usage. This uniform envelope means your frontend or downstream microservice parses one schema regardless of which LLM generated the token. You will implement the format_sse_frame(event_type: str, payload: dict) -> str helper that serializes the payload to compact JSON, prepends event: {event_type}\ndata: , appends \n\n, and handles edge cases like None deltas (which OpenAI sends on the final chunk) and empty strings (which Gemini may send between thinking-block boundaries). The helper also injects a monotonically increasing id: field so that clients can resume after a dropped connection.
    • Handle backpressure and error propagation within the async generator so that slow consumers do not cause unbounded memory growth. You will learn to set an asyncio.Queue(maxsize=64) between the provider-fetch task and the SSE-yield loop, apply a configurable timeout of 30 seconds per chunk using asyncio.wait_for, and emit a structured event: error\ndata: {"code": "timeout", ...}\n\n frame before closing the stream gracefully. This pattern prevents a single stalled client from holding open a database connection or provider socket indefinitely — a critical concern at scale where thousands of concurrent streams share a fixed pool of upstream HTTP connections.
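The framing helper described above can be sketched as follows. This is a minimal version of the format_sse_frame helper this chapter proposes (it is not a library API), assuming the envelope fields named earlier (provider, model, delta_text, finish_reason, usage):

```python
import itertools
import json

# Monotonically increasing id: counter so clients can resume via Last-Event-ID.
_event_id = itertools.count(1)

def format_sse_frame(event_type: str, payload: dict) -> str:
    """Serialize one payload into one SSE frame.

    SSE rules honored here:
      * compact JSON, so no raw newlines ever appear inside the data: value
        (a bare newline inside data: would silently break client parsers)
      * an id: field for reconnection support
      * the mandatory blank line (\\n\\n) that terminates the frame
    """
    if payload.get("delta_text") is None:
        # OpenAI sends a None delta on the final chunk; normalize to "".
        payload["delta_text"] = ""
    body = json.dumps(payload, separators=(",", ":"))
    return f"id: {next(_event_id)}\nevent: {event_type}\ndata: {body}\n\n"
```

An async generator wired into StreamingResponse would yield the return value of this helper once per upstream chunk; because json.dumps escapes newlines inside strings, multi-line model output cannot corrupt the frame boundary.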
  2. Integrate the OpenAI Chat Completions API in streaming mode and transform ChatCompletionChunk deltas into unified SSE frames

    • Integrate the OpenAI Chat Completions API in streaming mode and transform ChatCompletionChunk deltas into unified SSE frames so that GPT-4o and GPT-4o-mini responses flow token-by-token through your FastAPI endpoint with correct finish-reason semantics
    • Configure the openai.AsyncOpenAI client with connection pooling, retry logic, and timeout settings appropriate for streaming. You will instantiate the client with timeout=httpx.Timeout(connect=5.0, read=120.0, write=10.0, pool=5.0) and max_retries=2 to handle transient 429 and 500 errors from the OpenAI API without silently dropping the stream. The read timeout is set to 120 seconds because long completions with max_tokens=4096 can take over a minute on heavily loaded endpoints. You will also pass http_client=httpx.AsyncClient(limits=httpx.Limits(max_connections=100, max_keepalive_connections=20)) to cap the connection pool size and prevent file-descriptor exhaustion under load. Understanding these transport-layer settings is what separates a production integration from a demo — short default read timeouts cause spurious failures on long streaming requests during peak traffic.
    • Call client.chat.completions.create(model="gpt-4o", messages=messages, stream=True, stream_options={"include_usage": True}) and iterate over the async iterator to extract chunk.choices[0].delta.content. Each ChatCompletionChunk object carries a choices list where delta.content is either a string token fragment or None. The finish_reason field is None on every chunk except the last, where it becomes "stop", "length", or "content_filter". When stream_options includes include_usage, the very final chunk also carries usage.prompt_tokens and usage.completion_tokens — this is the only reliable way to get token counts in streaming mode because partial chunks do not include usage data. You must guard against the case where choices is an empty list — this is exactly what that final usage-only chunk looks like when include_usage is enabled; skipping chunks with no choices prevents your SSE formatter from raising an IndexError.
    • Map each ChatCompletionChunk to the provider-agnostic envelope by extracting delta.content, finish_reason, model, and usage, then passing them to format_sse_frame("chunk", payload). You will handle the "content_filter" finish reason by emitting a user-facing error event rather than silently ending the stream, because clients need to distinguish between a completed response and a moderation cutoff. You will also implement "stop" detection to emit a final event: done\ndata: [DONE]\n\n sentinel — matching the OpenAI streaming convention — so that frontend EventSource listeners can close the connection cleanly without relying on a TCP-level disconnect signal, which can be delayed by up to 30 seconds behind load balancers.
    • Implement function-calling and tool-use awareness within the streaming loop so that delta.tool_calls chunks are buffered, reassembled, and emitted as a single event: tool_call frame once the function name and arguments JSON are complete. Streaming tool calls arrive as incremental fragments spread across multiple chunks — delta.tool_calls[0].function.name appears in one chunk, then delta.tool_calls[0].function.arguments is split across many subsequent chunks. You will accumulate these fragments in a dictionary keyed by tool_calls[index].index, detect completion when the next chunk's finish_reason is "tool_calls", and serialize the assembled tool call into the SSE envelope. This is essential for agentic architectures where the frontend must display a "calling tool X" indicator in real time.
  3. Configure Gemini 2.5 Flash streaming via google.genai.Client with thinking_config to toggle extended reasoning on or off per request

    • Configure Gemini 2.5 Flash streaming via google.genai.Client with thinking_config to toggle extended reasoning on or off per request and parse interleaved thinking and response parts from the streaming output
    • Initialize the google.genai.Client with your API key and understand the difference between the generate_content and generate_content_stream methods. The synchronous generate_content returns a single GenerateContentResponse object after the model finishes, whereas generate_content_stream returns an iterator of GenerateContentResponse chunks where each chunk's candidates[0].content.parts list may contain zero, one, or multiple Part objects. You will configure the client with http_options={"api_version": "v1alpha"} to access the latest Gemini 2.5 Flash features including native thinking support. Gemini's streaming differs from OpenAI's in a fundamental way: instead of flat delta.content strings, Gemini returns structured Part objects with a thought boolean field that indicates whether the text is internal chain-of-thought reasoning or the final user-facing answer — your streaming pipeline must handle both part types correctly.
    • Control the Gemini 2.5 Flash reasoning budget using thinking_config=types.ThinkingConfig(thinking_budget=N) where N ranges from 0 (thinking disabled) to 24576 tokens (maximum reasoning depth). Setting thinking_budget=0 forces the model to skip internal reasoning entirely, producing faster but potentially less accurate responses — ideal for simple chat completions where latency matters more than depth. Setting values between 1024 and 8192 provides a practical trade-off for most production use cases. You will expose this as a request parameter thinking_budget: Optional[int] = None on your FastAPI endpoint, defaulting to None which lets the model decide autonomously. Understanding thinking budget is critical because it directly controls cost and latency: each thinking token consumes the same billing units as output tokens, and a 10000-token thinking budget can add 3-8 seconds of latency and double the per-request cost compared to thinking-disabled mode.
    • Parse the streaming response by iterating over chunks and inspecting each Part object's thought attribute. When part.thought is True, the text represents internal reasoning that you may choose to forward to the client as event: thinking frames for transparency or suppress entirely for a cleaner UX. When part.thought is False or absent, the text is the user-facing answer and should be emitted as a standard event: chunk frame. You will handle the transition boundary where the model switches from thinking to answering — this is not explicitly signaled in the stream, so you must track state with a boolean in_thinking_phase flag and emit an event: thinking_done marker when the first non-thinking part arrives. This state tracking prevents the frontend from mixing reasoning tokens into the visible answer, which would confuse users who did not opt into seeing the model's thought process.
    • Implement error handling specific to Gemini streaming, including google.api_core.exceptions.ResourceExhausted for rate limits, InvalidArgument for malformed thinking configs, and the silent stream-termination behavior where Gemini closes the iterator without sending a finish reason when content is blocked by safety filters. You will wrap the iteration in a try/except block, let the iterator exhaust naturally (an async for loop absorbs StopAsyncIteration for you), check whether any content was actually received, and emit an appropriate event: error frame if the stream ended prematurely. Unlike OpenAI, Gemini does not reliably surface a finish_reason on stream chunks — you infer completion by the iterator exhausting naturally — so your sentinel event: done must be emitted outside the loop unconditionally.
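The thinking/answer routing described above can be sketched as a small state machine over (thought, text) pairs extracted from streamed Part objects. The event names (thinking, thinking_done, chunk) are this chapter's assumed envelope, not values defined by the Gemini API:

```python
from collections.abc import Iterable, Iterator

def route_parts(parts: Iterable[tuple[bool, str]]) -> Iterator[tuple[str, str]]:
    """Route streamed parts to envelope events based on the thought flag.

    Gemini never explicitly signals the thinking-to-answer transition, so we
    track it with a boolean and emit a synthetic thinking_done marker when
    the first non-thinking part arrives.
    """
    in_thinking_phase = False
    for thought, text in parts:
        if thought:
            in_thinking_phase = True
            yield ("thinking", text)
        else:
            if in_thinking_phase:
                # Boundary crossed: the model has finished reasoning.
                yield ("thinking_done", "")
                in_thinking_phase = False
            yield ("chunk", text)
```

In the real pipeline each pair would come from (bool(part.thought), part.text) inside the async iteration, and each yielded event would be passed through the SSE formatting helper before being sent to the client.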
  4. Integrate Anthropic Claude Messages API streaming and convert MessageStreamEvent objects into provider-normalized SSE frames

    • Integrate Anthropic Claude Messages API streaming and convert MessageStreamEvent objects into provider-normalized SSE frames so that Claude 3.5 Sonnet and Claude 4 responses are delivered with the same envelope schema as other providers
    • Initialize the anthropic.AsyncAnthropic client and understand Claude's unique streaming event model, which differs substantially from OpenAI's flat-delta approach. Anthropic's Messages API produces a sequence of typed events: message_start carries the initial Message object with id, model, and usage.input_tokens; content_block_start signals the beginning of a new content block (text or tool_use); content_block_delta delivers incremental text via delta.text or tool input via delta.partial_json; content_block_stop closes the block; and message_delta carries the final stop_reason and usage.output_tokens. You will call client.messages.create(model="claude-sonnet-4-20250514", max_tokens=4096, messages=messages, stream=True) and iterate over the returned AsyncStream object. Each event is a strongly-typed Python dataclass — not a raw dictionary — so you access fields via attribute syntax (event.delta.text) rather than dictionary subscript, and AttributeError replaces KeyError as the primary failure mode when handling unexpected event shapes.
    • Map each Anthropic streaming event type to the appropriate SSE frame in your provider-agnostic envelope. The content_block_delta events with type="text_delta" map directly to event: chunk frames carrying delta_text. The message_start event maps to an event: metadata frame carrying the message ID and input token count. The message_delta event at the end maps to event: done carrying the stop_reason (which can be "end_turn", "max_tokens", "stop_sequence", or "tool_use") and the output token count. You will normalize Anthropic's stop_reason values to the common schema by mapping "end_turn" → "stop", "max_tokens" → "length", and "stop_sequence" → "stop", preserving parity with OpenAI's finish_reason vocabulary so that your frontend handles all providers with a single code path.
    • Handle Claude's extended thinking (also called "thinking blocks") in the streaming response by detecting content_block_start events where content_block.type == "thinking". When extended thinking is enabled via thinking={"type": "enabled", "budget_tokens": 10000} in the request, Claude emits thinking content blocks before the answer blocks. Each thinking block's content_block_delta carries delta.thinking (not delta.text), and you must route these to event: thinking SSE frames while tracking block indices to correctly associate deltas with their parent block. The block-indexed architecture means you may receive interleaved deltas for multiple content blocks — thinking block at index 0 and text block at index 1 — so you must maintain a blocks: dict[int, str] accumulator keyed by block index. This is more complex than Gemini's flat thought boolean because Anthropic can produce multiple sequential thinking blocks separated by redacted segments that carry type="redacted_thinking", which you should acknowledge in the stream but never attempt to decode.
    • Implement robust retry logic for Anthropic-specific failure modes including anthropic.RateLimitError (HTTP 429), anthropic.APIStatusError with status 529 (overloaded), and the less-documented anthropic.APIConnectionError that occurs when Anthropic's edge servers reset the connection mid-stream during capacity shifts. For 529 errors, Anthropic's documentation recommends exponential backoff starting at 2 seconds with a maximum of 60 seconds, which differs from OpenAI's recommended 1-second base. You will implement a @retry(max_attempts=3, backoff_base=2.0, retryable=(RateLimitError, APIStatusError)) decorator that only retries before the first token is emitted — once streaming has begun, retrying would cause duplicate tokens, so you must instead emit an event: error frame and close the stream. This "retry-before-first-token" pattern is a critical production concern that most tutorials ignore but that prevents garbled output in real deployments.
  5. Build a Llama 4 Maverick streaming adapter using the OpenAI SDK with Together.ai base_url

    • Build a Llama 4 Maverick streaming adapter using the OpenAI SDK with Together.ai base_url for open-weight model access that runs through the same SSE normalization pipeline as proprietary providers
    • Configure the openai.AsyncOpenAI client to point at Together.ai's inference API by setting base_url="https://api.together.xyz/v1" and api_key=TOGETHER_API_KEY. This works because Together.ai implements the OpenAI Chat Completions API specification, so the same client.chat.completions.create method, the same ChatCompletionChunk response objects, and the same streaming iterator protocol apply. You will instantiate a separate client instance — not reuse the OpenAI-configured one — to keep API keys isolated and allow independent timeout tuning. Together.ai's infrastructure has different latency characteristics than OpenAI's: time-to-first-token for Llama 4 Maverick averages 400-800 ms (versus 150-300 ms for GPT-4o) because the model runs on distributed GPU clusters with pipeline parallelism, so you will set read timeout to 180 seconds to accommodate longer generation times on the 400B-parameter model without false timeouts.
    • Call client.chat.completions.create(model="meta-llama/Llama-4-Maverick-17B-128E-Instruct-FP8", messages=messages, stream=True, max_tokens=4096) and iterate over the async stream exactly as you would for OpenAI. The key difference is that Together.ai may not support stream_options={"include_usage": True}, so you must implement client-side token counting as a fallback using tiktoken with the cl100k_base encoding as an approximation (Llama 4 uses a different tokenizer, but cl100k provides a reasonable estimate within 10-15 % for cost monitoring). You will also handle Together.ai-specific error responses that arrive as JSON error objects inside the stream rather than HTTP status codes — when the model is overloaded, Together.ai sometimes returns {"error": {"message": "Model is at capacity", "type": "server_error"}} as a stream chunk, which your iterator must detect and convert to an event: error SSE frame rather than forwarding as malformed content.
    • Implement a provider abstraction layer that selects the correct client, model identifier, and configuration based on a provider request parameter. You will define a ProviderConfig dataclass with fields client: AsyncOpenAI | genai.Client | AsyncAnthropic, model: str, supports_thinking: bool, supports_tool_streaming: bool, and timeout: httpx.Timeout, then register four instances in a PROVIDERS: dict[str, ProviderConfig] registry. The streaming endpoint reads provider from the request body, looks up the config, and dispatches to the appropriate async generator — stream_openai, stream_gemini, stream_anthropic, or stream_together — each of which yields the same SSEFrame dataclass. This registry pattern makes adding a fifth provider (such as Mistral or Cohere) a single-file change: define the config, write the generator, register it. The abstraction avoids a brittle if/elif chain and enables per-provider feature flags like supports_thinking that the endpoint uses to validate request parameters before starting the stream.
    • Test the complete multi-provider streaming pipeline by writing an integration test that sends identical prompts to all four providers and asserts that every stream produces at least one event: chunk frame with non-empty delta_text, ends with an event: done frame, and completes within the provider-specific timeout. You will use httpx.AsyncClient with transport=httpx.ASGITransport(app=app) to call the FastAPI endpoint in-process (no network hop), collect frames by splitting the response body on \n\n, parse each frame's event: and data: lines, and deserialize the JSON payload. The test also validates that the provider and model fields in every frame match the request, catching accidental cross-contamination bugs where a Gemini response leaks into an OpenAI stream due to shared mutable state. This kind of end-to-end streaming test is rare in tutorials but essential in production: it catches encoding errors, missing \n\n terminators, and malformed JSON that unit tests on the generator function alone would miss.
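The registry pattern from the previous goal can be sketched as follows. Client objects are typed loosely (Any) so the sketch runs without the vendor SDKs, the timeout is reduced to a plain float, and the feature flags and model identifiers are illustrative defaults, not authoritative values:

```python
from dataclasses import dataclass
from typing import Any

@dataclass(frozen=True)
class ProviderConfig:
    client: Any          # AsyncOpenAI | genai.Client | AsyncAnthropic in the lab
    model: str
    supports_thinking: bool
    supports_tool_streaming: bool
    read_timeout_s: float

# One entry per provider; in the lab each client slot holds a real SDK instance.
PROVIDERS: dict[str, ProviderConfig] = {
    "openai": ProviderConfig(None, "gpt-4o", False, True, 120.0),
    "gemini": ProviderConfig(None, "gemini-2.5-flash", True, False, 120.0),
    "anthropic": ProviderConfig(None, "claude-sonnet-4-20250514", True, True, 120.0),
    "together": ProviderConfig(
        None, "meta-llama/Llama-4-Maverick-17B-128E-Instruct-FP8", False, True, 180.0
    ),
}

def lookup(provider: str) -> ProviderConfig:
    """Resolve a request's provider parameter, failing fast on unknown names."""
    try:
        return PROVIDERS[provider]
    except KeyError:
        raise ValueError(f"unknown provider {provider!r}") from None
```

The endpoint would call lookup() before starting the stream, reject requests that set thinking parameters when supports_thinking is False, and dispatch to the matching stream_* generator.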

Key Terminology

Server-Sent Events (SSE)
A W3C-standard protocol that enables a server to push real-time updates to a client over a single, long-lived HTTP connection using the `text/event-stream` content type.
text/event-stream
The MIME content type required in the HTTP `Content-Type` header to signal that the response body conforms to the SSE wire format of newline-delimited `data:` frames.
SSE Frame
A single message unit in the Server-Sent Events protocol, structured as one or more `field: value` lines (typically `data:`) followed by a blank line (`\n\n`) that signals the end of that event.
StreamingResponse
A FastAPI/Starlette response class that accepts an async generator (or sync iterator) and writes each yielded chunk to the client incrementally, keeping the HTTP connection open until the generator is exhausted or the client disconnects.
Async Generator
A Python function declared with **async def** and containing **yield** expressions, allowing it to produce a sequence of values asynchronously — used by FastAPI's StreamingResponse to emit SSE frames without blocking the event loop.
Client Disconnect Detection
The mechanism by which a FastAPI streaming endpoint detects that the downstream client has closed the connection, typically caught as an **asyncio.CancelledError** inside the async generator, enabling the server to stop LLM inference and release resources.
ChatCompletionChunk
The OpenAI SDK object returned during a **stream=True** chat completion request, where each chunk contains a **choices** array with a **delta** field holding the incremental token content rather than the full accumulated message.
Delta Object
The partial-message payload inside each OpenAI ChatCompletionChunk that carries only the newly generated token(s) in its **content** attribute, requiring the consumer to concatenate successive deltas to reconstruct the complete response.
finish_reason
A string field on each streaming chunk's choice that remains **None** during active generation and transitions to a terminal value such as `"stop"`, `"length"`, or `"tool_calls"` to indicate why the model ceased producing tokens.
thinking_config
A configuration object passed to Google's **genai.Client** when invoking Gemini 2.5 Flash, controlling whether the model's extended chain-of-thought reasoning is enabled and setting the **thinking_budget** (in tokens) that caps how much internal reasoning the model performs before generating visible output.
google.genai.Client
The unified Python client from the `google-genai` SDK that provides both synchronous and asynchronous methods — including **aio.models.generate_content_stream** — for streaming inference against Gemini models with server-side configuration like safety settings and thinking budgets.
Reasoning Budget
The maximum number of tokens allocated to Gemini 2.5 Flash's internal thinking phase via **thinking_config**, allowing developers to trade off response latency and cost against depth of reasoning on a per-request basis.
Together.ai
A cloud inference platform that hosts open-weight models such as Llama 4 Maverick behind an OpenAI-compatible REST API, enabling developers to reuse the OpenAI Python SDK by simply overriding the **base_url** and providing a Together API key.
OpenAI-Compatible API
A third-party inference endpoint that mirrors the OpenAI REST contract — including `/v1/chat/completions` with **stream=True** support — so that existing OpenAI SDK code works against alternative model providers like Together.ai with minimal configuration changes.
Llama 4 Maverick
Meta's open-weight mixture-of-experts language model accessible through third-party inference providers such as Together.ai, supporting streaming chat completions via the OpenAI-compatible protocol.
MessageStreamEvent
The Anthropic Python SDK's event object emitted during a streaming Claude Messages API call, with event types such as **content_block_delta** carrying incremental **TextDelta** payloads that must be extracted and reformatted into a provider-normalized SSE frame.
Provider-Normalized SSE Frame
A unified JSON payload structure (typically containing fields like **token**, **provider**, and **done**) that an application defines to abstract away vendor-specific streaming formats, allowing a single frontend EventSource handler to consume streams from OpenAI, Gemini, Anthropic, and Together.ai identically.
EventSource
The browser-native JavaScript API (and its Node.js equivalents) that connects to an SSE endpoint, automatically parses incoming `data:` lines, reconnects on dropped connections, and dispatches **onmessage** events — serving as the standard client-side consumer for streaming chat responses.
