Source code for faust.channels

"""Channel.

A channel is used to send values to streams.

The stream will iterate over incoming events in the channel.

"""
import asyncio
from typing import (
    Any,
    Awaitable,
    Callable,
    Mapping,
    MutableSet,
    Optional,
    Set,
    cast,
)
from weakref import WeakSet

from mode import Seconds, get_logger, want_seconds
from mode.utils.futures import maybe_async, stampede
from mode.utils.queues import ThrowableQueue

from .events import Event
from .types import (
    AppT,
    ChannelT,
    CodecArg,
    EventT,
    FutureMessage,
    K,
    Message,
    MessageSentCallback,
    ModelArg,
    PendingMessage,
    RecordMetadata,
    StreamT,
    TP,
    V,
)
from .types.core import HeadersArg, OpenHeadersArg, prepare_headers
from .types.tuples import _PendingMessage_to_Message

__all__ = ['Channel']

logger = get_logger(__name__)


[docs]class Channel(ChannelT): """Create new channel. Arguments: app: The app that created this channel (``app.channel()``) key_type: The Model used for keys in this channel. value_type: The Model used for values in this channel. maxsize: The maximum number of messages this channel can hold. If exceeded any new ``put`` call will block until a message is removed from the channel. loop: The :mod:`asyncio` event loop to use. """ app: AppT key_type: Optional[ModelArg] value_type: Optional[ModelArg] is_iterator: bool _queue: Optional[ThrowableQueue] _root: Optional['Channel'] _subscribers: MutableSet['Channel'] def __init__(self, app: AppT, *, key_type: ModelArg = None, value_type: ModelArg = None, is_iterator: bool = False, queue: ThrowableQueue = None, maxsize: int = None, root: ChannelT = None, active_partitions: Set[TP] = None, loop: asyncio.AbstractEventLoop = None) -> None: self.app = app self.loop = loop self.key_type = key_type self.value_type = value_type self.is_iterator = is_iterator self._queue = queue self.maxsize = maxsize self.deliver = self._compile_deliver() # type: ignore self._root = cast(Channel, root) self.active_partitions = active_partitions self._subscribers = WeakSet() @property def queue(self) -> ThrowableQueue: if self._queue is None: # this should only be set after clone = channel.__aiter__() # which means the loop is not accessed by merely defining # a channel at module scope. maxsize = self.maxsize if maxsize is None: maxsize = self.app.conf.stream_buffer_maxsize self._queue = self.app.FlowControlQueue( maxsize=maxsize, loop=self.loop, clear_on_resume=True, ) return self._queue
[docs] def clone(self, *, is_iterator: bool = None, **kwargs: Any) -> ChannelT: is_it = is_iterator if is_iterator is not None else self.is_iterator subchannel: ChannelT = self._clone(is_iterator=is_it, **kwargs) if is_it: (self._root or self)._subscribers.add(cast(Channel, subchannel)) # make sure queue is created at this point # ^ it's a cached_property subchannel.queue return subchannel
[docs] def clone_using_queue(self, queue: asyncio.Queue) -> ChannelT: return self.clone(queue=queue, is_iterator=True)
def _clone(self, **kwargs: Any) -> ChannelT: return type(self)(**{**self._clone_args(), **kwargs}) def _clone_args(self) -> Mapping: # How to create a copy of this channel. return { 'app': self.app, 'loop': self.loop, 'key_type': self.key_type, 'value_type': self.value_type, 'maxsize': self.maxsize, 'root': self._root if self._root is not None else self, 'queue': None, 'active_partitions': self.active_partitions, }
[docs] def stream(self, **kwargs: Any) -> StreamT: """Create stream reading from this channel.""" return self.app.stream(self, **kwargs)
[docs] def get_topic_name(self) -> str: raise NotImplementedError('Channels are unnamed topics')
[docs] async def send(self, *, key: K = None, value: V = None, partition: int = None, timestamp: float = None, headers: HeadersArg = None, key_serializer: CodecArg = None, value_serializer: CodecArg = None, callback: MessageSentCallback = None, force: bool = False) -> Awaitable[RecordMetadata]: """Send message to channel.""" return await self._send_now( key, value, partition=partition, timestamp=timestamp, headers=headers, key_serializer=key_serializer, value_serializer=value_serializer, callback=callback, )
[docs] def as_future_message( self, key: K = None, value: V = None, partition: int = None, timestamp: float = None, headers: HeadersArg = None, key_serializer: CodecArg = None, value_serializer: CodecArg = None, callback: MessageSentCallback = None) -> FutureMessage: return FutureMessage( PendingMessage( self, self.prepare_key(key, key_serializer), self.prepare_value(value, value_serializer), key_serializer=key_serializer, value_serializer=value_serializer, partition=partition, timestamp=timestamp, headers=self.prepare_headers(headers), callback=callback, # Python 3.6.0: NamedTuple doesn't support optional fields # [ask] topic=None, offset=None, ), )
[docs] def prepare_headers( self, headers: Optional[HeadersArg]) -> OpenHeadersArg: if headers is not None: return prepare_headers(headers) return {}
async def _send_now( self, key: K = None, value: V = None, partition: int = None, timestamp: float = None, headers: HeadersArg = None, key_serializer: CodecArg = None, value_serializer: CodecArg = None, callback: MessageSentCallback = None) -> Awaitable[RecordMetadata]: return await self.publish_message( self.as_future_message( key, value, partition, timestamp, headers, key_serializer, value_serializer, callback))
[docs] async def publish_message(self, fut: FutureMessage, wait: bool = True) -> Awaitable[RecordMetadata]: event = self._create_event( fut.message.key, fut.message.value, fut.message.headers, message=_PendingMessage_to_Message(fut.message)) await self.put(event) topic, partition = tp = TP( fut.message.topic or '<anon>', fut.message.partition or -1) return await self._finalize_message( fut, RecordMetadata( topic=topic, partition=partition, topic_partition=tp, offset=-1, timestamp=fut.message.timestamp, timestamp_type=1, ), )
async def _finalize_message(self, fut: FutureMessage, result: RecordMetadata) -> FutureMessage: fut.set_result(result) if fut.message.callback: await maybe_async(fut.message.callback(fut)) return fut
[docs] @stampede async def maybe_declare(self) -> None: ...
[docs] async def declare(self) -> None: ...
[docs] def prepare_key(self, key: K, key_serializer: CodecArg) -> Any: return key
[docs] def prepare_value(self, value: V, value_serializer: CodecArg) -> Any: return value
[docs] async def decode(self, message: Message, *, propagate: bool = False) -> EventT: return self._create_event( message.key, message.value, message.headers, message=message)
[docs] async def deliver(self, message: Message) -> None: # pragma: no cover ... # closure compiled at __init__
def _compile_deliver(self) -> Callable[[Message], Awaitable[None]]: put = None async def deliver(message: Message) -> None: nonlocal put if put is None: # NOTE circumvents self.put, using queue directly put = self.queue.put event = await self.decode(message) await put(event) return deliver def _create_event(self, key: K, value: V, headers: Optional[HeadersArg], message: Message) -> EventT: return Event(self.app, key, value, headers, message)
[docs] async def put(self, value: Any) -> None: root = self._root if self._root is not None else self for subscriber in root._subscribers: await subscriber.queue.put(value)
[docs] async def get(self, *, timeout: Seconds = None) -> Any: timeout_: float = want_seconds(timeout) if timeout_: return await asyncio.wait_for(self.queue.get(), timeout=timeout_) return await self.queue.get()
[docs] def empty(self) -> bool: return self.queue.empty()
[docs] async def on_key_decode_error(self, exc: Exception, message: Message) -> None: await self.on_decode_error(exc, message) await self.throw(exc)
[docs] async def on_value_decode_error(self, exc: Exception, message: Message) -> None: await self.on_decode_error(exc, message) await self.throw(exc)
[docs] async def on_decode_error(self, exc: Exception, message: Message) -> None: ...
[docs] def on_stop_iteration(self) -> None: ...
[docs] def derive(self, **kwargs: Any) -> ChannelT: return self
def __aiter__(self) -> ChannelT: return self if self.is_iterator else self.clone(is_iterator=True) async def __anext__(self) -> EventT: if not self.is_iterator: raise RuntimeError('Need to call channel.__aiter__()') return await self.queue.get()
[docs] async def throw(self, exc: BaseException) -> None: self.queue._throw(exc)
def _throw(self, exc: BaseException) -> None: self.queue._throw(exc) def __repr__(self) -> str: s = f'<{self.label}@{self._repr_id()}' if self.active_partitions is not None: if self.active_partitions: active = '{' + ', '.join(sorted( f'{tp.topic}:{tp.partition}' for tp in self.active_partitions)) + '}' else: active = '{<pending for assignment>}' s += f' active={active}' s += '>' return s def __str__(self) -> str: return '<ANON>' def _repr_id(self) -> str: return f'{id(self):#x}' @property def subscriber_count(self) -> int: return len(self._subscribers) @property def label(self) -> str: sym = '(*)' if self.is_iterator else '' return f'{sym}{type(self).__name__}: {self}'