Saga orchestration

Coordinate multi-step distributed transactions with compensating actions

What is a saga?

A saga is a sequence of local transactions where each step has a corresponding compensating action. If any step fails, the saga executes compensations in reverse order to undo the work completed so far. This gives you reliable multi-step operations across distributed services without two-phase commits.

Consider an order processing flow: reserve inventory, charge payment, send confirmation. If the payment charge fails, you need to release the reserved inventory. A saga makes this rollback automatic and reliable.
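The reverse-order rollback described above can be illustrated without any messaging. The following is a minimal in-process sketch, not npayload's API: `SagaStep`, `runSaga`, and the demo step names are hypothetical and exist only to show the control flow. The demo charge step is made to fail on purpose.

```typescript
// Hypothetical types for illustration; not part of the npayload SDK.
interface SagaStep {
  name: string;
  action: () => Promise<void>;
  // Undo for this step; omitted for a terminal step with nothing to undo.
  compensate?: () => Promise<void>;
}

type SagaResult =
  | { status: "completed" }
  | { status: "failed"; failedAt: string; compensated: string[] };

async function runSaga(steps: SagaStep[]): Promise<SagaResult> {
  const completed: SagaStep[] = [];
  for (const step of steps) {
    try {
      await step.action();
      completed.push(step);
    } catch {
      // Undo the steps that finished, most recent first.
      const compensated: string[] = [];
      for (const done of [...completed].reverse()) {
        if (done.compensate) {
          await done.compensate();
          compensated.push(done.name);
        }
      }
      return { status: "failed", failedAt: step.name, compensated };
    }
  }
  return { status: "completed" };
}

// Demo: the payment step fails, so only the reservation is undone.
const demoLog: string[] = [];
const demoSteps: SagaStep[] = [
  {
    name: "reserve-inventory",
    action: async () => { demoLog.push("reserve"); },
    compensate: async () => { demoLog.push("release"); },
  },
  {
    name: "charge-payment",
    action: async () => { throw new Error("card declined"); },
    compensate: async () => { demoLog.push("refund"); },
  },
  {
    name: "send-confirmation",
    action: async () => { demoLog.push("confirm"); },
  },
];
```

Note that the failed step's own compensation is not run: the charge never happened, so there is nothing to refund. Only steps that completed are undone.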

Orchestration vs choreography

There are two approaches to implementing sagas:

  • Orchestration: A central coordinator service manages the saga, deciding what step to execute next and handling compensations. The coordinator publishes commands and listens for results.
  • Choreography: Each service listens for events and decides independently what to do next. There is no central coordinator.

npayload supports both approaches. This guide focuses on orchestration because it gives you better visibility into saga state and simpler error handling. For simple two- or three-step flows, choreography may be sufficient.
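For contrast, here is a rough sketch of the same order flow done as choreography. It uses a hypothetical in-memory `EventBus` in place of real channels, and the event names are illustrative assumptions. Each service reacts only to events, and no component holds the overall saga state.

```typescript
// Hypothetical in-memory event bus; stands in for real channels.
type Handler = (orderId: string) => void;

class EventBus {
  private handlers = new Map<string, Handler[]>();
  readonly history: string[] = [];

  on(event: string, handler: Handler): void {
    const list = this.handlers.get(event) ?? [];
    list.push(handler);
    this.handlers.set(event, list);
  }

  emit(event: string, orderId: string): void {
    this.history.push(event);
    for (const handler of this.handlers.get(event) ?? []) handler(orderId);
  }
}

function wireChoreography(bus: EventBus, paymentSucceeds: boolean): void {
  // Inventory service: reserves on new orders, releases on payment failure.
  bus.on("order.placed", (id) => bus.emit("inventory.reserved", id));
  bus.on("payment.failed", (id) => bus.emit("inventory.released", id));
  // Payment service: charges once inventory is reserved.
  bus.on("inventory.reserved", (id) =>
    bus.emit(paymentSucceeds ? "payment.charged" : "payment.failed", id)
  );
  // Notification service: confirms once payment is charged.
  bus.on("payment.charged", (id) => bus.emit("confirmation.sent", id));
}
```

To learn what happened to an order in this style, you have to reconstruct it from the event history. That is the visibility trade-off mentioned above.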

Example: order processing saga

Here is a complete order processing saga with three steps and their compensations:

| Step | Action | Compensation |
|------|--------|--------------|
| 1 | Reserve inventory | Release inventory |
| 2 | Charge payment | Refund payment |
| 3 | Send confirmation | (none, terminal step) |

Setting up channels

Each service gets its own command channel, and all services share a single response channel. The orchestrator publishes commands and subscribes to responses.

saga-channels.ts
import { NPayload } from "@npayload/node";

const np = new NPayload({
  instanceUrl: process.env.NPAYLOAD_INSTANCE_URL,
  token: process.env.NPAYLOAD_TOKEN,
});

// Command channels (orchestrator publishes, services subscribe)
const inventoryCommands = await np.channels.create({
  name: "inventory-commands",
  description: "Commands for the inventory service",
});

const paymentCommands = await np.channels.create({
  name: "payment-commands",
  description: "Commands for the payment service",
});

const notificationCommands = await np.channels.create({
  name: "notification-commands",
  description: "Commands for the notification service",
});

// Response channel (services publish, orchestrator subscribes)
const sagaResponses = await np.channels.create({
  name: "saga-responses",
  description: "Saga step results from all services",
});

Implementing the orchestrator

The orchestrator uses message groups to ensure all messages for a single saga are processed in order. The groupKey is the order ID, guaranteeing that steps for the same order never interleave.

orchestrator.ts
import { NPayload } from "@npayload/node";

const np = new NPayload({
  instanceUrl: process.env.NPAYLOAD_INSTANCE_URL,
  token: process.env.NPAYLOAD_TOKEN,
});

interface SagaState {
  orderId: string;
  step: number;
  status: "running" | "compensating" | "completed" | "failed";
  completedSteps: string[];
}

// Start a new saga
async function startOrderSaga(order: {
  orderId: string;
  items: Array<{ sku: string; quantity: number }>;
  paymentMethod: string;
  customerEmail: string;
}) {
  const sagaState: SagaState = {
    orderId: order.orderId,
    step: 1,
    status: "running",
    completedSteps: [],
  };

  // Step 1: Reserve inventory
  await np.messages.publish({
    channelId: inventoryCommands.id,
    groupKey: order.orderId,
    routingKey: "inventory.reserve",
    payload: {
      sagaId: order.orderId,
      action: "reserve",
      items: order.items,
    },
  });

  return sagaState;
}

// Handle responses from services
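async function handleSagaResponse(response: {
  sagaId: string;
  step: string;
  success?: boolean;
  error?: string;
  data?: Record<string, unknown>;
}) {
  // The orchestrator's own audit records (step_completed, saga_completed,
  // saga_failed) share this channel and carry no `success` flag. Skip them
  // so they are not mistaken for failed steps and compensated again.
  if (typeof response.success !== "boolean") return;

  if (response.success) {
    await advanceSaga(response);
  } else {
    await compensateSaga(response);
  }
}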

Advancing the saga

When a step succeeds, the orchestrator moves to the next step. Transactional publish ensures the state update and the next command are sent atomically.

advance.ts
async function advanceSaga(response: {
  sagaId: string;
  step: string;
  data?: Record<string, unknown>;
}) {
  const { sagaId, step } = response;

  if (step === "inventory.reserved") {
    // Step 1 complete, move to step 2: charge payment
    await np.messages.publishTransactional({
      messages: [
        {
          channelId: sagaResponses.id,
          groupKey: sagaId,
          payload: {
            sagaId,
            step: "inventory.reserved",
            status: "step_completed",
          },
        },
        {
          channelId: paymentCommands.id,
          groupKey: sagaId,
          routingKey: "payment.charge",
          payload: {
            sagaId,
            action: "charge",
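            amount: response.data?.totalAmount,
            // The inventory response in this guide returns only reservationId
            // and totalAmount, so paymentMethod is undefined here unless the
            // service echoes it back or you load it from persisted saga state.
            paymentMethod: response.data?.paymentMethod,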
          },
        },
      ],
    });
  } else if (step === "payment.charged") {
    // Step 2 complete, move to step 3: send confirmation
    await np.messages.publishTransactional({
      messages: [
        {
          channelId: sagaResponses.id,
          groupKey: sagaId,
          payload: {
            sagaId,
            step: "payment.charged",
            status: "step_completed",
          },
        },
        {
          channelId: notificationCommands.id,
          groupKey: sagaId,
          routingKey: "notification.send",
          payload: {
            sagaId,
            action: "send_confirmation",
            orderId: sagaId,
          },
        },
      ],
    });
  } else if (step === "notification.sent") {
    // All steps complete
    await np.messages.publish({
      channelId: sagaResponses.id,
      groupKey: sagaId,
      payload: {
        sagaId,
        status: "saga_completed",
      },
    });
  }
}

Compensating on failure

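When a step fails, the orchestrator runs compensations in reverse order for all previously completed steps. The handler in this example covers only a payment failure, where the single completed step is the inventory reservation.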

compensate.ts
async function compensateSaga(response: {
  sagaId: string;
  step: string;
  error?: string;
}) {
  const { sagaId, step, error } = response;

  console.error(`Saga ${sagaId} failed at ${step}: ${error}`);

  if (step === "payment.charge") {
    // Payment failed, compensate step 1: release inventory
    await np.messages.publish({
      channelId: inventoryCommands.id,
      groupKey: sagaId,
      routingKey: "inventory.release",
      payload: {
        sagaId,
        action: "release",
        reason: `Payment failed: ${error}`,
      },
    });
  }

  // Record the saga failure
  await np.messages.publish({
    channelId: sagaResponses.id,
    groupKey: sagaId,
    payload: {
      sagaId,
      status: "saga_failed",
      failedAt: step,
      error,
    },
  });
}
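One way to generalize beyond a single hard-coded failure case is to drive compensation from the list of completed steps. The sketch below is an assumption-laden illustration rather than part of the SDK: `COMPENSATIONS` and `planCompensation` are hypothetical, and `payment.refund` is an assumed routing key (the table above names the "Refund payment" compensation but not its key). It only computes which commands to publish; publishing them is left to the orchestrator.

```typescript
// Hypothetical lookup table: completed step -> compensating command.
// "payment.refund" is an assumed routing key, not one defined in this guide.
const COMPENSATIONS: Record<
  string,
  { channel: string; routingKey: string } | undefined
> = {
  "inventory.reserved": {
    channel: "inventory-commands",
    routingKey: "inventory.release",
  },
  "payment.charged": {
    channel: "payment-commands",
    routingKey: "payment.refund",
  },
};

interface CompensationCommand {
  sagaId: string;
  channel: string;
  routingKey: string;
}

// Returns the commands to publish, newest completed step first.
// Steps without an entry in the table (terminal steps) are skipped.
function planCompensation(
  sagaId: string,
  completedSteps: string[]
): CompensationCommand[] {
  const plan: CompensationCommand[] = [];
  for (const step of [...completedSteps].reverse()) {
    const target = COMPENSATIONS[step];
    if (target) plan.push({ sagaId, ...target });
  }
  return plan;
}
```

With this shape, a failure at the notification step would yield a refund followed by an inventory release, and a failure at the payment step would yield only the release.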

Implementing a service participant

Each service subscribes to its command channel, executes the work, and publishes a response.

inventory-service.ts
import { NPayload } from "@npayload/node";

const np = new NPayload({
  instanceUrl: process.env.NPAYLOAD_INSTANCE_URL,
  token: process.env.NPAYLOAD_TOKEN,
});

// Webhook handler for inventory commands
async function handleInventoryCommand(message: {
  payload: { sagaId: string; action: string; items?: Array<{ sku: string; quantity: number }> };
}) {
  const { sagaId, action, items } = message.payload;

  try {
    if (action === "reserve") {
      // Reserve inventory in your database
      const reservation = await reserveInventory(items!);

      await np.messages.publish({
        channelId: sagaResponses.id,
        groupKey: sagaId,
        payload: {
          sagaId,
          step: "inventory.reserved",
          success: true,
          data: { reservationId: reservation.id, totalAmount: reservation.total },
        },
      });
    } else if (action === "release") {
      // Compensating action: release the reservation
      await releaseInventory(sagaId);

      await np.messages.publish({
        channelId: sagaResponses.id,
        groupKey: sagaId,
        payload: {
          sagaId,
          step: "inventory.released",
          success: true,
        },
      });
    }
  } catch (err) {
    await np.messages.publish({
      channelId: sagaResponses.id,
      groupKey: sagaId,
      payload: {
        sagaId,
        step: `inventory.${action}`,
        success: false,
        error: (err as Error).message,
      },
    });
  }
}

Handling partial failures

Partial failures are the hardest part of distributed transactions. Here are strategies for common scenarios:

Compensation failure

If a compensation itself fails, the message lands in the dead-letter queue (DLQ). Monitor the DLQ and handle these entries manually or with automated retry logic:

compensation-monitor.ts
// Check for stuck compensations
const dlqEntries = await np.dlq.list({
  channelId: inventoryCommands.id,
  limit: 50,
});

for (const entry of dlqEntries.items) {
  const payload = entry.payload as { action: string; sagaId: string };
  if (payload.action === "release") {
    console.error(
      `Compensation stuck for saga ${payload.sagaId}. Manual intervention required.`
    );
  }
}
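For the automated retry option, one common approach is capped exponential backoff with a maximum attempt count, after which the entry is escalated to a person. The sketch below computes only the retry decision. `nextRetry` and its defaults are hypothetical, and it assumes you track how many times a compensation has already failed; re-publishing a DLQ entry depends on the SDK and is not shown.

```typescript
interface RetryDecision {
  retry: boolean;
  delayMs: number;
}

// `attempt` is 1-based: the number of deliveries that have already failed.
// The delay doubles after each failure and is capped at `capMs`.
function nextRetry(
  attempt: number,
  maxAttempts = 5,
  baseMs = 1_000,
  capMs = 60_000
): RetryDecision {
  if (attempt >= maxAttempts) return { retry: false, delayMs: 0 };
  const delayMs = Math.min(capMs, baseMs * 2 ** (attempt - 1));
  return { retry: true, delayMs };
}
```

When `retry` is false, the compensation should be flagged for manual intervention rather than retried again.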

Idempotent operations

Every saga step and compensation must be idempotent. If a message is delivered twice, the result should be the same. Use the saga ID and step name as an idempotency key in your service logic.
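As a rough illustration of using the saga ID and step name as an idempotency key, the sketch below runs a piece of work at most once per key and hands repeat deliveries the original result. `IdempotencyStore` is hypothetical and keeps its state in memory. A real service would more likely use a database table with a unique constraint on the key, so the guarantee survives restarts.

```typescript
// Hypothetical in-memory store, for illustration only.
class IdempotencyStore<T> {
  private results = new Map<string, Promise<T>>();

  // Runs `work` once per (sagaId, step). A duplicate delivery, even one
  // that arrives while the first is still running, gets the same promise.
  runOnce(sagaId: string, step: string, work: () => Promise<T>): Promise<T> {
    const key = `${sagaId}:${step}`;
    const existing = this.results.get(key);
    if (existing) return existing;

    const pending = work();
    this.results.set(key, pending);
    // Forget failed attempts so a redelivery can retry the step.
    pending.catch(() => this.results.delete(key));
    return pending;
  }
}
```

A payment service could wrap its charge call as `store.runOnce(sagaId, "payment.charge", () => chargeCard(...))`, where `chargeCard` stands in for your own payment logic.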

Timeout handling

If a service does not respond within a deadline you choose (30 seconds in this example), treat it as a failure and begin compensation:

timeout.ts
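// getSagaStatus is your own lookup against persisted saga state. It is
// assumed to return { status, currentStep } for the given saga.
async function startSagaWithTimeout(
  order: Parameters<typeof startOrderSaga>[0]
) {
  await startOrderSaga(order);

  // Set a timeout for the entire saga
  setTimeout(async () => {
    try {
      const saga = await getSagaStatus(order.orderId);
      // Compensate only sagas still in flight. Completed, failed, and
      // already-compensating sagas need no further action.
      if (saga.status === "running") {
        console.warn(`Saga ${order.orderId} timed out, starting compensation`);
        await compensateSaga({
          sagaId: order.orderId,
          step: saga.currentStep,
          error: "Saga timed out",
        });
      }
    } catch (err) {
      console.error(`Timeout check failed for saga ${order.orderId}`, err);
    }
  }, 30_000); // 30 second timeout
}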
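An in-process `setTimeout` is lost if the orchestrator restarts before it fires. One alternative, sketched here under assumptions, is to store a deadline with each saga and sweep for expired ones on a schedule. `SagaRecord` and `findTimedOut` are hypothetical names, and loading the records from your database is not shown.

```typescript
// Hypothetical persisted shape for one saga.
interface SagaRecord {
  sagaId: string;
  status: "running" | "compensating" | "completed" | "failed";
  currentStep: string;
  deadlineAt: number; // epoch milliseconds
}

// Returns sagas that are still running past their deadline.
// Only these should be handed to compensation.
function findTimedOut(records: SagaRecord[], now: number): SagaRecord[] {
  return records.filter(
    (record) => record.status === "running" && record.deadlineAt <= now
  );
}
```

A periodic job can call `findTimedOut(records, Date.now())` and start compensation for each result. Because the deadline lives with the saga record, the check works the same after a restart.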

Monitoring saga execution

Use streams to observe saga progress in real time. The saga response channel serves as an audit log of every step:

saga-monitor.ts
// Stream all saga events for monitoring
const stream = await np.streams.read({
  channelId: sagaResponses.id,
  startOffset: "latest",
});

for (const event of stream.messages) {
  const { sagaId, status, step } = event.payload as {
    sagaId: string;
    status: string;
    step?: string;
  };

  console.log(`[${sagaId}] ${status} ${step ? `at ${step}` : ""}`);

  if (status === "saga_failed") {
    // Alert on failures
    await alertOps(`Saga ${sagaId} failed`);
  }
}

Anti-patterns

Sagas that are too long

Sagas with more than 5 to 7 steps become difficult to reason about and compensate. If your saga has many steps, consider breaking it into smaller, independent sagas or using a hierarchical approach where a parent saga coordinates child sagas.

Missing compensations

Every step except the final terminal step must have a compensation. Forgetting a compensation means your system can be left in an inconsistent state after a failure. Document compensations alongside each step.
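One way to document compensations alongside each step is to make the step definition itself require one. In the hypothetical `StepDefinition` type below, a step must either name its compensation or be explicitly marked terminal. Leaving out both does not type-check. The routing keys follow this guide except `payment.refund`, which is assumed.

```typescript
// Hypothetical step definition: every step declares a compensation
// or is explicitly marked as terminal.
type StepDefinition =
  | { name: string; command: string; compensation: string }
  | { name: string; command: string; terminal: true };

const orderSagaSteps: StepDefinition[] = [
  { name: "Reserve inventory", command: "inventory.reserve", compensation: "inventory.release" },
  // "payment.refund" is an assumed routing key.
  { name: "Charge payment", command: "payment.charge", compensation: "payment.refund" },
  { name: "Send confirmation", command: "notification.send", terminal: true },
];

// Runtime check for definitions loaded from config:
// only the final step may be terminal.
function validateSteps(steps: StepDefinition[]): string[] {
  const problems: string[] = [];
  steps.forEach((step, index) => {
    if ("terminal" in step && index !== steps.length - 1) {
      problems.push(`${step.name}: only the final step may omit a compensation`);
    }
  });
  return problems;
}
```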

Non-idempotent steps

If a payment charge step is not idempotent and the message is delivered twice, the customer gets charged twice. Always use idempotency keys or check-before-write logic in your service implementations.

Saga state in the orchestrator only

Do not rely solely on in-memory saga state in the orchestrator. If the orchestrator restarts, the saga state is lost. Persist saga state in your database and use the message stream as the recovery log.
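To illustrate using the message stream as a recovery log, the sketch below folds an ordered list of audit records back into saga state. The record shape mirrors the `step_completed`, `saga_completed`, and `saga_failed` payloads this guide's orchestrator writes. `SagaSnapshot` and `rebuildSaga` are hypothetical, and reading the records from the channel is not shown.

```typescript
type SagaStatus = "running" | "compensating" | "completed" | "failed";

interface SagaSnapshot {
  sagaId: string;
  status: SagaStatus;
  completedSteps: string[];
}

// Shape of the audit records the orchestrator writes in this guide.
interface SagaEvent {
  sagaId: string;
  status: "step_completed" | "saga_completed" | "saga_failed";
  step?: string;
}

// Folds the ordered event log for one saga back into its state.
// Duplicate step_completed records are ignored, so replay is idempotent.
function rebuildSaga(sagaId: string, events: SagaEvent[]): SagaSnapshot {
  const snapshot: SagaSnapshot = { sagaId, status: "running", completedSteps: [] };
  for (const event of events) {
    if (event.sagaId !== sagaId) continue;
    if (event.status === "step_completed" && event.step) {
      if (!snapshot.completedSteps.includes(event.step)) {
        snapshot.completedSteps.push(event.step);
      }
    } else if (event.status === "saga_completed") {
      snapshot.status = "completed";
    } else if (event.status === "saga_failed") {
      snapshot.status = "failed";
    }
  }
  return snapshot;
}
```

After a restart, the orchestrator could rebuild each in-flight saga this way and compare it with the copy in the database before resuming.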
