Source code for semantiva.pipeline.pipeline

# Copyright 2025 Semantiva authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Any, Dict, List, Optional
from .payload import Payload
from semantiva.logger import Logger
from .payload_processors import _PayloadProcessor
from .nodes.nodes import (
    _PipelineNode,
    _ProbeResultCollectorNode,
)
from .graph_builder import build_canonical_spec
from semantiva.execution.transport import (
    SemantivaTransport,
    InMemorySemantivaTransport,
)
from semantiva.execution.orchestrator.orchestrator import (
    SemantivaOrchestrator,
    LocalSemantivaOrchestrator,
)
from semantiva.trace.model import TraceDriver


class Pipeline(_PayloadProcessor):
    """A class for orchestrating multiple payload operations by sequentially
    processing data and context."""

    pipeline_configuration: List[Dict]
    nodes: List[_PipelineNode]

    def __init__(
        self,
        pipeline_configuration: List[Dict],
        logger: Optional[Logger] = None,
        transport: Optional[SemantivaTransport] = None,
        orchestrator: Optional[SemantivaOrchestrator] = None,
        trace: Optional[TraceDriver] = None,
    ):
        """
        Initialize the pipeline with the given configuration, logger, transport,
        and orchestrator.

        Args:
            pipeline_configuration (List[Dict]): A list of dictionaries containing
                the pipeline configuration.
            logger (Optional[Logger]): An optional logger instance for logging
                information. Defaults to None.
            transport (Optional[SemantivaTransport]): An optional transport
                mechanism for the pipeline. If not provided, an
                InMemorySemantivaTransport instance is used. Defaults to None.
            orchestrator (Optional[SemantivaOrchestrator]): An optional
                orchestrator for managing pipeline execution. If not provided, a
                LocalSemantivaOrchestrator instance is used. Defaults to None.
            trace (Optional[TraceDriver]): An optional trace driver for capturing
                execution events. When provided, captures complete execution
                records, including error events with timing data.

        Attributes:
            pipeline_configuration (List[Dict]): Stores the pipeline configuration.
            transport (SemantivaTransport): The transport mechanism used by the pipeline.
            orchestrator (SemantivaOrchestrator): The orchestrator managing the pipeline execution.
            nodes (List): The initialized nodes of the pipeline.
        """
        super().__init__(logger)
        self.pipeline_configuration = pipeline_configuration
        self.transport = transport or InMemorySemantivaTransport()
        self.orchestrator = orchestrator or LocalSemantivaOrchestrator()
        self.trace = trace
        self._run_metadata: dict[str, Any] | None = None
        # Precompute canonical spec and resolved descriptors for validation
        canonical, resolved = build_canonical_spec(pipeline_configuration)
        self.canonical_spec = canonical
        self.resolved_spec = resolved
        self.nodes = []
        if self.logger:
            self.logger.debug(f"Initialized {self.__class__.__name__}")

    def _process(self, payload: Payload) -> Payload:
        """
        Process the pipeline by executing the orchestrator with the provided
        data and context.

        This method starts a stopwatch timer to measure the execution time of
        the pipeline, logs the start and completion of the pipeline processing,
        and provides a detailed timing report upon completion.

        Args:
            payload (Payload): Input payload for the pipeline.

        Returns:
            Payload: The processed payload after all nodes are executed.

        Logs:
            - Info: Logs the start and completion of the pipeline processing.
            - Debug: Logs a detailed timing report of the pipeline execution.
        """
        self.logger.info("Starting pipeline with %s nodes", len(self.nodes))
        self.stop_watch.start()  # existing pipeline timer start
        run_meta = self._run_metadata
        result_payload = self.orchestrator.execute(
            pipeline_spec=self.resolved_spec,
            payload=payload,
            transport=self.transport,
            logger=self.logger,
            trace=self.trace,
            canonical_spec=self.canonical_spec,
            run_metadata=run_meta,
        )
        self.nodes = self.orchestrator.last_nodes
        self._run_metadata = None
        self.stop_watch.stop()  # existing pipeline timer stop
        self.logger.info("Pipeline execution complete.")
        self.logger.info(
            "Pipeline execution report:\n\n\tPipeline %s\n%s\n",
            str(self.stop_watch),
            self.get_timers(),
        )
        return result_payload
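    # A minimal usage sketch (illustrative, not part of this module). It assumes
    # the _PayloadProcessor base class exposes a public process() wrapper around
    # _process(), and that Payload is constructed from data and context; the
    # processor names in the configuration are hypothetical.
    #
    #     config = [
    #         {"processor": "NormalizeOperation"},
    #         {"processor": "ValueProbe"},
    #     ]
    #     pipeline = Pipeline(config)
    #     result = pipeline.process(Payload(my_data, my_context))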
    def set_run_metadata(self, metadata: Optional[Dict[str, Any]]) -> None:
        """Set metadata that should be attached to the next run for tracing."""
        self._run_metadata = dict(metadata or {})
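    # Usage sketch (illustrative): attaching metadata to the next traced run.
    # The keys shown are arbitrary examples, not a schema defined here; the
    # metadata dict is consumed by the following run and then cleared.
    #
    #     pipeline.set_run_metadata({"experiment": "run-42", "operator": "alice"})
    #     result = pipeline.process(payload)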
    def get_timers(self) -> str:
        """
        Retrieve timing information for each node's execution.

        Returns:
            str: A formatted string displaying node number, operation name,
            elapsed CPU time, and elapsed wall time for each node.
        """
        timer_info = [
            f"\t\tNode {i + 1}: {type(node.processor).__name__}; "
            f"\tElapsed CPU Time: {node.stop_watch.elapsed_cpu_time():.6f}s; "
            f"\tElapsed Wall Time: {node.stop_watch.elapsed_wall_time():.6f}s"
            for i, node in enumerate(self.nodes)
        ]
        return "\n".join(timer_info)
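    # Usage sketch (illustrative): the per-node timing report is only meaningful
    # after a run, since self.nodes is populated from the orchestrator.
    #
    #     pipeline.process(payload)
    #     print(pipeline.get_timers())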
    def get_probe_results(self) -> Dict[str, List[Any]]:
        """
        Retrieve the collected data from all probe collector nodes in the pipeline.

        This method iterates through the pipeline's nodes and checks for instances
        of `_ProbeResultCollectorNode`. For each such node, it retrieves the
        collected data and associates it with the corresponding node's index in
        the pipeline.

        Returns:
            Dict[str, List[Any]]: A dictionary where keys are node identifiers
            (e.g., "Node 1/ProbeName") and values are the collected data from
            the probe nodes.

        Example:
            If Node 1 and Node 3 are probe nodes, the result might look like:
            {
                "Node 1/ProbeName": [<collected_data_1>],
                "Node 3/ProbeName": [<collected_data_3>]
            }
        """
        # Dictionary to store probe results keyed by node identifiers
        probe_results = {}

        # Iterate over all nodes in the pipeline
        for i, node in enumerate(self.nodes):
            if isinstance(node, _ProbeResultCollectorNode):
                # Add the collected data from the node to the results dictionary
                assert hasattr(node, "get_collected_data")
                probe_results[f"Node {i + 1}/{type(node.processor).__name__}"] = (
                    node.get_collected_data()
                )

        # Return the dictionary of probe results
        return probe_results
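    # Usage sketch (illustrative): reading probe results after a run. Keys
    # combine the node index and processor class name, as built above.
    #
    #     for node_id, collected in pipeline.get_probe_results().items():
    #         print(node_id, collected)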
    @classmethod
    def _define_metadata(cls) -> Dict[str, Any]:
        # Define the metadata for the Pipeline class
        component_metadata = {
            "component_type": "Pipeline",
        }
        return component_metadata