Trace Aggregator v1
Level 300 (Advanced)
This page is advanced reference material intended for architects and integrators. You should be comfortable with pipelines and basic inspection before reading this (see Pipeline users and Architects & system designers).
The Trace Aggregator groups Semantiva Trace Stream records into per-run and per-launch aggregates and computes completeness.
Important
Current scope is in-memory only. The aggregator accepts Python dictionaries representing validated trace records. No IO/exporters (JSONL readers/writers, SQLite/DuckDB, or CLI wiring) are implemented.
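Because no JSONL reader ships with the aggregator yet, callers have to parse trace files into dictionaries themselves. The following is a minimal sketch, assuming one JSON object per line. The helper name `iter_jsonl_records` and the file name `trace.jsonl` are hypothetical. The helper performs no schema validation, and the aggregator still expects records that are already valid against Trace Stream v1.

```python
from typing import Iterable, Iterator
import json


def iter_jsonl_records(lines: Iterable[str]) -> Iterator[dict]:
    """Yield one dict per non-blank JSONL line (hypothetical helper, no schema validation)."""
    for line_no, line in enumerate(lines, start=1):
        text = line.strip()
        if not text:
            continue  # tolerate blank lines between records
        record = json.loads(text)
        if not isinstance(record, dict):
            raise ValueError(f"line {line_no}: expected a JSON object, got {type(record).__name__}")
        yield record


# Usage (assumes a file "trace.jsonl" with one trace record per line):
# with open("trace.jsonl", encoding="utf-8") as fh:
#     aggregator.ingest_many(iter_jsonl_records(fh))
```

Taking an iterable of lines rather than a path lets the same helper read from a file handle, a list, or any other line source.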
Motivation
The aggregator centralizes semantics that were previously duplicated in viewer-side code. It produces deterministic aggregates and a canonical completeness report, so that downstream tools share one definition of run and launch completeness (see Trace Stream v1 and the Glossary).
Record Types (input)
The aggregator accepts any mixture of the following record types:
- run_space_start, run_space_end
- pipeline_start, pipeline_end
- ser (Semantic Execution Record)
See Trace Stream v1 for the record schemas and the schema registry.
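Each record type carries its grouping key in a different place. The usage example on this page puts `run_id` at the top level of lifecycle records but inside `identity` for `ser` records. Launch records carry `run_space_launch_id` and `run_space_attempt`. The following is a sketch of that routing under those assumptions. The functions `aggregation_key` and `launch_of_run` are hypothetical and are not part of the aggregator's API. Attaching a run to its launch via the launch fields on `pipeline_start` is also an assumption taken from the usage example.

```python
from typing import Optional, Tuple, Union

RunKey = Tuple[str, str]                 # ("run", run_id)
LaunchKey = Tuple[str, Tuple[str, int]]  # ("launch", (launch_id, attempt))


def aggregation_key(record: dict) -> Union[RunKey, LaunchKey]:
    """Return the aggregate a record belongs to (illustrative sketch)."""
    rtype = record["record_type"]
    if rtype in ("run_space_start", "run_space_end"):
        return ("launch", (record["run_space_launch_id"], record["run_space_attempt"]))
    if rtype in ("pipeline_start", "pipeline_end"):
        return ("run", record["run_id"])
    if rtype == "ser":
        # SER records nest their run_id inside the identity block.
        return ("run", record["identity"]["run_id"])
    raise ValueError(f"unsupported record_type: {rtype!r}")


def launch_of_run(record: dict) -> Optional[Tuple[str, int]]:
    """Launch key that a pipeline_start attaches its run to, if it names one (assumption)."""
    if record.get("record_type") != "pipeline_start":
        return None
    launch_id = record.get("run_space_launch_id")
    if launch_id is None:
        return None
    return (launch_id, record.get("run_space_attempt"))
```

Keying launches on the (launch_id, attempt) pair presumably keeps each attempt of the same launch in its own aggregate.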
Aggregation Model
Two levels:
Per-Run (RunAggregate)
Key: run_id
Collects: one pipeline_start (optional), zero or more ser, one pipeline_end (optional)
Computes:
- status: complete | partial | invalid
- problems: missing_pipeline_start, missing_pipeline_end, start_ts_gt_end_ts
- coverage vs. pipeline_spec_canonical.nodes[*].node_uuid
- node metrics (counts by status, last timing/error)
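The following is a minimal sketch of the coverage and node-metric computation. It assumes, as in the usage example below, that an SER's `identity.node_id` matches a `node_uuid` in the spec, and that SER records arrive in order so later ones win. `NodeCoverage` and `node_coverage` are hypothetical names. The sketch tracks only the last timing, not the last error, because the error field layout is not documented on this page.

```python
from collections import Counter
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Set

# Terminal node statuses as listed under "Completeness & Issues".
TERMINAL_STATUSES = frozenset({"succeeded", "error", "skipped", "cancelled"})


@dataclass
class NodeCoverage:
    expected: Set[str]                                        # node_uuid values declared in the spec
    seen: Set[str] = field(default_factory=set)               # node_id values observed in SER records
    status_counts: Counter = field(default_factory=Counter)   # SER records per status
    last_status: Dict[str, Optional[str]] = field(default_factory=dict)
    last_timing: Dict[str, dict] = field(default_factory=dict)

    @property
    def missing(self) -> Set[str]:
        """Declared nodes that never produced an SER."""
        return self.expected - self.seen

    @property
    def unexpected(self) -> Set[str]:
        """SER node ids that the spec does not declare."""
        return self.seen - self.expected

    @property
    def non_terminal(self) -> Set[str]:
        """Nodes whose most recent SER status is not terminal."""
        return {n for n, s in self.last_status.items() if s not in TERMINAL_STATUSES}


def node_coverage(pipeline_start: Optional[dict], sers: List[dict]) -> NodeCoverage:
    """Compute coverage and per-node metrics for one run (illustrative sketch)."""
    spec = (pipeline_start or {}).get("pipeline_spec_canonical", {})
    cov = NodeCoverage(expected={n["node_uuid"] for n in spec.get("nodes", [])})
    for ser in sers:  # assumed to be in ingestion order, so later records overwrite earlier ones
        node_id = ser["identity"]["node_id"]
        cov.seen.add(node_id)
        status = ser.get("status")
        cov.status_counts[status] += 1
        cov.last_status[node_id] = status
        if "timing" in ser:
            cov.last_timing[node_id] = ser["timing"]
    return cov
```

Without a pipeline_start there is no spec, so every observed node shows up as `unexpected`. That is one way a run missing its start edge still surfaces its nodes.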
Per-Launch (LaunchAggregate)
Key: (run_space_launch_id, run_space_attempt)
Collects: one run_space_start (optional), one run_space_end (optional), and the list of attached run_id values
Computes:
- status: complete | partial | invalid
- problems: missing_run_space_start, missing_run_space_end
- rollup by run status
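Launch problems mirror the run-level edge checks, and the rollup counts attached runs by their finalized status. The following is a sketch that assumes each attached run's status has already been computed. `launch_problems` and `rollup_by_run_status` are hypothetical names. The rules for the launch's own complete/partial/invalid status are not spelled out on this page, so the sketch does not attempt them.

```python
from typing import Dict, Iterable, List

RUN_STATUSES = ("complete", "partial", "invalid")


def launch_problems(saw_start: bool, saw_end: bool) -> List[str]:
    """Problems for one (run_space_launch_id, run_space_attempt) aggregate."""
    problems: List[str] = []
    if not saw_start:
        problems.append("missing_run_space_start")
    if not saw_end:
        problems.append("missing_run_space_end")
    return problems


def rollup_by_run_status(run_status: Dict[str, str], attached_run_ids: Iterable[str]) -> Dict[str, int]:
    """Count attached runs per status; every status key is present, even at zero."""
    rollup = {status: 0 for status in RUN_STATUSES}
    for run_id in attached_run_ids:
        rollup[run_status[run_id]] += 1  # KeyError flags an unknown run or an unexpected status
    return rollup
```

Pre-seeding every status key keeps the shape of the rollup fixed, which fits the aggregator's goal of deterministic output.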
Public API
The only public methods are:
- TraceAggregator.ingest(record: dict) -> None
- TraceAggregator.ingest_many(records: Iterable[dict]) -> None
- TraceAggregator.get_run(run_id: str) -> Optional[RunAggregate]
- TraceAggregator.iter_runs() -> Iterable[RunAggregate]
- TraceAggregator.get_launch(launch_id: str, attempt: int) -> Optional[LaunchAggregate]
- TraceAggregator.iter_launches() -> Iterable[LaunchAggregate]
- TraceAggregator.finalize_run(run_id: str) -> RunCompleteness
- TraceAggregator.finalize_launch(launch_id: str, attempt: int) -> LaunchCompleteness
- TraceAggregator.finalize_all() -> (list[RunCompleteness], list[LaunchCompleteness])
Note
All other helpers are private (underscore-prefixed) and considered implementation details.
Usage (dict-only)
from semantiva.trace.aggregation.aggregator import TraceAggregator
aggregator = TraceAggregator()
# Run-Space lifecycle (optional)
aggregator.ingest({"record_type": "run_space_start", "run_space_launch_id": "L1", "run_space_attempt": 1})
# Run lifecycle + node events
aggregator.ingest({"record_type": "pipeline_start", "run_id": "R1", "pipeline_id": "P",
"run_space_launch_id": "L1", "run_space_attempt": 1,
"pipeline_spec_canonical": {"nodes": [{"node_uuid": "n1"}]}})
aggregator.ingest({"record_type": "ser", "identity": {"run_id": "R1", "pipeline_id": "P", "node_id": "n1"},
"status": "succeeded", "timing": {"wall_ms": 1}})
aggregator.ingest({"record_type": "pipeline_end", "run_id": "R1"})
# Close the launch
aggregator.ingest({"record_type": "run_space_end", "run_space_launch_id": "L1", "run_space_attempt": 1})
run_report = aggregator.finalize_run("R1")
launch_report = aggregator.finalize_launch("L1", 1)
Completeness & Issues
Run status
- complete: saw both pipeline_start and pipeline_end
- partial: only one lifecycle edge, or nodes present with a missing edge
- invalid: no lifecycle records and no nodes, or structural contradictions
Run problems
- missing_pipeline_start
- missing_pipeline_end
- start_ts_gt_end_ts
Launch problems
- missing_run_space_start
- missing_run_space_end
Terminal node statuses
- succeeded, error, skipped, cancelled
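The run rules above can be sketched as a single classification function. This is a sketch, not the aggregator's implementation. `classify_run` is a hypothetical name, and timestamps are passed in as comparable values because the record field names for them are not given here. The page does not define "structural contradictions", so the sketch treats `start_ts_gt_end_ts` as one, which is an assumption.

```python
from typing import List, Optional, Tuple


def classify_run(
    saw_start: bool,
    saw_end: bool,
    node_count: int,
    start_ts: Optional[float] = None,
    end_ts: Optional[float] = None,
) -> Tuple[str, List[str]]:
    """Return (status, problems) for one run (illustrative sketch)."""
    problems: List[str] = []
    if not saw_start:
        problems.append("missing_pipeline_start")
    if not saw_end:
        problems.append("missing_pipeline_end")
    contradiction = start_ts is not None and end_ts is not None and start_ts > end_ts
    if contradiction:
        problems.append("start_ts_gt_end_ts")

    if contradiction:
        status = "invalid"   # assumption: reversed timestamps count as a structural contradiction
    elif saw_start and saw_end:
        status = "complete"  # both lifecycle edges seen
    elif saw_start or saw_end or node_count > 0:
        status = "partial"   # one lifecycle edge, or nodes present with a missing edge
    else:
        status = "invalid"   # no lifecycle records and no nodes
    return status, problems
```

For example, `classify_run(False, False, 2)` returns `("partial", ["missing_pipeline_start", "missing_pipeline_end"])`: node events without either lifecycle edge still count as a partial run.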
Cross-References
Trace Stream schemas: Trace Stream v1
Terminology: Glossary