Building workflows

Step-by-step guide to composing triggers, logic nodes, and action nodes into workflows

This guide covers the full workflow construction process: defining triggers, adding logic, branching, looping, and executing actions.

Workflow structure

Every Pipes workflow has three parts:

  1. Trigger: what starts the workflow
  2. Logic nodes: how data flows and transforms
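  3. Action nodes: what happens as a result

The following example creates a workflow, adds logic and action nodes, connects them, and activates it:

const workflow = await npayload.pipes.create({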
  name: 'invoice-processor',
  trigger: { type: 'event', channel: 'invoices' },
});

// Add logic
workflow.addNode({ id: 'validate', type: 'condition', ... });
workflow.addNode({ id: 'transform', type: 'transform', ... });

// Add actions
workflow.addNode({ id: 'notify', type: 'http', ... });
workflow.addNode({ id: 'store', type: 'mutation', ... });

// Connect nodes
workflow.connect('validate', 'transform', { branch: 'true' });
workflow.connect('transform', 'notify');
workflow.connect('transform', 'store');

await npayload.pipes.activate(workflow.id);
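
To make the wiring above concrete, here is a minimal sketch that models nodes and connections as a plain in-memory graph. It does not use the npayload SDK: `WorkflowGraph`, `Edge`, and `successors` are hypothetical names introduced only for illustration, and the real SDK may validate or store connections differently.

```typescript
// Hypothetical in-memory model of a workflow graph; not the npayload SDK.
type Edge = { from: string; to: string; branch: string | undefined };

class WorkflowGraph {
  private nodes = new Set<string>();
  private edges: Edge[] = [];

  addNode(id: string): void {
    if (this.nodes.has(id)) throw new Error(`Duplicate node id: ${id}`);
    this.nodes.add(id);
  }

  // Both endpoints must already exist before they can be connected.
  connect(from: string, to: string, opts: { branch?: string } = {}): void {
    for (const id of [from, to]) {
      if (!this.nodes.has(id)) throw new Error(`Unknown node id: ${id}`);
    }
    this.edges.push({ from, to, branch: opts.branch });
  }

  // Nodes one step downstream of `from`, optionally restricted to a branch label.
  successors(from: string, branch?: string): string[] {
    return this.edges
      .filter((e) => e.from === from && (branch === undefined || e.branch === branch))
      .map((e) => e.to);
  }
}

const graph = new WorkflowGraph();
for (const id of ['validate', 'transform', 'notify', 'store']) graph.addNode(id);
graph.connect('validate', 'transform', { branch: 'true' });
graph.connect('transform', 'notify');
graph.connect('transform', 'store');
```

In this model, `transform` has two successors (`notify` and `store`), while `validate` has a successor only on its `true` branch, which matches the three `connect` calls in the example.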

Branching with conditions

Route messages based on payload content:

// Branch by order value
workflow.addNode({
  id: 'check-value',
  type: 'condition',
  expression: '{{ payload.total > 500 }}',
});

workflow.addNode({
  id: 'high-value-review',
  type: 'http',
  method: 'POST',
  url: 'https://api.internal.com/reviews',
  body: '{{ payload }}',
});

workflow.addNode({
  id: 'auto-approve',
  type: 'mutation',
  operation: 'orders.approve',
  params: { orderId: '{{ payload.id }}' },
});

workflow.connect('check-value', 'high-value-review', { branch: 'true' });
workflow.connect('check-value', 'auto-approve', { branch: 'false' });
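
The routing decision above can be sketched as an ordinary function. This is a local model, not the SDK: `Order`, `checkValue`, `branchTargets`, and `route` are hypothetical names, and it assumes the condition expression is evaluated like plain JavaScript.

```typescript
// Hypothetical local model of the condition node; not the npayload SDK.
type Order = { id: string; total: number };
type Branch = 'true' | 'false';

// Mirrors the expression `payload.total > 500`.
function checkValue(payload: Order): Branch {
  return payload.total > 500 ? 'true' : 'false';
}

// Branch label to target node id, as wired by the two connect() calls.
const branchTargets: Record<Branch, string> = {
  true: 'high-value-review',
  false: 'auto-approve',
};

function route(payload: Order): string {
  return branchTargets[checkValue(payload)];
}
```

Because the comparison is strict, an order with a total of exactly 500 takes the `false` branch under this model: `route({ id: 'a', total: 500 })` returns `'auto-approve'`.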

Multi-way routing with switch

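Route to different handlers based on event type. The cases object maps each value of the field to a target node ID, and default names the node to use when no case matches: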

workflow.addNode({
  id: 'route-event',
  type: 'switch',
  field: '{{ payload.event }}',
  cases: {
    'order.created': 'process-new',
    'order.updated': 'sync-changes',
    'order.cancelled': 'handle-cancel',
  },
  default: 'log-unknown',
});
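
As a rough sketch of the lookup a switch node performs, the function below resolves an event name to a target node ID and falls back to the default. It is a local model with hypothetical names (`cases`, `DEFAULT_TARGET`, `routeEvent`) and assumes exact string matching on the field value.

```typescript
// Hypothetical local model of the switch node; not the npayload SDK.
const cases = new Map<string, string>([
  ['order.created', 'process-new'],
  ['order.updated', 'sync-changes'],
  ['order.cancelled', 'handle-cancel'],
]);

const DEFAULT_TARGET = 'log-unknown';

// Exact-match lookup; anything not listed goes to the default node.
function routeEvent(event: string): string {
  return cases.get(event) ?? DEFAULT_TARGET;
}
```

Under this model, `routeEvent('order.updated')` returns `'sync-changes'`, and an unlisted value such as `'order.refunded'` returns `'log-unknown'`.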

Looping over arrays

Process each item in a collection. Inside the loop body, the current element is available as item:

workflow.addNode({
  id: 'process-items',
  type: 'loop',
  items: '{{ payload.lineItems }}',
  body: [
    {
      id: 'check-stock',
      type: 'http',
      method: 'GET',
      url: 'https://api.inventory.com/stock/{{ item.sku }}',
    },
    {
      id: 'reserve',
      type: 'http',
      method: 'POST',
      url: 'https://api.inventory.com/reserve',
      body: { sku: '{{ item.sku }}', quantity: '{{ item.quantity }}' },
    },
  ],
});
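
The sketch below shows which requests the loop above would produce for a given payload. It is a local model only: `LineItem`, `PlannedRequest`, and `planLoop` are hypothetical names, and it assumes the body nodes run in order for each item and that templates are substituted literally.

```typescript
// Hypothetical local model of the loop node; not the npayload SDK.
type LineItem = { sku: string; quantity: number };
type PlannedRequest = {
  node: string;
  method: 'GET' | 'POST';
  url: string;
  body?: LineItem;
};

// For each item, emit the body nodes in their declared order:
// first the stock check, then the reservation.
function planLoop(lineItems: LineItem[]): PlannedRequest[] {
  const requests: PlannedRequest[] = [];
  for (const item of lineItems) {
    requests.push({
      node: 'check-stock',
      method: 'GET',
      url: `https://api.inventory.com/stock/${item.sku}`,
    });
    requests.push({
      node: 'reserve',
      method: 'POST',
      url: 'https://api.inventory.com/reserve',
      body: { sku: item.sku, quantity: item.quantity },
    });
  }
  return requests;
}

const planned = planLoop([
  { sku: 'A-1', quantity: 2 },
  { sku: 'B-7', quantity: 1 },
]);
```

Two line items yield four planned requests under this model, alternating `check-stock` and `reserve`. An empty `lineItems` array yields none.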

Parallel execution

Run multiple operations simultaneously:

workflow.addNode({
  id: 'fan-out',
  type: 'parallel',
  branches: [
    { id: 'notify-customer', type: 'http', url: '...' },
    { id: 'update-crm', type: 'http', url: '...' },
    { id: 'log-analytics', type: 'http', url: '...' },
  ],
});

// With strategy 'all', the merge node waits for every branch to complete
workflow.addNode({
  id: 'merge-results',
  type: 'merge',
  strategy: 'all',
});

workflow.connect('fan-out', 'merge-results');
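
The fan-out and merge pattern is analogous to `Promise.all` in plain TypeScript. The sketch below is a local model, not the SDK: `BranchResult`, `runBranch`, and `fanOutAndMerge` are hypothetical, and the delays stand in for real HTTP calls.

```typescript
// Hypothetical local model of parallel + merge; not the npayload SDK.
type BranchResult = { id: string; ok: boolean };

// Stand-in for an HTTP branch: resolves after `delayMs` milliseconds.
async function runBranch(id: string, delayMs: number): Promise<BranchResult> {
  await new Promise<void>((resolve) => setTimeout(resolve, delayMs));
  return { id, ok: true };
}

// Starts all branches at once and resolves only when every branch has resolved.
// Promise.all rejects as soon as any branch rejects.
async function fanOutAndMerge(): Promise<BranchResult[]> {
  return Promise.all([
    runBranch('notify-customer', 30),
    runBranch('update-crm', 10),
    runBranch('log-analytics', 20),
  ]);
}
```

`Promise.all` returns results in the order the branches were listed, not the order in which they finished. Whether a merge node with strategy 'all' treats a failed branch the same way `Promise.all` does is not stated in this guide.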

Data transformation

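Reshape payloads between nodes. The example below accepts either flat fields (customerEmail) or nested fields (customer.email) and emits a single normalized shape: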

workflow.addNode({
  id: 'normalize',
  type: 'transform',
  expression: {
    id: '{{ payload.orderId || payload.id }}',
    customer: {
      email: '{{ payload.customerEmail || payload.customer.email }}',
      name: '{{ payload.customerName || payload.customer.name }}',
    },
    total: '{{ Number(payload.amount) / 100 }}', // cents to dollars
    timestamp: '{{ new Date().toISOString() }}',
  },
});
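
To show what the transform above computes, here is the same mapping as a plain function applied to a sample payload. `RawOrder` and `normalize` are hypothetical names, and the sketch assumes template expressions behave like ordinary JavaScript. It also uses optional chaining so that a missing `customer` object does not throw, which the original expression does not do.

```typescript
// Hypothetical local model of the transform node; not the npayload SDK.
type RawOrder = {
  orderId?: string;
  id?: string;
  customerEmail?: string;
  customerName?: string;
  customer?: { email?: string; name?: string };
  amount: number | string;
};

// `now` is injectable so the output is deterministic in tests.
function normalize(payload: RawOrder, now: Date = new Date()) {
  return {
    id: payload.orderId || payload.id,
    customer: {
      email: payload.customerEmail || payload.customer?.email,
      name: payload.customerName || payload.customer?.name,
    },
    total: Number(payload.amount) / 100, // cents to dollars
    timestamp: now.toISOString(),
  };
}

const normalized = normalize(
  { id: 'ord_1', customer: { email: 'a@example.com', name: 'Ada' }, amount: '1999' },
  new Date(0),
);
```

For this input, `normalized.id` is `'ord_1'`, `normalized.total` is `19.99`, and `normalized.timestamp` is `'1970-01-01T00:00:00.000Z'`. A non-numeric `amount` would produce `NaN`, so validating the payload before this node is worth considering.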

Error handling

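Add error boundaries around operations that can fail. In the example below, the HTTP node's onError setting configures three retries with exponential backoff and names fallback-handler as the error node, which publishes the failure details to a channel: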

workflow.addNode({
  id: 'risky-call',
  type: 'http',
  url: 'https://api.external.com/process',
  onError: {
    node: 'fallback-handler',
    retries: 3,
    backoff: 'exponential',
  },
});

workflow.addNode({
  id: 'fallback-handler',
  type: 'publish',
  channel: 'failed-operations',
  payload: {
    originalPayload: '{{ payload }}',
    error: '{{ error.message }}',
    timestamp: '{{ new Date().toISOString() }}',
  },
});
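
One plausible reading of the `onError` settings above is "retry with exponential backoff, then hand the error to the fallback". The sketch below implements that reading in plain TypeScript. It is an assumption, not documented npayload behavior: `withRetries` and `baseDelayMs` are hypothetical, and `retries: 3` is taken to mean three retries after the initial attempt.

```typescript
// Hypothetical local model of onError with retries; not the npayload SDK.
async function withRetries<T>(
  operation: () => Promise<T>,
  fallback: (error: Error) => Promise<T>,
  retries = 3,
  baseDelayMs = 100,
): Promise<T> {
  let lastError = new Error('operation was never attempted');
  // One initial attempt plus up to `retries` retries.
  for (let attempt = 0; attempt <= retries; attempt++) {
    try {
      return await operation();
    } catch (err) {
      lastError = err instanceof Error ? err : new Error(String(err));
      if (attempt < retries) {
        // Exponential backoff: baseDelayMs, then 2x, then 4x, and so on.
        const delay = baseDelayMs * 2 ** attempt;
        await new Promise<void>((resolve) => setTimeout(resolve, delay));
      }
    }
  }
  // All attempts failed: route the last error to the fallback handler.
  return fallback(lastError);
}
```

With the defaults, a call that always fails is attempted four times, with waits of 100 ms, 200 ms, and 400 ms between attempts, before the fallback receives the last error.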

Secrets management

Reference secrets without exposing them in workflow definitions:

// Store a secret
await npayload.pipes.secrets.set({
  workflow: workflow.id,
  key: 'STRIPE_KEY',
  value: process.env.STRIPE_SECRET_KEY!,
});

// Use in a node
workflow.addNode({
  type: 'http',
  headers: { Authorization: 'Bearer {{ secrets.STRIPE_KEY }}' },
  // ...
});
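
The sketch below illustrates the idea of late-bound placeholders: the workflow definition only contains `{{ secrets.NAME }}`, and the value is substituted at execution time. It is a local model, not the SDK's resolver: `resolveSecrets` and the sample key value are hypothetical.

```typescript
// Hypothetical local model of secret substitution; not the npayload SDK.
function resolveSecrets(template: string, secrets: Map<string, string>): string {
  // Matches placeholders such as `{{ secrets.STRIPE_KEY }}`.
  const pattern = /\{\{\s*secrets\.([A-Za-z0-9_]+)\s*\}\}/g;
  return template.replace(pattern, (_match: string, key: string) => {
    const value = secrets.get(key);
    // Fail loudly rather than sending a request with an empty credential.
    if (value === undefined) throw new Error(`Missing secret: ${key}`);
    return value;
  });
}

const header = resolveSecrets(
  'Bearer {{ secrets.STRIPE_KEY }}',
  new Map([['STRIPE_KEY', 'sk_test_example']]),
);
```

Here `header` is `'Bearer sk_test_example'`, while the template string stored in the workflow definition never contains the secret value itself.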
