Experience Pipeline

This pipeline turns raw interaction events into compact, privacy-safe Experience Units ready for fast retrieval.

Stages

  1. Ingest (append-only)
  • Endpoint: POST /api/v1/experience (body: ExperienceEvent.v0)
  • Idempotency: clients may resend an event with the same id; the server must upsert and never create a duplicate
  • Validation: check required fields and that privacy.mode is present; reject malformed events with 4xx (transport errors use 5xx)
  • PII handling: if privacy.mode == redact, run redaction on input/output before persisting the full text
  • KV hints: apply kv_policy_hint when persisting (precedence: pin > compress > evict)
  2. Summarize (batch/background)
  • Grouping: by request_id/session/time windows or semantic proximity
  • Output: ExperienceUnit.v0 (1–3 sentence summary + entities/refs + quality_score)
  • Redaction: re-validate privacy; summary must not leak PII
  • Scheduling: small, frequent batches to keep units fresh; back off under load
  • Quality: compute quality_score (0..1) from signals: tests passed, feedback.rating, errors absent
  3. Index (hybrid)
  • Vector: embeddings of summary (or embedding_ref)
  • Sparse: keywords/phrases; BM25/lexical scoring
  • Filters: recency, channel, actor.type, entities, privacy.mode
  • Maintenance: periodic compaction; apply compress/evict policies
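The ingest rules in stage 1 (validation, redaction before persistence, idempotent upsert) can be sketched as follows. This is a minimal in-memory illustration, not the service's actual implementation: the `IncomingEvent` and `StoredEvent` shapes, the `InMemoryIngest` class, and the email-only `redact` helper are all hypothetical stand-ins for the real ExperienceEvent.v0 schema and redaction logic.

```typescript
// Hypothetical, trimmed-down event shape; the real schema is ExperienceEvent.v0.
interface IncomingEvent {
  id?: string;
  input?: string;
  output?: string;
  privacy?: { mode?: string };
}

interface StoredEvent {
  id: string;
  input: string | undefined;
  output: string | undefined;
  privacyMode: string;
}

type IngestResult =
  | { status: 400; error: string }
  | { status: 200; stored: true; duplicate: boolean };

// Placeholder redactor (assumption): masks email-like strings only.
function redact(text: string): string {
  return text.replace(/[\w.+-]+@[\w-]+(\.[\w-]+)+/g, "[REDACTED_EMAIL]");
}

class InMemoryIngest {
  private readonly store = new Map<string, StoredEvent>();

  ingest(event: IncomingEvent): IngestResult {
    // Validation: id and privacy.mode must be present, otherwise answer 4xx.
    if (typeof event.id !== "string" || event.id.length === 0) {
      return { status: 400, error: "id is required" };
    }
    const mode = event.privacy?.mode;
    if (typeof mode !== "string" || mode.length === 0) {
      return { status: 400, error: "privacy.mode is required" };
    }

    // PII handling: redact input/output before anything is persisted.
    const clean = (text: string | undefined): string | undefined =>
      text !== undefined && mode === "redact" ? redact(text) : text;

    const record: StoredEvent = {
      id: event.id,
      input: clean(event.input),
      output: clean(event.output),
      privacyMode: mode,
    };

    // Idempotency: upsert keyed by id, so a resend overwrites instead of duplicating.
    const duplicate = this.store.has(record.id);
    this.store.set(record.id, record);
    return { status: 200, stored: true, duplicate };
  }

  find(id: string): StoredEvent | undefined {
    return this.store.get(id);
  }

  get size(): number {
    return this.store.size;
  }
}
```

Keying the store by the client-supplied id is what makes resends safe: the second call reports `duplicate: true` and the store still holds a single record.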

Production Implementation Framework

Pipeline Performance Metrics

typescript
interface PipelineMetrics {
  ingestion: {
    events_per_second: number;
    average_processing_time_ms: number;
    privacy_redaction_rate: number;
    error_rate: number;
  };
  
  summarization: {
    batch_processing_time_ms: number;
    quality_score_distribution: number[];
    entity_extraction_accuracy: number;
    privacy_leak_rate: number; // Must be 0
  };
  
  indexing: {
    index_update_latency_ms: number;
    vector_embedding_time_ms: number;
    search_performance_ms: number;
    index_size_mb: number;
  };
}

Testing Framework

typescript
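// Assumes `pipeline` is a test fixture created elsewhere (for example in a beforeAll hook)
// that exposes ingest(), waitForSummarization(), and search().
describe('Experience Pipeline Tests', () => {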
  test('end-to-end pipeline performance', async () => {
    const test_event = {
      id: 'test_event_001',
      type: 'user_query',
      query: 'implement user authentication',
      outcome: { success: true, quality_score: 0.85 },
      privacy: { mode: 'redact' }
    };
    
    // Stage 1: Ingest
    const ingest_start = Date.now();
    const ingest_result = await pipeline.ingest(test_event);
    const ingest_time = Date.now() - ingest_start;
    
    expect(ingest_result.stored).toBe(true);
    expect(ingest_time).toBeLessThan(50); // < 50ms ingest
    
    // Stage 2: Summarization (background)
    const summary_result = await pipeline.waitForSummarization(test_event.id, 5000);
    expect(summary_result.summary.length).toBeLessThan(300); // Concise summary
    expect(summary_result.quality_score).toBeGreaterThan(0.7);
    
    // Stage 3: Indexing
    const search_result = await pipeline.search({
      query: 'user authentication',
      top_k: 5
    });
    
    expect(search_result.results.some(r => r.id === test_event.id)).toBe(true);
    expect(search_result.latency_ms).toBeLessThan(8); // < 8ms retrieval SLA
  });
  
  test('privacy redaction works correctly', async () => {
    const event_with_pii = {
      id: 'test_event_002',
      type: 'code_review',
      content: 'User john.doe@company.com submitted PR with API key sk-123456',
      privacy: { mode: 'redact' }
    };
    
    const result = await pipeline.ingest(event_with_pii);
    const summary = await pipeline.waitForSummarization(result.id, 5000);
    
    expect(summary.summary).not.toContain('john.doe@company.com');
    expect(summary.summary).not.toContain('sk-123456');
    expect(summary.privacy_redacted).toBe(true);
  });
});

Pipeline Monitoring & Observability

yaml
experience_pipeline_monitoring:
  metrics:
    - name: l4_pipeline_stage_duration_seconds
      type: histogram
      buckets: [0.01, 0.05, 0.1, 0.5, 1.0, 2.0]
      labels: [stage, privacy_mode]
      
    - name: l4_privacy_redaction_success_rate
      type: gauge
      labels: [redaction_type]
      
  alerts:
    - name: L4PipelineBacklog
      condition: l4_pipeline_queue_depth > 1000
      severity: warning
      
    - name: L4PrivacyLeak
      condition: l4_privacy_leak_detected > 0
      severity: critical

Error modes and retries

  • 4xx (validation): return a schema-shaped error with options[], e.g. return_minimal, retry_with_reduced_scope
  • 5xx (transient): include Retry-After or X-Retry-After-Ms; clients may retry with jitter
  • Backpressure: when queue depth is high, ingest returns 202 Accepted with { queued: true, eta_ms }
  • Privacy breach prevention: if redaction fails, store only metadata (no raw text) and mark the unit privacy.mode=block

Example 202 Accepted

json
{
  "queued": true,
  "eta_ms": 350
}
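The client side of the 5xx retry guidance can be sketched as a pure delay calculation. The header names Retry-After and X-Retry-After-Ms come from the list above; everything else (the function names, the `RetryOptions` shape, exponential backoff with full jitter, and adding jitter on top of the server hint) is an assumption chosen for illustration. The sketch only handles the delta-seconds form of Retry-After, not the HTTP-date form.

```typescript
// Hypothetical client-side tuning knobs.
interface RetryOptions {
  baseDelayMs: number;
  maxDelayMs: number;
}

// Only 5xx responses are treated as retryable; 4xx needs a changed request,
// and 202 Accepted means the event was queued successfully.
function shouldRetry(status: number): boolean {
  return status >= 500 && status <= 599;
}

function parseNonNegative(value: string | undefined): number | undefined {
  if (value === undefined || value.trim() === "") return undefined;
  const n = Number(value);
  return Number.isFinite(n) && n >= 0 ? n : undefined;
}

// Reads the server's retry hint in milliseconds, preferring the more precise header.
function serverHintMs(headers: Record<string, string>): number | undefined {
  const lower: Record<string, string> = {};
  for (const [key, value] of Object.entries(headers)) {
    lower[key.toLowerCase()] = value;
  }
  const ms = parseNonNegative(lower["x-retry-after-ms"]);
  if (ms !== undefined) return ms;
  const seconds = parseNonNegative(lower["retry-after"]);
  return seconds !== undefined ? seconds * 1000 : undefined;
}

// attempt is zero-based. `random` is injectable so the result can be tested.
function retryDelayMs(
  attempt: number,
  headers: Record<string, string>,
  opts: RetryOptions,
  random: () => number = Math.random,
): number {
  // Exponential backoff, capped at maxDelayMs.
  const cap = Math.min(opts.maxDelayMs, opts.baseDelayMs * 2 ** attempt);
  // Full jitter: pick uniformly from [0, cap) to spread out retrying clients.
  const jitter = random() * cap;
  // Never retry sooner than the server asked; jitter is added on top of the hint.
  const hint = serverHintMs(headers);
  return Math.round(hint !== undefined ? hint + jitter : jitter);
}
```

A caller would check `shouldRetry(response.status)`, then sleep for `retryDelayMs(attempt, responseHeaders, opts)` before resending the same event id, relying on the server's idempotent upsert.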

Backpressure and freshness

  • Targets: summarize lag p95 < 2 s under normal load
  • Degradation: reduce batch size, skip heavy enrichers, prioritize recent events
  • Health: /experience/health reports queue depth, summarize lag, index latency
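One way to read the degradation bullet is as a small planning function that the summarizer calls before each batch. The sketch below is an assumption-heavy illustration: `HealthSnapshot`, `BatchPlan`, `planBatch`, the default batch size of 64, and the shrink factor are all hypothetical. Only the 2 s lag target and the queue depth of 1000 (the L4PipelineBacklog alert threshold) come from this page.

```typescript
// Hypothetical view of what the health endpoint reports.
interface HealthSnapshot {
  queueDepth: number;
  summarizeLagP95Ms: number;
}

interface BatchPlan {
  batchSize: number;
  runHeavyEnrichers: boolean;
  order: "fifo" | "newest_first";
}

const LAG_TARGET_MS = 2000; // summarize lag p95 target from the text
const QUEUE_DEPTH_LIMIT = 1000; // same threshold as the L4PipelineBacklog alert

function planBatch(health: HealthSnapshot, normalBatchSize = 64): BatchPlan {
  const overloaded =
    health.summarizeLagP95Ms >= LAG_TARGET_MS ||
    health.queueDepth > QUEUE_DEPTH_LIMIT;

  if (!overloaded) {
    return { batchSize: normalBatchSize, runHeavyEnrichers: true, order: "fifo" };
  }

  // Pressure is how far past the worse of the two limits we are (1.0 = at the limit).
  const pressure = Math.max(
    health.summarizeLagP95Ms / LAG_TARGET_MS,
    health.queueDepth / QUEUE_DEPTH_LIMIT,
  );
  // Shrink the batch by a factor between 2 and 8 depending on pressure.
  const divisor = Math.min(8, Math.max(2, Math.ceil(pressure)));

  return {
    batchSize: Math.max(1, Math.floor(normalBatchSize / divisor)),
    runHeavyEnrichers: false, // skip heavy enrichers while degraded
    order: "newest_first", // prioritize recent events
  };
}
```

For example, a queue depth of 3500 with lag still under target gives a pressure of 3.5, so the batch shrinks from 64 to 16, heavy enrichers are skipped, and the newest events are processed first.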

Storage and retention

  • Raw events: configurable TTL; Experience Units are kept longer, with compression
  • Pin small “golden” units; compress the long tail; evict stale or low-signal units in off-peak windows
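The pin/compress/evict rules can be sketched as a per-unit decision that also honors kv_policy_hint. Everything numeric here is a placeholder: the thresholds for "small", "golden", "stale", and "low-signal" are hypothetical, as are the `UnitStats` shape and function names. Treating pin > compress > evict as "the stronger of the hint and the derived policy wins" is one possible reading of the precedence, not a confirmed rule.

```typescript
type KvPolicy = "pin" | "compress" | "evict";

// Hypothetical per-unit statistics used to pick a retention policy.
interface UnitStats {
  qualityScore: number; // 0..1, as produced by the summarize stage
  sizeBytes: number;
  ageDays: number;
  hint?: KvPolicy; // kv_policy_hint carried from ingest, if any
}

// Strongest first, mirroring pin > compress > evict.
const PRECEDENCE: KvPolicy[] = ["pin", "compress", "evict"];

// Placeholder thresholds (assumptions, tune per deployment).
const GOLDEN_MIN_QUALITY = 0.9;
const GOLDEN_MAX_BYTES = 2048;
const LOW_SIGNAL_MAX_QUALITY = 0.3;
const STALE_MIN_AGE_DAYS = 90;

function derivePolicy(unit: UnitStats): KvPolicy {
  // Small, high-quality "golden" units are pinned.
  if (unit.qualityScore >= GOLDEN_MIN_QUALITY && unit.sizeBytes <= GOLDEN_MAX_BYTES) {
    return "pin";
  }
  // Stale, low-signal units are candidates for off-peak eviction.
  if (unit.qualityScore < LOW_SIGNAL_MAX_QUALITY && unit.ageDays > STALE_MIN_AGE_DAYS) {
    return "evict";
  }
  // Everything else is the long tail and gets compressed.
  return "compress";
}

function resolvePolicy(unit: UnitStats): KvPolicy {
  const derived = derivePolicy(unit);
  if (unit.hint === undefined) return derived;
  // The stronger policy wins when the hint and the derived policy disagree.
  return PRECEDENCE.indexOf(unit.hint) <= PRECEDENCE.indexOf(derived)
    ? unit.hint
    : derived;
}
```

Under this reading, an explicit `pin` hint protects a unit that would otherwise be evicted, while an `evict` hint cannot demote a unit that qualifies as golden.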

See also:

  • ./contracts-registry.md
  • ../orchestration/retry-actions.md
  • ../orchestration/privacy.md