Execution

The execution module provides the orchestration infrastructure for running Semantiva pipelines. It handles errors centrally and integrates with the tracing system so that each run yields a complete execution record, including error events with timing data and exception details.

Component Registry System

Semantiva uses a dual-registry architecture to manage execution components while avoiding circular import dependencies:

ExecutionComponentRegistry

Specialized registry for orchestrators, executors, and transports. This registry is designed to break circular import dependencies with the main ProcessorRegistry and graph builder modules.

build_orchestrator()

Factory function that uses ExecutionComponentRegistry to construct orchestrators from configuration, supporting dependency injection of executors and transports.

The component registry follows the dependency inversion principle: it acts as a dependency sink that other modules depend on, rather than taking part in a web of mutual imports. See Registry System for detailed architectural information.
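The registry-plus-factory arrangement described above can be sketched in a few lines. This is a minimal, self-contained illustration and not Semantiva's implementation: every `Toy*` class, the `toy_build_orchestrator` function, and the config keys `"orchestrator"`, `"executor"`, and `"transport"` are assumptions made for the example. Only the overall shape (name-based registration, lookup by name, and keyword-only injection of an executor or transport) follows the signatures listed on this page.

```python
from typing import Any, Dict, List, Tuple, Type


class ToyExecutor:
    """Hypothetical executor: runs a callable immediately."""

    def submit(self, fn, *args, **kwargs):
        return fn(*args, **kwargs)


class ToyTransport:
    """Hypothetical transport: records what was published."""

    def __init__(self) -> None:
        self.sent: List[Tuple[str, Any]] = []

    def publish(self, channel: str, data: Any) -> None:
        self.sent.append((channel, data))


class ToyOrchestrator:
    """Hypothetical orchestrator that receives its collaborators by injection."""

    def __init__(self, executor: ToyExecutor, transport: ToyTransport) -> None:
        self.executor = executor
        self.transport = transport


class ToyComponentRegistry:
    """Name-to-class lookup tables.

    The registry imports nothing from the modules that register with it,
    so it cannot take part in an import cycle.
    """

    _orchestrators: Dict[str, Type] = {}
    _executors: Dict[str, Type] = {}
    _transports: Dict[str, Type] = {}

    @classmethod
    def register_orchestrator(cls, name: str, orchestrator_cls: Type) -> None:
        cls._orchestrators[name] = orchestrator_cls

    @classmethod
    def register_executor(cls, name: str, executor_cls: Type) -> None:
        cls._executors[name] = executor_cls

    @classmethod
    def register_transport(cls, name: str, transport_cls: Type) -> None:
        cls._transports[name] = transport_cls

    @classmethod
    def get_orchestrator(cls, name: str) -> Type:
        return cls._orchestrators[name]  # KeyError for unknown names

    @classmethod
    def get_executor(cls, name: str) -> Type:
        return cls._executors[name]

    @classmethod
    def get_transport(cls, name: str) -> Type:
        return cls._transports[name]


def toy_build_orchestrator(exec_cfg: Dict[str, str], *, transport=None, executor=None):
    """Resolve component names from config; injected instances take precedence."""
    if executor is None:
        executor = ToyComponentRegistry.get_executor(exec_cfg.get("executor", "sequential"))()
    if transport is None:
        transport = ToyComponentRegistry.get_transport(exec_cfg.get("transport", "in_memory"))()
    orchestrator_cls = ToyComponentRegistry.get_orchestrator(exec_cfg.get("orchestrator", "local"))
    return orchestrator_cls(executor=executor, transport=transport)


# Registration happens once, typically at start-up.
ToyComponentRegistry.register_orchestrator("local", ToyOrchestrator)
ToyComponentRegistry.register_executor("sequential", ToyExecutor)
ToyComponentRegistry.register_transport("in_memory", ToyTransport)
```

With this layout, a caller can rely on configuration alone (`toy_build_orchestrator({"orchestrator": "local"})`) or pass a pre-built transport or executor to override the configured one, which is convenient in tests.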

Template-method orchestrator

The core lifecycle now lives in SemantivaOrchestrator. It canonicalises the pipeline, snapshots context before/after each node, collects io_delta details, emits built-in pre/post checks, and attaches environment pins for every SER. Concrete orchestrators implement only two hooks:

_submit_and_wait(node_callable, *, ser_hooks)

Runs a node and returns its Payload.

_publish(node, data, context, transport)

Forwards the node output through the orchestrator’s transport.

The base class owns all error handling, timing, and tracing. LocalSemantivaOrchestrator simply delegates to the injected executor and transport while benefiting from the shared SER logic.
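The template-method split can be illustrated with a simplified sketch. It is not the Semantiva source: the `Toy*` class names, the record dictionary, and the reduced hook signatures (no `ser_hooks`, `context`, or `transport` arguments) are assumptions chosen to keep the example short. What it shows is the division of labour: the base class drives the loop, times each node, and records failures, while the subclass supplies only the two hooks.

```python
import time
from abc import ABC, abstractmethod
from functools import partial
from typing import Any, Callable, Dict, List, Tuple


class ToyOrchestratorBase(ABC):
    """Owns the lifecycle: timing, error capture, and record keeping."""

    def __init__(self) -> None:
        self.records: List[Dict[str, Any]] = []

    def execute(self, nodes: List[Callable[[Any], Any]], data: Any) -> Any:
        for index, node in enumerate(nodes):
            started = time.perf_counter()
            record: Dict[str, Any] = {"node": index, "status": "ok", "error": None}
            try:
                # Hook 1: run the node and wait for its result.
                data = self._submit_and_wait(partial(node, data))
                # Hook 2: forward the result.
                self._publish(index, data)
            except Exception as exc:
                record["status"] = "error"
                record["error"] = f"{type(exc).__name__}: {exc}"
                raise
            finally:
                # A record is stored for every node, including a failing one.
                record["duration_s"] = time.perf_counter() - started
                self.records.append(record)
        return data

    @abstractmethod
    def _submit_and_wait(self, node_callable: Callable[[], Any]) -> Any:
        """Run a node and return its output."""

    @abstractmethod
    def _publish(self, node_index: int, data: Any) -> None:
        """Forward the node output."""


class ToyLocalOrchestrator(ToyOrchestratorBase):
    """Runs nodes in-process and keeps published outputs in a list."""

    def __init__(self) -> None:
        super().__init__()
        self.published: List[Tuple[int, Any]] = []

    def _submit_and_wait(self, node_callable: Callable[[], Any]) -> Any:
        return node_callable()

    def _publish(self, node_index: int, data: Any) -> None:
        self.published.append((node_index, data))
```

Because the subclass contains no timing or error-handling code, a different execution backend would only need to reimplement the two hooks to get the same records.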

Public API Surface

Autodoc

class semantiva.execution.component_registry.ExecutionComponentRegistry
    classmethod clear()
    classmethod get_executor(name)
    classmethod get_orchestrator(name)
    classmethod get_registered_executors()
    classmethod get_registered_orchestrators()
    classmethod get_registered_transports()
    classmethod get_transport(name)
    classmethod initialize_defaults()
    classmethod list_executors()
    classmethod list_orchestrators()
    classmethod list_transports()
    classmethod register_executor(name, executor_cls)
    classmethod register_orchestrator(name, orchestrator_cls)
    classmethod register_transport(name, transport_cls)
semantiva.execution.orchestrator.factory.build_orchestrator(exec_cfg, *, transport=None, executor=None)
class semantiva.execution.executor.executor.SemantivaExecutor
    class SERHooks(upstream, trigger, upstream_evidence, io_delta_provider, pre_checks, post_checks_provider, env_pins_provider, redaction_policy_provider)
        env_pins_provider
        io_delta_provider
        post_checks_provider
        pre_checks
        redaction_policy_provider
        trigger
        upstream
        upstream_evidence
    abstract submit(fn, *args, ser_hooks=None, **kwargs)
class semantiva.execution.executor.executor.SequentialSemantivaExecutor
    submit(fn, *args, ser_hooks=None, **kwargs)
class semantiva.execution.orchestrator.orchestrator.LocalSemantivaOrchestrator(executor=None)
class semantiva.execution.orchestrator.orchestrator.SemantivaOrchestrator
    configure_run_metadata(metadata)
    execute(pipeline_spec, payload, transport, logger, trace=None, canonical_spec=None, run_metadata=None)
    property last_nodes
class semantiva.execution.transport.base.Message(data, context, metadata, ack)
    ack
    context
    data
    metadata
class semantiva.execution.transport.base.SemantivaTransport
    abstract close()
    abstract connect()
    abstract publish(channel, data, context, metadata=None, require_ack=False)
    abstract subscribe(channel, *, callback=None)
class semantiva.execution.transport.base.Subscription
    abstract close()
class semantiva.execution.transport.in_memory.InMemorySemantivaTransport
    close()
    connect()
    publish(channel, data, context, metadata=None, require_ack=False)
    subscribe(channel, *, callback=None)
class semantiva.execution.transport.in_memory.InMemorySubscription(queues, pattern)
    close()
class semantiva.execution.job_queue.queue_orchestrator.QueueSemantivaOrchestrator(transport, stop_event=None, logger=None)
    enqueue(pipeline_cfg, *, data=None, context=None, return_future=False, registry_profile=None)
    run_forever()
    stop()
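
The transport entries above expose a publish/subscribe interface built around a message with data, context, metadata, and ack fields. The sketch below mirrors the listed `publish` and `subscribe` signatures in a toy in-memory transport to show how the pieces could fit together. The behaviour is assumed, not taken from Semantiva: glob-style channel matching via `fnmatch`, per-subscription queues, and an ack log are illustrative choices, and all `Toy*` names are hypothetical.

```python
from collections import deque
from fnmatch import fnmatchcase
from typing import Any, Callable, Deque, Dict, List, NamedTuple, Optional


class ToyMessage(NamedTuple):
    """Same four fields as the Message entry listed above."""

    data: Any
    context: Any
    metadata: Dict[str, Any]
    ack: Callable[[], None]


class ToySubscription:
    """Holds a channel pattern and either a callback or a pull queue."""

    def __init__(self, pattern: str, callback: Optional[Callable[[ToyMessage], None]] = None) -> None:
        self.pattern = pattern
        self.callback = callback
        self.queue: Deque[ToyMessage] = deque()
        self.closed = False

    def close(self) -> None:
        self.closed = True


class ToyInMemoryTransport:
    """Delivers each published message to every open, matching subscription."""

    def __init__(self) -> None:
        self._subscriptions: List[ToySubscription] = []
        self.acked: List[str] = []

    def subscribe(self, channel: str, *, callback=None) -> ToySubscription:
        subscription = ToySubscription(channel, callback)
        self._subscriptions.append(subscription)
        return subscription

    def publish(self, channel, data, context, metadata=None, require_ack=False) -> None:
        # require_ack is accepted for signature parity only; this sketch
        # never blocks waiting for an acknowledgement.
        def ack() -> None:
            self.acked.append(channel)

        message = ToyMessage(data, context, dict(metadata or {}), ack)
        for subscription in self._subscriptions:
            if subscription.closed or not fnmatchcase(channel, subscription.pattern):
                continue
            if subscription.callback is not None:
                subscription.callback(message)  # push delivery
            else:
                subscription.queue.append(message)  # pull delivery
```

A subscriber that passes a callback receives messages as they are published, while one without a callback can drain its queue later; closing a subscription stops further delivery to it.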