IoT device messaging
Device telemetry, command channels, compacted state, and geo-distributed routing
IoT systems need bidirectional communication between devices and the cloud: telemetry flowing up, commands flowing down, and device state accessible at any time. npayload provides the messaging layer for all three.
The challenge
IoT messaging has unique requirements:
- High volume: Thousands of devices sending telemetry every second
- Bidirectional: Cloud sends commands back to devices
- Last-known state: Query the latest state of any device without processing the full history
- Geo-distribution: Devices in multiple regions need low-latency local endpoints
- Reliability: Messages must not be lost, even during network interruptions
How npayload solves it
Device telemetry (device to cloud)
Devices publish telemetry to a channel. Cloud services subscribe to process the data.
// Device publishes temperature reading
const deviceId = 'dev_001'; // Example device ID, reused in the snippets below

await npayload.messages.publish({
  channel: 'device-telemetry',
  routingKey: `device.${deviceId}.temperature`,
  partitionKey: deviceId, // Ordered delivery per device
  payload: {
    deviceId, // Same ID as in the routing and partition keys
    type: 'temperature',
    value: 23.5,
    unit: 'celsius',
    timestamp: new Date().toISOString(),
  },
});

Command channels (cloud to device)
Send commands to specific devices using routing keys:
// Cloud sends a command to a specific device
await npayload.messages.publish({
  channel: 'device-commands',
  routingKey: `device.${deviceId}`,
  payload: {
    command: 'set-threshold',
    parameters: { maxTemperature: 30, unit: 'celsius' },
    requestId: 'req_456',
  },
});

// Device subscribes to its own commands
await npayload.subscriptions.create({
  channel: 'device-commands',
  name: `device-${deviceId}`,
  type: 'webhook',
  filter: { routingKey: `device.${deviceId}` },
  endpoint: { url: `https://${deviceId}.local/commands` },
});

Compacted channels for device state
Use compacted channels to maintain the latest state for each device. Each device's state is stored by key, and only the most recent value is kept.
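As a mental model only, compaction behaves like a last-write-wins map keyed by the compaction key. The sketch below is a plain in-memory illustration of that idea; `CompactedStore` is a hypothetical name and is not part of the npayload SDK, which may implement compaction quite differently.

```javascript
// Illustrative model only: a last-write-wins store keyed by compaction key.
// `CompactedStore` is a hypothetical name, not an npayload API.
class CompactedStore {
  constructor() {
    this.latest = new Map(); // compaction key -> most recent payload
  }

  // Publishing with an existing key replaces the previous value.
  publish(key, payload) {
    this.latest.set(key, payload);
  }

  // Returns the latest value, or undefined if the key was never written.
  get(key) {
    return this.latest.get(key);
  }
}

const store = new CompactedStore();
store.publish('dev_001', { status: 'online', battery: 87 });
store.publish('dev_001', { status: 'offline', battery: 85 });
store.publish('dev_002', { status: 'online', battery: 64 });

console.log(store.get('dev_001').status); // the most recent write for dev_001
console.log(store.latest.size); // one entry per device key
```

If each publish replaces the whole value for a key, as this model assumes, a device should send its complete state on every update rather than only the fields that changed.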
// Create a compacted channel for device state
await npayload.channels.create({
  name: 'device-state',
  type: 'compacted',
  description: 'Latest known state for each device',
});

// Update device state (overwrites previous value for this key)
await npayload.messages.publish({
  channel: 'device-state',
  key: deviceId, // Compaction key
  payload: {
    deviceId, // Same ID as the compaction key
    status: 'online',
    firmware: '2.3.1',
    lastSeen: new Date().toISOString(),
    temperature: 23.5,
    battery: 87,
  },
});

// Query the latest state for any device
const state = await npayload.channels.getCompactedValue('device-state', deviceId);
console.log(`Device ${deviceId} status: ${state.payload.status}`);

Architecture patterns
Fleet management
Organize devices into fleets using channels and routing keys:
// Per-fleet channel
await npayload.channels.create({ name: 'fleet-warehouse-a' });
await npayload.channels.create({ name: 'fleet-warehouse-b' });

// Or use routing keys within a single channel
await npayload.messages.publish({
  channel: 'fleet-telemetry',
  routingKey: `warehouse-a.sensor.${sensorId}`,
  payload: { temperature: 22.1, humidity: 45 },
});

// Subscribe to all sensors in a warehouse
await npayload.subscriptions.create({
  channel: 'fleet-telemetry',
  name: 'warehouse-a-monitor',
  filter: { routingKey: 'warehouse-a.*' },
  type: 'webhook',
  endpoint: { url: 'https://monitor.internal/warehouse-a' },
});

Geo-distributed IoT
Use cross-region messaging for devices deployed globally:
await npayload.channels.create({
  name: 'global-telemetry',
  crossRegion: {
    enabled: true,
    replicationMode: 'active-active',
    regions: ['us-east-1', 'eu-west-1', 'ap-southeast-1'],
  },
});

Devices in Asia publish to the local Asia instance (ap-southeast-1 in this example) with low latency. The data then replicates to the US and EU regions for global processing.
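This page does not describe how a device is directed to its local instance. One common approach is for the device to measure round-trip time to each region and pick the fastest. The sketch below assumes that approach; `pickNearestRegion` and the latency figures are hypothetical and not part of npayload.

```javascript
// Hypothetical helper: choose the region with the lowest measured latency.
// `latenciesMs` maps a region name to a round-trip time in milliseconds.
function pickNearestRegion(latenciesMs) {
  let best = null;
  for (const [region, ms] of Object.entries(latenciesMs)) {
    if (best === null || ms < best.ms) {
      best = { region, ms };
    }
  }
  if (best === null) {
    throw new Error('no regions to choose from');
  }
  return best.region;
}

// Made-up measurements for a device located in Southeast Asia.
const measured = { 'us-east-1': 230, 'eu-west-1': 180, 'ap-southeast-1': 12 };
console.log(pickNearestRegion(measured));
```

Re-running the measurement periodically would let a device fall back to another region if its nearest one becomes slow or unreachable.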
Alert processing with priority
Process critical device alerts before routine telemetry:
// Critical: temperature exceeds threshold
await npayload.messages.publish({
  channel: 'device-alerts',
  priority: 10,
  payload: {
    deviceId: 'dev_001',
    alert: 'temperature-exceeded',
    value: 95.2,
    threshold: 80.0,
    severity: 'critical',
  },
});

// Routine: battery low
await npayload.messages.publish({
  channel: 'device-alerts',
  priority: 3,
  payload: {
    deviceId: 'dev_002',
    alert: 'battery-low',
    value: 15,
    severity: 'warning',
  },
});

Batch telemetry upload
Devices with intermittent connectivity can batch telemetry and upload in bulk:
await npayload.messages.publishBatch({
  channel: 'device-telemetry',
  messages: readings.map((reading) => ({
    partitionKey: deviceId,
    payload: {
      deviceId,
      ...reading,
    },
  })),
});

Why npayload for IoT
| Feature | Benefit |
|---|---|
| Compacted channels | Instant access to last-known device state |
| Routing keys | Filter events per device, fleet, or sensor type |
| Partition keys | Ordered delivery per device |
| Batch publish | Efficient bulk telemetry upload |
| Cross-region | Low-latency local endpoints with global replication |
| Priority queues | Critical alerts processed before routine data |
| Streams | Historical replay for analysis and debugging |
| Consumer groups | Scale processing across worker pools |
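The "Partition keys" and "Consumer groups" rows rest on a widely used idea: hash the key to a fixed partition, then give each partition to a single worker, so messages from one device are always handled in order by the same worker. The sketch below illustrates that general technique under assumed values. `partitionFor`, `workerFor`, the partition count, and the pool size are all hypothetical, and npayload's actual assignment scheme may differ.

```javascript
// Generic illustration, not npayload's implementation.
// Hash a partition key to a fixed partition using an FNV-1a style
// 32-bit hash over the key's UTF-16 code units.
function partitionFor(key, partitionCount) {
  let hash = 0x811c9dc5;
  for (let i = 0; i < key.length; i++) {
    hash ^= key.charCodeAt(i);
    hash = Math.imul(hash, 0x01000193) >>> 0; // keep as unsigned 32-bit
  }
  return hash % partitionCount;
}

// Assign each partition to exactly one worker in the pool.
function workerFor(partition, workerCount) {
  return partition % workerCount;
}

const PARTITIONS = 8; // assumed partition count
const WORKERS = 3; // assumed worker pool size

for (const id of ['dev_001', 'dev_002', 'dev_003']) {
  const partition = partitionFor(id, PARTITIONS);
  console.log(`${id} -> partition ${partition} -> worker ${workerFor(partition, WORKERS)}`);
}
```

Because the mapping depends only on the key, the same device always lands on the same partition, while different devices spread across the pool.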
Next steps
- Channels guide for compacted and standard channel types
- Cross-region messaging for geo-distributed patterns
- Streams for historical data replay