import abc
import typing
from pathlib import Path
from typing import Any, Awaitable, Callable, Optional, Type, Union
from aiohttp.client import ClientSession as HttpClientT
from mode import Seconds, ServiceT
from mypy_extensions import Arg, KwArg, VarArg
from yarl import URL
if typing.TYPE_CHECKING:
    from faust.types.app import AppT
    from faust.web.base import Request, Response, Web
    from faust.web.views import View
else:
    # The real classes are imported only for static type checkers.  At
    # runtime every one of those names is bound to an empty placeholder
    # class instead, because the aliases and annotations in this module
    # are evaluated at import time and must be able to resolve them.
    # Each name imported in the ``TYPE_CHECKING`` branch needs a
    # placeholder here; a missing one surfaces as ``NameError`` on import.
    class AppT: ...  # noqa

    class Request: ...  # noqa

    class Response: ...  # noqa

    class Web: ...  # noqa

    class View: ...  # noqa
# Public names exported by a star-import of this module.
__all__ = [
    # Request/response and view types.
    'Request', 'Response', 'View',
    # Handler and decorator type aliases.
    'ViewHandlerMethod', 'ViewHandlerFun', 'ViewDecorator', 'PageArg',
    # HTTP client and web server types.
    'HttpClientT', 'Web',
    # Abstract cache and blueprint interfaces.
    'CacheBackendT', 'CacheT', 'BlueprintT',
]
# Callable taking a ``Request`` followed by arbitrary positional and
# keyword arguments, and returning an awaitable ``Response``.
ViewHandlerMethod = Callable[
    [
        Arg(Request),
        VarArg(Any),
        KwArg(Any),
    ],
    Awaitable[Response],
]

# Same shape as ``ViewHandlerMethod`` but with the ``View`` passed
# explicitly as the first argument.
ViewHandlerFun = Callable[
    [
        Arg(View),
        Arg(Request),
        VarArg(Any),
        KwArg(Any),
    ],
    Awaitable[Response],
]
ViewGetHandler = ViewHandlerFun  # XXX compat

# Decorator that takes a handler function and returns a handler function.
ViewDecorator = Callable[[ViewHandlerFun], ViewHandlerFun]
RoutedViewGetHandler = ViewDecorator  # XXX compat

# What a route decorator accepts: either a ``View`` subclass or a bare
# handler function.
PageArg = Union[Type[View], ViewHandlerFun]

# Callable that receives a ``PageArg`` and returns a ``PageArg``.
RouteDecoratorRet = Callable[[PageArg], PageArg]
class CacheBackendT(ServiceT):
    """Abstract interface for a cache backend service.

    Declares the constructor signature plus the asynchronous ``get``,
    ``set`` and ``delete`` operations; every member is abstract.
    """

    @abc.abstractmethod
    def __init__(
            self,
            app: AppT,
            url: Union[URL, str] = 'memory://',
            **kwargs: Any) -> None:
        """Declare the backend constructor.

        Arguments:
            app: The application instance.
            url: Backend location as a ``URL`` or string; the default
                is ``'memory://'``.
            **kwargs: Additional keyword arguments (not constrained here).
        """
        ...

    @abc.abstractmethod
    async def get(self, key: str) -> Optional[bytes]:
        """Look up ``key``; declared to return ``bytes`` or ``None``."""
        ...

    @abc.abstractmethod
    async def set(self, key: str, value: bytes, timeout: float) -> None:
        """Store ``value`` under ``key``.

        ``timeout`` is a float; presumably an expiry in seconds --
        TODO confirm against concrete backends.
        """
        ...

    @abc.abstractmethod
    async def delete(self, key: str) -> None:
        """Delete ``key``."""
        ...
class CacheT(abc.ABC):
    """Abstract interface for a cache configuration object.

    Declares three attributes (``timeout``, ``key_prefix``, ``backend``)
    and an abstract ``view`` method that returns a decorator.
    """

    # Declared instance attributes (annotations only, no values assigned).
    timeout: Seconds
    key_prefix: str
    backend: Optional[Union[Type[CacheBackendT], str]]

    @abc.abstractmethod
    def __init__(
            self,
            timeout: Optional[Seconds] = None,
            key_prefix: Optional[str] = None,
            backend: Optional[Union[Type[CacheBackendT], str]] = None,
            **kwargs: Any) -> None:
        """Declare the cache constructor.

        Every option defaults to ``None``; the annotations spell that
        out with ``Optional`` rather than relying on implicit-Optional.

        Arguments:
            timeout: Timeout value, or ``None``.
            key_prefix: Key prefix string, or ``None``.
            backend: Backend class or a string naming one, or ``None``.
            **kwargs: Additional keyword arguments (not constrained here).
        """
        ...

    @abc.abstractmethod
    def view(
            self,
            timeout: Optional[Seconds] = None,
            key_prefix: Optional[str] = None,
            **kwargs: Any) -> Callable[[Callable], Callable]:
        """Return a decorator mapping a callable to a callable.

        Arguments:
            timeout: Timeout value, or ``None``.
            key_prefix: Key prefix string, or ``None``.
            **kwargs: Additional keyword arguments (not constrained here).
        """
        ...
class BlueprintT(abc.ABC):
    """Abstract interface for a blueprint.

    Declares a ``name`` and optional ``url_prefix`` plus abstract
    methods for caching, routing, static files and registration.
    """

    # Declared instance attributes (annotations only, no values assigned).
    name: str
    url_prefix: Optional[str]

    @abc.abstractmethod
    def cache(
            self,
            timeout: Optional[Seconds] = None,
            key_prefix: Optional[str] = None,
            backend: Optional[Union[Type[CacheBackendT], str]] = None,
    ) -> CacheT:
        """Return a ``CacheT`` for the given options.

        Every option defaults to ``None``; the annotations use explicit
        ``Optional`` to match the other methods of this class.

        Arguments:
            timeout: Timeout value, or ``None``.
            key_prefix: Key prefix string, or ``None``.
            backend: Backend class or a string naming one, or ``None``.
        """
        ...

    @abc.abstractmethod
    def route(
            self,
            uri: str,
            *,
            name: Optional[str] = None,
            base: Type[View] = View) -> RouteDecoratorRet:
        """Return a decorator for a view class or handler function.

        Arguments:
            uri: URI string for the route.
            name: Optional name (keyword-only).
            base: ``View`` subclass; defaults to ``View`` itself.
                Presumably used as the base class when a bare function
                is decorated -- TODO confirm in the implementation.
        """
        ...

    @abc.abstractmethod
    def static(
            self,
            uri: str,
            file_or_directory: Union[str, Path],
            *,
            name: Optional[str] = None) -> None:
        """Declare a static file or directory under ``uri``.

        Arguments:
            uri: URI string.
            file_or_directory: Filesystem location as ``str`` or ``Path``.
            name: Optional name (keyword-only).
        """
        ...

    @abc.abstractmethod
    def register(
            self,
            app: AppT,
            *,
            url_prefix: Optional[str] = None) -> None:
        """Register with ``app``, optionally under ``url_prefix``."""
        ...

    @abc.abstractmethod
    def init_webserver(self, web: Web) -> None:
        """Abstract method receiving the ``Web`` instance."""
        ...

    @abc.abstractmethod
    def on_webserver_init(self, web: Web) -> None:
        """Abstract method receiving the ``Web`` instance.

        Presumably a callback invoked when the web server is
        initialized -- TODO confirm against the implementation.
        """
        ...