A responsive RAG API should complete in under 3 seconds end-to-end at the 99th percentile. The budget breaks down as follows:
| Stage | Target p50 | Target p99 | Notes |
|-------|-----------|-----------|-------|
| Embed query | 15 ms | 25 ms | Local model on CPU |
| ANN search | 10 ms | 30 ms | HNSW, 1M vectors |
| Reranking | 150 ms | 250 ms | MiniLM-L-6 CPU |
| LLM generation | 800 ms | 1,800 ms | Groq, 500 token output |
| Overhead (serialisation, I/O) | 20 ms | 50 ms | |
| Total | ~1 s | ~2.2 s | |
Hitting these numbers requires async retrieval, an intelligent cache, and a fast LLM. Missing them usually means retrieval or reranking is running synchronously when it could be parallelised.
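Before optimising anything, measure where the budget actually goes. A minimal per-stage timer is enough to attribute latency to each row of the table above; the `timed_stage` helper and the module-level `stage_timings` dict below are illustrative sketches, not from any specific library:

```python
import time
from contextlib import contextmanager

# Accumulates per-stage latencies (ms) for a single request.
stage_timings: dict[str, float] = {}

@contextmanager
def timed_stage(name: str):
    """Record wall-clock milliseconds for one pipeline stage."""
    t0 = time.perf_counter()
    try:
        yield
    finally:
        stage_timings[name] = (time.perf_counter() - t0) * 1000

# Usage: wrap each pipeline stage; the sleep stands in for a real call.
with timed_stage("embed"):
    time.sleep(0.01)

print(f"embed: {stage_timings['embed']:.1f} ms")
```

In production you would store these per-stage numbers alongside the request (the metrics section below records exactly these columns: `embed_ms`, `retrieve_ms`, `rerank_ms`, `llm_ms`).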
## Async Parallel Retrieval
Dense embedding search and BM25 (sparse) search are independent. Run them concurrently:
```python
import asyncio
import time

from sentence_transformers import SentenceTransformer
from qdrant_client import QdrantClient

embed_model = SentenceTransformer("BAAI/bge-large-en-v1.5")
qdrant = QdrantClient(url="http://localhost:6333")


async def embed_query_async(query: str) -> list[float]:
    """Run embedding in a thread pool so it doesn't block the event loop."""
    loop = asyncio.get_event_loop()
    return await loop.run_in_executor(
        None,
        lambda: embed_model.encode([query], normalize_embeddings=True)[0].tolist(),
    )


async def dense_search_async(query_vec: list[float], collection: str, k: int) -> list[dict]:
    """Async wrapper for Qdrant search."""
    loop = asyncio.get_event_loop()
    results = await loop.run_in_executor(
        None,
        lambda: qdrant.search(
            collection_name=collection,
            query_vector=query_vec,
            limit=k,
            with_payload=True,
        ),
    )
    return [{"id": str(r.id), "text": r.payload["text"], "score": r.score} for r in results]


async def bm25_search_async(query: str, k: int) -> list[dict]:
    """Async wrapper for BM25 search (using a BM25 library like rank_bm25)."""
    loop = asyncio.get_event_loop()
    # Placeholder: replace with your BM25 index lookup
    return await loop.run_in_executor(None, lambda: bm25_index.search(query, k))


async def parallel_retrieve(query: str, collection: str, k: int = 20) -> list[dict]:
    """
    Run dense embedding + BM25 retrieval in parallel.
    Merge with RRF and return top-k results.
    """
    t0 = time.perf_counter()

    # Embed once, then run both searches concurrently
    query_vec = await embed_query_async(query)
    dense_results, sparse_results = await asyncio.gather(
        dense_search_async(query_vec, collection, k),
        bm25_search_async(query, k),
    )
    latency_ms = (time.perf_counter() - t0) * 1000
    print(f"Parallel retrieval: {latency_ms:.1f} ms")

    # RRF merge: each doc scores 1/(60 + rank) per list it appears in
    scores: dict[str, float] = {}
    for source in (dense_results, sparse_results):
        for rank, doc in enumerate(source, start=1):
            scores[doc["id"]] = scores.get(doc["id"], 0.0) + 1.0 / (60 + rank)

    result_map = {r["id"]: r for r in dense_results + sparse_results}
    merged_ids = sorted(scores, key=lambda d: scores[d], reverse=True)[:k]
    return [result_map[d] for d in merged_ids]
```
## Semantic Cache
A semantic cache stores (query_embedding, cached_answer) pairs. When a new query arrives:
1. Embed it.
2. Search the cache for any query with cosine similarity > 0.95.
3. If found, return the cached answer immediately, skipping retrieval and generation.
4. If not found, run the full pipeline and store the result in the cache.
This works because many users ask semantically identical questions with different wording. The cache hit rate in a production customer-facing RAG system is typically 20–40%.
```python
import hashlib

import chromadb
from chromadb.config import Settings

# Use a lightweight ChromaDB instance as the cache store
cache_db = chromadb.Client(Settings(anonymized_telemetry=False))
cache_collection = cache_db.get_or_create_collection(
    name="semantic_cache",
    metadata={"hnsw:space": "cosine"},
)


def semantic_cache_lookup(
    query: str,
    query_vec: list[float],
    threshold: float = 0.95,
) -> str | None:
    """
    Look up a query in the semantic cache.

    Returns the cached answer string if a sufficiently similar query
    exists, otherwise None.
    """
    if cache_collection.count() == 0:
        return None

    results = cache_collection.query(
        query_embeddings=[query_vec],
        n_results=1,
        include=["metadatas", "distances"],
    )
    if not results["distances"][0]:
        return None

    distance = results["distances"][0][0]
    similarity = 1.0 - distance  # ChromaDB cosine distance -> similarity
    if similarity >= threshold:
        cached_answer = results["metadatas"][0][0]["answer"]
        print(f"Cache HIT (similarity={similarity:.3f})")
        return cached_answer
    return None


def semantic_cache_store(query: str, query_vec: list[float], answer: str) -> None:
    """Store a query-answer pair in the semantic cache."""
    cache_id = hashlib.sha256(query.encode()).hexdigest()[:16]
    cache_collection.upsert(
        ids=[cache_id],
        embeddings=[query_vec],
        metadatas=[{"query": query, "answer": answer}],
    )
```
## Exact Match Cache with Redis
For truly identical queries (same string, lowercased and stripped), an exact match cache with Redis is faster and cheaper than a semantic lookup:
```python
import hashlib
import json

import redis

redis_client = redis.Redis(host="localhost", port=6379, decode_responses=True)
CACHE_TTL_SECONDS = 3600  # 1 hour


def exact_cache_get(query: str) -> str | None:
    """Look up an exact query string in Redis."""
    key = "rag:" + hashlib.sha256(query.lower().strip().encode()).hexdigest()
    cached = redis_client.get(key)
    if cached:
        return json.loads(cached)["answer"]
    return None


def exact_cache_set(query: str, answer: str) -> None:
    """Store a query-answer pair in Redis with a TTL."""
    key = "rag:" + hashlib.sha256(query.lower().strip().encode()).hexdigest()
    redis_client.setex(key, CACHE_TTL_SECONDS, json.dumps({"answer": answer}))
```
## Tiered Retrieval
For very large corpora, add a fast BM25 first pass to reduce the ANN search space:
```python
from qdrant_client.models import Filter, HasIdCondition


def tiered_retrieve(
    query: str,
    bm25_top_n: int = 200,
    ann_top_k: int = 10,
) -> list[dict]:
    """
    Two-stage retrieval:
      Stage 1: BM25 filters the corpus to top-200 candidates (fast, keyword-based)
      Stage 2: ANN search only within those 200 IDs (avoids a full 1M-vector scan)
    """
    # Stage 1: BM25 candidate filtering
    bm25_candidates = bm25_index.search(query, top_n=bm25_top_n)
    candidate_ids = [c["id"] for c in bm25_candidates]

    # Stage 2: ANN search restricted to the BM25 candidates
    query_vec = embed_model.encode([query], normalize_embeddings=True)[0].tolist()
    results = qdrant.search(
        collection_name="rag_chunks",
        query_vector=query_vec,
        query_filter=Filter(must=[HasIdCondition(has_id=candidate_ids)]),
        limit=ann_top_k,
        with_payload=True,
    )
    return [{"id": str(r.id), "text": r.payload["text"], "score": r.score} for r in results]
```
## Index Update Strategies
| Strategy | Latency to Reflect Updates | Complexity | Best For |
|----------|---------------------------|------------|----------|
| Real-time | <1 second | Low | Low-volume, time-sensitive |
| Micro-batch | 1–5 minutes | Medium | Standard production |
| Nightly rebuild | Up to 24 hours | Low | Stable corpora |
```python
import datetime
from collections import deque

from qdrant_client.models import PointStruct

# Micro-batch update queue
update_queue: deque = deque()


async def queue_document_update(doc_id: str, text: str, metadata: dict) -> None:
    """Add a document update to the pending queue."""
    update_queue.append({
        "doc_id": doc_id,
        "text": text,
        "metadata": metadata,
        "queued_at": datetime.datetime.utcnow().isoformat(),
    })


async def flush_update_queue(batch_size: int = 100) -> int:
    """Process pending updates in batches. Call every 5 minutes from a background task."""
    if not update_queue:
        return 0

    batch = []
    while update_queue and len(batch) < batch_size:
        batch.append(update_queue.popleft())

    # Embed all texts in the batch
    texts = [item["text"] for item in batch]
    embeddings = embed_model.encode(texts, normalize_embeddings=True, batch_size=32).tolist()

    # Upsert into Qdrant
    points = [
        PointStruct(id=item["doc_id"], vector=emb, payload={**item["metadata"], "text": item["text"]})
        for item, emb in zip(batch, embeddings)
    ]
    qdrant.upsert(collection_name="rag_chunks", points=points)
    return len(batch)
```
With the Groq free tier, local models, and self-hosted Qdrant, this stack costs effectively $0 for development and low-volume production.
## Production FastAPI Server
```python
import time

from fastapi import FastAPI
from pydantic import BaseModel
from sentence_transformers import CrossEncoder
from groq import Groq

app = FastAPI(title="Production RAG API")

# Load heavyweight clients once at import time, not per request
reranker = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2")
groq = Groq()


class QueryRequest(BaseModel):
    query: str
    tenant_id: str
    k: int = 5
    use_cache: bool = True


class QueryResponse(BaseModel):
    answer: str
    sources: list[dict]
    cache_hit: bool
    latency_ms: float


@app.post("/query", response_model=QueryResponse)
async def query_endpoint(request: QueryRequest) -> QueryResponse:
    """
    Main RAG query endpoint.
    Order: exact cache -> semantic cache -> parallel retrieval -> rerank -> generate.
    """
    t_start = time.perf_counter()

    # 1. Exact cache check (fastest)
    if request.use_cache:
        cached = exact_cache_get(request.query)
        if cached:
            return QueryResponse(
                answer=cached,
                sources=[],
                cache_hit=True,
                latency_ms=(time.perf_counter() - t_start) * 1000,
            )

    # 2. Embed query (needed for semantic cache + retrieval)
    query_vec = await embed_query_async(request.query)

    # 3. Semantic cache check
    if request.use_cache:
        cached = semantic_cache_lookup(request.query, query_vec)
        if cached:
            return QueryResponse(
                answer=cached,
                sources=[],
                cache_hit=True,
                latency_ms=(time.perf_counter() - t_start) * 1000,
            )

    # 4. Parallel retrieval
    candidates = await parallel_retrieve(request.query, collection="rag_chunks", k=50)

    # 5. Rerank
    pairs = [(request.query, c["text"]) for c in candidates[:50]]
    scores = reranker.predict(pairs)
    for c, s in zip(candidates, scores):
        c["rerank_score"] = float(s)
    top_docs = sorted(candidates, key=lambda x: x["rerank_score"], reverse=True)[:request.k]

    # 6. Generate
    context = "\n\n".join(f"[{i+1}] {d['text']}" for i, d in enumerate(top_docs))
    gen_response = groq.chat.completions.create(
        model="llama-3.3-70b-versatile",
        messages=[
            {"role": "system", "content": "Answer using only the provided context. Cite sources by number."},
            {"role": "user", "content": f"Context:\n{context}\n\nQuestion: {request.query}"},
        ],
        temperature=0.1,
        max_tokens=500,
    )
    answer = gen_response.choices[0].message.content

    # 7. Store in both caches
    if request.use_cache:
        exact_cache_set(request.query, answer)
        semantic_cache_store(request.query, query_vec, answer)

    sources = [{"text": d["text"][:200], "score": d["rerank_score"]} for d in top_docs]
    return QueryResponse(
        answer=answer,
        sources=sources,
        cache_hit=False,
        latency_ms=(time.perf_counter() - t_start) * 1000,
    )


@app.get("/health")
async def health() -> dict:
    return {"status": "ok"}
```
## Monitoring
Track these metrics in production:
```python
import datetime
import sqlite3


class RAGMetrics:
    """Lightweight SQLite-based metrics store for production RAG."""

    def __init__(self, db_path: str = "rag_metrics.db"):
        self.db_path = db_path
        with sqlite3.connect(db_path) as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS query_metrics (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    timestamp TEXT,
                    latency_ms REAL,
                    cache_hit INTEGER,
                    embed_ms REAL,
                    retrieve_ms REAL,
                    rerank_ms REAL,
                    llm_ms REAL,
                    cost_usd REAL
                )
            """)

    def record(self, **kwargs) -> None:
        kwargs["timestamp"] = datetime.datetime.utcnow().isoformat()
        with sqlite3.connect(self.db_path) as conn:
            cols = ", ".join(kwargs.keys())
            placeholders = ", ".join("?" * len(kwargs))
            conn.execute(
                f"INSERT INTO query_metrics ({cols}) VALUES ({placeholders})",
                list(kwargs.values()),
            )

    def get_p99_latency(self, window_hours: int = 1) -> float:
        since = (datetime.datetime.utcnow() - datetime.timedelta(hours=window_hours)).isoformat()
        with sqlite3.connect(self.db_path) as conn:
            rows = conn.execute(
                "SELECT latency_ms FROM query_metrics WHERE timestamp > ? ORDER BY latency_ms",
                (since,),
            ).fetchall()
        if not rows:
            return 0.0
        p99_idx = int(len(rows) * 0.99)
        return rows[p99_idx][0]

    def get_cache_hit_rate(self, window_hours: int = 1) -> float:
        since = (datetime.datetime.utcnow() - datetime.timedelta(hours=window_hours)).isoformat()
        with sqlite3.connect(self.db_path) as conn:
            total, hits = conn.execute(
                "SELECT COUNT(*), SUM(cache_hit) FROM query_metrics WHERE timestamp > ?",
                (since,),
            ).fetchone()
        return hits / total if total else 0.0
```
Alert when: p99 latency > 3 s, cache hit rate drops below 20%, or rerank latency exceeds 500 ms (GPU may have gone to sleep).
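These thresholds are easiest to keep correct as a small pure function, separate from the metrics store, so the alert rules can be unit-tested in isolation. The `evaluate_alerts` name and signature below are illustrative, not from any monitoring library:

```python
def evaluate_alerts(
    p99_latency_ms: float,
    cache_hit_rate: float,
    rerank_p99_ms: float,
) -> list[str]:
    """Return the alert messages that should fire for the given readings."""
    alerts = []
    if p99_latency_ms > 3000:
        alerts.append(f"p99 latency {p99_latency_ms:.0f} ms exceeds 3 s budget")
    if cache_hit_rate < 0.20:
        alerts.append(f"cache hit rate {cache_hit_rate:.0%} below 20%")
    if rerank_p99_ms > 500:
        alerts.append(f"rerank p99 {rerank_p99_ms:.0f} ms exceeds 500 ms (GPU may have gone to sleep)")
    return alerts
```

Feed it from the metrics store, e.g. `evaluate_alerts(metrics.get_p99_latency(), metrics.get_cache_hit_rate(), rerank_p99)`, and page only when the returned list is non-empty.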
## Key Takeaways
- The production RAG latency budget is p99 < 3 s; the dominant costs are reranking (~200 ms) and LLM generation (~1–2 s).
- Run dense and sparse retrieval in parallel with `asyncio.gather`; this alone removes 100–200 ms from the critical path.
- A semantic cache (cosine similarity > 0.95) achieves 20–40% hit rates on real traffic; always check the exact Redis cache first (O(1)) before the semantic lookup.
- Tiered retrieval (BM25 first pass → ANN on the filtered set) is the right approach for corpora larger than 10M vectors.
- Use micro-batch index updates (queue + flush every 5 minutes) as the default; reserve real-time upserts for high-urgency pipelines.
- At zero cloud cost: BGE-large (free, local) + self-hosted Qdrant + the Groq free tier delivers production-quality RAG for development and low-volume deployments.
- Monitor p99 latency, cache hit rate, reranker latency, and LLM latency separately; each has a different fix when it degrades.
- Run the quality sampler (LLM-as-judge on 10% of queries) continuously; latency and error-rate metrics alone do not catch answer-quality degradation.