faust.app.base
¶
Faust Application.
An app is an instance of the Faust library. Everything starts here.
-
class
faust.app.base.
BootStrategy
(app: faust.types.app.AppT, *, enable_web: bool = None, enable_kafka: bool = None, enable_kafka_producer: bool = None, enable_kafka_consumer: bool = None, enable_sensors: bool = None) → None[source]¶ App startup strategy.
The startup strategy defines the graph of services to start when the Faust worker for an app starts.
-
enable_kafka
= True¶
-
enable_kafka_producer
= None¶
-
enable_kafka_consumer
= None¶
-
enable_web
= None¶
-
enable_sensors
= True¶
-
-
class
faust.app.base.
App
(id: str, *, monitor: faust.sensors.monitor.Monitor = None, config_source: Any = None, loop: asyncio.events.AbstractEventLoop = None, beacon: mode.utils.types.trees.NodeT = None, **options) → None[source]¶ Faust Application.
- Parameters
id (str) – Application ID.
- Keyword Arguments
loop (asyncio.AbstractEventLoop) – optional event loop to use.
See also
Application Parameters – for supported keyword arguments.
-
class
BootStrategy
(app: faust.types.app.AppT, *, enable_web: bool = None, enable_kafka: bool = None, enable_kafka_producer: bool = None, enable_kafka_consumer: bool = None, enable_sensors: bool = None) → None¶ App startup strategy.
The startup strategy defines the graph of services to start when the Faust worker for an app starts.
-
enable_kafka
= True¶
-
enable_kafka_consumer
= None¶
-
enable_kafka_producer
= None¶
-
enable_sensors
= True¶
-
enable_web
= None¶
-
-
class
Settings
(id: str, *, version: int = None, broker: Union[str, yarl.URL, List[yarl.URL]] = None, broker_client_id: str = None, broker_request_timeout: Union[datetime.timedelta, float, str] = None, broker_credentials: Union[faust.types.auth.CredentialsT, ssl.SSLContext] = None, broker_commit_every: int = None, broker_commit_interval: Union[datetime.timedelta, float, str] = None, broker_commit_livelock_soft_timeout: Union[datetime.timedelta, float, str] = None, broker_session_timeout: Union[datetime.timedelta, float, str] = None, broker_heartbeat_interval: Union[datetime.timedelta, float, str] = None, broker_check_crcs: bool = None, broker_max_poll_records: int = None, agent_supervisor: Union[_T, str] = None, store: Union[str, yarl.URL] = None, cache: Union[str, yarl.URL] = None, web: Union[str, yarl.URL] = None, web_enabled: bool = True, processing_guarantee: Union[str, faust.types.enums.ProcessingGuarantee] = None, timezone: datetime.tzinfo = None, autodiscover: Union[bool, Iterable[str], Callable[Iterable[str]]] = None, origin: str = None, canonical_url: Union[str, yarl.URL] = None, datadir: Union[pathlib.Path, str] = None, tabledir: Union[pathlib.Path, str] = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, logging_config: Dict = None, loghandlers: List[logging.Handler] = None, table_cleanup_interval: Union[datetime.timedelta, float, str] = None, table_standby_replicas: int = None, topic_replication_factor: int = None, topic_partitions: int = None, topic_allow_declare: bool = None, id_format: str = None, reply_to: str = None, reply_to_prefix: str = None, reply_create_topic: bool = None, reply_expires: Union[datetime.timedelta, float, str] = None, ssl_context: ssl.SSLContext = None, stream_buffer_maxsize: int = None, stream_wait_empty: bool = None, stream_ack_cancelled_tasks: bool = None, stream_ack_exceptions: bool = None, stream_publish_on_commit: bool = None, stream_recovery_delay: Union[datetime.timedelta, float, str] = None, producer_linger_ms: int = None, producer_max_batch_size: int = None, producer_acks: int = None, producer_max_request_size: int = None, producer_compression_type: str = None, producer_partitioner: Union[_T, str] = None, producer_request_timeout: Union[datetime.timedelta, float, str] = None, producer_api_version: str = None, consumer_max_fetch_size: int = None, consumer_auto_offset_reset: str = None, web_bind: str = None, web_port: int = None, web_host: str = None, web_transport: Union[str, yarl.URL] = None, web_in_thread: bool = None, web_cors_options: Mapping[str, faust.types.web.ResourceOptions] = None, worker_redirect_stdouts: bool = None, worker_redirect_stdouts_level: Union[int, str] = None, Agent: Union[_T, str] = None, ConsumerScheduler: Union[_T, str] = None, Stream: Union[_T, str] = None, Table: Union[_T, str] = None, SetTable: Union[_T, str] = None, TableManager: Union[_T, str] = None, Serializers: Union[_T, str] = None, Worker: Union[_T, str] = None, PartitionAssignor: Union[_T, str] = None, LeaderAssignor: Union[_T, str] = None, Router: Union[_T, str] = None, Topic: Union[_T, str] = None, HttpClient: Union[_T, str] = None, Monitor: Union[_T, str] = None, url: Union[str, yarl.URL] = None, **kwargs) → None¶ -
-
LeaderAssignor
¶ - Return type
-
PartitionAssignor
¶ - Return type
-
TableManager
¶ - Return type
-
agent_supervisor
¶ - Return type
-
autodiscover
= False¶
-
broker_check_crcs
= True¶
-
broker_client_id
= 'faust-1.5.5'¶
-
broker_commit_every
= 10000¶
-
broker_credentials
¶ - Return type
-
consumer_auto_offset_reset
= 'earliest'¶
-
consumer_max_fetch_size
= 4194304¶
-
id_format
= '{id}-v{self.version}'¶
-
key_serializer
= 'raw'¶
-
logging_config
= None¶
-
processing_guarantee
¶ - Return type
-
producer_acks
= -1¶
-
producer_api_version
= 'auto'¶
-
producer_compression_type
= None¶
-
producer_linger_ms
= 0¶
-
producer_max_batch_size
= 16384¶
-
producer_max_request_size
= 1000000¶
-
producer_partitioner
¶
-
reply_create_topic
= False¶
-
reply_to_prefix
= 'f-reply-'¶
-
ssl_context
= None¶
-
stream_ack_cancelled_tasks
= True¶
-
stream_ack_exceptions
= True¶
-
stream_buffer_maxsize
= 4096¶
-
stream_publish_on_commit
= False¶
-
stream_wait_empty
= True¶
-
table_standby_replicas
= 1¶
-
timezone
= datetime.timezone.utc¶
-
topic_allow_declare
= True¶
-
topic_partitions
= 8¶
-
topic_replication_factor
= 1¶
-
value_serializer
= 'json'¶
-
web_bind
= '0.0.0.0'¶
-
web_cors_options
= None¶
-
web_host
= 'build-8929922-project-230058-faust'¶
-
web_in_thread
= False¶
-
web_port
= 6066¶
-
worker_redirect_stdouts
= True¶
-
worker_redirect_stdouts_level
= 'WARN'¶
-
-
client_only
= False¶ Set this to True if app should only start the services required to operate as an RPC client (producer and simple reply consumer).
-
producer_only
= False¶ Set this to True if app should run without consumer/tables.
-
tracer
= None¶ Optional tracing support.
-
on_init_dependencies
() → Iterable[mode.types.services.ServiceT][source]¶ Return list of service dependencies for this service.
-
config_from_object
(obj: Any, *, silent: bool = False, force: bool = False) → None[source]¶ Read configuration from object.
Object is either an actual object or the name of a module to import.
Examples
>>> app.config_from_object('myproj.faustconfig')
>>> from myproj import faustconfig >>> app.config_from_object(faustconfig)
-
discover
(*extra_modules, categories: Iterable[str] = ['faust.agent', 'faust.command', 'faust.page', 'faust.service', 'faust.task'], ignore: Iterable[Any] = [<built-in method search of _sre.SRE_Pattern object>, '.__main__']) → None[source]¶ - Return type
None
-
topic
(*topics, pattern: Union[str, Pattern[~AnyStr]] = None, key_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, value_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, partitions: int = None, retention: Union[datetime.timedelta, float, str] = None, compacting: bool = None, deleting: bool = None, replicas: int = None, acks: bool = True, internal: bool = False, config: Mapping[str, Any] = None, maxsize: int = None, allow_empty: bool = False, loop: asyncio.events.AbstractEventLoop = None) → faust.types.topics.TopicT[source]¶ Create topic description.
Topics are named channels (for example a Kafka topic), that exist on a server. To make an ephemeral local communication channel use:
channel()
.See also
- Return type
TopicT
[]
-
channel
(*, key_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, value_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, maxsize: int = None, loop: asyncio.events.AbstractEventLoop = None) → faust.types.channels.ChannelT[source]¶ Create new channel.
By default this will create an in-memory channel used for intra-process communication, but in practice channels can be backed by any transport (network or even means of inter-process communication).
See also
- Return type
ChannelT
[]
-
agent
(channel: Union[str, faust.types.channels.ChannelT] = None, *, name: str = None, concurrency: int = 1, supervisor_strategy: Type[mode.types.supervisors.SupervisorStrategyT] = None, sink: Iterable[Union[AgentT, faust.types.channels.ChannelT, Callable[Any, Optional[Awaitable]]]] = None, isolated_partitions: bool = False, use_reply_headers: bool = False, **kwargs) → Callable[Callable[faust.types.streams.StreamT, Union[Coroutine[[Any, Any], None], Awaitable[None], AsyncIterable]], faust.types.agents.AgentT][source]¶ Create Agent from async def function.
It can be a regular async function:
@app.agent() async def my_agent(stream): async for number in stream: print(f'Received: {number!r}')
Or it can be an async iterator that yields values. These values can be used as the reply in an RPC-style call, or for sinks: callbacks that forward events to other agents/topics/statsd, and so on:
@app.agent(sink=[log_topic]) async def my_agent(requests): async for number in requests: yield number * 2
-
actor
(channel: Union[str, faust.types.channels.ChannelT] = None, *, name: str = None, concurrency: int = 1, supervisor_strategy: Type[mode.types.supervisors.SupervisorStrategyT] = None, sink: Iterable[Union[AgentT, faust.types.channels.ChannelT, Callable[Any, Optional[Awaitable]]]] = None, isolated_partitions: bool = False, use_reply_headers: bool = False, **kwargs) → Callable[Callable[faust.types.streams.StreamT, Union[Coroutine[[Any, Any], None], Awaitable[None], AsyncIterable]], faust.types.agents.AgentT]¶ Create Agent from async def function.
It can be a regular async function:
@app.agent() async def my_agent(stream): async for number in stream: print(f'Received: {number!r}')
Or it can be an async iterator that yields values. These values can be used as the reply in an RPC-style call, or for sinks: callbacks that forward events to other agents/topics/statsd, and so on:
@app.agent(sink=[log_topic]) async def my_agent(requests): async for number in requests: yield number * 2
-
task
(fun: Union[Callable[AppT, Awaitable], Callable[Awaitable]] = None, *, on_leader: bool = False, traced: bool = True) → Union[Callable[Union[Callable[faust.types.app.AppT, Awaitable], Callable[Awaitable]], Union[Callable[faust.types.app.AppT, Awaitable], Callable[Awaitable]]], Callable[faust.types.app.AppT, Awaitable], Callable[Awaitable]][source]¶ Define an async def function to be started with the app.
This is like
timer()
but a one-shot task only executed at worker startup (after recovery and the worker is fully ready for operation).The function may take zero, or one argument. If the target function takes an argument, the
app
argument is passed:>>> @app.task >>> async def on_startup(app): ... print('STARTING UP: %r' % (app,))
Nullary functions are also supported:
>>> @app.task >>> async def on_startup(): ... print('STARTING UP')
-
timer
(interval: Union[datetime.timedelta, float, str], on_leader: bool = False, traced: bool = True, name: str = None, max_drift_correction: float = 0.1) → Callable[source]¶ Define an async def function to be run at periodic intervals.
Like
task()
, but executes periodically until the worker is shut down.This decorator takes an async function and adds it to a list of timers started with the app.
- Parameters
interval (Seconds) – How often the timer executes in seconds.
on_leader (bool) – Should the timer only run on the leader?
Example
>>> @app.timer(interval=10.0) >>> async def every_10_seconds(): ... print('TEN SECONDS JUST PASSED')
>>> app.timer(interval=5.0, on_leader=True) >>> async def every_5_seconds(): ... print('FIVE SECONDS JUST PASSED. ALSO, I AM THE LEADER!')
- Return type
-
crontab
(cron_format: str, *, timezone: datetime.tzinfo = None, on_leader: bool = False, traced: bool = True) → Callable[source]¶ Define periodic task using Crontab description.
This is an
async def
function to be run at the fixed times, defined by the Cron format.Like
timer()
, but executes at fixed times instead of executing at certain intervals.This decorator takes an async function and adds it to a list of Cronjobs started with the app.
- Parameters
cron_format (
str
) – The Cron spec defining fixed times to run the decorated function.- Keyword Arguments
timezone – The timezone to be taken into account for the Cron jobs. If not set value from
timezone
will be taken.on_leader – Should the Cron job only run on the leader?
Example
>>> @app.crontab(cron_format='30 18 * * *', timezone=pytz.timezone('US/Pacific')) >>> async def every_6_30_pm_pacific(): ... print('IT IS 6:30pm')
>>> app.crontab(cron_format='30 18 * * *', on_leader=True) >>> async def every_6_30_pm(): ... print('6:30pm UTC; ALSO, I AM THE LEADER!')
- Return type
-
service
(cls: Type[mode.types.services.ServiceT]) → Type[mode.types.services.ServiceT][source]¶ Decorate
mode.Service
to be started with the app.Examples
from mode import Service @app.service class Foo(Service): ...
-
stream
(channel: Union[AsyncIterable, Iterable], beacon: mode.utils.types.trees.NodeT = None, **kwargs) → faust.types.streams.StreamT[source]¶ Create new stream from channel/topic/iterable/async iterable.
- Parameters
channel (
Union
[AsyncIterable
[+T_co],Iterable
[+T_co]]) – Iterable to stream over (async or non-async).kwargs (
Any
) – SeeStream
.
- Return type
StreamT
[+T_co]- Returns
to iterate over events in the stream.
- Return type
-
Table
(name: str, *, default: Callable[Any] = None, window: faust.types.windows.WindowT = None, partitions: int = None, help: str = None, **kwargs) → faust.types.tables.TableT[source]¶ Define new table.
- Parameters
name (
str
) – Name used for table, note that two tables living in the same application cannot have the same name.default (
Optional
[Callable
[[],Any
]]) – A callable, or type that will return a default value for keys missing in this table.window (
Optional
[WindowT
]) – A windowing strategy to wrap this window in.
Examples
>>> table = app.Table('user_to_amount', default=int) >>> table['George'] 0 >>> table['Elaine'] += 1 >>> table['Elaine'] += 1 >>> table['Elaine'] 2
- Return type
TableT
[~KT, ~VT]
-
SetTable
(name: str, *, window: faust.types.windows.WindowT = None, partitions: int = None, help: str = None, **kwargs) → faust.types.tables.TableT[source]¶ - Return type
TableT
[~KT, ~VT]
-
page
(path: str, *, base: Type[faust.web.views.View] = <class 'faust.web.views.View'>, cors_options: Mapping[str, faust.types.web.ResourceOptions] = None, name: str = None) → Callable[Union[Type[faust.types.web.View], Callable[[faust.types.web.View, faust.types.web.Request], Union[Coroutine[[Any, Any], faust.types.web.Response], Awaitable[faust.types.web.Response]]], Callable[[faust.types.web.View, faust.types.web.Request, Any, Any], Union[Coroutine[[Any, Any], faust.types.web.Response], Awaitable[faust.types.web.Response]]]], Type[faust.web.views.View]][source]¶
-
table_route
(table: faust.types.tables.CollectionT, shard_param: str = None, *, query_param: str = None, match_info: str = None) → Callable[Union[Callable[[faust.types.web.View, faust.types.web.Request], Union[Coroutine[[Any, Any], faust.types.web.Response], Awaitable[faust.types.web.Response]]], Callable[[faust.types.web.View, faust.types.web.Request, Any, Any], Union[Coroutine[[Any, Any], faust.types.web.Response], Awaitable[faust.types.web.Response]]]], Union[Callable[[faust.types.web.View, faust.types.web.Request], Union[Coroutine[[Any, Any], faust.types.web.Response], Awaitable[faust.types.web.Response]]], Callable[[faust.types.web.View, faust.types.web.Request, Any, Any], Union[Coroutine[[Any, Any], faust.types.web.Response], Awaitable[faust.types.web.Response]]]]][source]¶ - Return type
Callable
[[Union
[Callable
[[View
,Request
],Union
[Coroutine
[Any
,Any
,Response
],Awaitable
[Response
]]],Callable
[[View
,Request
,Any
,Any
],Union
[Coroutine
[Any
,Any
,Response
],Awaitable
[Response
]]]]],Union
[Callable
[[View
,Request
],Union
[Coroutine
[Any
,Any
,Response
],Awaitable
[Response
]]],Callable
[[View
,Request
,Any
,Any
],Union
[Coroutine
[Any
,Any
,Response
],Awaitable
[Response
]]]]]
-
command
(*options, base: Optional[Type[faust.app.base._AppCommand]] = None, **kwargs) → Callable[Callable, Type[faust.app.base._AppCommand]][source]¶
-
trace
(name: str, trace_enabled: bool = True, **extra_context) → ContextManager[source]¶ - Return type
ContextManager
[+T_co]
-
traced
(fun: Callable, name: str = None, sample_rate: float = 1.0, **context) → Callable[source]¶ - Return type
-
FlowControlQueue
(maxsize: int = None, *, clear_on_resume: bool = False, loop: asyncio.events.AbstractEventLoop = None) → mode.utils.queues.ThrowableQueue[source]¶ Like
asyncio.Queue
, but can be suspended/resumed.- Return type
-
coroutine
commit
(self, topics: AbstractSet[Union[str, faust.types.tuples.TP]]) → bool[source]¶ Commit offset for acked messages in specified topics’.
Warning
This will commit acked messages in all topics if the topics argument is passed in as
None
.- Return type
-
logger
= <Logger faust.app.base (WARNING)>¶
-
coroutine
maybe_start_client
(self) → None[source]¶ Start the app in Client-Only mode if not started as Server.
- Return type
None
-
coroutine
on_first_start
(self) → None[source]¶ Service started for the first time in this process.
- Return type
None
-
coroutine
on_init_extra_service
(self, service: Union[mode.types.services.ServiceT, Type[mode.types.services.ServiceT]]) → mode.types.services.ServiceT[source]¶ - Return type
ServiceT
[]
-
coroutine
send
(self, channel: Union[faust.types.channels.ChannelT, str], key: Union[bytes, faust.types.core._ModelT, Any, None] = None, value: Union[bytes, faust.types.core._ModelT, Any] = None, partition: int = None, timestamp: float = None, headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes]] = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, callback: Callable[faust.types.tuples.FutureMessage, Union[None, Awaitable[None]]] = None) → Awaitable[faust.types.tuples.RecordMetadata][source]¶ Send event to channel/topic.
- Parameters
channel (
Union
[ChannelT
[],str
]) – Channel/topic or the name of a topic to send event to.partition (
Optional
[int
]) – Specific partition to send to. If not set the partition will be chosen by the partitioner.timestamp (
Optional
[float
]) – Epoch seconds (from Jan 1 1970 UTC) to use as the message timestamp. Defaults to current time.headers (
Union
[List
[Tuple
[str
,bytes
]],Mapping
[str
,bytes
],None
]) – Mapping of key/value pairs, or iterable of key value pairs to use as headers for the message.key_serializer (
Union
[CodecT
,str
,None
]) – Serializer to use (if value is not model).value_serializer (
Union
[CodecT
,str
,None
]) – Serializer to use (if value is not model).callback (
Optional
[Callable
[[FutureMessage
[]],Union
[None
,Awaitable
[None
]]]]) –Called after the message is fully delivered to the channel, but not to the consumer. Signature must be unary as the
FutureMessage
future is passed to it.The resulting
faust.types.tuples.RecordMetadata
object is then available asfut.result()
.
- Return type
-
coroutine
start_client
(self) → None[source]¶ Start the app in Client-Only mode necessary for RPC requests.
Notes
Once started as a client the app cannot be restarted as Server.
- Return type
None
-
transport
¶ Message transport. :rtype:
TransportT
-
cache
¶ - Return type
-
topics
[source]¶ Topic Conductor.
This is the mediator that moves messages fetched by the Consumer into the streams.
It’s also a set of registered topics by string topic name, so you can check if a topic is being consumed from by doing
topic in app.topics
.
-
flow_control
[source]¶ Flow control of streams.
This object controls flow into stream queues, and can also clear all buffers.
-
http_client
¶ HTTP Client Session. :rtype:
ClientSession