CQRS
Implement Command Query Responsibility Segregation to separate write and read models in event-sourced architectures
CQRS Pattern: Event Sourcing
You are an expert in CQRS (Command Query Responsibility Segregation) for building event-sourced systems.
Overview
CQRS separates the write side (commands that change state) from the read side (queries that return data). In an event-sourced system, the write side persists domain events to an event store, while the read side builds optimized projections from those events. This separation allows each side to be scaled, optimized, and evolved independently.
Core Concepts
Commands: Requests to change the system's state. A command is handled by a single aggregate, validated against business rules, and results in zero or more domain events being emitted. Commands can be rejected.
Queries: Requests for data. Queries read from purpose-built read models (projections) that are optimized for specific access patterns. Queries never modify state.
Command Handler: Receives a command, loads the appropriate aggregate, invokes the aggregate's behavior, and persists resulting events.
Query Handler: Receives a query and reads from a projection or view. There is no domain logic here — just data retrieval.
Write Model: The aggregate and event store. Optimized for consistency and business rule enforcement.
Read Model: Denormalized views built by projecting events. Optimized for query performance.
Implementation Patterns
Command and Command Handler
from dataclasses import dataclass

# EventStore, Order, and StreamNotFound are assumed to come from the domain layer.


@dataclass(frozen=True)
class PlaceOrder:
    order_id: str
    customer_id: str
    items: list[dict]


class PlaceOrderHandler:
    def __init__(self, event_store: EventStore):
        self._store = event_store

    def handle(self, command: PlaceOrder) -> None:
        # Load or create the aggregate
        try:
            order = Order.load(command.order_id, self._store)
        except StreamNotFound:
            order = Order.create(command.order_id)

        # Execute domain logic; may raise if business rules are violated
        order.place(customer_id=command.customer_id, items=command.items)

        # Persist new events; expected_version guards against concurrent writers
        self._store.append(
            stream_id=f"order-{command.order_id}",
            events=order.pending_events,
            expected_version=order.version,
        )
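The handler above relies on an `Order` aggregate and an event store that the text does not define. The following is a minimal in-memory sketch of what they might look like, assuming that `version` counts the events already persisted and that `append` rejects a write when `expected_version` does not match the stream length. `InMemoryEventStore`, `ConcurrencyError`, `OrderPlaced`, and the `read` method are hypothetical names introduced only for illustration.

```python
from dataclasses import dataclass


class StreamNotFound(Exception):
    """Raised when a stream has no events yet."""


class ConcurrencyError(Exception):
    """Raised when expected_version does not match the stream length."""


@dataclass(frozen=True)
class OrderPlaced:  # hypothetical event shape
    order_id: str
    customer_id: str
    items: tuple


class InMemoryEventStore:  # hypothetical stand-in for a real event store
    def __init__(self):
        self._streams: dict[str, list] = {}

    def read(self, stream_id: str) -> list:
        if stream_id not in self._streams:
            raise StreamNotFound(stream_id)
        return list(self._streams[stream_id])

    def append(self, stream_id: str, events: list, expected_version: int) -> None:
        stream = self._streams.get(stream_id, [])
        # Optimistic concurrency: reject the write if another writer got there first.
        if len(stream) != expected_version:
            raise ConcurrencyError(
                f"{stream_id}: expected version {expected_version}, found {len(stream)}"
            )
        self._streams[stream_id] = stream + list(events)


class Order:
    def __init__(self, order_id: str):
        self.order_id = order_id
        self.version = 0  # number of events already persisted
        self.pending_events: list = []
        self._placed = False

    @classmethod
    def create(cls, order_id: str) -> "Order":
        return cls(order_id)

    @classmethod
    def load(cls, order_id: str, store: InMemoryEventStore) -> "Order":
        order = cls(order_id)
        for event in store.read(f"order-{order_id}"):
            order._apply(event)
            order.version += 1
        return order

    def place(self, customer_id: str, items: list) -> None:
        # Business rules live in the aggregate, not in the handler.
        if self._placed:
            raise ValueError("order already placed")
        if not items:
            raise ValueError("an order needs at least one item")
        event = OrderPlaced(self.order_id, customer_id, tuple(items))
        self._apply(event)
        self.pending_events.append(event)

    def _apply(self, event) -> None:
        if isinstance(event, OrderPlaced):
            self._placed = True
```

With these definitions, placing the same order twice raises `ValueError` in the aggregate, and two writers racing on the same stream surface as a `ConcurrencyError` from the store rather than as silently interleaved events.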
Query and Query Handler
@dataclass(frozen=True)
class GetOrderSummary:
    order_id: str


class GetOrderSummaryHandler:
    def __init__(self, read_db):
        self._db = read_db

    def handle(self, query: GetOrderSummary) -> dict | None:
        return self._db.query(
            "SELECT * FROM order_summaries WHERE order_id = %s",
            (query.order_id,),
        )
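The query handler reads from an `order_summaries` table, but something has to populate that table from events. One way this could look, as a sketch only: a projection that consumes a hypothetical `OrderPlaced` event and upserts a row. It uses the standard-library `sqlite3` module (with `?` placeholders instead of the `%s` style above) so it runs without a database server; the column set and the `OrderSummaryProjection` and `get_order_summary` names are assumptions.

```python
import sqlite3
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class OrderPlaced:  # hypothetical event shape
    order_id: str
    customer_id: str
    items: tuple


class OrderSummaryProjection:
    """Builds the order_summaries read model from published events."""

    def __init__(self, conn: sqlite3.Connection):
        self._conn = conn
        conn.execute(
            "CREATE TABLE IF NOT EXISTS order_summaries ("
            "order_id TEXT PRIMARY KEY, customer_id TEXT, "
            "item_count INTEGER, status TEXT)"
        )

    def handle(self, event) -> None:
        if isinstance(event, OrderPlaced):
            # INSERT OR REPLACE keeps the projection idempotent if an event
            # is delivered more than once.
            self._conn.execute(
                "INSERT OR REPLACE INTO order_summaries VALUES (?, ?, ?, ?)",
                (event.order_id, event.customer_id, len(event.items), "placed"),
            )
            self._conn.commit()


def get_order_summary(conn: sqlite3.Connection, order_id: str) -> Optional[dict]:
    """Read side: plain data retrieval, no domain logic."""
    columns = ("order_id", "customer_id", "item_count", "status")
    row = conn.execute(
        "SELECT order_id, customer_id, item_count, status "
        "FROM order_summaries WHERE order_id = ?",
        (order_id,),
    ).fetchone()
    return dict(zip(columns, row)) if row is not None else None
```

The projection only sees the event, never the aggregate, so the read model's shape can change without touching the write side.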
Command Bus / Mediator
class CommandBus:
    def __init__(self):
        self._handlers: dict[type, object] = {}

    def register(self, command_type: type, handler) -> None:
        self._handlers[command_type] = handler

    def dispatch(self, command) -> None:
        handler = self._handlers.get(type(command))
        if handler is None:
            raise ValueError(f"No handler registered for {type(command).__name__}")
        handler.handle(command)


# Wiring
bus = CommandBus()
bus.register(PlaceOrder, PlaceOrderHandler(event_store))
bus.register(CancelOrder, CancelOrderHandler(event_store))
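The read side can use the same dispatch idea, with one difference: a query bus has to return the handler's result. The sketch below is one possible shape, not a prescribed API. `QueryBus` and `InMemoryOrderSummaryHandler` are hypothetical names; the in-memory handler stands in for a real projection-backed handler.

```python
from dataclasses import dataclass
from typing import Any


class QueryBus:
    def __init__(self):
        self._handlers: dict[type, Any] = {}

    def register(self, query_type: type, handler) -> None:
        self._handlers[query_type] = handler

    def dispatch(self, query) -> Any:
        handler = self._handlers.get(type(query))
        if handler is None:
            raise ValueError(f"No handler registered for {type(query).__name__}")
        # Unlike the command bus, the handler's result is passed back to the caller.
        return handler.handle(query)


@dataclass(frozen=True)
class GetOrderSummary:
    order_id: str


class InMemoryOrderSummaryHandler:  # hypothetical stand-in for a projection reader
    def __init__(self, rows: dict):
        self._rows = rows

    def handle(self, query: GetOrderSummary):
        return self._rows.get(query.order_id)


# Wiring
query_bus = QueryBus()
query_bus.register(
    GetOrderSummary,
    InMemoryOrderSummaryHandler({"o1": {"order_id": "o1", "status": "placed"}}),
)
```

Keeping the two buses as separate types makes it harder to register a command handler on the read path by accident.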
Separating Read and Write APIs (FastAPI)
from fastapi import FastAPI, HTTPException

app = FastAPI()

# PlaceOrderRequest (the request schema), command_bus, and query_bus are
# assumed to be defined elsewhere in the application.


# Write endpoint: accepts commands
@app.post("/orders")
def place_order(body: PlaceOrderRequest):
    command = PlaceOrder(
        order_id=body.order_id,
        customer_id=body.customer_id,
        items=body.items,
    )
    command_bus.dispatch(command)
    return {"status": "accepted", "order_id": body.order_id}


# Read endpoint: queries a projection
@app.get("/orders/{order_id}")
def get_order(order_id: str):
    result = query_bus.dispatch(GetOrderSummary(order_id=order_id))
    if not result:
        raise HTTPException(status_code=404)
    return result
Best Practices
- Keep commands and queries in separate modules to enforce the boundary at the code structure level, not just conceptually.
- Commands should be imperative and intention-revealing (PlaceOrder, CancelSubscription), while queries should describe what data is needed (GetOrderSummary, ListActiveCustomers).
- Validate commands at the edge (API layer) for format and schema correctness; validate business rules inside the aggregate.
- Return minimal data from command handlers. Ideally return only an acknowledgment or the aggregate ID. If the caller needs the resulting state, they query the read side.
- Scale read and write sides independently. The read side often needs many replicas; the write side needs strong consistency.
- Use asynchronous projection updates for throughput, but provide synchronous read-your-own-writes semantics where the UX demands it (e.g., redirect to a page that waits for the projection to catch up).
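The read-your-own-writes point in the last item can be sketched as a small polling helper. This assumes, hypothetically, that the write side reports the position of the last event it stored and that the projection exposes the position it has processed so far; `wait_for_projection` and its parameters are illustrative names, not part of any library.

```python
import time
from typing import Callable


def wait_for_projection(
    read_position: Callable[[], int],
    target_position: int,
    timeout: float = 2.0,
    interval: float = 0.05,
) -> bool:
    """Poll until the projection has processed target_position.

    Returns True if the projection caught up, False if the timeout elapsed.
    """
    deadline = time.monotonic() + timeout
    while True:
        if read_position() >= target_position:
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)
```

A read endpoint could call this before querying and, on `False`, fall back to returning the stale view or a "still processing" response rather than blocking indefinitely.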
Common Pitfalls
- Applying CQRS everywhere. CQRS adds complexity. Use it for bounded contexts where the read and write models genuinely diverge. Simple CRUD contexts do not benefit.
- Querying the write model. If read endpoints load aggregates from the event store, you have lost the main benefit of CQRS. Always read from projections.
- Fat commands with embedded business logic. Commands should carry intent and data, not decisions. The aggregate decides.
- Ignoring eventual consistency in the UI. After a command is accepted, the read model may not yet reflect the change. Design the UI to handle this (optimistic UI updates, polling, or subscriptions).
- Coupling read models to aggregate internals. Projections should be driven by published events, not by reaching into the aggregate's private state.
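To illustrate the "fat commands" pitfall, here is a hedged sketch of a thin command. The coupon scenario, the `ApplyCoupon` and `CouponApplied` types, and the `COUPON_PERCENT` rule table are all hypothetical. The command carries only the coupon code; the aggregate looks up the rule and decides the discount.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ApplyCoupon:
    """Thin command: intent and data only, no precomputed discount."""
    order_id: str
    coupon_code: str


@dataclass(frozen=True)
class CouponApplied:
    order_id: str
    coupon_code: str
    discount_cents: int


class Order:
    # Hypothetical rule table: coupon code -> percent off.
    COUPON_PERCENT = {"WELCOME10": 10}

    def __init__(self, order_id: str, total_cents: int):
        self.order_id = order_id
        self.total_cents = total_cents
        self.pending_events: list = []

    def apply_coupon(self, coupon_code: str) -> None:
        # The aggregate, not the command or its sender, decides the discount.
        percent = self.COUPON_PERCENT.get(coupon_code)
        if percent is None:
            raise ValueError(f"unknown coupon: {coupon_code}")
        discount = self.total_cents * percent // 100
        self.pending_events.append(
            CouponApplied(self.order_id, coupon_code, discount)
        )


def handle_apply_coupon(command: ApplyCoupon, order: Order) -> None:
    # The handler only routes the command's data to the aggregate.
    order.apply_coupon(command.coupon_code)
```

A fat version of this command would carry a `discount_cents` field computed by the caller, which lets clients bypass the pricing rule entirely.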
Anti-Patterns
- Over-engineering for hypothetical scale. Building for millions of users when you have hundreds adds complexity without value. Solve today's problems first.
- Ignoring the existing ecosystem. Reinventing functionality that mature libraries already provide well wastes time and introduces unnecessary risk.
- Premature abstraction. Creating elaborate frameworks and utilities before you have enough concrete cases to know what the abstraction should look like produces the wrong abstraction.
- Neglecting error handling at boundaries. Internal code can trust its inputs, but system boundaries (user input, APIs, file I/O) require defensive validation.
- Skipping documentation for obvious code. What is obvious to you today will not be obvious to your colleague next month or to you next year.