An agent that works in a Jupyter notebook is a demo. An agent that handles 50,000 requests per day, stays within a monthly LLM budget, recovers from rate limits, and lets engineers debug yesterday's failed run is a product. This lesson covers everything between demo and production.
Cost Tracking
LLM API costs scale with token usage. Track every call:
```python
from dataclasses import dataclass, field
from datetime import datetime

from anthropic import Anthropic

# Prices per million tokens
PRICING = {
    "claude-opus-4-5": {"input": 15.00, "output": 75.00},
    "claude-haiku-4-5": {"input": 0.25, "output": 1.25},
    "claude-sonnet-4-5": {"input": 3.00, "output": 15.00},
}

DAILY_BUDGET_USD = 100.00  # hard cap; calls fail once today's spend exceeds it

@dataclass
class UsageRecord:
    request_id: str
    model: str
    input_tokens: int
    output_tokens: int
    timestamp: datetime = field(default_factory=datetime.utcnow)

    @property
    def cost_usd(self) -> float:
        p = PRICING.get(self.model, {"input": 0, "output": 0})
        return (
            self.input_tokens / 1_000_000 * p["input"]
            + self.output_tokens / 1_000_000 * p["output"]
        )

usage_log: list[UsageRecord] = []

def tracked_llm_call(model: str, messages: list, **kwargs):
    client = Anthropic()
    response = client.messages.create(model=model, messages=messages, **kwargs)
    usage_log.append(UsageRecord(
        request_id=response.id,
        model=model,
        input_tokens=response.usage.input_tokens,
        output_tokens=response.usage.output_tokens,
    ))
    # Enforce the budget against today's records only, not the whole log
    today = datetime.utcnow().date()
    spend_today = sum(r.cost_usd for r in usage_log if r.timestamp.date() == today)
    if spend_today > DAILY_BUDGET_USD:
        raise RuntimeError("Daily LLM budget exceeded")
    return response
```
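Once every call lands in `usage_log`, spend reports fall out of a simple aggregation. A minimal sketch over the records defined above:

```python
def cost_by_model(records: list[UsageRecord]) -> dict[str, float]:
    """Aggregate spend per model to see which tier drives cost."""
    totals: dict[str, float] = {}
    for r in records:
        totals[r.model] = totals.get(r.model, 0.0) + r.cost_usd
    return totals

# e.g. {"claude-haiku-4-5": 0.42, "claude-sonnet-4-5": 3.87}
```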
OpenTelemetry Instrumentation
```python
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter

provider = TracerProvider()
provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter()))
trace.set_tracer_provider(provider)
tracer = trace.get_tracer("agent.service")

def instrumented_agent(user_message: str, session_id: str) -> str:
    with tracer.start_as_current_span("agent.run") as span:
        span.set_attribute("session.id", session_id)
        span.set_attribute("input.length", len(user_message))
        span.set_attribute("input.preview", user_message[:100])
        try:
            result = run_agent(user_message)
            span.set_attribute("output.length", len(result))
            span.set_status(trace.StatusCode.OK)
            return result
        except Exception as e:
            span.record_exception(e)
            span.set_status(trace.StatusCode.ERROR, str(e))
            raise
```
Traces flow to Jaeger, Honeycomb, or Grafana Tempo, giving you waterfall views of every agent step with timing.
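The root span alone only tells you that a run was slow; to see where the time went, wrap each tool invocation in a child span. A minimal sketch, assuming the `tracer` configured above and a hypothetical `web_search` tool:

```python
import json

def traced_tool_call(tool_name: str, tool_fn, **tool_args):
    """Run one tool under a child span; the span's own timing becomes a bar in the waterfall."""
    with tracer.start_as_current_span(f"tool.{tool_name}") as span:
        span.set_attribute("tool.name", tool_name)
        # Truncate arguments so spans stay small (assumes JSON-serializable args)
        span.set_attribute("tool.args", json.dumps(tool_args)[:200])
        result = tool_fn(**tool_args)
        span.set_attribute("tool.result.length", len(str(result)))
        return result

# e.g., inside the agent loop:
# traced_tool_call("web_search", web_search, query="latest OTLP spec")
```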
Multi-Turn State Management
```python
import json
from pathlib import Path

class ConversationStore:
    def __init__(self, storage_dir: str = "./sessions"):
        self.dir = Path(storage_dir)
        self.dir.mkdir(exist_ok=True)

    def load(self, session_id: str) -> list[dict]:
        path = self.dir / f"{session_id}.json"
        if not path.exists():
            return []
        return json.loads(path.read_text())

    def save(self, session_id: str, messages: list[dict]):
        path = self.dir / f"{session_id}.json"
        path.write_text(json.dumps(messages, indent=2))

    def truncate(self, messages: list[dict], max_tokens: int = 80_000) -> list[dict]:
        """Keep the system message and the most recent turns within token budget."""
        # Simple approximation: 1 token ≈ 4 characters
        while sum(len(str(m)) // 4 for m in messages) > max_tokens and len(messages) > 2:
            messages.pop(1)  # remove oldest non-system message
        return messages

store = ConversationStore()

def multi_turn_agent(user_message: str, session_id: str) -> str:
    messages = store.load(session_id)
    messages.append({"role": "user", "content": user_message})
    messages = store.truncate(messages)
    result = run_agent_from_messages(messages)
    messages.append({"role": "assistant", "content": result})
    store.save(session_id, messages)
    return result
```
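Retries and Rate Limits

At 50,000 requests a day, rate limits and transient API failures are routine, not exceptional. Retry with exponential backoff plus jitter so that clustered failures don't retry in lockstep: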
```python
import random
import time

def with_retry(fn, max_retries=3, base_delay=1.0):
    for attempt in range(max_retries):
        try:
            return fn()
        except Exception as e:
            # Retry only rate-limit errors; re-raise everything else,
            # and give up after the final attempt
            if "rate_limit" not in str(e).lower() or attempt == max_retries - 1:
                raise
            # Exponential backoff plus jitter to avoid synchronized retry storms
            delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
            time.sleep(delay)
```
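The pieces compose: wrap the tracked call from the Cost Tracking section in the retry helper (a sketch; `tracked_llm_call` comes from the earlier example, and the message content is illustrative):

```python
response = with_retry(lambda: tracked_llm_call(
    model="claude-haiku-4-5",
    messages=[{"role": "user", "content": "Classify this support ticket: ..."}],
    max_tokens=1024,
))
```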