faust
¶
Python Stream processing.
-
class
faust.
Service
(*, beacon: mode.utils.types.trees.NodeT = None, loop: asyncio.events.AbstractEventLoop = None) → None[source]¶ An asyncio service that can be started/stopped/restarted.
- Keyword Arguments
beacon (NodeT) – Beacon used to track services in a graph.
loop (asyncio.AbstractEventLoop) – Event loop object.
-
abstract
= False¶
-
class
Diag
(service: mode.types.services.ServiceT) → None¶ Service diagnostics.
This can be used to track what your service is doing. For example if your service is a Kafka consumer with a background thread that commits the offset every 30 seconds, you may want to see when this happens:
DIAG_COMMITTING = 'committing' class Consumer(Service): @Service.task async def _background_commit(self) -> None: while not self.should_stop: await self.sleep(30.0) self.diag.set_flag(DIAG_COMITTING) try: await self._consumer.commit() finally: self.diag.unset_flag(DIAG_COMMITTING)
The above code is setting the flag manually, but you can also use a decorator to accomplish the same thing:
@Service.timer(30.0) async def _background_commit(self) -> None: await self.commit() @Service.transitions_with(DIAG_COMITTING) async def commit(self) -> None: await self._consumer.commit()
-
set_flag
(flag: str) → None¶ - Return type
None
-
unset_flag
(flag: str) → None¶ - Return type
None
-
-
wait_for_shutdown
= False¶ Set to True if .stop must wait for the shutdown flag to be set.
-
shutdown_timeout
= 60.0¶ Time to wait for shutdown flag set before we give up.
-
restart_count
= 0¶ Current number of times this service instance has been restarted.
-
mundane_level
= 'info'¶ The log level for mundane info such as starting, stopping, etc. Set this to
"debug"
for less information.
-
classmethod
from_awaitable
(coro: Awaitable, *, name: str = None, **kwargs: Any) → mode.types.services.ServiceT[source]¶ - Return type
ServiceT
[]
-
classmethod
task
(fun: Callable[Any, Awaitable[None]]) → mode.services.ServiceTask[source]¶ Decorate function to be used as background task.
Example
>>> class S(Service): ... ... @Service.task ... async def background_task(self): ... while not self.should_stop: ... await self.sleep(1.0) ... print('Waking up')
- Return type
ServiceTask
-
classmethod
timer
(interval: Union[datetime.timedelta, float, str]) → Callable[Callable, mode.services.ServiceTask][source]¶ Background timer executing every
n
seconds.Example
>>> class S(Service): ... ... @Service.timer(1.0) ... async def background_timer(self): ... print('Waking up')
-
classmethod
transitions_to
(flag: str) → Callable[source]¶ Decorate function to set and reset diagnostic flag.
- Return type
-
async
transition_with
(flag: str, fut: Awaitable, *args: Any, **kwargs: Any) → Any[source]¶ - Return type
-
add_dependency
(service: mode.types.services.ServiceT) → mode.types.services.ServiceT[source]¶ Add dependency to other service.
The service will be started/stopped with this service.
- Return type
ServiceT
[]
-
async
add_runtime_dependency
(service: mode.types.services.ServiceT) → mode.types.services.ServiceT[source]¶ - Return type
ServiceT
[]
-
async
remove_dependency
(service: mode.types.services.ServiceT) → mode.types.services.ServiceT[source]¶ Stop and remove dependency of this service.
- Return type
ServiceT
[]
-
add_future
(coro: Awaitable) → _asyncio.Future[source]¶ Add relationship to asyncio.Future.
The future will be joined when this service is stopped.
- Return type
Future
-
on_init_dependencies
() → Iterable[mode.types.services.ServiceT][source]¶ Return list of service dependencies for this service.
-
async
join_services
(services: Sequence[mode.types.services.ServiceT]) → None[source]¶ - Return type
None
-
async
sleep
(n: Union[datetime.timedelta, float, str], *, loop: asyncio.events.AbstractEventLoop = None) → None[source]¶ Sleep for
n
seconds, or until service stopped.- Return type
None
-
async
wait_for_stopped
(*coros: Union[Generator[[Any, None], Any], Awaitable, asyncio.locks.Event, mode.utils.locks.Event], timeout: Union[datetime.timedelta, float, str] = None) → bool[source]¶ - Return type
-
async
wait
(*coros: Union[Generator[[Any, None], Any], Awaitable, asyncio.locks.Event, mode.utils.locks.Event], timeout: Union[datetime.timedelta, float, str] = None) → mode.services.WaitResult[source]¶ Wait for coroutines to complete, or until the service stops.
- Return type
WaitResult
-
async
wait_many
(coros: Iterable[Union[Generator[[Any, None], Any], Awaitable, asyncio.locks.Event, mode.utils.locks.Event]], *, timeout: Union[datetime.timedelta, float, str] = None) → mode.services.WaitResult[source]¶ - Return type
WaitResult
-
async
wait_first
(*coros: Union[Generator[[Any, None], Any], Awaitable, asyncio.locks.Event, mode.utils.locks.Event], timeout: Union[datetime.timedelta, float, str] = None) → mode.services.WaitResults[source]¶ - Return type
WaitResults
-
async
maybe_start
() → None[source]¶ Start the service, if it has not already been started.
- Return type
None
-
async
crash
(reason: BaseException) → None[source]¶ Crash the service and all child services.
- Return type
None
-
async
wait_until_stopped
() → None[source]¶ Wait until the service is signalled to stop.
- Return type
None
-
set_shutdown
() → None[source]¶ Set the shutdown signal.
Notes
If
wait_for_shutdown
is set, stopping the service will wait for this flag to be set.- Return type
None
-
itertimer
(interval: Union[datetime.timedelta, float, str], *, max_drift_correction: float = 0.1, loop: asyncio.events.AbstractEventLoop = None, sleep: Callable[..., Awaitable] = None, clock: Callable[float] = <built-in function perf_counter>, name: str = '') → AsyncIterator[float][source]¶ Sleep
interval
seconds for every iteration.This is an async iterator that takes advantage of
timer_intervals()
to act as a timer that stop drift from occurring, and adds a tiny amount of drift to timers so that they don’t start at the same time.Uses
Service.sleep
which will bail-out-quick if the service is stopped.Note
Will sleep the full interval seconds before returning from first iteration.
Examples
>>> async for sleep_time in self.itertimer(1.0): ... print('another second passed, just woke up...') ... await perform_some_http_request()
- Return type
-
logger
= <Logger mode.services (WARNING)>¶
-
property
crash_reason
¶ - Return type
-
class
faust.
ServiceT
(*, beacon: mode.utils.types.trees.NodeT = None, loop: asyncio.events.AbstractEventLoop = None) → None[source]¶ Abstract type for an asynchronous service that can be started/stopped.
See also
-
wait_for_shutdown
= False¶
-
restart_count
= 0¶
-
supervisor
= None¶
-
abstract
add_dependency
(service: mode.types.services.ServiceT) → mode.types.services.ServiceT[source]¶ - Return type
ServiceT
[]
-
abstract async
add_runtime_dependency
(service: mode.types.services.ServiceT) → mode.types.services.ServiceT[source]¶ - Return type
ServiceT
[]
-
abstract property
loop
¶ - Return type
AbstractEventLoop
-
abstract property
crash_reason
¶ - Return type
-
-
class
faust.
Agent
(fun: Callable[faust.types.streams.StreamT, Union[Coroutine[[Any, Any], None], Awaitable[None], AsyncIterable]], *, app: faust.types.app.AppT, name: str = None, channel: Union[str, faust.types.channels.ChannelT] = None, concurrency: int = 1, sink: Iterable[Union[AgentT, faust.types.channels.ChannelT, Callable[Any, Optional[Awaitable]]]] = None, on_error: Callable[[AgentT, BaseException], Awaitable] = None, supervisor_strategy: Type[mode.types.supervisors.SupervisorStrategyT] = None, help: str = None, schema: faust.types.serializers.SchemaT = None, key_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, value_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, isolated_partitions: bool = False, use_reply_headers: bool = None, **kwargs: Any) → None[source]¶ Agent.
This is the type of object returned by the
@app.agent
decorator.-
supervisor
= None¶
-
on_init_dependencies
() → Iterable[mode.types.services.ServiceT][source]¶ Return list of services dependencies required to start agent.
-
cancel
() → None[source]¶ Cancel agent and its actor instances running in this process.
- Return type
None
-
async
on_partitions_revoked
(revoked: Set[faust.types.tuples.TP]) → None[source]¶ Call when partitions are revoked.
- Return type
None
-
async
on_partitions_assigned
(assigned: Set[faust.types.tuples.TP]) → None[source]¶ Call when partitions are assigned.
- Return type
None
-
async
on_isolated_partitions_revoked
(revoked: Set[faust.types.tuples.TP]) → None[source]¶ Call when isolated partitions are revoked.
- Return type
None
-
async
on_isolated_partitions_assigned
(assigned: Set[faust.types.tuples.TP]) → None[source]¶ Call when isolated partitions are assigned.
- Return type
None
Call when non-isolated partitions are revoked.
- Return type
None
Call when non-isolated partitions are assigned.
- Return type
None
-
clone
(*, cls: Type[faust.types.agents.AgentT] = None, **kwargs: Any) → faust.types.agents.AgentT[source]¶ Create clone of this agent object.
Keyword arguments can be passed to override any argument supported by
Agent.__init__
.- Return type
AgentT
[]
-
test_context
(channel: faust.types.channels.ChannelT = None, supervisor_strategy: mode.types.supervisors.SupervisorStrategyT = None, on_error: Callable[[AgentT, BaseException], Awaitable] = None, **kwargs: Any) → faust.types.agents.AgentTestWrapperT[source]¶ Create new unit-testing wrapper for this agent.
- Return type
-
actor_from_stream
(stream: Optional[faust.types.streams.StreamT], *, index: int = None, active_partitions: Set[faust.types.tuples.TP] = None, channel: faust.types.channels.ChannelT = None) → faust.types.agents.ActorT[Union[AsyncIterable, Awaitable]][source]¶ Create new actor from stream.
- Return type
ActorT
[]
-
add_sink
(sink: Union[AgentT, faust.types.channels.ChannelT, Callable[Any, Optional[Awaitable]]]) → None[source]¶ Add new sink to further handle results from this agent.
- Return type
None
-
stream
(channel: faust.types.channels.ChannelT = None, active_partitions: Set[faust.types.tuples.TP] = None, **kwargs: Any) → faust.types.streams.StreamT[source]¶ Create underlying stream used by this agent.
- Return type
StreamT
[+T_co]
-
async
cast
(value: Union[bytes, faust.types.core._ModelT, Any] = None, *, key: Union[bytes, faust.types.core._ModelT, Any, None] = None, partition: int = None, timestamp: float = None, headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes], None] = None) → None[source]¶ RPC operation: like
ask()
but do not expect reply.Cast here is like “casting a spell”, and will not expect a reply back from the agent.
- Return type
None
-
async
ask
(value: Union[bytes, faust.types.core._ModelT, Any] = None, *, key: Union[bytes, faust.types.core._ModelT, Any, None] = None, partition: int = None, timestamp: float = None, headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes], None] = None, reply_to: Union[AgentT, faust.types.channels.ChannelT, str] = None, correlation_id: str = None) → Any[source]¶ RPC operation: ask agent for result of processing value.
This version will wait until the result is available and return the processed value.
- Return type
-
async
ask_nowait
(value: Union[bytes, faust.types.core._ModelT, Any] = None, *, key: Union[bytes, faust.types.core._ModelT, Any, None] = None, partition: int = None, timestamp: float = None, headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes], None] = None, reply_to: Union[AgentT, faust.types.channels.ChannelT, str] = None, correlation_id: str = None, force: bool = False) → faust.agents.replies.ReplyPromise[source]¶ RPC operation: ask agent for result of processing value.
This version does not wait for the result to arrive, but instead returns a promise of future evaluation.
- Return type
-
async
send
(*, key: Union[bytes, faust.types.core._ModelT, Any, None] = None, value: Union[bytes, faust.types.core._ModelT, Any] = None, partition: int = None, timestamp: float = None, headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes], None] = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, callback: Callable[faust.types.tuples.FutureMessage, Union[None, Awaitable[None]]] = None, reply_to: Union[AgentT, faust.types.channels.ChannelT, str] = None, correlation_id: str = None, force: bool = False) → Awaitable[faust.types.tuples.RecordMetadata][source]¶ Send message to topic used by agent.
- Return type
-
map
(values: Union[AsyncIterable, Iterable], key: Union[bytes, faust.types.core._ModelT, Any, None] = None, reply_to: Union[AgentT, faust.types.channels.ChannelT, str] = None) → AsyncIterator[source]¶ RPC map operation on a list of values.
A map operation iterates over results as they arrive. See
join()
andkvjoin()
if you want them in order.- Return type
AsyncIterator
[+T_co]
-
kvmap
(items: Union[AsyncIterable[Tuple[Union[bytes, faust.types.core._ModelT, Any, None], Union[bytes, faust.types.core._ModelT, Any]]], Iterable[Tuple[Union[bytes, faust.types.core._ModelT, Any, None], Union[bytes, faust.types.core._ModelT, Any]]]], reply_to: Union[AgentT, faust.types.channels.ChannelT, str] = None) → AsyncIterator[str][source]¶ RPC map operation on a list of
(key, value)
pairs.A map operation iterates over results as they arrive. See
join()
andkvjoin()
if you want them in order.- Return type
-
async
join
(values: Union[AsyncIterable[Union[bytes, faust.types.core._ModelT, Any]], Iterable[Union[bytes, faust.types.core._ModelT, Any]]], key: Union[bytes, faust.types.core._ModelT, Any, None] = None, reply_to: Union[AgentT, faust.types.channels.ChannelT, str] = None) → List[Any][source]¶ RPC map operation on a list of values.
A join returns the results in order, and only returns once all values have been processed.
-
async
kvjoin
(items: Union[AsyncIterable[Tuple[Union[bytes, faust.types.core._ModelT, Any, None], Union[bytes, faust.types.core._ModelT, Any]]], Iterable[Tuple[Union[bytes, faust.types.core._ModelT, Any, None], Union[bytes, faust.types.core._ModelT, Any]]]], reply_to: Union[AgentT, faust.types.channels.ChannelT, str] = None) → List[Any][source]¶ RPC map operation on list of
(key, value)
pairs.A join returns the results in order, and only returns once all values have been processed.
-
property
channel_iterator
¶ Return channel agent iterates over. :rtype:
AsyncIterator
[+T_co]
-
logger
= <Logger faust.agents.agent (WARNING)>¶
-
-
class
faust.
App
(id: str, *, monitor: faust.sensors.monitor.Monitor = None, config_source: Any = None, loop: asyncio.events.AbstractEventLoop = None, beacon: mode.utils.types.trees.NodeT = None, **options: Any) → None[source]¶ Faust Application.
- Parameters
id (str) – Application ID.
- Keyword Arguments
loop (asyncio.AbstractEventLoop) – optional event loop to use.
See also
Application Parameters – for supported keyword arguments.
-
SCAN_CATEGORIES
= ['faust.agent', 'faust.command', 'faust.page', 'faust.service', 'faust.task']¶
-
class
BootStrategy
(app: faust.types.app.AppT, *, enable_web: bool = None, enable_kafka: bool = None, enable_kafka_producer: bool = None, enable_kafka_consumer: bool = None, enable_sensors: bool = None) → None¶ App startup strategy.
The startup strategy defines the graph of services to start when the Faust worker for an app starts.
-
agents
() → Iterable[mode.types.services.ServiceT]¶ Return list of services required to start agents.
-
client_only
() → Iterable[mode.types.services.ServiceT]¶ Return services to start when app is in client_only mode.
-
enable_kafka
= True¶
-
enable_kafka_consumer
= None¶
-
enable_kafka_producer
= None¶
-
enable_sensors
= True¶
-
enable_web
= None¶
-
kafka_client_consumer
() → Iterable[mode.types.services.ServiceT]¶ Return list of services required to start Kafka client consumer.
-
kafka_conductor
() → Iterable[mode.types.services.ServiceT]¶ Return list of services required to start Kafka conductor.
-
kafka_consumer
() → Iterable[mode.types.services.ServiceT]¶ Return list of services required to start Kafka consumer.
-
kafka_producer
() → Iterable[mode.types.services.ServiceT]¶ Return list of services required to start Kafka producer.
-
producer_only
() → Iterable[mode.types.services.ServiceT]¶ Return services to start when app is in producer_only mode.
-
sensors
() → Iterable[mode.types.services.ServiceT]¶ Return list of services required to start sensors.
-
server
() → Iterable[mode.types.services.ServiceT]¶ Return services to start when app is in default mode.
-
tables
() → Iterable[mode.types.services.ServiceT]¶ Return list of table-related services.
-
web_components
() → Iterable[mode.types.services.ServiceT]¶ Return list of web-related services (excluding web server).
-
-
class
Settings
(id: str, *, debug: bool = None, version: int = None, broker: Union[str, yarl.URL, List[yarl.URL]] = None, broker_client_id: str = None, broker_request_timeout: Union[datetime.timedelta, float, str] = None, broker_credentials: Union[faust.types.auth.CredentialsT, ssl.SSLContext] = None, broker_commit_every: int = None, broker_commit_interval: Union[datetime.timedelta, float, str] = None, broker_commit_livelock_soft_timeout: Union[datetime.timedelta, float, str] = None, broker_session_timeout: Union[datetime.timedelta, float, str] = None, broker_heartbeat_interval: Union[datetime.timedelta, float, str] = None, broker_check_crcs: bool = None, broker_max_poll_records: int = None, broker_max_poll_interval: int = None, broker_consumer: Union[str, yarl.URL, List[yarl.URL]] = None, broker_producer: Union[str, yarl.URL, List[yarl.URL]] = None, agent_supervisor: Union[_T, str] = None, store: Union[str, yarl.URL] = None, cache: Union[str, yarl.URL] = None, web: Union[str, yarl.URL] = None, web_enabled: bool = True, processing_guarantee: Union[str, faust.types.enums.ProcessingGuarantee] = None, timezone: datetime.tzinfo = None, autodiscover: Union[bool, Iterable[str], Callable[Iterable[str]]] = None, origin: str = None, canonical_url: Union[str, yarl.URL] = None, datadir: Union[pathlib.Path, str] = None, tabledir: Union[pathlib.Path, str] = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, logging_config: Dict = None, loghandlers: List[logging.Handler] = None, table_cleanup_interval: Union[datetime.timedelta, float, str] = None, table_standby_replicas: int = None, table_key_index_size: int = None, topic_replication_factor: int = None, topic_partitions: int = None, topic_allow_declare: bool = None, topic_disable_leader: bool = None, id_format: str = None, reply_to: str = None, reply_to_prefix: str = None, reply_create_topic: bool = None, reply_expires: Union[datetime.timedelta, float, str] = None, ssl_context: ssl.SSLContext = None, stream_buffer_maxsize: int = None, stream_wait_empty: bool = None, stream_ack_cancelled_tasks: bool = None, stream_ack_exceptions: bool = None, stream_publish_on_commit: bool = None, stream_recovery_delay: Union[datetime.timedelta, float, str] = None, producer_linger_ms: int = None, producer_max_batch_size: int = None, producer_acks: int = None, producer_max_request_size: int = None, producer_compression_type: str = None, producer_partitioner: Union[_T, str] = None, producer_request_timeout: Union[datetime.timedelta, float, str] = None, producer_api_version: str = None, consumer_max_fetch_size: int = None, consumer_auto_offset_reset: str = None, web_bind: str = None, web_port: int = None, web_host: str = None, web_transport: Union[str, yarl.URL] = None, web_in_thread: bool = None, web_cors_options: Mapping[str, faust.types.web.ResourceOptions] = None, worker_redirect_stdouts: bool = None, worker_redirect_stdouts_level: Union[int, str] = None, Agent: Union[_T, str] = None, ConsumerScheduler: Union[_T, str] = None, Event: Union[_T, str] = None, Schema: Union[_T, str] = None, Stream: Union[_T, str] = None, Table: Union[_T, str] = None, SetTable: Union[_T, str] = None, GlobalTable: Union[_T, str] = None, SetGlobalTable: Union[_T, str] = None, TableManager: Union[_T, str] = None, Serializers: Union[_T, str] = None, Worker: Union[_T, str] = None, PartitionAssignor: Union[_T, str] = None, LeaderAssignor: Union[_T, str] = None, Router: Union[_T, str] = None, Topic: Union[_T, str] = None, HttpClient: Union[_T, str] = None, Monitor: Union[_T, str] = None, url: Union[str, yarl.URL] = None, **kwargs: Any) → None¶ -
-
property
GlobalTable
¶ - Return type
Type
[GlobalTableT
[]]
-
property
LeaderAssignor
¶ - Return type
-
property
PartitionAssignor
¶ - Return type
-
property
SetGlobalTable
¶ - Return type
Type
[GlobalTableT
[]]
-
property
TableManager
¶ - Return type
-
property
agent_supervisor
¶ - Return type
-
autodiscover
= False¶
-
broker_check_crcs
= True¶
-
broker_client_id
= 'faust-1.9.0'¶
-
broker_commit_every
= 10000¶
-
property
broker_credentials
¶ - Return type
-
broker_max_poll_interval
= 1000.0¶
-
consumer_auto_offset_reset
= 'earliest'¶
-
consumer_max_fetch_size
= 4194304¶
-
debug
= False¶
-
id_format
= '{id}-v{self.version}'¶
-
key_serializer
= 'raw'¶
-
logging_config
= None¶
-
property
processing_guarantee
¶ - Return type
-
producer_acks
= -1¶
-
producer_api_version
= 'auto'¶
-
producer_compression_type
= None¶
-
producer_linger_ms
= 0¶
-
producer_max_batch_size
= 16384¶
-
producer_max_request_size
= 1000000¶
-
property
producer_partitioner
¶
-
reply_create_topic
= False¶
-
reply_to_prefix
= 'f-reply-'¶
-
ssl_context
= None¶
-
stream_ack_cancelled_tasks
= True¶
-
stream_ack_exceptions
= True¶
-
stream_buffer_maxsize
= 4096¶
-
stream_publish_on_commit
= False¶
-
stream_wait_empty
= True¶
-
table_key_index_size
= 1000¶
-
table_standby_replicas
= 1¶
-
timezone
= datetime.timezone.utc¶
-
topic_allow_declare
= True¶
-
topic_disable_leader
= False¶
-
topic_partitions
= 8¶
-
topic_replication_factor
= 1¶
-
value_serializer
= 'json'¶
-
web_bind
= '0.0.0.0'¶
-
web_cors_options
= None¶
-
web_host
= 'build-10233069-project-230058-faust'¶
-
web_in_thread
= False¶
-
web_port
= 6066¶
-
worker_redirect_stdouts
= True¶
-
worker_redirect_stdouts_level
= 'WARN'¶
-
property
-
client_only
= False¶ Set this to True if app should only start the services required to operate as an RPC client (producer and simple reply consumer).
-
producer_only
= False¶ Set this to True if app should run without consumer/tables.
-
tracer
= None¶ Optional tracing support.
-
on_init_dependencies
() → Iterable[mode.types.services.ServiceT][source]¶ Return list of additional service dependencies.
The services returned will be started with the app when the app starts.
-
async
on_started_init_extra_tasks
() → None[source]¶ Call when started to start additional tasks.
- Return type
None
-
async
on_started_init_extra_services
() → None[source]¶ Call when initializing extra services at startup.
- Return type
None
-
async
on_init_extra_service
(service: Union[mode.types.services.ServiceT, Type[mode.types.services.ServiceT]]) → mode.types.services.ServiceT[source]¶ Call when adding user services to this app.
- Return type
ServiceT
[]
-
config_from_object
(obj: Any, *, silent: bool = False, force: bool = False) → None[source]¶ Read configuration from object.
Object is either an actual object or the name of a module to import.
Examples
>>> app.config_from_object('myproj.faustconfig')
>>> from myproj import faustconfig >>> app.config_from_object(faustconfig)
-
discover
(*extra_modules: str, categories: Iterable[str] = None, ignore: Iterable[Any] = [<built-in method search of _sre.SRE_Pattern object>, '.__main__']) → None[source]¶ Discover decorators in packages.
- Return type
None
-
topic
(*topics: str, pattern: Union[str, Pattern[~AnyStr]] = None, schema: faust.types.serializers.SchemaT = None, key_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, value_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, partitions: int = None, retention: Union[datetime.timedelta, float, str] = None, compacting: bool = None, deleting: bool = None, replicas: int = None, acks: bool = True, internal: bool = False, config: Mapping[str, Any] = None, maxsize: int = None, allow_empty: bool = False, has_prefix: bool = False, loop: asyncio.events.AbstractEventLoop = None) → faust.types.topics.TopicT[source]¶ Create topic description.
Topics are named channels (for example a Kafka topic), that exist on a server. To make an ephemeral local communication channel use:
channel()
.See also
- Return type
TopicT
[]
-
channel
(*, schema: faust.types.serializers.SchemaT = None, key_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, value_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, maxsize: int = None, loop: asyncio.events.AbstractEventLoop = None) → faust.types.channels.ChannelT[source]¶ Create new channel.
By default this will create an in-memory channel used for intra-process communication, but in practice channels can be backed by any transport (network or even means of inter-process communication).
See also
- Return type
ChannelT
[]
-
agent
(channel: Union[str, faust.types.channels.ChannelT] = None, *, name: str = None, concurrency: int = 1, supervisor_strategy: Type[mode.types.supervisors.SupervisorStrategyT] = None, sink: Iterable[Union[AgentT, faust.types.channels.ChannelT, Callable[Any, Optional[Awaitable]]]] = None, isolated_partitions: bool = False, use_reply_headers: bool = False, **kwargs: Any) → Callable[Callable[faust.types.streams.StreamT, Union[Coroutine[[Any, Any], None], Awaitable[None], AsyncIterable]], faust.types.agents.AgentT][source]¶ Create Agent from async def function.
It can be a regular async function:
@app.agent() async def my_agent(stream): async for number in stream: print(f'Received: {number!r}')
Or it can be an async iterator that yields values. These values can be used as the reply in an RPC-style call, or for sinks: callbacks that forward events to other agents/topics/statsd, and so on:
@app.agent(sink=[log_topic]) async def my_agent(requests): async for number in requests: yield number * 2
-
actor
(channel: Union[str, faust.types.channels.ChannelT] = None, *, name: str = None, concurrency: int = 1, supervisor_strategy: Type[mode.types.supervisors.SupervisorStrategyT] = None, sink: Iterable[Union[AgentT, faust.types.channels.ChannelT, Callable[Any, Optional[Awaitable]]]] = None, isolated_partitions: bool = False, use_reply_headers: bool = False, **kwargs: Any) → Callable[Callable[faust.types.streams.StreamT, Union[Coroutine[[Any, Any], None], Awaitable[None], AsyncIterable]], faust.types.agents.AgentT]¶ Create Agent from async def function.
It can be a regular async function:
@app.agent() async def my_agent(stream): async for number in stream: print(f'Received: {number!r}')
Or it can be an async iterator that yields values. These values can be used as the reply in an RPC-style call, or for sinks: callbacks that forward events to other agents/topics/statsd, and so on:
@app.agent(sink=[log_topic]) async def my_agent(requests): async for number in requests: yield number * 2
-
task
(fun: Union[Callable[AppT, Awaitable], Callable[Awaitable]] = None, *, on_leader: bool = False, traced: bool = True) → Union[Callable[Union[Callable[faust.types.app.AppT, Awaitable], Callable[Awaitable]], Union[Callable[faust.types.app.AppT, Awaitable], Callable[Awaitable]]], Callable[faust.types.app.AppT, Awaitable], Callable[Awaitable]][source]¶ Define an async def function to be started with the app.
This is like
timer()
but a one-shot task only executed at worker startup (after recovery and the worker is fully ready for operation).The function may take zero, or one argument. If the target function takes an argument, the
app
argument is passed:>>> @app.task >>> async def on_startup(app): ... print('STARTING UP: %r' % (app,))
Nullary functions are also supported:
>>> @app.task >>> async def on_startup(): ... print('STARTING UP')
-
timer
(interval: Union[datetime.timedelta, float, str], on_leader: bool = False, traced: bool = True, name: str = None, max_drift_correction: float = 0.1) → Callable[source]¶ Define an async def function to be run at periodic intervals.
Like
task()
, but executes periodically until the worker is shut down.This decorator takes an async function and adds it to a list of timers started with the app.
- Parameters
interval (Seconds) – How often the timer executes in seconds.
on_leader (bool) – Should the timer only run on the leader?
Example
>>> @app.timer(interval=10.0) >>> async def every_10_seconds(): ... print('TEN SECONDS JUST PASSED')
>>> app.timer(interval=5.0, on_leader=True) >>> async def every_5_seconds(): ... print('FIVE SECONDS JUST PASSED. ALSO, I AM THE LEADER!')
- Return type
-
crontab
(cron_format: str, *, timezone: datetime.tzinfo = None, on_leader: bool = False, traced: bool = True) → Callable[source]¶ Define periodic task using Crontab description.
This is an
async def
function to be run at the fixed times, defined by the Cron format.Like
timer()
, but executes at fixed times instead of executing at certain intervals.This decorator takes an async function and adds it to a list of Cronjobs started with the app.
- Parameters
cron_format (
str
) – The Cron spec defining fixed times to run the decorated function.- Keyword Arguments
timezone – The timezone to be taken into account for the Cron jobs. If not set value from
timezone
will be taken.on_leader – Should the Cron job only run on the leader?
Example
>>> @app.crontab(cron_format='30 18 * * *', timezone=pytz.timezone('US/Pacific')) >>> async def every_6_30_pm_pacific(): ... print('IT IS 6:30pm')
>>> app.crontab(cron_format='30 18 * * *', on_leader=True) >>> async def every_6_30_pm(): ... print('6:30pm UTC; ALSO, I AM THE LEADER!')
- Return type
-
service
(cls: Type[mode.types.services.ServiceT]) → Type[mode.types.services.ServiceT][source]¶ Decorate
mode.Service
to be started with the app.Examples
from mode import Service @app.service class Foo(Service): ...
-
stream
(channel: Union[AsyncIterable, Iterable], beacon: mode.utils.types.trees.NodeT = None, **kwargs: Any) → faust.types.streams.StreamT[source]¶ Create new stream from channel/topic/iterable/async iterable.
- Parameters
channel (
Union
[AsyncIterable
[+T_co],Iterable
[+T_co]]) – Iterable to stream over (async or non-async).
- Return type
StreamT
[+T_co]- Returns
to iterate over events in the stream.
- Return type
-
Table
(name: str, *, default: Callable[Any] = None, window: faust.types.windows.WindowT = None, partitions: int = None, help: str = None, **kwargs: Any) → faust.types.tables.TableT[source]¶ Define new table.
- Parameters
name (
str
) – Name used for table, note that two tables living in the same application cannot have the same name.default (
Optional
[Callable
[[],Any
]]) – A callable, or type that will return a default value for keys missing in this table.window (
Optional
[WindowT
]) – A windowing strategy to wrap this window in.
Examples
>>> table = app.Table('user_to_amount', default=int) >>> table['George'] 0 >>> table['Elaine'] += 1 >>> table['Elaine'] += 1 >>> table['Elaine'] 2
- Return type
TableT
[~KT, ~VT]
-
GlobalTable
(name: str, *, default: Callable[Any] = None, window: faust.types.windows.WindowT = None, partitions: int = None, help: str = None, **kwargs: Any) → faust.types.tables.GlobalTableT[source]¶ Define new global table.
- Parameters
name (
str
) – Name used for global table, note that two global tables living in the same application cannot have the same name.default (
Optional
[Callable
[[],Any
]]) – A callable, or type that will return a default valu for keys missing in this global table.window (
Optional
[WindowT
]) – A windowing strategy to wrap this window in.
Examples
>>> gtable = app.GlobalTable('user_to_amount', default=int) >>> gtable['George'] 0 >>> gtable['Elaine'] += 1 >>> gtable['Elaine'] += 1 >>> gtable['Elaine'] 2
- Return type
-
SetTable
(name: str, *, window: faust.types.windows.WindowT = None, partitions: int = None, start_manager: bool = False, help: str = None, **kwargs: Any) → faust.types.tables.TableT[source]¶ Table of sets.
- Return type
TableT
[~KT, ~VT]
-
SetGlobalTable
(name: str, *, window: faust.types.windows.WindowT = None, partitions: int = None, start_manager: bool = False, help: str = None, **kwargs: Any) → faust.types.tables.TableT[source]¶ Table of sets (global).
- Return type
TableT
[~KT, ~VT]
-
page
(path: str, *, base: Type[faust.web.views.View] = <class 'faust.web.views.View'>, cors_options: Mapping[str, faust.types.web.ResourceOptions] = None, name: str = None) → Callable[Union[Type[faust.types.web.View], Callable[[faust.types.web.View, faust.types.web.Request], Union[Coroutine[[Any, Any], faust.types.web.Response], Awaitable[faust.types.web.Response]]], Callable[[faust.types.web.View, faust.types.web.Request, Any, Any], Union[Coroutine[[Any, Any], faust.types.web.Response], Awaitable[faust.types.web.Response]]]], Type[faust.web.views.View]][source]¶ Decorate view to be included in the web server.
-
table_route
(table: faust.types.tables.CollectionT, shard_param: str = None, *, query_param: str = None, match_info: str = None, exact_key: str = None) → Callable[Union[Callable[[faust.types.web.View, faust.types.web.Request], Union[Coroutine[[Any, Any], faust.types.web.Response], Awaitable[faust.types.web.Response]]], Callable[[faust.types.web.View, faust.types.web.Request, Any, Any], Union[Coroutine[[Any, Any], faust.types.web.Response], Awaitable[faust.types.web.Response]]]], Union[Callable[[faust.types.web.View, faust.types.web.Request], Union[Coroutine[[Any, Any], faust.types.web.Response], Awaitable[faust.types.web.Response]]], Callable[[faust.types.web.View, faust.types.web.Request, Any, Any], Union[Coroutine[[Any, Any], faust.types.web.Response], Awaitable[faust.types.web.Response]]]]][source]¶ Decorate view method to route request to table key destination.
- Return type
Callable
[[Union
[Callable
[[View
,Request
],Union
[Coroutine
[Any
,Any
,Response
],Awaitable
[Response
]]],Callable
[[View
,Request
,Any
,Any
],Union
[Coroutine
[Any
,Any
,Response
],Awaitable
[Response
]]]]],Union
[Callable
[[View
,Request
],Union
[Coroutine
[Any
,Any
,Response
],Awaitable
[Response
]]],Callable
[[View
,Request
,Any
,Any
],Union
[Coroutine
[Any
,Any
,Response
],Awaitable
[Response
]]]]]
-
command
(*options: Any, base: Optional[Type[faust.app.base._AppCommand]] = None, **kwargs: Any) → Callable[Callable, Type[faust.app.base._AppCommand]][source]¶ Decorate
async def
function to be used as CLI command.
-
create_event
(key: Union[bytes, faust.types.core._ModelT, Any, None], value: Union[bytes, faust.types.core._ModelT, Any], headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes], None], message: faust.types.tuples.Message) → faust.types.events.EventT[source]¶ Create new
faust.Event
object.- Return type
EventT
[]
-
async
start_client
() → None[source]¶ Start the app in Client-Only mode necessary for RPC requests.
Notes
Once started as a client the app cannot be restarted as Server.
- Return type
None
-
async
maybe_start_client
() → None[source]¶ Start the app in Client-Only mode if not started as Server.
- Return type
None
-
trace
(name: str, trace_enabled: bool = True, **extra_context: Any) → ContextManager[source]¶ Return new trace context to trace operation using OpenTracing.
- Return type
ContextManager
[+T_co]
-
traced
(fun: Callable, name: str = None, sample_rate: float = 1.0, **context: Any) → Callable[source]¶ Decorate function to be traced using the OpenTracing API.
- Return type
-
async
send
(channel: Union[faust.types.channels.ChannelT, str], key: Union[bytes, faust.types.core._ModelT, Any, None] = None, value: Union[bytes, faust.types.core._ModelT, Any] = None, partition: int = None, timestamp: float = None, headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes], None] = None, schema: faust.types.serializers.SchemaT = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, callback: Callable[faust.types.tuples.FutureMessage, Union[None, Awaitable[None]]] = None) → Awaitable[faust.types.tuples.RecordMetadata][source]¶ Send event to channel/topic.
- Parameters
channel (
Union
[ChannelT
[],str
]) – Channel/topic or the name of a topic to send event to.partition (
Optional
[int
]) – Specific partition to send to. If not set the partition will be chosen by the partitioner.timestamp (
Optional
[float
]) – Epoch seconds (from Jan 1 1970 UTC) to use as the message timestamp. Defaults to current time.headers (
Union
[List
[Tuple
[str
,bytes
]],Mapping
[str
,bytes
],None
]) – Mapping of key/value pairs, or iterable of key value pairs to use as headers for the message.schema (
Optional
[SchemaT
[~KT, ~VT]]) –Schema
to use for serialization.key_serializer (
Union
[CodecT
,str
,None
]) – Serializer to use (if value is not model). Overrides schema if one is specified.value_serializer (
Union
[CodecT
,str
,None
]) – Serializer to use (if value is not model). Overrides schema if one is specified.callback (
Optional
[Callable
[[FutureMessage
[]],Union
[None
,Awaitable
[None
]]]]) –Called after the message is fully delivered to the channel, but not to the consumer. Signature must be unary as the
FutureMessage
future is passed to it.The resulting
faust.types.tuples.RecordMetadata
object is then available asfut.result()
.
- Return type
-
LiveCheck
(**kwargs: Any) → faust.app.base._LiveCheck[source]¶ Return new LiveCheck instance testing features for this app.
- Return type
_LiveCheck
-
async
commit
(topics: AbstractSet[Union[str, faust.types.tuples.TP]]) → bool[source]¶ Commit offset for acked messages in specified topics’.
Warning
This will commit acked messages in all topics if the topics argument is passed in as
None
.- Return type
-
async
on_stop
() → None[source]¶ Call when application stops.
Tip
Remember to call
super
if you override this method.- Return type
None
-
FlowControlQueue
(maxsize: int = None, *, clear_on_resume: bool = False, loop: asyncio.events.AbstractEventLoop = None) → mode.utils.queues.ThrowableQueue[source]¶ Like
asyncio.Queue
, but can be suspended/resumed.- Return type
-
Worker
(**kwargs: Any) → faust.app.base._Worker[source]¶ Return application worker instance.
- Return type
_Worker
-
on_webserver_init
(web: faust.types.web.Web) → None[source]¶ Call when the Web server is initializing.
- Return type
None
-
property
transport
¶ Consumer message transport. :rtype:
TransportT
-
logger
= <Logger faust.app.base (WARNING)>¶
-
property
producer_transport
¶ Producer message transport. :rtype:
TransportT
-
property
cache
¶ Cache backend. :rtype:
CacheBackendT
[]
-
topics
[source]¶ Topic Conductor.
This is the mediator that moves messages fetched by the Consumer into the streams.
It’s also a set of registered topics by string topic name, so you can check if a topic is being consumed from by doing
topic in app.topics
.
-
flow_control
[source]¶ Flow control of streams.
This object controls flow into stream queues, and can also clear all buffers.
-
property
http_client
¶ HTTP client Session. :rtype:
ClientSession
-
class
faust.
GSSAPICredentials
(*, kerberos_service_name: str = 'kafka', kerberos_domain_name: str = None, ssl_context: ssl.SSLContext = None, mechanism: Union[str, faust.types.auth.SASLMechanism] = None) → None[source]¶ Describe GSSAPI credentials over SASL.
-
protocol
= 'SASL_PLAINTEXT'¶
-
mechanism
= 'GSSAPI'¶
-
-
class
faust.
SASLCredentials
(*, username: str = None, password: str = None, ssl_context: ssl.SSLContext = None, mechanism: Union[str, faust.types.auth.SASLMechanism] = None) → None[source]¶ Describe SASL credentials.
-
protocol
= 'SASL_PLAINTEXT'¶
-
mechanism
= 'PLAIN'¶
-
-
class
faust.
SSLCredentials
(context: ssl.SSLContext = None, *, purpose: Any = None, cafile: Optional[str] = None, capath: Optional[str] = None, cadata: Optional[str] = None) → None[source]¶ Describe SSL credentials/settings.
-
protocol
= 'SSL'¶
-
-
class
faust.
Channel
(app: faust.types.app.AppT, *, schema: faust.types.serializers.SchemaT = None, key_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, value_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, is_iterator: bool = False, queue: mode.utils.queues.ThrowableQueue = None, maxsize: int = None, root: faust.types.channels.ChannelT = None, active_partitions: Set[faust.types.tuples.TP] = None, loop: asyncio.events.AbstractEventLoop = None) → None[source]¶ Create new channel.
- Parameters
app (
AppT
[]) – The app that created this channel (app.channel()
)schema (
Optional
[SchemaT
[~KT, ~VT]]) – Schema used for serialization/deserializationkey_type (
Union
[Type
[ModelT
],Type
[bytes
],Type
[str
],None
]) – The Model used for keys in this channel. (overrides schema if one is defined)value_type (
Union
[Type
[ModelT
],Type
[bytes
],Type
[str
],None
]) – The Model used for values in this channel. (overrides schema if one is defined)maxsize (
Optional
[int
]) – The maximum number of messages this channel can hold. If exceeded any newput
call will block until a message is removed from the channel.is_iterator (
bool
) – When streams iterate over a channel they will callstream.clone(is_iterator=True)
so this attribute denotes that this channel instance is currently being iterated over.active_partitions (
Optional
[Set
[TP
]]) – Set of active topic partitions this channel instance is assigned to.loop (
Optional
[AbstractEventLoop
]) – Theasyncio
event loop to use.
-
property
queue
¶ Return the underlying queue/buffer backing this channel. :rtype:
ThrowableQueue
-
clone
(*, is_iterator: bool = None, **kwargs: Any) → faust.types.channels.ChannelT[source]¶ Create clone of this channel.
- Parameters
is_iterator (
Optional
[bool
]) – Set to True if this is now a channel that is being iterated over.- Keyword Arguments
**kwargs – Any keyword arguments passed will override any of the arguments supported by
Channel.__init__
.- Return type
ChannelT
[]
-
clone_using_queue
(queue: asyncio.queues.Queue) → faust.types.channels.ChannelT[source]¶ Create clone of this channel using specific queue instance.
- Return type
ChannelT
[]
-
stream
(**kwargs: Any) → faust.types.streams.StreamT[source]¶ Create stream reading from this channel.
- Return type
StreamT
[+T_co]
-
get_topic_name
() → str[source]¶ Get the topic name, or raise if this is not a named channel.
- Return type
-
async
send
(*, key: Union[bytes, faust.types.core._ModelT, Any, None] = None, value: Union[bytes, faust.types.core._ModelT, Any] = None, partition: int = None, timestamp: float = None, headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes], None] = None, schema: faust.types.serializers.SchemaT = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, callback: Callable[faust.types.tuples.FutureMessage, Union[None, Awaitable[None]]] = None, force: bool = False) → Awaitable[faust.types.tuples.RecordMetadata][source]¶ Send message to channel.
- Return type
-
send_soon
(*, key: Union[bytes, faust.types.core._ModelT, Any, None] = None, value: Union[bytes, faust.types.core._ModelT, Any] = None, partition: int = None, timestamp: float = None, headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes], None] = None, schema: faust.types.serializers.SchemaT = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, callback: Callable[faust.types.tuples.FutureMessage, Union[None, Awaitable[None]]] = None, force: bool = False, eager_partitioning: bool = False) → faust.types.tuples.FutureMessage[source]¶ Produce message by adding to buffer.
This method is only supported by
Topic
.- Raises
NotImplementedError – always for in-memory channel.
- Return type
-
as_future_message
(key: Union[bytes, faust.types.core._ModelT, Any, None] = None, value: Union[bytes, faust.types.core._ModelT, Any] = None, partition: int = None, timestamp: float = None, headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes], None] = None, schema: faust.types.serializers.SchemaT = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, callback: Callable[faust.types.tuples.FutureMessage, Union[None, Awaitable[None]]] = None, eager_partitioning: bool = False) → faust.types.tuples.FutureMessage[source]¶ Create promise that message will be transmitted.
- Return type
-
prepare_headers
(headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes], None]) → Union[List[Tuple[str, bytes]], MutableMapping[str, bytes], None][source]¶ Prepare
headers
passed before publishing.
-
async
publish_message
(fut: faust.types.tuples.FutureMessage, wait: bool = True) → Awaitable[faust.types.tuples.RecordMetadata][source]¶ Publish message to channel.
This is the interface used by
topic.send()
, etc. to actually publish the message on the channel after being buffered up or similar.It takes a
FutureMessage
object, which contains all the information required to send the message, and acts as a promise that is resolved once the message has been fully transmitted.- Return type
-
async
declare
() → None[source]¶ Declare/create this channel.
This is used to create this channel on a server, if that is required to operate it.
- Return type
None
-
prepare_key
(key: Union[bytes, faust.types.core._ModelT, Any, None], key_serializer: Union[faust.types.codecs.CodecT, str, None], schema: faust.types.serializers.SchemaT = None, headers: Union[List[Tuple[str, bytes]], MutableMapping[str, bytes], None] = None) → Tuple[Any, Union[List[Tuple[str, bytes]], MutableMapping[str, bytes], None]][source]¶ Prepare key before it is sent to this channel.
Topic
uses this to implement serialization of keys sent to the channel.
-
prepare_value
(value: Union[bytes, faust.types.core._ModelT, Any], value_serializer: Union[faust.types.codecs.CodecT, str, None], schema: faust.types.serializers.SchemaT = None, headers: Union[List[Tuple[str, bytes]], MutableMapping[str, bytes], None] = None) → Tuple[Any, Union[List[Tuple[str, bytes]], MutableMapping[str, bytes], None]][source]¶ Prepare value before it is sent to this channel.
Topic
uses this to implement serialization of values sent to the channel.
-
async
decode
(message: faust.types.tuples.Message, *, propagate: bool = False) → faust.types.events.EventT[source]¶ Decode
Message
intoEvent
.- Return type
EventT
[]
-
async
deliver
(message: faust.types.tuples.Message) → None[source]¶ Deliver message to queue from consumer.
This is called by the consumer to deliver the message to the channel.
- Return type
None
-
async
get
(*, timeout: Union[datetime.timedelta, float, str] = None) → Any[source]¶ Get the next
Event
received on this channel.- Return type
-
async
on_key_decode_error
(exc: Exception, message: faust.types.tuples.Message) → None[source]¶ Unable to decode the key of an item in the queue.
See also
- Return type
None
-
async
on_value_decode_error
(exc: Exception, message: faust.types.tuples.Message) → None[source]¶ Unable to decode the value of an item in the queue.
See also
- Return type
None
-
async
on_decode_error
(exc: Exception, message: faust.types.tuples.Message) → None[source]¶ Signal that there was an error reading an event in the queue.
When a message in the channel needs deserialization to be reconstructed back to its original form, we will sometimes see decoding/deserialization errors being raised, from missing fields or malformed payloads, and so on.
We will log the exception, but you can also override this to perform additional actions.
- Admonition: Kafka
In the event a deserialization error occurs, we HAVE to commit the offset of the source message to continue processing the stream.
For this reason it is important that you keep a close eye on error logs. For easy of use, we suggest using log aggregation software, such as Sentry, to surface these errors to your operations team.
- Return type
None
-
on_stop_iteration
() → None[source]¶ Signal that iteration over this channel was stopped.
Tip
Remember to call
super
when overriding this method.- Return type
None
-
derive
(**kwargs: Any) → faust.types.channels.ChannelT[source]¶ Derive new channel from this channel, using new configuration.
See
faust.Topic.derive
.For local channels this will simply return the same channel.
- Return type
ChannelT
[]
-
class
faust.
ChannelT
(app: faust.types.channels._AppT, *, schema: faust.types.channels._SchemaT = None, key_type: faust.types.channels._ModelArg = None, value_type: faust.types.channels._ModelArg = None, is_iterator: bool = False, queue: mode.utils.queues.ThrowableQueue = None, maxsize: int = None, root: Optional[faust.types.channels.ChannelT] = None, active_partitions: Set[faust.types.tuples.TP] = None, loop: asyncio.events.AbstractEventLoop = None) → None[source]¶ -
abstract
clone
(*, is_iterator: bool = None, **kwargs: Any) → faust.types.channels.ChannelT[source]¶ - Return type
ChannelT
[]
-
abstract
clone_using_queue
(queue: asyncio.queues.Queue) → faust.types.channels.ChannelT[source]¶ - Return type
ChannelT
[]
-
abstract async
send
(*, key: Union[bytes, faust.types.core._ModelT, Any, None] = None, value: Union[bytes, faust.types.core._ModelT, Any] = None, partition: int = None, timestamp: float = None, headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes], None] = None, schema: faust.types.channels._SchemaT = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, callback: Callable[faust.types.tuples.FutureMessage, Union[None, Awaitable[None]]] = None, force: bool = False) → Awaitable[faust.types.tuples.RecordMetadata][source]¶ - Return type
-
abstract
send_soon
(*, key: Union[bytes, faust.types.core._ModelT, Any, None] = None, value: Union[bytes, faust.types.core._ModelT, Any] = None, partition: int = None, timestamp: float = None, headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes], None] = None, schema: faust.types.channels._SchemaT = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, callback: Callable[faust.types.tuples.FutureMessage, Union[None, Awaitable[None]]] = None, force: bool = False, eager_partitioning: bool = False) → faust.types.tuples.FutureMessage[source]¶ - Return type
-
abstract
as_future_message
(key: Union[bytes, faust.types.core._ModelT, Any, None] = None, value: Union[bytes, faust.types.core._ModelT, Any] = None, partition: int = None, timestamp: float = None, headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes], None] = None, schema: faust.types.channels._SchemaT = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, callback: Callable[faust.types.tuples.FutureMessage, Union[None, Awaitable[None]]] = None, eager_partitioning: bool = False) → faust.types.tuples.FutureMessage[source]¶ - Return type
-
abstract async
publish_message
(fut: faust.types.tuples.FutureMessage, wait: bool = True) → Awaitable[faust.types.tuples.RecordMetadata][source]¶ - Return type
-
abstract
prepare_key
(key: Union[bytes, faust.types.core._ModelT, Any, None], key_serializer: Union[faust.types.codecs.CodecT, str, None], schema: faust.types.channels._SchemaT = None) → Any[source]¶ - Return type
-
abstract
prepare_value
(value: Union[bytes, faust.types.core._ModelT, Any], value_serializer: Union[faust.types.codecs.CodecT, str, None], schema: faust.types.channels._SchemaT = None) → Any[source]¶ - Return type
-
abstract async
decode
(message: faust.types.tuples.Message, *, propagate: bool = False) → faust.types.channels._EventT[source]¶ - Return type
_EventT
-
abstract async
get
(*, timeout: Union[datetime.timedelta, float, str] = None) → Any[source]¶ - Return type
-
abstract async
on_key_decode_error
(exc: Exception, message: faust.types.tuples.Message) → None[source]¶ - Return type
None
-
abstract async
on_value_decode_error
(exc: Exception, message: faust.types.tuples.Message) → None[source]¶ - Return type
None
-
abstract async
on_decode_error
(exc: Exception, message: faust.types.tuples.Message) → None[source]¶ - Return type
None
-
abstract property
queue
¶ - Return type
-
abstract
-
class
faust.
Event
(app: faust.types.app.AppT, key: Union[bytes, faust.types.core._ModelT, Any, None], value: Union[bytes, faust.types.core._ModelT, Any], headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes], None], message: faust.types.tuples.Message) → None[source]¶ An event received on a channel.
Notes
Events have a key and a value:
event.key, event.value
They also have a reference to the original message (if available), such as a Kafka record:
event.message.offset
Iterating over channels/topics yields Event:
- async for event in channel:
…
Iterating over a stream (that in turn iterate over channel) yields Event.value:
async for value in channel.stream() # value is event.value ...
If you only have a Stream object, you can also access underlying events by using
Stream.events
.For example:
async for event in channel.stream.events(): ...
Also commonly used for finding the “current event” related to a value in the stream:
stream = channel.stream() async for event in stream.events(): event = stream.current_event message = event.message topic = event.message.topic
You can retrieve the current event in a stream to:
Get access to the serialized key+value.
Get access to message properties like, what topic+partition the value was received on, or its offset.
If you want access to both key and value, you should use
stream.items()
instead.async for key, value in stream.items(): ...
stream.current_event
can also be accessed but you must take extreme care you are using the correct stream object. Methods such as.group_by(key)
and.through(topic)
returns cloned stream objects, so in the example:The best way to access the current_event in an agent is to use the
ContextVar
:from faust import current_event @app.agent(topic) async def process(stream): async for value in stream: event = current_event()
-
app
¶
-
key
¶
-
value
¶
-
message
¶
-
headers
¶
-
acked
¶
-
async
send
(channel: Union[str, faust.types.channels.ChannelT], key: Union[bytes, faust.types.core._ModelT, Any, None] = <object object>, value: Union[bytes, faust.types.core._ModelT, Any] = <object object>, partition: int = None, timestamp: float = None, headers: Any = <object object>, schema: faust.types.serializers.SchemaT = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, callback: Callable[faust.types.tuples.FutureMessage, Union[None, Awaitable[None]]] = None, force: bool = False) → Awaitable[faust.types.tuples.RecordMetadata][source]¶ Send object to channel.
- Return type
-
async
forward
(channel: Union[str, faust.types.channels.ChannelT], key: Union[bytes, faust.types.core._ModelT, Any, None] = <object object>, value: Union[bytes, faust.types.core._ModelT, Any] = <object object>, partition: int = None, timestamp: float = None, headers: Any = <object object>, schema: faust.types.serializers.SchemaT = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, callback: Callable[faust.types.tuples.FutureMessage, Union[None, Awaitable[None]]] = None, force: bool = False) → Awaitable[faust.types.tuples.RecordMetadata][source]¶ Forward original message (will not be reserialized).
- Return type
-
class
faust.
EventT
(app: faust.types.events._AppT, key: Union[bytes, faust.types.core._ModelT, Any, None], value: Union[bytes, faust.types.core._ModelT, Any], headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes], None], message: faust.types.tuples.Message) → None[source]¶ -
app
¶
-
key
¶
-
value
¶
-
headers
¶
-
message
¶
-
acked
¶
-
abstract async
send
(channel: Union[str, faust.types.events._ChannelT], key: Union[bytes, faust.types.core._ModelT, Any, None] = None, value: Union[bytes, faust.types.core._ModelT, Any] = None, partition: int = None, timestamp: float = None, headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes], None] = None, schema: faust.types.events._SchemaT = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, callback: Callable[faust.types.tuples.FutureMessage, Union[None, Awaitable[None]]] = None, force: bool = False) → Awaitable[faust.types.tuples.RecordMetadata][source]¶ - Return type
-
abstract async
forward
(channel: Union[str, faust.types.events._ChannelT], key: Any = None, value: Any = None, partition: int = None, timestamp: float = None, headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes], None] = None, schema: faust.types.events._SchemaT = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, callback: Callable[faust.types.tuples.FutureMessage, Union[None, Awaitable[None]]] = None, force: bool = False) → Awaitable[faust.types.tuples.RecordMetadata][source]¶ - Return type
-
-
class
faust.
ModelOptions
(*args, **kwargs)[source]¶ -
serializer
= None¶
-
include_metadata
= True¶
-
polymorphic_fields
= False¶
-
allow_blessed_key
= False¶
-
isodates
= False¶
-
decimals
= False¶
-
validation
= False¶
-
coerce
= False¶
-
coercions
= None¶
-
date_parser
= None¶
-
fields
= None¶ Flattened view of __annotations__ in MRO order.
- Type
Index
-
fieldset
= None¶ Set of required field names, for fast argument checking.
- Type
Index
-
descriptors
= None¶ Mapping of field name to field descriptor.
- Type
Index
-
fieldpos
= None¶ Positional argument index to field name. Used by Record.__init__ to map positional arguments to fields.
- Type
Index
-
optionalset
= None¶ Set of optional field names, for fast argument checking.
- Type
Index
-
models
= None¶ Mapping of fields that are ModelT
- Type
Index
-
modelattrs
= None¶
-
field_coerce
= None¶ Mapping of fields that need to be coerced. Key is the name of the field, value is the coercion handler function.
- Type
Index
-
defaults
= None¶ Mapping of field names to default value.
-
initfield
= None¶ Mapping of init field conversion callbacks.
-
polyindex
= None¶ Index of field to polymorphic type
-
-
class
faust.
Record
→ None[source]¶ Describes a model type that is a record (Mapping).
Examples
>>> class LogEvent(Record, serializer='json'): ... severity: str ... message: str ... timestamp: float ... optional_field: str = 'default value'
>>> event = LogEvent( ... severity='error', ... message='Broken pact', ... timestamp=666.0, ... )
>>> event.severity 'error'
>>> serialized = event.dumps() '{"severity": "error", "message": "Broken pact", "timestamp": 666.0}'
>>> restored = LogEvent.loads(serialized) <LogEvent: severity='error', message='Broken pact', timestamp=666.0>
>>> # You can also subclass a Record to create a new record >>> # with additional fields >>> class RemoteLogEvent(LogEvent): ... url: str
>>> # You can also refer to record fields and pass them around: >>> LogEvent.severity >>> <FieldDescriptor: LogEvent.severity (str)>
-
classmethod
from_data
(data: Mapping, *, preferred_type: Type[faust.types.models.ModelT] = None) → faust.models.record.Record[source]¶ Create model object from Python dictionary.
- Return type
-
classmethod
-
class
faust.
Monitor
(*, max_avg_history: int = None, max_commit_latency_history: int = None, max_send_latency_history: int = None, max_assignment_latency_history: int = None, messages_sent: int = 0, tables: MutableMapping[str, faust.sensors.monitor.TableState] = None, messages_active: int = 0, events_active: int = 0, messages_received_total: int = 0, messages_received_by_topic: Counter[str] = None, events_total: int = 0, events_by_stream: Counter[faust.types.streams.StreamT] = None, events_by_task: Counter[_asyncio.Task] = None, events_runtime: Deque[float] = None, commit_latency: Deque[float] = None, send_latency: Deque[float] = None, assignment_latency: Deque[float] = None, events_s: int = 0, messages_s: int = 0, events_runtime_avg: float = 0.0, topic_buffer_full: Counter[faust.types.topics.TopicT] = None, rebalances: int = None, rebalance_return_latency: Deque[float] = None, rebalance_end_latency: Deque[float] = None, rebalance_return_avg: float = 0.0, rebalance_end_avg: float = 0.0, time: Callable[float] = <built-in function monotonic>, http_response_codes: Counter[http.HTTPStatus] = None, http_response_latency: Deque[float] = None, http_response_latency_avg: float = 0.0, **kwargs: Any) → None[source]¶ Default Faust Sensor.
This is the default sensor, recording statistics about events, etc.
-
send_errors
= 0¶ Number of produce operations that ended in error.
-
assignments_completed
= 0¶ Number of partition assignments completed.
-
assignments_failed
= 0¶ Number of partitions assignments that failed.
-
max_avg_history
= 100¶ Max number of total run time values to keep to build average.
-
max_commit_latency_history
= 30¶ Max number of commit latency numbers to keep.
-
max_send_latency_history
= 30¶ Max number of send latency numbers to keep.
-
max_assignment_latency_history
= 30¶ Max number of assignment latency numbers to keep.
-
rebalances
= 0¶ Number of rebalances seen by this worker.
-
tables
= None¶ Mapping of tables
-
commit_latency
= None¶ Deque of commit latency values
-
send_latency
= None¶ Deque of send latency values
-
assignment_latency
= None¶ Deque of assignment latency values.
-
rebalance_return_latency
= None¶ Deque of previous n rebalance return latencies.
-
rebalance_end_latency
= None¶ Deque of previous n rebalance end latencies.
-
rebalance_return_avg
= 0.0¶ Average rebalance return latency.
-
rebalance_end_avg
= 0.0¶ Average rebalance end latency.
-
messages_active
= 0¶ Number of messages currently being processed.
-
messages_received_total
= 0¶ Number of messages processed in total.
-
messages_received_by_topic
= None¶ Count of messages received by topic
-
messages_sent
= 0¶ Number of messages sent in total.
-
messages_sent_by_topic
= None¶ Number of messages sent by topic.
-
messages_s
= 0¶ Number of messages being processed this second.
-
events_active
= 0¶ Number of events currently being processed.
-
events_total
= 0¶ Number of events processed in total.
-
events_by_task
= None¶ Count of events processed by task
-
events_by_stream
= None¶ Count of events processed by stream
-
events_s
= 0¶ Number of events being processed this second.
-
events_runtime_avg
= 0.0¶ Average event runtime over the last second.
-
events_runtime
= None¶ Deque of run times used for averages
-
topic_buffer_full
= None¶ Counter of times a topics buffer was full
-
http_response_codes
= None¶ Counter of returned HTTP status codes.
-
http_response_latency
= None¶ Deque of previous n HTTP request->response latencies.
-
http_response_latency_avg
= 0.0¶ Average request->response latency.
-
metric_counts
= None¶ Arbitrary counts added by apps
-
tp_committed_offsets
= None¶ Last committed offsets by TopicPartition
-
tp_read_offsets
= None¶ Last read offsets by TopicPartition
-
tp_end_offsets
= None¶ Log end offsets by TopicPartition
-
secs_since
(start_time: float) → float[source]¶ Given timestamp start, return number of seconds since that time.
- Return type
-
ms_since
(start_time: float) → float[source]¶ Given timestamp start, return number of ms since that time.
- Return type
-
logger
= <Logger faust.sensors.monitor (WARNING)>¶
-
on_message_in
(tp: faust.types.tuples.TP, offset: int, message: faust.types.tuples.Message) → None[source]¶ Call before message is delegated to streams.
- Return type
None
-
on_stream_event_in
(tp: faust.types.tuples.TP, offset: int, stream: faust.types.streams.StreamT, event: faust.types.events.EventT) → Optional[Dict][source]¶ Call when stream starts processing an event.
-
on_stream_event_out
(tp: faust.types.tuples.TP, offset: int, stream: faust.types.streams.StreamT, event: faust.types.events.EventT, state: Dict = None) → None[source]¶ Call when stream is done processing an event.
- Return type
None
-
on_topic_buffer_full
(topic: faust.types.topics.TopicT) → None[source]¶ Call when conductor topic buffer is full and has to wait.
- Return type
None
-
on_message_out
(tp: faust.types.tuples.TP, offset: int, message: faust.types.tuples.Message) → None[source]¶ Call when message is fully acknowledged and can be committed.
- Return type
None
-
on_table_get
(table: faust.types.tables.CollectionT, key: Any) → None[source]¶ Call when value in table is retrieved.
- Return type
None
-
on_table_set
(table: faust.types.tables.CollectionT, key: Any, value: Any) → None[source]¶ Call when new value for key in table is set.
- Return type
None
-
on_table_del
(table: faust.types.tables.CollectionT, key: Any) → None[source]¶ Call when key in a table is deleted.
- Return type
None
-
on_commit_initiated
(consumer: faust.types.transports.ConsumerT) → Any[source]¶ Consumer is about to commit topic offset.
- Return type
-
on_commit_completed
(consumer: faust.types.transports.ConsumerT, state: Any) → None[source]¶ Call when consumer commit offset operation completed.
- Return type
None
-
on_send_initiated
(producer: faust.types.transports.ProducerT, topic: str, message: faust.types.tuples.PendingMessage, keysize: int, valsize: int) → Any[source]¶ Call when message added to producer buffer.
- Return type
-
on_send_completed
(producer: faust.types.transports.ProducerT, state: Any, metadata: faust.types.tuples.RecordMetadata) → None[source]¶ Call when producer finished sending message.
- Return type
None
-
on_send_error
(producer: faust.types.transports.ProducerT, exc: BaseException, state: Any) → None[source]¶ Call when producer was unable to publish message.
- Return type
None
-
on_tp_commit
(tp_offsets: MutableMapping[faust.types.tuples.TP, int]) → None[source]¶ Call when offset in topic partition is committed.
- Return type
None
-
track_tp_end_offset
(tp: faust.types.tuples.TP, offset: int) → None[source]¶ Track new topic partition end offset for monitoring lags.
- Return type
None
-
on_assignment_start
(assignor: faust.types.assignor.PartitionAssignorT) → Dict[source]¶ Partition assignor is starting to assign partitions.
- Return type
Dict
[~KT, ~VT]
-
on_assignment_error
(assignor: faust.types.assignor.PartitionAssignorT, state: Dict, exc: BaseException) → None[source]¶ Partition assignor did not complete assignor due to error.
- Return type
None
-
on_assignment_completed
(assignor: faust.types.assignor.PartitionAssignorT, state: Dict) → None[source]¶ Partition assignor completed assignment.
- Return type
None
-
on_rebalance_start
(app: faust.types.app.AppT) → Dict[source]¶ Cluster rebalance in progress.
- Return type
Dict
[~KT, ~VT]
-
on_rebalance_return
(app: faust.types.app.AppT, state: Dict) → None[source]¶ Consumer replied assignment is done to broker.
- Return type
None
-
on_rebalance_end
(app: faust.types.app.AppT, state: Dict) → None[source]¶ Cluster rebalance fully completed (including recovery).
- Return type
None
-
-
class
faust.
Sensor
(*, beacon: mode.utils.types.trees.NodeT = None, loop: asyncio.events.AbstractEventLoop = None) → None[source]¶ Base class for sensors.
This sensor does not do anything at all, but can be subclassed to create new monitors.
-
on_message_in
(tp: faust.types.tuples.TP, offset: int, message: faust.types.tuples.Message) → None[source]¶ Message received by a consumer.
- Return type
None
-
on_stream_event_in
(tp: faust.types.tuples.TP, offset: int, stream: faust.types.streams.StreamT, event: faust.types.events.EventT) → Optional[Dict][source]¶ Message sent to a stream as an event.
-
on_stream_event_out
(tp: faust.types.tuples.TP, offset: int, stream: faust.types.streams.StreamT, event: faust.types.events.EventT, state: Dict = None) → None[source]¶ Event was acknowledged by stream.
Notes
Acknowledged means a stream finished processing the event, but given that multiple streams may be handling the same event, the message cannot be committed before all streams have processed it. When all streams have acknowledged the event, it will go through
on_message_out()
just before offsets are committed.- Return type
None
-
on_message_out
(tp: faust.types.tuples.TP, offset: int, message: faust.types.tuples.Message) → None[source]¶ All streams finished processing message.
- Return type
None
-
on_topic_buffer_full
(topic: faust.types.topics.TopicT) → None[source]¶ Topic buffer full so conductor had to wait.
- Return type
None
-
on_table_get
(table: faust.types.tables.CollectionT, key: Any) → None[source]¶ Key retrieved from table.
- Return type
None
-
on_table_set
(table: faust.types.tables.CollectionT, key: Any, value: Any) → None[source]¶ Value set for key in table.
- Return type
None
-
on_table_del
(table: faust.types.tables.CollectionT, key: Any) → None[source]¶ Key deleted from table.
- Return type
None
-
on_commit_initiated
(consumer: faust.types.transports.ConsumerT) → Any[source]¶ Consumer is about to commit topic offset.
- Return type
-
on_commit_completed
(consumer: faust.types.transports.ConsumerT, state: Any) → None[source]¶ Consumer finished committing topic offset.
- Return type
None
-
on_send_initiated
(producer: faust.types.transports.ProducerT, topic: str, message: faust.types.tuples.PendingMessage, keysize: int, valsize: int) → Any[source]¶ About to send a message.
- Return type
-
on_send_completed
(producer: faust.types.transports.ProducerT, state: Any, metadata: faust.types.tuples.RecordMetadata) → None[source]¶ Message successfully sent.
- Return type
None
-
on_send_error
(producer: faust.types.transports.ProducerT, exc: BaseException, state: Any) → None[source]¶ Error while sending message.
- Return type
None
-
on_assignment_start
(assignor: faust.types.assignor.PartitionAssignorT) → Dict[source]¶ Partition assignor is starting to assign partitions.
- Return type
Dict
[~KT, ~VT]
-
on_assignment_error
(assignor: faust.types.assignor.PartitionAssignorT, state: Dict, exc: BaseException) → None[source]¶ Partition assignor did not complete assignor due to error.
- Return type
None
-
on_assignment_completed
(assignor: faust.types.assignor.PartitionAssignorT, state: Dict) → None[source]¶ Partition assignor completed assignment.
- Return type
None
-
on_rebalance_start
(app: faust.types.app.AppT) → Dict[source]¶ Cluster rebalance in progress.
- Return type
Dict
[~KT, ~VT]
-
on_rebalance_return
(app: faust.types.app.AppT, state: Dict) → None[source]¶ Consumer replied assignment is done to broker.
- Return type
None
-
on_rebalance_end
(app: faust.types.app.AppT, state: Dict) → None[source]¶ Cluster rebalance fully completed (including recovery).
- Return type
None
-
on_web_request_start
(app: faust.types.app.AppT, request: faust.web.base.Request, *, view: faust.web.views.View = None) → Dict[source]¶ Web server started working on request.
- Return type
Dict
[~KT, ~VT]
-
on_web_request_end
(app: faust.types.app.AppT, request: faust.web.base.Request, response: Optional[faust.web.base.Response], state: Dict, *, view: faust.web.views.View = None) → None[source]¶ Web server finished working on request.
- Return type
None
-
logger
= <Logger faust.sensors.base (WARNING)>¶
-
-
class
faust.
Codec
(children: Tuple[faust.types.codecs.CodecT, ...] = None, **kwargs: Any) → None[source]¶ Base class for codecs.
-
children
= None¶ next steps in the recursive codec chain.
x = pickle | binary
returns codec with children set to(pickle, binary)
.
-
nodes
= None¶ cached version of children including this codec as the first node. could use chain below, but seems premature so just copying the list.
-
-
class
faust.
Schema
(*, key_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, value_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, allow_empty: bool = None) → None[source]¶ -
update
(*, key_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, value_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, allow_empty: bool = None) → None[source]¶ - Return type
None
-
loads_key
(app: faust.types.app.AppT, message: faust.types.tuples.Message, *, loads: Callable = None, serializer: Union[faust.types.codecs.CodecT, str, None] = None) → KT[source]¶ - Return type
~KT
-
loads_value
(app: faust.types.app.AppT, message: faust.types.tuples.Message, *, loads: Callable = None, serializer: Union[faust.types.codecs.CodecT, str, None] = None) → VT[source]¶ - Return type
~VT
-
dumps_key
(app: faust.types.app.AppT, key: Union[bytes, faust.types.core._ModelT, Any, None], *, serializer: Union[faust.types.codecs.CodecT, str, None] = None, headers: Union[List[Tuple[str, bytes]], MutableMapping[str, bytes], None]) → Tuple[Any, Union[List[Tuple[str, bytes]], MutableMapping[str, bytes], None]][source]¶
-
dumps_value
(app: faust.types.app.AppT, value: Union[bytes, faust.types.core._ModelT, Any], *, serializer: Union[faust.types.codecs.CodecT, str, None] = None, headers: Union[List[Tuple[str, bytes]], MutableMapping[str, bytes], None]) → Tuple[Any, Union[List[Tuple[str, bytes]], MutableMapping[str, bytes], None]][source]¶
-
on_dumps_key_prepare_headers
(key: Union[bytes, faust.types.core._ModelT, Any], headers: Union[List[Tuple[str, bytes]], MutableMapping[str, bytes], None]) → Union[List[Tuple[str, bytes]], MutableMapping[str, bytes], None][source]¶
-
on_dumps_value_prepare_headers
(value: Union[bytes, faust.types.core._ModelT, Any], headers: Union[List[Tuple[str, bytes]], MutableMapping[str, bytes], None]) → Union[List[Tuple[str, bytes]], MutableMapping[str, bytes], None][source]¶
-
async
decode
(app: faust.types.app.AppT, message: faust.types.tuples.Message, *, propagate: bool = False) → faust.types.events.EventT[source]¶ Decode message from topic (compiled function not cached).
- Return type
EventT
[]
-
compile
(app: faust.types.app.AppT, *, on_key_decode_error: Callable[[Exception, faust.types.tuples.Message], Awaitable[None]] = <function _noop_decode_error>, on_value_decode_error: Callable[[Exception, faust.types.tuples.Message], Awaitable[None]] = <function _noop_decode_error>, default_propagate: bool = False) → Callable[..., Awaitable[faust.types.events.EventT]][source]¶ Compile function used to decode event.
-
-
class
faust.
Stream
(channel: AsyncIterator[T_co], *, app: faust.types.app.AppT, processors: Iterable[Callable[T]] = None, combined: List[faust.types.streams.JoinableT] = None, on_start: Callable = None, join_strategy: faust.types.joins.JoinT = None, beacon: mode.utils.types.trees.NodeT = None, concurrency_index: int = None, prev: faust.types.streams.StreamT = None, active_partitions: Set[faust.types.tuples.TP] = None, enable_acks: bool = True, prefix: str = '', loop: asyncio.events.AbstractEventLoop = None) → None[source]¶ A stream: async iterator processing events in channels/topics.
-
logger
= <Logger faust.streams (WARNING)>¶
-
mundane_level
= 'debug'¶
-
get_active_stream
() → faust.types.streams.StreamT[source]¶ Return the currently active stream.
A stream can be derived using
Stream.group_by
etc, so if this stream was used to create another derived stream, this function will return the stream being actively consumed from. E.g. in the example:>>> @app.agent() ... async def agent(a): .. a = a ... b = a.group_by(Withdrawal.account_id) ... c = b.through('backup_topic') ... async for value in c: ... ...
The return value of
a.get_active_stream()
would bec
.Notes
The chain of streams that leads to the active stream is decided by the
_next
attribute. To get to the active stream we just traverse this linked-list:>>> def get_active_stream(self): ... node = self ... while node._next: ... node = node._next
- Return type
StreamT
[+T_co]
-
get_root_stream
() → faust.types.streams.StreamT[source]¶ Get the root stream that this stream was derived from.
- Return type
StreamT
[+T_co]
-
add_processor
(processor: Callable[T]) → None[source]¶ Add processor callback executed whenever a new event is received.
Processor functions can be async or non-async, must accept a single argument, and should return the value, mutated or not.
For example a processor handling a stream of numbers may modify the value:
def double(value: int) -> int: return value * 2 stream.add_processor(double)
- Return type
None
-
clone
(**kwargs: Any) → faust.types.streams.StreamT[source]¶ Create a clone of this stream.
Notes
If the cloned stream is supposed to supersede this stream, like in
group_by
/through
/etc., you should use_chain()
instead so stream._next = cloned_stream is set andget_active_stream()
returns the cloned stream.- Return type
StreamT
[+T_co]
-
noack
() → faust.types.streams.StreamT[source]¶ Create new stream where acks are manual.
- Return type
StreamT
[+T_co]
-
items
() → AsyncIterator[Tuple[Union[bytes, faust.types.core._ModelT, Any, None], T_co]][source]¶ Iterate over the stream as
key, value
pairs.Examples
@app.agent(topic) async def mytask(stream): async for key, value in stream.items(): print(key, value)
- Return type
AsyncIterator
[Tuple
[Union
[bytes
,_ModelT
,Any
,None
], +T_co]]
-
events
() → AsyncIterable[faust.types.events.EventT][source]¶ Iterate over the stream as events exclusively.
This means the stream must be iterating over a channel, or at least an iterable of event objects.
- Return type
-
take
(max_: int, within: Union[datetime.timedelta, float, str]) → AsyncIterable[Sequence[T_co]][source]¶ Buffer n values at a time and yield a list of buffered values.
- Parameters
within (
Union
[timedelta
,float
,str
]) – Timeout for when we give up waiting for another value, and process the values we have. Warning: If there’s no timeout (i.e. timeout=None), the agent is likely to stall and block buffered events for an unreasonable length of time(!).- Return type
AsyncIterable
[Sequence
[+T_co]]
-
enumerate
(start: int = 0) → AsyncIterable[Tuple[int, T_co]][source]¶ Enumerate values received on this stream.
Unlike Python’s built-in
enumerate
, this works with async generators.- Return type
AsyncIterable
[Tuple
[int
, +T_co]]
-
through
(channel: Union[str, faust.types.channels.ChannelT]) → faust.types.streams.StreamT[source]¶ Forward values to in this stream to channel.
Send messages received on this stream to another channel, and return a new stream that consumes from that channel.
Notes
The messages are forwarded after any processors have been applied.
Example
topic = app.topic('foo') @app.agent(topic) async def mytask(stream): async for value in stream.through(app.topic('bar')): # value was first received in topic 'foo', # then forwarded and consumed from topic 'bar' print(value)
- Return type
StreamT
[+T_co]
-
echo
(*channels: Union[str, faust.types.channels.ChannelT]) → faust.types.streams.StreamT[source]¶ Forward values to one or more channels.
Unlike
through()
, we don’t consume from these channels.- Return type
StreamT
[+T_co]
-
group_by
(key: Union[faust.types.models.FieldDescriptorT, Callable[T, Union[bytes, faust.types.core._ModelT, Any, None]]], *, name: str = None, topic: faust.types.topics.TopicT = None, partitions: int = None) → faust.types.streams.StreamT[source]¶ Create new stream that repartitions the stream using a new key.
- Parameters
key (
Union
[FieldDescriptorT
[~T],Callable
[[~T],Union
[bytes
,_ModelT
,Any
,None
]]]) –The key argument decides how the new key is generated, it can be a field descriptor, a callable, or an async callable.
- Note: The
name
argument must be provided if the key argument is a callable.
- Note: The
name (
Optional
[str
]) – Suffix to use for repartitioned topics. This argument is required if key is a callable.
Examples
Using a field descriptor to use a field in the event as the new key:
s = withdrawals_topic.stream() # values in this stream are of type Withdrawal async for event in s.group_by(Withdrawal.account_id): ...
Using an async callable to extract a new key:
s = withdrawals_topic.stream() async def get_key(withdrawal): return await aiohttp.get( f'http://e.com/resolve_account/{withdrawal.account_id}') async for event in s.group_by(get_key): ...
Using a regular callable to extract a new key:
s = withdrawals_topic.stream() def get_key(withdrawal): return withdrawal.account_id.upper() async for event in s.group_by(get_key): ...
- Return type
StreamT
[+T_co]
-
filter
(fun: Callable[T]) → faust.types.streams.StreamT[source]¶ Filter values from stream using callback.
The callback may be a traditional function, lambda function, or an async def function.
This method is useful for filtering events before repartitioning a stream.
Examples
>>> async for v in stream.filter(lambda: v > 1000).group_by(...): ... # do something
- Return type
StreamT
[+T_co]
-
derive_topic
(name: str, *, schema: faust.types.serializers.SchemaT = None, key_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, value_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, prefix: str = '', suffix: str = '') → faust.types.topics.TopicT[source]¶ Create Topic description derived from the K/V type of this stream.
- Parameters
name (
str
) – Topic name.key_type (
Union
[Type
[ModelT
],Type
[bytes
],Type
[str
],None
]) – Specific key type to use for this topic. If not set, the key type of this stream will be used.value_type (
Union
[Type
[ModelT
],Type
[bytes
],Type
[str
],None
]) – Specific value type to use for this topic. If not set, the value type of this stream will be used.
- Raises
ValueError – if the stream channel is not a topic.
- Return type
TopicT
[]
-
async
throw
(exc: BaseException) → None[source]¶ Send exception to stream iteration.
- Return type
None
-
combine
(*nodes: faust.types.streams.JoinableT, **kwargs: Any) → faust.types.streams.StreamT[source]¶ Combine streams and tables into joined stream.
- Return type
StreamT
[+T_co]
-
contribute_to_stream
(active: faust.types.streams.StreamT) → None[source]¶ Add stream as node in joined stream.
- Return type
None
-
async
remove_from_stream
(stream: faust.types.streams.StreamT) → None[source]¶ Remove as node in a joined stream.
- Return type
None
-
join
(*fields: faust.types.models.FieldDescriptorT) → faust.types.streams.StreamT[source]¶ Create stream where events are joined.
- Return type
StreamT
[+T_co]
-
left_join
(*fields: faust.types.models.FieldDescriptorT) → faust.types.streams.StreamT[source]¶ Create stream where events are joined by LEFT JOIN.
- Return type
StreamT
[+T_co]
-
inner_join
(*fields: faust.types.models.FieldDescriptorT) → faust.types.streams.StreamT[source]¶ Create stream where events are joined by INNER JOIN.
- Return type
StreamT
[+T_co]
-
outer_join
(*fields: faust.types.models.FieldDescriptorT) → faust.types.streams.StreamT[source]¶ Create stream where events are joined by OUTER JOIN.
- Return type
StreamT
[+T_co]
-
async
on_merge
(value: T = None) → Optional[T][source]¶ Signal called when an event is to be joined.
- Return type
Optional
[~T]
-
async
send
(value: T_contra) → None[source]¶ Send value into stream locally (bypasses topic).
- Return type
None
-
-
class
faust.
StreamT
(channel: AsyncIterator[T_co] = None, *, app: faust.types.streams._AppT = None, processors: Iterable[Callable[T]] = None, combined: List[faust.types.streams.JoinableT] = None, on_start: Callable = None, join_strategy: faust.types.streams._JoinT = None, beacon: mode.utils.types.trees.NodeT = None, concurrency_index: int = None, prev: Optional[faust.types.streams.StreamT] = None, active_partitions: Set[faust.types.tuples.TP] = None, enable_acks: bool = True, prefix: str = '', loop: asyncio.events.AbstractEventLoop = None) → None[source]¶ -
outbox
= None¶
-
join_strategy
= None¶
-
task_owner
= None¶
-
current_event
= None¶
-
active_partitions
= None¶
-
concurrency_index
= None¶
-
enable_acks
= True¶
-
prefix
= ''¶
-
abstract async
items
() → AsyncIterator[Tuple[Union[bytes, faust.types.core._ModelT, Any, None], T_co]][source]¶
-
abstract async
take
(max_: int, within: Union[datetime.timedelta, float, str]) → AsyncIterable[Sequence[T_co]][source]¶
-
abstract
enumerate
(start: int = 0) → AsyncIterable[Tuple[int, T_co]][source]¶ - Return type
AsyncIterable
[Tuple
[int
, +T_co]]
-
abstract
through
(channel: Union[str, faust.types.channels.ChannelT]) → faust.types.streams.StreamT[source]¶ - Return type
StreamT
[+T_co]
-
abstract
echo
(*channels: Union[str, faust.types.channels.ChannelT]) → faust.types.streams.StreamT[source]¶ - Return type
StreamT
[+T_co]
-
abstract
group_by
(key: Union[faust.types.models.FieldDescriptorT, Callable[T, Union[bytes, faust.types.core._ModelT, Any, None]]], *, name: str = None, topic: faust.types.topics.TopicT = None) → faust.types.streams.StreamT[source]¶ - Return type
StreamT
[+T_co]
-
abstract
derive_topic
(name: str, *, schema: faust.types.streams._SchemaT = None, key_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, value_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, prefix: str = '', suffix: str = '') → faust.types.topics.TopicT[source]¶ - Return type
TopicT
[]
-
-
faust.
current_event
() → Optional[faust.types.events.EventT][source]¶ Return the event currently being processed, or None.
-
class
faust.
GlobalTable
(app: faust.types.app.AppT, *, name: str = None, default: Callable[Any] = None, store: Union[str, yarl.URL] = None, schema: faust.types.serializers.SchemaT = None, key_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, value_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, partitions: int = None, window: faust.types.windows.WindowT = None, changelog_topic: faust.types.topics.TopicT = None, help: str = None, on_recover: Callable[Awaitable[None]] = None, on_changelog_event: Callable[faust.types.events.EventT, Awaitable[None]] = None, recovery_buffer_size: int = 1000, standby_buffer_size: int = None, extra_topic_configs: Mapping[str, Any] = None, recover_callbacks: Set[Callable[Awaitable[None]]] = None, options: Mapping[str, Any] = None, use_partitioner: bool = False, on_window_close: Callable[[Any, Any], None] = None, **kwargs: Any) → None[source]¶ -
logger
= <Logger faust.tables.globaltable (WARNING)>¶
-
-
class
faust.
Table
(app: faust.types.app.AppT, *, name: str = None, default: Callable[Any] = None, store: Union[str, yarl.URL] = None, schema: faust.types.serializers.SchemaT = None, key_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, value_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, partitions: int = None, window: faust.types.windows.WindowT = None, changelog_topic: faust.types.topics.TopicT = None, help: str = None, on_recover: Callable[Awaitable[None]] = None, on_changelog_event: Callable[faust.types.events.EventT, Awaitable[None]] = None, recovery_buffer_size: int = 1000, standby_buffer_size: int = None, extra_topic_configs: Mapping[str, Any] = None, recover_callbacks: Set[Callable[Awaitable[None]]] = None, options: Mapping[str, Any] = None, use_partitioner: bool = False, on_window_close: Callable[[Any, Any], None] = None, **kwargs: Any) → None[source]¶ Table (non-windowed).
-
class
WindowWrapper
(table: faust.types.tables.TableT, *, relative_to: Union[faust.types.tables._FieldDescriptorT, Callable[Optional[faust.types.events.EventT], Union[float, datetime.datetime]], datetime.datetime, float, None] = None, key_index: bool = False, key_index_table: faust.types.tables.TableT = None) → None¶ Windowed table wrapper.
A windowed table does not return concrete values when keys are accessed, instead
WindowSet
is returned so that the values can be further reduced to the wanted time period.-
ValueType
¶ alias of
WindowSet
-
as_ansitable
(title: str = '{table.name}', **kwargs: Any) → str¶ Draw table as a terminal ANSI table.
- Return type
-
clone
(relative_to: Union[faust.types.tables._FieldDescriptorT, Callable[Optional[faust.types.events.EventT], Union[float, datetime.datetime]], datetime.datetime, float, None]) → faust.types.tables.WindowWrapperT¶ Clone this table using a new time-relativity configuration.
- Return type
-
property
get_relative_timestamp
¶ Return the current handler for extracting event timestamp. :rtype:
Optional
[Callable
[[Optional
[EventT
[]]],Union
[float
,datetime
]]]
-
get_timestamp
(event: faust.types.events.EventT = None) → float¶ Get timestamp from event.
- Return type
-
items
(event: faust.types.events.EventT = None) → ItemsView¶ Return table items view: iterate over
(key, value)
pairs.- Return type
ItemsView
[~KT, +VT_co]
-
key_index
= False¶
-
key_index_table
= None¶
-
keys
() → KeysView¶ Return table keys view: iterate over keys found in this table.
- Return type
KeysView
[~KT]
-
on_del_key
(key: Any) → None¶ Call when a key is deleted from this table.
- Return type
None
-
on_recover
(fun: Callable[Awaitable[None]]) → Callable[Awaitable[None]]¶ Call after table recovery.
-
on_set_key
(key: Any, value: Any) → None¶ Call when the value for a key in this table is set.
- Return type
None
-
relative_to
(ts: Union[faust.types.tables._FieldDescriptorT, Callable[Optional[faust.types.events.EventT], Union[float, datetime.datetime]], datetime.datetime, float, None]) → faust.types.tables.WindowWrapperT¶ Configure the time-relativity of this windowed table.
- Return type
-
relative_to_field
(field: faust.types.models.FieldDescriptorT) → faust.types.tables.WindowWrapperT¶ Configure table to be time-relative to a field in the stream.
This means the window will use the timestamp from the event currently being processed in the stream.
Further it will not use the timestamp of the Kafka message, but a field in the value of the event.
For example a model field:
class Account(faust.Record): created: float table = app.Table('foo').hopping( ..., ).relative_to_field(Account.created)
- Return type
-
relative_to_now
() → faust.types.tables.WindowWrapperT¶ Configure table to be time-relative to the system clock.
- Return type
-
relative_to_stream
() → faust.types.tables.WindowWrapperT¶ Configure table to be time-relative to the stream.
This means the window will use the timestamp from the event currently being processed in the stream.
- Return type
-
values
(event: faust.types.events.EventT = None) → ValuesView¶ Return table values view: iterate over values in this table.
- Return type
ValuesView
[+VT_co]
-
-
using_window
(window: faust.types.windows.WindowT, *, key_index: bool = False) → faust.types.tables.WindowWrapperT[source]¶ Wrap table using a specific window type.
- Return type
-
hopping
(size: Union[datetime.timedelta, float, str], step: Union[datetime.timedelta, float, str], expires: Union[datetime.timedelta, float, str] = None, key_index: bool = False) → faust.types.tables.WindowWrapperT[source]¶ Wrap table in a hopping window.
- Return type
-
tumbling
(size: Union[datetime.timedelta, float, str], expires: Union[datetime.timedelta, float, str] = None, key_index: bool = False) → faust.types.tables.WindowWrapperT[source]¶ Wrap table in a tumbling window.
- Return type
-
on_key_get
(key: KT) → None[source]¶ Call when the value for a key in this table is retrieved.
- Return type
None
-
on_key_set
(key: KT, value: VT) → None[source]¶ Call when the value for a key in this table is set.
- Return type
None
-
as_ansitable
(title: str = '{table.name}', **kwargs: Any) → str[source]¶ Draw table as a a terminal ANSI table.
- Return type
-
logger
= <Logger faust.tables.table (WARNING)>¶
-
class
-
class
faust.
SetGlobalTable
(app: faust.types.app.AppT, *, start_manager: bool = False, manager_topic_name: str = None, manager_topic_suffix: str = None, **kwargs: Any) → None[source]¶ -
logger
= <Logger faust.tables.sets (WARNING)>¶
-
-
class
faust.
SetTable
(app: faust.types.app.AppT, *, start_manager: bool = False, manager_topic_name: str = None, manager_topic_suffix: str = None, **kwargs: Any) → None[source]¶ Table that maintains a dictionary of sets.
-
Manager
¶ alias of
SetTableManager
-
WindowWrapper
¶ alias of
SetWindowWrapper
-
logger
= <Logger faust.tables.sets (WARNING)>¶
-
manager_topic_suffix
= '-setmanager'¶
-
-
class
faust.
Topic
(app: faust.types.app.AppT, *, topics: Sequence[str] = None, pattern: Union[str, Pattern[~AnyStr]] = None, schema: faust.types.serializers.SchemaT = None, key_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, value_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, is_iterator: bool = False, partitions: int = None, retention: Union[datetime.timedelta, float, str] = None, compacting: bool = None, deleting: bool = None, replicas: int = None, acks: bool = True, internal: bool = False, config: Mapping[str, Any] = None, queue: mode.utils.queues.ThrowableQueue = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, maxsize: int = None, root: faust.types.channels.ChannelT = None, active_partitions: Set[faust.types.tuples.TP] = None, allow_empty: bool = None, has_prefix: bool = False, loop: asyncio.events.AbstractEventLoop = None) → None[source]¶ Define new topic description.
- Parameters
app (
AppT
[]) – App instance used to create this topic description.partitions (
Optional
[int
]) – Number of partitions for these topics. On declaration, topics are created using this. Note: If a message is produced before the topic is declared, andautoCreateTopics
is enabled on the Kafka Server, the number of partitions used will be specified by the server configuration.retention (
Union
[timedelta
,float
,str
,None
]) – Number of seconds (as float/timedelta
) to keep messages in the topic before they can be expired by the server.pattern (
Union
[str
,Pattern
[AnyStr
],None
]) – Regular expression evaluated to decide what topics to subscribe to. You cannot specify both topics and a pattern.schema (
Optional
[SchemaT
[~KT, ~VT]]) – Schema used for serialization/deserialization.key_type (
Union
[Type
[ModelT
],Type
[bytes
],Type
[str
],None
]) – How to deserialize keys for messages in this topic. Can be afaust.Model
type,str
,bytes
, orNone
for “autodetect” (Overrides schema if one is defined).value_type (
Union
[Type
[ModelT
],Type
[bytes
],Type
[str
],None
]) – How to deserialize values for messages in this topic. Can be afaust.Model
type,str
,bytes
, orNone
for “autodetect” (Overrides schema if ones is defined).active_partitions (
Optional
[Set
[TP
]]) – Set offaust.types.tuples.TP
that this topic should be restricted to.
- Raises
TypeError – if both topics and pattern is provided.
-
async
send
(*, key: Union[bytes, faust.types.core._ModelT, Any, None] = None, value: Union[bytes, faust.types.core._ModelT, Any] = None, partition: int = None, timestamp: float = None, headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes], None] = None, schema: faust.types.serializers.SchemaT = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, callback: Callable[faust.types.tuples.FutureMessage, Union[None, Awaitable[None]]] = None, force: bool = False) → Awaitable[faust.types.tuples.RecordMetadata][source]¶ Send message to topic.
- Return type
-
send_soon
(*, key: Union[bytes, faust.types.core._ModelT, Any, None] = None, value: Union[bytes, faust.types.core._ModelT, Any] = None, partition: int = None, timestamp: float = None, headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes], None] = None, schema: faust.types.serializers.SchemaT = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, callback: Callable[faust.types.tuples.FutureMessage, Union[None, Awaitable[None]]] = None, force: bool = False, eager_partitioning: bool = False) → faust.types.tuples.FutureMessage[source]¶ Produce message by adding to buffer.
Notes
This method can be used by non-async def functions to produce messages.
- Return type
-
async
put
(event: faust.types.events.EventT) → None[source]¶ Put even directly onto the underlying queue of this topic.
This will only affect subscribers to a particular instance, in a particular process.
- Return type
None
-
property
partitions
¶ Return the number of configured partitions for this topic.
Notes
This is only active for internal topics, fully owned and managed by Faust itself.
We never touch the configuration of a topic that exists in Kafka, and Kafka will sometimes automatically create topics when they don’t exist. In this case the number of partitions for the automatically created topic will depend on the Kafka server configuration (
num.partitions
).Always make sure your topics have the correct number of partitions. :rtype:
Optional
[int
]
-
derive
(**kwargs: Any) → faust.types.channels.ChannelT[source]¶ Create topic derived from the configuration of this topic.
Configuration will be copied from this topic, but any parameter overridden as a keyword argument.
See also
derive_topic()
: for a list of supported keyword arguments.- Return type
ChannelT
[]
-
derive_topic
(*, topics: Sequence[str] = None, schema: faust.types.serializers.SchemaT = None, key_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, value_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, partitions: int = None, retention: Union[datetime.timedelta, float, str] = None, compacting: bool = None, deleting: bool = None, internal: bool = None, config: Mapping[str, Any] = None, prefix: str = '', suffix: str = '', **kwargs: Any) → faust.types.topics.TopicT[source]¶ Create new topic with configuration derived from this topic.
- Return type
TopicT
[]
-
get_topic_name
() → str[source]¶ Return the main topic name of this topic description.
As topic descriptions can have multiple topic names, this will only return when the topic has a singular topic name in the description.
- Raises
TypeError – if configured with a regular expression pattern.
ValueError – if configured with multiple topic names.
TypeError – if not configured with any names or patterns.
- Return type
-
class
faust.
TopicT
(app: faust.types.topics._AppT, *, topics: Sequence[str] = None, pattern: Union[str, Pattern[~AnyStr]] = None, schema: faust.types.topics._SchemaT = None, key_type: faust.types.topics._ModelArg = None, value_type: faust.types.topics._ModelArg = None, is_iterator: bool = False, partitions: int = None, retention: Union[datetime.timedelta, float, str] = None, compacting: bool = None, deleting: bool = None, replicas: int = None, acks: bool = True, internal: bool = False, config: Mapping[str, Any] = None, queue: mode.utils.queues.ThrowableQueue = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, maxsize: int = None, root: faust.types.channels.ChannelT = None, active_partitions: Set[faust.types.tuples.TP] = None, allow_empty: bool = False, has_prefix: bool = False, loop: asyncio.events.AbstractEventLoop = None) → None[source]¶ -
topics
= None¶ Iterable/Sequence of topic names to subscribe to.
-
retention
= None¶ expiry time in seconds for messages in the topic.
- Type
Topic retention setting
-
compacting
= None¶ Flag that when enabled means the topic can be “compacted”: if the topic is a log of key/value pairs, the broker can delete old values for the same key.
-
replicas
= None¶ Number of replicas for topic.
-
config
= None¶ Additional configuration as a mapping.
-
acks
= None¶ Enable acks for this topic.
-
internal
= None¶ it’s owned by us and we are allowed to create or delete the topic as necessary.
- Type
Mark topic as internal
-
has_prefix
= False¶
-
abstract
derive_topic
(*, topics: Sequence[str] = None, schema: faust.types.topics._SchemaT = None, key_type: faust.types.topics._ModelArg = None, value_type: faust.types.topics._ModelArg = None, partitions: int = None, retention: Union[datetime.timedelta, float, str] = None, compacting: bool = None, deleting: bool = None, internal: bool = False, config: Mapping[str, Any] = None, prefix: str = '', suffix: str = '', **kwargs: Any) → faust.types.topics.TopicT[source]¶ - Return type
TopicT
[]
-
-
class
faust.
Settings
(id: str, *, debug: bool = None, version: int = None, broker: Union[str, yarl.URL, List[yarl.URL]] = None, broker_client_id: str = None, broker_request_timeout: Union[datetime.timedelta, float, str] = None, broker_credentials: Union[faust.types.auth.CredentialsT, ssl.SSLContext] = None, broker_commit_every: int = None, broker_commit_interval: Union[datetime.timedelta, float, str] = None, broker_commit_livelock_soft_timeout: Union[datetime.timedelta, float, str] = None, broker_session_timeout: Union[datetime.timedelta, float, str] = None, broker_heartbeat_interval: Union[datetime.timedelta, float, str] = None, broker_check_crcs: bool = None, broker_max_poll_records: int = None, broker_max_poll_interval: int = None, broker_consumer: Union[str, yarl.URL, List[yarl.URL]] = None, broker_producer: Union[str, yarl.URL, List[yarl.URL]] = None, agent_supervisor: Union[_T, str] = None, store: Union[str, yarl.URL] = None, cache: Union[str, yarl.URL] = None, web: Union[str, yarl.URL] = None, web_enabled: bool = True, processing_guarantee: Union[str, faust.types.enums.ProcessingGuarantee] = None, timezone: datetime.tzinfo = None, autodiscover: Union[bool, Iterable[str], Callable[Iterable[str]]] = None, origin: str = None, canonical_url: Union[str, yarl.URL] = None, datadir: Union[pathlib.Path, str] = None, tabledir: Union[pathlib.Path, str] = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, logging_config: Dict = None, loghandlers: List[logging.Handler] = None, table_cleanup_interval: Union[datetime.timedelta, float, str] = None, table_standby_replicas: int = None, table_key_index_size: int = None, topic_replication_factor: int = None, topic_partitions: int = None, topic_allow_declare: bool = None, topic_disable_leader: bool = None, id_format: str = None, reply_to: str = None, reply_to_prefix: str = None, reply_create_topic: bool = None, reply_expires: Union[datetime.timedelta, float, str] = None, ssl_context: ssl.SSLContext = None, stream_buffer_maxsize: int = None, stream_wait_empty: bool = None, stream_ack_cancelled_tasks: bool = None, stream_ack_exceptions: bool = None, stream_publish_on_commit: bool = None, stream_recovery_delay: Union[datetime.timedelta, float, str] = None, producer_linger_ms: int = None, producer_max_batch_size: int = None, producer_acks: int = None, producer_max_request_size: int = None, producer_compression_type: str = None, producer_partitioner: Union[_T, str] = None, producer_request_timeout: Union[datetime.timedelta, float, str] = None, producer_api_version: str = None, consumer_max_fetch_size: int = None, consumer_auto_offset_reset: str = None, web_bind: str = None, web_port: int = None, web_host: str = None, web_transport: Union[str, yarl.URL] = None, web_in_thread: bool = None, web_cors_options: Mapping[str, faust.types.web.ResourceOptions] = None, worker_redirect_stdouts: bool = None, worker_redirect_stdouts_level: Union[int, str] = None, Agent: Union[_T, str] = None, ConsumerScheduler: Union[_T, str] = None, Event: Union[_T, str] = None, Schema: Union[_T, str] = None, Stream: Union[_T, str] = None, Table: Union[_T, str] = None, SetTable: Union[_T, str] = None, GlobalTable: Union[_T, str] = None, SetGlobalTable: Union[_T, str] = None, TableManager: Union[_T, str] = None, Serializers: Union[_T, str] = None, Worker: Union[_T, str] = None, PartitionAssignor: Union[_T, str] = None, LeaderAssignor: Union[_T, str] = None, Router: Union[_T, str] = None, Topic: Union[_T, str] = None, HttpClient: Union[_T, str] = None, Monitor: Union[_T, str] = None, url: Union[str, yarl.URL] = None, **kwargs: Any) → None[source]¶ -
-
id_format
= '{id}-v{self.version}'¶
-
debug
= False¶
-
ssl_context
= None¶
-
autodiscover
= False¶
-
broker_client_id
= 'faust-1.9.0'¶
-
timezone
= datetime.timezone.utc¶
-
broker_commit_every
= 10000¶
-
broker_check_crcs
= True¶
-
broker_max_poll_interval
= 1000.0¶
-
key_serializer
= 'raw'¶
-
value_serializer
= 'json'¶
-
table_standby_replicas
= 1¶
-
table_key_index_size
= 1000¶
-
topic_replication_factor
= 1¶
-
topic_partitions
= 8¶
-
topic_allow_declare
= True¶
-
topic_disable_leader
= False¶
-
reply_create_topic
= False¶
-
logging_config
= None¶
-
stream_buffer_maxsize
= 4096¶
-
stream_wait_empty
= True¶
-
stream_ack_cancelled_tasks
= True¶
-
stream_ack_exceptions
= True¶
-
stream_publish_on_commit
= False¶
-
producer_linger_ms
= 0¶
-
producer_max_batch_size
= 16384¶
-
producer_acks
= -1¶
-
producer_max_request_size
= 1000000¶
-
producer_compression_type
= None¶
-
producer_api_version
= 'auto'¶
-
consumer_max_fetch_size
= 4194304¶
-
consumer_auto_offset_reset
= 'earliest'¶
-
web_bind
= '0.0.0.0'¶
-
web_port
= 6066¶
-
web_host
= 'build-10233069-project-230058-faust'¶
-
web_in_thread
= False¶
-
web_cors_options
= None¶
-
worker_redirect_stdouts
= True¶
-
worker_redirect_stdouts_level
= 'WARN'¶
-
reply_to_prefix
= 'f-reply-'¶
-
property
processing_guarantee
¶ - Return type
-
property
broker_credentials
¶ - Return type
-
property
producer_partitioner
¶
-
property
agent_supervisor
¶ - Return type
-
property
GlobalTable
¶ - Return type
Type
[GlobalTableT
[]]
-
property
SetGlobalTable
¶ - Return type
Type
[GlobalTableT
[]]
-
property
TableManager
¶ - Return type
-
property
PartitionAssignor
¶ - Return type
-
property
LeaderAssignor
¶ - Return type
-
-
faust.
HoppingWindow
¶ alias of
faust.windows._PyHoppingWindow
-
class
faust.
TumblingWindow
(size: Union[datetime.timedelta, float, str], expires: Union[datetime.timedelta, float, str] = None) → None[source]¶ Tumbling window type.
Fixed-size, non-overlapping, gap-less windows.
-
faust.
SlidingWindow
¶ alias of
faust.windows._PySlidingWindow
-
class
faust.
Worker
(app: faust.types.app.AppT, *services: mode.types.services.ServiceT, sensors: Iterable[faust.types.sensors.SensorT] = None, debug: bool = False, quiet: bool = False, loglevel: Union[str, int] = None, logfile: Union[str, IO] = None, stdout: IO = <_io.TextIOWrapper name='<stdout>' mode='w' encoding='UTF-8'>, stderr: IO = <_io.TextIOWrapper name='<stderr>' mode='w' encoding='UTF-8'>, blocking_timeout: float = 10.0, workdir: Union[pathlib.Path, str] = None, console_port: int = 50101, loop: asyncio.events.AbstractEventLoop = None, redirect_stdouts: bool = None, redirect_stdouts_level: int = None, logging_config: Dict = None, **kwargs: Any) → None[source]¶ Worker.
See also
This is a subclass of
mode.Worker
.- Usage:
You can start a worker using:
the faust worker program.
instantiating Worker programmatically and calling execute_from_commandline():
>>> worker = Worker(app) >>> worker.execute_from_commandline()
or if you already have an event loop, calling
await start
, but in that case you are responsible for gracefully shutting down the event loop:async def start_worker(worker: Worker) -> None: await worker.start() def manage_loop(): loop = asyncio.get_event_loop() worker = Worker(app, loop=loop) try: loop.run_until_complete(start_worker(worker) finally: worker.stop_and_shutdown_loop()
- Parameters
app (
AppT
[]) – The Faust app to start.*services – Services to start with worker. This includes application instances to start.
sensors (Iterable[SensorT]) – List of sensors to include.
debug (bool) – Enables debugging mode [disabled by default].
quiet (bool) – Do not output anything to console [disabled by default].
loglevel (Union[str, int]) – Level to use for logging, can be string (one of: CRIT|ERROR|WARN|INFO|DEBUG), or integer.
logfile (Union[str, IO]) – Name of file or a stream to log to.
stdout (IO) – Standard out stream.
stderr (IO) – Standard err stream.
blocking_timeout (float) – When
debug
is enabled this sets the timeout for detecting that the event loop is blocked.workdir (Union[str, Path]) – Custom working directory for the process that the worker will change into when started. This working directory change is permanent for the process, or until something else changes the working directory again.
loop (asyncio.AbstractEventLoop) – Custom event loop object.
-
logger
= <Logger faust.worker (WARNING)>¶
-
app
= None¶ The Faust app started by this worker.
-
sensors
= None¶ Additional sensors to add to the Faust app.
-
workdir
= None¶ Current working directory. Note that if passed as an argument to Worker, the worker will change to this directory when started.
-
on_init_dependencies
() → Iterable[mode.types.services.ServiceT][source]¶ Return service dependencies that must start with the worker.
-
async
on_first_start
() → None[source]¶ Signal called the first time the worker starts.
First time, means this callback is not called if the worker is restarted by an exception being raised.
- Return type
None
-
change_workdir
(path: pathlib.Path) → None[source]¶ Change the current working directory (CWD).
- Return type
None
-
autodiscover
() → None[source]¶ Autodiscover modules and files to find @agent decorators, etc.
- Return type
None
-
async
on_execute
() → None[source]¶ Signal called when the worker is about to start.
- Return type
None