"""Base interface for Web server and views."""
import abc
import socket
from datetime import datetime
from http import HTTPStatus
from pathlib import Path
from typing import (
Any,
Callable,
ClassVar,
Iterable,
List,
Mapping,
MutableMapping,
Optional,
Tuple,
Type,
Union,
)
from urllib.parse import quote
from mode import Service
from mode.utils.compat import want_str
from mode.utils.imports import SymbolArg, symbol_by_name
from yarl import URL
from faust.types import AppT
from faust.types.web import BlueprintT, ResourceOptions, View
__all__ = [
'DEFAULT_BLUEPRINTS',
'BlueprintManager',
'Request',
'Response',
'Web',
]
_bytes = bytes
_BPList = Iterable[Tuple[str, SymbolArg[Type[BlueprintT]]]]
DEFAULT_BLUEPRINTS: _BPList = [
('/router', 'faust.web.apps.router:blueprint'),
('/table', 'faust.web.apps.tables.blueprint'),
]
PRODUCTION_BLUEPRINTS: _BPList = [
('', 'faust.web.apps.production_index:blueprint'),
]
DEBUG_BLUEPRINTS: _BPList = [
('/graph', 'faust.web.apps.graph:blueprint'),
('', 'faust.web.apps.stats:blueprint'),
]
CONTENT_SEPARATOR: bytes = b'\r\n\r\n'
HEADER_SEPARATOR: bytes = b'\r\n'
HEADER_KEY_VALUE_SEPARATOR: bytes = b': '
[docs]class Response:
"""Web server response and status."""
@property
@abc.abstractmethod
def status(self) -> int:
"""Return the response status code."""
...
@property
@abc.abstractmethod
def body(self) -> _bytes:
"""Return the response body as bytes."""
...
@property
@abc.abstractmethod
def headers(self) -> MutableMapping:
"""Return mapping of response HTTP headers."""
...
@property
@abc.abstractmethod
def content_length(self) -> Optional[int]:
"""Return the size of the response body."""
...
@property
@abc.abstractmethod
def content_type(self) -> str:
"""Return the response content type."""
...
@property
@abc.abstractmethod
def charset(self) -> Optional[str]:
"""Return the response character set."""
...
@property
@abc.abstractmethod
def chunked(self) -> bool:
"""Return :const:`True` if response is chunked."""
...
@property
@abc.abstractmethod
def compression(self) -> bool:
"""Return :const:`True` if the response body is compressed."""
...
@property
@abc.abstractmethod
def keep_alive(self) -> Optional[bool]:
"""Return :const:`True` if HTTP keep-alive enabled."""
...
@property
@abc.abstractmethod
def body_length(self) -> int:
"""Size of HTTP response body."""
...
[docs]class BlueprintManager:
"""Manager of all blueprints."""
applied: bool
_enabled: List[Tuple[str, str]]
_active: MutableMapping[str, BlueprintT]
def __init__(self, initial: _BPList = None) -> None:
self.applied = False
self._enabled = list(initial) if initial else []
self._active = {}
[docs] def add(self, prefix: str, blueprint: SymbolArg[Type[BlueprintT]]) -> None:
"""Register blueprint with this app."""
if self.applied:
raise RuntimeError('Cannot add blueprints after server started')
self._enabled.append((prefix, blueprint))
[docs] def apply(self, web: 'Web') -> None:
"""Apply all blueprints."""
if not self.applied:
self.applied = True
for prefix, blueprint in self._enabled:
self._apply_blueprint(web, prefix, symbol_by_name(blueprint))
def _apply_blueprint(self,
web: 'Web',
prefix: str,
bp: BlueprintT) -> None:
self._active[bp.name] = bp
bp.register(web.app, url_prefix=prefix)
bp.init_webserver(web)
[docs]class Web(Service):
"""Web server and HTTP interface."""
default_blueprints: ClassVar[_BPList] = DEFAULT_BLUEPRINTS # noqa: E704
production_blueprints: ClassVar[_BPList] = PRODUCTION_BLUEPRINTS
debug_blueprints: ClassVar[_BPList] = DEBUG_BLUEPRINTS
app: AppT
driver_version: str
views: MutableMapping[str, View]
reverse_names: MutableMapping[str, str]
blueprints: BlueprintManager
content_separator: ClassVar[bytes] = CONTENT_SEPARATOR
header_separator: ClassVar[bytes] = HEADER_SEPARATOR
header_key_value_separator: ClassVar[bytes] = HEADER_KEY_VALUE_SEPARATOR
def __init__(self, app: AppT, **kwargs: Any) -> None:
self.app = app
self.views = {}
self.reverse_names = {}
blueprints = list(self.default_blueprints)
if self.app.conf.debug:
blueprints.extend(self.debug_blueprints)
else:
blueprints.extend(self.production_blueprints)
self.blueprints = BlueprintManager(blueprints)
Service.__init__(self, **kwargs)
[docs] @abc.abstractmethod
def text(self, value: str, *,
content_type: str = None,
status: int = 200,
reason: str = None,
headers: MutableMapping = None) -> Response:
"""Create text response, using "text/plain" content-type."""
...
[docs] @abc.abstractmethod
def html(self, value: str, *,
content_type: str = None,
status: int = 200,
reason: str = None,
headers: MutableMapping = None) -> Response:
"""Create HTML response from string, ``text/html`` content-type."""
...
[docs] @abc.abstractmethod
def json(self, value: Any, *,
content_type: str = None,
status: int = 200,
reason: str = None,
headers: MutableMapping = None) -> Response:
"""Create new JSON response.
Accepts any JSON-serializable value and will automatically
serialize it for you.
The content-type is set to "application/json".
"""
...
[docs] @abc.abstractmethod
def bytes(self,
value: _bytes,
*,
content_type: str = None,
status: int = 200,
reason: str = None,
headers: MutableMapping = None) -> Response:
"""Create new ``bytes`` response - for binary data."""
...
[docs] @abc.abstractmethod
def bytes_to_response(self, s: _bytes) -> Response:
"""Deserialize HTTP response from byte string."""
...
def _bytes_to_response(
self, s: _bytes) -> Tuple[HTTPStatus, Mapping, _bytes]:
status_code, _, payload = s.partition(self.content_separator)
headers, _, body = payload.partition(self.content_separator)
return (
HTTPStatus(int(status_code)),
dict(self._splitheader(h) for h in headers.splitlines()),
body,
)
def _splitheader(self, header: _bytes) -> Tuple[str, str]:
key, value = header.split(self.header_key_value_separator, 1)
return want_str(key.strip()), want_str(value.strip())
[docs] @abc.abstractmethod
def response_to_bytes(self, response: Response) -> _bytes:
"""Serialize HTTP response into byte string."""
...
def _response_to_bytes(
self, status: int, headers: Mapping, body: _bytes) -> _bytes:
return self.content_separator.join([
str(status).encode(),
self.content_separator.join([
self._headers_serialize(headers),
body,
]),
])
def _headers_serialize(self, headers: Mapping) -> _bytes:
return self.header_separator.join(
self.header_key_value_separator.join([
k if isinstance(k, _bytes) else k.encode('ascii'),
v if isinstance(v, _bytes) else v.encode('latin-1'),
])
for k, v in headers.items()
)
[docs] @abc.abstractmethod
def route(self,
pattern: str,
handler: Callable,
cors_options: Mapping[str, ResourceOptions] = None) -> None:
"""Add route for handler."""
...
[docs] @abc.abstractmethod
def add_static(self,
prefix: str,
path: Union[Path, str],
**kwargs: Any) -> None:
"""Add static route."""
...
[docs] @abc.abstractmethod
async def read_request_content(self, request: 'Request') -> _bytes:
"""Read HTTP body as bytes."""
...
[docs] @abc.abstractmethod
async def wsgi(self) -> Any:
"""WSGI entry point."""
...
[docs] def add_view(self, view_cls: Type[View], *,
prefix: str = '',
cors_options: Mapping[str, ResourceOptions] = None) -> View:
"""Add route for view."""
view: View = view_cls(self.app, self)
path = prefix.rstrip('/') + '/' + view.view_path.lstrip('/')
self.route(path, view, cors_options)
self.views[path] = view
self.reverse_names[view.view_name] = path
return view
[docs] def url_for(self, view_name: str, **kwargs: Any) -> str:
"""Get URL by view name.
If the provided view name has associated URL parameters,
those need to be passed in as kwargs, or a :exc:`TypeError`
will be raised.
"""
try:
path = self.reverse_names[view_name]
except KeyError:
raise KeyError(f'No view with name {view_name!r} found')
else:
return path.format(**{
k: self._quote_for_url(str(v)) for k, v in kwargs.items()})
def _quote_for_url(self, value: str) -> str:
return quote(value, safe='') # disable '/' being safe by default
[docs] def init_server(self) -> None:
"""Initialize and setup web server."""
self.blueprints.apply(self)
self.app.on_webserver_init(self)
@property
def url(self) -> URL:
"""Return the canonical URL to this worker (including port)."""
canon = self.app.conf.canonical_url
if canon.host == socket.gethostname():
return URL(f'http://localhost:{self.app.conf.web_port}/')
return self.app.conf.canonical_url
[docs]class Request(abc.ABC):
"""HTTP Request."""
method: str
headers: Mapping[str, str]
url: URL
rel_url: URL
query_string: str
keep_alive: bool
body_exists: bool
user: Any
if_modified_since: Optional[datetime]
if_unmodified_since: Optional[datetime]
if_range: Optional[datetime]
[docs] @abc.abstractmethod
def can_read_body(self) -> bool:
"""Return :const:`True` if the request has a body."""
...
[docs] @abc.abstractmethod
async def read(self) -> bytes:
"""Read post data as bytes."""
...
[docs] @abc.abstractmethod
async def text(self) -> str:
"""Read post data as text."""
...
[docs] @abc.abstractmethod
async def json(self) -> Any:
"""Read post data and deserialize as JSON."""
...
[docs] @abc.abstractmethod
async def post(self) -> Mapping[str, str]:
"""Read post data."""
...
@property
@abc.abstractmethod
def match_info(self) -> Mapping[str, str]:
"""Return match info from URL route as a mapping."""
...
@property
@abc.abstractmethod
def query(self) -> Mapping[str, str]:
"""Return HTTP query parameters as a mapping."""
...
@property
@abc.abstractmethod
def cookies(self) -> Mapping[str, Any]:
"""Return cookies as a mapping."""
...