Source code for semantiva.inspection.builder

# Copyright 2025 Semantiva authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Pipeline Inspection Builder.

This module provides the core inspection data structures and builder functionality
for analyzing Semantiva pipeline configurations. The builder constructs detailed inspection
data while being resilient to configuration errors.

Key Design Principles:
- **Error Resilience**: Never raises exceptions during inspection
- **Comprehensive Data Collection**: Captures all metadata needed for analysis
- **Parameter Resolution Tracking**: Records where parameters come from
- **Context Flow Analysis**: Tracks context key lifecycle across nodes

The inspection data structures provide a single source of truth for:
- Text-based inspection reports (CLI tools)
- JSON representations (web interfaces)
- Validation and error checking
- Parameter resolution analysis
"""

from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional
import inspect
from semantiva.data_processors.data_processors import ParameterInfo
from semantiva.exceptions import InvalidNodeParameterError

from semantiva.pipeline.nodes.nodes import (
    _PipelineNode,
    _ProbeContextInjectorNode,
    _ContextProcessorNode,
)
from semantiva.pipeline.nodes._pipeline_node_factory import _pipeline_node_factory
from semantiva.pipeline._param_resolution import (
    inspect_origin,
    classify_unknown_config_params,
)


@dataclass
class NodeInspection:
    """Detailed inspection data for a single pipeline node.

    This class captures all relevant information about a pipeline node including
    its configuration, parameter resolution, context interactions, and any errors
    encountered during inspection.

    Attributes:
        index: 1-based position of node in the pipeline sequence
        node_class: Name of the pipeline node wrapper class (e.g., '_DataOperationNode')
        processor_class: Name of the actual processor class (e.g., 'ImageNormalizer')
        component_type: Semantic component type from metadata (e.g., 'data_processor')
        input_type: Expected input data type (None for context-only nodes)
        output_type: Expected output data type (None for context-only nodes)
        config_params: Parameters resolved from pipeline configuration or defaults
        default_params: Parameters resolved from operation signature defaults
            (if not overridden by the context)
        context_params: Parameters resolved from pipeline context, with origin tracking
        created_keys: Context keys that this node adds to the pipeline context
        suppressed_keys: Context keys that this node removes from the pipeline context
        docstring: Documentation string from the processor class
        invalid_parameters: Issues for parameters not accepted by the processor
        is_configuration_valid: False if invalid parameters were detected
        errors: List of error messages encountered during node inspection

    Parameter Resolution Tracking:
        The `context_params` dict maps parameter names to their origin node index:
        - `None` indicates the parameter comes from initial pipeline context
        - Integer value indicates the 1-based index of the node that created the key
    """

    index: int
    node_class: str
    processor_class: str
    component_type: str
    input_type: Optional[type]
    output_type: Optional[type]
    config_params: Dict[str, Any]
    default_params: Dict[str, Any]
    context_params: Dict[str, Optional[int]]
    created_keys: set[str]
    suppressed_keys: set[str]
    docstring: str
    invalid_parameters: List[Dict[str, Any]]
    is_configuration_valid: bool
    errors: List[str] = field(default_factory=list)
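# Illustrative sketch (hypothetical parameter names): for a node whose processor
# reads `threshold` from the initial payload and `mask` from a context key created
# by node 2, the origin tracking would read
#
#     node.context_params == {"threshold": None, "mask": 2}
#
# i.e. `None` means the key must arrive in the initial context, while an integer
# is the 1-based index of the node that created it.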
@dataclass
class PipelineInspection:
    """Complete inspection results for an entire pipeline configuration.

    This class aggregates inspection data across all nodes and provides
    pipeline-level analysis including context key flow and validation results.

    Attributes:
        nodes: List of per-node inspection data in pipeline execution order
        required_context_keys: Context keys that must be provided in initial payload
        key_origin: Mapping of context keys to the node index that created them
        errors: List of pipeline-level error messages

    Context Key Flow Analysis:
        The `key_origin` dict tracks the lifecycle of context keys:
        - Keys with `None` origin must be provided in initial context
        - Keys with integer origin are created by that node (1-based index)
        - Keys may be created, used, and deleted by different nodes
    """

    nodes: List[NodeInspection]
    required_context_keys: set[str]
    key_origin: Dict[str, Optional[int]]
    errors: List[str] = field(default_factory=list)
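# Illustrative sketch (hypothetical key names) of the pipeline-level view:
#
#     inspection.key_origin == {"image_stats": 2}   # created by node 2
#     inspection.required_context_keys == {"roi"}   # created by no node, so it must
#                                                   # be supplied in the initial payload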
def build_pipeline_inspection(
    node_configs: List[Dict[str, Any]],
) -> PipelineInspection:
    """
    Build a :class:`PipelineInspection` from a pipeline configuration.

    This function analyzes a raw pipeline configuration to produce detailed
    inspection results. The function is designed to be error-resilient and will
    never raise exceptions - instead, it captures error information within the
    inspection data structure.

    Args:
        node_configs: List of node configuration dictionaries

    Returns:
        PipelineInspection: Complete inspection results including per-node analysis,
            context key flow, and any errors encountered

    Error Handling:
        This function never raises exceptions. Instead, errors are captured at
        appropriate levels:
        - Pipeline-level errors go in `PipelineInspection.errors`
        - Node-level errors go in `NodeInspection.errors` for the affected node
        - Invalid nodes are still included with placeholder information

    Context Key Tracking:
        The function performs sophisticated context key flow analysis:
        1. Tracks which parameters each node requires from context
        2. Records which node created each context key (origin tracking)
        3. Handles context key deletion and validates usage after deletion
        4. Identifies external context keys required from initial payload

    Implementation Notes:
        - Attempts to construct nodes via node factory
        - Tracks parameter resolution from config vs. context sources
        - Handles special node types (probes, context processors) appropriately
        - Maintains deleted key tracking to validate parameter availability
    """
    nodes = []

    # Initialize tracking data structures
    inspection_nodes: List[NodeInspection] = []
    key_origin: Dict[str, int] = {}  # Maps context keys to the node that created them
    deleted_keys: set[str] = set()  # Tracks keys that have been deleted from context
    all_required_params: set[str] = set()  # All parameters required from context
    all_created_keys: set[str] = set()  # All keys created by any node
    errors: List[str] = []

    # Process each node configuration
    for index, node_cfg in enumerate(node_configs, start=1):
        node: Optional[_PipelineNode] = None
        node_errors: List[str] = []

        # Attempt to construct node from configuration
        try:
            node = _pipeline_node_factory(node_cfg)
            nodes.append(node)
        except InvalidNodeParameterError as exc:
            msg = str(exc)
            errors.append(f"Node {index}: {msg}")
            issues = [
                {"name": k, "reason": "unknown_parameter"} for k in exc.invalid.keys()
            ]
            inspection_nodes.append(
                NodeInspection(
                    index=index,
                    node_class="Invalid",
                    processor_class=exc.processor_fqcn.split(".")[-1],
                    component_type="Unknown",
                    input_type=None,
                    output_type=None,
                    config_params=node_cfg.get("parameters", {}),
                    default_params={},
                    context_params={},
                    created_keys=set(),
                    suppressed_keys=set(),
                    docstring="",
                    invalid_parameters=issues,
                    is_configuration_valid=False,
                    errors=[msg],
                )
            )
            continue
        except Exception as exc:  # pragma: no cover - defensive
            # Node construction failed - create placeholder inspection data
            msg = str(exc)
            errors.append(f"Node {index}: {msg}")
            inspection_nodes.append(
                NodeInspection(
                    index=index,
                    node_class="Invalid",
                    processor_class=str(node_cfg.get("processor", "Unknown")),
                    component_type="Unknown",
                    input_type=None,
                    output_type=None,
                    config_params=node_cfg.get("parameters", {}),
                    default_params={},
                    context_params={},
                    created_keys=set(),
                    suppressed_keys=set(),
                    docstring="",
                    invalid_parameters=[],
                    is_configuration_valid=False,
                    errors=[msg],
                )
            )
            continue

        # Extract node and processor information
        processor = node.processor
        metadata = node.get_metadata()
        issues = classify_unknown_config_params(
            processor_cls=processor.__class__,
            processor_config=node.processor_config,
        )

        # Analyze parameter resolution with defaults
        param_details: Dict[str, ParameterInfo] = {}
        # Prefer signature-based details when available (explicit params)
        if hasattr(processor.__class__, "_retrieve_parameter_details"):
            param_details = processor.__class__._retrieve_parameter_details(
                processor.__class__._process_logic, ["self", "data"]
            )
        # If signature has only **kwargs (dynamic factories), fall back to
        # get_processing_parameter_names
        if not param_details:
            gppn = getattr(processor.__class__, "get_processing_parameter_names", None)
            if callable(gppn):
                # Build a minimal param_details mapping names -> unknown/default
                param_details = {
                    name: ParameterInfo(default=None, annotation="Unknown")
                    for name in gppn()
                }

        config_params: Dict[str, Any] = dict(node.processor_config)
        default_params: Dict[str, Any] = {}
        context_params: Dict[str, Optional[int]] = {}
        required_params: set[str] = set()

        for name in param_details.keys():
            origin, origin_idx, default_value = inspect_origin(
                name=name,
                processor_cls=processor.__class__,
                processor_config=node.processor_config,
                key_origin=key_origin,
                deleted_keys=deleted_keys,
            )
            if origin == "config":
                continue
            if origin == "context":
                context_params[name] = origin_idx
                required_params.add(name)
            elif origin == "default":
                config_params[name] = default_value
                default_params[name] = default_value
            elif origin == "required":
                context_params[name] = origin_idx
                required_params.add(name)

        # Merge explicit context requirements exposed by processor
        hook = getattr(processor.__class__, "get_context_requirements", None)
        if callable(hook):
            for key in hook():
                if key not in context_params:
                    context_params[key] = key_origin.get(key)
                    required_params.add(key)

        all_required_params.update(required_params)

        # Analyze context key creation
        created_keys: set[str] = set()
        get_ck = getattr(processor.__class__, "get_created_keys", None)
        if callable(get_ck):
            created_keys = set(get_ck())

        # Special handling for probe nodes that inject results into context
        if isinstance(node, _ProbeContextInjectorNode):
            created_keys.add(node.context_keyword)
            key_origin[node.context_keyword] = index

        # Update key origin tracking for all created keys
        for key in created_keys:
            if key in deleted_keys:
                # Key is being recreated after deletion
                deleted_keys.remove(key)
            key_origin.setdefault(key, index)
        all_created_keys.update(created_keys)

        # Analyze context key suppression/deletion
        suppressed_keys = set()
        if isinstance(node, _ContextProcessorNode):
            suppressed_keys = set(node.get_suppressed_keys())
            deleted_keys.update(suppressed_keys)

        # Validate parameter availability against deleted keys
        missing_deleted = (required_params & deleted_keys) - suppressed_keys
        if missing_deleted - set(config_params.keys()):
            node_errors.append(
                f"Node {index} requires context keys previously deleted: "
                f"{sorted(missing_deleted)}"
            )

        # Create inspection data for this node
        node_inspection = NodeInspection(
            index=index,
            node_class=node.__class__.__name__,
            processor_class=processor.__class__.__name__,
            component_type=metadata.get("component_type", "Unknown"),
            input_type=getattr(node, "input_data_type", lambda: None)(),
            output_type=getattr(node, "output_data_type", lambda: None)(),
            config_params=config_params,
            default_params=default_params,
            context_params=context_params,
            created_keys=created_keys,
            suppressed_keys=suppressed_keys,
            docstring=inspect.getdoc(processor.__class__) or "No description provided.",
            invalid_parameters=issues,
            is_configuration_valid=(len(issues) == 0),
            errors=node_errors,
        )
        inspection_nodes.append(node_inspection)

    # Calculate pipeline-level required context keys
    # These are parameters required by nodes but not created by any node
    required_context_keys = all_required_params - all_created_keys

    return PipelineInspection(
        nodes=inspection_nodes,
        required_context_keys=required_context_keys,
        key_origin={k: v for k, v in key_origin.items()},
        errors=errors,
    )
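# --- Illustrative usage sketch --------------------------------------------
# A minimal example of driving the builder directly, assuming the module's
# dependencies are importable. The processor names and parameters below are
# hypothetical; because the builder never raises, processors that cannot be
# resolved simply surface as errors in the returned PipelineInspection
# instead of aborting the run.
if __name__ == "__main__":  # pragma: no cover - example only
    example_configs = [
        {"processor": "ImageNormalizer", "parameters": {"scale": 2.0}},
        {"processor": "ImageProbe", "parameters": {}},
    ]
    inspection = build_pipeline_inspection(example_configs)
    for node in inspection.nodes:
        status = "ok" if node.is_configuration_valid else "invalid"
        print(f"{node.index}: {node.processor_class} [{status}]")
    print("Required context keys:", sorted(inspection.required_context_keys))
    print("Pipeline errors:", inspection.errors)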