
UDOM Pipeline — Observability Adapter

Version: 1.3.0 | Pipeline: 1.3-udom | Tests: 33/33 passing

Ingests JSONL telemetry from the UDOM extraction pipeline, emits structured Prometheus metrics and OpenTelemetry traces, and ships Prometheus alerting rules mapped to the SDD §6 SLOs.

Architecture

JSONL telemetry stream
            │
            ▼
┌───────────────────────┐
│   TelemetryAdapter    │ ← Core ingestion engine
│                       │
│ ┌───────────────────┐ │     ┌─────────────────────┐
│ │  Step Handlers    │─┼────▶│   MetricBackend     │ → Prometheus
│ │ (per event type)  │ │     │     (protocol)      │
│ └───────────────────┘ │     └─────────────────────┘
│                       │
│ ┌───────────────────┐ │     ┌─────────────────────┐
│ │  TraceBuilder     │─┼────▶│   TracerBackend     │ → Jaeger/OTEL
│ │  (per doc_id)     │ │     │     (protocol)      │
│ └───────────────────┘ │     └─────────────────────┘
│                       │
│ ┌───────────────────┐ │
│ │  CircuitBreaker   │ │ ← Per-engine failure detection
│ │ ThroughputTracker │ │ ← Rolling window throughput
│ └───────────────────┘ │
└───────────────────────┘
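The Step Handlers box routes each JSONL record to a handler for its event type. Below is a minimal stdlib sketch of that dispatch pattern, not the adapter's actual code: the class MiniAdapter, the methods on and ingest_lines, and the event fields step, doc_id and components are all hypothetical.

```python
import json
from collections import defaultdict
from typing import Callable, Dict, Iterable

# Hypothetical event shape: {"doc_id": "...", "step": "extract_pdf", "components": 42}
Handler = Callable[[dict], None]


class MiniAdapter:
    """Illustrative dispatcher: routes each JSONL record to the handler for its step."""

    def __init__(self) -> None:
        self.handlers: Dict[str, Handler] = {}
        self.unhandled: Dict[str, int] = defaultdict(int)  # counts of unknown steps

    def on(self, step: str) -> Callable[[Handler], Handler]:
        """Decorator that registers a handler for one event type."""
        def register(fn: Handler) -> Handler:
            self.handlers[step] = fn
            return fn
        return register

    def ingest_lines(self, lines: Iterable[str]) -> int:
        """Parse and dispatch JSONL lines; returns how many events were handled."""
        handled = 0
        for raw in lines:
            raw = raw.strip()
            if not raw:
                continue  # tolerate blank lines in the stream
            event = json.loads(raw)
            step = event.get("step", "<missing>")
            handler = self.handlers.get(step)
            if handler is None:
                self.unhandled[step] += 1
                continue
            handler(event)
            handled += 1
        return handled


# Usage: sum components reported by the PDF extraction step, per document.
adapter = MiniAdapter()
pdf_components: Dict[str, int] = defaultdict(int)


@adapter.on("extract_pdf")
def count_pdf(event: dict) -> None:
    pdf_components[event["doc_id"]] += event.get("components", 0)


handled = adapter.ingest_lines([
    '{"doc_id": "doc-1", "step": "extract_pdf", "components": 42}',
    "",
    '{"doc_id": "doc-1", "step": "not_a_real_step"}',
])
```

Counting unknown steps instead of raising keeps one malformed or newer event type from halting ingestion of the rest of the stream.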

Quick Start

from udom_observability import TelemetryAdapter, InMemoryMetricBackend

# 1. Create backend (swap for PrometheusBackend in production)
backend = InMemoryMetricBackend()
adapter = TelemetryAdapter(metrics=backend)

# 2. Ingest JSONL telemetry
adapter.ingest_file("telemetry.jsonl")

# 3. Query metrics
score = backend.get_latest_gauge("udom_quality_score", grade="A")
upgrade_rate = backend.get_latest_gauge("udom_fusion_upgrade_rate")
print(f"Quality: {score:.3f}, Upgrade rate: {upgrade_rate:.1%}")

Production (Prometheus scrape endpoint)

from udom_observability import TelemetryAdapter, PrometheusBackend

backend = PrometheusBackend()
backend.start_server(port=9090) # http://localhost:9090/metrics (9090 is also the Prometheus server's default port; pick another if both run on one host)

adapter = TelemetryAdapter(metrics=backend)
# Feed from NATS subscriber, file watcher, or batch processor
adapter.ingest_file("batch-run.jsonl")

Metrics (20 total)

| Metric | Type | Maps To |
|---|---|---|
| udom_extraction_duration_seconds | histogram | TDD §7.1 Latency Profile |
| udom_extraction_components_total | counter | SDD §2 Component counts |
| udom_extraction_assets_total | counter | SDD §2 Image assets |
| udom_extraction_errors_total | counter | SDD §5 Failure Modes |
| udom_fusion_matched_total | counter | ADR-002 Fusion validation |
| udom_fusion_upgraded_total | counter | 3-source value measurement |
| udom_fusion_upgrade_rate | gauge | Key differentiator metric |
| udom_quality_score | gauge | TDD §7 / SDD §6 SLO |
| udom_quality_grade_total | counter | SDD §6 Grade distribution |
| udom_quality_score_distribution | histogram | Quality trend detection |
| udom_pipeline_duration_seconds | histogram | SDD §6 SLO (P95 < 45s) |
| udom_pipeline_in_progress | gauge | Capacity monitoring |
| udom_pipeline_completed_total | counter | Throughput tracking |
| udom_pipeline_failed_total | counter | SDD §6 SLO (99.5% avail) |
| udom_document_size_chars | histogram | Storage planning |
| udom_document_size_lines | histogram | Storage planning |
| udom_component_type_total | counter | Corpus analysis |
| udom_throughput_papers_per_hour | gauge | Capacity planning |
| udom_circuit_breaker_state | gauge | SDD §5 / Sys Prompt §9.4 |
| udom_circuit_breaker_trips_total | counter | Incident tracking |
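udom_throughput_papers_per_hour comes from a rolling-window tracker. A minimal sketch of one way to compute such a rate, assuming a hypothetical RollingThroughput class that stores completion timestamps (recorded in time order) and scales the count still inside the window to an hourly rate; the shipped ThroughputTracker may work differently.

```python
import time
from collections import deque
from typing import Deque, Optional


class RollingThroughput:
    """Papers per hour over a sliding time window (illustrative sketch)."""

    def __init__(self, window_s: float = 3600.0) -> None:
        self.window_s = window_s
        self._done: Deque[float] = deque()  # completion timestamps, oldest first

    def record(self, ts: Optional[float] = None) -> None:
        """Record one completed paper; an explicit ts must come from the same clock."""
        self._done.append(time.monotonic() if ts is None else ts)

    def papers_per_hour(self, now: Optional[float] = None) -> float:
        now = time.monotonic() if now is None else now
        # Evict completions that have aged out of the window.
        while self._done and self._done[0] <= now - self.window_s:
            self._done.popleft()
        # Scale the in-window count to an hourly rate.
        return len(self._done) * 3600.0 / self.window_s


tracker = RollingThroughput(window_s=600.0)  # 10-minute window
for t in (0.0, 100.0, 200.0, 650.0):
    tracker.record(ts=t)
rate = tracker.papers_per_hour(now=700.0)  # only 200.0 and 650.0 remain: 12.0 papers/hour
```

A shorter window reacts faster to stalls; a longer one smooths out bursty batch submissions.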

SLOs (from SDD §6)

| SLO | Target | Alert Rule |
|---|---|---|
| Grade A rate | ≥ 98% (7d window) | UDOMQualityGradeABelowSLO |
| P95 latency | < 45s | UDOMLatencyP95AboveSLO |
| Availability | ≥ 99.5% (30d) | UDOMAvailabilityBelowSLO |
| Data integrity | 100% hash verification | UDOMDataIntegrityFailure |
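The availability and latency SLOs reduce to simple arithmetic over the pipeline counters and the duration histogram. The sketch assumes availability is completed / (completed + failed), read from udom_pipeline_completed_total and udom_pipeline_failed_total, and estimates P95 from cumulative histogram buckets by linear interpolation, in the spirit of PromQL's histogram_quantile. The function names and the sample readings are illustrative only.

```python
import math
from typing import List, Tuple


def availability(completed: float, failed: float) -> float:
    """Share of finished pipeline runs that did not fail (1.0 before any runs)."""
    total = completed + failed
    return 1.0 if total == 0 else completed / total


def error_budget_remaining(completed: float, failed: float, target: float = 0.995) -> float:
    """Fraction of the failure budget left; negative once the SLO is breached."""
    allowed = (1.0 - target) * (completed + failed)
    return 1.0 if allowed == 0 else 1.0 - failed / allowed


def histogram_quantile(q: float, buckets: List[Tuple[float, float]]) -> float:
    """Estimate quantile q from cumulative (upper_bound, count) buckets.

    Buckets must be sorted by upper bound and end with +Inf, as in a Prometheus
    histogram. The first bucket's lower edge is taken as 0 (durations are non-negative).
    """
    total = buckets[-1][1]
    if total == 0:
        return math.nan
    rank = q * total
    prev_bound, prev_count = 0.0, 0.0
    for bound, count in buckets:
        if count >= rank:
            if math.isinf(bound):
                return prev_bound  # no upper edge to interpolate towards
            in_bucket = count - prev_count
            if in_bucket == 0:
                return bound
            return prev_bound + (bound - prev_bound) * (rank - prev_count) / in_bucket
        prev_bound, prev_count = bound, count
    return prev_bound


# Hypothetical counter and bucket readings for one evaluation window.
avail = availability(completed=9980, failed=20)  # 0.998, meets the 99.5% target
budget_left = error_budget_remaining(completed=9980, failed=20)  # about 0.6
p95 = histogram_quantile(0.95, [(10.0, 20), (30.0, 80), (60.0, 98), (math.inf, 100)])
latency_ok = p95 < 45.0  # p95 is 55.0 s here, so the latency SLO is breached
```

Interpolation assumes observations are spread evenly inside a bucket, so the P95 estimate is only as sharp as the bucket boundaries around 45s.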

Trace Structure

udom.pipeline (root span, doc_id → trace_id)
├── udom.extract_pdf [engine=docling, elapsed_s, components, assets]
├── udom.extract_html [engine=ar5iv, elapsed_s, components, assets]
├── udom.extract_latex [engine=latex, elapsed_s, components]
├── udom.mapping [engine=fusion, elapsed_s, components, types{}]
├── udom.assembly [chars, lines]
└── udom.qa_grade [grade, score, matched, upgraded]

Trace IDs are derived deterministically from (doc_id, batch_id), so any system that knows both values can compute the same trace ID and correlate its own records with the trace.

Files

udom_observability/
├── __init__.py # Public API exports
├── metrics.py # 20 MetricSpec definitions + 4 SLOs + MetricBackend protocol
├── tracer.py # TraceBuilder + TraceSpan + TracerBackend protocol
├── adapter.py # TelemetryAdapter (core), CircuitBreaker, ThroughputTracker
└── backends.py # PrometheusBackend, InMemoryMetricBackend
alerting/
└── udom_alerts.yml # 10 Prometheus alerting rules (4 groups)
dashboards/
└── udom-pipeline-grafana.json # Grafana dashboard provisioning
tests/
└── test_adapter.py # 33 tests against a real telemetry trace

Design Decisions

Protocol-based backends. MetricBackend and TracerBackend are runtime-checkable protocols, so no inheritance is required: any class with the matching methods qualifies. Swapping Prometheus for StatsD, Datadog, or a test stub means writing such a class, with no changes to the adapter.
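A sketch of what "no inheritance required" means with typing.Protocol and @runtime_checkable. GaugeSink, its two methods, ListBackend, HalfBackend and publish_score are hypothetical; GaugeSink is a trimmed-down stand-in for MetricBackend, whose real method set may differ.

```python
from typing import List, Protocol, runtime_checkable


@runtime_checkable
class GaugeSink(Protocol):
    """Hypothetical, trimmed-down stand-in for MetricBackend."""

    def set_gauge(self, name: str, value: float, **labels: str) -> None: ...

    def inc_counter(self, name: str, amount: float = 1.0, **labels: str) -> None: ...


class ListBackend:
    """Satisfies GaugeSink structurally: no subclassing, just matching methods."""

    def __init__(self) -> None:
        self.samples: List[str] = []

    def set_gauge(self, name: str, value: float, **labels: str) -> None:
        self.samples.append(f"{name}{labels} = {value}")

    def inc_counter(self, name: str, amount: float = 1.0, **labels: str) -> None:
        self.samples.append(f"{name}{labels} += {amount}")


class HalfBackend:
    """Missing inc_counter, so it does not satisfy the protocol."""

    def set_gauge(self, name: str, value: float, **labels: str) -> None:
        pass


def publish_score(sink: GaugeSink, score: float) -> None:
    sink.set_gauge("udom_quality_score", score, grade="A")


backend = ListBackend()
publish_score(backend, 0.97)
```

isinstance against a runtime-checkable protocol only checks that the methods exist, not their signatures; signature mismatches are caught by a static type checker such as mypy.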

Deterministic trace IDs. The trace ID is sha256(doc_id:batch_id)[:32], so the same input always produces the same trace ID. Replaying telemetry therefore lands in the existing trace instead of creating a duplicate.
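The derivation in a few lines, assuming doc_id and batch_id are joined with a literal colon, UTF-8 encoded, and the SHA-256 hex digest is truncated to 32 characters (128 bits, the size of a W3C Trace Context / OpenTelemetry trace ID). The function name trace_id and the sample IDs are hypothetical.

```python
import hashlib


def trace_id(doc_id: str, batch_id: str) -> str:
    """Deterministic 128-bit trace ID as 32 lowercase hex characters."""
    digest = hashlib.sha256(f"{doc_id}:{batch_id}".encode("utf-8")).hexdigest()
    return digest[:32]  # 32 hex chars = 128 bits


first = trace_id("doc-123", "batch-7")
replay = trace_id("doc-123", "batch-7")  # same inputs on replay: same trace
other = trace_id("doc-123", "batch-8")   # new batch: new trace
otel_int = int(first, 16)                # OpenTelemetry's Python API takes trace IDs as ints
```

One caveat of a plain colon join: IDs that themselves contain a colon can collide ("a:b" + "c" and "a" + "b:c" hash the same string). If doc_id can contain colons, a length prefix or a separator that cannot appear in IDs avoids this.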

Circuit breaker per engine. Each engine gets a three-state breaker (closed/half-open/open) with configurable thresholds, mapping directly to the SDD §5 Graceful Degradation Matrix.
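A minimal three-state breaker for a single engine. The names EngineBreaker, allow, record_success and record_failure, the default thresholds, and the injectable clock are assumptions for illustration; the numeric State values are only a guess at how udom_circuit_breaker_state might encode the states.

```python
import time
from enum import Enum
from typing import Callable


class State(Enum):
    CLOSED = 0     # engine healthy, calls pass through
    HALF_OPEN = 1  # cooldown elapsed, the next result decides
    OPEN = 2       # failing fast, engine skipped


class EngineBreaker:
    """Per-engine three-state circuit breaker (illustrative sketch)."""

    def __init__(self, failure_threshold: int = 3, cooldown_s: float = 60.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.failure_threshold = failure_threshold
        self.cooldown_s = cooldown_s
        self.clock = clock
        self.state = State.CLOSED
        self.failures = 0    # consecutive failures since the last success
        self.trips = 0       # would feed udom_circuit_breaker_trips_total
        self.opened_at = 0.0

    def allow(self) -> bool:
        """True if the engine may be called now."""
        if self.state is State.OPEN and self.clock() - self.opened_at >= self.cooldown_s:
            self.state = State.HALF_OPEN
        return self.state is not State.OPEN

    def record_success(self) -> None:
        self.state = State.CLOSED
        self.failures = 0

    def record_failure(self) -> None:
        self.failures += 1
        # A failure while half-open re-opens immediately.
        if self.state is State.HALF_OPEN or self.failures >= self.failure_threshold:
            self.state = State.OPEN
            self.opened_at = self.clock()
            self.trips += 1


# Usage with a fake clock so the cooldown is deterministic.
now = [0.0]
docling = EngineBreaker(failure_threshold=2, cooldown_s=30.0, clock=lambda: now[0])
docling.record_failure()
docling.record_failure()       # second consecutive failure trips the breaker
blocked = not docling.allow()  # still inside the cooldown
now[0] = 31.0
probing = docling.allow()      # cooldown over, state is HALF_OPEN
docling.record_success()       # success while half-open closes the breaker
```

Injecting the clock keeps the state machine testable without sleeping; in production the default time.monotonic is unaffected by wall-clock adjustments.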

Fusion upgrade rate as a first-class metric. udom_fusion_upgrade_rate quantifies the value of 3-source extraction: if it drops to zero, the architecture isn't delivering its value proposition.
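One way to compute and watch the rate, assuming it is defined as upgraded / matched (the ratio of udom_fusion_upgraded_total to udom_fusion_matched_total); the formula is an assumption, as are both function names and the min_docs threshold.

```python
from typing import List, Optional


def fusion_upgrade_rate(upgraded: int, matched: int) -> Optional[float]:
    """Share of matched components that fusion upgraded; None when nothing matched."""
    if matched <= 0:
        return None
    return upgraded / matched


def upgrade_rate_collapsed(rates: List[Optional[float]], min_docs: int = 20) -> bool:
    """True when at least min_docs documents reported a rate and every one was zero."""
    observed = [r for r in rates if r is not None]
    return len(observed) >= min_docs and all(r == 0.0 for r in observed)


per_doc = [fusion_upgrade_rate(u, m) for u, m in [(12, 48), (0, 0), (3, 30)]]
# [0.25, None, 0.1]
```

Returning None when nothing matched keeps "no data" distinct from a true zero, so an idle batch does not look like the fusion step has stopped adding value.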