Event processing
React to channel events, transform payloads, and route to multiple outputs
The event processing pattern uses Pipes to create real-time pipelines that filter, transform, enrich, and route messages as they flow through your channels.
Pattern overview
Instead of writing custom consumers that poll channels, deploy a Pipes workflow that reacts to events instantly and routes them to the right destination.
Before Pipes:
```javascript
// Manual: poll, filter, transform, and deliver, for every consumer
while (true) {
  const messages = await npayload.streams.pull(consumer, { limit: 100 });
  for (const msg of messages) {
    if (shouldProcess(msg)) {
      const transformed = transform(msg);
      await deliverToWebhook(transformed);
      await storeInDatabase(transformed);
    }
  }
}
```

With Pipes:
```javascript
// Declarative: define the pipeline once; it runs automatically
const workflow = await npayload.pipes.create({
  name: 'event-processor',
  trigger: { type: 'event', channel: 'user-activity' },
});
```

Filter: drop noise
Only process events that matter:
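Conceptually, a filter node evaluates a boolean predicate over each event's payload and drops events that fail it. A plain-JavaScript sketch of the expression used below (illustrative only, not SDK code):

```javascript
// Predicate equivalent to the filter expression: keep everything except
// heartbeats and events with no user attached.
function shouldKeep(payload) {
  return payload.event !== 'heartbeat' && payload.userId != null;
}

// Events failing the predicate are dropped; the rest continue downstream.
const events = [
  { event: 'heartbeat', userId: 'u1' },
  { event: 'purchase.completed', userId: 'u1' },
  { event: 'page.viewed', userId: null },
];
const kept = events.filter(shouldKeep);
```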
```javascript
workflow.addNode({
  id: 'filter-events',
  type: 'filter',
  expression: '{{ payload.event !== "heartbeat" && payload.userId != null }}',
});
```

Transform: reshape payloads
Normalize data before delivering to downstream systems:
```javascript
workflow.addNode({
  id: 'normalize',
  type: 'transform',
  expression: {
    event: '{{ payload.event }}',
    user: '{{ payload.userId }}',
    timestamp: '{{ payload.ts || new Date().toISOString() }}',
    properties: '{{ payload.metadata || {} }}',
  },
});
```

Enrich: add context
Look up additional data from your database or external APIs:
```javascript
workflow.addNode({
  id: 'enrich-user',
  type: 'http',
  method: 'GET',
  url: 'https://api.internal.com/users/{{ payload.userId }}',
});
```
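The merge step that follows combines the normalized event with the fields returned by the HTTP lookup. In plain JavaScript, the result is equivalent to an object spread (illustrative sketch; `normalized` and `enrichment` are stand-ins for the two node outputs):

```javascript
// Stand-ins for nodes["normalize"].output and nodes["enrich-user"].output.
const normalized = {
  event: 'purchase.completed',
  user: 'u1',
  timestamp: '2024-01-01T00:00:00.000Z',
  properties: {},
};
const enrichment = { plan: 'pro', region: 'eu-west' };

// Keep every normalized field, then add the enrichment context on top.
const merged = {
  ...normalized,
  userPlan: enrichment.plan,
  userRegion: enrichment.region,
};
```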
```javascript
workflow.addNode({
  id: 'merge-context',
  type: 'transform',
  expression: {
    event: '{{ nodes["normalize"].output.event }}',
    user: '{{ nodes["normalize"].output.user }}',
    timestamp: '{{ nodes["normalize"].output.timestamp }}',
    properties: '{{ nodes["normalize"].output.properties }}',
    userPlan: '{{ nodes["enrich-user"].output.plan }}',
    userRegion: '{{ nodes["enrich-user"].output.region }}',
  },
});
```

Route: fan out to multiple outputs
Send processed events to different destinations based on content:
```javascript
// Route by event type
workflow.addNode({
  id: 'route',
  type: 'switch',
  field: '{{ payload.event }}',
  cases: {
    'purchase.completed': 'notify-finance',
    'user.signed_up': 'welcome-flow',
    'error.occurred': 'alert-ops',
  },
  default: 'analytics-store',
});
```
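A switch node compares one field against its cases and falls back to the default branch when nothing matches. The routing logic is equivalent to this plain-JavaScript sketch (illustrative only):

```javascript
// Routing table mirroring the switch node: event name -> next node id.
const cases = {
  'purchase.completed': 'notify-finance',
  'user.signed_up': 'welcome-flow',
  'error.occurred': 'alert-ops',
};

// Unmatched events fall through to the fallback branch.
function route(payload, table, fallback) {
  return Object.prototype.hasOwnProperty.call(table, payload.event)
    ? table[payload.event]
    : fallback;
}

const target = route({ event: 'purchase.completed' }, cases, 'analytics-store');
const fallbackTarget = route({ event: 'session.ended' }, cases, 'analytics-store');
```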
```javascript
// Notify finance team via webhook
workflow.addNode({
  id: 'notify-finance',
  type: 'http',
  method: 'POST',
  url: 'https://hooks.slack.com/services/...',
  body: {
    text: 'New purchase: {{ payload.amount }} by {{ payload.userId }}',
  },
});
```
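The `{{ … }}` placeholders in node bodies are resolved against the incoming event before delivery. A minimal interpolation sketch, assuming simple dot-path lookups on the payload only (a hypothetical helper for illustration, not part of the SDK):

```javascript
// Replace each {{ payload.path.to.value }} with the matching payload field.
function interpolate(template, payload) {
  return template.replace(/\{\{\s*payload\.([\w.]+)\s*\}\}/g, (_, path) =>
    String(path.split('.').reduce((obj, key) => obj?.[key], payload))
  );
}

const text = interpolate(
  'New purchase: {{ payload.amount }} by {{ payload.userId }}',
  { amount: 49.99, userId: 'u1' }
);
```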
```javascript
// Store all events in analytics channel
workflow.addNode({
  id: 'analytics-store',
  type: 'publish',
  channel: 'analytics-events',
  payload: '{{ nodes["merge-context"].output }}',
});
```

Complete example: multi-tenant event router
Process events from a shared channel and route to tenant-specific destinations:
```javascript
const workflow = await npayload.pipes.create({
  name: 'tenant-router',
  trigger: { type: 'event', channel: 'platform-events' },
});

// Extract tenant info
workflow.addNode({
  id: 'extract-tenant',
  type: 'transform',
  expression: {
    tenantId: '{{ payload.tenantId }}',
    event: '{{ payload.event }}',
    data: '{{ payload.data }}',
  },
});

// Publish to the tenant-specific channel
workflow.addNode({
  id: 'route-to-tenant',
  type: 'publish',
  channel: 'tenant-{{ nodes["extract-tenant"].output.tenantId }}-events',
  payload: '{{ nodes["extract-tenant"].output }}',
});

await npayload.pipes.activate(workflow.id);
```

Next steps
- AI pipeline for AI-powered event processing
- Building workflows for workflow construction
- Fan-out pattern for message distribution patterns