Event Store
Design and implement event stores for persisting domain events with append-only semantics and optimistic concurrency
You are an expert in event store design for building event-sourced systems.
Core Philosophy
Overview
An event store is the persistence layer for an event-sourced system. Rather than storing the current state of an entity, it stores the full sequence of domain events that led to that state. The event store is append-only: events are immutable facts that have already happened, and they are never updated or deleted.
Core Concepts
Streams: Events are grouped into streams, typically one per aggregate instance. A stream is identified by a stream ID (e.g., order-abc123) and contains an ordered sequence of events.
Event Envelope: Each persisted event is wrapped in metadata:
- stream_id — identifies the aggregate instance
- event_type — the fully qualified event name
- data — the serialized event payload
- metadata — correlation IDs, causation IDs, timestamps, user context
- version — the position of this event within its stream
- global_position — a monotonically increasing position across all streams
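In code, the envelope can be modeled as a small immutable record. A minimal sketch (field names follow the list above; the class itself is illustrative, not a standard wire format):

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any


@dataclass(frozen=True)
class EventEnvelope:
    """Wraps a serialized domain event with the store-level metadata above."""
    stream_id: str
    event_type: str
    data: dict[str, Any]
    version: int            # position within the stream
    global_position: int    # position across all streams
    metadata: dict[str, Any] = field(default_factory=dict)
    recorded_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))


envelope = EventEnvelope(
    stream_id="order-abc123",
    event_type="orders.OrderPlaced",
    data={"order_id": "abc123", "total_cents": 4999},
    version=1,
    global_position=42,
    metadata={"correlation_id": "req-789"},
)
```

Freezing the dataclass reinforces the append-only contract: an envelope, once created, is never mutated.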
Optimistic Concurrency: When appending events, the writer specifies the expected version of the stream. If another write has occurred since the stream was read, a concurrency conflict is raised.
Global Ordering: A global position or sequence number enables subscribers to process all events across all streams in a deterministic order.
Implementation Patterns
Relational Event Store Schema
CREATE TABLE events (
    global_position BIGSERIAL PRIMARY KEY,
    stream_id       TEXT        NOT NULL,
    stream_version  INT         NOT NULL,
    event_type      TEXT        NOT NULL,
    data            JSONB       NOT NULL,
    metadata        JSONB       NOT NULL DEFAULT '{}',
    created_at      TIMESTAMPTZ NOT NULL DEFAULT now(),
    UNIQUE (stream_id, stream_version)
);
-- The UNIQUE constraint already creates an index on (stream_id, stream_version),
-- so stream reads need no separate index.
CREATE INDEX idx_events_type ON events (event_type);
Append with Optimistic Concurrency (Python)
import json

from psycopg2.errors import UniqueViolation


class ConcurrencyError(Exception):
    """Raised when a stream was modified after the expected version was read."""


class EventStore:
    def __init__(self, connection):
        self._conn = connection

    def append(self, stream_id: str, events: list[DomainEvent], expected_version: int) -> None:
        """Append events to a stream with optimistic concurrency control."""
        with self._conn.cursor() as cur:
            for i, event in enumerate(events):
                version = expected_version + i + 1
                try:
                    cur.execute(
                        """
                        INSERT INTO events (stream_id, stream_version, event_type, data, metadata)
                        VALUES (%s, %s, %s, %s, %s)
                        """,
                        (stream_id, version, event.event_type,
                         json.dumps(event.data), json.dumps(event.metadata)),
                    )
                except UniqueViolation:
                    # Roll back the partial transaction before surfacing the conflict.
                    self._conn.rollback()
                    raise ConcurrencyError(
                        f"Stream '{stream_id}' expected version {expected_version}, "
                        f"but a conflicting write was detected."
                    )
        self._conn.commit()
    def read_stream(self, stream_id: str, from_version: int = 0) -> list[tuple]:
        """Read all events from a stream with versions greater than from_version."""
        with self._conn.cursor() as cur:
            cur.execute(
                """
                SELECT stream_version, event_type, data, metadata, created_at
                FROM events
                WHERE stream_id = %s AND stream_version > %s
                ORDER BY stream_version ASC
                """,
                (stream_id, from_version),
            )
            return cur.fetchall()
    def read_all(self, from_position: int = 0, batch_size: int = 500) -> list[tuple]:
        """Read a batch of events across all streams from a global position."""
        with self._conn.cursor() as cur:
            cur.execute(
                """
                SELECT global_position, stream_id, event_type, data, metadata
                FROM events
                WHERE global_position > %s
                ORDER BY global_position ASC
                LIMIT %s
                """,
                (from_position, batch_size),
            )
            return cur.fetchall()
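A read_all method like the one above is what catch-up subscriptions are built on: a subscriber processes events in global order and persists a checkpoint so it can resume after a restart. A sketch of that loop, using an in-memory stand-in for the store (the stand-in and its events are illustrative):

```python
class InMemoryEventStore:
    """Stand-in for a SQL-backed store, just to demonstrate the loop."""

    def __init__(self, events):
        self._events = events  # list of dicts, each with a "global_position"

    def read_all(self, from_position=0, batch_size=500):
        return [e for e in self._events if e["global_position"] > from_position][:batch_size]


def catch_up(store, handle, checkpoint=0, batch_size=2):
    """Drive a subscriber from its last checkpoint to the head of the store.

    Returns the new checkpoint; a real subscriber would persist it after each
    batch and keep polling (or listen for notifications) once caught up.
    """
    while True:
        batch = store.read_all(from_position=checkpoint, batch_size=batch_size)
        if not batch:
            return checkpoint  # caught up
        for event in batch:
            handle(event)
            checkpoint = event["global_position"]


store = InMemoryEventStore([
    {"global_position": 1, "event_type": "OrderPlaced"},
    {"global_position": 2, "event_type": "OrderPaid"},
    {"global_position": 3, "event_type": "OrderShipped"},
])
seen = []
final_position = catch_up(store, lambda e: seen.append(e["event_type"]))
```

Advancing the checkpoint only after a batch is handled gives at-least-once delivery, so handlers should be idempotent.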
Aggregate Rehydration
from types import SimpleNamespace


class Aggregate:
    def __init__(self):
        self._version = 0
        self._pending_events = []

    @classmethod
    def load(cls, stream_id: str, event_store: EventStore):
        """Rebuild aggregate state by replaying its stream from the store."""
        aggregate = cls()
        for _version, event_type, data, _metadata, _created_at in event_store.read_stream(stream_id):
            # read_stream returns raw rows; wrap each in a minimal event object
            # (an event type registry makes this deserialization explicit).
            event = SimpleNamespace(event_type=event_type, data=data)
            aggregate._apply(event, is_new=False)
        return aggregate

    def _apply(self, event, is_new=True):
        handler = getattr(self, f"_on_{event.event_type}", None)
        if handler:
            handler(event.data)
        self._version += 1
        if is_new:
            self._pending_events.append(event)
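A concrete aggregate then pairs command methods with _on_ handlers. A minimal sketch (the Order class, its event, and the base class copied here for self-containment are all illustrative):

```python
class DomainEvent:
    def __init__(self, event_type, data):
        self.event_type = event_type
        self.data = data


class Aggregate:
    # Minimal copy of the base class above, so this sketch runs on its own.
    def __init__(self):
        self._version = 0
        self._pending_events = []

    def _apply(self, event, is_new=True):
        handler = getattr(self, f"_on_{event.event_type}", None)
        if handler:
            handler(event.data)
        self._version += 1
        if is_new:
            self._pending_events.append(event)


class Order(Aggregate):
    def __init__(self):
        super().__init__()
        self.status = None
        self.items = []

    # Command methods validate, then record the decision as an event.
    def place(self, item: str):
        if self.status == "placed":
            raise ValueError("order already placed")
        self._apply(DomainEvent("OrderPlaced", {"item": item}))

    # _on_ handlers only mutate state: they run both for new events and replay.
    def _on_OrderPlaced(self, data):
        self.status = "placed"
        self.items.append(data["item"])


order = Order()
order.place("book")
```

Keeping all state changes inside _on_ handlers is what makes replay work: loading an aggregate runs the same handlers over historical events, with is_new=False so nothing is re-appended.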
Best Practices
- Keep events small and focused. Each event should represent one meaningful fact. Avoid stuffing multiple state changes into a single event.
- Include correlation and causation IDs in event metadata to enable distributed tracing across services.
- Use event type registries to map event type strings to concrete classes, making deserialization explicit and versionable.
- Partition or archive old events for streams that grow very large, keeping active query paths fast.
- Prefer JSONB over opaque blobs in relational stores so you can query event payloads when debugging or building ad-hoc projections.
- Treat the event store as infrastructure, not business logic. Keep the store generic; domain meaning lives in the events and aggregates.
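The event type registry mentioned above can be as simple as a decorated dictionary. A sketch (the OrderPlaced event and its fields are hypothetical):

```python
from dataclasses import dataclass

# Maps event type strings (as stored in the event_type column) to classes.
EVENT_TYPES = {}


def register(event_type: str):
    def decorator(cls):
        EVENT_TYPES[event_type] = cls
        return cls
    return decorator


@register("orders.OrderPlaced")
@dataclass
class OrderPlaced:
    order_id: str
    total_cents: int


def deserialize(event_type: str, data: dict):
    """Turn a stored (event_type, data) pair back into a typed event."""
    cls = EVENT_TYPES.get(event_type)
    if cls is None:
        # Failing loudly beats silently dropping unknown events.
        raise KeyError(f"unregistered event type: {event_type}")
    return cls(**data)


event = deserialize("orders.OrderPlaced", {"order_id": "abc123", "total_cents": 4999})
```

Storing an explicit type string, rather than deriving it from a class path, means classes can be renamed or moved without breaking replay of old events.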
Common Pitfalls
- Skipping optimistic concurrency control. Without version checks, concurrent writes can silently corrupt aggregate state.
- Storing derived data in events. Events should capture the intent and input, not computed results. Derived data belongs in projections.
- Using mutable event schemas. Changing the shape of already-persisted events breaks replay. Use event versioning (upcasting) instead.
- Exposing the event store directly to consumers. Read models and APIs should consume projections, not raw event streams, to avoid coupling consumers to internal domain events.
- Neglecting global ordering. Without a reliable global position, building consistent cross-stream projections becomes extremely difficult.
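The upcasting mentioned above applies pure payload transformations at read time, leaving stored events untouched. A sketch under hypothetical field names (a v2 OrderPlaced that split customer_name into first/last name):

```python
def order_placed_v1_to_v2(data: dict) -> dict:
    # v2 replaced a single "customer_name" field with first/last name.
    first, _, last = data.pop("customer_name").partition(" ")
    return {**data, "first_name": first, "last_name": last, "schema_version": 2}


# Keyed by (event_type, source_version); each upcaster advances one version.
UPCASTERS = {
    ("OrderPlaced", 1): order_placed_v1_to_v2,
}


def upcast(event_type: str, data: dict) -> dict:
    """Apply upcasters in sequence until the payload reaches the latest version."""
    while (event_type, data.get("schema_version", 1)) in UPCASTERS:
        data = UPCASTERS[(event_type, data.get("schema_version", 1))](data)
    return data


payload = upcast("OrderPlaced", {"customer_name": "Ada Lovelace", "total_cents": 1000})
```

Chaining single-step upcasters (v1→v2, v2→v3, …) keeps each transformation small and lets very old events catch up through the full sequence.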
Anti-Patterns
Over-engineering for hypothetical scale. Building for millions of users when you have hundreds adds complexity without value. Solve today's problems first.
Ignoring the existing ecosystem. Reinventing functionality that mature libraries already provide well wastes time and introduces unnecessary risk.
Premature abstraction. Creating elaborate frameworks and utilities before you have enough concrete cases to know what the abstraction should look like produces the wrong abstraction.
Neglecting error handling at boundaries. Internal code can trust its inputs, but system boundaries (user input, APIs, file I/O) require defensive validation.
Skipping documentation for obvious code. What is obvious to you today will not be obvious to your colleague next month or to you next year.
Related Skills
CQRS
Implement Command Query Responsibility Segregation to separate write and read models in event-sourced architectures
Event Versioning
Evolve event schemas safely over time using upcasting, weak schema strategies, and version negotiation
Eventual Consistency
Handle eventual consistency challenges in distributed event-sourced systems with practical patterns and strategies
Projections
Build and maintain read model projections from event streams for optimized query performance
Sagas
Coordinate long-running business processes across aggregates and services using sagas and process managers
Snapshots
Optimize aggregate loading with snapshot patterns to avoid replaying long event streams from the beginning