Chapter 1

Chat Completion API with Streaming

- text/event-stream content type with W3C-compliant SSE framing
- FastAPI StreamingResponse with async generator functions and client disconnect detection
- OpenAI chat.completions.create(stream=True) delta extraction and finish reason handling
- Google genai.Client streaming with thinking config for Gemini 2.5 Flash reasoning budget control
- Anthropic AsyncAnthropic messages.stream() event handling with MessageStream deltas
- Together.ai OpenAI-compatible API for Llama 4 Maverick streaming inference

Learning Path

Hands-on Labs

Each objective has a coding lab that opens in VS Code in your browser

Objective 1

Build a FastAPI SSE streaming response endpoint that delivers chat tokens as text/event-stream frames.

Goal

You will build a FastAPI endpoint at POST /api/v1/chat/stream that accepts a ChatRequest Pydantic model containing messages: list[ChatMessage], provider: str, model: str, and temperature: float, returning a StreamingResponse with media_type='text/event-stream'. The endpoint uses an async generator stream_tokens() that yields SSE-formatted strings 'data: {json}\n\n' for each token and a final 'data: [DONE]\n\n' sentinel. You implement ChatRequest and ChatMessage Pydantic models with field validators for provider names and temperature ranges. You configure CORSMiddleware for browser EventSource clients and add GET /api/v1/chat/stream/health returning streaming status. SSE frames include id, event, and data fields per the W3C spec. A StreamChunk Pydantic model standardizes output with content, finish_reason, model, provider, and usage fields.
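The SSE framing and generator logic above can be sketched as follows. This is a minimal illustration, not the lab solution: plain dataclasses stand in for the Pydantic models, and the `event: token` name is an assumption.

```python
import json
from dataclasses import asdict, dataclass
from typing import AsyncGenerator, Optional


@dataclass
class StreamChunk:
    # Mirrors the StreamChunk model described above.
    content: str
    finish_reason: Optional[str] = None
    model: str = ""
    provider: str = ""
    usage: Optional[dict] = None


def sse_frame(chunk: StreamChunk, event_id: int) -> str:
    # One SSE frame: id and event fields, a data payload,
    # and a blank line terminating the frame.
    return f"id: {event_id}\nevent: token\ndata: {json.dumps(asdict(chunk))}\n\n"


async def stream_tokens(tokens) -> AsyncGenerator[str, None]:
    # Async generator handed to StreamingResponse(...,
    # media_type='text/event-stream') in the real endpoint.
    for i, tok in enumerate(tokens):
        yield sse_frame(StreamChunk(content=tok), i)
    yield "data: [DONE]\n\n"  # sentinel the browser client watches for
```

In the endpoint itself, FastAPI would wrap the generator as `StreamingResponse(stream_tokens(...), media_type='text/event-stream')`.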

Objective 2

Implement an OpenAI GPT-4o streaming adapter that consumes ChatCompletion deltas and yields unified SSE chunks.

Goal

You will create an OpenAIStreamAdapter class in adapters/openai_stream.py wrapping openai.AsyncOpenAI. The adapter exposes stream_chat(messages: list[ChatMessage], model: str, temperature: float) -> AsyncGenerator[StreamChunk, None] calling client.chat.completions.create(model='gpt-4o', stream=True, stream_options={'include_usage': True}) and iterating over the async response. For each ChatCompletionChunk, you extract chunk.choices[0].delta.content, map it into a StreamChunk Pydantic model with fields content, finish_reason, model, provider, and usage, then yield. You handle finish_reason='stop' by capturing token usage from chunk.usage. The adapter reads OPENAI_API_KEY via a Settings model using pydantic-settings and raises ProviderAuthError if missing. Tests verify chunk transformation using a mock AsyncOpenAI client.
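The delta-to-StreamChunk mapping can be sketched as a pure function, which is also what the mock-client tests exercise. Plain objects stand in for the SDK's ChatCompletionChunk type; the usage field names follow OpenAI's response shape.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class StreamChunk:
    content: str
    finish_reason: Optional[str]
    model: str
    provider: str
    usage: Optional[dict]


def map_openai_chunk(chunk) -> StreamChunk:
    # With stream_options={'include_usage': True}, the stream's final
    # chunk has an empty choices list and carries token usage instead.
    if not chunk.choices:
        return StreamChunk("", None, chunk.model, "openai",
                           {"prompt_tokens": chunk.usage.prompt_tokens,
                            "completion_tokens": chunk.usage.completion_tokens})
    choice = chunk.choices[0]
    # delta.content is None on role-announcement and final frames.
    return StreamChunk(choice.delta.content or "", choice.finish_reason,
                       chunk.model, "openai", None)
```

In the adapter, `stream_chat` would simply `yield map_openai_chunk(chunk)` inside `async for chunk in await client.chat.completions.create(...)`.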

Objective 3

Implement a Gemini 2.5 Flash streaming adapter with thinking budget control using google.genai.Client.

Goal

You will create a GeminiStreamAdapter class in adapters/gemini_stream.py wrapping google.genai.Client (the new unified SDK, not deprecated google-generativeai). The adapter exposes stream_chat(messages, model, temperature, thinking_enabled) -> AsyncGenerator[StreamChunk, None] calling client.aio.models.generate_content_stream(). When thinking_enabled is True, pass config=GenerateContentConfig(thinking_config=ThinkingConfig(thinking_budget=1024)) for extended reasoning; when False, set thinking_budget=0. For each GenerateContentResponse chunk, extract chunk.candidates[0].content.parts[0].text and map to StreamChunk. Handle safety blocks by checking candidate.finish_reason against FinishReason.SAFETY and emitting StreamError. The adapter normalizes Gemini role 'model' to 'assistant'. Configuration reads GEMINI_API_KEY from Settings.
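The budget selection and role normalization above reduce to a few pure helpers, sketched here without the google-genai SDK types (the dict shapes mirror Gemini's contents format but are illustrative):

```python
def thinking_budget(thinking_enabled: bool, budget: int = 1024) -> int:
    # thinking_budget=0 disables Gemini 2.5 Flash reasoning tokens;
    # a positive budget enables extended thinking up to that many tokens.
    return budget if thinking_enabled else 0


def to_gemini_contents(messages: list[dict]) -> list[dict]:
    # Gemini's role vocabulary is 'user'/'model', so OpenAI-style
    # 'assistant' roles are rewritten on the way in.
    return [{"role": "model" if m["role"] == "assistant" else m["role"],
             "parts": [{"text": m["content"]}]}
            for m in messages]


def from_gemini_role(role: str) -> str:
    # ...and 'model' is normalized back to 'assistant' on the way out.
    return "assistant" if role == "model" else role
```

In the real adapter these feed `client.aio.models.generate_content_stream(model=..., contents=..., config=GenerateContentConfig(thinking_config=ThinkingConfig(thinking_budget=...)))`.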

Objective 4

Implement an Anthropic Claude streaming adapter that consumes MessageStream events and yields unified SSE chunks.

Goal

You will create an AnthropicStreamAdapter class in adapters/anthropic_stream.py wrapping anthropic.AsyncAnthropic. The adapter exposes stream_chat(messages, model, temperature) -> AsyncGenerator[StreamChunk, None] calling client.messages.stream() as an async context manager. You iterate over events handling message_start to extract message.id and message.model, content_block_delta to extract delta.text from TextDelta, and message_stop to capture usage with input_tokens and output_tokens. Each delta maps to a StreamChunk Pydantic model. You implement system prompt extraction separating system-role messages into the top-level system parameter. The adapter maps stop_reason values (end_turn, max_tokens, stop_sequence) to the unified FinishReason enum. Configuration reads ANTHROPIC_API_KEY via Settings and raises ProviderAuthError if missing.
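The system prompt extraction and stop_reason mapping are pure transformations and can be sketched independently of the SDK. The FinishReason member names here are assumptions; the doc defines only the enum's existence.

```python
from enum import Enum
from typing import Optional


class FinishReason(str, Enum):
    # Unified enum shared by all adapters (member names illustrative).
    STOP = "stop"
    LENGTH = "length"


# Anthropic stop_reason values -> unified FinishReason.
STOP_REASON_MAP = {"end_turn": FinishReason.STOP,
                   "stop_sequence": FinishReason.STOP,
                   "max_tokens": FinishReason.LENGTH}


def split_system(messages: list[dict]) -> tuple[Optional[str], list[dict]]:
    # Anthropic takes the system prompt as a top-level `system` parameter,
    # so system-role messages are pulled out of the message list.
    system = "\n".join(m["content"] for m in messages if m["role"] == "system")
    rest = [m for m in messages if m["role"] != "system"]
    return (system or None), rest
```

In `stream_chat`, the split results become `client.messages.stream(system=system, messages=rest, ...)`, and `STOP_REASON_MAP` is applied when the `message_stop` event arrives.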

Objective 5

Build a Llama 4 Maverick streaming adapter using OpenAI SDK with Together.ai as the inference provider.

Goal

You will create a Llama4StreamAdapter class in adapters/llama4_stream.py using openai.AsyncOpenAI configured with base_url='https://api.together.xyz/v1' and api_key from TOGETHER_API_KEY. The adapter exposes stream_chat(messages, model, temperature) -> AsyncGenerator[StreamChunk, None] calling client.chat.completions.create(model='meta-llama/Llama-4-Maverick-17B-128E-Instruct-FP8', stream=True). Since Together.ai implements the OpenAI-compatible API, chunk parsing reuses delta.content extraction from OpenAIStreamAdapter. You implement TogetherSettings reading TOGETHER_API_KEY from environment. The adapter adds Together-specific headers via httpx default_headers for tracing. You validate that models exist in SUPPORTED_LLAMA_MODELS and raise ModelNotSupportedError otherwise. Tests mock the Together.ai endpoint and verify StreamChunk output.
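The model validation and client configuration can be sketched as below; only the Maverick model ID named above is registered, and `together_client_kwargs` is a hypothetical helper showing the arguments the adapter would pass to openai.AsyncOpenAI.

```python
SUPPORTED_LLAMA_MODELS = {
    # Model IDs taken from the objective above; extend as Together.ai
    # adds Llama 4 variants.
    "meta-llama/Llama-4-Maverick-17B-128E-Instruct-FP8",
}


class ModelNotSupportedError(ValueError):
    pass


def validate_model(model: str) -> str:
    # Fail fast before opening a stream against an unknown model.
    if model not in SUPPORTED_LLAMA_MODELS:
        raise ModelNotSupportedError(f"unsupported Llama model: {model}")
    return model


def together_client_kwargs(api_key: str) -> dict:
    # Arguments for openai.AsyncOpenAI pointed at Together.ai's
    # OpenAI-compatible endpoint.
    return {"base_url": "https://api.together.xyz/v1", "api_key": api_key}
```

Because the endpoint speaks the OpenAI protocol, the streaming loop and delta extraction are the same as in OpenAIStreamAdapter; only the base URL, key, and model validation differ.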