PipesGuides
Building workflows
Step-by-step guide to composing triggers, logic nodes, and action nodes into workflows
This guide covers the full workflow construction process: defining triggers, adding logic, branching, looping, and executing actions.
Workflow structure
Every Pipes workflow has three parts:
- Trigger: what starts the workflow
- Logic nodes: how data flows and transforms
- Action nodes: what happens as a result
const workflow = await npayload.pipes.create({
name: 'invoice-processor',
trigger: { type: 'event', channel: 'invoices' },
});
// Add logic
workflow.addNode({ id: 'validate', type: 'condition', ... });
workflow.addNode({ id: 'transform', type: 'transform', ... });
// Add actions
workflow.addNode({ id: 'notify', type: 'http', ... });
workflow.addNode({ id: 'store', type: 'mutation', ... });
// Connect nodes
workflow.connect('validate', 'transform', { branch: 'true' });
workflow.connect('transform', 'notify');
workflow.connect('transform', 'store');
await npayload.pipes.activate(workflow.id);Branching with conditions
Route messages based on payload content:
// Branch by order value
workflow.addNode({
id: 'check-value',
type: 'condition',
expression: '{{ payload.total > 500 }}',
});
workflow.addNode({
id: 'high-value-review',
type: 'http',
method: 'POST',
url: 'https://api.internal.com/reviews',
body: '{{ payload }}',
});
workflow.addNode({
id: 'auto-approve',
type: 'mutation',
operation: 'orders.approve',
params: { orderId: '{{ payload.id }}' },
});
workflow.connect('check-value', 'high-value-review', { branch: 'true' });
workflow.connect('check-value', 'auto-approve', { branch: 'false' });Multi-way routing with switch
Route to different handlers based on event type:
workflow.addNode({
id: 'route-event',
type: 'switch',
field: '{{ payload.event }}',
cases: {
'order.created': 'process-new',
'order.updated': 'sync-changes',
'order.cancelled': 'handle-cancel',
},
default: 'log-unknown',
});Looping over arrays
Process each item in a collection:
workflow.addNode({
id: 'process-items',
type: 'loop',
items: '{{ payload.lineItems }}',
body: [
{
id: 'check-stock',
type: 'http',
method: 'GET',
url: 'https://api.inventory.com/stock/{{ item.sku }}',
},
{
id: 'reserve',
type: 'http',
method: 'POST',
url: 'https://api.inventory.com/reserve',
body: { sku: '{{ item.sku }}', quantity: '{{ item.quantity }}' },
},
],
});Parallel execution
Run multiple operations simultaneously:
workflow.addNode({
id: 'fan-out',
type: 'parallel',
branches: [
{ id: 'notify-customer', type: 'http', url: '...' },
{ id: 'update-crm', type: 'http', url: '...' },
{ id: 'log-analytics', type: 'http', url: '...' },
],
});
// Merge waits for all branches to complete
workflow.addNode({
id: 'merge-results',
type: 'merge',
strategy: 'all', // Wait for all branches
});
workflow.connect('fan-out', 'merge-results');Data transformation
Reshape payloads between nodes:
workflow.addNode({
id: 'normalize',
type: 'transform',
expression: {
id: '{{ payload.orderId || payload.id }}',
customer: {
email: '{{ payload.customerEmail || payload.customer.email }}',
name: '{{ payload.customerName || payload.customer.name }}',
},
total: '{{ Number(payload.amount) / 100 }}', // cents to dollars
timestamp: '{{ new Date().toISOString() }}',
},
});Error handling
Add error boundaries around risky operations:
workflow.addNode({
id: 'risky-call',
type: 'http',
url: 'https://api.external.com/process',
onError: {
node: 'fallback-handler',
retries: 3,
backoff: 'exponential',
},
});
workflow.addNode({
id: 'fallback-handler',
type: 'publish',
channel: 'failed-operations',
payload: {
originalPayload: '{{ payload }}',
error: '{{ error.message }}',
timestamp: '{{ new Date().toISOString() }}',
},
});Secrets management
Reference secrets without exposing them in workflow definitions:
// Store a secret
await npayload.pipes.secrets.set({
workflow: workflow.id,
key: 'STRIPE_KEY',
value: process.env.STRIPE_SECRET_KEY!,
});
// Use in a node
workflow.addNode({
type: 'http',
headers: { Authorization: 'Bearer {{ secrets.STRIPE_KEY }}' },
// ...
});Next steps
- Using AI nodes for AI-powered workflow steps
- Saga workflows for multi-step orchestration
- Execution model for durability and retry behavior
Was this page helpful?