Source code for faust.types.tables

import abc
import asyncio
import typing
from datetime import datetime
from typing import (
    Any,
    Awaitable,
    Callable,
    ClassVar,
    Iterable,
    Mapping,
    MutableMapping,
    Optional,
    Set,
    Type,
    Union,
)

from mode import Seconds, ServiceT
from mode.utils.compat import Counter
from yarl import URL

from .events import EventT
from .stores import StoreT
from .streams import JoinableT
from .topics import TopicT
from .tuples import TP
from .windows import WindowT


if typing.TYPE_CHECKING:
    from .app import AppT
    from .models import FieldDescriptorT, ModelArg
else:
    class AppT: ...  # noqa
    class FieldDescriptorT: ...  # noqa
    class ModelArg: ...  # noqa

__all__ = [
    'RecoverCallback',
    'RelativeArg',
    'CollectionT',
    'TableT',
    'TableManagerT',
    'WindowSetT',
    'WindowWrapperT',
    'ChangelogReaderT',
    'ChangelogEventCallback',
    'CollectionTps',
]

RelativeHandler = Callable[[Optional[EventT]], Union[float, datetime]]
RecoverCallback = Callable[[], Awaitable[None]]
ChangelogEventCallback = Callable[[EventT], Awaitable[None]]
RelativeArg = Optional[Union[
    FieldDescriptorT,
    RelativeHandler,
    datetime,
    float,
]]


[docs]class CollectionT(ServiceT, JoinableT): StateStore: ClassVar[Optional[Type[StoreT]]] = None app: AppT name: str default: Any # noqa: E704 key_type: Optional[ModelArg] value_type: Optional[ModelArg] partitions: Optional[int] window: Optional[WindowT] help: str recovery_buffer_size: int standby_buffer_size: int @abc.abstractmethod def __init__(self, app: AppT, *, name: str = None, default: Callable[[], Any] = None, store: Union[str, URL] = None, key_type: ModelArg = None, value_type: ModelArg = None, partitions: int = None, window: WindowT = None, changelog_topic: TopicT = None, help: str = None, on_recover: RecoverCallback = None, on_changelog_event: ChangelogEventCallback = None, recovery_buffer_size: int = 1000, standby_buffer_size: int = None, extra_topic_configs: Mapping[str, Any] = None, **kwargs: Any) -> None: ... @property @abc.abstractmethod def changelog_topic(self) -> TopicT: ... @changelog_topic.setter def changelog_topic(self, topic: TopicT) -> None: ...
[docs] @abc.abstractmethod def apply_changelog_batch(self, batch: Iterable[EventT]) -> None: ...
[docs] @abc.abstractmethod def persisted_offset(self, tp: TP) -> Optional[int]: ...
[docs] @abc.abstractmethod async def need_active_standby_for(self, tp: TP) -> bool: ...
[docs] @abc.abstractmethod def reset_state(self) -> None: ...
[docs] @abc.abstractmethod async def on_partitions_assigned(self, assigned: Set[TP]) -> None: ...
[docs] @abc.abstractmethod async def on_partitions_revoked(self, revoked: Set[TP]) -> None: ...
[docs] @abc.abstractmethod async def on_changelog_event(self, event: EventT) -> None: ...
[docs] @abc.abstractmethod def on_recover(self, fun: RecoverCallback) -> RecoverCallback: ...
[docs] @abc.abstractmethod async def call_recover_callbacks(self) -> None: ...
CollectionTps = MutableMapping[CollectionT, Set[TP]]
[docs]class TableT(CollectionT, MutableMapping):
[docs] @abc.abstractmethod def using_window(self, window: WindowT) -> 'WindowWrapperT': ...
[docs] @abc.abstractmethod def hopping(self, size: Seconds, step: Seconds, expires: Seconds = None) -> 'WindowWrapperT': ...
[docs] @abc.abstractmethod def tumbling(self, size: Seconds, expires: Seconds = None) -> 'WindowWrapperT': ...
[docs] @abc.abstractmethod def as_ansitable(self, *, key: str = 'Key', value: str = 'Value', sort: bool = False, sortkey: Callable[[Any], Any] = lambda x: x, title: str = 'Title') -> str: ...
[docs]class TableManagerT(ServiceT, MutableMapping[str, CollectionT]): app: AppT recovery_completed: asyncio.Event @abc.abstractmethod def __init__(self, app: AppT, **kwargs: Any) -> None: ...
[docs] @abc.abstractmethod def add(self, table: CollectionT) -> CollectionT: ...
[docs] @abc.abstractmethod async def on_partitions_assigned(self, assigned: Set[TP]) -> None: ...
[docs] @abc.abstractmethod async def on_partitions_revoked(self, revoked: Set[TP]) -> None: ...
@property @abc.abstractmethod def changelog_topics(self) -> Set[str]: ...
[docs]class ChangelogReaderT(ServiceT): table: CollectionT app: AppT tps: Set[TP] offsets: Counter[TP]
[docs]class WindowSetT(MutableMapping): key: Any table: TableT event: Optional[EventT] @abc.abstractmethod def __init__(self, key: Any, table: TableT, wrapper: 'WindowWrapperT', event: EventT = None) -> None: ...
[docs] @abc.abstractmethod def apply(self, op: Callable[[Any, Any], Any], value: Any, event: EventT = None) -> 'WindowSetT': ...
[docs] @abc.abstractmethod def value(self, event: EventT = None) -> Any: ...
[docs] @abc.abstractmethod def current(self, event: EventT = None) -> Any: ...
[docs] @abc.abstractmethod def now(self) -> Any: ...
[docs] @abc.abstractmethod def delta(self, d: Seconds, event: EventT = None) -> Any: ...
@abc.abstractmethod def __iadd__(self, other: Any) -> Any: ... @abc.abstractmethod def __isub__(self, other: Any) -> Any: ... @abc.abstractmethod def __imul__(self, other: Any) -> Any: ... @abc.abstractmethod def __itruediv__(self, other: Any) -> Any: ... @abc.abstractmethod def __ifloordiv__(self, other: Any) -> Any: ... @abc.abstractmethod def __imod__(self, other: Any) -> Any: ... @abc.abstractmethod def __ipow__(self, other: Any) -> Any: ... @abc.abstractmethod def __ilshift__(self, other: Any) -> Any: ... @abc.abstractmethod def __irshift__(self, other: Any) -> Any: ... @abc.abstractmethod def __iand__(self, other: Any) -> Any: ... @abc.abstractmethod def __ixor__(self, other: Any) -> Any: ... @abc.abstractmethod def __ior__(self, other: Any) -> Any: ...
[docs]class WindowWrapperT(MutableMapping): table: TableT @abc.abstractmethod def __init__(self, table: TableT, *, relative_to: RelativeArg = None) -> None: ... @property @abc.abstractmethod def name(self) -> str: ...
[docs] @abc.abstractmethod def clone(self, relative_to: RelativeArg) -> 'WindowWrapperT': ...
[docs] @abc.abstractmethod def relative_to_now(self) -> 'WindowWrapperT': ...
[docs] @abc.abstractmethod def relative_to_field(self, field: FieldDescriptorT) -> 'WindowWrapperT': ...
[docs] @abc.abstractmethod def relative_to_stream(self) -> 'WindowWrapperT': ...
[docs] @abc.abstractmethod def get_timestamp(self, event: EventT = None) -> float: ...
@abc.abstractmethod def __getitem__(self, key: Any) -> WindowSetT: ... @property def get_relative_timestamp(self) -> Optional[RelativeHandler]: ... @get_relative_timestamp.setter def get_relative_timestamp(self, relative_to: RelativeArg) -> None: ...