"""Table (key/value changelog stream)."""
from typing import Any, ClassVar, Type
from mode import Seconds
from faust import windows
from faust.streams import current_event
from faust.types.tables import KT, TableT, VT, WindowWrapperT
from faust.types.windows import WindowT
from faust.utils.terminal.tables import dict_as_ansitable
from . import wrappers
from .base import Collection
__all__ = ['Table']
[docs]class Table(TableT[KT, VT], Collection):
"""Table (non-windowed)."""
WindowWrapper: ClassVar[Type[WindowWrapperT]] = wrappers.WindowWrapper
[docs] def using_window(self, window: WindowT, *,
key_index: bool = False) -> WindowWrapperT:
self.window = window
self._changelog_compacting = True
self._changelog_deleting = True
self._changelog_topic = None # will reset on next property access
return self.WindowWrapper(self, key_index=key_index)
[docs] def hopping(self, size: Seconds, step: Seconds,
expires: Seconds = None,
key_index: bool = False) -> WindowWrapperT:
return self.using_window(
windows.HoppingWindow(size, step, expires),
key_index=key_index,
)
[docs] def tumbling(self, size: Seconds,
expires: Seconds = None,
key_index: bool = False) -> WindowWrapperT:
return self.using_window(
windows.TumblingWindow(size, expires),
key_index=key_index,
)
def __missing__(self, key: KT) -> VT:
if self.default is not None:
return self.default()
raise KeyError(key)
def _has_key(self, key: KT) -> bool:
return key in self
def _get_key(self, key: KT) -> VT:
return self[key]
def _set_key(self, key: KT, value: VT) -> None:
self[key] = value
def _del_key(self, key: KT) -> None:
del self[key]
[docs] def on_key_get(self, key: KT) -> None:
self._sensor_on_get(self, key)
[docs] def on_key_set(self, key: KT, value: VT) -> None:
event = current_event()
self._send_changelog(event, key, value)
if event is not None:
partition = event.message.partition
self._maybe_set_key_ttl(key, partition)
self._sensor_on_set(self, key, value)
else:
raise TypeError(
'Setting table key from outside of stream iteration')
[docs] def on_key_del(self, key: KT) -> None:
event = current_event()
self._send_changelog(event, key, value=None, value_serializer='raw')
if event is not None:
partition = event.message.partition
self._maybe_del_key_ttl(key, partition)
self._sensor_on_del(self, key)
else:
raise TypeError(
'Deleting table key from outside of stream iteration')
[docs] def as_ansitable(self, title: str = '{table.name}',
**kwargs: Any) -> str:
return dict_as_ansitable(
self,
title=title.format(table=self),
**kwargs)