Pipelines in Semantiva

Overview & Mental Model

  • Users define processors with domain logic and configure pipelines in YAML; nodes are created by factories.

  • Pipelines are sequences of nodes across data/context/IO channels.

Defining Pipelines in YAML

A Semantiva pipeline is described in a YAML file using the pipeline: key. Each node is defined under nodes: with a processor and optional parameters.

pipeline:
  nodes:
    - processor: FloatValueDataSource
      parameters:
        value: 1.0
    - processor: FloatMultiplyOperation
      parameters:
        factor: 2.0
    - processor: FloatTxtFileSaver
      parameters:
        path: "output_float.txt"

Each processor entry references a component (by fully qualified class name or short name if registered). The parameters map configures that processor. Nodes may also define ports if they connect to non-default inputs/outputs.

Parameter resolution (precedence)

At runtime, node parameters are resolved as:

  1. Node parameters: in YAML

  2. Existing payload.context values

  3. Python defaults in the processor’s _process_logic signature

Missing required parameters raise:

KeyError: Unable to resolve parameter 'name' from context, node configuration, or defaults.
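The three-step lookup above can be sketched in plain Python. This is an illustrative stand-in, not Semantiva's internal resolver; `resolve_parameter` and its argument names are hypothetical:

```python
import inspect

def resolve_parameter(name, node_params, context, process_logic):
    """Resolve one parameter following the documented precedence."""
    if name in node_params:                  # 1. node parameters from YAML
        return node_params[name]
    if name in context:                      # 2. existing payload.context values
        return context[name]
    param = inspect.signature(process_logic).parameters.get(name)
    if param is not None and param.default is not inspect.Parameter.empty:
        return param.default                 # 3. Python default in the signature
    raise KeyError(
        f"Unable to resolve parameter '{name}' from context, "
        "node configuration, or defaults."
    )
```

Note that a YAML value shadows a context value of the same name, and a context value shadows the Python default.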

Configuration parameters that the processor does not accept are reported during inspection and raise an error before execution.

Parameter Validation in YAML Configuration

Invalid Parameters Detected

Parameters in YAML configuration are validated against processor signatures:

# ❌ This will be detected as invalid
pipeline:
  nodes:
    - processor: FloatMultiplyOperation
      parameters:
        factor: 2.0      # ✅ Valid parameter
        facotr: 3.0      # ❌ Invalid (typo)
        unknown: "test"  # ❌ Invalid (not accepted by processor)

CLI Inspection with --strict

# Report invalid parameters and exit non-zero
semantiva inspect bad_pipeline.yaml --strict

# Output:
# Invalid configuration parameters:
# - node #0 (FloatMultiplyOperation): facotr
# - node #0 (FloatMultiplyOperation): unknown

Runtime Validation

# This will raise InvalidNodeParameterError at pipeline construction
from semantiva.pipeline import Pipeline
from semantiva.exceptions import InvalidNodeParameterError

try:
    pipeline = Pipeline(configs_with_invalid_params)
except InvalidNodeParameterError as e:
    print(f"Invalid parameters: {e.invalid}")

Canonical spec & identity

Pipelines are normalized into GraphV1, producing deterministic identities: PipelineId for the pipeline and node_uuid per node. See Canonical Graph Builder.
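One common way to derive such deterministic identities is to hash a canonical serialization of the normalized spec. The sketch below illustrates the idea only; it is not Semantiva's actual GraphV1 algorithm, and `deterministic_id` is a hypothetical helper:

```python
import hashlib
import json

def deterministic_id(spec: dict) -> str:
    """Hash a canonical JSON form of the spec.

    Key order and whitespace are normalized, so semantically identical
    specs always map to the same identity string.
    """
    canonical = json.dumps(spec, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()[:16]
```

Because serialization is canonical, two configs that differ only in key order produce the same id.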

Running a Pipeline from Python

You can load a pipeline from YAML and execute it programmatically.

from semantiva.pipeline import Pipeline, load_pipeline_from_yaml

nodes = load_pipeline_from_yaml("hello_pipeline.yaml")
p = Pipeline(nodes)
result = p.process()  # -> Payload

print(result.data)     # e.g., FloatDataType(2.0)
print(result.context)  # dict-like context object

Note that Pipeline.process always returns a Payload. There is no form that accepts separate data and context arguments.

What is a Payload?

Every pipeline step in Semantiva consumes and produces a Payload.

A Payload is a typed envelope with two channels:

  • payload.data - the primary data object, a subclass of BaseDataType.

  • payload.context - a dictionary-like structure carrying metadata keys and values.

Together, the data and context channels flow through the pipeline, enabling dynamic parameter injection, state propagation, and metadata logging.
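Conceptually, the envelope is a small two-field record. A minimal stand-in (not the real semantiva.pipeline.payload.Payload class, whose constructor is documented below under Autodoc) could be:

```python
from dataclasses import dataclass, field
from typing import Any, Dict

@dataclass
class Payload:
    """Two-channel envelope: primary data plus dict-like metadata context."""
    data: Any
    context: Dict[str, Any] = field(default_factory=dict)

p = Payload(data=2.0, context={"source": "hello_pipeline.yaml"})
```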

Public API Surface

Extension Points

Objects in Pipeline Configurations

Resolvers enable declarative references to objects and values:

  • model:PolynomialFittingModel:degree=2 — instantiate a descriptor-backed model

  • slicer:/context/roi/window — pull a value from the context map

  • rename:, delete: — transform parameter maps

Example:

pipeline:
  nodes:
    - processor: ModelFittingContextProcessor
      parameters:
        fitting_model: "model:PolynomialFittingModel:degree=2"
        context_keyword: "fit_coefficients"

See Extending Semantiva (Registries & Extensions) for resolver overview and best practices.

Autodoc

class semantiva.pipeline.pipeline.Pipeline(pipeline_configuration, logger=None, transport=None, orchestrator=None, trace=None)

    Bases: _PayloadProcessor

    Methods and attributes:
      get_probe_results()
      get_timers()
      nodes
      pipeline_configuration
      set_run_metadata(metadata)

class semantiva.pipeline.payload.Payload(data, context)

    Bases: object