Source code for faust.web.cache.backends.base
"""Cache backend - base implementation."""
import abc
from typing import Any, ClassVar, Optional, Tuple, Type, Union
from mode import Service
from mode.utils.contexts import asynccontextmanager
from mode.utils.logging import get_logger
from mode.utils.typing import AsyncContextManager
from yarl import URL
from faust.types import AppT
from faust.types.web import CacheBackendT
from faust.web.cache.exceptions import CacheUnavailable
logger = get_logger(__name__)
E_CACHE_IRRECOVERABLE = 'Cache disabled for irrecoverable error: %r'
E_CACHE_INVALIDATING = 'Destroying cache for key %r caused error: %r'
E_CANNOT_INVALIDATE = 'Unable to invalidate key %r: %r'
E_CACHE_INOPERATIONAL = 'Cache operational error: %r'
[docs]class CacheBackend(CacheBackendT, Service):
"""Backend for cache operations."""
logger = logger
Unavailable: Type[BaseException] = CacheUnavailable
operational_errors: ClassVar[Tuple[Type[BaseException], ...]] = ()
invalidating_errors: ClassVar[Tuple[Type[BaseException], ...]] = ()
irrecoverable_errors: ClassVar[Tuple[Type[BaseException], ...]] = ()
def __init__(self,
app: AppT,
url: Union[URL, str] = 'memory://',
**kwargs: Any) -> None:
self.app = app
self.url = URL(url)
Service.__init__(self, **kwargs)
@abc.abstractmethod
async def _get(self, key: str) -> Optional[bytes]:
...
@abc.abstractmethod
async def _set(self, key: str, value: bytes, timeout: float) -> None:
...
@abc.abstractmethod
async def _delete(self, key: str) -> None:
...
[docs] async def get(self, key: str) -> Optional[bytes]:
"""Get cached-value by key."""
async with self._recovery_context(key):
return await self._get(key)
[docs] async def set(self, key: str, value: bytes, timeout: float) -> None:
"""Set cached-value by key."""
assert timeout is not None
async with self._recovery_context(key):
await self._set(key, value, timeout)
[docs] async def delete(self, key: str) -> None:
"""Forget value for cache key."""
async with self._recovery_context(key):
await self._delete(key)
@asynccontextmanager
async def _recovery_context(self, key: str) -> AsyncContextManager:
try:
yield
except self.irrecoverable_errors as exc:
self.log.exception(E_CACHE_IRRECOVERABLE, exc) # noqa: G200
raise self.Unavailable(exc)
except self.invalidating_errors as exc:
self.log.warning( # noqa: G200
E_CACHE_INVALIDATING, key, exc, exc_info=1)
try:
await self._delete(key)
except self.operational_errors + self.invalidating_errors as exc:
self.log.exception( # noqa: G200
E_CANNOT_INVALIDATE, key, exc)
raise self.Unavailable()
except self.operational_errors as exc:
self.log.warning( # noqa: G200
E_CACHE_INOPERATIONAL, exc, exc_info=1)
raise self.Unavailable()
def _repr_info(self) -> str:
return f'url={self.url!r}'