GadaaLabs
Machine Learning Engineering — Production ML Systems
Lesson 3

Feature Stores — Offline, Online & Point-in-Time Correctness

The feature store is the component of the ML platform that most teams build too late. They start by computing features in a training notebook, re-implement the same logic in a serving microservice, and discover three months into production that the two implementations diverge. The model's offline accuracy does not match online performance. This is training-serving skew, and a feature store is the architectural solution.

The Training-Serving Skew Problem

Training-serving skew occurs when the feature values seen by the model at training time differ from the feature values seen at serving time. This is one of the most common and most insidious production ML bugs.

Concrete Example: Timestamp Leakage

Suppose you are training a churn prediction model. You compute a feature days_since_last_purchase for each customer. At training time, if you carelessly join the full purchase history against the label without a time filter, you will leak future purchases into the feature.

python
import pandas as pd

# WRONG: leaks future information into training features
def compute_days_since_last_purchase_wrong(
    customers: pd.DataFrame,
    purchases: pd.DataFrame,
) -> pd.DataFrame:
    last_purchase = (
        purchases.groupby("customer_id")["purchase_date"]
        .max()
        .reset_index()
        .rename(columns={"purchase_date": "last_purchase_date"})
    )
    # BUG: this uses ALL purchases, including those after the label timestamp
    merged = customers.merge(last_purchase, on="customer_id", how="left")
    merged["days_since_last_purchase"] = (
        pd.Timestamp.today() - merged["last_purchase_date"]
    ).dt.days
    return merged

# RIGHT: only use purchases that existed at the time the label was generated
def compute_days_since_last_purchase_correct(
    customer_id: str,
    as_of_timestamp: pd.Timestamp,
    purchases: pd.DataFrame,
) -> float:
    """
    Point-in-time correct: only consider purchases before as_of_timestamp.
    This is what the model would have seen if it ran at as_of_timestamp.
    """
    past_purchases = purchases[
        (purchases["customer_id"] == customer_id)
        & (purchases["purchase_date"] < as_of_timestamp)
    ]
    if past_purchases.empty:
        return float("inf")
    last_date = past_purchases["purchase_date"].max()
    return (as_of_timestamp - last_date).days

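At serving time, days_since_last_purchase is computed from only the purchases made before the request, because nothing later exists yet. The leaky training-time version used every purchase, including those made after the label timestamp, and measured the gap from today rather than from the label timestamp. The two feature distributions therefore differ, and the model's learned weights are calibrated to values it will never see in production.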
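
To make the leak concrete, here is a minimal sketch on invented toy data (one customer, two purchases). The helper name `days_since_last_purchase` and its `point_in_time` flag are illustrative only; they are not part of the lesson's code or of any library.

```python
import pandas as pd

# Toy data, invented for illustration: one customer, two purchases.
purchases = pd.DataFrame({
    "customer_id": ["c1", "c1"],
    "purchase_date": pd.to_datetime(["2025-01-01", "2025-03-01"]),
})
label_ts = pd.Timestamp("2025-02-01")   # when the churn label was generated


def days_since_last_purchase(
    purchases: pd.DataFrame,
    as_of: pd.Timestamp,
    point_in_time: bool,
) -> int:
    """Days between as_of and the last purchase, with or without a time filter."""
    rows = purchases
    if point_in_time:
        # Keep only purchases that had already happened at as_of
        rows = rows[rows["purchase_date"] < as_of]
    return (as_of - rows["purchase_date"].max()).days


leaky = days_since_last_purchase(purchases, label_ts, point_in_time=False)
correct = days_since_last_purchase(purchases, label_ts, point_in_time=True)
print(leaky, correct)   # -28 31
```

The negative value is the giveaway: the unfiltered feature "knows" about a purchase that had not happened yet when the label was generated.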

Offline Store vs Online Store

A feature store has two faces: the offline store for training and the online store for serving.

Offline store: stores the full historical feature values, usually as Parquet files in S3/GCS or as Delta Lake tables. Used to generate training datasets. Queries are batch (slow is acceptable). Point-in-time correct joins are computed here. Typical tools: Parquet on S3, Delta Lake, Hive, BigQuery.

Online store: stores only the latest feature values for each entity, optimised for low-latency point lookups. A single feature vector retrieval usually has a budget of a few milliseconds (under 5 ms is a common target). Typical tools: Redis (most common), DynamoDB, Cassandra, Bigtable.

python
# Offline store: batch retrieval for training dataset generation
def get_training_features_offline(
    entity_df: pd.DataFrame,   # must contain entity_id + event_timestamp
    feature_table_path: str,
) -> pd.DataFrame:
    """
    Join entity_df against the historical feature table with point-in-time
    correct semantics: for each row in entity_df, use the latest feature
    values available AT OR BEFORE entity_df.event_timestamp.
    """
    features = pd.read_parquet(feature_table_path)
    result_rows = []
    for _, entity_row in entity_df.iterrows():
        eid = entity_row["customer_id"]
        ts  = entity_row["event_timestamp"]
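        # Point-in-time: only features written at or before ts
        valid_features = features[
            (features["customer_id"] == eid)
            & (features["feature_timestamp"] <= ts)
        ]
        if valid_features.empty:
            # NOTE: entities with no earlier feature row are dropped here
            # (inner-join semantics), so the output can be shorter than entity_df
            continue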
        # Take the most recent valid row
        latest = valid_features.sort_values("feature_timestamp").iloc[-1]
        result_rows.append({**entity_row.to_dict(), **latest.to_dict()})
    return pd.DataFrame(result_rows)

# Online store: single entity lookup for real-time serving
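import json

import redis

redis_client = redis.Redis(host="localhost", port=6379, decode_responses=True)

def get_default_features() -> dict:
    """Fallback for entities with no stored features.

    Placeholder: the lesson does not define the defaults, so an empty dict
    stands in for whatever the model expects for a cold-start entity.
    """
    return {}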

def get_online_features(customer_id: str) -> dict:
    """Retrieve latest feature vector from Redis for serving."""
    key = f"features:customer:{customer_id}"
    raw = redis_client.get(key)
    if raw is None:
        return get_default_features()
    return json.loads(raw)

def write_online_features(customer_id: str, features: dict, ttl_seconds: int = 3600) -> None:
    """Write materialised features to Redis with TTL."""
    key = f"features:customer:{customer_id}"
    redis_client.set(key, json.dumps(features), ex=ttl_seconds)
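
Because the online store keeps only the latest value per entity, a materialisation step has to collapse the feature history down to one row per entity before writing. A minimal sketch of that step follows, assuming the same `features:customer:{id}` key scheme as above; the function name `latest_feature_rows` and the sample data are hypothetical, and the result is a plain dict rather than an actual Redis write.

```python
import json

import pandas as pd


def latest_feature_rows(
    history: pd.DataFrame,
    entity_col: str = "customer_id",
    ts_col: str = "feature_timestamp",
) -> dict[str, str]:
    """Return {redis_key: json_payload} holding only the newest row per entity."""
    latest = (
        history.sort_values(ts_col)
        .drop_duplicates(subset=entity_col, keep="last")   # newest row wins
    )
    # to_json handles Timestamp and numpy types that json.dumps cannot
    records = json.loads(latest.to_json(orient="records", date_format="iso"))
    payloads = {}
    for record in records:
        entity_id = record.pop(entity_col)
        payloads[f"features:customer:{entity_id}"] = json.dumps(record)
    return payloads


history = pd.DataFrame({
    "customer_id": ["c1", "c1", "c2"],
    "feature_timestamp": pd.to_datetime(["2025-01-05", "2025-01-28", "2025-01-08"]),
    "tenure_days": [120, 135, 55],
})
payloads = latest_feature_rows(history)
# Each payload could then be passed to write_online_features-style code.
```

Keeping the feature timestamp inside the payload is a design choice: it lets the serving side see how old a value is.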

Point-in-Time Correct Joins

The as-of join (also called a point-in-time join or temporal join) is the core primitive of correct training dataset generation. For each training example, it retrieves the feature values that were available at the timestamp of the training label — not the latest values and not future values.

SQL Implementation

sql
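-- Point-in-time correct join in SQL, written with LEFT JOIN LATERAL as
-- supported by PostgreSQL and DuckDB. Engines without LATERAL support
-- (BigQuery, for example) need a ROW_NUMBER() window or a native ASOF JOIN.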
-- entity_df: table of (customer_id, label_timestamp, label)
-- feature_history: table of (customer_id, feature_timestamp, feature_value_*)

SELECT
    e.customer_id,
    e.label_timestamp,
    e.label,
    f.tenure_days,
    f.total_spend_30d,
    f.support_tickets_90d
FROM entity_df e
LEFT JOIN LATERAL (
    SELECT
        tenure_days,
        total_spend_30d,
        support_tickets_90d
    FROM feature_history fh
    WHERE fh.customer_id = e.customer_id
      AND fh.feature_timestamp <= e.label_timestamp   -- point-in-time filter
    ORDER BY fh.feature_timestamp DESC
    LIMIT 1
) f ON TRUE;

Python Implementation with Merge-ASOF

python
def point_in_time_join(
    entity_df: pd.DataFrame,       # columns: entity_id, event_timestamp, label
    feature_df: pd.DataFrame,      # columns: entity_id, feature_timestamp, feature_*
    entity_col: str = "customer_id",
) -> pd.DataFrame:
    """
    Vectorised point-in-time join using pandas merge_asof.
    Both DataFrames must be sorted by timestamp.
    """
    entity_df  = entity_df.sort_values("event_timestamp")
    feature_df = feature_df.sort_values("feature_timestamp")

    result = pd.merge_asof(
        entity_df,
        feature_df.rename(columns={"feature_timestamp": "event_timestamp"}),
        on="event_timestamp",
        by=entity_col,
        direction="backward",   # use the most recent feature row <= event_timestamp
    )
    return result

# Example usage
labels = pd.DataFrame({
    "customer_id":    ["c1", "c2", "c1"],
    "event_timestamp": pd.to_datetime(["2025-01-10", "2025-01-10", "2025-02-01"]),
    "label":          [1, 0, 1],
})
features = pd.DataFrame({
    "customer_id":      ["c1", "c1", "c2"],
    "feature_timestamp": pd.to_datetime(["2025-01-05", "2025-01-28", "2025-01-08"]),
    "tenure_days":      [120, 135, 55],
    "total_spend_30d":  [240.0, 310.0, 90.0],
})
training_df = point_in_time_join(labels, features)
# For c1 at 2025-01-10: uses feature row from 2025-01-05 (tenure=120, spend=240)
# For c1 at 2025-02-01: uses feature row from 2025-01-28 (tenure=135, spend=310)
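
A backward as-of join will happily return a feature row that is months old if nothing newer exists. The sketch below bounds the look-back window with the `tolerance` parameter of `pd.merge_asof`, which is similar in spirit to a feature view TTL. The function name `point_in_time_join_with_ttl` and the demo data are assumptions made for illustration.

```python
import pandas as pd


def point_in_time_join_with_ttl(
    entity_df: pd.DataFrame,
    feature_df: pd.DataFrame,
    ttl: pd.Timedelta,
    entity_col: str = "customer_id",
) -> pd.DataFrame:
    """As-of join that ignores feature rows older than ttl at event time."""
    entity_df = entity_df.sort_values("event_timestamp")
    feature_df = feature_df.sort_values("feature_timestamp")
    return pd.merge_asof(
        entity_df,
        feature_df,
        left_on="event_timestamp",
        right_on="feature_timestamp",   # kept in the output for auditing
        by=entity_col,
        direction="backward",
        tolerance=ttl,                  # maximum allowed feature age
    )


demo_labels = pd.DataFrame({
    "customer_id": ["c1", "c1"],
    "event_timestamp": pd.to_datetime(["2025-01-10", "2025-03-01"]),
})
demo_features = pd.DataFrame({
    "customer_id": ["c1"],
    "feature_timestamp": pd.to_datetime(["2025-01-05"]),
    "tenure_days": [120],
})
joined = point_in_time_join_with_ttl(
    demo_labels, demo_features, ttl=pd.Timedelta(days=7)
)
# Row 0: feature is 5 days old  -> tenure_days = 120
# Row 1: feature is 55 days old -> tenure_days is NaN (outside the 7-day window)
```

Rows that fall outside the window come back as missing values rather than being dropped, so the training pipeline can decide how to impute them.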

Feast Architecture

Feast is one of the most widely adopted open-source feature stores. Its key abstractions are:

  • Entity: the primary key of your features (e.g., customer_id, driver_id). Defines what you are computing features for.
  • DataSource: where raw feature data lives (local file, S3 Parquet, BigQuery table, Kafka topic).
  • FeatureView: defines a group of features computed from a DataSource for an Entity, with a TTL (maximum staleness allowed).
  • FeatureService: a named collection of features for a specific model — your model requests features by service name, not individual feature names.
  • FeatureStore: the top-level object; manages the registry, offline store, and online store.
  • MaterializationJob: copies data from the offline store to the online store for a given time range.

Feast Usage Example

python
# feature_repo/feature_store.yaml
# (YAML config, shown as comment)
# project: gadaalabs_churn
# registry: data/registry.db
# provider: local
# online_store:
#   type: sqlite
#   path: data/online_store.db

from datetime import timedelta
import pandas as pd
from feast import (
    Entity, FeatureView, FeatureService,
    Field, FileSource, FeatureStore,
)
from feast.types import Float32, Int64, String

# --- Define the feature repo ---

customer = Entity(
    name="customer_id",
    description="Unique customer identifier",
)

customer_source = FileSource(
    path="data/customer_features.parquet",
    timestamp_field="feature_timestamp",
)

customer_features_view = FeatureView(
    name="customer_features",
    entities=[customer],
    ttl=timedelta(days=7),             # features older than 7 days are stale
    schema=[
        Field(name="tenure_days",        dtype=Int64),
        Field(name="total_spend_30d",    dtype=Float32),
        Field(name="support_tickets_90d", dtype=Int64),
        Field(name="last_purchase_days", dtype=Int64),
    ],
    source=customer_source,
    tags={"team": "growth", "model": "churn_v2"},
)

churn_model_service = FeatureService(
    name="churn_model_v2",
    features=[customer_features_view],
)

# --- Materialise to online store ---
store = FeatureStore(repo_path="feature_repo/")

# Apply the feature repo definitions
store.apply([customer, customer_features_view, churn_model_service])

# Materialise: copy features written since the previous materialisation run
# (or since now - ttl on the first run) up to end_date into the online store
from datetime import datetime, timezone
store.materialize_incremental(
    end_date=datetime.now(tz=timezone.utc)
)

# --- Training dataset generation (offline, point-in-time correct) ---
entity_df = pd.DataFrame({
    "customer_id":    ["c001", "c002", "c003"],
    "event_timestamp": [
        datetime(2025, 3, 1, tzinfo=timezone.utc),
        datetime(2025, 3, 5, tzinfo=timezone.utc),
        datetime(2025, 3, 10, tzinfo=timezone.utc),
    ],
})
training_df = store.get_historical_features(
    entity_df=entity_df,
    features=["customer_features:tenure_days",
              "customer_features:total_spend_30d",
              "customer_features:support_tickets_90d"],
).to_df()

# --- Online feature retrieval (serving, low-latency) ---
online_features = store.get_online_features(
    features=["customer_features:tenure_days",
              "customer_features:total_spend_30d",
              "customer_features:support_tickets_90d"],
    entity_rows=[{"customer_id": "c001"}, {"customer_id": "c002"}],
).to_dict()
print(online_features)
# {'customer_id': ['c001', 'c002'],
#  'tenure_days': [180, 45], ...}

Feature Freshness SLAs

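Not all features need to be equally fresh: a customer's tenure can safely be a day old, while a count of recent transactions cannot. Defining and enforcing a freshness SLA per feature view prevents stale features from silently degrading model performance.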

python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone

@dataclass
class FeatureFreshnessSLA:
    feature_view: str
    max_staleness: timedelta
    alert_threshold: timedelta   # alert before breaching SLA

FRESHNESS_SLAS = [
    FeatureFreshnessSLA("customer_features",   timedelta(hours=24), timedelta(hours=20)),
    FeatureFreshnessSLA("transaction_features", timedelta(hours=1),  timedelta(minutes=50)),
    FeatureFreshnessSLA("device_features",     timedelta(days=7),   timedelta(days=6)),
]

def check_feature_freshness(
    store: "FeatureStore",
    slas: list[FeatureFreshnessSLA],
) -> list[dict]:
    """Check all feature views against their freshness SLAs."""
    violations = []
    now = datetime.now(tz=timezone.utc)

    for sla in slas:
        last_materialized = store.get_feature_view(sla.feature_view).materialization_intervals
        if not last_materialized:
            violations.append({
                "feature_view": sla.feature_view,
                "issue": "never_materialized",
            })
            continue
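        # Feast stores each interval as a (start, end) tuple of datetimes
        last_end = max(end for _start, end in last_materialized)
        if last_end.tzinfo is None:   # treat naive timestamps as UTC
            last_end = last_end.replace(tzinfo=timezone.utc)
        staleness = now - last_end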
        if staleness > sla.max_staleness:
            violations.append({
                "feature_view":  sla.feature_view,
                "staleness":     str(staleness),
                "sla":           str(sla.max_staleness),
                "issue":         "SLA_BREACHED",
            })
        elif staleness > sla.alert_threshold:
            violations.append({
                "feature_view":  sla.feature_view,
                "staleness":     str(staleness),
                "sla":           str(sla.max_staleness),
                "issue":         "APPROACHING_SLA",
            })
    return violations
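
The check above works at the level of materialisation runs. Freshness can also be enforced per lookup, at read time. The sketch below assumes each online payload carries a `written_at` ISO-8601 timestamp; that field and the helper name `is_fresh` are hypothetical and do not appear in the lesson's Redis payloads or in Feast.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def is_fresh(
    payload: dict,
    max_staleness: timedelta,
    now: Optional[datetime] = None,
) -> bool:
    """True if the payload's 'written_at' timestamp is within max_staleness."""
    now = now or datetime.now(tz=timezone.utc)
    written_at = datetime.fromisoformat(payload["written_at"])
    return (now - written_at) <= max_staleness


# Invented example: a payload written 30 minutes before the request
request_time = datetime(2025, 3, 10, 12, 0, tzinfo=timezone.utc)
payload = {"tenure_days": 180, "written_at": "2025-03-10T11:30:00+00:00"}

fresh_for_hourly_sla = is_fresh(payload, timedelta(hours=1), now=request_time)
fresh_for_10min_sla = is_fresh(payload, timedelta(minutes=10), now=request_time)
print(fresh_for_hourly_sla, fresh_for_10min_sla)   # True False
```

A serving path could fall back to default features, or log a violation, whenever such a check fails.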

Feature Monitoring

Feature distributions change over time. A feature that was normally distributed at training time may become bimodal in production. The model was fitted to the old distribution, so its inputs no longer look like anything it learned from and accuracy degrades. Monitoring feature distributions is as important as monitoring model outputs.

python
import numpy as np
import pandas as pd
from scipy import stats

def compute_feature_stats(df: pd.DataFrame) -> dict:
    """Compute summary statistics for all numeric features."""
    stats_dict = {}
    for col in df.select_dtypes(include=[np.number]).columns:
        series = df[col].dropna()
        stats_dict[col] = {
            "mean":    float(series.mean()),
            "std":     float(series.std()),
            "min":     float(series.min()),
            "max":     float(series.max()),
            "p25":     float(series.quantile(0.25)),
            "p50":     float(series.quantile(0.50)),
            "p75":     float(series.quantile(0.75)),
            "null_pct": float(df[col].isna().mean()),
            "n":       len(series),
        }
    return stats_dict

def detect_feature_drift(
    reference_stats: dict,
    current_df: pd.DataFrame,
    ks_pvalue_threshold: float = 0.05,
) -> list[dict]:
    """
    Run KS test comparing current feature distribution to reference.
    Returns a list of features with significant distribution shift.
    """
    drifted = []
    for feature, ref in reference_stats.items():
        if feature not in current_df.columns:
            continue
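        current_vals = current_df[feature].dropna().values
        if len(current_vals) == 0:
            continue   # nothing to compare against
        # Approximate the reference distribution from its mean and std.
        # This assumes the reference was roughly normal; storing a raw
        # reference sample is more faithful for skewed or multimodal features.
        rng = np.random.default_rng(seed=0)   # fixed seed keeps alerts reproducible
        ref_vals = rng.normal(
            loc=ref["mean"], scale=max(ref["std"], 1e-6), size=1000
        )
        stat, pvalue = stats.ks_2samp(ref_vals, current_vals)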
        if pvalue < ks_pvalue_threshold:
            drifted.append({
                "feature":    feature,
                "ks_stat":    round(stat, 4),
                "p_value":    round(pvalue, 6),
                "ref_mean":   ref["mean"],
                "curr_mean":  float(current_df[feature].mean()),
            })
    return drifted
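
When a raw sample of the training-time feature values can be kept, the KS test can compare samples directly instead of a reconstructed distribution. The sketch below uses synthetic data to mirror the normal-to-bimodal shift described above; the helper name `has_drifted` and the data are invented for illustration.

```python
import numpy as np
from scipy import stats

rng = np.random.default_rng(0)

# Synthetic training-time sample: roughly standard normal
reference = rng.normal(loc=0.0, scale=1.0, size=2000)

# Synthetic production sample: two well-separated modes, mean still near 0
bimodal = np.concatenate([
    rng.normal(loc=-3.0, scale=0.5, size=1000),
    rng.normal(loc=3.0, scale=0.5, size=1000),
])


def has_drifted(reference_sample, current_sample, alpha: float = 0.05) -> bool:
    """Two-sample KS test: True if the samples differ significantly."""
    result = stats.ks_2samp(reference_sample, current_sample)
    return bool(result.pvalue < alpha)


drift_detected = has_drifted(reference, bimodal)
no_self_drift = not has_drifted(reference, reference)
```

The bimodal sample has almost the same mean as the reference, so a monitor that tracks only the mean would miss this shift. The KS test compares the whole distribution and flags it.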

The Feature Server API Pattern

The feature server exposes online features as a REST API. This standardises feature access: any serving microservice — regardless of language — can retrieve features via HTTP without knowing the underlying store.

python
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import redis, json, time

app = FastAPI(title="Feature Server")
redis_client = redis.Redis(host="localhost", port=6379, decode_responses=True)

class FeatureRequest(BaseModel):
    entity_type: str                # e.g., "customer"
    entity_ids:  list[str]
    feature_names: list[str]

class FeatureResponse(BaseModel):
    entity_ids: list[str]
    features:   dict[str, list]
    latency_ms: float

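@app.post("/features", response_model=FeatureResponse)
def get_features(request: FeatureRequest) -> FeatureResponse:
    # Plain `def` rather than `async def`: this redis client is blocking, and
    # FastAPI runs sync handlers in a threadpool instead of on the event loop.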
    start = time.monotonic()
    result: dict[str, list] = {f: [] for f in request.feature_names}

    pipeline = redis_client.pipeline()
    for eid in request.entity_ids:
        key = f"features:{request.entity_type}:{eid}"
        pipeline.get(key)
    raw_values = pipeline.execute()

    for eid, raw in zip(request.entity_ids, raw_values):
        if raw is None:
            raise HTTPException(
                status_code=404,
                detail=f"No features found for {request.entity_type}={eid}"
            )
        feature_dict = json.loads(raw)
        for fname in request.feature_names:
            result[fname].append(feature_dict.get(fname))

    latency_ms = (time.monotonic() - start) * 1000
    return FeatureResponse(
        entity_ids=request.entity_ids,
        features=result,
        latency_ms=round(latency_ms, 2),
    )

@app.get("/health")
async def health() -> dict:
    return {"status": "ok"}
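
The endpoint returns features in columnar form (one list per feature name), while most model code wants one feature dict per entity. A client-side helper can pivot the response. This is a sketch: the function name `rows_from_response` and the example payload values are invented, and the payload shape follows the `FeatureResponse` model above.

```python
def rows_from_response(response: dict) -> dict[str, dict]:
    """Pivot {"features": {name: [v1, v2]}} into {entity_id: {name: value}}."""
    entity_ids = response["entity_ids"]
    features = response["features"]
    return {
        entity_id: {name: values[i] for name, values in features.items()}
        for i, entity_id in enumerate(entity_ids)
    }


# Example payload with invented values, shaped like FeatureResponse
example_response = {
    "entity_ids": ["c001", "c002"],
    "features": {
        "tenure_days": [180, 45],
        "total_spend_30d": [310.0, 90.0],
    },
    "latency_ms": 1.2,
}
rows = rows_from_response(example_response)
print(rows["c002"])   # {'tenure_days': 45, 'total_spend_30d': 90.0}
```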

Cost of Online vs Batch Feature Retrieval

Online feature lookup has a fundamentally different cost profile from batch retrieval.

| Dimension | Offline (Batch) | Online (Redis) |
|---|---|---|
| Latency | Seconds to minutes | 0.1 to 5 ms |
| Throughput | Millions of rows/min | Tens of thousands req/s |
| Infrastructure | Object storage (cheap) | Memory-resident (expensive) |
| Staleness | Can be up-to-date | Limited by materialisation cadence |
| Use case | Training dataset generation | Real-time serving |

A common mistake is storing all features in the online store. Only store features that are actually needed for real-time serving. Training-only features (e.g., historical aggregations over years) live in the offline store and never need to be in Redis.

python
def estimate_online_store_cost(
    n_entities: int,
    n_features: int,
    bytes_per_feature: float = 8,   # float64
    replication_factor: int = 3,
    redis_memory_gb_price_per_month: float = 0.08,
) -> dict:
    """Estimate monthly cost of storing features in Redis."""
    bytes_per_entity = n_features * bytes_per_feature
    total_bytes = n_entities * bytes_per_entity * replication_factor
    total_gb = total_bytes / (1024 ** 3)
    monthly_cost = total_gb * redis_memory_gb_price_per_month
    return {
        "n_entities":       n_entities,
        "bytes_per_entity": bytes_per_entity,
        "total_gb":         round(total_gb, 2),
        "monthly_cost_usd": round(monthly_cost, 2),
    }

# 10M customers, 50 features
cost = estimate_online_store_cost(n_entities=10_000_000, n_features=50)
# -> {"n_entities": 10000000, "bytes_per_entity": 400,
#     "total_gb": 11.18, "monthly_cost_usd": 0.89}   (includes 3x replication)
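
The estimate above counts 8 bytes per feature, which is a lower bound. If feature vectors are stored as JSON strings, as in the Redis examples earlier, the feature names and text encoding add overhead. The sketch below compares the two sizes for one entity; the feature names and values are invented, and Redis's own per-key overhead is not included.

```python
import json

n_features = 50

# Invented feature vector: 50 float features with short names
feature_vector = {f"feature_{i:02d}": 0.5 for i in range(n_features)}

raw_bytes = n_features * 8                                   # packed float64 values
json_bytes = len(json.dumps(feature_vector).encode("utf-8"))  # JSON string payload
overhead_ratio = json_bytes / raw_bytes

print(f"raw={raw_bytes} B, json={json_bytes} B, ratio={overhead_ratio:.1f}x")
```

Multiplying the raw estimate by a ratio measured on real payloads gives a more realistic memory figure before committing to a cluster size.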

Key Takeaways

  • Training-serving skew is one of the most common silent bugs in production ML; it occurs when feature computation logic differs between training and serving time.
  • Point-in-time correct joins are mandatory for any feature computed from events: always filter to only use data that existed before the label timestamp.
  • The offline store (Parquet/Delta) serves training; the online store (Redis/DynamoDB) serves inference. Never confuse the two.
  • Feast provides a unified abstraction over both stores: define once, materialise to online, retrieve correctly from offline.
  • Feature freshness SLAs must be defined, monitored, and enforced — a model receiving 72-hour-old features when it expects 1-hour-old features is effectively a broken model.
  • Monitor feature distributions over time and alert on KS-test drift before it affects model accuracy.
  • The feature server pattern decouples the feature store from serving code: any microservice retrieves features via HTTP, regardless of implementation language.
  • Only put features in the online store that are actually needed for real-time serving; keep everything else in the (much cheaper) offline store.