# Source code for faust.types.web

import abc
import typing
from pathlib import Path
from typing import Any, Awaitable, Callable, Optional, Type, Union

from aiohttp.client import ClientSession as HttpClientT
from mode import Seconds, ServiceT
from mypy_extensions import Arg, KwArg, VarArg
from yarl import URL

if typing.TYPE_CHECKING:
    # Real types: imported only while a static type checker is running.
    from faust.types.app import AppT
    from faust.web.base import Request, Response, Web
    from faust.web.views import View
else:
    # Runtime placeholders: empty classes that give the names used in the
    # annotations and aliases below something to resolve to, without
    # importing the real modules (presumably to avoid import cycles --
    # TODO confirm).
    class AppT: ...           # noqa
    class Request: ...        # noqa
    class Response: ...       # noqa
    class Web: ...            # noqa
    class View: ...           # noqa
__all__ = [
    'Request',
    'Response',
    'View',
    'ViewHandlerMethod',
    'ViewHandlerFun',
    'ViewDecorator',
    'PageArg',
    'HttpClientT',
    'Web',
    'CacheBackendT',
    'CacheT',
    'BlueprintT',
]

# Callable taking a Request plus arbitrary positional/keyword arguments
# and returning an awaitable Response.
ViewHandlerMethod = Callable[
    [Arg(Request), VarArg(Any), KwArg(Any)],
    Awaitable[Response],
]

# Same as ViewHandlerMethod, but with the View passed explicitly as the
# first argument.
ViewHandlerFun = Callable[
    [Arg(View), Arg(Request), VarArg(Any), KwArg(Any)],
    Awaitable[Response],
]

ViewGetHandler = ViewHandlerFun  # XXX compat

# Decorator that takes a view handler function and returns one.
ViewDecorator = Callable[[ViewHandlerFun], ViewHandlerFun]

RoutedViewGetHandler = ViewDecorator  # XXX compat

# Either a View subclass or a plain view handler function.
PageArg = Union[Type[View], ViewHandlerFun]

# Decorator that takes a PageArg and returns a PageArg (this is the
# declared return type of ``BlueprintT.route``).
RouteDecoratorRet = Callable[[PageArg], PageArg]
class CacheBackendT(ServiceT):
    """Abstract interface for a cache backend service.

    Declares asynchronous ``get``/``set``/``delete`` operations keyed by
    string, with values stored as bytes.  All methods are abstract.
    """

    @abc.abstractmethod
    def __init__(self,
                 app: AppT,
                 url: Union[URL, str] = 'memory://',
                 **kwargs: Any) -> None:
        """Declare the constructor signature for backends.

        Arguments:
            app: Application instance.
            url: Backend location as a URL or string
                (defaults to ``'memory://'``).
            **kwargs: Extra keyword arguments; their meaning is left
                to the implementation.
        """
        ...

    @abc.abstractmethod
    async def get(self, key: str) -> Optional[bytes]:
        """Return the bytes stored under ``key``, or :const:`None`.

        NOTE(review): None presumably means the key is missing or
        expired -- confirm against concrete backends.
        """
        ...

    @abc.abstractmethod
    async def set(self, key: str, value: bytes, timeout: float) -> None:
        """Store ``value`` under ``key``.

        Arguments:
            key: Cache key.
            value: Raw bytes to store.
            timeout: Presumably the expiry time in seconds -- TODO confirm.
        """
        ...

    @abc.abstractmethod
    async def delete(self, key: str) -> None:
        """Remove ``key`` from the backend."""
        ...
class CacheT(abc.ABC):
    """Abstract interface for a cache with view-decorator support.

    Attributes are declared here for typing only; no values are assigned.
    """

    timeout: Seconds
    key_prefix: str
    backend: Optional[Union[Type[CacheBackendT], str]]

    @abc.abstractmethod
    def __init__(self,
                 timeout: Optional[Seconds] = None,
                 key_prefix: Optional[str] = None,
                 backend: Optional[Union[Type[CacheBackendT], str]] = None,
                 **kwargs: Any) -> None:
        """Declare the constructor signature for caches.

        Arguments:
            timeout: Optional timeout value.
            key_prefix: Optional prefix string for keys.
            backend: Optional backend, given as a
                :class:`CacheBackendT` subclass or as a string.
            **kwargs: Extra keyword arguments; their meaning is left
                to the implementation.
        """
        ...

    @abc.abstractmethod
    def view(self,
             timeout: Optional[Seconds] = None,
             key_prefix: Optional[str] = None,
             **kwargs: Any) -> Callable[[Callable], Callable]:
        """Return a decorator taking a callable and returning a callable.

        NOTE(review): presumably the decorator caches the wrapped view's
        response -- confirm against the concrete implementation.

        Arguments:
            timeout: Optional timeout value.
            key_prefix: Optional prefix string for keys.
            **kwargs: Extra keyword arguments; their meaning is left
                to the implementation.
        """
        ...
class BlueprintT(abc.ABC):
    """Abstract interface for a blueprint.

    Declares methods for creating caches, adding routes and static
    paths, and registering with an application and web server.
    Attributes are declared here for typing only; no values are assigned.
    """

    name: str
    url_prefix: Optional[str]

    @abc.abstractmethod
    def cache(self,
              timeout: Optional[Seconds] = None,
              key_prefix: Optional[str] = None,
              backend: Optional[Union[Type[CacheBackendT], str]] = None,
              ) -> CacheT:
        """Return a :class:`CacheT` for the given options.

        Arguments:
            timeout: Optional timeout value.
            key_prefix: Optional prefix string for keys.
            backend: Optional backend, given as a
                :class:`CacheBackendT` subclass or as a string.
        """
        ...

    @abc.abstractmethod
    def route(self,
              uri: str,
              *,
              name: Optional[str] = None,
              base: Type[View] = View) -> RouteDecoratorRet:
        """Return a decorator accepting a view class or handler function.

        Arguments:
            uri: URI string for the route.
            name: Optional name for the route.
            base: View class; defaults to ``View``.
                NOTE(review): presumably the base class used when a
                plain function is decorated -- confirm.
        """
        ...

    @abc.abstractmethod
    def static(self,
               uri: str,
               file_or_directory: Union[str, Path],
               *,
               name: Optional[str] = None) -> None:
        """Declare a static entry for ``uri``.

        Arguments:
            uri: URI string.
            file_or_directory: File or directory, as a string or
                :class:`~pathlib.Path`.
            name: Optional name for the entry.
        """
        ...

    @abc.abstractmethod
    def register(self,
                 app: AppT,
                 *,
                 url_prefix: Optional[str] = None) -> None:
        """Register this blueprint with ``app``.

        Arguments:
            app: Application instance.
            url_prefix: Optional URL prefix string.
        """
        ...

    @abc.abstractmethod
    def init_webserver(self, web: Web) -> None:
        """Abstract hook that receives the ``web`` server instance."""
        ...

    @abc.abstractmethod
    def on_webserver_init(self, web: Web) -> None:
        """Abstract callback that receives the ``web`` server instance."""
        ...