Command Palette

Search for a command to run...

Level 3 · 30 min

Event Sourcing

Event Sourcing stores the state of a system as an ordered sequence of immutable domain events rather than the current state. The current state is derived by replaying events. This unlocks temporal queries, full audit history, and decoupled projections — at the cost of complexity and eventual consistency.

Event Store and Append-Only

The event store is an append-only log of domain events. Events are immutable facts in the past tense: OrderPlaced, PaymentReceived, OrderShipped. You never update or delete events — only append new ones. The current state of an aggregate is derived by replaying its events in order, giving you a complete history by design. Event store implementations: EventStoreDB (purpose-built, native stream model with projections engine, $by_category system stream for aggregate-type queries), PostgreSQL (events table: `stream_id UUID, version BIGINT, event_type VARCHAR, payload JSONB, created_at TIMESTAMPTZ, PRIMARY KEY(stream_id, version)` — version is the per-stream sequence number), Kafka log-compacted topics (works but lacks strong optimistic concurrency without extra machinery). Optimistic concurrency control is enforced via the expected version: when appending event N+1, assert that current stream version is N. In PostgreSQL: `INSERT INTO events WHERE stream_id=$1 AND version=$2` — if the INSERT fails (version conflict), another writer appended first; retry the command. In EventStoreDB: `AppendToStream(streamId, expectedRevision)` throws WrongExpectedVersionException on conflict. Without this check, two concurrent commands on the same aggregate could both succeed, producing inconsistent state — e.g., two simultaneous withdrawals from the same account both passing the balance check.

Event Replay and Projections

A fintech company's event-sourced ledger had 5 years of data — 220M events for 8M accounts. Regulatory audit required reconstructing account state as of 18 months ago. With a traditional CRUD system, historical state reconstruction would require either point-in-time backups (costly, inflexible) or a full audit log table (often incomplete in practice). With event sourcing: filter events by `created_at < audit_date`, replay per-account, done — exact state at any point in time with zero additional infrastructure. The entire audit passed in 11 hours of batch replay. Event replay rebuilds aggregate state by applying events sequentially to an empty state object. Projections (read models) are separate from the event store — they subscribe to the event stream and maintain denormalized views optimized for specific queries. A single stream of OrderPlaced/OrderShipped events can feed: OrderSummaryProjection (PostgreSQL table), CustomerOrderHistoryProjection (Redis hash), and OrderAnalyticsProjection (ClickHouse time-series) — independently, without coupling. Each projection tracks its own `last_processed_position` checkpoint. On restart, it resumes from that checkpoint, replaying only missed events. Projections are disposable — drop the table, replay from event 0, get a fresh view with any schema you want. Key insight from Designing Data-Intensive Applications: "Event sourcing makes it easier to evolve applications over time, helps with debugging by making it easier to understand after the fact why something happened, and guards against application bugs. From an application point of view it is more meaningful to record the user's actions as immutable events, rather than recording the effect of those actions on a mutable database." Kleppmann distinguishes event sourcing from CDC: CDC extracts changes at the storage level (WAL parsing) without application awareness, while event sourcing captures business intent at the domain level — "student cancelled course enrollment" rather than "one row deleted from enrollments table." This semantic richness is what enables new features to be derived from historical events without any data migration: an event recorded for one purpose can be reinterpreted years later to drive a projection the original developers never anticipated.

Snapshots and Event Versioning

Snapshots capture the aggregate state at a point in time, stored alongside events. On load: fetch latest snapshot for `aggregate_id`, then replay only events with `version > snapshot.version`. Store snapshots every N events (e.g., every 50 or 100). Without snapshots, an aggregate with 10,000 events takes 10,000 apply() calls on every load — at scale this becomes the bottleneck. Snapshot storage: same event store (as a special event type), or a separate snapshot table `(aggregate_id, version, state JSONB)`. Event versioning is a production reality: once an event is in the store, it's permanent. Strategies: (1) Weak schema (JSON, Avro with defaults) — add optional fields; old consumers ignore unknown fields, new consumers use defaults for missing fields. (2) Upcasting — a chain of transformers applied at read time: `OrderPlacedV1 → OrderPlacedV2Upcast → OrderPlacedV2`. The upcaster is pure code, version-controlled, tested. (3) Event name versioning — `OrderPlaced`, `OrderPlacedV2` as separate event types; the aggregate apply() handles both. Never rename or remove fields from existing event types without an upcast strategy — you'll break replay of historical events.

Key Takeaways

  • Events are immutable facts — you never update them. The append-only constraint is what enables time travel and audit logs.
  • Projections are rebuilt from events — this makes schema migrations trivial (rebuild from scratch with new schema) at the cost of rebuild time.
  • Snapshots are an optimization, not a requirement — add them when replay performance becomes a problem, not prematurely.

Code example

// Event-sourced aggregate with optimistic concurrency
class OrderAggregate {
  private String id;
  private OrderStatus status;
  private int version = -1;  // -1 = new (no events)
  private List<DomainEvent> uncommittedEvents = new ArrayList<>();

  static OrderAggregate rehydrate(List<DomainEvent> history) {
    var order = new OrderAggregate();
    history.forEach(order::apply);  // apply mutates state only
    return order;
  }

  void placeOrder(PlaceOrderCommand cmd) {
    if (status != null) throw new IllegalStateException("Already placed");
    raise(new OrderPlaced(cmd.getOrderId(), cmd.getItems(), cmd.getCustomerId()));
  }

  private void raise(DomainEvent event) {
    apply(event);                    // mutate state immediately
    uncommittedEvents.add(event);    // queue for persistence
  }

  private void apply(DomainEvent event) {
    version++;                       // track version for optimistic concurrency
    if (event instanceof OrderPlaced e) {
      this.id = e.getOrderId();
      this.status = OrderStatus.PLACED;
    } else if (event instanceof OrderShipped e) {
      this.status = OrderStatus.SHIPPED;
    }
  }

  int getVersion() { return version; }  // caller uses this as expectedVersion
}