"""Command-line programs using :pypi:`click`."""
import abc
import asyncio
import inspect
import os
import sys
from functools import wraps
from pathlib import Path
from textwrap import wrap
from types import ModuleType
from typing import (

import click
import click_completion
from click import echo
from colorclass import Color, disable_all_colors, enable_all_colors
from mode.utils import text
from mode.utils.compat import want_bytes
from mode.utils.imports import import_from_cwd, symbol_by_name

from faust.types import AppT, CodecArg, ModelT
from faust.utils import json
from faust.utils import terminal

from ._env import DATADIR, DEBUG, WORKDIR

__all__ = [


argument = click.argument
option = click.option

LOOP_CHOICES = ('aio', 'gevent', 'eventlet', 'uvloop')

faust_version = symbol_by_name('faust:__version__')

[docs]class TCPPort(click.IntRange): """CLI option: TCP Port (integer in range 1 - 65535).""" name = 'range[1-65535]' def __init__(self) -> None: super().__init__(1, 65535)
WritableDirectory = click.Path( exists=False, # create if needed, file_okay=False, # must be directory, dir_okay=True, # not file; writable=True, # and read/write access. readable=True, # ) WritableFilePath = click.Path( exists=False, # create if needed, file_okay=True, # must be file, dir_okay=False, # not directory; writable=True, # and read/write access. readable=True, # ) builtin_options: Sequence[Callable] = [ click.version_option(version=f'Faust {faust_version}'), option('--app', '-A', help='Path of Faust application to use, or the name of a module.'), option('--quiet/--no-quiet', '-q', default=False, help='Silence output to <stdout>/<stderr>.'), option('--debug/--no-debug', default=DEBUG, help='Enable debugging output, and the blocking detector.'), option('--no_color/--color', default=False, help='Enable colors in output.'), option('--workdir', '-W', default=WORKDIR, type=WritableDirectory, help='Working directory to change to after start.'), option('--datadir', '-D', default=DATADIR, type=WritableDirectory, help='Directory to keep application state.'), option('--json', default=False, is_flag=True, help='Return output in machine-readable JSON format'), option('--loop', '-L', default=DEFAULT_LOOP, type=click.Choice(LOOP_CHOICES), help='Event loop implementation to use.'), ]
[docs]def find_app(app: str, *, symbol_by_name: Callable = symbol_by_name, imp: Callable = import_from_cwd) -> AppT: """Find app by string like ``examples.simple``. Notes: This function uses ``import_from_cwd`` to temporarily add the current working directory to :envvar:`PYTHONPATH`, such that when importing the app it will search the current working directory last. You can think of it as temporarily running with the :envvar:`PYTHONPATH` set like this: .. sourcecode: console $ PYTHONPATH="${PYTHONPATH}:." You can disable this with the ``imp`` keyword argument, for example passing ``imp=importlib.import_module``. Examples: >>> # If providing the name of a module, it will attempt >>> # to find an attribute name (.app) in that module. >>> # Example below is the same as importing:: >>> # from examples.simple import app >>> find_app('examples.simple') >>> # If you want an attribute other than .app you can >>> # use : to separate module and attribute. >>> # Examples below is the same as importing:: >>> # from examples.simple import my_app >>> find_app('examples.simple:my_app') >>> # You can also use period for the module/attribute separator >>> find_app('examples.simple.my_app') """ try: # Try to import name' as is. val = symbol_by_name(app, imp=imp) except AttributeError: # last part (of "pkg.x") was not an attribute, # but a module instead: use imp to import_module. val = imp(app) if isinstance(val, ModuleType) and ':' not in app: # if we found a module, try to get .app attribute found = # type: ignore if isinstance(found, ModuleType): # where x is a module raise AttributeError(f'Looks like module, not app: -A {app}') val = found return prepare_app(val, app)
def prepare_app(app: AppT, name: Optional[str]) -> AppT: app.finalize() if app.conf.origin is None: app.conf.origin = name if app.conf.autodiscover: # Hack to fix cProfile support. main = sys.modules.get('__main__') if main is not None and '' in getattr(main, '__file__', ''): from ..models import registry registry.update({ (app.conf.origin or '') + k[8:]: v for k, v in registry.items() if k.startswith('cProfile.') }) return app # We just use this to apply many @click.option/@click.argument # decorators in the same decorator. def _apply_options(options: Sequence[Callable]) -> Callable: """Add list of ``click.option`` values to click command function.""" def _inner(fun: Callable) -> Callable: for opt in options: fun = opt(fun) return fun return _inner class _Group(click.Group): def get_help(self, ctx: click.Context) -> str: self._maybe_import_app() return super().get_help(ctx) def get_usage(self, ctx: click.Context) -> str: self._maybe_import_app() return super().get_usage(ctx) def _maybe_import_app(self, argv: Sequence[str] = sys.argv) -> None: # This is here so that custom AppCommand defined in example/ # works and is included in --help/usage, etc. when using the faust # command like: # $ faust -A example.myapp --help # # This is not necessary when using app.main(), since that always # imports the app module before creating the cli() object: # $ python example/ --help workdir = self._extract_param(argv, '-W', '--workdir') if workdir: os.chdir(Path(workdir).absolute()) appstr = self._extract_param(argv, '-A', '--app') if appstr is not None: find_app(appstr) def _extract_param(self, argv: Sequence[str], shortopt: str, longopt: str) -> Optional[str]: for i, arg in enumerate(argv): if arg == shortopt: try: return argv[i + 1] except IndexError: raise click.UsageError(f'Missing argument for {shortopt}') elif arg.startswith(longopt): if '=' in arg: _, _, value = arg.partition('=') return value else: try: return argv[i + 1] except IndexError: raise click.UsageError( f'Missing argument for {longopt}') return None @no_type_check # mypy bugs out on this def make_context(self, info_name: str, args: str, app: AppT = None, parent: click.Context = None, **extra: Any) -> click.Context: ctx = super().make_context(info_name, args, **extra) self._maybe_import_app() ctx.find_root().app = app return ctx # This is the thing that app.main(), ``python -m faust -A ...``, # and ``faust -A ..`` calls (see also faust/, and # in the git repository (entrypoints).) @_apply_options(builtin_options) @click.pass_context def cli(ctx: click.Context, app: str, quiet: bool, debug: bool, workdir: str, datadir: str, json: bool, no_color: bool, loop: str) -> None: """Faust command-line interface.""" ctx.obj = { 'app': app, 'quiet': quiet, 'debug': debug, 'workdir': workdir, 'datadir': datadir, 'json': json, 'no_color': no_color, 'loop': loop, } if workdir: os.environ['F_WORKDIR'] = workdir # XXX I'm not sure this is the best place to chdir [ask] os.chdir(Path(workdir).absolute()) if datadir: # This is the only way we can set the datadir for App.__init__, # so that default values will have the right path prefix. # WARNING: Note that the module *MUST not* have # been imported before setting the envvar. os.environ['F_DATADIR'] = datadir if not no_color and terminal.isatty(sys.stdout): enable_all_colors() else: disable_all_colors() if json: disable_all_colors()
[docs]class Command(abc.ABC): """Base class for subcommands.""" UsageError: Type[Exception] = click.UsageError # To subclass this you only need to define: # # run for an async command: # # async def run(self, *args, **kwargs) -> None: # ... # or for a non-async command you override __call__: # # def __call__(self, *args, **kwargs) -> Any: # ... abstract: ClassVar[bool] = True _click: Any = None debug: bool quiet: bool workdir: str datadir: str json: bool no_color: bool builtin_options: List = builtin_options options: Optional[List] = None args: Tuple kwargs: Dict prog_name: str = ''
[docs] @classmethod def as_click_command(cls) -> Callable: # This is what actually registers the commands into the # :pypi:`click` command-line interface (the ``def cli`` main above). # __init_subclass__ calls this for the side effect of being # registered as a `faust` subcommand. @click.pass_context @wraps(cls) def _inner(*args: Any, **kwargs: Any) -> Callable: cmd = cls(*args, **kwargs) # type: ignore return cmd() return _apply_options(cls.options or [])( cli.command(help=cls.__doc__)(_inner))
def __init_subclass__(self, *args: Any, **kwargs: Any) -> None: if self.abstract: # sets the class attribute, so next time # Command is inherited from it will have abstract=False, # unless you set the attribute again in that subclass:: # class MyAbstractCommand(Command): # abstract: ClassVar[bool] = True # # class x(MyAbstractCommand): # async def run(self) -> None: # print('just here to experience this execution') self.abstract = False else: self._click = self.as_click_command() # This hack creates the Command.parse method used to parse # command-line arguments in sys.argv and returns a dict. _apply_options(self.builtin_options)(self._parse) _apply_options(self.options or [])(self._parse)
[docs] @classmethod def parse(cls, argv: Sequence[str]) -> Mapping: """Parse command-line arguments in argv' and return mapping.""" return cls._parse(argv, standalone_mode=False)
@staticmethod @click.command() def _parse(**kwargs: Any) -> Mapping: return kwargs def __init__(self, ctx: click.Context, *args: Any, **kwargs: Any) -> None: self.ctx = ctx self.debug = self.ctx.obj['debug'] self.quiet = self.ctx.obj['quiet'] self.workdir = self.ctx.obj['workdir'] self.datadir = self.ctx.obj['datadir'] self.json = self.ctx.obj['json'] self.no_color = self.ctx.obj['no_color'] self.args = args self.kwargs = kwargs self.prog_name = self.ctx.find_root().command_path
[docs] @no_type_check # Subclasses can omit *args, **kwargs in signature. async def run(self, *args: Any, **kwargs: Any) -> Any: # NOTE: If you override __call__ below, you have a non-async command. # This is used by .worker to call the # Worker.execute_from_commandline() method. ...
def __call__(self, *args: Any, **kwargs: Any) -> Any: loop = asyncio.get_event_loop() args = self.args + args kwargs = {**self.kwargs, **kwargs} return loop.run_until_complete(*args, **kwargs))
[docs] def tabulate(self, data: terminal.TableDataT, headers: Sequence[str] = None, wrap_last_row: bool = True, title: str = '', title_color: str = 'blue', **kwargs: Any) -> str: """Create an ANSI representation of a table of two-row tuples. See Also: Keyword arguments are forwarded to :class:`terminaltables.SingleTable` Note: If the :option:`--json <faust --json>` option is enabled this returns json instead. """ if self.json: return self._tabulate_json(data, headers=headers) if headers: data = [headers] + list(data) title = self.bold(self.color(title_color, title)) table = self.table(data, title=title, **kwargs) if wrap_last_row: # slow, but not big data data = [ list(item[:-1]) + [self._table_wrap(table, item[-1])] for item in data ] return table.table
def _tabulate_json(self, data: terminal.TableDataT, headers: Sequence[str] = None) -> str: if headers: return json.dumps([dict(zip(headers, row)) for row in data]) return json.dumps(data)
[docs] def table(self, data: terminal.TableDataT, title: str = '', **kwargs: Any) -> terminal.Table: """Format table data as ANSI/ASCII table.""" return terminal.table(data, title=title, target=sys.stdout, **kwargs)
[docs] def color(self, name: str, text: str) -> str: """Return text having a certain color by name. Examples:: >>> self.color('blue', 'text_to_color') >>> self.color('hiblue', text_to_color') See Also: :pypi:`colorclass`: for a list of available colors. """ return Color(f'{{{name}}}{text}{{/{name}}}')
[docs] def dark(self, text: str) -> str: """Return cursor text.""" return self.color('autoblack', text)
[docs] def bold(self, text: str) -> str: """Return text in bold.""" return self.color('b', text)
[docs] def bold_tail(self, text: str, *, sep: str = '.') -> str: """Put bold emphasis on the last part of a string.""" head, fsep, tail = text.rpartition(sep) return fsep.join([head, self.bold(tail)])
def _table_wrap(self, table: terminal.Table, text: str) -> str: max_width = max(table.column_max_width(1), 10) return '\n'.join(wrap(text, max_width))
[docs] def say(self, *args: Any, **kwargs: Any) -> None: """Print something to stdout (or use ``file=stderr`` kwarg). Note: Does not do anything if the :option:`--quiet <faust --quiet>` option is enabled. """ if not self.quiet: echo(*args, **kwargs)
[docs] def carp(self, s: Any, **kwargs: Any) -> None: """Print something to stdout (or use ``file=stderr`` kwargs). Note: Does not do anything if the :option:`--debug <faust --debug>` option is enabled. """ if self.debug: print(f'#-- {s}', **kwargs)
[docs] def dumps(self, obj: Any) -> str: return json.dumps(obj)
[docs]class AppCommand(Command): """Command that takes ``-A app`` as argument.""" abstract: ClassVar[bool] = True app: AppT require_app = True #: The :term:`codec` used to serialize keys. #: Taken from instance parameters or :attr:`@key_serializer`. key_serializer: CodecArg #: The :term:`codec` used to serialize values. #: Taken from instance parameters or :attr:`@value_serializer`. value_serialier: CodecArg
[docs] @classmethod def from_handler( cls, *options: Any, **kwargs: Any) -> Callable[[Callable], Type['AppCommand']]: def _inner(fun: Callable[..., Awaitable[Any]]) -> Type['AppCommand']: target: Any = fun if not inspect.signature(fun).parameters: # if it does not take self argument, use staticmethod target = staticmethod(fun) fields = { 'run': target, '__doc__': fun.__doc__, '__name__': fun.__name__, '__qualname__': fun.__qualname__, '__module__': fun.__module__, '__wrapped__': fun, 'options': options, } return type(fun.__name__, (cls,), {**fields, **kwargs}) return _inner
def __init__(self, ctx: click.Context, *args: Any, key_serializer: CodecArg = None, value_serializer: CodecArg = None, **kwargs: Any) -> None: super().__init__(ctx) = getattr(ctx.find_root(), 'app', None) if is not None: # XXX How to find full argv[0] with click? origin = if sys.argv: prog = Path(sys.argv[0]).absolute() paths = [] p = prog.parent # find lowermost path, that is a package while p: if not (p / '').is_file(): break paths.append(p) p = p.parent package = '.'.join( [ for p in paths] + [prog.with_suffix('').name]) if package.endswith('.__main__'): # when `python -m pkg`: remove .__main__ from pkg.__main__ package = package[:-9] origin = package prepare_app(, origin) else: appstr = self.ctx.obj['app'] if appstr: = find_app(appstr) conf = key_serializer = key_serializer or conf.key_serializer value_serializer = value_serializer or conf.value_serializer else: if self.require_app: raise self.UsageError( 'Need to specify app using -A parameter') self.args = args self.kwargs = kwargs self.key_serializer = key_serializer self.value_serializer = value_serializer
[docs] def to_key(self, typ: Optional[str], key: str) -> Any: """Convert command-line argument string to model (key). Arguments: typ: The name of the model to create. key: The string json of the data to populate it with. Notes: Uses :attr:`key_serializer` to set the :term:`codec` for the key (e.g. ``"json"``), as set by the :option:`--key-serializer <faust send --key-serializer>` option. """ return self.to_model(typ, key, self.key_serializer)
[docs] def to_value(self, typ: Optional[str], value: str) -> Any: """Convert command-line argument string to model (value). Arguments: typ: The name of the model to create. key: The string json of the data to populate it with. Notes: Uses :attr:`value_serializer` to set the :term:`codec` for the value (e.g. ``"json"``), as set by the :option:`--value-serializer <faust send --value-serializer>` option. """ return self.to_model(typ, value, self.value_serializer)
[docs] def to_model(self, typ: Optional[str], value: str, serializer: CodecArg) -> Any: """Convert command-line argument to model. Generic version of :meth:`to_key`/:meth:`to_value`. Arguments: typ: The name of the model to create. key: The string json of the data to populate it with. serializer: The argument setting it apart from to_key/to_value enables you to specify a custom serializer not mandated by :attr:`key_serializer`, and :attr:`value_serializer`. Notes: Uses :attr:`value_serializer` to set the :term:`codec` for the value (e.g. ``"json"``), as set by the :option:`--value-serializer <faust send --value-serializer>` option. """ if typ: model: ModelT = self.import_relative_to_app(typ) return model.loads( want_bytes(value), default_serializer=serializer) return want_bytes(value)
[docs] def import_relative_to_app(self, attr: str) -> Any: """Import string like "module.Model", or "Model" to model class.""" try: return symbol_by_name(attr) except ImportError as original_exc: if not raise root, _, _ =':') try: return symbol_by_name(f'{root}.models.{attr}') except ImportError: try: return symbol_by_name(f'{root}.{attr}') except ImportError: raise original_exc from original_exc
[docs] def to_topic(self, entity: str) -> Any: """Convert topic name given on command-line to ``app.topic()``.""" if not entity: raise self.UsageError('Missing topic/@agent name') if entity.startswith('@'): # actor prefix: e.g. `faust send @myactorname` return self.import_relative_to_app(entity[1:]) return
[docs] def abbreviate_fqdn(self, name: str, *, prefix: str = '') -> str: """Abbreviate fully-qualified Python name, by removing origin. ``app.conf.origin`` is the package where the app is defined, so if this is ``examples.simple`` it returns the truncated:: >>> app.conf.origin 'examples.simple' >>> abbr_fqdn(app.conf.origin, ... 'examples.simple.Withdrawal', ... prefix='[...]') '[...]Withdrawal' but if the package is not part of origin it provides the full path:: >>> abbr_fqdn(app.conf.origin, ... 'examples.other.Foo', prefix='[...]') '' """ return text.abbr_fqdn(, name, prefix=prefix)