Processing Pipeline
How inbound webhooks are validated, matched, and converted into interactions.
This page describes the full processing pipeline from when a webhook arrives to when an interaction is created.
Pipeline stages
1. Source Lookup
2. Signature Verification
3. Source Schema Validation
4. Mapping Selection
5. Mapping Schema Validation
6. Deduplication Check
7. Interaction Creation
1. Source lookup
The source is resolved from the webhook URL:
- /api/inbound/:slug/:sourceId (lookup by UUID)
- /api/inbound/:slug/by-mnemonic/:mnemonic (lookup by mnemonic)
Source lookups are cached for 60 seconds. If the source is not found or inactive, the request is rejected with 404.
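The lookup-and-cache behaviour can be sketched as a small TTL cache. This is a minimal illustration, not the platform's implementation: the `InboundSource` shape, the `SourceCache` class, and the `lookupStatus` helper are hypothetical names, and caching misses as well as hits is an assumption.

```typescript
// Hypothetical source record; the real one likely carries more fields.
interface InboundSource {
  id: string;
  mnemonic: string;
  active: boolean;
}

type SourceLoader = (key: string) => Promise<InboundSource | null>;

const CACHE_TTL_MS = 60_000; // 60-second cache window

class SourceCache {
  private readonly entries = new Map<string, { value: InboundSource | null; expiresAt: number }>();
  private readonly load: SourceLoader;
  private readonly now: () => number;

  constructor(load: SourceLoader, now: () => number = Date.now) {
    this.load = load;
    this.now = now;
  }

  async get(key: string): Promise<InboundSource | null> {
    const hit = this.entries.get(key);
    if (hit && hit.expiresAt > this.now()) return hit.value; // fresh entry, skip the loader
    const value = await this.load(key);
    // Assumption: misses (null) are cached too.
    this.entries.set(key, { value, expiresAt: this.now() + CACHE_TTL_MS });
    return value;
  }
}

// Missing and inactive sources are indistinguishable to the caller: both yield 404.
async function lookupStatus(cache: SourceCache, key: string): Promise<number> {
  const source = await cache.get(key);
  return source !== null && source.active ? 200 : 404;
}
```

The clock is injected through the constructor so that expiry can be exercised in tests without waiting 60 seconds.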
2. Signature verification
If the source has a verification configuration, the request signature is verified:
- Read the raw request body (captured before JSON parsing)
- Extract the signature from the configured header
- Strip the signature prefix if configured (e.g. sha256=)
- Compute the expected signature using the secret and algorithm
- Compare using timing-safe comparison
If verification fails, the request is rejected with 401.
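The verification steps above can be sketched with Node's built-in `crypto` module. This sketch assumes the signature is a hex-encoded HMAC of the raw body; the `VerificationConfig` field names and the `verifySignature` function are hypothetical and may not match the actual configuration.

```typescript
import { createHmac, timingSafeEqual } from "node:crypto";

// Hypothetical config shape; field names are assumptions for illustration.
interface VerificationConfig {
  header: string;    // header carrying the signature, e.g. "X-Signature"
  prefix?: string;   // optional prefix to strip, e.g. "sha256="
  algorithm: string; // digest algorithm, e.g. "sha256"
  secret: string;
}

function verifySignature(
  rawBody: Buffer,
  headers: Record<string, string | undefined>,
  config: VerificationConfig,
): boolean {
  // Node lower-cases incoming header names.
  const received = headers[config.header.toLowerCase()];
  if (!received) return false;

  const signature =
    config.prefix && received.startsWith(config.prefix)
      ? received.slice(config.prefix.length)
      : received;

  // Assumption: HMAC over the raw bytes, hex-encoded.
  const expected = createHmac(config.algorithm, config.secret).update(rawBody).digest("hex");

  const a = Buffer.from(signature, "utf8");
  const b = Buffer.from(expected, "utf8");
  // timingSafeEqual throws on length mismatch, so compare lengths first.
  return a.length === b.length && timingSafeEqual(a, b);
}
```

A `false` result corresponds to the 401 rejection. The raw body matters here: re-serializing parsed JSON can change whitespace or key order and produce a different digest.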
Replay protection: If timestampField and replayTolerance are configured, the event timestamp is checked. Events older than the tolerance window are rejected with 401.
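A replay check along these lines might look as follows. The units are assumptions: this sketch treats `timestampField` as a top-level payload key holding Unix seconds and `replayTolerance` as a number of seconds. Treating a missing or non-numeric timestamp as a replay is also an assumption. `isReplay` is a hypothetical helper name.

```typescript
function isReplay(
  payload: Record<string, unknown>,
  timestampField: string,
  replayToleranceSec: number,
  nowMs: number = Date.now(),
): boolean {
  const raw = payload[timestampField];
  // Assumption: events without a usable timestamp are rejected.
  if (typeof raw !== "number" || !Number.isFinite(raw)) return true;
  const ageSec = nowMs / 1000 - raw;
  return ageSec > replayToleranceSec;
}
```

For example, with a tolerance of 300 seconds, an event stamped exactly 300 seconds ago passes and one stamped 301 seconds ago is rejected.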
3. Source schema validation
If the source has a payloadSchema, the parsed payload is validated against it using AJV. Validation errors return 400 with details:
{
"error": "Source schema validation failed",
"details": [
{ "path": "/id", "message": "must match pattern \"^evt_\"" }
]
}
4. Mapping selection
All active mappings for the source are loaded and evaluated:
- Sort by priority (highest first)
- For each mapping, evaluate all filter conditions against the payload
- The first mapping where all conditions match is selected
If no mapping matches, the webhook is acknowledged with 200 but no interaction is created.
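The selection rule can be sketched as a filter, a sort, and a first-match search. The filter condition format is not specified on this page, so the `FilterCondition` shape here (equality on a top-level field) is a deliberately simplified, hypothetical stand-in, as are the `InboundMapping` interface and `selectMapping` function.

```typescript
// Hypothetical condition shape: equality on a top-level payload field.
interface FilterCondition {
  field: string;
  equals: unknown;
}

interface InboundMapping {
  id: string;
  priority: number;
  active: boolean;
  conditions: FilterCondition[];
}

function selectMapping(
  mappings: InboundMapping[],
  payload: Record<string, unknown>,
): InboundMapping | undefined {
  return mappings
    .filter((m) => m.active) // filter() returns a copy, so the sort does not mutate the input
    .sort((a, b) => b.priority - a.priority) // highest priority first
    .find((m) => m.conditions.every((c) => payload[c.field] === c.equals));
}
```

In this sketch a mapping with no conditions matches every payload, so a low-priority mapping of that kind acts as a catch-all. An `undefined` result corresponds to the 200 acknowledgement with no interaction created.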
5. Mapping schema validation
If the matched mapping has a payloadSchema:
- If schemaRootPath is set, extract the sub-object at that path
- Validate the (sub-)object against the schema
Validation errors return 400 with details.
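The two steps can be sketched as below. The path syntax for `schemaRootPath` is not stated here, so this sketch assumes dot-separated keys. `SchemaValidator` is a hypothetical stand-in for a compiled schema validator (for instance an AJV validate function adapted to return a list of issues), and `extractAtPath` and `validateForMapping` are illustrative names.

```typescript
type ValidationIssue = { path: string; message: string };

// Stand-in for a compiled schema validator; returns an empty list when valid.
type SchemaValidator = (data: unknown) => ValidationIssue[];

// Assumption: paths are dot-separated keys, e.g. "data.object".
function extractAtPath(value: unknown, path: string): unknown {
  let current: unknown = value;
  for (const key of path.split(".")) {
    if (current === null || typeof current !== "object") return undefined;
    current = (current as Record<string, unknown>)[key];
  }
  return current;
}

// Returns null when the payload is valid, otherwise the 400 response.
function validateForMapping(
  payload: unknown,
  schemaRootPath: string | undefined,
  validate: SchemaValidator,
): { status: number; body: { error: string; details: ValidationIssue[] } } | null {
  const target = schemaRootPath ? extractAtPath(payload, schemaRootPath) : payload;
  const details = validate(target);
  if (details.length === 0) return null;
  return { status: 400, body: { error: "Mapping schema validation failed", details } };
}
```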
6. Deduplication check
If the mapping has an idempotencyPath:
- Extract the value at that path from the payload
- Query for existing interactions with the same resource type, action, and idempotency value
- If a duplicate exists, return 200 with { "duplicate": true }
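The duplicate check can be illustrated with an in-memory index keyed on the three values named above. This is only a sketch: the real pipeline queries stored interactions, and `InteractionKey`, `DedupIndex`, and `checkDuplicate` are hypothetical names.

```typescript
interface InteractionKey {
  resourceTypeId: string;
  actionId: string;
  idempotencyValue: string;
}

// In-memory stand-in for the query against existing interactions.
class DedupIndex {
  private readonly seen = new Set<string>();

  private static keyOf(k: InteractionKey): string {
    // JSON-encode the tuple so values containing separators cannot collide.
    return JSON.stringify([k.resourceTypeId, k.actionId, k.idempotencyValue]);
  }

  has(k: InteractionKey): boolean {
    return this.seen.has(DedupIndex.keyOf(k));
  }

  record(k: InteractionKey): void {
    this.seen.add(DedupIndex.keyOf(k));
  }
}

// Returns the duplicate response, or null when processing should continue.
function checkDuplicate(
  index: DedupIndex,
  key: InteractionKey,
): { status: number; body: { duplicate: true } } | null {
  return index.has(key) ? { status: 200, body: { duplicate: true } } : null;
}
```

Because the key includes resource type and action, the same idempotency value under a different action is not treated as a duplicate in this sketch.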
7. Interaction creation
Build and insert the interaction:
- Extract the resource ID from resourceIdPath
- If fieldMappings are defined, extract each field from the payload. If not, use the entire payload as the interaction object
- Extract metadata paths (userId, sessionId, uowId, etc.) and merge them into the interaction with keys prefixed by _
- Auto-generate a uowId if not mapped
- Insert the interaction with the source's originId and the mapping's resourceTypeId and actionId
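The build step can be sketched as follows. Several details are assumptions made for illustration: `fieldMappings` is modelled as target field to dot-separated payload path, the metadata paths are grouped under a hypothetical `metadataPaths` object, and the generated `uowId` is a random UUID. The insert itself is omitted.

```typescript
import { randomUUID } from "node:crypto";

// Hypothetical mapping shape; only the property names in the list above come from the docs.
interface MappingConfig {
  resourceTypeId: string;
  actionId: string;
  resourceIdPath: string;
  fieldMappings?: Record<string, string>; // target field -> payload path (assumed)
  metadataPaths?: Record<string, string>; // e.g. { userId: "data.user" } (assumed)
}

// Assumption: paths are dot-separated keys.
function getPath(value: unknown, path: string): unknown {
  let current: unknown = value;
  for (const key of path.split(".")) {
    if (current === null || typeof current !== "object") return undefined;
    current = (current as Record<string, unknown>)[key];
  }
  return current;
}

function buildInteraction(payload: Record<string, unknown>, mapping: MappingConfig, originId: string) {
  // Mapped fields if configured, otherwise the whole payload.
  let body: Record<string, unknown> = { ...payload };
  if (mapping.fieldMappings) {
    body = {};
    for (const [field, path] of Object.entries(mapping.fieldMappings)) {
      body[field] = getPath(payload, path);
    }
  }

  // Metadata is merged in under keys prefixed by "_".
  const meta: Record<string, unknown> = {};
  for (const [name, path] of Object.entries(mapping.metadataPaths ?? {})) {
    const value = getPath(payload, path);
    if (value !== undefined) meta[`_${name}`] = value;
  }
  if (meta["_uowId"] === undefined) meta["_uowId"] = randomUUID(); // auto-generate when not mapped

  return {
    originId,
    resourceTypeId: mapping.resourceTypeId,
    actionId: mapping.actionId,
    resourceId: getPath(payload, mapping.resourceIdPath),
    interaction: { ...body, ...meta },
  };
}
```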
Queue vs sync processing
Queue mode (production)
In production, webhooks are enqueued for async processing via QStash:
Webhook → Source Validation → Enqueue → QStash → Platform Ingest → Full Pipeline
This provides reliable delivery with automatic retries on transient failures.
Sync mode (development)
For local testing, append ?sync=true to bypass the queue and process the full pipeline in a single request:
Webhook → Full Pipeline → Response with interaction ID
Sync mode is disabled when NODE_ENV=production.
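The choice between the two modes reduces to a small predicate. The sketch below is an assumption about how that decision might be expressed; `chooseMode` is a hypothetical helper, and the slug in the usage example is made up.

```typescript
type ProcessingMode = "sync" | "queue";

function chooseMode(requestUrl: string, nodeEnv: string | undefined): ProcessingMode {
  // A base URL is supplied so that path-only request URLs parse correctly.
  const url = new URL(requestUrl, "http://localhost");
  const syncRequested = url.searchParams.get("sync") === "true";
  // Sync is honoured only outside production; everything else is enqueued.
  return syncRequested && nodeEnv !== "production" ? "sync" : "queue";
}
```

For example, `chooseMode("/api/inbound/acme/by-mnemonic/STRIPE?sync=true", "development")` yields `"sync"`, while the same URL with `"production"` yields `"queue"`.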
Error handling
| Outcome | HTTP Status | Response body |
|---|---|---|
| Source not found | 404 | { "error": "Source not found" } |
| Source inactive | 404 | { "error": "Source not found" } |
| Signature invalid | 401 | { "error": "Signature verification failed" } |
| Replay detected | 401 | { "error": "Event too old" } |
| Source schema fail | 400 | { "error": "Source schema validation failed", "details": [...] } |
| No mapping match | 200 | { "message": "No matching mapping found" } |
| Mapping schema fail | 400 | { "error": "Mapping schema validation failed", "details": [...] } |
| Duplicate detected | 200 | { "duplicate": true } |
| Success | 200 | { "interactionId": "uuid" } |
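Three of the outcomes in the table share status 200, so a caller has to inspect the body to tell them apart. The following sketch shows one way a client might classify a response using only the shapes listed in the table; `WebhookResult` and `interpretResponse` are hypothetical names, not part of any SDK.

```typescript
type WebhookResult =
  | { kind: "created"; interactionId: string }
  | { kind: "duplicate" }
  | { kind: "unmatched" }
  | { kind: "rejected"; status: number; error: string };

function interpretResponse(status: number, body: Record<string, unknown>): WebhookResult {
  if (status === 200) {
    const interactionId = body["interactionId"];
    if (typeof interactionId === "string") return { kind: "created", interactionId };
    if (body["duplicate"] === true) return { kind: "duplicate" };
    return { kind: "unmatched" }; // 200 with only a message: no mapping matched
  }
  // 400, 401, and 404 all carry an "error" string in the table.
  const error = body["error"];
  return { kind: "rejected", status, error: typeof error === "string" ? error : "Unknown error" };
}
```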
SDK support
All Provenance SDKs include inbound source and mapping management:
// Node.js / TypeScript SDK
const api = await provenance.api();
// Sources
const sources = await api.inboundSources.list();
const source = await api.inboundSources.create({ title: 'Stripe', mnemonic: 'STRIPE', ... });
// Mappings
const mappings = await api.inboundMappings.list();
const mapping = await api.inboundMappings.create({ sourceId: source.id, ... });
// Test a mapping
const result = await api.inboundMappings.test(mapping.id, { payload: { ... } });
# Python SDK
api = await provenance.api()
sources = await api.inbound_sources.list()
source = await api.inbound_sources.create({...})
mappings = await api.inbound_mappings.list()
result = await api.inbound_mappings.test(mapping_id, {"payload": {...}})
// Java SDK
ProvenanceAPI api = provenance.api();
Map sources = api.inboundSources().list();
Map source = api.inboundSources().create(Map.of("title", "Stripe", ...));
Map result = api.testInboundMapping(mappingId, Map.of("payload", Map.of(...)));
MCP tools
The MCP server exposes inbound management tools for AI agents:
- get_inbound_sources: list all sources
- create_inbound_source: create a new source
- get_inbound_mappings: list all mappings
- create_inbound_mapping: create a new mapping
- test_inbound_mapping: test a mapping with a sample payload