Creating Components (Authoring Guide)¶
Audience¶
Developers adding domain behavior via operations, probes, sources/sinks, or context processors.
Who this is for
This page is for framework developers and component authors. It assumes you are comfortable running and tweaking pipelines as described in Pipeline users and that you can read basic Python code.
If you only need to run existing pipelines, stay with Pipelines - YAML & CLI and Pipeline users.
Do This / Not That¶
✅ Use DataOperation for transformations (may update context).
✅ Use DataProbe for read-only analysis:
  - Declare only input_data_type.
  - Do not declare output_data_type.
  - Do not declare or mutate context keys (probes are read-only).
✅ Use DataSource / DataSink for I/O; ContextProcessor for domain context logic.
❌ Do not subclass internal/private processor bases.
✅ Keep constructors minimal (base classes already wire logging/observer).
Contract Highlights¶
input_data_type / output_data_type are @classmethods that return types (not instances), where applicable.
DataProbe:
  - must declare input_data_type,
  - must not declare output_data_type,
  - must not create/update context.
DataSink must not declare an output type.
Minimal Templates (pass lint)¶
DataOperation (transforms; may update context)¶
from semantiva.data_processors import DataOperation
from semantiva.examples.test_utils import FloatDataType

class DemoNormalize(DataOperation):
    """Normalize floats; writes mean/std to context."""

    @classmethod
    def input_data_type(cls):
        return FloatDataType

    @classmethod
    def output_data_type(cls):
        return FloatDataType

    @classmethod
    def get_created_keys(cls) -> list[str]:
        """Context keys this processor may create."""
        return ["norm.mean", "norm.std"]

    def _process_logic(self, data: FloatDataType, *, epsilon: float = 1e-8):
        # Example context write via observer hook
        self._notify_context_update("norm.mean", 0.0)
        self._notify_context_update("norm.std", 1.0)
        return data
In real components you must register any context keys you create. The
get_created_keys class method is used by the inspection and context observer
machinery to validate updates and to expose them in tools such as
semantiva inspect and semantiva dev lint.
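Because get_created_keys is an ordinary classmethod, you can sanity-check the declared keys directly, for example in a unit test:

# The declared keys should match what _process_logic actually writes.
assert DemoNormalize.get_created_keys() == ["norm.mean", "norm.std"]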
DataProbe (read-only; no context creation)¶
from semantiva.data_processors import DataProbe
from semantiva.examples.test_utils import FloatDataType

class DemoProbeReadOnly(DataProbe):
    """Probe floats (read-only); does not mutate context."""

    @classmethod
    def input_data_type(cls):
        return FloatDataType

    def _process_logic(self, data: FloatDataType, *, threshold: float = 3.0):
        # Analysis only; DO NOT update context
        return data
Note
Probe components never write to context themselves. When the pipeline node
specifies context_key, the node stores the probe result in context after
process returns, keeping the component stateless.
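For illustration, a probe node entry might look like the sketch below. It uses the Python node-dict format from the dev loop at the end of this page; treating context_key as a node-level entry is an assumption based on the note above, and the key name is just an example.

probe_node = {
    "processor": DemoProbeReadOnly,
    "parameters": {"threshold": 2.5},
    # Assumed node-level entry: the node (not the probe) writes the
    # probe result to this context key after process returns.
    "context_key": "demo.probe_result",
}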
Do not pass ContextType into processors
Components must never accept ContextType directly in their process
or _process_logic methods. Context access is always mediated by pipeline
nodes and observers.
# ❌ Anti-pattern: do NOT copy this
from semantiva.context_processors.context_types import ContextType

class BadProbe:
    @classmethod
    def input_data_type(cls):
        return FloatDataType

    def _process_logic(self, data, context: ContextType):  # <-- wrong
        context.set_value("some.key", 1.23)
        return data
Use a DataProbe that returns values, and let the node write them into
context using a context_key as shown in Adding a probe node in Python.
DataSource (produces data; declares only output type)¶
from semantiva.data_io import DataSource
from semantiva.examples.test_utils import FloatDataType

class DemoSource(DataSource):
    """Emit floats from a backing store."""

    @classmethod
    def output_data_type(cls):
        return FloatDataType

    @classmethod
    def _get_data(cls, count: int = 3):
        return [1.0, 2.0, 3.0][:count]
DataSink (consumes data; declares only input type)¶
from semantiva.data_io import DataSink
from semantiva.examples.test_utils import FloatDataType

class DemoSink(DataSink[FloatDataType]):
    """Persist floats to a sink."""

    @classmethod
    def input_data_type(cls):
        return FloatDataType

    @classmethod
    def _send_data(cls, data: FloatDataType, *, path: str = "out.txt"):
        return None
Context Processors (domain context logic)¶
Use a context processor for creating/merging domain context. Keep them focused on context state, not data transformations.
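As a rough sketch only: the import path and hook names below mirror the data-processor templates above and are assumptions, not the verified ContextProcessor API; consult Semantiva Contracts for the actual base-class interface.

from semantiva.context_processors import ContextProcessor  # import path assumed

class DemoSeedDefaults(ContextProcessor):
    """Sketch: seed a default domain key into context.

    Hook names (get_created_keys, _process_logic, _notify_context_update)
    mirror the DataOperation template above and are assumptions here.
    """

    @classmethod
    def get_created_keys(cls) -> list[str]:
        return ["defaults.gain"]

    def _process_logic(self, *, gain: float = 1.0):
        # Context-only logic: no data transformation happens here.
        self._notify_context_update("defaults.gain", gain)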
Linting Your Components¶
Use the semantiva dev lint CLI to run static contract checks against your components. It can discover processors from Python modules, source paths, Semantiva extensions, or pipeline YAML files.
Run contract checks with the CLI:
# Validate modules
semantiva dev lint --modules my_package.ext
# Scan source paths
semantiva dev lint --paths ./src ./plugins
# Validate components referenced by YAML
semantiva dev lint --yaml pipeline.yaml
# Export the rule catalog
semantiva dev lint --export-contracts CONTRACTS.md
# Detailed diagnostics
semantiva dev lint --modules my_package.ext --debug
Minimal dev loop: component in a Python pipeline¶
As a component author, you will often want to declare a component, plug it into a pipeline, run it once, and inspect the result.
The example below defines a small operation inline (similar to the FloatAddOperation used in other examples) so that it is fully self-contained.
from semantiva import ContextType, Payload, Pipeline
from semantiva.data_types import BaseDataType
from semantiva.data_processors import DataOperation

class FloatDataType(BaseDataType[float]):
    def validate(self, data: float) -> bool:
        return True

class DemoAdd(DataOperation):
    """Toy example: add a constant to a float."""

    @classmethod
    def input_data_type(cls):
        return FloatDataType

    @classmethod
    def output_data_type(cls):
        return FloatDataType

    def _process_logic(self, data: FloatDataType) -> FloatDataType:
        # For demonstration purposes, we just add 1.0
        return FloatDataType(data.data + 1.0)

# A minimal Python pipeline that uses DemoAdd
HELLO_PIPELINE_NODES = [
    {
        "processor": DemoAdd,
        "parameters": {},
    },
]

pipeline = Pipeline(HELLO_PIPELINE_NODES)
context = ContextType()
payload = Payload(data=FloatDataType(41.0), context=context)

result_payload = pipeline.process(payload)
print("Result data:", result_payload.data)
print("Context:", result_payload.context)
2025-12-06 19:51:34,769 - INFO - Starting pipeline with 1 nodes (pipeline)
2025-12-06 19:51:34,770 - INFO - Pipeline execution complete. (pipeline)
2025-12-06 19:51:34,770 - INFO - Pipeline execution report:
Pipeline Calls: 1; Elapsed Wall Time: 0.001274s; Elapsed CPU Time: 0.001270s
Node 1: DemoAdd; Elapsed CPU Time: 0.000069s; Elapsed Wall Time: 0.000071s
(pipeline)
Result data: FloatDataType(42.0)
Context: ContextType(context={})
This pattern is a development feedback loop: declare a component, wire it into a Python pipeline, run it once, and inspect the behaviour.
For production use, the same component would typically be referenced from a YAML pipeline configuration instead (see Pipelines - YAML & CLI and Semantiva Contracts for details).