# Source code for faust.tables.objects

"""Storing objects in tables.

This is also used to store data structures such as sets/lists.

"""
import abc

from typing import (
    Any,
    Callable,
    ClassVar,
    Dict,
    Iterable,
    MutableMapping,
    Optional,
    Set,
    Type,
)

from mode import Service

from faust.stores.base import Store
from faust.streams import current_event
from faust.types import EventT, TP
from faust.types.stores import StoreT
from faust.types.tables import CollectionT

from .table import Table


class ChangeloggedObject:
    """Base class for objects held in a :class:`ChangeloggedObjectManager`.

    Every instance is bound to the manager that owns it and to the table
    key it represents.  Subclasses fill in the storage/changelog hooks
    declared below.

    Note: this class does not use :class:`abc.ABCMeta`, so the
    ``@abc.abstractmethod`` markers below are not enforced when an
    instance is created.
    """

    manager: 'ChangeloggedObjectManager'

    def __init__(self, manager: 'ChangeloggedObjectManager',
                 key: Any) -> None:
        self.manager, self.key = manager, key
        # Give subclasses a chance to finish setting up once both
        # ``manager`` and ``key`` are available.
        self.__post_init__()

    def __post_init__(self) -> None:  # pragma: no cover
        """Run at the end of ``__init__``; the default does nothing."""

    @abc.abstractmethod
    def sync_from_storage(self, value: Any) -> None:
        """Update this object from a value read out of storage."""

    @abc.abstractmethod
    def as_stored_value(self) -> Any:
        """Return the representation of this object to write to storage."""

    @abc.abstractmethod
    def apply_changelog_event(self, operation: int, value: Any) -> None:
        """Apply one changelog ``(operation, value)`` pair to local state."""
class ChangeloggedObjectManager(Store):
    """Store of changelogged objects.

    Keeps an in-memory mapping (``data``) from table key to a
    :class:`ChangeloggedObject`, created lazily as ``ValueType(self, key)``
    on first access, on top of a lazily-created underlying store
    (:attr:`storage`).

    Keys changed through :meth:`send_changelog_event` are tracked in
    ``_dirty`` and written to the underlying store by
    :meth:`flush_to_storage`, which runs periodically
    (``Service.task(2.0)``) and when the service stops.
    """

    # Concrete ChangeloggedObject subclass instantiated per key by
    # __getitem__; subclasses must set this.
    ValueType: ClassVar[Type[ChangeloggedObject]]

    table: Table
    # key -> ChangeloggedObject: the in-memory view of the table contents.
    data: MutableMapping
    # Underlying store; created on first access of ``storage``.
    _storage: Optional[StoreT] = None
    # Keys modified via send_changelog_event since the last flush.
    _dirty: Set

    def __init__(self, table: Table, **kwargs: Any) -> None:
        self.table = table
        self.table_name = self.table.name
        self.data = {}
        self._dirty = set()
        # Only the Service machinery is initialised here (not Store.__init__).
        Service.__init__(self, **kwargs)

    def send_changelog_event(self, key: Any, operation: int,
                             value: Any) -> None:
        """Send changelog event to the tables changelog topic.

        The changelog message key is the tuple ``(operation, key)``; this
        is the shape :meth:`apply_changelog_batch` unpacks.  ``key`` is
        also marked dirty, so the next :meth:`flush_to_storage` writes
        its object to storage.
        """
        event = current_event()
        self._dirty.add(key)
        self.table._send_changelog(event, (operation, key), value)

    def __getitem__(self, key: Any) -> ChangeloggedObject:
        # Objects are created on demand, so lookups never raise KeyError.
        if key in self.data:
            return self.data[key]
        obj = self.data[key] = self.ValueType(self, key)
        return obj

    def __setitem__(self, key: Any, value: Any) -> None:
        raise NotImplementedError(f'{self._table_type_name}: cannot set key')

    def __delitem__(self, key: Any) -> None:
        raise NotImplementedError(f'{self._table_type_name}: cannot del key')

    @property
    def _table_type_name(self) -> str:
        """Class name of the owning table, used in error messages."""
        return type(self.table).__name__

    async def on_start(self) -> None:
        """Call when the changelogged object manager starts."""
        await self.add_runtime_dependency(self.storage)

    async def on_stop(self) -> None:
        """Call when the changelogged object manager stops."""
        # Persist any changes not yet written by the periodic flush.
        self.flush_to_storage()

    def persisted_offset(self, tp: TP) -> Optional[int]:
        """Get the last persisted offset for changelog topic partition."""
        return self.storage.persisted_offset(tp)

    def set_persisted_offset(self, tp: TP, offset: int) -> None:
        """Set the last persisted offset for changelog topic partition."""
        self.storage.set_persisted_offset(tp, offset)

    async def on_rebalance(self,
                           table: CollectionT,
                           assigned: Set[TP],
                           revoked: Set[TP],
                           newly_assigned: Set[TP]) -> None:
        """Call when cluster is rebalancing; delegates to the storage."""
        await self.storage.on_rebalance(
            table, assigned, revoked, newly_assigned)

    async def on_recovery_completed(self,
                                    active_tps: Set[TP],
                                    standby_tps: Set[TP]) -> None:
        """Call when table recovery is completed after rebalancing."""
        # NOTE(review): apply_changelog_batch does not mark keys dirty, and
        # sync_from_storage hands each stored value to the object's own
        # sync_from_storage, which may replace state recovered from the
        # changelog -- confirm the recovery ordering makes this safe.
        self.sync_from_storage()

    def sync_from_storage(self) -> None:
        """Sync set contents from storage.

        Every stored ``(key, value)`` is passed to the (possibly newly
        created) object's ``sync_from_storage``.
        """
        for key, value in self.storage.items():
            self[key].sync_from_storage(value)

    def flush_to_storage(self) -> None:
        """Flush set contents to storage.

        Writes ``as_stored_value()`` for every dirty key, then clears the
        dirty set.  If a write raises, no keys are cleared.
        """
        for key in self._dirty:
            self.storage[key] = self.data[key].as_stored_value()
        self._dirty.clear()

    @Service.task(2.0)
    async def _periodic_flush(self) -> None:  # pragma: no cover
        self.flush_to_storage()

    def reset_state(self) -> None:
        """Reset table local state.

        Resets the underlying store, then discards the in-memory objects
        and dirty-key markers.  Otherwise the next periodic flush would
        write pre-reset values back into the freshly reset store, and
        lookups would keep returning the stale objects.
        """
        # Delegate to the underlying store (e.g. RocksDB).
        self.storage.reset_state()
        # Clear only after the store reset succeeded, so memory and
        # storage stay consistent if the reset raises.
        self.data.clear()
        self._dirty.clear()

    @property
    def storage(self) -> StoreT:
        """Return underlying storage used by this set table.

        Created lazily from the table's own store URL, falling back to
        the app's configured ``store``.
        """
        if self._storage is None:
            self._storage = self.table._new_store_by_url(
                self.table._store or self.table.app.conf.store)
        return self._storage

    def apply_changelog_batch(self, batch: Iterable[EventT],
                              to_key: Callable[[Any], Any],
                              to_value: Callable[[Any], Any]) -> None:
        """Apply batch of changelog events to local state.

        Each event key must be an ``(operation, key)`` tuple, as produced
        by :meth:`send_changelog_event`.  Once the whole batch has been
        applied, the highest offset seen for each topic partition is
        recorded as the persisted offset.

        Raises:
            RuntimeError: if an event has a ``None`` key.  In that case
                no offsets from this batch are recorded.
        """
        tp_offsets: Dict[TP, int] = {}
        for event in batch:
            tp, offset = event.message.tp, event.message.offset
            # Track the highest offset seen per partition in this batch.
            tp_offsets[tp] = max(offset, tp_offsets.get(tp, offset))
            if event.key is None:
                raise RuntimeError('Changelog key cannot be None')
            operation, key = event.key
            key = to_key(key)
            value: Any = to_value(event.value)
            self[key].apply_changelog_event(operation, value)
        for tp, offset in tp_offsets.items():
            self.set_persisted_offset(tp, offset)