GadaaLabs
GuidesBuild a Production RAG Pipeline From Scratch
advanced 90 minMarch 29, 2026

Build a Production RAG Pipeline From Scratch

Go from zero to a production-ready Retrieval-Augmented Generation system — chunking, embeddings, vector search, reranking, and evaluation.

Prerequisites

pythonbasic-llm-knowledge

Build a Production RAG Pipeline From Scratch

Retrieval-Augmented Generation (RAG) is the dominant architecture for grounding large language models in private or up-to-date knowledge. By the end of this guide you will have a fully working RAG pipeline — from raw documents to a graded, production-hardened system — written in plain Python with no LangChain magic hiding the details.

Why RAG Over Fine-Tuning

When you want an LLM to answer questions about your company's internal documents, you have two broad options: fine-tune the model on your data, or give the model relevant context at inference time (RAG). Fine-tuning is almost never the right choice for knowledge injection:

  • Cost: Fine-tuning a 70B parameter model costs thousands of dollars and must be repeated every time your data changes.
  • Freshness: Fine-tuned knowledge is frozen at training time. RAG retrieves from a live index that you update in minutes.
  • Transparency: RAG answers cite their sources. You can verify every claim against the retrieved chunks. Fine-tuned knowledge is opaque.
  • Hallucination control: A model with no retrieved context will confabulate. With RAG, the model is anchored to retrieved text — hallucinations are measurably lower.

RAG shines for: customer support over documentation, internal knowledge bases, legal and compliance Q&A, and any domain where data changes frequently.

Architecture Overview

A RAG system has two distinct pipelines:

Indexing pipeline (run once, then incrementally):

  1. Load documents
  2. Clean and preprocess
  3. Chunk into segments
  4. Embed each chunk
  5. Store embeddings + text in a vector database

Query pipeline (run at inference time):

  1. Embed the user query
  2. Retrieve top-k chunks by similarity
  3. (Optional) Rerank retrieved chunks
  4. Construct a prompt from query + chunks
  5. Generate an answer with an LLM

Install Dependencies

bash
uv add groq sentence-transformers chromadb langchain-text-splitters \
       rank-bm25 python-dotenv ragas datasets

Document Loading and Preprocessing

In production, documents arrive as PDFs, Word files, web pages, or database records. For this guide we work with plain text, but the chunking and embedding steps are identical regardless of source format.

python
from pathlib import Path

# Simulate loading documents from a directory
def load_documents(directory: str) -> list[dict]:
    """Load .txt files and return list of {id, text, source} dicts."""
    docs = []
    for path in Path(directory).glob("**/*.txt"):
        text = path.read_text(encoding="utf-8")
        # Basic preprocessing: normalize whitespace
        text = " ".join(text.split())
        docs.append({
            "id": str(path),
            "text": text,
            "source": path.name,
        })
    return docs

Preprocessing rules for production:

  • Remove headers/footers from PDFs (page numbers, running titles)
  • Strip HTML tags from web content
  • Normalize unicode (NFC normalization)
  • Remove null bytes and control characters
  • Detect and skip near-duplicate documents using MinHash

Chunking Strategies

Chunking is the most impactful engineering decision in a RAG system. Chunks that are too large dilute relevance; chunks that are too small lose context. Three strategies cover most use cases:

1. Recursive Character Text Splitter

The default workhorse. It splits on paragraph boundaries, then sentences, then words — recursively reducing until chunks are under the target size.

python
from langchain_text_splitters import RecursiveCharacterTextSplitter

splitter = RecursiveCharacterTextSplitter(
    chunk_size=512,        # characters
    chunk_overlap=64,      # overlap preserves cross-chunk context
    separators=["\n\n", "\n", ". ", " ", ""],
)

def chunk_documents(docs: list[dict]) -> list[dict]:
    chunks = []
    for doc in docs:
        splits = splitter.split_text(doc["text"])
        for i, split in enumerate(splits):
            chunks.append({
                "id": f"{doc['id']}::chunk_{i}",
                "text": split,
                "source": doc["source"],
                "chunk_index": i,
            })
    return chunks

2. Sentence-Boundary Chunking

Better for prose. Uses spaCy or NLTK sentence tokenization to avoid cutting sentences mid-way.

python
import re

def sentence_chunk(text: str, max_sentences: int = 5, overlap: int = 1) -> list[str]:
    """Group sentences into chunks of max_sentences with overlap."""
    # Simple regex sentence splitter (use spaCy for production)
    sentences = re.split(r'(?<=[.!?])\s+', text.strip())
    chunks = []
    for i in range(0, len(sentences), max_sentences - overlap):
        group = sentences[i : i + max_sentences]
        chunks.append(" ".join(group))
        if i + max_sentences >= len(sentences):
            break
    return chunks

3. Semantic Chunking

Groups sentences by semantic similarity — splits only when the topic shifts. More accurate but slower.

python
from sentence_transformers import SentenceTransformer
import numpy as np

def semantic_chunk(text: str, model: SentenceTransformer, threshold: float = 0.4) -> list[str]:
    sentences = re.split(r'(?<=[.!?])\s+', text.strip())
    if len(sentences) < 2:
        return sentences

    embeddings = model.encode(sentences, normalize_embeddings=True)
    # Compute cosine similarity between adjacent sentences
    similarities = [
        np.dot(embeddings[i], embeddings[i + 1])
        for i in range(len(embeddings) - 1)
    ]

    # Split where similarity drops below threshold (topic shift)
    chunks, current = [], [sentences[0]]
    for i, sim in enumerate(similarities):
        if sim < threshold:
            chunks.append(" ".join(current))
            current = [sentences[i + 1]]
        else:
            current.append(sentences[i + 1])
    chunks.append(" ".join(current))
    return chunks

Note: For most production systems, start with the recursive splitter at chunk_size=512. Only invest in semantic chunking after you have measured that retrieval quality is the bottleneck — it adds latency and complexity.

Embedding Models

Embeddings are dense vector representations of text. Similar texts have similar vectors (measured by cosine similarity). We use sentence-transformers — open-source models that run locally.

python
from sentence_transformers import SentenceTransformer
import numpy as np

# BAAI/bge-small-en-v1.5: fast, small, strong for English retrieval
# all-MiniLM-L6-v2: very fast, slightly lower quality
# BAAI/bge-large-en-v1.5: slower, best quality
EMBEDDING_MODEL = "BAAI/bge-small-en-v1.5"

def load_embedding_model() -> SentenceTransformer:
    return SentenceTransformer(EMBEDDING_MODEL)

def embed_texts(texts: list[str], model: SentenceTransformer) -> np.ndarray:
    """Embed a list of texts, normalized for cosine similarity."""
    return model.encode(
        texts,
        normalize_embeddings=True,
        batch_size=64,
        show_progress_bar=True,
    )

Note: BGE models expect a query prefix for retrieval. When embedding queries (not documents), prepend "Represent this sentence for searching relevant passages: " to the query text. Document chunks are embedded as-is.

ChromaDB Vector Store

ChromaDB is an open-source vector database that runs in-process (no server needed for development) or as a server in production.

python
import chromadb
from chromadb.utils import embedding_functions

def build_vector_store(
    chunks: list[dict],
    model: SentenceTransformer,
    persist_dir: str = "./chroma_db",
) -> chromadb.Collection:
    client = chromadb.PersistentClient(path=persist_dir)

    # Delete existing collection if rebuilding
    try:
        client.delete_collection("documents")
    except Exception:
        pass

    collection = client.create_collection(
        name="documents",
        metadata={"hnsw:space": "cosine"},  # Use cosine similarity
    )

    texts = [c["text"] for c in chunks]
    embeddings = embed_texts(texts, model).tolist()
    ids = [c["id"] for c in chunks]
    metadatas = [{"source": c["source"], "chunk_index": c["chunk_index"]} for c in chunks]

    # ChromaDB batches at 5461 items max
    batch_size = 5000
    for i in range(0, len(chunks), batch_size):
        collection.add(
            ids=ids[i : i + batch_size],
            embeddings=embeddings[i : i + batch_size],
            documents=texts[i : i + batch_size],
            metadatas=metadatas[i : i + batch_size],
        )

    print(f"Indexed {collection.count()} chunks.")
    return collection

def load_vector_store(persist_dir: str = "./chroma_db") -> chromadb.Collection:
    client = chromadb.PersistentClient(path=persist_dir)
    return client.get_collection("documents")

Basic Retrieval and Prompt Construction

python
def retrieve(
    query: str,
    collection: chromadb.Collection,
    model: SentenceTransformer,
    top_k: int = 5,
) -> list[dict]:
    query_prefix = "Represent this sentence for searching relevant passages: "
    query_embedding = model.encode(query_prefix + query, normalize_embeddings=True).tolist()

    results = collection.query(
        query_embeddings=[query_embedding],
        n_results=top_k,
        include=["documents", "metadatas", "distances"],
    )

    chunks = []
    for doc, meta, dist in zip(
        results["documents"][0],
        results["metadatas"][0],
        results["distances"][0],
    ):
        chunks.append({"text": doc, "source": meta["source"], "score": 1 - dist})
    return chunks

def build_prompt(query: str, retrieved_chunks: list[dict]) -> str:
    context = "\n\n---\n\n".join(
        f"[Source: {c['source']}]\n{c['text']}" for c in retrieved_chunks
    )
    return f"""You are a helpful assistant. Answer the question using only the provided context.
If the context does not contain enough information, say so explicitly.

Context:
{context}

Question: {query}

Answer:"""

Groq LLM Integration

python
import os
from dotenv import load_dotenv
from groq import Groq

load_dotenv()

def generate_answer(prompt: str, model: str = "llama-3.3-70b-versatile") -> str:
    client = Groq()
    response = client.chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": prompt}],
        temperature=0.1,   # Low temperature for factual retrieval tasks
        max_tokens=512,
    )
    return response.choices[0].message.content

Hybrid Search: BM25 + Dense Retrieval

Dense embedding search excels at semantic similarity but misses exact keyword matches. BM25 is great for keywords but misses paraphrases. Combining both (hybrid search) beats either alone.

python
from rank_bm25 import BM25Okapi

class HybridRetriever:
    def __init__(self, chunks: list[dict], collection: chromadb.Collection, model: SentenceTransformer):
        self.chunks = chunks
        self.collection = collection
        self.model = model

        # Build BM25 index
        tokenized = [c["text"].lower().split() for c in chunks]
        self.bm25 = BM25Okapi(tokenized)
        self.ids = [c["id"] for c in chunks]

    def retrieve(self, query: str, top_k: int = 10, alpha: float = 0.5) -> list[dict]:
        """
        alpha controls blend: 0.0 = pure BM25, 1.0 = pure dense, 0.5 = equal.
        """
        n_candidates = top_k * 3

        # Dense retrieval scores
        query_prefix = "Represent this sentence for searching relevant passages: "
        q_emb = self.model.encode(query_prefix + query, normalize_embeddings=True).tolist()
        dense_results = self.collection.query(
            query_embeddings=[q_emb],
            n_results=n_candidates,
            include=["documents", "metadatas", "distances"],
        )
        dense_scores = {
            id_: 1 - dist
            for id_, dist in zip(
                [m["source"] + str(m["chunk_index"]) for m in dense_results["metadatas"][0]],
                dense_results["distances"][0],
            )
        }

        # BM25 scores
        bm25_raw = self.bm25.get_scores(query.lower().split())
        # Normalise BM25 scores to [0, 1]
        max_bm25 = max(bm25_raw) if max(bm25_raw) > 0 else 1
        bm25_scores = {self.ids[i]: score / max_bm25 for i, score in enumerate(bm25_raw)}

        # Reciprocal Rank Fusion (RRF) — robust rank-based fusion
        all_ids = set(dense_scores) | set(bm25_scores)
        fused = {}
        for id_ in all_ids:
            fused[id_] = alpha * dense_scores.get(id_, 0) + (1 - alpha) * bm25_scores.get(id_, 0)

        top_ids = sorted(fused, key=lambda x: fused[x], reverse=True)[:top_k]
        id_to_chunk = {c["id"]: c for c in self.chunks}
        return [
            {**id_to_chunk[id_], "score": fused[id_]}
            for id_ in top_ids
            if id_ in id_to_chunk
        ]

Cross-Encoder Reranking

After hybrid retrieval, run a cross-encoder to rerank the top candidates. Cross-encoders jointly encode the query and each document — much more accurate than bi-encoders for final ranking, but too slow to run over the entire corpus.

python
from sentence_transformers import CrossEncoder

def rerank(
    query: str,
    candidates: list[dict],
    model_name: str = "cross-encoder/ms-marco-MiniLM-L-6-v2",
    top_k: int = 5,
) -> list[dict]:
    reranker = CrossEncoder(model_name)
    pairs = [(query, c["text"]) for c in candidates]
    scores = reranker.predict(pairs)

    for chunk, score in zip(candidates, scores):
        chunk["rerank_score"] = float(score)

    reranked = sorted(candidates, key=lambda x: x["rerank_score"], reverse=True)
    return reranked[:top_k]

Evaluation with RAGAS

RAGAS is the standard evaluation framework for RAG systems. It measures:

  • Faithfulness: Does the answer contain only claims supported by the context?
  • Answer Relevancy: How directly does the answer address the question?
  • Context Precision: Are the retrieved chunks actually relevant?
  • Context Recall: Did retrieval surface all relevant information?
python
from datasets import Dataset
from ragas import evaluate
from ragas.metrics import faithfulness, answer_relevancy, context_precision, context_recall

def evaluate_rag(qa_pairs: list[dict]) -> dict:
    """
    qa_pairs: list of {question, answer, contexts, ground_truth}
    - question: str
    - answer: str (generated by the RAG pipeline)
    - contexts: list[str] (retrieved chunks)
    - ground_truth: str (reference answer)
    """
    dataset = Dataset.from_list(qa_pairs)
    result = evaluate(
        dataset,
        metrics=[faithfulness, answer_relevancy, context_precision, context_recall],
    )
    return result

Create a small golden test set of 20-50 questions with known correct answers before deploying. Track RAGAS scores in a spreadsheet or MLflow across pipeline versions.

Production Considerations

Caching: Cache embeddings for documents that have not changed. A simple MD5 hash of the chunk text as the cache key works well.

Async: Wrap embedding and Chroma calls with asyncio when building a web API. Use httpx.AsyncClient for Groq calls.

Error handling: Groq will rate-limit at high throughput. Implement exponential backoff:

python
import time
from groq import RateLimitError

def generate_with_retry(prompt: str, max_retries: int = 3) -> str:
    client = Groq()
    for attempt in range(max_retries):
        try:
            response = client.chat.completions.create(
                model="llama-3.3-70b-versatile",
                messages=[{"role": "user", "content": prompt}],
                max_tokens=512,
            )
            return response.choices[0].message.content
        except RateLimitError:
            if attempt == max_retries - 1:
                raise
            wait = 2 ** attempt
            print(f"Rate limited, waiting {wait}s...")
            time.sleep(wait)

Full End-to-End Pipeline

python
import os
from pathlib import Path
from dotenv import load_dotenv

load_dotenv()

# --- CONFIG ---
DOCS_DIR = "./docs"
CHROMA_DIR = "./chroma_db"
REBUILD_INDEX = True

# --- INDEXING PIPELINE ---
print("Loading documents...")
docs = load_documents(DOCS_DIR)

print("Chunking...")
chunks = chunk_documents(docs)
print(f"  {len(docs)} documents → {len(chunks)} chunks")

print("Loading embedding model...")
embed_model = load_embedding_model()

if REBUILD_INDEX:
    print("Building vector index...")
    collection = build_vector_store(chunks, embed_model, CHROMA_DIR)
else:
    collection = load_vector_store(CHROMA_DIR)

# Build hybrid retriever
retriever = HybridRetriever(chunks, collection, embed_model)

# --- QUERY PIPELINE ---
def answer(question: str) -> str:
    # Step 1: Hybrid retrieval
    candidates = retriever.retrieve(question, top_k=20, alpha=0.6)

    # Step 2: Rerank
    final_chunks = rerank(question, candidates, top_k=5)

    # Step 3: Generate
    prompt = build_prompt(question, final_chunks)
    return generate_with_retry(prompt)

# --- TEST ---
question = "What are the main components of a RAG system?"
print(f"\nQ: {question}")
print(f"A: {answer(question)}")

Summary

  • RAG beats fine-tuning for knowledge injection because it is cheaper, keeps data fresh, and produces verifiable, cited answers.
  • The indexing pipeline (load → chunk → embed → store) runs once; the query pipeline (embed query → retrieve → rerank → generate) runs at inference time.
  • Chunking strategy is the highest-impact engineering decision: start with recursive character splitting at 512 characters with 64-character overlap.
  • Hybrid search (BM25 + dense embeddings) consistently outperforms either method alone; add cross-encoder reranking as a final step for highest accuracy.
  • Evaluate your pipeline with RAGAS metrics (faithfulness, answer relevancy, context precision, recall) on a golden test set before shipping to production.