Consumer groups
Shared subscriptions with automatic load balancing across multiple consumers
A consumer group is a set of consumers that share a subscription. Each message is delivered to exactly one consumer in the group, distributing the workload automatically. This lets you scale message processing horizontally by adding more consumers.
How consumer groups work
When a message is published to a channel with a consumer group subscription, npayload selects one consumer from the group and delivers the message to that consumer. The selection uses round-robin by default, ensuring an even distribution of messages.
| Concept | Description |
|---|---|
| Consumer group | A named group of consumers sharing a single subscription |
| Consumer | An individual member of the group that processes messages |
| Rebalancing | Automatic redistribution of work when consumers join or leave |
| Heartbeat | Periodic signal from a consumer to indicate it is still alive |
Creating a consumer group
const group = await npayload.consumerGroups.create({
channel: 'tasks',
name: 'task-workers',
});Adding consumers
Register individual consumers as members of the group.
await npayload.consumerGroups.addConsumer({
group: 'task-workers',
consumerId: 'worker-1',
endpoint: {
url: 'https://workers.example.com/worker-1/tasks',
},
});
await npayload.consumerGroups.addConsumer({
group: 'task-workers',
consumerId: 'worker-2',
endpoint: {
url: 'https://workers.example.com/worker-2/tasks',
},
});Rebalancing
When a consumer joins or leaves the group, npayload automatically rebalances the workload. Messages in flight are not affected. Only future deliveries are redistributed.
| Event | What happens |
|---|---|
| Consumer joins | Future messages are distributed to include the new consumer |
| Consumer leaves | Unacknowledged messages are reassigned to remaining consumers |
| Consumer fails heartbeat | Treated as a leave. Messages are redistributed after a grace period. |
Rebalancing does not interrupt messages that are already being processed. In-flight deliveries complete on their current consumer before new assignments take effect.
Consumer heartbeats
Each consumer sends periodic heartbeats to indicate it is alive and able to process messages. If a consumer misses heartbeats beyond the configured threshold, it is removed from the group and its pending messages are reassigned.
// Heartbeats are sent automatically by the SDK
// Configure the interval when creating the consumer
await npayload.consumerGroups.addConsumer({
group: 'task-workers',
consumerId: 'worker-3',
endpoint: {
url: 'https://workers.example.com/worker-3/tasks',
},
heartbeat: {
intervalMs: 10000, // Send heartbeat every 10 seconds
timeoutMs: 30000, // Remove after 30 seconds of silence
},
});Acknowledging messages
Consumers must acknowledge messages after processing. Unacknowledged messages are redelivered to another consumer after the acknowledgement timeout.
// In your webhook handler
app.post('/tasks', async (req, res) => {
const { messageId, groupId } = req.body;
try {
await processTask(req.body.payload);
// Acknowledge successful processing
await npayload.consumerGroups.ack({
group: 'task-workers',
messageId,
});
res.status(200).json({ received: true });
} catch (error) {
// Negative acknowledgement: message is redelivered to another consumer
await npayload.consumerGroups.nack({
group: 'task-workers',
messageId,
});
res.status(500).json({ error: 'Processing failed' });
}
});Offset management
Consumer groups track a shared offset for the group. Individual consumers within the group do not maintain separate offsets, ensuring that every message is processed exactly once by the group as a whole.
// Check the group's current offset
const status = await npayload.consumerGroups.status('task-workers');
console.log('Current offset:', status.offset);
console.log('Lag:', status.lag); // Messages behind the latest
console.log('Active consumers:', status.activeConsumers);Removing consumers
// Remove a specific consumer
await npayload.consumerGroups.removeConsumer({
group: 'task-workers',
consumerId: 'worker-2',
});Removing a consumer triggers rebalancing. Any unacknowledged messages assigned to that consumer are reassigned to the remaining members.
Use cases
| Use case | How consumer groups help |
|---|---|
| Horizontal scaling | Add more consumers to handle increased message volume |
| Worker pools | Distribute tasks across a pool of identical workers |
| Rolling deployments | Drain one consumer, deploy, rejoin. The group handles redistribution. |
| Geographic distribution | Place consumers in different regions for lower latency |
Next steps
- Streams to read messages at your own pace with offset tracking
- Subscriptions to understand other delivery modes
- Dead letter queue to handle messages that fail processing
Was this page helpful?