Source code for faust.types.tables

import abc
import typing
from datetime import datetime
from typing import (
    Any,
    Awaitable,
    Callable,
    ItemsView,
    Iterable,
    Iterator,
    KeysView,
    Mapping,
    MutableMapping,
    Optional,
    Set,
    Tuple,
    TypeVar,
    Union,
    ValuesView,
)

from mode import Seconds, ServiceT
from mode.utils.collections import FastUserDict, ManagedUserDict
from yarl import URL

from .codecs import CodecArg
from .events import EventT
from .stores import StoreT
from .streams import JoinableT
from .topics import TopicT
from .tuples import FutureMessage, TP
from .windows import WindowT

if typing.TYPE_CHECKING:
    # Static type checkers only: bind the real types used in annotations.
    from .app import AppT as _AppT
    from .models import FieldDescriptorT as _FieldDescriptorT
    from .models import ModelArg as _ModelArg
    from .serializers import SchemaT as _SchemaT
else:
    # At runtime the imports above are skipped, so empty placeholder classes
    # keep the annotation names defined.
    # NOTE(review): presumably this avoids circular imports -- confirm.
    class _AppT:
        pass

    class _FieldDescriptorT:
        pass

    class _ModelArg:
        pass

    class _SchemaT:
        pass

# Names exported by a star-import of this module.  Names defined below but
# not listed here (e.g. ``RelativeHandler``, ``KT``, ``VT``) are not exported.
__all__ = [
    'RecoverCallback',
    'RelativeArg',
    'CollectionT',
    'TableT',
    'GlobalTableT',
    'TableManagerT',
    'WindowCloseCallback',
    'WindowSetT',
    'WindowedItemsViewT',
    'WindowedValuesViewT',
    'WindowWrapperT',
    'ChangelogEventCallback',
    'CollectionTps',
]

# Callable taking an optional event and returning a float or a datetime.
RelativeHandler = Callable[[Optional[EventT]], Union[float, datetime]]
# Zero-argument callable returning an awaitable that resolves to None.
RecoverCallback = Callable[[], Awaitable[None]]
# Callable taking one event and returning an awaitable that resolves to None.
ChangelogEventCallback = Callable[[EventT], Awaitable[None]]
# Synchronous callable taking two arbitrary arguments and returning None.
# NOTE(review): presumably called as (key, value), matching
# ``CollectionT.on_window_close`` -- confirm.
WindowCloseCallback = Callable[[Any, Any], None]
# Accepted "relative to" argument: a field descriptor, a RelativeHandler,
# a datetime, a float, or None.
RelativeArg = Optional[Union[
    _FieldDescriptorT,
    RelativeHandler,
    datetime,
    float,
]]
# Mutable mapping from a collection to a set of TP values.
CollectionTps = MutableMapping['CollectionT', Set[TP]]

# Generic type variables; used below to parameterize TableT and WindowSetT.
KT = TypeVar('KT')
VT = TypeVar('VT')


class CollectionT(ServiceT, JoinableT):
    """Abstract interface for a table-like collection.

    Combines ``ServiceT`` and ``JoinableT`` and declares the attributes and
    methods that concrete collections expose.  Every method here is a stub
    (``...``); the actual behaviour lives in subclasses not shown in this
    module.
    """

    app: _AppT
    name: str
    default: Any  # noqa: E704
    schema: Optional[_SchemaT]
    key_type: Optional[_ModelArg]
    value_type: Optional[_ModelArg]
    partitions: Optional[int]
    window: Optional[WindowT]
    help: str
    recovery_buffer_size: int
    standby_buffer_size: int
    options: Optional[Mapping[str, Any]]
    last_closed_window: float
    use_partitioner: bool

    @abc.abstractmethod
    def __init__(self,
                 app: _AppT,
                 *,
                 name: Optional[str] = None,
                 default: Optional[Callable[[], Any]] = None,
                 store: Optional[Union[str, URL]] = None,
                 schema: Optional[_SchemaT] = None,
                 key_type: Optional[_ModelArg] = None,
                 value_type: Optional[_ModelArg] = None,
                 partitions: Optional[int] = None,
                 window: Optional[WindowT] = None,
                 changelog_topic: Optional[TopicT] = None,
                 help: Optional[str] = None,
                 on_recover: Optional[RecoverCallback] = None,
                 on_changelog_event: Optional[
                     ChangelogEventCallback] = None,
                 recovery_buffer_size: int = 1000,
                 standby_buffer_size: Optional[int] = None,
                 extra_topic_configs: Optional[Mapping[str, Any]] = None,
                 options: Optional[Mapping[str, Any]] = None,
                 use_partitioner: bool = False,
                 on_window_close: Optional[WindowCloseCallback] = None,
                 **kwargs: Any) -> None:
        """Declare the constructor signature.

        Everything after ``app`` is keyword-only.  Parameters that default
        to ``None`` are annotated ``Optional`` explicitly (PEP 484 no
        longer permits implicit Optional).
        """
        ...

    @property
    @abc.abstractmethod
    def changelog_topic(self) -> TopicT:
        """Return the changelog topic as a ``TopicT`` (abstract getter)."""
        ...

    @changelog_topic.setter
    def changelog_topic(self, topic: TopicT) -> None:
        """Set the changelog topic (setter stub, not marked abstract)."""
        ...

    @abc.abstractmethod
    def apply_changelog_batch(self, batch: Iterable[EventT]) -> None:
        """Accept an iterable of changelog events; returns nothing."""
        ...

    @abc.abstractmethod
    def persisted_offset(self, tp: TP) -> Optional[int]:
        """Return an integer offset for ``tp``, or ``None``."""
        ...

    @abc.abstractmethod
    async def need_active_standby_for(self, tp: TP) -> bool:
        """Coroutine returning a bool for ``tp``.

        The decision criteria are defined by implementers.
        """
        ...

    @abc.abstractmethod
    def reset_state(self) -> None:
        """Reset state; what is reset is defined by implementers."""
        ...

    @abc.abstractmethod
    def send_changelog(self,
                       partition: Optional[int],
                       key: Any,
                       value: Any,
                       key_serializer: Optional[CodecArg] = None,
                       value_serializer: Optional[CodecArg] = None,
                       ) -> FutureMessage:
        """Take a partition, key/value and optional serializers.

        Returns a ``FutureMessage``.
        """
        ...

    @abc.abstractmethod
    def partition_for_key(self, key: Any) -> Optional[int]:
        """Return an integer partition for ``key``, or ``None``."""
        ...

    @abc.abstractmethod
    def on_window_close(self, key: Any, value: Any) -> None:
        """Hook taking a key and a value; returns nothing."""
        ...

    @abc.abstractmethod
    async def on_rebalance(self,
                           assigned: Set[TP],
                           revoked: Set[TP],
                           newly_assigned: Set[TP]) -> None:
        """Coroutine hook receiving three sets of ``TP``."""
        ...

    @abc.abstractmethod
    async def on_changelog_event(self, event: EventT) -> None:
        """Coroutine hook receiving a single event."""
        ...

    @abc.abstractmethod
    def on_recover(self, fun: RecoverCallback) -> RecoverCallback:
        """Take a recover callback and return a recover callback.

        NOTE(review): the signature permits decorator use; presumably it
        registers ``fun`` -- confirm against an implementation.
        """
        ...

    @abc.abstractmethod
    async def on_recovery_completed(self,
                                    active_tps: Set[TP],
                                    standby_tps: Set[TP]) -> None:
        """Coroutine hook receiving the active and standby ``TP`` sets."""
        ...

    @abc.abstractmethod
    async def call_recover_callbacks(self) -> None:
        """Coroutine taking no arguments; returns nothing."""
        ...
class TableT(CollectionT, ManagedUserDict[KT, VT]):
    """Abstract interface for a table.

    Extends ``CollectionT`` with the ``ManagedUserDict[KT, VT]`` mapping
    base and declares methods that return a ``WindowWrapperT``.  All
    methods are stubs; behaviour is defined by implementers.
    """

    @abc.abstractmethod
    def using_window(self, window: WindowT, *,
                     key_index: bool = False) -> 'WindowWrapperT':
        """Take a ``WindowT`` and return a ``WindowWrapperT``.

        ``key_index`` is keyword-only.
        """
        ...

    @abc.abstractmethod
    def hopping(self,
                size: Seconds,
                step: Seconds,
                expires: Optional[Seconds] = None,
                key_index: bool = False) -> 'WindowWrapperT':
        """Take ``size``, ``step`` and optional ``expires`` durations.

        Returns a ``WindowWrapperT``.
        """
        ...

    @abc.abstractmethod
    def tumbling(self,
                 size: Seconds,
                 expires: Optional[Seconds] = None,
                 key_index: bool = False) -> 'WindowWrapperT':
        """Take a ``size`` and an optional ``expires`` duration.

        Returns a ``WindowWrapperT``.
        """
        ...

    @abc.abstractmethod
    def as_ansitable(self, **kwargs: Any) -> str:
        """Return a string; accepts arbitrary keyword options."""
        ...
class GlobalTableT(TableT):
    """Subtype of ``TableT`` that declares no additional members."""

    ...
class TableManagerT(ServiceT, FastUserDict[str, CollectionT]):
    """Abstract interface for a table manager.

    A ``ServiceT`` that is also a ``FastUserDict`` mapping ``str`` keys to
    ``CollectionT`` values.  All methods are stubs; behaviour is defined
    by implementers.
    """

    app: _AppT
    actives_ready: bool
    standbys_ready: bool

    @abc.abstractmethod
    def __init__(self, app: _AppT, **kwargs: Any) -> None:
        """Declare the constructor: an app plus arbitrary keyword options."""
        ...

    @abc.abstractmethod
    def add(self, table: CollectionT) -> CollectionT:
        """Take a collection and return a collection."""
        ...

    @abc.abstractmethod
    def persist_offset_on_commit(self,
                                 store: StoreT,
                                 tp: TP,
                                 offset: int) -> None:
        """Take a store, a ``TP`` and an integer offset; returns nothing."""
        ...

    @abc.abstractmethod
    def on_commit(self, offsets: MutableMapping[TP, int]) -> None:
        """Hook receiving a mutable mapping of ``TP`` to integer offset."""
        ...

    @abc.abstractmethod
    async def on_rebalance(self,
                           assigned: Set[TP],
                           revoked: Set[TP],
                           newly_assigned: Set[TP]) -> None:
        """Coroutine hook receiving three sets of ``TP``."""
        ...

    @property
    @abc.abstractmethod
    def changelog_topics(self) -> Set[str]:
        """Return a set of strings (abstract read-only property)."""
        ...
class WindowSetT(FastUserDict[KT, VT]):
    """Abstract interface for a window set.

    A ``FastUserDict[KT, VT]`` carrying a key, its table and an optional
    event.  Declares value accessors and the full family of in-place
    arithmetic/bitwise operators.  All methods are stubs; behaviour is
    defined by implementers.
    """

    key: Any
    table: TableT
    event: Optional[EventT]

    @abc.abstractmethod
    def __init__(self,
                 key: KT,
                 table: TableT,
                 wrapper: 'WindowWrapperT',
                 event: Optional[EventT] = None) -> None:
        """Declare the constructor: key, table, wrapper, optional event."""
        ...

    @abc.abstractmethod
    def apply(self,
              op: Callable[[VT, VT], VT],
              value: VT,
              event: Optional[EventT] = None) -> 'WindowSetT':
        """Take a binary operator and a value; return a ``WindowSetT``."""
        ...

    @abc.abstractmethod
    def value(self, event: Optional[EventT] = None) -> VT:
        """Return a value of type ``VT`` for an optional event."""
        ...

    @abc.abstractmethod
    def current(self, event: Optional[EventT] = None) -> VT:
        """Return a value of type ``VT`` for an optional event."""
        ...

    @abc.abstractmethod
    def now(self) -> VT:
        """Return a value of type ``VT``; takes no arguments."""
        ...

    @abc.abstractmethod
    def delta(self, d: Seconds, event: Optional[EventT] = None) -> VT:
        """Return a ``VT`` for duration ``d`` and an optional event."""
        ...

    # In-place operators: each takes a ``VT`` operand and returns a
    # ``WindowSetT``.

    @abc.abstractmethod
    def __iadd__(self, other: VT) -> 'WindowSetT':
        ...

    @abc.abstractmethod
    def __isub__(self, other: VT) -> 'WindowSetT':
        ...

    @abc.abstractmethod
    def __imul__(self, other: VT) -> 'WindowSetT':
        ...

    @abc.abstractmethod
    def __itruediv__(self, other: VT) -> 'WindowSetT':
        ...

    @abc.abstractmethod
    def __ifloordiv__(self, other: VT) -> 'WindowSetT':
        ...

    @abc.abstractmethod
    def __imod__(self, other: VT) -> 'WindowSetT':
        ...

    @abc.abstractmethod
    def __ipow__(self, other: VT) -> 'WindowSetT':
        ...

    @abc.abstractmethod
    def __ilshift__(self, other: VT) -> 'WindowSetT':
        ...

    @abc.abstractmethod
    def __irshift__(self, other: VT) -> 'WindowSetT':
        ...

    @abc.abstractmethod
    def __iand__(self, other: VT) -> 'WindowSetT':
        ...

    @abc.abstractmethod
    def __ixor__(self, other: VT) -> 'WindowSetT':
        ...

    @abc.abstractmethod
    def __ior__(self, other: VT) -> 'WindowSetT':
        ...
class WindowedItemsViewT(ItemsView):
    """Abstract items view built from a ``WindowWrapperT``.

    Iteration and every accessor yield ``(Any, Any)`` tuples.  All methods
    are stubs; behaviour is defined by implementers.
    """

    @abc.abstractmethod
    def __init__(self,
                 mapping: 'WindowWrapperT',
                 event: Optional[EventT] = None) -> None:
        """Declare the constructor: a wrapper plus an optional event."""
        ...

    @abc.abstractmethod
    def __iter__(self) -> Iterator[Tuple[Any, Any]]:
        """Return an iterator of two-element tuples."""
        ...

    @abc.abstractmethod
    def now(self) -> Iterator[Tuple[Any, Any]]:
        """Return an iterator of two-element tuples; takes no arguments."""
        ...

    @abc.abstractmethod
    def current(self,
                event: Optional[EventT] = None,
                ) -> Iterator[Tuple[Any, Any]]:
        """Return an iterator of two-element tuples for an optional event."""
        ...

    @abc.abstractmethod
    def delta(self,
              d: Seconds,
              event: Optional[EventT] = None,
              ) -> Iterator[Tuple[Any, Any]]:
        """Return an iterator of two-element tuples for duration ``d``."""
        ...
class WindowedValuesViewT(ValuesView):
    """Abstract values view built from a ``WindowWrapperT``.

    Iteration and every accessor yield arbitrary values.  All methods are
    stubs; behaviour is defined by implementers.
    """

    @abc.abstractmethod
    def __init__(self,
                 mapping: 'WindowWrapperT',
                 event: Optional[EventT] = None) -> None:
        """Declare the constructor: a wrapper plus an optional event."""
        ...

    @abc.abstractmethod
    def __iter__(self) -> Iterator[Any]:
        """Return an iterator of values."""
        ...

    @abc.abstractmethod
    def now(self) -> Iterator[Any]:
        """Return an iterator of values; takes no arguments."""
        ...

    @abc.abstractmethod
    def current(self, event: Optional[EventT] = None) -> Iterator[Any]:
        """Return an iterator of values for an optional event."""
        ...

    @abc.abstractmethod
    def delta(self,
              d: Seconds,
              event: Optional[EventT] = None) -> Iterator[Any]:
        """Return an iterator of values for duration ``d``."""
        ...
class WindowWrapperT(MutableMapping):
    """Abstract mutable mapping that wraps a ``TableT``.

    Item lookup returns a ``WindowSetT``.  Several methods return another
    ``WindowWrapperT``.  All methods are stubs; behaviour is defined by
    implementers.
    """

    table: TableT

    @abc.abstractmethod
    def __init__(self,
                 table: TableT,
                 *,
                 relative_to: RelativeArg = None,
                 key_index: bool = False,
                 key_index_table: Optional[TableT] = None) -> None:
        """Declare the constructor; options after ``table`` are keyword-only.

        ``relative_to`` keeps its ``RelativeArg`` annotation because that
        alias is already ``Optional``.
        """
        ...

    @property
    @abc.abstractmethod
    def name(self) -> str:
        """Return the name as a string (abstract read-only property)."""
        ...

    @abc.abstractmethod
    def clone(self, relative_to: RelativeArg) -> 'WindowWrapperT':
        """Take a ``RelativeArg`` and return a ``WindowWrapperT``."""
        ...

    @abc.abstractmethod
    def relative_to_now(self) -> 'WindowWrapperT':
        """Return a ``WindowWrapperT``; takes no arguments."""
        ...

    @abc.abstractmethod
    def relative_to_field(self,
                          field: _FieldDescriptorT) -> 'WindowWrapperT':
        """Take a field descriptor and return a ``WindowWrapperT``."""
        ...

    @abc.abstractmethod
    def relative_to_stream(self) -> 'WindowWrapperT':
        """Return a ``WindowWrapperT``; takes no arguments."""
        ...

    @abc.abstractmethod
    def get_timestamp(self, event: Optional[EventT] = None) -> float:
        """Return a float timestamp for an optional event."""
        ...

    @abc.abstractmethod
    def __getitem__(self, key: Any) -> WindowSetT:
        """Return the ``WindowSetT`` for ``key``."""
        ...

    @abc.abstractmethod
    def keys(self) -> KeysView:
        """Return a ``KeysView``."""
        ...

    @abc.abstractmethod
    def on_set_key(self, key: Any, value: Any) -> None:
        """Hook taking a key and a value; returns nothing."""
        ...

    @abc.abstractmethod
    def on_del_key(self, key: Any) -> None:
        """Hook taking a key; returns nothing."""
        ...

    @abc.abstractmethod
    def as_ansitable(self, **kwargs: Any) -> str:
        """Return a string; accepts arbitrary keyword options."""
        ...

    # Unlike the members above, this property pair is not marked abstract.
    # The getter yields an optional RelativeHandler, while the setter
    # accepts any RelativeArg.
    @property
    def get_relative_timestamp(self) -> Optional[RelativeHandler]:
        """Return a ``RelativeHandler`` or ``None`` (stub)."""
        ...

    @get_relative_timestamp.setter
    def get_relative_timestamp(self, relative_to: RelativeArg) -> None:
        """Accept a ``RelativeArg`` (stub)."""
        ...