Pipelines in Python¶
Who this is for
This page is for framework developers, component authors, and advanced R&D workflows. Here you define and run pipelines directly in Python for rapid iteration, testing, and learning Semantiva.
For production pipelines and configuration as an artefact of record, see Pipelines - YAML & CLI instead.
Overview¶
Python pipelines let you exercise the same execution engine used for YAML pipelines, but without creating a production configuration artefact. Use them for notebooks, tests, and quick feedback loops while developing processors. Once stable, port the configuration to YAML for CLI use and version control.
Hello pipeline in pure Python¶
Start with a minimal “hello” pipeline entirely in Python. The configuration is a list of node definitions; each node references a processor class and its parameters. The first node is a data source, so the incoming payload does not need data.
from semantiva import ContextType, Payload, Pipeline

# Example processors used in this guide
from semantiva.examples.test_utils import (
    FloatValueDataSource,
    FloatAddOperation,
)

# Python configuration for a simple arithmetic pipeline.
HELLO_PIPELINE_NODES = [
    {
        "processor": FloatValueDataSource,
        "parameters": {
            "value": 1.0,  # fixed in configuration
        },
    },
    {
        "processor": FloatAddOperation,
        "parameters": {
            "addend": 2.0,  # fixed in configuration
        },
    },
]

# Build the pipeline directly from the Python configuration above.
pipeline = Pipeline(HELLO_PIPELINE_NODES)

# Create payload and context explicitly. Payload starts with no data because
# the first node is a data source.
context = ContextType()
payload = Payload(data=None, context=context)

# Run the pipeline.
result_payload = pipeline.process(payload)

print("data:", result_payload.data.data)
print("context:", result_payload.context)
2025-12-06 15:35:26,375 - INFO - Starting pipeline with 2 nodes (pipeline)
2025-12-06 15:35:26,377 - INFO - Pipeline execution complete. (pipeline)
2025-12-06 15:35:26,377 - INFO - Pipeline execution report:
Pipeline Calls: 1; Elapsed Wall Time: 0.001885s; Elapsed CPU Time: 0.001882s
Node 1: FloatValueDataSource; Elapsed CPU Time: 0.000059s; Elapsed Wall Time: 0.000061s
Node 2: FloatAddOperation; Elapsed CPU Time: 0.000069s; Elapsed Wall Time: 0.000071s
(pipeline)
data: 3.0
context: ContextType(context={})
In this example, the context remains empty for the entire execution. The printed context shows the internal representation of ContextType; it contains no keys after the pipeline runs.
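To pin this behaviour down in a test, a minimal check can reuse the objects from the listing above. This is a sketch; the expected string is simply the repr printed in the output.

# Minimal sanity check for the hello pipeline, reusing HELLO_PIPELINE_NODES
# and the printed repr from the output above.
pipeline = Pipeline(HELLO_PIPELINE_NODES)
result = pipeline.process(Payload(data=None, context=ContextType()))

assert result.data.data == 3.0  # 1.0 from the source + 2.0 addend
assert str(result.context) == "ContextType(context={})"  # no keys were added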
Changing parameters via configuration vs context¶
Semantiva resolves node parameters from configuration, context and Python defaults, as described in Basic Concepts. A simple way to see this is to reuse the pipeline configuration from the previous section and remove the parameter from the node so that it is resolved from context instead.
Below we define a new configuration
HELLO_PIPELINE_NODES_CONTEXT that is identical to
HELLO_PIPELINE_NODES except that the FloatAddOperation node has
no addend in its parameters mapping. The value is instead provided
via context under the same name.
from semantiva import ContextType, Payload, Pipeline
from semantiva.examples.test_utils import (
    FloatValueDataSource,
    FloatAddOperation,
)

# Same structure as HELLO_PIPELINE_NODES, but without "addend" in the
# parameters mapping for FloatAddOperation.
HELLO_PIPELINE_NODES_CONTEXT = [
    {
        "processor": FloatValueDataSource,
        "parameters": {
            "value": 1.0,
        },
    },
    {
        "processor": FloatAddOperation,
        # No 'addend' here: it will be resolved from the context.
        "parameters": {},
    },
]

pipeline = Pipeline(HELLO_PIPELINE_NODES_CONTEXT)

# Context now provides the value for "addend".
context = ContextType()
context.set_value("addend", 2.0)

# As before, data is None because the first node is a data source.
payload = Payload(data=None, context=context)

result_payload = pipeline.process(payload)

print("data:", result_payload.data.data)
print("context:", result_payload.context)
2025-12-06 15:35:39,113 - INFO - Starting pipeline with 2 nodes (pipeline)
2025-12-06 15:35:39,115 - INFO - Pipeline execution complete. (pipeline)
2025-12-06 15:35:39,115 - INFO - Pipeline execution report:
Pipeline Calls: 1; Elapsed Wall Time: 0.001887s; Elapsed CPU Time: 0.001884s
Node 1: FloatValueDataSource; Elapsed CPU Time: 0.000073s; Elapsed Wall Time: 0.000073s
Node 2: FloatAddOperation; Elapsed CPU Time: 0.000046s; Elapsed Wall Time: 0.000046s
(pipeline)
data: 3.0
context: ContextType(context={'addend': 2.0})
Conceptually:
- In the previous example (HELLO_PIPELINE_NODES), the node’s parameters mapping provides addend: 2.0, so configuration supplies the value and the context stays empty.
- In this example (HELLO_PIPELINE_NODES_CONTEXT), addend is left undefined in the node and instead provided via context.set_value("addend", 2.0). The numerical result is the same, but the context now contains an entry for addend; the sketch below re-runs the same pipeline with different context values.
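Because the addend now comes from the context, the same configuration can be re-run with different values without being edited. A minimal sketch, assuming a Pipeline instance may be reused across process calls (the per-call execution report above suggests it is designed for this):

# Re-run the context-driven pipeline with several addend values.
pipeline = Pipeline(HELLO_PIPELINE_NODES_CONTEXT)
for addend in (0.5, 2.0, 10.0):
    context = ContextType()
    context.set_value("addend", addend)
    result = pipeline.process(Payload(data=None, context=context))
    print(addend, "->", result.data.data)  # expected: 1.5, 3.0, 11.0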
Adding a probe node in Python¶
Probes observe the data channel and return a value. The pipeline node, not
the probe component itself, decides whether to store that value in the context
via context_key. Probe nodes must declare a non-empty context_key to
satisfy Semantiva Contracts.
To extend the Python pipeline with a probe, add a new node that wraps
FloatCollectValueProbe and specifies a context key:
from semantiva import ContextType, Payload, Pipeline
from semantiva.examples.test_utils import (
    FloatValueDataSource,
    FloatAddOperation,
    FloatCollectValueProbe,
)

# Configuration with a probe node at the end.
PIPELINE_WITH_PROBE_NODES = [
    {
        "processor": FloatValueDataSource,
        "parameters": {
            "value": 1.0,
        },
    },
    {
        "processor": FloatAddOperation,
        "parameters": {
            "addend": 2.0,
        },
    },
    {
        # Probe node: observes data and writes result into context["result"].
        "processor": FloatCollectValueProbe,
        "context_key": "result",  # mandatory for probe nodes
        "parameters": {},
    },
]

pipeline = Pipeline(PIPELINE_WITH_PROBE_NODES)
context = ContextType()
payload = Payload(data=None, context=context)
result_payload = pipeline.process(payload)

print("data:", result_payload.data.data)
print("context:", result_payload.context)
print("result in context:", result_payload.context.get_value("result"))
2025-12-06 15:35:45,248 - INFO - Starting pipeline with 3 nodes (pipeline)
2025-12-06 15:35:45,251 - INFO - Pipeline execution complete. (pipeline)
2025-12-06 15:35:45,251 - INFO - Pipeline execution report:
Pipeline Calls: 1; Elapsed Wall Time: 0.002366s; Elapsed CPU Time: 0.002362s
Node 1: FloatValueDataSource; Elapsed CPU Time: 0.000104s; Elapsed Wall Time: 0.000105s
Node 2: FloatAddOperation; Elapsed CPU Time: 0.000060s; Elapsed Wall Time: 0.000060s
Node 3: FloatCollectValueProbe; Elapsed CPU Time: 0.000059s; Elapsed Wall Time: 0.000059s
(pipeline)
data: 3.0
context: ContextType(context={'result': 3.0})
result in context: 3.0
From Python’s perspective, you:
1. Construct a Payload with data and context (here data starts as None because the first node is a data source).
2. Call pipeline.process.
3. Let the last node add the "result" entry to the context (the sketch below builds on this step).
In notebooks or test modules, you will often:
- Define processors (see Creating Components (Authoring Guide)) and small pipeline configurations such as HELLO_PIPELINE_NODES or PIPELINE_WITH_PROBE_NODES side by side.
- Run them with different payloads and contexts to debug behaviour; see the test sketch below.
- Once stable, port the configuration to YAML for CLI use and version control.
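As a sketch of that test-module workflow (pytest and the test name are our choice here; the Semantiva calls are the ones used throughout this page):

import pytest

from semantiva import ContextType, Payload, Pipeline
from semantiva.examples.test_utils import FloatAddOperation, FloatValueDataSource

# Same configuration as HELLO_PIPELINE_NODES_CONTEXT above.
HELLO_PIPELINE_NODES_CONTEXT = [
    {"processor": FloatValueDataSource, "parameters": {"value": 1.0}},
    {"processor": FloatAddOperation, "parameters": {}},  # "addend" from context
]

@pytest.mark.parametrize("addend, expected", [(2.0, 3.0), (5.0, 6.0)])
def test_addend_resolved_from_context(addend, expected):
    pipeline = Pipeline(HELLO_PIPELINE_NODES_CONTEXT)
    context = ContextType()
    context.set_value("addend", addend)
    result = pipeline.process(Payload(data=None, context=context))
    assert result.data.data == expected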
Where to go next¶
- To see how the same concepts are expressed in YAML for production use, see Pipelines - YAML & CLI.
- If you author components, also visit Creating Components (Authoring Guide) and Semantiva Contracts.