Data Processors¶
Overview¶
- `semantiva.data_processors.data_processors.DataOperation`: transforms data
- `semantiva.data_processors.data_processors.DataProbe`: inspects/records data
- `semantiva.data_processors.data_processors.OperationTopologyFactory` and slicing utilities
- `semantiva.data_processors.parametric_sweep_factory.ParametricSweepFactory`: generates collections by sweeping parameters using `RangeSpec`, `SequenceSpec`, and `FromContext`. Supports both product mode (Cartesian combinations, default) and zip mode (element-wise pairing).
Important: All processor parameters must be explicitly declared in the `_process_logic` signature. `**kwargs` is not allowed, because reliable provenance tracking requires each parameter to be individually named for proper parameter resolution and inspection.
Parameter Validation Examples¶
Valid Parameter Patterns¶
```python
from semantiva.data_processors import DataOperation

class ValidProcessor(DataOperation):
    def _process_logic(self, data, factor: float, threshold: float = 0.5):
        # ✅ All parameters explicitly declared
        return data.apply_transformation(factor, threshold)
```
Invalid Parameter Patterns (Rejected)¶
```python
# ❌ This will raise ValueError during introspection
class InvalidProcessor(DataOperation):
    def _process_logic(self, data, **kwargs):
        # This pattern is rejected for provenance reliability
        pass
```
Exception: Parametric Sweep Processors¶
Only certain factory-generated processors are allowed to use `**kwargs`:
```python
# ✅ This is allowed - generated by ParametricSweepFactory
class ParametricSweepSomeProcessor(DataOperation):
    def _process_logic(self, data, **kwargs):
        # This works because the class name contains "ParametricSweep"
        pass
```
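A minimal sketch of how such a name-based exemption could be enforced via signature introspection. This is illustrative only: `uses_var_kwargs` and `validate_processor` are hypothetical helpers, not the library's actual introspection code.

```python
import inspect

def uses_var_kwargs(cls) -> bool:
    # True if _process_logic declares a **kwargs parameter
    params = inspect.signature(cls._process_logic).parameters.values()
    return any(p.kind is inspect.Parameter.VAR_KEYWORD for p in params)

def validate_processor(cls) -> None:
    # Reject **kwargs unless the class name marks a factory-generated sweep
    if uses_var_kwargs(cls) and "ParametricSweep" not in cls.__name__:
        raise ValueError(f"{cls.__name__}: **kwargs not allowed in _process_logic")

class ExplicitProcessor:
    def _process_logic(self, data, factor: float, threshold: float = 0.5):
        return data

class ParametricSweepGenerated:
    def _process_logic(self, data, **kwargs):
        return data

class KwargsProcessor:
    def _process_logic(self, data, **kwargs):
        return data

validate_processor(ExplicitProcessor)         # passes: all parameters named
validate_processor(ParametricSweepGenerated)  # passes: exempted by class name
# validate_processor(KwargsProcessor) would raise ValueError
```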
Examples¶
Parametric Sweep Specifications¶
The `ParametricSweepFactory` uses typed variable specifications for flexibility and clarity:
RangeSpec: Numeric Ranges
Define numeric ranges with precise control over scaling and endpoints:
```python
from semantiva.data_processors.parametric_sweep_factory import RangeSpec

# Linear range with endpoint included (default)
linear_range = RangeSpec(lo=0.0, hi=1.0, steps=5)  # [0.0, 0.25, 0.5, 0.75, 1.0]

# Logarithmic range for exponential exploration
log_range = RangeSpec(lo=0.001, hi=1.0, steps=4, scale='log')  # [0.001, 0.01, 0.1, 1.0]

# Exclude endpoint for half-open intervals
half_open = RangeSpec(lo=0.0, hi=1.0, steps=4, endpoint=False)  # [0.0, 0.25, 0.5, 0.75]
```
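The three ranges above can be verified by hand. Here is a small pure-Python sketch of the assumed expansion semantics; it mirrors NumPy's `linspace`/`logspace` conventions and is not the library's actual code:

```python
import math

def expand_range(lo, hi, steps, scale="linear", endpoint=True):
    # With endpoint=True the last step lands exactly on hi;
    # with endpoint=False the interval is half-open, like numpy.linspace.
    denom = steps - 1 if endpoint else steps
    if scale == "log":
        llo, lhi = math.log10(lo), math.log10(hi)
        return [10 ** (llo + (lhi - llo) * i / denom) for i in range(steps)]
    return [lo + (hi - lo) * i / denom for i in range(steps)]

print(expand_range(0.0, 1.0, 5))                  # [0.0, 0.25, 0.5, 0.75, 1.0]
print(expand_range(0.001, 1.0, 4, scale="log"))   # approx [0.001, 0.01, 0.1, 1.0]
print(expand_range(0.0, 1.0, 4, endpoint=False))  # [0.0, 0.25, 0.5, 0.75]
```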
SequenceSpec: Explicit Values
Define explicit sequences including non-numeric values:
```python
from semantiva.data_processors.parametric_sweep_factory import SequenceSpec

# Numeric sequences
values = SequenceSpec([1.0, 2.5, 3.7, 5.1])

# File lists
files = SequenceSpec(["data1.csv", "data2.csv", "model_v3.pkl"])

# Algorithm names
algorithms = SequenceSpec(["sgd", "adam", "rmsprop"])
```
FromContext: Runtime Sequences
Pull sequences from pipeline context for dynamic parameter discovery:
```python
from semantiva.data_processors.parametric_sweep_factory import FromContext

# Reference context keys populated by earlier processors
discovered_files = FromContext("file_list")
hyperparams = FromContext("search_space")
```
Sweep Modes
The factory supports two combination modes:
Product Mode (Default)
Variables are combined in Cartesian product fashion for comprehensive exploration:
```python
Sweep = ParametricSweepFactory.create(
    element_source=ModelDataSource,
    collection_output=ResultCollection,
    vars={
        "learning_rate": RangeSpec(0.001, 0.1, steps=3, scale='log'),
        "batch_size": SequenceSpec([16, 32, 64]),
        "optimizer": SequenceSpec(["adam", "sgd"]),
    },
    parametric_expressions={"experiment_id": "f'{optimizer}_lr{learning_rate}_bs{batch_size}'"},
    mode="product",  # default; produces 3 × 3 × 2 = 18 combinations
)
```
Zip Mode
Variables are paired element-wise for coordinated parameter sweeps:
```python
Sweep = ParametricSweepFactory.create(
    element_source=ProcessorDataSource,
    collection_output=ProcessedCollection,
    vars={
        "input_file": SequenceSpec(["data1.txt", "data2.txt", "data3.txt"]),
        "scale_factor": RangeSpec(0.5, 1.5, steps=3),
        "method": SequenceSpec(["fast", "accurate", "balanced"]),
    },
    parametric_expressions={"output_file": "input_file.replace('.txt', '_processed.txt')"},
    mode="zip",  # produces 3 coordinated combinations
)
```
Zip Mode with Broadcast
Handle sequences of different lengths using broadcast behavior:
```python
Sweep = ParametricSweepFactory.create(
    element_source=AnalysisDataSource,
    collection_output=AnalysisCollection,
    vars={
        "data_files": SequenceSpec(["file1.csv", "file2.csv", "file3.csv"]),
        "method": SequenceSpec(["fast"]),  # Single method repeated
        "quality": RangeSpec(0.1, 0.9, steps=3),
    },
    parametric_expressions={"output_path": "f'{data_files}_{method}_{quality:.1f}.out'"},
    mode="zip",
    broadcast=True,  # shorter sequences repeated to match longest
)
# Produces 3 combinations with method="fast" repeated for each file
```
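The combination semantics of product, zip, and zip-with-broadcast can be summarized by a small pure-Python sketch. This is illustrative only: the real factory builds processor collections, not dicts, and `combine` is a hypothetical helper.

```python
from itertools import product, cycle, islice

def combine(vars, mode="product", broadcast=False):
    # vars: mapping of name -> already-expanded sequence of values
    names, seqs = list(vars), [list(v) for v in vars.values()]
    if mode == "product":
        # Cartesian product: every combination of every variable
        return [dict(zip(names, c)) for c in product(*seqs)]
    if broadcast:
        # Repeat shorter sequences cyclically to match the longest
        longest = max(len(s) for s in seqs)
        seqs = [list(islice(cycle(s), longest)) for s in seqs]
    if len({len(s) for s in seqs}) != 1:
        raise ValueError("zip mode requires equal-length sequences (or broadcast=True)")
    # Element-wise pairing across all variables
    return [dict(zip(names, c)) for c in zip(*seqs)]

grid = combine({"lr": [0.001, 0.01, 0.1], "bs": [16, 32, 64], "opt": ["adam", "sgd"]})
print(len(grid))  # 18 combinations in product mode (3 × 3 × 2)

paired = combine({"file": ["a.csv", "b.csv", "c.csv"], "method": ["fast"]},
                 mode="zip", broadcast=True)
print(len(paired))  # 3 combinations, with method="fast" repeated
```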
Context Integration
Both modes support runtime parameter discovery:
```python
# File discovery followed by processing sweep
DiscoveryOp = FileDiscoveryFactory.create(...)  # populates "discovered_files" in context

ProcessingSweep = ParametricSweepFactory.create(
    element_source=FileProcessor,
    collection_output=ProcessedCollection,
    vars={
        "input_file": FromContext("discovered_files"),
        "processing_params": FromContext("param_grid"),
    },
    parametric_expressions={"output_name": "f'processed_{input_file}_{processing_params}'"},
    mode="product",  # test all parameter combinations on all discovered files
)
```
Performance Considerations
- Product Mode: exponential scaling, producing N1 × N2 × … × Nk elements.
- Zip Mode: linear scaling, producing max(N1, N2, …, Nk) elements with broadcast, or N elements without broadcast (in which case all k sequences must have the same length N).

Use product mode carefully with large parameter spaces. For hyperparameter optimization over large search spaces, consider specialized optimization frameworks rather than exhaustive grid search.
YAML Configuration
Parametric sweeps can be configured through YAML using the `sweep:` prefix with the VarSpec API:
```yaml
pipeline:
  nodes:
    - processor: "sweep:ModelDataSource:ResultCollection"
      parameters:
        vars:
          learning_rate: {lo: 0.001, hi: 0.1, steps: 3, scale: "log"}
          batch_size: [16, 32, 64]              # SequenceSpec shorthand for simple lists
          optimizer: {values: ["adam", "sgd"]}  # Explicit SequenceSpec
        parametric_expressions:
          experiment_id: "f'{optimizer}_lr{learning_rate}_bs{batch_size}'"
        mode: "product"  # default
```
YAML shorthand for pulling sequences from pipeline context¶
Use `from_context` in the `vars` section to read a sequence from a context key at runtime. The example below (included from the repository) shows a minimal runnable pipeline.
```yaml
extensions: ["semantiva-examples"]
pipeline:
  nodes:
    - processor: "sweep:FloatValueDataSourceWithDefault:FloatDataCollection"
      parameters:
        vars:
          input_value: { from_context: discovered_values }
        parametric_expressions:
          value: "float(input_value)"
```
Run it with:
```shell
semantiva run semantiva/examples/from_context_sweep_demo.yaml --context float_values='[1.3, 2.5, 3.7]'
```
Result (brief): the sweep invokes the configured data source once per sequence item, producing a `FloatDataCollection` of `FloatDataType` elements (one per input value). The sequence used by the sweep is published to context as `input_value_values`; the original `float_values` key remains available.
Traditional Parametric Sweep Example¶
```yaml
# Parametric sweep demonstration
# This shows the YAML format for parametric sweeps
extensions: ["semantiva-examples"]

pipeline:
  nodes:
    # Parametric sweep with configuration
    - processor: "sweep:FloatValueDataSource:FloatDataCollection"
      parameters:
        vars:
          t: {lo: -1, hi: 2, steps: 3}
        parametric_expressions:
          # Expressions using the independent variable 't'
          value: "2.0 * t"

    # Process the generated collection
    - processor: "slicer:FloatCollectValueProbe:FloatDataCollection"
      context_keyword: "sweep_results"

    # Aggregate results
    - processor: "FloatCollectionSumOperation"

    # Store final result
    - processor: "FloatBasicProbe"
      context_keyword: "final_sum"
```
Annotated walkthrough¶
Slicers can wrap either a `DataOperation` or a `DataProbe`:

- Operator slicing returns a new collection whose element type matches the underlying operation.
- Probe slicing leaves the collection untouched and collects probe outputs into context via `context_keyword`.
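These two behaviors can be sketched with plain Python stand-ins (illustrative only; the functions and types here are hypothetical, not the actual slicer implementation):

```python
def slice_with_operation(collection, operation):
    # Operator slicing: apply the operation per element and
    # return a new collection of transformed elements
    return [operation(item) for item in collection]

def slice_with_probe(collection, probe, context, context_keyword):
    # Probe slicing: the collection passes through untouched;
    # probe outputs accumulate in context under context_keyword
    context[context_keyword] = [probe(item) for item in collection]
    return collection

context = {}
data = [1.0, 2.0, 3.0]
transformed = slice_with_operation(data, lambda x: 2.0 * x)
passed_through = slice_with_probe(data, lambda x: x, context, "sweep_results")
print(transformed)               # [2.0, 4.0, 6.0]
print(passed_through is data)    # True: the probe left the collection untouched
print(context["sweep_results"])  # [1.0, 2.0, 3.0]
```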
This example demonstrates a parametric sweep using resolvers:

- `sweep:FloatValueDataSource:FloatDataCollection` generates a collection by sweeping an independent variable.
  - `vars: { t: {lo: -1, hi: 2, steps: 3} }` defines the sweep variable as a range from -1 to 2.
  - `parametric_expressions: { value: "2.0 * t" }` calculates each element's value from `t`.
  - The sweep produces a `FloatDataCollection` of items whose data results from the expression and whose context carries the sweep variable(s).
- `slicer:FloatCollectValueProbe:FloatDataCollection` collects values from the collection and writes them into the **context**.
  - `context_keyword: "sweep_results"`: the collected values appear at `payload.context["sweep_results"]`.
- `FloatCollectionSumOperation` aggregates the collection into a scalar sum on the **data** channel.
- `FloatBasicProbe` records the final scalar into the **context**.
  - `context_keyword: "final_sum"`: the scalar is stored at `payload.context["final_sum"]`.
Run it¶
```shell
# Validate first (shows node_uuid, parameters)
semantiva inspect tests/parametric_sweep_demo.yaml --extended

# Then execute
semantiva run tests/parametric_sweep_demo.yaml -v
```
After execution, check the logs and your context: a list at `sweep_results` and a scalar `final_sum` should be present. Use Semantiva Studio Viewer to visualize the graph, and Step Evidence Record (SER) v0 (draft) to record timings if needed.
Autodoc¶
- class semantiva.data_processors.data_processors.DataOperation(context_observer=None, logger=None)
  Bases: `_BaseDataProcessor`
  - `context_observer`
- class semantiva.data_processors.data_processors.DataProbe(logger=None)
  Bases: `_BaseDataProcessor`
- class semantiva.data_processors.data_processors.ParameterInfo(default=<object object>, annotation='Unknown')
  Bases: `object`
  - `annotation = 'Unknown'`
  - `default = <object object>`
- semantiva.data_processors.data_slicer_factory.slicer(processor_cls, input_data_collection_type)