AI pipeline
Build autonomous AI data processing workflows with classification, extraction, and generation
The AI pipeline pattern uses Pipes to build end-to-end intelligent processing workflows. Data flows in from channels or external sources, gets classified, enriched, analyzed, and acted upon, all within your npayload instance.
Pattern: autonomous data processing
This pattern powers the npayload marketplace use case where data providers publish real-world events and autonomous systems subscribe, process, and act on them.
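The `{{ … }}` template expressions in the workflow below evaluate as JavaScript against the incoming event payload. As a rough sketch of what the validate and normalize steps compute, written in plain JavaScript (the sample payload shape is an invented example, not part of the npayload API):

```javascript
// Hypothetical event published on the 'market-data' channel (shape assumed).
const payload = {
  symbol: 'ACME',
  price: '101.5',      // providers often send prices as strings
  volume: undefined,   // missing field, defaulted below
  timestamp: '2024-01-01T00:00:00Z',
  provider: 'demo',
};

// Plain-JS reading of the 'validate' filter expression:
const passes = payload.symbol != null && payload.price != null;

// Plain-JS reading of the 'normalize' transform expressions:
const normalized = {
  symbol: payload.symbol,
  price: Number(payload.price),
  volume: Number(payload.volume || 0),
  timestamp: payload.timestamp,
  source: payload.provider,
};

console.log(passes, normalized.price, normalized.volume); // true 101.5 0
```

Note that `Number(payload.volume || 0)` defaults a missing volume to `0` rather than producing `NaN`, so downstream nodes can rely on a numeric field.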
```javascript
const workflow = await npayload.pipes.create({
  name: 'market-data-processor',
  trigger: {
    type: 'event',
    channel: 'market-data', // Subscribed via marketplace
  },
});

// Step 1: Load and validate incoming data
workflow.addNode({
  id: 'validate',
  type: 'filter',
  expression: '{{ payload.symbol != null && payload.price != null }}',
});

// Step 2: Transform to internal format
workflow.addNode({
  id: 'normalize',
  type: 'transform',
  expression: {
    symbol: '{{ payload.symbol }}',
    price: '{{ Number(payload.price) }}',
    volume: '{{ Number(payload.volume || 0) }}',
    timestamp: '{{ payload.timestamp }}',
    source: '{{ payload.provider }}',
  },
});

// Step 3: AI analysis
workflow.addNode({
  id: 'analyze',
  type: 'agent',
  model: 'claude-sonnet-4-20250514',
  systemPrompt: `You are a market data analyst. Analyze the incoming price data
and determine if it represents an anomaly, trend change, or normal movement.
Consider historical context from the provided tools.`,
  tools: ['get-price-history', 'get-market-stats'],
  input: '{{ nodes["normalize"].output }}',
});

// Step 4: Route by analysis result
workflow.addNode({
  id: 'route',
  type: 'condition',
  expression: '{{ nodes["analyze"].output.anomaly === true }}',
});

// Step 5a: Alert on anomalies
workflow.addNode({
  id: 'alert',
  type: 'http',
  connector: 'my-slack',
  operation: 'sendMessage',
  params: {
    channel: '#market-alerts',
    text: 'Anomaly detected: {{ nodes["analyze"].output.summary }}',
  },
});

// Step 5b: Store for analytics
workflow.addNode({
  id: 'store',
  type: 'publish',
  channel: 'analyzed-market-data',
  payload: {
    normalized: '{{ nodes["normalize"].output }}',
    analysis: '{{ nodes["analyze"].output }}',
  },
});

workflow.connect('route', 'alert', { branch: 'true' });
workflow.connect('route', 'store', { branch: 'false' });
// Also store anomalies
workflow.connect('alert', 'store');

await npayload.pipes.activate(workflow.id);
```

Pattern: content moderation pipeline
Automatically moderate user-generated content:
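The switch node used for routing below behaves like a plain lookup table from classifier category to target node. A minimal sketch of that routing logic in ordinary JavaScript, assuming a classifier output of the shape `{ category, confidence }` (the shape is an assumption for illustration):

```javascript
// Mirror of the switch node's cases: classifier category -> next node id.
const routes = {
  safe: 'approve',
  spam: 'reject',
  offensive: 'reject',
  'needs-review': 'human-review',
};

// Resolve the next node for a given classification result.
function routeFor(classification) {
  return routes[classification.category];
}

console.log(routeFor({ category: 'spam', confidence: 0.93 })); // prints "reject"
```

Both 'spam' and 'offensive' map to the same reject node, so the rejection mutation can record the specific category as the reason.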
```javascript
const workflow = await npayload.pipes.create({
  name: 'content-moderator',
  trigger: { type: 'event', channel: 'user-content' },
});

// Classify content
workflow.addNode({
  id: 'classify',
  type: 'classify',
  input: '{{ payload.text }}',
  labels: ['safe', 'spam', 'offensive', 'needs-review'],
});

// Route by classification
workflow.addNode({
  id: 'route',
  type: 'switch',
  field: '{{ nodes["classify"].output.category }}',
  cases: {
    'safe': 'approve',
    'spam': 'reject',
    'offensive': 'reject',
    'needs-review': 'human-review',
  },
});

// Auto-approve safe content
workflow.addNode({
  id: 'approve',
  type: 'mutation',
  operation: 'content.approve',
  params: { contentId: '{{ payload.contentId }}' },
});

// Auto-reject with reason
workflow.addNode({
  id: 'reject',
  type: 'mutation',
  operation: 'content.reject',
  params: {
    contentId: '{{ payload.contentId }}',
    reason: '{{ nodes["classify"].output.category }}',
    confidence: '{{ nodes["classify"].output.confidence }}',
  },
});

// Queue for human review
workflow.addNode({
  id: 'human-review',
  type: 'publish',
  channel: 'moderation-queue',
  payload: {
    contentId: '{{ payload.contentId }}',
    text: '{{ payload.text }}',
    aiClassification: '{{ nodes["classify"].output }}',
  },
});
```

Pattern: document processing
Extract, classify, and store information from incoming documents:
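To make the prompt interpolation in the summarize step concrete, here is an illustrative extract-node output that satisfies the schema below, with the prompt's `join(", ")` expressions evaluated in plain JavaScript (every value is an invented example, not real output):

```javascript
// Hypothetical 'extract' node output conforming to the schema below.
const extracted = {
  documentType: 'invoice',
  parties: ['Acme Corp', 'Globex Inc'],
  dates: ['2024-03-01'],
  amounts: [1250.0, 100.0],
  keyTerms: ['net 30'],
};

// How one line of the 'summarize' prompt template resolves against that output:
const promptLine = `Key parties: ${extracted.parties.join(', ')}`;
console.log(promptLine); // prints "Key parties: Acme Corp, Globex Inc"
```

Because the schema constrains `documentType` to a fixed enum and forces the array fields to consistent item types, the summarize prompt can interpolate these fields without extra null checks.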
```javascript
const workflow = await npayload.pipes.create({
  name: 'document-processor',
  trigger: { type: 'event', channel: 'incoming-documents' },
});

// Extract structured data
workflow.addNode({
  id: 'extract',
  type: 'extract',
  input: '{{ payload.content }}',
  schema: {
    documentType: { type: 'string', enum: ['invoice', 'contract', 'receipt', 'other'] },
    parties: { type: 'array', items: { type: 'string' } },
    dates: { type: 'array', items: { type: 'string' } },
    amounts: { type: 'array', items: { type: 'number' } },
    keyTerms: { type: 'array', items: { type: 'string' } },
  },
});

// Generate summary
workflow.addNode({
  id: 'summarize',
  type: 'generate',
  prompt: `
    Summarize this {{ nodes["extract"].output.documentType }} in 2-3 sentences.
    Key parties: {{ nodes["extract"].output.parties.join(", ") }}
    Key amounts: {{ nodes["extract"].output.amounts.join(", ") }}
    Document: {{ payload.content }}
  `,
  maxTokens: 200,
});

// Store processed document
workflow.addNode({
  id: 'store',
  type: 'publish',
  channel: 'processed-documents',
  payload: {
    documentId: '{{ payload.documentId }}',
    extracted: '{{ nodes["extract"].output }}',
    summary: '{{ nodes["summarize"].output }}',
    processedAt: '{{ new Date().toISOString() }}',
  },
});
```

Next steps
- Using AI nodes for detailed AI node configuration
- Event processing for non-AI event pipelines
- Data streaming for stream processing use cases