# Source code for faust.web.site
"""Website served by the Faust web server."""
from typing import Any, Sequence, Tuple, Type, Union
from mode import Service
from faust.types import AppT
from . import drivers
from .apps import graph
from .apps import router
from .apps import stats
from .apps import tables
from .base import Web
from .views import Site
# Names exported by ``from <this module> import *``.
__all__ = ['Website']
# Driver URL used by ``Website`` when the caller does not pass ``driver``.
DEFAULT_DRIVER = 'aiohttp://'
class Website(Service):
    """Service that starts the Faust web server and mounts its endpoints.

    Constructing the service creates the web driver (stored as ``web``),
    enables every configured page on it, and adds the driver as a
    dependency of this service through ``add_dependency``.
    """

    web: Web
    app: AppT
    port: int
    bind: str

    # Built-in ``(URL prefix, site class)`` pairs.  ``init_pages`` enables
    # these first, ahead of the app's own pages and any extra pages.
    pages: Sequence[Tuple[str, Type[Site]]] = [
        ('/graph', graph.Site),
        ('', stats.Site),
        ('/router', router.Site),
        ('/table', tables.Site),
    ]

    def __init__(self,
                 app: AppT,
                 *,
                 port: int = None,
                 bind: str = None,
                 driver: Union[Type[Web], str] = DEFAULT_DRIVER,
                 extra_pages: Sequence[Tuple[str, Type[Site]]] = None,
                 **kwargs: Any) -> None:
        """Set up the web driver and pages for ``app``.

        Arguments:
            app: The application the website is served for.
            port: Port handed to the web driver; any falsy value
                (including ``None``) is replaced by ``6066``.
            bind: Bind address handed to the web driver; any falsy value
                (including ``None``) is replaced by ``'localhost'``.
            driver: Driver specification passed on to ``init_driver``.
            extra_pages: Additional ``(prefix, site class)`` pairs that
                are enabled after the built-in and app pages.
            **kwargs: Forwarded both to the parent initializer and to
                the web driver constructor.
        """
        super().__init__(**kwargs)
        self.app = app
        self.port = port if port else 6066
        self.bind = bind if bind else 'localhost'
        # The driver must exist before pages can be enabled on it.
        self.init_driver(driver, **kwargs)
        self.init_pages(extra_pages if extra_pages else [])
        self.add_dependency(self.web)

    def init_driver(self, driver: Union[Type[Web], str],
                    **kwargs: Any) -> None:
        """Create the web driver and store it as ``self.web``.

        The driver class is looked up with ``drivers.by_url(driver)`` and
        instantiated with the app, the configured port and bind address,
        and ``kwargs``.  The app is then notified of the new instance
        through ``on_webserver_init``.
        """
        driver_cls: Type[Web] = drivers.by_url(driver)
        server = driver_cls(
            self.app,
            port=self.port,
            bind=self.bind,
            **kwargs)
        self.web = server
        self.app.on_webserver_init(server)

    def init_pages(self,
                   extra_pages: Sequence[Tuple[str, Type[Site]]]) -> None:
        """Enable every page on the web driver.

        Pages are processed in this order: the class-level ``pages``,
        then ``app.pages``, then ``extra_pages`` (a falsy value is treated
        as no extra pages).  Each site class is instantiated with the app
        and enabled on ``self.web`` under its prefix.
        """
        faust_app = self.app
        combined = [*self.pages, *faust_app.pages, *(extra_pages or [])]
        for url_prefix, site_cls in combined:
            site = site_cls(faust_app)
            site.enable(self.web, prefix=url_prefix)