Source code for faust.transport.drivers.memory

"""Experimental: In-memory transport."""
import asyncio
from collections import defaultdict, deque
from time import time
from typing import (
    Any,
    AsyncIterator,
    Awaitable,
    ClassVar,
    Iterable,
    Mapping,
    MutableMapping,
    Optional,
    Set,
    Tuple,
    Type,
    cast,
)

from mode import Seconds
from mode.utils.compat import Deque
from mode.utils.futures import done_future
from mode.utils.imports import symbol_by_name

from faust.transport import base
from faust.types import Message, RecordMetadata, TP
from faust.types.transports import ConsumerT, ProducerT

# XXX mypy borks on `import faust`
faust_version = symbol_by_name('faust:__version__')


[docs]class RebalanceListener: """In-memory rebalance listener.""" def __init__(self, *args: Any, **kwargs: Any) -> None: ...
[docs]class Consumer(base.Consumer): """In-memory consumer.""" RebalanceListener: ClassVar[Type] = RebalanceListener consumer_stopped_errors: ClassVar[Tuple[Type[Exception], ...]] = ()
[docs] async def create_topic(self, topic: str, partitions: int, replication: int, *, config: Mapping[str, Any] = None, timeout: Seconds = 1000.0, retention: Seconds = None, compacting: bool = None, deleting: bool = None, ensure_created: bool = False) -> None: ...
[docs] async def subscribe(self, topics: Iterable[str]) -> None: await cast(Transport, self.transport).subscribe(topics)
[docs] async def getmany(self, *partitions: TP, timeout: float) -> AsyncIterator[Tuple[TP, Message]]: transport = cast(Transport, self.transport) max_per_partition = 100 if not partitions: partitions = tuple(self.assignment()) if not partitions: if await self.wait_for_stopped(transport._subscription_ready): return for tp in partitions: messages = transport._messages[tp.topic] if not messages: if await self.wait_for_stopped(transport._messages_ready): return transport._messages_ready.clear() i = 0 while messages: yield tp, messages.popleft() i += 1 if i > max_per_partition: break
def _new_topicpartition(self, topic: str, partition: int) -> TP: return TP(topic, partition)
[docs] async def perform_seek(self) -> None: ...
async def _commit(self, offsets: Mapping[TP, Tuple[int, str]]) -> bool: ...
[docs] async def pause_partitions(self, tps: Iterable[TP]) -> None: ...
[docs] async def position(self, tp: TP) -> Optional[int]: return 0
[docs] async def resume_partitions(self, partitions: Iterable[TP]) -> None: ...
[docs] async def seek_to_latest(self, *partitions: TP) -> None: ...
[docs] async def seek_to_beginning(self, *partitions: TP) -> None: ...
[docs] async def seek(self, partition: TP, offset: int) -> None: ...
[docs] def assignment(self) -> Set[TP]: return { TP(t, 0) for t in cast(Transport, self.transport)._subscription }
[docs] def highwater(self, tp: TP) -> int: return 0
[docs] async def earliest_offsets(self, *partitions: TP) -> MutableMapping[TP, int]: return {tp: 0 for tp in partitions}
[docs] async def highwaters(self, *partitions: TP) -> MutableMapping[TP, int]: return {tp: 0 for tp in partitions}
[docs]class Producer(base.Producer): """In-memory producer."""
[docs] async def create_topic(self, topic: str, partitions: int, replication: int, *, config: Mapping[str, Any] = None, timeout: Seconds = None, retention: Seconds = None, compacting: bool = None, deleting: bool = None, ensure_created: bool = False) -> None: ...
[docs] async def send(self, topic: str, key: Optional[bytes], value: Optional[bytes], partition: Optional[int]) -> Awaitable[RecordMetadata]: res = await self.send_and_wait(topic, key, value, partition) return cast(Awaitable[RecordMetadata], done_future(res))
[docs] async def send_and_wait(self, topic: str, key: Optional[bytes], value: Optional[bytes], partition: Optional[int]) -> RecordMetadata: return await cast(Transport, self.transport).send( topic, value, key, partition)
[docs]class Transport(base.Transport): """In-memory transport.""" Consumer: ClassVar[Type[ConsumerT]] = Consumer Producer: ClassVar[Type[ProducerT]] = Producer default_port = 9092 driver_version = f'memory-{faust_version}' _subscription: Set[str] _messages: MutableMapping[str, Deque[Message]] _messages_ready: asyncio.Event _subscription_ready: asyncio.Event def __init__(self, *args: Any, **kwargs: Any) -> None: super().__init__(*args, **kwargs) self._subscription = set() self._messages = defaultdict(deque) self._messages_ready = asyncio.Event(loop=self.loop) self._subscription_ready = asyncio.Event(loop=self.loop)
[docs] async def subscribe(self, topics: Iterable[str]) -> None: self._subscription_ready.clear() self._subscription.clear() self._subscription.update(topics) self._subscription_ready.set()
[docs] async def send(self, topic: str, key: Optional[bytes], value: Optional[bytes], partition: Optional[int]) -> RecordMetadata: if partition is None: partition = 0 message = Message( topic, partition=partition, offset=0, timestamp=time(), timestamp_type=0, key=key, value=value, checksum=None, serialized_key_size=len(key) if key else 0, serialized_value_size=len(value) if value else 0, ) self._messages[topic].append(message) self._messages_ready.set() return RecordMetadata( topic=topic, partition=partition, topic_partition=message.tp, offset=0, )