faust
Python Stream processing.
-
class
faust.
Agent
(fun: Callable[Union[AsyncIterator, faust.types.streams.StreamT], Union[Awaitable, AsyncIterable]], *, app: faust.types.app.AppT, name: str = None, channel: Union[str, faust.types.channels.ChannelT] = None, concurrency: int = 1, sink: Iterable[Union[AgentT, faust.types.channels.ChannelT, Callable[Any, Optional[Awaitable]]]] = None, on_error: Callable[[AgentT, BaseException], Awaitable] = None, supervisor_strategy: Type[mode.types.supervisors.SupervisorStrategyT] = None, help: str = None, key_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, value_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, isolated_partitions: bool = False, **kwargs) → None[source]¶ Agent.
This is the type of object returned by the @app.agent decorator.
-
supervisor
= None¶
-
clone
(*, cls: Type[faust.types.agents.AgentT] = None, **kwargs) → faust.types.agents.AgentT[source]¶ Return type: AgentT
[]
-
test_context
(channel: faust.types.channels.ChannelT = None, supervisor_strategy: mode.types.supervisors.SupervisorStrategyT = None, on_error: Callable[[AgentT, BaseException], Awaitable] = None, **kwargs) → faust.types.agents.AgentTestWrapperT[source]¶ Return type: AgentTestWrapperT
[]
-
actor_from_stream
(stream: faust.types.streams.StreamT) → faust.types.agents.ActorT[Union[AsyncIterable, Awaitable]][source]¶ Return type: ActorT
[]
-
add_sink
(sink: Union[AgentT, faust.types.channels.ChannelT, Callable[Any, Optional[Awaitable]]]) → None[source]¶ Return type: None
-
stream
(channel: faust.types.channels.ChannelT = None, active_partitions: Set[faust.types.tuples.TP] = None, **kwargs) → faust.types.streams.StreamT[source]¶ Return type: StreamT
[+T_co]
-
coroutine
ask
(self, value: Union[bytes, faust.types.core.ModelT, Any] = None, *, key: Union[bytes, faust.types.core.ModelT, Any, None] = None, partition: int = None, reply_to: Union[AgentT, faust.types.channels.ChannelT, str] = None, correlation_id: str = None) → Any[source]¶ Return type: Any
-
coroutine
ask_nowait
(self, value: Union[bytes, faust.types.core.ModelT, Any] = None, *, key: Union[bytes, faust.types.core.ModelT, Any, None] = None, partition: int = None, reply_to: Union[AgentT, faust.types.channels.ChannelT, str] = None, correlation_id: str = None, force: bool = False) → faust.agents.replies.ReplyPromise[source]¶ Return type: ReplyPromise
-
coroutine
cast
(self, value: Union[bytes, faust.types.core.ModelT, Any] = None, *, key: Union[bytes, faust.types.core.ModelT, Any, None] = None, partition: int = None) → None[source]¶ Return type: None
-
coroutine
join
(self, values: Union[AsyncIterable[Union[bytes, faust.types.core.ModelT, Any]], Iterable[Union[bytes, faust.types.core.ModelT, Any]]], key: Union[bytes, faust.types.core.ModelT, Any, None] = None, reply_to: Union[AgentT, faust.types.channels.ChannelT, str] = None) → List[Any][source]¶ Return type: List
[Any
]
-
coroutine
kvjoin
(self, items: Union[AsyncIterable[Tuple[Union[bytes, faust.types.core.ModelT, Any, None], Union[bytes, faust.types.core.ModelT, Any]]], Iterable[Tuple[Union[bytes, faust.types.core.ModelT, Any, None], Union[bytes, faust.types.core.ModelT, Any]]]], reply_to: Union[AgentT, faust.types.channels.ChannelT, str] = None) → List[Any][source]¶ Return type: List
[Any
]
-
kvmap
(items: Union[AsyncIterable[Tuple[Union[bytes, faust.types.core.ModelT, Any, None], Union[bytes, faust.types.core.ModelT, Any]]], Iterable[Tuple[Union[bytes, faust.types.core.ModelT, Any, None], Union[bytes, faust.types.core.ModelT, Any]]]], reply_to: Union[AgentT, faust.types.channels.ChannelT, str] = None) → AsyncIterator[str][source]¶ Return type: AsyncIterator
[str
]
-
logger
= <Logger faust.agents.agent (WARNING)>¶
-
map
(values: Union[AsyncIterable, Iterable], key: Union[bytes, faust.types.core.ModelT, Any, None] = None, reply_to: Union[AgentT, faust.types.channels.ChannelT, str] = None) → AsyncIterator[source]¶ Return type: AsyncIterator
[+T_co]
-
coroutine
on_isolated_partitions_assigned
(self, assigned: Set[faust.types.tuples.TP]) → None[source]¶ Return type: None
-
coroutine
on_isolated_partitions_revoked
(self, revoked: Set[faust.types.tuples.TP]) → None[source]¶ Return type: None
-
coroutine
on_partitions_assigned
(self, assigned: Set[faust.types.tuples.TP]) → None[source]¶ Return type: None
-
coroutine
on_partitions_revoked
(self, revoked: Set[faust.types.tuples.TP]) → None[source]¶ Return type: None
-
coroutine
on_start
(self) → None[source]¶ Called every time before the service is started/restarted.
Return type: None
-
coroutine
on_stop
(self) → None[source]¶ Called every time before the service is stopped/restarted.
Return type: None
-
coroutine
send
(self, *, key: Union[bytes, faust.types.core.ModelT, Any, None] = None, value: Union[bytes, faust.types.core.ModelT, Any] = None, partition: int = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, callback: Callable[faust.types.tuples.FutureMessage, Union[None, Awaitable[None]]] = None, reply_to: Union[AgentT, faust.types.channels.ChannelT, str] = None, correlation_id: str = None, force: bool = False) → Awaitable[faust.types.tuples.RecordMetadata][source]¶ Send message to topic used by agent.
Return type: Awaitable
[RecordMetadata
]
-
channel_iterator
¶ Return type: AsyncIterator
[+T_co]
-
-
class
faust.
App
(id: str, *, monitor: faust.sensors.monitor.Monitor = None, config_source: Any = None, loop: asyncio.events.AbstractEventLoop = None, beacon: mode.utils.types.trees.NodeT = None, **options) → None[source]¶ Faust Application.
Parameters: id (str) – Application ID.
Keyword Arguments: loop (asyncio.AbstractEventLoop) – optional event loop to use.
See also: Application Parameters, for supported keyword arguments.
-
class
BootStrategy
(app: faust.types.app.AppT, *, enable_web: bool = None, enable_kafka: bool = None, enable_kafka_producer: bool = None, enable_kafka_consumer: bool = None, enable_sensors: bool = None) → None¶ -
-
enable_kafka
= True¶
-
enable_kafka_consumer
= None¶
-
enable_kafka_producer
= None¶
-
enable_sensors
= True¶
-
enable_web
= None¶
-
-
client_only
= False¶ Set this to True if app should only start the services required to operate as an RPC client (producer and simple reply consumer).
-
producer_only
= False¶ Set this to True if app should run without consumer/tables.
-
on_init_dependencies
() → Iterable[mode.types.services.ServiceT][source]¶ Callback to be used to add service dependencies.
Return type: Iterable
[ServiceT
[]]
-
config_from_object
(obj: Any, *, silent: bool = False, force: bool = False) → None[source]¶ Read configuration from object.
Object is either an actual object or the name of a module to import.
Examples
>>> app.config_from_object('myproj.faustconfig')
>>> from myproj import faustconfig
>>> app.config_from_object(faustconfig)
Return type: None
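The "actual object or the name of a module to import" rule can be pictured with a small stdlib-only sketch. The helper name resolve_config_object and the Settings class are hypothetical illustrations, not part of Faust, and the real method may resolve names differently.

```python
import importlib
from typing import Any


def resolve_config_object(obj: Any) -> Any:
    """Return ``obj`` itself, or import it when given a module name.

    Hypothetical helper for illustration only; not Faust's implementation.
    """
    if isinstance(obj, str):
        # A string is treated as a dotted module path and imported.
        return importlib.import_module(obj)
    # Anything else is assumed to already be the configuration object.
    return obj


class Settings:
    # Hypothetical configuration object with one made-up attribute.
    broker = 'kafka://localhost:9092'


from_object = resolve_config_object(Settings)
from_name = resolve_config_object('json')  # any importable module name works
```

Both call styles end up with an object whose attributes can be read as settings, which is why the two documented examples are interchangeable.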
-
discover
(*extra_modules, categories: Iterable[str] = ['faust.agent', 'faust.command', 'faust.page', 'faust.service', 'faust.task'], ignore: Iterable[str] = ['test_.*', '.*__main__.*']) → None[source]¶ Return type: None
-
topic
(*topics, pattern: Union[str, Pattern[~AnyStr]] = None, key_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, value_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, partitions: int = None, retention: Union[datetime.timedelta, float, str] = None, compacting: bool = None, deleting: bool = None, replicas: int = None, acks: bool = True, internal: bool = False, config: Mapping[str, Any] = None, maxsize: int = None, allow_empty: bool = False, loop: asyncio.events.AbstractEventLoop = None) → faust.types.topics.TopicT[source]¶ Create topic description.
Topics are named channels (for example a Kafka topic) that exist on a server. To make an ephemeral local communication channel, use channel().
Return type: TopicT[]
-
channel
(*, key_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, value_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, maxsize: int = None, loop: asyncio.events.AbstractEventLoop = None) → faust.types.channels.ChannelT[source]¶ Create new channel.
By default this will create an in-memory channel used for intra-process communication, but in practice channels can be backed by any transport (network or even means of inter-process communication).
Return type: ChannelT[]
-
agent
(channel: Union[str, faust.types.channels.ChannelT] = None, *, name: str = None, concurrency: int = 1, supervisor_strategy: Type[mode.types.supervisors.SupervisorStrategyT] = None, sink: Iterable[Union[AgentT, faust.types.channels.ChannelT, Callable[Any, Optional[Awaitable]]]] = None, isolated_partitions: bool = False, **kwargs) → Callable[Callable[Union[AsyncIterator, faust.types.streams.StreamT], Union[Awaitable, AsyncIterable]], faust.types.agents.AgentT][source]¶ Create Agent from async def function.
It can be a regular async function:
@app.agent()
async def my_agent(stream):
    async for number in stream:
        print(f'Received: {number!r}')
Or it can be an async iterator that yields values. These values can be used as the reply in an RPC-style call, or for sinks: callbacks that forward events to other agents/topics/statsd, and so on:
@app.agent(sink=[log_topic])
async def my_agent(requests):
    async for number in requests:
        yield number * 2
Return type: Callable
[[Callable
[[Union
[AsyncIterator
[+T_co],StreamT
[+T_co]]],Union
[Awaitable
[+T_co],AsyncIterable
[+T_co]]]],AgentT
[]]
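To make the idea of yielded values being forwarded to sinks more concrete, here is a plain asyncio analogy. It does not use Faust: numbers(), run_with_sinks() and the collected list are hypothetical stand-ins, and the real agent machinery also handles topics, replies and supervision.

```python
import asyncio
from typing import Any, AsyncIterator, Callable, List


async def double(requests: AsyncIterator[int]) -> AsyncIterator[int]:
    # Same shape as the yielding agent body: consume values, yield results.
    async for number in requests:
        yield number * 2


async def numbers() -> AsyncIterator[int]:
    # Stand-in for the stream an agent would receive.
    for n in (1, 2, 3):
        yield n


async def run_with_sinks(results: AsyncIterator[Any],
                         sinks: List[Callable[[Any], Any]]) -> None:
    # Forward every yielded value to each sink callback.
    async for value in results:
        for sink in sinks:
            sink(value)


collected: List[int] = []
asyncio.run(run_with_sinks(double(numbers()), [collected.append]))
```

Here the sink is just list.append; in the documented API a sink may also be another agent or a channel.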
-
actor
(channel: Union[str, faust.types.channels.ChannelT] = None, *, name: str = None, concurrency: int = 1, supervisor_strategy: Type[mode.types.supervisors.SupervisorStrategyT] = None, sink: Iterable[Union[AgentT, faust.types.channels.ChannelT, Callable[Any, Optional[Awaitable]]]] = None, isolated_partitions: bool = False, **kwargs) → Callable[Callable[Union[AsyncIterator, faust.types.streams.StreamT], Union[Awaitable, AsyncIterable]], faust.types.agents.AgentT]¶ Create Agent from async def function.
It can be a regular async function:
@app.agent()
async def my_agent(stream):
    async for number in stream:
        print(f'Received: {number!r}')
Or it can be an async iterator that yields values. These values can be used as the reply in an RPC-style call, or for sinks: callbacks that forward events to other agents/topics/statsd, and so on:
@app.agent(sink=[log_topic])
async def my_agent(requests):
    async for number in requests:
        yield number * 2
Return type: Callable
[[Callable
[[Union
[AsyncIterator
[+T_co],StreamT
[+T_co]]],Union
[Awaitable
[+T_co],AsyncIterable
[+T_co]]]],AgentT
[]]
-
task
(fun: Union[Callable[AppT, Awaitable], Callable[Awaitable]] = None, *, on_leader: bool = False) → Union[Callable[Union[Callable[faust.types.app.AppT, Awaitable], Callable[Awaitable]], Union[Callable[faust.types.app.AppT, Awaitable], Callable[Awaitable]]], Callable[faust.types.app.AppT, Awaitable], Callable[Awaitable]][source]¶ Define an async def function to be started with the app.
This is like timer(), but a one-shot task only executed at worker startup (after recovery, once the worker is fully ready for operation).
The function may take zero or one argument. If the target function takes an argument, the app argument is passed:
>>> @app.task
... async def on_startup(app):
...     print('STARTING UP: %r' % (app,))
Nullary functions are also supported:
>>> @app.task
... async def on_startup():
...     print('STARTING UP')
Return type: Union
[Callable
[[Union
[Callable
[[AppT
[]],Awaitable
[+T_co]],Callable
[[],Awaitable
[+T_co]]]],Union
[Callable
[[AppT
[]],Awaitable
[+T_co]],Callable
[[],Awaitable
[+T_co]]]],Callable
[[AppT
[]],Awaitable
[+T_co]],Callable
[[],Awaitable
[+T_co]]]
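One way to picture the "zero or one argument" rule is a dispatcher that inspects the function signature. This is only a sketch under that assumption: call_startup_task is a hypothetical helper, and Faust may decide how to call the function by other means.

```python
import asyncio
import inspect
from typing import Any, Awaitable, Callable


async def call_startup_task(fun: Callable[..., Awaitable[Any]], app: Any) -> Any:
    """Call ``fun(app)`` if it declares a parameter, else ``fun()``.

    Hypothetical helper for illustration only; not Faust's implementation.
    """
    if len(inspect.signature(fun).parameters) >= 1:
        return await fun(app)
    return await fun()


async def with_app(app):
    return 'STARTING UP: %r' % (app,)


async def without_app():
    return 'STARTING UP'


unary_result = asyncio.run(call_startup_task(with_app, 'myapp'))
nullary_result = asyncio.run(call_startup_task(without_app, 'myapp'))
```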
-
timer
(interval: Union[datetime.timedelta, float, str], on_leader: bool = False) → Callable[source]¶ Define an async def function to be run at periodic intervals.
Like task(), but executes periodically until the worker is shut down.
This decorator takes an async function and adds it to a list of timers started with the app.
Parameters: - interval (Seconds) – How often the timer executes, in seconds.
- on_leader (bool) – Should the timer only run on the leader?
Example
>>> @app.timer(interval=10.0)
... async def every_10_seconds():
...     print('TEN SECONDS JUST PASSED')
>>> @app.timer(interval=5.0, on_leader=True)
... async def every_5_seconds():
...     print('FIVE SECONDS JUST PASSED. ALSO, I AM THE LEADER!')
Return type: Callable
-
crontab
(cron_format: str, *, timezone: datetime.tzinfo = None, on_leader: bool = False) → Callable[source]¶ Define an async def function to be run at the fixed times, defined by the cron format.
Like timer(), but executes at fixed times instead of at regular intervals.
This decorator takes an async function and adds it to a list of cron jobs started with the app.
Parameters: cron_format (str) – The cron spec defining fixed times to run the decorated function.
Keyword Arguments: - timezone – The timezone to be taken into account for the cron jobs. If not set, the value from the timezone setting will be used.
- on_leader – Should the cron job only run on the leader?
Example
>>> @app.crontab(cron_format='30 18 * * *', timezone=pytz.timezone('US/Pacific'))
... async def every_6_30_pm_pacific():
...     print('IT IS 6:30pm')
>>> @app.crontab(cron_format='30 18 * * *', on_leader=True)
... async def every_6_30_pm():
...     print('6:30pm UTC; ALSO, I AM THE LEADER!')
Return type: Callable
-
service
(cls: Type[mode.types.services.ServiceT]) → Type[mode.types.services.ServiceT][source]¶ Decorate
mode.Service to be started with the app.
Examples
from mode import Service

@app.service
class Foo(Service):
    ...
Return type: Type
[ServiceT
[]]
-
stream
(channel: Union[AsyncIterable, Iterable], beacon: mode.utils.types.trees.NodeT = None, **kwargs) → faust.types.streams.StreamT[source]¶ Create new stream from channel/topic/iterable/async iterable.
Parameters: - channel (Union[AsyncIterable[+T_co], Iterable[+T_co]]) – Iterable to stream over (async or non-async).
- kwargs (Any) – See Stream.
Returns: a stream to iterate over the events.
Return type: StreamT[+T_co]
-
Table
(name: str, *, default: Callable[Any] = None, window: faust.types.windows.WindowT = None, partitions: int = None, help: str = None, **kwargs) → faust.types.tables.TableT[source]¶ Define new table.
Parameters: - name (str) – Name used for the table. Note that two tables living in the same application cannot have the same name.
- default (Optional[Callable[[], Any]]) – A callable, or type, that will return a default value for keys missing in this table.
- window (Optional[WindowT]) – A windowing strategy to wrap this table in.
Examples
>>> table = app.Table('user_to_amount', default=int)
>>> table['George']
0
>>> table['Elaine'] += 1
>>> table['Elaine'] += 1
>>> table['Elaine']
2
Return type: TableT[~KT, ~VT]
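The default argument behaves much like the factory of collections.defaultdict, which gives a quick local way to reason about it. The snippet below is a plain-dict analogy only; a real table is also partitioned and backed by a changelog, which this does not model.

```python
from collections import defaultdict

# Plain-dict analogy: default=int gives missing keys the value int() == 0.
table = defaultdict(int)

george = table['George']   # key is missing, so the default 0 is returned
table['Elaine'] += 1       # starts from the default 0
table['Elaine'] += 1
elaine = table['Elaine']
```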
-
SetTable
(name: str, *, window: faust.types.windows.WindowT = None, partitions: int = None, help: str = None, **kwargs) → faust.types.tables.TableT[source]¶ Return type: TableT
[~KT, ~VT]
-
page
(path: str, *, base: Type[faust.web.views.View] = <class 'faust.web.views.View'>, name: str = None) → Callable[Union[Type[faust.types.web.View], Callable[[faust.types.web.View, faust.types.web.Request, Any, Any], Awaitable[faust.types.web.Response]]], Type[faust.web.views.View]][source]¶ Return type: Callable
[[Union
[Type
[View
],Callable
[[View
,Request
,Any
,Any
],Awaitable
[Response
]]]],Type
[View
]]
-
table_route
(table: faust.types.tables.CollectionT, shard_param: str = None, *, query_param: str = None, match_info: str = None) → Callable[Callable[[faust.types.web.View, faust.types.web.Request, Any, Any], Awaitable[faust.types.web.Response]], Callable[[faust.types.web.View, faust.types.web.Request, Any, Any], Awaitable[faust.types.web.Response]]][source]¶ Return type: Callable
[[Callable
[[View
,Request
,Any
,Any
],Awaitable
[Response
]]],Callable
[[View
,Request
,Any
,Any
],Awaitable
[Response
]]]
-
command
(*options, base: Optional[Type[faust.app.base.AppCommand]] = None, **kwargs) → Callable[Callable, Type[faust.app.base.AppCommand]][source]¶ Return type: Callable
[[Callable
],Type
[AppCommand
]]
-
FlowControlQueue
(maxsize: int = None, *, clear_on_resume: bool = False, loop: asyncio.events.AbstractEventLoop = None) → mode.utils.queues.ThrowableQueue[source]¶ Like
asyncio.Queue, but can be suspended/resumed.
Return type: ThrowableQueue
-
coroutine
commit
(self, topics: AbstractSet[Union[str, faust.types.tuples.TP]]) → bool[source]¶ Commit offset for acked messages in the specified topics.
Warning
This will commit acked messages in all topics if the topics argument is passed in as None.
Return type: bool
-
logger
= <Logger faust.app.base (WARNING)>¶
-
coroutine
maybe_start_client
(self) → None[source]¶ Start the app in Client-Only mode if not started as Server.
Return type: None
-
coroutine
on_first_start
(self) → None[source]¶ Called only the first time the service is started.
Return type: None
-
coroutine
on_init_extra_service
(self, service: Union[mode.types.services.ServiceT, Type[mode.types.services.ServiceT]]) → mode.types.services.ServiceT[source]¶ Return type: ServiceT
[]
-
coroutine
on_start
(self) → None[source]¶ Called every time before the service is started/restarted.
Return type: None
-
coroutine
on_started
(self) → None[source]¶ Called every time after the service is started/restarted.
Return type: None
-
coroutine
on_stop
(self) → None[source]¶ Called every time before the service is stopped/restarted.
Return type: None
-
coroutine
send
(self, channel: Union[faust.types.channels.ChannelT, str], key: Union[bytes, faust.types.core.ModelT, Any, None] = None, value: Union[bytes, faust.types.core.ModelT, Any] = None, partition: int = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, callback: Callable[faust.types.tuples.FutureMessage, Union[None, Awaitable[None]]] = None) → Awaitable[faust.types.tuples.RecordMetadata][source]¶ Send event to channel/topic.
Parameters: - channel (Union[ChannelT[], str]) – Channel/topic or the name of a topic to send event to.
- key (Union[bytes, ModelT, Any, None]) – Message key.
- value (Union[bytes, ModelT, Any, None]) – Message value.
- partition (Optional[int]) – Specific partition to send to. If not set the partition will be chosen by the partitioner.
- key_serializer (Union[CodecT, str, None]) – Serializer to use (if key is not model).
- value_serializer (Union[CodecT, str, None]) – Serializer to use (if value is not model).
- callback (Optional[Callable[[FutureMessage[]], Union[None, Awaitable[None]]]]) – Called after the message is fully delivered to the channel, but not to the consumer. Signature must be unary as the FutureMessage future is passed to it. The resulting faust.types.tuples.RecordMetadata object is then available as fut.result().
Return type: Awaitable[RecordMetadata]
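The callback contract (a unary function that receives the future and reads fut.result()) mirrors how done-callbacks work on a plain asyncio.Future. The sketch below uses only asyncio; the string 'metadata-placeholder' is a made-up stand-in for the RecordMetadata a real send would produce.

```python
import asyncio
from typing import List


async def main() -> List[str]:
    seen: List[str] = []
    loop = asyncio.get_running_loop()
    fut: asyncio.Future = loop.create_future()

    def on_delivered(f: asyncio.Future) -> None:
        # Unary callback: the future itself is the only argument.
        seen.append(f.result())

    fut.add_done_callback(on_delivered)
    # Stand-in for delivery; a real send would resolve to RecordMetadata.
    fut.set_result('metadata-placeholder')
    await asyncio.sleep(0)  # let the event loop run the scheduled callback
    return seen


delivered = asyncio.run(main())
```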
-
coroutine
start_client
(self) → None[source]¶ Start the app in Client-Only mode necessary for RPC requests.
Notes
Once started as a client the app cannot be restarted as Server.
Return type: None
-
transport
¶ Message transport.
Return type: TransportT
-
cache
¶ Return type: CacheBackendT
[]
-
topics
[source]¶ Topic Conductor.
This is the mediator that moves messages fetched by the Consumer into the streams.
It’s also a set of registered topics by string topic name, so you can check if a topic is being consumed from by doing topic in app.topics.
-
flow_control
[source]¶ Internal flow control.
This object controls flow into stream queues, and can also clear all buffers.
-
http_client
¶ HTTP Client Session.
Return type: ClientSession
-
class
faust.
Channel
(app: faust.types.app.AppT, *, key_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, value_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, is_iterator: bool = False, queue: mode.utils.queues.ThrowableQueue = None, maxsize: int = None, root: faust.types.channels.ChannelT = None, active_partitions: Set[faust.types.tuples.TP] = None, loop: asyncio.events.AbstractEventLoop = None) → None[source]¶ Create new channel.
Parameters: - app (AppT[]) – The app that created this channel (app.channel()).
- key_type (Union[Type[ModelT], Type[bytes], Type[str], None]) – The Model used for keys in this channel.
- value_type (Union[Type[ModelT], Type[bytes], Type[str], None]) – The Model used for values in this channel.
- maxsize (Optional[int]) – The maximum number of messages this channel can hold. If exceeded, any new put call will block until a message is removed from the channel.
- loop (Optional[AbstractEventLoop]) – The asyncio event loop to use.
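The blocking behaviour described for maxsize matches that of a bounded asyncio.Queue, so it can be observed without a running app. This is a stdlib analogy, assuming the channel's internal queue behaves like asyncio.Queue in this respect; the log list and the producer function are illustrative names.

```python
import asyncio


async def main() -> list:
    log = []
    queue: asyncio.Queue = asyncio.Queue(maxsize=1)
    await queue.put('first')            # fits: the queue is now full

    async def producer() -> None:
        await queue.put('second')       # blocks until a slot frees up
        log.append('second put done')

    task = asyncio.create_task(producer())
    await asyncio.sleep(0)              # give the producer a chance to run
    log.append(f'blocked={not task.done()}')
    log.append(await queue.get())       # removing a message unblocks the producer
    await task
    return log


events = asyncio.run(main())
```

The second put only completes after the get, which is the back-pressure the maxsize parameter is meant to provide.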
-
queue
¶ Return type: ThrowableQueue
-
clone
(*, is_iterator: bool = None, **kwargs) → faust.types.channels.ChannelT[source]¶ Return type: ChannelT
[]
-
clone_using_queue
(queue: asyncio.queues.Queue) → faust.types.channels.ChannelT[source]¶ Return type: ChannelT
[]
-
stream
(**kwargs) → faust.types.streams.StreamT[source]¶ Create stream reading from this channel.
Return type: StreamT
[+T_co]
-
as_future_message
(key: Union[bytes, faust.types.core.ModelT, Any, None] = None, value: Union[bytes, faust.types.core.ModelT, Any] = None, partition: int = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, callback: Callable[faust.types.tuples.FutureMessage, Union[None, Awaitable[None]]] = None) → faust.types.tuples.FutureMessage[source]¶ Return type: FutureMessage
[]
-
prepare_key
(key: Union[bytes, faust.types.core.ModelT, Any, None], key_serializer: Union[faust.types.codecs.CodecT, str, None]) → Any[source]¶ Return type: Any
-
prepare_value
(value: Union[bytes, faust.types.core.ModelT, Any], value_serializer: Union[faust.types.codecs.CodecT, str, None]) → Any[source]¶ Return type: Any
-
coroutine
decode
(self, message: faust.types.tuples.Message, *, propagate: bool = False) → faust.types.events.EventT[source]¶ Return type: EventT
[]
-
coroutine
get
(self, *, timeout: Union[datetime.timedelta, float, str] = None) → Any[source]¶ Return type: Any
-
coroutine
on_decode_error
(self, exc: Exception, message: faust.types.tuples.Message) → None[source]¶ Return type: None
-
coroutine
on_key_decode_error
(self, exc: Exception, message: faust.types.tuples.Message) → None[source]¶ Return type: None
-
coroutine
on_value_decode_error
(self, exc: Exception, message: faust.types.tuples.Message) → None[source]¶ Return type: None
-
coroutine
publish_message
(self, fut: faust.types.tuples.FutureMessage, wait: bool = True) → Awaitable[faust.types.tuples.RecordMetadata][source]¶ Return type: Awaitable
[RecordMetadata
]
-
coroutine
send
(self, *, key: Union[bytes, faust.types.core.ModelT, Any, None] = None, value: Union[bytes, faust.types.core.ModelT, Any] = None, partition: int = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, callback: Callable[faust.types.tuples.FutureMessage, Union[None, Awaitable[None]]] = None, force: bool = False) → Awaitable[faust.types.tuples.RecordMetadata][source]¶ Send message to channel.
Return type: Awaitable
[RecordMetadata
]
-
class
faust.
ChannelT
(app: faust.types.channels.AppT, *, key_type: faust.types.channels.ModelArg = None, value_type: faust.types.channels.ModelArg = None, is_iterator: bool = False, queue: mode.utils.queues.ThrowableQueue = None, maxsize: int = None, root: Optional[faust.types.channels.ChannelT] = None, active_partitions: Set[faust.types.tuples.TP] = None, loop: asyncio.events.AbstractEventLoop = None) → None[source]¶ -
clone
(*, is_iterator: bool = None, **kwargs) → faust.types.channels.ChannelT[source]¶ Return type: ChannelT
[]
-
clone_using_queue
(queue: asyncio.queues.Queue) → faust.types.channels.ChannelT[source]¶ Return type: ChannelT
[]
-
as_future_message
(key: Union[bytes, faust.types.core.ModelT, Any, None] = None, value: Union[bytes, faust.types.core.ModelT, Any] = None, partition: int = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, callback: Callable[faust.types.tuples.FutureMessage, Union[None, Awaitable[None]]] = None) → faust.types.tuples.FutureMessage[source]¶ Return type: FutureMessage
[]
-
prepare_key
(key: Union[bytes, faust.types.core.ModelT, Any, None], key_serializer: Union[faust.types.codecs.CodecT, str, None]) → Any[source]¶ Return type: Any
-
prepare_value
(value: Union[bytes, faust.types.core.ModelT, Any], value_serializer: Union[faust.types.codecs.CodecT, str, None]) → Any[source]¶ Return type: Any
-
queue
¶ Return type: ThrowableQueue
-
coroutine
decode
(self, message: faust.types.tuples.Message, *, propagate: bool = False) → faust.types.channels.EventT[source]¶ Return type: EventT
-
coroutine
get
(self, *, timeout: Union[datetime.timedelta, float, str] = None) → Any[source]¶ Return type: Any
-
coroutine
on_decode_error
(self, exc: Exception, message: faust.types.tuples.Message) → None[source]¶ Return type: None
-
coroutine
on_key_decode_error
(self, exc: Exception, message: faust.types.tuples.Message) → None[source]¶ Return type: None
-
coroutine
on_value_decode_error
(self, exc: Exception, message: faust.types.tuples.Message) → None[source]¶ Return type: None
-
coroutine
publish_message
(self, fut: faust.types.tuples.FutureMessage, wait: bool = True) → Awaitable[faust.types.tuples.RecordMetadata][source]¶ Return type: Awaitable
[RecordMetadata
]
-
coroutine
send
(self, *, key: Union[bytes, faust.types.core.ModelT, Any, None] = None, value: Union[bytes, faust.types.core.ModelT, Any] = None, partition: int = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, callback: Callable[faust.types.tuples.FutureMessage, Union[None, Awaitable[None]]] = None, force: bool = False) → Awaitable[faust.types.tuples.RecordMetadata][source]¶ Return type: Awaitable
[RecordMetadata
]
-
-
class
faust.
Event
(app: faust.types.app.AppT, key: Union[bytes, faust.types.core.ModelT, Any, None], value: Union[bytes, faust.types.core.ModelT, Any], message: faust.types.tuples.Message) → None[source]¶ An event received on a channel.
Notes
Events have a key and a value:
event.key, event.value
They also have a reference to the original message (if available), such as a Kafka record:
event.message.offset
Iterating over channels/topics yields Event:
async for event in channel:
    ...
Iterating over a stream (that in turn iterates over a channel) yields Event.value:
async for value in channel.stream():
    # value is event.value
    ...
If you only have a Stream object, you can also access underlying events by using Stream.events. For example:
async for event in channel.stream().events():
    ...
Also commonly used for finding the “current event” related to a value in the stream:
stream = channel.stream()
async for event in stream.events():
    event = stream.current_event
    message = event.message
    topic = event.message.topic
You can retrieve the current event in a stream to:
- Get access to the serialized key+value.
- Get access to message properties, such as the topic and partition the value was received on, or its offset.
If you want access to both key and value, you should use stream.items() instead:
async for key, value in stream.items():
    ...
stream.current_event can also be accessed, but you must take extreme care that you are using the correct stream object, because methods such as .group_by(key) and .through(topic) return cloned stream objects.
The best way to access the current event in an agent is to use the contextvar:
from faust import current_event

@app.agent(topic)
async def process(stream):
    async for value in stream:
        event = current_event()
-
coroutine
forward
(self, channel: Union[str, faust.types.channels.ChannelT], key: Union[bytes, faust.types.core.ModelT, Any, None] = <object object>, value: Union[bytes, faust.types.core.ModelT, Any] = <object object>, partition: int = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, callback: Callable[faust.types.tuples.FutureMessage, Union[None, Awaitable[None]]] = None, force: bool = False) → Awaitable[faust.types.tuples.RecordMetadata][source]¶ Forward original message (will not be reserialized).
Return type: Awaitable
[RecordMetadata
]
-
coroutine
send
(self, channel: Union[str, faust.types.channels.ChannelT], key: Union[bytes, faust.types.core.ModelT, Any, None] = <object object>, value: Union[bytes, faust.types.core.ModelT, Any] = <object object>, partition: int = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, callback: Callable[faust.types.tuples.FutureMessage, Union[None, Awaitable[None]]] = None, force: bool = False) → Awaitable[faust.types.tuples.RecordMetadata][source]¶ Send object to channel.
Return type: Awaitable
[RecordMetadata
]
-
class
faust.
EventT
(app: faust.types.events.AppT, key: Union[bytes, faust.types.core.ModelT, Any, None], value: Union[bytes, faust.types.core.ModelT, Any], message: faust.types.tuples.Message) → None[source]¶ -
app
¶
-
key
¶
-
value
¶
-
message
¶
-
acked
¶
-
coroutine
forward
(self, channel: Union[str, faust.types.events.ChannelT], key: Any = None, value: Any = None, partition: int = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, callback: Callable[faust.types.tuples.FutureMessage, Union[None, Awaitable[None]]] = None, force: bool = False) → Awaitable[faust.types.tuples.RecordMetadata][source]¶ Return type: Awaitable
[RecordMetadata
]
-
coroutine
send
(self, channel: Union[str, faust.types.events.ChannelT], key: Union[bytes, faust.types.core.ModelT, Any, None] = None, value: Union[bytes, faust.types.core.ModelT, Any] = None, partition: int = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, callback: Callable[faust.types.tuples.FutureMessage, Union[None, Awaitable[None]]] = None, force: bool = False) → Awaitable[faust.types.tuples.RecordMetadata][source]¶ Return type: Awaitable
[RecordMetadata
]
-
-
class
faust.
ModelOptions
(*args, **kwargs)[source]¶ -
serializer
= None¶
-
include_metadata
= True¶
-
allow_blessed_key
= False¶
-
isodates
= False¶
-
decimals
= False¶
-
coercions
= None¶
-
fields
= None¶ Flattened view of __annotations__ in MRO order.
Type: Index
-
fieldset
= None¶ Set of required field names, for fast argument checking.
Type: Index
-
fieldpos
= None¶ Positional argument index to field name. Used by Record.__init__ to map positional arguments to fields.
Type: Index
-
optionalset
= None¶ Set of optional field names, for fast argument checking.
Type: Index
-
models
= None¶ Mapping of fields that are ModelT
Type: Index
-
modelattrs
= None¶
-
field_coerce
= None¶ Mapping of fields that need to be coerced. Key is the name of the field, value is the coercion handler function.
Type: Index
-
defaults
= None¶ Mapping of field names to default value.
-
initfield
= None¶ Mapping of init field conversion callbacks.
-
clone_defaults
() → faust.types.models.ModelOptions[source]¶ Return type: ModelOptions
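The fieldpos attribute above is described as a positional index to field name mapping used to bind positional arguments. A minimal sketch of that idea follows; the field names and the bind_positional helper are hypothetical, and Record.__init__ may do additional validation not shown here.

```python
from typing import Any, Dict, Tuple

# Hypothetical values for a record declared with fields severity, message.
fieldpos: Dict[int, str] = {0: 'severity', 1: 'message'}


def bind_positional(args: Tuple[Any, ...],
                    fieldpos: Dict[int, str]) -> Dict[str, Any]:
    """Map positional arguments to field names by their index."""
    return {fieldpos[i]: arg for i, arg in enumerate(args)}


bound = bind_positional(('error', 'Broken pact'), fieldpos)
```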
-
-
class
faust.
Record
→ None[source]¶ Describes a model type that is a record (Mapping).
Examples
>>> class LogEvent(Record, serializer='json'):
...     severity: str
...     message: str
...     timestamp: float
...     optional_field: str = 'default value'
>>> event = LogEvent(
...     severity='error',
...     message='Broken pact',
...     timestamp=666.0,
... )
>>> event.severity
'error'
>>> serialized = event.dumps()
'{"severity": "error", "message": "Broken pact", "timestamp": 666.0}'
>>> restored = LogEvent.loads(serialized)
<LogEvent: severity='error', message='Broken pact', timestamp=666.0>
>>> # You can also subclass a Record to create a new record
>>> # with additional fields
>>> class RemoteLogEvent(LogEvent):
...     url: str
>>> # You can also refer to record fields and pass them around:
>>> LogEvent.severity
<FieldDescriptor: LogEvent.severity (str)>
-
classmethod
from_data
(data: Mapping, *, preferred_type: Type[faust.types.models.ModelT] = None) → faust.models.record.Record[source]¶ Return type: Record
-
class
faust.
Monitor
(*, max_avg_history: int = 100, max_commit_latency_history: int = 30, max_send_latency_history: int = 30, messages_sent: int = 0, tables: MutableMapping[str, faust.sensors.monitor.TableState] = None, messages_active: int = 0, events_active: int = 0, messages_received_total: int = 0, messages_received_by_topic: Counter[str] = None, events_total: int = 0, events_by_stream: Counter[faust.types.streams.StreamT] = None, events_by_task: Counter[_asyncio.Task] = None, events_runtime: List[float] = None, commit_latency: List[float] = None, send_latency: List[float] = None, events_s: int = 0, messages_s: int = 0, events_runtime_avg: float = 0.0, topic_buffer_full: Counter[faust.types.topics.TopicT] = None, **kwargs) → None[source]¶ Default Faust Sensor.
This is the default sensor, recording statistics about events, etc.
-
max_avg_history
= 0¶ Max number of total run time values to keep to build average.
-
max_commit_latency_history
= 0¶ Max number of commit latency numbers to keep.
-
max_send_latency_history
= 0¶ Max number of send latency numbers to keep.
-
tables
= None¶ Mapping of tables
-
commit_latency
= None¶ List of commit latency values
-
send_latency
= None¶ List of send latency values
-
messages_active
= 0¶ Number of messages currently being processed.
-
messages_received_total
= 0¶ Number of messages processed in total.
-
messages_received_by_topic
= None¶ Count of messages received by topic
-
messages_sent
= 0¶ Number of messages sent in total.
-
messages_sent_by_topic
= None¶ Number of messages sent by topic.
-
messages_s
= 0¶ Number of messages being processed this second.
-
events_active
= 0¶ Number of events currently being processed.
-
events_total
= 0¶ Number of events processed in total.
-
events_by_task
= None¶ Count of events processed by task
-
events_by_stream
= None¶ Count of events processed by stream
-
events_s
= 0¶ Number of events being processed this second.
-
events_runtime_avg
= 0.0¶ Average event runtime over the last second.
-
events_runtime
= None¶ List of run times used for averages
-
topic_buffer_full
= None¶ Counter of times a topic's buffer was full
-
metric_counts
= None¶ Arbitrary counts added by apps
-
tp_committed_offsets
= None¶ Last committed offsets by TopicPartition
-
tp_read_offsets
= None¶ Last read offsets by TopicPartition
-
tp_end_offsets
= None¶ Log end offsets by TopicPartition
-
logger
= <Logger faust.sensors.monitor (WARNING)>¶
-
on_message_in
(tp: faust.types.tuples.TP, offset: int, message: faust.types.tuples.Message) → None[source]¶ Message received by a consumer.
Return type: None
-
on_stream_event_in
(tp: faust.types.tuples.TP, offset: int, stream: faust.types.streams.StreamT, event: faust.types.events.EventT) → None[source]¶ Message sent to a stream as an event.
Return type: None
-
on_stream_event_out
(tp: faust.types.tuples.TP, offset: int, stream: faust.types.streams.StreamT, event: faust.types.events.EventT) → None[source]¶ Event was acknowledged by stream.
Notes
Acknowledged means a stream finished processing the event, but given that multiple streams may be handling the same event, the message cannot be committed before all streams have processed it. When all streams have acknowledged the event, it will go through on_message_out() just before offsets are committed.
Return type: None
-
on_topic_buffer_full
(topic: faust.types.topics.TopicT) → None[source]¶ Topic buffer full so conductor had to wait.
Return type: None
-
on_message_out
(tp: faust.types.tuples.TP, offset: int, message: faust.types.tuples.Message) → None[source]¶ All streams finished processing message.
Return type: None
-
on_table_get
(table: faust.types.tables.CollectionT, key: Any) → None[source]¶ Key retrieved from table.
Return type: None
-
on_table_set
(table: faust.types.tables.CollectionT, key: Any, value: Any) → None[source]¶ Value set for key in table.
Return type: None
-
on_table_del
(table: faust.types.tables.CollectionT, key: Any) → None[source]¶ Key deleted from table.
Return type: None
-
on_commit_initiated
(consumer: faust.types.transports.ConsumerT) → Any[source]¶ Consumer is about to commit topic offset.
Return type: Any
-
on_commit_completed
(consumer: faust.types.transports.ConsumerT, state: Any) → None[source]¶ Consumer finished committing topic offset.
Return type: None
-
on_send_initiated
(producer: faust.types.transports.ProducerT, topic: str, keysize: int, valsize: int) → Any[source]¶ About to send a message.
Return type: Any
-
on_send_completed
(producer: faust.types.transports.ProducerT, state: Any) → None[source]¶ Message successfully sent.
Return type: None
-
-
class
faust.
Sensor
(*, beacon: mode.utils.types.trees.NodeT = None, loop: asyncio.events.AbstractEventLoop = None) → None[source]¶ Base class for sensors.
This sensor does not do anything at all, but can be subclassed to create new monitors.
-
on_message_in
(tp: faust.types.tuples.TP, offset: int, message: faust.types.tuples.Message) → None[source]¶ Message received by a consumer.
Return type: None
-
on_stream_event_in
(tp: faust.types.tuples.TP, offset: int, stream: faust.types.streams.StreamT, event: faust.types.events.EventT) → None[source]¶ Message sent to a stream as an event.
Return type: None
-
on_stream_event_out
(tp: faust.types.tuples.TP, offset: int, stream: faust.types.streams.StreamT, event: faust.types.events.EventT) → None[source]¶ Event was acknowledged by stream.
Notes
Acknowledged means a stream finished processing the event, but given that multiple streams may be handling the same event, the message cannot be committed before all streams have processed it. When all streams have acknowledged the event, it will go through on_message_out() just before offsets are committed.
Return type: None
-
on_message_out
(tp: faust.types.tuples.TP, offset: int, message: faust.types.tuples.Message) → None[source]¶ All streams finished processing message.
Return type: None
-
on_topic_buffer_full
(topic: faust.types.topics.TopicT) → None[source]¶ Topic buffer full so conductor had to wait.
Return type: None
-
on_table_get
(table: faust.types.tables.CollectionT, key: Any) → None[source]¶ Key retrieved from table.
Return type: None
-
on_table_set
(table: faust.types.tables.CollectionT, key: Any, value: Any) → None[source]¶ Value set for key in table.
Return type: None
-
on_table_del
(table: faust.types.tables.CollectionT, key: Any) → None[source]¶ Key deleted from table.
Return type: None
-
on_commit_initiated
(consumer: faust.types.transports.ConsumerT) → Any[source]¶ Consumer is about to commit topic offset.
Return type: Any
-
on_commit_completed
(consumer: faust.types.transports.ConsumerT, state: Any) → None[source]¶ Consumer finished committing topic offset.
Return type: None
-
on_send_initiated
(producer: faust.types.transports.ProducerT, topic: str, keysize: int, valsize: int) → Any[source]¶ About to send a message.
Return type: Any
-
on_send_completed
(producer: faust.types.transports.ProducerT, state: Any) → None[source]¶ Message successfully sent.
Return type: None
-
logger
= <Logger faust.sensors.base (WARNING)>¶
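As a rough illustration of the callbacks listed above, the sketch below counts messages per topic and tracks how many are still in flight. It is deliberately standalone: `TopicCountingSensor` and the local `TP` tuple are hypothetical stand-ins, and in a real application the class would presumably subclass `faust.Sensor` and be registered with the app.

```python
from collections import Counter
from typing import Any, NamedTuple


class TP(NamedTuple):
    """Local stand-in for a (topic, partition) tuple; an assumption for this sketch."""
    topic: str
    partition: int


class TopicCountingSensor:
    """Hypothetical sensor; a real one would subclass faust.Sensor."""

    def __init__(self) -> None:
        self.received: Counter = Counter()
        self.in_flight = 0

    def on_message_in(self, tp: TP, offset: int, message: Any) -> None:
        # Called when a consumer receives a message.
        self.received[tp.topic] += 1
        self.in_flight += 1

    def on_message_out(self, tp: TP, offset: int, message: Any) -> None:
        # Called when all streams have finished processing the message.
        self.in_flight -= 1


sensor = TopicCountingSensor()
sensor.on_message_in(TP("orders", 0), offset=1, message=b"a")
sensor.on_message_in(TP("orders", 1), offset=7, message=b"b")
sensor.on_message_out(TP("orders", 0), offset=1, message=b"a")
```

Only the two callbacks needed for the counters are overridden here; the remaining hooks can be left as no-ops.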
-
-
class
faust.
Codec
(children: Tuple[faust.types.codecs.CodecT, ...] = None, **kwargs) → None[source]¶ Base class for codecs.
-
children
= None¶ Next steps in the recursive codec chain.
x = pickle | binary
returns a codec with children set to (pickle, binary).
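The `|` composition can be pictured with a small stand-in class. This is a minimal sketch, not faust's `Codec`: `MiniCodec`, `PickleCodec` and `Base64Codec` are hypothetical names, and the assumption is that encoding walks the chain in order while decoding walks it in reverse.

```python
import base64
import pickle
from typing import Any, Tuple


class MiniCodec:
    """Hypothetical stand-in for a chainable codec."""

    def __init__(self, children: Tuple["MiniCodec", ...] = ()) -> None:
        self.children = tuple(children)
        # This codec first, followed by the rest of the chain.
        self.nodes = (self,) + self.children

    def _dumps(self, obj: Any) -> Any:
        return obj

    def _loads(self, data: Any) -> Any:
        return data

    def dumps(self, obj: Any) -> Any:
        for node in self.nodes:
            obj = node._dumps(obj)
        return obj

    def loads(self, data: Any) -> Any:
        for node in reversed(self.nodes):
            data = node._loads(data)
        return data

    def __or__(self, other: "MiniCodec") -> "MiniCodec":
        # Build a new codec of the same kind with `other` appended to the chain.
        return type(self)(children=self.children + (other,))


class PickleCodec(MiniCodec):
    def _dumps(self, obj: Any) -> bytes:
        return pickle.dumps(obj)

    def _loads(self, data: bytes) -> Any:
        return pickle.loads(data)


class Base64Codec(MiniCodec):
    def _dumps(self, obj: bytes) -> bytes:
        return base64.b64encode(obj)

    def _loads(self, data: bytes) -> bytes:
        return base64.b64decode(data)


chain = PickleCodec() | Base64Codec()
```

Calling `chain.dumps(value)` pickles first and base64-encodes second; `chain.loads(data)` undoes the steps in the opposite order.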
-
nodes
= None¶ Cached version of children, including this codec as the first node. Could use chain below, but that seems premature, so the list is simply copied.
-
-
class
faust.
Stream
(channel: AsyncIterator[T_co], *, app: faust.types.app.AppT, processors: Iterable[Callable[T]] = None, combined: List[faust.types.streams.JoinableT] = None, on_start: Callable = None, join_strategy: faust.types.joins.JoinT = None, beacon: mode.utils.types.trees.NodeT = None, concurrency_index: int = None, prev: faust.types.streams.StreamT = None, active_partitions: Set[faust.types.tuples.TP] = None, enable_acks: bool = True, loop: asyncio.events.AbstractEventLoop = None) → None[source]¶ A stream: async iterator processing events in channels/topics.
-
logger
= <Logger faust.streams (WARNING)>¶
-
mundane_level
= 'debug'¶
-
get_active_stream
() → faust.types.streams.StreamT[source]¶ Return the currently active stream.
A stream can be derived using Stream.group_by etc., so if this stream was used to create another derived stream, this function will return the stream being actively consumed from. E.g. in the example:
>>> @app.agent()
... async def agent(a):
...     a = a
...     b = a.group_by(Withdrawal.account_id)
...     c = b.through('backup_topic')
...     async for value in c:
...         ...
The return value of a.get_active_stream() would be c.
Notes
The chain of streams that leads to the active stream is decided by the _next attribute. To get to the active stream we just traverse this linked-list:
>>> def get_active_stream(self):
...     node = self
...     while node._next:
...         node = node._next
...     return node
Return type: StreamT[+T_co]
-
add_processor
(processor: Callable[T]) → None[source]¶ Add processor callback executed whenever a new event is received.
Processor functions can be async or non-async, must accept a single argument, and should return the value, mutated or not.
For example a processor handling a stream of numbers may modify the value:
def double(value: int) -> int:
    return value * 2

stream.add_processor(double)
Return type: None
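Since processors may be async or non-async, something has to decide whether to await each result. The following is a minimal sketch of that idea only; `apply_processors` and `add_one` are hypothetical helpers and do not reflect how the stream applies its processors internally.

```python
import asyncio
import inspect
from typing import Any, Callable, Iterable


async def apply_processors(value: Any,
                           processors: Iterable[Callable[[Any], Any]]) -> Any:
    """Pass value through each processor in order (illustrative only)."""
    for processor in processors:
        value = processor(value)
        # An async processor returns an awaitable that must be resolved.
        if inspect.isawaitable(value):
            value = await value
    return value


def double(value: int) -> int:
    return value * 2


async def add_one(value: int) -> int:
    return value + 1


result = asyncio.run(apply_processors(3, [double, add_one]))
```

Each processor receives the value returned by the previous one, which is why a processor should always return the value, mutated or not.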
-
info
() → Mapping[str, Any][source]¶ Return stream settings as a dictionary.
Return type: Mapping[str, Any]
-
clone
(**kwargs) → faust.types.streams.StreamT[source]¶ Create a clone of this stream.
Notes
If the cloned stream is supposed to “supersede” this stream, like in group_by/through/etc., you should use _chain() instead so stream._next = cloned_stream is set and get_active_stream() returns the cloned stream.
Return type: StreamT[+T_co]
-
events
() → AsyncIterable[faust.types.events.EventT][source]¶ Iterate over the stream as events exclusively.
This means the stream must be iterating over a channel, or at least an iterable of event objects.
Return type: AsyncIterable[EventT]
-
enumerate
(start: int = 0) → AsyncIterable[Tuple[int, T_co]][source]¶ Enumerate values received on this stream.
Unlike Python’s built-in enumerate, this works with async generators.
Return type: AsyncIterable[Tuple[int, +T_co]]
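For readers unfamiliar with the pattern, an async counterpart of the built-in `enumerate` can be sketched as an async generator. `aenumerate`, `letters` and `collect` are hypothetical names used only for this illustration.

```python
import asyncio
from typing import AsyncIterable, AsyncIterator, List, Tuple, TypeVar

T = TypeVar("T")


async def aenumerate(it: AsyncIterable[T],
                     start: int = 0) -> AsyncIterator[Tuple[int, T]]:
    """Yield (index, item) pairs from an async iterable."""
    index = start
    async for item in it:
        yield index, item
        index += 1


async def letters() -> AsyncIterator[str]:
    for ch in "abc":
        yield ch


async def collect() -> List[Tuple[int, str]]:
    return [pair async for pair in aenumerate(letters(), start=1)]


pairs = asyncio.run(collect())
```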
-
through
(channel: Union[str, faust.types.channels.ChannelT]) → faust.types.streams.StreamT[source]¶ Forward values in this stream to channel.
Send messages received on this stream to another channel, and return a new stream that consumes from that channel.
Notes
The messages are forwarded after any processors have been applied.
Example
topic = app.topic('foo')

@app.agent(topic)
async def mytask(stream):
    async for value in stream.through(app.topic('bar')):
        # value was first received in topic 'foo',
        # then forwarded and consumed from topic 'bar'
        print(value)
Return type: StreamT[+T_co]
-
echo
(*channels) → faust.types.streams.StreamT[source]¶ Forward values to one or more channels.
Unlike through(), we don’t consume from these channels.
Return type: StreamT[+T_co]
-
group_by
(key: Union[faust.types.models.FieldDescriptorT, Callable[T, Union[bytes, faust.types.core.ModelT, Any, None]]], *, name: str = None, topic: faust.types.topics.TopicT = None, partitions: int = None) → faust.types.streams.StreamT[source]¶ Create new stream that repartitions the stream using a new key.
Parameters:
- key (Union[FieldDescriptorT, Callable[[~T], Union[bytes, ModelT, Any, None]]]) – The key argument decides how the new key is generated; it can be a field descriptor, a callable, or an async callable. Note: The name argument must be provided if the key argument is a callable.
- name (Optional[str]) – Suffix to use for repartitioned topics. This argument is required if key is a callable.
Examples
Using a field descriptor to use a field in the event as the new key:
s = withdrawals_topic.stream()
# values in this stream are of type Withdrawal
async for event in s.group_by(Withdrawal.account_id):
    ...
Using an async callable to extract a new key:
s = withdrawals_topic.stream()

async def get_key(withdrawal):
    return await aiohttp.get(
        f'http://e.com/resolve_account/{withdrawal.account_id}')

async for event in s.group_by(get_key):
    ...
Using a regular callable to extract a new key:
s = withdrawals_topic.stream()

def get_key(withdrawal):
    return withdrawal.account_id.upper()

async for event in s.group_by(get_key):
    ...
Return type: StreamT[+T_co]
-
derive_topic
(name: str, *, key_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, value_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, prefix: str = '', suffix: str = '') → faust.types.topics.TopicT[source]¶ Create Topic description derived from the K/V type of this stream.
Parameters:
- name (str) – Topic name.
- key_type (Union[Type[ModelT], Type[bytes], Type[str], None]) – Specific key type to use for this topic. If not set, the key type of this stream will be used.
- value_type (Union[Type[ModelT], Type[bytes], Type[str], None]) – Specific value type to use for this topic. If not set, the value type of this stream will be used.
Raises: ValueError – if the stream channel is not a topic.
Return type: TopicT
-
coroutine
ack
(self, event: faust.types.events.EventT) → bool[source]¶ Ack event.
This will decrease the reference count of the event message by one, and when the reference count reaches zero, the worker will commit the offset so that the message will not be seen by a worker again.
Parameters: event (EventT) – Event to ack.
Return type: bool
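The reference counting described above can be sketched in a few lines. This is an illustration under stated assumptions, not faust's implementation: `TrackedMessage`, `OffsetLog` and this standalone `ack` function are hypothetical, and returning `True` only when the last reference is released is a convention chosen for the sketch.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class TrackedMessage:
    """Hypothetical message holding one reference per stream that received it."""
    offset: int
    refcount: int


@dataclass
class OffsetLog:
    """Hypothetical record of offsets that are ready to be committed."""
    committed: List[int] = field(default_factory=list)


def ack(message: TrackedMessage, log: OffsetLog) -> bool:
    """Release one reference; record the offset once no stream holds the message."""
    if message.refcount == 0:
        return False  # already fully acknowledged
    message.refcount -= 1
    if message.refcount == 0:
        log.committed.append(message.offset)
        return True
    return False


log = OffsetLog()
msg = TrackedMessage(offset=42, refcount=2)  # two streams share this message
first = ack(msg, log)
second = ack(msg, log)
```

With two streams sharing the message, the first ack only decrements the count; the offset becomes committable after the second.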
-
items
() → AsyncIterator[Tuple[Union[bytes, faust.types.core.ModelT, Any, None], T_co]][source]¶ Iterate over the stream as key, value pairs.
Examples
@app.agent(topic)
async def mytask(stream):
    async for key, value in stream.items():
        print(key, value)
Return type: AsyncIterator[Tuple[Union[bytes, ModelT, Any, None], +T_co]]
-
coroutine
on_start
(self) → None[source]¶ Called every time before the service is started/restarted.
Return type: None
-
coroutine
on_stop
(self) → None[source]¶ Called every time before the service is stopped/restarted.
Return type: None
-
coroutine
remove_from_stream
(self, stream: faust.types.streams.StreamT) → None[source]¶ Return type: None
-
coroutine
send
(self, value: T_contra) → None[source]¶ Send value into stream locally (bypasses topic).
Return type: None
-
take
(max_: int, within: Union[datetime.timedelta, float, str]) → AsyncIterable[Sequence[T_co]][source]¶ Buffer n values at a time and yield a list of buffered values.
Parameters: within (Union[timedelta, float, str]) – Timeout for when we give up waiting for another value, and process the values we have. Warning: If there’s no timeout (i.e. timeout=None), the agent is likely to stall and block buffered events for an unreasonable length of time(!).
Return type: AsyncIterable[Sequence[+T_co]]
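The buffering behaviour can be approximated with plain asyncio. The sketch below is not how the stream implements `take`; `take_batches` and the `STOP` sentinel are hypothetical, and the timeout is applied to each wait for the next value, which is an assumption about the semantics.

```python
import asyncio
from typing import Any, AsyncIterator, List

STOP = object()  # hypothetical sentinel marking the end of input


async def take_batches(queue: "asyncio.Queue[Any]", max_: int,
                       within: float) -> AsyncIterator[List[Any]]:
    """Yield lists of up to max_ items, flushing early if a wait times out."""
    done = False
    while not done:
        batch: List[Any] = []
        try:
            while len(batch) < max_:
                item = await asyncio.wait_for(queue.get(), timeout=within)
                if item is STOP:
                    done = True
                    break
                batch.append(item)
        except asyncio.TimeoutError:
            pass  # timed out: flush whatever has been buffered so far
        if batch:
            yield batch


async def demo() -> List[List[int]]:
    queue: "asyncio.Queue[Any]" = asyncio.Queue()
    for i in range(5):
        queue.put_nowait(i)
    queue.put_nowait(STOP)
    return [batch async for batch in take_batches(queue, max_=2, within=0.05)]


batches = asyncio.run(demo())
```

Without the timeout, a partially filled batch would wait indefinitely for more values, which is the stall the warning above describes.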
-
-
class
faust.
StreamT
(channel: AsyncIterator[T_co] = None, *, app: faust.types.streams.AppT = None, processors: Iterable[Callable[T]] = None, combined: List[faust.types.streams.JoinableT] = None, on_start: Callable = None, join_strategy: faust.types.streams.JoinT = None, beacon: mode.utils.types.trees.NodeT = None, concurrency_index: int = None, prev: Optional[faust.types.streams.StreamT] = None, active_partitions: Set[faust.types.tuples.TP] = None, enable_acks: bool = True, loop: asyncio.events.AbstractEventLoop = None) → None[source]¶ -
outbox
= None¶
-
join_strategy
= None¶
-
task_owner
= None¶
-
current_event
= None¶
-
active_partitions
= None¶
-
concurrency_index
= None¶
-
enable_acks
= True¶
-
enumerate
(start: int = 0) → AsyncIterable[Tuple[int, T_co]][source]¶ Return type: AsyncIterable
[Tuple
[int
, +T_co]]
-
through
(channel: Union[str, faust.types.channels.ChannelT]) → faust.types.streams.StreamT[source]¶ Return type: StreamT
[+T_co]
-
group_by
(key: Union[faust.types.models.FieldDescriptorT, Callable[T, Union[bytes, faust.types.core.ModelT, Any, None]]], *, name: str = None, topic: faust.types.topics.TopicT = None) → faust.types.streams.StreamT[source]¶ Return type: StreamT
[+T_co]
-
derive_topic
(name: str, *, key_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, value_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, prefix: str = '', suffix: str = '') → faust.types.topics.TopicT[source]¶ Return type: TopicT
[]
-
coroutine
items
(self) → AsyncIterator[Tuple[Union[bytes, faust.types.core.ModelT, Any, None], T_co]][source]¶
-
-
faust.
current_event
() → Optional[faust.types.events.EventT][source]¶ Return the event currently being processed, or None.
Return type: Optional[EventT]
-
class
faust.
SetTable
(app: faust.types.app.AppT, *, name: str = None, default: Callable[Any] = None, store: Union[str, yarl.URL] = None, key_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, value_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, partitions: int = None, window: faust.types.windows.WindowT = None, changelog_topic: faust.types.topics.TopicT = None, help: str = None, on_recover: Callable[Awaitable[None]] = None, on_changelog_event: Callable[faust.types.events.EventT, Awaitable[None]] = None, recovery_buffer_size: int = 1000, standby_buffer_size: int = None, extra_topic_configs: Mapping[str, Any] = None, **kwargs) → None[source]¶ -
WindowWrapper
¶ alias of
SetWindowWrapper
-
logger
= <Logger faust.tables.sets (WARNING)>¶
-
-
class
faust.
Table
(app: faust.types.app.AppT, *, name: str = None, default: Callable[Any] = None, store: Union[str, yarl.URL] = None, key_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, value_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, partitions: int = None, window: faust.types.windows.WindowT = None, changelog_topic: faust.types.topics.TopicT = None, help: str = None, on_recover: Callable[Awaitable[None]] = None, on_changelog_event: Callable[faust.types.events.EventT, Awaitable[None]] = None, recovery_buffer_size: int = 1000, standby_buffer_size: int = None, extra_topic_configs: Mapping[str, Any] = None, **kwargs) → None[source]¶ Table (non-windowed).
-
class
WindowWrapper
(table: faust.types.tables.TableT, *, relative_to: Union[faust.types.tables.FieldDescriptorT, Callable[Optional[faust.types.events.EventT], Union[float, datetime.datetime]], datetime.datetime, float, None] = None, key_index: bool = False, key_index_table: faust.types.tables.TableT = None) → None¶ Windowed table wrapper.
A windowed table does not return concrete values when keys are accessed; instead, WindowSet is returned so that the values can be further reduced to the wanted time period.
-
ValueType
¶ alias of
WindowSet
-
clone
(relative_to: Union[faust.types.tables.FieldDescriptorT, Callable[Optional[faust.types.events.EventT], Union[float, datetime.datetime]], datetime.datetime, float, None]) → faust.types.tables.WindowWrapperT¶ Return type: WindowWrapperT
[]
-
get_relative_timestamp
¶ Return type: Optional[Callable[[Optional[EventT]], Union[float, datetime]]]
-
key_index
= False¶
-
key_index_table
= None¶
-
on_del_key
(key: Any) → None¶ Return type: None
-
on_recover
(fun: Callable[Awaitable[None]]) → Callable[Awaitable[None]]¶ Return type: Callable[[], Awaitable[None]]
-
on_set_key
(key: Any, value: Any) → None¶ Return type: None
-
relative_to
(ts: Union[faust.types.tables.FieldDescriptorT, Callable[Optional[faust.types.events.EventT], Union[float, datetime.datetime]], datetime.datetime, float, None]) → faust.types.tables.WindowWrapperT¶ Return type: WindowWrapperT
[]
-
relative_to_field
(field: faust.types.models.FieldDescriptorT) → faust.types.tables.WindowWrapperT¶ Return type: WindowWrapperT
[]
-
relative_to_now
() → faust.types.tables.WindowWrapperT¶ Return type: WindowWrapperT
[]
-
relative_to_stream
() → faust.types.tables.WindowWrapperT¶ Return type: WindowWrapperT
[]
-
values
(event: faust.types.events.EventT = None) → ValuesView¶ Return type: ValuesView
[+VT_co]
-
-
using_window
(window: faust.types.windows.WindowT, *, key_index: bool = False) → faust.types.tables.WindowWrapperT[source]¶ Return type: WindowWrapperT
[]
-
hopping
(size: Union[datetime.timedelta, float, str], step: Union[datetime.timedelta, float, str], expires: Union[datetime.timedelta, float, str] = None, key_index: bool = False) → faust.types.tables.WindowWrapperT[source]¶ Return type: WindowWrapperT
[]
-
tumbling
(size: Union[datetime.timedelta, float, str], expires: Union[datetime.timedelta, float, str] = None, key_index: bool = False) → faust.types.tables.WindowWrapperT[source]¶ Return type: WindowWrapperT
[]
-
on_key_set
(key: KT, value: VT) → None[source]¶ Handle that value for a key is being set.
Return type: None
-
logger
= <Logger faust.tables.table (WARNING)>¶
-
class
faust.
Topic
(app: faust.types.app.AppT, *, topics: Sequence[str] = None, pattern: Union[str, Pattern[~AnyStr]] = None, key_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, value_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, is_iterator: bool = False, partitions: int = None, retention: Union[datetime.timedelta, float, str] = None, compacting: bool = None, deleting: bool = None, replicas: int = None, acks: bool = True, internal: bool = False, config: Mapping[str, Any] = None, queue: mode.utils.queues.ThrowableQueue = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, maxsize: int = None, root: faust.types.channels.ChannelT = None, active_partitions: Set[faust.types.tuples.TP] = None, allow_empty: bool = False, loop: asyncio.events.AbstractEventLoop = None) → None[source]¶ Define new topic description.
Parameters:
- app (AppT) – App instance used to create this topic description.
- topics (Optional[Sequence[str]]) – List of topic names.
- partitions (Optional[int]) – Number of partitions for these topics. On declaration, topics are created using this. Note: If a message is produced before the topic is declared, and autoCreateTopics is enabled on the Kafka Server, the number of partitions used will be specified by the server configuration.
- retention (Union[timedelta, float, str, None]) – Number of seconds (as float/timedelta) to keep messages in the topic before they can be expired by the server.
- pattern (Union[str, Pattern[AnyStr], None]) – Regular expression evaluated to decide what topics to subscribe to. You cannot specify both topics and a pattern.
- key_type (Union[Type[ModelT], Type[bytes], Type[str], None]) – How to deserialize keys for messages in this topic. Can be a faust.Model type, str, bytes, or None for “autodetect”.
- value_type (Union[Type[ModelT], Type[bytes], Type[str], None]) – How to deserialize values for messages in this topic. Can be a faust.Model type, str, bytes, or None for “autodetect”.
- active_partitions (Optional[Set[TP]]) – Set of faust.types.tuples.TP that this topic should be restricted to.
Raises: TypeError – if both topics and pattern are provided.
-
derive
(**kwargs) → faust.types.channels.ChannelT[source]¶ Create new Topic derived from this topic.
Configuration will be copied from this topic, but any parameter can be overridden with a keyword argument.
See also
derive_topic(): for a list of supported keyword arguments.
Return type: ChannelT
-
derive_topic
(*, topics: Sequence[str] = None, key_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, value_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, partitions: int = None, retention: Union[datetime.timedelta, float, str] = None, compacting: bool = None, deleting: bool = None, internal: bool = None, config: Mapping[str, Any] = None, prefix: str = '', suffix: str = '', **kwargs) → faust.types.topics.TopicT[source]¶ Return type: TopicT
[]
-
coroutine
decode
(self, message: faust.types.tuples.Message, *, propagate: bool = False) → faust.types.events.EventT[source]¶ Return type: EventT
[]
-
coroutine
publish_message
(self, fut: faust.types.tuples.FutureMessage, wait: bool = False) → Awaitable[faust.types.tuples.RecordMetadata][source]¶ Return type: Awaitable
[RecordMetadata
]
-
coroutine
send
(self, *, key: Union[bytes, faust.types.core.ModelT, Any, None] = None, value: Union[bytes, faust.types.core.ModelT, Any] = None, partition: int = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, callback: Callable[faust.types.tuples.FutureMessage, Union[None, Awaitable[None]]] = None, force: bool = False) → Awaitable[faust.types.tuples.RecordMetadata][source]¶ Send message to topic.
Return type: Awaitable
[RecordMetadata
]
-
prepare_key
(key: Union[bytes, faust.types.core.ModelT, Any, None], key_serializer: Union[faust.types.codecs.CodecT, str, None]) → Any[source]¶ Return type: Any
-
class
faust.
TopicT
(app: faust.types.topics.AppT, *, topics: Sequence[str] = None, pattern: Union[str, Pattern[~AnyStr]] = None, key_type: faust.types.topics.ModelArg = None, value_type: faust.types.topics.ModelArg = None, is_iterator: bool = False, partitions: int = None, retention: Union[datetime.timedelta, float, str] = None, compacting: bool = None, deleting: bool = None, replicas: int = None, acks: bool = True, internal: bool = False, config: Mapping[str, Any] = None, queue: mode.utils.queues.ThrowableQueue = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, maxsize: int = None, root: faust.types.channels.ChannelT = None, active_partitions: Set[faust.types.tuples.TP] = None, allow_empty: bool = False, loop: asyncio.events.AbstractEventLoop = None) → None[source]¶ -
topics
= None¶ Iterable/Sequence of topic names to subscribe to.
-
retention
= None¶ Topic retention setting: expiry time in seconds for messages in the topic.
-
compacting
= None¶ Flag that when enabled means the topic can be “compacted”: if the topic is a log of key/value pairs, the broker can delete old values for the same key.
-
replicas
= None¶ Number of replicas for topic.
-
config
= None¶ Additional configuration as a mapping.
-
acks
= None¶ Enable acks for this topic.
-
internal
= None¶ Mark topic as internal: it’s owned by us and we are allowed to create or delete the topic as necessary.
-
pattern
¶ Instead of topics, a regular expression used to match the topics we want to subscribe to.
Return type: Optional[Pattern[AnyStr]]
-
derive_topic
(*, topics: Sequence[str] = None, key_type: faust.types.topics.ModelArg = None, value_type: faust.types.topics.ModelArg = None, partitions: int = None, retention: Union[datetime.timedelta, float, str] = None, compacting: bool = None, deleting: bool = None, internal: bool = False, config: Mapping[str, Any] = None, prefix: str = '', suffix: str = '', **kwargs) → faust.types.topics.TopicT[source]¶ Return type: TopicT
[]
-
-
class
faust.
Settings
(id: str, *, version: int = None, broker: Union[str, yarl.URL, List[yarl.URL]] = None, broker_client_id: str = None, broker_request_timeout: Union[datetime.timedelta, float, str] = None, broker_commit_every: int = None, broker_commit_interval: Union[datetime.timedelta, float, str] = None, broker_commit_livelock_soft_timeout: Union[datetime.timedelta, float, str] = None, broker_session_timeout: Union[datetime.timedelta, float, str] = None, broker_heartbeat_interval: Union[datetime.timedelta, float, str] = None, broker_check_crcs: bool = None, broker_max_poll_records: int = None, agent_supervisor: Union[_T, str] = None, store: Union[str, yarl.URL] = None, cache: Union[str, yarl.URL] = None, web: Union[str, yarl.URL] = None, web_enabled: bool = True, timezone: datetime.tzinfo = None, autodiscover: Union[bool, Iterable[str], Callable[Iterable[str]]] = None, origin: str = None, canonical_url: Union[str, yarl.URL] = None, datadir: Union[pathlib.Path, str] = None, tabledir: Union[pathlib.Path, str] = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, loghandlers: List[logging.Handler] = None, table_cleanup_interval: Union[datetime.timedelta, float, str] = None, table_standby_replicas: int = None, topic_replication_factor: int = None, topic_partitions: int = None, id_format: str = None, reply_to: str = None, reply_to_prefix: str = None, reply_create_topic: bool = None, reply_expires: Union[datetime.timedelta, float, str] = None, ssl_context: ssl.SSLContext = None, stream_buffer_maxsize: int = None, stream_wait_empty: bool = None, stream_ack_cancelled_tasks: bool = None, stream_ack_exceptions: bool = None, stream_publish_on_commit: bool = None, stream_recovery_delay: Union[datetime.timedelta, float, str] = None, producer_linger_ms: int = None, producer_max_batch_size: int = None, producer_acks: int = None, producer_max_request_size: int = None, producer_compression_type: str = None, 
producer_partitioner: Union[_T, str] = None, producer_request_timeout: Union[datetime.timedelta, float, str] = None, consumer_max_fetch_size: int = None, web_bind: str = None, web_port: int = None, web_host: str = None, web_transport: Union[str, yarl.URL] = None, worker_redirect_stdouts: bool = None, worker_redirect_stdouts_level: Union[int, str] = None, Agent: Union[_T, str] = None, Stream: Union[_T, str] = None, Table: Union[_T, str] = None, SetTable: Union[_T, str] = None, TableManager: Union[_T, str] = None, Serializers: Union[_T, str] = None, Worker: Union[_T, str] = None, PartitionAssignor: Union[_T, str] = None, LeaderAssignor: Union[_T, str] = None, Router: Union[_T, str] = None, Topic: Union[_T, str] = None, HttpClient: Union[_T, str] = None, Monitor: Union[_T, str] = None, url: Union[str, yarl.URL] = None, **kwargs) → None[source]¶ -
-
id_format
= '{id}-v{self.version}'¶
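The default `id_format` uses Python's format-string attribute access, so the `self` field must be an object with a `version` attribute. A minimal sketch of how such a template expands follows; the `SimpleNamespace` object is a hypothetical stand-in for a settings instance, and when or whether the template is applied is not shown here.

```python
from types import SimpleNamespace

ID_FORMAT = "{id}-v{self.version}"

# Hypothetical stand-in for a settings object exposing a `version` attribute.
settings = SimpleNamespace(version=2)

# `{self.version}` looks up the attribute on the object passed as `self`.
app_id = ID_FORMAT.format(id="myapp", self=settings)
```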
-
ssl_context
= None¶
-
autodiscover
= False¶
-
broker_client_id
= 'faust-1.4.6'¶
-
timezone
= datetime.timezone.utc¶
-
broker_commit_every
= 10000¶
-
broker_check_crcs
= True¶
-
key_serializer
= 'raw'¶
-
value_serializer
= 'json'¶
-
table_standby_replicas
= 1¶
-
topic_replication_factor
= 1¶
-
topic_partitions
= 8¶
-
reply_create_topic
= False¶
-
stream_buffer_maxsize
= 4096¶
-
stream_wait_empty
= True¶
-
stream_ack_cancelled_tasks
= False¶
-
stream_ack_exceptions
= True¶
-
stream_publish_on_commit
= False¶
-
producer_linger_ms
= 0¶
-
producer_max_batch_size
= 4096¶
-
producer_acks
= -1¶
-
producer_max_request_size
= 1000000¶
-
producer_compression_type
= None¶
-
consumer_max_fetch_size
= 4194304¶
-
web_bind
= '0.0.0.0'¶
-
web_port
= 6066¶
-
web_host
= 'build-8661279-project-230058-faust'¶
-
worker_redirect_stdouts
= True¶
-
worker_redirect_stdouts_level
= 'WARN'¶
-
reply_to_prefix
= 'f-reply-'¶
-
producer_partitioner
¶ Return type: Optional[Callable[[Optional[bytes], Sequence[int], Sequence[int]], int]]
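A callable matching the type above takes an optional key and two partition sequences, and returns a partition number. The sketch below assumes the common Kafka-client convention that the arguments are the serialized key, all partitions, and the currently available partitions; that meaning is an assumption, and `crc_partitioner` is a hypothetical name.

```python
import zlib
from typing import Optional, Sequence


def crc_partitioner(key: Optional[bytes],
                    all_partitions: Sequence[int],
                    available: Sequence[int]) -> int:
    """Pick a partition deterministically from the key (illustrative only)."""
    if key is None:
        # No key: fall back to the first available partition, if any.
        candidates = available or all_partitions
        return candidates[0]
    # Same key always maps to the same partition.
    return all_partitions[zlib.crc32(key) % len(all_partitions)]
```

Hashing against `all_partitions` rather than `available` keeps the key-to-partition mapping stable when a partition is temporarily unavailable.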
-
agent_supervisor
¶ Return type: Type[SupervisorStrategyT]
-
TableManager
¶ Return type: Type[TableManagerT]
-
PartitionAssignor
¶ Return type: Type[PartitionAssignorT]
-
LeaderAssignor
¶ Return type: Type[LeaderAssignorT]
-
-
class
faust.
HoppingWindow
(size: Union[datetime.timedelta, float, str], step: Union[datetime.timedelta, float, str], expires: Union[datetime.timedelta, float, str] = None) → None[source]¶ Hopping window type.
Fixed-size, overlapping windows.
-
ranges
(timestamp: float) → List[faust.types.windows.WindowRange][source]¶ Return type: List[WindowRange]
-
current
(timestamp: float) → faust.types.windows.WindowRange[source]¶ The current WindowRange is the latest WindowRange for a given timestamp.
Return type: WindowRange
-
delta
(timestamp: float, d: Union[datetime.timedelta, float, str]) → faust.types.windows.WindowRange[source]¶ Return type: WindowRange
-
earliest
(timestamp: float) → faust.types.windows.WindowRange[source]¶ Return type: WindowRange
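To illustrate what "fixed-size, overlapping" means, the sketch below lists every window that contains a given timestamp. It is not Faust's implementation: the function name `hopping_ranges`, the half-open `[start, end)` convention, and the alignment of window starts to multiples of `step` are assumptions made for the example.

```python
from typing import List, Tuple


def hopping_ranges(timestamp: float, size: float,
                   step: float) -> List[Tuple[float, float]]:
    """Return all [start, end) windows of length `size` containing `timestamp`.

    Assumes window starts fall on non-negative multiples of `step`.
    """
    # Latest window start at or before the timestamp.
    start = (timestamp // step) * step
    ranges = []
    # Walk backwards one step at a time while the window still covers
    # the timestamp (start + size > timestamp).
    while start > timestamp - size and start >= 0:
        ranges.append((start, start + size))
        start -= step
    return list(reversed(ranges))


print(hopping_ranges(12.0, size=10.0, step=5.0))
# -> [(5.0, 15.0), (10.0, 20.0)]
```

With `size=10` and `step=5`, each timestamp falls into two windows; the last entry corresponds to the "latest" window and the first to the "earliest" in this sketch.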
-
-
class
faust.
TumblingWindow
(size: Union[datetime.timedelta, float, str], expires: Union[datetime.timedelta, float, str] = None) → None[source]¶ Tumbling window type.
Fixed-size, non-overlapping, gap-less windows.
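A tumbling window can be read as a hopping window whose step equals its size, so every timestamp belongs to exactly one window. The following is a minimal sketch under that reading; `tumbling_range` is a hypothetical helper and the half-open `[start, end)` convention is an assumption, not necessarily how Faust represents a WindowRange.

```python
from typing import Tuple


def tumbling_range(timestamp: float, size: float) -> Tuple[float, float]:
    """Return the single [start, end) window of length `size` for `timestamp`."""
    # Floor the timestamp to the nearest multiple of the window size.
    start = (timestamp // size) * size
    return (start, start + size)


print(tumbling_range(125.0, 60.0))  # -> (120.0, 180.0)
```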
-
class
faust.
SlidingWindow
(before: Union[datetime.timedelta, float, str], after: Union[datetime.timedelta, float, str], expires: Union[datetime.timedelta, float, str]) → None[source]¶ Sliding window type.
Fixed-size, overlapping windows that work on differences between record timestamps.
-
ranges
(timestamp: float) → List[faust.types.windows.WindowRange][source]¶ Return list of windows from timestamp.
Notes
SELECT * FROM s1, s2 WHERE s1.key = s2.key AND s1.ts - before <= s2.ts AND s2.ts <= s1.ts + after
Return type: List[WindowRange]
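The join condition in the note above can be expressed directly in code. This is a sketch of that predicate only, with hypothetical helper names (`sliding_range`, `in_window`); it does not reproduce Faust's `ranges()` return value.

```python
from typing import Tuple


def sliding_range(ts: float, before: float, after: float) -> Tuple[float, float]:
    """Inclusive [ts - before, ts + after] interval around a record timestamp."""
    return (ts - before, ts + after)


def in_window(s1_ts: float, s2_ts: float, before: float, after: float) -> bool:
    """True when s1.ts - before <= s2.ts <= s1.ts + after."""
    low, high = sliding_range(s1_ts, before, after)
    return low <= s2_ts <= high


print(sliding_range(100.0, before=10.0, after=5.0))     # -> (90.0, 105.0)
print(in_window(100.0, 92.0, before=10.0, after=5.0))   # -> True
print(in_window(100.0, 106.0, before=10.0, after=5.0))  # -> False
```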
-
-
class
faust.
Worker
(app: faust.types.app.AppT, *services, sensors: Iterable[faust.types.sensors.SensorT] = None, debug: bool = False, quiet: bool = False, loglevel: Union[str, int] = None, logfile: Union[str, IO] = None, stdout: IO = <_io.TextIOWrapper name='<stdout>' mode='w' encoding='UTF-8'>, stderr: IO = <_io.TextIOWrapper name='<stderr>' mode='w' encoding='UTF-8'>, blocking_timeout: float = 10.0, workdir: Union[pathlib.Path, str] = None, console_port: int = 50101, loop: asyncio.events.AbstractEventLoop = None, redirect_stdouts: bool = None, redirect_stdouts_level: int = None, **kwargs) → None[source]¶ Worker.
- Usage:
You can start a worker using:
- the faust worker program.
- instantiating Worker programmatically and calling execute_from_commandline():
    >>> worker = Worker(app)
    >>> worker.execute_from_commandline()
- or if you already have an event loop, calling await start, but in that case you are responsible for gracefully shutting down the event loop:
    async def start_worker(worker: Worker) -> None:
        await worker.start()

    def manage_loop():
        loop = asyncio.get_event_loop()
        worker = Worker(app, loop=loop)
        try:
            loop.run_until_complete(start_worker(worker))
        finally:
            worker.stop_and_shutdown_loop()
Parameters:
- app (AppT) – The Faust app to start.
- *services – Services to start with worker. This includes application instances to start.
- sensors (Iterable[SensorT]) – List of sensors to include.
- debug (bool) – Enables debugging mode [disabled by default].
- quiet (bool) – Do not output anything to console [disabled by default].
- loglevel (Union[str, int]) – Level to use for logging, can be string (one of: CRIT|ERROR|WARN|INFO|DEBUG), or integer.
- logfile (Union[str, IO]) – Name of file or a stream to log to.
- stdout (IO) – Standard out stream.
- stderr (IO) – Standard err stream.
- blocking_timeout (float) – When debug is enabled this sets the timeout for detecting that the event loop is blocked.
- workdir (Union[str, Path]) – Custom working directory for the process that the worker will change into when started. This working directory change is permanent for the process, or until something else changes the working directory again.
- loop (asyncio.AbstractEventLoop) – Custom event loop object.
-
logger
= <Logger faust.worker (WARNING)>¶
-
app
= None¶ The Faust app started by this worker.
-
sensors
= None¶ Additional sensors to add to the Faust app.
-
workdir
= None¶ Current working directory. Note that if passed as an argument to Worker, the worker will change to this directory when started.
-
on_init_dependencies
() → Iterable[mode.types.services.ServiceT][source]¶ Callback to be used to add service dependencies.
Return type: Iterable[ServiceT]
-
class
faust.
Service
(*, beacon: mode.utils.types.trees.NodeT = None, loop: asyncio.events.AbstractEventLoop = None) → None[source]¶ An asyncio service that can be started/stopped/restarted.
Keyword Arguments:
- beacon (NodeT) – Beacon used to track services in a graph.
- loop (asyncio.AbstractEventLoop) – Event loop object.
-
abstract
= False¶
-
class
Diag
(service: mode.types.services.ServiceT) → None¶ Service diagnostics.
This can be used to track what your service is doing. For example if your service is a Kafka consumer with a background thread that commits the offset every 30 seconds, you may want to see when this happens:
    DIAG_COMMITTING = 'committing'

    class Consumer(Service):

        @Service.task
        async def _background_commit(self) -> None:
            while not self.should_stop:
                await self.sleep(30.0)
                self.diag.set_flag(DIAG_COMMITTING)
                try:
                    await self._consumer.commit()
                finally:
                    self.diag.unset_flag(DIAG_COMMITTING)
The above code is setting the flag manually, but you can also use a decorator to accomplish the same thing:
    @Service.timer(30.0)
    async def _background_commit(self) -> None:
        await self.commit()

    @Service.transitions_to(DIAG_COMMITTING)
    async def commit(self) -> None:
        await self._consumer.commit()
-
set_flag
(flag: str) → None¶ Return type: None
-
unset_flag
(flag: str) → None¶ Return type: None
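The set/unset pattern described for Diag can be sketched without any library support. The stand-in `Diag` class and the `transitions_to` decorator below are simplified, hypothetical re-creations written for this example; they only mirror the behaviour described above (set a flag while a coroutine runs, clear it afterwards) and are not the library's code.

```python
import asyncio
from functools import wraps
from typing import Any, Awaitable, Callable, Set


class Diag:
    """Minimal stand-in that tracks the set of currently active flags."""

    def __init__(self) -> None:
        self.flags: Set[str] = set()

    def set_flag(self, flag: str) -> None:
        self.flags.add(flag)

    def unset_flag(self, flag: str) -> None:
        self.flags.discard(flag)


def transitions_to(flag: str) -> Callable:
    """Decorator: keep `flag` set on self.diag while the coroutine runs."""
    def decorator(fun: Callable[..., Awaitable[Any]]) -> Callable[..., Awaitable[Any]]:
        @wraps(fun)
        async def wrapper(self: Any, *args: Any, **kwargs: Any) -> Any:
            self.diag.set_flag(flag)
            try:
                return await fun(self, *args, **kwargs)
            finally:
                # Always clear the flag, even if the coroutine raises.
                self.diag.unset_flag(flag)
        return wrapper
    return decorator


class Consumer:
    def __init__(self) -> None:
        self.diag = Diag()
        self.seen: Set[str] = set()

    @transitions_to('committing')
    async def commit(self) -> None:
        # Record which flags are active while the commit is in progress.
        self.seen = set(self.diag.flags)


consumer = Consumer()
asyncio.run(consumer.commit())
print(consumer.seen, consumer.diag.flags)  # -> {'committing'} set()
```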
-
-
wait_for_shutdown
= False¶ Set to True if .stop must wait for the shutdown flag to be set.
-
shutdown_timeout
= 60.0¶ Time to wait for shutdown flag set before we give up.
-
restart_count
= 0¶ Current number of times this service instance has been restarted.
-
mundane_level
= 'info'¶ The log level for mundane info such as starting, stopping, etc. Set this to "debug" for less information.
-
classmethod
from_awaitable
(coro: Awaitable, *, name: str = None, **kwargs) → mode.types.services.ServiceT[source]¶ Return type: ServiceT
-
classmethod
task
(fun: Callable[Any, Awaitable[None]]) → mode.services.ServiceTask[source]¶ Decorator used to define a service background task.
Example
>>> class S(Service):
...
...     @Service.task
...     async def background_task(self):
...         while not self.should_stop:
...             await self.sleep(1.0)
...             print('Waking up')
Return type: ServiceTask
-
classmethod
timer
(interval: Union[datetime.timedelta, float, str]) → Callable[Callable[mode.types.services.ServiceT, Awaitable[None]], mode.services.ServiceTask][source]¶ A background timer that executes every n seconds.
Example
>>> class S(Service):
...
...     @Service.timer(1.0)
...     async def background_timer(self):
...         print('Waking up')
Return type: Callable[[Callable[[ServiceT], Awaitable[None]]], ServiceTask]
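Conceptually, a timer of this kind is a loop that waits for the interval, or for a stop signal, and then calls the function. The following standard-library sketch shows that idea; `run_timer`, the `stopped` event and the `demo` coroutine are hypothetical names, and the real decorator may differ in details such as drift handling.

```python
import asyncio
from typing import Awaitable, Callable, List


async def run_timer(interval: float,
                    callback: Callable[[], Awaitable[None]],
                    stopped: asyncio.Event) -> None:
    """Call `callback` every `interval` seconds until `stopped` is set."""
    while not stopped.is_set():
        try:
            # Wake up early if the stop signal arrives during the wait.
            await asyncio.wait_for(stopped.wait(), timeout=interval)
        except asyncio.TimeoutError:
            # The interval elapsed without a stop signal: run the callback.
            await callback()


async def demo() -> int:
    stopped = asyncio.Event()
    ticks: List[int] = []

    async def tick() -> None:
        ticks.append(len(ticks))
        if len(ticks) == 3:
            stopped.set()  # stop after three runs

    await run_timer(0.01, tick, stopped)
    return len(ticks)


print(asyncio.run(demo()))  # -> 3
```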
-
classmethod
transitions_to
(flag: str) → Callable[source]¶ Decorator that adds diagnostic flag while function is running.
Return type: Callable
-
add_dependency
(service: mode.types.services.ServiceT) → mode.types.services.ServiceT[source]¶ Add dependency to other service.
The service will be started/stopped with this service.
Return type: ServiceT
-
add_future
(coro: Awaitable) → _asyncio.Future[source]¶ Add relationship to asyncio.Future.
The future will be joined when this service is stopped.
Return type: Future
-
on_init_dependencies
() → Iterable[mode.types.services.ServiceT][source]¶ Callback to be used to add service dependencies.
Return type: Iterable[ServiceT]
-
coroutine
add_runtime_dependency
(self, service: mode.types.services.ServiceT) → mode.types.services.ServiceT[source]¶ Return type: ServiceT
-
coroutine
crash
(self, reason: BaseException) → None[source]¶ Crash the service and all child services.
Return type: None
-
coroutine
join_services
(self, services: Sequence[mode.types.services.ServiceT]) → None[source]¶ Return type: None
-
logger
= <Logger mode.services (WARNING)>¶
-
coroutine
maybe_start
(self) → None[source]¶ Start the service, if it has not already been started.
Return type: None
-
coroutine
sleep
(self, n: Union[datetime.timedelta, float, str]) → None[source]¶ Sleep for n seconds, or until service stopped.
Return type: None
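"Sleep for n seconds, or until service stopped" is a sleep that can be interrupted. One way to get that behaviour with only the standard library is to wait on a stop event with a timeout, as sketched below. The name `sleep_or_stopped` and the boolean return value are assumptions made for illustration; the method documented above returns None.

```python
import asyncio
from typing import Tuple


async def sleep_or_stopped(n: float, stopped: asyncio.Event) -> bool:
    """Wait up to `n` seconds; return True if `stopped` was set first."""
    try:
        await asyncio.wait_for(stopped.wait(), timeout=n)
    except asyncio.TimeoutError:
        return False  # the full sleep elapsed
    return True  # woken early by the stop signal


async def demo() -> Tuple[bool, bool]:
    stopped = asyncio.Event()
    timed_out = await sleep_or_stopped(0.01, stopped)    # nothing stops us
    stopped.set()
    interrupted = await sleep_or_stopped(10.0, stopped)  # returns at once
    return timed_out, interrupted


print(asyncio.run(demo()))  # -> (False, True)
```

This is why a loop such as `while not self.should_stop: await self.sleep(30.0)` can exit promptly on shutdown instead of waiting out the full interval.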
-
coroutine
transition_with
(self, flag: str, fut: Awaitable, *args, **kwargs) → Any[source]¶ Return type: Any
-
coroutine
wait
(self, *coros, timeout: Union[datetime.timedelta, float, str] = None) → mode.services.WaitResult[source]¶ Wait for coroutines to complete, or until the service stops.
Return type: WaitResult
-
coroutine
wait_first
(self, *coros, timeout: Union[datetime.timedelta, float, str] = None) → mode.services.WaitResults[source]¶ Return type: WaitResults
-
coroutine
wait_for_stopped
(self, *coros, timeout: Union[datetime.timedelta, float, str] = None) → bool[source]¶ Return type: bool
-
coroutine
wait_many
(self, coros: Iterable[Union[Generator[[Any, None], Any], Awaitable, asyncio.locks.Event, mode.utils.locks.Event]], *, timeout: Union[datetime.timedelta, float, str] = None) → mode.services.WaitResult[source]¶ Return type: WaitResult
-
coroutine
wait_until_stopped
(self) → None[source]¶ Wait until the service is signalled to stop.
Return type: None
-
set_shutdown
() → None[source]¶ Set the shutdown signal.
Notes
If wait_for_shutdown is set, stopping the service will wait for this flag to be set.
Return type: None
-
class
faust.
ServiceT
(*, beacon: mode.utils.types.trees.NodeT = None, loop: asyncio.events.AbstractEventLoop = None) → None[source]¶ Abstract type for an asynchronous service that can be started/stopped.
-
wait_for_shutdown
= False¶
-
restart_count
= 0¶
-
supervisor
= None¶
-
add_dependency
(service: mode.types.services.ServiceT) → mode.types.services.ServiceT[source]¶ Return type: ServiceT
-
coroutine
add_runtime_dependency
(self, service: mode.types.services.ServiceT) → mode.types.services.ServiceT[source]¶ Return type: ServiceT
-
loop
¶ Return type: AbstractEventLoop
-