Source code for faust.web.cache.backends.base

import abc
from typing import Any, ClassVar, Optional, Tuple, Type, Union

from mode import Service
from mode.utils.compat import AsyncContextManager
from mode.utils.contexts import asynccontextmanager
from mode.utils.logging import get_logger
from yarl import URL

from faust.types import AppT
from faust.types.web import CacheBackendT
from faust.web.cache.exceptions import CacheUnavailable

logger = get_logger(__name__)

E_CACHE_IRRECOVERABLE = 'Cache disabled for irrecoverable error: %r'
E_CACHE_INVALIDATING = 'Destroying cache for key %r caused error: %r'
E_CANNOT_INVALIDATE = 'Unable to invalidate key %r: %r'
E_CACHE_INOPERATIONAL = 'Cache operational error: %r'


[docs]class CacheBackend(CacheBackendT, Service): logger = logger Unavailable: Type[BaseException] = CacheUnavailable operational_errors: ClassVar[Tuple[Type[BaseException], ...]] = () invalidating_errors: ClassVar[Tuple[Type[BaseException], ...]] = () irrecoverable_errors: ClassVar[Tuple[Type[BaseException], ...]] = () def __init__(self, app: AppT, url: Union[URL, str] = 'memory://', **kwargs: Any) -> None: self.app = app self.url = URL(url) Service.__init__(self, **kwargs) @abc.abstractmethod async def _get(self, key: str) -> Optional[bytes]: ... @abc.abstractmethod async def _set(self, key: str, value: bytes, timeout: float) -> None: ... @abc.abstractmethod async def _delete(self, key: str) -> None: ...
[docs] async def get(self, key: str) -> Optional[bytes]: async with self._recovery_context(key): return await self._get(key)
[docs] async def set(self, key: str, value: bytes, timeout: float) -> None: assert timeout is not None async with self._recovery_context(key): await self._set(key, value, timeout)
[docs] async def delete(self, key: str) -> None: async with self._recovery_context(key): await self._delete(key)
@asynccontextmanager async def _recovery_context(self, key: str) -> AsyncContextManager: try: yield except self.irrecoverable_errors as exc: self.log.exception(E_CACHE_IRRECOVERABLE, exc) raise self.Unavailable(exc) except self.invalidating_errors as exc: self.log.warn(E_CACHE_INVALIDATING, key, exc, exc_info=1) try: await self._delete(key) except self.operational_errors + self.invalidating_errors as exc: self.log.exception(E_CANNOT_INVALIDATE, key, exc) raise self.Unavailable() except self.operational_errors as exc: self.log.warn(E_CACHE_INOPERATIONAL, exc, exc_info=1) raise self.Unavailable() def _repr_info(self) -> str: return f'url={self.url!r}'