
CQRS

Implement Command Query Responsibility Segregation to separate write and read models in event-sourced architectures


CQRS Pattern — Event Sourcing

You are an expert in CQRS (Command Query Responsibility Segregation) for building event-sourced systems.

Core Philosophy

Overview

CQRS separates the write side (commands that change state) from the read side (queries that return data). In an event-sourced system, the write side persists domain events to an event store, while the read side builds optimized projections from those events. This separation allows each side to be scaled, optimized, and evolved independently.

Core Concepts

Commands: Requests to change the system's state. A command is handled by a single aggregate, validated against business rules, and results in zero or more domain events being emitted. Commands can be rejected.

Queries: Requests for data. Queries read from purpose-built read models (projections) that are optimized for specific access patterns. Queries never modify state.

Command Handler: Receives a command, loads the appropriate aggregate, invokes the aggregate's behavior, and persists resulting events.

Query Handler: Receives a query and reads from a projection or view. There is no domain logic here — just data retrieval.

Write Model: The aggregate and event store. Optimized for consistency and business rule enforcement.

Read Model: Denormalized views built by projecting events. Optimized for query performance.
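
To make "projecting events" concrete, here is a minimal sketch of a read model that folds events into a denormalized view. The event classes (`OrderPlaced`, `OrderCancelled`), their fields, and the in-memory `rows` dictionary are assumptions made for illustration; a real projection would typically write to a table or document store.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class OrderPlaced:  # hypothetical event, not defined in the original text
    order_id: str
    customer_id: str
    item_count: int


@dataclass(frozen=True)
class OrderCancelled:  # hypothetical event, not defined in the original text
    order_id: str


class OrderSummaryProjection:
    """Folds order events into a denormalized, query-ready view."""

    def __init__(self) -> None:
        # Stand-in for a read database table keyed by order_id.
        self.rows: dict[str, dict] = {}

    def apply(self, event: object) -> None:
        if isinstance(event, OrderPlaced):
            self.rows[event.order_id] = {
                "order_id": event.order_id,
                "customer_id": event.customer_id,
                "item_count": event.item_count,
                "status": "placed",
            }
        elif isinstance(event, OrderCancelled):
            row = self.rows.get(event.order_id)
            if row is not None:
                row["status"] = "cancelled"
        # Event types this projection does not care about are ignored.


projection = OrderSummaryProjection()
for event in [
    OrderPlaced(order_id="o-1", customer_id="c-9", item_count=2),
    OrderCancelled(order_id="o-1"),
]:
    projection.apply(event)
```

The projection only sees published events, so it can be rebuilt from scratch by replaying the stream into a fresh instance.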

Implementation Patterns

Command and Command Handler

from dataclasses import dataclass

# EventStore, Order, and StreamNotFound are assumed to come from the
# application's own domain and infrastructure modules (not shown here).

@dataclass(frozen=True)
class PlaceOrder:
    order_id: str
    customer_id: str
    items: list[dict]

class PlaceOrderHandler:
    def __init__(self, event_store: EventStore):
        self._store = event_store

    def handle(self, command: PlaceOrder) -> None:
        # Load the aggregate, or start a new one if its stream does not exist yet
        try:
            order = Order.load(command.order_id, self._store)
        except StreamNotFound:
            order = Order.create(command.order_id)

        # Execute domain logic; raises if a business rule is violated
        order.place(customer_id=command.customer_id, items=command.items)

        # Persist the new events. expected_version is intended as an optimistic
        # concurrency check: the append should fail if the stream has changed
        # since the aggregate was loaded.
        self._store.append(
            stream_id=f"order-{command.order_id}",
            events=order.pending_events,
            expected_version=order.version,
        )
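
The handler above depends on `Order`, `EventStore`, and `StreamNotFound`, which are not defined on this page. One possible shape for them is sketched below, assuming an in-memory store, a stream naming scheme of `order-<id>`, and a `version` that counts already-persisted events. `InMemoryEventStore`, `ConcurrencyError`, `OrderPlaced`, and the specific business rules are hypothetical names and choices made for this sketch only.

```python
from dataclasses import dataclass


class StreamNotFound(Exception):
    pass


class ConcurrencyError(Exception):  # hypothetical name
    pass


@dataclass(frozen=True)
class OrderPlaced:  # hypothetical event
    order_id: str
    customer_id: str
    items: tuple


class InMemoryEventStore:
    def __init__(self) -> None:
        self._streams: dict[str, list] = {}

    def read(self, stream_id: str) -> list:
        if stream_id not in self._streams:
            raise StreamNotFound(stream_id)
        return list(self._streams[stream_id])

    def append(self, stream_id: str, events: list, expected_version: int) -> None:
        stream = self._streams.get(stream_id, [])
        # Optimistic concurrency: reject the write if the stream moved on.
        if len(stream) != expected_version:
            raise ConcurrencyError(
                f"{stream_id}: expected version {expected_version}, found {len(stream)}"
            )
        self._streams[stream_id] = stream + list(events)


class Order:
    def __init__(self, order_id: str) -> None:
        self.order_id = order_id
        self.version = 0  # number of events already persisted
        self.pending_events: list = []
        self._placed = False

    @classmethod
    def create(cls, order_id: str) -> "Order":
        return cls(order_id)

    @classmethod
    def load(cls, order_id: str, store: InMemoryEventStore) -> "Order":
        order = cls(order_id)
        for event in store.read(f"order-{order_id}"):
            order._apply(event)
            order.version += 1
        return order

    def place(self, customer_id: str, items: list) -> None:
        # Business rules live in the aggregate, not in the command or handler.
        if self._placed:
            raise ValueError("order has already been placed")
        if not items:
            raise ValueError("an order needs at least one item")
        event = OrderPlaced(self.order_id, customer_id, tuple(items))
        self._apply(event)
        self.pending_events.append(event)

    def _apply(self, event: object) -> None:
        if isinstance(event, OrderPlaced):
            self._placed = True


store = InMemoryEventStore()
order = Order.create("42")
order.place(customer_id="c-1", items=[{"sku": "A", "qty": 1}])
store.append("order-42", order.pending_events, expected_version=order.version)
reloaded = Order.load("42", store)
```

Rebuilding state only through `_apply` keeps replayed history and newly emitted events on the same code path.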

Query and Query Handler

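from dataclasses import dataclass

@dataclass(frozen=True)
class GetOrderSummary:
    order_id: str

class GetOrderSummaryHandler:
    def __init__(self, read_db):
        # read_db is assumed to be a thin wrapper whose query() returns
        # a single row as a dict, or None when nothing matches.
        self._db = read_db

    def handle(self, query: GetOrderSummary) -> dict | None:
        # Parameterized query; the %s placeholder style depends on the driver
        return self._db.query(
            "SELECT * FROM order_summaries WHERE order_id = %s",
            (query.order_id,),
        )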

Command Bus / Mediator

class CommandBus:
    def __init__(self):
        # Maps each command type to the single handler responsible for it
        self._handlers: dict[type, object] = {}

    def register(self, command_type: type, handler) -> None:
        self._handlers[command_type] = handler

    def dispatch(self, command) -> None:
        handler = self._handlers.get(type(command))
        if handler is None:
            raise ValueError(f"No handler registered for {type(command).__name__}")
        handler.handle(command)

# Wiring (CancelOrder and CancelOrderHandler are assumed to be defined
# analogously to PlaceOrder and PlaceOrderHandler)
bus = CommandBus()
bus.register(PlaceOrder, PlaceOrderHandler(event_store))
bus.register(CancelOrder, CancelOrderHandler(event_store))
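
The read endpoint in the next section calls `query_bus.dispatch(...)` and uses its return value, whereas `CommandBus.dispatch` returns `None`. A query-side counterpart could look like the sketch below. `QueryBus` and `InMemoryOrderSummaryHandler` are hypothetical names introduced here; the in-memory handler merely stands in for a database-backed one.

```python
from dataclasses import dataclass
from typing import Optional


class QueryBus:
    def __init__(self) -> None:
        self._handlers: dict[type, object] = {}

    def register(self, query_type: type, handler) -> None:
        self._handlers[query_type] = handler

    def dispatch(self, query):
        handler = self._handlers.get(type(query))
        if handler is None:
            raise ValueError(f"No handler registered for {type(query).__name__}")
        # Unlike the command bus, the handler's result is returned to the caller.
        return handler.handle(query)


@dataclass(frozen=True)
class GetOrderSummary:
    order_id: str


class InMemoryOrderSummaryHandler:  # hypothetical stand-in for a DB-backed handler
    def __init__(self, rows: dict[str, dict]) -> None:
        self._rows = rows

    def handle(self, query: GetOrderSummary) -> Optional[dict]:
        return self._rows.get(query.order_id)


query_bus = QueryBus()
query_bus.register(
    GetOrderSummary,
    InMemoryOrderSummaryHandler({"o-1": {"order_id": "o-1", "status": "placed"}}),
)
found = query_bus.dispatch(GetOrderSummary(order_id="o-1"))
missing = query_bus.dispatch(GetOrderSummary(order_id="nope"))
```

Keeping two separate bus types makes the asymmetry explicit: commands report success or failure, queries return data.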

Separating Read and Write APIs (FastAPI)

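from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

app = FastAPI()

# command_bus and query_bus are assumed to be module-level bus instances
# created at application startup; query_bus.dispatch is assumed to return
# the query handler's result.

class PlaceOrderRequest(BaseModel):
    # Request schema: FastAPI rejects malformed bodies before a command is built
    order_id: str
    customer_id: str
    items: list[dict]

# Write endpoint: accepts commands and returns only an acknowledgment
@app.post("/orders", status_code=202)
def place_order(body: PlaceOrderRequest):
    command = PlaceOrder(
        order_id=body.order_id,
        customer_id=body.customer_id,
        items=body.items,
    )
    command_bus.dispatch(command)
    return {"status": "accepted", "order_id": body.order_id}

# Read endpoint: queries a projection, never the event store
@app.get("/orders/{order_id}")
def get_order(order_id: str):
    result = query_bus.dispatch(GetOrderSummary(order_id=order_id))
    if not result:
        raise HTTPException(status_code=404)
    return result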

Best Practices

  • Keep commands and queries in separate modules to enforce the boundary at the code structure level, not just conceptually.
  • Commands should be imperative and intention-revealing (PlaceOrder, CancelSubscription), while queries should describe what data is needed (GetOrderSummary, ListActiveCustomers).
  • Validate commands at the edge (API layer) for format and schema correctness; validate business rules inside the aggregate.
  • Return minimal data from command handlers. Ideally return only an acknowledgment or the aggregate ID. If the caller needs the resulting state, they query the read side.
  • Scale read and write sides independently. The read side often needs many replicas; the write side needs strong consistency.
  • Use asynchronous projection updates for throughput, but provide synchronous read-your-own-writes semantics where the UX demands it (e.g., redirect to a page that waits for the projection to catch up).
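
The split between edge validation and business-rule validation can be sketched as follows. This is plain Python without a web framework; `parse_place_order`, `InvalidRequest`, `BusinessRuleViolation`, and the `MAX_ITEMS` rule are all hypothetical and exist only to illustrate where each kind of check belongs.

```python
class InvalidRequest(Exception):
    """Raised at the edge when the payload is malformed."""


class BusinessRuleViolation(Exception):
    """Raised by the aggregate when a domain rule is broken."""


def parse_place_order(body: dict) -> dict:
    """Edge validation: checks shape and types only, makes no business decisions."""
    for key in ("order_id", "customer_id"):
        value = body.get(key)
        if not isinstance(value, str) or not value:
            raise InvalidRequest(f"{key} must be a non-empty string")
    items = body.get("items")
    if not isinstance(items, list) or not all(isinstance(i, dict) for i in items):
        raise InvalidRequest("items must be a list of objects")
    return {
        "order_id": body["order_id"],
        "customer_id": body["customer_id"],
        "items": items,
    }


class Order:
    MAX_ITEMS = 50  # hypothetical business rule, for illustration only

    def __init__(self, order_id: str) -> None:
        self.order_id = order_id
        self.placed = False

    def place(self, customer_id: str, items: list) -> None:
        if self.placed:
            raise BusinessRuleViolation("order has already been placed")
        if not items:
            raise BusinessRuleViolation("an order needs at least one item")
        if len(items) > self.MAX_ITEMS:
            raise BusinessRuleViolation("too many items in one order")
        self.placed = True


# An empty item list is well-formed, so it passes the edge check,
# but the aggregate rejects it as a business rule violation.
payload = parse_place_order({"order_id": "o-1", "customer_id": "c-1", "items": []})
order = Order(payload["order_id"])
try:
    order.place(payload["customer_id"], payload["items"])
    outcome = "placed"
except BusinessRuleViolation as exc:
    outcome = f"rejected: {exc}"
```

The two exception types map naturally onto different responses, for example a client error for malformed input versus a domain-level rejection.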

Common Pitfalls

  • Applying CQRS everywhere. CQRS adds complexity. Use it for bounded contexts where the read and write models genuinely diverge. Simple CRUD contexts do not benefit.
  • Querying the write model. If read endpoints load aggregates from the event store, you have lost the main benefit of CQRS. Always read from projections.
  • Fat commands with embedded business logic. Commands should carry intent and data, not decisions. The aggregate decides.
  • Ignoring eventual consistency in the UI. After a command is accepted, the read model may not yet reflect the change. Design the UI to handle this (optimistic UI updates, polling, or subscriptions).
  • Coupling read models to aggregate internals. Projections should be driven by published events, not by reaching into the aggregate's private state.
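
One way to handle the eventual-consistency pitfall is to let a reader wait until the projection has caught up with the write it cares about. The sketch below assumes the write side can report the position of the last event it appended and that the projection tracks the last position it has applied; `ProjectionCheckpoint` and `wait_for_projection` are hypothetical names, and the background timer merely simulates an asynchronous projector.

```python
import threading
import time


class ProjectionCheckpoint:
    """Tracks the position of the last event the projection has applied."""

    def __init__(self) -> None:
        self.position = 0

    def advance_to(self, position: int) -> None:
        self.position = max(self.position, position)


def wait_for_projection(
    checkpoint: ProjectionCheckpoint,
    target_position: int,
    timeout: float = 2.0,
    poll_interval: float = 0.01,
) -> bool:
    """Polls until the projection reaches target_position or the timeout expires."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if checkpoint.position >= target_position:
            return True
        time.sleep(poll_interval)
    return checkpoint.position >= target_position


checkpoint = ProjectionCheckpoint()

# Simulate an asynchronous projector that applies event 7 shortly after the write.
threading.Timer(0.05, checkpoint.advance_to, args=(7,)).start()

caught_up = wait_for_projection(checkpoint, target_position=7)
timed_out = wait_for_projection(checkpoint, target_position=99, timeout=0.05)
```

When the wait times out, the caller can fall back to an optimistic UI update or a "still processing" response instead of showing stale data as if it were current.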

Anti-Patterns

Over-engineering for hypothetical scale. Building for millions of users when you have hundreds adds complexity without value. Solve today's problems first.

Ignoring the existing ecosystem. Reinventing functionality that mature libraries already provide well wastes time and introduces unnecessary risk.

Premature abstraction. Creating elaborate frameworks and utilities before you have enough concrete cases to know what the abstraction should look like produces the wrong abstraction.

Neglecting error handling at boundaries. Internal code can trust its inputs, but system boundaries (user input, APIs, file I/O) require defensive validation.

Skipping documentation for obvious code. What is obvious to you today will not be obvious to your colleague next month or to you next year.
