GadaaLabs
AI Automation — Production Agents & Agentic Systems
Lesson 11

Production Agent Deployment — Reliability, Cost & Scaling

The Reliability Problem at Scale

A single agent run that works 99% of the time sounds reliable. At 1000 tasks per day, that is 10 failures per day — which means 10 users, customers, or automated pipelines hitting errors. At 10,000 tasks per day, it is 100 failures.
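
The arithmetic above generalises to any volume and success rate, and it compounds when a run consists of several steps that must all succeed. A minimal sketch: the function names, and the assumption that steps fail independently, are illustrative and not part of the lesson.

```python
def expected_failures(tasks_per_day: int, success_rate: float) -> float:
    """Expected number of failed runs per day."""
    return tasks_per_day * (1.0 - success_rate)


def run_success_rate(step_success_rate: float, steps: int) -> float:
    """Success rate of a run whose steps must all succeed (assumes independent steps)."""
    return step_success_rate ** steps


print(round(expected_failures(1_000, 0.99)))    # 10
print(round(expected_failures(10_000, 0.99)))   # 100
print(round(run_success_rate(0.99, 10), 3))     # 0.904
```

Under that independence assumption, a ten-step run built from steps that each succeed 99% of the time finishes cleanly only about 90% of the time.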

Production reliability requires addressing failures at every layer:

  • Transient failures (network timeouts, rate limits, temporary service unavailability): retry with exponential backoff
  • Duplicate execution (the same task submitted twice): idempotency keys
  • Cascading failures (one service failing causes agents to queue up and crash): circuit breakers
  • Runaway tasks (agent loops, LLM generates infinite tool calls): timeouts
  • Cost overruns (context window growing indefinitely): token budgets
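
Of the five defences listed above, the runaway-task timeout is the simplest to sketch: cap both the number of steps and the wall-clock time of an agent loop. The names `run_with_limits`, `step_fn` and `TaskBudgetExceeded` are hypothetical, and the deadline is only checked between steps, so each step still needs its own per-call timeout.

```python
import time
from typing import Callable, Optional


class TaskBudgetExceeded(Exception):
    """Raised when an agent loop exceeds its step or time budget."""


def run_with_limits(
    step_fn: Callable[[int], Optional[str]],
    max_steps: int = 10,
    max_seconds: float = 60.0,
) -> str:
    """
    Call step_fn(step_index) until it returns a final answer (a string).
    step_fn returns None to signal "not finished yet, call me again".
    """
    deadline = time.monotonic() + max_seconds
    for step in range(max_steps):
        # Checked between steps only: a step already running is not interrupted.
        if time.monotonic() > deadline:
            raise TaskBudgetExceeded(f"time budget of {max_seconds}s exceeded at step {step}")
        answer = step_fn(step)
        if answer is not None:
            return answer
    raise TaskBudgetExceeded(f"no final answer after {max_steps} steps")


# A loop that finishes on its third step returns normally.
print(run_with_limits(lambda i: "done" if i == 2 else None))  # done
```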

Retry Strategy with Tenacity

python
from tenacity import (
    retry,
    wait_exponential,
    stop_after_attempt,
    retry_if_exception_type,
    before_sleep_log,
)
import logging
import time

logger = logging.getLogger(__name__)


class TransientError(Exception):
    """Retryable: network error, rate limit, 5xx from API."""


class PermanentError(Exception):
    """Non-retryable: invalid input, 4xx (except 429), authentication failure."""


@retry(
    retry=retry_if_exception_type(TransientError),
    wait=wait_exponential(multiplier=1, min=1, max=60),
    stop=stop_after_attempt(5),
    before_sleep=before_sleep_log(logger, logging.WARNING),
    reraise=True,
)
def call_llm_reliable(messages: list[dict], model: str) -> str:
    """LLM call with automatic retry on transient failures."""
    from groq import Groq, RateLimitError, APIConnectionError, APIStatusError

    client = Groq()
    try:
        response = client.chat.completions.create(
            model=model,
            messages=messages,
            max_tokens=800,
            timeout=30,   # hard timeout per call
        )
        return response.choices[0].message.content
    except RateLimitError as e:
        raise TransientError(f"Rate limited: {e}") from e
    except APIConnectionError as e:
        raise TransientError(f"Connection error: {e}") from e
    except APIStatusError as e:
        if e.status_code >= 500:
            raise TransientError(f"Server error {e.status_code}: {e}") from e
        raise PermanentError(f"Client error {e.status_code}: {e}") from e
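
To get a feel for what a capped exponential schedule looks like, the sketch below computes one in plain Python, with optional "full jitter" so that many clients retrying at once do not synchronise. This is an illustration of the general technique, not tenacity's internal formula; `backoff_delays` and its parameters are hypothetical names. tenacity also provides `wait_random_exponential` if jittered waits are wanted in the decorator itself.

```python
import random
from typing import Optional


def backoff_delays(
    attempts: int,
    base: float = 1.0,
    cap: float = 60.0,
    jitter: bool = False,
    rng: Optional[random.Random] = None,
) -> list[float]:
    """Delays (seconds) before each retry: base * 2**n, capped at cap."""
    rng = rng or random.Random()
    delays = []
    for attempt in range(attempts):
        ceiling = min(cap, base * 2 ** attempt)
        # Full jitter: pick uniformly between 0 and the exponential ceiling.
        delays.append(rng.uniform(0, ceiling) if jitter else ceiling)
    return delays


print(backoff_delays(4))       # [1.0, 2.0, 4.0, 8.0]
print(backoff_delays(8)[-1])   # 60.0
```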

Idempotency Keys

python
import redis
import json
import hashlib

redis_client = redis.Redis(host="localhost", port=6379, decode_responses=True)
IDEMPOTENCY_TTL = 86400  # 24 hours


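def run_idempotent(idempotency_key: str, task_fn, *args, **kwargs) -> dict:
    """
    Execute a task function once per idempotency key and cache its result.
    Subsequent calls with the same key return the cached result.

    Caveats: the result must be JSON-serialisable, and the get-then-set below
    is not atomic, so two concurrent calls with the same key can both run
    task_fn. Claim the key with SET NX first if that matters.
    """
    cache_key = f"idempotent:{idempotency_key}"
    cached = redis_client.get(cache_key)
    if cached is not None:
        return json.loads(cached)

    result = task_fn(*args, **kwargs)
    redis_client.setex(cache_key, IDEMPOTENCY_TTL, json.dumps(result, default=str))
    return result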
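
The function above checks the cache and writes the result in two separate steps, so two requests arriving at the same moment can both execute the task. One way to close that gap is to claim the key atomically before running. The sketch below shows the pattern with a hypothetical in-memory store (`InMemoryStore`, `claim_and_run`) so it runs without a server; with redis-py the equivalent claim would be `redis_client.set(key, value, nx=True, ex=ttl)`.

```python
import threading
from typing import Callable


class InMemoryStore:
    """Hypothetical stand-in for a key-value store with an atomic set-if-absent."""

    def __init__(self) -> None:
        self._data: dict[str, str] = {}
        self._lock = threading.Lock()

    def set_if_absent(self, key: str, value: str) -> bool:
        """Return True only for the caller that created the key."""
        with self._lock:
            if key in self._data:
                return False
            self._data[key] = value
            return True


def claim_and_run(store: InMemoryStore, key: str, task_fn: Callable[[], None]) -> bool:
    """Run task_fn only if this caller wins the claim. Returns True if it ran."""
    if not store.set_if_absent(f"claim:{key}", "in_progress"):
        return False  # another caller already owns this key
    task_fn()
    return True


# Eight threads race on the same key; the side effect happens once.
store = InMemoryStore()
calls: list[int] = []
threads = [
    threading.Thread(target=claim_and_run, args=(store, "order-42", lambda: calls.append(1)))
    for _ in range(8)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(len(calls))  # 1
```

In a real service the callers that lose the claim would typically poll for the winner's result or return an "in progress" status, and the claim would carry a TTL so that a crashed worker does not block the key forever.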

Circuit Breaker

A circuit breaker prevents cascading failures by stopping calls to a failing service before they pile up:

python
import threading
import time
from enum import Enum


class CircuitState(str, Enum):
    CLOSED = "closed"       # normal operation, calls pass through
    OPEN = "open"           # service is failing, calls immediately fail
    HALF_OPEN = "half_open" # probe: try one call, close if it succeeds


class CircuitBreaker:
    """
    Circuit breaker for any external service call.
    Opens after failure_threshold failures in window_seconds.
    """

    def __init__(
        self,
        name: str,
        failure_threshold: int = 5,
        recovery_timeout: float = 30.0,
        window_seconds: float = 60.0,
    ):
        self.name = name
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.window_seconds = window_seconds
        self._state = CircuitState.CLOSED
        self._failures: list[float] = []
        self._opened_at: float = 0.0
        self._lock = threading.Lock()

    def call(self, fn, *args, **kwargs):
        """Call fn through the circuit breaker."""
        with self._lock:
            if self._state == CircuitState.OPEN:
                if time.time() - self._opened_at > self.recovery_timeout:
                    self._state = CircuitState.HALF_OPEN
                else:
                    raise RuntimeError(f"Circuit '{self.name}' is OPEN — service unavailable")

        try:
            result = fn(*args, **kwargs)
            with self._lock:
                if self._state == CircuitState.HALF_OPEN:
                    self._state = CircuitState.CLOSED
                    self._failures.clear()
                    print(f"Circuit '{self.name}' CLOSED — service recovered")
            return result
        except Exception:
            with self._lock:
                now = time.time()
                # Expire old failures outside the window
                self._failures = [t for t in self._failures if now - t < self.window_seconds]
                self._failures.append(now)

                # A failed probe re-opens the circuit at once; otherwise open
                # only when the failure count in the window reaches the threshold.
                if (
                    self._state == CircuitState.HALF_OPEN
                    or len(self._failures) >= self.failure_threshold
                ):
                    self._state = CircuitState.OPEN
                    self._opened_at = now
                    print(f"Circuit '{self.name}' OPENED after {len(self._failures)} failures")
            raise
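
The state transitions are easier to follow, and to unit-test, when the clock is injectable. The sketch below is a condensed variant, not the class above: `MiniBreaker` is a hypothetical name, it counts consecutive failures instead of using a sliding window, and it is not thread-safe. It walks through closed, open, half-open and back to closed without sleeping.

```python
from typing import Callable


class MiniBreaker:
    """Condensed breaker: consecutive-failure count, injectable clock."""

    def __init__(self, threshold: int, recovery_timeout: float, clock: Callable[[], float]):
        self.threshold = threshold
        self.recovery_timeout = recovery_timeout
        self.clock = clock
        self.state = "closed"
        self.failures = 0
        self.opened_at = 0.0

    def call(self, fn: Callable[[], str]) -> str:
        if self.state == "open":
            if self.clock() - self.opened_at < self.recovery_timeout:
                raise RuntimeError("circuit open")
            self.state = "half_open"  # timeout elapsed: allow one probe
        try:
            result = fn()
        except Exception:
            self.failures += 1
            # A failed probe re-opens immediately; otherwise wait for the threshold.
            if self.state == "half_open" or self.failures >= self.threshold:
                self.state = "open"
                self.opened_at = self.clock()
            raise
        self.state = "closed"
        self.failures = 0
        return result


now = [0.0]  # fake clock, advanced by hand
breaker = MiniBreaker(threshold=2, recovery_timeout=30.0, clock=lambda: now[0])


def failing() -> str:
    raise ConnectionError("service down")


for _ in range(2):
    try:
        breaker.call(failing)
    except ConnectionError:
        pass
print(breaker.state)               # open

now[0] = 31.0                      # move past the recovery timeout
print(breaker.call(lambda: "ok"))  # ok
print(breaker.state)               # closed
```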

Token Budget Management

python
def count_tokens_approx(text: str) -> int:
    """Rough token estimate: 1 token ≈ 4 characters."""
    return len(text) // 4


def build_messages_within_budget(
    system_prompt: str,
    conversation_history: list[dict],
    user_message: str,
    max_tokens: int = 6000,
    reserve_for_output: int = 800,
) -> list[dict]:
    """
    Build a message list that fits within the token budget.
    Keeps the system prompt and the most recent messages.
    Drops the oldest messages if the budget is exceeded.
    """
    budget = max_tokens - reserve_for_output

    system_tokens = count_tokens_approx(system_prompt)
    user_tokens = count_tokens_approx(user_message)
    available_for_history = budget - system_tokens - user_tokens

    if available_for_history < 0:
        # The system prompt and user message alone exceed the budget: send no
        # history. The user message is NOT truncated here, so the request can
        # still be over budget.
        available_for_history = 0

    # Fill history from newest to oldest until we exhaust the budget
    selected_history = []
    used_tokens = 0
    for msg in reversed(conversation_history):
        msg_tokens = count_tokens_approx(msg["content"])
        if used_tokens + msg_tokens > available_for_history:
            break
        selected_history.insert(0, msg)
        used_tokens += msg_tokens

    messages = [{"role": "system", "content": system_prompt}]
    messages.extend(selected_history)
    messages.append({"role": "user", "content": user_message})
    return messages
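
When the system prompt and user message together exceed the budget, the function above drops all history but still sends the full user message. One option is to trim the message first, using the same 4-characters-per-token estimate. A minimal sketch: `truncate_to_token_budget` and its `marker` parameter are hypothetical, and keeping the head of the text rather than the tail is an assumption that will not suit every task.

```python
def truncate_to_token_budget(text: str, max_tokens: int, marker: str = " [truncated]") -> str:
    """
    Trim text so that its estimated size (1 token ≈ 4 characters) fits max_tokens.
    The marker is counted against the budget.
    """
    max_chars = max(0, max_tokens) * 4
    if len(text) <= max_chars:
        return text
    keep = max(0, max_chars - len(marker))
    # If the budget is too small to fit the marker, cut hard without it.
    return text[:keep] + marker if keep > 0 else text[:max_chars]


long_message = "x" * 1000
trimmed = truncate_to_token_budget(long_message, max_tokens=50)
print(len(trimmed))  # 200
```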

Model Routing — Cost Optimisation

Route simple tasks to a cheap, fast model and complex tasks to a capable, expensive one:

python
ROUTING_CONFIG = {
    "simple": {
        "model": "llama-3.1-8b-instant",
        "max_tokens": 400,
        "description": "Simple Q&A, short summaries, classification",
    },
    "standard": {
        "model": "llama-3.3-70b-versatile",
        "max_tokens": 800,
        "description": "Multi-step reasoning, code generation, complex analysis",
    },
}

SIMPLE_TASK_INDICATORS = [
    "what is", "define", "summarise", "summarize", "list", "classify",
    "yes or no", "true or false", "how many"
]


def route_to_model(task_description: str) -> dict:
    """
    Route a task to the appropriate model based on complexity heuristics.
    """
    lower = task_description.lower()
    is_simple = any(indicator in lower for indicator in SIMPLE_TASK_INDICATORS)
    # Simple tasks under 20 words get the small model
    is_short = len(task_description.split()) < 20

    if is_simple and is_short:
        return ROUTING_CONFIG["simple"]
    return ROUTING_CONFIG["standard"]
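
How much routing saves depends on two numbers: the price gap between the models and the share of traffic that goes to the small one. The sketch below makes that explicit. The per-task costs are made-up placeholders, not real provider prices, and the function names are hypothetical.

```python
def blended_cost_per_task(share_simple: float, cost_small: float, cost_large: float) -> float:
    """Average cost per task when share_simple of traffic goes to the small model."""
    return share_simple * cost_small + (1.0 - share_simple) * cost_large


def savings_factor(share_simple: float, cost_small: float, cost_large: float) -> float:
    """How many times cheaper routing is than sending everything to the large model."""
    return cost_large / blended_cost_per_task(share_simple, cost_small, cost_large)


# Hypothetical per-task costs in USD; substitute your provider's real prices.
COST_SMALL = 0.0001
COST_LARGE = 0.0010

for share in (0.5, 0.8, 0.95):
    print(share, round(savings_factor(share, COST_SMALL, COST_LARGE), 2))
```

With these placeholder prices (a 10× gap), routing 80% of traffic to the small model gives roughly a 3.6× saving, and the factor can never exceed the price ratio itself.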

Production FastAPI Service

python
from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel
import asyncio
import uuid
import time

app = FastAPI(title="Production Agent Service")

# Task state store (use Redis in production for horizontal scaling)
task_store: dict[str, dict] = {}


class TaskRequest(BaseModel):
    task: str
    user_id: str = "anonymous"
    idempotency_key: str | None = None


class TaskResponse(BaseModel):
    task_id: str
    status: str  # "pending" | "running" | "complete" | "failed"
    result: str | None = None
    error: str | None = None
    cost_usd: float | None = None
    duration_ms: float | None = None


@app.post("/tasks", response_model=TaskResponse, status_code=202)
async def create_task(request: TaskRequest, background_tasks: BackgroundTasks) -> TaskResponse:
    """Submit a new agent task. Returns immediately with a task_id for polling."""
    # Idempotency: check if we've seen this key before
    if request.idempotency_key:
        existing = _find_by_idempotency_key(request.idempotency_key)
        if existing:
            return TaskResponse(**existing)

    task_id = str(uuid.uuid4())  # full UUID; truncating to 8 characters invites collisions
    task_store[task_id] = {
        "task_id": task_id,
        "task": request.task,
        "user_id": request.user_id,
        "idempotency_key": request.idempotency_key,
        "status": "pending",
        "result": None,
        "error": None,
        "cost_usd": 0.0,
        "duration_ms": 0.0,
        "created_at": time.time(),
    }

    background_tasks.add_task(_run_agent_task, task_id, request.task)

    return TaskResponse(task_id=task_id, status="pending")


@app.get("/tasks/{task_id}", response_model=TaskResponse)
async def get_task(task_id: str) -> TaskResponse:
    """Poll task status and retrieve result when complete."""
    task = task_store.get(task_id)
    if not task:
        raise HTTPException(status_code=404, detail="Task not found")
    return TaskResponse(**task)


async def _run_agent_task(task_id: str, task_description: str) -> None:
    """Background worker that executes the agent task."""
    task_store[task_id]["status"] = "running"
    t_start = time.perf_counter()

    try:
        # Route to appropriate model
        routing = route_to_model(task_description)
        model = routing["model"]

        messages = [
            {"role": "system", "content": "You are a helpful assistant. Complete the task concisely."},
            {"role": "user", "content": task_description},
        ]

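        # call_llm_reliable is synchronous (and sleeps between retries), so run
        # it in a worker thread to keep the event loop free for other requests.
        result = await asyncio.to_thread(call_llm_reliable, messages, model)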

        duration_ms = (time.perf_counter() - t_start) * 1000
        task_store[task_id].update({
            "status": "complete",
            "result": result,
            "duration_ms": round(duration_ms, 2),
        })
    except Exception as e:
        duration_ms = (time.perf_counter() - t_start) * 1000
        task_store[task_id].update({
            "status": "failed",
            "error": str(e),
            "duration_ms": round(duration_ms, 2),
        })


def _find_by_idempotency_key(key: str) -> dict | None:
    for task in task_store.values():
        if task.get("idempotency_key") == key:
            return task
    return None


@app.get("/health")
async def health() -> dict:
    pending = sum(1 for t in task_store.values() if t["status"] in {"pending", "running"})
    return {"status": "ok", "tasks_in_flight": pending}
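
On the client side, the submit-then-poll pattern needs a loop that backs off between polls and eventually gives up. A minimal sketch: `poll_until_done` is a hypothetical helper, and `fetch_status` stands for whatever function performs the GET on the task endpoint with your HTTP client of choice and returns the parsed JSON.

```python
import time
from typing import Callable


def poll_until_done(
    fetch_status: Callable[[], dict],
    timeout_seconds: float = 120.0,
    initial_interval: float = 0.5,
    max_interval: float = 5.0,
    sleep: Callable[[float], None] = time.sleep,
) -> dict:
    """
    Call fetch_status() until the task reports "complete" or "failed".
    The interval doubles after each poll, capped at max_interval.
    """
    deadline = time.monotonic() + timeout_seconds
    interval = initial_interval
    while True:
        task = fetch_status()
        if task["status"] in {"complete", "failed"}:
            return task
        if time.monotonic() + interval > deadline:
            raise TimeoutError("task did not finish before the polling timeout")
        sleep(interval)
        interval = min(max_interval, interval * 2)


# Simulated server responses instead of real HTTP calls.
responses = iter([
    {"status": "pending"},
    {"status": "running"},
    {"status": "complete", "result": "42"},
])
print(poll_until_done(lambda: next(responses), sleep=lambda s: None)["result"])  # 42
```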

Graceful Shutdown

python
import signal
import asyncio

_shutdown_event = asyncio.Event()


def handle_shutdown(sig, frame):
    print(f"Received {signal.Signals(sig).name} — initiating graceful shutdown")
    _shutdown_event.set()


signal.signal(signal.SIGTERM, handle_shutdown)
signal.signal(signal.SIGINT, handle_shutdown)


async def wait_for_drain(max_wait_seconds: int = 30) -> None:
    """Wait for in-flight tasks to complete before shutting down."""
    deadline = time.time() + max_wait_seconds
    while time.time() < deadline:
        in_flight = sum(1 for t in task_store.values() if t["status"] in {"pending", "running"})
        if in_flight == 0:
            print("All tasks drained. Shutting down cleanly.")
            return
        print(f"Draining: {in_flight} tasks in flight. Waiting...")
        await asyncio.sleep(2)
    print(f"Drain timeout ({max_wait_seconds}s). Forcing shutdown.")
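
The drain function only helps if something calls it during shutdown. Servers such as uvicorn install their own SIGTERM and SIGINT handlers, so handlers registered with `signal.signal` at import time may be replaced. A sturdier option is to hook the drain into the application lifespan, since FastAPI accepts a `lifespan=` async context manager and runs the code after `yield` at shutdown. The sketch below uses a stand-in `task_store` and a shortened poll interval so it runs on its own; in the service it would be wired up as `FastAPI(lifespan=lifespan)`.

```python
import asyncio
from contextlib import asynccontextmanager

# Stand-in for the service's task_store so the sketch runs on its own.
task_store: dict[str, dict] = {"t1": {"status": "running"}}


async def wait_for_drain(max_wait_seconds: float = 30.0, poll_seconds: float = 0.05) -> bool:
    """Return True if all tasks finished before the deadline, else False."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + max_wait_seconds
    while loop.time() < deadline:
        if not any(t["status"] in {"pending", "running"} for t in task_store.values()):
            return True
        await asyncio.sleep(poll_seconds)
    return False


@asynccontextmanager
async def lifespan(app):
    yield                      # the application serves requests here
    await wait_for_drain()     # runs once the server begins shutting down


async def demo() -> bool:
    async def finish_soon() -> None:
        await asyncio.sleep(0.1)
        task_store["t1"]["status"] = "complete"

    worker = asyncio.create_task(finish_soon())
    async with lifespan(app=None):
        pass                   # leaving the block triggers the drain
    await worker
    return task_store["t1"]["status"] == "complete"


print(asyncio.run(demo()))  # True
```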

Production Dockerfile

dockerfile
# syntax=docker/dockerfile:1

# Stage 1: build dependencies into a virtual environment
FROM python:3.12-slim AS builder
WORKDIR /app
RUN python -m venv /opt/venv
ENV PATH=/opt/venv/bin:$PATH
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Stage 2: production image
FROM python:3.12-slim
WORKDIR /app
# The venv lives outside /root so the non-root user can read it
COPY --from=builder /opt/venv /opt/venv
COPY . .
ENV PATH=/opt/venv/bin:$PATH
ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1
RUN useradd -m -u 1000 agent && chown -R agent:agent /app
USER agent
# python:3.12-slim does not ship curl, so probe /health with the standard library
HEALTHCHECK --interval=30s --timeout=10s --retries=3 \
  CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8000/health', timeout=5)" || exit 1
EXPOSE 8000
# With more than one worker, task state must live in a shared store such as Redis
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "2"]

Horizontal Scaling Architecture

Load Balancer (nginx / ALB)
    │
    ├── Agent Worker 1 (stateless FastAPI)
    ├── Agent Worker 2 (stateless FastAPI)
    └── Agent Worker 3 (stateless FastAPI)
           │
           ├── Redis (shared task state, idempotency keys, cache)
           └── Qdrant / External APIs (shared external services)

Workers are stateless: all task state lives in Redis, so any worker can handle any request. To scale, add more workers behind the load balancer.
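
Moving task state out of the process is what makes the workers interchangeable. The sketch below stores each task as a JSON string under a `task:{task_id}` key. The key scheme, the 24-hour TTL and the helper names are assumptions; `FakeClient` is an in-memory stand-in so the example runs without a Redis server, while a `redis.Redis` client offers the same `set(name, value, ex=...)` and `get(name)` calls.

```python
import json
from typing import Optional, Protocol


class KeyValueClient(Protocol):
    """The two calls this sketch needs; redis.Redis provides both."""
    def set(self, name: str, value: str, ex: Optional[int] = None): ...
    def get(self, name: str): ...


TASK_TTL_SECONDS = 86400  # assumption: keep task records for 24 hours


def save_task(client: KeyValueClient, task: dict) -> None:
    """Persist the full task record under task:{task_id}."""
    client.set(f"task:{task['task_id']}", json.dumps(task), ex=TASK_TTL_SECONDS)


def load_task(client: KeyValueClient, task_id: str) -> Optional[dict]:
    """Return the task record, or None if it is unknown or expired."""
    raw = client.get(f"task:{task_id}")
    return json.loads(raw) if raw is not None else None


class FakeClient:
    """In-memory stand-in so the sketch runs without a Redis server (ignores TTL)."""

    def __init__(self) -> None:
        self._data: dict[str, str] = {}

    def set(self, name: str, value: str, ex: Optional[int] = None) -> bool:
        self._data[name] = value
        return True

    def get(self, name: str) -> Optional[str]:
        return self._data.get(name)


client = FakeClient()
save_task(client, {"task_id": "abc", "status": "pending", "result": None})
print(load_task(client, "abc")["status"])  # pending
```

Because each update rewrites the whole record, two workers updating the same task at once could overwrite each other; in this service only the worker that runs a task writes to it, which keeps the sketch simple.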

Key Takeaways

  • At 1000 tasks/day, 1% failure rate = 10 failures/day; reliability engineering is not optional at production scale.
  • Use tenacity with wait_exponential for transient errors; categorise every exception as TransientError (retry) or PermanentError (fail immediately).
  • Idempotency keys prevent double-execution when clients retry on network errors — store results in Redis with 24h TTL.
  • Circuit breakers prevent cascading failures: open after 5 failures in 60 seconds, probe with one call after 30 seconds.
  • Token budgets prevent context window overruns — always reserve tokens for output and drop oldest messages when the budget is exceeded.
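  • Model routing (simple → 8B, complex → 70B) can cut cost several-fold on mixed workloads, provided the routed tasks are ones the small model handles well; the actual saving depends on the price gap between the models and the share of traffic that is simple.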
  • The POST /tasks + GET /tasks/{task_id} async pattern is the correct API design: never make callers wait synchronously for long-running agent tasks.
  • Graceful shutdown drains in-flight tasks before exiting — prevents partial task execution on every deployment.