Advanced Data Cleaning & Wrangling
Cleaning as Engineering, Not Exploration
Amateur cleaning is ad hoc: fix one thing, run the analysis, discover another problem, fix that, repeat. Expert cleaning is engineered: define the cleaning strategy based on the quality audit, implement it as reusable functions, apply it in a documented pipeline, validate the output, and log every transformation. The result is a cleaning layer that can be re-run on the next data refresh without re-thinking any decision.
This lesson assumes you have completed the quality audit from Lesson 03. Every strategy here maps to a specific quality issue type.
Missing Data Strategies by Mechanism and Type
The appropriate strategy for missing data depends on two things: the missingness mechanism (MCAR, MAR, MNAR) and the column's data type.
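One rough way to probe the mechanism before picking a strategy is to compare a column's null rate across the levels of another column. The sketch below uses a hypothetical helper, `null_rate_by_group`, and toy data. Similar rates across groups are consistent with MCAR, while clearly different rates suggest MAR with respect to that grouping column. It cannot detect MNAR, which depends on the unobserved values themselves.

```python
import numpy as np
import pandas as pd


def null_rate_by_group(df: pd.DataFrame, column: str, group_by: str) -> pd.Series:
    """Share of null values in `column` within each level of `group_by`."""
    return df[column].isna().groupby(df[group_by]).mean()


# Toy data (illustrative only): revenue is missing far more often for "SMB"
rng = np.random.default_rng(0)
toy = pd.DataFrame({
    "segment": ["SMB"] * 200 + ["Enterprise"] * 200,
    "revenue": rng.exponential(50_000, 400),
})
drop_prob = np.where(toy["segment"] == "SMB", 0.40, 0.05)
toy.loc[rng.random(400) < drop_prob, "revenue"] = np.nan

rates = null_rate_by_group(toy, "revenue", "segment")
print(rates.round(2))
```

A large gap between groups is a hint to prefer the group-wise (conditional) strategies over a single global fill value.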
import pandas as pd
import numpy as np
from typing import Literal
# ---- Reference strategy matrix ----
IMPUTATION_STRATEGY_MATRIX = """
Mechanism | Data Type   | Strategy
----------+-------------+------------------------------------------
MCAR      | Numeric     | Median imputation (robust to outliers)
MCAR      | Categorical | Mode imputation
MCAR      | Time series | Forward-fill or interpolation
MAR       | Numeric     | Conditional median (group-wise) or KNN
MAR       | Categorical | Conditional mode (group-wise)
MNAR      | Any         | Flag + domain-defined fill value
MNAR      | Any         | Analyse mechanism, consider model
High null | Any         | Drop column if >50% null and not critical
ID/Key    | Any         | Never impute; flag and investigate
"""
print(IMPUTATION_STRATEGY_MATRIX)

Simple Imputation: Median, Mode, Fill
import numpy as np
import pandas as pd
from typing import Literal


def impute_simple(
    df: pd.DataFrame,
    column: str,
    strategy: Literal["median", "mean", "mode", "constant", "ffill", "bfill"],
    fill_value=None,
    group_by: str | None = None,
) -> pd.DataFrame:
    """
    Apply simple imputation to a column, optionally grouped.

    Args:
        df: Input DataFrame (not modified in place; a copy is returned).
        column: Column to impute.
        strategy: Imputation method.
        fill_value: Required if strategy="constant".
        group_by: If provided, compute the fill value within each group
            (median, mean, and mode only; ignored for other strategies).

    Returns:
        DataFrame with nulls in column filled.
    """
    df = df.copy()
    if group_by and strategy in ("median", "mean", "mode"):
        # Grouped imputation: fill each group's nulls with that group's statistic.
        # Rows whose group key is itself null get no fill value and stay null.
        if strategy == "mode":
            # mode() returns a Series (ties are possible); take the first value
            fill_map = df.groupby(group_by)[column].transform(
                lambda x: x.mode().iloc[0] if not x.mode().empty else np.nan
            )
        else:
            fill_map = df.groupby(group_by)[column].transform(strategy)
        df[column] = df[column].fillna(fill_map)
    elif strategy == "median":
        df[column] = df[column].fillna(df[column].median())
    elif strategy == "mean":
        df[column] = df[column].fillna(df[column].mean())
    elif strategy == "mode":
        mode_val = df[column].mode()
        if not mode_val.empty:
            df[column] = df[column].fillna(mode_val.iloc[0])
    elif strategy == "constant":
        if fill_value is None:
            raise ValueError("fill_value must be provided when strategy='constant'.")
        df[column] = df[column].fillna(fill_value)
    elif strategy == "ffill":
        df[column] = df[column].ffill()
    elif strategy == "bfill":
        df[column] = df[column].bfill()
    return df


# Simulate data
np.random.seed(42)
n = 500
customers = pd.DataFrame({
    "customer_id": range(1, n + 1),
    "age": np.where(np.random.rand(n) < 0.10, np.nan, np.random.randint(18, 75, n).astype(float)),
    "annual_revenue": np.where(np.random.rand(n) < 0.18, np.nan, np.random.exponential(50000, n)),
    "segment": np.random.choice(["SMB", "Enterprise", "Consumer", None], p=[0.3, 0.2, 0.45, 0.05], size=n),
    # Use None, not np.nan: mixing np.nan with a string array in np.where
    # yields the literal string "nan" instead of a real null.
    "referral_source": np.where(np.random.rand(n) < 0.30, None,
                                np.random.choice(["organic", "paid", "partner"], size=n)),
})

# Impute age with median (MCAR assumption)
customers_clean = impute_simple(customers, "age", strategy="median")
# Impute annual_revenue with segment-grouped median (MAR: revenue depends on segment).
# Rows with a null segment have no group, so their revenue stays null here.
customers_clean = impute_simple(customers_clean, "annual_revenue", strategy="median", group_by="segment")
# Impute referral_source with mode
customers_clean = impute_simple(customers_clean, "referral_source", strategy="mode")
# Impute segment with constant (MNAR: segment is unknown, not random)
customers_clean = impute_simple(customers_clean, "segment", strategy="constant", fill_value="Unknown")

print("Nulls before:", customers.isnull().sum().to_dict())
print("Nulls after:", customers_clean.isnull().sum().to_dict())

Advanced Imputation: KNN and Iterative
When simple statistics are too crude, use model-based imputation.
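To see why a model can beat a single statistic, the sketch below hides known values, imputes them two ways, and compares the error. It uses a one-variable least-squares fit from NumPy as a stand-in for model-based imputation, not KNNImputer or IterativeImputer. The synthetic data and the names `x`, `y`, and `hidden` are assumptions for illustration.

```python
import numpy as np

rng = np.random.default_rng(42)
n = 1_000

# Synthetic, correlated pair: y depends linearly on x plus noise
x = rng.normal(50, 10, n)
y = 3.0 * x + rng.normal(0, 5, n)

# Hide 20% of the known y values so imputation error can be measured
hidden = rng.random(n) < 0.20
y_observed = np.where(hidden, np.nan, y)

# Baseline: fill every hidden value with the observed median
median_fill = np.nanmedian(y_observed)
mae_median = np.abs(y[hidden] - median_fill).mean()

# Model-based: fit y ~ x on the observed rows, then predict the hidden rows
slope, intercept = np.polyfit(x[~hidden], y_observed[~hidden], deg=1)
model_fill = slope * x[hidden] + intercept
mae_model = np.abs(y[hidden] - model_fill).mean()

print(f"MAE, median fill: {mae_median:.2f}")
print(f"MAE, model fill:  {mae_model:.2f}")
```

The same masking approach can be used to compare any two imputers on real columns. When the columns are uncorrelated, expect little or no gain from the model.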
import numpy as np
import pandas as pd
# IterativeImputer is still experimental: this enabling import must run
# before IterativeImputer can be imported from sklearn.impute.
from sklearn.experimental import enable_iterative_imputer  # noqa: F401
from sklearn.impute import IterativeImputer, KNNImputer


def knn_impute_numeric(
    df: pd.DataFrame,
    numeric_cols: list[str],
    n_neighbors: int = 5,
) -> pd.DataFrame:
    """
    Impute numeric columns using K-Nearest Neighbours.

    KNN finds the k most similar rows (by non-null feature values)
    and fills each null with the distance-weighted average of those neighbours.
    Distances use raw values, so scale columns first if their units differ widely.
    Best for: datasets where variables are correlated with each other.
    """
    df = df.copy()
    imputer = KNNImputer(n_neighbors=n_neighbors, weights="distance")
    df[numeric_cols] = imputer.fit_transform(df[numeric_cols])
    return df


def iterative_impute_numeric(
    df: pd.DataFrame,
    numeric_cols: list[str],
    max_iter: int = 10,
    random_state: int = 42,
) -> pd.DataFrame:
    """
    Impute numeric columns using IterativeImputer (MICE-like approach).

    Models each column with nulls as a function of the other columns,
    cycling through them for up to max_iter rounds or until convergence.
    Often more accurate than a single-statistic fill when columns are correlated.
    Requires: scikit-learn >= 0.21 (the estimator is still marked experimental).
    """
    df = df.copy()
    imputer = IterativeImputer(max_iter=max_iter, random_state=random_state, verbose=0)
    df[numeric_cols] = imputer.fit_transform(df[numeric_cols])
    return df


# Example: impute age and annual_revenue together using KNN
numeric_impute_cols = ["age", "annual_revenue"]
customers_knn = knn_impute_numeric(
    customers[["customer_id"] + numeric_impute_cols],
    numeric_cols=numeric_impute_cols,
    n_neighbors=5,
)
print("KNN imputation complete.")
print(customers_knn[numeric_impute_cols].describe().round(2))

# Compare like with like: mean age after a median fill vs after a KNN fill
age_median_filled = customers["age"].fillna(customers["age"].median())
print("\nMean age after imputation:")
print(f"  Median fill: {age_median_filled.mean():.1f}")
print(f"  KNN fill:    {customers_knn['age'].mean():.1f}")

String Cleaning
Text data from real systems is messy in predictable ways. Systematize the cleaning.
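Before writing a mapping dictionary, it helps to see which raw spellings collapse onto the same normalised key. A small sketch, assuming a hypothetical helper `variants_by_key` and made-up values:

```python
import pandas as pd


def variants_by_key(series: pd.Series) -> pd.Series:
    """Group raw values by a normalised key (trimmed, lowercased,
    internal whitespace collapsed) and list the distinct raw spellings."""
    key = (
        series.str.strip()
        .str.lower()
        .str.replace(r"\s+", " ", regex=True)
    )
    return series.groupby(key).unique()


raw = pd.Series(["US", " us", "Us ", "United  States", "united states", "DE"])
variants = variants_by_key(raw)
for key, spellings in variants.items():
    print(f"{key!r}: {list(spellings)}")
```

Keys with many variants are fixed by the basic cleaning steps alone. Keys that remain distinct after normalisation (here "us" and "united states") are the ones that need a mapping entry.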
import pandas as pd
import re


def clean_string_column(
    series: pd.Series,
    strip: bool = True,
    lowercase: bool = True,
    remove_extra_spaces: bool = True,
    remove_special_chars: bool = False,
    special_char_pattern: str = r"[^a-z0-9\s\-\_\.]",
) -> pd.Series:
    """
    Apply a standard string cleaning pipeline to a Series.

    The default special_char_pattern assumes lowercase=True; it would also
    strip characters such as "@", so leave it off for email columns.
    """
    s = series.copy()
    if strip:
        s = s.str.strip()
    if lowercase:
        s = s.str.lower()
    if remove_extra_spaces:
        s = s.str.replace(r"\s+", " ", regex=True)
    if remove_special_chars:
        s = s.str.replace(special_char_pattern, "", regex=True)
    return s


# Standardising category values with a mapping dictionary
COUNTRY_NORMALISATION = {
    "us": "US",
    "u.s.": "US",
    "u.s.a": "US",
    "usa": "US",
    "united states": "US",
    "united states of america": "US",
    "uk": "GB",
    "united kingdom": "GB",
    "britain": "GB",
    "england": "GB",
    "de": "DE",
    "germany": "DE",
    "deutschland": "DE",
    "fr": "FR",
    "france": "FR",
}


def normalise_categories(
    series: pd.Series,
    mapping: dict[str, str],
    default: str = "Other",
) -> pd.Series:
    """
    Normalise a categorical column using a lookup dictionary.

    Unmapped values get the default label. Note that nulls are unmapped too,
    so they also receive the default.
    """
    cleaned = series.str.lower().str.strip()
    return cleaned.map(mapping).fillna(default)


# Extract structured data from messy strings
def extract_from_string(df: pd.DataFrame, col: str) -> pd.DataFrame:
    """
    Extract domain/TLD from email, area code from phone, etc.
    Demonstrates regex extraction patterns. The branch is chosen by column
    name, so the column must be called "email", "date_str", or "price_str".
    """
    df = df.copy()
    # Extract email domain
    if col == "email":
        df["email_domain"] = df[col].str.extract(r"@([\w\.\-]+)", expand=False)
        df["email_tld"] = df["email_domain"].str.split(".").str[-1]
    # Extract year from a messy date string like "Jan 2023" or "2023-01" or "2023"
    elif col == "date_str":
        df["year_extracted"] = df[col].str.extract(r"(20\d{2}|19\d{2})", expand=False)
    # Extract numeric value from strings like "$29.99" or "29.99 USD"
    elif col == "price_str":
        df["price_numeric"] = pd.to_numeric(
            df[col].str.replace(r"[^\d\.]", "", regex=True),
            errors="coerce",
        )
    return df


# Demonstrate string cleaning
dirty_countries = pd.Series(["US", "u.s.", "United States", "UK", "britain", "DE", "germany", "FRANCE", "xyz"])
clean_countries = normalise_categories(dirty_countries, COUNTRY_NORMALISATION)
print("Country normalisation:")
for dirty, clean in zip(dirty_countries, clean_countries):
    print(f"  '{dirty}' → '{clean}'")

# Messy email column
emails = pd.DataFrame({"email_raw": [
    "Alice@GMAIL.COM",
    " bob@company.co.uk ",
    "carol@startup.io",
    "DAVE@ENTERPRISE.COM",
]})
# Clean first, then extract from the cleaned column (named "email" so the
# email branch of extract_from_string is taken)
emails["email"] = clean_string_column(emails["email_raw"])
emails = extract_from_string(emails, "email")
print("\nEmail extraction:")
print(emails.to_string())

Outlier Handling Strategies
Once outliers are detected (Lesson 03), you must decide what to do with them. The choice depends on domain knowledge and analysis goals.
import pandas as pd
import numpy as np
from scipy import stats


def winsorise(
    series: pd.Series,
    lower_pct: float = 0.01,
    upper_pct: float = 0.99,
) -> pd.Series:
    """
    Cap values at the lower and upper percentiles (Winsorization).

    Preserves all rows, so there is no data loss. Reduces the influence of extremes.
    Best for: aggregations and regression, where extreme magnitudes would
    otherwise dominate the result.
    """
    lower = series.quantile(lower_pct)
    upper = series.quantile(upper_pct)
    return series.clip(lower=lower, upper=upper)


def log_transform(
    series: pd.Series,
    constant: float = 1.0,
) -> pd.Series:
    """
    Apply log(x + constant) transform to reduce right skew.

    The constant avoids log(0) = -inf.
    Best for: revenue, count data, and other strongly right-skewed quantities.
    """
    if (series + constant <= 0).any():
        raise ValueError(f"Cannot log-transform: values exist where x + {constant} <= 0.")
    return np.log(series + constant)


def flag_and_keep_outliers(
    df: pd.DataFrame,
    column: str,
    method: str = "iqr",
    k: float = 1.5,
) -> pd.DataFrame:
    """
    Add a boolean flag column for outliers rather than removing them.
    Allows separate analysis of "normal" vs "outlier" populations.
    """
    df = df.copy()
    if method == "iqr":
        q1 = df[column].quantile(0.25)
        q3 = df[column].quantile(0.75)
        iqr = q3 - q1
        lower = q1 - k * iqr
        upper = q3 + k * iqr
        df[f"{column}_is_outlier"] = (df[column] < lower) | (df[column] > upper)
    elif method == "zscore":
        # Fixed |z| > 3 threshold; k applies to the IQR method only
        valid = df[column].dropna()
        z = np.abs(stats.zscore(valid.to_numpy()))
        mask = pd.Series(False, index=df.index)
        # Label-based assignment, so this also works with a non-default index
        mask.loc[valid.index] = z > 3.0
        df[f"{column}_is_outlier"] = mask
    else:
        raise ValueError(f"Unknown method: {method!r}. Use 'iqr' or 'zscore'.")
    n_flagged = df[f"{column}_is_outlier"].sum()
    print(f"Flagged {n_flagged:,} outliers in '{column}' ({n_flagged/len(df)*100:.2f}%)")
    return df


# Simulate revenue with outliers
np.random.seed(0)
n = 1000
revenue = pd.Series(
    np.concatenate([
        np.random.exponential(scale=80, size=980),
        np.array([5000, 7500, 8000, 12000, 15000, 50000, -50, -100, 0, 0.1]),
    ]),
    name="revenue",
)

# Strategy 1: Winsorize (cap at 1st–99th percentile)
revenue_winsorised = winsorise(revenue)
print(f"Original max: {revenue.max():,.0f}, Winsorised max: {revenue_winsorised.max():,.0f}")

# Strategy 2: Log transform (for right-skewed distributions)
# First fix negatives/zeros by setting a floor
revenue_positive = revenue.clip(lower=0.01)
revenue_log = log_transform(revenue_positive)
print(f"Original skew: {revenue.skew():.2f}, Log-transformed skew: {revenue_log.skew():.2f}")

# Strategy 3: Flag and keep
orders_df = pd.DataFrame({"order_id": range(n), "revenue": revenue})
orders_df = flag_and_keep_outliers(orders_df, "revenue", method="iqr")
print("\nDistribution by outlier flag:")
print(orders_df.groupby("revenue_is_outlier")["revenue"].describe().round(2))

Date and Time Cleaning
Date columns are among the most commonly broken column types in real datasets.
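One failure mode that coercion cannot catch is ambiguity: a string such as 03/04/2023 parses successfully as both day-first and month-first. The sketch below, using a hypothetical helper `find_ambiguous_dates`, flags values that are valid under both %d/%m/%Y and %m/%d/%Y but yield different dates, so they can be resolved from knowledge of the source system instead of from format order.

```python
import pandas as pd


def find_ambiguous_dates(series: pd.Series) -> pd.Series:
    """Boolean mask: True where a value parses under both day-first and
    month-first slash formats and the two readings disagree."""
    day_first = pd.to_datetime(series, format="%d/%m/%Y", errors="coerce")
    month_first = pd.to_datetime(series, format="%m/%d/%Y", errors="coerce")
    return day_first.notna() & month_first.notna() & (day_first != month_first)


samples = pd.Series(["03/04/2023", "15/01/2023", "01/15/2023", "05/05/2023", "2023-01-15"])
ambiguous = find_ambiguous_dates(samples)
print(samples[ambiguous].tolist())  # ['03/04/2023']
```

Values with a day above 12 are unambiguous because only one format accepts them, and 05/05/2023 reads the same either way, so only 03/04/2023 is flagged.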
import pandas as pd


def clean_date_column(
    series: pd.Series,
    formats_to_try: list[str] | None = None,
    timezone: str | None = None,
    errors: str = "coerce",
) -> pd.Series:
    """
    Parse a messy date column to datetime64[ns].

    Tries each explicit format in order; the first format that matches a value wins.
    With errors="coerce", values matching no format become NaT (null datetime);
    with errors="raise", a ValueError is raised instead.
    Optionally converts to a target timezone.
    """
    # Order matters: "%d/%m/%Y" comes before "%m/%d/%Y", so an ambiguous value
    # such as "01/02/2023" is read day-first. Reorder to suit the data source.
    formats_to_try = formats_to_try or [
        "%Y-%m-%d",
        "%Y/%m/%d",
        "%d/%m/%Y",
        "%m/%d/%Y",
        "%d-%m-%Y",
        "%Y-%m-%dT%H:%M:%S",
        "%Y-%m-%d %H:%M:%S",
        "%b %d, %Y",
        "%d %b %Y",
    ]
    parsed = pd.Series(pd.NaT, index=series.index, dtype="datetime64[ns]")
    for fmt in formats_to_try:
        # Parse everything with this format, then fill only still-missing rows
        attempt = pd.to_datetime(series, format=fmt, errors="coerce")
        parsed = parsed.fillna(attempt)
    if errors == "raise":
        unparsed = parsed.isna() & series.notna()
        if unparsed.any():
            raise ValueError(f"{int(unparsed.sum())} value(s) matched none of the formats.")
    if timezone:
        if parsed.dt.tz is None:
            # Assumption: naive timestamps are in UTC. Localise, then convert.
            parsed = parsed.dt.tz_localize("UTC").dt.tz_convert(timezone)
        else:
            parsed = parsed.dt.tz_convert(timezone)
    return parsed


# Simulate messy date column
messy_dates = pd.Series([
    "2023-01-15",
    "15/01/2023",
    "Jan 15, 2023",
    "01/15/2023",
    "2023-01-15T14:30:00",
    "not a date",
    None,
    "2023-13-01",  # Invalid month: matches no format, becomes NaT
    "2023-06-30 09:00:00",
])
cleaned_dates = clean_date_column(messy_dates)
print("Date cleaning results:")
for original, cleaned in zip(messy_dates, cleaned_dates):
    print(f"  '{original}' → {cleaned}")

# NaT count, excluding values that were already null in the input
n_failed = cleaned_dates.isna().sum() - messy_dates.isna().sum()
print(f"\nFailed to parse: {n_failed} values")

Date Feature Audit
import pandas as pd


def audit_date_column(series: pd.Series) -> dict:
    """
    Profile a datetime column for quality and analytical potential.
    Expects a datetime64 Series; nulls (NaT) are counted, then excluded.
    """
    clean = series.dropna()
    total = len(series)
    n_null = series.isnull().sum()
    # Match the column's timezone so aware and naive values are never compared
    now = pd.Timestamp.now(tz=clean.dt.tz)
    result = {
        "total_rows": total,
        "null_count": int(n_null),
        "null_pct": round(n_null / total * 100, 2) if total else None,
        "min_date": str(clean.min()),
        "max_date": str(clean.max()),
        "date_range_days": (clean.max() - clean.min()).days if len(clean) > 0 else None,
        "n_future_dates": int((clean > now).sum()),
        "n_dates_before_2000": int((clean.dt.year < 2000).sum()),
        "timezone_aware": clean.dt.tz is not None,
        "n_distinct_years": clean.dt.year.nunique(),
        "n_distinct_months": clean.dt.to_period("M").nunique(),
    }
    return result


# Pass the full series (NaT included) so the null counts are meaningful
date_series = clean_date_column(messy_dates)
date_report = audit_date_column(date_series)
for k, v in date_report.items():
    print(f"  {k}: {v}")

Deduplication
import pandas as pd
import numpy as np


def deduplicate(
    df: pd.DataFrame,
    subset: list[str] | None = None,
    keep: str = "last",
    sort_by: list[str] | None = None,
    sort_ascending: bool | list[bool] = False,
) -> tuple[pd.DataFrame, int]:
    """
    Deduplicate a DataFrame with explicit priority logic.

    Args:
        df: Input DataFrame.
        subset: Columns that define a "duplicate" record. None = all columns.
        keep: 'first', 'last', or 'none' (drop all duplicates).
        sort_by: Sort before deduplication to control which record is kept.
            E.g., sort_by=["updated_at"] keeps the most recent.
        sort_ascending: Sort order for sort_by columns.

    Returns:
        Tuple of (deduplicated DataFrame, n_rows_removed).
    """
    original_len = len(df)
    if sort_by:
        df = df.sort_values(sort_by, ascending=sort_ascending)
    if keep == "none":
        df = df[~df.duplicated(subset=subset, keep=False)]
    else:
        df = df.drop_duplicates(subset=subset, keep=keep)
    n_removed = original_len - len(df)
    return df.reset_index(drop=True), n_removed


# Simulate orders with duplicates
np.random.seed(42)
n = 1000
orders_raw = pd.DataFrame({
    "order_id": list(range(1, n + 1)) + [5, 10, 15, 20, 25],
    "customer_id": np.random.randint(1, 200, n + 5),
    "revenue": np.random.exponential(80, n + 5).round(2),
    "created_at": pd.date_range("2023-01-01", periods=n + 5, freq="15min"),
    "updated_at": pd.date_range("2023-01-01", periods=n + 5, freq="15min"),
})

# Duplicate order IDs with different updated_at (we want the most recent)
orders_clean, removed = deduplicate(
    orders_raw,
    subset=["order_id"],
    sort_by=["updated_at"],
    sort_ascending=False,
    keep="first",  # After sort descending, first = most recent
)
print(f"Deduplication: removed {removed} rows ({removed/len(orders_raw)*100:.1f}%)")
print(f"Result: {len(orders_clean):,} unique orders")

Building a Cleaning Pipeline
A cleaning pipeline is a sequence of named, ordered, re-runnable functions. This is the professional pattern — not an ad hoc notebook of one-off fixes.
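One property worth checking in any such pipeline is idempotence: running the steps on already-clean data should change nothing. A minimal sketch, assuming two hypothetical step functions and a plain list in place of a full pipeline class:

```python
from functools import reduce

import pandas as pd


def strip_status(df: pd.DataFrame) -> pd.DataFrame:
    """Trim and lowercase the status column."""
    out = df.copy()
    out["status"] = out["status"].str.strip().str.lower()
    return out


def drop_negative_revenue(df: pd.DataFrame) -> pd.DataFrame:
    """Keep only rows with non-negative revenue."""
    return df[df["revenue"] >= 0].reset_index(drop=True)


STEPS = [strip_status, drop_negative_revenue]


def run_steps(df: pd.DataFrame) -> pd.DataFrame:
    """Apply each step in order, feeding each output into the next step."""
    return reduce(lambda frame, step: step(frame), STEPS, df)


raw = pd.DataFrame({
    "status": [" Completed", "CANCELLED ", "completed"],
    "revenue": [10.0, -5.0, 20.0],
})
once = run_steps(raw)
twice = run_steps(once)

# Raises AssertionError if a second pass changes anything
pd.testing.assert_frame_equal(once, twice)
print(f"{len(raw)} rows in, {len(once)} rows out; second pass is a no-op")
```

Steps that recompute thresholds from the data, such as a percentile cap, may not pass this check exactly, because the threshold shifts once the data has been capped. Storing the threshold as a fixed parameter is one way around that.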
import pandas as pd
import numpy as np
from typing import Callable
from dataclasses import dataclass, field
from datetime import datetime


@dataclass
class TransformationLog:
    """Records every transformation applied during cleaning."""
    entries: list[dict] = field(default_factory=list)

    def record(
        self,
        step_name: str,
        column: str | None,
        description: str,
        rows_affected: int,
        rows_before: int,
        rows_after: int,
    ) -> None:
        self.entries.append({
            "timestamp": datetime.now().isoformat(),
            "step": step_name,
            "column": column or "(all)",
            "description": description,
            "rows_affected": rows_affected,
            "rows_before": rows_before,
            "rows_after": rows_after,
        })

    def to_dataframe(self) -> pd.DataFrame:
        return pd.DataFrame(self.entries)

    def print_summary(self) -> None:
        print("\n=== Transformation Log ===")
        for e in self.entries:
            print(f"  [{e['step']}] {e['column']}: {e['description']} "
                  f"(affected: {e['rows_affected']:,}, rows: {e['rows_before']:,} → {e['rows_after']:,})")


@dataclass
class CleaningStep:
    name: str
    fn: Callable[[pd.DataFrame], pd.DataFrame]
    description: str


class CleaningPipeline:
    """
    An ordered sequence of cleaning steps applied to a DataFrame.
    Each step is logged, and the same pipeline can be re-run on a new data refresh.
    """

    def __init__(self, name: str):
        self.name = name
        self.steps: list[CleaningStep] = []
        self.log = TransformationLog()

    def add_step(self, name: str, description: str, fn: Callable) -> "CleaningPipeline":
        self.steps.append(CleaningStep(name=name, fn=fn, description=description))
        return self  # Fluent interface

    def run(self, df: pd.DataFrame, verbose: bool = True) -> pd.DataFrame:
        """Apply all cleaning steps in order, logging each transformation."""
        if verbose:
            print(f"\nRunning cleaning pipeline: {self.name}")
            print(f"Input: {len(df):,} rows × {df.shape[1]} columns\n")
        current = df.copy()
        for step in self.steps:
            before = current  # Steps are expected to return a new frame
            current = step.fn(before)
            rows_before, rows_after = len(before), len(current)
            if rows_before != rows_after:
                # Rows were dropped (or added): report the difference
                rows_affected = abs(rows_before - rows_after)
            elif before.index.equals(current.index):
                # Same rows: count those where any pre-existing column changed
                shared = before.columns.intersection(current.columns)
                old, new = before[shared], current[shared]
                changed = old.ne(new) & ~(old.isna() & new.isna())
                rows_affected = int(changed.any(axis=1).sum())
            else:
                rows_affected = 0  # Index changed; no row-wise comparison
            self.log.record(
                step_name=step.name,
                column=None,
                description=step.description,
                rows_affected=int(rows_affected),
                rows_before=rows_before,
                rows_after=rows_after,
            )
            if verbose:
                print(f"  ✓ {step.name}: {step.description} [{rows_before:,} → {rows_after:,} rows]")
        if verbose:
            print(f"\nOutput: {len(current):,} rows × {current.shape[1]} columns")
        return current

    def summary(self) -> None:
        self.log.print_summary()


# Define cleaning functions for the orders dataset
def remove_negative_revenue(df: pd.DataFrame) -> pd.DataFrame:
    """Remove orders with negative revenue (likely misrouted refund credits).
    Rows with null revenue are dropped too, since NaN >= 0 is False."""
    return df[df["revenue"] >= 0].copy()


def cap_extreme_revenue(df: pd.DataFrame) -> pd.DataFrame:
    """Cap revenue at 99th percentile (winsorize)."""
    cap = df["revenue"].quantile(0.99)
    df = df.copy()
    df["revenue"] = df["revenue"].clip(upper=cap)
    return df


def drop_duplicate_orders(df: pd.DataFrame) -> pd.DataFrame:
    """Remove duplicate order IDs, keeping the most recently updated."""
    if "updated_at" in df.columns:
        df = df.sort_values("updated_at", ascending=False)
    return df.drop_duplicates(subset=["order_id"], keep="first").reset_index(drop=True)


def normalise_status(df: pd.DataFrame) -> pd.DataFrame:
    """Normalise order status to lowercase, strip whitespace."""
    df = df.copy()
    STATUS_MAP = {
        "complete": "completed",
        "Complete": "completed",
        "COMPLETED": "completed",
        "Cancelled": "cancelled",
        "CANCELLED": "cancelled",
        "cancel": "cancelled",
        "Refunded": "refunded",
        "REFUND": "refunded",
    }
    df["status"] = df["status"].str.strip()
    df["status"] = df["status"].replace(STATUS_MAP)
    df["status"] = df["status"].str.lower()
    return df


def flag_zero_quantity(df: pd.DataFrame) -> pd.DataFrame:
    """Flag orders with zero quantity rather than removing them."""
    df = df.copy()
    df["has_zero_quantity"] = df["quantity"] == 0
    return df


def impute_missing_country(df: pd.DataFrame) -> pd.DataFrame:
    """Fill missing country with 'Unknown'."""
    df = df.copy()
    df["country"] = df["country"].fillna("Unknown")
    return df


# Assemble the pipeline
pipeline = (
    CleaningPipeline(name="orders_cleaning_v1")
    .add_step("remove_negative_revenue", "Remove orders with revenue < 0", remove_negative_revenue)
    .add_step("cap_extreme_revenue", "Winsorise revenue at 99th percentile", cap_extreme_revenue)
    .add_step("drop_duplicates", "Deduplicate on order_id, keep most recent", drop_duplicate_orders)
    .add_step("normalise_status", "Standardise status field to lowercase", normalise_status)
    .add_step("flag_zero_quantity", "Flag zero-quantity orders for review", flag_zero_quantity)
    .add_step("impute_country", "Fill null country with Unknown", impute_missing_country)
)

# Create demo data with issues
np.random.seed(1)
n = 800
raw_orders = pd.DataFrame({
    "order_id": list(range(1, n + 1)) + [100, 200, 300],  # 3 duplicates
    "customer_id": np.random.randint(1, 200, n + 3),
    "revenue": np.concatenate([
        np.random.exponential(80, n),
        np.array([-50, -20, 99999]),  # Negatives + extreme
    ]),
    "quantity": np.random.randint(0, 8, n + 3),
    "status": np.random.choice(
        ["completed", "Complete", "COMPLETED", "Cancelled", "CANCELLED", "Refunded"],
        size=n + 3,
    ),
    "country": np.where(np.random.rand(n + 3) < 0.10, None,
                        np.random.choice(["US", "DE", "UK"], size=n + 3)),
    "updated_at": pd.date_range("2023-01-01", periods=n + 3, freq="10min"),
})

# Run the pipeline
clean_orders = pipeline.run(raw_orders, verbose=True)
pipeline.summary()

Post-Cleaning Validation
After cleaning, validate that the output meets quality standards. Never assume cleaning worked.
import pandas as pd


def validate_cleaned_orders(df: pd.DataFrame) -> bool:
    """
    Post-cleaning validation using assert statements.
    Returns True if all checks pass; raises AssertionError otherwise.
    Note: asserts are skipped when Python runs with the -O flag.
    """
    n = len(df)
    # 1. No negative revenue
    assert (df["revenue"] >= 0).all(), "Negative revenue found after cleaning."
    # 2. No duplicate order IDs
    assert df["order_id"].nunique() == n, f"Duplicate order IDs found: {n - df['order_id'].nunique()} duplicates."
    # 3. Status values are from expected set
    valid_statuses = {"completed", "cancelled", "refunded", "pending"}
    invalid = set(df["status"].unique()) - valid_statuses
    assert not invalid, f"Invalid status values: {invalid}"
    # 4. No null country (should have been imputed)
    assert df["country"].notnull().all(), "Null country values found after imputation."
    # 5. Reasonable revenue range
    assert df["revenue"].max() <= 100_000, f"Revenue max {df['revenue'].max()} exceeds expected ceiling."
    print(f"All validation checks PASSED. ({n:,} rows)")
    return True


validate_cleaned_orders(clean_orders)

# Pandera schema validation (declarative, reusable).
# The import is guarded so the rest of the lesson runs without pandera.
try:
    import pandera as pa  # pip install pandera
except ImportError:
    pa = None
    print("pandera not installed. Install with: pip install pandera")

if pa is not None:
    orders_schema = pa.DataFrameSchema(
        columns={
            "order_id": pa.Column(int, nullable=False),
            "revenue": pa.Column(float, pa.Check.ge(0), nullable=False),
            "quantity": pa.Column(int, pa.Check.ge(0), nullable=False),
            "status": pa.Column(
                str,
                pa.Check.isin(["completed", "cancelled", "refunded", "pending"]),
                nullable=False,
            ),
            "country": pa.Column(str, nullable=False),
        },
        # Frame-level check: True only when no order_id is repeated
        checks=pa.Check(lambda df: not df.duplicated(subset=["order_id"]).any(),
                        error="Duplicate order_ids found."),
    )
    try:
        orders_schema.validate(clean_orders)
        print("Pandera schema validation PASSED.")
    except pa.errors.SchemaError as e:
        print(f"Pandera validation FAILED: {e}")

Key Takeaways
- Imputation strategy depends on two factors: the missingness mechanism (MCAR → simple imputation is safe; MAR → conditional/grouped imputation; MNAR → flag and use a domain-defined fill) and the data type.
- KNN and IterativeImputer can produce better imputed values than median/mode when variables are correlated; the cost is computation time and the risk of overfitting if the set of predictor columns is not carefully scoped.
- String cleaning must be systematic: strip, lowercase, regex-replace, then map to controlled vocabulary. Fuzzy matching (not covered in full here) handles cases where a mapping dictionary is insufficient.
- Outlier handling is a conscious choice between three strategies: cap/floor (winsorize), transform (log, sqrt), or flag and keep. Remove-and-delete is almost never the right choice.
- Date cleaning requires trying multiple formats explicitly; errors='coerce' converts unparseable strings to NaT silently, so always count NaT before and after to know what was lost.
- A cleaning pipeline (ordered named functions + transformation log) is the professional pattern. It is re-runnable, auditable, and diff-able across data refreshes.
- Post-cleaning validation with assert statements and a schema validator like pandera turns the question "did it work?" from a hope into an explicit, repeatable check.
- Document every transformation decision in the log. The question "why was this value changed?" should always be answerable from the pipeline record.