faust.tables

Tables: Distributed object K/V-store.

class faust.tables.Collection(app: faust.types.app.AppT, *, name: str = None, default: Callable[Any] = None, store: Union[str, yarl.URL] = None, key_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, value_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, partitions: int = None, window: faust.types.windows.WindowT = None, changelog_topic: faust.types.topics.TopicT = None, help: str = None, on_recover: Callable[Awaitable[None]] = None, on_changelog_event: Callable[faust.types.events.EventT, Awaitable[None]] = None, recovery_buffer_size: int = 1000, standby_buffer_size: int = None, extra_topic_configs: Mapping[str, Any] = None, recover_callbacks: Set[Callable[Awaitable[None]]] = None, options: Mapping[str, Any] = None, **kwargs) → None[source]

Base class for changelog-backed data structures stored in Kafka.

property data

Underlying table storage.

on_recover(fun: Callable[Awaitable[None]]) → Callable[Awaitable[None]][source]

Add function as callback to be called on table recovery.

Return type

Callable[[], Awaitable[None]]

info() → Mapping[str, Any][source]

Return table attributes as dictionary.

Return type

Mapping[str, Any]

persisted_offset(tp: faust.types.tuples.TP) → Optional[int][source]

Return the last persisted offset for topic partition.

Return type

Optional[int]

reset_state() → None[source]

Reset local state.

Return type

None

join(*fields) → faust.types.streams.StreamT[source]

Right join of this table and another stream/table.

Return type

StreamT[+T_co]

left_join(*fields) → faust.types.streams.StreamT[source]

Left join of this table and another stream/table.

Return type

StreamT[+T_co]

inner_join(*fields) → faust.types.streams.StreamT[source]

Inner join of this table and another stream/table.

Return type

StreamT[+T_co]

outer_join(*fields) → faust.types.streams.StreamT[source]

Outer join of this table and another stream/table.

Return type

StreamT[+T_co]

clone(**kwargs) → Any[source]

Clone table instance.

Return type

Any

combine(*nodes, **kwargs) → faust.types.streams.StreamT[source]

Combine tables and streams.

Return type

StreamT[+T_co]

contribute_to_stream(active: faust.types.streams.StreamT) → None[source]

Contribute table to stream join.

Return type

None

property label

Return human-readable label used to represent this table.

Return type

str

property shortlabel

Return short label used to represent this table in logs.

Return type

str

logger = <Logger faust.tables.base (WARNING)>
property changelog_topic

Return the changelog topic used by this table.

Return type

TopicT[]

apply_changelog_batch(batch: Iterable[faust.types.events.EventT]) → None[source]

Apply batch of events from changelog topic to local table storage.

Return type

None

class faust.tables.CollectionT(app: faust.types.tables._AppT, *, name: str = None, default: Callable[Any] = None, store: Union[str, yarl.URL] = None, key_type: faust.types.tables._ModelArg = None, value_type: faust.types.tables._ModelArg = None, partitions: int = None, window: faust.types.windows.WindowT = None, changelog_topic: faust.types.topics.TopicT = None, help: str = None, on_recover: Callable[Awaitable[None]] = None, on_changelog_event: Callable[faust.types.events.EventT, Awaitable[None]] = None, recovery_buffer_size: int = 1000, standby_buffer_size: int = None, extra_topic_configs: Mapping[str, Any] = None, options: Mapping[str, Any] = None, **kwargs) → None[source]
abstract property changelog_topic
Return type

TopicT[]

abstract apply_changelog_batch(batch: Iterable[faust.types.events.EventT]) → None[source]
Return type

None

abstract persisted_offset(tp: faust.types.tuples.TP) → Optional[int][source]
Return type

Optional[int]

abstract reset_state() → None[source]
Return type

None

abstract on_recover(fun: Callable[Awaitable[None]]) → Callable[Awaitable[None]][source]
Return type

Callable[[], Awaitable[None]]

class faust.tables.TableManager(app: faust.types.app.AppT, **kwargs) → None[source]

Manage tables used by Faust worker.

persist_offset_on_commit(store: faust.types.stores.StoreT, tp: faust.types.tuples.TP, offset: int) → None[source]

Mark the persisted offset for a TP to be saved on commit.

This is used for the “exactly_once” processing guarantee. Instead of writing the persisted offset to RocksDB when the message is sent, we write it to disk when the offset is committed.

Return type

None

on_commit(offsets: MutableMapping[faust.types.tuples.TP, int]) → None[source]

Call when committing source topic partitions.

Return type

None

on_commit_tp(tp: faust.types.tuples.TP) → None[source]

Call when committing source topic partition used by this table.

Return type

None

on_rebalance_start() → None[source]

Call when a new rebalancing operation starts.

Return type

None

on_actives_ready() → None[source]

Call when actives are fully up-to-date.

Return type

None

on_standbys_ready() → None[source]

Call when standbys are fully up-to-date and ready for failover.

Return type

None

property changelog_topics

Return the set of known changelog topics.

Return type

Set[str]

property changelog_queue

Queue used to buffer changelog events.

Return type

ThrowableQueue

property recovery

Recovery service used by this table manager.

Return type

Recovery[]

add(table: faust.types.tables.CollectionT) → faust.types.tables.CollectionT[source]

Add table to be managed by this table manager.

Return type

CollectionT[]

logger = <Logger faust.tables.manager (WARNING)>
on_partitions_revoked(revoked: Set[faust.types.tuples.TP]) → None[source]

Call when cluster is rebalancing and partitions revoked.

Return type

None

class faust.tables.TableManagerT(app: faust.types.tables._AppT, **kwargs) → None[source]
abstract add(table: faust.types.tables.CollectionT) → faust.types.tables.CollectionT[source]
Return type

CollectionT[]

abstract persist_offset_on_commit(store: faust.types.stores.StoreT, tp: faust.types.tuples.TP, offset: int) → None[source]
Return type

None

abstract on_commit(offsets: MutableMapping[faust.types.tuples.TP, int]) → None[source]
Return type

None

abstract property changelog_topics
Return type

Set[str]

class faust.tables.Table(app: faust.types.app.AppT, *, name: str = None, default: Callable[Any] = None, store: Union[str, yarl.URL] = None, key_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, value_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, partitions: int = None, window: faust.types.windows.WindowT = None, changelog_topic: faust.types.topics.TopicT = None, help: str = None, on_recover: Callable[Awaitable[None]] = None, on_changelog_event: Callable[faust.types.events.EventT, Awaitable[None]] = None, recovery_buffer_size: int = 1000, standby_buffer_size: int = None, extra_topic_configs: Mapping[str, Any] = None, recover_callbacks: Set[Callable[Awaitable[None]]] = None, options: Mapping[str, Any] = None, **kwargs) → None[source]

Table (non-windowed).

class WindowWrapper(table: faust.types.tables.TableT, *, relative_to: Union[faust.types.tables._FieldDescriptorT, Callable[Optional[faust.types.events.EventT], Union[float, datetime.datetime]], datetime.datetime, float, None] = None, key_index: bool = False, key_index_table: faust.types.tables.TableT = None) → None

Windowed table wrapper.

A windowed table does not return concrete values when keys are accessed; instead, a WindowSet is returned so that the values can be further reduced to the wanted time period.

ValueType

alias of WindowSet

as_ansitable(title: str = '{table.name}', **kwargs) → str

Draw table as a terminal ANSI table.

Return type

str

clone(relative_to: Union[faust.types.tables._FieldDescriptorT, Callable[Optional[faust.types.events.EventT], Union[float, datetime.datetime]], datetime.datetime, float, None]) → faust.types.tables.WindowWrapperT

Clone this table using a new time-relativity configuration.

Return type

WindowWrapperT[]

property get_relative_timestamp

Return the current handler for extracting the event timestamp.

Return type

Optional[Callable[[Optional[EventT[]]], Union[float, datetime]]]

get_timestamp(event: faust.types.events.EventT = None) → float

Get timestamp from event.

Return type

float

items(event: faust.types.events.EventT = None) → ItemsView

Return table items view: iterate over (key, value) pairs.

Return type

ItemsView[~KT, +VT_co]

key_index = False
key_index_table = None
keys() → KeysView

Return table keys view: iterate over keys found in this table.

Return type

KeysView[~KT]

property name

Return the name of this table.

Return type

str

on_del_key(key: Any) → None

Call when a key is deleted from this table.

Return type

None

on_recover(fun: Callable[Awaitable[None]]) → Callable[Awaitable[None]]

Call after table recovery.

Return type

Callable[[], Awaitable[None]]

on_set_key(key: Any, value: Any) → None

Call when the value for a key in this table is set.

Return type

None

relative_to(ts: Union[faust.types.tables._FieldDescriptorT, Callable[Optional[faust.types.events.EventT], Union[float, datetime.datetime]], datetime.datetime, float, None]) → faust.types.tables.WindowWrapperT

Configure the time-relativity of this windowed table.

Return type

WindowWrapperT[]

relative_to_field(field: faust.types.models.FieldDescriptorT) → faust.types.tables.WindowWrapperT

Configure table to be time-relative to a field in the stream.

This means the window will use the timestamp from the event currently being processed in the stream.

Furthermore, it will not use the timestamp of the Kafka message, but instead a field in the value of the event.

For example a model field:

class Account(faust.Record):
    created: float

table = app.Table('foo').hopping(
    ...,
).relative_to_field(Account.created)

Return type

WindowWrapperT[]

relative_to_now() → faust.types.tables.WindowWrapperT

Configure table to be time-relative to the system clock.

Return type

WindowWrapperT[]

relative_to_stream() → faust.types.tables.WindowWrapperT

Configure table to be time-relative to the stream.

This means the window will use the timestamp from the event currently being processed in the stream.

Return type

WindowWrapperT[]

values(event: faust.types.events.EventT = None) → ValuesView

Return table values view: iterate over values in this table.

Return type

ValuesView[+VT_co]

using_window(window: faust.types.windows.WindowT, *, key_index: bool = False) → faust.types.tables.WindowWrapperT[source]

Wrap table using a specific window type.

Return type

WindowWrapperT[]

hopping(size: Union[datetime.timedelta, float, str], step: Union[datetime.timedelta, float, str], expires: Union[datetime.timedelta, float, str] = None, key_index: bool = False) → faust.types.tables.WindowWrapperT[source]

Wrap table in a hopping window.

Return type

WindowWrapperT[]

tumbling(size: Union[datetime.timedelta, float, str], expires: Union[datetime.timedelta, float, str] = None, key_index: bool = False) → faust.types.tables.WindowWrapperT[source]

Wrap table in a tumbling window.

Return type

WindowWrapperT[]

on_key_get(key: KT) → None[source]

Call when the value for a key in this table is retrieved.

Return type

None

on_key_set(key: KT, value: VT) → None[source]

Call when the value for a key in this table is set.

Return type

None

on_key_del(key: KT) → None[source]

Call when a key in this table is removed.

Return type

None

as_ansitable(title: str = '{table.name}', **kwargs) → str[source]

Draw table as a terminal ANSI table.

Return type

str

logger = <Logger faust.tables.table (WARNING)>
class faust.tables.TableT(app: faust.types.tables._AppT, *, name: str = None, default: Callable[Any] = None, store: Union[str, yarl.URL] = None, key_type: faust.types.tables._ModelArg = None, value_type: faust.types.tables._ModelArg = None, partitions: int = None, window: faust.types.windows.WindowT = None, changelog_topic: faust.types.topics.TopicT = None, help: str = None, on_recover: Callable[Awaitable[None]] = None, on_changelog_event: Callable[faust.types.events.EventT, Awaitable[None]] = None, recovery_buffer_size: int = 1000, standby_buffer_size: int = None, extra_topic_configs: Mapping[str, Any] = None, options: Mapping[str, Any] = None, **kwargs) → None[source]
abstract using_window(window: faust.types.windows.WindowT, *, key_index: bool = False) → faust.types.tables.WindowWrapperT[source]
Return type

WindowWrapperT[]

abstract hopping(size: Union[datetime.timedelta, float, str], step: Union[datetime.timedelta, float, str], expires: Union[datetime.timedelta, float, str] = None, key_index: bool = False) → faust.types.tables.WindowWrapperT[source]
Return type

WindowWrapperT[]

abstract tumbling(size: Union[datetime.timedelta, float, str], expires: Union[datetime.timedelta, float, str] = None, key_index: bool = False) → faust.types.tables.WindowWrapperT[source]
Return type

WindowWrapperT[]

abstract as_ansitable(**kwargs) → str[source]
Return type

str