GadaaLabs
Python Mastery — From Zero to AI Engineering
Lesson 17

Production Python — FastAPI, Packaging & Profiling

40 min

The Gap Between Scripts and Production

Most Python tutorials stop when the code "works on my machine." Production Python demands more:

  • Reproducibility — any engineer can install, run, and test the project in minutes
  • Correctness at the boundary — all external input is validated and typed
  • Observability — logs are structured JSON, parseable by Datadog/Splunk/CloudWatch
  • Performance — you know why code is slow before you optimize it
  • Deployability — the app ships as a container, not a zip of .py files

This lesson closes that gap systematically.


Project Structure: Flat vs Src Layout

Two layouts dominate Python projects. The choice matters for packaging correctness.

Flat layout — simple, good for applications and services:

ml_service/
├── app/
│   ├── __init__.py
│   ├── main.py          # FastAPI app + lifespan
│   ├── models.py        # Pydantic schemas
│   ├── services/
│   │   ├── __init__.py
│   │   ├── predictor.py # ML inference logic
│   │   └── cache.py     # Redis caching
│   └── api/
│       ├── __init__.py
│       ├── predict.py   # /predict endpoint
│       └── health.py    # /health endpoint
├── tests/
│   ├── __init__.py
│   ├── test_predict.py
│   └── test_health.py
├── pyproject.toml
├── Dockerfile
└── .env.example

Src layout — the recommended standard for libraries published to PyPI:

my_library/
├── src/
│   └── my_library/
│       ├── __init__.py   # public API re-exported here
│       ├── core.py
│       └── utils.py
├── tests/
│   └── test_core.py
├── pyproject.toml
└── README.md

Why src layout prevents real bugs: With flat layout, running python -c "import my_library" from the project root imports from the local directory, not the installed package. A packaging mistake (say, a module missing from the built wheel) would let tests pass (importing the local copy) while the deployed package fails (importing the installed one). The src/ barrier forces a proper install (pip install -e .) and catches this class of error during development.
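A quick way to check which copy would actually be imported (a sketch; my_library is the hypothetical package from the layout above):

python
# Prints where "import my_library" would resolve from. Run it from the project
# root and from elsewhere: under a flat layout the origin flips between the
# local directory and site-packages; under src layout it is consistent once
# you run `pip install -e .`.
import importlib.util

spec = importlib.util.find_spec("my_library")
print(spec.origin if spec else "my_library is not importable from here")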

Decision rule:

  • Building an API service, CLI app, or data pipeline → flat layout
  • Building a reusable library for PyPI → src layout

pyproject.toml — The Single Source of Truth

pyproject.toml consolidates what previously lived in setup.py, setup.cfg, and MANIFEST.in, along with the abstract dependencies once kept in requirements.txt. It is standardized by PEPs 517, 518, and 621:

toml
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[project]
name = "ml-serving"
version = "0.2.0"
description = "FastAPI-based ML model serving with structured logging"
readme = "README.md"
requires-python = ">=3.11"
license = {text = "MIT"}
keywords = ["machine-learning", "fastapi", "mlops"]

dependencies = [
    "fastapi>=0.111.0",
    "uvicorn[standard]>=0.29.0",    # [standard] = httptools + uvloop (fast event loop)
    "pydantic>=2.7.0",
    "scikit-learn>=1.4.0",
    "numpy>=1.26.0",
    "structlog>=24.0.0",            # Structured logging library
]

[project.optional-dependencies]
dev = [
    "pytest>=8.0.0",
    "pytest-asyncio>=0.23.0",
    "httpx>=0.27.0",    # Required for FastAPI TestClient with async
    "ruff>=0.4.0",
    "mypy>=1.10.0",
    "coverage>=7.0.0",
]
prod = [
    "gunicorn>=22.0.0",     # Process manager for uvicorn in production
    "prometheus-client>=0.20.0",
]

[project.scripts]
# Creates a CLI command: "serve" -> calls app.main:start()
serve = "app.main:start"

[tool.ruff]
line-length = 88
target-version = "py311"

[tool.ruff.lint]
select = ["E", "F", "I", "N", "UP", "B", "C90", "S"]

[tool.ruff.lint.per-file-ignores]
"tests/**" = ["S101"]    # Allow assert in tests

[tool.mypy]
strict = true
python_version = "3.11"
ignore_missing_imports = true

[tool.pytest.ini_options]
testpaths = ["tests"]
asyncio_mode = "auto"
addopts = "--tb=short -q"

[tool.coverage.run]
source = ["app"]
omit = ["tests/*"]

[tool.coverage.report]
fail_under = 80

Key commands:

bash
pip install -e ".[dev]"    # Install in editable mode with dev extras
ruff check .               # Lint
ruff format .              # Format (replaces Black)
mypy app/                  # Type check
pytest --cov=app           # Test with coverage
python -m build            # Build wheel + sdist for PyPI

Virtual Environments and Dependency Pinning

Always work in virtual environments. Never install packages globally:

bash
python -m venv .venv
source .venv/bin/activate    # Linux/Mac
.venv\Scripts\activate       # Windows

pip install -e ".[dev]"
pip freeze > requirements.lock    # Pin exact versions for reproducibility

For production, pin exact versions:

text
# requirements.lock — exact pins, used in Docker
fastapi==0.111.1
uvicorn==0.29.0
gunicorn==22.0.0
pydantic==2.7.2
numpy==1.26.4
scikit-learn==1.4.2

The pyproject.toml uses >= (minimum versions, flexible). requirements.lock uses == (pinned, reproducible builds). Docker uses the lock file. Developers install from pyproject.toml.


FastAPI: From Basics to Production

FastAPI generates OpenAPI docs, validates requests via Pydantic, and handles async natively. Here's a complete production-grade ML API:

The Lifespan Context Manager

python
# app/main.py
from contextlib import asynccontextmanager
from fastapi import FastAPI
import pickle
import logging

logger = logging.getLogger(__name__)

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Load resources on startup, clean up on shutdown."""
    # ── STARTUP ────────────────────────────────────────────────────────────────
    logger.info("startup.begin", extra={"stage": "model_load"})

    try:
        with open("artifacts/model.pkl", "rb") as f:
            app.state.model = pickle.load(f)
        with open("artifacts/scaler.pkl", "rb") as f:
            app.state.scaler = pickle.load(f)
        app.state.model_version = "v2.1.0"
        logger.info("startup.complete", extra={"model_version": app.state.model_version})
    except FileNotFoundError as e:
        logger.error("startup.failed", extra={"error": str(e)})
        raise RuntimeError("Cannot start: model artifacts missing") from e

    yield   # App runs here

    # ── SHUTDOWN ───────────────────────────────────────────────────────────────
    # Free GPU memory, close DB connections, flush metrics
    logger.info("shutdown.complete")

app = FastAPI(
    title="Churn Predictor API",
    version="2.1.0",
    description="Real-time ML inference with structured logging and OpenAPI docs",
    lifespan=lifespan,
)

Why lifespan, not module-level globals? Module-level code runs once at import time — in tests, this tries to load the model file and fails. The lifespan manager only runs when the ASGI server starts, and tests can mock app.state before the test client starts.

Pydantic Schemas — The API Contract

python
# app/models.py
from pydantic import BaseModel, Field, field_validator, computed_field
from typing import Literal
from datetime import datetime

class PredictRequest(BaseModel):
    """Input validation — all fields checked before the route handler runs."""

    tenure_months: int = Field(
        ...,                           # required (no default)
        ge=1, le=120,
        description="Customer tenure in months",
        examples=[24]
    )
    monthly_charges: float = Field(..., gt=0.0, le=1000.0)
    contract_type: Literal["month-to-month", "one_year", "two_year"]
    num_products: int = Field(..., ge=1, le=8)
    internet_service: Literal["DSL", "Fiber", "None"] = "DSL"

    @field_validator("monthly_charges")
    @classmethod
    def charges_reasonable(cls, v: float) -> float:
        if v < 18.0:
            raise ValueError(
                f"monthly_charges={v} is below our minimum plan of $18"
            )
        return round(v, 2)

class PredictResponse(BaseModel):
    churn_probability: float = Field(..., ge=0.0, le=1.0)
    churn_prediction: bool
    risk_tier: Literal["low", "medium", "high"]
    model_version: str
    predicted_at: datetime

    @computed_field                    # Derived at serialization time
    @property
    def recommendation(self) -> str:
        if self.risk_tier == "high":
            return "Immediate retention offer — 20% discount"
        if self.risk_tier == "medium":
            return "Proactive check-in call within 7 days"
        return "Standard engagement — no action required"

Dependency Injection

python
# app/dependencies.py
from fastapi import Request, HTTPException, Header
from typing import Annotated
import os

# Dependencies are reusable, testable, and composable
async def get_model(request: Request):
    """Inject the loaded model from app.state."""
    if not hasattr(request.app.state, "model"):
        raise HTTPException(503, "Model not loaded — service starting up")
    return request.app.state.model

async def require_api_key(
    x_api_key: Annotated[str | None, Header()] = None
):
    """Authentication via API key header."""
    valid_keys = set(os.environ.get("API_KEYS", "dev-key").split(","))
    if x_api_key not in valid_keys:
        raise HTTPException(401, "Invalid or missing API key")
    return x_api_key

# Usage in route:
# async def predict(
#     request: PredictRequest,
#     model = Depends(get_model),
#     _auth = Depends(require_api_key),
# ):
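
Because dependencies are plain callables, tests can swap them without any monkeypatching. A minimal sketch using FastAPI's dependency_overrides dict (app and require_api_key are the objects defined above):

python
# FastAPI consults this dict before calling the real dependency.
from app.main import app
from app.dependencies import require_api_key

app.dependency_overrides[require_api_key] = lambda: "test-key"  # bypass auth
# ... exercise the API through TestClient here ...
app.dependency_overrides.clear()  # restore the real dependencies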

The Prediction Endpoint

python
# app/api/predict.py
import numpy as np
from fastapi import APIRouter, Depends, HTTPException, Request
from fastapi.concurrency import run_in_threadpool

from app.models import PredictRequest, PredictResponse
from app.dependencies import get_model, require_api_key
from datetime import datetime, timezone

router = APIRouter(prefix="/predict", tags=["inference"])

CONTRACT_ENCODING = {
    "month-to-month": 0,
    "one_year":        1,
    "two_year":        2,
}

INTERNET_ENCODING = {"DSL": 0, "Fiber": 1, "None": 2}

def _sync_predict(model, scaler, req: PredictRequest) -> dict:
    """CPU-bound inference — runs in a thread pool."""
    features = np.array([[
        req.tenure_months,
        req.monthly_charges,
        CONTRACT_ENCODING[req.contract_type],
        req.num_products,
        INTERNET_ENCODING[req.internet_service],
    ]])
    features_scaled = scaler.transform(features)
    proba = float(model.predict_proba(features_scaled)[0, 1])

    risk = "high" if proba > 0.7 else "medium" if proba > 0.4 else "low"
    return {
        "churn_probability": round(proba, 4),
        "churn_prediction": proba > 0.5,
        "risk_tier": risk,
    }

@router.post("/", response_model=PredictResponse)
async def predict(
    payload: PredictRequest,
    http_request: Request,     # raw request object: needed to reach app.state
    model = Depends(get_model),
    _auth = Depends(require_api_key),
):
    # Naming the body param "payload" avoids shadowing the raw Request —
    # a Pydantic model has no .app attribute.
    try:
        # run_in_threadpool: offload CPU work, keep event loop responsive
        result = await run_in_threadpool(
            _sync_predict, model, http_request.app.state.scaler, payload
        )
        return PredictResponse(
            **result,
            model_version=http_request.app.state.model_version,
            predicted_at=datetime.now(timezone.utc),
        )
    except Exception as e:
        raise HTTPException(500, detail=f"Inference failed: {e}") from e

Middleware

python
# app/middleware.py
import time
import uuid

import structlog
from fastapi import Request
from starlette.middleware.base import BaseHTTPMiddleware

# One shared ContextVar (defined in app/logging_config.py, shown below) so the
# JSON formatter picks up the request id set here.
from app.logging_config import correlation_id

log = structlog.get_logger()

class ObservabilityMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        # Trace across services: read from header or generate new
        cid = request.headers.get("X-Correlation-ID", uuid.uuid4().hex[:12])
        token = correlation_id.set(cid)

        start = time.perf_counter()
        try:
            response = await call_next(request)
            elapsed_ms = (time.perf_counter() - start) * 1000

            log.info(
                "request.completed",
                method=request.method,
                path=request.url.path,
                status_code=response.status_code,
                duration_ms=round(elapsed_ms, 2),
                correlation_id=cid,
            )
            response.headers["X-Correlation-ID"] = cid
            response.headers["X-Response-Time-Ms"] = str(round(elapsed_ms, 2))
            return response
        finally:
            correlation_id.reset(token)

Pydantic v2 — Deep Dive

Pydantic v2 (2023) rewrote the core in Rust, achieving 5–17x speedup. The API changed significantly:

Pydantic v2 Validation Patterns
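The interactive block isn't reproduced here; as a stand-in, a minimal self-contained sketch of the core v2 entry points (model_validate, model_dump, ValidationError.errors()). The Reading model is illustrative:

python
from pydantic import BaseModel, Field, ValidationError

class Reading(BaseModel):
    sensor_id: str = Field(min_length=3)
    value: float = Field(ge=0.0)

# v2 renamed the entry points: model_validate (was parse_obj),
# model_dump / model_dump_json (were dict / json).
ok = Reading.model_validate({"sensor_id": "th-01", "value": 21.5})
print(ok.model_dump())        # {'sensor_id': 'th-01', 'value': 21.5}
print(ok.model_dump_json())   # serialized by the Rust core

try:
    Reading.model_validate({"sensor_id": "x", "value": -4})
except ValidationError as e:
    # errors() reports every failing field at once, not just the first
    for err in e.errors():
        print(err["loc"], err["type"], err["msg"])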

Profiling: Measure Before You Optimize

The cardinal rule: never optimize without measuring. Profiling tells you where the time actually goes — not where you think it goes.

CPU Profiling with cProfile and timeit

CPU Profiling: Finding Real Bottlenecks
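The embedded runner isn't reproduced here; the sketch below shows the same workflow: cProfile to find where the time goes, then timeit to compare candidates (slow_unique and fast_unique are illustrative):

python
import cProfile, io, pstats, timeit

def slow_unique(items):
    seen = []                                  # list lookup: O(n) per element
    return [x for x in items if not (x in seen or seen.append(x))]

def fast_unique(items):
    return list(dict.fromkeys(items))          # dict lookup: O(1), keeps order

data = list(range(2_000)) * 3

# Step 1, cProfile: which calls dominate?
profiler = cProfile.Profile()
profiler.enable()
slow_unique(data)
profiler.disable()
out = io.StringIO()
pstats.Stats(profiler, stream=out).sort_stats("cumulative").print_stats(5)
print(out.getvalue())

# Step 2, timeit: how much faster is the fix?
for fn in (slow_unique, fast_unique):
    print(fn.__name__, timeit.timeit(lambda: fn(data), number=10))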

Memory Profiling with tracemalloc

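In place of the runner, a short sketch: tracemalloc comparing the peak memory of an eager list against a generator (function names are illustrative):

python
import tracemalloc

def rows_eager(n):
    return [str(i) * 50 for i in range(n)]     # materializes every row up front

def rows_lazy(n):
    return (str(i) * 50 for i in range(n))     # generator: one row at a time

for builder in (rows_eager, rows_lazy):
    tracemalloc.start()
    total = sum(len(row) for row in builder(100_000))
    _, peak = tracemalloc.get_traced_memory()
    tracemalloc.stop()
    print(f"{builder.__name__}: peak {peak / 1_048_576:.1f} MiB ({total:,} chars)")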

Performance Optimization Patterns

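In place of the runner, a minimal sketch of two classic wins: memoizing a pure function with functools.lru_cache, and testing membership against a set instead of a list (the numbers are illustrative):

python
import timeit
from functools import lru_cache

def fib(n):                     # exponential: recomputes subproblems
    return n if n < 2 else fib(n - 1) + fib(n - 2)

@lru_cache(maxsize=None)
def fib_cached(n):              # linear: each subproblem computed once
    return n if n < 2 else fib_cached(n - 1) + fib_cached(n - 2)

haystack_list = list(range(100_000))
haystack_set = set(haystack_list)               # O(1) average-case lookups

print("fib(25) plain :", timeit.timeit(lambda: fib(25), number=5))
print("fib(25) cached:", timeit.timeit(lambda: fib_cached(25), number=5))
print("in list       :", timeit.timeit(lambda: 99_999 in haystack_list, number=2_000))
print("in set        :", timeit.timeit(lambda: 99_999 in haystack_set, number=2_000))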

Structured Logging

Production logs must be machine-parseable. Structured JSON logs are queryable by any log aggregation system (Datadog, Splunk, AWS CloudWatch):

python
# app/logging_config.py
import logging
import json
import sys
from contextvars import ContextVar

correlation_id: ContextVar[str] = ContextVar("correlation_id", default="")

class JSONFormatter(logging.Formatter):
    """Emit one JSON object per log line — parseable by any log aggregator."""

    RESERVED = {"name", "msg", "args", "levelname", "levelno", "pathname",
                "filename", "module", "exc_info", "exc_text", "stack_info",
                "lineno", "funcName", "created", "msecs", "relativeCreated",
                "thread", "threadName", "processName", "process",
                "message", "asctime", "taskName"}

    def format(self, record: logging.LogRecord) -> str:
        log_obj = {
            "timestamp": self.formatTime(record, "%Y-%m-%dT%H:%M:%S.%03dZ"),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "module": record.module,
            "line": record.lineno,
            "correlation_id": correlation_id.get(""),
        }
        # Include any extra= fields that aren't internal logging fields
        for key, value in record.__dict__.items():
            if key not in self.RESERVED and not key.startswith("_"):
                log_obj[key] = value

        if record.exc_info:
            log_obj["exception"] = self.formatException(record.exc_info)

        return json.dumps(log_obj, default=str)

def configure_logging(level: str = "INFO") -> None:
    handler = logging.StreamHandler(sys.stdout)
    handler.setFormatter(JSONFormatter())
    logging.basicConfig(level=level, handlers=[handler], force=True)  # force: replace prior handlers

# Usage:
# logger = logging.getLogger(__name__)
# logger.info("prediction.complete", extra={
#     "model_version": "v2.1",
#     "tenure": 24,
#     "churn_probability": 0.73,
#     "duration_ms": 12.4,
# })
#
# Output:
# {"timestamp": "2024-01-15T10:23:45.123Z", "level": "INFO",
#  "logger": "app.api.predict", "message": "prediction.complete",
#  "model_version": "v2.1", "churn_probability": 0.73, "duration_ms": 12.4}

Logging levels — use them correctly:

DEBUG    — Detailed internals: "Loading 3,412 rows from cache"
INFO     — Normal events: "prediction.complete", "model.loaded"
WARNING  — Degraded but functional: "cache.miss, falling back to DB"
ERROR    — Request failed, but service alive: "prediction.failed: ValueError"
CRITICAL — Service is broken: "model.load.failed — cannot serve requests"

Environment Configuration with Pydantic Settings

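The runner isn't reproduced here; a minimal sketch with pydantic-settings (a separate package in Pydantic v2, installed via pip install pydantic-settings; the field names and APP_ prefix are illustrative):

python
# Reads APP_-prefixed environment variables (and optionally a .env file) and
# validates them exactly like any other Pydantic model.
from pydantic_settings import BaseSettings, SettingsConfigDict

class Settings(BaseSettings):
    model_config = SettingsConfigDict(env_prefix="APP_", env_file=".env")

    artifacts_dir: str = "artifacts"
    log_level: str = "INFO"
    api_keys: str = "dev-key"     # comma-separated; split where it is consumed
    workers: int = 4              # APP_WORKERS=8 overrides this, type-checked

settings = Settings()   # raises ValidationError if a variable fails validation
print(settings.model_dump())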

Testing a FastAPI Application

python
# tests/test_predict.py
import pytest
from fastapi.testclient import TestClient
from unittest.mock import MagicMock, patch
import numpy as np
from app.main import app

@pytest.fixture(scope="module")
def mock_model():
    """Reusable mock — predict_proba returns 75% churn."""
    model = MagicMock()
    model.predict_proba.return_value = np.array([[0.25, 0.75]])
    return model

@pytest.fixture(scope="module")
def mock_scaler():
    scaler = MagicMock()
    scaler.transform.side_effect = lambda x: x   # Identity transform
    return scaler

@pytest.fixture(scope="module")
def client(mock_model, mock_scaler):
    """TestClient with mocked artifacts; lifespan loaders patched so startup succeeds."""
    with patch("app.main.open", MagicMock()), \
         patch("app.main.pickle.load", side_effect=[mock_model, mock_scaler]):
        with TestClient(app) as c:   # lifespan runs here, loading the mocks
            c.app.state.model_version = "test-v1"
            yield c

# ── Happy path ────────────────────────────────────────────────────────────────
def test_predict_high_churn(client):
    response = client.post(
        "/predict/",
        json={
            "tenure_months": 3,
            "monthly_charges": 89.5,
            "contract_type": "month-to-month",
            "num_products": 1,
            "internet_service": "Fiber",
        },
        headers={"X-Api-Key": "dev-key"},
    )
    assert response.status_code == 200
    data = response.json()
    assert data["churn_prediction"] is True        # 70% > 50%
    assert data["risk_tier"] == "high"             # 70% > 70%
    assert "recommendation" in data                # computed_field
    assert data["model_version"] == "test-v1"

# ── Validation errors (422 Unprocessable Entity) ───────────────────────────────
@pytest.mark.parametrize("bad_payload,error_field", [
    ({"tenure_months": 0}, "tenure_months"),        # ge=1 violated
    ({"monthly_charges": 5.0}, "monthly_charges"),  # custom validator
    ({"contract_type": "weekly"}, "contract_type"), # not in Literal
])
def test_predict_validation(client, bad_payload, error_field):
    base = {
        "tenure_months": 12, "monthly_charges": 55.0,
        "contract_type": "month-to-month", "num_products": 2,
    }
    base.update(bad_payload)
    response = client.post("/predict/", json=base, headers={"X-Api-Key": "dev-key"})
    assert response.status_code == 422
    assert error_field in response.text    # field named in the error's loc

# ── Authentication ─────────────────────────────────────────────────────────────
def test_predict_no_auth(client):
    response = client.post("/predict/", json={})
    assert response.status_code == 401

Docker: Multi-Stage Builds

dockerfile
# ── Stage 1: Builder ──────────────────────────────────────────────────────────
FROM python:3.11-slim AS builder
WORKDIR /build

# Copy exact pins FIRST — this layer is cached unless requirements.lock changes
# (Docker installs from the lock file; see the pinning section above)
COPY requirements.lock .
RUN pip install --no-cache-dir -r requirements.lock --target=/deps

# ── Stage 2: Runtime ──────────────────────────────────────────────────────────
FROM python:3.11-slim AS runtime
WORKDIR /app

# Non-root user — mandatory for production security
RUN groupadd -r appgroup && useradd -r -g appgroup appuser

# Copy installed packages from builder
COPY --from=builder /deps /usr/local/lib/python3.11/site-packages/

# Copy application code LAST — changes here don't invalidate the dependency layer
COPY app/ app/
# Pre-trained model files (Dockerfile comments must start at line beginning)
COPY artifacts/ artifacts/

RUN chown -R appuser:appgroup /app
USER appuser

EXPOSE 8000

# --workers: match CPU count. UvicornWorker runs the async app under gunicorn.
# `python -m gunicorn` because --target installs don't put scripts on PATH.
CMD ["python", "-m", "gunicorn", "app.main:app", \
     "--workers", "4", \
     "--worker-class", "uvicorn.workers.UvicornWorker", \
     "--bind", "0.0.0.0:8000", \
     "--timeout", "30", \
     "--access-logfile", "-", \
     "--error-logfile", "-"]

Layer ordering is a performance decision:

Rarely changes  → COPY requirements.lock + RUN pip install  (cached layer, fast rebuilds)
Often changes   → COPY app/ and artifacts/                  (invalidates only these + subsequent)

A model file change rebuilds only the final copy layers — 5s instead of 3min.


Full Project: ML Serving API

Full ML Serving API — Business Logic Layer
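The full runner isn't reproduced here; the pattern it illustrates is keeping business rules in a plain module with no framework imports, so they can be unit-tested in isolation. A minimal sketch (the thresholds and strings mirror the endpoint and schema above):

python
# Pure business logic: no FastAPI, no numpy; trivially unit-testable.
def risk_tier(churn_probability: float) -> str:
    if churn_probability > 0.7:
        return "high"
    if churn_probability > 0.4:
        return "medium"
    return "low"

def retention_action(tier: str) -> str:
    return {
        "high": "Immediate retention offer — 20% discount",
        "medium": "Proactive check-in call within 7 days",
        "low": "Standard engagement — no action required",
    }[tier]

assert risk_tier(0.75) == "high"
assert retention_action("low") == "Standard engagement — no action required"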

Exercises

Exercise 1 — Profiling Challenge: Write a function word_frequency(text: str) -> dict[str, int] using three approaches: (a) split + loop, (b) Counter, (c) regex + Counter. Use timeit to benchmark all three on a 10,000-word text. Report which is fastest and why.


Exercise 2 — Pydantic-Style Validator: Implement a DatasetConfig validator class that validates: name (3–50 chars, alphanumeric + underscores only), n_rows (100–10,000,000), split_ratio (0.05–0.4), and features (non-empty list of strings, each 1–30 chars). Raise descriptive errors for all violations.


Exercise 3 — Memory-Efficient Data Pipeline: Write a DataPipeline class with methods from_csv_chunks(path, chunksize) (generator), filter(predicate), transform(fn), and to_stats(). It should never hold more than one chunk in memory. Use tracemalloc to verify memory stays below 10MB even for a 1M-row dataset.

Exercise 4 — Rate Limiter: Implement a TokenBucketRateLimiter with capacity, refill_rate (tokens/second), and a consume(n=1) method. Test it under burst load: 100 requests in 1 second against a limiter with capacity=20, refill_rate=5/sec. Count how many succeed.


Exercise 5 — Benchmark String Formatting Methods: Benchmark five ways to build a log line from 5 variables: % formatting, .format(), f-string, Template, and "".join() on a pre-built list. Rank them by speed. Then explain why f-strings and % are faster than .format().

Exercise 6 — Structured Logger: Implement a StructuredLogger class that wraps Python's logging module and emits JSON. Each call to info(event, **kwargs) should produce one JSON line with timestamp, level, event, and all extra kwargs. Write 3 tests using caplog (the pytest fixture).

Exercise 7 — Configuration Loader: Build a Config class that loads settings from three sources in priority order: (1) environment variables, (2) a config.json file, (3) hardcoded defaults. Write a test that patches os.environ and verifies environment variables override file values.

Exercise 8 — Production Health Check: Implement a HealthChecker class with an async check() method that checks: model loaded, memory below 80% of 4GB, last prediction within 60 seconds, and no more than 5 errors in the last 60 seconds. Return {"status": "healthy"|"degraded"|"unhealthy", "checks": {...}}.