Snapshots
Optimize aggregate loading with snapshot patterns to avoid replaying long event streams from the beginning
Snapshot Patterns — Event Sourcing
You are an expert in snapshot strategies for building event-sourced systems.
Core Philosophy
Overview
In event sourcing, an aggregate's current state is reconstructed by replaying all its events from the beginning of its stream. For aggregates with hundreds or thousands of events, this replay becomes slow. Snapshots solve this by periodically capturing a serialized copy of the aggregate's state at a known version. On subsequent loads, the system restores the snapshot and replays only the events that occurred after it.
Core Concepts
Snapshot: A serialized representation of an aggregate's state at a specific stream version. It is a performance optimization, not a source of truth — the event stream remains authoritative.
Snapshot Store: A dedicated store (or a special event in the stream) where snapshots are persisted. It is separate from and subordinate to the event store.
Snapshot Frequency: The interval (in event count or time) at which new snapshots are taken. Common strategies include every N events, on every Nth load, or on a scheduled background job.
Invalidation: When snapshot serialization logic changes (e.g., a new field is added to the aggregate), existing snapshots become stale. They must be discarded and regenerated.
Implementation Patterns
Snapshot Data Class
from dataclasses import dataclass
import json

@dataclass
class Snapshot:
    stream_id: str
    version: int
    state: dict
    created_at: str

    def serialize(self) -> str:
        return json.dumps({
            "stream_id": self.stream_id,
            "version": self.version,
            "state": self.state,
            "created_at": self.created_at,
        })

    @classmethod
    def deserialize(cls, raw: str) -> "Snapshot":
        data = json.loads(raw)
        return cls(**data)
Snapshot Store (SQL)
CREATE TABLE snapshots (
    stream_id  TEXT PRIMARY KEY,
    version    INT NOT NULL,
    state      JSONB NOT NULL,
    created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
class SnapshotStore:
    def __init__(self, connection):
        self._conn = connection

    def get(self, stream_id: str) -> Snapshot | None:
        with self._conn.cursor() as cur:
            cur.execute(
                "SELECT stream_id, version, state, created_at FROM snapshots WHERE stream_id = %s",
                (stream_id,)
            )
            row = cur.fetchone()
            if row:
                return Snapshot(
                    stream_id=row[0], version=row[1],
                    state=row[2], created_at=str(row[3])
                )
            return None

    def save(self, snapshot: Snapshot) -> None:
        with self._conn.cursor() as cur:
            cur.execute(
                """
                INSERT INTO snapshots (stream_id, version, state)
                VALUES (%s, %s, %s)
                ON CONFLICT (stream_id) DO UPDATE SET
                    version = EXCLUDED.version,
                    state = EXCLUDED.state,
                    created_at = now()
                """,
                (snapshot.stream_id, snapshot.version, json.dumps(snapshot.state))
            )
        self._conn.commit()
Aggregate Loading with Snapshots
from datetime import datetime, timezone

class AggregateRepository:
    SNAPSHOT_INTERVAL = 100  # Take a snapshot every 100 events

    def __init__(self, event_store: EventStore, snapshot_store: SnapshotStore):
        self._events = event_store
        self._snapshots = snapshot_store

    def load(self, aggregate_cls, stream_id: str):
        aggregate = aggregate_cls()
        # 1. Try to load from snapshot
        snapshot = self._snapshots.get(stream_id)
        if snapshot:
            aggregate.restore_from_snapshot(snapshot.state)
            aggregate._version = snapshot.version
            from_version = snapshot.version
        else:
            from_version = 0
        # 2. Replay only events after the snapshot
        events = self._events.read_stream(stream_id, from_version=from_version)
        for event in events:
            aggregate._apply(event, is_new=False)
        return aggregate

    def save(self, stream_id: str, aggregate) -> None:
        self._events.append(
            stream_id=stream_id,
            events=aggregate.pending_events,
            expected_version=aggregate.version - len(aggregate.pending_events)
        )
        # 3. Take a snapshot if the interval threshold is reached
        if aggregate.version % self.SNAPSHOT_INTERVAL == 0:
            snapshot = Snapshot(
                stream_id=stream_id,
                version=aggregate.version,
                state=aggregate.to_snapshot(),
                created_at=datetime.now(timezone.utc).isoformat()
            )
            self._snapshots.save(snapshot)
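The `expected_version` arithmetic in `save()` deserves a note: the version handed to the event store for its optimistic-concurrency check is the stream version *before* the uncommitted events, i.e. the in-memory version minus the count of pending events. A minimal illustration (the helper name is ours, not part of the code above):

```python
def expected_version(current_version: int, pending_count: int) -> int:
    """Stream version the event store should see before appending new events."""
    return current_version - pending_count

# Loaded at version 100, then applied 5 new commands in memory:
# the append must be rejected unless the stored stream is still at 100.
assert expected_version(105, 5) == 100
```

If another writer appended in the meantime, the stored version no longer matches and the append fails, forcing a reload-and-retry rather than a silent lost update.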
Aggregate Snapshot Support
class Order:
    def __init__(self):
        self._version = 0
        self._pending_events = []
        self.order_id = None
        self.status = None
        self.items = []
        self.total = 0

    def to_snapshot(self) -> dict:
        return {
            "order_id": self.order_id,
            "status": self.status,
            "items": self.items,
            "total": self.total,
        }

    def restore_from_snapshot(self, state: dict) -> None:
        self.order_id = state["order_id"]
        self.status = state["status"]
        self.items = state["items"]
        self.total = state["total"]

    @property
    def version(self):
        return self._version

    @property
    def pending_events(self):
        # Exposed for the repository's save(); cleared after a successful append
        return self._pending_events
Background Snapshot Worker
from datetime import datetime, timezone

def snapshot_worker(event_store, snapshot_store, aggregate_cls, stream_ids, interval=100):
    """Periodically create snapshots for aggregates that have grown past the threshold."""
    for stream_id in stream_ids:
        existing = snapshot_store.get(stream_id)
        existing_version = existing.version if existing else 0
        events = event_store.read_stream(stream_id, from_version=existing_version)
        if len(events) < interval:
            continue  # Not enough new events to justify a new snapshot
        # Rebuild aggregate to current state
        aggregate = aggregate_cls()
        if existing:
            aggregate.restore_from_snapshot(existing.state)
            aggregate._version = existing.version
        for event in events:
            aggregate._apply(event, is_new=False)
        snapshot_store.save(Snapshot(
            stream_id=stream_id,
            version=aggregate.version,
            state=aggregate.to_snapshot(),
            created_at=datetime.now(timezone.utc).isoformat()
        ))
Best Practices
- Treat snapshots as disposable caches. They must never be the source of truth. If a snapshot is missing or corrupt, the system falls back to full event replay.
- Version your snapshot schemas. Include a schema version in the snapshot so that deserialization logic can detect and discard incompatible snapshots.
- Take snapshots asynchronously in a background process rather than in the hot path of command handling. This keeps write latency predictable.
- Set a sensible interval. Snapshot too frequently and you waste storage and writes. Snapshot too rarely and load times degrade. Profile your aggregate to find the right threshold.
- Store snapshots separately from events. A snapshot is a performance optimization, not a domain event. Mixing them into the event stream complicates projections and subscriptions.
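Keeping snapshot writes off the hot path can be as simple as handing them to an in-process queue drained by a worker thread; the background worker shown earlier is the more robust option, but this sketch (names and shapes are ours) shows the basic decoupling:

```python
import queue
import threading

# Each entry is (stream_id, version, state); the write path only enqueues.
snapshot_queue: "queue.Queue[tuple[str, int, dict]]" = queue.Queue()

def request_snapshot(stream_id: str, version: int, state: dict) -> None:
    """Called from command handling: enqueue and return immediately."""
    snapshot_queue.put((stream_id, version, state))

def snapshot_consumer(save) -> None:
    """Background thread: drain the queue and persist each snapshot."""
    while True:
        stream_id, version, state = snapshot_queue.get()
        save(stream_id, version, state)
        snapshot_queue.task_done()
```

Because a lost snapshot only costs a longer replay on the next load, fire-and-forget delivery is acceptable here in a way it never would be for events themselves.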
Common Pitfalls
- Relying on snapshots for correctness. If your system breaks when snapshots are deleted, you have a bug. Full replay must always produce the correct state.
- Forgetting to invalidate snapshots after aggregate refactoring. If you rename or restructure aggregate fields, old snapshots will deserialize incorrectly. Version the snapshot format and discard stale versions.
- Snapshotting too early in the project. Snapshots add complexity. Only introduce them when profiling shows that event replay is actually a bottleneck.
- Including transient or computed data in snapshots. Only snapshot the minimal state needed to reconstruct the aggregate. Derived values should be recomputed.
- Not testing the full-replay path. Because snapshots bypass most events, bugs in event handlers can hide behind valid snapshots. Regularly test full replay to catch regressions.
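The full-replay test in the last pitfall can be written as a simple equivalence property: state rebuilt from a snapshot plus the event tail must equal state rebuilt from the whole stream. The toy counter aggregate below stands in for a real one; the same check applies to any aggregate.

```python
def replay(events: list[int], start_total: int = 0) -> int:
    """Toy aggregate: each event adds an amount to a running total."""
    total = start_total
    for amount in events:
        total += amount
    return total

def check_replay_paths_agree(events: list[int], snapshot_at: int) -> None:
    full = replay(events)                          # authoritative: replay everything
    snapshot_state = replay(events[:snapshot_at])  # simulate a stored snapshot
    from_snapshot = replay(events[snapshot_at:], snapshot_state)  # snapshot + tail
    assert full == from_snapshot, "snapshot path diverged from full replay"

check_replay_paths_agree([1, 2, 3, 4, 5], snapshot_at=3)
```

Running this check over real streams (for example, a sample per aggregate type in CI) catches handler bugs that valid snapshots would otherwise mask.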
Anti-Patterns
- Over-engineering for hypothetical scale. Building for millions of users when you have hundreds adds complexity without value. Solve today's problems first.
- Ignoring the existing ecosystem. Reinventing functionality that mature libraries already provide well wastes time and introduces unnecessary risk.
- Premature abstraction. Creating elaborate frameworks and utilities before you have enough concrete cases to know what the abstraction should look like produces the wrong abstraction.
- Neglecting error handling at boundaries. Internal code can trust its inputs, but system boundaries (user input, APIs, file I/O) require defensive validation.
- Skipping documentation for obvious code. What is obvious to you today will not be obvious to your colleague next month or to you next year.
Related Skills
CQRS
Implement Command Query Responsibility Segregation to separate write and read models in event-sourced architectures
Event Store
Design and implement event stores for persisting domain events with append-only semantics and optimistic concurrency
Event Versioning
Evolve event schemas safely over time using upcasting, weak schema strategies, and version negotiation
Eventual Consistency
Handle eventual consistency challenges in distributed event-sourced systems with practical patterns and strategies
Projections
Build and maintain read model projections from event streams for optimized query performance
Sagas
Coordinate long-running business processes across aggregates and services using sagas and process managers