Source code for faust.transport.utils
"""Transport utils - scheduling."""
from typing import (
Any,
Dict,
Iterator,
List,
Mapping,
MutableMapping,
Optional,
Set,
Tuple,
)
from mode.utils.compat import OrderedDict
from faust.types import TP
from faust.types.transports import SchedulingStrategyT
# Public names of this module.
__all__ = [
'TopicIndexMap',
'DefaultSchedulingStrategy',
'TopicBuffer',
]
# Records are handed to the scheduler as a mapping from topic partition (TP)
# to the list of messages fetched for that partition.
# But we want to process records from topics in round-robin order.
# We convert records into a mapping from topic-name to "chain-of-buffers"
# (a TopicBuffer):
#   topic_index['topic-name'] = chain(all_topic_partition_buffers)
# This means we can get the next message available in any topic
# by doing: next(topic_index['topic-name'])
TopicIndexMap = MutableMapping[str, 'TopicBuffer']
class DefaultSchedulingStrategy(SchedulingStrategyT):
    """Consumer record scheduler.

    Delivers records in round robin between both topics and partitions.
    """

    @classmethod
    def map_from_records(cls, records: Mapping[TP, List]) -> TopicIndexMap:
        """Convert records to topic index map.

        Arguments:
            records: Mapping from topic partition to the list of
                messages for that partition.

        Returns:
            Mapping from topic name to a :class:`TopicBuffer` holding
            the message lists of every partition of that topic.
        """
        topic_index: TopicIndexMap = {}
        for tp, messages in records.items():
            topic = tp.topic
            try:
                entry = topic_index[topic]
            except KeyError:
                # first partition seen for this topic.
                entry = topic_index[topic] = TopicBuffer()
            entry.add(tp, messages)
        return topic_index

    def iterate(self, records: Mapping[TP, List]) -> Iterator[Tuple[TP, Any]]:
        """Iterate over records in round-robin order.

        Arguments:
            records: Mapping from topic partition to the list of
                messages for that partition.

        Returns:
            Iterator of ``(tp, record)`` tuples.
        """
        return self.records_iterator(self.map_from_records(records))

    def records_iterator(self,
                         index: TopicIndexMap) -> Iterator[Tuple[TP, Any]]:
        """Iterate over topic index map in round-robin order.

        Each pass takes at most one item from every topic in ``index``.

        Note:
            ``index`` is consumed: topics are removed from it as they
            run out of items, so it is empty once the iterator is
            exhausted.

        Arguments:
            index: Mapping from topic name to an iterator yielding
                ``(tp, record)`` tuples.

        Returns:
            Iterator of ``(tp, record)`` tuples.
        """
        exhausted: Set[str] = set()
        sentinel = object()
        _next = next
        while index:
            for topic, messages in index.items():
                item = _next(messages, sentinel)
                if item is sentinel:
                    # this topic is now empty,
                    # but we cannot remove from dict while iterating over it,
                    # so defer removal until this pass is complete.
                    exhausted.add(topic)
                    continue
                tp, record = item  # type: ignore
                yield tp, record
            for topic in exhausted:
                index.pop(topic, None)
            # forget the topics just removed, so that later passes
            # do not try to pop them again.
            exhausted.clear()
class TopicBuffer(Iterator):
    """Data structure managing the buffer for incoming records in a topic.

    Holds one iterator per topic partition and delivers their items
    one partition at a time, in round-robin order, as ``(tp, item)``
    tuples.
    """

    # One iterator per topic partition, kept in insertion order.
    _buffers: Dict[TP, Iterator]
    # Generator created by the first call to ``next()`` (None until then).
    _it: Optional[Iterator]

    def __init__(self) -> None:
        # note: this is a regular dict, but ordered on Python 3.6
        # we use this alias to signify it must be ordered.
        self._buffers = OrderedDict()
        # getmany calls next(_TopicBuffer), and does not call iter(),
        # so the first call to next caches an iterator.
        self._it = None

    def add(self, tp: TP, buffer: List) -> None:
        """Add topic partition buffer to the cycle.

        Arguments:
            tp: Topic partition the buffer belongs to.  A topic
                partition must not be added more than once.
            buffer: Items for that topic partition.
        """
        assert tp not in self._buffers
        self._buffers[tp] = iter(buffer)

    def __iter__(self) -> Iterator[Tuple[TP, Any]]:
        """Return generator yielding ``(tp, item)`` in round-robin order.

        Each pass takes at most one item from every partition buffer;
        buffers that have run out are dropped from the cycle.
        """
        buffers = self._buffers
        sentinel = object()
        exhausted: Set[TP] = set()
        while buffers:
            for tp, buffer in buffers.items():
                item = next(buffer, sentinel)
                if item is sentinel:
                    # cannot remove from the dict while iterating over it,
                    # so defer removal until this pass is complete.
                    exhausted.add(tp)
                    continue
                yield tp, item
            for tp in exhausted:
                buffers.pop(tp, None)
            # forget the partitions just removed, so that later passes
            # do not try to pop them again.
            exhausted.clear()

    def __next__(self) -> Tuple[TP, Any]:
        """Return the next ``(tp, item)`` tuple from the cached generator."""
        # Note: this method is not in normal iteration
        # as __iter__ returns generator.
        it = self._it
        if it is None:
            it = self._it = iter(self)
        return next(it)