Sagas
Coordinate long-running business processes across aggregates and services using sagas and process managers
Sagas and Process Managers: Event Sourcing
You are an expert in sagas and process managers for building event-sourced systems.
Overview
Sagas (also called process managers) coordinate multi-step business processes that span multiple aggregates or services. Unlike a single aggregate command that either succeeds or fails atomically, a saga orchestrates a sequence of steps over time, reacting to events and issuing commands. When a step fails, the saga executes compensating actions to undo previously completed steps.
Core Concepts
Saga: A stateful object that listens for domain events and dispatches commands in response. It tracks which steps have been completed and decides what to do next — including compensating for failures.
Orchestration vs. Choreography: In orchestration, a central process manager explicitly controls the flow. In choreography, each service reacts independently to events with no central coordinator. Sagas typically follow the orchestration model.
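To make the contrast concrete, here is a minimal choreography sketch. It assumes a hypothetical in-memory `EventBus`. Each service subscribes to the events it cares about and publishes its own, and no object holds the overall flow. In the orchestrated `OrderFulfillmentSaga` later in this document, by contrast, one object decides every next step.

```python
from collections import defaultdict
from typing import Callable


class EventBus:
    """Hypothetical in-memory pub/sub bus used only for this sketch."""

    def __init__(self):
        self._subscribers = defaultdict(list)
        self.published = []  # log of every published event, for inspection

    def subscribe(self, event_type: str, handler: Callable[[dict], None]) -> None:
        self._subscribers[event_type].append(handler)

    def publish(self, event: dict) -> None:
        self.published.append(event)
        for handler in self._subscribers[event["event_type"]]:
            handler(event)


def wire_choreography(bus: EventBus, in_stock: bool) -> None:
    # Inventory service: reacts to OrderPlaced on its own initiative.
    def inventory_service(event):
        outcome = "InventoryReserved" if in_stock else "InventoryReservationFailed"
        bus.publish({"event_type": outcome, "data": event["data"]})

    # Payment service: reacts to InventoryReserved, unaware of the wider flow.
    def payment_service(event):
        bus.publish({"event_type": "PaymentCharged", "data": event["data"]})

    # Order service: rejects the order when the reservation fails.
    def order_service(event):
        bus.publish({"event_type": "OrderRejected", "data": event["data"]})

    bus.subscribe("OrderPlaced", inventory_service)
    bus.subscribe("InventoryReserved", payment_service)
    bus.subscribe("InventoryReservationFailed", order_service)


bus = EventBus()
wire_choreography(bus, in_stock=True)
bus.publish({"event_type": "OrderPlaced", "data": {"order_id": "o-1"}})
print([e["event_type"] for e in bus.published])
# ['OrderPlaced', 'InventoryReserved', 'PaymentCharged']
```

The end-to-end sequence exists only implicitly, spread across the subscriptions. Orchestration addresses that trade-off by putting the sequence in one place.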
Compensating Actions: Event-sourced systems do not use distributed transactions. Instead, when a step fails, the saga issues compensating commands that semantically reverse the effect of prior steps (e.g., RefundPayment to reverse ChargePayment).
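One common way to keep compensations in step with forward progress is to record a compensating action each time a step succeeds. On failure, the recorded actions run in reverse order. This is a minimal sketch of that idea. `CompensationLog` is a hypothetical helper, and in a real saga each compensation would dispatch a command such as `RefundPayment` rather than append to a list.

```python
from typing import Callable


class CompensationLog:
    """Records an undo action per completed step; runs them newest-first on failure."""

    def __init__(self):
        self._compensations: list[tuple[str, Callable[[], None]]] = []

    def step_completed(self, name: str, compensate: Callable[[], None]) -> None:
        self._compensations.append((name, compensate))

    def compensate_all(self) -> list[str]:
        undone = []
        # Reverse order: the most recently completed step is undone first
        while self._compensations:
            name, compensate = self._compensations.pop()
            compensate()
            undone.append(name)
        return undone


issued = []
log = CompensationLog()
log.step_completed("ReserveInventory", lambda: issued.append("ReleaseInventory"))
log.step_completed("ChargePayment", lambda: issued.append("RefundPayment"))
# Suppose ShipOrder fails here, so the saga unwinds the steps that already succeeded
print(log.compensate_all())  # ['ChargePayment', 'ReserveInventory']
print(issued)                # ['RefundPayment', 'ReleaseInventory']
```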
Saga State: The saga persists its own state — either as events in the event store or as a document in a dedicated store. This state records which steps have completed and what data has been collected along the way.
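The document option has the same key requirement as an event stream: two handlers must not both advance the same saga from the same starting point. The following is a minimal sketch built on a hypothetical in-memory store. It keeps one dict and a version number per saga and rejects stale writes (compare-and-set). The document shape (`state`, `completed_steps`) is illustrative.

```python
import copy


class ConcurrencyError(Exception):
    pass


class InMemorySagaDocumentStore:
    """Hypothetical document store: one dict per saga plus a version number."""

    def __init__(self):
        self._docs: dict[str, tuple[int, dict]] = {}

    def load(self, saga_id: str) -> tuple[int, dict]:
        default = (0, {"state": "not_started", "completed_steps": []})
        version, doc = self._docs.get(saga_id, default)
        return version, copy.deepcopy(doc)  # callers get a copy, not the stored object

    def save(self, saga_id: str, doc: dict, expected_version: int) -> int:
        current_version, _ = self._docs.get(saga_id, (0, None))
        # Compare-and-set: reject the write if another handler saved in the meantime
        if current_version != expected_version:
            raise ConcurrencyError(
                f"{saga_id}: expected v{expected_version}, found v{current_version}"
            )
        self._docs[saga_id] = (current_version + 1, copy.deepcopy(doc))
        return current_version + 1


store = InMemorySagaDocumentStore()
version, doc = store.load("order-1")
doc["state"] = "awaiting_inventory"
doc["completed_steps"].append("OrderPlaced")
store.save("order-1", doc, expected_version=version)  # returns 1
```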
Timeouts and Deadlines: Sagas must handle the case where an expected event never arrives. Scheduled timeouts trigger fallback logic or compensation.
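A subtlety is that a timeout scheduled for one step may still fire after the saga has moved on. The sketch below handles this with a step counter. The timeout event records the step it guarded, and the saga ignores it if the step no longer matches. Cancelling the scheduled timeout is an alternative if the scheduler supports it. `ManualScheduler` and `PaymentStep` are hypothetical. The scheduler's explicit clock also makes timeout scenarios easy to test.

```python
import heapq


class ManualScheduler:
    """Hypothetical scheduler driven by an explicit clock, which makes timeouts testable."""

    def __init__(self):
        self.now = 0.0
        self._queue = []  # heap of (due_time, sequence, event)
        self._seq = 0     # tie-breaker so events never need to be compared

    def schedule(self, delay: float, event: dict) -> None:
        heapq.heappush(self._queue, (self.now + delay, self._seq, event))
        self._seq += 1

    def advance(self, seconds: float) -> list:
        """Move the clock forward and return every event that has become due."""
        self.now += seconds
        due = []
        while self._queue and self._queue[0][0] <= self.now:
            due.append(heapq.heappop(self._queue)[2])
        return due


class PaymentStep:
    """One saga step that waits for PaymentCharged and compensates if it never arrives."""

    def __init__(self, scheduler: ManualScheduler, timeout: float):
        self.state = "awaiting_payment"
        self.step = 1  # bumped on every transition; stale timeouts carry an old value
        scheduler.schedule(timeout, {"event_type": "SagaTimedOut", "step": self.step})

    def handle(self, event: dict) -> list:
        if event["event_type"] == "PaymentCharged" and self.state == "awaiting_payment":
            self.state, self.step = "awaiting_shipment", self.step + 1
            return ["ShipOrder"]
        if event["event_type"] == "SagaTimedOut" and event["step"] == self.step:
            self.state, self.step = "failed", self.step + 1
            return ["ReleaseInventory", "RejectOrder"]
        return []  # late timeouts and duplicate events are ignored


scheduler = ManualScheduler()
saga = PaymentStep(scheduler, timeout=300)
for event in scheduler.advance(301):
    print(saga.handle(event))  # ['ReleaseInventory', 'RejectOrder']
```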
Implementation Patterns
Order Fulfillment Saga
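```python
from dataclasses import dataclass

# Command messages the saga emits (minimal shapes for illustration).
@dataclass
class ReserveInventory:
    order_id: str
    items: list

@dataclass
class ReleaseInventory:
    order_id: str
    items: list

@dataclass
class ChargePayment:
    order_id: str
    amount: float
    customer_id: str

@dataclass
class ShipOrder:
    order_id: str

@dataclass
class RejectOrder:
    order_id: str
    reason: str

class OrderFulfillmentSaga:
    """
    Coordinates: OrderPlaced -> ReserveInventory -> ChargePayment -> ShipOrder
    Compensates on failure at any step.
    """

    def __init__(self, saga_id: str):
        self.saga_id = saga_id
        self.state = "not_started"
        self.order_id = None
        self.customer_id = None
        self.items = []
        self.total = None
        self.payment_id = None
        self.version = 0          # events already persisted for this saga
        self.pending_events = []  # events handled but not yet persisted
        self._commands = []

    def handle(self, event: dict) -> list:
        """Process a live event and return the commands to dispatch."""
        self._commands = []
        if self._apply(event):
            self.pending_events.append(event)
        return self._commands

    def apply_stored_event(self, event: dict) -> None:
        """Replay a persisted event to rebuild state; commands are discarded."""
        self._apply(event)
        self._commands = []
        self.version += 1

    def _apply(self, event: dict) -> bool:
        # Each event is only valid in one state; anything else is ignored,
        # so duplicate or out-of-order deliveries do not re-issue commands.
        transitions = {
            "OrderPlaced": ("not_started", self._on_order_placed),
            "InventoryReserved": ("awaiting_inventory", self._on_inventory_reserved),
            "InventoryReservationFailed": ("awaiting_inventory", self._on_inventory_failed),
            "PaymentCharged": ("awaiting_payment", self._on_payment_charged),
            "PaymentFailed": ("awaiting_payment", self._on_payment_failed),
        }
        expected_state, handler = transitions.get(event["event_type"], (None, None))
        if handler is None or self.state != expected_state:
            return False
        handler(event["data"])
        return True

    def _on_order_placed(self, data):
        # Capture what later steps and compensations need, since the
        # inventory and payment events need not carry these fields.
        self.order_id = data["order_id"]
        self.customer_id = data["customer_id"]
        self.items = data["items"]
        self.total = data["total"]
        self.state = "awaiting_inventory"
        self._commands.append(ReserveInventory(order_id=self.order_id, items=self.items))

    def _on_inventory_reserved(self, data):
        self.state = "awaiting_payment"
        self._commands.append(ChargePayment(
            order_id=self.order_id, amount=self.total, customer_id=self.customer_id
        ))

    def _on_inventory_failed(self, data):
        # Nothing was reserved, so rejecting the order is the only compensation.
        self._commands.append(RejectOrder(order_id=self.order_id, reason="Insufficient inventory"))
        self.state = "failed"

    def _on_payment_charged(self, data):
        self.payment_id = data["payment_id"]
        self.state = "awaiting_shipment"
        self._commands.append(ShipOrder(order_id=self.order_id))

    def _on_payment_failed(self, data):
        self.state = "compensating"
        # Compensate: release the inventory reserved in the previous step
        self._commands.append(ReleaseInventory(order_id=self.order_id, items=self.items))
        self._commands.append(RejectOrder(order_id=self.order_id, reason="Payment declined"))
        self.state = "failed"
```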
Saga Persistence as Events
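```python
from typing import Protocol

class EventStore(Protocol):
    """Assumed event store interface: only the two calls SagaStore uses."""
    def read_stream(self, stream_id: str) -> list: ...
    def append(self, stream_id: str, events: list, expected_version: int) -> None: ...

class SagaStore:
    def __init__(self, event_store: EventStore):
        self._store = event_store

    def load(self, saga_id: str) -> "OrderFulfillmentSaga":
        saga = OrderFulfillmentSaga(saga_id)
        # Rebuild saga state by replaying the events it has already handled
        for event in self._store.read_stream(f"saga-{saga_id}"):
            saga.apply_stored_event(event)
        return saga

    def save(self, saga: "OrderFulfillmentSaga", new_events: list) -> None:
        if not new_events:
            return
        # expected_version stops two workers from advancing the same saga concurrently
        self._store.append(stream_id=f"saga-{saga.saga_id}", events=new_events,
                           expected_version=saga.version)
        # The events are now persisted; clear them so a second save does not re-append
        saga.version += len(new_events)
        saga.pending_events = []
```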
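`SagaStore` relies on the event store's `expected_version` check. The minimal in-memory store below shows what that check buys. It assumes the same `read_stream` / `append(stream_id, events, expected_version)` shape that `SagaStore` calls, with the version equal to the number of events already in the stream. Real event stores define their own version numbering, so treat `InMemoryEventStore` as a hypothetical stand-in.

```python
from collections import defaultdict


class ConcurrencyError(Exception):
    pass


class InMemoryEventStore:
    """Hypothetical append-only store with the read_stream/append shape SagaStore uses."""

    def __init__(self):
        self._streams = defaultdict(list)

    def read_stream(self, stream_id: str) -> list:
        return list(self._streams[stream_id])  # copy, so callers cannot mutate history

    def append(self, stream_id: str, events: list, expected_version: int) -> None:
        stream = self._streams[stream_id]
        # The writer must have seen exactly len(stream) events, or another writer got there first
        if len(stream) != expected_version:
            raise ConcurrencyError(
                f"{stream_id}: expected version {expected_version}, actual {len(stream)}"
            )
        stream.extend(events)


store = InMemoryEventStore()
store.append("saga-o-1", [{"event_type": "OrderPlaced", "data": {}}], expected_version=0)
try:
    # A second worker that also loaded version 0 loses the race
    store.append("saga-o-1", [{"event_type": "OrderPlaced", "data": {}}], expected_version=0)
except ConcurrencyError as exc:
    print(exc)  # saga-o-1: expected version 0, actual 1
```

The losing worker would typically reload the saga and re-handle the event against the fresh state.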
Saga Coordinator with Timeout Support
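```python
class SagaCoordinator:
    def __init__(self, saga_store, command_bus, scheduler, timeout_seconds: int = 300):
        self._store = saga_store
        self._bus = command_bus
        self._scheduler = scheduler
        self._timeout_seconds = timeout_seconds

    def on_event(self, event: dict):
        saga_id = self._resolve_saga_id(event)
        saga = self._store.load(saga_id)
        commands = saga.handle(event)
        # Dispatch before saving: if the process crashes in between, the unacknowledged
        # event is redelivered and the commands re-sent, which idempotent handlers absorb.
        for command in commands:
            self._bus.dispatch(command)
        self._store.save(saga, saga.pending_events)
        # Schedule a timeout only when the saga advanced and now waits on an external response
        if commands and saga.state.startswith("awaiting_"):
            self._scheduler.schedule(
                saga_id=saga_id,
                timeout_seconds=self._timeout_seconds,
                timeout_event={"event_type": "SagaTimedOut", "data": {"saga_id": saga_id}},
            )

    def _resolve_saga_id(self, event) -> str:
        # Sagas are keyed by order ID; timeout events carry the saga ID instead
        data = event["data"]
        return data.get("order_id", data.get("saga_id"))
```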
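The coordinator above resolves the saga ID from `order_id`, which works only if every event carries that field. An alternative that follows the correlation-ID practice is to carry tracing metadata in a message envelope, so that every command and event in one saga instance shares a single ID. The `Envelope` type and `caused_by` helper are hypothetical. The split into a correlation ID and a causation ID is a common convention, not something this document prescribes.

```python
import uuid
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Envelope:
    """Hypothetical message wrapper carrying tracing metadata alongside the payload."""
    message_type: str
    data: dict
    message_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    correlation_id: Optional[str] = None  # shared by every message in one saga instance
    causation_id: Optional[str] = None    # ID of the message that directly caused this one


def caused_by(parent: Envelope, message_type: str, data: dict) -> Envelope:
    """Create a follow-up message that inherits the parent's correlation ID."""
    return Envelope(
        message_type=message_type,
        data=data,
        # The first message in a flow starts the correlation with its own ID
        correlation_id=parent.correlation_id or parent.message_id,
        causation_id=parent.message_id,
    )


order_placed = Envelope("OrderPlaced", {"order_id": "o-1"})
reserve = caused_by(order_placed, "ReserveInventory", {"order_id": "o-1"})
reserved = caused_by(reserve, "InventoryReserved", {"order_id": "o-1"})
# All follow-ups share one correlation ID, so a coordinator could key the saga on it
assert reserve.correlation_id == reserved.correlation_id == order_placed.message_id
```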
Best Practices
- Make every step idempotent. A saga may receive the same event more than once due to at-least-once delivery. Idempotent command handlers prevent duplicate side effects.
- Design compensating actions carefully. Compensation is not always a simple "undo." For example, compensating a shipped order might mean issuing a return, not deleting the shipment record.
- Persist saga state in the event store so the saga benefits from the same durability and replayability as the rest of the system.
- Use correlation IDs to link all events and commands within a saga instance, enabling end-to-end tracing.
- Set explicit timeouts for every step that waits for an external response. Without timeouts, a saga can hang indefinitely.
- Keep saga logic simple. A saga should coordinate, not contain business rules. Business rules belong in the aggregates that receive the saga's commands.
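The first practice, idempotent steps, is often implemented by remembering which message IDs have already been processed. The following is a minimal sketch of that technique, assuming each message carries a unique `message_id`. `IdempotentHandler` is hypothetical. The processed set lives in memory here, whereas a real system would persist it, ideally in the same transaction as the handler's side effect.

```python
class IdempotentHandler:
    """Wraps a handler so redelivered messages (same message_id) run only once."""

    def __init__(self, handler):
        self._handler = handler
        self._processed: set[str] = set()  # in production this must be persisted

    def __call__(self, message: dict):
        if message["message_id"] in self._processed:
            return None  # duplicate delivery: skip the side effect
        result = self._handler(message)
        # Mark as processed only after success, so a failed attempt can be retried
        self._processed.add(message["message_id"])
        return result


charges = []
charge_payment = IdempotentHandler(lambda m: charges.append(m["data"]["amount"]))
msg = {"message_id": "m-1", "data": {"amount": 42}}
charge_payment(msg)
charge_payment(msg)  # redelivered by an at-least-once transport
print(charges)       # [42]
```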
Common Pitfalls
- Trying to use distributed transactions instead of sagas. Two-phase commit scales poorly and couples services tightly, because every participant must hold its locks until the coordinator decides to commit or abort. Sagas with compensation are the event-sourcing-native approach.
- Forgetting compensation paths. Every forward step needs a corresponding compensation step. If you cannot compensate, reconsider the workflow design.
- Coupling sagas to internal events of other services. Sagas should react to published integration events, not internal domain events. This boundary prevents tight coupling across service boundaries.
- Not testing timeout scenarios. Timeouts are not edge cases — they are expected in distributed systems. Test them explicitly.
- Running saga logic inside the aggregate. Aggregates enforce invariants for a single consistency boundary. Cross-aggregate coordination belongs in the saga.
Anti-Patterns
Over-engineering for hypothetical scale. Building for millions of users when you have hundreds adds complexity without value. Solve today's problems first.
Ignoring the existing ecosystem. Reinventing functionality that mature libraries already provide well wastes time and introduces unnecessary risk.
Premature abstraction. Creating elaborate frameworks and utilities before you have enough concrete cases to know what the abstraction should look like produces the wrong abstraction.
Neglecting error handling at boundaries. Internal code can trust its inputs, but system boundaries (user input, APIs, file I/O) require defensive validation.
Skipping documentation for obvious code. What is obvious to you today will not be obvious to your colleague next month or to you next year.