A RAG system that works for a single user or a small team requires fundamental architectural additions before it can run in an enterprise:
Access control: not every user should see every document. A sales document should not be retrievable by a contractor who is only authorised to access the engineering wiki.
PII protection: documents fed into the index may contain names, emails, social security numbers, or credit card data. Indexing these without review violates data handling policies.
Audit logging: regulated industries (finance, healthcare, legal) require an immutable record of every query, every document retrieved, and every answer generated.
Data residency: many organisations must ensure data does not leave a specific geographic region.
Freshness: enterprise documents change constantly. A connector that detects changes and reindexes only the changed content keeps the index current without full rebuilds.
This lesson covers each requirement with working implementation patterns.
Multi-Tenancy and Access Control
Per-Tenant Collections
Each tenant gets its own Qdrant collection. A query for tenant A physically cannot return documents from tenant B, because the two tenants' data live in completely separate data structures. This is the strongest isolation model.
Cost: O(tenants) HNSW graph overhead. For 1000 tenants at 10K documents each: 1000 × ~50 MB = ~50 GB RAM. Practical for <100 tenants; too expensive at enterprise scale.
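As a sanity check on those numbers, the estimate is plain multiplication (the ~50 MB per-collection figure is the rough HNSW overhead assumed above, not a measured Qdrant constant):

```python
# Rough memory estimate for the per-tenant collection model, assuming
# ~50 MB of HNSW graph overhead per collection (a ballpark figure).
def per_tenant_ram_gb(num_tenants: int, mb_per_collection: float = 50.0) -> float:
    """Approximate total HNSW RAM across all tenant collections, in GB."""
    return num_tenants * mb_per_collection / 1024


print(per_tenant_ram_gb(1000))  # ~48.8 GB at 1000 tenants
print(per_tenant_ram_gb(100))   # ~4.9 GB, still manageable under 100 tenants
```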
Metadata-Filtered Single Collection
All tenants share one collection. Each chunk has a tenant_id payload field. Every query includes a mandatory tenant_id filter. The risk: if application code forgets the filter, cross-tenant data leaks silently.
Mitigation: wrap all search calls in a tenant-scoped client (the TenantRAGClient below) that makes it architecturally impossible to forget the filter.
```python
import uuid

from qdrant_client import QdrantClient
from qdrant_client.models import (
    Filter, FieldCondition, MatchValue, PointStruct, VectorParams, Distance,
)
from sentence_transformers import SentenceTransformer

qdrant = QdrantClient(url="http://localhost:6333")
embed_model = SentenceTransformer("BAAI/bge-large-en-v1.5")


class TenantRAGClient:
    """
    Enforces tenant isolation at the access-control layer.
    Application code never calls qdrant directly — only this client.
    """

    def __init__(self, tenant_id: str, user_roles: list[str]):
        self._tenant_id = tenant_id
        self._user_roles = set(user_roles)

    def _build_access_filter(self) -> Filter:
        """Build a filter that enforces both tenant isolation and role-based access."""
        return Filter(
            must=[
                FieldCondition(key="tenant_id", match=MatchValue(value=self._tenant_id)),
            ],
            should=[
                # Document is accessible if the user has at least one of its allowed_roles.
                # Note: with an empty role list the should clause is dropped entirely,
                # leaving only tenant isolation in force.
                FieldCondition(key="allowed_roles", match=MatchValue(value=role))
                for role in self._user_roles
            ] or None,
        )

    def search(self, query: str, k: int = 5) -> list[dict]:
        """Always-filtered search — caller cannot bypass the tenant + role filter."""
        query_vec = embed_model.encode([query], normalize_embeddings=True)[0].tolist()
        results = qdrant.search(
            collection_name="enterprise_rag",
            query_vector=query_vec,
            query_filter=self._build_access_filter(),
            limit=k,
            with_payload=True,
        )
        return [
            {"text": r.payload["text"], "source": r.payload.get("source", ""), "score": r.score}
            for r in results
        ]


def upsert_document_chunk(
    chunk_id: str,
    text: str,
    embedding: list[float],
    tenant_id: str,
    allowed_roles: list[str],
    source: str,
    doc_id: str,
) -> None:
    """Insert a chunk with access-control metadata."""
    qdrant.upsert(
        collection_name="enterprise_rag",
        points=[PointStruct(
            # Qdrant point IDs must be unsigned integers or UUIDs, so derive
            # a stable UUID from the human-readable chunk key
            id=str(uuid.uuid5(uuid.NAMESPACE_URL, chunk_id)),
            vector=embedding,
            payload={
                "text": text,
                "chunk_id": chunk_id,
                "tenant_id": tenant_id,
                "allowed_roles": allowed_roles,  # e.g. ["engineer", "admin"]
                "source": source,
                "doc_id": doc_id,
            },
        )],
    )
```
PII Detection and Redaction
Before indexing any document, scan for PII using a combination of spaCy NER (for names, organisations, locations) and regex patterns (for structured PII like SSNs, credit cards, and emails).
```python
import re
import spacy
from dataclasses import dataclass, field

# Load the small English model — pip install spacy && python -m spacy download en_core_web_sm
try:
    nlp = spacy.load("en_core_web_sm")
except OSError:
    nlp = None  # graceful degradation if the model is not installed

PII_PATTERNS = {
    "email": re.compile(r'\b[A-Za-z0-9._%+\-]+@[A-Za-z0-9.\-]+\.[A-Za-z]{2,}\b'),
    "ssn": re.compile(r'\b\d{3}-\d{2}-\d{4}\b'),
    "credit_card": re.compile(r'\b(?:\d{4}[\s\-]?){3}\d{4}\b'),
    "phone_us": re.compile(r'\b(?:\+1[\s\-]?)?\(?\d{3}\)?[\s\-]?\d{3}[\s\-]?\d{4}\b'),
    "ip_address": re.compile(r'\b(?:\d{1,3}\.){3}\d{1,3}\b'),
}


@dataclass
class PIIDetectionResult:
    has_pii: bool
    findings: list[dict] = field(default_factory=list)
    redacted_text: str = ""


class PIIDetector:
    """Detect and optionally redact PII from text before indexing."""

    def scan(self, text: str) -> PIIDetectionResult:
        """Scan text for PII using regex patterns and spaCy NER."""
        findings = []
        # Regex-based PII
        for pii_type, pattern in PII_PATTERNS.items():
            for match in pattern.finditer(text):
                findings.append({
                    "type": pii_type,
                    "value": match.group(),
                    "start": match.start(),
                    "end": match.end(),
                })
        # NER-based PII (person names, locations, etc.)
        if nlp is not None:
            doc = nlp(text[:100_000])  # cap input length for spaCy
            for ent in doc.ents:
                if ent.label_ in {"PERSON", "GPE"}:
                    findings.append({
                        "type": f"ner_{ent.label_.lower()}",
                        "value": ent.text,
                        "start": ent.start_char,
                        "end": ent.end_char,
                    })
        return PIIDetectionResult(
            has_pii=len(findings) > 0,
            findings=findings,
            redacted_text=self._redact(text, findings),
        )

    def _redact(self, text: str, findings: list[dict]) -> str:
        """Replace PII spans with [REDACTED_TYPE] tokens."""
        # Sort by position descending so replacements don't shift offsets
        sorted_findings = sorted(findings, key=lambda x: x["start"], reverse=True)
        result = text
        for finding in sorted_findings:
            label = f"[REDACTED_{finding['type'].upper()}]"
            result = result[:finding["start"]] + label + result[finding["end"]:]
        return result
```
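A minimal standalone check of the offset-safe redaction step, using two of the regex patterns above on a made-up string:

```python
import re

# Two of the patterns from PII_PATTERNS, applied to invented sample data
patterns = {
    "email": re.compile(r'\b[A-Za-z0-9._%+\-]+@[A-Za-z0-9.\-]+\.[A-Za-z]{2,}\b'),
    "ssn": re.compile(r'\b\d{3}-\d{2}-\d{4}\b'),
}

text = "Contact jane.doe@example.com, SSN 123-45-6789."

# Collect all matches, then replace from the end of the string backwards
# so earlier offsets remain valid after each substitution
findings = sorted(
    (
        {"type": t, "start": m.start(), "end": m.end()}
        for t, p in patterns.items()
        for m in p.finditer(text)
    ),
    key=lambda f: f["start"],
    reverse=True,
)
for f in findings:
    text = text[:f["start"]] + f"[REDACTED_{f['type'].upper()}]" + text[f["end"]:]

print(text)  # Contact [REDACTED_EMAIL], SSN [REDACTED_SSN].
```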
Audit Logging
Every query in a regulated system must be logged immutably: who asked what, when, which documents were retrieved, and what answer was returned.
```python
import sqlite3
import datetime
import hashlib
import json


class AuditLogger:
    """
    Immutable audit log for RAG queries.
    Uses SQLite with INSERT-only access — no UPDATE or DELETE.
    In production: use an append-only table in PostgreSQL or write to CloudTrail/BigQuery.
    """

    def __init__(self, db_path: str = "rag_audit.db"):
        self.db_path = db_path
        with sqlite3.connect(db_path) as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS audit_log (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    event_id TEXT NOT NULL UNIQUE,
                    timestamp TEXT NOT NULL,
                    user_id TEXT NOT NULL,
                    tenant_id TEXT NOT NULL,
                    query_text TEXT NOT NULL,
                    retrieved_chunk_ids TEXT NOT NULL,
                    answer_hash TEXT NOT NULL,
                    model_used TEXT NOT NULL,
                    latency_ms REAL
                )
            """)
            # Triggers enforce immutability: both UPDATE and DELETE are rejected
            conn.execute("""
                CREATE TRIGGER IF NOT EXISTS prevent_update
                BEFORE UPDATE ON audit_log
                BEGIN
                    SELECT RAISE(ABORT, 'Audit log is immutable');
                END
            """)
            conn.execute("""
                CREATE TRIGGER IF NOT EXISTS prevent_delete
                BEFORE DELETE ON audit_log
                BEGIN
                    SELECT RAISE(ABORT, 'Audit log is immutable');
                END
            """)

    def log_query(
        self,
        user_id: str,
        tenant_id: str,
        query: str,
        retrieved_chunk_ids: list[str],
        answer: str,
        model: str,
        latency_ms: float,
    ) -> str:
        """Log a completed RAG query. Returns the event_id."""
        event_id = hashlib.sha256(
            f"{user_id}{query}{datetime.datetime.utcnow().isoformat()}".encode()
        ).hexdigest()[:32]
        with sqlite3.connect(self.db_path) as conn:
            conn.execute(
                """INSERT INTO audit_log
                   (event_id, timestamp, user_id, tenant_id, query_text,
                    retrieved_chunk_ids, answer_hash, model_used, latency_ms)
                   VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",
                (
                    event_id,
                    datetime.datetime.utcnow().isoformat(),
                    user_id,
                    tenant_id,
                    query,
                    json.dumps(retrieved_chunk_ids),
                    hashlib.sha256(answer.encode()).hexdigest(),
                    model,
                    latency_ms,
                ),
            )
        return event_id
```
On-Premise Air-Gapped Deployment
For organisations that cannot send data to any cloud service, the full stack runs locally:
Embeddings: BGE-large-en-v1.5 via sentence-transformers (no API call)
Vector DB: Qdrant in Docker or Kubernetes (no cloud dependency)
LLM: Ollama (llama3.3:70b) or vLLM serving (no API call)
Reranker: cross-encoder/ms-marco-MiniLM-L-6-v2 (local)
Cache: Redis in Docker
```python
# On-premise LLM call via Ollama's OpenAI-compatible endpoint
from openai import OpenAI

ollama_client = OpenAI(
    base_url="http://localhost:11434/v1",
    api_key="ollama",  # required by the client but unused by Ollama
)


def generate_answer_local(context: str, question: str) -> str:
    """Generate an answer using a locally-served LLM via Ollama."""
    response = ollama_client.chat.completions.create(
        model="llama3.3:70b",
        messages=[
            {"role": "system", "content": "Answer using only the provided context."},
            {"role": "user", "content": f"Context:\n{context}\n\nQuestion: {question}"},
        ],
        temperature=0.1,
        max_tokens=500,
    )
    return response.choices[0].message.content
```
Incremental Reindexing with Change Detection
Rebuilding the full index on every document change is expensive. Instead, compute a content fingerprint (SHA-256) for each document. When the connector polls for changes, compare fingerprints and reindex only the changed documents.
```python
import hashlib


def compute_doc_fingerprint(content: str) -> str:
    """Content-based fingerprint for change detection."""
    return hashlib.sha256(content.encode("utf-8")).hexdigest()


def incremental_reindex(
    doc_id: str,
    new_content: str,
    fingerprint_store: dict,  # doc_id -> last_fingerprint (SQLite or Redis in production)
    chunk_fn,
    embed_fn,
    upsert_fn,
    delete_chunks_fn,
) -> dict:
    """
    Reindex a document only if its content has changed.
    Deletes old chunks by doc_id before inserting new ones.
    """
    new_fingerprint = compute_doc_fingerprint(new_content)
    old_fingerprint = fingerprint_store.get(doc_id)
    if old_fingerprint == new_fingerprint:
        return {"action": "skipped", "doc_id": doc_id}

    # Delete old chunks for this document
    delete_chunks_fn(doc_id)

    # Re-chunk and re-embed
    chunks = chunk_fn(new_content)
    embeddings = embed_fn([c["text"] for c in chunks])
    for chunk, embedding in zip(chunks, embeddings):
        upsert_fn(
            chunk_id=f"{doc_id}_{chunk['index']}",
            doc_id=doc_id,
            text=chunk["text"],
            embedding=embedding,
        )

    fingerprint_store[doc_id] = new_fingerprint
    return {"action": "reindexed", "doc_id": doc_id, "chunks": len(chunks)}
```
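A quick self-contained check of the skip/reindex decision, using a plain dict as a stand-in for the fingerprint store (the names here are illustrative, not part of the connector above):

```python
import hashlib


def fingerprint(content: str) -> str:
    # Same SHA-256 content fingerprint as compute_doc_fingerprint
    return hashlib.sha256(content.encode("utf-8")).hexdigest()


store: dict[str, str] = {}  # in-memory stand-in for SQLite/Redis


def needs_reindex(doc_id: str, content: str) -> bool:
    """True if the document changed since the last poll; updates the store."""
    fp = fingerprint(content)
    if store.get(doc_id) == fp:
        return False
    store[doc_id] = fp
    return True


print(needs_reindex("doc1", "v1 of the handbook"))  # True  (first sighting)
print(needs_reindex("doc1", "v1 of the handbook"))  # False (unchanged, skip)
print(needs_reindex("doc1", "v2 of the handbook"))  # True  (content changed)
```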
SharePoint Connector
```python
import requests


class SharePointConnector:
    """
    Polls SharePoint via Microsoft Graph API for changed files.
    Requires an app registration with Files.Read.All permission.
    """

    def __init__(self, tenant_id: str, client_id: str, client_secret: str, site_id: str):
        self.site_id = site_id
        self.token = self._get_token(tenant_id, client_id, client_secret)

    def _get_token(self, tenant_id: str, client_id: str, client_secret: str) -> str:
        resp = requests.post(
            f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token",
            data={
                "client_id": client_id,
                "client_secret": client_secret,
                "scope": "https://graph.microsoft.com/.default",
                "grant_type": "client_credentials",
            },
        )
        resp.raise_for_status()
        return resp.json()["access_token"]

    def get_changed_files(self, delta_token: str | None = None) -> tuple[list[dict], str]:
        """
        Use the Graph API delta endpoint to get files changed since the last poll.
        Returns (list_of_files, new_delta_token).
        """
        url = (
            f"https://graph.microsoft.com/v1.0/sites/{self.site_id}/drive/root/delta"
            if delta_token is None
            else f"https://graph.microsoft.com/v1.0/sites/{self.site_id}/drive/root/delta(token='{delta_token}')"
        )
        headers = {"Authorization": f"Bearer {self.token}"}
        resp = requests.get(url, headers=headers)
        resp.raise_for_status()
        data = resp.json()

        changed_files = [
            {
                "id": item["id"],
                "name": item["name"],
                "last_modified": item.get("lastModifiedDateTime"),
            }
            for item in data.get("value", [])
            if "file" in item and not item.get("deleted")
        ]
        new_delta_token = data.get("@odata.deltaLink", "").split("token='")[-1].rstrip("'")
        return changed_files, new_delta_token

    def download_file_content(self, file_id: str) -> bytes:
        """Download the raw content of a file by its Graph API ID."""
        url = f"https://graph.microsoft.com/v1.0/sites/{self.site_id}/drive/items/{file_id}/content"
        resp = requests.get(url, headers={"Authorization": f"Bearer {self.token}"})
        resp.raise_for_status()
        return resp.content
```
Enterprise Architecture Summary
```
User Request
     │
     ▼
API Gateway (rate limiting, auth, DLP output scan)
     │
     ▼
TenantRAGClient (enforces tenant_id + role filters)
     │
     ├── Exact Cache (Redis) ─── hit → return
     ├── Semantic Cache (Qdrant cache collection) ─── hit → return
     │
     ▼
Parallel Retrieval
     ├── Dense (Qdrant, filtered HNSW)
     └── Sparse (BM25 index)
     │
     ▼
Reranker (MiniLM local or Cohere API)
     │
     ▼
LLM Generation (Groq cloud or Ollama on-prem)
     │
     ▼
DLP Output Scanner (PII check before returning)
     │
     ▼
AuditLogger.log_query()  [immutable]
     │
     ▼
Response to User
```
Key Takeaways
Per-tenant collections provide strong isolation but scale poorly; metadata-filtered single collections are cost-effective but require a TenantClient wrapper that makes missing filters architecturally impossible.
Document-level access control via allowed_roles payloads is the RAG equivalent of row-level security — enforce it at the retrieval layer, not the application layer.
Scan every document for PII before indexing using both regex (SSN, credit card, email) and spaCy NER (person names); redact or flag and block indexing until reviewed.
Immutable audit logs (INSERT-only, with triggers rejecting UPDATE and DELETE) satisfy compliance requirements in finance, healthcare, and legal; log user_id, query, retrieved chunk IDs, and answer hash.
On-premise air-gapped deployment is fully viable: BGE-large + Qdrant + Ollama + Redis with no external API calls.
Incremental reindexing with SHA-256 content fingerprints avoids full rebuilds; only changed documents are rechunked and re-embedded.
SharePoint connectors use the Microsoft Graph delta endpoint to poll for changes since the last run, minimising API calls.
Enterprise SLA targets: 99.9% uptime (~8.7 hours downtime per year), faithfulness >0.80, and p99 retrieval latency <500 ms.
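The downtime figure in that SLA follows from simple arithmetic, taking 8766 hours as an average year (365.25 days including leap years):

```python
# Allowed annual downtime for a given uptime SLA
HOURS_PER_YEAR = 8766  # 365.25 days * 24 h


def allowed_downtime_hours(uptime_pct: float) -> float:
    """Hours of downtime per year permitted under the given uptime percentage."""
    return (1 - uptime_pct / 100) * HOURS_PER_YEAR


print(round(allowed_downtime_hours(99.9), 2))   # ~8.77 hours/year
print(round(allowed_downtime_hours(99.99), 2))  # ~0.88 hours/year (~53 min)
```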