Source code for faust.types.stores

import abc
import typing
from typing import (
    Any,
    Callable,
    Iterable,
    Mapping,
    Optional,
    Set,
    TypeVar,
    Union,
)

from mode import ServiceT
from mode.utils.collections import FastUserDict
from yarl import URL

from .codecs import CodecArg
from .events import EventT
from .tuples import TP

if typing.TYPE_CHECKING:
    from .app import AppT as _AppT
    from .models import ModelArg as _ModelArg
    from .tables import CollectionT as _CollectionT
else:
    class _AppT: ...  # noqa
    class _ModelArg: ...  # noqa
    class _CollectionT: ...  # noqa

__all__ = ['StoreT']

KT = TypeVar('KT')
VT = TypeVar('VT')


[docs]class StoreT(ServiceT, FastUserDict[KT, VT]): url: URL app: _AppT table: _CollectionT table_name: str key_type: Optional[_ModelArg] value_type: Optional[_ModelArg] key_serializer: CodecArg value_serializer: CodecArg options: Optional[Mapping[str, Any]] @abc.abstractmethod def __init__(self, url: Union[str, URL], app: _AppT, table: _CollectionT, *, table_name: str = '', key_type: _ModelArg = None, value_type: _ModelArg = None, key_serializer: CodecArg = '', value_serializer: CodecArg = '', options: Mapping[str, Any] = None, **kwargs: Any) -> None: ...
[docs] @abc.abstractmethod def persisted_offset(self, tp: TP) -> Optional[int]: ...
[docs] @abc.abstractmethod def set_persisted_offset(self, tp: TP, offset: int) -> None: ...
[docs] @abc.abstractmethod async def need_active_standby_for(self, tp: TP) -> bool: ...
[docs] @abc.abstractmethod def apply_changelog_batch(self, batch: Iterable[EventT], to_key: Callable[[Any], KT], to_value: Callable[[Any], VT]) -> None: ...
[docs] @abc.abstractmethod def reset_state(self) -> None: ...
[docs] @abc.abstractmethod async def on_rebalance(self, table: _CollectionT, assigned: Set[TP], revoked: Set[TP], newly_assigned: Set[TP]) -> None: ...
[docs] @abc.abstractmethod async def on_recovery_completed(self, active_tps: Set[TP], standby_tps: Set[TP]) -> None: ...