Event processing

React to channel events, transform payloads, and route to multiple outputs

The event processing pattern uses Pipes to create real-time pipelines that filter, transform, enrich, and route messages as they flow through your channels.

Pattern overview

Instead of writing custom consumers that poll channels, deploy a Pipes workflow that reacts to each event as it arrives and routes it to the right destination.

Before Pipes:

// Manual: every consumer has to poll, filter, transform, and deliver on its own
while (true) {
  const messages = await npayload.streams.pull(consumer, { limit: 100 });
  for (const msg of messages) {
    if (shouldProcess(msg)) {
      const transformed = transform(msg);
      await deliverToWebhook(transformed);
      await storeInDatabase(transformed);
    }
  }
}

With Pipes:

// Declarative: define the pipeline once, it runs automatically
const workflow = await npayload.pipes.create({
  name: 'event-processor',
  trigger: { type: 'event', channel: 'user-activity' },
});

Filter: drop noise

Only process events that matter:

workflow.addNode({
  id: 'filter-events',
  type: 'filter',
  expression: '{{ payload.event !== "heartbeat" && payload.userId != null }}',
});
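
To sanity-check the condition before deploying it, the same logic can be written as a plain JavaScript predicate and run against sample payloads. This is a local sketch only: `shouldKeep` and the sample events are hypothetical and are not part of the npayload API.

```javascript
// Hypothetical local mirror of the filter expression above.
// Not an npayload API: it only reproduces the condition in plain JavaScript.
function shouldKeep(payload) {
  return payload.event !== "heartbeat" && payload.userId != null;
}

// Sample payloads, made up for illustration.
const samples = [
  { event: "heartbeat", userId: "u1" },   // dropped: heartbeat noise
  { event: "page.viewed" },               // dropped: userId is undefined
  { event: "page.viewed", userId: null }, // dropped: userId is null
  { event: "page.viewed", userId: "u1" }, // kept
];

const kept = samples.filter(shouldKeep);
console.log(kept.length); // 1
```

Note that the loose comparison `!= null` rejects both `null` and `undefined`, so events with a missing `userId` field are dropped as well as events where it is explicitly null.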

Transform: reshape payloads

Normalize data before delivering to downstream systems:

workflow.addNode({
  id: 'normalize',
  type: 'transform',
  expression: {
    event: '{{ payload.event }}',
    user: '{{ payload.userId }}',
    timestamp: '{{ payload.ts || new Date().toISOString() }}',
    properties: '{{ payload.metadata || {} }}',
  },
});
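
As a rough illustration of what this node produces, the sketch below applies the same field mapping to one sample payload in plain JavaScript. The `normalize` function and the sample event are hypothetical; how Pipes evaluates the templates internally is not shown here.

```javascript
// Hypothetical local mirror of the 'normalize' node; not an npayload API.
// `now` is injected so the fallback timestamp is predictable in tests.
function normalize(payload, now = new Date()) {
  return {
    event: payload.event,
    user: payload.userId,
    timestamp: payload.ts || now.toISOString(),
    properties: payload.metadata || {},
  };
}

// Sample payload with no `ts` and no `metadata`, made up for illustration.
const fixedNow = new Date("2024-01-01T00:00:00.000Z");
const normalized = normalize({ event: "user.signed_up", userId: "u1" }, fixedNow);

console.log(normalized.timestamp);  // 2024-01-01T00:00:00.000Z
console.log(normalized.properties); // {}
```

Both fallbacks use `||`, so an empty-string `ts` is also replaced by the current time.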

Enrich: add context

Look up additional data from your database or external APIs:

workflow.addNode({
  id: 'enrich-user',
  type: 'http',
  method: 'GET',
  url: 'https://api.internal.com/users/{{ payload.userId }}',
});

workflow.addNode({
  id: 'merge-context',
  type: 'transform',
  expression: {
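    // `nodes` is not a JavaScript variable in this scope, so each normalized
    // field is referenced through a template instead of an object spread.
    event: '{{ nodes["normalize"].output.event }}',
    user: '{{ nodes["normalize"].output.user }}',
    timestamp: '{{ nodes["normalize"].output.timestamp }}',
    properties: '{{ nodes["normalize"].output.properties }}',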
    userPlan: '{{ nodes["enrich-user"].output.plan }}',
    userRegion: '{{ nodes["enrich-user"].output.region }}',
  },
});
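
The merge step can be pictured as combining the normalized event with the fields returned by the lookup. The sketch below does this locally, assuming a stubbed user service; `fakeUserService`, `enrichUser`, and `mergeContext` are hypothetical names and the records are invented.

```javascript
// Hypothetical stand-in for the HTTP lookup. A real pipeline would call the
// user service; the record here is made up for illustration.
const fakeUserService = new Map([
  ["u1", { plan: "pro", region: "eu-west" }],
]);

function enrichUser(userId) {
  // Fall back to an empty object so the merge still works for unknown users.
  return fakeUserService.get(userId) ?? {};
}

function mergeContext(normalizedEvent, userRecord) {
  return {
    ...normalizedEvent,
    userPlan: userRecord.plan,
    userRegion: userRecord.region,
  };
}

const sampleEvent = { event: "purchase.completed", user: "u1", properties: {} };
const merged = mergeContext(sampleEvent, enrichUser(sampleEvent.user));

console.log(merged.userPlan, merged.userRegion); // pro eu-west
```

For an unknown user the sketch leaves `userPlan` and `userRegion` undefined rather than failing, which is one possible policy; whether a failed lookup should stop the pipeline instead is a design decision for the workflow.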

Route: fan out to multiple outputs

Send processed events to different destinations based on content:

// Route by event type
workflow.addNode({
  id: 'route',
  type: 'switch',
  field: '{{ payload.event }}',
  cases: {
    'purchase.completed': 'notify-finance',
    'user.signed_up': 'welcome-flow',
    'error.occurred': 'alert-ops',
  },
  default: 'analytics-store',
});

// Notify finance team via webhook
workflow.addNode({
  id: 'notify-finance',
  type: 'http',
  method: 'POST',
  url: 'https://hooks.slack.com/services/...',
  body: {
    text: 'New purchase: {{ payload.amount }} by {{ payload.userId }}',
  },
});

// Default route: publish events that match no case to the analytics channel
workflow.addNode({
  id: 'analytics-store',
  type: 'publish',
  channel: 'analytics-events',
  payload: '{{ nodes["merge-context"].output }}',
});
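
The routing decision itself amounts to a lookup with a fallback. The following sketch reproduces it in plain JavaScript so the mapping can be checked locally; `routes`, `defaultNode`, and `nextNode` are hypothetical names, not npayload APIs.

```javascript
// Hypothetical local mirror of the 'route' switch node.
// Keys are event types, values are the ids of the nodes that handle them.
const routes = new Map([
  ["purchase.completed", "notify-finance"],
  ["user.signed_up", "welcome-flow"],
  ["error.occurred", "alert-ops"],
]);
const defaultNode = "analytics-store";

function nextNode(payload) {
  // Events without a matching case fall through to the default node.
  return routes.get(payload.event) ?? defaultNode;
}

console.log(nextNode({ event: "purchase.completed" })); // notify-finance
console.log(nextNode({ event: "page.viewed" }));        // analytics-store
```

A `Map` is used instead of a plain object so that event names such as `toString` cannot accidentally match inherited properties.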

Complete example: multi-tenant event router

Process events from a shared channel and route them to tenant-specific channels:

const workflow = await npayload.pipes.create({
  name: 'tenant-router',
  trigger: { type: 'event', channel: 'platform-events' },
});

// Extract tenant info
workflow.addNode({
  id: 'extract-tenant',
  type: 'transform',
  expression: {
    tenantId: '{{ payload.tenantId }}',
    event: '{{ payload.event }}',
    data: '{{ payload.data }}',
  },
});

// Publish to tenant-specific channel
workflow.addNode({
  id: 'route-to-tenant',
  type: 'publish',
  channel: 'tenant-{{ nodes["extract-tenant"].output.tenantId }}-events',
  payload: '{{ nodes["extract-tenant"].output }}',
});

await npayload.pipes.activate(workflow.id);
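
The channel name in `route-to-tenant` is built from the tenant id, so an event without one cannot be routed to a meaningful channel. The sketch below shows the name construction with an explicit guard; `tenantChannel` is a hypothetical helper, and how Pipes itself treats a missing `tenantId` is not covered here.

```javascript
// Hypothetical helper mirroring the channel template in 'route-to-tenant'.
function tenantChannel(tenantId) {
  if (typeof tenantId !== "string" || tenantId.length === 0) {
    throw new Error("tenantId is required to build a tenant channel name");
  }
  return `tenant-${tenantId}-events`;
}

console.log(tenantChannel("acme")); // tenant-acme-events
```

In a workflow, a filter node placed before the transform, similar to the one in the filter section, is one way to keep events without a `tenantId` from reaching the publish step.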
