"""Faust Application.
An app is an instance of the Faust library.
Everything starts here.
"""
import asyncio
import importlib
import inspect
import re
import typing
import warnings
from datetime import tzinfo
from functools import wraps
from itertools import chain
from typing import (
Any,
AsyncIterable,
Awaitable,
Callable, Iterable,
Iterator,
List,
Mapping,
MutableMapping,
MutableSequence,
Optional,
Pattern,
Set,
Type,
Union,
cast,
)
from mode import Seconds, Service, ServiceT, SupervisorStrategyT, want_seconds
from mode.utils.aiter import aiter
from mode.utils.collections import force_mapping
from mode.utils.futures import stampede
from mode.utils.imports import import_from_cwd, smart_import
from mode.utils.logging import flight_recorder
from mode.utils.objects import cached_property
from mode.utils.queues import FlowControlEvent, ThrowableQueue
from mode.utils.types.trees import NodeT
from faust import transport
from faust.agents import (
AgentFun,
AgentManager,
AgentT,
ReplyConsumer,
SinkT,
)
from faust.channels import Channel, ChannelT
from faust.exceptions import ConsumerNotStarted, ImproperlyConfigured, SameNode
from faust.fixups import FixupT, fixups
from faust.sensors import Monitor, SensorDelegate
from faust.utils import cron, venusian
from faust.web import drivers as web_drivers
from faust.web.cache import backends as cache_backends
from faust.web.views import View
from faust.types._env import STRICT
from faust.types.app import AppT, BootStrategyT, TaskArg
from faust.types.assignor import LeaderAssignorT, PartitionAssignorT
from faust.types.codecs import CodecArg
from faust.types.core import K, V
from faust.types.models import ModelArg
from faust.types.router import RouterT
from faust.types.serializers import RegistryT
from faust.types.settings import Settings
from faust.types.streams import StreamT
from faust.types.tables import CollectionT, TableManagerT, TableT
from faust.types.topics import TopicT
from faust.types.transports import (
ConductorT,
ConsumerT,
ProducerT,
TPorTopicSet,
TransportT,
)
from faust.types.tuples import MessageSentCallback, RecordMetadata, TP
from faust.types.web import (
CacheBackendT,
HttpClientT,
PageArg,
Request,
Response,
ViewDecorator,
ViewHandlerFun,
Web,
)
from faust.types.windows import WindowT
from ._attached import Attachments
if typing.TYPE_CHECKING: # pragma: no cover
from faust.cli.base import AppCommand
from faust.events import Event
from faust.worker import Worker as WorkerT
else:
class AppCommand: ... # noqa
class Event: ... # noqa
class WorkerT: ... # noqa
__all__ = ['App']
#: Format string for ``repr(app)``.
APP_REPR = '''
<{name}({c.id}): {c.broker} {s.state} agents({agents}) topics({topics})>
'''.strip()
# Venusian (pypi): This is used for "autodiscovery" of user code,
# CLI commands, and much more.
# Named after same concept from Django: the Django Admin autodiscover function
# that finds custom admin configuration in ``{app}/admin.py`` modules.
SCAN_AGENT = 'faust.agent'
SCAN_COMMAND = 'faust.command'
SCAN_PAGE = 'faust.page'
SCAN_SERVICE = 'faust.service'
SCAN_TASK = 'faust.task'
#: Default decorator categories for :pypi`venusian` to scan for when
#: autodiscovering things like @app.agent decorators.
SCAN_CATEGORIES: Iterable[str] = [
SCAN_AGENT,
SCAN_COMMAND,
SCAN_PAGE,
SCAN_SERVICE,
SCAN_TASK,
]
#: List of regular expressions for :pypi:`venusian` that acts as a filter
#: for modules that :pypi:`venusian` should ignore when autodiscovering
#: decorators.
SCAN_IGNORE: Iterable[str] = ['test_.*', '.*__main__.*']
E_NEED_ORIGIN = '''
`origin` argument to faust.App is mandatory when autodiscovery enabled.
This parameter sets the canonical path to the project package,
and describes how a user, or program can find it on the command-line when using
the `faust -A project` option. It's also used as the default package
to scan when autodiscovery is enabled.
If your app is defined in a module: ``project/app.py``, then the
origin will be "project":
# file: project/app.py
import faust
app = faust.App(
id='myid',
origin='project',
)
'''
W_OPTION_DEPRECATED = '''\
Argument {old!r} is deprecated and scheduled for removal in Faust 1.0.
Please use {new!r} instead.
'''
W_DEPRECATED_SHARD_PARAM = '''\
The second argument to `@table_route` is deprecated,
please use the `query_param` keyword argument instead.
'''
# @app.task decorator may be called in several ways:
#
# 1) Without parens:
# @app.task
# async def foo():
#
# 2) With parens:
# @app.task()
#
# 3) With parens and arguments
# @app.task(on_leader=True)
#
# This means @app.task attempts to do the right thing depending
# on how it's used. All the frameworks do this, but we have to also type it.
TaskDecoratorRet = Union[
Callable[[TaskArg], TaskArg],
TaskArg,
]
class BootStrategy(BootStrategyT):
enable_kafka: bool = True
# We want these to take default from `enable_kafka`
# attribute, but still want to allow subclasses to define
# them like this:
# class MyBoot(BootStrategy):
# enable_kafka_consumer = False
enable_kafka_consumer: Optional[bool] = None
enable_kafka_producer: Optional[bool] = None
enable_web: Optional[bool] = None
enable_sensors: bool = True
def __init__(self, app: AppT, *,
enable_web: bool = None,
enable_kafka: bool = None,
enable_kafka_producer: bool = None,
enable_kafka_consumer: bool = None,
enable_sensors: bool = None) -> None:
self.app = app
if enable_kafka is not None:
self.enable_kafka = enable_kafka
if enable_kafka_producer is not None:
self.enable_kafka_producer = enable_kafka_producer
if enable_kafka_consumer is not None:
self.enable_kafka_consumer = enable_kafka_consumer
if enable_web is not None:
self.enable_web = enable_web
if enable_sensors is not None:
self.enable_sensors = enable_sensors
def server(self) -> Iterable[ServiceT]:
return self._chain(
# Sensors (Sensor): always start first and stop last.
self.sensors(),
# Producer: always stop after Consumer.
self.kafka_producer(),
# Web
self.web_server(),
# Consumer: always stop after Conductor
self.kafka_consumer(),
# AgentManager starts agents (app.agents)
self.agents(),
# Conductor (transport.Conductor))
self.kafka_conductor(),
# Table Manager (app.TableManager)
self.tables(),
)
def client_only(self) -> Iterable[ServiceT]:
return self._chain(
self.kafka_producer(),
self.kafka_client_consumer(),
self.kafka_conductor(),
[self.app._fetcher],
)
def producer_only(self) -> Iterable[ServiceT]:
return self._chain(
self.web_server(),
self.kafka_producer(),
)
def _chain(self, *arguments: Iterable[ServiceT]) -> Iterable[ServiceT]:
return cast(Iterable[ServiceT], chain(*arguments))
def sensors(self) -> Iterable[ServiceT]:
if self.enable_sensors:
return self.app.sensors
return []
def kafka_producer(self) -> Iterable[ServiceT]:
if self._should_enable_kafka_producer():
return [self.app.producer]
return []
def _should_enable_kafka_producer(self) -> bool:
if self.enable_kafka_producer is None:
return self.enable_kafka
return self.enable_kafka_producer
def kafka_consumer(self) -> Iterable[ServiceT]:
if self._should_enable_kafka_consumer():
return [
self.app.consumer,
# Leader Assignor (assignor.LeaderAssignor)
self.app._leader_assignor,
# Reply Consumer (ReplyConsumer)
self.app._reply_consumer,
]
return []
def _should_enable_kafka_consumer(self) -> bool:
if self.enable_kafka_consumer is None:
return self.enable_kafka
return self.enable_kafka_consumer
def kafka_client_consumer(self) -> Iterable[ServiceT]:
return [
self.app.consumer,
self.app._reply_consumer,
]
def agents(self) -> Iterable[ServiceT]:
return [self.app.agents]
def kafka_conductor(self) -> Iterable[ServiceT]:
if self._should_enable_kafka_consumer():
return [self.app.topics]
return []
def web_server(self) -> Iterable[ServiceT]:
if self._should_enable_web():
return list(self.web_components()) + [self.app.web]
return []
def _should_enable_web(self) -> bool:
if self.enable_web is None:
return self.app.conf.web_enabled
return self.enable_web
def web_components(self) -> Iterable[ServiceT]:
return [self.app.cache]
def tables(self) -> Iterable[ServiceT]:
return [self.app.tables]
[docs]class App(AppT, Service):
"""Faust Application.
Arguments:
id (str): Application ID.
Keyword Arguments:
loop (asyncio.AbstractEventLoop): optional event loop to use.
See Also:
:ref:`application-configuration` -- for supported keyword arguments.
"""
BootStrategy = BootStrategy
#: Set this to True if app should only start the services required to
#: operate as an RPC client (producer and simple reply consumer).
client_only = False
#: Set this to True if app should run without consumer/tables.
producer_only = False
#: Source of configuration: ``app.conf`` (when configured)
_conf: Optional[Settings] = None
#: Original configuration source object.
_config_source: Any = None
# Default consumer instance.
_consumer: Optional[ConsumerT] = None
# Default producer instance.
_producer: Optional[ProducerT] = None
# Transport is created on demand: use `.transport` property.
_transport: Optional[TransportT] = None
# Cache is created on demand: use `.cache` property.
_cache: Optional[CacheBackendT] = None
_assignor: Optional[PartitionAssignorT] = None
# Monitor is created on demand: use `.monitor` property.
_monitor: Optional[Monitor] = None
# @app.task decorator adds asyncio tasks to be started
# with the app here.
_tasks: MutableSequence[Callable[[], Awaitable]]
_http_client: Optional[HttpClientT] = None
_extra_services: List[ServiceT]
_extra_service_instances: Optional[List[ServiceT]] = None
# See faust/app/_attached.py
_attachments: Attachments
_on_revoked_timeout = None
#: Current assignment
_assignment: Optional[Set[TP]] = None
def __init__(self,
id: str,
*,
monitor: Monitor = None,
config_source: Any = None,
loop: asyncio.AbstractEventLoop = None,
beacon: NodeT = None,
**options: Any) -> None:
# This is passed to the configuration in self.conf
self._default_options = (id, options)
# The agent manager manages all agents.
self.agents = AgentManager(self)
# Sensors monitor Faust using a standard sensor API.
self.sensors = SensorDelegate(self)
# this is a local hack we use until we have proper
# transactions support in the Python Kafka Client
# and can have "exactly once" semantics.
self._attachments = Attachments(self)
# "The Monitor" is a special sensor that provides detailed stats
# for the web server.
self._monitor = monitor
# Any additional asyncio.Task's specified using @app.task decorator.
self._tasks = []
# Called as soon as the a worker is fully operational.
self.on_startup_finished: Optional[Callable] = None
# Any additional services added using the @app.service decorator.
self._extra_services = []
# The configuration source object/module passed to ``config_by_object``
# for introspectio purposes.
self._config_source = config_source
# create default sender for signals such as self.on_configured
self._init_signals()
# initialize fixups (automatically applied extensions,
# such as Django integration).
self.fixups = self._init_fixups()
self.boot_strategy = self.BootStrategy(self)
Service.__init__(self, loop=loop, beacon=beacon)
def _init_signals(self) -> None:
# Signals in Faust are the same as in Django, but asynchronous by
# default (:class:`mode.SyncSignal` is the normal ``def`` version)).
#
# Signals in Faust are usually local to the app instance::
#
# @app.on_before_configured.connect # <-- only sent by this app
# def on_before_configured(self):
# ...
#
# In Django signals are usually global, and an easter-egg
# provides this in Faust also::
#
# V---- NOTE upper case A in App
# @App.on_before_configured.connect # <-- sent by ALL apps
# def on_before_configured(app):
#
# Note: Signals are local-only, and cannot do anything to other
# processes or machines.
self.on_before_configured = (
self.on_before_configured.with_default_sender(self))
self.on_configured = self.on_configured.with_default_sender(self)
self.on_after_configured = (
self.on_after_configured.with_default_sender(self))
self.on_partitions_assigned = (
self.on_partitions_assigned.with_default_sender(self))
self.on_partitions_revoked = (
self.on_partitions_revoked.with_default_sender(self))
self.on_worker_init = self.on_worker_init.with_default_sender(self)
self.on_rebalance_complete = (
self.on_rebalance_complete.with_default_sender(self))
self.on_before_shutdown = (
self.on_before_shutdown.with_default_sender(self))
def _init_fixups(self) -> MutableSequence[FixupT]:
# Returns list of "fixups"
# Fixups are small additional patches we apply when certain
# platforms or frameworks are present.
#
# One example is the Django fixup, responsible for Django integration
# whenever the DJANGO_SETTINGS_MODULE environment variable is
# set. See faust/fixups/django.py, it's not complicated - using
# setuptools entrypoints you can very easily create extensions that
# are automatically enabled by installing a PyPI package with
# `pip install myname`.
return list(fixups(self))
[docs] def on_init_dependencies(self) -> Iterable[ServiceT]:
# Add the main Monitor sensor.
# The beacon is also reattached in case the monitor
# was created by the user.
self.monitor.beacon.reattach(self.beacon)
self.monitor.loop = self.loop
self.sensors.add(self.monitor)
if self.producer_only:
return self.boot_strategy.producer_only()
elif self.client_only:
return self.boot_strategy.client_only()
else:
return self.boot_strategy.server()
[docs] async def on_first_start(self) -> None:
self._create_directories()
[docs] async def on_start(self) -> None:
self.finalize()
[docs] async def on_started(self) -> None:
await self._wait_for_table_recovery_completed()
if not self.should_stop:
# Add all asyncio.Tasks, like timers, etc.
await self.on_started_init_extra_tasks()
# Start user-provided services.
await self.on_started_init_extra_services()
# Call the app-is-fully-started callback used by Worker
# to print the "ready" message that signals to the user that
# the worker is ready to start processing.
if self.on_startup_finished:
await self.on_startup_finished()
async def _wait_for_table_recovery_completed(self) -> None:
if self.tables.started:
await self.wait_for_stopped(self.tables.recovery.completed)
def _prepare_subservice(
self, service: Union[ServiceT, Type[ServiceT]]) -> ServiceT:
if inspect.isclass(service):
return service(loop=self.loop, beacon=self.beacon)
return service
[docs] def config_from_object(self,
obj: Any,
*,
silent: bool = False,
force: bool = False) -> None:
"""Read configuration from object.
Object is either an actual object or the name of a module to import.
Examples:
>>> app.config_from_object('myproj.faustconfig')
>>> from myproj import faustconfig
>>> app.config_from_object(faustconfig)
Arguments:
silent (bool): If true then import errors will be ignored.
force (bool): Force reading configuration immediately.
By default the configuration will be read only when required.
"""
self._config_source = obj
if self.finalized or self.configured:
Settings._warn_already_configured()
if force or self.configured:
self._conf = None
self._configure(silent=silent)
[docs] def finalize(self) -> None:
# Finalization signals that the application have been configured
# and is ready to use.
# If you access configuration before an explicit call to
# ``app.finalize()`` you will get an error.
# The ``app.main`` entrypoint and the ``faust -A app`` command
# both will automatically finalize the app for you.
if not self.finalized:
self.finalized = True
id = self.conf.id
if not id:
raise ImproperlyConfigured('App requires an id!')
async def _maybe_close_http_client(self) -> None:
if self._http_client:
await self._http_client.close()
[docs] def worker_init(self) -> None:
# This init is called by the `faust worker` command.
for fixup in self.fixups:
fixup.on_worker_init()
self.on_worker_init.send()
[docs] def discover(self,
*extra_modules: str,
categories: Iterable[str] = SCAN_CATEGORIES,
ignore: Iterable[str] = SCAN_IGNORE) -> None:
# based on autodiscovery in Django,
# but finds @app.agent decorators, and so on.
modules = set(self._discovery_modules())
modules |= set(extra_modules)
for fixup in self.fixups:
modules |= set(fixup.autodiscover_modules())
if modules:
scanner = self._new_scanner(*[re.compile(pat) for pat in ignore])
for name in modules:
try:
module = importlib.import_module(name)
except ModuleNotFoundError:
raise ModuleNotFoundError(
f'Unknown module {name} in App.conf.autodiscover list')
scanner.scan(
module,
categories=tuple(categories),
)
def _discovery_modules(self) -> List[str]:
modules: List[str] = []
autodiscover = self.conf.autodiscover
if autodiscover:
if isinstance(autodiscover, bool):
if self.conf.origin is None:
raise ImproperlyConfigured(E_NEED_ORIGIN)
elif callable(autodiscover):
modules.extend(
cast(Callable[[], Iterator[str]], autodiscover)())
else:
modules.extend(autodiscover)
if self.conf.origin:
modules.append(self.conf.origin)
return modules
def _new_scanner(self, *ignore: Pattern,
**kwargs: Any) -> venusian.Scanner:
return venusian.Scanner(ignore=[pat.search for pat in ignore])
[docs] def main(self) -> None:
"""Execute the :program:`faust` umbrella command using this app."""
from faust.cli.faust import cli
self.finalize()
self.worker_init()
if self.conf.autodiscover:
self.discover()
cli(app=self)
[docs] def topic(self,
*topics: str,
pattern: Union[str, Pattern] = None,
key_type: ModelArg = None,
value_type: ModelArg = None,
key_serializer: CodecArg = None,
value_serializer: CodecArg = None,
partitions: int = None,
retention: Seconds = None,
compacting: bool = None,
deleting: bool = None,
replicas: int = None,
acks: bool = True,
internal: bool = False,
config: Mapping[str, Any] = None,
maxsize: int = None,
allow_empty: bool = False,
loop: asyncio.AbstractEventLoop = None) -> TopicT:
"""Create topic description.
Topics are named channels (for example a Kafka topic),
that exist on a server. To make an ephemeral local communication
channel use: :meth:`channel`.
See Also:
:class:`faust.topics.Topic`
"""
return self.conf.Topic(
self,
topics=topics,
pattern=pattern,
key_type=key_type,
value_type=value_type,
key_serializer=key_serializer,
value_serializer=value_serializer,
partitions=partitions,
retention=retention,
compacting=compacting,
deleting=deleting,
replicas=replicas,
acks=acks,
internal=internal,
config=config,
allow_empty=allow_empty,
loop=loop,
)
[docs] def channel(self,
*,
key_type: ModelArg = None,
value_type: ModelArg = None,
maxsize: int = None,
loop: asyncio.AbstractEventLoop = None) -> ChannelT:
"""Create new channel.
By default this will create an in-memory channel
used for intra-process communication, but in practice
channels can be backed by any transport (network or even means
of inter-process communication).
See Also:
:class:`faust.channels.Channel`.
"""
return Channel(
self,
key_type=key_type,
value_type=value_type,
maxsize=maxsize,
loop=loop,
)
[docs] def agent(self,
channel: Union[str, ChannelT] = None,
*,
name: str = None,
concurrency: int = 1,
supervisor_strategy: Type[SupervisorStrategyT] = None,
sink: Iterable[SinkT] = None,
isolated_partitions: bool = False,
**kwargs: Any) -> Callable[[AgentFun], AgentT]:
"""Create Agent from async def function.
It can be a regular async function::
@app.agent()
async def my_agent(stream):
async for number in stream:
print(f'Received: {number!r}')
Or it can be an async iterator that yields values.
These values can be used as the reply in an RPC-style call,
or for sinks: callbacks that forward events to
other agents/topics/statsd, and so on::
@app.agent(sink=[log_topic])
async def my_agent(requests):
async for number in requests:
yield number * 2
"""
def _inner(fun: AgentFun) -> AgentT:
agent = self.conf.Agent(
fun,
name=name,
app=self,
channel=channel,
concurrency=concurrency,
supervisor_strategy=supervisor_strategy,
sink=sink,
isolated_partitions=isolated_partitions,
on_error=self._on_agent_error,
help=fun.__doc__,
**kwargs)
self.agents[agent.name] = agent
venusian.attach(agent, category=SCAN_AGENT)
return agent
return _inner
actor = agent # XXX Compatibility alias: REMOVE FOR 1.0
async def _on_agent_error(self, agent: AgentT, exc: BaseException) -> None:
# See agent-errors in docs/userguide/agents.rst
if self._consumer:
try:
await self._consumer.on_task_error(exc)
except MemoryError:
raise
except Exception as exc:
self.log.exception('Consumer error callback raised: %r', exc)
[docs] def task(self,
fun: TaskArg = None,
*,
on_leader: bool = False) -> TaskDecoratorRet:
"""Define an async def function to be started with the app.
This is like :meth:`timer` but a one-shot task only
executed at worker startup (after recovery and the worker is
fully ready for operation).
The function may take zero, or one argument.
If the target function takes an argument, the ``app`` argument
is passed::
>>> @app.task
>>> async def on_startup(app):
... print('STARTING UP: %r' % (app,))
Nullary functions are also supported::
>>> @app.task
>>> async def on_startup():
... print('STARTING UP')
"""
def _inner(fun: TaskArg) -> TaskArg:
return self._task(fun, on_leader=on_leader)
return _inner(fun) if fun is not None else _inner
def _task(self, fun: TaskArg, on_leader: bool = False) -> TaskArg:
app = self
@wraps(fun)
async def _wrapped() -> None:
should_run = app.is_leader() if on_leader else True
if should_run:
# pass app only if decorated function takes an argument
if inspect.signature(fun).parameters:
task_takes_app = cast(Callable[[AppT], Awaitable], fun)
return await task_takes_app(app)
else:
task = cast(Callable[[], Awaitable], fun)
return await task()
venusian.attach(_wrapped, category=SCAN_TASK)
self._tasks.append(_wrapped)
return _wrapped
[docs] def timer(self, interval: Seconds, on_leader: bool = False) -> Callable:
"""Define an async def function to be run at periodic intervals.
Like :meth:`task`, but executes periodically until the worker
is shut down.
This decorator takes an async function and adds it to a
list of timers started with the app.
Arguments:
interval (Seconds): How often the timer executes in seconds.
on_leader (bool) = False: Should the timer only run on the leader?
Example:
>>> @app.timer(interval=10.0)
>>> async def every_10_seconds():
... print('TEN SECONDS JUST PASSED')
>>> app.timer(interval=5.0, on_leader=True)
>>> async def every_5_seconds():
... print('FIVE SECONDS JUST PASSED. ALSO, I AM THE LEADER!')
"""
interval_s = want_seconds(interval)
def _inner(fun: TaskArg) -> TaskArg:
@self.task
@wraps(fun)
async def around_timer(*args: Any) -> None:
while not self.should_stop:
await self.sleep(interval_s)
if not self.should_stop:
should_run = not on_leader or self.is_leader()
if should_run:
await fun(*args) # type: ignore
# If you call @app.task without parents the return value is:
# Callable[[TaskArg], TaskArg]
# but we always call @app.task() - with parens, so return value is
# always TaskArg.
return cast(TaskArg, around_timer)
return _inner
[docs] def crontab(self, cron_format: str, *,
timezone: tzinfo = None,
on_leader: bool = False) -> Callable:
"""Define an async def function to be run at the fixed times,
defined by the cron format.
Like :meth:`timer`, but executes at fixed times instead of executing
at certain intervals.
This decorator takes an async function and adds it to a
list of cronjobs started with the app.
Arguments:
cron_format: The cron spec defining fixed times to run the
decorated function.
Keyword Arguments:
timezone: The timezone to be taken into account for the cron jobs.
If not set value from :setting:`timezone` will be taken.
on_leader: Should the cron job only run on the leader?
Example:
>>> @app.crontab(cron_format='30 18 * * *',
timezone=pytz.timezone('US/Pacific'))
>>> async def every_6_30_pm_pacific():
... print('IT IS 6:30pm')
>>> app.crontab(cron_format='30 18 * * *', on_leader=True)
>>> async def every_6_30_pm():
... print('6:30pm UTC; ALSO, I AM THE LEADER!')
"""
def _inner(fun: TaskArg) -> TaskArg:
@self.task
@wraps(fun)
async def cron_starter(*args: Any) -> None:
_tz = self.conf.timezone if timezone is None else timezone
while not self.should_stop:
await self.sleep(cron.secs_for_next(cron_format, _tz))
should_run = not on_leader or self.is_leader()
if should_run:
await fun(*args) # type: ignore
return cast(TaskArg, cron_starter)
return _inner
[docs] def service(self, cls: Type[ServiceT]) -> Type[ServiceT]:
"""Decorate :class:`mode.Service` to be started with the app.
Examples:
.. sourcecode:: python
from mode import Service
@app.service
class Foo(Service):
...
"""
venusian.attach(cls, category=SCAN_SERVICE)
self._extra_services.append(cls)
return cls
[docs] def is_leader(self) -> bool:
return self._leader_assignor.is_leader()
[docs] def stream(self,
channel: Union[AsyncIterable, Iterable],
beacon: NodeT = None,
**kwargs: Any) -> StreamT:
"""Create new stream from channel/topic/iterable/async iterable.
Arguments:
channel: Iterable to stream over (async or non-async).
kwargs: See :class:`Stream`.
Returns:
faust.Stream:
to iterate over events in the stream.
"""
return self.conf.Stream(
app=self,
channel=aiter(channel) if channel is not None else None,
beacon=beacon or self.beacon,
**kwargs)
[docs] def Table(self,
name: str,
*,
default: Callable[[], Any] = None,
window: WindowT = None,
partitions: int = None,
help: str = None,
**kwargs: Any) -> TableT:
"""Define new table.
Arguments:
name: Name used for table, note that two tables living in
the same application cannot have the same name.
default: A callable, or type that will return a default value
for keys missing in this table.
window: A windowing strategy to wrap this window in.
Examples:
>>> table = app.Table('user_to_amount', default=int)
>>> table['George']
0
>>> table['Elaine'] += 1
>>> table['Elaine'] += 1
>>> table['Elaine']
2
"""
table = self.tables.add(
self.conf.Table(
self,
name=name,
default=default,
beacon=self.beacon,
partitions=partitions,
help=help,
**kwargs))
return table.using_window(window) if window else table
[docs] def SetTable(self,
name: str,
*,
window: WindowT = None,
partitions: int = None,
help: str = None,
**kwargs: Any) -> TableT:
table = self.tables.add(
self.conf.SetTable(
self,
name=name,
beacon=self.beacon,
partitions=partitions,
help=help,
**kwargs))
return table.using_window(window) if window else table
[docs] def page(self, path: str, *,
base: Type[View] = View,
name: str = None) -> Callable[[PageArg], Type[View]]:
view_base: Type[View] = base if base is not None else View
def _decorator(fun: PageArg) -> Type[View]:
view: Optional[Type[View]] = None
if inspect.isclass(fun):
view = cast(Type[View], fun)
if not issubclass(view, View):
raise TypeError(
'When decorating class, it must be subclass of View')
if view is None:
view = view_base.from_handler(cast(ViewHandlerFun, fun))
view.view_name = name or view.__name__
view.view_path = path
self.web.add_view(view)
venusian.attach(view, category=SCAN_PAGE)
return view
return _decorator
[docs] def table_route(self, table: CollectionT,
shard_param: str = None,
*,
query_param: str = None,
match_info: str = None) -> ViewDecorator:
def _decorator(fun: ViewHandlerFun) -> ViewHandlerFun:
if shard_param is not None:
warnings.warn(DeprecationWarning(W_DEPRECATED_SHARD_PARAM))
if query_param:
raise TypeError(
'Cannot specify shard_param and query_param')
@wraps(fun)
async def get(view: View, request: Request,
*args: Any, **kwargs: Any) -> Response:
_query_param = query_param or shard_param
if match_info:
key = request.match_info[match_info]
elif _query_param:
key = request.query[_query_param]
try:
return await self.router.route_req(table.name, key,
view.web, request)
except SameNode:
return await fun(view, request, *args, **kwargs)
return get
return _decorator
[docs] def command(self,
*options: Any,
base: Optional[Type[AppCommand]] = None,
**kwargs: Any) -> Callable[[Callable], Type[AppCommand]]:
if options is None and base is None and kwargs is None:
raise TypeError('Use parens in @app.command(), not @app.command.')
_base: Type[AppCommand]
if base is None:
from faust.cli import base as cli_base
_base = cli_base.AppCommand
else:
_base = base
def _inner(fun: Callable[..., Awaitable[Any]]) -> Type[AppCommand]:
cmd = _base.from_handler(*options, **kwargs)(fun)
venusian.attach(cmd, category=SCAN_COMMAND)
return cmd
return _inner
[docs] async def start_client(self) -> None:
"""Start the app in Client-Only mode necessary for RPC requests.
Notes:
Once started as a client the app cannot be restarted as Server.
"""
self.client_only = True
await self.maybe_start()
[docs] async def maybe_start_client(self) -> None:
"""Start the app in Client-Only mode if not started as Server."""
if not self.started:
await self.start_client()
[docs] async def send(
self,
channel: Union[ChannelT, str],
key: K = None,
value: V = None,
partition: int = None,
key_serializer: CodecArg = None,
value_serializer: CodecArg = None,
callback: MessageSentCallback = None) -> Awaitable[RecordMetadata]:
"""Send event to channel/topic.
Arguments:
channel: Channel/topic or the name of a topic to send event to.
key: Message key.
value: Message value.
partition: Specific partition to send to.
If not set the partition will be chosen by the partitioner.
key_serializer: Serializer to use (if value is not model).
value_serializer: Serializer to use (if value is not model).
callback: Called after the message is fully delivered to the
channel, but not to the consumer.
Signature must be unary as the
:class:`~faust.types.tuples.FutureMessage` future is passed
to it.
The resulting :class:`faust.types.tuples.RecordMetadata`
object is then available as ``fut.result()``.
"""
chan: ChannelT
if isinstance(channel, str):
chan = self.topic(channel)
else:
chan = channel
return await chan.send(
key=key,
value=value,
partition=partition,
key_serializer=key_serializer,
value_serializer=value_serializer,
callback=callback,
)
[docs] @stampede
async def maybe_start_producer(self) -> ProducerT:
"""Ensure producer is started."""
producer = self.producer
# producer may also have been started by app.start()
await producer.maybe_start()
return producer
[docs] async def commit(self, topics: TPorTopicSet) -> bool:
"""Commit offset for acked messages in specified topics'.
Warning:
This will commit acked messages in **all topics**
if the topics argument is passed in as :const:`None`.
"""
return await self.topics.commit(topics)
[docs] async def on_stop(self) -> None:
await self._stop_consumer()
# send shutdown signal
await self.on_before_shutdown.send()
if self._producer is not None:
self.log.info('Flush producer buffer...')
await self._producer.flush()
await self._maybe_close_http_client()
async def _stop_consumer(self) -> None:
if self._consumer is not None:
consumer = self._consumer
try:
assignment = consumer.assignment()
except ConsumerNotStarted:
pass
else:
if assignment:
self.tables.on_partitions_revoked(assignment)
consumer.stop_flow()
self.flow_control.suspend()
consumer.pause_partitions(assignment)
self.flow_control.clear()
await self._stop_fetcher()
if self.conf.stream_wait_empty:
self.log.info('Wait for streams...')
await self.consumer.wait_empty()
[docs] def on_rebalance_start(self) -> None:
self.rebalancing = True
self.tables.on_rebalance_start()
[docs] def on_rebalance_end(self) -> None:
self.rebalancing = False
async def _on_partitions_revoked(self, revoked: Set[TP]) -> None:
"""Handle revocation of topic partitions.
This is called during a rebalance and is followed by
:meth:`on_partitions_assigned`.
Revoked means the partitions no longer exist, or they
have been reassigned to a different node.
"""
if self.should_stop:
return self._on_rebalance_when_stopped()
session_timeout = self.conf.broker_session_timeout * 0.95
with flight_recorder(self.log, timeout=session_timeout) as on_timeout:
self._on_revoked_timeout = on_timeout
try:
self.log.dev('ON PARTITIONS REVOKED')
self.tables.on_partitions_revoked(revoked)
assignment = self.consumer.assignment()
if assignment:
on_timeout.info('flow_control.suspend()')
self.consumer.stop_flow()
self.flow_control.suspend()
on_timeout.info('consumer.pause_partitions')
self.consumer.pause_partitions(assignment)
# Every agent instance has an incoming buffer of messages
# (a asyncio.Queue) -- we clear those to make sure
# agents will not start processing them.
#
# This allows for large buffer sizes
# (stream_buffer_maxsize).
on_timeout.info('flow_control.clear()')
self.flow_control.clear()
# even if we clear, some of the agent instances may have
# already started working on an event.
#
# we need to wait for them.
if self.conf.stream_wait_empty:
on_timeout.info('consumer.wait_empty()')
await self.consumer.wait_empty()
on_timeout.info('producer.flush()')
if self._producer is not None:
await self._producer.flush()
else:
self.log.dev('ON P. REVOKED NOT COMMITTING: NO ASSIGNMENT')
on_timeout.info('+send signal: on_partitions_revoked')
await self.on_partitions_revoked.send(revoked)
on_timeout.info('-send signal: on_partitions_revoked')
except Exception as exc:
on_timeout.info('on partitions revoked crashed: %r', exc)
await self.crash(exc)
finally:
self._on_revoked_timeout = None
async def _stop_fetcher(self) -> None:
await self._fetcher.stop()
# Reset fetcher service state so that we can restart it
# in TableManager table recovery.
self._fetcher.service_reset()
def _on_rebalance_when_stopped(self) -> None:
self.consumer.close()
async def _on_partitions_assigned(self, assigned: Set[TP]) -> None:
"""Handle new topic partition assignment.
This is called during a rebalance after :meth:`on_partitions_revoked`.
The new assignment overrides the previous
assignment, so any tp no longer in the assigned' list will have
been revoked.
"""
if self.should_stop:
return self._on_rebalance_when_stopped()
# shave some time off so we timeout before the broker
# (Kafka does not send error, it just logs)
session_timeout = self.conf.broker_session_timeout * 0.95
self.unassigned = not assigned
revoked: Set[TP]
newly_assigned: Set[TP]
if self._assignment is not None:
revoked = self._assignment - assigned
newly_assigned = assigned - self._assignment
else:
revoked = set()
newly_assigned = assigned
self._assignment = assigned
with flight_recorder(self.log, timeout=session_timeout) as on_timeout:
self._on_revoked_timeout = on_timeout
try:
on_timeout.info('agents.on_rebalance()')
await self.agents.on_rebalance(revoked, newly_assigned)
# Wait for transport.Conductor to finish
# calling Consumer.subscribe
on_timeout.info('topics.wait_for_subscriptions()')
await self.topics.wait_for_subscriptions()
on_timeout.info('consumer.pause_partitions()')
self.consumer.pause_partitions(assigned)
on_timeout.info('topics.on_partitions_assigned()')
await self.topics.on_partitions_assigned(assigned)
on_timeout.info('tables.on_rebalance()')
await self.tables.on_rebalance(
assigned, revoked, newly_assigned)
on_timeout.info('+send signal: on_partitions_assigned')
await self.on_partitions_assigned.send(assigned)
on_timeout.info('-send signal: on_partitions_assigned')
except Exception as exc:
on_timeout.info('on partitions assigned crashed: %r', exc)
await self.crash(exc)
def _new_producer(self) -> ProducerT:
return self.transport.create_producer(beacon=self.beacon)
def _new_consumer(self) -> ConsumerT:
return self.transport.create_consumer(
callback=self.topics.on_message,
on_partitions_revoked=self._on_partitions_revoked,
on_partitions_assigned=self._on_partitions_assigned,
beacon=self.beacon,
)
def _new_conductor(self) -> ConductorT:
return self.transport.create_conductor(beacon=self.beacon)
def _new_transport(self) -> TransportT:
return transport.by_url(self.conf.broker[0])(
self.conf.broker, self, loop=self.loop)
def _new_cache_backend(self) -> CacheBackendT:
return cache_backends.by_url(self.conf.cache)(
self, self.conf.cache, loop=self.loop)
[docs] def FlowControlQueue(
self,
maxsize: int = None,
*,
clear_on_resume: bool = False,
loop: asyncio.AbstractEventLoop = None) -> ThrowableQueue:
"""Like :class:`asyncio.Queue`, but can be suspended/resumed."""
return ThrowableQueue(
maxsize=maxsize,
flow_control=self.flow_control,
clear_on_resume=clear_on_resume,
loop=loop or self.loop,
)
[docs] def Worker(self, **kwargs: Any) -> WorkerT:
return self.conf.Worker(self, **kwargs)
[docs] def on_webserver_init(self, web: Web) -> None:
...
def _create_directories(self) -> None:
self.conf.datadir.mkdir(exist_ok=True)
self.conf.appdir.mkdir(exist_ok=True)
self.conf.tabledir.mkdir(exist_ok=True)
def __repr__(self) -> str:
return APP_REPR.format(
name=type(self).__name__,
s=self,
c=self.conf,
agents=self.agents,
topics=len(self.topics),
)
def _configure(self, *, silent: bool = False) -> None:
self.on_before_configured.send()
conf = self._load_settings(silent=silent)
self.on_configured.send(conf)
self._conf, self.configured = conf, True
self.on_after_configured.send()
def _load_settings(self, *, silent: bool = False) -> Settings:
changes: Mapping[str, Any] = {}
appid, defaults = self._default_options
if self._config_source:
changes = self._load_settings_from_source(
self._config_source, silent=silent)
conf = {**defaults, **changes}
return Settings(appid, **self._prepare_compat_settings(conf))
def _prepare_compat_settings(self, options: MutableMapping) -> Mapping:
COMPAT_OPTIONS = {
'client_id': 'broker_client_id',
'commit_interval': 'broker_commit_interval',
'create_reply_topic': 'reply_create_topic',
'num_standby_replicas': 'table_standby_replicas',
'default_partitions': 'topic_partitions',
'replication_factor': 'topic_replication_factor',
}
for old, new in COMPAT_OPTIONS.items():
val = options.get(new)
try:
options[new] = options[old]
except KeyError:
pass
else:
if val is not None:
raise ImproperlyConfigured(
f'Cannot use both compat option {old!r} and {new!r}')
warnings.warn(
FutureWarning(
W_OPTION_DEPRECATED.format(old=old, new=new)))
return options
def _load_settings_from_source(self, source: Any, *,
silent: bool = False) -> Mapping:
if isinstance(source, str):
try:
source = smart_import(source, imp=import_from_cwd)
except (AttributeError, ImportError):
if not silent:
raise
return {}
return force_mapping(source)
@property
def conf(self) -> Settings:
if not self.finalized and STRICT:
raise ImproperlyConfigured(
'App configuration accessed before app.finalize()')
if self._conf is None:
self._configure()
return cast(Settings, self._conf)
@conf.setter
def conf(self, settings: Settings) -> None:
self._conf = settings
@property
def producer(self) -> ProducerT:
if self._producer is None:
self._producer = self._new_producer()
return self._producer
@producer.setter
def producer(self, producer: ProducerT) -> None:
self._producer = producer
@property
def consumer(self) -> ConsumerT:
if self._consumer is None:
self._consumer = self._new_consumer()
return self._consumer
@consumer.setter
def consumer(self, consumer: ConsumerT) -> None:
self._consumer = consumer
@property
def transport(self) -> TransportT:
"""Message transport."""
if self._transport is None:
self._transport = self._new_transport()
return self._transport
@transport.setter
def transport(self, transport: TransportT) -> None:
self._transport = transport
@property
def cache(self) -> CacheBackendT:
if self._cache is None:
self._cache = self._new_cache_backend()
return self._cache
@cache.setter
def cache(self, cache: CacheBackendT) -> None:
self._cache = cache
[docs] @cached_property
def tables(self) -> TableManagerT:
"""Map of available tables, and the table manager service."""
return self.conf.TableManager(
app=self,
loop=self.loop,
beacon=self.beacon,
)
[docs] @cached_property
def topics(self) -> ConductorT:
"""Topic Conductor.
This is the mediator that moves messages fetched by the Consumer
into the streams.
It's also a set of registered topics by string topic name, so you
can check if a topic is being consumed from by doing
``topic in app.topics``.
"""
return self._new_conductor()
@property
def monitor(self) -> Monitor:
"""Monitor keeps stats about what's going on inside the worker."""
if self._monitor is None:
self._monitor = cast(Monitor,
self.conf.Monitor(
loop=self.loop, beacon=self.beacon))
return self._monitor
@monitor.setter
def monitor(self, monitor: Monitor) -> None:
self._monitor = monitor
@cached_property
def _fetcher(self) -> ServiceT:
"""Fetcher helps Kafka Consumer retrieve records in topics."""
return self.transport.Fetcher(self, loop=self.loop, beacon=self.beacon)
@cached_property
def _reply_consumer(self) -> ReplyConsumer:
"""Kafka Consumer that consumes agent replies."""
return ReplyConsumer(self, loop=self.loop, beacon=self.beacon)
[docs] @cached_property
def flow_control(self) -> FlowControlEvent:
"""Internal flow control.
This object controls flow into stream queues,
and can also clear all buffers.
"""
return FlowControlEvent(loop=self.loop)
@property
def http_client(self) -> HttpClientT:
"""HTTP Client Session."""
if self._http_client is None:
self._http_client = self.conf.HttpClient()
return self._http_client
@http_client.setter
def http_client(self, client: HttpClientT) -> None:
self._http_client = client
[docs] @cached_property
def assignor(self) -> PartitionAssignorT:
"""Partition Assignor.
Responsible for partition assignment.
"""
return self.conf.PartitionAssignor(
self, replicas=self.conf.table_standby_replicas)
@cached_property
def _leader_assignor(self) -> LeaderAssignorT:
"""The leader assignor is a special Kafka partition assignor.
It's used to find the leader in a cluster of Faust worker nodes,
and enables the ``@app.timer(on_leader=True)`` feature that executes
exclusively on one node at a time. Excellent for things that would
traditionally require a lock/mutex."""
return self.conf.LeaderAssignor(
self, loop=self.loop, beacon=self.beacon)
[docs] @cached_property
def router(self) -> RouterT:
"""Find the node partitioned data belongs to.
The router helps us route web requests to the wanted Faust node.
If a topic is sharded by account_id, the router can send us to the
Faust worker responsible for any account. Used by the
``@app.table_route`` decorator.
"""
return self.conf.Router(self)
[docs] @cached_property
def web(self) -> Web:
return self._new_web()
def _new_web(self) -> Web:
return web_drivers.by_url(self.conf.web)(self)
[docs] @cached_property
def serializers(self) -> RegistryT:
# Serializer registry.
# Many things such as key_serializer/value_serializer configures
# the serializer by name (e.g. "json"). The serializer registry
# lets you extend Faust with support for additional
# serialization formats.
self.finalize() # easiest way to autofinalize for topic.send
return self.conf.Serializers(
key_serializer=self.conf.key_serializer,
value_serializer=self.conf.value_serializer,
)
@property
def label(self) -> str:
return f'{self.shortlabel}: {self.conf.id}@{self.conf.broker}'
@property
def shortlabel(self) -> str:
return type(self).__name__