AI pipeline

Build autonomous AI data processing workflows with classification, extraction, and generation

The AI pipeline pattern uses Pipes to build end-to-end intelligent processing workflows. Data arrives from channels or external sources and is classified, enriched, analyzed, and acted upon, all within your npayload instance.

Pattern: autonomous data processing

This pattern powers the npayload marketplace use case, in which data providers publish real-world events and autonomous systems subscribe to, process, and act on them.

const workflow = await npayload.pipes.create({
  name: 'market-data-processor',
  trigger: {
    type: 'event',
    channel: 'market-data', // Subscribed via marketplace
  },
});

// Step 1: Validate incoming data (drop events missing a symbol or price)
workflow.addNode({
  id: 'validate',
  type: 'filter',
  expression: '{{ payload.symbol != null && payload.price != null }}',
});

// Step 2: Transform to internal format
workflow.addNode({
  id: 'normalize',
  type: 'transform',
  expression: {
    symbol: '{{ payload.symbol }}',
    price: '{{ Number(payload.price) }}',
    volume: '{{ Number(payload.volume || 0) }}',
    timestamp: '{{ payload.timestamp }}',
    source: '{{ payload.provider }}',
  },
});

// Step 3: AI analysis
workflow.addNode({
  id: 'analyze',
  type: 'agent',
  model: 'claude-sonnet-4-20250514',
  systemPrompt: `You are a market data analyst. Analyze the incoming price data
    and determine if it represents an anomaly, trend change, or normal movement.
    Consider historical context from the provided tools.`,
  tools: ['get-price-history', 'get-market-stats'],
  input: '{{ nodes["normalize"].output }}',
});

// Step 4: Route by analysis result
workflow.addNode({
  id: 'route',
  type: 'condition',
  expression: '{{ nodes["analyze"].output.anomaly === true }}',
});

// Step 5a: Alert on anomalies
workflow.addNode({
  id: 'alert',
  type: 'http',
  connector: 'my-slack',
  operation: 'sendMessage',
  params: {
    channel: '#market-alerts',
    text: 'Anomaly detected: {{ nodes["analyze"].output.summary }}',
  },
});

// Step 5b: Store for analytics
workflow.addNode({
  id: 'store',
  type: 'publish',
  channel: 'analyzed-market-data',
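  payload: {
    // `nodes` exists only inside template expressions, not as a JavaScript
    // variable, so the normalized event is referenced here rather than spread.
    data: '{{ nodes["normalize"].output }}',
    analysis: '{{ nodes["analyze"].output }}',
  },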
});

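// Anomalies go to the alert node; everything else goes straight to storage
workflow.connect('route', 'alert', { branch: 'true' });
workflow.connect('route', 'store', { branch: 'false' });
// Anomalies are stored as well, once the alert has been sent
workflow.connect('alert', 'store');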

await npayload.pipes.activate(workflow.id);
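
The validate and normalize steps above can be mirrored as plain functions, which is handy for checking the expressions against sample events before activating a pipe. This is a minimal sketch, not part of the npayload SDK: the `RawMarketEvent` and `NormalizedEvent` types and the `isValid` and `normalize` helpers are hypothetical names, and it assumes the template expressions follow ordinary JavaScript semantics.

```typescript
// Hypothetical shape of a raw market-data event; the field names follow the
// template expressions used in the pipeline above.
interface RawMarketEvent {
  symbol?: string | null;
  price?: string | number | null;
  volume?: string | number | null;
  timestamp?: string;
  provider?: string;
}

// Hypothetical shape of the output of the 'normalize' node.
interface NormalizedEvent {
  symbol: string;
  price: number;
  volume: number;
  timestamp: string | undefined;
  source: string | undefined;
}

// Mirrors the 'validate' filter: both symbol and price must be present.
function isValid(payload: RawMarketEvent): boolean {
  return payload.symbol != null && payload.price != null;
}

// Mirrors the 'normalize' transform; returns null for events the filter drops.
function normalize(payload: RawMarketEvent): NormalizedEvent | null {
  if (!isValid(payload)) return null;
  return {
    symbol: payload.symbol as string,
    price: Number(payload.price),
    volume: Number(payload.volume || 0), // a missing volume defaults to 0
    timestamp: payload.timestamp,
    source: payload.provider,
  };
}

const sample = normalize({ symbol: 'ACME', price: '101.5', provider: 'demo-feed' });
console.log(sample);
```

Under these assumptions, a non-numeric price string such as 'n/a' would pass the filter and become NaN after conversion, so a stricter filter expression may be worth considering if providers send free-form values.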

Pattern: content moderation pipeline

Classify user-generated content, automatically approve or reject the clear cases, and queue uncertain items for human review:

const workflow = await npayload.pipes.create({
  name: 'content-moderator',
  trigger: { type: 'event', channel: 'user-content' },
});

// Classify content
workflow.addNode({
  id: 'classify',
  type: 'classify',
  input: '{{ payload.text }}',
  labels: ['safe', 'spam', 'offensive', 'needs-review'],
});

// Route by classification
workflow.addNode({
  id: 'route',
  type: 'switch',
  field: '{{ nodes["classify"].output.category }}',
  cases: {
    'safe': 'approve',
    'spam': 'reject',
    'offensive': 'reject',
    'needs-review': 'human-review',
  },
});

// Auto-approve safe content
workflow.addNode({
  id: 'approve',
  type: 'mutation',
  operation: 'content.approve',
  params: { contentId: '{{ payload.contentId }}' },
});

// Auto-reject with reason
workflow.addNode({
  id: 'reject',
  type: 'mutation',
  operation: 'content.reject',
  params: {
    contentId: '{{ payload.contentId }}',
    reason: '{{ nodes["classify"].output.category }}',
    confidence: '{{ nodes["classify"].output.confidence }}',
  },
});

// Queue for human review
workflow.addNode({
  id: 'human-review',
  type: 'publish',
  channel: 'moderation-queue',
  payload: {
    contentId: '{{ payload.contentId }}',
    text: '{{ payload.text }}',
    aiClassification: '{{ nodes["classify"].output }}',
  },
});

// Activate the pipe, as in the first pattern
await npayload.pipes.activate(workflow.id);
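
The routing decision made by the switch node can be expressed as a small pure function, which makes the mapping easy to unit test. The sketch below is illustrative only: the `Classification` shape, the `routeFor` helper, the fallback for unknown labels, and the 0.8 confidence threshold are all assumptions rather than documented npayload behavior.

```typescript
type Target = 'approve' | 'reject' | 'human-review';

// Same label-to-node mapping as the `cases` object in the switch node above.
const cases = new Map<string, Target>([
  ['safe', 'approve'],
  ['spam', 'reject'],
  ['offensive', 'reject'],
  ['needs-review', 'human-review'],
]);

// Hypothetical shape of the classify node's output.
interface Classification {
  category: string;
  confidence: number;
}

// Picks the target node for a classification result.
// Assumption: unknown labels and low-confidence automatic decisions are
// sent to human review instead of being approved or rejected automatically.
function routeFor(result: Classification, minConfidence = 0.8): Target {
  const target = cases.get(result.category);
  if (target === undefined) return 'human-review';
  if (target !== 'human-review' && result.confidence < minConfidence) {
    return 'human-review';
  }
  return target;
}

console.log(routeFor({ category: 'safe', confidence: 0.95 }));
console.log(routeFor({ category: 'spam', confidence: 0.5 }));
```

Sending uncertain results to a person trades some automation for fewer wrong automatic rejections. The threshold value is arbitrary here and would need tuning against real moderation data.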

Pattern: document processing

Extract, classify, and store information from incoming documents:

const workflow = await npayload.pipes.create({
  name: 'document-processor',
  trigger: { type: 'event', channel: 'incoming-documents' },
});

// Extract structured data
workflow.addNode({
  id: 'extract',
  type: 'extract',
  input: '{{ payload.content }}',
  schema: {
    documentType: { type: 'string', enum: ['invoice', 'contract', 'receipt', 'other'] },
    parties: { type: 'array', items: { type: 'string' } },
    dates: { type: 'array', items: { type: 'string' } },
    amounts: { type: 'array', items: { type: 'number' } },
    keyTerms: { type: 'array', items: { type: 'string' } },
  },
});

// Generate summary
workflow.addNode({
  id: 'summarize',
  type: 'generate',
  prompt: `
    Summarize this {{ nodes["extract"].output.documentType }} in 2-3 sentences.
    Key parties: {{ nodes["extract"].output.parties.join(", ") }}
    Key amounts: {{ nodes["extract"].output.amounts.join(", ") }}

    Document: {{ payload.content }}
  `,
  maxTokens: 200,
});

// Store processed document
workflow.addNode({
  id: 'store',
  type: 'publish',
  channel: 'processed-documents',
  payload: {
    documentId: '{{ payload.documentId }}',
    extracted: '{{ nodes["extract"].output }}',
    summary: '{{ nodes["summarize"].output }}',
    processedAt: '{{ new Date().toISOString() }}',
  },
});

// Activate the pipe, as in the first pattern
await npayload.pipes.activate(workflow.id);
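
Consumers of the processed-documents channel may want to check that the `extracted` field actually matches the schema passed to the extract node. The following is a minimal sketch of such a check: the `ExtractedDocument` type and the `isExtractedDocument` guard are hypothetical names, and treating every schema field as required is an assumption, since the schema above does not say which fields are optional.

```typescript
// TypeScript view of the schema given to the 'extract' node above.
interface ExtractedDocument {
  documentType: 'invoice' | 'contract' | 'receipt' | 'other';
  parties: string[];
  dates: string[];
  amounts: number[];
  keyTerms: string[];
}

const DOCUMENT_TYPES: readonly string[] = ['invoice', 'contract', 'receipt', 'other'];

// True when `value` is an array whose items all have the given primitive type.
function isArrayOf(value: unknown, kind: 'string' | 'number'): boolean {
  return Array.isArray(value) && value.every((item) => typeof item === kind);
}

// Runtime type guard; assumes all five fields are required.
function isExtractedDocument(value: unknown): value is ExtractedDocument {
  if (typeof value !== 'object' || value === null) return false;
  const doc = value as Record<string, unknown>;
  const documentType = doc['documentType'];
  return (
    typeof documentType === 'string' &&
    DOCUMENT_TYPES.includes(documentType) &&
    isArrayOf(doc['parties'], 'string') &&
    isArrayOf(doc['dates'], 'string') &&
    isArrayOf(doc['amounts'], 'number') &&
    isArrayOf(doc['keyTerms'], 'string')
  );
}

const candidate: unknown = {
  documentType: 'invoice',
  parties: ['Acme Corp', 'Globex'],
  dates: ['2025-01-15'],
  amounts: [1250.0],
  keyTerms: ['net 30'],
};

if (isExtractedDocument(candidate)) {
  // Inside this branch `candidate` is typed as ExtractedDocument.
  console.log(`${candidate.documentType} with ${candidate.parties.length} parties`);
}
```

A guard like this keeps downstream code from trusting model output blindly. Messages that fail the check could be logged or sent to a review channel rather than stored.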
