Source code for semantiva.execution.transport.base

# Copyright 2025 Semantiva authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Defines the core transport abstraction for Semantiva's distributed execution runtime.
The transport layer moves `(data, context)` payloads between orchestrators and workers,
enabling pluggable, high-performance messaging (in-memory, NATS, Kafka, etc.).

Key concepts:
  - Message: encapsulates payload, context metadata, and an optional ack callback.
  - Subscription: sync/async iterable over incoming messages with lifecycle control.
  - SemantivaTransport: abstract interface for connecting, publishing, and subscribing.
"""

from abc import ABC, abstractmethod
from typing import Any, Dict, Optional, Callable, Iterator, AsyncIterator, NamedTuple
from concurrent.futures import Future
from semantiva.context_processors import ContextType


[docs] class Message(NamedTuple): """ A single transport message carrying data, context, and optional metadata. Attributes: data: The main data object. context: Structured context accompanying the data. metadata: Transport-specific metadata (headers, chunk info, etc.). ack: A callable to acknowledge receipt (for durable transports). """ data: Any context: ContextType metadata: Dict[str, Any] ack: Callable[[], None] # call to acknowledge receipt
[docs] class Subscription(ABC): """ Represents an open subscription to a channel or topic. Provides both synchronous and asynchronous iterators over incoming Messages, plus a `close()` method to cancel the subscription and release resources. """ @abstractmethod def __iter__(self) -> Iterator[Message]: """ Blocking iterator of incoming messages. Yields: Message: next received message until subscription is closed. """ ... @abstractmethod async def __aiter__(self) -> AsyncIterator[Message]: """ Asynchronous iterator of incoming messages. Yields: Message: next received message until subscription is closed. """ ...
[docs] @abstractmethod def close(self) -> None: """ Unsubscribe / stop listening and clean up any background resources. """ ...
[docs] class SemantivaTransport(ABC): """ Abstract base for all Semantiva transport implementations. Transports handle connection lifecycle, message serialization, and low-level publish/subscribe semantics. Concrete subclasses might wrap in-memory queues, NATS, Kafka, Redis, or shared memory. """
[docs] @abstractmethod def connect(self) -> None: """ Open connections or initialize client resources. Called by orchestrators and workers before publishing/subscribing. """ ...
[docs] @abstractmethod def close(self) -> None: """ Tear down connections, stop background threads, and clean up. Ensures a graceful shutdown of the transport layer. """ ...
[docs] @abstractmethod def publish( self, channel: str, data: Any, context: ContextType, metadata: Optional[Dict[str, Any]] = None, require_ack: bool = False, ) -> Optional[Future]: """ Publish a message to the specified channel. Args: channel: Subject or topic name to publish to. data: Payload to send (arbitrary Python object). context: Context dictionary or object to accompany data. metadata: Optional transport-specific metadata (defaults to {}). require_ack: If True, return a Future that resolves when the transport confirms durable receipt. Returns: Future if require_ack=True, otherwise None. """ ...
[docs] @abstractmethod def subscribe( self, channel: str, *, callback: Optional[Callable[[Message], None]] = None ) -> Subscription: """ Subscribe to messages on the given channel. Args: channel: Subject or topic name (supports wildcards, patterns). callback: Optional function to call for each incoming Message. If provided, the Subscription’s iterator need not be used. Returns: Subscription: an iterable over incoming messages, or a subscription that pushes messages to the callback in the background. """ ...