Source code for semantiva.execution.component_registry
# Copyright 2025 Semantiva authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Execution Component Registry for orchestrators, executors, and transports.
This module provides a registry specifically for execution components to avoid
circular import dependencies with the main ProcessorRegistry and graph builder modules.
The registry follows the dependency inversion principle by being a dependency sink
rather than creating webs of interdependence.
"""
from __future__ import annotations
from typing import Dict, List, Type
from semantiva.logger import Logger
[docs]
class ExecutionComponentRegistry:
"""Registry for execution layer components: orchestrators, executors, transports.
This registry is designed to break circular import dependencies by providing
a dedicated registration system for execution components that doesn't depend
on graph building or general class resolution functionality.
"""
_orchestrators: Dict[str, Type] = {}
_executors: Dict[str, Type] = {}
_transports: Dict[str, Type] = {}
_initialized: bool = False
[docs]
@classmethod
def register_orchestrator(cls, name: str, orchestrator_cls: Type) -> None:
"""Register an orchestrator class by name.
Args:
name: String identifier for the orchestrator
orchestrator_cls: The orchestrator class to register
"""
cls._orchestrators[name] = orchestrator_cls
Logger().debug(f"Registered orchestrator: {name}")
[docs]
@classmethod
def register_executor(cls, name: str, executor_cls: Type) -> None:
"""Register an executor class by name.
Args:
name: String identifier for the executor
executor_cls: The executor class to register
"""
cls._executors[name] = executor_cls
Logger().debug(f"Registered executor: {name}")
[docs]
@classmethod
def register_transport(cls, name: str, transport_cls: Type) -> None:
"""Register a transport class by name.
Args:
name: String identifier for the transport
transport_cls: The transport class to register
"""
cls._transports[name] = transport_cls
Logger().debug(f"Registered transport: {name}")
[docs]
@classmethod
def get_orchestrator(cls, name: str) -> Type:
"""Get an orchestrator class by name.
Args:
name: String identifier for the orchestrator
Returns:
The orchestrator class registered under ``name``.
Raises:
KeyError: If no orchestrator is registered under ``name``.
"""
try:
return cls._orchestrators[name]
except KeyError as exc: # pragma: no cover - exercised via CLI helpers
raise KeyError(name) from exc
[docs]
@classmethod
def get_executor(cls, name: str) -> Type:
"""Get an executor class by name.
Args:
name: String identifier for the executor
Returns:
The executor class registered under ``name``.
Raises:
KeyError: If no executor is registered under ``name``.
"""
try:
return cls._executors[name]
except KeyError as exc: # pragma: no cover - exercised via CLI helpers
raise KeyError(name) from exc
[docs]
@classmethod
def get_transport(cls, name: str) -> Type:
"""Get a transport class by name.
Args:
name: String identifier for the transport
Returns:
The transport class registered under ``name``.
Raises:
KeyError: If no transport is registered under ``name``.
"""
try:
return cls._transports[name]
except KeyError as exc: # pragma: no cover - exercised via CLI helpers
raise KeyError(name) from exc
[docs]
@classmethod
def initialize_defaults(cls) -> None:
"""Initialize default execution components.
This method registers the built-in orchestrators, executors, and transports.
It's designed to be called after other modules are loaded to avoid circular
import issues.
Safe to call multiple times - subsequent calls are idempotent.
"""
if cls._initialized:
return
# Import here to avoid circular dependencies
from .orchestrator.orchestrator import (
LocalSemantivaOrchestrator,
SemantivaOrchestrator,
)
from .executor.executor import SequentialSemantivaExecutor
from .transport import InMemorySemantivaTransport
# Register default orchestrators
cls.register_orchestrator(
"LocalSemantivaOrchestrator", LocalSemantivaOrchestrator
)
cls.register_orchestrator("SemantivaOrchestrator", SemantivaOrchestrator)
cls.register_orchestrator("local", LocalSemantivaOrchestrator)
# Register default executors
cls.register_executor(
"SequentialSemantivaExecutor", SequentialSemantivaExecutor
)
cls.register_executor("sequential", SequentialSemantivaExecutor)
# Register default transports
cls.register_transport("InMemorySemantivaTransport", InMemorySemantivaTransport)
cls.register_transport("in_memory", InMemorySemantivaTransport)
cls._initialized = True
Logger().debug("ExecutionComponentRegistry initialized with defaults")
[docs]
@classmethod
def get_registered_orchestrators(cls) -> Dict[str, Type]:
"""Get all registered orchestrators."""
return dict(cls._orchestrators)
[docs]
@classmethod
def get_registered_executors(cls) -> Dict[str, Type]:
"""Get all registered executors."""
return dict(cls._executors)
[docs]
@classmethod
def get_registered_transports(cls) -> Dict[str, Type]:
"""Get all registered transports."""
return dict(cls._transports)
[docs]
@classmethod
def list_orchestrators(cls) -> List[str]:
"""Return orchestrator identifiers in sorted order."""
return sorted(cls._orchestrators.keys())
[docs]
@classmethod
def list_executors(cls) -> List[str]:
"""Return executor identifiers in sorted order."""
return sorted(cls._executors.keys())
[docs]
@classmethod
def list_transports(cls) -> List[str]:
"""Return transport identifiers in sorted order."""
return sorted(cls._transports.keys())
[docs]
@classmethod
def clear(cls) -> None:
"""Clear all registered components. Primarily for testing."""
cls._orchestrators.clear()
cls._executors.clear()
cls._transports.clear()
cls._initialized = False
__all__ = ["ExecutionComponentRegistry"]