GadaaLabs
Machine Learning Engineering — Production ML Systems
Lesson 2

Production Data Pipelines for ML

26 min

A data pipeline that works 95% of the time is a liability. When it fails silently — dropping rows, letting nulls through, writing a corrupted partition — your model trains on garbage and you find out three weeks later when business metrics crater. Production data pipelines for ML must be idempotent, validated at every boundary, and versioned so you can reproduce any training run from any point in time.

ETL vs ELT

ETL (Extract, Transform, Load) transforms data before loading it into the destination. ELT (Extract, Load, Transform) loads raw data first, then transforms at query time inside the destination system.

When to use ETL: you need to enforce a strict schema at the destination, the raw data contains sensitive fields that must be masked or dropped before storage, or the destination is not powerful enough to run complex transformations (e.g., an OLTP database).

When to use ELT: the destination is a cloud data warehouse (BigQuery, Snowflake, Redshift) with massive query compute, you want to preserve raw data for future feature engineering, or your transformation logic changes frequently and you want to avoid re-running the full extraction.

For ML specifically, ELT is increasingly common: land raw data in a data lake (S3/GCS/ADLS), transform in SQL or Spark inside the warehouse, and materialise feature tables. This preserves optionality — you can always re-derive features from raw events.

python
import pandas as pd

# clean_event and is_valid are assumed helper functions defined elsewhere.

# ETL: transform before loading (tight schema enforcement)
def etl_pipeline(raw_events: list[dict]) -> pd.DataFrame:
    cleaned = [clean_event(e) for e in raw_events]          # transform
    validated = [e for e in cleaned if is_valid(e)]         # filter
    df = pd.DataFrame(validated)
    df.to_parquet("s3://data-lake/processed/events/")        # load clean
    return df

# ELT: load raw first, transform at query time
def elt_pipeline(raw_events: list[dict]) -> None:
    df = pd.DataFrame(raw_events)                            # minimal parsing
    df["ingested_at"] = pd.Timestamp.utcnow()
    df.to_parquet("s3://data-lake/raw/events/")              # load raw
    # transformation happens later in SQL/dbt/Spark
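
The "T" in ELT then happens wherever your query engine lives. As a rough sketch of that later step — here using DuckDB over local Parquet files, with paths and column names purely illustrative — a feature table can be materialised with a single SQL statement:

python
import duckdb

def materialise_user_features() -> None:
    """Sketch of the deferred ELT transform: SQL over the raw landing zone."""
    con = duckdb.connect()
    con.sql("""
        COPY (
            SELECT
                user_id,
                count(*) AS event_count,
                sum(CASE WHEN event_type = 'purchase' THEN 1 ELSE 0 END) AS purchases
            FROM read_parquet('data-lake/raw/events/*.parquet')
            GROUP BY user_id
        ) TO 'data-lake/features/user_features.parquet' (FORMAT PARQUET)
    """)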

Batch vs Micro-batch vs Streaming

Batch processing runs on a schedule (hourly, daily). It is simple to implement, easy to backfill, and cost-efficient for workloads where latency of minutes-to-hours is acceptable. Most ML training pipelines are batch.

Micro-batch (e.g., Spark Structured Streaming's default trigger) processes data in small windows (seconds to minutes). Useful when you need near-real-time features but cannot afford the complexity of true streaming.

Streaming (Kafka + Flink, Kafka Streams) processes each event as it arrives with sub-second latency. Required for fraud detection, real-time recommendation, or any use case where stale features cause business harm. The operational complexity is significantly higher: you must handle out-of-order events, watermarks, state management, and exactly-once semantics.

python
import json
from datetime import datetime

import pandas as pd
from kafka import KafkaConsumer  # kafka-python

# compute_features, read_events_between, compute_online_features and
# redis_client are assumed to be defined elsewhere in the project.

# Batch: process yesterday's data
def batch_job(date: str) -> None:
    df = pd.read_parquet(f"s3://raw/events/dt={date}/")
    features = compute_features(df)
    features.to_parquet(f"s3://features/dt={date}/")

# Micro-batch pattern: process in time windows
def micro_batch_job(window_start: datetime, window_end: datetime) -> None:
    df = read_events_between(window_start, window_end)
    features = compute_features(df)
    features.to_parquet(
        f"s3://features/ts={window_start.isoformat()}/"
    )

# Streaming: event-by-event (pseudocode for Kafka consumer)
def streaming_consumer(consumer: KafkaConsumer) -> None:
    for message in consumer:
        event = json.loads(message.value)
        feature_vector = compute_online_features(event)
        redis_client.set(
            f"features:{event['user_id']}",
            json.dumps(feature_vector),
            ex=3600  # TTL 1 hour
        )

Building Idempotent Pipelines

An idempotent pipeline produces the same output when run with the same input, regardless of how many times it runs. This is not optional — pipelines fail and get retried, and non-idempotent pipelines corrupt data on retry.

Three rules for idempotency:

  1. Write to a deterministic output path. Include the date/version/run-id in the output path so each run writes to its own location. Never append to an existing file.
  2. Delete before write. If the output path already exists, delete it before writing. This makes a re-run equivalent to a fresh run.
  3. Use atomic writes. Write to a temporary path, validate, then rename. This prevents partially-written files from being consumed downstream.

python
import shutil
from pathlib import Path

import pandas as pd

def idempotent_write(df: pd.DataFrame, output_path: str) -> None:
    """Write a DataFrame idempotently using delete-before-write."""
    path = Path(output_path)
    temp_path = path.parent / f"_tmp_{path.name}"

    # Write to temp first
    df.to_parquet(temp_path, index=False)

    # Validate temp output
    written = pd.read_parquet(temp_path)
    assert len(written) == len(df), "Row count mismatch after write"

    # Delete any existing output, then rename temp into place
    if path.exists():
        if path.is_dir():
            shutil.rmtree(path)
        else:
            path.unlink()
    temp_path.rename(path)

def process_partition(date: str, input_path: str, output_path: str) -> None:
    """Process a date partition idempotently."""
    out = Path(output_path) / f"dt={date}"

    # Idempotent: always recompute from scratch
    df = pd.read_parquet(f"{input_path}/dt={date}/")
    features = compute_features(df)
    idempotent_write(features, str(out))
    print(f"Written {len(features)} rows to {out}")

Data Validation with Great Expectations

Great Expectations (GX) is the de facto standard for data validation in ML pipelines. You define expectations about your data — assertions about schema, nullability, ranges, patterns — and run them as a checkpoint before any transformation or training.

python
import great_expectations as gx

# `df` is the pandas DataFrame to validate, loaded earlier in the pipeline.

# Initialise a data context (stores config, expectations, results)
context = gx.get_context()

# Create a datasource for a Pandas DataFrame
datasource = context.sources.add_pandas("my_datasource")
data_asset = datasource.add_dataframe_asset("events")

# Define an expectation suite
suite = context.add_expectation_suite("events.basic_quality")

# Common expectations for ML data
validator = context.get_validator(
    batch_request=data_asset.build_batch_request(dataframe=df),
    expectation_suite_name="events.basic_quality",
)

# Completeness
validator.expect_column_values_to_not_be_null("user_id")
validator.expect_column_values_to_not_be_null("event_timestamp")
validator.expect_column_values_to_not_be_null("label")

# Range checks (clip outliers or flag corrupt data)
validator.expect_column_values_to_be_between(
    "session_duration_seconds", min_value=0, max_value=86400
)
validator.expect_column_values_to_be_between(
    "purchase_amount_usd", min_value=0, max_value=50000
)

# Format checks
validator.expect_column_values_to_match_regex(
    "user_id", r"^usr_[a-f0-9]{16}$"
)
validator.expect_column_values_to_match_regex(
    "event_timestamp", r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}"
)

# Cardinality
validator.expect_column_distinct_values_to_be_in_set(
    "event_type", {"page_view", "add_to_cart", "purchase", "search"}
)

# Save expectations and run checkpoint
validator.save_expectation_suite(discard_failed_expectations=False)

# Run a checkpoint (validation + optionally save results)
checkpoint = context.add_or_update_checkpoint(
    name="events_checkpoint",
    validations=[{
        "batch_request": data_asset.build_batch_request(dataframe=df),
        "expectation_suite_name": "events.basic_quality",
    }]
)
result = checkpoint.run()

if not result["success"]:
    failed = [
        r["expectation_config"]["expectation_type"]
        for r in result.list_validation_results()[0]["results"]
        if not r["success"]
    ]
    raise ValueError(f"Data validation failed: {failed}")

print("Data validation passed.")

Schema Evolution Handling

Schemas change. New features get added, old columns get renamed, types change. A pipeline that breaks on every schema change is fragile.

Additive changes (new columns, new enum values) are backwards compatible — old code ignores new fields. Handle these gracefully by selecting only the columns you need.
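
A minimal way to tolerate additive changes is to select only the columns the pipeline actually depends on; the column list below is illustrative:

python
import pandas as pd

# Columns this pipeline relies on; anything new upstream is simply ignored.
REQUIRED_COLUMNS = ["user_id", "event_type", "event_timestamp"]

def select_known_columns(df: pd.DataFrame) -> pd.DataFrame:
    missing = set(REQUIRED_COLUMNS) - set(df.columns)
    if missing:
        # A removed column is a breaking change, not an additive one — fail loudly.
        raise ValueError(f"Upstream dropped required columns: {missing}")
    return df[REQUIRED_COLUMNS]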

Breaking changes (column renamed, type changed, column removed) require explicit migration. Version your schema explicitly and handle both old and new versions during a transition window.

python
from dataclasses import dataclass
from typing import Optional

import pandas as pd

@dataclass
class EventSchemaV1:
    user_id: str
    session_id: str
    event_type: str
    timestamp: str

@dataclass
class EventSchemaV2:
    user_id: str
    session_uuid: str          # renamed from session_id
    event_type: str
    event_timestamp: int       # changed: string → unix epoch
    client_version: Optional[str] = None  # new optional column

def parse_event(raw: dict, schema_version: int) -> dict:
    """Parse raw event dict into a normalised schema."""
    if schema_version == 1:
        return {
            "user_id": raw["user_id"],
            "session_uuid": raw["session_id"],  # map old name
            "event_type": raw["event_type"],
            "event_timestamp": int(
                pd.Timestamp(raw["timestamp"]).timestamp()
            ),
            "client_version": None,
        }
    elif schema_version == 2:
        return {
            "user_id": raw["user_id"],
            "session_uuid": raw["session_uuid"],
            "event_type": raw["event_type"],
            "event_timestamp": raw["event_timestamp"],
            "client_version": raw.get("client_version"),
        }
    else:
        raise ValueError(f"Unknown schema version: {schema_version}")

def infer_schema_version(raw: dict) -> int:
    if "session_uuid" in raw:
        return 2
    return 1

Data Partitioning for Training Data

How you partition training data has a significant impact on training speed, reproducibility, and the validity of your evaluation.

By date: the most common partition for temporal data. Always train on data before date X and evaluate on data after date X — never shuffle across time, or you leak future information into training.

By label: useful when one class is rare. Store positive and negative examples in separate partitions so you can easily sample at any ratio.
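
A sketch of such a label-based partition, assuming a binary label column named converted (the same label the full pipeline at the end of this lesson uses):

python
import pandas as pd
from pathlib import Path

def partition_by_label(
    df: pd.DataFrame, output_dir: str, label_col: str = "converted"
) -> None:
    """Write positive and negative examples to separate partitions."""
    out = Path(output_dir)
    out.mkdir(parents=True, exist_ok=True)
    df[df[label_col] == 1].to_parquet(out / "positive.parquet", index=False)
    df[df[label_col] == 0].to_parquet(out / "negative.parquet", index=False)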

Stratified: ensure each partition has the same class distribution as the full dataset. Required when downstream consumers need balanced data but the raw data is imbalanced.

python
import pandas as pd
from pathlib import Path
from sklearn.model_selection import StratifiedKFold

def partition_by_date(
    df: pd.DataFrame,
    output_dir: str,
    date_col: str = "event_date"
) -> None:
    """Partition training data by date for temporal splits."""
    for date, group in df.groupby(date_col):
        out_path = Path(output_dir) / f"dt={date}"
        out_path.mkdir(parents=True, exist_ok=True)
        group.to_parquet(out_path / "data.parquet", index=False)

def temporal_train_test_split(
    df: pd.DataFrame,
    cutoff_date: str,
    date_col: str = "event_date"
) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Split by time — never shuffle across the boundary."""
    train = df[df[date_col] < cutoff_date].copy()
    test  = df[df[date_col] >= cutoff_date].copy()
    assert len(train) + len(test) == len(df)
    return train, test

def stratified_partition(
    df: pd.DataFrame,
    output_dir: str,
    label_col: str,
    n_splits: int = 5
) -> None:
    """Write stratified k-fold partitions for cross-validation."""
    skf = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=42)
    for fold, (train_idx, val_idx) in enumerate(
        skf.split(df, df[label_col])
    ):
        train_df = df.iloc[train_idx]
        val_df   = df.iloc[val_idx]
        fold_dir = Path(output_dir) / f"fold={fold}"
        fold_dir.mkdir(parents=True, exist_ok=True)
        train_df.to_parquet(fold_dir / "train.parquet", index=False)
        val_df.to_parquet(fold_dir / "val.parquet",   index=False)

DVC for Data Versioning

DVC (Data Version Control) versions large files and datasets alongside your code in Git, without storing the files in Git itself. The actual data lives in remote storage (S3, GCS, Azure Blob); Git tracks a small .dvc metadata file.

bash
# Initialise DVC in an existing Git repo
dvc init
git add .dvc .dvcignore
git commit -m "init: add DVC"

# Configure remote storage
dvc remote add -d myremote s3://my-bucket/dvc-store
git add .dvc/config
git commit -m "config: add DVC S3 remote"

# Track a dataset
dvc add data/raw/events_2025_03.parquet
git add data/raw/events_2025_03.parquet.dvc .gitignore
git commit -m "data: add March 2025 events dataset"

# Push data to remote
dvc push

# On another machine: pull the data
git pull
dvc pull   # downloads the file from S3

# Pin to a specific data version in code
dvc get . data/raw/events_2025_03.parquet --rev v1.2.0

python
# Read data lineage programmatically
import subprocess
import yaml  # PyYAML: .dvc files are plain YAML

def get_data_lineage(dvc_file: str) -> dict:
    """Return the DVC metadata (hash, size, path) recorded in a .dvc file."""
    with open(dvc_file) as f:
        return yaml.safe_load(f)

def reproduce_pipeline(target: str) -> None:
    """Reproduce a DVC pipeline stage (runs only if inputs changed)."""
    subprocess.run(["dvc", "repro", target], check=True)

def get_stage_dependencies(stage_name: str) -> list[str]:
    """List all data dependencies of a pipeline stage."""
    result = subprocess.run(
        ["dvc", "dag", "--dot"], capture_output=True, text=True
    )
    # Parse DOT output: keep edges that point INTO the stage (its dependencies)
    deps = []
    for line in result.stdout.splitlines():
        if "->" not in line:
            continue
        src, dst = line.split("->", 1)
        if stage_name in dst:
            deps.append(src.strip().strip('";'))
    return deps
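
reproduce_pipeline assumes the stage is already declared in dvc.yaml. A stage can be registered in the same subprocess style — the stage name, command, and paths below are only illustrative:

python
import subprocess

def add_pipeline_stage(
    name: str, command: str, deps: list[str], outs: list[str]
) -> None:
    """Declare a DVC pipeline stage (written to dvc.yaml) so dvc repro can track it."""
    args = ["dvc", "stage", "add", "-n", name]
    for dep in deps:
        args += ["-d", dep]
    for out in outs:
        args += ["-o", out]
    args.append(command)
    subprocess.run(args, check=True)

# Example (illustrative names and paths):
# add_pipeline_stage(
#     name="featurize",
#     command="python featurize.py",
#     deps=["data/raw/events_2025_03.parquet", "featurize.py"],
#     outs=["data/features/"],
# )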

Handling Late-Arriving Data

In real-world systems, data does not arrive in event-time order. A mobile app event logged at 14:00 may arrive at your pipeline at 14:45 due to network issues, retries, or the device being offline. Processing pipelines must handle this gracefully.

Grace period: after the nominal window closes, keep the window open for an additional interval (the grace period) to accept late arrivals. A typical grace period is 30 minutes to 2 hours for ad-click data, and up to 24 hours for mobile events.

Watermark: the watermark is the estimated time up to which all events have arrived. Events with event_time less than the watermark are considered complete; events beyond the watermark are still accumulating. Flink and Spark Structured Streaming implement watermarks natively.

python
from datetime import datetime, timedelta

import pandas as pd

def process_with_grace_period(
    events: list[dict],
    window_start: datetime,
    window_end: datetime,
    grace_period_hours: int = 2,
) -> pd.DataFrame:
    """
    Process events whose event_time falls in [window_start, window_end).
    The job is scheduled grace_period_hours after window_end, so late
    arrivals for this window have had time to land. Events older than
    window_start belong to a window whose grace period has already passed
    and are flagged rather than silently dropped.
    """
    run_after = window_end + timedelta(hours=grace_period_hours)
    print(f"Window [{window_start} .. {window_end}) should be processed after {run_after}")

    in_window = [
        e for e in events
        if window_start <= datetime.fromisoformat(e["event_time"]) < window_end
    ]
    too_late = [
        e for e in events
        if datetime.fromisoformat(e["event_time"]) < window_start
    ]
    if too_late:
        print(f"Warning: {len(too_late)} events arrived after their grace period")

    return pd.DataFrame(in_window)

def compute_watermark(
    events: list[dict],
    allowed_lateness_seconds: int = 3600,
) -> datetime:
    """
    Watermark = max(event_time) - allowed_lateness.
    Events with event_time < watermark are considered complete.
    """
    if not events:
        return datetime.min
    max_event_time = max(
        datetime.fromisoformat(e["event_time"]) for e in events
    )
    return max_event_time - timedelta(seconds=allowed_lateness_seconds)

Prefect-Style Pipeline with Decorators

Prefect (like other orchestrators such as Airflow and Dagster) uses decorators to mark functions as tasks and flows. Even without installing an orchestrator, you can structure your pipeline code so it is drop-in compatible.

python
import functools
import time
import logging
from typing import Callable, Any

logger = logging.getLogger(__name__)

# Minimal task/flow decorator implementation
# (compatible with the @task/@flow pattern without requiring Prefect)
def task(retries: int = 0, retry_delay_seconds: int = 5):
    """Decorator that adds retry logic and logging to a pipeline step."""
    def decorator(fn: Callable) -> Callable:
        @functools.wraps(fn)
        def wrapper(*args, **kwargs) -> Any:
            for attempt in range(retries + 1):
                try:
                    start = time.time()
                    logger.info(f"[TASK] {fn.__name__} starting (attempt {attempt+1})")
                    result = fn(*args, **kwargs)
                    elapsed = time.time() - start
                    logger.info(f"[TASK] {fn.__name__} completed in {elapsed:.2f}s")
                    return result
                except Exception as exc:
                    logger.warning(f"[TASK] {fn.__name__} failed: {exc}")
                    if attempt < retries:
                        time.sleep(retry_delay_seconds)
                    else:
                        raise
        return wrapper
    return decorator

def flow(name: str):
    """Decorator that marks a function as an orchestrated pipeline flow."""
    def decorator(fn: Callable) -> Callable:
        @functools.wraps(fn)
        def wrapper(*args, **kwargs) -> Any:
            logger.info(f"[FLOW] {name} started")
            start = time.time()
            result = fn(*args, **kwargs)
            elapsed = time.time() - start
            logger.info(f"[FLOW] {name} finished in {elapsed:.2f}s")
            return result
        return wrapper
    return decorator
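
When you do adopt Prefect (2.x), the swap is essentially an import change — the parameters used above (retries, retry_delay_seconds, name) match Prefect's own decorator signatures. A minimal sketch:

python
# With Prefect installed, the local decorators above can be deleted and
# replaced by the real ones; call sites stay identical.
from prefect import flow, task

@task(retries=3, retry_delay_seconds=10)
def ingest_step(date: str) -> str:
    return f"ingested {date}"

@flow(name="ml-data-pipeline")
def pipeline(date: str) -> None:
    print(ingest_step(date))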

Complete Pipeline: Ingest to Partition

Assembling all the pieces into a single, production-ready pipeline.

python
import shutil
import subprocess
from datetime import datetime, timedelta
from pathlib import Path

import pandas as pd
import great_expectations as gx

# @task and @flow are the decorators defined in the previous section
# (or the real ones: from prefect import task, flow).

RAW_DIR       = Path("data/raw")
CLEAN_DIR     = Path("data/clean")
FEATURE_DIR   = Path("data/features")
PARTITION_DIR = Path("data/partitioned")

@task(retries=3, retry_delay_seconds=10)
def ingest(source_url: str, date: str) -> pd.DataFrame:
    """Download raw events for a given date."""
    df = pd.read_parquet(f"{source_url}/dt={date}/")
    df["ingested_at"] = pd.Timestamp.utcnow().isoformat()
    raw_path = RAW_DIR / f"dt={date}" / "events.parquet"
    raw_path.parent.mkdir(parents=True, exist_ok=True)
    df.to_parquet(raw_path, index=False)
    print(f"Ingested {len(df):,} rows for {date}")
    return df

@task(retries=1)
def validate(df: pd.DataFrame, date: str) -> pd.DataFrame:
    """Run Great Expectations validation on raw data."""
    context = gx.get_context()
    datasource = context.sources.add_or_update_pandas("pipeline_source")
    asset = datasource.add_dataframe_asset("raw_events")

    suite_name = "raw_events.quality"
    try:
        suite = context.get_expectation_suite(suite_name)
    except Exception:
        suite = context.add_expectation_suite(suite_name)

    validator = context.get_validator(
        batch_request=asset.build_batch_request(dataframe=df),
        expectation_suite_name=suite_name,
    )
    validator.expect_column_values_to_not_be_null("user_id")
    validator.expect_column_values_to_not_be_null("event_type")
    validator.expect_column_values_to_be_between(
        "session_duration_seconds", min_value=0, max_value=86400
    )
    validator.save_expectation_suite(discard_failed_expectations=False)

    checkpoint = context.add_or_update_checkpoint(
        name="raw_events_checkpoint",
        validations=[{
            "batch_request": asset.build_batch_request(dataframe=df),
            "expectation_suite_name": suite_name,
        }]
    )
    result = checkpoint.run()
    if not result["success"]:
        raise ValueError(f"Data validation failed for date={date}")

    return df

@task(retries=0)
def transform(df: pd.DataFrame, date: str) -> pd.DataFrame:
    """Clean and engineer features from raw events."""
    df = df.copy()

    # Normalise types
    df["event_timestamp"] = pd.to_datetime(df["event_timestamp"], utc=True)
    df["session_duration_seconds"] = df["session_duration_seconds"].clip(0, 86400)

    # Engineer features
    df["hour_of_day"]   = df["event_timestamp"].dt.hour
    df["day_of_week"]   = df["event_timestamp"].dt.dayofweek
    df["is_weekend"]    = df["day_of_week"].isin([5, 6]).astype(int)

    # Session-level aggregations
    session_features = df.groupby("session_id").agg(
        session_event_count  = ("event_type", "count"),
        session_duration     = ("session_duration_seconds", "max"),
        session_has_purchase = ("event_type", lambda x: (x == "purchase").any()),
    ).reset_index()
    df = df.merge(session_features, on="session_id", how="left")

    clean_path = CLEAN_DIR / f"dt={date}" / "features.parquet"
    clean_path.parent.mkdir(parents=True, exist_ok=True)
    df.to_parquet(clean_path, index=False)
    return df

@task(retries=0)
def partition(df: pd.DataFrame, date: str) -> Path:
    """Partition transformed data by date and label for training."""
    out_dir = PARTITION_DIR / f"dt={date}"
    if out_dir.exists():
        shutil.rmtree(out_dir)          # idempotent: delete before write
    out_dir.mkdir(parents=True, exist_ok=True)

    # Temporal partition
    df.to_parquet(out_dir / "all.parquet", index=False)

    # Label-based partition (assumes binary label 'converted')
    if "converted" in df.columns:
        df[df["converted"] == 1].to_parquet(
            out_dir / "positive.parquet", index=False
        )
        df[df["converted"] == 0].to_parquet(
            out_dir / "negative.parquet", index=False
        )
    return out_dir

@task(retries=0)
def version_data(out_dir: Path, date: str) -> None:
    """Track the output partition with DVC."""
    subprocess.run(["dvc", "add", str(out_dir)], check=True)
    subprocess.run(
        ["git", "add", f"{out_dir}.dvc", ".gitignore"], check=True
    )
    subprocess.run(
        ["git", "commit", "-m", f"data: add partition dt={date}"],
        check=True
    )
    subprocess.run(["dvc", "push"], check=True)

@flow(name="ml-data-pipeline")
def run_pipeline(source_url: str, date: str) -> None:
    """End-to-end ML data pipeline: ingest → validate → transform → partition → version."""
    raw_df      = ingest(source_url, date)
    valid_df    = validate(raw_df, date)
    clean_df    = transform(valid_df, date)
    out_dir     = partition(clean_df, date)
    version_data(out_dir, date)
    print(f"Pipeline complete for date={date}. Output: {out_dir}")

if __name__ == "__main__":
    import sys
    date = sys.argv[1] if len(sys.argv) > 1 else "2025-03-28"
    run_pipeline(
        source_url="s3://company-data-lake/events",
        date=date
    )

Key Takeaways

  • Use ELT when your destination has powerful query compute; use ETL when you must enforce schema or mask sensitive fields at landing time.
  • Choose batch for training pipelines (simplicity, cost), micro-batch for near-real-time features, and streaming only when sub-minute latency is a genuine business requirement.
  • Idempotency requires deterministic output paths, delete-before-write semantics, and atomic renames — without these, retries corrupt data.
  • Great Expectations catches data quality regressions before they reach training; run it as a blocking checkpoint, not a logging step.
  • Handle schema changes explicitly by versioning your schema and maintaining migration logic during transition windows — additive changes are safe, breaking changes require coordination.
  • Partition training data by time and never shuffle across the temporal boundary; this is the most common source of evaluation leakage in ML pipelines.
  • DVC gives you reproducible data lineage alongside your Git history — every training run should be traceable to a specific data version.
  • Build your pipeline with @task/@flow patterns from day one so adoption of a real orchestrator (Prefect, Airflow, Dagster) is a drop-in swap.