faust.tables.recovery

Table recovery after rebalancing.

exception faust.tables.recovery.ServiceStopped[source]

The recovery service was stopped.

exception faust.tables.recovery.RebalanceAgain[source]

During rebalance, another rebalance happened.

class faust.tables.recovery.Recovery(app: faust.types.app.AppT, tables: faust.types.tables.TableManagerT, **kwargs: Any) → None[source]

Service responsible for recovering tables from changelog topics.

stats_interval = 5.0
highwaters = None

Mapping of highwaters by topic partition.

in_recovery = False
standbys_pending = False
standby_tps = None

Set of standby topic partitions.

active_tps = None

Set of active topic partitions.

tp_to_table = None

Mapping from topic partition to table

active_offsets = None

Active offset by topic partition.

standby_offsets = None

Standby offset by topic partition.

active_highwaters = None

Active highwaters by topic partition.

standby_highwaters = None

Standby highwaters by topic partition.

buffers = None

Changelog event buffers by table. These are filled by background task _slurp_changelog, and need to be flushed before starting new recovery/stopping.

buffer_sizes = None

Cache of buffer size by topic partition..

property signal_recovery_start

Event used to signal that recovery has started. :rtype: Event

property signal_recovery_end

Event used to signal that recovery has ended. :rtype: Event

property signal_recovery_reset

Event used to signal that recovery is restarting. :rtype: Event

async on_stop() → None[source]

Call when recovery service stops.

Return type

None

add_active(table: faust.types.tables.CollectionT, tp: faust.types.tuples.TP) → None[source]

Add changelog partition to be used for active recovery.

Return type

None

add_standby(table: faust.types.tables.CollectionT, tp: faust.types.tuples.TP) → None[source]

Add changelog partition to be used for standby recovery.

Return type

None

add_standbys_for_global_table(gtable: faust.types.tables.CollectionT, active_topic: faust.types.tuples.TP, topics: Set[faust.types.tuples.TP]) → None[source]

Add standby topics for global table.

Return type

None

revoke(tp: faust.types.tuples.TP) → None[source]

Revoke assignment of table changelog partition.

Return type

None

on_partitions_revoked(revoked: Set[faust.types.tuples.TP]) → None[source]

Call when rebalancing and partitions are revoked.

Return type

None

async on_rebalance(assigned: Set[faust.types.tuples.TP], revoked: Set[faust.types.tuples.TP], newly_assigned: Set[faust.types.tuples.TP]) → None[source]

Call when cluster is rebalancing.

Return type

None

logger = <Logger faust.tables.recovery (WARNING)>
async on_recovery_completed() → None[source]

Call when active table recovery is completed.

Return type

None

flush_buffers() → None[source]

Flush changelog buffers.

Return type

None

need_recovery() → bool[source]

Return True if recovery is required.

Return type

bool

active_remaining() → Counter[faust.types.tuples.TP][source]

Return counter of remaining changes by active partition.

Return type

Counter[TP]

standby_remaining() → Counter[faust.types.tuples.TP][source]

Return counter of remaining changes by standby partition.

Return type

Counter[TP]

active_remaining_total() → int[source]

Return number of changes remaining for actives to be up-to-date.

Return type

int

standby_remaining_total() → int[source]

Return number of changes remaining for standbys to be up-to-date.

Return type

int

active_stats() → MutableMapping[faust.types.tuples.TP, Tuple[int, int, int]][source]

Return current active recovery statistics.

Return type

MutableMapping[TP, Tuple[int, int, int]]

standby_stats() → MutableMapping[faust.types.tuples.TP, Tuple[int, int, int]][source]

Return current standby recovery statistics.

Return type

MutableMapping[TP, Tuple[int, int, int]]