rag-production
Production-grade RAG deployment patterns. Covers caching strategies (semantic and exact), streaming responses, token budget management, fallback strategies for retrieval failures, monitoring retrieval quality, cost optimization, incremental indexing, multi-tenancy, and operational best practices for running RAG systems at scale.
RAG in Production
Ship reliable, fast, cost-efficient RAG systems that work at scale.
Architecture Overview
                    ┌─────────────┐
User Query ────────>│ API Gateway │
                    └──────┬──────┘
                           │
                ┌──────────┴──────────┐
                │   Query Pipeline    │
                │  ┌───────────────┐  │
                │  │  Cache Check  │  │ <── Semantic + exact cache
                │  └───────┬───────┘  │
                │  ┌───────┴───────┐  │
                │  │ Query Rewrite │  │ <── Optional: HyDE, expansion
                │  └───────┬───────┘  │
                │  ┌───────┴───────┐  │
                │  │   Retrieval   │  │ <── Hybrid: dense + BM25
                │  └───────┬───────┘  │
                │  ┌───────┴───────┐  │
                │  │  Re-Ranking   │  │ <── Cross-encoder
                │  └───────┬───────┘  │
                │  ┌───────┴───────┐  │
                │  │  Generation   │  │ <── Streaming LLM response
                │  └───────┬───────┘  │
                │  ┌───────┴───────┐  │
                │  │    Logging    │  │ <── Metrics, traces, feedback
                │  └───────────────┘  │
                └─────────────────────┘
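A minimal sketch of this orchestration, with each stage as a pluggable function. All names here are illustrative, not tied to any framework; the point is the stage ordering shown in the diagram:

```python
def answer(query, cache, rewrite, retrieve, rerank, generate, log):
    """Run a query through the pipeline stages shown above."""
    cached = cache(query)
    if cached is not None:
        return cached                 # cache hit short-circuits everything
    rewritten = rewrite(query)        # optional: HyDE, expansion
    docs = retrieve(rewritten)        # hybrid: dense + BM25
    docs = rerank(rewritten, docs)    # cross-encoder
    answer_text = generate(rewritten, docs)
    log(query, docs, answer_text)     # metrics, traces, feedback
    return answer_text
```

Keeping each stage behind a plain callable makes it easy to swap implementations (e.g. disable rewriting) or stub stages out in tests.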
Caching Strategies
Exact Match Cache
import hashlib
import json

import redis

class ExactCache:
    """Cache exact query-answer pairs in Redis."""

    def __init__(self, redis_url="redis://localhost:6379", ttl=3600):
        self.client = redis.from_url(redis_url)
        self.ttl = ttl

    def _key(self, query: str, filters: dict = None) -> str:
        raw = f"{query}:{json.dumps(filters, sort_keys=True) if filters else ''}"
        return f"rag:exact:{hashlib.sha256(raw.encode()).hexdigest()}"

    def get(self, query: str, filters: dict = None):
        cached = self.client.get(self._key(query, filters))
        if cached:
            return json.loads(cached)
        return None

    def set(self, query: str, result: dict, filters: dict = None):
        self.client.setex(
            self._key(query, filters),
            self.ttl,
            json.dumps(result),
        )
Semantic Cache
import json

class SemanticCache:
    """Cache similar queries by embedding similarity."""

    def __init__(self, vectorstore, threshold=0.95):
        self.vectorstore = vectorstore  # Separate collection for cache entries
        self.threshold = threshold

    def get(self, query: str, query_embedding: list):
        results = self.vectorstore.similarity_search_with_score(query, k=1)
        # Assumes the store returns similarity (higher = closer). Some stores
        # return distance (lower = closer); invert the comparison for those.
        if results and results[0][1] >= self.threshold:
            cached_doc = results[0][0]
            return json.loads(cached_doc.metadata.get("cached_response", "{}"))
        return None

    def set(self, query: str, query_embedding: list, response: dict):
        from langchain.schema import Document
        doc = Document(
            page_content=query,
            metadata={"cached_response": json.dumps(response)},
        )
        self.vectorstore.add_documents([doc])

# Combine both caches
class RAGCache:
    def __init__(self, exact_cache, semantic_cache):
        self.exact = exact_cache
        self.semantic = semantic_cache

    def get(self, query, query_embedding=None, filters=None):
        # Try exact first (fastest)
        result = self.exact.get(query, filters)
        if result:
            return result, "exact_hit"
        # Fall back to the semantic cache
        if query_embedding:
            result = self.semantic.get(query, query_embedding)
            if result:
                return result, "semantic_hit"
        return None, "miss"
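The layered lookup order can be exercised with in-memory stand-ins that share the cache interface above. `DictCache` here is a hypothetical test double, not a real backend:

```python
class DictCache:
    """In-memory stand-in with the same get/set shape as the caches above."""
    def __init__(self):
        self.store = {}
    def get(self, query, filters=None):
        return self.store.get(query)
    def set(self, query, result, filters=None):
        self.store[query] = result

exact, semantic = DictCache(), DictCache()

def lookup(query):
    # Same layering as RAGCache.get: exact first, then semantic, then miss
    result = exact.get(query)
    if result:
        return result, "exact_hit"
    result = semantic.get(query)
    if result:
        return result, "semantic_hit"
    return None, "miss"

exact.set("how does auth work", {"answer": "via OAuth2"})
print(lookup("how does auth work"))  # ({'answer': 'via OAuth2'}, 'exact_hit')
print(lookup("unseen query"))        # (None, 'miss')
```

This kind of double is also useful in CI, where standing up Redis and a vector store for every test run is overkill.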
Streaming Responses
import asyncio
import json

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from langchain_openai import ChatOpenAI

app = FastAPI()

async def stream_rag_response(query: str):
    """Stream RAG response as Server-Sent Events."""
    # Retrieve (non-streaming); `retriever` is assumed to be built elsewhere
    docs = await asyncio.to_thread(retriever.invoke, query)
    context = "\n\n".join(d.page_content for d in docs)
    # Yield sources first so the UI can render them immediately
    sources = [d.metadata.get("source", "unknown") for d in docs]
    yield f"data: {json.dumps({'type': 'sources', 'sources': sources})}\n\n"
    # Stream generation token by token
    llm = ChatOpenAI(model="gpt-4o", temperature=0, streaming=True)
    prompt_text = f"Answer based on context:\nContext: {context}\nQuestion: {query}"
    async for chunk in llm.astream(prompt_text):
        if chunk.content:
            yield f"data: {json.dumps({'type': 'token', 'content': chunk.content})}\n\n"
    yield f"data: {json.dumps({'type': 'done'})}\n\n"

@app.get("/api/query")
async def query_endpoint(q: str):
    return StreamingResponse(
        stream_rag_response(q),
        media_type="text/event-stream",
    )

# Client-side consumption
# const eventSource = new EventSource('/api/query?q=How+does+auth+work');
# eventSource.onmessage = (event) => {
#   const data = JSON.parse(event.data);
#   if (data.type === 'token') appendToUI(data.content);
#   if (data.type === 'done') eventSource.close();
# };
Token Budget Management
import tiktoken

class TokenBudget:
    """Manage token allocation across RAG components."""

    def __init__(self, model="gpt-4o", max_context=128000, reserve_output=4096):
        self.encoder = tiktoken.encoding_for_model(model)
        self.max_context = max_context
        self.reserve_output = reserve_output
        self.system_prompt_tokens = 0

    def count_tokens(self, text: str) -> int:
        return len(self.encoder.encode(text))

    def set_system_prompt(self, prompt: str):
        self.system_prompt_tokens = self.count_tokens(prompt)

    def available_for_context(self) -> int:
        return self.max_context - self.system_prompt_tokens - self.reserve_output

    def fit_documents(self, docs, max_tokens=None):
        """Select documents that fit within the token budget."""
        budget = max_tokens or self.available_for_context()
        selected = []
        total_tokens = 0
        for doc in docs:
            doc_tokens = self.count_tokens(doc.page_content)
            if total_tokens + doc_tokens <= budget:
                selected.append(doc)
                total_tokens += doc_tokens
            else:
                # Optionally truncate the last document to fit
                remaining = budget - total_tokens
                if remaining > 100:  # Only include if meaningful
                    truncated_text = self.encoder.decode(
                        self.encoder.encode(doc.page_content)[:remaining]
                    )
                    doc.page_content = truncated_text
                    selected.append(doc)
                    total_tokens += remaining  # count the truncated tokens too
                break
        return selected, total_tokens

# Usage in pipeline
budget = TokenBudget(model="gpt-4o", max_context=128000, reserve_output=4096)
budget.set_system_prompt(system_prompt)
retrieved_docs = retriever.invoke(query)  # May return 20 docs
fitted_docs, tokens_used = budget.fit_documents(retrieved_docs, max_tokens=8000)
# Now use fitted_docs for generation
Fallback Strategies
import logging

class ResilientRAG:
    """RAG pipeline with layered fallback strategies."""

    def __init__(self, primary_retriever, fallback_retriever, llm, cache):
        self.primary = primary_retriever
        self.fallback = fallback_retriever
        self.llm = llm
        self.cache = cache

    async def query(self, question: str) -> dict:
        # Layer 1: Cache
        cached = self.cache.get(question)
        if cached:
            return {"answer": cached["answer"], "source": "cache"}
        # Layer 2: Primary retrieval
        try:
            docs = self.primary.invoke(question)
            if docs and self._has_relevant_docs(docs):
                answer = self._generate(question, docs)
                self.cache.set(question, {"answer": answer})
                return {"answer": answer, "source": "primary"}
        except Exception as e:
            logging.warning(f"Primary retrieval failed: {e}")
        # Layer 3: Fallback retrieval (different index, simpler strategy)
        try:
            docs = self.fallback.invoke(question)
            if docs:
                answer = self._generate(question, docs)
                return {"answer": answer, "source": "fallback"}
        except Exception as e:
            logging.error(f"Fallback retrieval failed: {e}")
        # Layer 4: Graceful degradation
        return {
            "answer": "I'm unable to find relevant information to answer your "
                      "question. Please try rephrasing or contact support.",
            "source": "no_results",
        }

    def _has_relevant_docs(self, docs, min_score=0.5):
        """Check if retrieved docs meet minimum relevance."""
        if hasattr(docs[0], "metadata") and "score" in docs[0].metadata:
            return any(d.metadata["score"] >= min_score for d in docs)
        return len(docs) > 0

    def _generate(self, question, docs):
        context = "\n\n".join(d.page_content for d in docs)
        return self.llm.invoke(
            f"Answer based on context. If insufficient, say so.\n"
            f"Context: {context}\nQuestion: {question}"
        ).content
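Resilience also means protecting the upstream embedding and LLM APIs from traffic spikes. A minimal token-bucket limiter sketch follows; the injectable `now` parameter is purely so the behavior can be tested deterministically, and the class name is illustrative:

```python
import time

class TokenBucket:
    """Token-bucket rate limiter for embedding/LLM API calls.

    rate: permits refilled per second; capacity: maximum burst size.
    """
    def __init__(self, rate: float, capacity: int, now: float = None):
        self.rate = rate
        self.capacity = capacity
        self.tokens = float(capacity)
        self.last = time.monotonic() if now is None else now

    def allow(self, now: float = None) -> bool:
        now = time.monotonic() if now is None else now
        # Refill proportionally to elapsed time, capped at capacity
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False

bucket = TokenBucket(rate=1.0, capacity=3, now=0.0)
print([bucket.allow(now=0.0) for _ in range(4)])  # [True, True, True, False]
print(bucket.allow(now=2.0))                      # True (bucket refilled)
```

When `allow` returns False, either queue the request or return a 429 to the caller; silently dropping work tends to surface as mysterious missing answers.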
Incremental Indexing
import hashlib
from datetime import datetime

class IncrementalIndexer:
    """Track document changes and only re-index what changed."""

    def __init__(self, vectorstore, metadata_db):
        self.vectorstore = vectorstore
        self.db = metadata_db  # Tracks indexed documents

    def _hash_content(self, content: str) -> str:
        return hashlib.sha256(content.encode()).hexdigest()

    def _doc_id(self, doc) -> str:
        return doc.metadata.get("source", doc.page_content[:50])

    def sync(self, documents):
        """Sync documents with the index. Only process changes."""
        to_add = []
        to_delete = []
        unchanged = 0
        for doc in documents:
            doc_id = self._doc_id(doc)
            content_hash = self._hash_content(doc.page_content)
            existing = self.db.get(doc_id)
            if existing is None:
                # New document
                to_add.append(doc)
                self.db.set(doc_id, {"hash": content_hash,
                                     "indexed_at": datetime.utcnow().isoformat()})
            elif existing["hash"] != content_hash:
                # Changed document: delete old vectors, add new ones
                to_delete.append(doc_id)
                to_add.append(doc)
                self.db.set(doc_id, {"hash": content_hash,
                                     "indexed_at": datetime.utcnow().isoformat()})
            else:
                unchanged += 1
        # Documents removed from the source should leave the index too
        # (also drop their entries from metadata_db if it supports deletion)
        all_current_ids = {self._doc_id(doc) for doc in documents}
        removed = set(self.db.all_keys()) - all_current_ids
        to_delete.extend(removed)
        # Execute changes
        if to_delete:
            self.vectorstore.delete(ids=list(to_delete))
        if to_add:
            self.vectorstore.add_documents(to_add)
        return {
            "added": len(to_add),
            "deleted": len(to_delete),
            "unchanged": unchanged,
        }
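The core of the sync step is a hash-based diff. A stripped-down, dependency-free version of that diff logic (function and variable names here are illustrative):

```python
import hashlib

def diff_index(current: dict, indexed: dict):
    """Compare source docs against the indexed state.

    current: {doc_id: content}; indexed: {doc_id: content_hash}.
    Returns (ids to (re)add, ids to delete, unchanged count).
    """
    h = lambda s: hashlib.sha256(s.encode()).hexdigest()
    to_add, to_delete, unchanged = [], [], 0
    for doc_id, content in current.items():
        if doc_id not in indexed:
            to_add.append(doc_id)            # new document
        elif indexed[doc_id] != h(content):
            to_delete.append(doc_id)         # changed: delete old, re-add
            to_add.append(doc_id)
        else:
            unchanged += 1
    to_delete.extend(set(indexed) - set(current))  # removed upstream
    return to_add, to_delete, unchanged

indexed = {"a.md": hashlib.sha256(b"old").hexdigest(),
           "b.md": hashlib.sha256(b"same").hexdigest()}
current = {"a.md": "new", "b.md": "same", "c.md": "brand new"}
print(diff_index(current, indexed))  # (['a.md', 'c.md'], ['a.md'], 1)
```

Hashing content rather than comparing timestamps means touch-only changes (e.g. a CI checkout resetting mtimes) do not trigger re-embedding.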
Multi-Tenancy
class MultiTenantRAG:
    """Isolate data between tenants."""

    def __init__(self, vectorstore_factory):
        self.factory = vectorstore_factory

    def get_retriever(self, tenant_id: str):
        """Option 1: separate collection per tenant."""
        vectorstore = self.factory.get_collection(f"tenant_{tenant_id}")
        return vectorstore.as_retriever(search_kwargs={"k": 5})

    def get_retriever_filtered(self, tenant_id: str):
        """Option 2: single shared collection with a tenant metadata filter."""
        vectorstore = self.factory.get_collection("shared")
        return vectorstore.as_retriever(
            search_kwargs={
                "k": 5,
                "filter": {"tenant_id": tenant_id},
            }
        )

# Tradeoffs:
# - Separate collections: better isolation, easier deletion, more infrastructure
# - Metadata filtering: simpler ops, but a filter bug can leak data across tenants
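With metadata filtering, it is worth verifying tenant ownership on the results as a second line of defense against filter bugs. A hypothetical post-retrieval check, shown here over plain dicts for simplicity:

```python
def check_tenant_isolation(docs, tenant_id: str):
    """Partition results into owned vs leaked documents.

    A non-empty `leaked` list means the tenant filter failed: drop those
    documents and fire an alert rather than returning them.
    """
    owned, leaked = [], []
    for doc in docs:
        if doc.get("tenant_id") == tenant_id:
            owned.append(doc)
        else:
            leaked.append(doc)
    return owned, leaked

docs = [{"tenant_id": "t1", "text": "ok"}, {"tenant_id": "t2", "text": "leak"}]
owned, leaked = check_tenant_isolation(docs, "t1")
print(len(owned), len(leaked))  # 1 1
```

The check is cheap relative to retrieval and turns a silent data leak into a loud, monitorable failure.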
Cost Optimization
class CostTracker:
    """Track and optimize RAG pipeline costs."""

    # Approximate costs (USD per 1M tokens, as of 2024)
    COSTS = {
        "text-embedding-3-small": {"input": 0.02},
        "text-embedding-3-large": {"input": 0.13},
        "gpt-4o": {"input": 2.50, "output": 10.00},
        "gpt-4o-mini": {"input": 0.15, "output": 0.60},
        "claude-3-5-sonnet": {"input": 3.00, "output": 15.00},
    }

    def __init__(self):
        self.total_cost = 0.0
        self.breakdown = {}

    def log(self, model: str, input_tokens: int, output_tokens: int = 0):
        costs = self.COSTS.get(model, {"input": 0, "output": 0})
        # .get with default: embedding models have no "output" price
        cost = (input_tokens * costs["input"]
                + output_tokens * costs.get("output", 0)) / 1_000_000
        self.total_cost += cost
        self.breakdown[model] = self.breakdown.get(model, 0) + cost

    def report(self):
        print(f"Total cost: ${self.total_cost:.4f}")
        for model, cost in self.breakdown.items():
            print(f"  {model}: ${cost:.4f}")

# Cost reduction strategies:
# 1. Use gpt-4o-mini for re-ranking/grading (not generation)
# 2. Cache embeddings and responses aggressively
# 3. Reduce embedding dimensions (Matryoshka)
# 4. Use smaller k values when possible
# 5. Batch embedding requests
# 6. Use open-source models for non-critical paths
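As a worked example using the price table above: a query that embeds 500 tokens with text-embedding-3-small and generates with gpt-4o-mini (2,000 input / 300 output tokens) costs well under a tenth of a cent:

```python
# Per-1M-token prices from the COSTS table above
embed_cost = 500 * 0.02 / 1_000_000                   # embedding input
gen_cost = (2_000 * 0.15 + 300 * 0.60) / 1_000_000    # LLM input + output
total = embed_cost + gen_cost
print(f"${total:.6f}")  # $0.000490
```

At this rate, generation dominates (the embedding is ~2% of the total), which is why caching responses pays off far more than caching embeddings alone.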
Monitoring and Observability
import time
from dataclasses import dataclass, field
from typing import Optional

@dataclass
class RAGTrace:
    """Trace a single RAG request through the pipeline."""
    query: str
    start_time: float = field(default_factory=time.time)
    retrieval_latency_ms: Optional[float] = None
    rerank_latency_ms: Optional[float] = None
    generation_latency_ms: Optional[float] = None
    total_latency_ms: Optional[float] = None
    num_docs_retrieved: int = 0
    num_docs_after_rerank: int = 0
    top_similarity_score: Optional[float] = None
    context_tokens: int = 0
    output_tokens: int = 0
    cache_hit: bool = False
    error: Optional[str] = None

    def finish(self):
        self.total_latency_ms = (time.time() - self.start_time) * 1000
# Key metrics to dashboard:
# - p50/p95/p99 latency
# - Cache hit rate (target: >30% for cost savings)
# - Retrieval score distribution (detect when queries drift from corpus)
# - Error rate by component (retrieval vs generation)
# - User feedback ratio (thumbs up vs down)
# - Queries with no relevant results (gap analysis for corpus)
# - Cost per query (embedding + retrieval + generation)
# - Token utilization (how much of context budget is used)
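For example, p50/p95 over logged `total_latency_ms` values can be computed with a nearest-rank percentile. A dependency-free sketch; in practice your metrics backend does this for you:

```python
import math

def percentile(values, p):
    """Nearest-rank percentile: smallest value with >= p% of samples at or below it."""
    if not values:
        return None
    ordered = sorted(values)
    k = max(1, math.ceil(p * len(ordered) / 100))
    return ordered[k - 1]

latencies = [120, 95, 400, 210, 180, 90, 1500, 130, 110, 105]
print(percentile(latencies, 50), percentile(latencies, 95))  # 120 1500
```

Note how a single slow request dominates p95 here; that is exactly why tail percentiles, not averages, belong on the dashboard.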
Deployment Checklist
- Caching: exact + semantic cache with appropriate TTL
- Streaming: SSE or WebSocket for user-facing responses
- Token budgets: enforce limits on context and output
- Fallbacks: graceful degradation when retrieval or LLM fails
- Rate limiting: protect embedding and LLM APIs
- Multi-tenancy: data isolation verified
- Incremental indexing: pipeline for document updates
- Monitoring: latency, quality, cost dashboards
- Evaluation: automated eval suite runs on index updates
- Cost tracking: per-query cost attribution
- Security: no prompt injection via retrieved documents
- Logging: full traces for debugging (redact PII)
Anti-Patterns
- No caching at all -- Even simple exact-match caching saves 20-40% of LLM costs for most workloads.
- Synchronous indexing on write -- Index updates should be async. Do not block user requests while re-embedding documents.
- No fallback path -- When the vector DB is down or returns empty, the system should degrade gracefully, not crash.
- Logging queries without redaction -- RAG queries often contain PII. Redact or hash sensitive fields before logging.
- Not monitoring similarity scores -- A gradual decline in top-k similarity scores means queries are drifting from your corpus. Add new documents or adjust chunking.
- Ignoring cold start -- The first query after deployment has no cache. Pre-warm caches with common queries from analytics.
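A pre-warming pass over the most frequent queries can be a few lines. The `pipeline` callable and query list here are illustrative stand-ins for your real pipeline and analytics export:

```python
def prewarm(cache, pipeline, common_queries, limit=100):
    """Populate the cache with answers to the most frequent queries."""
    warmed = 0
    for query in common_queries[:limit]:
        if cache.get(query) is None:  # skip anything already cached
            cache.set(query, pipeline(query))
            warmed += 1
    return warmed

class MemCache:
    """Tiny in-memory cache used only for this example."""
    def __init__(self):
        self.d = {}
    def get(self, q):
        return self.d.get(q)
    def set(self, q, v):
        self.d[q] = v

cache = MemCache()
print(prewarm(cache, lambda q: f"answer to {q}", ["q1", "q2", "q1"]))  # 2
```

Run this as a post-deploy step (or a scheduled job) so the first real users never pay full pipeline latency.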
Install this skill directly: skilldb add rag-pipeline-skills
Related Skills
advanced-rag
Advanced RAG patterns beyond basic retrieve-and-generate. Covers multi-hop RAG, agentic RAG with tool use, graph RAG (knowledge graphs + vector retrieval), recursive retrieval, self-querying retrievers, query decomposition, citation extraction, and corrective RAG. Includes implementation patterns and guidance on when each advanced technique is warranted.
chunking-strategies
Comprehensive guide to document chunking strategies for RAG pipelines. Covers fixed-size, semantic, recursive character, sentence-based, parent-child, markdown-aware, and code-aware chunking. Includes chunk size optimization, overlap strategies, and practical benchmarks for choosing the right approach based on document type and retrieval quality.
embedding-models
Guide to selecting, using, and optimizing text embedding models for RAG pipelines. Covers commercial models (OpenAI text-embedding-3, Cohere embed-v3, Voyage AI) and open-source options (BGE, E5, Nomic Embed). Includes dimensionality selection, batch processing, embedding caching, fine-tuning for domain-specific retrieval, and cost analysis.
rag-evaluation
Evaluating RAG systems end-to-end. Covers retrieval metrics (context precision, context recall, MRR), generation metrics (faithfulness, answer relevance, hallucination detection), the RAGAS framework, human evaluation protocols, A/B testing retrieval strategies, building evaluation datasets, and continuous monitoring in production.
rag-fundamentals
Teaches the foundational architecture of Retrieval-Augmented Generation (RAG) systems. Covers why RAG outperforms fine-tuning for most knowledge-grounding use cases, the three core stages (indexing, retrieval, generation), component design, latency budgets, and evaluation metrics including faithfulness, relevance, and hallucination rate. Use when building or explaining any RAG system from scratch.
rag-with-langchain
Building RAG pipelines with LangChain and LangGraph. Covers document loaders, text splitters, vector stores, retrievers, chains, and agents. Includes practical patterns for conversational RAG, multi-source retrieval, streaming, and LangGraph-based agentic RAG workflows.