Build DataPipeline with Instructor
You build a DataPipeline class that ingests enterprise documents through format-specific parsers and bulk connectors, scrubs PII before downstream stages, and uses Instructor with Pydantic to extract structured training pairs.
Extract and PII-scrub training pairs from enterprise documents, proprietary formats, and bulk data sources
Introduction
When you sit down to fine-tune a model on a company's own knowledge, the training corpus almost never arrives as clean JSONL. It arrives as .docx contracts with embedded tables, .pptx decks where the real substance lives in speaker notes, .xlsx workbooks whose column meanings shift between tabs, .eml and .mbox archives with nested MIME parts, and Slack exports buried under emoji reactions. Worse, every one of those files is laced with names, emails, SSNs, account numbers, and customer identifiers that cannot legally enter an LLM provider's training endpoint. Get the ingestion wrong and you ship a model that either learned nothing useful or — far worse — memorized regulated PII and now leaks it on demand. By the end of this lesson you will be able to extract prompt-completion pairs from heterogeneous enterprise sources, scrub PII in two layers before any LLM sees the text, and emit schema-validated JSONL ready for a fine-tuning job.
Key Terminology
- Training pair: a `(prompt, completion)` record used for supervised fine-tuning; the unit your pipeline must produce, validated, before any training run.
- Proprietary format: a non-plaintext document type (`.docx`, `.pptx`, `.xlsx`, `.eml`, Slack JSON) that requires a format-specific parser; in this lesson each format gets its own extractor function registered in a dispatcher.
- PII span: a `(start, end, entity_type, confidence)` record marking a character range to redact; spans are the common currency that regex and NER detectors both emit so they can be merged before replacement.
- Layered detection: combining a deterministic regex pass (SSN, email, phone, card, IP) with a probabilistic NER pass (PERSON, ORG, GPE); needed because regex misses contextual identifiers and NER misses well-structured tokens.
- Structured extraction: using Instructor + a Pydantic `response_model` so the LLM's output is parsed and validated against a schema, with automatic retry on validation failure; this is what turns scrubbed prose into trustworthy training pairs.
Concepts
The three-stage pipeline
Enterprise ingestion decomposes into three stages that must run in this order: extract raw text from the source format, scrub PII from that text, then structure the clean text into validated training pairs. The order is non-negotiable: if you scrub after the LLM call, you have already transmitted PII to the provider, and nothing you do downstream can undo that disclosure under HIPAA, PCI-DSS, or GDPR.
Format-specific extraction with a dispatcher
Each format needs its own parser, but the pipeline above must not branch on file type at every step. The pattern is a dispatcher dictionary — EXTRACTORS: dict[str, Callable] keyed on file extension — that returns a generator yielding RawChunk(text, source_file, location, metadata). Generators (not lists) matter: a 500-page contract streams chunk-by-chunk instead of loading every paragraph at once. Per-chunk location strings such as slide_3_notes or paragraph_47 are how you trace a hallucination six months later back to the source document (see Code Walkthrough).
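The dispatcher pattern described above can be sketched with the standard library alone. This is a minimal illustration, not the lab's implementation: extract_txt (a plain-text extractor) and iter_chunks (the lookup helper) are hypothetical names introduced here, and the RawChunk fields follow the shape given in the paragraph above.

```python
from dataclasses import dataclass, field
from pathlib import Path
from typing import Callable, Iterator


@dataclass
class RawChunk:
    text: str
    source_file: str
    location: str
    metadata: dict = field(default_factory=dict)


def extract_txt(path: Path) -> Iterator[RawChunk]:
    """Hypothetical plain-text extractor: one chunk per non-empty line."""
    with path.open(encoding="utf-8") as fh:
        for i, line in enumerate(fh):
            text = line.strip()
            if text:
                # The location string is what lets you trace a pair back later.
                yield RawChunk(text, path.name, f"line_{i}")


# Dispatcher keyed on lowercase file extension; new formats register here.
EXTRACTORS: dict[str, Callable[[Path], Iterator[RawChunk]]] = {
    ".txt": extract_txt,
}


def iter_chunks(path: Path) -> Iterator[RawChunk]:
    """Look up the extractor for this file type and stream its chunks."""
    extractor = EXTRACTORS.get(path.suffix.lower())
    if extractor is None:
        raise ValueError(f"no extractor registered for {path.suffix!r}")
    yield from extractor(path)
```

Because iter_chunks is itself a generator, nothing is read (and no error is raised for an unsupported extension) until the caller starts iterating, which is the same laziness that keeps a long document from being loaded all at once.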
Layered PII detection
A regex-only scrubber catches structured patterns (SSN, email, phone, card, IP) at confidence 1.0 but misses person names, organizations, and locations. An NER-only scrubber catches the names but misses well-formatted tokens and runs ~50× slower. Production pipelines run both and merge overlapping spans before replacing, using a min_confidence threshold to drop low-confidence NER hits. Replacement uses indexed placeholders ([PERSON_1], [SSN_2]) so the downstream model still learns that two different people are referenced in the same conversation — flattening every name to [REDACTED] destroys that signal.
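To make the merge-then-replace flow concrete, here is a small standard-library sketch. It is an illustration under loud assumptions: lookup_ner_spans is a hard-coded name list standing in for a real NER model, the confidences in KNOWN_NAMES are invented for the example, and placeholders are numbered per occurrence rather than per distinct identity.

```python
import re
from dataclasses import dataclass


@dataclass
class Span:
    start: int
    end: int
    entity_type: str
    confidence: float


def regex_spans(text: str) -> list[Span]:
    """Deterministic pass: structured tokens, always confidence 1.0."""
    patterns = {
        "SSN": r"\b\d{3}-\d{2}-\d{4}\b",
        "EMAIL": r"\b[\w.+-]+@[\w-]+\.[\w.-]+\b",
    }
    return [
        Span(m.start(), m.end(), label, 1.0)
        for label, pat in patterns.items()
        for m in re.finditer(pat, text)
    ]


# ASSUMPTION: a fixed lookup table standing in for a real NER model.
# The confidence values are made up for this example.
KNOWN_NAMES = {"Dana Reyes": 0.95, "Sam Ortiz": 0.92, "Mercury": 0.40}


def lookup_ner_spans(text: str) -> list[Span]:
    """Probabilistic pass (simulated): PERSON spans with varying confidence."""
    return [
        Span(m.start(), m.end(), "PERSON", conf)
        for name, conf in KNOWN_NAMES.items()
        for m in re.finditer(re.escape(name), text)
    ]


def scrub(text: str, min_confidence: float = 0.85) -> str:
    # 1. Collect spans from both layers and drop low-confidence hits.
    spans = [
        s for s in regex_spans(text) + lookup_ner_spans(text)
        if s.confidence >= min_confidence
    ]
    # 2. Merge overlapping spans so no character range is replaced twice.
    spans.sort(key=lambda s: (s.start, -s.end))
    merged: list[Span] = []
    for s in spans:
        if merged and s.start < merged[-1].end:
            merged[-1].end = max(merged[-1].end, s.end)
        else:
            merged.append(s)
    # 3. Replace left to right with indexed placeholders.
    counters: dict[str, int] = {}
    parts, cursor = [], 0
    for s in merged:
        counters[s.entity_type] = counters.get(s.entity_type, 0) + 1
        parts.append(text[cursor:s.start])
        parts.append(f"[{s.entity_type}_{counters[s.entity_type]}]")
        cursor = s.end
    parts.append(text[cursor:])
    return "".join(parts)


sample = ("Dana Reyes (dana@example.com, SSN 123-45-6789) "
          "escalated to Sam Ortiz about Mercury.")
print(scrub(sample))
# [PERSON_1] ([EMAIL_1], SSN [SSN_1]) escalated to [PERSON_2] about Mercury.
```

"Mercury" survives because its simulated confidence of 0.40 falls below the 0.85 threshold; lowering min_confidence trades precision for coverage.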
Structured extraction as a self-healing loop
Once text is clean, Instructor patches the LLM client so client.chat.completions.create(response_model=list[TrainingPair]) returns a list of validated Pydantic objects instead of a string you have to JSON-parse. A model_validator on TrainingPair — for example, rejecting quality_score < 0.7 — is more than defensive coding: when validation fails, Instructor injects the error back into the prompt and retries, so the model self-corrects without any code on your side. This eliminates the brittle string-parsing layer that kills naive extraction pipelines.
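The shape of that validate-and-retry loop can be shown without any network call. The sketch below is not Instructor's internals: validate_pair is a plain-Python stand-in for a Pydantic model, extract_with_retries and its max_attempts parameter are hypothetical names, and scripted_model is a canned replacement for an LLM so the example runs offline.

```python
import json
from typing import Callable

Message = dict[str, str]


def validate_pair(raw: str) -> dict:
    """Plain-Python stand-in for a Pydantic model with a quality gate."""
    data = json.loads(raw)  # json.JSONDecodeError is a ValueError subclass
    if not isinstance(data, dict):
        raise ValueError("expected a JSON object")
    for key in ("prompt", "completion", "quality_score"):
        if key not in data:
            raise ValueError(f"missing field: {key}")
    score = data["quality_score"]
    if isinstance(score, bool) or not isinstance(score, (int, float)):
        raise ValueError("quality_score must be a number")
    if not 0.0 <= score <= 1.0:
        raise ValueError("quality_score must be between 0.0 and 1.0")
    if score < 0.7:
        raise ValueError(f"quality_score {score} below 0.7 threshold")
    return data


def extract_with_retries(
    call_model: Callable[[list[Message]], str],
    messages: list[Message],
    max_attempts: int = 3,
) -> dict:
    """Call the model, validate, and feed any validation error back in."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be >= 1")
    history = list(messages)
    for attempt in range(1, max_attempts + 1):
        raw = call_model(history)
        try:
            return validate_pair(raw)
        except ValueError as exc:
            if attempt == max_attempts:
                raise RuntimeError(
                    f"output still invalid after {max_attempts} attempts"
                ) from exc
            # The error text becomes part of the next prompt.
            history.append({"role": "assistant", "content": raw})
            history.append({
                "role": "user",
                "content": f"Validation error: {exc}. Fix it and answer again.",
            })
    raise RuntimeError("unreachable")


# Scripted stand-in for an LLM: the first reply omits "completion".
_replies = iter([
    '{"prompt": "How do I reset my badge?", "quality_score": 0.9}',
    '{"prompt": "How do I reset my badge?", '
    '"completion": "Ask the front desk.", "quality_score": 0.9}',
])


def scripted_model(history: list[Message]) -> str:
    return next(_replies)


pair = extract_with_retries(
    scripted_model, [{"role": "user", "content": "Extract one pair."}]
)
print(pair["completion"])
```

The loop is bounded on purpose: a model that never produces valid output surfaces as an exception instead of an endless stream of paid API calls.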
Code Walkthrough
This walkthrough demonstrates all three stages in one runnable snippet: a .docx extractor that streams RawChunks, a PIIScrubber that merges spans from whatever detectors it is given (the snippet wires in only the regex detector; an NER detector plugs in through the same detect method), and an Instructor-backed extract_training_pairs whose validated pairs are written out as JSONL. The snippet is the end-to-end shape; the lab fills in the .pptx, .xlsx, and .eml extractors against the same FormatExtractor protocol.
```python
import re
from dataclasses import dataclass, field
from pathlib import Path
from typing import Iterator, Protocol

import docx  # provided by the python-docx package
import instructor
from openai import OpenAI
from pydantic import BaseModel, Field, model_validator


@dataclass
class RawChunk:
    text: str
    source_file: str
    location: str
    metadata: dict = field(default_factory=dict)


@dataclass
class PIISpan:
    start: int
    end: int
    entity_type: str
    confidence: float = 1.0


class FormatExtractor(Protocol):
    def __call__(self, path: Path) -> Iterator[RawChunk]: ...


def extract_docx(path: Path) -> Iterator[RawChunk]:
    """Stream one RawChunk per paragraph of at least 20 characters."""
    doc = docx.Document(str(path))
    for i, para in enumerate(doc.paragraphs):
        text = para.text.strip()
        if len(text) >= 20:
            yield RawChunk(text, path.name, f"paragraph_{i}",
                           {"style": para.style.name})


# Dispatcher keyed on lowercase file extension.
EXTRACTORS: dict[str, FormatExtractor] = {".docx": extract_docx}


class RegexPIIDetector:
    PATTERNS = {
        "SSN": r"\b\d{3}-\d{2}-\d{4}\b",
        "EMAIL": r"\b[\w.+-]+@[\w-]+\.[\w.-]+\b",
        # No leading \b: a word boundary never matches between a space
        # (or start of text) and "(", so it would block every match.
        "PHONE": r"\(\d{3}\)\s?\d{3}-\d{4}\b",
        "CREDIT_CARD": r"\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b",
        "IP_ADDRESS": r"\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b",
    }

    def detect(self, text: str) -> list[PIISpan]:
        return [
            PIISpan(m.start(), m.end(), et, 1.0)
            for et, pat in self.PATTERNS.items()
            for m in re.finditer(pat, text)
        ]


class PIIScrubber:
    def __init__(self, detectors=None, min_confidence: float = 0.85):
        # Any object with a detect(text) -> list[PIISpan] method can be added.
        self.detectors = detectors or [RegexPIIDetector()]
        self.min_confidence = min_confidence

    def _merge(self, spans: list[PIISpan]) -> list[PIISpan]:
        """Drop low-confidence spans, then merge overlapping or touching ones."""
        spans = sorted(
            (s for s in spans if s.confidence >= self.min_confidence),
            key=lambda s: (s.start, -s.end),
        )
        merged: list[PIISpan] = []
        for s in spans:
            if merged and s.start <= merged[-1].end:
                # Keep the earlier span's entity type; extend its range.
                merged[-1] = PIISpan(
                    merged[-1].start, max(merged[-1].end, s.end),
                    merged[-1].entity_type,
                    max(merged[-1].confidence, s.confidence),
                )
            else:
                merged.append(s)
        return merged

    def scrub(self, text: str) -> tuple[str, list[PIISpan]]:
        spans = self._merge(
            [s for d in self.detectors for s in d.detect(text)]
        )
        counters: dict[str, int] = {}
        out, offset = text, 0
        for s in spans:
            counters[s.entity_type] = counters.get(s.entity_type, 0) + 1
            ph = f"[{s.entity_type}_{counters[s.entity_type]}]"
            # offset tracks how much earlier replacements shifted the text.
            out = out[: s.start + offset] + ph + out[s.end + offset :]
            offset += len(ph) - (s.end - s.start)
        return out, spans


class TrainingPair(BaseModel):
    prompt: str = Field(min_length=20)
    completion: str = Field(min_length=50)
    domain: str
    quality_score: float = Field(ge=0.0, le=1.0)
    source_location: str

    @model_validator(mode="after")
    def reject_low_quality(self) -> "TrainingPair":
        if self.quality_score < 0.7:
            raise ValueError(
                f"quality_score {self.quality_score} below 0.7 threshold"
            )
        return self


def extract_training_pairs(
    text: str, source_location: str, domain: str
) -> list[TrainingPair]:
    # OpenAI() reads OPENAI_API_KEY from the environment.
    client = instructor.from_openai(OpenAI())
    return client.chat.completions.create(
        model="gpt-4o",
        response_model=list[TrainingPair],
        max_retries=3,
        messages=[
            {"role": "system", "content":
                "Extract self-contained prompt-completion pairs from the "
                "scrubbed enterprise text. Assign an honest quality_score."},
            {"role": "user", "content":
                f"Domain: {domain}\nSource: {source_location}\n\n{text}"},
        ],
    )


def process(path: Path, domain: str, out_path: Path) -> int:
    """Extract, scrub, structure; append pairs to JSONL and return the count."""
    scrubber = PIIScrubber()
    extractor = EXTRACTORS[path.suffix.lower()]
    n = 0
    with out_path.open("a", encoding="utf-8") as fh:
        for chunk in extractor(path):
            # Scrub BEFORE the LLM call so raw PII never leaves the process.
            scrubbed, _ = scrubber.scrub(chunk.text)
            for pair in extract_training_pairs(
                scrubbed, chunk.location, domain
            ):
                fh.write(pair.model_dump_json() + "\n")
                n += 1
    return n
```
process() ties all three stages together end-to-end: the format extractor streams RawChunks, PIIScrubber.scrub returns redacted text plus the span list (discarded here as _; persist it if you need an audit trail), and extract_training_pairs emits validated TrainingPairs that are appended to JSONL one line at a time, so memory use stays flat even across thousand-document batches. That single function is the entire ingestion pipeline; everything else in the lab is a new extractor plugged into EXTRACTORS against the FormatExtractor protocol. Verify by running process(Path("sample.docx"), "support", Path("pairs.jsonl")) on a fixture containing one obvious SSN and one obvious email. You'll know it works when every line of pairs.jsonl parses as a TrainingPair, every quality_score >= 0.7, and grep -E '[0-9]{3}-[0-9]{2}-[0-9]{4}|@' against the output finds zero matches (POSIX extended regex has no \d, so the pattern spells out [0-9]).
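The same three checks can be scripted so they run in CI rather than by eye. The following is a minimal sketch using only the standard library; check_jsonl and the REQUIRED field set are hypothetical names introduced here, and the checker inspects raw JSON rather than re-validating through the TrainingPair model, so it does not enforce the min_length rules.

```python
import json
import re
from pathlib import Path

SSN_RE = re.compile(r"\b\d{3}-\d{2}-\d{4}\b")
EMAIL_RE = re.compile(r"[\w.+-]+@[\w-]+\.[\w.-]+")
REQUIRED = {"prompt", "completion", "domain", "quality_score", "source_location"}


def check_jsonl(path: Path, min_quality: float = 0.7) -> list[str]:
    """Return a list of human-readable problems; empty means the file passed."""
    problems: list[str] = []
    with path.open(encoding="utf-8") as fh:
        for lineno, line in enumerate(fh, start=1):
            if not line.strip():
                continue
            try:
                record = json.loads(line)
            except json.JSONDecodeError as exc:
                problems.append(f"line {lineno}: invalid JSON ({exc.msg})")
                continue
            if not isinstance(record, dict):
                problems.append(f"line {lineno}: expected a JSON object")
                continue
            missing = REQUIRED - record.keys()
            if missing:
                problems.append(f"line {lineno}: missing {sorted(missing)}")
            score = record.get("quality_score")
            if not isinstance(score, (int, float)) or score < min_quality:
                problems.append(
                    f"line {lineno}: quality_score {score!r} below {min_quality}"
                )
            # Scan the raw line so PII hiding in any field is caught.
            if SSN_RE.search(line) or EMAIL_RE.search(line):
                problems.append(f"line {lineno}: possible unredacted PII")
    return problems
```

A typical use would be to assert that check_jsonl(Path("pairs.jsonl")) returns an empty list after each run. Note that this scan is narrower than the grep for a bare @: it only flags strings shaped like an email address.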
Do's and Don'ts
Now that you have an end-to-end extract → scrub → structure pipeline running, the rules below are the guardrails that keep it production-safe as you scale it across more formats, more documents, and more sensitive sources.
Do's
- ✓ Do scrub before the LLM call: regulated PII (SSN, PHI, PAN) must never reach a provider's training endpoint; running `PIIScrubber.scrub` between extraction and `extract_training_pairs` keeps you on the right side of HIPAA, PCI-DSS, and GDPR.
- ✓ Do preserve source traceability end-to-end: every `RawChunk` carries `source_file` + `location` and every `TrainingPair` carries `source_location`, so a hallucination six months out maps back to the exact paragraph or slide note that taught it.
- ✓ Do let Pydantic validators be your quality gate: a `model_validator` that rejects `quality_score < 0.7` doubles as a self-healing retry signal because Instructor feeds the validation error back into the prompt, up to `max_retries` times.
Don'ts
- ✗ Don't rely on regex alone for PII: regex misses person names, organization references, and contextual identifiers; pair it with an NER pass, drop hits below `min_confidence`, and merge overlapping spans so coverage and precision both hold.
- ✗ Don't flatten every redaction to `[REDACTED]`: indexed placeholders (`[PERSON_1]`, `[PERSON_2]`) preserve the signal that two distinct actors are referenced; collapsing them teaches the model that everyone in a conversation is the same person.
- ✗ Don't materialize extractions as lists: `Iterator[RawChunk]` keeps memory flat across 500-page documents, while `list[RawChunk]` holds every chunk of a document in RAM at once and fails on the first contract too large to fit.