"""Program ``faust worker`` used to start application from console."""
import asyncio
import os
import platform
import socket
from typing import Any, List, Optional, cast
from mode import ServiceT, Worker
from mode.utils.imports import symbol_by_name
from mode.utils.logging import level_name
from yarl import URL
from faust.types import AppT
from faust.types._env import WEB_BIND, WEB_PORT, WEB_TRANSPORT
from . import params
from .base import AppCommand, now_builtin_worker_options, option
__all__ = ['worker']
FAUST = 'ƒaµS†'
# XXX mypy borks if we do `from faust import __version`.
faust_version: str = symbol_by_name('faust:__version__')
[docs]class worker(AppCommand):
"""Start worker instance for given app."""
daemon = True
redirect_stdouts = True
worker_options = [
option('--with-web/--without-web',
default=True,
help='Enable/disable web server and related components.'),
option('--web-port', '-p',
default=None, type=params.TCPPort(),
help=f'Port to run web server on (default: {WEB_PORT})'),
option('--web-transport',
default=None, type=params.URLParam(),
help=f'Web server transport (default: {WEB_TRANSPORT})'),
option('--web-bind', '-b', type=str),
option('--web-host', '-h',
default=socket.gethostname(), type=str,
help=f'Canonical host name for the web server '
f'(default: {WEB_BIND})'),
]
options = (cast(List, worker_options) +
cast(List, now_builtin_worker_options))
[docs] def on_worker_created(self, worker: Worker) -> None:
"""Print banner when worker starts."""
self.say(self.banner(worker))
[docs] def as_service(self, loop: asyncio.AbstractEventLoop,
*args: Any, **kwargs: Any) -> ServiceT:
"""Return the service this command should execute.
For the worker we simply start the application itself.
Note:
The application will be started using a :class:`faust.Worker`.
"""
self._init_worker_options(*args, **kwargs)
return self.app
def _init_worker_options(self,
*args: Any,
with_web: bool,
web_port: Optional[int],
web_bind: Optional[str],
web_host: str,
web_transport: URL,
**kwargs: Any) -> None:
self.app.conf.web_enabled = with_web
if web_port is not None:
self.app.conf.web_port = web_port
if web_bind:
self.app.conf.web_bind = web_bind
if web_host is not None:
self.app.conf.web_host = web_host
if web_transport is not None:
self.app.conf.web_transport = web_transport
@property
def _Worker(self) -> Worker:
# using Faust worker to start the app, not command code.
return self.app.conf.Worker
[docs] def banner(self, worker: Worker) -> str:
"""Generate the text banner emitted before the worker starts."""
app = worker.app
loop = worker.loop
transport_extra = ''
# uvloop didn't leave us with any way to identify itself,
# and also there's no uvloop.__version__ attribute.
if loop.__class__.__module__ == 'uvloop':
transport_extra = '+uvloop'
logfile = worker.logfile if worker.logfile else '-stderr-'
loglevel = level_name(worker.loglevel or 'WARN').lower()
compiler = platform.python_compiler()
cython_info = None
try:
import faust._cython.windows # noqa: F401
except ImportError:
pass
else:
cython_info = (' +', f'Cython ({compiler})')
data = list(filter(None, [
('id', app.conf.id),
('transport', f'{app.conf.broker} {transport_extra}'),
('store', app.conf.store),
('web', app.web.url) if app.conf.web_enabled else None,
('log', f'{logfile} ({loglevel})'),
('pid', f'{os.getpid()}'),
('hostname', f'{socket.gethostname()}'),
('platform', self.platform()),
cython_info if cython_info else None,
('drivers', ''),
(' transport', app.transport.driver_version),
(' web', app.web.driver_version),
('datadir', f'{str(app.conf.datadir.absolute()):<40}'),
('appdir', f'{str(app.conf.appdir.absolute()):<40}'),
]))
table = self.table(
[(self.bold(x), str(y)) for x, y in data],
title=self.faust_ident(),
)
table.inner_heading_row_border = False
table.inner_row_border = False
return table.table
def _driver_versions(self, app: AppT) -> List[str]:
return [
app.transport.driver_version,
app.web.driver_version,
]
[docs] def faust_ident(self) -> str:
"""Return Faust version information as ANSI string."""
return self.color('hiblue', f'{FAUST} v{faust_version}')