Data streaming

Stream processing with consumer offsets, replay, and real-time analytics pipelines

npayload streams provide an ordered, persistent log of messages that consumers can read, replay, and process at their own pace. Build real-time analytics, event sourcing, and audit trails without managing Kafka clusters.

The challenge

Traditional pub/sub delivers messages once and discards them. Streaming use cases need:

  • Replay: Re-process historical messages after a bug fix or schema change
  • Multiple consumers: Each consumer tracks its own position independently
  • Ordered processing: Messages processed in the order they were published
  • Persistence: Messages retained for days or weeks, not just until delivered

How npayload solves it

Every npayload channel can be consumed as a stream. Consumers track their position with offsets and can seek to any point in the log.
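
To make the offset model concrete, here is a toy in-memory log with per-consumer offsets. It is a sketch of the idea only, not the npayload implementation or SDK; the `InMemoryLog` class and its methods are hypothetical names chosen for illustration.

```typescript
// Toy in-memory model of an append-only log with per-consumer offsets.
// Illustrative only: this is not the npayload implementation or SDK.
class InMemoryLog<T> {
  private readonly entries: T[] = [];
  private readonly offsets = new Map<string, number>();

  // Append a message and return its offset (its index in the log).
  append(message: T): number {
    this.entries.push(message);
    return this.entries.length - 1;
  }

  // Return up to `limit` messages for a consumer and advance its offset.
  pull(consumer: string, limit: number): T[] {
    const start = this.offsets.get(consumer) ?? 0;
    const batch = this.entries.slice(start, start + limit);
    this.offsets.set(consumer, start + batch.length);
    return batch;
  }

  // Move a consumer to an arbitrary offset, clamped to the log bounds.
  seek(consumer: string, offset: number): void {
    const clamped = Math.max(0, Math.min(offset, this.entries.length));
    this.offsets.set(consumer, clamped);
  }
}

const log = new InMemoryLog<string>();
['a', 'b', 'c'].forEach((m) => log.append(m));

const first = log.pull('dashboard', 2);    // ['a', 'b']
const other = log.pull('warehouse', 3);    // ['a', 'b', 'c'], unaffected by 'dashboard'
log.seek('dashboard', 0);
const replayed = log.pull('dashboard', 3); // ['a', 'b', 'c'] again after seeking back
```

Because the log itself is never mutated by reads, each consumer's position is just a number stored next to its name, which is what makes replay and independent readers cheap.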

Create a stream consumer

const consumer = await npayload.streams.createConsumer({
  channel: 'user-events',
  name: 'analytics-pipeline',
  startFrom: 'earliest', // Start from the beginning of the log
});

Read messages

// Pull a batch of messages from the stream
const messages = await npayload.streams.pull(consumer.gid, {
  limit: 100,
});

for (const msg of messages) {
  await processEvent(msg.payload);
}

// Offset is automatically advanced after pull
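
A single pull returns at most one batch, so a consumer usually wraps it in a loop. The sketch below shows one way to drain a stream. It assumes that an empty batch means the consumer has caught up, which the text above does not state. The `pullBatch` parameter is a hypothetical stand-in for a call such as `npayload.streams.pull(consumer.gid, { limit })`, injected so the loop can run without the SDK.

```typescript
// Shape of a pulled message, assumed from the examples above (only `payload` is used).
interface StreamMessage<T> {
  payload: T;
}

// Hypothetical stand-in for npayload.streams.pull(consumer.gid, { limit }).
type PullBatch<T> = (limit: number) => Promise<StreamMessage<T>[]>;

// Pull batches until an empty batch is returned (assumed to mean "caught up").
// Returns the number of messages handled.
async function drain<T>(
  pullBatch: PullBatch<T>,
  handle: (payload: T) => Promise<void>,
  limit = 100,
): Promise<number> {
  let handled = 0;
  for (;;) {
    const batch = await pullBatch(limit);
    if (batch.length === 0) return handled;
    for (const msg of batch) {
      await handle(msg.payload);
      handled += 1;
    }
  }
}
```

If the offset really does advance at pull time, a crash partway through a batch would leave the rest of that batch unprocessed unless the consumer seeks back, so smaller batches narrow that window.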

Reading modes

| Mode | Description | Use case |
| --- | --- | --- |
| earliest | Start from the first message | Full replay, backfill |
| latest | Start from the current position | Real-time tailing |
| offset | Start from a specific offset number | Resume after crash |
| timestamp | Start from a specific time | Time-based replay |

// Start from a specific timestamp
const consumer = await npayload.streams.createConsumer({
  channel: 'user-events',
  name: 'replay-consumer',
  startFrom: 'timestamp',
  timestamp: '2025-01-01T00:00:00Z',
});
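
The four modes can be read as four ways of choosing a starting offset. The sketch below resolves each mode against a small in-memory log. It assumes that `latest` means "just past the last existing message" and that `timestamp` is inclusive; neither detail is confirmed above. `LogEntry`, `StartFrom`, and `resolveStart` are hypothetical names, not SDK types.

```typescript
interface LogEntry {
  offset: number;
  publishedAt: string; // ISO 8601 timestamp
}

type StartFrom =
  | { mode: 'earliest' }
  | { mode: 'latest' }
  | { mode: 'offset'; offset: number }
  | { mode: 'timestamp'; timestamp: string };

// Return the offset of the first message the consumer would read.
// `log` must be sorted by offset (and therefore by publish time).
function resolveStart(log: LogEntry[], start: StartFrom): number {
  const end = log.length === 0 ? 0 : log[log.length - 1].offset + 1;
  switch (start.mode) {
    case 'earliest':
      return log.length === 0 ? 0 : log[0].offset;
    case 'latest':
      return end; // only messages published from now on
    case 'offset':
      return start.offset;
    case 'timestamp': {
      const cutoff = Date.parse(start.timestamp);
      const hit = log.find((e) => Date.parse(e.publishedAt) >= cutoff);
      return hit ? hit.offset : end;
    }
  }
}

const sample: LogEntry[] = [
  { offset: 0, publishedAt: '2024-12-31T23:00:00Z' },
  { offset: 1, publishedAt: '2025-01-01T00:00:00Z' },
  { offset: 2, publishedAt: '2025-01-01T01:00:00Z' },
];

// Resolves to offset 1, the first message at or after midnight UTC.
const startOffset = resolveStart(sample, {
  mode: 'timestamp',
  timestamp: '2025-01-01T00:00:00Z',
});
```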

Seek (reset position)

Reset a consumer's position to replay messages:

// Seek to the beginning
await npayload.streams.seek(consumer.gid, { position: 'earliest' });

// Seek to a specific offset
await npayload.streams.seek(consumer.gid, { offset: 5000 });

// Seek to a timestamp
await npayload.streams.seek(consumer.gid, {
  position: 'timestamp',
  timestamp: '2025-03-01T00:00:00Z',
});
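
Seeking backwards means messages that were already handled are delivered again, so downstream writes should tolerate repeats. One common approach is an idempotent sink keyed by offset, sketched below. It assumes each message carries an `offset` field, which the examples above do not show; `OffsetMessage` and `IdempotentSink` are hypothetical names.

```typescript
// Assumed message shape: `offset` is hypothetical; the examples above only show `payload`.
interface OffsetMessage<T> {
  offset: number;
  payload: T;
}

// Sink that applies each offset at most once, so an overlapping replay
// after a seek does not double-count.
class IdempotentSink<T> {
  private readonly applied = new Set<number>();
  private readonly apply: (payload: T) => void;

  constructor(apply: (payload: T) => void) {
    this.apply = apply;
  }

  // Returns how many messages in the batch were newly applied.
  write(batch: OffsetMessage<T>[]): number {
    let fresh = 0;
    for (const msg of batch) {
      if (this.applied.has(msg.offset)) continue; // already handled, skip
      this.apply(msg.payload);
      this.applied.add(msg.offset);
      fresh += 1;
    }
    return fresh;
  }
}

let total = 0;
const sink = new IdempotentSink<number>((n) => { total += n; });

const firstPass = sink.write([
  { offset: 0, payload: 10 },
  { offset: 1, payload: 20 },
]);
// After a seek back to offset 1, that message arrives again with a new one.
const secondPass = sink.write([
  { offset: 1, payload: 20 },
  { offset: 2, payload: 5 },
]);
```

When the goal of a replay is to recompute results after a bug fix, the usual alternative is to clear the derived state first and then reprocess everything from the chosen position.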

Use case: real-time analytics pipeline

Build a pipeline that processes events in real-time and can replay when needed:

// Producer: application publishes user activity events
await npayload.messages.publish({
  channel: 'user-activity',
  payload: {
    event: 'page.viewed',
    userId: 'usr_123',
    page: '/pricing',
    timestamp: new Date().toISOString(),
    sessionId: 'sess_abc',
  },
});

// Consumer 1: Real-time dashboard
const dashboardConsumer = await npayload.streams.createConsumer({
  channel: 'user-activity',
  name: 'dashboard',
  startFrom: 'latest',
});

// Consumer 2: Data warehouse loader (processes everything)
const warehouseConsumer = await npayload.streams.createConsumer({
  channel: 'user-activity',
  name: 'warehouse-loader',
  startFrom: 'earliest',
});

// Each consumer processes at its own pace
async function processBatch(consumerId: string) {
  const messages = await npayload.streams.pull(consumerId, { limit: 500 });

  for (const msg of messages) {
    await writeToWarehouse(msg.payload);
  }
}
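
As an illustration of what the dashboard consumer might do with each batch, the sketch below folds page-view payloads into running per-page counts. The `PageViewEvent` shape mirrors the payload published above; `countPageViews` is a hypothetical helper, not part of the SDK.

```typescript
// Payload shape taken from the publish example above.
interface PageViewEvent {
  event: string;
  userId: string;
  page: string;
  timestamp: string;
  sessionId: string;
}

// Add the page views in `events` to `counts` and return the same map,
// so the caller can pass it back in for the next batch.
function countPageViews(
  events: PageViewEvent[],
  counts: Map<string, number> = new Map(),
): Map<string, number> {
  for (const e of events) {
    if (e.event !== 'page.viewed') continue; // ignore other event types
    counts.set(e.page, (counts.get(e.page) ?? 0) + 1);
  }
  return counts;
}

const view = (page: string): PageViewEvent => ({
  event: 'page.viewed',
  userId: 'usr_123',
  page,
  timestamp: '2025-01-01T00:00:00Z',
  sessionId: 'sess_abc',
});

const counts = countPageViews([view('/pricing'), view('/docs'), view('/pricing')]);
countPageViews([view('/docs')], counts); // a second batch updates the same map
```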

Use case: audit trail

Create an immutable audit log that can be replayed for compliance:

// Publish all API operations to an audit channel
await npayload.messages.publish({
  channel: 'audit-log',
  payload: {
    action: 'user.updated',
    actor: 'admin@example.com',
    target: 'usr_456',
    changes: { role: { from: 'member', to: 'admin' } },
    ip: '203.0.113.1',
    timestamp: new Date().toISOString(),
  },
});

// Compliance team can replay the full audit log at any time
const auditConsumer = await npayload.streams.createConsumer({
  channel: 'audit-log',
  name: 'compliance-review',
  startFrom: 'earliest',
});
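
A compliance review typically narrows the replayed log to one actor or one time window. The sketch below filters already-pulled audit payloads on the client side. The `AuditEntry` fields come from the payload above; `AuditQuery` and `filterAudit` are hypothetical helpers, and the half-open time range is a design choice made here, not an npayload feature.

```typescript
// Subset of the audit payload published above.
interface AuditEntry {
  action: string;
  actor: string;
  target: string;
  timestamp: string; // ISO 8601
}

interface AuditQuery {
  actor?: string;
  from?: string; // inclusive lower bound
  to?: string;   // exclusive upper bound
}

// Keep entries that match the actor (if given) and fall within [from, to).
function filterAudit(entries: AuditEntry[], query: AuditQuery): AuditEntry[] {
  const from = query.from ? Date.parse(query.from) : -Infinity;
  const to = query.to ? Date.parse(query.to) : Infinity;
  return entries.filter((e) => {
    const t = Date.parse(e.timestamp);
    const actorOk = query.actor === undefined || e.actor === query.actor;
    return actorOk && t >= from && t < to;
  });
}

const entries: AuditEntry[] = [
  { action: 'user.updated', actor: 'admin@example.com', target: 'usr_456', timestamp: '2025-02-01T10:00:00Z' },
  { action: 'user.deleted', actor: 'ops@example.com', target: 'usr_789', timestamp: '2025-02-02T10:00:00Z' },
  { action: 'user.updated', actor: 'admin@example.com', target: 'usr_111', timestamp: '2025-03-01T00:00:00Z' },
];

// Everything admin@example.com did in February 2025.
const february = filterAudit(entries, {
  actor: 'admin@example.com',
  from: '2025-02-01T00:00:00Z',
  to: '2025-03-01T00:00:00Z',
});
```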

Use case: event sourcing

Use streams as the source of truth and rebuild state by replaying events:

// Publish domain events
await npayload.messages.publish({
  channel: 'account-events',
  partitionKey: accountId,
  payload: { event: 'deposit', amount: 100.00 },
});

await npayload.messages.publish({
  channel: 'account-events',
  partitionKey: accountId,
  payload: { event: 'withdrawal', amount: 25.00 },
});

// Rebuild account state by replaying events
const consumer = await npayload.streams.createConsumer({
  channel: 'account-events',
  name: `account-${accountId}-rebuild`,
  startFrom: 'earliest',
});

let balance = 0;
const events = await npayload.streams.pull(consumer.gid, { limit: 10000 });

for (const msg of events) {
  if (msg.payload.event === 'deposit') balance += msg.payload.amount;
  if (msg.payload.event === 'withdrawal') balance -= msg.payload.amount;
}

console.log(`Current balance: $${balance.toFixed(2)}`); // $75.00
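
The replay loop above is a fold over events, and it can be written as a pure reducer that is easy to test in isolation. The sketch below also keeps the running balance in integer cents, a common precaution against floating-point drift when summing money. `AccountEvent`, `applyEvent`, and `rebuildBalance` are hypothetical names; only the `deposit` and `withdrawal` payloads come from the example above.

```typescript
type AccountEvent =
  | { event: 'deposit'; amount: number }
  | { event: 'withdrawal'; amount: number };

// Convert a decimal amount to integer cents to avoid floating-point drift.
const toCents = (amount: number): number => Math.round(amount * 100);

// Pure transition function: previous balance plus one event gives the next balance.
function applyEvent(balanceCents: number, e: AccountEvent): number {
  switch (e.event) {
    case 'deposit':
      return balanceCents + toCents(e.amount);
    case 'withdrawal':
      return balanceCents - toCents(e.amount);
  }
}

// Rebuild the balance (in cents) by folding over the events in order.
function rebuildBalance(events: AccountEvent[]): number {
  return events.reduce(applyEvent, 0);
}

const cents = rebuildBalance([
  { event: 'deposit', amount: 100.0 },
  { event: 'withdrawal', amount: 25.0 },
]);
const formatted = `$${(cents / 100).toFixed(2)}`; // "$75.00"
```

If the channel carries events for many accounts, the events would need to be narrowed to a single account before folding; how to do that with the SDK is not covered above.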

Automate with Pipes

Instead of writing custom consumer loops, use Pipes to process stream events automatically. Pipes workflows react to channel events, run AI analysis, and take actions, all within your npayload instance.

// Create a Pipes workflow that processes user activity automatically
const workflow = await npayload.pipes.create({
  name: 'user-activity-pipeline',
  trigger: {
    type: 'event',
    channel: 'user-activity',
  },
});

// AI-powered analysis
workflow.addNode({
  id: 'analyze',
  type: 'agent',
  model: 'claude-sonnet-4-20250514',
  systemPrompt: 'Analyze user activity for anomalies and trends.',
  input: '{{ payload }}',
});

// Route based on analysis
workflow.addNode({
  id: 'route',
  type: 'condition',
  expression: '{{ nodes["analyze"].output.anomaly === true }}',
});

// Alert on anomalies
workflow.addNode({
  id: 'alert',
  type: 'http',
  connector: 'my-slack',
  operation: 'sendMessage',
  params: { channel: '#alerts', text: '{{ nodes["analyze"].output.summary }}' },
});

await npayload.pipes.activate(workflow.id);

See the Pipes documentation for the full workflow engine reference.

Stream retention

Configure how long messages are retained in the stream:

| Tier | Default retention | Max retention |
| --- | --- | --- |
| Free | 24 hours | 7 days |
| Pro | 7 days | 30 days |
| Enterprise | 30 days | 365 days |
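
Retention bounds how far back a replay can reach. The sketch below encodes the tier values from the table and checks whether a message published at a given time would still be inside the window. It assumes retention is measured from publish time, which is not stated above; `RETENTION_HOURS`, `replayHorizon`, and `isReplayable` are hypothetical names.

```typescript
type Tier = 'free' | 'pro' | 'enterprise';

// Default and maximum retention in hours, taken from the tier table.
const RETENTION_HOURS: Record<Tier, { default: number; max: number }> = {
  free: { default: 24, max: 7 * 24 },
  pro: { default: 7 * 24, max: 30 * 24 },
  enterprise: { default: 30 * 24, max: 365 * 24 },
};

const MS_PER_HOUR = 60 * 60 * 1000;

// Earliest publish time that is still inside the retention window.
function replayHorizon(now: Date, retentionHours: number): Date {
  return new Date(now.getTime() - retentionHours * MS_PER_HOUR);
}

// True if a message published at `publishedAt` has not yet aged out.
function isReplayable(publishedAt: string, now: Date, retentionHours: number): boolean {
  return Date.parse(publishedAt) >= replayHorizon(now, retentionHours).getTime();
}

const now = new Date('2025-03-08T00:00:00Z');
const proDefault = RETENTION_HOURS.pro.default; // 168 hours

// With 7 days of retention the horizon is 2025-03-01T00:00:00Z.
const horizon = replayHorizon(now, proDefault).toISOString();
```

A check like this is useful before seeking to a timestamp, since a target older than the horizon may no longer exist in the stream.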

Why npayload for streaming

| Feature | Benefit |
| --- | --- |
| Consumer offsets | Each consumer tracks its position independently |
| Replay | Re-process any range of historical messages |
| Seek | Jump to any point in the stream |
| Ordered delivery | Messages are processed in publish order |
| Multiple consumers | Unlimited independent readers |
| No cluster management | No ZooKeeper, no brokers, no partitions to manage |
