
CQRS pattern

Separate read and write paths using channels, streams, and compacted state

What is CQRS?

CQRS (Command Query Responsibility Segregation) is an architectural pattern that separates the write model from the read model. Instead of using a single data model for both reading and writing, you maintain separate models optimized for each purpose.

The write side handles commands and publishes events. The read side consumes those events and builds denormalized, query-optimized views. This separation lets you scale reads and writes independently and tailor each model to its specific access patterns.
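Before wiring in npayload, the shape of the pattern can be seen in a few lines of plain TypeScript: commands append events to a log, a projector folds events into a query-optimized map, and queries read only from that map. This is an illustrative in-memory sketch, not npayload API:

```typescript
// In-memory CQRS sketch: an event log (write side), a projection (read side),
// and a query that never touches the log.
type ProfileEvent =
  | { type: "profile.created"; userId: string; displayName: string }
  | { type: "profile.renamed"; userId: string; displayName: string };

const eventLog: ProfileEvent[] = [];
const readModel = new Map<string, { displayName: string }>();

// Write side: validate the command, then append an event
function createProfile(userId: string, displayName: string) {
  if (displayName.length === 0) throw new Error("displayName required");
  eventLog.push({ type: "profile.created", userId, displayName });
}

// Projector: fold each event into the denormalized view
function project(event: ProfileEvent) {
  readModel.set(event.userId, { displayName: event.displayName });
}

// Query side: read the view directly, with no replay
function getDisplayName(userId: string): string | undefined {
  return readModel.get(userId)?.displayName;
}
```

In production the event log becomes a durable channel and the projector a stream consumer, but the division of responsibility is identical.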

How npayload enables CQRS

npayload provides the primitives for each part of the CQRS pipeline:

| Component | npayload primitive | Purpose |
| --- | --- | --- |
| Command handling | Channels with message groups | Ordered command processing per entity |
| Event publishing | Channels with partition keys | Durable, ordered event storage |
| Event consumption | Streams | Replay events to build read models |
| Read model storage | Compacted channels | Key-value store with latest-value-wins semantics |
| Schema governance | Event catalogue | Versioned event schemas |

Implementation example: user profile service

This example shows a user profile service where the write side processes profile update commands and the read side builds a denormalized view that combines data from multiple sources.

Write side

The write side receives commands, validates them, and publishes domain events. Commands are published to a channel with message groups to ensure per-user ordering.

write-side.ts
import { NPayload } from "@npayload/node";

const np = new NPayload({
  instanceUrl: process.env.NPAYLOAD_INSTANCE_URL,
  token: process.env.NPAYLOAD_TOKEN,
});

// Command channel (write side input)
const profileCommands = await np.channels.create({
  name: "profile-commands",
  description: "Commands for updating user profiles",
});

// Event channel (write side output)
const profileEvents = await np.channels.create({
  name: "profile-events",
  description: "Domain events for user profile changes",
});

// Handle a profile update command
async function handleUpdateProfile(command: {
  userId: string;
  updates: {
    displayName?: string;
    bio?: string;
    avatarUrl?: string;
    preferences?: Record<string, string>;
  };
}) {
  // Validate the command
  if (command.updates.displayName && command.updates.displayName.length > 100) {
    throw new Error("Display name must be 100 characters or fewer");
  }

  if (command.updates.bio && command.updates.bio.length > 500) {
    throw new Error("Bio must be 500 characters or fewer");
  }

  // Publish the domain event
  await np.messages.publish({
    channelId: profileEvents.id,
    partitionKey: command.userId,
    payload: {
      type: "profile.updated",
      userId: command.userId,
      updates: command.updates,
      timestamp: new Date().toISOString(),
    },
  });
}

// Handle a profile creation command
async function handleCreateProfile(command: {
  userId: string;
  displayName: string;
  email: string;
}) {
  await np.messages.publish({
    channelId: profileEvents.id,
    partitionKey: command.userId,
    payload: {
      type: "profile.created",
      userId: command.userId,
      displayName: command.displayName,
      email: command.email,
      timestamp: new Date().toISOString(),
    },
  });
}
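The handlers above still need to be wired to the command channel. The subscription mechanics depend on the npayload SDK, but the dispatch logic itself is plain TypeScript: route each command envelope to its handler by type, and reject unknown types so bad commands surface instead of disappearing. A sketch with stubbed handlers (the envelope shape and handler names here are assumptions, not part of the API above):

```typescript
// Hypothetical command envelope; the real shape is set by your channel contract.
interface CommandEnvelope {
  type: string;
  payload: Record<string, unknown>;
}

// Routing table: command type -> handler. The stubs stand in for
// handleCreateProfile / handleUpdateProfile from write-side.ts.
const commandHandlers: Record<string, (p: Record<string, unknown>) => string> = {
  "profile.create": (p) => `created:${p.userId}`,
  "profile.update": (p) => `updated:${p.userId}`,
};

function dispatchCommand(cmd: CommandEnvelope): string {
  const handler = commandHandlers[cmd.type];
  if (!handler) {
    // Surface unknown commands instead of silently dropping them
    throw new Error(`Unknown command type: ${cmd.type}`);
  }
  return handler(cmd.payload);
}
```

With message groups keyed by userId, commands for the same user arrive in order, so the dispatcher needs no locking of its own.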

Read side

The read side consumes profile events and builds a denormalized view stored in a compacted channel. This view is optimized for fast lookups by user ID.

read-side.ts
// Compacted channel for the read model
const profileReadModel = await np.channels.create({
  name: "profile-read-model",
  description: "Denormalized profile view, keyed by userId",
  type: "compacted",
});

// Also consume activity events from another channel
const activityEvents = await np.channels.create({
  name: "activity-events",
  description: "User activity events from the activity service",
});

interface ProfileView {
  userId: string;
  displayName: string;
  email: string;
  bio: string;
  avatarUrl: string;
  preferences: Record<string, string>;
  lastActive: string;
  postCount: number;
  followerCount: number;
  updatedAt: string;
}

// Process profile events into the read model
async function processProfileEvent(event: {
  type: string;
  userId: string;
  [key: string]: unknown;
}) {
  // Get the current read model state
  const existing = await np.channels.getCompacted({
    channelId: profileReadModel.id,
    key: event.userId,
  });

  let view: ProfileView;

  if (event.type === "profile.created") {
    view = {
      userId: event.userId,
      displayName: event.displayName as string,
      email: event.email as string,
      bio: "",
      avatarUrl: "",
      preferences: {},
      lastActive: event.timestamp as string,
      postCount: 0,
      followerCount: 0,
      updatedAt: event.timestamp as string,
    };
  } else if (event.type === "profile.updated") {
    if (!existing) {
      // No base state yet (for example, events replayed out of order);
      // skip rather than write a partial view
      return;
    }
    const current = existing.payload as ProfileView;
    const updates = event.updates as Partial<ProfileView>;
    view = {
      ...current,
      ...updates,
      updatedAt: event.timestamp as string,
    };
  } else {
    return;
  }

  // Write to the compacted channel (latest value wins)
  await np.messages.publish({
    channelId: profileReadModel.id,
    key: event.userId,
    payload: view,
  });
}

// Process activity events to enrich the profile view
async function processActivityEvent(event: {
  type: string;
  userId: string;
  timestamp: string;
}) {
  const existing = await np.channels.getCompacted({
    channelId: profileReadModel.id,
    key: event.userId,
  });

  if (!existing) return;

  const view = existing.payload as ProfileView;

  if (event.type === "post.created") {
    view.postCount += 1;
  } else if (event.type === "follower.added") {
    view.followerCount += 1;
  } else if (event.type === "follower.removed") {
    view.followerCount = Math.max(0, view.followerCount - 1);
  }

  view.lastActive = event.timestamp;

  await np.messages.publish({
    channelId: profileReadModel.id,
    key: event.userId,
    payload: view,
  });
}
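Both handlers above interleave state fetching, transformation, and publishing. Pulling the transformation out into a pure reducer makes the projection logic unit-testable and directly reusable during rebuilds. A sketch over a trimmed-down view, illustrating the fold rather than the npayload API:

```typescript
interface MiniView {
  userId: string;
  displayName: string;
  postCount: number;
  updatedAt: string;
}

type MiniEvent =
  | { type: "profile.created"; userId: string; displayName: string; timestamp: string }
  | { type: "profile.updated"; userId: string; updates: { displayName?: string }; timestamp: string }
  | { type: "post.created"; userId: string; timestamp: string };

// Pure fold: (previous view, event) -> next view, or null when the event
// cannot be applied yet (e.g. an update arriving before the create was seen).
function applyEvent(prev: MiniView | null, event: MiniEvent): MiniView | null {
  switch (event.type) {
    case "profile.created":
      return { userId: event.userId, displayName: event.displayName, postCount: 0, updatedAt: event.timestamp };
    case "profile.updated":
      if (!prev) return null;
      return { ...prev, ...event.updates, updatedAt: event.timestamp };
    case "post.created":
      if (!prev) return null;
      return { ...prev, postCount: prev.postCount + 1, updatedAt: event.timestamp };
  }
}
```

The channel-facing handler then reduces to: fetch the current state, call the reducer, and publish the result when it is non-null.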

Query side

Reading from the compacted channel gives you fast point lookups by key, with no event replay at query time:

query.ts
// Fast lookup by userId
async function getProfile(userId: string): Promise<ProfileView | null> {
  const result = await np.channels.getCompacted({
    channelId: profileReadModel.id,
    key: userId,
  });

  return result ? (result.payload as ProfileView) : null;
}

// Example API endpoint
async function handleGetProfile(req: Request): Promise<Response> {
  const userId = new URL(req.url).searchParams.get("userId");
  if (!userId) {
    return new Response("userId required", { status: 400 });
  }

  const profile = await getProfile(userId);
  if (!profile) {
    return new Response("Not found", { status: 404 });
  }

  return new Response(JSON.stringify(profile), {
    headers: { "Content-Type": "application/json" },
  });
}

Eventual consistency

CQRS introduces eventual consistency between the write side and the read side. After a command is processed and an event is published, there is a brief window before the read model is updated.

What this means in practice

  • A user updates their profile and immediately refreshes the page. For a brief moment, they may see the old data.
  • Two users viewing the same profile may see slightly different data if one read model consumer is behind the other.

How to handle it

Optimistic updates on the client: After a successful write, update the UI immediately with the expected new state while the read model catches up.

optimistic.ts
async function updateProfileWithOptimism(
  userId: string,
  updates: { displayName?: string; bio?: string; avatarUrl?: string }
) {
  // Send the command
  await handleUpdateProfile({ userId, updates });

  // Merge the expected changes over the current view immediately,
  // without waiting for the read model to catch up
  const current = await getProfile(userId);
  return { ...current, ...updates };
}

Read-your-writes consistency: For critical flows, read from the event stream directly after writing to confirm the event was published, rather than reading from the read model.
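A related technique, when a flow can tolerate a short wait but not stale data, is to poll the read model until its updatedAt reflects the write, backing off between attempts. A sketch with the fetch function injected, so the retry logic stays independent of the npayload client (the helper name and defaults are illustrative):

```typescript
const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

// Poll until the view's updatedAt is at or after the write's timestamp.
// Returns null if the read model has not caught up within maxAttempts.
async function waitForFreshView<T extends { updatedAt: string }>(
  fetchView: () => Promise<T | null>,
  notBefore: string,
  maxAttempts = 5,
  baseDelayMs = 20
): Promise<T | null> {
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    const view = await fetchView();
    if (view && view.updatedAt >= notBefore) return view;
    await sleep(baseDelayMs * 2 ** attempt); // exponential backoff
  }
  return null;
}
```

Comparing ISO-8601 strings lexicographically works here because timestamps in the same format and timezone sort chronologically.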

Stale data indicators: In dashboards and admin tools, show the last-updated timestamp from the read model so users know how fresh the data is.
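To make freshness visible, an API response can include a label derived from the view's updatedAt. A small hypothetical helper (the thresholds are arbitrary):

```typescript
// Human-readable staleness derived from the read model's updatedAt field.
function freshnessLabel(updatedAtIso: string, nowIso: string): string {
  const ageMs = Date.parse(nowIso) - Date.parse(updatedAtIso);
  if (ageMs < 5_000) return "up to date";
  if (ageMs < 60_000) return `${Math.round(ageMs / 1000)}s behind`;
  return `${Math.round(ageMs / 60_000)}m behind`;
}
```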

Multiple read models

A key advantage of CQRS is building different read models from the same events. Each model is optimized for a specific query pattern.

multiple-models.ts
// Read model 1: Full profile (keyed by userId)
const fullProfileModel = await np.channels.create({
  name: "profile-full",
  type: "compacted",
});

// Read model 2: Profile search index (keyed by displayName)
const profileSearchModel = await np.channels.create({
  name: "profile-search",
  type: "compacted",
});

// Read model 3: Active users leaderboard (keyed by activity rank)
const leaderboardModel = await np.channels.create({
  name: "profile-leaderboard",
  type: "compacted",
});

// Same event, three different projections
async function projectProfileEvent(event: {
  type: string;
  userId: string;
  displayName?: string;
  [key: string]: unknown;
}) {
  // Projection 1: Full profile view
  await updateFullProfile(event);

  // Projection 2: Search index (only when a display name is set or changed).
  // On profile.updated events the name lives under event.updates, not at the
  // top level, so check both locations.
  const displayName =
    (event.displayName as string | undefined) ??
    (event.updates as { displayName?: string } | undefined)?.displayName;
  if (displayName) {
    await np.messages.publish({
      channelId: profileSearchModel.id,
      key: displayName.toLowerCase(),
      payload: {
        userId: event.userId,
        displayName,
      },
    });
  }

  // Projection 3: Leaderboard (recalculate on activity events)
  if (event.type === "post.created") {
    await recalculateLeaderboard(event.userId);
  }
}
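recalculateLeaderboard is referenced but not shown above. One plausible shape lists profiles, ranks them by post count, and writes each entry to the leaderboard channel keyed by rank; the ranking core is pure and easy to test. A hypothetical sketch of that core:

```typescript
interface LeaderboardEntry {
  rank: number;
  userId: string;
  postCount: number;
}

// Pure ranking: highest post count first; ties broken by userId for stability.
function rankByPosts(profiles: { userId: string; postCount: number }[]): LeaderboardEntry[] {
  return [...profiles]
    .sort((a, b) => b.postCount - a.postCount || a.userId.localeCompare(b.userId))
    .map((p, i) => ({ rank: i + 1, userId: p.userId, postCount: p.postCount }));
}
```

recalculateLeaderboard would then call rankByPosts over the current profiles and publish each entry to leaderboardModel with key String(entry.rank).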

Rebuilding read models

If you need to change a read model's structure or fix a bug in the projection logic, you can rebuild it from scratch by replaying the event stream:

rebuild.ts
async function rebuildReadModel(
  sourceChannelId: string,
  projectionFn: (event: Record<string, unknown>) => Promise<void>
) {
  console.log("Starting read model rebuild...");

  const batchSize = 200;
  let offset = "earliest";
  let processed = 0;
  let hasMore = true;

  while (hasMore) {
    const batch = await np.streams.read({
      channelId: sourceChannelId,
      startOffset: offset,
      limit: batchSize,
    });

    for (const msg of batch.messages) {
      await projectionFn(msg.payload as Record<string, unknown>);
      processed++;
    }

    hasMore = batch.messages.length === batchSize;
    if (hasMore) {
      offset = batch.messages[batch.messages.length - 1].offset;
    }

    console.log(`Processed ${processed} events...`);
  }

  console.log(`Rebuild complete. ${processed} events processed.`);
}

// Rebuild the profile read model by replaying the event channel.
// The projection function decides which channel it writes to.
await rebuildReadModel(profileEvents.id, processProfileEvent);

When NOT to use CQRS

CQRS adds complexity. Do not use it when:

  • Simple CRUD: If your read and write models are identical and your access patterns are straightforward, a single model is simpler and sufficient.
  • Low traffic: The scaling benefits of CQRS matter at scale. For small applications, the operational overhead is not worth it.
  • Strong consistency required: If your application cannot tolerate any eventual consistency (such as financial ledgers that must be immediately consistent), CQRS requires additional complexity to provide read-your-writes guarantees.
  • Small team: CQRS increases the number of moving parts. If your team is small, the maintenance burden may outweigh the benefits.

A good rule of thumb: start with a simple architecture and adopt CQRS for specific bounded contexts where read and write access patterns diverge significantly.
