Streams
Ordered message replay with consumer offsets, seek, and real-time tailing
A stream is an ordered, persistent log of messages on a channel. Unlike standard subscriptions that push messages to you as they arrive, streams let you read messages at your own pace, track your position, and replay history from any point.
How streams work
Every message published to a channel is assigned a sequence number. A stream consumer reads messages in sequence order and tracks its position using an offset. You can start from the beginning, jump to any offset or timestamp, or tail the stream in real time.
| Concept | Description |
|---|---|
| Sequence number | Monotonically increasing ID assigned to each message on the channel |
| Consumer offset | The sequence number a consumer has read up to |
| Seek | Reset a consumer's offset to replay messages from a different position |
| Tail | Read new messages as they arrive, in real time |
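To make these concepts concrete, here is a minimal in-memory sketch — illustrative only, not the npayload client or server code. A log assigns a sequence number to each published message, and a consumer reads forward from its committed offset:

```javascript
// Illustrative model: an append-only log that assigns sequence numbers.
class StreamLog {
  constructor() {
    this.messages = [];
    this.nextSeq = 1;
  }
  publish(payload) {
    const msg = { sequenceNumber: this.nextSeq++, payload };
    this.messages.push(msg);
    return msg;
  }
  // Return up to `limit` messages after the committed offset, in order.
  read(committedOffset, limit) {
    return this.messages
      .filter((m) => m.sequenceNumber > committedOffset)
      .slice(0, limit);
  }
}

const log = new StreamLog();
['signup', 'login', 'purchase', 'refund'].forEach((p) => log.publish(p));

const batch = log.read(0, 2); // offset 0 = nothing read yet
const newOffset = batch[batch.length - 1].sequenceNumber; // advance the offset
const next = log.read(newOffset, 10);

console.log(batch.map((m) => m.payload)); // [ 'signup', 'login' ]
console.log(next.map((m) => m.payload));  // [ 'purchase', 'refund' ]
```

Seeking is then just resetting the stored offset, and tailing is reading with the offset pinned to the end of the log.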
Creating a stream consumer
```javascript
const consumer = await npayload.streams.createConsumer({
  channel: 'events',
  name: 'analytics-pipeline',
});
```

Each consumer maintains its own offset independently. Multiple consumers can read the same channel at different positions without interfering with each other.
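This independence can be sketched as a shared message log with one stored offset per consumer — an illustrative model only, with auto-commit on read for brevity:

```javascript
// Illustrative only: two consumers tracking positions on the same log.
const channel = [
  { sequenceNumber: 1, payload: 'signup' },
  { sequenceNumber: 2, payload: 'login' },
  { sequenceNumber: 3, payload: 'purchase' },
];

// Each consumer is just an independently stored offset.
const offsets = { 'analytics-pipeline': 0, 'audit-log': 0 };

function read(consumer, limit) {
  const batch = channel
    .filter((m) => m.sequenceNumber > offsets[consumer])
    .slice(0, limit);
  if (batch.length > 0) {
    offsets[consumer] = batch[batch.length - 1].sequenceNumber;
  }
  return batch;
}

read('analytics-pipeline', 3);           // analytics reads everything
const auditBatch = read('audit-log', 1); // audit has only read message 1
console.log(offsets); // { 'analytics-pipeline': 3, 'audit-log': 1 }
```

Because the offsets live in separate entries, one consumer racing ahead never moves the other's position.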
Reading messages
From the beginning
Start from the first available message in the retention window.
```javascript
const messages = await npayload.streams.read({
  consumer: 'analytics-pipeline',
  channel: 'events',
  from: 'beginning',
  limit: 100,
});
```

From a specific offset
Resume from where you left off.
```javascript
const messages = await npayload.streams.read({
  consumer: 'analytics-pipeline',
  channel: 'events',
  from: { offset: 4582 },
  limit: 100,
});
```

From a timestamp
Read all messages published after a specific point in time.
```javascript
const messages = await npayload.streams.read({
  consumer: 'analytics-pipeline',
  channel: 'events',
  from: { timestamp: '2026-03-01T00:00:00Z' },
  limit: 100,
});
```

Tail (real-time)
Follow the stream and receive new messages as they are published.
```javascript
const messages = await npayload.streams.read({
  consumer: 'analytics-pipeline',
  channel: 'events',
  from: 'tail',
  limit: 50,
});
```

Committing offsets
After processing a batch of messages, commit the offset so you resume from the right position on the next read.
```javascript
await npayload.streams.commit({
  consumer: 'analytics-pipeline',
  channel: 'events',
  offset: messages[messages.length - 1].sequenceNumber,
});
```

If you do not commit, the next read resumes from the last committed offset, so messages that were read but never committed are delivered again. This gives you at-least-once delivery semantics.
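The commit-after-processing pattern behind at-least-once delivery can be sketched with a small in-memory model (illustrative only; the real calls are `streams.read` and `streams.commit` as shown above). If processing fails before the commit, the next read returns the same batch:

```javascript
// Illustrative at-least-once loop: commit only after the batch is processed.
const messages = [
  { sequenceNumber: 1, payload: 'a' },
  { sequenceNumber: 2, payload: 'b' },
  { sequenceNumber: 3, payload: 'c' },
];
let committed = 0;    // last committed offset
const processed = []; // side effects of processing

function readBatch(limit) {
  return messages.filter((m) => m.sequenceNumber > committed).slice(0, limit);
}

function processBatch(batch, { failBeforeCommit = false } = {}) {
  for (const m of batch) processed.push(m.payload);
  if (failBeforeCommit) throw new Error('crashed before commit');
  committed = batch[batch.length - 1].sequenceNumber; // the commit
}

try {
  processBatch(readBatch(2), { failBeforeCommit: true });
} catch {
  // The offset was never committed, so the same messages come back.
}
processBatch(readBatch(2)); // re-delivers messages 1 and 2
console.log(processed); // [ 'a', 'b', 'a', 'b' ] — duplicates are possible
```

Because duplicates like this are possible, downstream processing should be idempotent, for example by deduplicating on the sequence number.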
Seek operations
Reset a consumer's offset to replay messages from a different position.
```javascript
// Replay from the beginning
await npayload.streams.seek({
  consumer: 'analytics-pipeline',
  channel: 'events',
  to: 'beginning',
});

// Seek to a specific offset
await npayload.streams.seek({
  consumer: 'analytics-pipeline',
  channel: 'events',
  to: { offset: 1000 },
});

// Seek to a timestamp
await npayload.streams.seek({
  consumer: 'analytics-pipeline',
  channel: 'events',
  to: { timestamp: '2026-02-15T00:00:00Z' },
});
```

Retention and cleanup
Stream messages follow the channel's `retentionDays` setting. Messages older than the retention window are automatically deleted.
If a consumer's committed offset points to a message that has been deleted by retention cleanup, the next read starts from the earliest available message. Design your consumers to keep up with the retention window.
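That fallback can be sketched as a clamp on the read position — an illustrative model, not the service's code:

```javascript
// Illustrative: retention has deleted messages 1-3; only 4 and 5 remain.
const retained = [
  { sequenceNumber: 4, payload: 'd' },
  { sequenceNumber: 5, payload: 'e' },
];
const committed = 1; // points at a message retention already deleted

function read(committedOffset, limit) {
  const earliest = retained[0].sequenceNumber;
  // Clamp: never start before the earliest retained message.
  const start = Math.max(committedOffset + 1, earliest);
  return retained.filter((m) => m.sequenceNumber >= start).slice(0, limit);
}

const out = read(committed, 10);
console.log(out.map((m) => m.sequenceNumber)); // [ 4, 5 ]
```

Note the gap between the committed offset and the first message returned: messages 2 and 3 were lost to retention before this consumer read them, which is why consumers should stay within the retention window.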
Use cases
| Use case | How streams help |
|---|---|
| Event sourcing | Replay the full event history to rebuild state |
| Audit logs | Read through every event in order for compliance review |
| Analytics pipelines | Process events at your own pace without backpressure |
| Change data capture | Track changes from a specific point in time |
| Catch-up consumers | New services can read history from the beginning to sync up |
Next steps
- Consumer groups to distribute stream processing across multiple consumers
- Channels to understand retention and channel types
- Messages to learn about the payloads flowing through streams