Enterprise Data Pipeline
Learning Path
Hands-on Labs
Each objective has a coding lab that opens in VS Code in your browser.
Build DataPipeline with Instructor
You will build a `DataPipeline` class that ingests raw enterprise documents (contracts, tickets, clinical notes) and proprietary file formats (DOCX, PPTX, Excel spreadsheets, email archives, Slack exports) via format-specific parsers. Connect bulk ingestion connectors: REST API endpoints for SaaS platforms, data warehouse extractors for Snowflake and BigQuery, and streaming sources via webhooks. Use Instructor with Pydantic models to extract structured training pairs, where each pair includes input text, expected output, and quality metadata. Implement PII scrubbing before fine-tuning by combining regex pattern matching with LLM-based entity detection for names, SSNs, emails, and financial data; configure redaction policies per document type, run a secondary verification scan, and maintain a full audit trail of scrubbed entities. Implement `extract_training_pairs()` to process documents in batches, validate the output schema, and score data quality. Use Gemini Code Execution to clean and normalize text before extraction.
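A minimal sketch of the extraction step, assuming the `instructor` and `openai` packages; the `TrainingPair` schema, the `scrub_pii` regex set, and the batch size are illustrative choices, not part of the lab spec.

```python
import re
from typing import List
import instructor
from openai import OpenAI
from pydantic import BaseModel, Field

# Illustrative schema for one extracted training pair (field names are assumptions).
class TrainingPair(BaseModel):
    input_text: str = Field(description="Prompt or question taken from the document")
    expected_output: str = Field(description="Target completion for fine-tuning")
    quality_score: float = Field(ge=0.0, le=1.0, description="Heuristic quality metadata")

class TrainingPairs(BaseModel):
    pairs: List[TrainingPair]

# Simple regex-based PII pass; the LLM-based entity check would run after this.
PII_PATTERNS = {
    "ssn": re.compile(r"\b\d{3}-\d{2}-\d{4}\b"),
    "email": re.compile(r"\b[\w.+-]+@[\w-]+\.[\w.]+\b"),
}

def scrub_pii(text: str) -> str:
    for label, pattern in PII_PATTERNS.items():
        text = pattern.sub(f"[{label.upper()} REDACTED]", text)
    return text

class DataPipeline:
    def __init__(self, model: str = "gpt-4o-mini"):
        # instructor patches the OpenAI client so responses parse into Pydantic models.
        self.client = instructor.from_openai(OpenAI())
        self.model = model

    def extract_training_pairs(self, documents: List[str], batch_size: int = 10) -> List[TrainingPair]:
        results: List[TrainingPair] = []
        for start in range(0, len(documents), batch_size):
            batch = [scrub_pii(doc) for doc in documents[start:start + batch_size]]
            extracted = self.client.chat.completions.create(
                model=self.model,
                response_model=TrainingPairs,
                messages=[
                    {"role": "system", "content": "Extract input/output training pairs from these documents."},
                    {"role": "user", "content": "\n\n---\n\n".join(batch)},
                ],
            )
            results.extend(extracted.pairs)
        return results
```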
Implement distillation data generation
You will use GPT-4o as a teacher model to generate expert-quality answers for your domain, storing them as training data for smaller models. Build `DistillationGenerator` that takes enterprise questions, calls `client.chat.completions.create()` for frontier answers, validates quality with Instructor's `response_model` parameter, and outputs JSONL training files. Implement `generate_expert_pairs()` with exponential-backoff retry logic and embedding-based deduplication.
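A sketch of the generator loop, assuming the `openai`, `instructor`, and `tenacity` packages; the `ExpertAnswer` model, the embedding model name, and the cosine-similarity deduplication threshold are assumptions for illustration.

```python
import json
from typing import List
import instructor
from openai import OpenAI
from pydantic import BaseModel
from tenacity import retry, stop_after_attempt, wait_exponential

class ExpertAnswer(BaseModel):
    answer: str
    confidence: float  # self-reported quality signal, validated via Instructor

class DistillationGenerator:
    def __init__(self, teacher_model: str = "gpt-4o"):
        self.client = instructor.from_openai(OpenAI())
        self.raw_client = OpenAI()
        self.teacher_model = teacher_model

    @retry(wait=wait_exponential(min=1, max=30), stop=stop_after_attempt(5))
    def _ask_teacher(self, question: str) -> ExpertAnswer:
        # Exponential-backoff retries on transient API failures.
        return self.client.chat.completions.create(
            model=self.teacher_model,
            response_model=ExpertAnswer,
            messages=[{"role": "user", "content": question}],
        )

    def _embed(self, text: str) -> List[float]:
        resp = self.raw_client.embeddings.create(model="text-embedding-3-small", input=text)
        return resp.data[0].embedding

    def generate_expert_pairs(self, questions: List[str], out_path: str,
                              dedup_threshold: float = 0.95) -> None:
        seen: List[List[float]] = []
        with open(out_path, "w") as f:
            for q in questions:
                emb = self._embed(q)
                # Embedding-based deduplication: skip near-duplicate questions.
                if any(_cosine(emb, prev) > dedup_threshold for prev in seen):
                    continue
                seen.append(emb)
                answer = self._ask_teacher(q)
                f.write(json.dumps({"prompt": q, "completion": answer.answer}) + "\n")

def _cosine(a: List[float], b: List[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm = (sum(x * x for x in a) ** 0.5) * (sum(y * y for y in b) ** 0.5)
    return dot / norm if norm else 0.0
```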
Build data quality dashboard
You will build a `QualityAnalyzer` class that scores your training dataset across four dimensions: diversity (topic spread), balance (label distribution), coverage (edge case representation), and consistency (contradictory examples). Use Gemini's `generate_content()` to analyze dataset statistics and flag potential issues. Implement ETL orchestration with scheduled ingestion runs triggered by cron or webhook, incremental ingestion using high-water-mark checkpoints to process only new data, pipeline state tracking in PostgreSQL, idempotent retry on transient failures, and a status endpoint reporting pipeline health. Implement `analyze_dataset()` that outputs a typed `QualityReport` Pydantic model with per-dimension scores and actionable recommendations.
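One possible shape for the report model and the Gemini call, assuming the `google-generativeai` package; the dimension names follow the lab description, while the model name, prompt wording, and JSON-mode configuration are assumptions.

```python
import json
from typing import List
import google.generativeai as genai
from pydantic import BaseModel, Field

class QualityReport(BaseModel):
    diversity: float = Field(ge=0.0, le=1.0)     # topic spread
    balance: float = Field(ge=0.0, le=1.0)       # label distribution
    coverage: float = Field(ge=0.0, le=1.0)      # edge case representation
    consistency: float = Field(ge=0.0, le=1.0)   # contradictory examples
    recommendations: List[str]

class QualityAnalyzer:
    def __init__(self, model_name: str = "gemini-1.5-flash"):
        # Assumes the API key has been set via genai.configure(api_key=...).
        self.model = genai.GenerativeModel(model_name)

    def analyze_dataset(self, dataset_stats: dict) -> QualityReport:
        prompt = (
            "Given these dataset statistics, score diversity, balance, coverage, and "
            "consistency from 0 to 1 and list actionable recommendations. Respond as "
            f"JSON matching this schema: {QualityReport.model_json_schema()}\n\n"
            f"Stats:\n{json.dumps(dataset_stats)}"
        )
        response = self.model.generate_content(
            prompt,
            generation_config={"response_mime_type": "application/json"},
        )
        return QualityReport.model_validate_json(response.text)
```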
Build data validation pipeline
You will build an automated data validation pipeline that checks training data quality before fine-tuning. Implement schema validation for each data format (instruction, conversation, preference), detect duplicates using MinHash, identify toxic or low-quality samples using an LLM-as-judge filter, compute dataset statistics (token distributions, label balance), and generate a validation report. Deploy as a FastAPI endpoint that accepts a dataset path and returns pass/fail with detailed metrics.
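A compact sketch of the duplicate check and the endpoint, assuming the `datasketch` and `fastapi` packages; the JSONL input format, token shingling, and similarity threshold are illustrative, and the schema, LLM-as-judge, and statistics steps are left as stubs.

```python
import json
from typing import List
from datasketch import MinHash, MinHashLSH
from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()

class ValidationReport(BaseModel):
    passed: bool
    total_examples: int
    duplicate_ids: List[int]

def _minhash(text: str, num_perm: int = 128) -> MinHash:
    m = MinHash(num_perm=num_perm)
    for token in text.lower().split():
        m.update(token.encode("utf-8"))
    return m

def find_duplicates(texts: List[str], threshold: float = 0.9) -> List[int]:
    # LSH index flags examples whose MinHash similarity exceeds the threshold.
    lsh = MinHashLSH(threshold=threshold, num_perm=128)
    duplicates: List[int] = []
    for i, text in enumerate(texts):
        m = _minhash(text)
        if lsh.query(m):          # near-duplicate of something already indexed
            duplicates.append(i)
        else:
            lsh.insert(str(i), m)
    return duplicates

@app.post("/validate", response_model=ValidationReport)
def validate(dataset_path: str) -> ValidationReport:
    # Assumes one JSON object per line with a "text" field; schema validation,
    # LLM-as-judge filtering, and token statistics would be added here.
    with open(dataset_path) as f:
        texts = [json.loads(line)["text"] for line in f]
    dupes = find_duplicates(texts)
    return ValidationReport(passed=not dupes, total_examples=len(texts), duplicate_ids=dupes)
```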
Optimize data pipeline throughput
You will optimize the data pipeline for throughput by implementing parallel processing, batched API calls, and incremental ingestion. Build a pipeline orchestrator that splits large document sets into chunks, processes them in parallel workers, batches Instructor extraction calls (up to 50 documents per batch), implements retry logic with exponential backoff for API failures, and tracks throughput metrics (docs/sec, tokens/sec). Compare sequential and parallel performance and achieve at least a 5x throughput improvement.
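A sketch of the parallel orchestrator using only the standard library; `extract_batch` stands in for the Instructor extraction call, and the chunk size, worker count, and retry count are assumptions.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List

def chunked(items: List[str], size: int) -> List[List[str]]:
    # Split a large document set into fixed-size chunks.
    return [items[i:i + size] for i in range(0, len(items), size)]

def run_with_retry(fn: Callable[[List[str]], list], batch: List[str], attempts: int = 5) -> list:
    # Exponential backoff on transient API failures.
    for attempt in range(attempts):
        try:
            return fn(batch)
        except Exception:
            if attempt == attempts - 1:
                raise
            time.sleep(2 ** attempt)
    return []

def process_parallel(docs: List[str], extract_batch: Callable[[List[str]], list],
                     batch_size: int = 50, workers: int = 8) -> list:
    batches = chunked(docs, batch_size)
    start = time.perf_counter()
    results: list = []
    with ThreadPoolExecutor(max_workers=workers) as pool:
        for batch_result in pool.map(lambda b: run_with_retry(extract_batch, b), batches):
            results.extend(batch_result)
    elapsed = time.perf_counter() - start
    print(f"throughput: {len(docs) / elapsed:.1f} docs/sec over {elapsed:.1f}s")
    return results
```

Running the same `extract_batch` over the batches in a plain for-loop gives the sequential baseline to compare against.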
Build data lineage tracker
You will build a data lineage tracking system that records the provenance of every training example. Track the source document, extraction method, transformation steps, quality scores, and inclusion/exclusion decisions. Implement a `LineageRecord` Pydantic model stored in PostgreSQL. Build a FastAPI service to query lineage by example ID, source document, or quality score range. Create a lineage visualization showing the flow from raw documents through extraction, transformation, and final dataset inclusion.
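A sketch of the record model and two query routes, assuming `fastapi`; an in-memory list stands in for the PostgreSQL table used in the lab, and the field names and query parameters are illustrative.

```python
from datetime import datetime
from typing import List, Optional
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field

class LineageRecord(BaseModel):
    example_id: str
    source_document: str
    extraction_method: str
    transformation_steps: List[str] = Field(default_factory=list)
    quality_score: float
    included: bool
    created_at: datetime = Field(default_factory=datetime.utcnow)

app = FastAPI()

# In-memory stand-in for the PostgreSQL table backing the lab.
_records: List[LineageRecord] = []

@app.post("/lineage", response_model=LineageRecord)
def add_record(record: LineageRecord) -> LineageRecord:
    _records.append(record)
    return record

@app.get("/lineage/{example_id}", response_model=LineageRecord)
def by_example_id(example_id: str) -> LineageRecord:
    for r in _records:
        if r.example_id == example_id:
            return r
    raise HTTPException(status_code=404, detail="unknown example_id")

@app.get("/lineage", response_model=List[LineageRecord])
def search(source_document: Optional[str] = None,
           min_quality: float = 0.0, max_quality: float = 1.0) -> List[LineageRecord]:
    # Filter by source document and/or quality score range.
    return [
        r for r in _records
        if (source_document is None or r.source_document == source_document)
        and min_quality <= r.quality_score <= max_quality
    ]
```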