
Event Store

Design and implement event stores for persisting domain events with append-only semantics and optimistic concurrency


Event Store — Event Sourcing

You are an expert in event store design for building event-sourced systems.

Core Philosophy

Overview

An event store is the persistence layer for an event-sourced system. Rather than storing the current state of an entity, it stores the full sequence of domain events that led to that state. The event store is append-only: events are immutable facts that have already happened, and they are never updated or deleted.

Core Concepts

Streams: Events are grouped into streams, typically one per aggregate instance. A stream is identified by a stream ID (e.g., order-abc123) and contains an ordered sequence of events.

Event Envelope: Each persisted event is wrapped in metadata:

  • stream_id — identifies the aggregate instance
  • event_type — the fully qualified event name
  • data — the serialized event payload
  • metadata — correlation IDs, causation IDs, timestamps, user context
  • version — the position of this event within its stream
  • global_position — a monotonically increasing position across all streams
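A persisted envelope for a hypothetical OrderPlaced event might look like this (field names follow the schema below; the identifiers and values are purely illustrative):

```python
import json

# Illustrative envelope for a hypothetical OrderPlaced event
envelope = {
    "stream_id": "order-abc123",
    "event_type": "OrderPlaced",
    "data": {"order_id": "abc123", "total_cents": 4999},
    "metadata": {
        "correlation_id": "req-7f3a",  # ties together all events from one request
        "causation_id": "cmd-9b21",    # the command or event that caused this one
        "timestamp": "2024-01-15T09:30:00Z",
        "user_id": "user-42",
    },
    "version": 1,             # first event in this stream
    "global_position": 1042,  # position across all streams
}

payload = json.dumps(envelope["data"])  # what actually lands in the data column
```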

Optimistic Concurrency: When appending events, the writer specifies the expected version of the stream. If another write has occurred since the stream was read, a concurrency conflict is raised.
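The check itself is simple to state (an in-memory sketch of the semantics; the relational implementation below enforces the same rule with a unique constraint):

```python
class ConcurrencyError(Exception):
    pass

def append(stream: list, new_events: list, expected_version: int) -> None:
    # The writer read the stream at expected_version; reject if it moved since.
    if len(stream) != expected_version:
        raise ConcurrencyError(
            f"expected version {expected_version}, stream is at {len(stream)}"
        )
    stream.extend(new_events)

stream = ["OrderPlaced"]
append(stream, ["OrderShipped"], expected_version=1)  # succeeds
conflict = False
try:
    append(stream, ["OrderCancelled"], expected_version=1)  # stale read
except ConcurrencyError:
    conflict = True
```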

Global Ordering: A global position or sequence number enables subscribers to process all events across all streams in a deterministic order.
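A subscriber typically tracks a checkpoint against that global position (an in-memory sketch, assuming dict rows; a real subscriber would poll the store's read_all in batches and persist the checkpoint):

```python
# In-memory sketch of a checkpointing subscriber (illustrative, not the store API)
events = [
    {"global_position": 1, "stream_id": "order-a", "event_type": "OrderPlaced"},
    {"global_position": 2, "stream_id": "order-b", "event_type": "OrderPlaced"},
    {"global_position": 3, "stream_id": "order-a", "event_type": "OrderShipped"},
]

checkpoint = 0
processed = []
for event in sorted(events, key=lambda e: e["global_position"]):
    if event["global_position"] <= checkpoint:
        continue  # already seen; makes replay idempotent
    processed.append(event["event_type"])
    checkpoint = event["global_position"]  # persist this in a real system
```

Because every subscriber orders by the same global position, replays across streams are deterministic.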

Implementation Patterns

Relational Event Store Schema

CREATE TABLE events (
    global_position  BIGSERIAL PRIMARY KEY,
    stream_id        TEXT NOT NULL,
    stream_version   INT NOT NULL,
    event_type       TEXT NOT NULL,
    data             JSONB NOT NULL,
    metadata         JSONB NOT NULL DEFAULT '{}',
    created_at       TIMESTAMPTZ NOT NULL DEFAULT now(),
    UNIQUE (stream_id, stream_version)
);

-- The UNIQUE (stream_id, stream_version) constraint already creates an index
-- on those columns, so no separate per-stream index is needed.
CREATE INDEX idx_events_type ON events (event_type);

Append with Optimistic Concurrency (Python)

import json

from psycopg2.errors import UniqueViolation


class ConcurrencyError(Exception):
    """Raised when an append conflicts with a concurrent write to the same stream."""


class EventStore:
    def __init__(self, connection):
        # Assumes a psycopg2-style connection with a dict row factory
        # (e.g. RealDictCursor), so reads return dicts rather than tuples.
        self._conn = connection

    def append(self, stream_id: str, events: list, expected_version: int) -> None:
        """Append events to a stream with optimistic concurrency control.

        Each event is expected to expose .event_type, .data, and .metadata.
        """
        with self._conn.cursor() as cur:
            for i, event in enumerate(events):
                version = expected_version + i + 1
                try:
                    cur.execute(
                        """
                        INSERT INTO events (stream_id, stream_version, event_type, data, metadata)
                        VALUES (%s, %s, %s, %s, %s)
                        """,
                        (stream_id, version, event.event_type,
                         json.dumps(event.data), json.dumps(event.metadata)),
                    )
                except UniqueViolation as exc:
                    self._conn.rollback()  # discard any partially appended events
                    raise ConcurrencyError(
                        f"Stream '{stream_id}' expected version {expected_version}, "
                        f"but a conflicting write was detected."
                    ) from exc
        self._conn.commit()

    def read_stream(self, stream_id: str, from_version: int = 0) -> list[dict]:
        """Read a stream's events with versions greater than from_version."""
        with self._conn.cursor() as cur:
            cur.execute(
                """
                SELECT stream_version, event_type, data, metadata, created_at
                FROM events
                WHERE stream_id = %s AND stream_version > %s
                ORDER BY stream_version ASC
                """,
                (stream_id, from_version)
            )
            return cur.fetchall()

    def read_all(self, from_position: int = 0, batch_size: int = 500) -> list[dict]:
        """Read a batch of events across all streams from a global position."""
        with self._conn.cursor() as cur:
            cur.execute(
                """
                SELECT global_position, stream_id, event_type, data, metadata
                FROM events
                WHERE global_position > %s
                ORDER BY global_position ASC
                LIMIT %s
                """,
                (from_position, batch_size)
            )
            return cur.fetchall()

Aggregate Rehydration

class Aggregate:
    def __init__(self):
        self._version = 0
        self._pending_events = []

    @classmethod
    def load(cls, stream_id: str, event_store: EventStore):
        aggregate = cls()
        # read_stream returns dict rows (see the dict row factory note above)
        for event in event_store.read_stream(stream_id):
            aggregate._apply(event, is_new=False)
        return aggregate

    def _apply(self, event: dict, is_new: bool = True):
        # Dispatch to a handler named after the event type, e.g. _on_OrderPlaced
        handler = getattr(self, f"_on_{event['event_type']}", None)
        if handler:
            handler(event["data"])
        self._version += 1
        if is_new:
            self._pending_events.append(event)
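A concrete aggregate wires up one handler per event type. This self-contained sketch uses hypothetical OrderPlaced / OrderShipped events and replays dict rows in memory rather than reading from the store:

```python
class Order:
    def __init__(self):
        self._version = 0
        self.status = None
        self.total_cents = 0

    def _apply(self, event: dict):
        # Same dispatch convention as the base Aggregate: _on_<event_type>
        handler = getattr(self, f"_on_{event['event_type']}", None)
        if handler:
            handler(event["data"])
        self._version += 1

    def _on_OrderPlaced(self, data: dict):
        self.status = "placed"
        self.total_cents = data["total_cents"]

    def _on_OrderShipped(self, data: dict):
        self.status = "shipped"

# Replaying the stream rebuilds current state from history
history = [
    {"event_type": "OrderPlaced", "data": {"total_cents": 4999}},
    {"event_type": "OrderShipped", "data": {}},
]
order = Order()
for event in history:
    order._apply(event)
# order.status is now "shipped", order._version is 2
```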

Best Practices

  • Keep events small and focused. Each event should represent one meaningful fact. Avoid stuffing multiple state changes into a single event.
  • Include correlation and causation IDs in event metadata to enable distributed tracing across services.
  • Use event type registries to map event type strings to concrete classes, making deserialization explicit and versionable.
  • Partition or archive old events for streams that grow very large, keeping active query paths fast.
  • Prefer JSONB over opaque blobs in relational stores so you can query event payloads when debugging or building ad-hoc projections.
  • Treat the event store as infrastructure, not business logic. Keep the store generic; domain meaning lives in the events and aggregates.
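The event type registry mentioned above can be as simple as an explicit mapping from type strings to event classes (a sketch with hypothetical event classes; the point is that unknown types fail loudly instead of deserializing silently):

```python
from dataclasses import dataclass

@dataclass
class OrderPlaced:
    order_id: str
    total_cents: int

@dataclass
class OrderShipped:
    order_id: str

# Explicit registry: the stored type string is the versionable contract
EVENT_TYPES = {
    "OrderPlaced": OrderPlaced,
    "OrderShipped": OrderShipped,
}

def deserialize(event_type: str, data: dict):
    cls = EVENT_TYPES.get(event_type)
    if cls is None:
        raise ValueError(f"Unknown event type: {event_type}")
    return cls(**data)

event = deserialize("OrderPlaced", {"order_id": "abc123", "total_cents": 4999})
```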

Common Pitfalls

  • Skipping optimistic concurrency control. Without version checks, concurrent writes can silently corrupt aggregate state.
  • Storing derived data in events. Events should capture the intent and input, not computed results. Derived data belongs in projections.
  • Using mutable event schemas. Changing the shape of already-persisted events breaks replay. Use event versioning (upcasting) instead.
  • Exposing the event store directly to consumers. Read models and APIs should consume projections, not raw event streams, to avoid coupling consumers to internal domain events.
  • Neglecting global ordering. Without a reliable global position, building consistent cross-stream projections becomes extremely difficult.
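Upcasting, mentioned above, keeps old persisted events replayable by transforming them into the current schema at read time. A minimal sketch, using a hypothetical v1→v2 change that splits a name field:

```python
def upcast_order_placed_v1_to_v2(data: dict) -> dict:
    # v1 stored a single "customer_name"; v2 splits it into first/last
    data = dict(data)  # never mutate the stored payload
    first, _, last = data.pop("customer_name").partition(" ")
    return {**data, "first_name": first, "last_name": last, "schema_version": 2}

# Keyed by (event type, schema version the upcaster consumes)
UPCASTERS = {
    ("OrderPlaced", 1): upcast_order_placed_v1_to_v2,
}

def upcast(event_type: str, data: dict) -> dict:
    # Chain upcasters until the payload reaches the current schema version
    while (event_type, data.get("schema_version", 1)) in UPCASTERS:
        data = UPCASTERS[(event_type, data.get("schema_version", 1))](data)
    return data

old = {"customer_name": "Ada Lovelace", "total_cents": 4999}
new = upcast("OrderPlaced", old)
```

The stored v1 event is never rewritten; only the in-memory view handed to handlers changes.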

Anti-Patterns

Over-engineering for hypothetical scale. Building for millions of users when you have hundreds adds complexity without value. Solve today's problems first.

Ignoring the existing ecosystem. Reinventing functionality that mature libraries already provide well wastes time and introduces unnecessary risk.

Premature abstraction. Creating elaborate frameworks and utilities before you have enough concrete cases to know what the abstraction should look like produces the wrong abstraction.

Neglecting error handling at boundaries. Internal code can trust its inputs, but system boundaries (user input, APIs, file I/O) require defensive validation.

Skipping documentation for obvious code. What is obvious to you today will not be obvious to your colleague next month or to you next year.
