
Event sourcing

Use streams as the source of truth and rebuild state by replaying events

What is event sourcing?

Event sourcing is an architectural pattern where you store every change to your application state as an immutable event, rather than storing only the current state. The current state is derived by replaying the sequence of events from the beginning, or from a snapshot.

This gives you a complete audit trail, the ability to reconstruct state at any point in time, and the freedom to build multiple different views of the same data.
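
As a minimal, library-independent sketch of the idea, the snippet below derives a counter's state by folding over an event log, and reconstructs an earlier state by replaying only a prefix of that log. The `CounterEvent` type and the `apply` and `replay` helpers are illustrative names, not part of npayload.

```typescript
// Hypothetical event type, for illustration only
type CounterEvent =
  | { type: "incremented"; by: number }
  | { type: "reset" };

// Pure reducer: the same state and event always produce the same result
function apply(count: number, event: CounterEvent): number {
  switch (event.type) {
    case "incremented":
      return count + event.by;
    case "reset":
      return 0;
  }
}

// Replay the first `upTo` events (all of them by default)
function replay(events: readonly CounterEvent[], upTo: number = events.length): number {
  return events.slice(0, upTo).reduce(apply, 0);
}

const log: CounterEvent[] = [
  { type: "incremented", by: 5 },
  { type: "incremented", by: 3 },
  { type: "reset" },
  { type: "incremented", by: 2 },
];

const current = replay(log);     // state after all four events
const afterTwo = replay(log, 2); // state as it was after the second event
```

Because the log is never rewritten, any earlier state stays recoverable by choosing a different prefix.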

Why npayload streams are a good fit

npayload streams provide the core primitives that event sourcing requires:

  • Ordered: Messages within a partition are delivered in the order they were published.
  • Persistent: Messages are retained and can be replayed from any offset.
  • Replayable: Consumers can seek to any position in the stream to rebuild state.
  • Partitioned: Partition keys ensure all events for a given entity are in the same partition, preserving per-entity ordering.

Combined with compacted channels for materialized views and the event catalogue for schema governance, npayload gives you a complete event sourcing toolkit.

Implementation overview

An event-sourced system on npayload has three parts:

  1. Event store: A channel where domain events are published with partition keys. The stream on this channel is the source of truth.
  2. State rebuilder: A consumer that reads from the stream and builds the current state by applying events in order.
  3. Snapshot store: A compacted channel that holds the latest computed state for fast lookups, so you do not need to replay from the beginning every time.

Example: bank account

This example models a bank account using event sourcing. The events are deposits, withdrawals, and interest accruals. The current balance is computed by replaying all events.

Define event types

events.ts
type AccountEvent =
  | { type: "account.opened"; accountId: string; ownerName: string; initialDeposit: number }
  | { type: "deposit.made"; accountId: string; amount: number; reference: string }
  | { type: "withdrawal.made"; accountId: string; amount: number; reference: string }
  | { type: "interest.accrued"; accountId: string; amount: number; rate: number }
  | { type: "account.closed"; accountId: string; reason: string };
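
The later examples cast stream payloads with `as AccountEvent`, which the compiler accepts without checking the data. A runtime guard is one way to narrow an unknown payload more safely. The sketch below repeats the event union so it stands alone; `ACCOUNT_EVENT_TYPES` and `isAccountEvent` are hypothetical helpers, and the guard checks only the discriminant and `accountId`, not every field.

```typescript
type AccountEvent =
  | { type: "account.opened"; accountId: string; ownerName: string; initialDeposit: number }
  | { type: "deposit.made"; accountId: string; amount: number; reference: string }
  | { type: "withdrawal.made"; accountId: string; amount: number; reference: string }
  | { type: "interest.accrued"; accountId: string; amount: number; rate: number }
  | { type: "account.closed"; accountId: string; reason: string };

const ACCOUNT_EVENT_TYPES: readonly string[] = [
  "account.opened",
  "deposit.made",
  "withdrawal.made",
  "interest.accrued",
  "account.closed",
];

// Hypothetical guard: a shallow check, not full schema validation
function isAccountEvent(value: unknown): value is AccountEvent {
  if (typeof value !== "object" || value === null) return false;
  const candidate = value as { type?: unknown; accountId?: unknown };
  return (
    typeof candidate.type === "string" &&
    ACCOUNT_EVENT_TYPES.indexOf(candidate.type) !== -1 &&
    typeof candidate.accountId === "string"
  );
}
```

A consumer could call `isAccountEvent(msg.payload)` before applying an event and route anything that fails the check to an error path instead of corrupting state.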

Set up the event store

setup.ts
import { NPayload } from "@npayload/node";

const np = new NPayload({
  instanceUrl: process.env.NPAYLOAD_INSTANCE_URL,
  token: process.env.NPAYLOAD_TOKEN,
});

// The event store channel
const accountEvents = await np.channels.create({
  name: "account-events",
  description: "All bank account domain events",
});

// The snapshot store (compacted channel, keyed by accountId)
const accountSnapshots = await np.channels.create({
  name: "account-snapshots",
  description: "Latest account state, compacted by accountId",
  type: "compacted",
});

Publish domain events

When something happens in the system, publish an event. The partition key routes all events for the same account to the same partition, so they keep their publish order.

commands.ts
async function openAccount(accountId: string, ownerName: string, initialDeposit: number) {
  await np.messages.publish({
    channelId: accountEvents.id,
    partitionKey: accountId,
    payload: {
      type: "account.opened",
      accountId,
      ownerName,
      initialDeposit,
    },
  });
}

async function deposit(accountId: string, amount: number, reference: string) {
  await np.messages.publish({
    channelId: accountEvents.id,
    partitionKey: accountId,
    payload: {
      type: "deposit.made",
      accountId,
      amount,
      reference,
    },
  });
}

async function withdraw(accountId: string, amount: number, reference: string) {
  // Validate against current snapshot before publishing
  const currentState = await getAccountState(accountId);
  if (currentState.balance < amount) {
    throw new Error("Insufficient funds");
  }

  await np.messages.publish({
    channelId: accountEvents.id,
    partitionKey: accountId,
    payload: {
      type: "withdrawal.made",
      accountId,
      amount,
      reference,
    },
  });
}
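
The check inside `withdraw` can be factored into a pure function that is easy to test on its own. The sketch below is one way to do that; `AccountView`, `Decision`, and `decideWithdrawal` are hypothetical names, and the extra rules (positive amount, open account) are assumptions that go beyond the single balance check shown above.

```typescript
// Minimal state shape needed for the decision (hypothetical)
interface AccountView {
  balance: number;
  isOpen: boolean;
}

type Decision = { ok: true } | { ok: false; reason: string };

// Decide whether a withdrawal may be published, without touching any I/O
function decideWithdrawal(state: AccountView, amount: number): Decision {
  if (!isFinite(amount) || amount <= 0) {
    return { ok: false, reason: "Amount must be a positive number" };
  }
  if (!state.isOpen) {
    return { ok: false, reason: "Account is closed" };
  }
  if (state.balance < amount) {
    return { ok: false, reason: "Insufficient funds" };
  }
  return { ok: true };
}
```

Checking state and then publishing is not atomic: two concurrent withdrawals can both pass the check against the same state, so a production system would likely need an additional safeguard on top of this.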

Rebuild state from events

The state rebuilder reads events from the stream and applies each one to build the current state. This is the core of event sourcing.

rebuilder.ts
interface AccountState {
  accountId: string;
  ownerName: string;
  balance: number;
  isOpen: boolean;
  eventCount: number;
  lastUpdated: string;
}

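function applyEvent(state: AccountState | null, event: AccountEvent): AccountState {
  // Every event except "account.opened" needs an existing account to apply to
  if (state === null && event.type !== "account.opened") {
    throw new Error(`Event ${event.type} arrived before account.opened for ${event.accountId}`);
  }

  switch (event.type) {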
    case "account.opened":
      return {
        accountId: event.accountId,
        ownerName: event.ownerName,
        balance: event.initialDeposit,
        isOpen: true,
        eventCount: 1,
        lastUpdated: new Date().toISOString(),
      };

    case "deposit.made":
      return {
        ...state!,
        balance: state!.balance + event.amount,
        eventCount: state!.eventCount + 1,
        lastUpdated: new Date().toISOString(),
      };

    case "withdrawal.made":
      return {
        ...state!,
        balance: state!.balance - event.amount,
        eventCount: state!.eventCount + 1,
        lastUpdated: new Date().toISOString(),
      };

    case "interest.accrued":
      return {
        ...state!,
        balance: state!.balance + event.amount,
        eventCount: state!.eventCount + 1,
        lastUpdated: new Date().toISOString(),
      };

    case "account.closed":
      return {
        ...state!,
        isOpen: false,
        eventCount: state!.eventCount + 1,
        lastUpdated: new Date().toISOString(),
      };
  }
}

// Rebuild a single account's state from all its events
async function rebuildAccountState(accountId: string): Promise<AccountState> {
  const events = await np.streams.read({
    channelId: accountEvents.id,
    partitionKey: accountId,
    startOffset: "earliest",
  });

  let state: AccountState | null = null;
  for (const msg of events.messages) {
    state = applyEvent(state, msg.payload as AccountEvent);
  }

  if (!state) {
    throw new Error(`Account ${accountId} not found`);
  }

  return state;
}

Snapshots with compacted channels

Replaying all events from the beginning is fine for accounts with a few hundred events, but becomes slow as the event count grows. Compacted channels solve this by storing the latest computed state, keyed by entity ID.

snapshots.ts
// After rebuilding state, save a snapshot
async function saveSnapshot(state: AccountState) {
  await np.messages.publish({
    channelId: accountSnapshots.id,
    key: state.accountId, // Compaction keeps only the latest message per key
    payload: state,
  });
}

// Read from snapshot first, then replay only new events
async function getAccountState(accountId: string): Promise<AccountState> {
  // Try the snapshot first
  const snapshot = await np.channels.getCompacted({
    channelId: accountSnapshots.id,
    key: accountId,
  });

  if (snapshot) {
    const state = snapshot.payload as AccountState;

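    // Replay events after the snapshot.
    // This assumes snapshot.offset identifies the last account-events offset folded
    // into the snapshot. If it is instead the snapshot message's own offset in the
    // compacted channel, store the event offset in the snapshot payload and use that.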
    const newEvents = await np.streams.read({
      channelId: accountEvents.id,
      partitionKey: accountId,
      startOffset: "after",
      afterOffset: snapshot.offset,
    });

    let currentState = state;
    for (const msg of newEvents.messages) {
      currentState = applyEvent(currentState, msg.payload as AccountEvent);
    }

    // Update the snapshot if new events were applied
    if (newEvents.messages.length > 0) {
      await saveSnapshot(currentState);
    }

    return currentState;
  }

  // No snapshot, rebuild from scratch
  const state = await rebuildAccountState(accountId);
  await saveSnapshot(state);
  return state;
}
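
For the incremental replay to be correct, the resume position has to refer to the event stream, not to the snapshot channel. One way to make that explicit is to store the offset of the last applied event inside the snapshot itself. The sketch below illustrates this with in-memory stand-ins for the two channels; `StoredEvent`, `Snapshot`, `lastEventOffset`, and the numeric offsets are all assumptions, not npayload types.

```typescript
interface StoredEvent {
  offset: number;
  accountId: string;
  delta: number; // positive for deposits, negative for withdrawals
}

interface Snapshot {
  balance: number;
  lastEventOffset: number; // position in the event log, not in the snapshot store
}

// In-memory stand-ins for the event stream and the compacted snapshot channel
const eventLog: StoredEvent[] = [];
const snapshots: Record<string, Snapshot> = {};

function append(accountId: string, delta: number): void {
  eventLog.push({ offset: eventLog.length, accountId, delta });
}

function getBalance(accountId: string): number {
  const snapshot: Snapshot | undefined = snapshots[accountId];
  let balance = snapshot ? snapshot.balance : 0;
  let lastEventOffset = snapshot ? snapshot.lastEventOffset : -1;

  // Replay only this account's events that the snapshot has not seen yet
  for (const event of eventLog) {
    if (event.accountId !== accountId || event.offset <= lastEventOffset) continue;
    balance += event.delta;
    lastEventOffset = event.offset;
  }

  snapshots[accountId] = { balance, lastEventOffset };
  return balance;
}

append("acc-1", 100);
append("acc-2", 40);
append("acc-1", -30);
const first = getBalance("acc-1");  // replays offsets 0 and 2
append("acc-1", 5);
const second = getBalance("acc-1"); // replays offset 3 only
```

Keeping the offset next to the state it describes means a snapshot can always be resumed from, regardless of how the snapshot store numbers its own messages.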

Schema evolution with the event catalogue

As your system evolves, event schemas change. The event catalogue gives you versioned schemas so consumers know how to interpret events from different time periods.

catalogue.ts
// Register event schemas in the catalogue
await np.eventCatalogue.register({
  eventType: "deposit.made",
  version: 1,
  schema: {
    type: "object",
    properties: {
      type: { type: "string", const: "deposit.made" },
      accountId: { type: "string" },
      amount: { type: "number" },
      reference: { type: "string" },
    },
    required: ["type", "accountId", "amount", "reference"],
  },
});

// Later, add a new field (backward compatible)
await np.eventCatalogue.register({
  eventType: "deposit.made",
  version: 2,
  schema: {
    type: "object",
    properties: {
      type: { type: "string", const: "deposit.made" },
      accountId: { type: "string" },
      amount: { type: "number" },
      reference: { type: "string" },
      currency: { type: "string", default: "USD" },
    },
    required: ["type", "accountId", "amount", "reference"],
  },
});

Consumers can handle both old and new formats by supplying defaults for fields that older versions lack:

versioned-consumer.ts
function applyDeposit(state: AccountState, event: Record<string, unknown>): AccountState {
  const amount = event.amount as number;
  const currency = (event.currency as string) || "USD"; // Default for v1 events

  return {
    ...state,
    balance: state.balance + (currency === "USD" ? amount : convertToUsd(amount, currency)),
    eventCount: state.eventCount + 1,
    lastUpdated: new Date().toISOString(),
  };
}
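
An alternative to defaulting inside each handler is to upcast stored events to the latest shape once, before they reach the reducer. The sketch below shows that approach under stated assumptions: `DepositV1`, `DepositV2`, `StoredDeposit`, and `upcastDeposit` are hypothetical names, and the "USD" default mirrors the v2 schema above.

```typescript
interface DepositV1 {
  type: "deposit.made";
  accountId: string;
  amount: number;
  reference: string;
}

interface DepositV2 extends DepositV1 {
  currency: string;
}

// Anything read from the stream may be either version
type StoredDeposit = DepositV1 & { currency?: string };

// Upcast: bring any stored deposit to the v2 shape before applying it
function upcastDeposit(event: StoredDeposit): DepositV2 {
  return { ...event, currency: event.currency ?? "USD" };
}

const oldEvent = upcastDeposit({
  type: "deposit.made",
  accountId: "acc-1",
  amount: 10,
  reference: "ref-1",
});

const newEvent = upcastDeposit({
  type: "deposit.made",
  accountId: "acc-1",
  amount: 10,
  reference: "ref-2",
  currency: "EUR",
});
```

With upcasting in place, the rest of the consumer only ever deals with one event shape, and the stored events themselves stay untouched.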

Projections

One of the most powerful aspects of event sourcing is that you can build multiple read models from the same event stream. Each projection consumes the same events but produces a different view.

projections.ts
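// transactionHistory and dailySummaries are assumed to be channels created
// the same way as the channels in setup.ts

// Projection 1: Account balance (the primary view)
async function balanceProjection(event: AccountEvent) {
  // getAccountState replays any new events and saves the refreshed snapshot itself
  await getAccountState(event.accountId);
}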

// Projection 2: Transaction history (a list of all transactions)
async function transactionHistoryProjection(event: AccountEvent) {
  if (event.type === "deposit.made" || event.type === "withdrawal.made") {
    await np.messages.publish({
      channelId: transactionHistory.id,
      key: `${event.accountId}:${Date.now()}`,
      payload: {
        accountId: event.accountId,
        type: event.type === "deposit.made" ? "credit" : "debit",
        amount: event.amount,
        reference: event.reference,
        timestamp: new Date().toISOString(),
      },
    });
  }
}

// Projection 3: Daily account summary (aggregated stats)
async function dailySummaryProjection(event: AccountEvent) {
  const today = new Date().toISOString().split("T")[0];
  const key = `${event.accountId}:${today}`;

  const existing = await np.channels.getCompacted({
    channelId: dailySummaries.id,
    key,
  });

  const summary = existing?.payload ?? {
    accountId: event.accountId,
    date: today,
    deposits: 0,
    withdrawals: 0,
    transactions: 0,
  };

  // Only deposits and withdrawals count as transactions
  if (event.type === "deposit.made") {
    summary.deposits += event.amount;
    summary.transactions += 1;
  } else if (event.type === "withdrawal.made") {
    summary.withdrawals += event.amount;
    summary.transactions += 1;
  }

  await np.messages.publish({
    channelId: dailySummaries.id,
    key,
    payload: summary,
  });
}
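
A projection that depends only on the events it receives produces the same view on every replay. The sketch below builds two views from one event list as pure functions and dates each summary by the event's own timestamp rather than the wall clock. `LedgerEvent`, its `occurredAt` field, and both function names are assumptions; the events defined earlier carry no timestamp.

```typescript
// Hypothetical event shape with an explicit timestamp
interface LedgerEvent {
  type: "deposit.made" | "withdrawal.made";
  accountId: string;
  amount: number;
  occurredAt: string; // ISO 8601
}

interface DailySummary {
  deposits: number;
  withdrawals: number;
  transactions: number;
}

// View 1: balance per account
function projectBalances(events: readonly LedgerEvent[]): Record<string, number> {
  const balances: Record<string, number> = {};
  for (const e of events) {
    const signed = e.type === "deposit.made" ? e.amount : -e.amount;
    balances[e.accountId] = (balances[e.accountId] ?? 0) + signed;
  }
  return balances;
}

// View 2: daily totals keyed by "accountId:YYYY-MM-DD"
function projectDailySummaries(events: readonly LedgerEvent[]): Record<string, DailySummary> {
  const summaries: Record<string, DailySummary> = {};
  for (const e of events) {
    const key = `${e.accountId}:${e.occurredAt.slice(0, 10)}`;
    const s = summaries[key] ?? { deposits: 0, withdrawals: 0, transactions: 0 };
    if (e.type === "deposit.made") s.deposits += e.amount;
    else s.withdrawals += e.amount;
    s.transactions += 1;
    summaries[key] = s;
  }
  return summaries;
}

const ledger: LedgerEvent[] = [
  { type: "deposit.made", accountId: "acc-1", amount: 100, occurredAt: "2024-01-01T09:00:00Z" },
  { type: "withdrawal.made", accountId: "acc-1", amount: 30, occurredAt: "2024-01-01T15:00:00Z" },
  { type: "deposit.made", accountId: "acc-1", amount: 20, occurredAt: "2024-01-02T10:00:00Z" },
];

const balances = projectBalances(ledger);
const daily = projectDailySummaries(ledger);
```

Replaying `ledger` a second time yields identical views, which is what makes it safe to discard and rebuild a projection.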

Performance considerations

Batch processing

When replaying large streams, read in batches rather than one message at a time:

batch-replay.ts
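async function replayInBatches(channelId: string, batchSize: number = 100) {
  // Offset of the last processed message; null until the first batch has been read
  let lastOffset: string | null = null;
  let hasMore = true;

  while (hasMore) {
    // Start at the beginning, then resume strictly after the last processed
    // message so that no event is processed twice
    const batch = await np.streams.read(
      lastOffset === null
        ? { channelId, startOffset: "earliest", limit: batchSize }
        : { channelId, startOffset: "after", afterOffset: lastOffset, limit: batchSize },
    );

    for (const msg of batch.messages) {
      await processEvent(msg.payload as AccountEvent);
    }

    // A short batch means the end of the stream has been reached
    hasMore = batch.messages.length === batchSize;
    if (hasMore) {
      lastOffset = batch.messages[batch.messages.length - 1].offset;
    }
  }
}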

Snapshot frequency

Take snapshots periodically, not after every event. A good rule of thumb is to snapshot every 50 to 100 events, or when the consumer starts up and finds it has more than 100 events to replay.
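
The rule of thumb can be expressed as a small policy function that the consumer consults after applying each event. This is a sketch only: `shouldSnapshot` and `countSnapshots` are hypothetical helpers, and the default threshold of 100 simply follows the guidance above.

```typescript
// Hypothetical policy: snapshot once enough events have accumulated
function shouldSnapshot(eventsSinceSnapshot: number, threshold: number = 100): boolean {
  return eventsSinceSnapshot >= threshold;
}

// Simulate a consumer applying `totalEvents` events and count the snapshots it takes
function countSnapshots(totalEvents: number, threshold: number): number {
  let sinceSnapshot = 0;
  let snapshotsTaken = 0;
  for (let i = 0; i < totalEvents; i++) {
    sinceSnapshot += 1;
    if (shouldSnapshot(sinceSnapshot, threshold)) {
      snapshotsTaken += 1;
      sinceSnapshot = 0; // the new snapshot covers everything applied so far
    }
  }
  return snapshotsTaken;
}

const periodic = countSnapshots(250, 100); // snapshots after events 100 and 200
const everyEvent = countSnapshots(250, 1); // one snapshot per event
```

The trade-off is between write volume on the snapshot channel and the number of events a reader may have to replay, which is bounded by the threshold.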

Partition key design

Choose partition keys that distribute load evenly. For the bank account example, the account ID is a natural partition key. Avoid partition keys that concentrate too many events in a single partition (such as a date or a popular category).
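
To illustrate the skew, the sketch below assigns keys to partitions with a simple hash and counts how many events land in each. How npayload actually maps keys to partitions is not described here, so the FNV-1a hash, the modulo step, and the partition count of 4 are all assumptions chosen for illustration.

```typescript
// 32-bit FNV-1a hash; a stand-in for whatever hashing the platform really uses
function fnv1a(key: string): number {
  let hash = 0x811c9dc5;
  for (let i = 0; i < key.length; i++) {
    hash ^= key.charCodeAt(i);
    hash = Math.imul(hash, 0x01000193);
  }
  return hash >>> 0;
}

function partitionFor(key: string, partitions: number): number {
  return fnv1a(key) % partitions;
}

// Count how many events land in each partition for a given list of keys
function distribution(keys: readonly string[], partitions: number): number[] {
  const counts: number[] = [];
  for (let p = 0; p < partitions; p++) counts.push(0);
  for (const key of keys) {
    const p = partitionFor(key, partitions);
    counts[p] = (counts[p] ?? 0) + 1;
  }
  return counts;
}

const accountKeys: string[] = [];
const dateKeys: string[] = [];
for (let i = 0; i < 1000; i++) {
  accountKeys.push(`acc-${i}`); // one key per account
  dateKeys.push("2024-01-01");  // every event that day shares one key
}

const byAccount = distribution(accountKeys, 4); // spread over more than one partition
const byDate = distribution(dateKeys, 4);       // all 1000 events in a single partition
```

Under any key-to-partition scheme of this kind, identical keys always land together, so a key shared by most events turns one partition into a hotspot.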

Anti-patterns

Events that are too granular

Publishing an event for every field change creates excessive noise. Group related changes into meaningful domain events:

// Bad: too granular
await publish({ type: "name.changed", name: "Alice" });
await publish({ type: "email.changed", email: "alice@example.com" });

// Good: meaningful domain event
await publish({ type: "profile.updated", name: "Alice", email: "alice@example.com" });

Missing event types

If your event stream does not capture every meaningful state change, you cannot rebuild state accurately. Audit your domain model to ensure all mutations are captured as events.

No snapshots

Without snapshots, state reconstruction gets slower over time as the event count grows. Always implement a snapshot strategy for entities that accumulate many events.

Mutable events

Events must be immutable. Never update or delete events in the stream. If you need to correct a mistake, publish a new compensating event (such as "adjustment.made") that reverses the error.
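
A minimal sketch of a compensating event follows. The "adjustment.made" type comes from the text above, while the `LedgerEntry` union, the `corrects` and `reason` fields, and `balanceOf` are assumptions added for illustration.

```typescript
type LedgerEntry =
  | { type: "deposit.made"; amount: number; reference: string }
  | { type: "adjustment.made"; amount: number; corrects: string; reason: string };

function balanceOf(entries: readonly LedgerEntry[]): number {
  return entries.reduce((sum, e) => sum + e.amount, 0);
}

const entries: LedgerEntry[] = [
  // Recorded in error: the deposit should have been 50
  { type: "deposit.made", amount: 500, reference: "dep-1" },
];

// Correct the mistake by appending a new entry, never by editing the original
entries.push({
  type: "adjustment.made",
  amount: -450,
  corrects: "dep-1",
  reason: "Deposit keyed as 500 instead of 50",
});

const corrected = balanceOf(entries);
```

The original entry remains in the log, so the audit trail shows both the mistake and its correction.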

Next steps

  • Streams for details on stream consumption and offset management
  • Event catalogue for schema governance and versioning
  • CQRS pattern for separating read and write paths using event sourcing
