Consumer groups

Shared subscriptions with automatic load balancing across multiple consumers

A consumer group is a set of consumers that share a subscription. Each message is delivered to exactly one consumer in the group, distributing the workload automatically. This lets you scale message processing horizontally by adding more consumers.

How consumer groups work

When a message is published to a channel with a consumer group subscription, npayload selects one consumer from the group and delivers the message to that consumer. By default, selection is round-robin, so messages are spread evenly by count across the group's consumers.
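As a rough illustration of round-robin selection, the sketch below cycles through a fixed list of consumer IDs. The `RoundRobinSelector` class is hypothetical and only models the idea; it is not npayload's internal implementation.

```javascript
// Illustrative model only: not how npayload is implemented internally.
class RoundRobinSelector {
  constructor(consumerIds) {
    this.consumerIds = [...consumerIds];
    this.next = 0; // index of the consumer that receives the next message
  }

  // Return the consumer for the next message and advance the cursor.
  select() {
    if (this.consumerIds.length === 0) {
      throw new Error('consumer group has no members');
    }
    const chosen = this.consumerIds[this.next];
    this.next = (this.next + 1) % this.consumerIds.length;
    return chosen;
  }
}

const selector = new RoundRobinSelector(['worker-1', 'worker-2']);
const deliveries = ['m1', 'm2', 'm3', 'm4'].map((messageId) => ({
  messageId,
  consumerId: selector.select(),
}));

// m1 and m3 go to worker-1, m2 and m4 go to worker-2
console.log(deliveries);
```

Each message appears once in `deliveries`, which matches the rule that a message goes to exactly one consumer in the group.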

| Concept | Description |
| --- | --- |
| Consumer group | A named group of consumers sharing a single subscription |
| Consumer | An individual member of the group that processes messages |
| Rebalancing | Automatic redistribution of work when consumers join or leave |
| Heartbeat | Periodic signal from a consumer to indicate it is still alive |

Creating a consumer group

const group = await npayload.consumerGroups.create({
  channel: 'tasks',
  name: 'task-workers',
});

Adding consumers

Register individual consumers as members of the group.

await npayload.consumerGroups.addConsumer({
  group: 'task-workers',
  consumerId: 'worker-1',
  endpoint: {
    url: 'https://workers.example.com/worker-1/tasks',
  },
});

await npayload.consumerGroups.addConsumer({
  group: 'task-workers',
  consumerId: 'worker-2',
  endpoint: {
    url: 'https://workers.example.com/worker-2/tasks',
  },
});

Rebalancing

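When a consumer joins or leaves the group, npayload automatically rebalances the workload. Messages already being processed by consumers that stay in the group are not affected. Only future deliveries, and any unacknowledged messages held by a departing consumer, are redistributed.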

| Event | What happens |
| --- | --- |
| Consumer joins | Future messages are distributed to include the new consumer |
| Consumer leaves | Unacknowledged messages are reassigned to remaining consumers |
| Consumer fails heartbeat | Treated as a leave. Messages are redistributed after a grace period. |

Rebalancing does not interrupt messages that are already being processed. In-flight deliveries complete on their current consumer before new assignments take effect.
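One way to picture a rebalance after a consumer leaves is sketched below. The `rebalanceOnLeave` function and the `inFlight` map are hypothetical names used for illustration, and handing orphaned messages out round-robin is an assumption rather than documented npayload behavior.

```javascript
// Hypothetical model of a rebalance after a consumer leaves.
// `inFlight` maps messageId -> consumerId for unacknowledged messages.
function rebalanceOnLeave(inFlight, leavingId, remainingIds) {
  if (remainingIds.length === 0) {
    throw new Error('no consumers left to take over');
  }
  const result = new Map();
  let cursor = 0;
  for (const [messageId, consumerId] of inFlight) {
    if (consumerId === leavingId) {
      // Orphaned message: hand it to a remaining consumer (assumed round-robin).
      result.set(messageId, remainingIds[cursor % remainingIds.length]);
      cursor += 1;
    } else {
      // Still being processed by a live consumer: leave it where it is.
      result.set(messageId, consumerId);
    }
  }
  return result;
}

const inFlight = new Map([
  ['m1', 'worker-1'],
  ['m2', 'worker-2'],
  ['m3', 'worker-2'],
]);
const afterLeave = rebalanceOnLeave(inFlight, 'worker-2', ['worker-1', 'worker-3']);
console.log([...afterLeave]);
```

In this model `m1` stays on `worker-1` because that consumer is still alive, while the two messages left behind by `worker-2` move to the remaining members.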

Consumer heartbeats

Each consumer sends periodic heartbeats to indicate it is alive and able to process messages. If a consumer misses heartbeats beyond the configured threshold, it is removed from the group and its pending messages are reassigned.

// Heartbeats are sent automatically by the SDK
// Configure the interval and timeout when adding the consumer
await npayload.consumerGroups.addConsumer({
  group: 'task-workers',
  consumerId: 'worker-3',
  endpoint: {
    url: 'https://workers.example.com/worker-3/tasks',
  },
  heartbeat: {
    intervalMs: 10000,  // Send heartbeat every 10 seconds
    timeoutMs: 30000,   // Remove after 30 seconds of silence
  },
});
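To make the relationship between `intervalMs` and `timeoutMs` concrete, here is a minimal sketch of a liveness check. The `findExpiredConsumers` function is hypothetical, and treating a consumer as expired only when its silence strictly exceeds the timeout is an assumption.

```javascript
// Illustrative liveness check. timeoutMs mirrors the heartbeat config above;
// the function itself is hypothetical.
function findExpiredConsumers(lastHeartbeatAt, nowMs, timeoutMs) {
  const expired = [];
  for (const [consumerId, seenAtMs] of Object.entries(lastHeartbeatAt)) {
    // Assumption: a consumer expires only once silence exceeds the timeout.
    if (nowMs - seenAtMs > timeoutMs) expired.push(consumerId);
  }
  return expired;
}

const now = 100000;
const lastHeartbeatAt = {
  'worker-1': now - 5000,  // heartbeat 5 seconds ago
  'worker-2': now - 30000, // exactly at the timeout, still considered alive here
  'worker-3': now - 45000, // silent for 45 seconds
};

const expired = findExpiredConsumers(lastHeartbeatAt, now, 30000);
console.log(expired); // only worker-3 has exceeded the 30 second timeout
```

With a 10 second interval and a 30 second timeout, a consumer can miss a couple of heartbeats (for example during a brief network blip) before it is treated as gone.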

Acknowledging messages

Consumers must acknowledge messages after processing. Unacknowledged messages are redelivered to another consumer after the acknowledgement timeout.

// In your webhook handler
app.post('/tasks', async (req, res) => {
  const { messageId, groupId } = req.body;

  try {
    await processTask(req.body.payload);

    // Acknowledge successful processing
    await npayload.consumerGroups.ack({
      group: 'task-workers',
      messageId,
    });

    res.status(200).json({ received: true });
  } catch (error) {
    // Negative acknowledgement: message is redelivered to another consumer
    await npayload.consumerGroups.nack({
      group: 'task-workers',
      messageId,
    });

    res.status(500).json({ error: 'Processing failed' });
  }
});
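The acknowledgement timeout can be modeled as a table of pending deliveries with deadlines. The `AckTracker` class below is a hypothetical sketch of that bookkeeping, and the 60 second timeout is an arbitrary value chosen for the example.

```javascript
// Hypothetical model of acknowledgement tracking; not the npayload implementation.
class AckTracker {
  constructor(ackTimeoutMs) {
    this.ackTimeoutMs = ackTimeoutMs;
    this.pending = new Map(); // messageId -> { consumerId, deliveredAtMs }
  }

  recordDelivery(messageId, consumerId, nowMs) {
    this.pending.set(messageId, { consumerId, deliveredAtMs: nowMs });
  }

  // An ack removes the message from the pending set for good.
  ack(messageId) {
    return this.pending.delete(messageId);
  }

  // Messages whose ack deadline has passed and should go to another consumer.
  overdue(nowMs) {
    const ids = [];
    for (const [messageId, entry] of this.pending) {
      if (nowMs - entry.deliveredAtMs >= this.ackTimeoutMs) ids.push(messageId);
    }
    return ids;
  }
}

const tracker = new AckTracker(60000); // 60 s timeout, chosen for the example
tracker.recordDelivery('m1', 'worker-1', 0);
tracker.recordDelivery('m2', 'worker-2', 0);
tracker.ack('m1'); // worker-1 finished in time

const overdueIds = tracker.overdue(60000);
console.log(overdueIds); // m2 was never acknowledged, so it is due for redelivery
```

Since a slow consumer may finish its work after the deadline has already passed, handlers are safer if processing the same message twice has no harmful effect.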

Offset management

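Consumer groups track a single offset shared by the whole group. Individual consumers do not maintain separate offsets, so each message is consumed once by the group as a whole rather than once per consumer. Because unacknowledged messages are redelivered, a message can still reach a second consumer if the first one does not acknowledge it in time.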

// Check the group's current offset
const status = await npayload.consumerGroups.status('task-workers');

console.log('Current offset:', status.offset);
console.log('Lag:', status.lag); // Messages behind the latest
console.log('Active consumers:', status.activeConsumers);
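The lag figure can be read as the distance between the newest message on the channel and the group's offset. The sketch below assumes offsets are increasing integers; `computeLag`, `needsMoreConsumers`, and the per-consumer threshold are hypothetical helpers, not part of the SDK.

```javascript
// Assumption: offsets are monotonically increasing integers and the group's
// offset marks the last message the group has consumed.
function computeLag(latestOffset, groupOffset) {
  return Math.max(0, latestOffset - groupOffset);
}

// Hypothetical helper: flag a group whose backlog per consumer is too large.
function needsMoreConsumers(status, maxLagPerConsumer) {
  if (status.activeConsumers === 0) return status.lag > 0;
  return status.lag / status.activeConsumers > maxLagPerConsumer;
}

// Sample values shaped like the status object above (made up for the example).
const sample = { offset: 1200, lag: computeLag(1500, 1200), activeConsumers: 2 };

console.log(sample.lag);                      // 300
console.log(needsMoreConsumers(sample, 100)); // true: 150 messages per consumer
```

A check like this could feed an autoscaler, with the threshold tuned to how long one consumer takes to process a message.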

Removing consumers

// Remove a specific consumer
await npayload.consumerGroups.removeConsumer({
  group: 'task-workers',
  consumerId: 'worker-2',
});

Removing a consumer triggers rebalancing. Any unacknowledged messages assigned to that consumer are reassigned to the remaining members.

Use cases

| Use case | How consumer groups help |
| --- | --- |
| Horizontal scaling | Add more consumers to handle increased message volume |
| Worker pools | Distribute tasks across a pool of identical workers |
| Rolling deployments | Drain one consumer, deploy, rejoin. The group handles redistribution. |
| Geographic distribution | Place consumers in different regions for lower latency |
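The rolling deployment pattern can be sketched as a loop that removes, redeploys, and re-adds one consumer at a time. Only the `removeConsumer` and `addConsumer` method names come from this page; `createFakeGroups` is an in-memory stand-in so the example runs without the SDK, and the `deploy` callback is a placeholder for whatever your deployment step is.

```javascript
// In-memory stand-in so the sketch runs without the real SDK.
function createFakeGroups(initialIds) {
  const members = new Set(initialIds);
  return {
    members,
    async removeConsumer({ consumerId }) { members.delete(consumerId); },
    async addConsumer({ consumerId }) { members.add(consumerId); },
  };
}

// Restart consumers one at a time so the group never loses more than one member.
async function rollingDeploy(groups, group, consumers, deploy) {
  for (const { consumerId, endpoint } of consumers) {
    await groups.removeConsumer({ group, consumerId });        // drain
    await deploy(consumerId);                                  // hypothetical deploy step
    await groups.addConsumer({ group, consumerId, endpoint }); // rejoin
  }
}

const fakeGroups = createFakeGroups(['worker-1', 'worker-2']);
const sizesDuringDeploy = [];
const finished = rollingDeploy(
  fakeGroups,
  'task-workers',
  [
    { consumerId: 'worker-1', endpoint: { url: 'https://workers.example.com/worker-1/tasks' } },
    { consumerId: 'worker-2', endpoint: { url: 'https://workers.example.com/worker-2/tasks' } },
  ],
  // Record how many members remain while each consumer is being redeployed.
  async () => { sizesDuringDeploy.push(fakeGroups.members.size); },
);

finished.then(() => console.log(sizesDuringDeploy, [...fakeGroups.members]));
```

In a real deployment you would likely wait for the consumer's in-flight work to finish before redeploying it, since removal reassigns its unacknowledged messages to the remaining members.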
