Data Quality Assessment — Auditing Before You Clean
24 min
Why Audit First, Clean Second
The most dangerous mistake in data preparation is cleaning data before you understand what is wrong with it. Analysts who jump straight to imputation and deduplication make decisions — often implicitly — that should be explicit: Which nulls should be imputed vs removed? Which outliers are errors vs legitimate extremes? Which duplicates are true duplicates vs legitimate repeated events?
A systematic quality audit forces you to make these decisions consciously, document your reasoning, and produce a record of every issue before any row is modified. This lesson covers that audit — the phase that transforms "I cleaned the data" from a black box into a defensible, reproducible process.
The Six Data Quality Dimensions
The data quality literature converges on six dimensions. Every quality issue you will encounter maps to at least one of these.
Completeness
Are all expected values present? Null rates by column, by row, by time period.
Accuracy
Do the values correctly represent the real-world phenomenon they describe? Accuracy is hard to measure without ground truth, but proxies exist: values outside plausible ranges, impossible values (negative age, revenue greater than total market), and values that contradict business rules.
Consistency
Do related values agree across tables and within rows? Order status "completed" with a null shipped_date is inconsistent. A customer segment of "Enterprise" with an annual revenue below the Enterprise threshold is inconsistent.
Timeliness
Is the data current enough for the intended use? An events table that lags 48 hours makes real-time churn detection impossible. A customer table last updated 18 months ago may have stale contact details.
Validity
Do values conform to defined formats, domains, or business rules? An email address without "@", a postcode with letters in a US-only field, a date in "DD/MM/YYYY" format when the column expects "YYYY-MM-DD".
Uniqueness
Are records that should be unique actually unique? Duplicate customer records inflate cohort counts. Duplicate orders inflate revenue.
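Each dimension maps naturally to a one-line pandas check. A minimal sketch on a toy frame — the column names and the reference date are hypothetical, chosen to mirror the examples above:

```python
import pandas as pd

# Toy frame with one planted issue per dimension (hypothetical columns,
# chosen to mirror the examples above)
df = pd.DataFrame({
    "customer_id": [1, 2, 2, 4],                                  # duplicate key
    "email": ["a@x.com", "bad-email", None, "d@x.com"],           # null + malformed
    "age": [34, -5, 41, 29],                                      # impossible value
    "status": ["completed", "completed", "pending", "completed"],
    "shipped_date": pd.to_datetime(["2023-01-02", None, None, "2023-01-05"]),
})

completeness = df["email"].isnull().mean()                        # null rate
accuracy = int((df["age"] < 0).sum())                             # impossible ages
consistency = int(((df["status"] == "completed")
                   & df["shipped_date"].isnull()).sum())          # completed but never shipped
validity = int((~df["email"].dropna().str.contains("@")).sum())   # malformed emails
uniqueness = int(df["customer_id"].duplicated().sum())            # repeated keys
timeliness = pd.Timestamp("2023-06-01") - df["shipped_date"].max()  # staleness vs a reference date

print(completeness, accuracy, consistency, validity, uniqueness, timeliness.days)
```

Real audits wrap checks like these in reusable functions, but the underlying operations are exactly this simple.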
Scoring Each Dimension
```python
import pandas as pd
import numpy as np
from dataclasses import dataclass, field
from enum import Enum


class Severity(Enum):
    CRITICAL = "critical"  # Analysis result will be wrong without fixing this
    WARNING = "warning"    # May affect results; requires decision
    INFO = "info"          # Low impact; worth documenting


@dataclass
class QualityIssue:
    dimension: str
    column: str | None
    description: str
    severity: Severity
    affected_rows: int
    affected_pct: float
    recommendation: str


@dataclass
class DataQualityReport:
    dataset_name: str
    n_rows: int
    n_cols: int
    issues: list[QualityIssue] = field(default_factory=list)

    def add_issue(self, issue: QualityIssue) -> None:
        self.issues.append(issue)

    @property
    def score(self) -> float:
        """
        Score the dataset 0–100. Start at 100, deduct per issue:
        - CRITICAL: 15 points
        - WARNING: 5 points
        - INFO: 1 point
        Minimum score is 0.
        """
        deductions = sum(
            15 if i.severity == Severity.CRITICAL
            else 5 if i.severity == Severity.WARNING
            else 1
            for i in self.issues
        )
        return max(0.0, 100.0 - deductions)

    def summary(self) -> pd.DataFrame:
        if not self.issues:
            return pd.DataFrame(columns=["dimension", "column", "severity",
                                         "affected_pct", "description"])
        return pd.DataFrame([{
            "dimension": i.dimension,
            "column": i.column or "(all)",
            "severity": i.severity.value.upper(),
            "affected_rows": i.affected_rows,
            "affected_pct": round(i.affected_pct, 2),
            "description": i.description,
            "recommendation": i.recommendation,
        } for i in self.issues]).sort_values("severity")

    def render(self) -> None:
        print(f"\n{'='*60}")
        print(f"DATA QUALITY REPORT: {self.dataset_name}")
        print(f"Shape: {self.n_rows:,} rows × {self.n_cols} cols")
        print(f"Quality Score: {self.score:.0f}/100")
        print(f"Issues: {len(self.issues)} total "
              f"({sum(1 for i in self.issues if i.severity == Severity.CRITICAL)} critical, "
              f"{sum(1 for i in self.issues if i.severity == Severity.WARNING)} warnings, "
              f"{sum(1 for i in self.issues if i.severity == Severity.INFO)} info)")
        print(f"{'='*60}")
        for sev in [Severity.CRITICAL, Severity.WARNING, Severity.INFO]:
            group = [i for i in self.issues if i.severity == sev]
            if group:
                print(f"\n[{sev.value.upper()}]")
                for issue in group:
                    col = f" [{issue.column}]" if issue.column else ""
                    print(f"  {issue.dimension}{col}: {issue.description}")
                    print(f"    → {issue.recommendation}")
                    print(f"    → Affects {issue.affected_rows:,} rows ({issue.affected_pct:.1f}%)")
        print()
```
Null Analysis: Counting, Patterns, and MAR/MCAR/MNAR
Understanding why data is missing is as important as knowing how much is missing. The statistical taxonomy:
MCAR — Missing Completely At Random: The probability of a value being missing is unrelated to any observed or unobserved data. Safe to drop rows or impute without introducing bias. Example: a sensor occasionally fails due to random electrical noise.
MAR — Missing At Random: Missingness depends on observed data but not on the missing value itself. Imputation can correct for this if conditioned on the right variables. Example: income is missing more often for younger respondents (age is observed and explains the missingness).
MNAR — Missing Not At Random: Missingness depends on the missing value itself. The most dangerous case — imputation without correction introduces bias. Example: customers with very low satisfaction scores don't fill out satisfaction surveys.
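The MAR example above can be simulated directly: generate income, delete it more often for younger respondents, and watch the null rate track the observed age variable. A small sketch — the variable names and missingness rates are illustrative:

```python
import numpy as np
import pandas as pd

rng = np.random.default_rng(7)
n = 5000
age = rng.integers(18, 70, n)
income = rng.normal(60_000, 15_000, n)

# MAR: younger respondents skip the income question more often.
# Missingness depends on age (observed), not on income itself.
p_missing = np.where(age < 30, 0.40, 0.05)
income_obs = np.where(rng.random(n) < p_missing, np.nan, income)

df = pd.DataFrame({"age": age, "income": income_obs})

# Diagnostic: null rate by observed age band.
# A strong gradient like this rules out MCAR.
null_by_band = df.assign(missing=df["income"].isna()).groupby(
    pd.cut(df["age"], [18, 30, 50, 70]), observed=True
)["missing"].mean()
print(null_by_band)
```

The same diagnostic cannot distinguish MAR from MNAR — for that you need the value-correlation test shown below, plus domain knowledge about the collection process.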
```python
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns


def analyse_nulls(df: pd.DataFrame) -> pd.DataFrame:
    """Compute a per-column null summary for columns that contain nulls."""
    n = len(df)
    null_stats = []
    for col in df.columns:
        n_null = df[col].isnull().sum()
        if n_null == 0:
            continue
        null_stats.append({
            "column": col,
            "dtype": str(df[col].dtype),
            "n_null": int(n_null),
            "null_pct": round(n_null / n * 100, 2),
            "n_non_null": int(n - n_null),
        })
    if not null_stats:
        print("No nulls found.")
        return pd.DataFrame()
    result = pd.DataFrame(null_stats).set_index("column")
    return result.sort_values("null_pct", ascending=False)


def visualise_null_matrix(df: pd.DataFrame, figsize: tuple = (12, 6)) -> None:
    """
    Create a null heatmap: each column with nulls is drawn as a horizontal band.
    White = null, dark = present.
    """
    null_cols = df.columns[df.isnull().any()].tolist()
    if not null_cols:
        print("No nulls to visualise.")
        return
    # Sample up to 500 rows for readability
    sample = df[null_cols].sample(min(500, len(df)), random_state=42)
    null_matrix = sample.isnull().astype(int)
    fig, ax = plt.subplots(figsize=figsize)
    sns.heatmap(
        null_matrix.T,
        cmap="Blues_r",
        cbar=False,
        yticklabels=null_cols,
        xticklabels=False,
        ax=ax,
    )
    ax.set_title("Null Pattern Matrix (white = missing)", fontsize=13)
    ax.set_xlabel("Rows (sampled)", fontsize=10)
    ax.set_ylabel("Columns with nulls", fontsize=10)
    plt.tight_layout()
    plt.show()


def detect_null_correlation(df: pd.DataFrame) -> pd.DataFrame:
    """
    Detect whether missingness in one column correlates with missingness
    in another. High correlation suggests a shared upstream cause — e.g.
    one failing data source populating several columns.
    """
    null_indicators = df.isnull().astype(int)
    null_cols = null_indicators.columns[null_indicators.sum() > 0].tolist()
    if len(null_cols) < 2:
        return pd.DataFrame()
    return null_indicators[null_cols].corr().round(3)


# Simulate a dataset with realistic null patterns
np.random.seed(42)
n = 2000
customers = pd.DataFrame({
    "customer_id": range(1, n + 1),
    "age": np.where(np.random.rand(n) < 0.08, np.nan,
                    np.random.randint(18, 75, n).astype(float)),
    "annual_revenue": np.where(np.random.rand(n) < 0.15, np.nan,
                               np.random.exponential(50000, n).round(2)),
    "signup_date": pd.date_range("2020-01-01", periods=n, freq="2h"),
    "segment": np.random.choice(["SMB", "Enterprise", "Consumer", None],
                                p=[0.3, 0.2, 0.45, 0.05], size=n),
    "last_login": np.where(np.random.rand(n) < 0.20, pd.NaT,
                           pd.date_range("2023-01-01", periods=n, freq="3h")),
    "referral_source": np.where(np.random.rand(n) < 0.35, np.nan,
                                np.random.choice(["organic", "paid", "partner"], size=n)),
    "country": np.random.choice(["US", "DE", "UK", "FR", "CA"], size=n),
})

# Run the null analysis
null_summary = analyse_nulls(customers)
print("Null Summary:")
print(null_summary.to_string())
```
MNAR Diagnostic: Checking if Missingness Correlates with Values
```python
import pandas as pd
from scipy import stats


def test_mnar_signal(
    df: pd.DataFrame,
    target_col: str,
    test_cols: list[str],
) -> pd.DataFrame:
    """
    Test whether values in test_cols differ when target_col is null vs not null.
    A significant difference suggests MAR or MNAR — not MCAR.
    Uses the Mann-Whitney U test for numeric columns.
    """
    results = []
    null_mask = df[target_col].isnull()
    missing_group = df[null_mask]
    present_group = df[~null_mask]
    for col in test_cols:
        if col == target_col:
            continue
        if not pd.api.types.is_numeric_dtype(df[col]):
            continue
        m_vals = missing_group[col].dropna()
        p_vals = present_group[col].dropna()
        if len(m_vals) < 10 or len(p_vals) < 10:
            continue
        stat, p_value = stats.mannwhitneyu(m_vals, p_vals, alternative="two-sided")
        results.append({
            "target_null_col": target_col,
            "test_col": col,
            "mean_when_null": round(m_vals.mean(), 4),
            "mean_when_present": round(p_vals.mean(), 4),
            "p_value": round(p_value, 6),
            "significant_at_05": p_value < 0.05,
            "likely_mnar_or_mar": p_value < 0.05,
        })
    return pd.DataFrame(results)


# Test whether annual_revenue missingness correlates with age
mnar_test = test_mnar_signal(customers, "annual_revenue", ["age"])
print("\nMAR/MNAR Test:")
print(mnar_test.to_string())
```
Outlier Detection
Expert analysts do not blindly remove outliers. They detect, classify, and document them — then make an explicit decision.
```python
import pandas as pd
import numpy as np


def detect_outliers_iqr(series: pd.Series, k: float = 1.5) -> pd.Series:
    """
    Flag outliers using the IQR method. Returns a boolean Series: True = outlier.
    k=1.5 is standard; use k=3.0 to flag extreme outliers only.
    """
    q1 = series.quantile(0.25)
    q3 = series.quantile(0.75)
    iqr = q3 - q1
    lower = q1 - k * iqr
    upper = q3 + k * iqr
    return (series < lower) | (series > upper)


def detect_outliers_zscore(series: pd.Series, threshold: float = 3.0) -> pd.Series:
    """
    Flag outliers using the Z-score method.
    Assumes a roughly normal distribution — inappropriate for heavily skewed data.
    """
    mean = series.mean()
    std = series.std()
    if std == 0:
        return pd.Series(False, index=series.index)
    z_scores = (series - mean).abs() / std
    return z_scores > threshold


def detect_outliers_modified_zscore(series: pd.Series, threshold: float = 3.5) -> pd.Series:
    """
    Modified Z-score using the Median Absolute Deviation (MAD).
    Robust to the outliers themselves — preferred for skewed or heavy-tailed
    distributions. Reference: Iglewicz & Hoaglin (1993); a threshold of 3.5
    is conventional.
    """
    median = series.median()
    mad = (series - median).abs().median()
    if mad == 0:
        return pd.Series(False, index=series.index)
    modified_z = 0.6745 * (series - median) / mad
    return modified_z.abs() > threshold


def outlier_report(df: pd.DataFrame, numeric_cols: list[str] | None = None) -> pd.DataFrame:
    """Run all three outlier methods on numeric columns and compare."""
    if numeric_cols is None:
        numeric_cols = df.select_dtypes(include="number").columns.tolist()
    rows = []
    for col in numeric_cols:
        series = df[col].dropna()
        if len(series) == 0:
            continue
        iqr_flags = detect_outliers_iqr(series)
        z_flags = detect_outliers_zscore(series)
        mz_flags = detect_outliers_modified_zscore(series)
        rows.append({
            "column": col,
            "n_total": len(series),
            "iqr_outliers": int(iqr_flags.sum()),
            "iqr_pct": round(iqr_flags.mean() * 100, 2),
            "zscore_outliers": int(z_flags.sum()),
            "zscore_pct": round(z_flags.mean() * 100, 2),
            "modified_z_outliers": int(mz_flags.sum()),
            "modified_z_pct": round(mz_flags.mean() * 100, 2),
            "min": round(float(series.min()), 2),
            "max": round(float(series.max()), 2),
            "skewness": round(float(series.skew()), 3),
        })
    return pd.DataFrame(rows).set_index("column")


# Build a sample orders DataFrame with planted outliers
np.random.seed(0)
n = 2000
orders_w_outliers = pd.DataFrame({
    "order_id": range(1, n + 1),
    "revenue": np.concatenate([
        np.random.exponential(scale=80, size=n - 20),  # Normal range
        np.array([5000, 7500, 9200, 12000, 15000,      # Legitimate large orders
                  -50, -120, -200, 0.01, 0.02,         # Suspicious negatives/near-zero
                  99999, 150000, 200000,               # Likely data errors
                  80, 75, 82, 78, 79, 81, 77]),        # Normal
    ]),
    "quantity": np.concatenate([
        np.random.randint(1, 10, size=n - 10).astype(float),
        np.array([500, 1000, 2000, -1, -5, 0, 0, 0, 999, 888]),  # Suspicious
    ]),
})

outlier_rpt = outlier_report(orders_w_outliers)
print("Outlier Report:")
print(outlier_rpt.to_string())
```
Type Mismatch Detection
Type mismatches — numbers stored as strings, booleans stored as integers — are silent correctness killers.
```python
import pandas as pd
import numpy as np


def detect_type_mismatches(df: pd.DataFrame) -> list[dict]:
    """
    Heuristically detect columns whose stored type likely differs from
    their semantic type.
    """
    issues = []
    for col in df.columns:
        dtype = df[col].dtype
        sample = df[col].dropna().head(200)

        # Numeric stored as object
        if dtype == "object":
            numeric_convertible = pd.to_numeric(sample, errors="coerce").notna().mean()
            if numeric_convertible > 0.90:
                issues.append({
                    "column": col,
                    "stored_dtype": str(dtype),
                    "suspected_type": "numeric",
                    "convertible_pct": round(numeric_convertible * 100, 1),
                    "description": f"{numeric_convertible*100:.0f}% of values convert to numeric — likely stored as string.",
                })

        # Date stored as object
        if dtype == "object":
            date_convertible = pd.to_datetime(sample, errors="coerce").notna().mean()
            if date_convertible > 0.85:
                issues.append({
                    "column": col,
                    "stored_dtype": str(dtype),
                    "suspected_type": "datetime",
                    "convertible_pct": round(date_convertible * 100, 1),
                    "description": f"{date_convertible*100:.0f}% of values parse as dates — likely stored as string.",
                })

        # Boolean stored as int (0/1 only)
        if dtype in ("int64", "int32", "int8"):
            unique_vals = set(df[col].dropna().unique().tolist())
            if unique_vals.issubset({0, 1}):
                issues.append({
                    "column": col,
                    "stored_dtype": str(dtype),
                    "suspected_type": "boolean",
                    "convertible_pct": 100.0,
                    "description": "Column contains only 0 and 1 — likely a boolean stored as integer.",
                })

        # ID column stored as float (common when NaN forces int → float upcast)
        if dtype == "float64" and col.lower().endswith("_id"):
            no_frac = (df[col].dropna() % 1 == 0).mean()
            if no_frac > 0.99:
                issues.append({
                    "column": col,
                    "stored_dtype": str(dtype),
                    "suspected_type": "int (ID)",
                    "convertible_pct": round(no_frac * 100, 1),
                    "description": "ID column stored as float — likely caused by null-forced upcast. Consider Int64 (nullable int).",
                })
    return issues


# Simulate type mismatch scenarios
messy_df = pd.DataFrame({
    "order_id": [1.0, 2.0, 3.0, np.nan, 5.0],  # Float ID — should be Int64
    "revenue_str": ["29.99", "45.00", "12.50", "88.00", "5.25"],  # Numeric as string
    "order_date_str": ["2023-01-15", "2023-02-10", "2023-03-01", "2023-04-20", "2023-05-05"],
    "is_repeat": [0, 1, 1, 0, 1],  # Boolean as int
    "customer_name": ["Alice", "Bob", "Carol", "Dave", "Eve"],
})

mismatches = detect_type_mismatches(messy_df)
for m in mismatches:
    print(f"[TYPE MISMATCH] {m['column']} ({m['stored_dtype']} → {m['suspected_type']}): {m['description']}")
```

Note that `pd.to_datetime` infers formats automatically in pandas 2.x; the old `infer_datetime_format` argument is deprecated and has been dropped here.
Cardinality Analysis
High-cardinality categoricals and near-duplicate categories are common quality issues.
```python
import pandas as pd
import numpy as np


def analyse_cardinality(df: pd.DataFrame) -> pd.DataFrame:
    """
    Analyse cardinality of all non-numeric columns. Flags:
    - Very high cardinality in a "categorical" column (likely a mislabelled ID column)
    - Very low cardinality (near-constant columns — low analytical value)
    - Potential near-duplicate categories (values identical after
      lowercasing and stripping, e.g. "US" vs "us"; semantic duplicates
      like "US" vs "United States" need a mapping table)
    """
    results = []
    n = len(df)
    cat_cols = df.select_dtypes(include=["object", "category"]).columns
    for col in cat_cols:
        vc = df[col].value_counts(dropna=True)
        n_unique = len(vc)
        top_pct = vc.iloc[0] / n * 100 if len(vc) > 0 else 0.0
        # Near-duplicates: values identical once lowercased and stripped
        cleaned_vals = df[col].dropna().str.lower().str.strip()
        n_unique_cleaned = cleaned_vals.nunique()
        near_dupes = n_unique - n_unique_cleaned
        results.append({
            "column": col,
            "n_unique": n_unique,
            "cardinality_ratio": round(n_unique / n, 4),
            "top_value_pct": round(top_pct, 1),
            "top_value": str(vc.index[0]) if len(vc) > 0 else None,
            "near_duplicate_categories": near_dupes,
            "flag_high_cardinality": n_unique / n > 0.5,
            "flag_near_constant": top_pct > 95,
            "flag_near_dupes": near_dupes > 0,
        })
    return pd.DataFrame(results).set_index("column")


# Simulate a dataset with cardinality issues
cardinality_test = pd.DataFrame({
    "user_email": [f"user{i}@example.com" for i in range(500)],  # High cardinality
    "country": (
        ["US", "us", "United States", "U.S.", "USA"] * 50
        + ["DE", "Germany", "germany"] * 50
        + ["UK"] * 100
        + ["FR"] * 50
    )[:500],                                         # Near-duplicate categories
    "status": ["active"] * 490 + ["inactive"] * 10,  # Near-constant
    "channel": np.random.choice(["organic", "paid", "email", "direct"], size=500),
})

card_report = analyse_cardinality(cardinality_test)
print("Cardinality Analysis:")
print(card_report.to_string())
```
Duplicate Detection
```python
import pandas as pd
import numpy as np


def detect_duplicates(
    df: pd.DataFrame,
    key_cols: list[str] | None = None,
) -> dict:
    """
    Detect exact duplicates (all columns) and key-column duplicates.

    Args:
        df: DataFrame to check.
        key_cols: Columns that should uniquely identify each row (primary key).

    Returns:
        Dict with counts and example duplicate rows.
    """
    n = len(df)

    # Full row duplicates
    full_dupes_mask = df.duplicated(keep=False)
    n_full_dupes = full_dupes_mask.sum()
    result = {
        "n_rows": n,
        "n_full_duplicate_rows": int(n_full_dupes),
        "full_duplicate_pct": round(n_full_dupes / n * 100, 2),
        "full_duplicate_examples": df[full_dupes_mask].head(5) if n_full_dupes > 0 else pd.DataFrame(),
    }

    # Key column duplicates
    if key_cols:
        key_dupes_mask = df.duplicated(subset=key_cols, keep=False)
        n_key_dupes = key_dupes_mask.sum()
        result.update({
            "key_cols": key_cols,
            "n_key_duplicate_rows": int(n_key_dupes),
            "key_duplicate_pct": round(n_key_dupes / n * 100, 2),
            "key_duplicate_examples": df[key_dupes_mask].head(5) if n_key_dupes > 0 else pd.DataFrame(),
        })
    return result


# Simulate duplicate data
np.random.seed(42)
n = 500
dup_orders = pd.DataFrame({
    "order_id": list(range(1, n + 1)) + [10, 20, 30],  # 3 duplicate order IDs
    "customer_id": np.random.randint(1, 200, n + 3),
    "revenue": np.random.exponential(80, n + 3).round(2),
    "order_date": pd.date_range("2023-01-01", periods=n + 3, freq="1h"),
})
# Add 5 fully duplicate rows
dup_orders = pd.concat([dup_orders, dup_orders.iloc[:5]], ignore_index=True)

dup_result = detect_duplicates(dup_orders, key_cols=["order_id"])
print(f"Full duplicate rows: {dup_result['n_full_duplicate_rows']} ({dup_result['full_duplicate_pct']}%)")
print(f"Key duplicate rows (order_id): {dup_result['n_key_duplicate_rows']} ({dup_result['key_duplicate_pct']}%)")
if not dup_result['full_duplicate_examples'].empty:
    print("\nFull duplicate examples:")
    print(dup_result['full_duplicate_examples'].to_string())
```
Referential Integrity Checks
In multi-table analyses, foreign key violations mean silently dropped rows during joins.
```python
import pandas as pd
import numpy as np


def check_referential_integrity(
    child_df: pd.DataFrame,
    parent_df: pd.DataFrame,
    child_key: str,
    parent_key: str,
    child_name: str = "child",
    parent_name: str = "parent",
) -> dict:
    """
    Check that all values in child_key exist in parent_key.
    Returns a report of orphaned records (those that would be lost
    in an INNER JOIN).
    """
    child_keys = set(child_df[child_key].dropna())
    parent_keys = set(parent_df[parent_key].dropna())
    orphaned_keys = child_keys - parent_keys
    orphaned_rows = child_df[child_df[child_key].isin(orphaned_keys)]
    n_child = len(child_df)
    n_orphaned = len(orphaned_rows)
    return {
        "child_table": child_name,
        "parent_table": parent_name,
        "join_key": f"{child_key} → {parent_key}",
        "n_child_rows": n_child,
        "n_orphaned_rows": n_orphaned,
        "orphaned_pct": round(n_orphaned / n_child * 100, 2),
        "n_orphaned_keys": len(orphaned_keys),
        "example_orphaned_keys": list(orphaned_keys)[:10],
        "severity": ("CRITICAL" if n_orphaned / n_child > 0.05
                     else "WARNING" if n_orphaned > 0 else "OK"),
    }


# Simulate a referential integrity issue
np.random.seed(0)
parent_customers = pd.DataFrame({"customer_id": range(1, 301)})  # 300 customers
# Orders reference customer IDs 1–320 — 20 IDs don't exist in customers
child_orders = pd.DataFrame({
    "order_id": range(1, 1001),
    "customer_id": np.random.randint(1, 321, 1000),  # Some IDs beyond 300
    "revenue": np.random.exponential(80, 1000).round(2),
})

ri_check = check_referential_integrity(
    child_orders, parent_customers,
    child_key="customer_id", parent_key="customer_id",
    child_name="orders", parent_name="customers",
)
print("Referential Integrity Check:")
for k, v in ri_check.items():
    print(f"  {k}: {v}")
```
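An alternative to the set-based check above is pandas' own join diagnostic: `merge(..., indicator=True)` adds a `_merge` column labelling each row by join outcome, so orphans can be counted without building key sets. A minimal sketch with hypothetical toy tables:

```python
import pandas as pd

# Hypothetical tables: orders referencing customer_id 9, which has no parent row
customers = pd.DataFrame({"customer_id": [1, 2, 3]})
orders = pd.DataFrame({
    "order_id": [10, 11, 12, 13],
    "customer_id": [1, 2, 9, 9],
})

# _merge == "left_only" marks child rows with no matching parent
merged = orders.merge(customers, on="customer_id", how="left", indicator=True)
orphans = merged[merged["_merge"] == "left_only"]
print(f"{len(orphans)} of {len(orders)} order rows would be dropped by an inner join")
```

This variant also keeps the orphaned rows themselves in hand, which is convenient when you need to quarantine them rather than just count them.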
Assembling the Complete Audit
Now assemble all checks into one callable that scores the entire dataset:
```python
def run_quality_audit(
    df: pd.DataFrame,
    dataset_name: str,
    key_cols: list[str] | None = None,
    business_rules: list[tuple] | None = None,
) -> DataQualityReport:
    """
    Run a comprehensive data quality audit.

    Args:
        df: DataFrame to audit.
        dataset_name: Name for the report header.
        key_cols: Columns that form the primary key (for uniqueness check).
        business_rules: List of (rule_name, violation_mask, severity) tuples,
            where violation_mask is a boolean Series that is True for
            violating rows.

    Returns:
        DataQualityReport with all issues populated.
    """
    report = DataQualityReport(
        dataset_name=dataset_name,
        n_rows=len(df),
        n_cols=len(df.columns),
    )
    n = len(df)

    # 1. Completeness — null analysis
    for col in df.columns:
        n_null = df[col].isnull().sum()
        if n_null == 0:
            continue
        null_pct = n_null / n * 100
        sev = (Severity.CRITICAL if null_pct > 20
               else Severity.WARNING if null_pct > 5
               else Severity.INFO)
        report.add_issue(QualityIssue(
            dimension="Completeness",
            column=col,
            description=f"{n_null:,} null values ({null_pct:.1f}%)",
            severity=sev,
            affected_rows=int(n_null),
            affected_pct=round(null_pct, 2),
            recommendation=(
                "Investigate null mechanism (MCAR/MAR/MNAR) before choosing an imputation strategy. "
                "If >20%, consider whether the column is usable."
            ),
        ))

    # 2. Uniqueness — duplicate rows
    n_full_dupes = df.duplicated().sum()
    if n_full_dupes > 0:
        report.add_issue(QualityIssue(
            dimension="Uniqueness",
            column=None,
            description=f"{n_full_dupes:,} fully duplicate rows ({n_full_dupes/n*100:.1f}%)",
            severity=Severity.CRITICAL if n_full_dupes / n > 0.01 else Severity.WARNING,
            affected_rows=int(n_full_dupes),
            affected_pct=round(n_full_dupes / n * 100, 2),
            recommendation="Deduplicate before analysis. Determine which record to keep (most recent, most complete).",
        ))
    if key_cols:
        n_key_dupes = df.duplicated(subset=key_cols).sum()
        if n_key_dupes > 0:
            report.add_issue(QualityIssue(
                dimension="Uniqueness",
                column=str(key_cols),
                description=f"{n_key_dupes:,} rows with duplicate key {key_cols}",
                severity=Severity.CRITICAL,
                affected_rows=int(n_key_dupes),
                affected_pct=round(n_key_dupes / n * 100, 2),
                recommendation="Primary key violation. Investigate source — may indicate pipeline fan-out or merge error.",
            ))

    # 3. Validity — business rules
    if business_rules:
        for rule_name, violation_mask, severity in business_rules:
            n_violations = violation_mask.sum()
            if n_violations > 0:
                report.add_issue(QualityIssue(
                    dimension="Validity",
                    column=None,
                    description=f"Business rule violated: '{rule_name}' — {n_violations:,} rows",
                    severity=severity,
                    affected_rows=int(n_violations),
                    affected_pct=round(n_violations / n * 100, 2),
                    recommendation=f"Investigate rows violating '{rule_name}'. May indicate data entry errors or system bugs.",
                ))
    return report


# Run a full audit on the demo orders dataset
business_rules = [
    ("revenue must be >= 0", orders_w_outliers["revenue"] < 0, Severity.CRITICAL),
    ("quantity must be >= 1", orders_w_outliers["quantity"] < 1, Severity.CRITICAL),
    ("revenue < 100000 (plausible max)", orders_w_outliers["revenue"] > 100_000, Severity.WARNING),
]
quality_report = run_quality_audit(
    orders_w_outliers,
    dataset_name="orders",
    key_cols=["order_id"],
    business_rules=business_rules,
)
quality_report.render()
print(f"Final Quality Score: {quality_report.score:.0f}/100")
print("\nIssue Summary:")
print(quality_report.summary().to_string())
```
The Data Contract Concept
A data contract is a formal specification of what a dataset is supposed to contain. It is the agreement between the data producer (pipeline, API, upstream team) and the data consumer (analyst, model, dashboard).
```python
from dataclasses import dataclass, field
from typing import Any

import pandas as pd


@dataclass
class ColumnContract:
    name: str
    dtype: str
    nullable: bool
    min_value: float | None = None
    max_value: float | None = None
    allowed_values: list[Any] | None = None
    regex_pattern: str | None = None
    description: str = ""


@dataclass
class DataContract:
    table_name: str
    version: str
    owner: str
    columns: list[ColumnContract]
    primary_key: list[str] = field(default_factory=list)
    expected_row_range: tuple[int, int] | None = None  # (min_rows, max_rows)

    def validate(self, df: pd.DataFrame) -> list[str]:
        """Validate a DataFrame against this contract. Returns a list of violations."""
        violations = []
        for col_contract in self.columns:
            col = col_contract.name
            if col not in df.columns:
                violations.append(f"MISSING COLUMN: {col}")
                continue

            # Null check
            if not col_contract.nullable and df[col].isnull().any():
                n = df[col].isnull().sum()
                violations.append(f"NULL VIOLATION: {col} is non-nullable but has {n} nulls.")

            # Range checks (numeric)
            if col_contract.min_value is not None and pd.api.types.is_numeric_dtype(df[col]):
                n_below = (df[col].dropna() < col_contract.min_value).sum()
                if n_below > 0:
                    violations.append(f"RANGE VIOLATION: {col} has {n_below} values below min {col_contract.min_value}.")
            if col_contract.max_value is not None and pd.api.types.is_numeric_dtype(df[col]):
                n_above = (df[col].dropna() > col_contract.max_value).sum()
                if n_above > 0:
                    violations.append(f"RANGE VIOLATION: {col} has {n_above} values above max {col_contract.max_value}.")

            # Allowed values check
            if col_contract.allowed_values is not None:
                invalid = ~df[col].dropna().isin(col_contract.allowed_values)
                n_invalid = invalid.sum()
                if n_invalid > 0:
                    violations.append(f"DOMAIN VIOLATION: {col} has {n_invalid} values not in {col_contract.allowed_values}.")

        # Row count check (table-level, so outside the per-column loop)
        if self.expected_row_range:
            lo, hi = self.expected_row_range
            if not (lo <= len(df) <= hi):
                violations.append(f"ROW COUNT VIOLATION: Expected {lo:,}–{hi:,} rows, got {len(df):,}.")
        return violations


# Define the orders contract
orders_contract = DataContract(
    table_name="orders",
    version="2.0",
    owner="data-engineering",
    primary_key=["order_id"],
    expected_row_range=(100, 10_000_000),
    columns=[
        ColumnContract("order_id", dtype="int64", nullable=False, min_value=1,
                       description="Unique order identifier"),
        ColumnContract("customer_id", dtype="int64", nullable=False, min_value=1),
        ColumnContract("revenue", dtype="float64", nullable=False, min_value=0.0, max_value=99_999.0),
        ColumnContract("quantity", dtype="int64", nullable=False, min_value=1, max_value=10_000),
        ColumnContract("status", dtype="object", nullable=False,
                       allowed_values=["completed", "cancelled", "refunded", "pending"]),
    ],
)

violations = orders_contract.validate(orders_w_outliers)
if violations:
    print(f"Contract violations ({len(violations)}):")
    for v in violations:
        print(f"  [X] {v}")
else:
    print("Contract validation PASSED.")
```
Key Takeaways
Audit before you clean. Every cleaning decision should be motivated by a documented quality finding, not by intuition or habit.
The six quality dimensions — Completeness, Accuracy, Consistency, Timeliness, Validity, Uniqueness — cover every issue you will encounter. Score each one explicitly.
Missing data is not a monolith: MCAR, MAR, and MNAR require different handling strategies. The MNAR case is the most dangerous because imputation without correction introduces systematic bias.
Use three complementary outlier methods: IQR (distribution-agnostic), Z-score (assumes normality), and Modified Z-score (robust, MAD-based). Compare results — agreement across methods means higher confidence.
Type mismatch detection catches silent errors: numeric-as-string prevents aggregations; boolean-as-integer distorts correlations; float IDs cause join failures.
Referential integrity checks before joins prevent the silent row loss that makes analysis results wrong without any error being raised.
The DataQualityReport class with a 0–100 score gives you a single headline number for stakeholder communication and a diff-able artifact to track improvement over time.
Data contracts formalise the expected schema, ranges, and business rules. Validating against them at load time is the production-grade defence against silent data drift.