faust.stores.rocksdb

RocksDB storage.

class faust.stores.rocksdb.DB[source]

Dummy DB.

class faust.stores.rocksdb.Options[source]

Dummy Options.

class faust.stores.rocksdb.PartitionDB(*args, **kwargs)[source]

Tuple of (partition, rocksdb.DB).

property partition

Alias for field number 0

property db

Alias for field number 1
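
As a rough illustration of the field aliases above, the following sketch uses a stand-in named tuple with the same two fields. The class defined here is hypothetical and is not imported from Faust; the `object()` placeholder stands in for a real database handle.

```python
from typing import Any, NamedTuple


class PartitionDB(NamedTuple):
    """Stand-in mirroring the documented (partition, db) layout."""

    partition: int  # field number 0
    db: Any         # field number 1


handle = object()  # placeholder for an open database handle
entry = PartitionDB(partition=3, db=handle)

# Fields can be read by name, by index, or by unpacking.
partition, db = entry
print(entry.partition, entry[0], partition)
```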

class faust.stores.rocksdb.RocksDBOptions(max_open_files: int = None, write_buffer_size: int = None, max_write_buffer_number: int = None, target_file_size_base: int = None, block_cache_size: int = None, block_cache_compressed_size: int = None, bloom_filter_size: int = None, **kwargs: Any) → None[source]

Options required to open a RocksDB database.

max_open_files = 943719
write_buffer_size = 67108864
max_write_buffer_number = 3
target_file_size_base = 67108864
block_cache_size = 2147483648
block_cache_compressed_size = 524288000
bloom_filter_size = 3
open(path: pathlib.Path, *, read_only: bool = False) → faust.stores.rocksdb.DB[source]

Open RocksDB database using this configuration.

Return type

DB

as_options() → faust.stores.rocksdb.Options[source]

Return rocksdb.Options object using this configuration.

Return type

Options
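
The size-related defaults listed above are given in bytes. The sketch below converts them to binary units to make them easier to read; the `human` helper is a hypothetical name introduced for this example only and is not part of Faust.

```python
KIB = 1024
MIB = 1024 * KIB
GIB = 1024 * MIB

# Byte-valued defaults copied from the attribute list above.
defaults = {
    "write_buffer_size": 67108864,
    "target_file_size_base": 67108864,
    "block_cache_size": 2147483648,
    "block_cache_compressed_size": 524288000,
}


def human(n: int) -> str:
    """Format a byte count using the largest unit that divides it evenly."""
    for unit, name in ((GIB, "GiB"), (MIB, "MiB"), (KIB, "KiB")):
        if n % unit == 0:
            return f"{n // unit} {name}"
    return f"{n} B"


readable = {name: human(size) for name, size in defaults.items()}
for name, size in readable.items():
    print(f"{name}: {size}")
```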

class faust.stores.rocksdb.Store(url: Union[str, yarl.URL], app: faust.types.app.AppT, table: faust.types.tables.CollectionT, *, key_index_size: int = None, options: Mapping[str, Any] = None, **kwargs: Any) → None[source]

RocksDB table storage.

offset_key = b'__faust\x00offset__'
rocksdb_options = None

Used to configure the RocksDB settings for table stores.

key_index_size = None

Decides the size of the K=>TopicPartition index (10_000).

persisted_offset(tp: faust.types.tuples.TP) → Optional[int][source]

Return the last persisted offset.

See set_persisted_offset().

Return type

Optional[int]

set_persisted_offset(tp: faust.types.tuples.TP, offset: int) → None[source]

Set the last persisted offset for this table.

This remembers the last offset written to RocksDB so that, on rebalance or recovery, we can seek past that point and read only the events that occurred while we were not an active replica.

Return type

None
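
The idea of keeping the offset next to the data can be sketched with plain dictionaries standing in for the per-partition databases. This is a minimal illustration, assuming the offset is stored as an encoded integer under the reserved `offset_key`; the `OffsetTracker` class and its storage layout are hypothetical and not the library's implementation.

```python
from typing import Dict, NamedTuple, Optional


class TP(NamedTuple):
    """Stand-in for a (topic, partition) pair."""

    topic: str
    partition: int


OFFSET_KEY = b'__faust\x00offset__'  # value of Store.offset_key shown above


class OffsetTracker:
    """Hypothetical helper: one dict per partition plays the role of a DB."""

    def __init__(self) -> None:
        self._dbs: Dict[int, Dict[bytes, bytes]] = {}

    def set_persisted_offset(self, tp: TP, offset: int) -> None:
        # Assumption: the offset is serialized as ASCII digits.
        self._dbs.setdefault(tp.partition, {})[OFFSET_KEY] = str(offset).encode()

    def persisted_offset(self, tp: TP) -> Optional[int]:
        raw = self._dbs.get(tp.partition, {}).get(OFFSET_KEY)
        return int(raw) if raw is not None else None


tracker = OffsetTracker()
tp = TP("orders-changelog", 0)
before = tracker.persisted_offset(tp)  # nothing persisted yet
tracker.set_persisted_offset(tp, 41)
after = tracker.persisted_offset(tp)
print(before, after)
```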

async need_active_standby_for(tp: faust.types.tuples.TP) → bool[source]

Decide if an active standby is needed for this topic partition.

Since other workers may be running on the same local machine, we can decide to not actively read standby messages, since that database file is already being populated.

Currently it is recommended that you use separate data directories for multiple workers on the same machine.

For example, if you have a machine with four CPU cores, you can run four worker instances on it, each using a separate data directory:

$ myproj --datadir=/var/faust/w1 worker -l info --web-port=6066
$ myproj --datadir=/var/faust/w2 worker -l info --web-port=6067
$ myproj --datadir=/var/faust/w3 worker -l info --web-port=6068
$ myproj --datadir=/var/faust/w4 worker -l info --web-port=6069

Return type

bool
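
If the number of workers varies, the command lines for the separate data directories can be generated rather than typed by hand. The sketch below reproduces the four commands from the example; `worker_commands` and its parameters are hypothetical names, and `myproj` is the example program name used in this section.

```python
from pathlib import PurePosixPath
from typing import List


def worker_commands(
    n_workers: int,
    base_dir: str = "/var/faust",
    first_port: int = 6066,
) -> List[str]:
    """Build one command per worker, each with its own datadir and web port."""
    commands = []
    for i in range(n_workers):
        datadir = PurePosixPath(base_dir) / f"w{i + 1}"
        commands.append(
            f"myproj --datadir={datadir} worker -l info --web-port={first_port + i}"
        )
    return commands


commands = worker_commands(4)
for command in commands:
    print(command)
```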

apply_changelog_batch(batch: Iterable[faust.types.events.EventT], to_key: Callable[[Any], Any], to_value: Callable[[Any], Any]) → None[source]

Write batch of changelog events to local RocksDB storage.

Parameters
  • batch (Iterable[EventT]) – Iterable of changelog events (faust.Event)

  • to_key (Callable[[Any], Any]) – A callable you can use to deserialize the key of a changelog event.

  • to_value (Callable[[Any], Any]) – A callable you can use to deserialize the value of a changelog event.

Return type

None
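
The general shape of applying a changelog batch can be sketched as follows. This is an illustration under stated assumptions rather than the store's actual code: `ChangelogEvent` is a hypothetical stand-in for a changelog event, dictionaries stand in for the per-partition databases, and a `None` value is treated as a deletion (the usual Kafka tombstone convention).

```python
from dataclasses import dataclass
from typing import Dict, Iterable, NamedTuple, Optional


class TP(NamedTuple):
    topic: str
    partition: int


@dataclass
class ChangelogEvent:
    """Hypothetical stand-in for a changelog event."""

    tp: TP
    offset: int
    key: bytes
    value: Optional[bytes]  # None is treated as a delete (assumption)


def apply_batch(
    batch: Iterable[ChangelogEvent],
    dbs: Dict[int, Dict[bytes, bytes]],
) -> Dict[TP, int]:
    """Write events into per-partition dicts; return highest offset per TP."""
    latest: Dict[TP, int] = {}
    for event in batch:
        latest[event.tp] = max(event.offset, latest.get(event.tp, -1))
        db = dbs.setdefault(event.tp.partition, {})
        if event.value is None:
            db.pop(event.key, None)
        else:
            db[event.key] = event.value
    return latest


tp0, tp1 = TP("t-changelog", 0), TP("t-changelog", 1)
dbs: Dict[int, Dict[bytes, bytes]] = {}
latest = apply_batch(
    [
        ChangelogEvent(tp0, 0, b"a", b"1"),
        ChangelogEvent(tp0, 1, b"b", b"2"),
        ChangelogEvent(tp0, 2, b"a", None),  # removes key b"a"
        ChangelogEvent(tp1, 5, b"c", b"3"),
    ],
    dbs,
)
print(dbs, latest)
```

The returned offsets are the values one would then hand to set_persisted_offset() for each topic partition.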

async on_rebalance(table: faust.types.tables.CollectionT, assigned: Set[faust.types.tuples.TP], revoked: Set[faust.types.tuples.TP], newly_assigned: Set[faust.types.tuples.TP]) → None[source]

Rebalance occurred.

Parameters
  • table (CollectionT) – The table that we store data for.

  • assigned (Set[TP]) – Set of all assigned topic partitions.

  • revoked (Set[TP]) – Set of newly revoked topic partitions.

  • newly_assigned (Set[TP]) – Set of newly assigned topic partitions, for which we were not assigned the last time.

Return type

None
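
One way to picture how the three sets relate is as set differences between the previous and the current assignment. The sketch below assumes exactly that relationship; the `TP` tuple is a stand-in and the topic name is made up for the example.

```python
from typing import NamedTuple, Set


class TP(NamedTuple):
    topic: str
    partition: int


# Assignment before and after a hypothetical rebalance.
previous: Set[TP] = {TP("t", 0), TP("t", 1)}
assigned: Set[TP] = {TP("t", 1), TP("t", 2)}

revoked = previous - assigned          # partitions we no longer own
newly_assigned = assigned - previous   # partitions we did not own before

print(sorted(revoked), sorted(newly_assigned))
```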

revoke_partitions(table: faust.types.tables.CollectionT, tps: Set[faust.types.tuples.TP]) → None[source]

De-assign partitions used on this worker instance.

Parameters
  • table (CollectionT) – The table that we store data for.

  • tps (Set[TP]) – Set of topic partitions that we should no longer be serving data for.

Return type

None

async assign_partitions(table: faust.types.tables.CollectionT, tps: Set[faust.types.tuples.TP]) → None[source]

Assign partitions to this worker instance.

Parameters
  • table (CollectionT) – The table that we store data for.

  • tps (Set[TP]) – Set of topic partitions we have been assigned.

Return type

None

logger = <Logger faust.stores.rocksdb (WARNING)>
reset_state() → None[source]

Remove all data stored in this table.

Notes

Only local data will be removed, table changelog partitions in Kafka will not be affected.

Return type

None

partition_path(partition: int) → pathlib.Path[source]

Return the pathlib.Path to the database file for a specific partition.

Return type

Path
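
A per-partition path can be derived from the table directory and the table name, as in the sketch below. The `<name>-<partition>.db` naming scheme and the directory used here are assumptions made for illustration; check the actual files in your data directory before relying on them.

```python
from pathlib import PurePosixPath


def partition_path(table_dir: str, table_name: str, partition: int) -> PurePosixPath:
    """Hypothetical helper: one database file per partition (assumed naming)."""
    return PurePosixPath(table_dir) / f"{table_name}-{partition}.db"


example = partition_path("/var/faust/w1/tables", "orders", 3)
print(example)
```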

property path

Path to directory where tables are stored.

See also

tabledir (default value for this path).

Return type

Path

Returns

pathlib.Path.

property basename

Return the name of this table, used as filename prefix.

Return type

Path