Build data pipelines
You will build efficient data processing pipelines: chain multiple generators together, process large files lazily without loading them into memory, and filter and transform data streams.
One of the most powerful applications of generators is building data processing pipelines. Each stage in the pipeline is a generator that takes input from the previous stage, processes it, and yields output to the next stage. Data flows through the pipeline one item at a time, without ever loading the entire dataset into memory.
Introduction
When you need to process datasets larger than memory — log files, training corpora, JSONL exports — loading everything eagerly is not an option. Generator pipelines let you stream one record through multiple transformation stages, holding only a single item in memory at any moment. Get this wrong and you'll OOM the pod, blow past memory limits, or wait hours for a load step that never finishes. By the end of this lesson you'll be able to chain generators into multi-stage pipelines, process files larger than RAM, and use yield from to delegate to sub-generators.
Key Terminology
- Pipeline stage — a single generator function that consumes from an upstream iterable and yields to a downstream consumer; pipelines are built by composing these stages so data flows through one item at a time.
- Lazy evaluation — work is deferred until a consumer pulls a value, which is what lets a multi-stage pipeline process gigabyte-scale inputs with constant memory.
- yield from — a Python 3.3+ expression that delegates iteration to a sub-generator or any iterable; the standard tool for composing generators and flattening nested structures without manual loops.
- ETL — extract-transform-load, the canonical data-pipeline pattern where each step (read, filter, parse, extract) is naturally expressible as a generator stage.
Concepts
Chaining Generators
A pipeline is just a chain of generator functions where each stage pulls from its predecessor and yields to its successor. Because every stage is lazy, no work happens until a terminal consumer iterates — and at that point exactly one record traverses the chain before the next is pulled. This is the foundation for processing LLM outputs, transforming datasets, and building efficient ETL workflows (see Code Walkthrough).
Demand flows upstream, not downstream: the consumer asks extract_field for one value, which asks parse_json_lines, which asks filter_non_empty, which asks read_lines, which pulls exactly one line from the file. Memory stays flat regardless of file size.
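To make the pull order concrete, here is a small sketch that swaps file I/O for an in-memory list. The stage names source and upper are illustrative only; each stage appends to a shared log so you can see exactly when it runs.

```python
log = []

def source(items):
    """Illustrative source stage: logs each item it hands downstream."""
    for item in items:
        log.append(f"read {item!r}")
        yield item

def upper(items):
    """Illustrative transform stage: logs each item it processes."""
    for item in items:
        log.append(f"upper {item!r}")
        yield item.upper()

pipeline = upper(source(["a", "b"]))
assert log == []  # building the chain runs no stage code at all

for result in pipeline:  # each next() call pulls one item through both stages
    log.append(f"consume {result!r}")

print(log)
# ["read 'a'", "upper 'a'", "consume 'A'", "read 'b'", "upper 'b'", "consume 'B'"]
```

The log is empty right after the chain is built, and each item travels all the way to the consumer before the next one is read.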
Streaming Files Larger Than RAM
Iterating an open file object (with open(filename) as f: for line in f) yields one line at a time without reading the whole file into memory; it is the canonical low-memory streaming idiom, and the with block closes the file when iteration ends. Combined with the pipeline pattern above, this is how you process multi-gigabyte training corpora or log archives on a pod with a few hundred megabytes of RAM. The contract: only one record (plus any running counters you keep) is resident at a time.
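A minimal sketch of that contract in practice: line_stats (a hypothetical helper, not part of the lesson's pipeline) makes one pass over a file and keeps only the current line and two counters. The demo uses a tiny temporary file, but the memory profile is the same for a multi-gigabyte one.

```python
import os
import tempfile

def line_stats(path):
    """Return (line_count, longest_line_length) in one streaming pass.

    Only the current line and two integer counters are resident at a time.
    """
    count = 0
    longest = 0
    with open(path, "r", encoding="utf-8") as f:  # file closed when the block exits
        for line in f:                               # one line per iteration
            count += 1
            longest = max(longest, len(line.rstrip("\n")))
    return count, longest

# Demo on a tiny temporary file; a multi-GB file streams the same way.
with tempfile.NamedTemporaryFile("w", suffix=".txt", delete=False, encoding="utf-8") as tmp:
    tmp.write("short\na much longer line\n\nend\n")
    path = tmp.name

try:
    result = line_stats(path)
    print(result)  # (4, 18)
finally:
    os.remove(path)
```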
Delegation with yield from
Python 3.3 introduced yield from to delegate iteration to a sub-generator or any iterable in a single statement. It's the right tool for two cases: composing generators (concatenating multiple sources into one stream) and recursing over nested structures (flattening trees without manual stack management). Beyond syntactic sugar, it correctly forwards send(), throw(), and return values to the delegated generator — something a hand-written for x in sub: yield x loop gets wrong (see Code Walkthrough).
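A minimal sketch of the forwarding behavior, using two hypothetical functions: averager receives numbers through send() and returns their mean, and delegator wraps it with yield from. With a hand-written for x in averager(): yield x loop instead, the values passed to send() would land in the outer generator's yield expression and be discarded, and averager's return value would be lost.

```python
def averager():
    """Sub-generator: receives numbers via send(); a None sentinel ends it."""
    total = 0.0
    count = 0
    while True:
        value = yield
        if value is None:  # sentinel: stop and report the mean
            break
        total += value
        count += 1
    return total / count if count else 0.0

def delegator(results):
    """Outer generator: yield from forwards send() and captures the return value."""
    while True:
        mean = yield from averager()  # each send() goes straight to averager
        results.append(mean)          # then a fresh averager starts

results = []
gen = delegator(results)
next(gen)  # prime: run up to the first bare yield inside averager
for x in (10, 20, 30):
    gen.send(x)
gen.send(None)  # averager returns 20.0, which delegator records
print(results)  # [20.0]
```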
Code Walkthrough
Now that you've seen how each pipeline stage pulls lazily from its predecessor, here's what that wiring looks like in code.
The first snippet implements the four stages described in the Concepts section (read_lines, filter_non_empty, parse_json_lines, and extract_field), composed into a single JSONL processor. The second snippet demonstrates yield from for both recursive flattening and source concatenation.
```python
import json


def read_lines(filename):
    """Stage 1: yield one stripped line at a time from a file."""
    with open(filename, 'r') as f:
        for line in f:
            yield line.strip()


def filter_non_empty(lines):
    """Stage 2: drop blank lines."""
    for line in lines:
        if line:
            yield line


def parse_json_lines(lines):
    """Stage 3: parse JSON, skipping malformed records."""
    for line in lines:
        try:
            record = json.loads(line)
        except json.JSONDecodeError:
            continue  # skip only lines that fail to parse
        yield record


def extract_field(records, field):
    """Stage 4: project a single field out of each record."""
    for record in records:
        if field in record:
            yield record[field]


# Compose: no I/O has happened yet.
lines = read_lines('data.jsonl')
non_empty = filter_non_empty(lines)
records = parse_json_lines(non_empty)
texts = extract_field(records, 'text')

# Iteration is what makes data flow through the chain.
for text in texts:
    print(text)
```
- read_lines: for line in f streams the file lazily; only one line is resident at a time, so multi-GB inputs work fine.
- filter_non_empty / parse_json_lines / extract_field: each is a single-purpose stage that pulls from the previous one and yields downstream. Note the narrow except json.JSONDecodeError: we skip parse failures, not every exception.
- Composition: building texts doesn't read the file; the for text in texts loop is what pulls records through the entire pipeline, one at a time.
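One practical consequence of that laziness: errors in a stage surface where the pipeline is consumed, not where it is assembled. This sketch (the path no_such_file.jsonl is a hypothetical, assumed-missing file) uses inspect.getgeneratorstate to show that read_lines has not run at all until the first pull.

```python
import inspect

def read_lines(filename):
    """Same shape as Stage 1: open() sits inside the generator body."""
    with open(filename, "r") as f:
        for line in f:
            yield line.strip()

# Calling read_lines only creates a generator object; none of its body runs,
# so this line succeeds even though the file does not exist.
lines = read_lines("no_such_file.jsonl")
state_before = inspect.getgeneratorstate(lines)
print(state_before)  # GEN_CREATED

# The first pull executes open() and raises there.
try:
    next(lines)
except FileNotFoundError as exc:
    error_name = type(exc).__name__
    print("failed on first pull:", error_name)  # failed on first pull: FileNotFoundError
```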
```python
def flatten(nested_list):
    """Recursively flatten a nested list using yield from."""
    for item in nested_list:
        if isinstance(item, list):
            yield from flatten(item)
        else:
            yield item


def combined_sources(*iterables):
    """Concatenate any number of iterables into one stream."""
    for iterable in iterables:
        yield from iterable


print(list(flatten([1, [2, 3, [4, 5]], 6, [7, [8, 9]]])))
# [1, 2, 3, 4, 5, 6, 7, 8, 9]

print(list(combined_sources([1, 2], [3, 4], [5, 6])))
# [1, 2, 3, 4, 5, 6]
```
- flatten: yield from flatten(item) recurses into nested lists; leaves are yielded directly. Each value emerges as soon as it's reached, with no intermediate lists built.
- combined_sources: yield from iterable works for any iterable, not just generators, making it a one-line way to splice streams together.
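The standard library already provides this concatenation as itertools.chain. A quick check that the two agree (combined_sources is redefined so the snippet runs on its own):

```python
from itertools import chain

def combined_sources(*iterables):
    """Same as above: concatenate iterables with yield from."""
    for iterable in iterables:
        yield from iterable

sources = ([1, 2], (3, 4), range(5, 7))
assert list(combined_sources(*sources)) == list(chain(*sources)) == [1, 2, 3, 4, 5, 6]

# chain.from_iterable takes a (possibly lazy) iterable of iterables, which helps
# when the sources themselves come from a generator, e.g. one reader per file.
print(list(chain.from_iterable([[1, 2], [3]])))  # [1, 2, 3]
```

Writing the generator yourself is still useful when you want to add per-source logic, such as logging which file each record came from.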
You'll know it works when you can run the JSONL pipeline against a small file, see only the text field per record, and observe flat memory usage as input size grows — and when list(flatten(...)) produces the expected flattened sequence for an arbitrarily nested input.
Do's and Don'ts
Do's
- ✓Do compose small single-purpose stages — each generator function should read, filter, parse, or extract one thing; composing them keeps the pipeline easy to test and reorder.
- ✓Do iterate the file object directly (with open(...) as f: for line in f) — this is the canonical low-memory streaming idiom; it keeps RAM flat regardless of file size, and the with block guarantees the file is closed once the generator finishes or is closed.
- ✓Do prefer yield from over manual sub-iteration — delegating to sub-generators avoids subtle bugs around exception propagation and send()/return value forwarding.
Don'ts
- ✗Don't call f.read() or list(...) on the upstream stream — both materialize the whole dataset and defeat the lazy pipeline.
- ✗Don't catch and swallow Exception — narrow your except to the specific error you expect (e.g. json.JSONDecodeError) so real bugs aren't hidden behind a continue.
- ✗Don't reuse a generator object after it's been exhausted — generators are one-shot iterators; build a new pipeline if you need to iterate again.
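The one-shot rule fails silently, which is what makes it dangerous: a second pass over an exhausted generator simply yields nothing. A minimal sketch, with numbers and make_pipeline as hypothetical names; the usual fix is to wrap pipeline construction in a function and call it once per pass.

```python
def numbers(n):
    for i in range(n):
        yield i

gen = numbers(3)
first_pass = list(gen)   # [0, 1, 2]
second_pass = list(gen)  # []: already exhausted, and no error is raised

def make_pipeline(n):
    """Factory: returns a fresh generator chain on every call."""
    return (x * 10 for x in numbers(n))

assert list(make_pipeline(3)) == list(make_pipeline(3)) == [0, 10, 20]
```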
Real-World AI Applications
Generators are a workhorse of production AI systems. The three patterns below recur throughout the AI ecosystem, and each one replaces a fully materialized list with a stream you consume one item at a time.
Introduction
When you call an LLM for a long completion or process a multi-gigabyte fine-tuning corpus, loading the entire response or dataset into memory blocks the user and risks an OOM kill mid-job. Generators give you the toolkit to stream tokens, transform records, and chain pipeline stages one item at a time — without ever holding the full payload in memory. By the end of this lesson you'll be able to stream tokens from an LLM API, iterate a JSONL training file larger than RAM, and compose a multi-stage agent data pipeline using generator chaining.
Key Terminology
- Streaming response — an HTTP response delivered as a series of partial chunks instead of one complete payload; LLM providers stream tokens so callers can render output as it is produced.
- Time-to-first-token — the latency between sending a prompt and receiving the first generated token; the headline UX metric for chat applications and the main reason to consume responses with a generator.
- Lazy evaluation — computing values only when the consumer asks for them; lets generators iterate datasets larger than RAM because nothing materializes until pulled.
- Data pipeline — a chain of generators where each stage transforms output from the previous one; intermediate stages never build full lists, so memory stays flat regardless of input size.
- Generator — a Python function that uses yield to produce values one at a time on demand; the construct underlying all three patterns in this lesson: token streaming, lazy iteration, and pipeline composition.
Concepts
Three patterns dominate real-world AI code: streaming LLM tokens, lazy iteration over large datasets, and chained generator pipelines. Each is demonstrated later in this lesson.
Streaming LLM Tokens
LLM SDKs return chunk iterators when you opt into streaming. Wrapping that iterator in your own generator gives you control over what to yield (deltas, full text, token+metadata pairs) and lets the rest of your code stay synchronous and pull-based. Faster time-to-first-token is the user-visible win; the operational win is that your process never has to hold the full response before acting on it (see Code Walkthrough).
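As a sketch of "control over what to yield", the wrapper below yields (delta, text_so_far) pairs so a UI can either append the new piece or redraw the whole message. To keep it runnable offline, fake_chunks is a hypothetical stand-in that only mimics the chunk.choices[0].delta.content shape used in the Code Walkthrough; it is not a real SDK object.

```python
from types import SimpleNamespace

def fake_chunks(pieces):
    """Hypothetical stand-in for an SDK stream. Each object mimics the
    chunk.choices[0].delta.content shape; None models a chunk with no text."""
    for piece in pieces:
        delta = SimpleNamespace(content=piece)
        yield SimpleNamespace(choices=[SimpleNamespace(delta=delta)])

def stream_with_text_so_far(chunks):
    """Yield (delta, text_so_far) pairs as each chunk arrives."""
    text_so_far = ""
    for chunk in chunks:
        if not chunk.choices:  # tolerate chunks that carry no choices
            continue
        delta = chunk.choices[0].delta.content
        if delta:  # skip None and empty-string deltas
            text_so_far += delta
            yield delta, text_so_far

for delta, text in stream_with_text_so_far(fake_chunks(["Hel", None, "lo", "!"])):
    print(repr(delta), "->", repr(text))
# 'Hel' -> 'Hel'
# 'lo' -> 'Hello'
# '!' -> 'Hello!'
```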
Lazy Iteration over Training Data
Fine-tuning corpora routinely exceed RAM. A generator that yields one parsed JSONL record at a time keeps resident memory flat regardless of file size, and composes naturally with batching helpers like itertools.islice when the consumer wants groups of N (see Code Walkthrough).
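Because the reader is lazy, islice can also stop it early, which is handy for sampling a huge corpus. This sketch (iter_jsonl and the lines_read counter are hypothetical) writes a 1,000-line temporary file and shows that taking three records reads only three lines.

```python
import json
import os
import tempfile
from itertools import islice

lines_read = 0  # instrumentation only, to prove how much of the file was touched

def iter_jsonl(path):
    """Yield one parsed record per non-blank line; nothing is loaded up front."""
    global lines_read
    with open(path, encoding="utf-8") as f:
        for line in f:
            lines_read += 1
            if line.strip():
                yield json.loads(line)

with tempfile.NamedTemporaryFile("w", suffix=".jsonl", delete=False, encoding="utf-8") as tmp:
    for i in range(1000):
        tmp.write(json.dumps({"id": i}) + "\n")
    path = tmp.name

try:
    sample = list(islice(iter_jsonl(path), 3))  # islice stops pulling after 3 records
finally:
    os.remove(path)

print(sample, lines_read)  # [{'id': 0}, {'id': 1}, {'id': 2}] 3
```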
Pipeline Composition
Agent code often needs parse → extract → chunk → score before a decision. Making each stage its own generator means data flows through one item at a time, end-to-end, with no intermediate list ever built. Stages interleave naturally: the first record read from the input reaches the consumer before the rest of the input has even been parsed.
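The Code Walkthrough pipeline stops at chunking, so here is a sketch of the final score step. The keyword-count scorer and the names score_chunks and keep_relevant are hypothetical placeholders; a real agent might call a model or an embedding index at that point instead.

```python
def chunk_content(contents, max_length=1000):
    """Same chunking stage as in the Code Walkthrough."""
    for c in contents:
        for i in range(0, len(c), max_length):
            yield c[i : i + max_length]

def score_chunks(chunks, keyword):
    """Hypothetical scorer: count keyword hits per chunk, yield (score, chunk)."""
    for chunk in chunks:
        yield chunk.lower().count(keyword), chunk

def keep_relevant(scored, min_score=1):
    """Drop chunks whose score falls below the threshold."""
    for score, chunk in scored:
        if score >= min_score:
            yield chunk

docs = ["Agents call tools", "nothing here", "tool use by agents"]
relevant = list(keep_relevant(score_chunks(chunk_content(docs), "tool")))
print(relevant)  # ['Agents call tools', 'tool use by agents']
```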
Code Walkthrough
Now that you understand the three patterns — streaming LLM tokens, lazy iteration over large datasets, and generator pipeline composition — the following examples demonstrate each one in runnable Python.
The first snippet wraps an SDK streaming call in a generator so tokens flow to the caller as they arrive rather than accumulating in a buffer. The client argument is any SDK client whose .chat.completions.create() supports stream=True (e.g., openai.OpenAI()):
```python
from openai import OpenAI


def stream_llm_response(client, prompt):
    """Yield tokens from a streaming LLM response as they arrive."""
    response = client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": prompt}],
        stream=True,
    )
    for chunk in response:
        if not chunk.choices:  # some chunks (e.g. a final usage chunk) carry no choices
            continue
        delta = chunk.choices[0].delta.content
        if delta:
            yield delta


client = OpenAI()  # reads OPENAI_API_KEY from the environment

# Each token prints as it arrives, with no buffering
for token in stream_llm_response(client, "Tell me a story"):
    print(token, end="", flush=True)
print()
```
The second snippet chains three generator stages (parse_jsonl, extract_content, and chunk_content) into agent_pipeline, then groups the stream into fixed-size lists with batched. All six functions are defined in place. No intermediate list is built between stages, and only the current batch of up to 100 chunks is held at once, so resident memory stays flat even for a multi-gigabyte file. The hand-rolled batched mirrors itertools.batched from Python 3.12+, which yields tuples instead of lists:
```python
import json
from itertools import islice


def parse_jsonl(path):
    with open(path) as f:
        for line in f:
            if line.strip():  # skip blank lines, which json.loads would reject
                yield json.loads(line)


def extract_content(records):
    for r in records:
        if "content" in r:
            yield r["content"]


def chunk_content(contents, max_length=1000):
    for c in contents:
        for i in range(0, len(c), max_length):
            yield c[i : i + max_length]


def agent_pipeline(path):
    return chunk_content(extract_content(parse_jsonl(path)))


def batched(iterable, n):
    it = iter(iterable)
    while batch := list(islice(it, n)):
        yield batch


def upload_batch(batch):
    """Stub: replace with your actual upload or indexing call."""
    print(f"Uploading {len(batch)} chunks")


for batch in batched(agent_pipeline("training_data.jsonl"), 100):
    upload_batch(batch)
```
You'll know it works when tokens print to your terminal incrementally instead of after a long pause, and memory usage stays flat in a process monitor while the pipeline drains a multi-gigabyte JSONL file.
Do's and Don'ts
Do's
- ✓Do use generators for large datasets — Datasets that don't fit in memory are the headline use case; lazy iteration keeps resident memory flat regardless of input size.
- ✓Do chain generators into pipelines — Stage-by-stage composition keeps each step small, readable, and lets data flow one item at a time end-to-end.
- ✓Do test with small fixtures first — Generator bugs (off-by-one, early return, swallowed exceptions) are easier to spot on a 10-record sample than on a production stream.
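One way to make small fixtures cheap is to have stages accept any iterable of lines rather than a path, so a test can pass io.StringIO or a plain list. A minimal sketch, with parse_records as a hypothetical variant of the walkthrough's parse_jsonl:

```python
import io
import json

def parse_records(lines):
    """Parse JSON from any iterable of strings: an open file,
    io.StringIO, or a plain list in a unit test."""
    for line in lines:
        if line.strip():
            yield json.loads(line)

def extract_content(records):
    for r in records:
        if "content" in r:
            yield r["content"]

# A four-line fixture covers a blank line and a record without "content".
fixture = io.StringIO('{"content": "a"}\n\n{"other": 1}\n{"content": "b"}\n')
assert list(extract_content(parse_records(fixture))) == ["a", "b"]

# In production the same stages take an open file:
# with open("training_data.jsonl") as f:
#     for content in extract_content(parse_records(f)):
#         ...
```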
Don'ts
- ✗Don't materialize generators with list() — Wrapping a generator in list() defeats the memory purpose; only do it when you genuinely need random access or repeated iteration.
- ✗Don't catch StopIteration manually — for loops handle it automatically; catching it yourself usually masks the real bug instead of fixing it.
- ✗Don't jump straight to production-scale data — Validate on a sample first; reproducing a generator bug at 10GB scale costs hours that a 10-row fixture would have caught in seconds.
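A closely related trap is calling next() by hand inside a generator body. Since Python 3.7 (PEP 479), a StopIteration that escapes a generator is converted into RuntimeError, so the failure surfaces far from its cause. This sketch (pairs_manual and pairs are hypothetical names) contrasts that with letting the for loop own exhaustion and using next()'s default argument with a sentinel.

```python
_MISSING = object()  # sentinel that cannot collide with real data

def pairs_manual(items):
    """Anti-pattern: manual next() calls inside a generator."""
    it = iter(items)
    while True:
        a = next(it)  # at end of input this raises StopIteration...
        b = next(it)
        yield a, b

def pairs(items):
    """The for loop handles exhaustion; a trailing odd item is dropped."""
    it = iter(items)
    for a in it:
        b = next(it, _MISSING)  # default value instead of catching StopIteration
        if b is _MISSING:
            return
        yield a, b

try:
    list(pairs_manual([1, 2, 3, 4]))
except RuntimeError as exc:  # ...which Python 3.7+ turns into RuntimeError
    manual_error = exc
    print("RuntimeError:", exc)

print(list(pairs([1, 2, 3, 4, 5])))  # [(1, 2), (3, 4)]
```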