Connectors
Bridge npayload to Kafka, SQS, EventBridge, SNS, Azure Service Bus, GCP Pub/Sub, HTTP, and Pipes
Connectors bridge npayload channels with external messaging systems. You can ingest messages from external sources into npayload, deliver npayload messages to external targets, or do both for bidirectional sync.
How connectors work
A connector links an npayload channel to an external system. Each connector has:
- Direction. Inbound (external to npayload), outbound (npayload to external), or bidirectional.
- Provider. The external system type (Kafka, SQS, etc.).
- Credentials. Authentication details for the external system.
- Configuration. Provider-specific settings (topic name, queue URL, etc.).
Available connectors
| Provider | Inbound | Outbound | Description |
|---|---|---|---|
| Apache Kafka | Yes | Yes | Kafka topics and consumer groups |
| Amazon SQS | Yes | Yes | SQS standard and FIFO queues |
| Amazon EventBridge | No | Yes | EventBridge event buses |
| Amazon SNS | Yes | Yes | SNS topics |
| Azure Service Bus | Yes | Yes | Service Bus queues and topics |
| GCP Pub/Sub | Yes | Yes | Pub/Sub topics and subscriptions |
| HTTP | Yes | Yes | Generic HTTP endpoints (webhooks) |
| Pipes | Yes | Yes | npayload visual workflow engine |
Setting up a connector
Step 1: Register credentials
Store your external system credentials securely. Credentials are encrypted at rest and never exposed in API responses.
const credential = await npayload.connectors.createCredential({
name: 'production-kafka',
provider: 'kafka',
config: {
brokers: ['kafka-1.example.com:9092', 'kafka-2.example.com:9092'],
sasl: {
mechanism: 'scram-sha-256',
username: process.env.KAFKA_USERNAME!,
password: process.env.KAFKA_PASSWORD!,
},
ssl: true,
},
});Step 2: Create the connector
Link a credential to an npayload channel with direction and provider-specific settings.
const connector = await npayload.connectors.create({
channel: 'order-events',
credentialGid: credential.gid,
provider: 'kafka',
direction: 'bidirectional',
config: {
topic: 'orders',
consumerGroup: 'npayload-ingest',
},
});const connector = await npayload.connectors.create({
channel: 'task-queue',
credentialGid: credential.gid,
provider: 'sqs',
direction: 'outbound',
config: {
queueUrl: 'https://sqs.us-east-1.amazonaws.com/123456789/tasks',
messageGroupId: 'default', // Required for FIFO queues
},
});const connector = await npayload.connectors.create({
channel: 'system-events',
credentialGid: credential.gid,
provider: 'eventbridge',
direction: 'outbound',
config: {
eventBusName: 'custom-events',
source: 'npayload.orders',
detailType: 'OrderEvent',
},
});const connector = await npayload.connectors.create({
channel: 'webhooks-out',
credentialGid: credential.gid,
provider: 'http',
direction: 'outbound',
config: {
url: 'https://api.partner.com/events',
method: 'POST',
headers: {
'X-API-Key': 'partner-key-123',
},
},
});Step 3: Activate the connector
Connectors are created in a paused state. Activate when ready.
await npayload.connectors.activate(connector.gid);Bidirectional sync
When a connector is set to bidirectional, messages flow in both directions:
- Messages published to the npayload channel are forwarded to the external system
- Messages arriving in the external system are ingested into the npayload channel
const connector = await npayload.connectors.create({
channel: 'shared-events',
credentialGid: credential.gid,
provider: 'kafka',
direction: 'bidirectional',
config: {
topic: 'shared-events',
consumerGroup: 'npayload-sync',
},
});Bidirectional connectors include deduplication to prevent message loops. Messages originating from the external system are tagged so they are not sent back.
Provider-specific configuration
Amazon SQS
// Standard queue
const sqsCredential = await npayload.connectors.createCredential({
name: 'aws-sqs',
provider: 'sqs',
config: {
region: 'us-east-1',
accessKeyId: process.env.AWS_ACCESS_KEY_ID!,
secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY!,
},
});
// FIFO queue (preserves message ordering)
await npayload.connectors.create({
channel: 'ordered-tasks',
credentialGid: sqsCredential.gid,
provider: 'sqs',
direction: 'outbound',
config: {
queueUrl: 'https://sqs.us-east-1.amazonaws.com/123456789/tasks.fifo',
messageGroupId: 'default',
},
});Azure Service Bus
const azureCredential = await npayload.connectors.createCredential({
name: 'azure-servicebus',
provider: 'azure-service-bus',
config: {
connectionString: process.env.AZURE_SB_CONNECTION_STRING!,
},
});
await npayload.connectors.create({
channel: 'notifications',
credentialGid: azureCredential.gid,
provider: 'azure-service-bus',
direction: 'inbound',
config: {
queueName: 'incoming-notifications',
},
});GCP Pub/Sub
const gcpCredential = await npayload.connectors.createCredential({
name: 'gcp-pubsub',
provider: 'gcp-pubsub',
config: {
projectId: 'my-project',
credentials: JSON.parse(process.env.GCP_SERVICE_ACCOUNT_JSON!),
},
});
await npayload.connectors.create({
channel: 'telemetry',
credentialGid: gcpCredential.gid,
provider: 'gcp-pubsub',
direction: 'bidirectional',
config: {
topic: 'telemetry-events',
subscription: 'npayload-ingest',
},
});Managing connectors
// List all connectors
const connectors = await npayload.connectors.list();
// Get connector status
const status = await npayload.connectors.get(connector.gid);
console.log(status.state); // "active" | "paused" | "error"
console.log(status.lastSyncAt); // Last successful sync timestamp
console.log(status.messagesIn); // Total messages ingested
console.log(status.messagesOut); // Total messages forwarded
// Pause a connector
await npayload.connectors.pause(connector.gid);
// Resume a connector
await npayload.connectors.resume(connector.gid);
// Delete a connector
await npayload.connectors.delete(connector.gid);Credential management
Credentials are stored separately from connectors so you can reuse them across multiple connectors.
// List credentials
const creds = await npayload.connectors.listCredentials();
// Update a credential (e.g., rotate a password)
await npayload.connectors.updateCredential(credential.gid, {
config: {
brokers: ['kafka-1.example.com:9092'],
sasl: {
mechanism: 'scram-sha-256',
username: process.env.KAFKA_USERNAME!,
password: process.env.KAFKA_NEW_PASSWORD!,
},
ssl: true,
},
});
// Delete a credential (fails if any connectors reference it)
await npayload.connectors.deleteCredential(credential.gid);Credential secrets are write-only. Once stored, they are never returned in API responses. You can update them but not read them back.
Best practices
- Store external credentials using the credential API rather than hardcoding them in connector configuration
- Start with a paused connector and activate after verifying the configuration
- Use bidirectional connectors only when you need two-way sync. One-way connectors are simpler to reason about
- Monitor connector status for error states. A connector in error state has stopped processing and needs attention
- Rotate external credentials through the credential update API to avoid downtime
Next steps
Was this page helpful?