Experience Pipeline
This pipeline turns raw interaction events into compact, privacy-safe Experience Units ready for fast retrieval.
Stages
- Ingest (append-only)
  - Endpoint: POST /api/v1/experience (body: ExperienceEvent.v0)
  - Idempotency: clients may resend an event with the same id; the server must upsert (no duplicates)
  - Validation: required fields and privacy.mode must be present; reject malformed events with 4xx (transport errors use 5xx)
  - PII handling: if privacy.mode == redact, run redaction on input/output before persisting full text
  - KV hints: apply kv_policy_hint when persisting (pin > compress > evict)
- Summarize (batch/background)
  - Grouping: by request_id/session/time windows or semantic proximity
  - Output: ExperienceUnit.v0 (1-3 sentence summary + entities/refs + quality_score)
  - Redaction: re-validate privacy; the summary must not leak PII
  - Scheduling: small, frequent batches to keep freshness; back off under load
  - Quality: compute quality_score (0..1) from signals: tests passed, feedback.rating, errors absent
- Index (hybrid)
  - Vector: embeddings of summary (or embedding_ref)
  - Sparse: keywords/phrases; BM25/lexical scoring
  - Filters: recency, channel, actor.type, entities, privacy.mode
  - Maintenance: periodic compaction; apply compress/evict policies
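
The Ingest rules above (validate, redact when privacy.mode is redact, upsert by id) can be sketched as follows. This is a minimal in-memory illustration under stated assumptions, not the pipeline's actual implementation: the `IngestEvent` shape, `ingestEvent`, `redactText`, the 200 success status, and the two regex patterns are all hypothetical, and a real redactor would need far broader PII coverage.

```typescript
// Hypothetical, simplified stand-in for ExperienceEvent.v0.
interface IngestEvent {
  id: string;
  content: string;
  privacy?: { mode: string };
}

type IngestResult =
  | { status: 200; stored: true; replaced: boolean }
  | { status: 400; stored: false; error: string };

// In-memory store keyed by event id; a real store would be durable.
const eventStore = new Map<string, IngestEvent>();

// Illustrative patterns only: email addresses and "sk-" style keys.
const EMAIL_RE = /[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}/g;
const API_KEY_RE = /\bsk-[A-Za-z0-9]+\b/g;

function redactText(text: string): string {
  return text.replace(EMAIL_RE, "[EMAIL]").replace(API_KEY_RE, "[SECRET]");
}

function ingestEvent(ev: IngestEvent): IngestResult {
  // Validation: required fields and privacy.mode must be present.
  if (!ev.id || !ev.content) {
    return { status: 400, stored: false, error: "missing required field" };
  }
  if (!ev.privacy || !ev.privacy.mode) {
    return { status: 400, stored: false, error: "privacy.mode is required" };
  }
  // PII handling: redact before anything is persisted.
  const content =
    ev.privacy.mode === "redact" ? redactText(ev.content) : ev.content;
  // Idempotency: Map.set overwrites, so a resend with the same id is an upsert.
  const replaced = eventStore.has(ev.id);
  eventStore.set(ev.id, { ...ev, content });
  return { status: 200, stored: true, replaced };
}
```

Resending the same event leaves one stored record and reports `replaced: true` on the second call, which is the no-duplicates behavior the Idempotency rule asks for.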
Production Implementation Framework
Pipeline Performance Metrics
```typescript
interface PipelineMetrics {
  ingestion: {
    events_per_second: number;
    average_processing_time_ms: number;
    privacy_redaction_rate: number;
    error_rate: number;
  };
  summarization: {
    batch_processing_time_ms: number;
    quality_score_distribution: number[];
    entity_extraction_accuracy: number;
    privacy_leak_rate: number; // Must be 0
  };
  indexing: {
    index_update_latency_ms: number;
    vector_embedding_time_ms: number;
    search_performance_ms: number;
    index_size_mb: number;
  };
}
```
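
To make the quality_score signal concrete, the sketch below combines the three signals named in the Summarize stage (tests passed, feedback.rating, errors absent) into a 0..1 score and bins scores into a histogram that could feed quality_score_distribution. The weights, the 1 to 5 rating scale, the neutral default for missing feedback, and the ten-bucket layout are assumptions for illustration; the source does not specify any of them.

```typescript
// Hypothetical signal bundle for one Experience Unit.
interface QualitySignals {
  testsPassed: boolean;
  feedbackRating?: number; // assumed 1..5 scale; may be absent
  errorCount: number;
}

// Assumed weights; they sum to 1 so the score stays within 0..1.
const QUALITY_WEIGHTS = { tests: 0.4, feedback: 0.3, noErrors: 0.3 };

function clamp01(x: number): number {
  return Math.min(1, Math.max(0, x));
}

function computeQualityScore(s: QualitySignals): number {
  const tests = s.testsPassed ? 1 : 0;
  // Missing feedback is treated as neutral (0.5) rather than as a penalty.
  const feedback =
    s.feedbackRating === undefined ? 0.5 : clamp01((s.feedbackRating - 1) / 4);
  const noErrors = s.errorCount === 0 ? 1 : 0;
  return clamp01(
    QUALITY_WEIGHTS.tests * tests +
      QUALITY_WEIGHTS.feedback * feedback +
      QUALITY_WEIGHTS.noErrors * noErrors,
  );
}

// Count scores per equal-width bucket; a score of exactly 1 lands in the last bucket.
function qualityHistogram(scores: number[], buckets = 10): number[] {
  const counts: number[] = new Array<number>(buckets).fill(0);
  for (const sc of scores) {
    const idx = Math.min(buckets - 1, Math.floor(clamp01(sc) * buckets));
    counts[idx] = (counts[idx] ?? 0) + 1;
  }
  return counts;
}
```

A weighted sum is only one option; a stricter pipeline might instead cap the score whenever errors are present.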
Testing Framework
```typescript
// Assumes `pipeline` is provided by the test harness (for example, a shared fixture).
describe('Experience Pipeline Tests', () => {
  test('end-to-end pipeline performance', async () => {
    const test_event = {
      id: 'test_event_001',
      type: 'user_query',
      query: 'implement user authentication',
      outcome: { success: true, quality_score: 0.85 },
      privacy: { mode: 'redact' }
    };

    // Stage 1: Ingest
    const ingest_start = Date.now();
    const ingest_result = await pipeline.ingest(test_event);
    const ingest_time = Date.now() - ingest_start;
    expect(ingest_result.stored).toBe(true);
    expect(ingest_time).toBeLessThan(50); // < 50ms ingest

    // Stage 2: Summarization (background), waiting up to 5000 ms
    const summary_result = await pipeline.waitForSummarization(test_event.id, 5000);
    expect(summary_result.summary.length).toBeLessThan(300); // Concise summary
    expect(summary_result.quality_score).toBeGreaterThan(0.7);

    // Stage 3: Indexing
    const search_result = await pipeline.search({
      query: 'user authentication',
      top_k: 5
    });
    expect(search_result.results.some(r => r.id === test_event.id)).toBe(true);
    expect(search_result.latency_ms).toBeLessThan(8); // < 8ms retrieval SLA
  }, 10_000); // test timeout must exceed the 5000 ms summarization wait

  test('privacy redaction works correctly', async () => {
    const event_with_pii = {
      type: 'code_review',
      content: 'User john.doe@company.com submitted PR with API key sk-123456',
      privacy: { mode: 'redact' }
    };

    // The id is taken from the ingest result because this event does not set one.
    const result = await pipeline.ingest(event_with_pii);
    const summary = await pipeline.waitForSummarization(result.id);

    expect(summary.summary).not.toContain('john.doe@company.com');
    expect(summary.summary).not.toContain('sk-123456');
    expect(summary.privacy_redacted).toBe(true);
  });
});
```
Pipeline Monitoring & Observability
```yaml
experience_pipeline_monitoring:
  metrics:
    - name: l4_pipeline_stage_duration_seconds
      type: histogram
      buckets: [0.01, 0.05, 0.1, 0.5, 1.0, 2.0]
      labels: [stage, privacy_mode]
    - name: l4_privacy_redaction_success_rate
      type: gauge
      labels: [redaction_type]
  alerts:
    - name: L4PipelineBacklog
      condition: l4_pipeline_queue_depth > 1000
      severity: warning
    - name: L4PrivacyLeak
      condition: l4_privacy_leak_detected > 0
      severity: critical
```
Error modes and retries
- 4xx (validation): return a schema-shaped error with options[], e.g., return_minimal, retry_with_reduced_scope
- 5xx (transient): include Retry-After or X-Retry-After-Ms; clients may retry with jitter
- Backpressure: when queue depth is high, ingest returns 202 Accepted with { queued: true, eta_ms }
- Privacy breach prevention: if redaction fails, store only metadata (no raw text) and mark the unit privacy.mode=block

Example 202 Accepted
```json
{
  "queued": true,
  "eta_ms": 350
}
```
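
On the client side, the 5xx guidance above might translate into a delay calculation like the one below. It is a sketch, not a prescribed client: `shouldRetry`, `retryDelayMs`, the 100 ms base, the 10 s cap, and the choice to prefer X-Retry-After-Ms when both headers are present are all assumptions. Only the delta-seconds form of Retry-After is handled; the HTTP-date form is ignored here.

```typescript
// Headers as a lowercase-keyed record, to keep the sketch transport-agnostic.
type HeaderMap = Record<string, string | undefined>;

// Only 5xx responses are treated as retryable; a 4xx needs a changed request.
function shouldRetry(statusCode: number): boolean {
  return statusCode >= 500 && statusCode <= 599;
}

// Parse a header value as a non-negative number, or return undefined.
function parseDelay(value: string | undefined): number | undefined {
  if (value === undefined || value.trim() === "") return undefined;
  const n = Number(value);
  return Number.isFinite(n) && n >= 0 ? n : undefined;
}

function retryDelayMs(
  headers: HeaderMap,
  attempt: number, // 0 for the first retry
  random: () => number = Math.random,
  baseMs = 100,
  capMs = 10_000,
): number {
  // Assumption: the millisecond header wins when both are present.
  const msHint = parseDelay(headers["x-retry-after-ms"]);
  const secHint = parseDelay(headers["retry-after"]);
  const hintMs = msHint ?? (secHint === undefined ? undefined : secHint * 1000);
  if (hintMs !== undefined) {
    // Wait at least as long as the server asked, plus a little jitter.
    return hintMs + random() * baseMs;
  }
  // No hint: capped exponential backoff with full jitter.
  const ceiling = Math.min(capMs, baseMs * 2 ** attempt);
  return random() * ceiling;
}
```

Passing `random` as a parameter keeps the jitter deterministic in tests; production callers can rely on the `Math.random` default.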
Backpressure and freshness
- Targets: summarize lag p95 < 2 s under normal load
- Degradation: reduce batch size, skip heavy enrichers, prioritize recent events
- Health: /experience/health reports queue depth, summarize lag, index latency
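
One way to read the degradation rules above is as a per-batch planning step driven by the measured summarize lag. The sketch below assumes a hypothetical `planSummarizeBatch` helper, a default batch size of 32, and a halving factor under degradation; only the 2 s p95 target comes from this section.

```typescript
// Hypothetical queue entry awaiting summarization.
interface PendingEvent {
  id: string;
  receivedAtMs: number;
}

interface SummarizePlan {
  batchSize: number;
  runHeavyEnrichers: boolean;
  batch: PendingEvent[];
}

// p95 summarize-lag target from this section (2 s).
const LAG_TARGET_MS = 2000;

function planSummarizeBatch(
  queue: PendingEvent[],
  lagP95Ms: number,
  normalBatchSize = 32,
): SummarizePlan {
  // The target is "p95 < 2 s", so reaching 2 s already counts as degraded.
  const degraded = lagP95Ms >= LAG_TARGET_MS;
  // Degraded: halve the batch (assumed factor), but never go below 1.
  const batchSize = degraded
    ? Math.max(1, Math.floor(normalBatchSize / 2))
    : normalBatchSize;
  // Normal: oldest first. Degraded: newest first, to protect freshness.
  const ordered = [...queue].sort((a, b) =>
    degraded ? b.receivedAtMs - a.receivedAtMs : a.receivedAtMs - b.receivedAtMs,
  );
  return {
    batchSize,
    runHeavyEnrichers: !degraded, // skip heavy enrichers while degraded
    batch: ordered.slice(0, batchSize),
  };
}
```

Sorting a copy of the queue keeps the caller's ordering intact, so older events remain available for later batches once lag recovers.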
Storage and retention
- Raw events: TTL configurable; units kept longer with compression
- Pin small "golden" units; compress long tail; evict stale/low-signal in off-peak windows
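
The retention rules above could be expressed as a small decision function. Everything numeric here is an assumption (4 KiB pin limit, 7-day compression age, 90-day staleness, 0.3 low-signal score), as are the `UnitStats` shape and the `golden` flag. The sketch also reads "stale/low-signal" as "either condition is sufficient", which is only one possible interpretation.

```typescript
type RetentionAction = "pin" | "compress" | "evict" | "keep";

// Hypothetical per-unit statistics used for the decision.
interface UnitStats {
  sizeBytes: number;
  qualityScore: number; // 0..1
  ageDays: number;
  golden: boolean; // hypothetical flag marking curated units
}

// Assumed thresholds; tune per deployment.
const RETENTION = {
  pinMaxBytes: 4096,
  compressMinAgeDays: 7,
  staleAgeDays: 90,
  lowSignalScore: 0.3,
};

function decideRetention(u: UnitStats, offPeak: boolean): RetentionAction {
  // Pin small golden units regardless of age or score.
  if (u.golden && u.sizeBytes <= RETENTION.pinMaxBytes) return "pin";
  // Evict stale or low-signal units, but only in off-peak windows.
  const stale = u.ageDays >= RETENTION.staleAgeDays;
  const lowSignal = u.qualityScore < RETENTION.lowSignalScore;
  if (offPeak && (stale || lowSignal)) return "evict";
  // Compress the long tail of older units.
  if (u.ageDays >= RETENTION.compressMinAgeDays) return "compress";
  return "keep";
}
```

During peak hours a stale unit is compressed rather than evicted, so eviction work is deferred to the next off-peak window.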
See also:
- ./contracts-registry.md
- ../orchestration/retry-actions.md
- ../orchestration/privacy.md