"""Wrappers for windowed tables."""
import operator
import typing
from datetime import datetime
from typing import (
Any,
Callable,
ClassVar,
ItemsView,
Iterator,
KeysView,
Optional,
Tuple,
Type,
Union,
ValuesView,
cast,
overload,
)
from mode import Seconds
from mode.utils.typing import NoReturn
from faust.exceptions import ImproperlyConfigured
from faust.streams import current_event
from faust.types import EventT, FieldDescriptorT
from faust.types.tables import (
KT,
RecoverCallback,
RelativeArg,
RelativeHandler,
TableT,
VT,
WindowSetT,
WindowWrapperT,
WindowedItemsViewT,
WindowedValuesViewT,
)
from faust.types.windows import WindowRange
from faust.utils.terminal.tables import dict_as_ansitable
if typing.TYPE_CHECKING: # pragma: no cover
from .table import Table as _Table
else:
class _Table: ... # noqa
__all__ = [
'WindowSet',
'WindowWrapper',
'WindowedItemsView',
'WindowedKeysView',
'WindowedValuesView',
]
[docs]class WindowedKeysView(KeysView):
"""The object returned by ``windowed_table.keys()``."""
def __init__(self,
mapping: WindowWrapperT,
event: EventT = None) -> None:
self._mapping = mapping
self.event = event
def __iter__(self) -> Iterator:
wrapper = cast(WindowWrapper, self._mapping)
for key, _ in wrapper._items(self.event):
yield key
def __len__(self) -> int:
return len(self._mapping)
[docs] def now(self) -> Iterator[Any]:
wrapper = cast(WindowWrapper, self._mapping)
for key, _ in wrapper._items_now():
yield key
[docs] def current(self, event: EventT = None) -> Iterator[Any]:
wrapper = cast(WindowWrapper, self._mapping)
for key, _ in wrapper._items_current(event or self.event):
yield key
[docs] def delta(self, d: Seconds, event: EventT = None) -> Iterator[Any]:
wrapper = cast(WindowWrapper, self._mapping)
for key, _ in wrapper._items_delta(d, event or self.event):
yield key
[docs]class WindowedItemsView(WindowedItemsViewT):
"""The object returned by ``windowed_table.items()``."""
def __init__(self,
mapping: WindowWrapperT,
event: EventT = None) -> None:
self._mapping = mapping
self.event = event
def __iter__(self) -> Iterator[Tuple[Any, Any]]:
wrapper = cast(WindowWrapper, self._mapping)
return wrapper._items(self.event)
[docs] def now(self) -> Iterator[Tuple[Any, Any]]:
wrapper = cast(WindowWrapper, self._mapping)
return wrapper._items_now()
[docs] def current(self, event: EventT = None) -> Iterator[Tuple[Any, Any]]:
wrapper = cast(WindowWrapper, self._mapping)
return wrapper._items_current(event or self.event)
[docs] def delta(self,
d: Seconds,
event: EventT = None) -> Iterator[Tuple[Any, Any]]:
wrapper = cast(WindowWrapper, self._mapping)
return wrapper._items_delta(d, event or self.event)
[docs]class WindowedValuesView(WindowedValuesViewT):
"""The object returned by ``windowed_table.values()``."""
def __init__(self,
mapping: WindowWrapperT,
event: EventT = None) -> None:
self._mapping = mapping
self.event = event
def __iter__(self) -> Iterator[Any]:
wrapper = cast(WindowWrapper, self._mapping)
for _, value in wrapper._items(self.event):
yield value
[docs] def now(self) -> Iterator[Any]:
wrapper = cast(WindowWrapper, self._mapping)
for _, value in wrapper._items_now():
yield value
[docs] def current(self, event: EventT = None) -> Iterator[Any]:
wrapper = cast(WindowWrapper, self._mapping)
for _, value in wrapper._items_current(event or self.event):
yield value
[docs] def delta(self, d: Seconds, event: EventT = None) -> Iterator[Any]:
wrapper = cast(WindowWrapper, self._mapping)
for _, value in wrapper._items_delta(d, event or self.event):
yield value
[docs]class WindowSet(WindowSetT[KT, VT]):
"""Represents the windows available for table key.
``Table[k]`` returns WindowSet since ``k`` can exist in multiple
windows, and to retrieve an actual item we need a timestamp.
The timestamp of the current event (if this is executing in a stream
processor), can be used by accessing ``.current()``::
Table[k].current()
similarly the most recent value can be accessed using ``.now()``::
Table[k].now()
from delta of the time of the current event::
Table[k].delta(timedelta(hours=3))
or delta from time of other event::
Table[k].delta(timedelta(hours=3), other_event)
"""
def __init__(self,
key: KT,
table: TableT,
wrapper: WindowWrapperT,
event: EventT = None) -> None:
self.key = key
self.table = cast(_Table, table)
self.wrapper = wrapper
self.event = event
self.data = table # provides underlying mapping in FastUserDict
[docs] def apply(self,
op: Callable[[VT, VT], VT],
value: VT,
event: EventT = None) -> WindowSetT[KT, VT]:
table = cast(_Table, self.table)
wrapper = cast(WindowWrapper, self.wrapper)
timestamp = wrapper.get_timestamp(event or self.event)
wrapper.on_set_key(self.key, value)
table._apply_window_op(op, self.key, value, timestamp)
return self
[docs] def value(self, event: EventT = None) -> VT:
return cast(_Table, self.table)._windowed_timestamp(
self.key, self.wrapper.get_timestamp(event or self.event))
[docs] def now(self) -> VT:
return cast(_Table, self.table)._windowed_now(self.key)
[docs] def current(self, event: EventT = None) -> VT:
t = cast(_Table, self.table)
return t._windowed_timestamp(
self.key, t._relative_event(event or self.event))
[docs] def delta(self, d: Seconds, event: EventT = None) -> VT:
table = cast(_Table, self.table)
return table._windowed_delta(self.key, d, event or self.event)
@overload
def __getitem__(self, w: EventT) -> WindowSetT[KT, VT]: ... # noqa
@overload # noqa
def __getitem__(self, w: WindowRange) -> VT: ... # noqa
def __getitem__(self, w: Union[EventT, WindowRange]) -> VT: # noqa
# wrapper[key][event] returns WindowSet with event already set.
if isinstance(w, EventT):
return type(self)(self.key, self.table, self.wrapper, w)
# wrapper[key][window_range] returns value for that range.
return self.table[self.key, w]
@overload # noqa
def __setitem__(self, w: EventT, value: VT) -> NoReturn: ... # noqa
@overload # noqa
def __setitem__(self, w: WindowRange, value: VT) -> None: ... # noqa
def __setitem__(self, # noqa
w: Union[EventT, WindowRange],
value: VT) -> None:
if isinstance(w, EventT):
raise NotImplementedError(
'Cannot set WindowSet key, when key is an event')
self.wrapper.on_set_key(self.key, value)
self.table[self.key, w] = value
@overload # noqa
def __delitem__(self, w: EventT) -> NoReturn: ... # noqa
@overload # noqa
def __delitem__(self, w: WindowRange) -> None: ... # noqa
def __delitem__(self, w: Union[EventT, WindowRange]) -> None: # noqa
if isinstance(w, EventT):
raise NotImplementedError(
'Cannot delete WindowSet key, when key is an event')
self.wrapper.on_del_key(self.key)
del self.table[self.key, w]
def __iadd__(self, other: VT) -> WindowSetT:
return self.apply(operator.add, other)
def __isub__(self, other: VT) -> WindowSetT:
return self.apply(operator.sub, other)
def __imul__(self, other: VT) -> WindowSetT:
return self.apply(operator.mul, other)
def __itruediv__(self, other: VT) -> WindowSetT:
return self.apply(operator.truediv, other)
def __ifloordiv__(self, other: VT) -> WindowSetT:
return self.apply(operator.floordiv, other)
def __imod__(self, other: VT) -> WindowSetT:
return self.apply(operator.mod, other)
def __ipow__(self, other: VT) -> WindowSetT:
return self.apply(operator.pow, other)
def __ilshift__(self, other: VT) -> WindowSetT:
return self.apply(operator.lshift, other)
def __irshift__(self, other: VT) -> WindowSetT:
return self.apply(operator.rshift, other)
def __iand__(self, other: VT) -> WindowSetT:
return self.apply(operator.and_, other)
def __ixor__(self, other: VT) -> WindowSetT:
return self.apply(operator.xor, other)
def __ior__(self, other: VT) -> WindowSetT:
return self.apply(operator.or_, other)
def __repr__(self) -> str:
return f'<{type(self).__name__}: table={self.table}>'
[docs]class WindowWrapper(WindowWrapperT):
"""Windowed table wrapper.
A windowed table does not return concrete values when keys are
accessed, instead :class:`WindowSet` is returned so that
the values can be further reduced to the wanted time period.
"""
ValueType: ClassVar[Type[WindowSetT]] = WindowSet
key_index: bool = False
key_index_table: Optional[TableT] = None
def __init__(self, table: TableT, *,
relative_to: RelativeArg = None,
key_index: bool = False,
key_index_table: TableT = None) -> None:
self.table = table
self.key_index = key_index
self.key_index_table = key_index_table
if self.key_index and self.key_index_table is None:
self.key_index_table = self.table.app.Table(
f'{self.table.name}-key_index',
value_type=int,
key_type=self.table.key_type,
)
self._get_relative_timestamp = self._relative_handler(relative_to)
[docs] def clone(self, relative_to: RelativeArg) -> WindowWrapperT:
return type(self)(
table=self.table,
relative_to=relative_to or self._get_relative_timestamp,
key_index=self.key_index,
key_index_table=self.key_index_table,
)
@property
def name(self) -> str:
return self.table.name
[docs] def relative_to(self, ts: RelativeArg) -> WindowWrapperT:
return self.clone(relative_to=ts)
[docs] def relative_to_now(self) -> WindowWrapperT:
return self.clone(relative_to=self.table._relative_now)
[docs] def relative_to_field(self, field: FieldDescriptorT) -> WindowWrapperT:
return self.clone(relative_to=self.table._relative_field(field))
[docs] def relative_to_stream(self) -> WindowWrapperT:
return self.clone(relative_to=self.table._relative_event)
[docs] def get_timestamp(self, event: EventT = None) -> float:
event = event or current_event()
get_relative_timestamp = self.get_relative_timestamp
if get_relative_timestamp:
timestamp = get_relative_timestamp(event)
if isinstance(timestamp, datetime):
return timestamp.timestamp()
return timestamp
if event is None:
raise RuntimeError('Operation outside of stream iteration')
return event.message.timestamp
[docs] def on_recover(self, fun: RecoverCallback) -> RecoverCallback:
return self.table.on_recover(fun)
def __contains__(self, key: Any) -> bool:
return self.table._windowed_contains(key, self.get_timestamp())
def __getitem__(self, key: Any) -> WindowSetT:
return self.ValueType(key, self.table, self, current_event())
def __setitem__(self, key: Any, value: Any) -> None:
if not isinstance(value, WindowSetT):
table = cast(_Table, self.table)
self.on_set_key(key, value)
table._set_windowed(key, value, self.get_timestamp())
[docs] def on_set_key(self, key: Any, value: Any) -> None:
key_index_table = self.key_index_table
if key_index_table is not None:
if key not in key_index_table:
key_index_table[key] = 1
[docs] def on_del_key(self, key: Any) -> None:
key_index_table = self.key_index_table
if key_index_table is not None:
key_index_table.pop(key, None)
def __delitem__(self, key: Any) -> None:
self.on_del_key(key)
cast(_Table, self.table)._del_windowed(key, self.get_timestamp())
def __len__(self) -> int:
if self.key_index_table is not None:
return len(self.key_index_table)
raise NotImplementedError(
'Windowed table must use_index=True to support len()')
def _relative_handler(
self, relative_to: RelativeArg) -> Optional[RelativeHandler]:
if relative_to is None:
return None
elif isinstance(relative_to, datetime):
return self.table._relative_timestamp(relative_to.timestamp())
elif isinstance(relative_to, float):
return self.table._relative_timestamp(relative_to)
elif isinstance(relative_to, FieldDescriptorT):
return self.table._relative_field(relative_to)
elif callable(relative_to):
return relative_to
raise ImproperlyConfigured(
f'Relative cannot be type {type(relative_to)}')
def __iter__(self) -> Iterator:
return self._keys()
[docs] def keys(self) -> KeysView:
return WindowedKeysView(self)
def _keys(self) -> Iterator:
key_index_table = self.key_index_table
if key_index_table is not None:
for key in key_index_table.keys():
yield key
else:
raise NotImplementedError(
'Windowed table must set use_index=True to '
'support .keys/.items/.values')
[docs] def values(self, event: EventT = None) -> ValuesView:
return WindowedValuesView(self, event or current_event())
[docs] def items(self, event: EventT = None) -> ItemsView:
return WindowedItemsView(self, event or current_event())
def _items(self, event: EventT = None) -> Iterator[Tuple[Any, Any]]:
table = cast(_Table, self.table)
timestamp = self.get_timestamp(event)
for key in self._keys():
try:
yield key, table._windowed_timestamp(key, timestamp)
except KeyError:
pass
def _items_now(self) -> Iterator[Tuple[Any, Any]]:
table = cast(_Table, self.table)
for key in self._keys():
try:
yield key, table._windowed_now(key)
except KeyError:
pass
def _items_current(
self, event: EventT = None) -> Iterator[Tuple[Any, Any]]:
table = cast(_Table, self.table)
timestamp = table._relative_event(event)
for key in self._keys():
try:
yield key, table._windowed_timestamp(key, timestamp)
except KeyError:
pass
def _items_delta(self, d: Seconds,
event: EventT = None) -> Iterator[Any]:
table = cast(_Table, self.table)
for key in self._keys():
try:
yield key, table._windowed_delta(key, d, event)
except KeyError:
pass
[docs] def as_ansitable(self, title: str = '{table.name}',
**kwargs: Any) -> str:
return dict_as_ansitable(
self,
title=title.format(table=self.table),
**kwargs)
@property
def get_relative_timestamp(self) -> Optional[RelativeHandler]:
return self._get_relative_timestamp
@get_relative_timestamp.setter
def get_relative_timestamp(self, relative_to: RelativeArg) -> None:
self._get_relative_timestamp = self._relative_handler(relative_to)