Source code for faust.stores.rocksdb

"""RocksDB storage."""
import math
import shutil
import typing
from collections import defaultdict
from contextlib import suppress
from pathlib import Path
from typing import (

from mode.utils.collections import LRUCache
from yarl import URL

from faust.exceptions import ImproperlyConfigured
from faust.streams import current_event
from faust.types import AppT, CollectionT, EventT, TP
from faust.utils import platforms

from . import base

_max_open_files = platforms.max_open_files()
if _max_open_files is not None:
    _max_open_files = math.ceil(_max_open_files * 0.90)
DEFAULT_MAX_OPEN_FILES = _max_open_files

    import rocksdb
except ImportError:
    rocksdb = None  # noqa

if typing.TYPE_CHECKING:
    from rocksdb import DB, Options
[docs] class DB: # noqa """Dummy DB."""
[docs] class Options: # noqa """Dummy Options."""
[docs]class PartitionDB(NamedTuple): """Tuple of ``(partition, rocksdb.DB)``.""" partition: int db: DB
class _DBValueTuple(NamedTuple): db: DB value: bytes
[docs]class RocksDBOptions: """Options required to open a RocksDB database.""" max_open_files: Optional[int] = DEFAULT_MAX_OPEN_FILES write_buffer_size: int = 67108864 max_write_buffer_number: int = 3 target_file_size_base: int = 67108864 block_cache_size: int = 2 * 1024 ** 3 block_cache_compressed_size: int = 500 * 1024 ** 2 bloom_filter_size: int = 3 extra_options: Mapping def __init__(self, max_open_files: int = None, write_buffer_size: int = None, max_write_buffer_number: int = None, target_file_size_base: int = None, block_cache_size: int = None, block_cache_compressed_size: int = None, bloom_filter_size: int = None, **kwargs: Any) -> None: if max_open_files is not None: self.max_open_files = max_open_files if write_buffer_size is not None: self.write_buffer_size = write_buffer_size if max_write_buffer_number is not None: self.max_write_buffer_number = max_write_buffer_number if target_file_size_base is not None: self.target_file_size_base = target_file_size_base if block_cache_size is not None: self.block_cache_size = block_cache_size if block_cache_compressed_size is not None: self.block_cache_compressed_size = block_cache_compressed_size self.extra_options = kwargs
[docs] def open(self, path: Path, *, read_only: bool = False) -> DB: return rocksdb.DB(str(path), self.as_options(), read_only=read_only)
[docs] def as_options(self) -> Options: return rocksdb.Options( create_if_missing=True, max_open_files=self.max_open_files, write_buffer_size=self.write_buffer_size, max_write_buffer_number=self.max_write_buffer_number, target_file_size_base=self.target_file_size_base, table_factory=rocksdb.BlockBasedTableFactory( filter_policy=rocksdb.BloomFilterPolicy( self.bloom_filter_size), block_cache=rocksdb.LRUCache(self.block_cache_size), block_cache_compressed=rocksdb.LRUCache( self.block_cache_compressed_size), ), **self.extra_options)
[docs]class Store(base.SerializedStore): """RocksDB table storage.""" offset_key = b'__faust\0offset__' #: Decides the size of the K=>TopicPartition index (10_000). key_index_size: int #: Used to configure the RocksDB settings for table stores. options: RocksDBOptions _dbs: MutableMapping[int, DB] _key_index: LRUCache[bytes, int] def __init__(self, url: Union[str, URL], app: AppT, *, key_index_size: int = 10_000, options: Mapping = None, **kwargs: Any) -> None: if rocksdb is None: raise ImproperlyConfigured( 'RocksDB bindings not installed: pip install python-rocksdb') super().__init__(url, app, **kwargs) if not self.url.path: self.url /= self.table_name self.options = RocksDBOptions(**options or {}) self.key_index_size = key_index_size self._dbs = {} self._key_index = LRUCache(limit=self.key_index_size)
[docs] def persisted_offset(self, tp: TP) -> Optional[int]: offset = self._db_for_partition(tp.partition).get(self.offset_key) if offset: return int(offset) return None
[docs] def set_persisted_offset(self, tp: TP, offset: int) -> None: self._db_for_partition(tp.partition).put( self.offset_key, str(offset).encode())
[docs] async def need_active_standby_for(self, tp: TP) -> bool: try: self._db_for_partition(tp.partition) except rocksdb.errors.RocksIOError as exc: if 'lock' not in repr(exc): raise return False else: return True
[docs] def apply_changelog_batch(self, batch: Iterable[EventT], to_key: Callable[[Any], Any], to_value: Callable[[Any], Any]) -> None: batches: DefaultDict[int, rocksdb.WriteBatch] batches = defaultdict(rocksdb.WriteBatch) tp_offsets: Dict[TP, int] = {} for event in batch: tp, offset =, event.message.offset tp_offsets[tp] = ( offset if tp not in tp_offsets else max(offset, tp_offsets[tp]) ) msg = event.message if msg.value is None: batches[msg.partition].delete(msg.key) else: batches[msg.partition].put(msg.key, msg.value) for partition, batch in batches.items(): self._db_for_partition(partition).write(batch) for tp, offset in tp_offsets.items(): self.set_persisted_offset(tp, offset)
def _set(self, key: bytes, value: Optional[bytes]) -> None: event = current_event() assert event is not None partition = event.message.partition db = self._db_for_partition(partition) self._key_index[key] = partition db.put(key, value) def _db_for_partition(self, partition: int) -> DB: try: return self._dbs[partition] except KeyError: db = self._dbs[partition] = self._open_for_partition(partition) return db def _open_for_partition(self, partition: int) -> DB: return def _get(self, key: bytes) -> Optional[bytes]: dbvalue = self._get_bucket_for_key(key) if dbvalue is None: return None db, value = dbvalue if value is None: if db.key_may_exist(key)[0]: value = db.get(key) if value is not None: return value return value def _get_bucket_for_key(self, key: bytes) -> Optional[_DBValueTuple]: dbs: Iterable[PartitionDB] try: partition = self._key_index[key] dbs = [PartitionDB(partition, self._dbs[partition])] except KeyError: dbs = cast(Iterable[PartitionDB], self._dbs.items()) for partition, db in dbs: if db.key_may_exist(key)[0]: value = db.get(key) if value is not None: self._key_index[key] = partition return _DBValueTuple(db, value) return None def _del(self, key: bytes) -> None: for db in self._dbs_for_key(key): db.delete(key)
[docs] async def on_partitions_revoked(self, table: CollectionT, revoked: Set[TP]) -> None: for tp in revoked: if tp.topic in table.changelog_topic.topics: db = self._dbs.pop(tp.partition, None) if db is not None: del(db) import gc gc.collect() # XXX RocksDB has no .close() method :X self._key_index.clear()
[docs] async def on_partitions_assigned(self, table: CollectionT, assigned: Set[TP]) -> None: self._key_index.clear() standby_tps = my_topics = table.changelog_topic.topics for tp in assigned: if tp.topic in my_topics and tp not in standby_tps: for i in range(5): try: # side effect: opens db and adds to self._dbs. self._db_for_partition(tp.partition) except rocksdb.errors.RocksIOError as exc: if i == 4 or 'lock' not in repr(exc): raise 'DB for partition %r is locked! Retry in 1s...', tp.partition) await self.sleep(1.0) else: break
def _contains(self, key: bytes) -> bool: for db in self._dbs_for_key(key): # bloom filter: false positives possible, but not false negatives if db.key_may_exist(key)[0] and db.get(key) is not None: return True return False def _dbs_for_key(self, key: bytes) -> Iterable[DB]: # Returns cached db if key is in index, otherwise all dbs # for linear search. try: return [self._dbs[self._key_index[key]]] except KeyError: return self._dbs.values() def _size(self) -> int: return sum(self._size1(db) for db in self._dbs.values()) def _visible_keys(self, db: DB) -> Iterator[bytes]: it = db.iterkeys() # noqa: B301 it.seek_to_first() for key in it: if key != self.offset_key: yield key def _visible_items(self, db: DB) -> Iterator[Tuple[bytes, bytes]]: it = db.iteritems() # noqa: B301 it.seek_to_first() for key, value in it: if key != self.offset_key: yield key, value def _visible_values(self, db: DB) -> Iterator[bytes]: for _, value in self._visible_items(db): yield value def _size1(self, db: DB) -> int: return sum(1 for _ in self._visible_keys(db)) def _iterkeys(self) -> Iterator[bytes]: for db in self._dbs.values(): yield from self._visible_keys(db) def _itervalues(self) -> Iterator[bytes]: for db in self._dbs.values(): yield from self._visible_values(db) def _iteritems(self) -> Iterator[Tuple[bytes, bytes]]: for db in self._dbs.values(): yield from self._visible_items(db) def _clear(self) -> None: raise NotImplementedError('TODO') # XXX cannot reset tables
[docs] def reset_state(self) -> None: self._dbs.clear() self._key_index.clear() with suppress(FileNotFoundError): shutil.rmtree(self.path.absolute())
[docs] def partition_path(self, partition: int) -> Path: p = self.path / self.basename return self.with_suffix(p.with_name(f'{}-{partition}'))
[docs] def with_suffix(self, path: Path, *, suffix: str = '.db') -> Path: return path.with_suffix(suffix)
@property def path(self) -> Path: return @property def basename(self) -> Path: return Path(self.url.path)