Data Processors

Overview

Important: All processor parameters must be explicitly declared in the _process_logic signature. **kwargs is not allowed: reliable provenance tracking requires each parameter to be individually named so it can be resolved and inspected.

Parameter Validation Examples

Valid Parameter Patterns

from semantiva.data_processors import DataOperation

class ValidProcessor(DataOperation):
    def _process_logic(self, data, factor: float, threshold: float = 0.5):
        # ✅ All parameters explicitly declared
        return data.apply_transformation(factor, threshold)

Invalid Parameter Patterns (Rejected)

# ❌ This will raise ValueError during introspection
class InvalidProcessor(DataOperation):
    def _process_logic(self, data, **kwargs):
        # This pattern is rejected for provenance reliability
        pass

Exception: Parametric Sweep Processors

Only certain factory-generated processors are allowed to use **kwargs:

# ✅ This is allowed - generated by ParametricSweepFactory
class ParametricSweepSomeProcessor(DataOperation):
    def _process_logic(self, data, **kwargs):
        # This works because class name contains "ParametricSweep"
        pass
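For illustration, the **kwargs rule could be enforced with a small signature check like the one below. This is a hedged sketch using Python's inspect module; validate_process_logic and the class names are hypothetical, not Semantiva API:

```python
import inspect

def validate_process_logic(cls):
    # Sketch: reject **kwargs in _process_logic unless the class name
    # marks it as a ParametricSweep-generated processor.
    sig = inspect.signature(cls._process_logic)
    has_var_kwargs = any(
        p.kind is inspect.Parameter.VAR_KEYWORD for p in sig.parameters.values()
    )
    if has_var_kwargs and "ParametricSweep" not in cls.__name__:
        raise ValueError(
            f"{cls.__name__}._process_logic may not use **kwargs; "
            "declare each parameter explicitly."
        )

class GoodProcessor:
    def _process_logic(self, data, factor: float, threshold: float = 0.5):
        return data

class BadProcessor:
    def _process_logic(self, data, **kwargs):
        pass

validate_process_logic(GoodProcessor)   # passes silently
try:
    validate_process_logic(BadProcessor)
except ValueError as exc:
    print(f"rejected: {exc}")
```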

Examples

Parametric Sweep Specifications

The ParametricSweepFactory uses typed variable specifications (RangeSpec, SequenceSpec, FromContext) to declare sweep variables explicitly:

RangeSpec: Numeric Ranges

Define numeric ranges with precise control over scaling and endpoints:

from semantiva.data_processors.parametric_sweep_factory import RangeSpec

# Linear range with endpoint included (default)
linear_range = RangeSpec(lo=0.0, hi=1.0, steps=5)  # [0.0, 0.25, 0.5, 0.75, 1.0]

# Logarithmic range for exponential exploration
log_range = RangeSpec(lo=0.001, hi=1.0, steps=4, scale='log')  # [0.001, 0.01, 0.1, 1.0]

# Exclude endpoint for half-open intervals
half_open = RangeSpec(lo=0.0, hi=1.0, steps=4, endpoint=False)  # [0.0, 0.25, 0.5, 0.75]
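The example values above can be reproduced with a short stand-in for RangeSpec expansion. This is a sketch only; expand_range is a hypothetical helper, not the library's implementation:

```python
import math

def expand_range(lo, hi, steps, scale="linear", endpoint=True):
    # Sketch of RangeSpec expansion: evenly spaced points, optionally
    # computed in log10 space and mapped back.
    if scale == "log":
        lo, hi = math.log10(lo), math.log10(hi)
    n = (steps - 1) if endpoint else steps
    pts = [lo + (hi - lo) * i / n for i in range(steps)]
    return [10 ** p for p in pts] if scale == "log" else pts

print(expand_range(0.0, 1.0, 5))                  # [0.0, 0.25, 0.5, 0.75, 1.0]
print(expand_range(0.001, 1.0, 4, scale="log"))   # ~[0.001, 0.01, 0.1, 1.0]
print(expand_range(0.0, 1.0, 4, endpoint=False))  # [0.0, 0.25, 0.5, 0.75]
```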

SequenceSpec: Explicit Values

Define explicit sequences including non-numeric values:

from semantiva.data_processors.parametric_sweep_factory import SequenceSpec

# Numeric sequences
values = SequenceSpec([1.0, 2.5, 3.7, 5.1])

# File lists
files = SequenceSpec(["data1.csv", "data2.csv", "model_v3.pkl"])

# Algorithm names
algorithms = SequenceSpec(["sgd", "adam", "rmsprop"])

FromContext: Runtime Sequences

Pull sequences from pipeline context for dynamic parameter discovery:

from semantiva.data_processors.parametric_sweep_factory import FromContext

# Reference context keys populated by earlier processors
discovered_files = FromContext("file_list")
hyperparams = FromContext("search_space")
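Conceptually, each FromContext placeholder is swapped for the sequence stored under its key when the sweep expands. A minimal sketch of that resolution step, where resolve_vars and the local FromContext class are illustrative, not Semantiva API:

```python
class FromContext:
    # Local stand-in for Semantiva's FromContext, for illustration only.
    def __init__(self, key):
        self.key = key

def resolve_vars(vars_spec, context):
    # Replace each FromContext placeholder with the sequence stored
    # under its key; other specs pass through unchanged.
    return {
        name: context[spec.key] if isinstance(spec, FromContext) else spec
        for name, spec in vars_spec.items()
    }

context = {"file_list": ["a.csv", "b.csv"]}
resolved = resolve_vars({"input_file": FromContext("file_list")}, context)
print(resolved["input_file"])  # ['a.csv', 'b.csv']
```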

Sweep Modes

The factory supports two combination modes:

Product Mode (Default)

Variables are combined in Cartesian product fashion for comprehensive exploration:

Sweep = ParametricSweepFactory.create(
    element_source=ModelDataSource,
    collection_output=ResultCollection,
    vars={
        "learning_rate": RangeSpec(0.001, 0.1, steps=3, scale='log'),
        "batch_size": SequenceSpec([16, 32, 64]),
        "optimizer": SequenceSpec(["adam", "sgd"])
    },
    parametric_expressions={"experiment_id": "f'{optimizer}_lr{learning_rate}_bs{batch_size}'"},
    mode="product"  # default, produces 3 × 3 × 2 = 18 combinations
)
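The product-mode expansion above can be mimicked with itertools.product. The sequences are copied from the example; this is a plain-Python sketch, not the factory's internals:

```python
from itertools import product

# Expanded sequences copied from the example above.
vars_expanded = {
    "learning_rate": [0.001, 0.01, 0.1],   # RangeSpec(0.001, 0.1, steps=3, scale='log')
    "batch_size": [16, 32, 64],
    "optimizer": ["adam", "sgd"],
}
names = list(vars_expanded)
combos = [dict(zip(names, vals)) for vals in product(*vars_expanded.values())]
print(len(combos))   # 18
print(combos[0])     # {'learning_rate': 0.001, 'batch_size': 16, 'optimizer': 'adam'}
```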

Zip Mode

Variables are paired element-wise for coordinated parameter sweeps:

Sweep = ParametricSweepFactory.create(
    element_source=ProcessorDataSource,
    collection_output=ProcessedCollection,
    vars={
        "input_file": SequenceSpec(["data1.txt", "data2.txt", "data3.txt"]),
        "scale_factor": RangeSpec(0.5, 1.5, steps=3),
        "method": SequenceSpec(["fast", "accurate", "balanced"])
    },
    parametric_expressions={"output_file": "input_file.replace('.txt', '_processed.txt')"},
    mode="zip"  # produces 3 coordinated combinations
)
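Zip mode corresponds to Python's zip over the expanded sequences: the i-th value of each variable is taken together. A sketch using the example's values, assuming RangeSpec(0.5, 1.5, steps=3) expands to [0.5, 1.0, 1.5]:

```python
# Expanded sequences from the example above.
input_files = ["data1.txt", "data2.txt", "data3.txt"]
scale_factors = [0.5, 1.0, 1.5]  # RangeSpec(0.5, 1.5, steps=3)
methods = ["fast", "accurate", "balanced"]

combos = [
    {"input_file": f, "scale_factor": s, "method": m}
    for f, s, m in zip(input_files, scale_factors, methods)
]
print(len(combos))  # 3
print(combos[0])    # {'input_file': 'data1.txt', 'scale_factor': 0.5, 'method': 'fast'}
```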

Zip Mode with Broadcast

Handle sequences of different lengths using broadcast behavior:

Sweep = ParametricSweepFactory.create(
    element_source=AnalysisDataSource,
    collection_output=AnalysisCollection,
    vars={
        "data_files": SequenceSpec(["file1.csv", "file2.csv", "file3.csv"]),
        "method": SequenceSpec(["fast"]),  # Single method repeated
        "quality": RangeSpec(0.1, 0.9, steps=3)
    },
    parametric_expressions={"output_path": "f'{data_files}_{method}_{quality:.1f}.out'"},
    mode="zip",
    broadcast=True  # shorter sequences repeated to match longest
)
# Produces 3 combinations with method="fast" repeated for each file
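A rough model of zip-with-broadcast, assuming shorter sequences are repeated cyclically to the longest length. The library may restrict broadcasting to singleton sequences; this is an illustrative sketch only:

```python
from itertools import cycle, islice

# Sequences from the example; the single-element "method" is repeated.
seqs = {
    "data_files": ["file1.csv", "file2.csv", "file3.csv"],
    "method": ["fast"],
    "quality": [0.1, 0.5, 0.9],  # RangeSpec(0.1, 0.9, steps=3)
}
longest = max(len(v) for v in seqs.values())
broadcast = {k: list(islice(cycle(v), longest)) for k, v in seqs.items()}
combos = [dict(zip(broadcast, vals)) for vals in zip(*broadcast.values())]
print(combos[1])  # {'data_files': 'file2.csv', 'method': 'fast', 'quality': 0.5}
```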

Context Integration

Both modes support runtime parameter discovery:

# File discovery followed by processing sweep
DiscoveryOp = FileDiscoveryFactory.create(...)  # populates "discovered_files" in context

ProcessingSweep = ParametricSweepFactory.create(
    element_source=FileProcessor,
    collection_output=ProcessedCollection,
    vars={
        "input_file": FromContext("discovered_files"),
        "processing_params": FromContext("param_grid")
    },
    parametric_expressions={"output_name": "f'processed_{input_file}_{processing_params}'"},
    mode="product"  # test all parameter combinations on all discovered files
)

Performance Considerations

  • Product Mode: Exponential scaling - produces N1 × N2 × … × Nk elements

  • Zip Mode: Linear scaling - produces max(N1, N2, …, Nk) elements with broadcast, or N elements without broadcast (all sequences must then share the same length N)

Use product mode carefully with large parameter spaces. For hyperparameter optimization, consider using specialized optimization frameworks for large search spaces rather than exhaustive grid search.
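For a quick back-of-the-envelope element count before launching a sweep (sweep_size is a hypothetical helper, not part of the library):

```python
from math import prod

def sweep_size(lengths, mode="product"):
    # Estimate element count: product of lengths in product mode,
    # longest sequence in zip mode (with broadcast).
    return prod(lengths) if mode == "product" else max(lengths)

print(sweep_size([3, 3, 2]))          # 18  (product mode)
print(sweep_size([3, 1, 3], "zip"))   # 3   (zip mode with broadcast)
```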

YAML Configuration

Parametric sweeps can be configured through YAML using the sweep: prefix with the VarSpec API:

pipeline:
  nodes:
    - processor: "sweep:ModelDataSource:ResultCollection"
      parameters:
        vars:
          learning_rate: {lo: 0.001, hi: 0.1, steps: 3, scale: "log"}
          batch_size: [16, 32, 64]  # SequenceSpec shorthand for simple lists
          optimizer: {values: ["adam", "sgd"]}  # Explicit SequenceSpec
        parametric_expressions:
          experiment_id: "f'{optimizer}_lr{learning_rate}_bs{batch_size}'"
        mode: "product"  # default

YAML shorthand for pulling sequences from pipeline context

Use from_context in the vars section to read a sequence from a context key at runtime. The example below (included from the repository) shows a minimal runnable pipeline.

FromContext sweep demo
extensions: ["semantiva-examples"]

pipeline:
  nodes:
    - processor: "sweep:FloatValueDataSourceWithDefault:FloatDataCollection"
      parameters:
        vars:
          input_value: { from_context: float_values }
        parametric_expressions:
          value: "float(input_value)"

Run it with:

semantiva run semantiva/examples/from_context_sweep_demo.yaml --context float_values='[1.3, 2.5, 3.7]'

Result (brief): the sweep invokes the configured data source once per sequence item, producing a FloatDataCollection of FloatDataType elements (one per input value). The sequence used by the sweep is published to context as input_value_values; the original float_values key remains available.

Traditional Parametric Sweep Example

Parametric sweep demo
# Parametric sweep demonstration
# This shows the YAML format for parametric sweeps
extensions: ["semantiva-examples"]

pipeline:
  nodes:
    # Parametric sweep with configuration
    - processor: "sweep:FloatValueDataSource:FloatDataCollection"
      parameters:
        vars:
          t: {lo: -1, hi: 2, steps: 3}
        parametric_expressions:
          # Complex expressions using the independent variable 't'
          value: "2.0 * t"
          
    # Process the generated collection
    - processor: "slicer:FloatCollectValueProbe:FloatDataCollection"
      context_keyword: "sweep_results"
    
    # Aggregate results
    - processor: "FloatCollectionSumOperation"
    
    # Store final result
    - processor: "FloatBasicProbe"
      context_keyword: "final_sum"

Annotated walkthrough

Slicers can wrap either a DataOperation or a DataProbe:

  • Operator slicing returns a new collection whose element type matches the underlying operation.

  • Probe slicing leaves the collection untouched and collects probe outputs into context via context_keyword.

This example demonstrates a parametric sweep using resolvers:

  1. sweep:FloatValueDataSource:FloatDataCollection

    Generates a collection by sweeping an independent variable.

    • vars: { t: {lo: -1, hi: 2, steps: 3} } – defines the sweep variable t as a 3-step range from -1 to 2

    • parametric_expressions: { value: "2.0 * t" } – calculates a value from t

    The sweep produces a FloatDataCollection of items whose data results from the expression and whose context carries the sweep variable(s).

  2. slicer:FloatCollectValueProbe:FloatDataCollection

    Collects values from a collection and writes them into the context.

    • context_keyword: "sweep_results" – the collected values appear at payload.context["sweep_results"].

  3. FloatCollectionSumOperation

    Aggregates the collection into a scalar sum on the data channel.

  4. FloatBasicProbe

    Records the final scalar into the context.

    • context_keyword: "final_sum" – the scalar is stored at payload.context["final_sum"].

Run it

# Validate first (shows node_uuid, parameters)
semantiva inspect tests/parametric_sweep_demo.yaml --extended

# Then execute
semantiva run tests/parametric_sweep_demo.yaml -v

After execution, check logs and your context: a list at sweep_results and a scalar final_sum should be present. Use Semantiva Studio Viewer to visualize the graph, and the Step Evidence Record (SER) v0 draft to record timings if needed.

Autodoc

class semantiva.data_processors.data_processors.DataOperation(context_observer=None, logger=None)[source]

Bases: _BaseDataProcessor

classmethod context_keys()[source]
context_observer
classmethod get_created_keys()[source]
abstract classmethod output_data_type()[source]
classmethod signature_string()[source]
class semantiva.data_processors.data_processors.DataProbe(logger=None)[source]

Bases: _BaseDataProcessor

classmethod get_created_keys()[source]
classmethod signature_string()[source]
class semantiva.data_processors.data_processors.OperationTopologyFactory[source]

Bases: object

classmethod create_data_operation(input_type, output_type, class_name, _process_logic=None)[source]
class semantiva.data_processors.data_processors.ParameterInfo(default=<object object>, annotation='Unknown')[source]

Bases: object

annotation = 'Unknown'
default = <object object>
semantiva.data_processors.data_slicer_factory.slicer(processor_cls, input_data_collection_type)[source]
class semantiva.data_processors.parametric_sweep_factory.FromContext(key)[source]
class semantiva.data_processors.parametric_sweep_factory.ParametricSweepFactory[source]
static create(*, element, element_kind='DataSource', collection_output, vars, parametric_expressions=None, static_params=None, include_independent=False, mode='product', broadcast=False, name=None, expression_evaluator=None)[source]
class semantiva.data_processors.parametric_sweep_factory.RangeSpec(lo, hi, steps, scale='linear', endpoint=True)[source]
endpoint = True
hi
lo
scale = 'linear'
steps
class semantiva.data_processors.parametric_sweep_factory.SequenceSpec(values)[source]
values