UDOM Pipeline — Observability Adapter
Version: 1.3.0 | Pipeline: 1.3-udom | Tests: 33/33 passing
Ingests JSONL telemetry from the UDOM extraction pipeline and emits structured Prometheus metrics, OpenTelemetry traces, and alerting rules mapped to SDD §6 SLOs.
Architecture
```
 JSONL telemetry stream
            │
            ▼
┌───────────────────────┐
│   TelemetryAdapter    │  ← Core ingestion engine
│                       │
│ ┌───────────────────┐ │    ┌───────────────────┐
│ │  Step Handlers    │─┼───▶│  MetricBackend    │ → Prometheus
│ │ (per event type)  │ │    │  (protocol)       │
│ └───────────────────┘ │    └───────────────────┘
│                       │
│ ┌───────────────────┐ │    ┌───────────────────┐
│ │  TraceBuilder     │─┼───▶│  TracerBackend    │ → Jaeger/OTEL
│ │  (per doc_id)     │ │    │  (protocol)       │
│ └───────────────────┘ │    └───────────────────┘
│                       │
│ ┌───────────────────┐ │
│ │  CircuitBreaker   │ │  ← Per-engine failure detection
│ │  ThroughputTracker│ │  ← Rolling window throughput
│ └───────────────────┘ │
└───────────────────────┘
```
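The step-handler layer amounts to routing each JSONL record by its event type. A minimal sketch of that dispatch pattern, assuming a hypothetical `event` field on every record and a hypothetical `StepDispatcher` class (the adapter's real handler names and record schema are not documented in this README):

```python
import json
from collections import defaultdict
from typing import Callable, Dict, Iterable

# Hypothetical record shape: {"event": "<step name>", "doc_id": "...", ...}.
# The real field names used by UDOM telemetry are not specified here.
Handler = Callable[[dict], None]


class StepDispatcher:
    """Routes each telemetry record to the handler registered for its event type."""

    def __init__(self) -> None:
        self._handlers: Dict[str, Handler] = {}
        self.unhandled: Dict[str, int] = defaultdict(int)

    def register(self, event_type: str) -> Callable[[Handler], Handler]:
        def decorator(fn: Handler) -> Handler:
            self._handlers[event_type] = fn
            return fn
        return decorator

    def dispatch_lines(self, lines: Iterable[str]) -> None:
        for line in lines:
            line = line.strip()
            if not line:
                continue  # tolerate blank lines in the stream
            record = json.loads(line)
            event = record.get("event", "")
            handler = self._handlers.get(event)
            if handler is None:
                self.unhandled[event] += 1  # count unknown event types instead of failing the batch
            else:
                handler(record)


dispatcher = StepDispatcher()
components_seen: Dict[str, int] = defaultdict(int)


@dispatcher.register("extract_pdf")
def on_extract_pdf(record: dict) -> None:
    # Stand-in for incrementing udom_extraction_components_total{engine=...}
    components_seen[record["engine"]] += record.get("components", 0)


dispatcher.dispatch_lines([
    '{"event": "extract_pdf", "doc_id": "d1", "engine": "docling", "components": 42}',
    '{"event": "heartbeat"}',
])
```

Counting unhandled event types rather than raising keeps one unexpected record from stalling a whole batch; whether the real adapter behaves this way is not stated here.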
Quick Start
```python
from udom_observability import TelemetryAdapter, InMemoryMetricBackend

# 1. Create backend (swap for PrometheusBackend in production)
backend = InMemoryMetricBackend()
adapter = TelemetryAdapter(metrics=backend)

# 2. Ingest JSONL telemetry
adapter.ingest_file("telemetry.jsonl")

# 3. Query metrics
score = backend.get_latest_gauge("udom_quality_score", grade="A")
upgrade_rate = backend.get_latest_gauge("udom_fusion_upgrade_rate")
print(f"Quality: {score:.3f}, Upgrade rate: {upgrade_rate:.1%}")
```
Production (Prometheus scrape endpoint)
```python
from udom_observability import TelemetryAdapter, PrometheusBackend

backend = PrometheusBackend()
# Serves http://localhost:9090/metrics. 9090 is also the Prometheus server's
# default port, so pick a different one if both run on the same host.
backend.start_server(port=9090)

adapter = TelemetryAdapter(metrics=backend)
# Feed from a NATS subscriber, file watcher, or batch processor
adapter.ingest_file("batch-run.jsonl")
```
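For the file-watcher case, the main subtleties are not re-reading lines already ingested and not parsing a line that is still being written. A sketch under the assumption that telemetry is appended one JSON object per newline-terminated line; `read_new_records` is a hypothetical helper, and handing its output to the adapter is left out because the adapter's per-record API is not shown in this README:

```python
import json
from typing import List, Tuple


def read_new_records(path: str, offset: int) -> Tuple[List[dict], int]:
    """Parse complete JSONL lines appended since `offset` (a byte position)."""
    records: List[dict] = []
    with open(path, "rb") as fh:  # binary mode so offsets are exact byte positions
        fh.seek(offset)
        while True:
            line = fh.readline()
            if not line or not line.endswith(b"\n"):
                break  # EOF, or a line still being written: pick it up on the next poll
            offset += len(line)
            if line.strip():
                records.append(json.loads(line))  # json.loads accepts UTF-8 bytes
    return records, offset
```

Call it on a timer and persist the returned offset between polls, so a restart resumes where the previous run stopped instead of re-ingesting the file.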
Metrics (20 total)
| Metric | Type | Maps To |
|---|---|---|
| udom_extraction_duration_seconds | histogram | TDD §7.1 Latency Profile |
| udom_extraction_components_total | counter | SDD §2 Component counts |
| udom_extraction_assets_total | counter | SDD §2 Image assets |
| udom_extraction_errors_total | counter | SDD §5 Failure Modes |
| udom_fusion_matched_total | counter | ADR-002 Fusion validation |
| udom_fusion_upgraded_total | counter | 3-source value measurement |
| udom_fusion_upgrade_rate | gauge | Key differentiator metric |
| udom_quality_score | gauge | TDD §7 / SDD §6 SLO |
| udom_quality_grade_total | counter | SDD §6 Grade distribution |
| udom_quality_score_distribution | histogram | Quality trend detection |
| udom_pipeline_duration_seconds | histogram | SDD §6 SLO (P95 < 45s) |
| udom_pipeline_in_progress | gauge | Capacity monitoring |
| udom_pipeline_completed_total | counter | Throughput tracking |
| udom_pipeline_failed_total | counter | SDD §6 SLO (99.5% avail) |
| udom_document_size_chars | histogram | Storage planning |
| udom_document_size_lines | histogram | Storage planning |
| udom_component_type_total | counter | Corpus analysis |
| udom_throughput_papers_per_hour | gauge | Capacity planning |
| udom_circuit_breaker_state | gauge | SDD §5 / Sys Prompt §9.4 |
| udom_circuit_breaker_trips_total | counter | Incident tracking |

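
udom_throughput_papers_per_hour is fed by the rolling-window ThroughputTracker. The sketch below shows one way such a tracker can work, assuming a fixed window (15 minutes by default, a hypothetical choice) and extrapolating the number of completions inside it to an hourly rate; `RollingThroughput` is illustrative, not the module's class:

```python
import time
from collections import deque
from typing import Deque, Optional


class RollingThroughput:
    """Completions per hour over a sliding time window (hypothetical sketch)."""

    def __init__(self, window_s: float = 900.0) -> None:
        self.window_s = window_s
        self._done: Deque[float] = deque()  # completion timestamps, oldest first

    def record_completion(self, ts: Optional[float] = None) -> None:
        # Assumes timestamps arrive in non-decreasing order
        self._done.append(time.time() if ts is None else ts)

    def papers_per_hour(self, now: Optional[float] = None) -> float:
        now = time.time() if now is None else now
        # Evict completions that have fallen out of the window
        while self._done and self._done[0] <= now - self.window_s:
            self._done.popleft()
        # Scale the in-window count up to a per-hour rate
        return len(self._done) * 3600.0 / self.window_s
```

With `window_s=600`, three completions inside the window report 3 × 3600 / 600 = 18 papers/hour. A shorter window reacts faster to stalls but is noisier at low volume.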
SLOs (from SDD §6)
| SLO | Target | Alert Rule |
|---|---|---|
| Grade A rate | ≥ 98% (7d window) | UDOMQualityGradeABelowSLO |
| P95 latency | < 45s | UDOMLatencyP95AboveSLO |
| Availability | ≥ 99.5% (30d) | UDOMAvailabilityBelowSLO |
| Data integrity | 100% hash verification | UDOMDataIntegrityFailure |
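
The ratio SLOs translate into concrete error budgets. A worked sketch, assuming availability is time-based and the Grade A rate is counted per graded document (the SDD may define either differently); the document counts are illustrative, not measured:

```python
def error_budget(target: float, window_amount: float) -> float:
    """Portion of the window (time or events) allowed to miss an SLO target in [0, 1]."""
    return (1.0 - target) * window_amount


def budget_remaining(target: float, good: int, total: int) -> float:
    """Fraction of an event-based error budget still unspent (negative = SLO breached)."""
    allowed_bad = error_budget(target, total)
    if allowed_bad <= 0:
        # A 100% target (e.g. data integrity) or an empty window has no budget to spend
        raise ValueError("no error budget: target is 100% or there are no events")
    return 1.0 - (total - good) / allowed_bad


# Availability >= 99.5% over 30 days, in minutes of allowed unavailability
availability_budget_min = error_budget(0.995, 30 * 24 * 60)  # ≈ 216 min (3.6 h)

# Grade A >= 98% over 7 days: with 5,000 graded papers, at most ~100 may grade below A
grade_a_budget_docs = error_budget(0.98, 5_000)  # ≈ 100 documents

# 4,930 Grade A out of 5,000 spends 70 of the ~100 allowed misses
remaining = budget_remaining(0.98, good=4_930, total=5_000)  # ≈ 0.30
```

The data-integrity SLO has no budget at all, which is why any hash-verification failure is an immediate alert rather than something to burn down.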
Trace Structure
```
udom.pipeline (root span, doc_id → trace_id)
├── udom.extract_pdf    [engine=docling, elapsed_s, components, assets]
├── udom.extract_html   [engine=ar5iv, elapsed_s, components, assets]
├── udom.extract_latex  [engine=latex, elapsed_s, components]
├── udom.mapping        [engine=fusion, elapsed_s, components, types{}]
├── udom.assembly       [chars, lines]
└── udom.qa_grade       [grade, score, matched, upgraded]
```
Trace IDs are derived deterministically from (doc_id, batch_id), which enables cross-system correlation.
Files
```
udom_observability/
├── __init__.py      # Public API exports
├── metrics.py       # 20 MetricSpec definitions + 4 SLOs + MetricBackend protocol
├── tracer.py        # TraceBuilder + TraceSpan + TracerBackend protocol
├── adapter.py       # TelemetryAdapter (core), CircuitBreaker, ThroughputTracker
└── backends.py      # PrometheusBackend, InMemoryMetricBackend
alerting/
└── udom_alerts.yml  # 10 Prometheus alerting rules (4 groups)
dashboards/
└── udom-pipeline-grafana.json  # Grafana dashboard provisioning
tests/
└── test_adapter.py  # 33 tests against real telemetry trace
```
Design Decisions
Protocol-based backends. MetricBackend and TracerBackend are runtime-checkable protocols, so no inheritance is required: any object with the right methods qualifies. Prometheus can be swapped for StatsD, Datadog, or a test stub without changing the adapter code.
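A sketch of what "runtime-checkable protocol" buys, using `typing.Protocol` with `@runtime_checkable`. The protocol name `MetricBackendLike` and its three methods (`inc`, `set_gauge`, `observe`) are hypothetical stand-ins; the real MetricBackend signature lives in metrics.py and may differ:

```python
from typing import Dict, List, Protocol, Tuple, runtime_checkable

Labels = Tuple[Tuple[str, str], ...]


@runtime_checkable
class MetricBackendLike(Protocol):
    """Hypothetical stand-in for MetricBackend; the real method names may differ."""

    def inc(self, name: str, value: float = 1.0, **labels: str) -> None: ...
    def set_gauge(self, name: str, value: float, **labels: str) -> None: ...
    def observe(self, name: str, value: float, **labels: str) -> None: ...


class ListBackend:
    """Test stub that records every call. Note: it does NOT inherit from the protocol."""

    def __init__(self) -> None:
        self.calls: List[Tuple[str, str, float, Labels]] = []

    def _log(self, kind: str, name: str, value: float, labels: Dict[str, str]) -> None:
        self.calls.append((kind, name, value, tuple(sorted(labels.items()))))

    def inc(self, name: str, value: float = 1.0, **labels: str) -> None:
        self._log("inc", name, value, labels)

    def set_gauge(self, name: str, value: float, **labels: str) -> None:
        self._log("gauge", name, value, labels)

    def observe(self, name: str, value: float, **labels: str) -> None:
        self._log("observe", name, value, labels)


backend = ListBackend()
# Structural check: passes because the methods exist, not because of subclassing
is_backend = isinstance(backend, MetricBackendLike)
backend.inc("udom_pipeline_completed_total")
```

One caveat of `@runtime_checkable`: `isinstance` only checks that the methods exist, not their signatures, so a static type checker such as mypy is still what catches a stub with the wrong parameters.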
Deterministic trace IDs. The trace ID is sha256(doc_id:batch_id)[:32], so the same input always produces the same trace ID. Replaying telemetry therefore lands in the existing trace instead of creating a duplicate.
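The derivation can be sketched with the standard library, assuming the hash input is the UTF-8 string `f"{doc_id}:{batch_id}"` and the `[:32]` truncation applies to the hex digest (32 hex characters, i.e. 128 bits, the width of an OpenTelemetry trace ID). `trace_id_for` and the sample IDs are hypothetical:

```python
import hashlib


def trace_id_for(doc_id: str, batch_id: str) -> str:
    """Deterministic 128-bit trace ID as 32 lowercase hex characters."""
    digest = hashlib.sha256(f"{doc_id}:{batch_id}".encode("utf-8")).hexdigest()
    return digest[:32]  # first 16 bytes of the SHA-256 digest


tid = trace_id_for("2401.00001", "batch-7")
tid_again = trace_id_for("2401.00001", "batch-7")  # identical on every run and host
```

Because `:` is the separator, IDs that themselves contain `:` can collide: doc_id `"a:b"` with batch_id `"c"` hashes the same string as doc_id `"a"` with batch_id `"b:c"`. If that can happen, validate or escape the inputs. The OpenTelemetry Python SDK represents trace IDs as integers, which `int(tid, 16)` provides.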
Circuit breaker per engine. Each engine has its own three-state breaker (closed/half-open/open) with configurable thresholds, mapping directly to the SDD §5 Graceful Degradation Matrix.
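A minimal per-engine breaker, written as a sketch of the standard closed/open/half-open pattern. The thresholds, the `State` gauge encoding (0/1/2) and the injectable clock are assumptions for illustration, not values taken from SDD §5 or adapter.py:

```python
import time
from enum import Enum
from typing import Callable, Optional


class State(Enum):
    CLOSED = 0     # hypothetical encoding for udom_circuit_breaker_state
    HALF_OPEN = 1
    OPEN = 2


class EngineBreaker:
    def __init__(self, failure_threshold: int = 3, reset_timeout_s: float = 60.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.failure_threshold = failure_threshold
        self.reset_timeout_s = reset_timeout_s
        self._clock = clock
        self.state = State.CLOSED
        self.failures = 0
        self.trips = 0  # would feed udom_circuit_breaker_trips_total
        self._opened_at: Optional[float] = None

    def allow(self) -> bool:
        """Should the next extraction attempt for this engine go ahead?"""
        if self.state is State.OPEN:
            assert self._opened_at is not None
            if self._clock() - self._opened_at >= self.reset_timeout_s:
                self.state = State.HALF_OPEN  # allow probes; the next outcome decides
                return True
            return False
        return True

    def record_success(self) -> None:
        self.failures = 0
        self.state = State.CLOSED

    def record_failure(self) -> None:
        self.failures += 1
        # A failed probe in HALF_OPEN re-opens immediately
        if self.state is State.HALF_OPEN or self.failures >= self.failure_threshold:
            self._trip()

    def _trip(self) -> None:
        self.state = State.OPEN
        self.trips += 1
        self._opened_at = self._clock()
```

Keeping one instance per engine (for example a dict keyed by `docling`, `ar5iv`, `latex`) means a failing PDF extractor opens only its own breaker while HTML and LaTeX extraction carry on, which is the isolation a per-engine breaker is meant to provide.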
Fusion upgrade rate as a first-class metric. udom_fusion_upgrade_rate quantifies the value of 3-source extraction: if it drops to zero, the architecture isn't delivering its value proposition.
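How the gauge is computed is not spelled out here; a natural reading is upgraded components divided by matched components (udom_fusion_upgraded_total over udom_fusion_matched_total). A sketch under that assumption, plus a hypothetical "flatlined at zero" check for the failure condition described above:

```python
from typing import Optional, Sequence


def fusion_upgrade_rate(matched: int, upgraded: int) -> Optional[float]:
    """Assumed definition: share of matched components that fusion upgraded."""
    if not 0 <= upgraded <= matched:
        raise ValueError("expected 0 <= upgraded <= matched")
    if matched == 0:
        return None  # undefined: nothing matched, so nothing could be upgraded
    return upgraded / matched


def upgrade_rate_flatlined(rates: Sequence[Optional[float]], min_points: int = 3) -> bool:
    """True when the last `min_points` defined rates are all exactly zero."""
    defined = [r for r in rates if r is not None]
    return len(defined) >= min_points and all(r == 0.0 for r in defined[-min_points:])
```

Returning `None` for an empty denominator keeps "no data yet" distinct from "fusion stopped helping", so a cold start does not trip the zero check.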