Data Processors
===============

Overview
--------

- :py:class:`semantiva.data_processors.data_processors.DataOperation`: transforms data
- :py:class:`semantiva.data_processors.data_processors.DataProbe`: inspects data and records results
- :py:class:`semantiva.data_processors.data_processors.OperationTopologyFactory` and slicing utilities
- :py:class:`semantiva.data_processors.parametric_sweep_factory.ParametricSweepFactory`: generates
  collections by sweeping parameters described with
  :py:class:`~semantiva.data_processors.parametric_sweep_factory.RangeSpec`,
  :py:class:`~semantiva.data_processors.parametric_sweep_factory.SequenceSpec`, and
  :py:class:`~semantiva.data_processors.parametric_sweep_factory.FromContext`.
  Supports both product mode (Cartesian combinations, the default) and zip mode
  (element-wise pairing).

**Important**: All processor parameters must be explicitly declared in the
``_process_logic`` signature. ``**kwargs`` is not allowed: reliable provenance
tracking, parameter resolution, and inspection all require each parameter to be
individually named.

Parameter Validation Examples
-----------------------------

Valid Parameter Patterns
~~~~~~~~~~~~~~~~~~~~~~~~

.. code-block:: python

   from semantiva.data_processors import DataOperation

   class ValidProcessor(DataOperation):
       def _process_logic(self, data, factor: float, threshold: float = 0.5):
           # ✅ All parameters explicitly declared
           return data.apply_transformation(factor, threshold)

Invalid Parameter Patterns (Rejected)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

.. code-block:: python

   from semantiva.data_processors import DataOperation

   # ❌ This will raise ValueError during introspection
   class InvalidProcessor(DataOperation):
       def _process_logic(self, data, **kwargs):
           # This pattern is rejected for provenance reliability
           pass

Exception: Parametric Sweep Processors
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Only certain factory-generated processors are allowed to use ``**kwargs``:
.. code-block:: python

   from semantiva.data_processors import DataOperation

   # ✅ This is allowed - generated by ParametricSweepFactory
   class ParametricSweepSomeProcessor(DataOperation):
       def _process_logic(self, data, **kwargs):
           # This works because the class name contains "ParametricSweep"
           pass

Examples
--------

Parametric Sweep Specifications
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

The :py:class:`~semantiva.data_processors.parametric_sweep_factory.ParametricSweepFactory`
describes each swept variable with a typed specification:

**RangeSpec: Numeric Ranges**

Define numeric ranges with explicit control over scaling and endpoints:

.. code-block:: python

   from semantiva.data_processors.parametric_sweep_factory import RangeSpec

   # Linear range with endpoint included (default)
   linear_range = RangeSpec(lo=0.0, hi=1.0, steps=5)
   # [0.0, 0.25, 0.5, 0.75, 1.0]

   # Logarithmic range for exploring several orders of magnitude
   log_range = RangeSpec(lo=0.001, hi=1.0, steps=4, scale='log')
   # [0.001, 0.01, 0.1, 1.0]

   # Exclude endpoint for half-open intervals
   half_open = RangeSpec(lo=0.0, hi=1.0, steps=4, endpoint=False)
   # [0.0, 0.25, 0.5, 0.75]

**SequenceSpec: Explicit Values**

Define explicit sequences, including non-numeric values:

.. code-block:: python

   from semantiva.data_processors.parametric_sweep_factory import SequenceSpec

   # Numeric sequences
   values = SequenceSpec([1.0, 2.5, 3.7, 5.1])

   # File lists
   files = SequenceSpec(["data1.csv", "data2.csv", "model_v3.pkl"])

   # Algorithm names
   algorithms = SequenceSpec(["sgd", "adam", "rmsprop"])

**FromContext: Runtime Sequences**

Pull sequences from the pipeline context for dynamic parameter discovery:

.. code-block:: python

   from semantiva.data_processors.parametric_sweep_factory import FromContext

   # Reference context keys populated by earlier processors
   discovered_files = FromContext("file_list")
   hyperparams = FromContext("search_space")

**Sweep Modes**

The factory supports two combination modes.

**Product Mode (Default)**

Variables are combined as a Cartesian product for exhaustive exploration:
.. code-block:: python

   from semantiva.data_processors.parametric_sweep_factory import (
       ParametricSweepFactory,
       RangeSpec,
       SequenceSpec,
   )

   # ModelDataSource and ResultCollection are placeholders for your own types
   Sweep = ParametricSweepFactory.create(
       element_source=ModelDataSource,
       collection_output=ResultCollection,
       vars={
           "learning_rate": RangeSpec(0.001, 0.1, steps=3, scale='log'),
           "batch_size": SequenceSpec([16, 32, 64]),
           "optimizer": SequenceSpec(["adam", "sgd"]),
       },
       parametric_expressions={"experiment_id": "f'{optimizer}_lr{learning_rate}_bs{batch_size}'"},
       mode="product",  # default; produces 3 × 3 × 2 = 18 combinations
   )

**Zip Mode**

Variables are paired element-wise for coordinated parameter sweeps:

.. code-block:: python

   Sweep = ParametricSweepFactory.create(
       element_source=ProcessorDataSource,
       collection_output=ProcessedCollection,
       vars={
           "input_file": SequenceSpec(["data1.txt", "data2.txt", "data3.txt"]),
           "scale_factor": RangeSpec(0.5, 1.5, steps=3),
           "method": SequenceSpec(["fast", "accurate", "balanced"]),
       },
       parametric_expressions={"output_file": "input_file.replace('.txt', '_processed.txt')"},
       mode="zip",  # produces 3 coordinated combinations
   )

**Zip Mode with Broadcast**

Handle sequences of different lengths with broadcasting:

.. code-block:: python

   Sweep = ParametricSweepFactory.create(
       element_source=AnalysisDataSource,
       collection_output=AnalysisCollection,
       vars={
           "data_files": SequenceSpec(["file1.csv", "file2.csv", "file3.csv"]),
           "method": SequenceSpec(["fast"]),  # single method repeated
           "quality": RangeSpec(0.1, 0.9, steps=3),
       },
       parametric_expressions={"output_path": "f'{data_files}_{method}_{quality:.1f}.out'"},
       mode="zip",
       broadcast=True,  # shorter sequences are repeated to match the longest
   )
   # Produces 3 combinations, with method="fast" repeated for each file

**Context Integration**

Both modes support runtime parameter discovery:

.. code-block:: python

   # File discovery followed by a processing sweep
   DiscoveryOp = FileDiscoveryFactory.create(...)  # populates "discovered_files" in context

   ProcessingSweep = ParametricSweepFactory.create(
       element_source=FileProcessor,
       collection_output=ProcessedCollection,
       vars={
           "input_file": FromContext("discovered_files"),
           "processing_params": FromContext("param_grid"),
       },
       parametric_expressions={"output_name": "f'processed_{input_file}_{processing_params}'"},
       mode="product",  # test all parameter combinations on all discovered files
   )

**Performance Considerations**

- **Product mode**: multiplicative growth. It produces N1 × N2 × ... × Nk elements,
  so the count grows exponentially with the number of variables.
- **Zip mode**: linear growth. It produces N elements when all sequences share the
  same length N; with ``broadcast=True``, it produces max(N1, N2, ..., Nk) elements.

Use product mode carefully with large parameter spaces. For hyperparameter
optimization over large search spaces, consider a dedicated optimization
framework rather than an exhaustive grid search.

**YAML Configuration**

Parametric sweeps can be configured in YAML using the ``sweep:`` prefix with
the VarSpec API:

.. code-block:: yaml

   pipeline:
     nodes:
       - processor: "sweep:ModelDataSource:ResultCollection"
         parameters:
           vars:
             learning_rate: {lo: 0.001, hi: 0.1, steps: 3, scale: "log"}
             batch_size: [16, 32, 64]  # SequenceSpec shorthand for simple lists
             optimizer: {values: ["adam", "sgd"]}  # Explicit SequenceSpec
           parametric_expressions:
             experiment_id: "f'{optimizer}_lr{learning_rate}_bs{batch_size}'"
           mode: "product"  # default

YAML shorthand for pulling sequences from pipeline context
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Use ``from_context`` in the ``vars`` section to read a sequence from a context
key at runtime. The example below (included from the repository) shows a
minimal runnable pipeline.

.. literalinclude:: ../../semantiva/examples/from_context_sweep_demo.yaml
   :language: yaml
   :caption: FromContext sweep demo

Run it with:
.. code-block:: bash

   semantiva run semantiva/examples/from_context_sweep_demo.yaml --context float_values='[1.3, 2.5, 3.7]'

Result (brief): the sweep invokes the configured data source once per sequence
item, producing a ``FloatDataCollection`` of ``FloatDataType`` elements (one per
input value). The sequence used by the sweep is published to context as
``input_value_values``; the original ``float_values`` key remains available.

Traditional Parametric Sweep Example
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

.. literalinclude:: ../../tests/parametric_sweep_demo.yaml
   :language: yaml
   :caption: Parametric sweep demo

Annotated walkthrough
---------------------

Slicers can wrap either a
:py:class:`~semantiva.data_processors.data_processors.DataOperation` or a
:py:class:`~semantiva.data_processors.data_processors.DataProbe`:

- **Operator slicing** returns a new collection whose element type matches the
  underlying operation.
- **Probe slicing** leaves the collection untouched and collects probe outputs
  into context via ``context_keyword``.

This example demonstrates a parametric sweep using resolvers:

1. ``sweep:FloatValueDataSource:FloatDataCollection``

   *Generates a collection by sweeping an independent variable.*

   - ``vars: { t: [-1, 2] }`` – defines the sweep variable as a range from -1 to 2
   - ``parametric_expressions: { value: "2.0 * t" }`` – calculates a value from ``t``

   The sweep produces a ``FloatDataCollection`` of items whose **data** results
   from the expression and whose **context** carries the sweep variable(s).

2. ``slicer:FloatCollectValueProbe:FloatDataCollection``

   *Collects values from a collection and writes them into the* **context**.

   - ``context_keyword: "sweep_results"`` – the collected values appear at
     ``payload.context["sweep_results"]``.

3. ``FloatCollectionSumOperation``

   *Aggregates the collection into a scalar sum on the* **data** *channel.*

4. ``FloatBasicProbe``

   *Records the final scalar into the* **context**.

   - ``context_keyword: "final_sum"`` – the scalar is stored at
     ``payload.context["final_sum"]``.

Run it
------

.. code-block:: bash

   # Validate first (shows node_uuid, parameters)
   semantiva inspect tests/parametric_sweep_demo.yaml --extended

   # Then execute
   semantiva run tests/parametric_sweep_demo.yaml -v

After execution, check the logs and the context: a list at ``sweep_results``
and a scalar at ``final_sum`` should be present. Use :doc:`studio_viewer` to
visualize the graph, and :doc:`ser` to record timings if needed.

Autodoc
-------

.. automodule:: semantiva.data_processors.data_processors
   :members:
   :undoc-members:
   :show-inheritance:

.. automodule:: semantiva.data_processors.data_slicer_factory
   :members:
   :undoc-members:

.. automodule:: semantiva.data_processors.parametric_sweep_factory
   :members:
   :undoc-members:
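
Sweep semantics sketch
----------------------

The combination rules described under Sweep Modes and Performance
Considerations can be checked with a small standalone sketch. It uses only the
Python standard library and does not call Semantiva. ``range_values``,
``product_mode`` and ``zip_mode`` are hypothetical helpers written for this
illustration. They assume RangeSpec-style spacing (linear, or evenly spaced
base-10 exponents for ``scale='log'``). They also assume that broadcasting
repeats only single-value sequences, which is the case used in the broadcast
example above. This is not the factory's implementation.

.. code-block:: python

   import itertools
   import math


   def range_values(lo, hi, steps, scale="linear", endpoint=True):
       """Hypothetical helper: evenly spaced values, mimicking the RangeSpec examples."""
       if steps == 1:
           return [lo]
       # With endpoint=True the last value equals hi; otherwise hi is excluded.
       intervals = steps - 1 if endpoint else steps
       if scale == "log":
           # Space the base-10 exponents evenly, then exponentiate.
           a, b = math.log10(lo), math.log10(hi)
           return [10 ** (a + (b - a) * i / intervals) for i in range(steps)]
       return [lo + (hi - lo) * i / intervals for i in range(steps)]


   def product_mode(variables):
       """Hypothetical helper: one dict per element of the Cartesian product."""
       names = list(variables)
       return [dict(zip(names, combo)) for combo in itertools.product(*variables.values())]


   def zip_mode(variables, broadcast=False):
       """Hypothetical helper: pair values element-wise (assumes only length-1 broadcasting)."""
       n = max(len(values) for values in variables.values())
       columns = {}
       for name, values in variables.items():
           if len(values) == n:
               columns[name] = list(values)
           elif broadcast and len(values) == 1:
               columns[name] = list(values) * n  # repeat the single value n times
           else:
               raise ValueError(f"{name!r} has {len(values)} values, expected {n}")
       return [{name: columns[name][i] for name in columns} for i in range(n)]


   grid = product_mode({
       "learning_rate": range_values(0.001, 0.1, 3, scale="log"),
       "batch_size": [16, 32, 64],
       "optimizer": ["adam", "sgd"],
   })
   print(len(grid))  # 18 (3 × 3 × 2)

   paired = zip_mode(
       {
           "data_files": ["file1.csv", "file2.csv", "file3.csv"],
           "method": ["fast"],
           "quality": range_values(0.1, 0.9, 3),
       },
       broadcast=True,
   )
   print(len(paired))  # 3

With the default ``broadcast=False``, the sketch's ``zip_mode`` raises
``ValueError`` for the single-value ``method`` entry. This behavior is a choice
made for the sketch and is not a documented guarantee of the factory.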