Data streaming
Stream processing with consumer offsets, replay, and real-time analytics pipelines
npayload streams provide an ordered, persistent log of messages that consumers can read, replay, and process at their own pace. Build real-time analytics, event sourcing, and audit trails without managing Kafka clusters.
The challenge
Traditional pub/sub delivers messages once and discards them. Streaming use cases need:
- Replay: Re-process historical messages after a bug fix or schema change
- Multiple consumers: Each consumer tracks its own position independently
- Ordered processing: Messages processed in the order they were published
- Persistence: Messages retained for days or weeks, not just until delivered
How npayload solves it
Every npayload channel can be consumed as a stream. Consumers track their position with offsets and can seek to any point in the log.
Create a stream consumer
const consumer = await npayload.streams.createConsumer({
  channel: 'user-events',
  name: 'analytics-pipeline',
  startFrom: 'earliest', // Start from the beginning of the log
});
Read messages
// Pull a batch of messages from the stream
const messages = await npayload.streams.pull(consumer.gid, {
  limit: 100,
});

for (const msg of messages) {
  await processEvent(msg.payload);
}
// Offset is automatically advanced after each pull
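In practice you usually pull in a loop. A minimal tailing sketch, assuming pull returns an empty array once the consumer has caught up (verify this against the streams API reference) and that a short sleep between empty pulls fits your latency needs:

// Continuously drain the stream, pausing briefly when caught up
async function tail(consumerId: string) {
  while (true) {
    const messages = await npayload.streams.pull(consumerId, { limit: 100 });
    if (messages.length === 0) {
      // Caught up; wait before polling again
      await new Promise((resolve) => setTimeout(resolve, 1000));
      continue;
    }
    for (const msg of messages) {
      await processEvent(msg.payload);
    }
  }
}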
Reading modes
| Mode | Description | Use case |
|---|---|---|
| earliest | Start from the first message | Full replay, backfill |
| latest | Start from the current position | Real-time tailing |
| offset | Start from a specific offset number | Resume after crash |
| timestamp | Start from a specific time | Time-based replay |
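For example, to resume after a crash, a consumer can be created at a known position. A sketch assuming startFrom: 'offset' is paired with a numeric offset field, mirroring the seek call shown below:

// Resume from the last offset checkpointed before a crash
const consumer = await npayload.streams.createConsumer({
  channel: 'user-events',
  name: 'resume-consumer',
  startFrom: 'offset',
  offset: 5000, // assumed field, mirroring seek's { offset } option
});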
// Start from a specific timestamp
const consumer = await npayload.streams.createConsumer({
  channel: 'user-events',
  name: 'replay-consumer',
  startFrom: 'timestamp',
  timestamp: '2025-01-01T00:00:00Z',
});
Seek (reset position)
Reset a consumer's position to replay messages:
// Seek to the beginning
await npayload.streams.seek(consumer.gid, { position: 'earliest' });

// Seek to a specific offset
await npayload.streams.seek(consumer.gid, { offset: 5000 });

// Seek to a timestamp
await npayload.streams.seek(consumer.gid, {
  position: 'timestamp',
  timestamp: '2025-03-01T00:00:00Z',
});
Use case: real-time analytics pipeline
Build a pipeline that processes events in real-time and can replay when needed:
// Producer: application publishes user activity events
await npayload.messages.publish({
  channel: 'user-activity',
  payload: {
    event: 'page.viewed',
    userId: 'usr_123',
    page: '/pricing',
    timestamp: new Date().toISOString(),
    sessionId: 'sess_abc',
  },
});
// Consumer 1: Real-time dashboard
const dashboardConsumer = await npayload.streams.createConsumer({
  channel: 'user-activity',
  name: 'dashboard',
  startFrom: 'latest',
});

// Consumer 2: Data warehouse loader (processes everything)
const warehouseConsumer = await npayload.streams.createConsumer({
  channel: 'user-activity',
  name: 'warehouse-loader',
  startFrom: 'earliest',
});
// Each consumer processes at its own pace
async function processBatch(consumerId: string) {
  const messages = await npayload.streams.pull(consumerId, { limit: 500 });
  for (const msg of messages) {
    await writeToWarehouse(msg.payload);
  }
}
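With the tail helper sketched in the read section, each consumer can run on its own cadence. An illustrative wiring (the interval is arbitrary):

// Dashboard tails new events in real time; the warehouse loader drains batches on a schedule
tail(dashboardConsumer.gid);
setInterval(() => processBatch(warehouseConsumer.gid), 60_000);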
Use case: audit trail
Create an immutable audit log that can be replayed for compliance:
// Publish all API operations to an audit channel
await npayload.messages.publish({
  channel: 'audit-log',
  payload: {
    action: 'user.updated',
    actor: 'admin@example.com',
    target: 'usr_456',
    changes: { role: { from: 'member', to: 'admin' } },
    ip: '203.0.113.1',
    timestamp: new Date().toISOString(),
  },
});
// Compliance team can replay the full audit log at any time
const auditConsumer = await npayload.streams.createConsumer({
  channel: 'audit-log',
  name: 'compliance-review',
  startFrom: 'earliest',
});
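To run a review, pull from the consumer and filter for the records of interest. A brief sketch using the pull API from above (the actor filter and batch size are illustrative):

// Replay the audit log and collect every change made by a given actor
const records = await npayload.streams.pull(auditConsumer.gid, { limit: 1000 });
const adminActions = records.filter(
  (msg) => msg.payload.actor === 'admin@example.com'
);
console.log(`Found ${adminActions.length} actions by admin@example.com`);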
Use case: event sourcing
Use streams as the source of truth and rebuild state by replaying events:
// Publish domain events
await npayload.messages.publish({
  channel: 'account-events',
  partitionKey: accountId,
  payload: { event: 'deposit', amount: 100.00 },
});

await npayload.messages.publish({
  channel: 'account-events',
  partitionKey: accountId,
  payload: { event: 'withdrawal', amount: 25.00 },
});
// Rebuild account state by replaying events
const consumer = await npayload.streams.createConsumer({
  channel: 'account-events',
  name: `account-${accountId}-rebuild`,
  startFrom: 'earliest',
});

// A single pull is shown here; for long histories, pull repeatedly until caught up
let balance = 0;
const events = await npayload.streams.pull(consumer.gid, { limit: 10000 });
for (const msg of events) {
  if (msg.payload.event === 'deposit') balance += msg.payload.amount;
  if (msg.payload.event === 'withdrawal') balance -= msg.payload.amount;
}
console.log(`Current balance: $${balance}`); // $75.00
Automate with Pipes
Instead of writing custom consumer loops, use Pipes to process stream events automatically. Pipes workflows react to channel events, run AI analysis, and take actions, all within your npayload instance.
// Create a Pipes workflow that processes user activity events automatically
const workflow = await npayload.pipes.create({
  name: 'user-activity-pipeline',
  trigger: {
    type: 'event',
    channel: 'user-activity',
  },
});
// AI-powered analysis
workflow.addNode({
  id: 'analyze',
  type: 'agent',
  model: 'claude-sonnet-4-20250514',
  systemPrompt: 'Analyze user activity for anomalies and trends.',
  input: '{{ payload }}',
});

// Route based on analysis
workflow.addNode({
  id: 'route',
  type: 'condition',
  expression: '{{ nodes["analyze"].output.anomaly === true }}',
});
// Alert on anomalies
workflow.addNode({
  id: 'alert',
  type: 'http',
  connector: 'my-slack',
  operation: 'sendMessage',
  params: { channel: '#alerts', text: '{{ nodes["analyze"].output.summary }}' },
});

await npayload.pipes.activate(workflow.id);
See Pipes documentation for the full workflow engine reference.
Stream retention
Configure how long messages are retained in the stream:
| Tier | Default retention | Max retention |
|---|---|---|
| Free | 24 hours | 7 days |
| Pro | 7 days | 30 days |
| Enterprise | 30 days | 365 days |
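Retention is set per channel. The exact API is not shown on this page, so the call below is a hypothetical sketch (channels.update and the retention option are assumptions; see the streams guide for the real configuration surface):

// Hypothetical: raise a channel's retention to the Pro-tier maximum of 30 days
await npayload.channels.update('user-activity', {
  retention: '30d', // assumed format; check the streams guide
});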
Why npayload for streaming
| Feature | Benefit |
|---|---|
| Consumer offsets | Each consumer tracks position independently |
| Replay | Re-process any range of historical messages |
| Seek | Jump to any point in the stream |
| Ordered delivery | Messages processed in publish order |
| Multiple consumers | Unlimited independent readers |
| No cluster management | No ZooKeeper, no brokers, no partition management |
Next steps
- Streams concept for technical details
- Streams guide for step-by-step configuration
- Event sourcing pattern for a deeper look at the architecture
- Pipes for automated workflow processing of stream events