Event sourcing
Use streams as the source of truth and rebuild state by replaying events
What is event sourcing?
Event sourcing is an architectural pattern where you store every change to your application state as an immutable event, rather than storing only the current state. The current state is derived by replaying the sequence of events from the beginning, or from a snapshot.
This gives you a complete audit trail, the ability to reconstruct state at any point in time, and the freedom to build multiple different views of the same data.
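The replay mechanism can be sketched as a left fold over the event history. The counter events below are purely illustrative; they are not part of npayload or the bank example that follows:

```typescript
// Minimal sketch of "current state = replay(events)".
type CounterEvent = { type: "incremented" } | { type: "decremented" };

function apply(count: number, event: CounterEvent): number {
  return event.type === "incremented" ? count + 1 : count - 1;
}

const history: CounterEvent[] = [
  { type: "incremented" },
  { type: "incremented" },
  { type: "decremented" },
];

// Current state is a left fold over the full event history.
const current = history.reduce(apply, 0); // 1
```

Nothing about the current state is stored directly; deleting and re-running the fold over the same history always produces the same result.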
Why npayload streams are a good fit
npayload streams provide the core primitives that event sourcing requires:
- Ordered: Messages within a partition are delivered in the order they were published.
- Persistent: Messages are retained and can be replayed from any offset.
- Replayable: Consumers can seek to any position in the stream to rebuild state.
- Partitioned: Partition keys ensure all events for a given entity are in the same partition, preserving per-entity ordering.
Combined with compacted channels for materialized views and the event catalogue for schema governance, npayload gives you a complete event sourcing toolkit.
Implementation overview
An event-sourced system on npayload has three parts:
- Event store: A channel where domain events are published with partition keys. The stream on this channel is the source of truth.
- State rebuilder: A consumer that reads from the stream and builds the current state by applying events in order.
- Snapshot store: A compacted channel that holds the latest computed state for fast lookups, so you do not need to replay from the beginning every time.
Example: bank account
This example models a bank account using event sourcing. The events are deposits, withdrawals, and interest accruals. The current balance is computed by replaying all events.
Define event types
type AccountEvent =
  | { type: "account.opened"; accountId: string; ownerName: string; initialDeposit: number }
  | { type: "deposit.made"; accountId: string; amount: number; reference: string }
  | { type: "withdrawal.made"; accountId: string; amount: number; reference: string }
  | { type: "interest.accrued"; accountId: string; amount: number; rate: number }
  | { type: "account.closed"; accountId: string; reason: string };
Set up the event store
import { NPayload } from "@npayload/node";

const np = new NPayload({
  instanceUrl: process.env.NPAYLOAD_INSTANCE_URL,
  token: process.env.NPAYLOAD_TOKEN,
});

// The event store channel
const accountEvents = await np.channels.create({
  name: "account-events",
  description: "All bank account domain events",
});

// The snapshot store (compacted channel, keyed by accountId)
const accountSnapshots = await np.channels.create({
  name: "account-snapshots",
  description: "Latest account state, compacted by accountId",
  type: "compacted",
});
Publish domain events
When something happens in the system, publish an event. The partition key ensures all events for the same account are ordered together.
async function openAccount(accountId: string, ownerName: string, initialDeposit: number) {
  await np.messages.publish({
    channelId: accountEvents.id,
    partitionKey: accountId,
    payload: {
      type: "account.opened",
      accountId,
      ownerName,
      initialDeposit,
    },
  });
}

async function deposit(accountId: string, amount: number, reference: string) {
  await np.messages.publish({
    channelId: accountEvents.id,
    partitionKey: accountId,
    payload: {
      type: "deposit.made",
      accountId,
      amount,
      reference,
    },
  });
}

async function withdraw(accountId: string, amount: number, reference: string) {
  // Validate against the current snapshot before publishing. Note that this
  // check is not transactional: two concurrent withdrawals can both pass it.
  const currentState = await getAccountState(accountId);
  if (currentState.balance < amount) {
    throw new Error("Insufficient funds");
  }
  await np.messages.publish({
    channelId: accountEvents.id,
    partitionKey: accountId,
    payload: {
      type: "withdrawal.made",
      accountId,
      amount,
      reference,
    },
  });
}
Rebuild state from events
The state rebuilder reads events from the stream and applies each one to build the current state. This is the core of event sourcing.
interface AccountState {
  accountId: string;
  ownerName: string;
  balance: number;
  isOpen: boolean;
  eventCount: number;
  lastUpdated: string;
}

function applyEvent(state: AccountState | null, event: AccountEvent): AccountState {
  if (event.type === "account.opened") {
    return {
      accountId: event.accountId,
      ownerName: event.ownerName,
      balance: event.initialDeposit,
      isOpen: true,
      eventCount: 1,
      lastUpdated: new Date().toISOString(),
    };
  }
  // Every other event type requires an existing account
  if (!state) {
    throw new Error(`Received ${event.type} before account.opened`);
  }
  switch (event.type) {
    case "deposit.made":
    case "interest.accrued":
      return {
        ...state,
        balance: state.balance + event.amount,
        eventCount: state.eventCount + 1,
        lastUpdated: new Date().toISOString(),
      };
    case "withdrawal.made":
      return {
        ...state,
        balance: state.balance - event.amount,
        eventCount: state.eventCount + 1,
        lastUpdated: new Date().toISOString(),
      };
    case "account.closed":
      return {
        ...state,
        isOpen: false,
        eventCount: state.eventCount + 1,
        lastUpdated: new Date().toISOString(),
      };
  }
}
// Rebuild a single account's state from all its events
async function rebuildAccountState(accountId: string): Promise<AccountState> {
  const events = await np.streams.read({
    channelId: accountEvents.id,
    partitionKey: accountId,
    startOffset: "earliest",
  });
  let state: AccountState | null = null;
  for (const msg of events.messages) {
    state = applyEvent(state, msg.payload as AccountEvent);
  }
  if (!state) {
    throw new Error(`Account ${accountId} not found`);
  }
  return state;
}
Snapshots with compacted channels
Replaying all events from the beginning is fine for accounts with a few hundred events, but becomes slow as the event count grows. Compacted channels solve this by storing the latest computed state, keyed by entity ID.
// After rebuilding state, save a snapshot
async function saveSnapshot(state: AccountState) {
  await np.messages.publish({
    channelId: accountSnapshots.id,
    key: state.accountId, // Compacted channels keep only the latest message per key
    payload: state,
  });
}
// Read from snapshot first, then replay only new events
async function getAccountState(accountId: string): Promise<AccountState> {
  // Try the snapshot first
  const snapshot = await np.channels.getCompacted({
    channelId: accountSnapshots.id,
    key: accountId,
  });
  if (snapshot) {
    const state = snapshot.payload as AccountState;
    // Replay events after the snapshot
    const newEvents = await np.streams.read({
      channelId: accountEvents.id,
      partitionKey: accountId,
      startOffset: "after",
      afterOffset: snapshot.offset,
    });
    let currentState = state;
    for (const msg of newEvents.messages) {
      currentState = applyEvent(currentState, msg.payload as AccountEvent);
    }
    // Update the snapshot if new events were applied
    if (newEvents.messages.length > 0) {
      await saveSnapshot(currentState);
    }
    return currentState;
  }
  // No snapshot, rebuild from scratch
  const state = await rebuildAccountState(accountId);
  await saveSnapshot(state);
  return state;
}
Schema evolution with the event catalogue
As your system evolves, event schemas change. The event catalogue gives you versioned schemas so consumers know how to interpret events from different time periods.
// Register event schemas in the catalogue
await np.eventCatalogue.register({
  eventType: "deposit.made",
  version: 1,
  schema: {
    type: "object",
    properties: {
      type: { type: "string", const: "deposit.made" },
      accountId: { type: "string" },
      amount: { type: "number" },
      reference: { type: "string" },
    },
    required: ["type", "accountId", "amount", "reference"],
  },
});

// Later, add a new field (backward compatible)
await np.eventCatalogue.register({
  eventType: "deposit.made",
  version: 2,
  schema: {
    type: "object",
    properties: {
      type: { type: "string", const: "deposit.made" },
      accountId: { type: "string" },
      amount: { type: "number" },
      reference: { type: "string" },
      currency: { type: "string", default: "USD" },
    },
    required: ["type", "accountId", "amount", "reference"],
  },
});
Consumers can check the event version and handle both old and new formats:
function applyDeposit(state: AccountState, event: Record<string, unknown>): AccountState {
  const amount = event.amount as number;
  const currency = (event.currency as string) || "USD"; // Default for v1 events
  return {
    ...state,
    balance: state.balance + (currency === "USD" ? amount : convertToUsd(amount, currency)),
    eventCount: state.eventCount + 1,
    lastUpdated: new Date().toISOString(),
  };
}
Projections
One of the most powerful aspects of event sourcing is that you can build multiple read models from the same event stream. Each projection consumes the same events but produces a different view.
// Projection 1: Account balance (the primary view)
async function balanceProjection(event: AccountEvent) {
  // getAccountState replays any new events and refreshes the snapshot
  await getAccountState(event.accountId);
}

// Projection 2: Transaction history (a list of all transactions)
async function transactionHistoryProjection(event: AccountEvent) {
  if (event.type === "deposit.made" || event.type === "withdrawal.made") {
    await np.messages.publish({
      channelId: transactionHistory.id,
      key: `${event.accountId}:${Date.now()}`,
      payload: {
        accountId: event.accountId,
        type: event.type === "deposit.made" ? "credit" : "debit",
        amount: event.amount,
        reference: event.reference,
        timestamp: new Date().toISOString(),
      },
    });
  }
}

// Projection 3: Daily account summary (aggregated stats)
interface DailySummary {
  accountId: string;
  date: string;
  deposits: number;
  withdrawals: number;
  transactions: number;
}

async function dailySummaryProjection(event: AccountEvent) {
  const today = new Date().toISOString().split("T")[0];
  const key = `${event.accountId}:${today}`;
  const existing = await np.channels.getCompacted({
    channelId: dailySummaries.id,
    key,
  });
  const summary = (existing?.payload as DailySummary | undefined) ?? {
    accountId: event.accountId,
    date: today,
    deposits: 0,
    withdrawals: 0,
    transactions: 0,
  };
  if (event.type === "deposit.made") {
    summary.deposits += event.amount;
  } else if (event.type === "withdrawal.made") {
    summary.withdrawals += event.amount;
  }
  summary.transactions += 1;
  await np.messages.publish({
    channelId: dailySummaries.id,
    key,
    payload: summary,
  });
}
Performance considerations
Batch processing
When replaying large streams, read in batches rather than one message at a time:
async function replayInBatches(channelId: string, batchSize: number = 100) {
  let afterOffset: string | null = null;
  let hasMore = true;
  while (hasMore) {
    const batch = await np.streams.read({
      channelId,
      // Start from the beginning, then resume after the last processed
      // offset so no message is processed twice.
      ...(afterOffset === null
        ? { startOffset: "earliest" }
        : { startOffset: "after", afterOffset }),
      limit: batchSize,
    });
    for (const msg of batch.messages) {
      await processEvent(msg.payload as AccountEvent);
    }
    hasMore = batch.messages.length === batchSize;
    if (hasMore) {
      afterOffset = batch.messages[batch.messages.length - 1].offset;
    }
  }
}
Snapshot frequency
Take snapshots periodically, not after every event. A good rule of thumb is to snapshot every 50 to 100 events, or when the consumer starts up and finds it has more than 100 events to replay.
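That rule of thumb can be sketched as a small policy function. `SNAPSHOT_INTERVAL` and `shouldSnapshot` are illustrative names, not part of the npayload SDK:

```typescript
// Illustrative policy: snapshot every Nth event, or when a startup replay
// shows the last snapshot has fallen far behind the stream.
const SNAPSHOT_INTERVAL = 100;

function shouldSnapshot(eventCount: number, replayedSinceSnapshot: number): boolean {
  return (
    eventCount % SNAPSHOT_INTERVAL === 0 ||
    replayedSinceSnapshot > SNAPSHOT_INTERVAL
  );
}
```

A consumer would call this after applying each event, passing the state's `eventCount` and the number of events replayed since the last saved snapshot.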
Partition key design
Choose partition keys that distribute load evenly. For the bank account example, the account ID is a natural partition key. Avoid partition keys that concentrate too many events in a single partition (such as a date or a popular category).
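A sketch of the contrast, with hypothetical helper names:

```typescript
// High-cardinality key: events spread evenly across partitions, and all
// events for one account still land in the same partition.
function accountPartitionKey(event: { accountId: string }): string {
  return event.accountId;
}

// Low-cardinality key: every event from the same day lands in one partition,
// creating a hot spot. Shown only as the anti-pattern.
function datePartitionKey(event: { timestamp: string }): string {
  return event.timestamp.split("T")[0];
}
```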
Anti-patterns
Events that are too granular
Publishing an event for every field change creates excessive noise. Group related changes into meaningful domain events:
// Bad: too granular
await publish({ type: "name.changed", name: "Alice" });
await publish({ type: "email.changed", email: "alice@example.com" });
// Good: meaningful domain event
await publish({ type: "profile.updated", name: "Alice", email: "alice@example.com" });
Missing event types
If your event stream does not capture every meaningful state change, you cannot rebuild state accurately. Audit your domain model to ensure all mutations are captured as events.
No snapshots
Without snapshots, state reconstruction gets slower over time as the event count grows. Always implement a snapshot strategy for entities that accumulate many events.
Mutable events
Events must be immutable. Never update or delete events in the stream. If you need to correct a mistake, publish a new compensating event (such as "adjustment.made") that reverses the error.
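For example, a compensating event for an erroneous withdrawal could be built like this. The "adjustment.made" type and its fields are hypothetical; they are not in the AccountEvent union defined above:

```typescript
// Build a compensating event that credits back an erroneous withdrawal.
// The original event stays in the stream untouched; the correction is a
// new, append-only fact.
function buildAdjustment(accountId: string, amount: number, badReference: string) {
  return {
    type: "adjustment.made" as const,
    accountId,
    amount, // credited back to the balance
    reason: `Reversal of erroneous withdrawal ${badReference}`,
  };
}
```

The adjustment would then be published to the event store with the account ID as the partition key, like any other domain event.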
Next steps
- Streams for details on stream consumption and offset management
- Event catalogue for schema governance and versioning
- CQRS pattern for separating read and write paths using event sourcing