"""RocksDB storage."""
import math
import shutil
import typing
from collections import defaultdict
from contextlib import suppress
from pathlib import Path
from typing import (
Any,
Callable,
DefaultDict,
Dict,
Iterable,
Iterator,
Mapping,
MutableMapping,
NamedTuple,
Optional,
Set,
Tuple,
Union,
cast,
)
from mode.utils.collections import LRUCache
from yarl import URL
from faust.exceptions import ImproperlyConfigured
from faust.streams import current_event
from faust.types import AppT, CollectionT, EventT, TP
from faust.utils import platforms
from . import base
# Default ``max_open_files`` for RocksDB: 90% (rounded up) of whatever
# ``platforms.max_open_files()`` returns, or ``None`` when that call
# returns ``None``.
_max_open_files = platforms.max_open_files()
DEFAULT_MAX_OPEN_FILES = _max_open_files = (
    None if _max_open_files is None
    else math.ceil(_max_open_files * 0.90)
)
# python-rocksdb is optional at import time: when the import fails the
# module still loads and ``rocksdb`` is left as ``None``.
try:
    import rocksdb
except ImportError:
    rocksdb = None  # noqa

# At runtime ``DB`` and ``Options`` are empty placeholder classes that are
# only used in annotations; static type checkers see the real rocksdb types.
if not typing.TYPE_CHECKING:
    class DB:  # noqa
        """Dummy DB."""

    class Options:  # noqa
        """Dummy Options."""
else:
    from rocksdb import DB, Options
class PartitionDB(NamedTuple):
    """Tuple of ``(partition, rocksdb.DB)``."""

    # Partition number the database belongs to.
    partition: int
    # Database handle for that partition.
    db: DB
class _DBValueTuple(NamedTuple):
    """Tuple of ``(rocksdb.DB, value)`` describing a successful key lookup."""

    # Database handle the value was read from.
    db: DB
    # Raw value stored under the looked-up key.
    value: bytes
class RocksDBOptions:
    """Options required to open a RocksDB database.

    Every named setting has a class-level default.  A constructor
    argument overrides its default only when it is not ``None``.  Any
    other keyword arguments are kept in :attr:`extra_options` and passed
    on to ``rocksdb.Options`` by :meth:`as_options`.
    """

    max_open_files: Optional[int] = DEFAULT_MAX_OPEN_FILES
    write_buffer_size: int = 67108864          # 64 * 1024 ** 2
    max_write_buffer_number: int = 3
    target_file_size_base: int = 67108864      # 64 * 1024 ** 2
    block_cache_size: int = 2 * 1024 ** 3
    block_cache_compressed_size: int = 500 * 1024 ** 2
    # Argument handed to ``rocksdb.BloomFilterPolicy``.
    bloom_filter_size: int = 3
    extra_options: Mapping

    def __init__(self,
                 max_open_files: Optional[int] = None,
                 write_buffer_size: Optional[int] = None,
                 max_write_buffer_number: Optional[int] = None,
                 target_file_size_base: Optional[int] = None,
                 block_cache_size: Optional[int] = None,
                 block_cache_compressed_size: Optional[int] = None,
                 bloom_filter_size: Optional[int] = None,
                 **kwargs: Any) -> None:
        """Override the class defaults with any non-``None`` arguments.

        Arguments:
            max_open_files: Value for ``rocksdb.Options.max_open_files``.
            write_buffer_size: Value for ``write_buffer_size``.
            max_write_buffer_number: Value for
                ``max_write_buffer_number``.
            target_file_size_base: Value for ``target_file_size_base``.
            block_cache_size: Capacity given to the block cache
                ``rocksdb.LRUCache``.
            block_cache_compressed_size: Capacity given to the
                compressed block cache ``rocksdb.LRUCache``.
            bloom_filter_size: Argument for ``rocksdb.BloomFilterPolicy``.
            **kwargs: Extra keyword arguments forwarded verbatim to
                ``rocksdb.Options``.
        """
        if max_open_files is not None:
            self.max_open_files = max_open_files
        if write_buffer_size is not None:
            self.write_buffer_size = write_buffer_size
        if max_write_buffer_number is not None:
            self.max_write_buffer_number = max_write_buffer_number
        if target_file_size_base is not None:
            self.target_file_size_base = target_file_size_base
        if block_cache_size is not None:
            self.block_cache_size = block_cache_size
        if block_cache_compressed_size is not None:
            self.block_cache_compressed_size = block_cache_compressed_size
        # Previously this argument was accepted but never stored, so a
        # caller-supplied bloom filter size was silently ignored.
        if bloom_filter_size is not None:
            self.bloom_filter_size = bloom_filter_size
        self.extra_options = kwargs

    def open(self, path: Path, *, read_only: bool = False) -> DB:
        """Open the database at ``path`` using these options.

        Arguments:
            path: Location of the database; passed to RocksDB as ``str``.
            read_only: Forwarded to ``rocksdb.DB`` as ``read_only``.

        Returns:
            The ``rocksdb.DB`` instance.
        """
        return rocksdb.DB(str(path), self.as_options(), read_only=read_only)

    def as_options(self) -> Options:
        """Build a ``rocksdb.Options`` object from the current settings.

        ``create_if_missing`` is always enabled.  Entries in
        :attr:`extra_options` are passed as additional keyword arguments.
        """
        return rocksdb.Options(
            create_if_missing=True,
            max_open_files=self.max_open_files,
            write_buffer_size=self.write_buffer_size,
            max_write_buffer_number=self.max_write_buffer_number,
            target_file_size_base=self.target_file_size_base,
            table_factory=rocksdb.BlockBasedTableFactory(
                filter_policy=rocksdb.BloomFilterPolicy(
                    self.bloom_filter_size),
                block_cache=rocksdb.LRUCache(self.block_cache_size),
                block_cache_compressed=rocksdb.LRUCache(
                    self.block_cache_compressed_size),
            ),
            **self.extra_options)
class Store(base.SerializedStore):
    """RocksDB table storage.

    One RocksDB database is used per partition.  Databases are opened
    lazily by :meth:`_db_for_partition` and cached in ``_dbs``.  An LRU
    index (``_key_index``) remembers which partition a key was last seen
    in so lookups can avoid scanning every open database.
    """

    # Reserved key under which each partition database stores its offset;
    # it is filtered out of key/value/item iteration.
    offset_key = b'__faust\0offset__'

    #: Decides the size of the K=>TopicPartition index (10_000).
    key_index_size: int

    #: Used to configure the RocksDB settings for table stores.
    options: RocksDBOptions

    _dbs: MutableMapping[int, DB]
    _key_index: LRUCache[bytes, int]

    def __init__(self,
                 url: Union[str, URL],
                 app: AppT,
                 *,
                 key_index_size: int = 10_000,
                 options: Optional[Mapping] = None,
                 **kwargs: Any) -> None:
        """Create the store.

        Arguments:
            url: Store URL; when it has no path the table name is
                appended as the path.
            app: The application this store belongs to.
            key_index_size: Maximum number of entries in the key index.
            options: Keyword arguments for :class:`RocksDBOptions`.
            **kwargs: Passed on to the base class constructor.

        Raises:
            ImproperlyConfigured: If the ``rocksdb`` module could not be
                imported.
        """
        if rocksdb is None:
            raise ImproperlyConfigured(
                'RocksDB bindings not installed: pip install python-rocksdb')
        super().__init__(url, app, **kwargs)
        if not self.url.path:
            self.url /= self.table_name
        self.options = RocksDBOptions(**(options or {}))
        self.key_index_size = key_index_size
        self._dbs = {}
        self._key_index = LRUCache(limit=self.key_index_size)

    def persisted_offset(self, tp: TP) -> Optional[int]:
        """Return the offset stored for ``tp``, or ``None`` if unset.

        Opens the partition database as a side effect if it is not
        already open.
        """
        offset = self._db_for_partition(tp.partition).get(self.offset_key)
        if offset:
            return int(offset)
        return None

    def set_persisted_offset(self, tp: TP, offset: int) -> None:
        """Store ``offset`` for ``tp`` as its encoded decimal string."""
        self._db_for_partition(tp.partition).put(
            self.offset_key, str(offset).encode())

    async def need_active_standby_for(self, tp: TP) -> bool:
        """Return whether the database for ``tp`` could be opened.

        Returns ``False`` when opening fails with a ``RocksIOError`` whose
        repr mentions ``'lock'``; any other ``RocksIOError`` is re-raised.
        """
        try:
            self._db_for_partition(tp.partition)
        except rocksdb.errors.RocksIOError as exc:
            if 'lock' not in repr(exc):
                raise
            return False
        else:
            return True

    def apply_changelog_batch(self,
                              batch: Iterable[EventT],
                              to_key: Callable[[Any], Any],
                              to_value: Callable[[Any], Any]) -> None:
        """Write a batch of changelog events to the partition databases.

        Events are grouped into one ``rocksdb.WriteBatch`` per partition:
        a message with a ``None`` value deletes its key, any other value
        is stored.  After the writes, the highest offset seen for each
        topic partition is persisted.

        Arguments:
            batch: Events to apply.
            to_key: Unused by this implementation.
            to_value: Unused by this implementation.
        """
        batches: DefaultDict[int, rocksdb.WriteBatch]
        batches = defaultdict(rocksdb.WriteBatch)
        tp_offsets: Dict[TP, int] = {}
        for event in batch:
            msg = event.message
            tp, offset = msg.tp, msg.offset
            # Track the highest offset seen per topic partition.
            tp_offsets[tp] = max(offset, tp_offsets.get(tp, offset))
            if msg.value is None:
                batches[msg.partition].delete(msg.key)
            else:
                batches[msg.partition].put(msg.key, msg.value)

        # Distinct name: the original loop rebound the ``batch`` argument.
        for partition, write_batch in batches.items():
            self._db_for_partition(partition).write(write_batch)

        for tp, offset in tp_offsets.items():
            self.set_persisted_offset(tp, offset)

    def _set(self, key: bytes, value: Optional[bytes]) -> None:
        """Store ``key`` in the database for the current event's partition."""
        event = current_event()
        assert event is not None
        partition = event.message.partition
        db = self._db_for_partition(partition)
        self._key_index[key] = partition
        db.put(key, value)

    def _db_for_partition(self, partition: int) -> DB:
        """Return the database for ``partition``, opening it on first use."""
        try:
            return self._dbs[partition]
        except KeyError:
            db = self._dbs[partition] = self._open_for_partition(partition)
            return db

    def _open_for_partition(self, partition: int) -> DB:
        """Open the database located at :meth:`partition_path`."""
        return self.options.open(self.partition_path(partition))

    def _get(self, key: bytes) -> Optional[bytes]:
        """Return the value stored for ``key``, or ``None`` if not found."""
        found = self._get_bucket_for_key(key)
        if found is None:
            return None
        # ``_get_bucket_for_key`` only returns tuples whose value is not
        # None, so the second lookup the original did here could never run.
        _, value = found
        return value

    def _get_bucket_for_key(self, key: bytes) -> Optional[_DBValueTuple]:
        """Find the open database that holds ``key``.

        The key index is consulted first; when the key is not indexed, or
        the indexed partition is not open, every open database is
        searched.  On success the key index is updated.

        Returns:
            ``(db, value)`` for the first database with a non-``None``
            value for the key, otherwise ``None``.
        """
        dbs: Iterable[PartitionDB]
        try:
            partition = self._key_index[key]
            dbs = [PartitionDB(partition, self._dbs[partition])]
        except KeyError:
            dbs = cast(Iterable[PartitionDB], self._dbs.items())

        for partition, db in dbs:
            if db.key_may_exist(key)[0]:
                value = db.get(key)
                if value is not None:
                    self._key_index[key] = partition
                    return _DBValueTuple(db, value)
        return None

    def _del(self, key: bytes) -> None:
        """Delete ``key`` from every database that may hold it."""
        for db in self._dbs_for_key(key):
            db.delete(key)

    async def on_partitions_revoked(self, table: CollectionT,
                                    revoked: Set[TP]) -> None:
        """Drop the databases of revoked partitions and clear the key index.

        Only partitions of the table's changelog topic are considered.
        """
        import gc

        dropped = False
        for tp in revoked:
            if tp.topic in table.changelog_topic.topics:
                # The popped handle is not kept, so the store no longer
                # references the database afterwards.
                if self._dbs.pop(tp.partition, None) is not None:
                    dropped = True
        if dropped:
            # One collection per call instead of one per revoked partition.
            gc.collect()  # XXX RocksDB has no .close() method :X
        self._key_index.clear()

    async def on_partitions_assigned(self, table: CollectionT,
                                     assigned: Set[TP]) -> None:
        """Open databases for newly assigned, non-standby partitions.

        Clears the key index first.  Opening is attempted up to five
        times per partition, sleeping one second after a ``RocksIOError``
        whose repr mentions ``'lock'``.  Other errors, and a lock error
        on the last attempt, are re-raised.
        """
        max_attempts = 5
        self._key_index.clear()
        standby_tps = self.app.assignor.assigned_standbys()
        my_topics = table.changelog_topic.topics

        for tp in assigned:
            if tp.topic in my_topics and tp not in standby_tps:
                for attempt in range(max_attempts):
                    try:
                        # side effect: opens db and adds to self._dbs.
                        self._db_for_partition(tp.partition)
                    except rocksdb.errors.RocksIOError as exc:
                        is_last = attempt == max_attempts - 1
                        if is_last or 'lock' not in repr(exc):
                            raise
                        self.log.info(
                            'DB for partition %r is locked! Retry in 1s...',
                            tp.partition)
                        await self.sleep(1.0)
                    else:
                        break

    def _contains(self, key: bytes) -> bool:
        """Return ``True`` if any candidate database has a value for ``key``."""
        for db in self._dbs_for_key(key):
            # bloom filter: false positives possible, but not false negatives
            if db.key_may_exist(key)[0] and db.get(key) is not None:
                return True
        return False

    def _dbs_for_key(self, key: bytes) -> Iterable[DB]:
        """Return the databases to search for ``key``."""
        # Returns cached db if key is in index, otherwise all dbs
        # for linear search.
        try:
            return [self._dbs[self._key_index[key]]]
        except KeyError:
            return self._dbs.values()

    def _size(self) -> int:
        """Return the number of visible keys across all open databases."""
        return sum(self._size1(db) for db in self._dbs.values())

    def _visible_keys(self, db: DB) -> Iterator[bytes]:
        """Iterate over the keys in ``db``, skipping :attr:`offset_key`."""
        it = db.iterkeys()  # noqa: B301
        it.seek_to_first()
        for key in it:
            if key != self.offset_key:
                yield key

    def _visible_items(self, db: DB) -> Iterator[Tuple[bytes, bytes]]:
        """Iterate over ``(key, value)`` in ``db``, skipping the offset key."""
        it = db.iteritems()  # noqa: B301
        it.seek_to_first()
        for key, value in it:
            if key != self.offset_key:
                yield key, value

    def _visible_values(self, db: DB) -> Iterator[bytes]:
        """Iterate over the values in ``db``, skipping the offset entry."""
        for _, value in self._visible_items(db):
            yield value

    def _size1(self, db: DB) -> int:
        """Count the visible keys in ``db`` by iterating over them."""
        return sum(1 for _ in self._visible_keys(db))

    def _iterkeys(self) -> Iterator[bytes]:
        """Iterate over the visible keys of every open database."""
        for db in self._dbs.values():
            yield from self._visible_keys(db)

    def _itervalues(self) -> Iterator[bytes]:
        """Iterate over the visible values of every open database."""
        for db in self._dbs.values():
            yield from self._visible_values(db)

    def _iteritems(self) -> Iterator[Tuple[bytes, bytes]]:
        """Iterate over the visible items of every open database."""
        for db in self._dbs.values():
            yield from self._visible_items(db)

    def _clear(self) -> None:
        """Not implemented; always raises ``NotImplementedError``."""
        raise NotImplementedError('TODO')  # XXX cannot reset tables

    def reset_state(self) -> None:
        """Forget all open databases and delete the on-disk state.

        NOTE(review): this removes the whole directory at :attr:`path`
        (``app.conf.tabledir``), not only this table's files -- confirm
        that is intended when several tables share the directory.
        """
        self._dbs.clear()
        self._key_index.clear()
        with suppress(FileNotFoundError):
            shutil.rmtree(self.path.absolute())

    def partition_path(self, partition: int) -> Path:
        """Return the database path for ``partition``.

        The path is ``path / basename`` with ``-<partition>`` appended to
        the final component, then passed through :meth:`with_suffix`.
        """
        p = self.path / self.basename
        return self.with_suffix(p.with_name(f'{p.name}-{partition}'))

    def with_suffix(self, path: Path, *, suffix: str = '.db') -> Path:
        """Return ``path`` with its suffix set to ``suffix``.

        NOTE(review): ``Path.with_suffix`` replaces an existing suffix, so
        a name containing a dot (e.g. ``a.b-0``) becomes ``a.db`` and
        loses the partition number -- confirm names never contain dots.
        """
        return path.with_suffix(suffix)

    @property
    def path(self) -> Path:
        """Directory holding table data (``app.conf.tabledir``)."""
        return self.app.conf.tabledir

    @property
    def basename(self) -> Path:
        """Path component of the store URL, as a :class:`~pathlib.Path`."""
        return Path(self.url.path)