Source code for faust.tables.sets
from typing import (
Any,
Iterable,
List,
Set,
cast,
)
from mode.utils.collections import ManagedUserSet
from faust.types import EventT
from faust.types.tables import KT, VT
from faust.types.stores import StoreT
from . import wrappers
from .objects import ChangeloggedObject, ChangeloggedObjectManager
from .table import Table
__all__ = ['SetTable']
OPERATION_ADD: int = 0x1
OPERATION_DISCARD: int = 0x2
OPERATION_UPDATE: int = 0xF
class SetWindowSet(wrappers.WindowSet):
def add(self, element: Any, *, event: EventT = None) -> None:
self._apply_set_operation('add', element, event)
def discard(self, element: Any, *, event: EventT = None) -> None:
self._apply_set_operation('discard', element, event)
def _apply_set_operation(self,
op: str,
element: Any,
event: EventT = None) -> None:
table = cast(Table, self.table)
timestamp = self.wrapper.get_timestamp(event or self.event)
key = self.key
get_ = table._get_key
self.wrapper.on_set_key(key, element)
# apply set operation to every window within range of timestamp.
for window_range in table._window_ranges(timestamp):
set_wrapper = get_((key, window_range))
getattr(set_wrapper, op)(element)
class SetWindowWrapper(wrappers.WindowWrapper):
ValueType = SetWindowSet
class ChangeloggedSet(ChangeloggedObject, ManagedUserSet[VT]):
key: Any
data: Set
def __post_init__(self) -> None:
self.data = set()
def on_add(self, value: VT) -> None:
self.manager.send_changelog_event(self.key, OPERATION_ADD, value)
def on_discard(self, value: VT) -> None:
self.manager.send_changelog_event(self.key, OPERATION_DISCARD, value)
def on_change(self, added: Set[VT], removed: Set[VT]) -> None:
self.manager.send_changelog_event(
self.key, OPERATION_UPDATE, [added, removed])
def sync_from_storage(self, value: Any) -> None:
self.data = cast(Set, value)
def as_stored_value(self) -> Any:
return self.data
def apply_changelog_event(self, operation: int, value: Any) -> None:
if operation == OPERATION_ADD:
self.data.add(value)
elif operation == OPERATION_DISCARD:
self.data.discard(value)
elif operation == OPERATION_UPDATE:
tup = cast(Iterable[List], value)
added: List
removed: List
added, removed = tup
self.data |= set(added)
self.data -= set(removed)
else:
raise NotImplementedError(
f'Unknown operation {operation}: key={self.key!r}')
class ChangeloggedSetManager(ChangeloggedObjectManager):
ValueType = ChangeloggedSet
[docs]class SetTable(Table[KT, VT]):
WindowWrapper = SetWindowWrapper
_changelog_compacting = False
def _new_store(self) -> StoreT:
return ChangeloggedSetManager(self)
def __getitem__(self, key: KT) -> ChangeloggedSet[VT]:
# FastUserDict looks up using `key in self.data`
# but we are a defaultdict.
return self.data[key]