GadaaLabs
Data Analysis with Python — Expert Practitioner Track
Lesson 2

Data Ingestion, Profiling & First Contact

22 min

The First Contact Problem

Loading a dataset is the easiest step in any analysis. Understanding what you have loaded is the hardest. Most analysts run .head(), glance at a few rows, and proceed. Expert analysts treat first contact as a distinct, structured phase that produces documented output before any transformation begins.

This lesson covers two things: how to load data from every common source with production-quality parameters, and how to build an automated profiling system that gives you a complete picture of any DataFrame in seconds.


Loading from CSV — Every Parameter That Matters

pd.read_csv has over 50 parameters. Most analysts use none of them. The parameters below are the ones that actually affect analysis correctness.

python
import pandas as pd
import numpy as np
from pathlib import Path

# Basic load — adequate for small clean files only
df_basic = pd.read_csv("data/orders.csv")

# Production load — explicit types prevent silent errors
orders = pd.read_csv(
    "data/orders.csv",

    # Explicitly specify dtypes for known columns.
    # Without this, pandas infers — often incorrectly for IDs and codes.
    dtype={
        "order_id": "int64",
        "customer_id": "int64",
        "product_id": "str",       # Product codes like "SKU-001" must stay string
        "quantity": "int32",
        "status": "category",      # Repeated string values → category saves memory
    },

    # Parse date columns during load so they arrive as datetime64 rather than object
    parse_dates=["order_date", "shipped_date", "delivered_date"],

    # Treat these strings as NaN in addition to the default set
    na_values=["N/A", "n/a", "NONE", "none", "-", "--", "unknown", "Unknown"],

    # Suppress DtypeWarning: mixed types in a column.
    # IMPORTANT: Always investigate the warning before suppressing it.
    # low_memory=False reads the whole file before inferring types — slower but accurate.
    low_memory=False,

    # Skip completely empty rows
    skip_blank_lines=True,

    # Encoding — always specify; latin-1 handles most Western European legacy data
    encoding="utf-8",  # Change to "latin-1" if you see UnicodeDecodeError
)

print(f"Loaded: {orders.shape[0]:,} rows × {orders.shape[1]} columns")
print(f"Memory: {orders.memory_usage(deep=True).sum() / 1e6:.1f} MB")

Loading Large Files in Chunks

When a file exceeds available RAM, read it in chunks:

python
from typing import Callable
import pandas as pd


def load_large_csv_filtered(
    path: str,
    chunk_size: int = 100_000,
    filter_fn: Callable[[pd.DataFrame], pd.DataFrame] | None = None,
) -> pd.DataFrame:
    """
    Load a large CSV in chunks, apply an optional filter to each chunk,
    and concatenate into a single DataFrame.

    Args:
        path: Path to the CSV file.
        chunk_size: Rows per chunk.
        filter_fn: Optional function applied to each chunk before appending.

    Returns:
        Filtered DataFrame.
    """
    chunks = []
    rows_read = 0

    with pd.read_csv(path, chunksize=chunk_size) as reader:
        for chunk in reader:
            rows_read += len(chunk)
            if filter_fn is not None:
                chunk = filter_fn(chunk)
            if len(chunk) > 0:
                chunks.append(chunk)

    result = pd.concat(chunks, ignore_index=True) if chunks else pd.DataFrame()
    print(f"Read {rows_read:,} rows total, kept {len(result):,} after filter.")
    return result


# Example: load only 2023 orders from a 10M row file
def filter_2023(chunk: pd.DataFrame) -> pd.DataFrame:
    chunk["order_date"] = pd.to_datetime(chunk["order_date"], errors="coerce")
    return chunk[chunk["order_date"].dt.year == 2023]


# orders_2023 = load_large_csv_filtered("data/orders_full.csv", filter_fn=filter_2023)

Loading from Excel

Excel files introduce complexity that CSV files do not: multiple sheets, merged header rows, data that starts on row 3, trailing summary rows. Handle all of it explicitly.

python
import pandas as pd

# Single sheet, default header row 0
df = pd.read_excel("data/sales_report.xlsx", sheet_name="Orders")

# Multiple sheets at once → returns dict of DataFrames
sheets = pd.read_excel("data/sales_report.xlsx", sheet_name=None)  # None = all sheets
print("Sheets found:", list(sheets.keys()))

# Header on the second row of the sheet (row index 1); the title row above it is discarded
df_awkward = pd.read_excel(
    "data/legacy_report.xlsx",
    sheet_name="Revenue",
    header=1,        # Row index 1 contains column names; rows above it are dropped
    # Do not also pass skiprows=[0]: skipped rows are removed first and header is
    # then counted in what remains, which would shift the header down one row.
    skipfooter=3,    # Skip the last 3 rows (e.g., "Total" summary rows)
    usecols="A:H",   # Read only columns A through H
    dtype={"Revenue": float, "Units": int},
)

# Combine multiple sheets with source tracking
def load_all_sheets(path: str) -> pd.DataFrame:
    """Load all sheets from an Excel workbook and add a 'source_sheet' column."""
    all_sheets = pd.read_excel(path, sheet_name=None)
    frames = []
    for sheet_name, df in all_sheets.items():
        df = df.copy()
        df["source_sheet"] = sheet_name
        frames.append(df)
    combined = pd.concat(frames, ignore_index=True)
    print(f"Combined {len(all_sheets)} sheets: {combined.shape}")
    return combined

Loading from SQL Databases

SQL databases are the most common data source in production analytics. Use SQLAlchemy as the connection layer — it gives you database portability and query safety.

python
import pandas as pd
from sqlalchemy import create_engine, text

# Connection strings by database type:
# PostgreSQL:  postgresql+psycopg2://user:pass@host:5432/dbname
# MySQL:       mysql+pymysql://user:pass@host:3306/dbname
# SQLite:      sqlite:///path/to/file.db
# BigQuery:    bigquery://project/dataset  (requires sqlalchemy-bigquery)

engine = create_engine(
    "postgresql+psycopg2://analyst:secret@db.internal:5432/ecommerce",
    # Connection pool settings for repeated queries
    pool_size=5,
    max_overflow=2,
    pool_pre_ping=True,  # Test connections before using (handles stale connections)
)

# Simple query
orders = pd.read_sql(
    "SELECT * FROM orders WHERE order_date >= '2023-01-01'",
    con=engine,
    parse_dates=["order_date", "shipped_date"],
)

# Parameterised query — NEVER use f-strings for user input (SQL injection risk)
start_date = "2023-01-01"
end_date = "2023-09-30"

orders = pd.read_sql(
    text("SELECT * FROM orders WHERE order_date BETWEEN :start AND :end"),
    con=engine,
    params={"start": start_date, "end": end_date},
    parse_dates=["order_date"],
)

# Chunking large tables: rows are fetched in batches, but the concatenated
# result must still fit in RAM. Filter or aggregate per chunk if it does not.
def load_sql_chunked(
    query: str,
    engine,
    chunk_size: int = 50_000,
) -> pd.DataFrame:
    """Load a large SQL result set in chunks and concatenate them."""
    chunks = list(pd.read_sql(query, con=engine, chunksize=chunk_size))
    # pd.concat raises ValueError on an empty list, so guard the no-chunks case
    return pd.concat(chunks, ignore_index=True) if chunks else pd.DataFrame()


# Best practice: push filtering and aggregation to the database
# BAD: Load 10M rows, filter in Python
# bad_df = pd.read_sql("SELECT * FROM events", engine)
# filtered = bad_df[bad_df["event_type"] == "purchase"]

# GOOD: Filter in SQL, load only what you need
# good_df = pd.read_sql(
#     "SELECT * FROM events WHERE event_type = 'purchase' AND event_date >= '2023-01-01'",
#     engine
# )
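
The parameterised pattern can be tried without a database server. The sketch below uses the standard-library `sqlite3` module with an in-memory database as a stand-in for the SQLAlchemy engine above. The table contents are hypothetical, and the `:name` placeholder style shown is the one `sqlite3` accepts.

```python
import sqlite3

import pandas as pd

# In-memory database with a few hypothetical rows.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (order_id INTEGER, order_date TEXT, revenue REAL)")
conn.executemany(
    "INSERT INTO orders VALUES (?, ?, ?)",
    [(1, "2022-12-31", 10.0), (2, "2023-03-15", 25.0), (3, "2023-10-01", 40.0)],
)

# Values are bound by the driver, never interpolated into the SQL string.
orders_q = pd.read_sql(
    "SELECT * FROM orders WHERE order_date BETWEEN :start AND :end",
    con=conn,
    params={"start": "2023-01-01", "end": "2023-09-30"},
    parse_dates=["order_date"],
)
conn.close()

print(orders_q)
```

Only the row inside the date range comes back, so the filtering happened in the database rather than in pandas.
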

Loading from JSON and Nested JSON

JSON is the lingua franca of APIs. Nested JSON — objects within objects within arrays — is the norm, not the exception.

python
import pandas as pd
import json
from pathlib import Path

# Flat JSON (list of records)
orders = pd.read_json("data/orders.json")

# Nested JSON — use json_normalize
with open("data/orders_nested.json") as f:
    raw = json.load(f)

# Example structure:
# [
#   {
#     "order_id": 1001,
#     "customer": {"id": 42, "name": "Alice", "tier": "premium"},
#     "items": [{"sku": "A1", "qty": 2, "price": 29.99}],
#     "shipping": {"city": "Berlin", "country": "DE"}
#   },
#   ...
# ]

# Flatten one level: customer and shipping nested objects
orders_flat = pd.json_normalize(
    raw,
    # Columns from nested dicts become "customer.id", "customer.name", etc.
    sep=".",
    # Expand the "items" list into separate rows, keeping parent fields
    record_path="items",
    meta=[
        "order_id",
        ["customer", "id"],
        ["customer", "tier"],
        ["shipping", "city"],
        ["shipping", "country"],
    ],
    meta_prefix="",        # No prefix for meta fields
    errors="ignore",       # Fill missing meta fields with NaN instead of raising KeyError
)

# Rename flattened columns
orders_flat = orders_flat.rename(columns={
    "customer.id": "customer_id",
    "customer.tier": "customer_tier",
    "shipping.city": "city",
    "shipping.country": "country",
})

print(orders_flat.head(3))
print(orders_flat.dtypes)

Loading from APIs with Pagination

Most production APIs paginate results. A robust loader handles pagination automatically.

python
import requests
import pandas as pd
from typing import Any
import time


def fetch_paginated_api(
    base_url: str,
    endpoint: str,
    headers: dict[str, str],
    params: dict[str, Any] | None = None,
    page_param: str = "page",
    page_size_param: str = "per_page",
    page_size: int = 100,
    results_key: str = "data",
    max_pages: int = 500,
    rate_limit_sleep: float = 0.1,
) -> pd.DataFrame:
    """
    Fetch all pages from a paginated REST API endpoint.

    Args:
        base_url: Root URL, e.g. "https://api.example.com"
        endpoint: Path, e.g. "/v1/orders"
        headers: Auth headers, e.g. {"Authorization": "Bearer TOKEN"}
        params: Additional query params
        page_param: Name of the page number parameter
        page_size_param: Name of the page size parameter
        page_size: Records per request
        results_key: JSON key that contains the list of records
        max_pages: Safety limit to prevent infinite loops
        rate_limit_sleep: Seconds to sleep between requests

    Returns:
        All records as a DataFrame.
    """
    params = dict(params or {})  # Copy so the caller's dict is not mutated
    params[page_size_param] = page_size

    all_records: list[dict] = []
    page = 1

    while page <= max_pages:
        params[page_param] = page
        response = requests.get(
            f"{base_url}{endpoint}",
            headers=headers,
            params=params,
            timeout=30,
        )
        response.raise_for_status()
        payload = response.json()

        records = payload.get(results_key, [])
        if not records:
            break  # No more data

        all_records.extend(records)
        print(f"  Page {page}: fetched {len(records)} records (total: {len(all_records):,})")

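        # Increment before the has_more check so that page - 1 always equals
        # the number of pages that returned records.
        page += 1

        # Some APIs signal last page explicitly
        if not payload.get("has_more", True):
            break

        time.sleep(rate_limit_sleep)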

    df = pd.DataFrame(all_records)
    print(f"Loaded {len(df):,} records from {page - 1} pages.")
    return df


# Usage example (commented out — requires live API)
# orders_api = fetch_paginated_api(
#     base_url="https://api.gadaalabs.com",
#     endpoint="/v1/orders",
#     headers={"Authorization": f"Bearer {API_TOKEN}"},
#     params={"status": "completed", "start_date": "2023-01-01"},
# )

First Contact Checklist

After loading, run this checklist on every new dataset before doing anything else:

python
import pandas as pd
import numpy as np


def first_contact(df: pd.DataFrame, name: str = "DataFrame") -> None:
    """
    Run the standard first-contact checklist on a newly loaded DataFrame.
    Prints a structured summary to stdout.
    """
    print(f"{'=' * 60}")
    print(f"FIRST CONTACT: {name}")
    print(f"{'=' * 60}")

    # 1. Shape
    rows, cols = df.shape
    print(f"\n[Shape] {rows:,} rows × {cols} columns")

    # 2. Memory
    mem_mb = df.memory_usage(deep=True).sum() / 1e6
    print(f"[Memory] {mem_mb:.1f} MB")

    # 3. Column names and types
    print(f"\n[Dtypes]")
    dtype_summary = df.dtypes.value_counts()
    for dtype, count in dtype_summary.items():
        print(f"  {str(dtype):<15} {count} columns")

    # 4. Null counts
    null_counts = df.isnull().sum()
    null_cols = null_counts[null_counts > 0]
    if len(null_cols) > 0:
        print(f"\n[Nulls] {len(null_cols)} columns have nulls:")
        for col, count in null_cols.sort_values(ascending=False).items():
            pct = count / rows * 100
            print(f"  {col:<30} {count:>8,} ({pct:.1f}%)")
    else:
        print(f"\n[Nulls] None — all columns complete.")

    # 5. Duplicate rows
    n_dupes = df.duplicated().sum()
    print(f"\n[Duplicates] {n_dupes:,} fully duplicate rows ({n_dupes/rows*100:.2f}%)")

    # 6. Numeric summary
    numeric_cols = df.select_dtypes(include="number").columns
    if len(numeric_cols) > 0:
        print(f"\n[Numeric Summary] {len(numeric_cols)} numeric columns:")
        print(df[numeric_cols].describe().round(2).to_string())

    # 7. Categorical/object summary
    cat_cols = df.select_dtypes(include=["object", "category"]).columns
    if len(cat_cols) > 0:
        print(f"\n[Categorical Columns] {len(cat_cols)} columns:")
        for col in cat_cols[:10]:  # Show first 10 to avoid flooding output
            n_unique = df[col].nunique()
            top_val = df[col].value_counts().index[0] if n_unique > 0 else "N/A"
            print(f"  {col:<30} unique: {n_unique:>6,}  top: '{top_val}'")

    # 8. Date columns
    date_cols = df.select_dtypes(include=["datetime64"]).columns
    if len(date_cols) > 0:
        print(f"\n[Date Columns]:")
        for col in date_cols:
            print(f"  {col:<30} min: {df[col].min()}  max: {df[col].max()}")

    print(f"\n{'=' * 60}\n")


# Simulate a dataset for demonstration
np.random.seed(0)
n = 2000
demo_orders = pd.DataFrame({
    "order_id": range(1001, 1001 + n),
    "customer_id": np.random.choice(range(1, 501), size=n),
    "product_id": np.random.choice(["SKU-001", "SKU-002", "SKU-003", "SKU-004"], size=n),
    "order_date": pd.date_range("2023-01-01", periods=n, freq="2h"),
    "revenue": np.random.exponential(scale=80, size=n).round(2),
    "quantity": np.random.randint(1, 6, size=n),
    "status": np.random.choice(["completed", "cancelled", "refunded"], p=[0.75, 0.15, 0.10], size=n),
    "channel": np.random.choice(["organic", "paid", "email", "direct"], size=n),
    "country": np.random.choice(["US", "DE", "UK", "FR", None], p=[0.4, 0.2, 0.2, 0.1, 0.1], size=n),
})

first_contact(demo_orders, name="orders")

Automated Column Profiling

The first_contact function gives an overview. The profile_dataframe function goes deeper — one rich summary per column, formatted for logging and comparison.

python
from typing import Any
import pandas as pd
import numpy as np


def profile_column(series: pd.Series) -> dict[str, Any]:
    """
    Generate a detailed profile for a single Series.
    Returns a dict suitable for appending to a profiling report DataFrame.
    """
    n_total = len(series)
    n_null = series.isnull().sum()
    n_non_null = n_total - n_null
    n_unique = series.nunique(dropna=True)

    profile: dict[str, Any] = {
        "column": series.name,
        "dtype": str(series.dtype),
        "n_total": n_total,
        "n_null": int(n_null),
        "null_pct": round(n_null / n_total * 100, 2) if n_total > 0 else 0.0,
        "n_unique": int(n_unique),
        "cardinality_ratio": round(n_unique / n_non_null, 4) if n_non_null > 0 else None,
        "completeness_pct": round(n_non_null / n_total * 100, 2) if n_total > 0 else 0.0,
    }

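    # bool counts as numeric in pandas, but describe() on a bool Series has no
    # min/max/mean entries, so route bool columns to the categorical branch.
    if pd.api.types.is_numeric_dtype(series) and not pd.api.types.is_bool_dtype(series):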
        desc = series.describe()
        profile.update({
            "min": round(float(desc["min"]), 4) if not pd.isna(desc["min"]) else None,
            "max": round(float(desc["max"]), 4) if not pd.isna(desc["max"]) else None,
            "mean": round(float(desc["mean"]), 4) if not pd.isna(desc["mean"]) else None,
            "median": round(float(series.median()), 4),
            "std": round(float(desc["std"]), 4) if not pd.isna(desc["std"]) else None,
            "skewness": round(float(series.skew()), 4),
            "kurtosis": round(float(series.kurt()), 4),
            "n_zeros": int((series == 0).sum()),
            "n_negative": int((series < 0).sum()),
            "sample_values": series.dropna().sample(min(5, n_non_null), random_state=42).tolist(),
            "top_categories": None,
        })
    elif pd.api.types.is_datetime64_any_dtype(series):
        profile.update({
            "min": str(series.min()),
            "max": str(series.max()),
            "mean": None,
            "median": None,
            "std": None,
            "skewness": None,
            "kurtosis": None,
            "n_zeros": None,
            "n_negative": None,
            "sample_values": [str(v) for v in series.dropna().sample(min(5, n_non_null), random_state=42).tolist()],
            "top_categories": None,
        })
    else:
        # Categorical / object / string
        value_counts = series.value_counts(dropna=True)
        top_cats = value_counts.head(5).to_dict()
        profile.update({
            "min": None,
            "max": None,
            "mean": None,
            "median": None,
            "std": None,
            "skewness": None,
            "kurtosis": None,
            "n_zeros": None,
            "n_negative": None,
            "sample_values": list(value_counts.head(5).index),
            "top_categories": {str(k): int(v) for k, v in top_cats.items()},
        })

    return profile


def profile_dataframe(df: pd.DataFrame) -> pd.DataFrame:
    """
    Profile every column in a DataFrame.

    Returns:
        A DataFrame where each row is a column profile.
    """
    profiles = [profile_column(df[col]) for col in df.columns]
    report = pd.DataFrame(profiles).set_index("column")
    return report


# Generate profiling report
profile_report = profile_dataframe(demo_orders)

# Show high-level view
print("Profiling Report:")
print(profile_report[["dtype", "null_pct", "completeness_pct", "n_unique", "cardinality_ratio"]].to_string())

# Columns with high null rates — flag for quality review
high_null = profile_report[profile_report["null_pct"] > 5]
print(f"\nColumns with >5% nulls: {list(high_null.index)}")

# High cardinality columns — may indicate ID-like columns that shouldn't be categorical
high_card = profile_report[profile_report["cardinality_ratio"] > 0.5]
print(f"High-cardinality columns: {list(high_card.index)}")

Detecting Schema Drift

In recurring analyses, data arrives from pipelines that can change without warning. Schema drift — unexpected column additions, removals, or type changes — breaks your code silently. Detect it explicitly.

python
from typing import NamedTuple


class SchemaDriftReport(NamedTuple):
    missing_columns: list[str]     # Expected but not present
    extra_columns: list[str]       # Present but not expected
    type_mismatches: dict[str, tuple[str, str]]  # col → (expected, actual)
    is_clean: bool


def check_schema_drift(
    df: pd.DataFrame,
    expected_schema: dict[str, str],  # {column_name: expected_dtype_string}
) -> SchemaDriftReport:
    """
    Compare actual DataFrame schema against expected schema.

    Args:
        df: Loaded DataFrame.
        expected_schema: Dict of {col_name: dtype_string}, e.g.
                         {"order_id": "int64", "order_date": "datetime64[ns]"}

    Returns:
        SchemaDriftReport with details of any drift detected.
    """
    actual_schema = {col: str(dtype) for col, dtype in df.dtypes.items()}

    missing = [col for col in expected_schema if col not in actual_schema]
    extra = [col for col in actual_schema if col not in expected_schema]

    mismatches = {}
    for col in expected_schema:
        if col in actual_schema and actual_schema[col] != expected_schema[col]:
            mismatches[col] = (expected_schema[col], actual_schema[col])

    is_clean = not missing and not extra and not mismatches

    return SchemaDriftReport(
        missing_columns=missing,
        extra_columns=extra,
        type_mismatches=mismatches,
        is_clean=is_clean,
    )


# Define expected schema for the orders table
ORDERS_EXPECTED_SCHEMA = {
    "order_id": "int64",
    "customer_id": "int64",
    "product_id": "object",
    "order_date": "datetime64[ns]",
    "revenue": "float64",
    "quantity": "int64",
    "status": "object",
    "channel": "object",
    "country": "object",
}

drift = check_schema_drift(demo_orders, ORDERS_EXPECTED_SCHEMA)

if drift.is_clean:
    print("Schema check PASSED — no drift detected.")
else:
    print("Schema check FAILED:")
    if drift.missing_columns:
        print(f"  Missing columns: {drift.missing_columns}")
    if drift.extra_columns:
        print(f"  Extra columns:   {drift.extra_columns}")
    if drift.type_mismatches:
        print("  Type mismatches:")
        for col, (exp, act) in drift.type_mismatches.items():
            print(f"    {col}: expected {exp}, got {act}")

Memory Profiling and Downcasting

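DataFrames often consume several times more memory than necessary because pandas defaults to 64-bit numeric types and stores strings as Python objects. Downcasting integers and converting repetitive strings to category is lossless. Downcasting floats to float32 keeps only about 7 significant digits, so check that this precision is acceptable before applying it to monetary or cumulative values.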

python
import pandas as pd
import numpy as np


def downcast_dataframe(df: pd.DataFrame) -> pd.DataFrame:
    """
    Downcast numeric columns to their smallest valid type.
    Convert low-cardinality string columns to category dtype.

    Returns a memory-optimised copy of the DataFrame.
    """
    df = df.copy()

    before_mb = df.memory_usage(deep=True).sum() / 1e6

    # Downcast integers
    int_cols = df.select_dtypes(include=["int64", "int32"]).columns
    for col in int_cols:
        df[col] = pd.to_numeric(df[col], downcast="integer")

    # Downcast floats
    float_cols = df.select_dtypes(include=["float64", "float32"]).columns
    for col in float_cols:
        df[col] = pd.to_numeric(df[col], downcast="float")

    # Convert low-cardinality object columns to category
    # Rule of thumb: unique values < 5% of total rows → good candidate
    obj_cols = df.select_dtypes(include="object").columns
    for col in obj_cols:
        n_unique = df[col].nunique(dropna=True)
        n_total = len(df)
        if n_unique / n_total < 0.05:
            df[col] = df[col].astype("category")

    after_mb = df.memory_usage(deep=True).sum() / 1e6
    reduction_pct = (before_mb - after_mb) / before_mb * 100

    print(f"Memory before: {before_mb:.1f} MB")
    print(f"Memory after:  {after_mb:.1f} MB")
    print(f"Reduction:     {reduction_pct:.1f}%")

    return df


# Apply to demo dataset
demo_orders_opt = downcast_dataframe(demo_orders)

print("\nDtype changes after optimisation:")
for col in demo_orders.columns:
    before = str(demo_orders[col].dtype)
    after = str(demo_orders_opt[col].dtype)
    if before != after:
        print(f"  {col:<20} {before} → {after}")
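
Before downcasting a float column, it is worth measuring how much precision float32 would cost. The sketch below uses hypothetical values and an explicit `astype("float32")` to show the effect in isolation. Integer downcasting, by contrast, preserves every value.

```python
import pandas as pd

# Integers: downcasting picks the smallest type that holds every value.
ints = pd.Series([1, 200, 30_000], dtype="int64")
small_ints = pd.to_numeric(ints, downcast="integer")
print(small_ints.dtype, (small_ints == ints).all())

# Floats: float32 has a 24-bit significand, so large or very precise
# values are rounded when converted.
floats = pd.Series([16_777_217.0, 1_234_567.891], dtype="float64")
as_f32 = floats.astype("float32")
max_abs_error = (as_f32.astype("float64") - floats).abs().max()
print(f"Largest absolute rounding error: {max_abs_error}")
```

If the largest error is bigger than the smallest difference that matters in the analysis (for example one cent on a revenue column), keep that column as float64.
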

Saving the Profiling Output

Profiling results should be saved as a structured artifact, not just printed. This creates an audit trail and enables comparison across pipeline runs.

python
import json
from pathlib import Path
from datetime import datetime
import pandas as pd


def save_profiling_report(
    df: pd.DataFrame,
    dataset_name: str,
    output_dir: str = "outputs/profiling",
) -> Path:
    """
    Generate and save a profiling report for a DataFrame.

    Returns the path to the saved report.
    """
    output_path = Path(output_dir)
    output_path.mkdir(parents=True, exist_ok=True)

    # Generate profile
    profile_report = profile_dataframe(df)

    # Timestamp the file
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    report_filename = f"{dataset_name}_profile_{timestamp}.csv"
    report_path = output_path / report_filename

    # Save the profile DataFrame as CSV (excluding nested dicts for simplicity)
    profile_csv = profile_report.drop(columns=["top_categories", "sample_values"], errors="ignore")
    profile_csv.to_csv(report_path)

    # Save metadata as JSON
    metadata = {
        "dataset": dataset_name,
        "generated_at": timestamp,
        "shape": {"rows": int(df.shape[0]), "cols": int(df.shape[1])},
        "memory_mb": round(df.memory_usage(deep=True).sum() / 1e6, 2),
        "null_summary": {
            col: int(count)
            for col, count in df.isnull().sum().items()
            if count > 0
        },
        "dtypes": {col: str(dt) for col, dt in df.dtypes.items()},
    }

    meta_path = output_path / f"{dataset_name}_meta_{timestamp}.json"
    with open(meta_path, "w") as f:
        json.dump(metadata, f, indent=2)

    print(f"Profile saved: {report_path}")
    print(f"Metadata saved: {meta_path}")
    return report_path


# Save profile for the demo orders dataset
# save_profiling_report(demo_orders, dataset_name="orders")
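
Once two runs have been saved, the reports can be compared column by column. The following is a sketch under stated assumptions: the two snapshots are built inline with hypothetical values (in practice they could be read back with `pd.read_csv(path, index_col="column")`), and the 5-point threshold for null-rate changes is an arbitrary illustration.

```python
import pandas as pd

# Hypothetical profile snapshots from two pipeline runs.
columns = pd.Index(["order_id", "revenue", "country"], name="column")
previous = pd.DataFrame(
    {"dtype": ["int64", "float64", "object"], "null_pct": [0.0, 1.2, 9.8]},
    index=columns,
)
current = pd.DataFrame(
    {"dtype": ["int64", "object", "object"], "null_pct": [0.0, 1.3, 24.5]},
    index=columns,
)

# Align the two runs side by side on column name.
joined = previous.join(current, lsuffix="_prev", rsuffix="_curr", how="outer")
joined["dtype_changed"] = joined["dtype_prev"] != joined["dtype_curr"]
joined["null_pct_delta"] = (joined["null_pct_curr"] - joined["null_pct_prev"]).round(2)

NULL_DELTA_THRESHOLD = 5.0  # Assumed threshold, in percentage points
flagged = joined[
    joined["dtype_changed"] | (joined["null_pct_delta"].abs() > NULL_DELTA_THRESHOLD)
]
print(flagged[["dtype_prev", "dtype_curr", "null_pct_delta"]])
```

The outer join keeps columns that exist in only one run; their missing side shows up as NaN and is flagged by the dtype comparison.
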

Key Takeaways

  • Use pd.read_csv with explicit dtype, parse_dates, na_values, and low_memory=False on every production load. Silent type inference is a source of subtle downstream errors.
  • For files that exceed RAM, use chunksize to load and filter in chunks before concatenating. Always push filtering to the source (SQL WHERE clause) when possible.
  • JSON normalization with pd.json_normalize handles nested structures correctly; always inspect the result for unexpected column explosion or row multiplication.
  • Paginated API loading requires an explicit loop with a safety maximum page count and rate-limit sleep. Encapsulate this in a reusable function.
  • The first_contact checklist — shape, memory, dtypes, nulls, duplicates, numeric describe, categorical top values, date ranges — must run on every new dataset before any transformation.
  • profile_dataframe produces a column-level audit trail. Save it as a CSV artifact so you can diff schema and quality changes across pipeline runs.
  • Schema drift detection — checking expected vs actual column names and types — should be a standard step in any recurring analysis or data pipeline. Silent schema changes are a leading cause of incorrect analysis.
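  • Downcasting numerics and converting low-cardinality strings to category dtype can cut memory usage substantially. Integer and category conversions are lossless; float32 keeps only about 7 significant digits, so confirm that is enough for each float column before downcasting it.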