
Snapshots

Optimize aggregate loading with snapshot patterns to avoid replaying long event streams from the beginning


Snapshot Patterns — Event Sourcing

You are an expert in snapshot strategies for building event-sourced systems.

Core Philosophy

Overview

In event sourcing, an aggregate's current state is reconstructed by replaying all its events from the beginning of its stream. For aggregates with hundreds or thousands of events, this replay becomes slow. Snapshots solve this by periodically capturing a serialized copy of the aggregate's state at a known version. On subsequent loads, the system restores the snapshot and replays only the events that occurred after it.

Core Concepts

Snapshot: A serialized representation of an aggregate's state at a specific stream version. It is a performance optimization, not a source of truth — the event stream remains authoritative.

Snapshot Store: A dedicated store (or a special event in the stream) where snapshots are persisted. It is separate from and subordinate to the event store.

Snapshot Frequency: The interval (in event count or time) at which new snapshots are taken. Common strategies include every N events, on every Nth load, or on a scheduled background job.
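The "every N events" strategy needs care when a single command appends several events at once: checking whether an append crossed an interval boundary is more robust than testing for an exact multiple. A minimal sketch (the function name is illustrative):

```python
def crossed_snapshot_boundary(old_version: int, new_version: int,
                              interval: int = 100) -> bool:
    """True when an append moved the stream across a multiple of `interval`."""
    return new_version // interval > old_version // interval
```

For example, an append that moves a stream from version 98 to 103 crosses the 100 boundary even though neither version is an exact multiple of the interval.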

Invalidation: When snapshot serialization logic changes (e.g., a new field is added to the aggregate), existing snapshots become stale. They must be discarded and regenerated.

Implementation Patterns

Snapshot Data Class

from dataclasses import dataclass
import json

@dataclass
class Snapshot:
    stream_id: str
    version: int
    state: dict
    created_at: str

    def serialize(self) -> str:
        return json.dumps({
            "stream_id": self.stream_id,
            "version": self.version,
            "state": self.state,
            "created_at": self.created_at,
        })

    @classmethod
    def deserialize(cls, raw: str) -> "Snapshot":
        data = json.loads(raw)
        return cls(**data)

Snapshot Store (SQL)

CREATE TABLE snapshots (
    stream_id    TEXT PRIMARY KEY,
    version      INT NOT NULL,
    state        JSONB NOT NULL,
    created_at   TIMESTAMPTZ NOT NULL DEFAULT now()
);

class SnapshotStore:
    def __init__(self, connection):
        self._conn = connection

    def get(self, stream_id: str) -> Snapshot | None:
        with self._conn.cursor() as cur:
            cur.execute(
                "SELECT stream_id, version, state, created_at FROM snapshots WHERE stream_id = %s",
                (stream_id,)
            )
            row = cur.fetchone()
            if row:
                return Snapshot(
                    stream_id=row[0], version=row[1],
                    state=row[2], created_at=str(row[3])
                )
            return None

    def save(self, snapshot: Snapshot) -> None:
        with self._conn.cursor() as cur:
            cur.execute(
                """
                INSERT INTO snapshots (stream_id, version, state)
                VALUES (%s, %s, %s)
                ON CONFLICT (stream_id) DO UPDATE SET
                    version = EXCLUDED.version,
                    state = EXCLUDED.state,
                    created_at = now()
                """,
                (snapshot.stream_id, snapshot.version, json.dumps(snapshot.state))
            )
            self._conn.commit()
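The same upsert shape works against an in-memory SQLite database, which is convenient for unit tests (a sketch; SQLite has no JSONB, so the state is stored as JSON text, and `ON CONFLICT ... DO UPDATE` requires SQLite 3.24+):

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE snapshots (
        stream_id  TEXT PRIMARY KEY,
        version    INTEGER NOT NULL,
        state      TEXT NOT NULL
    )
""")

def save_snapshot(stream_id: str, version: int, state: dict) -> None:
    # Same upsert semantics as the Postgres version above
    conn.execute(
        "INSERT INTO snapshots (stream_id, version, state) VALUES (?, ?, ?) "
        "ON CONFLICT (stream_id) DO UPDATE SET version = excluded.version, "
        "state = excluded.state",
        (stream_id, version, json.dumps(state)),
    )

def get_snapshot(stream_id: str):
    row = conn.execute(
        "SELECT version, state FROM snapshots WHERE stream_id = ?", (stream_id,)
    ).fetchone()
    return (row[0], json.loads(row[1])) if row else None
```

The Postgres schema remains the reference; this variant is only meant for tests and local experimentation.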

Aggregate Loading with Snapshots

from datetime import datetime

class AggregateRepository:
    SNAPSHOT_INTERVAL = 100  # Take a snapshot every 100 events

    def __init__(self, event_store: EventStore, snapshot_store: SnapshotStore):
        self._events = event_store
        self._snapshots = snapshot_store

    def load(self, aggregate_cls, stream_id: str):
        aggregate = aggregate_cls()

        # 1. Try to load from snapshot
        snapshot = self._snapshots.get(stream_id)
        if snapshot:
            aggregate.restore_from_snapshot(snapshot.state)
            aggregate._version = snapshot.version
            from_version = snapshot.version
        else:
            from_version = 0

        # 2. Replay only events after the snapshot
        events = self._events.read_stream(stream_id, from_version=from_version)
        for event in events:
            aggregate._apply(event, is_new=False)

        return aggregate

    def save(self, stream_id: str, aggregate) -> None:
        self._events.append(
            stream_id=stream_id,
            events=aggregate.pending_events,
            expected_version=aggregate.version - len(aggregate.pending_events)
        )

        # 3. Take a snapshot if this append crossed an interval boundary.
        # A plain modulo check can miss the threshold when a single save
        # appends several events and jumps past the exact multiple.
        prev_version = aggregate.version - len(aggregate.pending_events)
        if aggregate.version // self.SNAPSHOT_INTERVAL > prev_version // self.SNAPSHOT_INTERVAL:
            snapshot = Snapshot(
                stream_id=stream_id,
                version=aggregate.version,
                state=aggregate.to_snapshot(),
                created_at=datetime.utcnow().isoformat()
            )
            self._snapshots.save(snapshot)

Aggregate Snapshot Support

class Order:
    def __init__(self):
        self._version = 0
        self._pending_events = []
        self.order_id = None
        self.status = None
        self.items = []
        self.total = 0

    def to_snapshot(self) -> dict:
        return {
            "order_id": self.order_id,
            "status": self.status,
            "items": self.items,
            "total": self.total,
        }

    def restore_from_snapshot(self, state: dict) -> None:
        self.order_id = state["order_id"]
        self.status = state["status"]
        self.items = state["items"]
        self.total = state["total"]

    @property
    def version(self):
        return self._version

    @property
    def pending_events(self):
        # Exposed because AggregateRepository.save reads pending_events
        return self._pending_events

Background Snapshot Worker

from datetime import datetime

def snapshot_worker(event_store, snapshot_store, aggregate_cls, stream_ids, interval=100):
    """Periodically create snapshots for aggregates that have grown past the threshold."""
    for stream_id in stream_ids:
        existing = snapshot_store.get(stream_id)
        existing_version = existing.version if existing else 0

        events = list(event_store.read_stream(stream_id, from_version=existing_version))  # materialize so len() works
        if len(events) < interval:
            continue  # Not enough new events to justify a new snapshot

        # Rebuild aggregate to current state
        aggregate = aggregate_cls()
        if existing:
            aggregate.restore_from_snapshot(existing.state)
            aggregate._version = existing.version
        for event in events:
            aggregate._apply(event, is_new=False)

        snapshot_store.save(Snapshot(
            stream_id=stream_id,
            version=aggregate.version,
            state=aggregate.to_snapshot(),
            created_at=datetime.utcnow().isoformat()
        ))
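Deciding which streams the worker should visit can be as cheap as comparing current stream versions against stored snapshot versions. This sketch stands in real store queries with in-memory dicts; the function name and signature are illustrative:

```python
def streams_needing_snapshot(stream_versions: dict[str, int],
                             snapshot_versions: dict[str, int],
                             interval: int = 100) -> list[str]:
    """Return stream ids whose event count since the last snapshot
    has reached the interval threshold."""
    due = []
    for stream_id, version in stream_versions.items():
        snapped = snapshot_versions.get(stream_id, 0)  # no snapshot yet -> 0
        if version - snapped >= interval:
            due.append(stream_id)
    return due
```

In production this filter would typically be a single query joining the event store's stream-version index against the snapshots table, so the worker never touches streams that are up to date.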

Best Practices

  • Treat snapshots as disposable caches. They must never be the source of truth. If a snapshot is missing or corrupt, the system falls back to full event replay.
  • Version your snapshot schemas. Include a schema version in the snapshot so that deserialization logic can detect and discard incompatible snapshots.
  • Take snapshots asynchronously in a background process rather than in the hot path of command handling. This keeps write latency predictable.
  • Set a sensible interval. Snapshot too frequently and you waste storage and writes. Snapshot too rarely and load times degrade. Profile your aggregate to find the right threshold.
  • Store snapshots separately from events. A snapshot is a performance optimization, not a domain event. Mixing them into the event stream complicates projections and subscriptions.

Common Pitfalls

  • Relying on snapshots for correctness. If your system breaks when snapshots are deleted, you have a bug. Full replay must always produce the correct state.
  • Forgetting to invalidate snapshots after aggregate refactoring. If you rename or restructure aggregate fields, old snapshots will deserialize incorrectly. Version the snapshot format and discard stale versions.
  • Snapshotting too early in the project. Snapshots add complexity. Only introduce them when profiling shows that event replay is actually a bottleneck.
  • Including transient or computed data in snapshots. Only snapshot the minimal state needed to reconstruct the aggregate. Derived values should be recomputed.
  • Not testing the full-replay path. Because snapshots bypass most events, bugs in event handlers can hide behind valid snapshots. Regularly test full replay to catch regressions.
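The last pitfall is easy to guard against: build the aggregate twice, once by full replay and once from a snapshot plus the tail of the stream, and assert the results agree. A sketch using a toy counter aggregate (the event shape and method names are illustrative):

```python
class Counter:
    """Toy aggregate: its state is just a running total."""
    def __init__(self):
        self.total = 0

    def apply(self, event: dict) -> None:
        self.total += event["amount"]

    def to_snapshot(self) -> dict:
        return {"total": self.total}

    def restore_from_snapshot(self, state: dict) -> None:
        self.total = state["total"]

def test_snapshot_matches_full_replay():
    events = [{"amount": n} for n in range(1, 11)]

    # Path 1: full replay from the start of the stream
    full = Counter()
    for e in events:
        full.apply(e)

    # Path 2: snapshot at version 5, then replay only the tail
    mid = Counter()
    for e in events[:5]:
        mid.apply(e)
    snapped = Counter()
    snapped.restore_from_snapshot(mid.to_snapshot())
    for e in events[5:]:
        snapped.apply(e)

    assert snapped.total == full.total
```

Running this kind of equivalence test in CI, ideally against real production-shaped streams, catches event-handler regressions that a valid snapshot would otherwise hide.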

Anti-Patterns

Over-engineering for hypothetical scale. Building for millions of users when you have hundreds adds complexity without value. Solve today's problems first.

Ignoring the existing ecosystem. Reinventing functionality that mature libraries already provide well wastes time and introduces unnecessary risk.

Premature abstraction. Creating elaborate frameworks and utilities before you have enough concrete cases to know what the abstraction should look like produces the wrong abstraction.

Neglecting error handling at boundaries. Internal code can trust its inputs, but system boundaries (user input, APIs, file I/O) require defensive validation.

Skipping documentation for obvious code. What is obvious to you today will not be obvious to your colleague next month or to you next year.
