Source code for semantiva.data_processors.data_slicer_factory

# Copyright 2025 Semantiva authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Type, List, Any
from semantiva.data_types.data_types import DataCollectionType
from semantiva.data_processors.data_processors import (
    _BaseDataProcessor,
    DataOperation,
    DataProbe,
)


class _SlicingDataProcessorFactory:
    """
    Factory that dynamically creates data slicer processors.
    """

    @staticmethod
    def create(
        processor_class: Type[_BaseDataProcessor],
        input_data_collection_type: Type[DataCollectionType],
    ):
        """
        Creates a new processor class that slices data and manages context.

        Args:
            processor_class (Type): Base processor class.
            input_data_collection_type (Type[DataCollectionType]): Expected input collection type.

        Returns:
            A new processor class with slicing enabled.
        """
        processor_name = processor_class.__name__
        class_name = f"SlicerFor{processor_name}"

        if issubclass(processor_class, DataOperation):

            assert (
                processor_class.input_data_type() == processor_class.output_data_type()
            ), "Data slicing supported only for processors matching input and output data types."

            class SlicingDataOperator(processor_class):  # type: ignore[valid-type, misc]
                """
                Data Collection slicer operator.
                """

                data_type_override = input_data_collection_type

                @classmethod
                def input_data_type(cls) -> type[DataCollectionType]:
                    """Return the collection data type consumed by the slicer."""
                    return cls.data_type_override

                @classmethod
                def output_data_type(cls) -> type[DataCollectionType]:
                    """Return the collection data type produced by the slicer."""
                    return cls.data_type_override

                def process(
                    self,
                    data,
                    *args,
                    **kwargs,
                ) -> DataCollectionType:
                    """
                    Automatically slices input data and manages context.
                    """

                    processed_data = self.data_type_override.from_list([])
                    for _, data_item in enumerate(data):
                        output = super().process(data_item, *args, **kwargs)
                        processed_data.append(output)

                    return processed_data

            SlicingDataOperator.__name__ = class_name
            SlicingDataOperator.__doc__ = f"{SlicingDataOperator.__doc__} For each element in the collection: {processor_class.__doc__}"
            return SlicingDataOperator

        elif issubclass(processor_class, DataProbe):

            class SlicingDataProbe(processor_class):  # type: ignore[valid-type, misc]
                """
                Data Collection slicer probe.
                """

                input_data_type_override = input_data_collection_type

                @classmethod
                def input_data_type(cls) -> type[DataCollectionType]:
                    """Return the collection data type consumed by the probe."""
                    return cls.input_data_type_override

                def process(
                    self,
                    data,
                    *args,
                    **kwargs,
                ) -> List[Any]:
                    """
                    Automatically slices input data and manages context.
                    """

                    probed_results = []
                    for data_item in data:
                        probed_results.append(
                            super().process(data_item, *args, **kwargs)
                        )

                    return probed_results

            SlicingDataProbe.__name__ = class_name

            SlicingDataProbe.__doc__ = f"{SlicingDataProbe.__doc__} For each element in the collection: {processor_class.__doc__}"
            return SlicingDataProbe


[docs] def slicer( processor_cls: Type[_BaseDataProcessor], input_data_collection_type: Type[DataCollectionType], ): """Convenient user API for creating slicer nodes with explicit types.""" return _SlicingDataProcessorFactory.create( processor_cls, input_data_collection_type )