faust.tables

class faust.tables.Collection(app: faust.types.app.AppT, *, name: str = None, default: Callable[Any] = None, store: Union[str, yarl.URL] = None, key_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, value_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, partitions: int = None, window: faust.types.windows.WindowT = None, changelog_topic: faust.types.topics.TopicT = None, help: str = None, on_recover: Callable[Awaitable[None]] = None, on_changelog_event: Callable[faust.types.events.EventT, Awaitable[None]] = None, recovery_buffer_size: int = 1000, standby_buffer_size: int = None, extra_topic_configs: Mapping[str, Any] = None, **kwargs) → None[source]

Base class for changelog-backed data structures stored in Kafka.

data
on_recover(fun: Callable[[], Awaitable[None]]) → Callable[[], Awaitable[None]][source]

Add function as callback to be called on table recovery.

Return type

Callable[[], Awaitable[None]]

info() → Mapping[str, Any][source]
Return type

Mapping[str, Any]

persisted_offset(tp: faust.types.tuples.TP) → Optional[int][source]
Return type

Optional[int]

reset_state() → None[source]
Return type

None

join(*fields) → faust.types.streams.StreamT[source]
Return type

StreamT[+T_co]

left_join(*fields) → faust.types.streams.StreamT[source]
Return type

StreamT[+T_co]

inner_join(*fields) → faust.types.streams.StreamT[source]
Return type

StreamT[+T_co]

outer_join(*fields) → faust.types.streams.StreamT[source]
Return type

StreamT[+T_co]

clone(**kwargs) → Any[source]
Return type

Any

combine(*nodes, **kwargs) → faust.types.streams.StreamT[source]
Return type

StreamT[+T_co]

contribute_to_stream(active: faust.types.streams.StreamT) → None[source]
Return type

None

label

Label used for graphs.

Return type

str

shortlabel

Label used for logging.

Return type

str

coroutine call_recover_callbacks(self) → None[source]
Return type

None

logger = <Logger faust.tables.base (WARNING)>
coroutine need_active_standby_for(self, tp: faust.types.tuples.TP) → bool[source]
Return type

bool

coroutine on_changelog_event(self, event: faust.types.events.EventT) → None[source]
Return type

None

coroutine on_rebalance(self, assigned: Set[faust.types.tuples.TP], revoked: Set[faust.types.tuples.TP], newly_assigned: Set[faust.types.tuples.TP]) → None[source]
Return type

None

coroutine on_recovery_completed(self, active_tps: Set[faust.types.tuples.TP], standby_tps: Set[faust.types.tuples.TP]) → None[source]
Return type

None

coroutine on_start(self) → None[source]

Service is starting.

Return type

None

coroutine remove_from_stream(self, stream: faust.types.streams.StreamT) → None[source]
Return type

None

changelog_topic
Return type

TopicT[]

apply_changelog_batch(batch: Iterable[faust.types.events.EventT]) → None[source]
Return type

None

class faust.tables.CollectionT(app: faust.types.tables._AppT, *, name: str = None, default: Callable[Any] = None, store: Union[str, yarl.URL] = None, key_type: faust.types.tables._ModelArg = None, value_type: faust.types.tables._ModelArg = None, partitions: int = None, window: faust.types.windows.WindowT = None, changelog_topic: faust.types.topics.TopicT = None, help: str = None, on_recover: Callable[Awaitable[None]] = None, on_changelog_event: Callable[faust.types.events.EventT, Awaitable[None]] = None, recovery_buffer_size: int = 1000, standby_buffer_size: int = None, extra_topic_configs: Mapping[str, Any] = None, **kwargs) → None[source]
changelog_topic
Return type

TopicT[]

apply_changelog_batch(batch: Iterable[faust.types.events.EventT]) → None[source]
Return type

None

persisted_offset(tp: faust.types.tuples.TP) → Optional[int][source]
Return type

Optional[int]

reset_state() → None[source]
Return type

None

on_recover(fun: Callable[[], Awaitable[None]]) → Callable[[], Awaitable[None]][source]
Return type

Callable[[], Awaitable[None]]

coroutine call_recover_callbacks(self) → None[source]
Return type

None

coroutine need_active_standby_for(self, tp: faust.types.tuples.TP) → bool[source]
Return type

bool

coroutine on_changelog_event(self, event: faust.types.events.EventT) → None[source]
Return type

None

coroutine on_rebalance(self, assigned: Set[faust.types.tuples.TP], revoked: Set[faust.types.tuples.TP], newly_assigned: Set[faust.types.tuples.TP]) → None[source]
Return type

None

coroutine on_recovery_completed(self, active_tps: Set[faust.types.tuples.TP], standby_tps: Set[faust.types.tuples.TP]) → None[source]
Return type

None

class faust.tables.TableManager(app: faust.types.app.AppT, **kwargs) → None[source]

Manage tables used by Faust worker.

persist_offset_on_commit(store: faust.types.stores.StoreT, tp: faust.types.tuples.TP, offset: int) → None[source]

Mark the persisted offset for a TP to be saved on commit.

This is used for the “exactly_once” processing guarantee. Instead of writing the persisted offset to RocksDB when the message is sent, we write it to disk when the offset is committed.

Return type

None

on_commit(offsets: MutableMapping[faust.types.tuples.TP, int]) → None[source]
Return type

None

on_commit_tp(tp: faust.types.tuples.TP) → None[source]
Return type

None

on_rebalance_start() → None[source]
Return type

None

on_actives_ready() → None[source]
Return type

None

on_standbys_ready() → None[source]
Return type

None

changelog_topics
Return type

Set[str]

changelog_queue
Return type

ThrowableQueue

logger = <Logger faust.tables.manager (WARNING)>
coroutine on_rebalance(self, assigned: Set[faust.types.tuples.TP], revoked: Set[faust.types.tuples.TP], newly_assigned: Set[faust.types.tuples.TP]) → None[source]
Return type

None

coroutine on_start(self) → None[source]

Service is starting.

Return type

None

coroutine on_stop(self) → None[source]

Service is being stopped/restarted.

Return type

None

recovery
Return type

Recovery[]

add(table: faust.types.tables.CollectionT) → faust.types.tables.CollectionT[source]
Return type

CollectionT[]

on_partitions_revoked(revoked: Set[faust.types.tuples.TP]) → None[source]
Return type

None

class faust.tables.TableManagerT(app: faust.types.tables._AppT, **kwargs) → None[source]
add(table: faust.types.tables.CollectionT) → faust.types.tables.CollectionT[source]
Return type

CollectionT[]

persist_offset_on_commit(store: faust.types.stores.StoreT, tp: faust.types.tuples.TP, offset: int) → None[source]
Return type

None

on_commit(offsets: MutableMapping[faust.types.tuples.TP, int]) → None[source]
Return type

None

changelog_topics
Return type

Set[str]

coroutine on_rebalance(self, assigned: Set[faust.types.tuples.TP], revoked: Set[faust.types.tuples.TP], newly_assigned: Set[faust.types.tuples.TP]) → None[source]
Return type

None

class faust.tables.Table(app: faust.types.app.AppT, *, name: str = None, default: Callable[Any] = None, store: Union[str, yarl.URL] = None, key_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, value_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, partitions: int = None, window: faust.types.windows.WindowT = None, changelog_topic: faust.types.topics.TopicT = None, help: str = None, on_recover: Callable[Awaitable[None]] = None, on_changelog_event: Callable[faust.types.events.EventT, Awaitable[None]] = None, recovery_buffer_size: int = 1000, standby_buffer_size: int = None, extra_topic_configs: Mapping[str, Any] = None, **kwargs) → None[source]

Table (non-windowed).

class WindowWrapper(table: faust.types.tables.TableT, *, relative_to: Union[faust.types.tables._FieldDescriptorT, Callable[Optional[faust.types.events.EventT], Union[float, datetime.datetime]], datetime.datetime, float, None] = None, key_index: bool = False, key_index_table: faust.types.tables.TableT = None) → None

Windowed table wrapper.

A windowed table does not return concrete values when keys are accessed. Instead, a WindowSet is returned so that the values can be further reduced to the wanted time period.

ValueType

alias of WindowSet

as_ansitable(title: str = '{table.name}', **kwargs) → str
Return type

str

clone(relative_to: Union[faust.types.tables._FieldDescriptorT, Callable[Optional[faust.types.events.EventT], Union[float, datetime.datetime]], datetime.datetime, float, None]) → faust.types.tables.WindowWrapperT
Return type

WindowWrapperT[]

get_relative_timestamp
Return type

Optional[Callable[[Optional[EventT[]]], Union[float, datetime]]]

get_timestamp(event: faust.types.events.EventT = None) → float
Return type

float

items(event: faust.types.events.EventT = None) → ItemsView
Return type

ItemsView[~KT, +VT_co]

key_index = False
key_index_table = None
keys() → KeysView
Return type

KeysView[~KT]

name
Return type

str

on_del_key(key: Any) → None
Return type

None

on_recover(fun: Callable[[], Awaitable[None]]) → Callable[[], Awaitable[None]]
Return type

Callable[[], Awaitable[None]]

on_set_key(key: Any, value: Any) → None
Return type

None

relative_to(ts: Union[faust.types.tables._FieldDescriptorT, Callable[Optional[faust.types.events.EventT], Union[float, datetime.datetime]], datetime.datetime, float, None]) → faust.types.tables.WindowWrapperT
Return type

WindowWrapperT[]

relative_to_field(field: faust.types.models.FieldDescriptorT) → faust.types.tables.WindowWrapperT
Return type

WindowWrapperT[]

relative_to_now() → faust.types.tables.WindowWrapperT
Return type

WindowWrapperT[]

relative_to_stream() → faust.types.tables.WindowWrapperT
Return type

WindowWrapperT[]

values(event: faust.types.events.EventT = None) → ValuesView
Return type

ValuesView[+VT_co]

using_window(window: faust.types.windows.WindowT, *, key_index: bool = False) → faust.types.tables.WindowWrapperT[source]
Return type

WindowWrapperT[]

hopping(size: Union[datetime.timedelta, float, str], step: Union[datetime.timedelta, float, str], expires: Union[datetime.timedelta, float, str] = None, key_index: bool = False) → faust.types.tables.WindowWrapperT[source]
Return type

WindowWrapperT[]

tumbling(size: Union[datetime.timedelta, float, str], expires: Union[datetime.timedelta, float, str] = None, key_index: bool = False) → faust.types.tables.WindowWrapperT[source]
Return type

WindowWrapperT[]

on_key_get(key: KT) → None[source]

Handle that a key is being retrieved.

Return type

None

on_key_set(key: KT, value: VT) → None[source]

Handle that a value for a key is being set.

Return type

None

on_key_del(key: KT) → None[source]

Handle that a key is deleted.

Return type

None

as_ansitable(title: str = '{table.name}', **kwargs) → str[source]
Return type

str

logger = <Logger faust.tables.table (WARNING)>
class faust.tables.TableT(app: faust.types.tables._AppT, *, name: str = None, default: Callable[Any] = None, store: Union[str, yarl.URL] = None, key_type: faust.types.tables._ModelArg = None, value_type: faust.types.tables._ModelArg = None, partitions: int = None, window: faust.types.windows.WindowT = None, changelog_topic: faust.types.topics.TopicT = None, help: str = None, on_recover: Callable[Awaitable[None]] = None, on_changelog_event: Callable[faust.types.events.EventT, Awaitable[None]] = None, recovery_buffer_size: int = 1000, standby_buffer_size: int = None, extra_topic_configs: Mapping[str, Any] = None, **kwargs) → None[source]
using_window(window: faust.types.windows.WindowT, *, key_index: bool = False) → faust.types.tables.WindowWrapperT[source]
Return type

WindowWrapperT[]

hopping(size: Union[datetime.timedelta, float, str], step: Union[datetime.timedelta, float, str], expires: Union[datetime.timedelta, float, str] = None, key_index: bool = False) → faust.types.tables.WindowWrapperT[source]
Return type

WindowWrapperT[]

tumbling(size: Union[datetime.timedelta, float, str], expires: Union[datetime.timedelta, float, str] = None, key_index: bool = False) → faust.types.tables.WindowWrapperT[source]
Return type

WindowWrapperT[]

as_ansitable(**kwargs) → str[source]
Return type

str