Connectors

Bridge npayload to Kafka, SQS, EventBridge, SNS, Azure Service Bus, GCP Pub/Sub, HTTP, and Pipes

Connectors bridge npayload channels with external messaging systems. You can ingest messages from external sources into npayload, deliver npayload messages to external targets, or do both for bidirectional sync.

How connectors work

A connector links an npayload channel to an external system. Each connector has:

  • Direction. Inbound (external to npayload), outbound (npayload to external), or bidirectional.
  • Provider. The external system type (Kafka, SQS, etc.).
  • Credentials. Authentication details for the external system.
  • Configuration. Provider-specific settings (topic name, queue URL, etc.).

Available connectors

Provider              Inbound   Outbound   Description
Apache Kafka          Yes       Yes        Kafka topics and consumer groups
Amazon SQS            Yes       Yes        SQS standard and FIFO queues
Amazon EventBridge    No        Yes        EventBridge event buses
Amazon SNS            Yes       Yes        SNS topics
Azure Service Bus     Yes       Yes        Service Bus queues and topics
GCP Pub/Sub           Yes       Yes        Pub/Sub topics and subscriptions
HTTP                  Yes       Yes        Generic HTTP endpoints (webhooks)
Pipes                 Yes       Yes        npayload visual workflow engine
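
The table above can be encoded as a small lookup so that an unsupported direction is caught before a connector is created. This is a minimal sketch, not part of the npayload SDK: the `Direction` and `Provider` types, the `CAPABILITIES` map, and `supportsDirection` are hypothetical names, and the `'sns'` and `'pipes'` identifiers are assumed spellings (the other provider strings match the examples in this guide).

```typescript
// Hypothetical types for illustration; not exported by the npayload SDK.
type Direction = 'inbound' | 'outbound' | 'bidirectional';
type Provider =
  | 'kafka' | 'sqs' | 'eventbridge' | 'sns'
  | 'azure-service-bus' | 'gcp-pubsub' | 'http' | 'pipes';

interface Capability {
  inbound: boolean;
  outbound: boolean;
}

// Mirrors the "Available connectors" table. 'sns' and 'pipes' are assumed identifiers.
const CAPABILITIES: Record<Provider, Capability> = {
  kafka: { inbound: true, outbound: true },
  sqs: { inbound: true, outbound: true },
  eventbridge: { inbound: false, outbound: true },
  sns: { inbound: true, outbound: true },
  'azure-service-bus': { inbound: true, outbound: true },
  'gcp-pubsub': { inbound: true, outbound: true },
  http: { inbound: true, outbound: true },
  pipes: { inbound: true, outbound: true },
};

// A bidirectional connector needs both inbound and outbound support.
function supportsDirection(provider: Provider, direction: Direction): boolean {
  const cap = CAPABILITIES[provider];
  if (direction === 'bidirectional') {
    return cap.inbound && cap.outbound;
  }
  return cap[direction];
}
```

With this map, `supportsDirection('eventbridge', 'bidirectional')` evaluates to `false`, because EventBridge is listed as outbound only.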

Setting up a connector

Step 1: Register credentials

Store your external system credentials securely. Credentials are encrypted at rest and never exposed in API responses.

const credential = await npayload.connectors.createCredential({
  name: 'production-kafka',
  provider: 'kafka',
  config: {
    brokers: ['kafka-1.example.com:9092', 'kafka-2.example.com:9092'],
    sasl: {
      mechanism: 'scram-sha-256',
      username: process.env.KAFKA_USERNAME!,
      password: process.env.KAFKA_PASSWORD!,
    },
    ssl: true,
  },
});

Step 2: Create the connector

Link a credential to an npayload channel with direction and provider-specific settings. The examples below are alternatives, one per provider. Each assumes that `credential` was registered in Step 1 for the same provider.

// Apache Kafka
const connector = await npayload.connectors.create({
  channel: 'order-events',
  credentialGid: credential.gid,
  provider: 'kafka',
  direction: 'bidirectional',
  config: {
    topic: 'orders',
    consumerGroup: 'npayload-ingest',
  },
});

// Amazon SQS
const connector = await npayload.connectors.create({
  channel: 'task-queue',
  credentialGid: credential.gid, // a credential registered with provider 'sqs'
  provider: 'sqs',
  direction: 'outbound',
  config: {
    queueUrl: 'https://sqs.us-east-1.amazonaws.com/123456789/tasks',
    messageGroupId: 'default', // Required only for FIFO queues (queue names ending in .fifo)
  },
});

// Amazon EventBridge (outbound only)
const connector = await npayload.connectors.create({
  channel: 'system-events',
  credentialGid: credential.gid, // a credential registered with provider 'eventbridge'
  provider: 'eventbridge',
  direction: 'outbound',
  config: {
    eventBusName: 'custom-events',
    source: 'npayload.orders',
    detailType: 'OrderEvent',
  },
});

// HTTP
const connector = await npayload.connectors.create({
  channel: 'webhooks-out',
  credentialGid: credential.gid, // a credential registered with provider 'http'
  provider: 'http',
  direction: 'outbound',
  config: {
    url: 'https://api.partner.com/events',
    method: 'POST',
    headers: {
      'X-API-Key': 'partner-key-123',
    },
  },
});

Step 3: Activate the connector

Connectors are created in a paused state. Activate when ready.

await npayload.connectors.activate(connector.gid);
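
Because connectors start paused, it can be useful to confirm the state after activating. The sketch below wraps the `activate` and `get` calls shown in this guide behind a minimal structural interface. `ConnectorsApi` and `activateAndVerify` are hypothetical names, and the assumption that `get` reports `"active"` immediately after `activate` resolves is not confirmed by this guide.

```typescript
// Minimal structural view of the two calls used here, modeled on this guide's
// examples. The real SDK client type may differ (assumption).
interface ConnectorsApi {
  activate(gid: string): Promise<unknown>;
  get(gid: string): Promise<{ state: string }>;
}

// Activates a connector, then reads its status back and fails loudly if the
// connector did not reach the "active" state.
async function activateAndVerify(connectors: ConnectorsApi, gid: string): Promise<void> {
  await connectors.activate(gid);
  const status = await connectors.get(gid);
  if (status.state !== 'active') {
    throw new Error(`Connector ${gid} is in state "${status.state}" after activation`);
  }
}
```

Usage would look like `await activateAndVerify(npayload.connectors, connector.gid);`, assuming the SDK client satisfies the interface above.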

Bidirectional sync

When a connector is set to bidirectional, messages flow in both directions:

  • Messages published to the npayload channel are forwarded to the external system.
  • Messages arriving in the external system are ingested into the npayload channel.

const connector = await npayload.connectors.create({
  channel: 'shared-events',
  credentialGid: credential.gid,
  provider: 'kafka',
  direction: 'bidirectional',
  config: {
    topic: 'shared-events',
    consumerGroup: 'npayload-sync',
  },
});

Bidirectional connectors include deduplication to prevent message loops. Messages originating from the external system are tagged so they are not sent back.
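
To illustrate the general idea of origin tagging, here is a minimal sketch. It is not npayload's implementation, which this guide does not describe: the `BridgedMessage` shape, the `origin` field, and both function names are hypothetical.

```typescript
// Hypothetical message shape; `origin` is an illustrative tag, not a documented field.
interface BridgedMessage {
  id: string;
  payload: unknown;
  origin?: 'external' | 'npayload';
}

// Inbound side: mark a message as having come from the external system.
function tagInbound(msg: BridgedMessage): BridgedMessage {
  return { ...msg, origin: 'external' };
}

// Outbound side: forward only messages that did not originate externally,
// so an ingested message is never echoed back to its source.
function shouldForward(msg: BridgedMessage): boolean {
  return msg.origin !== 'external';
}
```

A message that passes through `tagInbound` is rejected by `shouldForward`, which is what breaks the loop. A message published directly to the channel carries no external tag and is forwarded as usual.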

Provider-specific configuration

Amazon SQS

// Register AWS credentials for SQS
const sqsCredential = await npayload.connectors.createCredential({
  name: 'aws-sqs',
  provider: 'sqs',
  config: {
    region: 'us-east-1',
    accessKeyId: process.env.AWS_ACCESS_KEY_ID!,
    secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY!,
  },
});

// FIFO queue (preserves message ordering within a message group; the queue name must end in .fifo)
await npayload.connectors.create({
  channel: 'ordered-tasks',
  credentialGid: sqsCredential.gid,
  provider: 'sqs',
  direction: 'outbound',
  config: {
    queueUrl: 'https://sqs.us-east-1.amazonaws.com/123456789/tasks.fifo',
    messageGroupId: 'default',
  },
});
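
A small pre-flight check can catch a missing `messageGroupId` before a FIFO connector is created. This is a sketch under assumptions: `SqsConnectorConfig` mirrors only the two config fields shown in this guide, and `isFifoQueueUrl` and `checkSqsConfig` are hypothetical helpers. The `.fifo` suffix rule itself is an SQS naming requirement for FIFO queues.

```typescript
// Mirrors the SQS config fields used in this guide; other fields may exist (assumption).
interface SqsConnectorConfig {
  queueUrl: string;
  messageGroupId?: string;
}

// SQS FIFO queue names always end in ".fifo", so the URL suffix identifies them.
function isFifoQueueUrl(queueUrl: string): boolean {
  return queueUrl.endsWith('.fifo');
}

// Returns a list of human-readable problems; an empty list means the config looks fine.
function checkSqsConfig(config: SqsConnectorConfig): string[] {
  const problems: string[] = [];
  if (isFifoQueueUrl(config.queueUrl) && !config.messageGroupId) {
    problems.push('FIFO queue requires messageGroupId');
  }
  return problems;
}
```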

Azure Service Bus

const azureCredential = await npayload.connectors.createCredential({
  name: 'azure-servicebus',
  provider: 'azure-service-bus',
  config: {
    connectionString: process.env.AZURE_SB_CONNECTION_STRING!,
  },
});

await npayload.connectors.create({
  channel: 'notifications',
  credentialGid: azureCredential.gid,
  provider: 'azure-service-bus',
  direction: 'inbound',
  config: {
    queueName: 'incoming-notifications',
  },
});

GCP Pub/Sub

const gcpCredential = await npayload.connectors.createCredential({
  name: 'gcp-pubsub',
  provider: 'gcp-pubsub',
  config: {
    projectId: 'my-project',
    credentials: JSON.parse(process.env.GCP_SERVICE_ACCOUNT_JSON!),
  },
});

await npayload.connectors.create({
  channel: 'telemetry',
  credentialGid: gcpCredential.gid,
  provider: 'gcp-pubsub',
  direction: 'bidirectional',
  config: {
    topic: 'telemetry-events',
    subscription: 'npayload-ingest',
  },
});
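
The credential example above calls `JSON.parse` directly on an environment variable, which throws an unhelpful `SyntaxError` if the variable is unset or malformed. The sketch below adds clearer failures. `parseServiceAccount` and `ServiceAccountKey` are hypothetical names. `client_email`, `private_key`, and `project_id` are standard fields of a GCP service account key file, and which of them npayload actually requires is an assumption.

```typescript
// Subset of the fields found in a GCP service account key file.
interface ServiceAccountKey {
  client_email: string;
  private_key: string;
  project_id?: string;
}

// Parses the raw JSON string and fails with a descriptive message when the
// value is missing, is not valid JSON, or lacks the expected fields.
function parseServiceAccount(raw: string | undefined): ServiceAccountKey {
  if (!raw) {
    throw new Error('Service account JSON is not set');
  }
  let parsed: unknown;
  try {
    parsed = JSON.parse(raw);
  } catch {
    throw new Error('Service account JSON is not valid JSON');
  }
  const key = parsed as Partial<ServiceAccountKey> | null;
  if (!key || typeof key.client_email !== 'string' || typeof key.private_key !== 'string') {
    throw new Error('Service account JSON is missing client_email or private_key');
  }
  return key as ServiceAccountKey;
}
```

It could then replace the inline call, for example `credentials: parseServiceAccount(process.env.GCP_SERVICE_ACCOUNT_JSON)`.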

Managing connectors

// List all connectors
const connectors = await npayload.connectors.list();

// Get connector status
const status = await npayload.connectors.get(connector.gid);
console.log(status.state);         // "active" | "paused" | "error"
console.log(status.lastSyncAt);    // Last successful sync timestamp
console.log(status.messagesIn);    // Total messages ingested
console.log(status.messagesOut);   // Total messages forwarded

// Pause a connector
await npayload.connectors.pause(connector.gid);

// Resume a connector
await npayload.connectors.resume(connector.gid);

// Delete a connector
await npayload.connectors.delete(connector.gid);

Credential management

Credentials are stored separately from connectors so you can reuse them across multiple connectors.

// List credentials
const creds = await npayload.connectors.listCredentials();

// Update a credential (e.g., rotate a password)
await npayload.connectors.updateCredential(credential.gid, {
  config: {
    brokers: ['kafka-1.example.com:9092', 'kafka-2.example.com:9092'],
    sasl: {
      mechanism: 'scram-sha-256',
      username: process.env.KAFKA_USERNAME!,
      password: process.env.KAFKA_NEW_PASSWORD!,
    },
    ssl: true,
  },
});

// Delete a credential (fails if any connectors reference it)
await npayload.connectors.deleteCredential(credential.gid);

Credential secrets are write-only. Once stored, they are never returned in API responses. You can update them but not read them back.
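
Since secrets cannot be read back, a rotation has to rebuild the credential config from values you hold locally. The sketch below keeps the non-secret Kafka settings in one place and combines them with a fresh password. `KafkaSettings` and `buildKafkaCredentialConfig` are hypothetical names. The update example in this guide passes a complete config object, and whether partial updates are accepted is not stated here, so the sketch always produces the full object.

```typescript
// Non-secret settings kept in source control or app config (hypothetical type).
interface KafkaSettings {
  brokers: string[];
  mechanism: string;
  username: string;
  ssl: boolean;
}

// Full credential config in the shape used by this guide's Kafka examples.
interface KafkaCredentialConfig {
  brokers: string[];
  sasl: { mechanism: string; username: string; password: string };
  ssl: boolean;
}

// Combines the stable settings with a new password without mutating the input.
function buildKafkaCredentialConfig(settings: KafkaSettings, password: string): KafkaCredentialConfig {
  return {
    brokers: [...settings.brokers],
    sasl: {
      mechanism: settings.mechanism,
      username: settings.username,
      password,
    },
    ssl: settings.ssl,
  };
}
```

The result can be passed as `config` to `updateCredential`, so rotating a password cannot accidentally drop a broker or change the SASL mechanism.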

Best practices

  • Store external credentials using the credential API rather than hardcoding them in connector configuration.
  • Start with a paused connector and activate after verifying the configuration.
  • Use bidirectional connectors only when you need two-way sync. One-way connectors are simpler to reason about.
  • Monitor connector status for error states. A connector in error state has stopped processing and needs attention.
  • Rotate external credentials through the credential update API to avoid downtime.
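
As a sketch of the monitoring advice above, the helper below picks out connectors that report an error state. `ConnectorSummary` and `findErroredConnectors` are hypothetical names, and the sketch assumes that each item returned by `npayload.connectors.list()` carries `gid` and `state` fields, which this guide does not confirm.

```typescript
// Assumed shape of a connector list item (not confirmed by this guide).
interface ConnectorSummary {
  gid: string;
  state: string;
}

// Returns the gids of all connectors currently in the "error" state.
function findErroredConnectors(connectors: ConnectorSummary[]): string[] {
  return connectors
    .filter((c) => c.state === 'error')
    .map((c) => c.gid);
}
```

A scheduled job could pass the result of `npayload.connectors.list()` to this helper and raise an alert whenever the returned array is not empty.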
