Source code for semantiva.execution.transport.in_memory

# Copyright 2025 Semantiva authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
In-process transport implementation for Semantiva that stores messages in memory.
Ideal for local development, testing, or single-machine demos without external dependencies.

This transport supports:
  - Wildcard channel subscriptions using Unix shell-style patterns (fnmatch).
  - Synchronous and asynchronous iteration over matching messages.
  - Optional callback-based consumption in a background thread.
  - No-op acknowledgments and connect/close methods, since it's all in-process.
"""

import threading
from collections import defaultdict, deque
from typing import Any, Dict, Optional, Callable
from concurrent.futures import Future
from fnmatch import fnmatch
from semantiva.context_processors import ContextType

from .base import SemantivaTransport, Subscription, Message


[docs] class InMemorySubscription(Subscription): """ Subscription for in-memory channels matching a given pattern. Iterates over queued Message objects whose channel name matches the provided pattern (supports '*' wildcards). Once all matching messages are drained, the iterator exits. Calling close() stops iteration early. """ def __init__(self, queues: Dict[str, tuple[deque, threading.Lock]], pattern: str): """ Args: queues: Shared mapping of channel -> (deque of Message, Lock) pattern: Channel pattern to match (exact name or wildcard) """ self._queues = queues self._pattern = pattern self._closed = False def __iter__(self): """ Blocking iterator over matching messages. Yields: Message: next queued message whose channel matches the pattern. Terminates when: - close() is called, or - no more matching messages remain in any queue. """ while not self._closed: found = False # Scan all channels for matches for channel, (q, lock) in list(self._queues.items()): if fnmatch(channel, self._pattern): with lock: msg = q.popleft() if q else None if msg: found = True yield msg break # yield one message at a time # If no message found for this pattern, exit if not found: break async def __aiter__(self): """ Asynchronous iterator fallback. Yields the same messages as the synchronous iterator, but in an async context. """ for msg in self: yield msg
[docs] def close(self) -> None: """ Stop the subscription. Subsequent iterations will terminate. """ self._closed = True
[docs] class InMemorySemantivaTransport(SemantivaTransport): """ In-memory transport that implements the SemantivaTransport API. - Maintains per-channel queues of Message objects. - publish() appends to the appropriate queue. - subscribe() returns an InMemorySubscription for pattern-based consumption. - connect()/close() are no-ops, provided for API symmetry. """ def __init__(self) -> None: # channel -> (deque of Message, threading.Lock) self._queues: Dict[str, tuple[deque, threading.Lock]] = defaultdict( lambda: (deque(), threading.Lock()) ) self._connected = False
[docs] def connect(self) -> None: """ Establish the transport. No real connection needed for in-memory; sets an internal flag. """ self._connected = True
[docs] def close(self) -> None: """ Tear down the transport. No real cleanup needed for in-memory; clears the internal flag. """ self._connected = False
[docs] def publish( self, channel: str, data: Any, context: ContextType, metadata: Optional[Dict[str, Any]] = None, require_ack: bool = False, ) -> Optional[Future]: """ Publish a Message to a channel. Args: channel: Channel name (string). data: Payload for the message. context: Context dictionary accompanying data. metadata: Optional metadata dict (defaults to {}). require_ack: If True, returns a completed Future to simulate ack. Returns: Future if require_ack=True, else None. """ q, lock = self._queues[channel] msg = Message( data=data, context=context, metadata=metadata or {}, ack=lambda: None, # No-op ack for in-memory ) with lock: q.append(msg) if require_ack: fut: Future = Future() fut.set_result(None) return fut return None
[docs] def subscribe( self, channel: str, *, callback: Optional[Callable[[Message], None]] = None ) -> Subscription: """ Subscribe to messages matching the given channel pattern. Args: channel: Channel name or wildcard pattern (e.g., "jobs.*.cfg"). callback: Optional function to call for each incoming Message. If provided, a background thread is started that iterates and invokes callback(msg). Returns: InMemorySubscription instance for manual iteration if no callback, or a subscription with callback-driven background thread. """ sub = InMemorySubscription(self._queues, channel) if callback: # Launch a daemon thread that pushes each Message to the callback def _runner(): for msg in sub: callback(msg) t = threading.Thread(target=_runner, daemon=True) t.start() return sub