
rag-production

Production-grade RAG deployment patterns. Covers caching strategies (semantic and exact), streaming responses, token budget management, fallback strategies for retrieval failures, monitoring retrieval quality, cost optimization, incremental indexing, multi-tenancy, and operational best practices for running RAG systems at scale.


RAG in Production

Ship reliable, fast, cost-efficient RAG systems that work at scale.


Architecture Overview

                        ┌─────────────┐
    User Query ────────>│ API Gateway │
                        └──────┬──────┘
                               │
                    ┌──────────┴──────────┐
                    │    Query Pipeline    │
                    │  ┌───────────────┐  │
                    │  │ Cache Check   │  │ <── Semantic + exact cache
                    │  └───────┬───────┘  │
                    │  ┌───────┴───────┐  │
                    │  │ Query Rewrite │  │ <── Optional: HyDE, expansion
                    │  └───────┬───────┘  │
                    │  ┌───────┴───────┐  │
                    │  │  Retrieval    │  │ <── Hybrid: dense + BM25
                    │  └───────┬───────┘  │
                    │  ┌───────┴───────┐  │
                    │  │  Re-Ranking   │  │ <── Cross-encoder
                    │  └───────┬───────┘  │
                    │  ┌───────┴───────┐  │
                    │  │  Generation   │  │ <── Streaming LLM response
                    │  └───────┬───────┘  │
                    │  ┌───────┴───────┐  │
                    │  │  Logging      │  │ <── Metrics, traces, feedback
                    │  └───────────────┘  │
                    └─────────────────────┘
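Read top to bottom, the diagram is a linear call path with one early exit at the cache. A minimal orchestration sketch, with the optional query-rewrite stage omitted for brevity; every stage function here is a hypothetical placeholder you would wire to real components:

```python
def run_pipeline(query, *, cache_check, retrieve, rerank, generate, log):
    """Linear RAG call path with an early exit on cache hit."""
    hit = cache_check(query)
    if hit is not None:
        log(query, cached=True)
        return hit
    docs = rerank(retrieve(query))   # hybrid retrieval, then cross-encoder
    answer = generate(query, docs)
    log(query, cached=False)
    return answer

# Wiring with trivial stand-ins:
answer = run_pipeline(
    "how does auth work?",
    cache_check=lambda q: None,                  # always miss
    retrieve=lambda q: ["doc-a", "doc-b"],
    rerank=lambda docs: docs[:1],
    generate=lambda q, docs: f"answer from {docs[0]}",
    log=lambda q, cached: None,
)
print(answer)  # answer from doc-a
```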

Caching Strategies

Exact Match Cache

import hashlib
import json
import redis

class ExactCache:
    """Cache exact query-answer pairs in Redis."""

    def __init__(self, redis_url="redis://localhost:6379", ttl=3600):
        self.client = redis.from_url(redis_url)
        self.ttl = ttl

    def _key(self, query: str, filters: dict = None) -> str:
        raw = f"{query}:{json.dumps(filters, sort_keys=True) if filters else ''}"
        return f"rag:exact:{hashlib.sha256(raw.encode()).hexdigest()}"

    def get(self, query: str, filters: dict = None):
        cached = self.client.get(self._key(query, filters))
        if cached:
            return json.loads(cached)
        return None

    def set(self, query: str, result: dict, filters: dict = None):
        self.client.setex(
            self._key(query, filters),
            self.ttl,
            json.dumps(result)
        )
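Exact caches only hit on byte-identical queries, so normalizing before hashing meaningfully raises the hit rate. One common normalization (a sketch, with a hypothetical `normalize_query` name; tune to your traffic and apply inside `_key`):

```python
import re

def normalize_query(q: str) -> str:
    """Collapse whitespace and casing so trivially different phrasings share a key."""
    return re.sub(r"\s+", " ", q.strip().lower())

print(normalize_query("  How does   AUTH work? "))  # how does auth work?
```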

Semantic Cache

class SemanticCache:
    """Cache similar queries by embedding similarity."""

    def __init__(self, vectorstore, threshold=0.95):
        self.vectorstore = vectorstore  # Separate collection for cache entries
        self.threshold = threshold

    def get(self, query: str, query_embedding: list = None):
        # similarity_search_with_relevance_scores normalizes scores to [0, 1]
        # (higher = more similar), so the threshold comparison is portable
        # across vector stores. If your store supports search-by-vector, pass
        # query_embedding through to avoid re-embedding the query here.
        results = self.vectorstore.similarity_search_with_relevance_scores(
            query, k=1
        )
        if results and results[0][1] >= self.threshold:
            cached_doc = results[0][0]
            return json.loads(cached_doc.metadata.get("cached_response", "{}"))
        return None

    def set(self, query: str, query_embedding: list, response: dict):
        from langchain_core.documents import Document
        doc = Document(
            page_content=query,
            metadata={"cached_response": json.dumps(response)}
        )
        self.vectorstore.add_documents([doc])

# Combine both caches
class RAGCache:
    def __init__(self, exact_cache, semantic_cache):
        self.exact = exact_cache
        self.semantic = semantic_cache

    def get(self, query, query_embedding=None, filters=None):
        # Try exact first (fastest)
        result = self.exact.get(query, filters)
        if result:
            return result, "exact_hit"

        # Try semantic
        if query_embedding:
            result = self.semantic.get(query, query_embedding)
            if result:
                return result, "semantic_hit"

        return None, "miss"

Streaming Responses

import asyncio
import json

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from langchain_openai import ChatOpenAI

app = FastAPI()

async def stream_rag_response(query: str):
    """Stream RAG response as Server-Sent Events."""
    # Retrieve first (non-streaming); assumes a `retriever` built at app startup
    docs = await asyncio.to_thread(retriever.invoke, query)
    context = "\n\n".join(d.page_content for d in docs)

    # Yield sources first
    sources = [d.metadata.get("source", "unknown") for d in docs]
    yield f"data: {json.dumps({'type': 'sources', 'sources': sources})}\n\n"

    # Stream generation
    llm = ChatOpenAI(model="gpt-4o", temperature=0, streaming=True)
    prompt_text = f"Answer based on context:\nContext: {context}\nQuestion: {query}"

    async for chunk in llm.astream(prompt_text):
        if chunk.content:
            yield f"data: {json.dumps({'type': 'token', 'content': chunk.content})}\n\n"

    yield f"data: {json.dumps({'type': 'done'})}\n\n"

@app.get("/api/query")
async def query_endpoint(q: str):
    return StreamingResponse(
        stream_rag_response(q),
        media_type="text/event-stream"
    )

# Client-side consumption
# const eventSource = new EventSource('/api/query?q=How+does+auth+work');
# eventSource.onmessage = (event) => {
#   const data = JSON.parse(event.data);
#   if (data.type === 'token') appendToUI(data.content);
#   if (data.type === 'done') eventSource.close();
# };
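Each `yield` above hand-builds the same frame shape. A tiny helper (hypothetical `sse_event` name) keeps the framing consistent and unit-testable:

```python
import json

def sse_event(payload: dict) -> str:
    """One SSE frame: a 'data:' line carrying JSON, terminated by a blank line."""
    return f"data: {json.dumps(payload)}\n\n"

print(sse_event({"type": "done"}), end="")  # data: {"type": "done"}
```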

Token Budget Management

import tiktoken

class TokenBudget:
    """Manage token allocation across RAG components."""

    def __init__(self, model="gpt-4o", max_context=128000, reserve_output=4096):
        self.encoder = tiktoken.encoding_for_model(model)
        self.max_context = max_context
        self.reserve_output = reserve_output
        self.system_prompt_tokens = 0

    def count_tokens(self, text: str) -> int:
        return len(self.encoder.encode(text))

    def set_system_prompt(self, prompt: str):
        self.system_prompt_tokens = self.count_tokens(prompt)

    def available_for_context(self) -> int:
        return self.max_context - self.system_prompt_tokens - self.reserve_output

    def fit_documents(self, docs, max_tokens=None):
        """Select documents that fit within the token budget."""
        budget = max_tokens or self.available_for_context()
        selected = []
        total_tokens = 0

        for doc in docs:
            doc_tokens = self.count_tokens(doc.page_content)
            if total_tokens + doc_tokens <= budget:
                selected.append(doc)
                total_tokens += doc_tokens
            else:
                # Optionally truncate the last document to fit
                remaining = budget - total_tokens
                if remaining > 100:  # Only include if meaningful
                    truncated_text = self.encoder.decode(
                        self.encoder.encode(doc.page_content)[:remaining]
                    )
                    doc.page_content = truncated_text
                    selected.append(doc)
                break

        return selected, total_tokens

# Usage in pipeline
budget = TokenBudget(model="gpt-4o", max_context=128000, reserve_output=4096)
budget.set_system_prompt(system_prompt)

retrieved_docs = retriever.invoke(query)  # May return 20 docs
fitted_docs, tokens_used = budget.fit_documents(retrieved_docs, max_tokens=8000)
# Now use fitted_docs for generation
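The same greedy fit, sketched with a whitespace tokenizer so it runs without tiktoken and without the truncate-last-doc refinement; real budgets should count with the model's tokenizer as `TokenBudget` does:

```python
def fit_texts(texts, budget):
    """Greedily keep whole texts until the (word-count) budget is exhausted."""
    selected, used = [], 0
    for t in texts:
        n = len(t.split())
        if used + n > budget:
            break
        selected.append(t)
        used += n
    return selected, used

docs = ["alpha beta gamma", "one two", "six seven eight nine"]
print(fit_texts(docs, budget=5))  # (['alpha beta gamma', 'one two'], 5)
```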

Fallback Strategies

import logging

class ResilientRAG:
    """RAG pipeline with fallback strategies."""

    def __init__(self, primary_retriever, fallback_retriever, llm, cache):
        self.primary = primary_retriever
        self.fallback = fallback_retriever
        self.llm = llm
        self.cache = cache

    async def query(self, question: str) -> dict:
        # Layer 1: Cache
        cached = self.cache.get(question)
        if cached:
            return {"answer": cached["answer"], "source": "cache"}

        # Layer 2: Primary retrieval
        try:
            docs = self.primary.invoke(question)
            if docs and self._has_relevant_docs(docs):
                answer = self._generate(question, docs)
                self.cache.set(question, {"answer": answer})
                return {"answer": answer, "source": "primary"}
        except Exception as e:
            logging.warning(f"Primary retrieval failed: {e}")

        # Layer 3: Fallback retrieval (different index, simpler strategy)
        try:
            docs = self.fallback.invoke(question)
            if docs:
                answer = self._generate(question, docs)
                return {"answer": answer, "source": "fallback"}
        except Exception as e:
            logging.error(f"Fallback retrieval failed: {e}")

        # Layer 4: Graceful degradation
        return {
            "answer": "I'm unable to find relevant information to answer your question. "
                      "Please try rephrasing or contact support.",
            "source": "no_results",
        }

    def _has_relevant_docs(self, docs, min_score=0.5):
        """Check if retrieved docs meet minimum relevance."""
        if hasattr(docs[0], "metadata") and "score" in docs[0].metadata:
            return any(d.metadata["score"] >= min_score for d in docs)
        return len(docs) > 0

    def _generate(self, question, docs):
        context = "\n\n".join(d.page_content for d in docs)
        return self.llm.invoke(
            f"Answer based on context. If insufficient, say so.\nContext: {context}\nQuestion: {question}"
        ).content
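Transient failures (timeouts, rate limits) are worth retrying before falling through to the next layer. A generic backoff wrapper, sketched with a hypothetical `with_retries` name:

```python
import time

def with_retries(fn, attempts=3, base_delay=0.5):
    """Call fn, retrying with exponential backoff; re-raise after the last attempt."""
    for i in range(attempts):
        try:
            return fn()
        except Exception:
            if i == attempts - 1:
                raise
            time.sleep(base_delay * (2 ** i))
```

For example, wrap the primary layer as `docs = with_retries(lambda: self.primary.invoke(question), base_delay=0.2)` so only persistent failures trigger the fallback index.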

Incremental Indexing

import hashlib
from datetime import datetime, timezone

class IncrementalIndexer:
    """Track document changes and only re-index what changed."""

    def __init__(self, vectorstore, metadata_db):
        self.vectorstore = vectorstore
        self.db = metadata_db  # Tracks indexed documents

    def _hash_content(self, content: str) -> str:
        return hashlib.sha256(content.encode()).hexdigest()

    def _doc_id(self, doc) -> str:
        # Fall back to a content prefix when no source is set; use the same
        # expression everywhere so deletion detection stays consistent.
        return doc.metadata.get("source", doc.page_content[:50])

    def sync(self, documents):
        """Sync documents with the index. Only process changes."""
        to_add = []
        to_delete = []
        unchanged = 0
        now = datetime.now(timezone.utc).isoformat()

        for doc in documents:
            doc_id = self._doc_id(doc)
            content_hash = self._hash_content(doc.page_content)

            existing = self.db.get(doc_id)
            if existing is None:
                # New document
                to_add.append(doc)
                self.db.set(doc_id, {"hash": content_hash, "indexed_at": now})
            elif existing["hash"] != content_hash:
                # Changed document - delete old, add new
                to_delete.append(doc_id)
                to_add.append(doc)
                self.db.set(doc_id, {"hash": content_hash, "indexed_at": now})
            else:
                unchanged += 1

        # Check for deleted documents and drop their tracking entries too
        all_current_ids = {self._doc_id(doc) for doc in documents}
        all_indexed_ids = set(self.db.all_keys())
        removed = all_indexed_ids - all_current_ids
        to_delete.extend(removed)
        for doc_id in removed:
            self.db.delete(doc_id)

        # Execute changes
        if to_delete:
            self.vectorstore.delete(ids=list(to_delete))
        if to_add:
            self.vectorstore.add_documents(to_add)

        return {
            "added": len(to_add),
            "deleted": len(to_delete),
            "unchanged": unchanged,
        }
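The sync logic reduces to a three-way diff between the current crawl and what is already indexed. The core, isolated over `{doc_id: content_hash}` maps so it is easy to test:

```python
def diff_index(current: dict, indexed: dict):
    """Return (added, changed, removed) doc ids between two hash maps."""
    added = sorted(k for k in current if k not in indexed)
    changed = sorted(k for k in current if k in indexed and current[k] != indexed[k])
    removed = sorted(k for k in indexed if k not in current)
    return added, changed, removed

print(diff_index(
    {"a.md": "h1", "b.md": "h2-new", "d.md": "h4"},   # current crawl
    {"a.md": "h1", "b.md": "h2", "c.md": "h3"},        # previously indexed
))  # (['d.md'], ['b.md'], ['c.md'])
```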

Multi-Tenancy

class MultiTenantRAG:
    """Isolate data between tenants."""

    def __init__(self, vectorstore_factory, shared_vectorstore=None):
        self.factory = vectorstore_factory
        self.shared = shared_vectorstore  # Single collection shared by all tenants

    def get_retriever(self, tenant_id: str):
        """Get tenant-specific retriever."""
        # Option 1: Separate collections per tenant
        vectorstore = self.factory.get_collection(f"tenant_{tenant_id}")
        return vectorstore.as_retriever(search_kwargs={"k": 5})

    def get_retriever_filtered(self, tenant_id: str):
        """Get retriever with metadata-based tenant filtering."""
        # Option 2: Single collection with tenant metadata filter
        return self.shared.as_retriever(
            search_kwargs={
                "k": 5,
                "filter": {"tenant_id": tenant_id}
            }
        )

# Tradeoffs:
# Separate collections: Better isolation, easier deletion, more infrastructure
# Metadata filtering: Simpler ops, risk of filter bugs leaking data
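"Data isolation verified" in the deployment checklist deserves an actual test. A minimal leak check for the metadata-filter approach, against an in-memory stand-in; swap in your real retriever for an integration test:

```python
DOCS = [
    {"text": "acme pricing", "tenant_id": "acme"},
    {"text": "globex pricing", "tenant_id": "globex"},
]

def filtered_search(query: str, tenant_id: str):
    """Stand-in for a metadata-filtered retriever."""
    return [d for d in DOCS if d["tenant_id"] == tenant_id and query in d["text"]]

hits = filtered_search("pricing", "acme")
# Every returned doc must belong to the requesting tenant.
assert all(d["tenant_id"] == "acme" for d in hits)
assert [d["text"] for d in hits] == ["acme pricing"]
```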

Cost Optimization

class CostTracker:
    """Track and optimize RAG pipeline costs."""

    # Approximate costs (USD per 1M tokens, as of 2024)
    COSTS = {
        "text-embedding-3-small": {"input": 0.02},
        "text-embedding-3-large": {"input": 0.13},
        "gpt-4o": {"input": 2.50, "output": 10.00},
        "gpt-4o-mini": {"input": 0.15, "output": 0.60},
        "claude-3-5-sonnet": {"input": 3.00, "output": 15.00},
    }

    def __init__(self):
        self.total_cost = 0
        self.breakdown = {}

    def log(self, model: str, input_tokens: int, output_tokens: int = 0):
        costs = self.COSTS.get(model, {"input": 0, "output": 0})
        cost = (input_tokens * costs["input"] + output_tokens * costs["output"]) / 1_000_000
        self.total_cost += cost
        self.breakdown[model] = self.breakdown.get(model, 0) + cost

    def report(self):
        print(f"Total cost: ${self.total_cost:.4f}")
        for model, cost in self.breakdown.items():
            print(f"  {model}: ${cost:.4f}")

# Cost reduction strategies:
# 1. Use gpt-4o-mini for re-ranking/grading (not generation)
# 2. Cache embeddings and responses aggressively
# 3. Reduce embedding dimensions (Matryoshka)
# 4. Use smaller k values when possible
# 5. Batch embedding requests
# 6. Use open-source models for non-critical paths
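Using the rates above, a back-of-envelope cost for one typical query (assumed workload: ~20 query tokens embedded with text-embedding-3-small, ~3,000 context tokens in and ~400 tokens out on gpt-4o-mini):

```python
embed_cost = 20 * 0.02 / 1_000_000                   # query embedding
gen_cost = (3_000 * 0.15 + 400 * 0.60) / 1_000_000   # generation: input + output
total = embed_cost + gen_cost
print(f"${total:.6f} per query")  # $0.000690 per query
```

At that rate, a million queries cost roughly $690, which is why a 30%+ cache hit rate shows up directly on the bill.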

Monitoring and Observability

import time
import logging
from dataclasses import dataclass, field
from typing import Optional

@dataclass
class RAGTrace:
    """Trace a single RAG request through the pipeline."""
    query: str
    start_time: float = field(default_factory=time.time)
    retrieval_latency_ms: Optional[float] = None
    rerank_latency_ms: Optional[float] = None
    generation_latency_ms: Optional[float] = None
    total_latency_ms: Optional[float] = None
    num_docs_retrieved: int = 0
    num_docs_after_rerank: int = 0
    top_similarity_score: Optional[float] = None
    context_tokens: int = 0
    output_tokens: int = 0
    cache_hit: bool = False
    error: Optional[str] = None

    def finish(self):
        self.total_latency_ms = (time.time() - self.start_time) * 1000

# Key metrics to dashboard:
# - p50/p95/p99 latency
# - Cache hit rate (target: >30% for cost savings)
# - Retrieval score distribution (detect when queries drift from corpus)
# - Error rate by component (retrieval vs generation)
# - User feedback ratio (thumbs up vs down)
# - Queries with no relevant results (gap analysis for corpus)
# - Cost per query (embedding + retrieval + generation)
# - Token utilization (how much of context budget is used)
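For the latency percentiles, the nearest-rank method over raw samples is enough for a first dashboard (a sketch; production systems usually rely on histogram-based estimates from the metrics backend):

```python
import math

def percentile(values, p):
    """Nearest-rank percentile (p in 0-100) over raw samples."""
    xs = sorted(values)
    k = math.ceil(p / 100 * len(xs)) - 1
    return xs[max(0, k)]

latencies_ms = [120, 150, 110, 3000, 140, 130, 180, 160, 170, 190]
print(percentile(latencies_ms, 50), percentile(latencies_ms, 95))  # 150 3000
```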

Deployment Checklist

  • Caching: exact + semantic cache with appropriate TTL
  • Streaming: SSE or WebSocket for user-facing responses
  • Token budgets: enforce limits on context and output
  • Fallbacks: graceful degradation when retrieval or LLM fails
  • Rate limiting: protect embedding and LLM APIs
  • Multi-tenancy: data isolation verified
  • Incremental indexing: pipeline for document updates
  • Monitoring: latency, quality, cost dashboards
  • Evaluation: automated eval suite runs on index updates
  • Cost tracking: per-query cost attribution
  • Security: no prompt injection via retrieved documents
  • Logging: full traces for debugging (redact PII)

Anti-Patterns

  1. No caching at all -- Even simple exact-match caching saves 20-40% of LLM costs for most workloads.

  2. Synchronous indexing on write -- Index updates should be async. Do not block user requests while re-embedding documents.

  3. No fallback path -- When the vector DB is down or returns empty, the system should degrade gracefully, not crash.

  4. Logging queries without redaction -- RAG queries often contain PII. Redact or hash sensitive fields before logging.

  5. Not monitoring similarity scores -- A gradual decline in top-k similarity scores means queries are drifting from your corpus. Add new documents or adjust chunking.

  6. Ignoring cold start -- The first query after deployment has no cache. Pre-warm caches with common queries from analytics.

