Streams

Ordered, replayable message logs with consumer offsets, seek, and stream consumer groups

Streams provide an ordered, replayable log of messages on a channel. Unlike subscriptions (which deliver messages once), streams let you read messages at any position, replay history, and track your consumer's progress with offsets.

What streams are

Every channel in npayload automatically maintains a stream: a durable, ordered log of all published messages. Streams are read-only. You do not create or configure them separately.

Streams are useful when you need to:

  • Replay historical messages (backfill a new service)
  • Resume processing from where you left off after a restart
  • Audit or debug the exact sequence of messages on a channel

Reading from a stream

Read messages from a channel's stream starting at a specific position.

// Read from the beginning
const messages = await npayload.streams.read('orders', {
  position: 'beginning',
  limit: 50,
});

for (const msg of messages.items) {
  console.log(msg.offset, msg.payload);
}

Reads the oldest messages first. Useful for full replay or backfill scenarios.

const messages = await npayload.streams.read('orders', {
  position: 'end',
  limit: 10,
});

Reads the most recent messages. Useful for tailing the stream.
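Tailing usually means following the stream continuously rather than taking a single snapshot. One way to build that on top of read is to note the newest offset, then poll for anything published after it. The sketch below is illustrative, not part of the npayload API: the tail function, its ReadFn parameter, and the fixed polls count are assumptions (a real tailer would also sleep between polls).

```typescript
type StreamMessage = { offset: number; payload: unknown };
type ReadResult = { items: StreamMessage[]; cursor?: string };
type ReadFn = (opts: {
  position: 'end' | { offset: number };
  limit: number;
}) => Promise<ReadResult>;

// Tail a stream: note the current end, then poll for anything newer.
// `read` stands in for npayload.streams.read bound to one channel.
async function tail(
  read: ReadFn,
  handle: (msg: StreamMessage) => void,
  polls: number,
) {
  // Find the newest message so we only deliver messages published after it.
  const tip = await read({ position: 'end', limit: 1 });
  let next = tip.items.length > 0 ? tip.items[tip.items.length - 1].offset + 1 : 0;

  for (let i = 0; i < polls; i++) {
    const page = await read({ position: { offset: next }, limit: 10 });
    for (const msg of page.items) handle(msg);
    if (page.items.length > 0) {
      next = page.items[page.items.length - 1].offset + 1;
    }
    // In a real tailer, sleep here between polls.
  }
}
```

Because the loop tracks offsets itself, restarting it re-reads nothing: it always resumes strictly after the last message it handled.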

const messages = await npayload.streams.read('orders', {
  position: { offset: 4250 },
  limit: 50,
});

Reads from a specific offset. Useful for resuming from a saved position.

Each message in the response includes an offset (a monotonically increasing integer), and each page of results includes a cursor for pagination:

const page1 = await npayload.streams.read('orders', {
  position: 'beginning',
  limit: 100,
});

// Continue reading from where you left off
if (page1.cursor) {
  const page2 = await npayload.streams.read('orders', {
    position: { cursor: page1.cursor },
    limit: 100,
  });
}

Consumer offsets

Track your consumer's position in the stream so you can resume after restarts. npayload stores the offset server-side.

// Commit the offset after processing
await npayload.streams.commitOffset('orders', {
  consumerId: 'analytics-service',
  offset: lastProcessedOffset,
});

// On restart, get the last committed offset
const saved = await npayload.streams.getOffset('orders', {
  consumerId: 'analytics-service',
});

const messages = await npayload.streams.read('orders', {
  position: { offset: saved.offset + 1 },
  limit: 100,
});

Consumer IDs are strings you define. Use a stable identifier for each consumer instance, such as your service name or a combination of service name and shard.
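Putting these pieces together, a restart-safe consumer loops over batches and commits only after the whole batch has been processed. This is a sketch parameterized over the stream client; the StreamOps interface and consume function are illustrative assumptions that mirror the calls shown above, not part of the SDK.

```typescript
type Msg = { offset: number; payload: unknown };
type Position = 'beginning' | { offset: number };

// The subset of the streams client this sketch needs; the interface
// name and shape are assumptions mirroring the calls above.
interface StreamOps {
  read(channel: string, opts: { position: Position; limit: number }): Promise<{ items: Msg[] }>;
  getOffset(channel: string, opts: { consumerId: string }): Promise<{ offset?: number }>;
  commitOffset(channel: string, opts: { consumerId: string; offset: number }): Promise<void>;
}

// Resume from the last committed offset, process in batches, and commit
// only after a whole batch has been handled.
async function consume(
  streams: StreamOps,
  channel: string,
  consumerId: string,
  process: (msg: Msg) => Promise<void> | void,
) {
  const saved = await streams.getOffset(channel, { consumerId });
  // First run: nothing committed yet, so start from the beginning.
  let position: Position =
    saved.offset !== undefined ? { offset: saved.offset + 1 } : 'beginning';

  for (;;) {
    const page = await streams.read(channel, { position, limit: 100 });
    if (page.items.length === 0) break;

    for (const msg of page.items) await process(msg);

    const last = page.items[page.items.length - 1].offset;
    // Commit only after every message in the batch has been processed.
    await streams.commitOffset(channel, { consumerId, offset: last });
    position = { offset: last + 1 };
  }
}
```

If the process crashes mid-batch, nothing was committed for that batch, so the next run re-reads it from the last committed offset; you get at-least-once delivery rather than message loss.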

Seek to a position

Reset a consumer's offset to re-read messages from a different position.

// Seek to the beginning (replay everything)
await npayload.streams.seek('orders', {
  consumerId: 'analytics-service',
  position: 'beginning',
});

// Seek to a specific offset
await npayload.streams.seek('orders', {
  consumerId: 'analytics-service',
  position: { offset: 1000 },
});

// Seek to the end (skip all existing, only read new)
await npayload.streams.seek('orders', {
  consumerId: 'analytics-service',
  position: 'end',
});

Consumer groups for streams

Stream consumer groups distribute stream reading across multiple consumers. Each consumer in the group reads a portion of the stream, and offsets are tracked per group.

// Create a stream consumer group
const group = await npayload.streams.createConsumerGroup('orders', {
  name: 'analytics-workers',
});

// Each worker reads from the group
const messages = await npayload.streams.readGroup('orders', {
  group: 'analytics-workers',
  consumerId: 'worker-1',
  limit: 50,
});

for (const msg of messages.items) {
  await processAnalytics(msg.payload);
}

// Acknowledge processed messages
await npayload.streams.ackGroup('orders', {
  group: 'analytics-workers',
  consumerId: 'worker-1',
  offset: messages.items[messages.items.length - 1].offset,
});

Stream consumer groups divide the stream by offset ranges. This differs from subscription consumer groups, which distribute individual messages round-robin.
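In practice each worker runs the readGroup/ackGroup pair in a loop until it is caught up. A sketch of one worker's loop, parameterized over the client; the GroupOps interface is an assumption mirroring the calls above, and treating an empty batch as "caught up" is also an assumption about the API's semantics.

```typescript
type GroupMsg = { offset: number; payload: unknown };

// The subset of the streams client a group worker needs; the shape
// mirrors readGroup/ackGroup above and is otherwise an assumption.
interface GroupOps {
  readGroup(channel: string, opts: { group: string; consumerId: string; limit: number }): Promise<{ items: GroupMsg[] }>;
  ackGroup(channel: string, opts: { group: string; consumerId: string; offset: number }): Promise<void>;
}

// One worker's loop: read a batch, process it, ack the highest offset,
// and stop when the group hands back an empty batch.
async function runWorker(
  ops: GroupOps,
  channel: string,
  group: string,
  consumerId: string,
  handle: (msg: GroupMsg) => Promise<void> | void,
) {
  for (;;) {
    const batch = await ops.readGroup(channel, { group, consumerId, limit: 50 });
    if (batch.items.length === 0) break;

    for (const msg of batch.items) await handle(msg);

    // Ack only after the whole batch has been processed.
    await ops.ackGroup(channel, {
      group,
      consumerId,
      offset: batch.items[batch.items.length - 1].offset,
    });
  }
}
```

Running this function once per worker process (worker-1, worker-2, ...) with the same group name lets the group spread the offset ranges across them.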

Replay and catch-up patterns

Full backfill

Replay the entire stream for a new service.

let cursor: string | undefined;

do {
  const page = await npayload.streams.read('orders', {
    position: cursor ? { cursor } : 'beginning',
    limit: 100,
  });

  for (const msg of page.items) {
    await indexOrder(msg.payload);
  }

  cursor = page.cursor;
} while (cursor);

Catch-up after downtime

Resume from the last committed offset.

const saved = await npayload.streams.getOffset('orders', {
  consumerId: 'search-indexer',
});

// Fall back to the beginning on the first run, before any offset is committed
const position = saved.offset !== undefined
  ? { offset: saved.offset + 1 }
  : 'beginning';

const messages = await npayload.streams.read('orders', {
  position,
  limit: 100,
});

for (const msg of messages.items) {
  await indexOrder(msg.payload);
}

if (messages.items.length > 0) {
  await npayload.streams.commitOffset('orders', {
    consumerId: 'search-indexer',
    offset: messages.items[messages.items.length - 1].offset,
  });
}

Best practices

  • Commit offsets after processing, not before. This ensures you do not skip messages if your consumer crashes mid-batch
  • Use descriptive consumer IDs that reflect the service and its purpose, such as search-indexer or billing-aggregator
  • Paginate reads with reasonable batch sizes (50 to 100). Very large batches increase memory usage and processing time
  • Use stream consumer groups when you need parallel stream processing. For simple single-consumer replay, plain offset tracking is sufficient
  • Remember that stream data follows the channel's retention policy. Messages older than the retention period are no longer available in the stream
