"""Program ``faust worker`` used to start application from console."""
import os
import platform
import socket
import typing
from typing import Any, Iterable, Optional
import click
from mode.utils.imports import symbol_by_name
from mode.utils.logging import level_name
from yarl import URL
from ._env import BLOCKING_TIMEOUT, WEB_BIND, WEB_PORT
from .base import AppCommand, TCPPort, WritableFilePath, option
if typing.TYPE_CHECKING:
from faust.worker import Worker
else:
class Worker: ... # noqa
__all__ = ['worker']
FAUST = 'ƒaµS†'
# XXX mypy borks if we do `from faust import __version`.
faust_version: str = symbol_by_name('faust:__version__')
LOGLEVELS = (
'CRIT',
'ERROR',
'WARN',
'INFO',
'DEBUG',
)
DEFAULT_LOGLEVEL = 'WARN'
class CaseInsensitiveChoice(click.Choice):
def __init__(self, choices: Iterable[Any]) -> None:
self.choices = [str(val).lower() for val in choices]
def convert(self,
value: str,
param: Optional[click.Parameter],
ctx: Optional[click.Context]) -> Any:
if value.lower() in self.choices:
return value
return super().convert(value, param, ctx)
[docs]class worker(AppCommand):
"""Start ƒaust worker instance."""
options = [
option('--logfile', '-f',
default=None, type=WritableFilePath,
help='Path to logfile (default is <stderr>).'),
option('--loglevel', '-l',
default=DEFAULT_LOGLEVEL, type=CaseInsensitiveChoice(LOGLEVELS),
help='Logging level to use.'),
option('--blocking-timeout',
default=BLOCKING_TIMEOUT, type=float,
help='Blocking detector timeout (requires --debug).'),
option('--web-port', '-p',
default=WEB_PORT, type=TCPPort(),
help='Port to run web server on.'),
option('--web-bind', '-b', default=WEB_BIND, type=str),
option('--web-host', '-h',
default=socket.gethostname(), type=str,
help='Canonical host name for the web server.'),
option('--console-port',
default=50101, type=TCPPort(),
help='(when --debug:) Port to run debugger console on.'),
]
def __call__(self, *args: Any, **kwargs: Any) -> Any:
return self.start_worker(
*self.args + args,
**{**self.kwargs, **kwargs})
[docs] def start_worker(self, logfile: str, loglevel: str,
blocking_timeout: float, web_port: int, web_bind: str,
web_host: str, console_port: int) -> Any:
self.app.conf.canonical_url = URL(f'http://{web_host}:{web_port}')
worker = self.app.Worker(
debug=self.debug,
quiet=self.quiet,
logfile=logfile,
loglevel=loglevel,
web_port=web_port,
web_bind=web_bind,
web_host=web_host,
console_port=console_port,
)
self.say(self.banner(worker))
return worker.execute_from_commandline()
[docs] def banner(self, worker: Worker) -> str:
"""Generate the text banner emitted before the worker starts."""
app = worker.app
loop = worker.loop
website = worker.website
transport_extra = ''
# uvloop didn't leave us with any way to identify itself,
# and also there's no uvloop.__version__ attribute.
if loop.__class__.__module__ == 'uvloop':
transport_extra = '+uvloop'
if 'gevent' in loop.__class__.__module__:
transport_extra = '+gevent'
logfile = worker.logfile if worker.logfile else '-stderr-'
loglevel = level_name(worker.loglevel or 'WARN').lower()
data = [
('id', app.conf.id),
('transport', f'{app.conf.broker} {transport_extra}'),
('store', app.conf.store),
('web', website.web.url),
('log', f'{logfile} ({loglevel})'),
('pid', f'{os.getpid()}'),
('hostname', f'{socket.gethostname()}'),
('platform', self.platform()),
('drivers', '{transport_v} {http_v}'.format(
transport_v=app.transport.driver_version,
http_v=website.web.driver_version)),
('datadir', f'{str(app.conf.datadir.absolute()):<40}'),
('appdir', f'{str(app.conf.appdir.absolute()):<40}'),
]
table = self.table(
[(self.bold(x), y) for x, y in data],
title=self.faust_ident(),
)
table.inner_heading_row_border = False
table.inner_row_border = False
return table.table
[docs] def faust_ident(self) -> str:
return self.color('hiblue', f'{FAUST} v{faust_version}')