Source code for faust.topics

"""Topic - Named channel using Kafka."""
import asyncio
import re
import typing
from contextlib import suppress
from functools import partial
from typing import (
    Any,
    Awaitable,
    Callable,
    Mapping,
    Optional,
    Pattern,
    Sequence,
    Set,
    Union,
    cast,
    no_type_check,
)
from mode import Seconds, get_logger
from mode.utils.futures import stampede
from mode.utils.queues import ThrowableQueue

from .channels import Channel
from .exceptions import KeyDecodeError, ValueDecodeError
from .events import Event
from .streams import current_event
from .types import (
    AppT,
    CodecArg,
    EventT,
    FutureMessage,
    K,
    Message,
    MessageSentCallback,
    ModelArg,
    PendingMessage,
    RecordMetadata,
    TP,
    V,
)
from .types.topics import ChannelT, TopicT
from .types.transports import ProducerT

if typing.TYPE_CHECKING:  # pragma: no cover
    from mypy_extensions import DefaultNamedArg

    from .app import App

    DecodeFunction = Callable[[Message, DefaultNamedArg(bool, 'propagate')],
                              Awaitable[EventT]]
else:
    class App:
        ...  # noqa

    DecodeFunction = Callable[..., Awaitable[EventT]]

__all__ = ['Topic']

logger = get_logger(__name__)


[docs]class Topic(Channel, TopicT): """Define new topic description. Arguments: app: App instance used to create this topic description. topics: List of topic names. partitions: Number of partitions for these topics. On declaration, topics are created using this. Note: If a message is produced before the topic is declared, and ``autoCreateTopics`` is enabled on the Kafka Server, the number of partitions used will be specified by the server configuration. retention: Number of seconds (as float/timedelta) to keep messages in the topic before they can be expired by the server. pattern: Regular expression evaluated to decide what topics to subscribe to. You cannot specify both topics and a pattern. key_type: How to deserialize keys for messages in this topic. Can be a :class:`faust.Model` type, :class:`str`, :class:`bytes`, or :const:`None` for "autodetect" value_type: How to deserialize values for messages in this topic. Can be a :class:`faust.Model` type, :class:`str`, :class:`bytes`, or :const:`None` for "autodetect" active_partitions: Set of :class:`faust.types.tuples.TP` that this topic should be restricted to. Raises: TypeError: if both `topics` and `pattern` is provided. """ _partitions: Optional[int] = None _pattern: Optional[Pattern] = None def __init__(self, app: AppT, *, topics: Sequence[str] = None, pattern: Union[str, Pattern] = None, key_type: ModelArg = None, value_type: ModelArg = None, is_iterator: bool = False, partitions: int = None, retention: Seconds = None, compacting: bool = None, deleting: bool = None, replicas: int = None, acks: bool = True, internal: bool = False, config: Mapping[str, Any] = None, queue: ThrowableQueue = None, key_serializer: CodecArg = None, value_serializer: CodecArg = None, maxsize: int = None, root: ChannelT = None, active_partitions: Set[TP] = None, allow_empty: bool = False, loop: asyncio.AbstractEventLoop = None) -> None: self.topics = topics or [] super().__init__( app, key_type=key_type, value_type=value_type, loop=loop, is_iterator=is_iterator, queue=queue, maxsize=maxsize, ) self.key_serializer = key_serializer if self.key_serializer is None and key_type: self.key_serializer = _model_serializer(key_type) self.value_serializer = value_serializer if self.value_serializer is None and value_type: self.value_serializer = _model_serializer(value_type) self.pattern = cast(Pattern, pattern) self.partitions = partitions self.retention = retention self.compacting = compacting self.deleting = deleting self.replicas = replicas self.acks = acks self.internal = internal self.active_partitions = active_partitions self.config = config or {} self.allow_empty = allow_empty
[docs] async def send(self, *, key: K = None, value: V = None, partition: int = None, key_serializer: CodecArg = None, value_serializer: CodecArg = None, callback: MessageSentCallback = None, force: bool = False) -> Awaitable[RecordMetadata]: """Send message to topic.""" if self.app._attachments.enabled and not force: event = current_event() if event is not None: return cast(Event, event)._attach( self, key, value, partition=partition, key_serializer=key_serializer, value_serializer=value_serializer, callback=callback, ) return await self._send_now( key, value, partition=partition, key_serializer=key_serializer, value_serializer=value_serializer, callback=callback, )
[docs] async def decode(self, message: Message, *, propagate: bool = False) -> EventT: # first call to decode compiles and caches it. decode = self.decode = self._compile_decode() # type: ignore return await decode(message, propagate=propagate)
[docs] async def put(self, event: EventT) -> None: if not self.is_iterator: raise RuntimeError( f'Cannot put on Topic channel before aiter({self})') await self.queue.put(event)
def _compile_decode(self) -> DecodeFunction: app = self.app key_type = self.key_type value_type = self.value_type loads_key = app.serializers.loads_key loads_value = app.serializers.loads_value create_event = self._create_event key_serializer = self.key_serializer value_serializer = self.value_serializer async def decode(message: Message, *, propagate: bool = False) -> Any: try: k = loads_key(key_type, message.key, serializer=key_serializer) except KeyDecodeError as exc: if propagate: raise await self.on_key_decode_error(exc, message) else: try: if message.value is None and self.allow_empty: return create_event(k, None, message) v = loads_value( value_type, message.value, serializer=value_serializer) except ValueDecodeError as exc: if propagate: raise await self.on_value_decode_error(exc, message) else: return create_event(k, v, message) return decode @no_type_check # incompatible with base class, but OK def _clone_args(self) -> Mapping: return { **super()._clone_args(), **{ 'topics': self.topics, 'pattern': self.pattern, 'partitions': self.partitions, 'retention': self.retention, 'compacting': self.compacting, 'deleting': self.deleting, 'replicas': self.replicas, 'internal': self.internal, 'key_serializer': self.key_serializer, 'value_serializer': self.value_serializer, 'acks': self.acks, 'config': self.config, 'active_partitions': self.active_partitions, 'allow_empty': self.allow_empty}} @property def pattern(self) -> Optional[Pattern]: return self._pattern @pattern.setter def pattern(self, pattern: Union[str, Pattern]) -> None: if pattern and self.topics: raise TypeError('Cannot specify both topics and pattern') self._pattern = re.compile(pattern) if pattern else None @property def partitions(self) -> Optional[int]: return self._partitions @partitions.setter def partitions(self, partitions: int) -> None: if partitions == 0: raise ValueError('Topic cannot have zero partitions') self._partitions = partitions
[docs] def derive(self, **kwargs: Any) -> ChannelT: """Create new :class:`Topic` derived from this topic. Configuration will be copied from this topic, but any parameter overriden as a keyword argument. See Also: :meth:`derive_topic`: for a list of supported keyword arguments. """ return self.derive_topic(**kwargs)
[docs] def derive_topic(self, *, topics: Sequence[str] = None, key_type: ModelArg = None, value_type: ModelArg = None, key_serializer: CodecArg = None, value_serializer: CodecArg = None, partitions: int = None, retention: Seconds = None, compacting: bool = None, deleting: bool = None, internal: bool = None, config: Mapping[str, Any] = None, prefix: str = '', suffix: str = '', **kwargs: Any) -> TopicT: topics = self.topics if topics is None else topics if suffix or prefix: if self.pattern: raise ValueError( 'Cannot add prefix/suffix to Topic with pattern') topics = [f'{prefix}{topic}{suffix}' for topic in topics] return type(self)( self.app, topics=topics, pattern=self.pattern, key_type=self.key_type if key_type is None else key_type, value_type=self.value_type if value_type is None else value_type, key_serializer=( self.key_serializer if key_serializer is None else key_serializer), value_serializer=( self.value_serializer if value_serializer is None else value_serializer), partitions=self.partitions if partitions is None else partitions, retention=self.retention if retention is None else retention, compacting=self.compacting if compacting is None else compacting, deleting=self.deleting if deleting is None else deleting, config=self.config if config is None else config, internal=self.internal if internal is None else internal, )
[docs] def get_topic_name(self) -> str: if self.pattern: raise TypeError( 'Topic with pattern subscription cannot be identified') if self.topics: if len(self.topics) > 1: raise ValueError( 'Topic with multiple topic names cannot be identified') return self.topics[0] raise TypeError('Topic has no subscriptions (no pattern, no topics)')
async def _get_producer(self) -> ProducerT: return await self.app.maybe_start_producer()
[docs] async def publish_message(self, fut: FutureMessage, wait: bool = False) -> Awaitable[RecordMetadata]: app = self.app message: PendingMessage = fut.message if isinstance(message.channel, str): topic = message.channel elif isinstance(message.channel, TopicT): topic = cast(TopicT, message.channel).get_topic_name() else: topic = self.get_topic_name() key: bytes = cast(bytes, message.key) value: bytes = cast(bytes, message.value) logger.debug('send: topic=%r key=%r value=%r', topic, key, value) assert topic is not None producer = await self._get_producer() state = app.sensors.on_send_initiated( producer, topic, keysize=len(key) if key else 0, valsize=len(value) if value else 0) if wait: ret: RecordMetadata = await producer.send_and_wait( topic, key, value, partition=message.partition) app.sensors.on_send_completed(producer, state) return await self._finalize_message(fut, ret) else: fut2 = await producer.send( topic, key, value, partition=message.partition) cast(asyncio.Future, fut2).add_done_callback( cast(Callable, partial(self._on_published, message=fut))) return fut2
def _on_published(self, fut: asyncio.Future, message: FutureMessage) -> None: res: RecordMetadata = fut.result() message.set_result(res) if message.message.callback: message.message.callback(message)
[docs] def prepare_key(self, key: K, key_serializer: CodecArg) -> Any: if key is not None: return self.app.serializers.dumps_key( self.key_type, key, serializer=key_serializer or self.key_serializer) return None
[docs] def prepare_value(self, value: V, value_serializer: CodecArg) -> Any: return self.app.serializers.dumps_value( self.value_type, value, serializer=value_serializer or self.value_serializer)
[docs] def on_stop_iteration(self) -> None: pass
[docs] @stampede async def maybe_declare(self) -> None: await self.declare()
[docs] async def declare(self) -> None: producer = await self._get_producer() for topic in self.topics: partitions = self.partitions if partitions is None: partitions = self.app.conf.topic_partitions replicas = self.replicas if not replicas: replicas = self.app.conf.topic_replication_factor await producer.create_topic( topic=topic, partitions=partitions, replication=replicas, config=self.config, compacting=self.compacting, deleting=self.deleting, retention=self.retention, )
def __aiter__(self) -> ChannelT: if self.is_iterator: return self else: channel = self.clone(is_iterator=True) self.app.topics.add(channel) return channel def __str__(self) -> str: return str(self.pattern) if self.pattern else ','.join(self.topics)
def _model_serializer(typ: Any) -> Optional[CodecArg]: with suppress(AttributeError): return typ._options.serializer