GadaaLabs
RAG Engineering — Production Retrieval-Augmented Generation
Lesson 12

Enterprise RAG — Security, Compliance & Multi-Tenant

24 min

The Enterprise Requirements Gap

A RAG system that works for a single user or a small team requires fundamental architectural additions before it can run in an enterprise:

  • Access control: not every user should see every document. A sales document should not be retrievable by a contractor who is only authorised to access the engineering wiki.
  • PII protection: documents fed into the index may contain names, emails, social security numbers, or credit card data. Indexing these without review violates data handling policies.
  • Audit logging: regulated industries (finance, healthcare, legal) require an immutable record of every query, every document retrieved, and every answer generated.
  • Data residency: many organisations must ensure data does not leave a specific geographic region.
  • Freshness: enterprise documents change constantly. A connector that detects changes and reindexes only the changed content keeps the index current without full rebuilds.

This lesson covers each requirement with working implementation patterns.

Multi-Tenancy and Access Control

Per-Tenant Collections

Each tenant gets its own Qdrant collection. A query against tenant A's collection cannot return documents from tenant B's, because the two live in completely separate data structures. This is the strongest isolation model.

Cost: O(tenants) HNSW graph overhead. For 1000 tenants at 10K documents each: 1000 × ~50 MB = ~50 GB RAM. Practical for <100 tenants; too expensive at enterprise scale.

Metadata-Filtered Single Collection

All tenants share one collection. Each chunk has a tenant_id payload field. Every query includes a mandatory tenant_id filter. The risk: if application code forgets the filter, cross-tenant data leaks silently.

Mitigation: wrap all search calls in a TenantRAGClient that makes it architecturally impossible to forget the filter.

python
from qdrant_client import QdrantClient
from qdrant_client.models import Filter, FieldCondition, MatchValue, PointStruct, VectorParams, Distance
from sentence_transformers import SentenceTransformer

qdrant = QdrantClient(url="http://localhost:6333")
embed_model = SentenceTransformer("BAAI/bge-large-en-v1.5")


class TenantRAGClient:
    """
    Enforces tenant isolation at the access-control layer.
    Application code never calls qdrant directly — only this client.
    """

    def __init__(self, tenant_id: str, user_roles: list[str]):
        self._tenant_id = tenant_id
        self._user_roles = set(user_roles)

    def _build_access_filter(self) -> Filter:
        """Build a filter that enforces both tenant isolation and role-based access."""
        return Filter(
            must=[
                FieldCondition(key="tenant_id", match=MatchValue(value=self._tenant_id)),
            ],
            should=[
                # Document is accessible if the user has at least one of its allowed_roles
                FieldCondition(key="allowed_roles", match=MatchValue(value=role))
                for role in self._user_roles
            ] or None,
        )

    def search(self, query: str, k: int = 5) -> list[dict]:
        """Always-filtered search — caller cannot bypass tenant + role filter."""
        query_vec = embed_model.encode([query], normalize_embeddings=True)[0].tolist()
        results = qdrant.search(
            collection_name="enterprise_rag",
            query_vector=query_vec,
            query_filter=self._build_access_filter(),
            limit=k,
            with_payload=True,
        )
        return [{"text": r.payload["text"], "source": r.payload.get("source", ""), "score": r.score} for r in results]


def upsert_document_chunk(
    chunk_id: str,
    text: str,
    embedding: list[float],
    tenant_id: str,
    allowed_roles: list[str],
    source: str,
    doc_id: str,
) -> None:
    """Insert a chunk with access control metadata."""
    qdrant.upsert(
        collection_name="enterprise_rag",
        points=[PointStruct(
            id=chunk_id,
            vector=embedding,
            payload={
                "text": text,
                "tenant_id": tenant_id,
                "allowed_roles": allowed_roles,  # e.g. ["engineer", "admin"]
                "source": source,
                "doc_id": doc_id,
            },
        )],
    )

PII Detection and Redaction

Before indexing any document, scan for PII using a combination of spaCy NER (for names, organisations, locations) and regex patterns (for structured PII like SSNs, credit cards, and emails).

python
import re
import spacy
from dataclasses import dataclass, field

# Load the small English model — pip install spacy && python -m spacy download en_core_web_sm
try:
    nlp = spacy.load("en_core_web_sm")
except OSError:
    nlp = None  # graceful degradation if the model has not been downloaded

PII_PATTERNS = {
    "email": re.compile(r'\b[A-Za-z0-9._%+\-]+@[A-Za-z0-9.\-]+\.[A-Z|a-z]{2,}\b'),
    "ssn": re.compile(r'\b\d{3}-\d{2}-\d{4}\b'),
    "credit_card": re.compile(r'\b(?:\d{4}[\s\-]?){3}\d{4}\b'),
    "phone_us": re.compile(r'\b(?:\+1[\s\-]?)?\(?\d{3}\)?[\s\-]?\d{3}[\s\-]?\d{4}\b'),
    "ip_address": re.compile(r'\b(?:\d{1,3}\.){3}\d{1,3}\b'),
}


@dataclass
class PIIDetectionResult:
    has_pii: bool
    findings: list[dict] = field(default_factory=list)
    redacted_text: str = ""


class PIIDetector:
    """Detect and optionally redact PII from text before indexing."""

    def scan(self, text: str) -> PIIDetectionResult:
        """Scan text for PII using regex patterns and spaCy NER."""
        findings = []

        # Regex-based PII
        for pii_type, pattern in PII_PATTERNS.items():
            for match in pattern.finditer(text):
                findings.append({
                    "type": pii_type,
                    "value": match.group(),
                    "start": match.start(),
                    "end": match.end(),
                })

        # NER-based PII (person names, etc.)
        if nlp is not None:
            doc = nlp(text[:100_000])  # stay well under spaCy's default max_length (1,000,000 chars)
            for ent in doc.ents:
                if ent.label_ in {"PERSON", "GPE"}:
                    findings.append({
                        "type": f"ner_{ent.label_.lower()}",
                        "value": ent.text,
                        "start": ent.start_char,
                        "end": ent.end_char,
                    })

        return PIIDetectionResult(
            has_pii=len(findings) > 0,
            findings=findings,
            redacted_text=self._redact(text, findings),
        )

    def _redact(self, text: str, findings: list[dict]) -> str:
        """Replace PII spans with [REDACTED_TYPE] tokens."""
        # Sort by position descending so replacements don't shift offsets
        sorted_findings = sorted(findings, key=lambda x: x["start"], reverse=True)
        result = text
        for finding in sorted_findings:
            label = f"[REDACTED_{finding['type'].upper()}]"
            result = result[:finding["start"]] + label + result[finding["end"]:]
        return result
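The reverse-sorted replacement in `_redact` is the subtle part: substituting left-to-right would shift every later offset. A standalone check of the same trick, with hypothetical spans:

```python
text = "Email jane@example.com, SSN 123-45-6789."
findings = [
    {"type": "email", "start": 6, "end": 22},
    {"type": "ssn", "start": 28, "end": 39},
]

# Replace right-to-left so earlier offsets remain valid after each substitution
for f in sorted(findings, key=lambda x: x["start"], reverse=True):
    text = text[:f["start"]] + f"[REDACTED_{f['type'].upper()}]" + text[f["end"]:]

print(text)  # Email [REDACTED_EMAIL], SSN [REDACTED_SSN].
```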

Audit Logging

Every query in a regulated system must be logged immutably: who asked what, when, which documents were retrieved, and what answer was returned.

python
import sqlite3
import datetime
import hashlib
import json


class AuditLogger:
    """
    Immutable audit log for RAG queries.
    Uses SQLite with INSERT-only access — no UPDATE or DELETE.
    In production: use an append-only table in PostgreSQL or write to CloudTrail/BigQuery.
    """

    def __init__(self, db_path: str = "rag_audit.db"):
        self.db_path = db_path
        with sqlite3.connect(db_path) as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS audit_log (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    event_id TEXT NOT NULL UNIQUE,
                    timestamp TEXT NOT NULL,
                    user_id TEXT NOT NULL,
                    tenant_id TEXT NOT NULL,
                    query_text TEXT NOT NULL,
                    retrieved_chunk_ids TEXT NOT NULL,
                    answer_hash TEXT NOT NULL,
                    model_used TEXT NOT NULL,
                    latency_ms REAL
                )
            """)
            # Triggers make the table effectively append-only inside SQLite
            conn.execute("""
                CREATE TRIGGER IF NOT EXISTS prevent_update
                BEFORE UPDATE ON audit_log
                BEGIN SELECT RAISE(ABORT, 'Audit log is immutable'); END
            """)
            conn.execute("""
                CREATE TRIGGER IF NOT EXISTS prevent_delete
                BEFORE DELETE ON audit_log
                BEGIN SELECT RAISE(ABORT, 'Audit log is immutable'); END
            """)

    def log_query(
        self,
        user_id: str,
        tenant_id: str,
        query: str,
        retrieved_chunk_ids: list[str],
        answer: str,
        model: str,
        latency_ms: float,
    ) -> str:
        """Log a completed RAG query. Returns the event_id."""
        event_id = hashlib.sha256(
            f"{user_id}{query}{datetime.datetime.now(datetime.timezone.utc).isoformat()}".encode()
        ).hexdigest()[:32]

        with sqlite3.connect(self.db_path) as conn:
            conn.execute(
                """INSERT INTO audit_log
                   (event_id, timestamp, user_id, tenant_id, query_text,
                    retrieved_chunk_ids, answer_hash, model_used, latency_ms)
                   VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",
                (
                    event_id,
                    datetime.datetime.now(datetime.timezone.utc).isoformat(),
                    user_id,
                    tenant_id,
                    query,
                    json.dumps(retrieved_chunk_ids),
                    hashlib.sha256(answer.encode()).hexdigest(),
                    model,
                    latency_ms,
                ),
            )
        return event_id
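The immutability claim is cheap to verify in isolation. A minimal sketch of the trigger pattern against an in-memory SQLite database (the row content is illustrative):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE audit_log (id INTEGER PRIMARY KEY, query_text TEXT NOT NULL)")
for op in ("UPDATE", "DELETE"):
    conn.execute(f"""
        CREATE TRIGGER prevent_{op.lower()} BEFORE {op} ON audit_log
        BEGIN SELECT RAISE(ABORT, 'Audit log is immutable'); END
    """)
conn.execute("INSERT INTO audit_log (query_text) VALUES ('example query')")

# Both mutation paths are rejected by the triggers; the row survives untouched
blocked = []
for stmt in ("UPDATE audit_log SET query_text = 'tampered'", "DELETE FROM audit_log"):
    try:
        conn.execute(stmt)
    except sqlite3.IntegrityError:
        blocked.append(stmt.split()[0])

print(blocked)  # ['UPDATE', 'DELETE']
```

`RAISE(ABORT, ...)` surfaces in Python as `sqlite3.IntegrityError`, so a buggy code path that tries to rewrite history fails loudly rather than silently.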

On-Premise Air-Gapped Deployment

For organisations that cannot send data to any cloud service, the full stack runs locally:

Embeddings:  BGE-large-en-v1.5 via sentence-transformers (no API call)
Vector DB:   Qdrant in Docker or Kubernetes (no cloud dependency)
LLM:         Ollama (llama3.3:70b) or vLLM serving (no API call)
Reranker:    cross-encoder/ms-marco-MiniLM-L-6-v2 (local)
Cache:       Redis in Docker

python
# On-premise LLM call via Ollama's OpenAI-compatible endpoint
from openai import OpenAI

ollama_client = OpenAI(
    base_url="http://localhost:11434/v1",
    api_key="ollama",  # required but unused
)


def generate_answer_local(context: str, question: str) -> str:
    """Generate an answer using a locally-served LLM via Ollama."""
    response = ollama_client.chat.completions.create(
        model="llama3.3:70b",
        messages=[
            {"role": "system", "content": "Answer using only the provided context."},
            {"role": "user", "content": f"Context:\n{context}\n\nQuestion: {question}"},
        ],
        temperature=0.1,
        max_tokens=500,
    )
    return response.choices[0].message.content

Incremental Reindexing with Change Detection

Rebuilding the full index on every document change is expensive. Instead, compute a content fingerprint (SHA-256) for each document. When the connector polls for changes, compare fingerprints and reindex only the changed documents.

python
import hashlib


def compute_doc_fingerprint(content: str) -> str:
    """Content-based fingerprint for change detection."""
    return hashlib.sha256(content.encode("utf-8")).hexdigest()


def incremental_reindex(
    doc_id: str,
    new_content: str,
    fingerprint_store: dict,  # doc_id -> last_fingerprint (SQLite or Redis in production)
    chunk_fn,
    embed_fn,
    upsert_fn,
    delete_chunks_fn,
) -> dict:
    """
    Reindex a document only if its content has changed.
    Deletes old chunks by doc_id before inserting new ones.
    """
    new_fingerprint = compute_doc_fingerprint(new_content)
    old_fingerprint = fingerprint_store.get(doc_id)

    if old_fingerprint == new_fingerprint:
        return {"action": "skipped", "doc_id": doc_id}

    # Delete old chunks for this document
    delete_chunks_fn(doc_id)

    # Re-chunk and re-embed
    chunks = chunk_fn(new_content)
    embeddings = embed_fn([c["text"] for c in chunks])
    for chunk, embedding in zip(chunks, embeddings):
        upsert_fn(chunk_id=f"{doc_id}_{chunk['index']}", doc_id=doc_id,
                  text=chunk["text"], embedding=embedding)

    fingerprint_store[doc_id] = new_fingerprint
    return {"action": "reindexed", "doc_id": doc_id, "chunks": len(chunks)}

SharePoint Connector

python
import requests


class SharePointConnector:
    """
    Polls SharePoint via Microsoft Graph API for changed files.
    Requires an app registration with Files.Read.All permission.
    """

    def __init__(self, tenant_id: str, client_id: str, client_secret: str, site_id: str):
        self.site_id = site_id
        self.token = self._get_token(tenant_id, client_id, client_secret)

    def _get_token(self, tenant_id: str, client_id: str, client_secret: str) -> str:
        resp = requests.post(
            f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token",
            data={
                "client_id": client_id,
                "client_secret": client_secret,
                "scope": "https://graph.microsoft.com/.default",
                "grant_type": "client_credentials",
            },
        )
        resp.raise_for_status()
        return resp.json()["access_token"]

    def get_changed_files(self, delta_link: str | None = None) -> tuple[list[dict], str]:
        """
        Use the Graph API delta endpoint to get files changed since the last poll.
        Returns (list_of_files, new_delta_link). Store the @odata.deltaLink URL
        as-is and pass it back on the next poll; Graph advises against parsing it.
        """
        url = delta_link or f"https://graph.microsoft.com/v1.0/sites/{self.site_id}/drive/root/delta"
        headers = {"Authorization": f"Bearer {self.token}"}
        changed_files: list[dict] = []
        while url:
            resp = requests.get(url, headers=headers)
            resp.raise_for_status()
            data = resp.json()
            changed_files.extend(
                {"id": item["id"], "name": item["name"], "last_modified": item.get("lastModifiedDateTime")}
                for item in data.get("value", [])
                if "file" in item and not item.get("deleted")
            )
            url = data.get("@odata.nextLink")  # follow pagination until the deltaLink appears
        return changed_files, data["@odata.deltaLink"]

    def download_file_content(self, file_id: str) -> bytes:
        """Download the raw content of a file by its Graph API ID."""
        url = f"https://graph.microsoft.com/v1.0/sites/{self.site_id}/drive/items/{file_id}/content"
        resp = requests.get(url, headers={"Authorization": f"Bearer {self.token}"})
        resp.raise_for_status()
        return resp.content

Enterprise Architecture Summary

User Request
     ↓
API Gateway (rate limiting, auth, DLP output scan)
     ↓
TenantRAGClient (enforces tenant_id + role filters)
     ├── Exact Cache (Redis) ─── hit → return
     ├── Semantic Cache (Qdrant cache collection) ─── hit → return
     ↓
Parallel Retrieval
     ├── Dense (Qdrant, filtered HNSW)
     └── Sparse (BM25 index)
     ↓
Reranker (MiniLM local or Cohere API)
     ↓
LLM Generation (Groq cloud or Ollama on-prem)
     ↓
DLP Output Scanner (PII check before returning)
     ↓
AuditLogger.log_query() [immutable]
     ↓
Response to User

Key Takeaways

  • Per-tenant collections provide strong isolation but scale poorly; metadata-filtered single collections are cost-effective but require a wrapper like TenantRAGClient that makes missing filters architecturally impossible.
  • Document-level access control via allowed_roles payloads is the RAG equivalent of row-level security — enforce it at the retrieval layer, not the application layer.
  • Scan every document for PII before indexing using both regex (SSN, credit card, email) and spaCy NER (person names); redact or flag and block indexing until reviewed.
  • Immutable audit logs (INSERT-only, UPDATE trigger) satisfy compliance requirements in finance, healthcare, and legal; log user_id, query, retrieved chunk IDs, and answer hash.
  • On-premise air-gapped deployment is fully viable: BGE-large + Qdrant + Ollama + Redis with no external API calls.
  • Incremental reindexing with SHA-256 content fingerprints avoids full rebuilds; only changed documents are rechunked and re-embedded.
  • SharePoint connectors use the Microsoft Graph delta endpoint to poll for changes since the last run, minimising API calls.
  • Enterprise SLA targets: 99.9% uptime (~8.7 hours downtime per year), faithfulness >0.80, and p99 retrieval latency <500 ms.
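The uptime target translates directly into a yearly downtime budget; a quick check of the arithmetic for 99.9% and the neighbouring tiers:

```python
HOURS_PER_YEAR = 365 * 24  # 8760

# Downtime budget per year for a given availability target
for target in (0.999, 0.9995, 0.9999):
    downtime_h = HOURS_PER_YEAR * (1 - target)
    print(f"{target:.2%} uptime -> {downtime_h:.2f} h downtime per year")
```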