PipesGuides
Saga workflows
Multi-step orchestration with automatic compensation on failure
Sagas coordinate multi-step workflows where each step must either complete successfully or be compensated (rolled back). Pipes supports sagas natively with automatic compensation chains.
The problem
Consider an order fulfillment flow:
- Reserve inventory
- Charge payment
- Create shipment
If step 3 fails, you need to refund the payment (undo step 2) and release the inventory (undo step 1). Doing this manually is error-prone. Pipes handles it automatically.
Define a saga workflow
const workflow = await npayload.pipes.create({
name: 'order-saga',
trigger: { type: 'event', channel: 'orders', filter: { event: 'order.created' } },
mode: 'saga',
});Add steps with compensations
Each saga step defines both a forward action and a compensation action:
// Step 1: Reserve inventory
workflow.addSagaStep({
id: 'reserve-inventory',
action: {
type: 'http',
method: 'POST',
url: 'https://api.inventory.com/reserve',
body: {
sku: '{{ payload.sku }}',
quantity: '{{ payload.quantity }}',
},
},
compensation: {
type: 'http',
method: 'POST',
url: 'https://api.inventory.com/release',
body: {
reservationId: '{{ steps["reserve-inventory"].output.reservationId }}',
},
},
});
// Step 2: Charge payment
workflow.addSagaStep({
id: 'charge-payment',
action: {
type: 'http',
connector: 'my-stripe',
operation: 'createCharge',
params: {
amount: '{{ payload.total }}',
currency: 'usd',
customer: '{{ payload.customerId }}',
},
},
compensation: {
type: 'http',
connector: 'my-stripe',
operation: 'createRefund',
params: {
chargeId: '{{ steps["charge-payment"].output.chargeId }}',
},
},
});
// Step 3: Create shipment
workflow.addSagaStep({
id: 'create-shipment',
action: {
type: 'http',
method: 'POST',
url: 'https://api.shipping.com/shipments',
body: {
orderId: '{{ payload.id }}',
address: '{{ payload.shipping }}',
items: '{{ payload.lineItems }}',
},
},
// Terminal step -- no compensation needed
});How compensation works
When a step fails:
- Pipes stops forward execution immediately
- Compensation runs in reverse order from the last completed step
- Each compensation action retries with the same policy as forward actions
- If compensation itself fails, the saga enters a
compensation_failedstate for manual review
Forward: reserve-inventory -> charge-payment -> create-shipment (FAILS)
Compensate: charge-payment.compensation -> reserve-inventory.compensationSaga with conditions
Add conditional steps within a saga:
workflow.addSagaStep({
id: 'fraud-check',
action: {
type: 'http',
url: 'https://api.fraud.com/check',
body: { amount: '{{ payload.total }}', customer: '{{ payload.customerId }}' },
},
// No compensation -- read-only step
});
// Only charge if fraud check passes
workflow.addSagaStep({
id: 'charge-payment',
condition: '{{ steps["fraud-check"].output.approved === true }}',
action: { ... },
compensation: { ... },
});Monitoring sagas
Track saga execution and compensation status:
const execution = await npayload.pipes.executions.get(executionId);
console.log(execution.sagaStatus);
// "completed" | "compensating" | "compensated" | "compensation_failed"
console.log(execution.steps);
// [
// { id: 'reserve-inventory', status: 'completed', compensated: false },
// { id: 'charge-payment', status: 'completed', compensated: true },
// { id: 'create-shipment', status: 'failed', error: '...' },
// ]Timeout and deadlines
Set timeouts for the entire saga or individual steps:
const workflow = await npayload.pipes.create({
name: 'time-sensitive-saga',
trigger: { type: 'event', channel: 'orders' },
mode: 'saga',
timeout: 300000, // 5 minutes for the entire saga
});
workflow.addSagaStep({
id: 'reserve',
timeout: 30000, // 30 seconds for this step
action: { ... },
compensation: { ... },
});Next steps
- Saga orchestration pattern for architectural patterns
- Execution model for durability and retry behavior
- Building workflows for general workflow construction
Was this page helpful?