Source code for faust.tables.changelogs

import asyncio
from typing import (
    Any,
    AsyncIterable,
    Iterable,
    List,
    MutableMapping,
    Set,
    Tuple,
)

from mode import Service
from mode.utils.compat import Counter
from mode.utils.aiter import aenumerate
from mode.utils.times import Seconds

from faust.types import AppT, ChannelT, EventT, TP
from faust.types.tables import ChangelogReaderT, CollectionT
from faust.utils import terminal

__all__ = ['ChangelogReader', 'StandbyReader', 'local_tps']

CHANGELOG_SEEKING = 'SEEKING'
CHANGELOG_STARTING = 'STARTING'
CHANGELOG_READING = 'READING'


async def local_tps(table: CollectionT, tps: Iterable[TP]) -> Set[TP]:
    # RocksDB: Find partitions that we have database files for,
    # since only one process can have them open at a time.
    return {tp for tp in tps if not await table.need_active_standby_for(tp)}

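# NOTE: Illustrative sketch only, not part of the original module.  A
# table manager could use ``local_tps`` to avoid replaying changelog
# partitions whose RocksDB files already live on this node; the helper
# name below is hypothetical.
async def _example_partitions_to_recover(
        table: CollectionT, assigned: Set[TP]) -> Set[TP]:
    already_local = await local_tps(table, assigned)
    # Only the remainder has to be replayed from Kafka.
    return assigned - already_local
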
class ChangelogReader(Service, ChangelogReaderT):
    """Service synchronizing table state from changelog topic."""

    wait_for_shutdown = True
    shutdown_timeout = None

    _highwaters: Counter[TP]
    _stop_event: asyncio.Event

    def __init__(self,
                 table: CollectionT,
                 channel: ChannelT,
                 app: AppT,
                 tps: Set[TP],
                 offsets: Counter[TP] = None,
                 stats_interval: Seconds = 5.0,
                 **kwargs: Any) -> None:
        super().__init__(**kwargs)
        self.table = table
        self.channel = channel
        self.app = app
        self.tps = tps
        self.offsets = Counter() if offsets is None else offsets
        self.stats_interval = stats_interval
        for tp in self.tps:
            self.offsets.setdefault(tp, -1)
        self._highwaters = Counter()
        self._stop_event = asyncio.Event(loop=self.loop)

    def _repr_info(self) -> str:
        return f'{self.table!r} {self._stop_event!r}'

    @property
    def _buffer_size(self) -> int:
        return self.table.recovery_buffer_size

    async def _build_highwaters(self) -> None:
        consumer = self.app.consumer
        tps = self.tps
        highwaters = await consumer.highwaters(*tps)
        self._highwaters.clear()
        self._highwaters.update({
            # FIXME the -1 here is because of the way we commit offsets
            tp: highwaters[tp] - 1
            for tp in tps
        })
        table = terminal.logtable(
            [[k.topic, str(k.partition), str(v)]
             for k, v in self._highwaters.items()],
            title='Highwater',
            headers=['topic', 'partition', 'highwater'],
        )
        self.log.info('Highwater for changelog partitions:\n%s', table)

    def _should_stop_reading(self) -> bool:
        return not self._remaining_total()

    def _remaining(self) -> Counter[TP]:
        return self._highwaters - self.offsets

    def _remaining_total(self) -> int:
        return sum(self._remaining().values())

    async def _update_offsets(self) -> None:
        # Offsets may have been compacted, need to get to the recent ones
        consumer = self.app.consumer
        earliest = await consumer.earliest_offsets(*self.tps)
        # FIXME: To be consistent with the offset -1 logic
        earliest = {tp: offset - 1 for tp, offset in earliest.items()}
        for tp in self.tps:
            self.offsets[tp] = max(self.offsets[tp], earliest[tp])
        table = terminal.logtable(
            [(k.topic, k.partition, v) for k, v in self.offsets.items()],
            title='Reading Starts At',
            headers=['topic', 'partition', 'offset'],
        )
        self.log.info('Updated offsets at start of reading:\n%s', table)

    @Service.transitions_to(CHANGELOG_SEEKING)
    async def _seek_tps(self) -> None:
        consumer = self.app.consumer
        tps = self.tps
        for tp in tps:
            offset = self.offsets[tp]
            if offset >= 0:
                # FIXME Remove check when fixed offset-1 discrepancy
                await consumer.seek(tp, offset)
                assert await consumer.position(tp) == offset

    def _should_start_reading(self) -> bool:
        return self._highwaters != self.offsets

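    # NOTE: Worked example (illustration only, not in the original
    # source) of the off-by-one bookkeeping above.  If the consumer
    # reports a highwater of 8 for a partition, _build_highwaters()
    # stores 8 - 1 == 7, the offset of the last record actually
    # written.  A fresh table starts from the sentinel offset -1, so
    # _remaining() yields 7 - (-1) == 8: exactly records 0..7 must be
    # replayed before recovered() can return True.
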
    async def wait_done_reading(self) -> None:
        # XXX [asksol] This method is unused, see note in tables/manager.py
        await self._stop_event.wait()

    def _done_reading(self) -> None:
        self.set_shutdown()
        self._stop_event.set()
        self.log.info('Setting stop event')

    @property
    def _remaining_stats(self) -> MutableMapping[TP, Tuple[int, int, int]]:
        offsets = self.offsets
        return {
            tp: (highwater, offsets[tp], highwater - offsets[tp])
            for tp, highwater in self._highwaters.items()
            if highwater - offsets[tp] != 0
        }

    def recovered(self) -> bool:
        did_recover = self._highwaters == self.offsets
        if not did_recover:
            self.log.info('Did not recover. Remaining %s',
                          self._remaining_stats)
        return did_recover

    @Service.task
    async def _publish_stats(self) -> None:
        while not self.should_stop and not self._stop_event.is_set():
            self.log.info('Still fetching. Remaining: %s',
                          self._remaining_stats)
            await self.sleep(self.stats_interval)

    async def on_start(self) -> None:
        consumer = self.app.consumer
        await consumer.pause_partitions(self.tps)
        await self._build_highwaters()
        await self._update_offsets()
        if not self._should_start_reading():
            self.log.info('No updates needed')
            return self._done_reading()
        tps = await local_tps(self.table, self.tps)
        if tps:
            self.log.info('Partitions %r are local to this node',
                          sorted(tps))
        # remove local tps from set of assigned partitions.
        self.tps.difference_update(tps)
        if not self.tps:
            self.log.info('No active standby needed')
            return self._done_reading()
        await self._seek_tps()
        await consumer.resume_partitions(self.tps)

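    # NOTE: Illustrative trace, not part of the original source.  For a
    # hypothetical assignment {tp0, tp1} where tp1 already has RocksDB
    # files on this node, on_start() above proceeds roughly as follows:
    # both partitions are paused and their highwaters/offsets built;
    # local_tps() returns {tp1}, which is dropped from self.tps; the
    # reader then seeks {tp0} to its stored offset and resumes fetching
    # from it alone.  If nothing remains to read at any step, it calls
    # _done_reading() and shuts down immediately.
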
    @Service.task
    @Service.transitions_to(CHANGELOG_STARTING)
    async def _read(self) -> None:
        if self.should_stop or self._shutdown.is_set():
            return
        # We don't want to log when there are zero records,
        # but we still slurp the stream so that we subscribe
        # to the changelog topic etc.
        if self._remaining_total():
            self.log.info('Reading %s records...', self._remaining_total())
        # log statement above, or the sleep below ...
        # fixes weird aiokafka 100% CPU deadlock [ask].
        await self.sleep(0.1)
        self.diag.set_flag(CHANGELOG_READING)
        try:
            await self._slurp_stream()
        finally:
            self.diag.unset_flag(CHANGELOG_READING)
            self._done_reading()

    async def on_stop(self) -> None:
        # Pause all our topic partitions,
        # to make sure we don't fetch any more records from them.
        assignment = self.app.consumer.assignment()
        await self.app.consumer.pause_partitions(
            {tp for tp in self.tps if tp in assignment})
        # Tell the _read coroutine iterating over the changelog channel
        # that it should stop iterating.
        if not self._stop_event.is_set():
            await self.channel.throw(StopAsyncIteration())

    async def _slurp_stream(self) -> None:
        buf: List[EventT] = []
        can_log_done = True
        try:
            async for i, event in aenumerate(self._read_changelog()):
                buf.append(event)
                await self.table.on_changelog_event(event)
                if len(buf) >= self._buffer_size:
                    self.table.apply_changelog_batch(buf)
                    buf.clear()
                if self._should_stop_reading():
                    break
                remaining = self._remaining_total()
                if remaining and not i % 10_000:
                    can_log_done = True
                    self.log.info('Waiting for %s records...', remaining)
                elif not remaining and can_log_done:
                    can_log_done = False
                    self.log.info('All up to date')
        except StopAsyncIteration:
            self.log.info('Got stop iteration')
        finally:
            self.log.info('Stopped reading!')
            if buf:
                self.table.apply_changelog_batch(buf)
                buf.clear()

    async def _read_changelog(self) -> AsyncIterable[EventT]:
        offsets = self.offsets
        async for event in self.channel:
            message = event.message
            tp = message.tp
            offset = message.offset
            seen_offset = offsets.get(tp, -1)
            if offset > seen_offset:
                offsets[tp] = offset
                yield event

    @property
    def label(self) -> str:
        return self.shortlabel

    @property
    def shortlabel(self) -> str:
        return f'{type(self).__name__}: {self.table.name}'

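# NOTE: Illustrative sketch only, not part of the original module.  It
# restates the de-duplication rule used by _read_changelog() above: an
# event is applied only when its offset moves past the highest offset
# already seen for its partition, so replayed or duplicate records are
# skipped.  The function name is hypothetical.
def _example_should_apply(seen: Counter[TP], tp: TP, offset: int) -> bool:
    if offset > seen.get(tp, -1):
        seen[tp] = offset  # remember the new high offset for this tp
        return True
    return False  # duplicate or already-applied record
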
class StandbyReader(ChangelogReader):
    """Service reading table changelogs to keep an up-to-date backup."""

    @property
    def _buffer_size(self) -> int:
        return self.table.standby_buffer_size

    @Service.task
    async def _publish_stats(self) -> None:
        return

    def _should_start_reading(self) -> bool:
        return True

    def _should_stop_reading(self) -> bool:
        return self.should_stop

    def recovered(self) -> bool:
        return False
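
# NOTE: Hypothetical wiring sketch, not part of this module; names like
# ``channel``, ``persisted_offsets`` and ``is_standby`` are assumed.
# The table manager picks the reader class per partition role: active
# partitions get a ChangelogReader that stops once it has caught up,
# while standby partitions get a StandbyReader that tails the changelog
# until the service itself stops (its recovered() is always False).
#
#     Reader = StandbyReader if is_standby else ChangelogReader
#     reader = Reader(table, channel, app, tps,
#                     offsets=persisted_offsets, beacon=app.beacon)
#     await reader.start()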