faust.tables

class faust.tables.Collection(app: faust.types.app.AppT, *, name: str = None, default: Callable[Any] = None, store: Union[str, yarl.URL] = None, key_type: Union[typing.Type[faust.types.models.ModelT], typing.Type[bytes], typing.Type[str]] = None, value_type: Union[typing.Type[faust.types.models.ModelT], typing.Type[bytes], typing.Type[str]] = None, partitions: int = None, window: faust.types.windows.WindowT = None, changelog_topic: faust.types.topics.TopicT = None, help: str = None, on_recover: Callable[Awaitable[NoneType]] = None, on_changelog_event: Callable[faust.types.events.EventT, Awaitable[NoneType]] = None, recovery_buffer_size: int = 1000, standby_buffer_size: int = None, extra_topic_configs: Mapping[str, Any] = None, **kwargs) → None[source]

Base class for changelog-backed data structures stored in Kafka.

data
on_recover(fun: Callable[Awaitable[NoneType]]) → Callable[Awaitable[NoneType]][source]

Add function as callback to be called on table recovery.

Return type:Callable[[], Awaitable[None]]
info() → Mapping[str, Any][source]
Return type:Mapping[str, Any]
persisted_offset(tp: faust.types.tuples.TP) → Union[int, NoneType][source]
Return type:Optional[int]
reset_state() → None[source]
Return type:None
join(*fields) → faust.types.streams.StreamT[source]
Return type:StreamT[+T_co]
left_join(*fields) → faust.types.streams.StreamT[source]
Return type:StreamT[+T_co]
inner_join(*fields) → faust.types.streams.StreamT[source]
Return type:StreamT[+T_co]
outer_join(*fields) → faust.types.streams.StreamT[source]
Return type:StreamT[+T_co]
clone(**kwargs) → Any[source]
Return type:Any
combine(*nodes, **kwargs) → faust.types.streams.StreamT[source]
Return type:StreamT[+T_co]
contribute_to_stream(active: faust.types.streams.StreamT) → None[source]
Return type:None
label

Label used for graphs. :rtype: str

shortlabel

Label used for logging. :rtype: str

coroutine call_recover_callbacks(self) → None[source]
Return type:None
logger = <Logger faust.tables.base (WARNING)>
coroutine need_active_standby_for(self, tp: faust.types.tuples.TP) → bool[source]
Return type:bool
coroutine on_changelog_event(self, event: faust.types.events.EventT) → None[source]
Return type:None
coroutine on_partitions_assigned(self, assigned: Set[faust.types.tuples.TP]) → None[source]
Return type:None
coroutine on_partitions_revoked(self, revoked: Set[faust.types.tuples.TP]) → None[source]
Return type:None
coroutine on_start(self) → None[source]

Called every time before the service is started/restarted.

Return type:None
coroutine remove_from_stream(self, stream: faust.types.streams.StreamT) → None[source]
Return type:None
changelog_topic
apply_changelog_batch(batch: Iterable[faust.types.events.EventT]) → None[source]
Return type:None
class faust.tables.CollectionT(app: faust.types.tables.AppT, *, name: str = None, default: Callable[Any] = None, store: Union[str, yarl.URL] = None, key_type: faust.types.tables.ModelArg = None, value_type: faust.types.tables.ModelArg = None, partitions: int = None, window: faust.types.windows.WindowT = None, changelog_topic: faust.types.topics.TopicT = None, help: str = None, on_recover: Callable[Awaitable[NoneType]] = None, on_changelog_event: Callable[faust.types.events.EventT, Awaitable[NoneType]] = None, recovery_buffer_size: int = 1000, standby_buffer_size: int = None, extra_topic_configs: Mapping[str, Any] = None, **kwargs) → None[source]
StateStore = None
changelog_topic
apply_changelog_batch(batch: Iterable[faust.types.events.EventT]) → None[source]
Return type:None
persisted_offset(tp: faust.types.tuples.TP) → Union[int, NoneType][source]
Return type:Optional[int]
reset_state() → None[source]
Return type:None
on_recover(fun: Callable[Awaitable[NoneType]]) → Callable[Awaitable[NoneType]][source]
Return type:Callable[[], Awaitable[None]]
coroutine call_recover_callbacks(self) → None[source]
Return type:None
coroutine need_active_standby_for(self, tp: faust.types.tuples.TP) → bool[source]
Return type:bool
coroutine on_changelog_event(self, event: faust.types.events.EventT) → None[source]
Return type:None
coroutine on_partitions_assigned(self, assigned: Set[faust.types.tuples.TP]) → None[source]
Return type:None
coroutine on_partitions_revoked(self, revoked: Set[faust.types.tuples.TP]) → None[source]
Return type:None
class faust.tables.TableManager(app: faust.types.app.AppT, **kwargs) → None[source]

Manage tables used by Faust worker.

changelog_topics
add(table: faust.types.tables.CollectionT) → faust.types.tables.CollectionT[source]
Return type:CollectionT[]
logger = <Logger faust.tables.manager (WARNING)>
coroutine on_partitions_assigned(self, assigned: Set[faust.types.tuples.TP]) → None[source]
Return type:None
coroutine on_partitions_revoked(self, revoked: Set[faust.types.tuples.TP]) → None[source]
Return type:None
coroutine on_start(self) → None[source]

Called every time before the service is started/restarted.

Return type:None
coroutine on_stop(self) → None[source]

Called every time before the service is stopped/restarted.

Return type:None
class faust.tables.TableManagerT(app: faust.types.tables.AppT, **kwargs) → None[source]
add(table: faust.types.tables.CollectionT) → faust.types.tables.CollectionT[source]
Return type:CollectionT[]
changelog_topics
coroutine on_partitions_assigned(self, assigned: Set[faust.types.tuples.TP]) → None[source]
Return type:None
coroutine on_partitions_revoked(self, revoked: Set[faust.types.tuples.TP]) → None[source]
Return type:None
class faust.tables.Table(app: faust.types.app.AppT, *, name: str = None, default: Callable[Any] = None, store: Union[str, yarl.URL] = None, key_type: Union[typing.Type[faust.types.models.ModelT], typing.Type[bytes], typing.Type[str]] = None, value_type: Union[typing.Type[faust.types.models.ModelT], typing.Type[bytes], typing.Type[str]] = None, partitions: int = None, window: faust.types.windows.WindowT = None, changelog_topic: faust.types.topics.TopicT = None, help: str = None, on_recover: Callable[Awaitable[NoneType]] = None, on_changelog_event: Callable[faust.types.events.EventT, Awaitable[NoneType]] = None, recovery_buffer_size: int = 1000, standby_buffer_size: int = None, extra_topic_configs: Mapping[str, Any] = None, **kwargs) → None[source]

Table (non-windowed).

using_window(window: faust.types.windows.WindowT) → faust.types.tables.WindowWrapperT[source]
Return type:WindowWrapperT[]
hopping(size: Union[datetime.timedelta, float, str], step: Union[datetime.timedelta, float, str], expires: Union[datetime.timedelta, float, str] = None) → faust.types.tables.WindowWrapperT[source]
Return type:WindowWrapperT[]
tumbling(size: Union[datetime.timedelta, float, str], expires: Union[datetime.timedelta, float, str] = None) → faust.types.tables.WindowWrapperT[source]
Return type:WindowWrapperT[]
on_key_get(key: Any) → None[source]

Handle that key is being retrieved.

Return type:None
on_key_set(key: Any, value: Any) → None[source]

Handle that value for a key is being set.

Return type:None
on_key_del(key: Any) → None[source]

Handle that a key is deleted.

Return type:None
as_ansitable(*, key: str = 'Key', value: str = 'Value', sort: bool = False, sortkey: Callable[Any, Any] = operator.itemgetter(0), target: IO = <_io.TextIOWrapper name='<stdout>' mode='w' encoding='UTF-8'>, title: str = '{table.name}') → str[source]
Return type:str
logger = <Logger faust.tables.table (WARNING)>
class faust.tables.TableT(app: faust.types.tables.AppT, *, name: str = None, default: Callable[Any] = None, store: Union[str, yarl.URL] = None, key_type: faust.types.tables.ModelArg = None, value_type: faust.types.tables.ModelArg = None, partitions: int = None, window: faust.types.windows.WindowT = None, changelog_topic: faust.types.topics.TopicT = None, help: str = None, on_recover: Callable[Awaitable[NoneType]] = None, on_changelog_event: Callable[faust.types.events.EventT, Awaitable[NoneType]] = None, recovery_buffer_size: int = 1000, standby_buffer_size: int = None, extra_topic_configs: Mapping[str, Any] = None, **kwargs) → None[source]
using_window(window: faust.types.windows.WindowT) → faust.types.tables.WindowWrapperT[source]
Return type:WindowWrapperT[]
hopping(size: Union[datetime.timedelta, float, str], step: Union[datetime.timedelta, float, str], expires: Union[datetime.timedelta, float, str] = None) → faust.types.tables.WindowWrapperT[source]
Return type:WindowWrapperT[]
tumbling(size: Union[datetime.timedelta, float, str], expires: Union[datetime.timedelta, float, str] = None) → faust.types.tables.WindowWrapperT[source]
Return type:WindowWrapperT[]
as_ansitable(*, key: str = 'Key', value: str = 'Value', sort: bool = False, sortkey: Callable[Any, Any] = <function TableT.<lambda>>, title: str = 'Title') → str[source]
Return type:str