Execution

The execution layer controls how a configured pipeline runs. It is built from three components working together:

  • an orchestrator that walks the pipeline graph and coordinates node
    execution,
  • an executor that runs individual node tasks, and
  • a transport that carries payloads between nodes.

By default Semantiva uses a local, single-process stack that is well suited for development and small-scale runs on a single machine:

  • LocalSemantivaOrchestrator traverses the pipeline graph, instantiates
    nodes, and drives execution.
  • SequentialSemantivaExecutor runs node tasks one after another in the
    current process.
  • InMemorySemantivaTransport publishes node outputs via in-memory
    channels without serialization.
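The division of labour between the three roles can be sketched with toy classes. This is a minimal illustration, not Semantiva's implementation: every class name prefixed with Toy is hypothetical, the pipeline is reduced to a linear list of callables, and the real components take additional arguments (context, metadata, SER hooks) that are omitted here.

```python
from collections import defaultdict
from typing import Any, Callable, Dict, List, Tuple


class ToySequentialExecutor:
    """Hypothetical executor: runs each task immediately in this process."""

    def submit(self, fn: Callable[..., Any], *args: Any) -> Any:
        return fn(*args)


class ToyInMemoryTransport:
    """Hypothetical transport: stores payloads per channel, no serialization."""

    def __init__(self) -> None:
        self.channels: Dict[str, List[Any]] = defaultdict(list)

    def publish(self, channel: str, data: Any) -> None:
        self.channels[channel].append(data)


class ToyLocalOrchestrator:
    """Hypothetical orchestrator: walks the nodes and coordinates the others."""

    def __init__(self, executor: ToySequentialExecutor,
                 transport: ToyInMemoryTransport) -> None:
        self.executor = executor
        self.transport = transport

    def execute(self, nodes: List[Tuple[str, Callable[[Any], Any]]],
                data: Any) -> Any:
        for name, fn in nodes:
            data = self.executor.submit(fn, data)  # run one node task
            self.transport.publish(name, data)     # forward its output
        return data


transport = ToyInMemoryTransport()
orchestrator = ToyLocalOrchestrator(ToySequentialExecutor(), transport)
result = orchestrator.execute(
    [("double", lambda x: x * 2), ("increment", lambda x: x + 1)],
    5,
)
```

Because the orchestrator only talks to the executor and transport through submit and publish, either one could be swapped for a different implementation without touching the traversal loop.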

These defaults can be overridden via the execution block in the pipeline configuration or the corresponding CLI flags (see Semantiva CLI).

Component Registry System

Semantiva uses a dual-registry architecture to manage execution components while avoiding circular import dependencies:

ExecutionComponentRegistry

Specialized registry for orchestrators, executors, and transports. This registry is designed to break circular import dependencies with the main ProcessorRegistry and graph builder modules.

build_orchestrator()

Factory function that uses ExecutionComponentRegistry to construct orchestrators from configuration, supporting dependency injection of executors and transports.

The component registry follows the dependency inversion principle: it acts as a dependency sink that other modules import, instead of importing those modules itself and creating a web of interdependence. See Registry System for detailed architectural information.
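A name-based registry combined with a factory that accepts injected components might look like the sketch below. It mirrors the method names listed in the API surface (register_executor, get_executor, list_executors) but is a hypothetical reduction: the Toy classes, the "executor" configuration key, and the "sequential" name are assumptions made for illustration, not Semantiva's actual schema or behavior.

```python
from typing import Dict, List, Optional, Type


class ToyComponentRegistry:
    """Hypothetical name-to-class lookup; it imports none of its users."""

    _executors: Dict[str, Type] = {}

    @classmethod
    def register_executor(cls, name: str, executor_cls: Type) -> None:
        cls._executors[name] = executor_cls

    @classmethod
    def get_executor(cls, name: str) -> Type:
        if name not in cls._executors:
            raise KeyError(
                f"unknown executor {name!r}; registered: {cls.list_executors()}"
            )
        return cls._executors[name]

    @classmethod
    def list_executors(cls) -> List[str]:
        return sorted(cls._executors)


class ToyExecutor:
    def submit(self, fn, *args):
        return fn(*args)


class ToyOrchestrator:
    def __init__(self, executor: ToyExecutor) -> None:
        self.executor = executor


def toy_build_orchestrator(exec_cfg: dict, *,
                           executor: Optional[ToyExecutor] = None) -> ToyOrchestrator:
    """Prefer an injected executor; otherwise resolve one by name."""
    if executor is None:
        # The "executor" key and its default are assumptions for this sketch.
        executor_cls = ToyComponentRegistry.get_executor(
            exec_cfg.get("executor", "sequential")
        )
        executor = executor_cls()
    return ToyOrchestrator(executor)


ToyComponentRegistry.register_executor("sequential", ToyExecutor)
built = toy_build_orchestrator({"executor": "sequential"})
custom_executor = ToyExecutor()
injected = toy_build_orchestrator({}, executor=custom_executor)
```

The registry module defines no orchestrator or executor classes of its own, so modules that register or look up components can import it without pulling each other in.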

Template-method orchestrator

The core lifecycle lives in SemantivaOrchestrator. It canonicalises the pipeline, snapshots context before and after each node, collects context_delta details, emits built-in pre/post assertions, and attaches environment pins for every Semantic Execution Record (SER). Concrete orchestrators implement only two hooks:

_submit_and_wait(node_callable, *, ser_hooks)

Runs a node and returns its Payload.

_publish(node, data, context, transport)

Forwards the node output through the orchestrator’s transport.

All error handling, timing, and tracing responsibilities are handled by the base class. LocalSemantivaOrchestrator simply delegates to the injected executor/transport while benefiting from the shared SER logic.
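The template-method split described above can be sketched as follows. This is a simplified, hypothetical model: the Toy classes are not Semantiva's, the hook signatures drop the ser_hooks and transport parameters, and the record written per node is a plain dict standing in for the SER.

```python
import time
from abc import ABC, abstractmethod
from typing import Any, Callable, Dict, List, Tuple


class ToyOrchestratorBase(ABC):
    """Owns the per-node lifecycle; subclasses supply only the two hooks."""

    def __init__(self) -> None:
        self.records: List[Dict[str, Any]] = []

    def execute(self, nodes: List[Tuple[str, Callable]], data: Any,
                context: Dict[str, Any]) -> Any:
        for name, fn in nodes:
            before = dict(context)            # snapshot context before the node
            started = time.perf_counter()
            status = "succeeded"
            try:
                data = self._submit_and_wait(lambda: fn(data, context))
                self._publish(name, data, context)
            except Exception:
                status = "error"              # record the failure, then re-raise
                raise
            finally:
                delta = {k: v for k, v in context.items()
                         if k not in before or before[k] != v}
                self.records.append({
                    "node": name,
                    "status": status,
                    "duration_s": time.perf_counter() - started,
                    "context_delta": delta,
                })
        return data

    @abstractmethod
    def _submit_and_wait(self, node_callable: Callable[[], Any]) -> Any:
        """Run a node and return its output."""

    @abstractmethod
    def _publish(self, node: str, data: Any, context: Dict[str, Any]) -> None:
        """Forward the node output."""


class ToyLocalOrchestrator(ToyOrchestratorBase):
    def __init__(self) -> None:
        super().__init__()
        self.published: List[Tuple[str, Any]] = []

    def _submit_and_wait(self, node_callable):
        return node_callable()                # run inline in this process

    def _publish(self, node, data, context):
        self.published.append((node, data))


def scale(data, context):
    context["factor"] = 3                     # the node writes to the context
    return data * 3


orch = ToyLocalOrchestrator()
out = orch.execute([("scale", scale)], 2, {})
```

The subclass contains no timing, error, or record-keeping code, yet every node it runs still produces a record, which is the point of keeping the lifecycle in the base class.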

During on_pipeline_start the orchestrator also emits a semantic fingerprint: pipeline_config_id summarises the set of (node_uuid, semantic_id) pairs computed from live processor metadata, and node_semantic_ids exposes the per-node values used in that hash. Structural identifiers (pipeline_id and node UUIDs) remain unchanged.
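One way to derive a single identifier from a set of (node_uuid, semantic_id) pairs is to sort the pairs and hash a canonical encoding. The sketch below illustrates that idea only; the function name, the JSON encoding, and the choice of SHA-256 are assumptions, and Semantiva's actual hashing scheme may differ.

```python
import hashlib
import json
from typing import Dict


def toy_pipeline_config_id(node_semantic_ids: Dict[str, str]) -> str:
    """Hash the set of (node_uuid, semantic_id) pairs (hypothetical scheme)."""
    # Sorting makes the result independent of the order the pairs arrive in.
    pairs = sorted(node_semantic_ids.items())
    encoded = json.dumps(pairs, separators=(",", ":")).encode("utf-8")
    return hashlib.sha256(encoded).hexdigest()


same_a = toy_pipeline_config_id({"uuid-1": "sem-a", "uuid-2": "sem-b"})
same_b = toy_pipeline_config_id({"uuid-2": "sem-b", "uuid-1": "sem-a"})
changed = toy_pipeline_config_id({"uuid-1": "sem-a", "uuid-2": "sem-changed"})
```

Here same_a and same_b are equal because only the set of pairs matters, while changed differs because one node's semantic identifier changed even though its UUID did not.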

Public API Surface

Autodoc

class semantiva.execution.component_registry.ExecutionComponentRegistry
    classmethod clear()
    classmethod get_executor(name)
    classmethod get_orchestrator(name)
    classmethod get_registered_executors()
    classmethod get_registered_orchestrators()
    classmethod get_registered_transports()
    classmethod get_transport(name)
    classmethod initialize_defaults()
    classmethod list_executors()
    classmethod list_orchestrators()
    classmethod list_transports()
    classmethod register_executor(name, executor_cls)
    classmethod register_orchestrator(name, orchestrator_cls)
    classmethod register_transport(name, transport_cls)
semantiva.execution.orchestrator.factory.build_orchestrator(exec_cfg, *, transport=None, executor=None)
class semantiva.execution.executor.executor.SemantivaExecutor
    class SERHooks(upstream, trigger, upstream_evidence, context_delta_provider, pre_checks, post_checks_provider, env_pins_provider, redaction_policy_provider)
        context_delta_provider
        env_pins_provider
        post_checks_provider
        pre_checks
        redaction_policy_provider
        trigger
        upstream
        upstream_evidence
    abstract submit(fn, *args, ser_hooks=None, **kwargs)
class semantiva.execution.executor.executor.SequentialSemantivaExecutor
    submit(fn, *args, ser_hooks=None, **kwargs)
class semantiva.execution.orchestrator.orchestrator.LocalSemantivaOrchestrator(executor=None)
class semantiva.execution.orchestrator.orchestrator.SemantivaOrchestrator
    configure_run_metadata(metadata)
    execute(pipeline_spec, payload, transport, logger, trace=None, canonical_spec=None, run_metadata=None)
    property last_nodes
class semantiva.execution.transport.base.Message(data, context, metadata, ack)
    ack
    context
    data
    metadata
class semantiva.execution.transport.base.SemantivaTransport
    abstract close()
    abstract connect()
    abstract publish(channel, data, context, metadata=None, require_ack=False)
    abstract subscribe(channel, *, callback=None)
class semantiva.execution.transport.base.Subscription
    abstract close()
class semantiva.execution.transport.in_memory.InMemorySemantivaTransport
    close()
    connect()
    publish(channel, data, context, metadata=None, require_ack=False)
    subscribe(channel, *, callback=None)
class semantiva.execution.transport.in_memory.InMemorySubscription(queues, pattern)
    close()
class semantiva.execution.job_queue.queue_orchestrator.QueueSemantivaOrchestrator(transport, stop_event=None, logger=None)
    enqueue(pipeline_cfg, *, data=None, context=None, return_future=False, registry_profile=None)
    run_forever()
    stop()