faust.stores.rocksdb
RocksDB storage.
-
class
faust.stores.rocksdb.
PartitionDB
(*args, **kwargs)[source]¶ Tuple of (partition, rocksdb.DB).
-
property
partition
¶ Alias for field number 0
-
property
db
¶ Alias for field number 1
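The tuple shape described above can be sketched with typing.NamedTuple. This is an illustrative stand-in rather than the library's own definition; the db field is typed as Any here (an assumption) so that the sketch runs without the rocksdb package installed.

```python
from typing import Any, NamedTuple


class PartitionDB(NamedTuple):
    """Illustrative stand-in for a (partition, db) pair."""

    partition: int  # field number 0
    db: Any         # field number 1; a rocksdb.DB handle in practice (assumption)


pair = PartitionDB(partition=3, db=object())

# The pair unpacks like a plain tuple and is also addressable by name.
partition, db = pair
assert pair[0] == pair.partition
assert pair[1] is pair.db
```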
-
class
faust.stores.rocksdb.
RocksDBOptions
(max_open_files: int = None, write_buffer_size: int = None, max_write_buffer_number: int = None, target_file_size_base: int = None, block_cache_size: int = None, block_cache_compressed_size: int = None, bloom_filter_size: int = None, **kwargs: Any) → None[source]¶ Options required to open a RocksDB database.
-
max_open_files
= 943719¶
-
write_buffer_size
= 67108864¶
-
max_write_buffer_number
= 3¶
-
target_file_size_base
= 67108864¶
-
block_cache_size
= 2147483648¶
-
block_cache_compressed_size
= 524288000¶
-
bloom_filter_size
= 3¶
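To make the size defaults above easier to read, the sketch below converts them to MiB and shows how a None argument can fall back to a class-level default. OptionsSketch is a hypothetical stand-in, not the RocksDBOptions class itself, and it assumes the size values are byte counts, as RocksDB's own size options are.

```python
from typing import Optional

MIB = 1024 ** 2


class OptionsSketch:
    """Hypothetical stand-in: class-level defaults with per-instance overrides."""

    write_buffer_size = 67108864
    target_file_size_base = 67108864
    block_cache_size = 2147483648
    block_cache_compressed_size = 524288000

    def __init__(self,
                 write_buffer_size: Optional[int] = None,
                 block_cache_size: Optional[int] = None) -> None:
        # None means "keep the class-level default".
        if write_buffer_size is not None:
            self.write_buffer_size = write_buffer_size
        if block_cache_size is not None:
            self.block_cache_size = block_cache_size


def as_mib(n_bytes: int) -> float:
    """Convert a byte count to mebibytes."""
    return n_bytes / MIB


defaults = OptionsSketch()
smaller = OptionsSketch(block_cache_size=256 * MIB)

print(as_mib(defaults.write_buffer_size))            # 64.0
print(as_mib(defaults.block_cache_size))             # 2048.0
print(as_mib(defaults.block_cache_compressed_size))  # 500.0
print(as_mib(smaller.block_cache_size))              # 256.0
```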
-
open
(path: pathlib.Path, *, read_only: bool = False) → faust.stores.rocksdb.DB[source]¶ Open RocksDB database using this configuration.
- Return type
DB
-
as_options
() → faust.stores.rocksdb.Options[source]¶ Return rocksdb.Options object using this configuration.
- Return type
Options
-
-
class
faust.stores.rocksdb.
Store
(url: Union[str, yarl.URL], app: faust.types.app.AppT, table: faust.types.tables.CollectionT, *, key_index_size: int = None, options: Mapping[str, Any] = None, **kwargs: Any) → None[source]¶ RocksDB table storage.
-
offset_key
= b'__faust\x00offset__'¶
-
rocksdb_options
= None¶ Used to configure the RocksDB settings for table stores.
-
key_index_size
= None¶ Decides the size of the K=>TopicPartition index (10_000).
-
persisted_offset
(tp: faust.types.tuples.TP) → Optional[int][source]¶ Return the last persisted offset.
-
set_persisted_offset
(tp: faust.types.tuples.TP, offset: int) → None[source]¶ Set the last persisted offset for this table.
This will remember the last offset that we wrote to RocksDB, so that on rebalance/recovery we can seek past this point to only read the events that occurred recently while we were not an active replica.
- Return type
None
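The idea of remembering the last written offset can be sketched by storing it under a reserved key next to the table data. The sketch below is an assumption about the approach, not the Store implementation: plain dicts stand in for the per-partition RocksDB databases, and TP and OffsetStoreSketch are hypothetical names. Only the reserved key bytes are taken from offset_key above.

```python
from typing import Dict, NamedTuple, Optional

OFFSET_KEY = b'__faust\x00offset__'  # same bytes as Store.offset_key


class TP(NamedTuple):
    """Hypothetical stand-in for a topic partition."""
    topic: str
    partition: int


class OffsetStoreSketch:
    """Keeps one dict per partition in place of a RocksDB database."""

    def __init__(self) -> None:
        self._dbs: Dict[int, Dict[bytes, bytes]] = {}

    def set_persisted_offset(self, tp: TP, offset: int) -> None:
        # Store the offset as ASCII digits under the reserved key.
        db = self._dbs.setdefault(tp.partition, {})
        db[OFFSET_KEY] = str(offset).encode()

    def persisted_offset(self, tp: TP) -> Optional[int]:
        # None signals that nothing has been persisted for this partition yet.
        raw = self._dbs.get(tp.partition, {}).get(OFFSET_KEY)
        return int(raw.decode()) if raw is not None else None


store = OffsetStoreSketch()
tp = TP("example-changelog", 0)
before = store.persisted_offset(tp)
store.set_persisted_offset(tp, 42)
after = store.persisted_offset(tp)
```

On recovery, a consumer following this scheme would resume reading the changelog just past the returned offset instead of from the beginning.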
-
async
need_active_standby_for
(tp: faust.types.tuples.TP) → bool[source]¶ Decide if an active standby is needed for this topic partition.
Other workers may be running on the same local machine, in which case we can decide not to actively read standby messages, because that database file is already being populated.
Currently it is recommended that you use separate data directories for multiple workers on the same machine.
For example, if you have a machine with four CPU cores, you can run four worker instances on it, each using a separate data directory:
$ myproj --datadir=/var/faust/w1 worker -l info --web-port=6066
$ myproj --datadir=/var/faust/w2 worker -l info --web-port=6067
$ myproj --datadir=/var/faust/w3 worker -l info --web-port=6068
$ myproj --datadir=/var/faust/w4 worker -l info --web-port=6069
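When the number of workers varies, the per-worker command lines can be generated rather than typed by hand. The helper below is a hypothetical convenience, not part of Faust; it only reproduces the pattern of the commands above, giving each worker its own data directory and web port.

```python
from typing import List


def worker_commands(n_workers: int,
                    base_dir: str = "/var/faust",
                    first_port: int = 6066) -> List[str]:
    """Build one command line per worker, each with a distinct datadir and port."""
    return [
        f"myproj --datadir={base_dir}/w{i} worker -l info "
        f"--web-port={first_port + i - 1}"
        for i in range(1, n_workers + 1)
    ]


commands = worker_commands(4)
for command in commands:
    print(command)
```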
- Return type
bool
-
apply_changelog_batch
(batch: Iterable[faust.types.events.EventT], to_key: Callable[Any, Any], to_value: Callable[Any, Any]) → None[source]¶ Write batch of changelog events to local RocksDB storage.
- Parameters
- Return type
None
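A rough sketch of what applying a changelog batch could look like: group events by partition, apply puts and deletes, then record the highest offset seen for each partition. This is an assumption about the general approach, not the Store code. EventSketch and apply_batch_sketch are hypothetical, dicts stand in for RocksDB databases, a None value is assumed to mark a deletion, and the to_key/to_value callables are left out for brevity.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, NamedTuple, Optional

OFFSET_KEY = b'__faust\x00offset__'  # same bytes as Store.offset_key


class EventSketch(NamedTuple):
    """Hypothetical, flattened stand-in for a changelog event."""
    partition: int
    offset: int
    key: bytes
    value: Optional[bytes]  # None is treated as a deletion (assumption)


def apply_batch_sketch(dbs: Dict[int, Dict[bytes, bytes]],
                       batch: Iterable[EventSketch]) -> Dict[int, int]:
    """Apply events to per-partition dicts; return highest offset per partition."""
    pending: Dict[int, List[EventSketch]] = defaultdict(list)
    highest: Dict[int, int] = {}
    for event in batch:
        pending[event.partition].append(event)
        highest[event.partition] = max(event.offset,
                                       highest.get(event.partition, -1))
    for partition, events in pending.items():
        db = dbs.setdefault(partition, {})
        for event in events:
            if event.value is None:
                db.pop(event.key, None)
            else:
                db[event.key] = event.value
        # Remember how far this partition has been applied.
        db[OFFSET_KEY] = str(highest[partition]).encode()
    return highest


dbs: Dict[int, Dict[bytes, bytes]] = {}
offsets = apply_batch_sketch(dbs, [
    EventSketch(0, 5, b"a", b"1"),
    EventSketch(0, 6, b"a", None),
    EventSketch(1, 2, b"b", b"2"),
])
```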
-
async
on_rebalance
(table: faust.types.tables.CollectionT, assigned: Set[faust.types.tuples.TP], revoked: Set[faust.types.tuples.TP], newly_assigned: Set[faust.types.tuples.TP]) → None[source]¶ Rebalance occurred.
- Parameters
- Return type
None
-
revoke_partitions
(table: faust.types.tables.CollectionT, tps: Set[faust.types.tuples.TP]) → None[source]¶ De-assign partitions used on this worker instance.
- Parameters
table (CollectionT[]) – The table that we store data for.
tps (Set[TP]) – Set of topic partitions that we should no longer be serving data for.
- Return type
None
-
async
assign_partitions
(table: faust.types.tables.CollectionT, tps: Set[faust.types.tuples.TP]) → None[source]¶ Assign partitions to this worker instance.
- Parameters
table (CollectionT[]) – The table that we store data for.
tps (Set[TP]) – Set of topic partitions we have been assigned.
- Return type
None
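How the rebalance-related methods above could fit together is sketched below: revoked partitions have their handles dropped, and newly assigned partitions get a handle opened. This is a simplified, synchronous illustration under stated assumptions, not the Store implementation. TP and PartitionHandlesSketch are hypothetical, and empty dicts stand in for opened RocksDB databases.

```python
from typing import Dict, NamedTuple, Set


class TP(NamedTuple):
    """Hypothetical stand-in for a topic partition."""
    topic: str
    partition: int


class PartitionHandlesSketch:
    """Tracks one open handle per partition served by this worker."""

    def __init__(self) -> None:
        self.open_dbs: Dict[int, dict] = {}

    def assign_partitions(self, tps: Set[TP]) -> None:
        # "Open" a database for every partition we now serve.
        for tp in tps:
            self.open_dbs.setdefault(tp.partition, {})

    def revoke_partitions(self, tps: Set[TP]) -> None:
        # Drop handles for partitions we no longer serve.
        for tp in tps:
            self.open_dbs.pop(tp.partition, None)

    def on_rebalance(self, assigned: Set[TP], revoked: Set[TP],
                     newly_assigned: Set[TP]) -> None:
        # Release first, then open only what is new to this worker.
        self.revoke_partitions(revoked)
        self.assign_partitions(newly_assigned)


handles = PartitionHandlesSketch()
handles.assign_partitions({TP("t", 0), TP("t", 1)})
handles.on_rebalance(
    assigned={TP("t", 1), TP("t", 2)},
    revoked={TP("t", 0)},
    newly_assigned={TP("t", 2)},
)
```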
-
logger
= <Logger faust.stores.rocksdb (WARNING)>¶
-
reset_state
() → None[source]¶ Remove all data stored in this table.
Notes
Only local data will be removed; table changelog partitions in Kafka will not be affected.
- Return type
None
-
partition_path
(partition: int) → pathlib.Path[source]¶ Return pathlib.Path to the db file of a specific partition.
- Return type
Path
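One database file per partition implies some naming scheme that maps a partition number to a path. The scheme below is purely hypothetical (this page does not specify the actual file names); it only illustrates deriving a distinct pathlib.Path per partition inside a worker's data directory.

```python
from pathlib import Path


def partition_path_sketch(base_dir: Path, table_name: str, partition: int) -> Path:
    """Hypothetical scheme: <base_dir>/<table_name>-<partition>.db"""
    return base_dir / f"{table_name}-{partition}.db"


example_path = partition_path_sketch(Path("/var/faust/w1"), "counts", 3)
print(example_path.name)  # counts-3.db
```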
-