Streams

Ordered message replay with consumer offsets, seek, and real-time tailing

A stream is an ordered, persistent log of messages on a channel. Unlike standard subscriptions that push messages to you as they arrive, streams let you read messages at your own pace, track your position, and replay history from any point.

How streams work

Every message published to a channel is assigned a sequence number. A stream consumer reads messages in sequence order and tracks its position using an offset. You can start from the beginning, jump to any offset or timestamp, or tail the stream in real time.

| Concept | Description |
| --- | --- |
| Sequence number | Monotonically increasing ID assigned to each message on the channel |
| Consumer offset | The sequence number a consumer has read up to |
| Seek | Reset a consumer's offset to replay messages from a different position |
| Tail | Read new messages as they arrive, in real time |
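To make the relationship between sequence numbers and consumer offsets concrete, here is a minimal in-memory sketch. `ChannelLog` and all of its members are hypothetical names used for illustration only, not part of the npayload SDK. The sketch assumes sequence numbers start at 1 and that an offset means "the last sequence number read", so a read returns the messages after it. For brevity the model advances the offset on every read, whereas in npayload committing is a separate step.

```typescript
// Hypothetical in-memory model of a channel log; not the npayload implementation.
interface StreamMessage<T> {
  sequenceNumber: number;
  payload: T;
}

class ChannelLog<T> {
  private readonly messages: StreamMessage<T>[] = [];
  // Assumption: sequence numbers start at 1 and increase by 1 per message.
  private nextSequence = 1;
  // Consumer name -> sequence number that consumer has read up to.
  private readonly offsets = new Map<string, number>();

  // Append a message and return the sequence number assigned to it.
  publish(payload: T): number {
    const sequenceNumber = this.nextSequence++;
    this.messages.push({ sequenceNumber, payload });
    return sequenceNumber;
  }

  // Return up to `limit` messages after the consumer's offset, then advance the offset.
  read(consumer: string, limit: number): StreamMessage<T>[] {
    const offset = this.offsets.get(consumer) ?? 0;
    const batch = this.messages
      .filter((m) => m.sequenceNumber > offset)
      .slice(0, limit);
    if (batch.length > 0) {
      this.offsets.set(consumer, batch[batch.length - 1].sequenceNumber);
    }
    return batch;
  }

  // Move a consumer's offset; the next read returns messages after `offset`.
  seek(consumer: string, offset: number): void {
    this.offsets.set(consumer, offset);
  }
}
```

In this model, two consumers reading the same log each keep their own entry in `offsets`, so one consumer reading ahead or seeking back has no effect on the other.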

Creating a stream consumer

const consumer = await npayload.streams.createConsumer({
  channel: 'events',
  name: 'analytics-pipeline',
});

Each consumer maintains its own offset independently. Multiple consumers can read the same channel at different positions without interfering with each other.

Reading messages

From the beginning

Start from the first available message in the retention window.

const messages = await npayload.streams.read({
  consumer: 'analytics-pipeline',
  channel: 'events',
  from: 'beginning',
  limit: 100,
});

From a specific offset

Resume from where you left off.

const messages = await npayload.streams.read({
  consumer: 'analytics-pipeline',
  channel: 'events',
  from: { offset: 4582 },
  limit: 100,
});

From a timestamp

Read messages published after a specific point in time, up to limit messages per call.

const messages = await npayload.streams.read({
  consumer: 'analytics-pipeline',
  channel: 'events',
  from: { timestamp: '2026-03-01T00:00:00Z' },
  limit: 100,
});

Tail (real-time)

Follow the stream and receive new messages as they are published.

const messages = await npayload.streams.read({
  consumer: 'analytics-pipeline',
  channel: 'events',
  from: 'tail',
  limit: 50,
});

Committing offsets

After processing a batch of messages, commit the offset so you resume from the right position on the next read.

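// Commit only when the batch is non-empty; an empty batch has no last message.
if (messages.length > 0) {
  await npayload.streams.commit({
    consumer: 'analytics-pipeline',
    channel: 'events',
    offset: messages[messages.length - 1].sequenceNumber,
  });
}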

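If you do not commit after processing, the next read starts again from the consumer's last committed position, so the same messages are delivered again. This gives at-least-once delivery semantics: a message can be processed more than once, so make your processing idempotent.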
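The read, process, commit cycle can be sketched as a single function. The `StreamsClient` interface below mirrors only the `read` and `commit` calls shown on this page. Its return types, the in-memory `createFakeClient` stand-in, and the assumption that a `read` without `from` resumes after the last committed offset are illustrative and not confirmed npayload behavior.

```typescript
interface StreamMessage {
  sequenceNumber: number;
  payload: unknown;
}

// Assumed shape: only the two calls used on this page, with guessed return types.
interface StreamsClient {
  read(args: { consumer: string; channel: string; limit: number }): Promise<StreamMessage[]>;
  commit(args: { consumer: string; channel: string; offset: number }): Promise<void>;
}

type Handler = (message: StreamMessage) => Promise<void>;

// Read one batch, process it, and commit only after every message succeeded.
async function processBatch(client: StreamsClient, handler: Handler): Promise<number> {
  const target = { consumer: 'analytics-pipeline', channel: 'events' };
  const messages = await client.read({ ...target, limit: 100 });
  if (messages.length === 0) return 0;

  for (const message of messages) {
    await handler(message); // a throw here skips the commit below
  }

  const lastSequence = messages[messages.length - 1].sequenceNumber;
  await client.commit({ ...target, offset: lastSequence });
  return messages.length;
}

// Hypothetical in-memory stand-in so the sketch runs without the SDK.
function createFakeClient(payloads: unknown[]): StreamsClient {
  const log = payloads.map((payload, i) => ({ sequenceNumber: i + 1, payload }));
  let committed = 0; // last committed sequence number
  return {
    async read({ limit }) {
      return log.filter((m) => m.sequenceNumber > committed).slice(0, limit);
    },
    async commit({ offset }) {
      committed = offset;
    },
  };
}
```

Because the commit happens last, a handler failure partway through a batch leaves the committed offset unchanged, and the next call to `processBatch` receives the whole batch again. Calling it in a loop until it returns 0 drains the available history.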

Seek operations

Reset a consumer's offset to replay messages from a different position.

// Replay from the beginning
await npayload.streams.seek({
  consumer: 'analytics-pipeline',
  channel: 'events',
  to: 'beginning',
});

// Seek to a specific offset
await npayload.streams.seek({
  consumer: 'analytics-pipeline',
  channel: 'events',
  to: { offset: 1000 },
});

// Seek to a timestamp
await npayload.streams.seek({
  consumer: 'analytics-pipeline',
  channel: 'events',
  to: { timestamp: '2026-02-15T00:00:00Z' },
});
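A timestamp position has to be translated into a place in the log before reading can continue. How npayload does this is not described here. The sketch below shows one plausible approach, a binary search over publish times. `StoredMessage`, `publishedAt`, and `firstSequenceAtOrAfter` are hypothetical names, and the sketch assumes that publish times never decrease as sequence numbers increase and that the boundary is inclusive.

```typescript
// Hypothetical stored form of a message; `publishedAt` is milliseconds since the epoch.
interface StoredMessage {
  sequenceNumber: number;
  publishedAt: number;
}

// Return the sequence number of the first message published at or after
// `timestamp`, or null when every message is older than that.
function firstSequenceAtOrAfter(log: StoredMessage[], timestamp: string): number | null {
  const target = Date.parse(timestamp);
  if (Number.isNaN(target)) {
    throw new Error(`Invalid timestamp: ${timestamp}`);
  }

  // Binary search for the leftmost entry with publishedAt >= target.
  let lo = 0;
  let hi = log.length;
  while (lo < hi) {
    const mid = (lo + hi) >>> 1;
    if (log[mid].publishedAt < target) {
      lo = mid + 1;
    } else {
      hi = mid;
    }
  }
  return lo < log.length ? log[lo].sequenceNumber : null;
}
```

Under these assumptions, seeking to a timestamp reduces to seeking to an offset, which is why both forms can share the same replay path.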

Retention and cleanup

Stream messages follow the channel's retentionDays setting. Messages older than the retention window are automatically deleted.

If a consumer's committed offset points to a message that has been deleted by retention cleanup, the next read starts from the earliest available message. Design your consumers to keep up with the retention window.
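The fallback described above amounts to clamping the resume position to the start of the retention window. The helper below is a hypothetical sketch of that rule, not an npayload API. It assumes an offset is the last sequence number processed, so the next message to read is offset + 1, and it also reports how many messages were lost to cleanup so a consumer could log or alert on the gap.

```typescript
// Hypothetical helper; assumes an offset is the last sequence number processed.
interface ResumePoint {
  startSequence: number; // first sequence number the next read would return
  skipped: number; // messages removed by retention before this consumer read them
}

function resolveResumePoint(committedOffset: number, earliestAvailable: number): ResumePoint {
  const wanted = committedOffset + 1;
  // If the wanted message is gone, fall forward to the earliest one still retained.
  const startSequence = Math.max(wanted, earliestAvailable);
  return { startSequence, skipped: startSequence - wanted };
}
```

For example, `resolveResumePoint(100, 500)` yields a start of 500 with 399 skipped messages, while `resolveResumePoint(4582, 4000)` resumes at 4583 with nothing skipped.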

Use cases

| Use case | How streams help |
| --- | --- |
| Event sourcing | Replay the full event history to rebuild state |
| Audit logs | Read through every event in order for compliance review |
| Analytics pipelines | Process events at your own pace without backpressure |
| Change data capture | Track changes from a specific point in time |
| Catch-up consumers | New services can read history from the beginning to sync up |
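As an illustration of the event sourcing case, rebuilding state is a fold over messages in sequence order. The account events and state shape below are hypothetical and exist only to make the sketch runnable. Real payloads depend on what you publish to the channel.

```typescript
// Hypothetical event shapes for illustration.
type AccountEvent =
  | { type: 'deposited'; amount: number }
  | { type: 'withdrawn'; amount: number };

interface StreamMessage {
  sequenceNumber: number;
  payload: AccountEvent;
}

interface AccountState {
  balance: number;
  lastSequence: number; // highest sequence number already applied
}

// Apply one event; skip anything at or below lastSequence so that
// redelivered messages do not change the result.
function applyEvent(state: AccountState, message: StreamMessage): AccountState {
  if (message.sequenceNumber <= state.lastSequence) return state;
  const { payload } = message;
  const delta = payload.type === 'deposited' ? payload.amount : -payload.amount;
  return { balance: state.balance + delta, lastSequence: message.sequenceNumber };
}

// Rebuild state from scratch by replaying messages in sequence order.
function rebuildState(messages: StreamMessage[]): AccountState {
  return messages.reduce<AccountState>(applyEvent, { balance: 0, lastSequence: 0 });
}
```

Tracking `lastSequence` in the state is one way to make replay idempotent, which matters because uncommitted batches can be delivered more than once.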

Next steps

  • Consumer groups to distribute stream processing across multiple consumers
  • Channels to understand retention and channel types
  • Messages to learn about the payloads flowing through streams
