faust

Python Stream processing.

class faust.Service(*, beacon: mode.utils.types.trees.NodeT = None, loop: asyncio.events.AbstractEventLoop = None) → None[source]

An asyncio service that can be started/stopped/restarted.

Keyword Arguments
abstract = False
class Diag(service: mode.types.services.ServiceT) → None

Service diagnostics.

This can be used to track what your service is doing. For example, if your service is a Kafka consumer with a background thread that commits the offset every 30 seconds, you may want to see when this happens:

DIAG_COMMITTING = 'committing'

class Consumer(Service):

    @Service.task
    async def _background_commit(self) -> None:
        while not self.should_stop:
            await self.sleep(30.0)
            self.diag.set_flag(DIAG_COMMITTING)
            try:
                await self._consumer.commit()
            finally:
                self.diag.unset_flag(DIAG_COMMITTING)

The above code is setting the flag manually, but you can also use a decorator to accomplish the same thing:

@Service.timer(30.0)
async def _background_commit(self) -> None:
    await self.commit()

@Service.transitions_to(DIAG_COMMITTING)
async def commit(self) -> None:
    await self._consumer.commit()
set_flag(flag: str) → None
Return type

None

unset_flag(flag: str) → None
Return type

None
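To make the flag mechanics concrete, the following is a minimal stdlib-only sketch of a flag tracker plus a decorator that sets a flag while a coroutine method runs and clears it afterwards. `FlagTracker`, `FakeConsumer` and this `transitions_to` are hypothetical stand-ins written for illustration; they are not mode's implementation of `Service.Diag` or `Service.transitions_to`.

```python
import asyncio
import functools
from typing import Callable, List, Set, Tuple


class FlagTracker:
    """Hypothetical stand-in for Service.Diag: tracks active flags."""

    def __init__(self) -> None:
        self.flags: Set[str] = set()
        self.history: List[Tuple[str, str]] = []

    def set_flag(self, flag: str) -> None:
        self.flags.add(flag)
        self.history.append(('set', flag))

    def unset_flag(self, flag: str) -> None:
        self.flags.discard(flag)
        self.history.append(('unset', flag))


def transitions_to(flag: str) -> Callable:
    """Illustrative decorator: keep `flag` set while the method runs."""
    def decorate(fun):
        @functools.wraps(fun)
        async def wrapper(self, *args, **kwargs):
            self.diag.set_flag(flag)
            try:
                return await fun(self, *args, **kwargs)
            finally:
                # Clear the flag even if the wrapped coroutine raises.
                self.diag.unset_flag(flag)
        return wrapper
    return decorate


class FakeConsumer:
    """Hypothetical object with a `diag` attribute, as in the example above."""

    def __init__(self) -> None:
        self.diag = FlagTracker()
        self.seen_during_commit = None

    @transitions_to('committing')
    async def commit(self) -> str:
        # Record which flags are active while the commit is in progress.
        self.seen_during_commit = set(self.diag.flags)
        await asyncio.sleep(0)
        return 'ok'
```

The `try`/`finally` mirrors the manual version shown earlier: the flag is cleared whether or not the commit succeeds.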

wait_for_shutdown = False

Set to True if .stop must wait for the shutdown flag to be set.

shutdown_timeout = 60.0

Time to wait for shutdown flag set before we give up.

restart_count = 0

Current number of times this service instance has been restarted.

mundane_level = 'info'

The log level for mundane info such as starting, stopping, etc. Set this to "debug" to log these messages at debug severity, so that less is shown at the default level.

classmethod from_awaitable(coro: Awaitable, *, name: str = None, **kwargs: Any) → mode.types.services.ServiceT[source]
Return type

ServiceT[]

classmethod task(fun: Callable[Any, Awaitable[None]]) → mode.services.ServiceTask[source]

Decorate function to be used as background task.

Example

>>> class S(Service):
...
...     @Service.task
...     async def background_task(self):
...         while not self.should_stop:
...             await self.sleep(1.0)
...             print('Waking up')
Return type

ServiceTask

classmethod timer(interval: Union[datetime.timedelta, float, str]) → Callable[Callable, mode.services.ServiceTask][source]

Background timer executing every n seconds.

Example

>>> class S(Service):
...
...     @Service.timer(1.0)
...     async def background_timer(self):
...         print('Waking up')
Return type

Callable[[Callable], ServiceTask]

classmethod transitions_to(flag: str) → Callable[source]

Decorate function to set and reset diagnostic flag.

Return type

Callable

async transition_with(flag: str, fut: Awaitable, *args: Any, **kwargs: Any) → Any[source]
Return type

Any

add_dependency(service: mode.types.services.ServiceT) → mode.types.services.ServiceT[source]

Add dependency to other service.

The service will be started/stopped with this service.

Return type

ServiceT[]
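As a rough illustration of "started/stopped with this service", the sketch below models a parent that starts its dependencies after itself and stops them before itself. `MiniService` is a hypothetical class, and the exact start/stop ordering shown is an assumption made for the example, not a statement about mode's behavior.

```python
import asyncio
from typing import List


class MiniService:
    """Hypothetical service that starts and stops its dependencies with itself."""

    def __init__(self, name: str, log: List[str]) -> None:
        self.name = name
        self.log = log
        self.children: List['MiniService'] = []

    def add_dependency(self, service: 'MiniService') -> 'MiniService':
        # Return the service so the call can be used inline.
        self.children.append(service)
        return service

    async def start(self) -> None:
        self.log.append(f'start {self.name}')
        for child in self.children:
            await child.start()

    async def stop(self) -> None:
        # Assumed ordering: stop children in reverse, then this service.
        for child in reversed(self.children):
            await child.stop()
        self.log.append(f'stop {self.name}')
```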

async add_runtime_dependency(service: mode.types.services.ServiceT) → mode.types.services.ServiceT[source]
Return type

ServiceT[]

async remove_dependency(service: mode.types.services.ServiceT) → mode.types.services.ServiceT[source]

Stop and remove dependency of this service.

Return type

ServiceT[]

async add_async_context(context: AsyncContextManager) → Any[source]
Return type

Any

add_context(context: ContextManager) → Any[source]
Return type

Any

add_future(coro: Awaitable) → _asyncio.Future[source]

Add relationship to asyncio.Future.

The future will be joined when this service is stopped.

Return type

Future

on_init() → None[source]
Return type

None

on_init_dependencies() → Iterable[mode.types.services.ServiceT][source]

Return list of service dependencies for this service.

Return type

Iterable[ServiceT[]]

async join_services(services: Sequence[mode.types.services.ServiceT]) → None[source]
Return type

None

async sleep(n: Union[datetime.timedelta, float, str], *, loop: asyncio.events.AbstractEventLoop = None) → None[source]

Sleep for n seconds, or until service stopped.

Return type

None
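One common way to get "sleep for n seconds, or until stopped" semantics with plain asyncio is to wait on a stop event with a timeout. The helper below is a sketch under that assumption; `sleep_or_stop` and its `stopped` event are hypothetical names, not part of the Service API.

```python
import asyncio


async def sleep_or_stop(seconds: float, stopped: asyncio.Event) -> bool:
    """Sleep up to `seconds`; return True if `stopped` was set first."""
    try:
        # Wakes early as soon as the event is set.
        await asyncio.wait_for(stopped.wait(), timeout=seconds)
    except asyncio.TimeoutError:
        # The full interval elapsed without a stop request.
        return False
    return True
```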

async wait_for_stopped(*coros: Union[Generator[[Any, None], Any], Awaitable, asyncio.locks.Event, mode.utils.locks.Event], timeout: Union[datetime.timedelta, float, str] = None) → bool[source]
Return type

bool

async wait(*coros: Union[Generator[[Any, None], Any], Awaitable, asyncio.locks.Event, mode.utils.locks.Event], timeout: Union[datetime.timedelta, float, str] = None) → mode.services.WaitResult[source]

Wait for coroutines to complete, or until the service stops.

Return type

WaitResult

async wait_many(coros: Iterable[Union[Generator[[Any, None], Any], Awaitable, asyncio.locks.Event, mode.utils.locks.Event]], *, timeout: Union[datetime.timedelta, float, str] = None) → mode.services.WaitResult[source]
Return type

WaitResult

async wait_first(*coros: Union[Generator[[Any, None], Any], Awaitable, asyncio.locks.Event, mode.utils.locks.Event], timeout: Union[datetime.timedelta, float, str] = None) → mode.services.WaitResults[source]
Return type

WaitResults

async start() → None[source]
Return type

None

async maybe_start() → None[source]

Start the service, if it has not already been started.

Return type

None

async crash(reason: BaseException) → None[source]

Crash the service and all child services.

Return type

None

async stop() → None[source]

Stop the service.

Return type

None

async restart() → None[source]

Restart this service.

Return type

None

service_reset() → None[source]
Return type

None

async wait_until_stopped() → None[source]

Wait until the service is signalled to stop.

Return type

None

set_shutdown() → None[source]

Set the shutdown signal.

Notes

If wait_for_shutdown is set, stopping the service will wait for this flag to be set.

Return type

None

itertimer(interval: Union[datetime.timedelta, float, str], *, max_drift_correction: float = 0.1, loop: asyncio.events.AbstractEventLoop = None, sleep: Callable[..., Awaitable] = None, clock: Callable[float] = <built-in function perf_counter>, name: str = '') → AsyncIterator[float][source]

Sleep interval seconds for every iteration.

This is an async iterator that takes advantage of timer_intervals() to act as a timer that stops drift from occurring, and adds a tiny amount of drift to timers so that they don’t start at the same time.

Uses Service.sleep which will bail-out-quick if the service is stopped.

Note

Will sleep the full interval seconds before returning from first iteration.

Examples

>>> async for sleep_time in self.itertimer(1.0):
...   print('another second passed, just woke up...')
...   await perform_some_http_request()
Return type

AsyncIterator[float]
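The idea of a timer that does not accumulate drift can be sketched with the standard library alone: compute each deadline from the start time rather than from the previous wakeup, so time spent in the loop body does not push later ticks back. The `ticking` generator below is a hypothetical illustration of that idea only; it does not reproduce timer_intervals(), max_drift_correction, or the stop-aware sleep used by itertimer.

```python
import asyncio
import time
from typing import AsyncIterator


async def ticking(interval: float, count: int) -> AsyncIterator[float]:
    """Yield `count` times, roughly once per `interval` seconds.

    Yields the time actually slept before each tick.
    """
    start = time.perf_counter()
    for tick in range(1, count + 1):
        # Deadline is anchored to the start time, not the last wakeup.
        deadline = start + tick * interval
        delay = max(deadline - time.perf_counter(), 0.0)
        await asyncio.sleep(delay)
        yield delay
```

Like itertimer, this sketch sleeps one full interval before the first iteration.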

property started

Return True if the service was started.

Return type

bool

property crashed
Return type

bool

property should_stop

Return True if the service must stop.

Return type

bool

property state

Service state - as a human readable string.

Return type

str

property label

Label used for graphs.

Return type

str

property shortlabel

Label used for logging.

Return type

str

property beacon

Beacon used to track services in a dependency graph.

Return type

NodeT[~_T]

logger = <Logger mode.services (WARNING)>
property crash_reason
Return type

Optional[BaseException]

class faust.ServiceT(*, beacon: mode.utils.types.trees.NodeT = None, loop: asyncio.events.AbstractEventLoop = None) → None[source]

Abstract type for an asynchronous service that can be started/stopped.

See also

mode.Service.

wait_for_shutdown = False
restart_count = 0
supervisor = None
abstract add_dependency(service: mode.types.services.ServiceT) → mode.types.services.ServiceT[source]
Return type

ServiceT[]

abstract async add_runtime_dependency(service: mode.types.services.ServiceT) → mode.types.services.ServiceT[source]
Return type

ServiceT[]

abstract async add_async_context(context: AsyncContextManager) → Any[source]
Return type

Any

abstract add_context(context: ContextManager) → Any[source]
Return type

Any

abstract async start() → None[source]
Return type

None

abstract async maybe_start() → None[source]
Return type

None

abstract async crash(reason: BaseException) → None[source]
Return type

None

abstract async stop() → None[source]
Return type

None

abstract service_reset() → None[source]
Return type

None

abstract async restart() → None[source]
Return type

None

abstract async wait_until_stopped() → None[source]
Return type

None

abstract set_shutdown() → None[source]
Return type

None

abstract property started
Return type

bool

abstract property crashed
Return type

bool

abstract property should_stop
Return type

bool

abstract property state
Return type

str

abstract property label
Return type

str

abstract property shortlabel
Return type

str

property beacon
Return type

NodeT[~_T]

abstract property loop
Return type

AbstractEventLoop

abstract property crash_reason
Return type

Optional[BaseException]

class faust.Agent(fun: Callable[faust.types.streams.StreamT, Union[Coroutine[[Any, Any], None], Awaitable[None], AsyncIterable]], *, app: faust.types.app.AppT, name: str = None, channel: Union[str, faust.types.channels.ChannelT] = None, concurrency: int = 1, sink: Iterable[Union[AgentT, faust.types.channels.ChannelT, Callable[Any, Optional[Awaitable]]]] = None, on_error: Callable[[AgentT, BaseException], Awaitable] = None, supervisor_strategy: Type[mode.types.supervisors.SupervisorStrategyT] = None, help: str = None, schema: faust.types.serializers.SchemaT = None, key_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, value_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, isolated_partitions: bool = False, use_reply_headers: bool = None, **kwargs: Any) → None[source]

Agent.

This is the type of object returned by the @app.agent decorator.

supervisor = None
on_init_dependencies() → Iterable[mode.types.services.ServiceT][source]

Return list of service dependencies required to start agent.

Return type

Iterable[ServiceT[]]

async on_start() → None[source]

Call when an agent starts.

Return type

None

async on_stop() → None[source]

Call when an agent stops.

Return type

None

cancel() → None[source]

Cancel agent and its actor instances running in this process.

Return type

None

async on_partitions_revoked(revoked: Set[faust.types.tuples.TP]) → None[source]

Call when partitions are revoked.

Return type

None

async on_partitions_assigned(assigned: Set[faust.types.tuples.TP]) → None[source]

Call when partitions are assigned.

Return type

None

async on_isolated_partitions_revoked(revoked: Set[faust.types.tuples.TP]) → None[source]

Call when isolated partitions are revoked.

Return type

None

async on_isolated_partitions_assigned(assigned: Set[faust.types.tuples.TP]) → None[source]

Call when isolated partitions are assigned.

Return type

None

async on_shared_partitions_revoked(revoked: Set[faust.types.tuples.TP]) → None[source]

Call when non-isolated partitions are revoked.

Return type

None

async on_shared_partitions_assigned(assigned: Set[faust.types.tuples.TP]) → None[source]

Call when non-isolated partitions are assigned.

Return type

None

info() → Mapping[source]

Return agent attributes as a dictionary.

Return type

Mapping[~KT, +VT_co]

clone(*, cls: Type[faust.types.agents.AgentT] = None, **kwargs: Any) → faust.types.agents.AgentT[source]

Create clone of this agent object.

Keyword arguments can be passed to override any argument supported by Agent.__init__.

Return type

AgentT[]

test_context(channel: faust.types.channels.ChannelT = None, supervisor_strategy: mode.types.supervisors.SupervisorStrategyT = None, on_error: Callable[[AgentT, BaseException], Awaitable] = None, **kwargs: Any) → faust.types.agents.AgentTestWrapperT[source]

Create new unit-testing wrapper for this agent.

Return type

AgentTestWrapperT[]

actor_from_stream(stream: Optional[faust.types.streams.StreamT], *, index: int = None, active_partitions: Set[faust.types.tuples.TP] = None, channel: faust.types.channels.ChannelT = None) → faust.types.agents.ActorT[Union[AsyncIterable, Awaitable]][source]

Create new actor from stream.

Return type

ActorT[]

add_sink(sink: Union[AgentT, faust.types.channels.ChannelT, Callable[Any, Optional[Awaitable]]]) → None[source]

Add new sink to further handle results from this agent.

Return type

None

stream(channel: faust.types.channels.ChannelT = None, active_partitions: Set[faust.types.tuples.TP] = None, **kwargs: Any) → faust.types.streams.StreamT[source]

Create underlying stream used by this agent.

Return type

StreamT[+T_co]

async cast(value: Union[bytes, faust.types.core._ModelT, Any] = None, *, key: Union[bytes, faust.types.core._ModelT, Any, None] = None, partition: int = None, timestamp: float = None, headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes], None] = None) → None[source]

RPC operation: like ask() but do not expect reply.

Cast here is like “casting a spell”, and will not expect a reply back from the agent.

Return type

None

async ask(value: Union[bytes, faust.types.core._ModelT, Any] = None, *, key: Union[bytes, faust.types.core._ModelT, Any, None] = None, partition: int = None, timestamp: float = None, headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes], None] = None, reply_to: Union[AgentT, faust.types.channels.ChannelT, str] = None, correlation_id: str = None) → Any[source]

RPC operation: ask agent for result of processing value.

This version will wait until the result is available and return the processed value.

Return type

Any

async ask_nowait(value: Union[bytes, faust.types.core._ModelT, Any] = None, *, key: Union[bytes, faust.types.core._ModelT, Any, None] = None, partition: int = None, timestamp: float = None, headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes], None] = None, reply_to: Union[AgentT, faust.types.channels.ChannelT, str] = None, correlation_id: str = None, force: bool = False) → faust.agents.replies.ReplyPromise[source]

RPC operation: ask agent for result of processing value.

This version does not wait for the result to arrive, but instead returns a promise of future evaluation.

Return type

ReplyPromise

async send(*, key: Union[bytes, faust.types.core._ModelT, Any, None] = None, value: Union[bytes, faust.types.core._ModelT, Any] = None, partition: int = None, timestamp: float = None, headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes], None] = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, callback: Callable[faust.types.tuples.FutureMessage, Union[None, Awaitable[None]]] = None, reply_to: Union[AgentT, faust.types.channels.ChannelT, str] = None, correlation_id: str = None, force: bool = False) → Awaitable[faust.types.tuples.RecordMetadata][source]

Send message to topic used by agent.

Return type

Awaitable[RecordMetadata]

map(values: Union[AsyncIterable, Iterable], key: Union[bytes, faust.types.core._ModelT, Any, None] = None, reply_to: Union[AgentT, faust.types.channels.ChannelT, str] = None) → AsyncIterator[source]

RPC map operation on a list of values.

A map operation iterates over results as they arrive. See join() and kvjoin() if you want them in order.

Return type

AsyncIterator[+T_co]

kvmap(items: Union[AsyncIterable[Tuple[Union[bytes, faust.types.core._ModelT, Any, None], Union[bytes, faust.types.core._ModelT, Any]]], Iterable[Tuple[Union[bytes, faust.types.core._ModelT, Any, None], Union[bytes, faust.types.core._ModelT, Any]]]], reply_to: Union[AgentT, faust.types.channels.ChannelT, str] = None) → AsyncIterator[str][source]

RPC map operation on a list of (key, value) pairs.

A map operation iterates over results as they arrive. See join() and kvjoin() if you want them in order.

Return type

AsyncIterator[str]

async join(values: Union[AsyncIterable[Union[bytes, faust.types.core._ModelT, Any]], Iterable[Union[bytes, faust.types.core._ModelT, Any]]], key: Union[bytes, faust.types.core._ModelT, Any, None] = None, reply_to: Union[AgentT, faust.types.channels.ChannelT, str] = None) → List[Any][source]

RPC map operation on a list of values.

A join returns the results in order, and only returns once all values have been processed.

Return type

List[Any]

async kvjoin(items: Union[AsyncIterable[Tuple[Union[bytes, faust.types.core._ModelT, Any, None], Union[bytes, faust.types.core._ModelT, Any]]], Iterable[Tuple[Union[bytes, faust.types.core._ModelT, Any, None], Union[bytes, faust.types.core._ModelT, Any]]]], reply_to: Union[AgentT, faust.types.channels.ChannelT, str] = None) → List[Any][source]

RPC map operation on a list of (key, value) pairs.

A join returns the results in order, and only returns once all values have been processed.

Return type

List[Any]
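The difference between the map-style calls (results as they arrive) and the join-style calls (results in input order, once everything is done) resembles the difference between `asyncio.as_completed` and `asyncio.gather`. The sketch below shows that analogy with plain asyncio; `double_after`, `as_they_arrive` and `in_order` are hypothetical helpers, and no agent or Kafka topic is involved.

```python
import asyncio
from typing import List, Sequence, Tuple


async def double_after(value: int, delay: float) -> int:
    """Stand-in for a remote call that takes `delay` seconds to reply."""
    await asyncio.sleep(delay)
    return value * 2


async def as_they_arrive(jobs: Sequence[Tuple[int, float]]) -> List[int]:
    """map-style: collect results in completion order."""
    results = []
    for fut in asyncio.as_completed([double_after(v, d) for v, d in jobs]):
        results.append(await fut)
    return results


async def in_order(jobs: Sequence[Tuple[int, float]]) -> List[int]:
    """join-style: wait for everything, return results in input order."""
    return list(await asyncio.gather(*(double_after(v, d) for v, d in jobs)))
```

With jobs whose delays differ, `in_order` always follows the input sequence, while `as_they_arrive` follows whichever reply finishes first.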

get_topic_names() → Iterable[str][source]

Return list of topic names this agent subscribes to.

Return type

Iterable[str]

property channel

Return channel used by agent.

Return type

ChannelT[]

property channel_iterator

Return channel agent iterates over.

Return type

AsyncIterator[+T_co]

property label

Return human-readable description of agent.

Return type

str

property shortlabel

Return short description of agent.

Return type

str

logger = <Logger faust.agents.agent (WARNING)>
class faust.App(id: str, *, monitor: faust.sensors.monitor.Monitor = None, config_source: Any = None, loop: asyncio.events.AbstractEventLoop = None, beacon: mode.utils.types.trees.NodeT = None, **options: Any) → None[source]

Faust Application.

Parameters

id (str) – Application ID.

Keyword Arguments

loop (asyncio.AbstractEventLoop) – optional event loop to use.

See also

Application Parameters – for supported keyword arguments.

SCAN_CATEGORIES = ['faust.agent', 'faust.command', 'faust.page', 'faust.service', 'faust.task']
class BootStrategy(app: faust.types.app.AppT, *, enable_web: bool = None, enable_kafka: bool = None, enable_kafka_producer: bool = None, enable_kafka_consumer: bool = None, enable_sensors: bool = None) → None

App startup strategy.

The startup strategy defines the graph of services to start when the Faust worker for an app starts.

agents() → Iterable[mode.types.services.ServiceT]

Return list of services required to start agents.

Return type

Iterable[ServiceT[]]

client_only() → Iterable[mode.types.services.ServiceT]

Return services to start when app is in client_only mode.

Return type

Iterable[ServiceT[]]

enable_kafka = True
enable_kafka_consumer = None
enable_kafka_producer = None
enable_sensors = True
enable_web = None
kafka_client_consumer() → Iterable[mode.types.services.ServiceT]

Return list of services required to start Kafka client consumer.

Return type

Iterable[ServiceT[]]

kafka_conductor() → Iterable[mode.types.services.ServiceT]

Return list of services required to start Kafka conductor.

Return type

Iterable[ServiceT[]]

kafka_consumer() → Iterable[mode.types.services.ServiceT]

Return list of services required to start Kafka consumer.

Return type

Iterable[ServiceT[]]

kafka_producer() → Iterable[mode.types.services.ServiceT]

Return list of services required to start Kafka producer.

Return type

Iterable[ServiceT[]]

producer_only() → Iterable[mode.types.services.ServiceT]

Return services to start when app is in producer_only mode.

Return type

Iterable[ServiceT[]]

sensors() → Iterable[mode.types.services.ServiceT]

Return list of services required to start sensors.

Return type

Iterable[ServiceT[]]

server() → Iterable[mode.types.services.ServiceT]

Return services to start when app is in default mode.

Return type

Iterable[ServiceT[]]

tables() → Iterable[mode.types.services.ServiceT]

Return list of table-related services.

Return type

Iterable[ServiceT[]]

web_components() → Iterable[mode.types.services.ServiceT]

Return list of web-related services (excluding web server).

Return type

Iterable[ServiceT[]]

web_server() → Iterable[mode.types.services.ServiceT]

Return list of web-server services.

Return type

Iterable[ServiceT[]]

class Settings(id: str, *, debug: bool = None, version: int = None, broker: Union[str, yarl.URL, List[yarl.URL]] = None, broker_client_id: str = None, broker_request_timeout: Union[datetime.timedelta, float, str] = None, broker_credentials: Union[faust.types.auth.CredentialsT, ssl.SSLContext] = None, broker_commit_every: int = None, broker_commit_interval: Union[datetime.timedelta, float, str] = None, broker_commit_livelock_soft_timeout: Union[datetime.timedelta, float, str] = None, broker_session_timeout: Union[datetime.timedelta, float, str] = None, broker_heartbeat_interval: Union[datetime.timedelta, float, str] = None, broker_check_crcs: bool = None, broker_max_poll_records: int = None, broker_max_poll_interval: int = None, broker_consumer: Union[str, yarl.URL, List[yarl.URL]] = None, broker_producer: Union[str, yarl.URL, List[yarl.URL]] = None, agent_supervisor: Union[_T, str] = None, store: Union[str, yarl.URL] = None, cache: Union[str, yarl.URL] = None, web: Union[str, yarl.URL] = None, web_enabled: bool = True, processing_guarantee: Union[str, faust.types.enums.ProcessingGuarantee] = None, timezone: datetime.tzinfo = None, autodiscover: Union[bool, Iterable[str], Callable[Iterable[str]]] = None, origin: str = None, canonical_url: Union[str, yarl.URL] = None, datadir: Union[pathlib.Path, str] = None, tabledir: Union[pathlib.Path, str] = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, logging_config: Dict = None, loghandlers: List[logging.Handler] = None, table_cleanup_interval: Union[datetime.timedelta, float, str] = None, table_standby_replicas: int = None, table_key_index_size: int = None, topic_replication_factor: int = None, topic_partitions: int = None, topic_allow_declare: bool = None, topic_disable_leader: bool = None, id_format: str = None, reply_to: str = None, reply_to_prefix: str = None, reply_create_topic: bool = None, reply_expires: 
Union[datetime.timedelta, float, str] = None, ssl_context: ssl.SSLContext = None, stream_buffer_maxsize: int = None, stream_wait_empty: bool = None, stream_ack_cancelled_tasks: bool = None, stream_ack_exceptions: bool = None, stream_publish_on_commit: bool = None, stream_recovery_delay: Union[datetime.timedelta, float, str] = None, producer_linger_ms: int = None, producer_max_batch_size: int = None, producer_acks: int = None, producer_max_request_size: int = None, producer_compression_type: str = None, producer_partitioner: Union[_T, str] = None, producer_request_timeout: Union[datetime.timedelta, float, str] = None, producer_api_version: str = None, consumer_max_fetch_size: int = None, consumer_auto_offset_reset: str = None, web_bind: str = None, web_port: int = None, web_host: str = None, web_transport: Union[str, yarl.URL] = None, web_in_thread: bool = None, web_cors_options: Mapping[str, faust.types.web.ResourceOptions] = None, worker_redirect_stdouts: bool = None, worker_redirect_stdouts_level: Union[int, str] = None, Agent: Union[_T, str] = None, ConsumerScheduler: Union[_T, str] = None, Event: Union[_T, str] = None, Schema: Union[_T, str] = None, Stream: Union[_T, str] = None, Table: Union[_T, str] = None, SetTable: Union[_T, str] = None, GlobalTable: Union[_T, str] = None, SetGlobalTable: Union[_T, str] = None, TableManager: Union[_T, str] = None, Serializers: Union[_T, str] = None, Worker: Union[_T, str] = None, PartitionAssignor: Union[_T, str] = None, LeaderAssignor: Union[_T, str] = None, Router: Union[_T, str] = None, Topic: Union[_T, str] = None, HttpClient: Union[_T, str] = None, Monitor: Union[_T, str] = None, url: Union[str, yarl.URL] = None, **kwargs: Any) → None
property Agent
Return type

Type[AgentT[]]

property ConsumerScheduler
Return type

Type[SchedulingStrategyT]

property Event
Return type

Type[EventT[]]

property GlobalTable
Return type

Type[GlobalTableT[]]

property HttpClient
Return type

Type[ClientSession]

property LeaderAssignor
Return type

Type[LeaderAssignorT[]]

property Monitor
Return type

Type[SensorT[]]

property PartitionAssignor
Return type

Type[PartitionAssignorT]

property Router
Return type

Type[RouterT]

property Schema
Return type

Type[SchemaT[~KT, ~VT]]

property Serializers
Return type

Type[RegistryT]

property SetGlobalTable
Return type

Type[GlobalTableT[]]

property SetTable
Return type

Type[TableT[~KT, ~VT]]

property Stream
Return type

Type[StreamT[+T_co]]

property Table
Return type

Type[TableT[~KT, ~VT]]

property TableManager
Return type

Type[TableManagerT[]]

property Topic
Return type

Type[TopicT[]]

property Worker
Return type

Type[_WorkerT]

property agent_supervisor
Return type

Type[SupervisorStrategyT]

property appdir
Return type

Path

autodiscover = False
property broker
Return type

List[URL]

broker_check_crcs = True
broker_client_id = 'faust-1.9.0'
broker_commit_every = 10000
property broker_commit_interval
Return type

float

property broker_commit_livelock_soft_timeout
Return type

float

property broker_consumer
Return type

List[URL]

property broker_credentials
Return type

Optional[CredentialsT]

property broker_heartbeat_interval
Return type

float

broker_max_poll_interval = 1000.0
property broker_max_poll_records
Return type

Optional[int]

property broker_producer
Return type

List[URL]

property broker_request_timeout
Return type

float

property broker_session_timeout
Return type

float

property cache
Return type

URL

property canonical_url
Return type

URL

consumer_auto_offset_reset = 'earliest'
consumer_max_fetch_size = 4194304
property datadir
Return type

Path

debug = False
find_old_versiondirs() → Iterable[pathlib.Path]
Return type

Iterable[Path]

property id
Return type

str

id_format = '{id}-v{self.version}'
key_serializer = 'raw'
logging_config = None
property name
Return type

str

property origin
Return type

Optional[str]

property processing_guarantee
Return type

ProcessingGuarantee

producer_acks = -1
producer_api_version = 'auto'
producer_compression_type = None
producer_linger_ms = 0
producer_max_batch_size = 16384
producer_max_request_size = 1000000
property producer_partitioner
Return type

Optional[Callable[[Optional[bytes], Sequence[int], Sequence[int]], int]]

property producer_request_timeout
Return type

float

reply_create_topic = False
property reply_expires
Return type

float

reply_to_prefix = 'f-reply-'
classmethod setting_names() → Set[str]
Return type

Set[str]

ssl_context = None
property store
Return type

URL

stream_ack_cancelled_tasks = True
stream_ack_exceptions = True
stream_buffer_maxsize = 4096
stream_publish_on_commit = False
property stream_recovery_delay
Return type

float

stream_wait_empty = True
property table_cleanup_interval
Return type

float

table_key_index_size = 1000
table_standby_replicas = 1
property tabledir
Return type

Path

timezone = datetime.timezone.utc
topic_allow_declare = True
topic_disable_leader = False
topic_partitions = 8
topic_replication_factor = 1
value_serializer = 'json'
property version
Return type

int

property web
Return type

URL

web_bind = '0.0.0.0'
web_cors_options = None
web_host = 'build-10233069-project-230058-faust'
web_in_thread = False
web_port = 6066
property web_transport
Return type

URL

worker_redirect_stdouts = True
worker_redirect_stdouts_level = 'WARN'
client_only = False

Set this to True if app should only start the services required to operate as an RPC client (producer and simple reply consumer).

producer_only = False

Set this to True if app should run without consumer/tables.

tracer = None

Optional tracing support.

on_init_dependencies() → Iterable[mode.types.services.ServiceT][source]

Return list of additional service dependencies.

The services returned will be started with the app when the app starts.

Return type

Iterable[ServiceT[]]

async on_first_start() → None[source]

Call first time app starts in this process.

Return type

None

async on_start() → None[source]

Call every time app starts/restarts.

Return type

None

async on_started() → None[source]

Call when app is fully started.

Return type

None

async on_started_init_extra_tasks() → None[source]

Call when started to start additional tasks.

Return type

None

async on_started_init_extra_services() → None[source]

Call when initializing extra services at startup.

Return type

None

async on_init_extra_service(service: Union[mode.types.services.ServiceT, Type[mode.types.services.ServiceT]]) → mode.types.services.ServiceT[source]

Call when adding user services to this app.

Return type

ServiceT[]

config_from_object(obj: Any, *, silent: bool = False, force: bool = False) → None[source]

Read configuration from object.

Object is either an actual object or the name of a module to import.

Examples

>>> app.config_from_object('myproj.faustconfig')
>>> from myproj import faustconfig
>>> app.config_from_object(faustconfig)
Parameters
  • silent (bool) – If true then import errors will be ignored.

  • force (bool) – Force reading configuration immediately. By default the configuration will be read only when required.

Return type

None
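The "object or module name" behavior and the silent flag can be sketched with importlib. The `read_config` helper below is hypothetical and only illustrates the general pattern; it returns a plain dict of public attributes and does not model Faust's lazy loading or the force argument.

```python
import importlib
from typing import Any, Dict


def read_config(obj: Any, *, silent: bool = False) -> Dict[str, Any]:
    """Collect public attributes from an object, or from a module given by name."""
    if isinstance(obj, str):
        try:
            obj = importlib.import_module(obj)
        except ImportError:
            if silent:
                # Mirror the documented flag: ignore import errors.
                return {}
            raise
    return {
        name: getattr(obj, name)
        for name in dir(obj)
        if not name.startswith('_')
    }
```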

finalize() → None[source]

Finalize app configuration.

Return type

None

worker_init() → None[source]

Init worker/CLI commands.

Return type

None

worker_init_post_autodiscover() → None[source]

Init worker after autodiscover.

Return type

None

discover(*extra_modules: str, categories: Iterable[str] = None, ignore: Iterable[Any] = [<built-in method search of _sre.SRE_Pattern object>, '.__main__']) → None[source]

Discover decorators in packages.

Return type

None

main() → NoReturn[source]

Execute the faust umbrella command using this app.

Return type

_NoReturn

topic(*topics: str, pattern: Union[str, Pattern[~AnyStr]] = None, schema: faust.types.serializers.SchemaT = None, key_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, value_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, partitions: int = None, retention: Union[datetime.timedelta, float, str] = None, compacting: bool = None, deleting: bool = None, replicas: int = None, acks: bool = True, internal: bool = False, config: Mapping[str, Any] = None, maxsize: int = None, allow_empty: bool = False, has_prefix: bool = False, loop: asyncio.events.AbstractEventLoop = None) → faust.types.topics.TopicT[source]

Create topic description.

Topics are named channels (for example, a Kafka topic) that exist on a server. To make an ephemeral local communication channel, use channel().

Return type

TopicT[]

channel(*, schema: faust.types.serializers.SchemaT = None, key_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, value_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, maxsize: int = None, loop: asyncio.events.AbstractEventLoop = None) → faust.types.channels.ChannelT[source]

Create new channel.

By default this will create an in-memory channel used for intra-process communication, but in practice channels can be backed by any transport (network or even means of inter-process communication).

Return type

ChannelT[]

agent(channel: Union[str, faust.types.channels.ChannelT] = None, *, name: str = None, concurrency: int = 1, supervisor_strategy: Type[mode.types.supervisors.SupervisorStrategyT] = None, sink: Iterable[Union[AgentT, faust.types.channels.ChannelT, Callable[Any, Optional[Awaitable]]]] = None, isolated_partitions: bool = False, use_reply_headers: bool = False, **kwargs: Any) → Callable[Callable[faust.types.streams.StreamT, Union[Coroutine[[Any, Any], None], Awaitable[None], AsyncIterable]], faust.types.agents.AgentT][source]

Create Agent from async def function.

It can be a regular async function:

@app.agent()
async def my_agent(stream):
    async for number in stream:
        print(f'Received: {number!r}')

Or it can be an async iterator that yields values. These values can be used as the reply in an RPC-style call, or for sinks: callbacks that forward events to other agents/topics/statsd, and so on:

@app.agent(sink=[log_topic])
async def my_agent(requests):
    async for number in requests:
        yield number * 2
Return type

Callable[[Callable[[StreamT[+T_co]], Union[Coroutine[Any, Any, None], Awaitable[None], AsyncIterable[+T_co]]]], AgentT[]]

actor(channel: Union[str, faust.types.channels.ChannelT] = None, *, name: str = None, concurrency: int = 1, supervisor_strategy: Type[mode.types.supervisors.SupervisorStrategyT] = None, sink: Iterable[Union[AgentT, faust.types.channels.ChannelT, Callable[Any, Optional[Awaitable]]]] = None, isolated_partitions: bool = False, use_reply_headers: bool = False, **kwargs: Any) → Callable[Callable[faust.types.streams.StreamT, Union[Coroutine[[Any, Any], None], Awaitable[None], AsyncIterable]], faust.types.agents.AgentT]

Create Agent from async def function.

It can be a regular async function:

@app.agent()
async def my_agent(stream):
    async for number in stream:
        print(f'Received: {number!r}')

Or it can be an async iterator that yields values. These values can be used as the reply in an RPC-style call, or for sinks: callbacks that forward events to other agents/topics/statsd, and so on:

@app.agent(sink=[log_topic])
async def my_agent(requests):
    async for number in requests:
        yield number * 2
Return type

Callable[[Callable[[StreamT[+T_co]], Union[Coroutine[Any, Any, None], Awaitable[None], AsyncIterable[+T_co]]]], AgentT[]]

task(fun: Union[Callable[AppT, Awaitable], Callable[Awaitable]] = None, *, on_leader: bool = False, traced: bool = True) → Union[Callable[Union[Callable[faust.types.app.AppT, Awaitable], Callable[Awaitable]], Union[Callable[faust.types.app.AppT, Awaitable], Callable[Awaitable]]], Callable[faust.types.app.AppT, Awaitable], Callable[Awaitable]][source]

Define an async def function to be started with the app.

This is like timer() but a one-shot task only executed at worker startup (after recovery and the worker is fully ready for operation).

The function may take zero or one argument. If the target function takes an argument, the app is passed as that argument:

>>> @app.task
... async def on_startup(app):
...     print('STARTING UP: %r' % (app,))

Nullary functions are also supported:

>>> @app.task
... async def on_startup():
...     print('STARTING UP')
Return type

Union[Callable[[Union[Callable[[AppT[]], Awaitable[+T_co]], Callable[[], Awaitable[+T_co]]]], Union[Callable[[AppT[]], Awaitable[+T_co]], Callable[[], Awaitable[+T_co]]]], Callable[[AppT[]], Awaitable[+T_co]], Callable[[], Awaitable[+T_co]]]
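The zero-or-one-argument rule described above can be illustrated with a small standalone sketch. It uses only the standard library; call_startup_task is a hypothetical helper written for this illustration and is not how Faust itself dispatches startup tasks.

```python
import asyncio
import inspect


async def call_startup_task(fun, app):
    """Call fun(app) if it declares a parameter, otherwise call fun().

    Hypothetical helper for illustration only; not part of Faust.
    """
    params = inspect.signature(fun).parameters
    if len(params) >= 1:
        return await fun(app)
    return await fun()


async def unary_task(app):
    # Receives the app object (a plain string in this sketch).
    return f"starting {app}"


async def nullary_task():
    # Takes no arguments at all.
    return "starting"


unary_result = asyncio.run(call_startup_task(unary_task, "myapp"))
nullary_result = asyncio.run(call_startup_task(nullary_task, "myapp"))
```

Inspecting the signature once lets the same decorator accept both function shapes without the caller declaring which one it is.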

timer(interval: Union[datetime.timedelta, float, str], on_leader: bool = False, traced: bool = True, name: str = None, max_drift_correction: float = 0.1) → Callable[source]

Define an async def function to be run at periodic intervals.

Like task(), but executes periodically until the worker is shut down.

This decorator takes an async function and adds it to a list of timers started with the app.

Parameters
  • interval (Seconds) – How often the timer executes in seconds.

  • on_leader (bool) – Should the timer only run on the leader?

Example

>>> @app.timer(interval=10.0)
... async def every_10_seconds():
...     print('TEN SECONDS JUST PASSED')
>>> @app.timer(interval=5.0, on_leader=True)
... async def every_5_seconds():
...     print('FIVE SECONDS JUST PASSED. ALSO, I AM THE LEADER!')
Return type

Callable
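To make the "executes periodically until the worker is shut down" behaviour concrete, here is a minimal asyncio sketch. It assumes a hypothetical run_every helper and a plain asyncio.Event as the shutdown signal; Faust's own timer additionally handles leadership, tracing and drift correction, none of which is modelled here.

```python
import asyncio


async def run_every(interval: float, fun, stop: asyncio.Event) -> None:
    """Call ``fun`` once per ``interval`` seconds until ``stop`` is set."""
    while not stop.is_set():
        try:
            # Sleep for one interval, waking early if shutdown is requested.
            await asyncio.wait_for(stop.wait(), timeout=interval)
        except asyncio.TimeoutError:
            # The interval elapsed without a shutdown request: run the timer body.
            await fun()


async def demo_timer() -> int:
    calls = 0
    stop = asyncio.Event()

    async def tick() -> None:
        nonlocal calls
        calls += 1
        if calls == 3:
            stop.set()  # simulate the worker shutting down

    await run_every(0.01, tick, stop)
    return calls


timer_calls = asyncio.run(demo_timer())
```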

crontab(cron_format: str, *, timezone: datetime.tzinfo = None, on_leader: bool = False, traced: bool = True) → Callable[source]

Define periodic task using Crontab description.

This is an async def function to be run at the fixed times, defined by the Cron format.

Like timer(), but executes at fixed times instead of executing at certain intervals.

This decorator takes an async function and adds it to a list of Cronjobs started with the app.

Parameters

cron_format (str) – The Cron spec defining fixed times to run the decorated function.

Keyword Arguments
  • timezone – The timezone to be taken into account for the Cron jobs. If not set, the value of the timezone setting is used.

  • on_leader – Should the Cron job only run on the leader?

Example

>>> import pytz
>>> @app.crontab(cron_format='30 18 * * *',
...              timezone=pytz.timezone('US/Pacific'))
... async def every_6_30_pm_pacific():
...     print('IT IS 6:30pm')
>>> @app.crontab(cron_format='30 18 * * *', on_leader=True)
... async def every_6_30_pm():
...     print('6:30pm UTC; ALSO, I AM THE LEADER!')
Return type

Callable
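The difference between "fixed times" and "intervals" can be sketched for the daily case used in the example ('30 18 * * *'). The next_daily_run function below is a hypothetical illustration that only handles a fixed hour and minute every day; it does not parse Cron expressions and is not what Faust uses internally.

```python
from datetime import datetime, timedelta, timezone


def next_daily_run(now: datetime, hour: int, minute: int) -> datetime:
    """Return the next occurrence of hour:minute at or after ``now``."""
    candidate = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if candidate < now:
        # Today's slot has already passed, so the job runs tomorrow.
        candidate += timedelta(days=1)
    return candidate


# At 19:00 UTC the 18:30 slot has passed, so the next run is the following day.
late_example = next_daily_run(datetime(2024, 1, 1, 19, 0, tzinfo=timezone.utc), 18, 30)
# At 10:00 UTC the 18:30 slot is still ahead on the same day.
early_example = next_daily_run(datetime(2024, 1, 1, 10, 0, tzinfo=timezone.utc), 18, 30)
```

Unlike an interval timer, the wait before each run depends on the wall clock, which is why the timezone argument matters.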

service(cls: Type[mode.types.services.ServiceT]) → Type[mode.types.services.ServiceT][source]

Decorate mode.Service to be started with the app.

Examples

from mode import Service

@app.service
class Foo(Service):
    ...
Return type

Type[ServiceT[]]

is_leader() → bool[source]

Return True if we are in leader worker process.

Return type

bool

stream(channel: Union[AsyncIterable, Iterable], beacon: mode.utils.types.trees.NodeT = None, **kwargs: Any) → faust.types.streams.StreamT[source]

Create new stream from channel/topic/iterable/async iterable.

Returns

A faust.Stream to iterate over events in the stream.

Return type

StreamT[+T_co]

Table(name: str, *, default: Callable[Any] = None, window: faust.types.windows.WindowT = None, partitions: int = None, help: str = None, **kwargs: Any) → faust.types.tables.TableT[source]

Define new table.

Parameters
  • name (str) – Name used for the table. Note that two tables living in the same application cannot have the same name.

  • default (Optional[Callable[[], Any]]) – A callable or type that returns a default value for keys missing in this table.

  • window (Optional[WindowT]) – A windowing strategy to wrap this table in.

Examples

>>> table = app.Table('user_to_amount', default=int)
>>> table['George']
0
>>> table['Elaine'] += 1
>>> table['Elaine'] += 1
>>> table['Elaine']
2
Return type

TableT[~KT, ~VT]
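The role of the default argument in the example above resembles collections.defaultdict. The sketch below is only an in-memory analogy under that assumption; a real table does considerably more than a dictionary.

```python
from collections import defaultdict

# default=int: missing keys read as int(), which is 0.
user_to_amount = defaultdict(int)

george = user_to_amount["George"]   # key was never written
user_to_amount["Elaine"] += 1       # starts from the default, then increments
user_to_amount["Elaine"] += 1
elaine = user_to_amount["Elaine"]
```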

GlobalTable(name: str, *, default: Callable[Any] = None, window: faust.types.windows.WindowT = None, partitions: int = None, help: str = None, **kwargs: Any) → faust.types.tables.GlobalTableT[source]

Define new global table.

Parameters
  • name (str) – Name used for the global table. Note that two global tables living in the same application cannot have the same name.

  • default (Optional[Callable[[], Any]]) – A callable or type that returns a default value for keys missing in this global table.

  • window (Optional[WindowT]) – A windowing strategy to wrap this table in.

Examples

>>> gtable = app.GlobalTable('user_to_amount', default=int)
>>> gtable['George']
0
>>> gtable['Elaine'] += 1
>>> gtable['Elaine'] += 1
>>> gtable['Elaine']
2
Return type

GlobalTableT[]

SetTable(name: str, *, window: faust.types.windows.WindowT = None, partitions: int = None, start_manager: bool = False, help: str = None, **kwargs: Any) → faust.types.tables.TableT[source]

Table of sets.

Return type

TableT[~KT, ~VT]

SetGlobalTable(name: str, *, window: faust.types.windows.WindowT = None, partitions: int = None, start_manager: bool = False, help: str = None, **kwargs: Any) → faust.types.tables.TableT[source]

Table of sets (global).

Return type

TableT[~KT, ~VT]

page(path: str, *, base: Type[faust.web.views.View] = <class 'faust.web.views.View'>, cors_options: Mapping[str, faust.types.web.ResourceOptions] = None, name: str = None) → Callable[Union[Type[faust.types.web.View], Callable[[faust.types.web.View, faust.types.web.Request], Union[Coroutine[[Any, Any], faust.types.web.Response], Awaitable[faust.types.web.Response]]], Callable[[faust.types.web.View, faust.types.web.Request, Any, Any], Union[Coroutine[[Any, Any], faust.types.web.Response], Awaitable[faust.types.web.Response]]]], Type[faust.web.views.View]][source]

Decorate view to be included in the web server.

Return type

Callable[[Union[Type[View], Callable[[View, Request], Union[Coroutine[Any, Any, Response], Awaitable[Response]]], Callable[[View, Request, Any, Any], Union[Coroutine[Any, Any, Response], Awaitable[Response]]]]], Type[View]]

table_route(table: faust.types.tables.CollectionT, shard_param: str = None, *, query_param: str = None, match_info: str = None, exact_key: str = None) → Callable[Union[Callable[[faust.types.web.View, faust.types.web.Request], Union[Coroutine[[Any, Any], faust.types.web.Response], Awaitable[faust.types.web.Response]]], Callable[[faust.types.web.View, faust.types.web.Request, Any, Any], Union[Coroutine[[Any, Any], faust.types.web.Response], Awaitable[faust.types.web.Response]]]], Union[Callable[[faust.types.web.View, faust.types.web.Request], Union[Coroutine[[Any, Any], faust.types.web.Response], Awaitable[faust.types.web.Response]]], Callable[[faust.types.web.View, faust.types.web.Request, Any, Any], Union[Coroutine[[Any, Any], faust.types.web.Response], Awaitable[faust.types.web.Response]]]]][source]

Decorate view method to route request to table key destination.

Return type

Callable[[Union[Callable[[View, Request], Union[Coroutine[Any, Any, Response], Awaitable[Response]]], Callable[[View, Request, Any, Any], Union[Coroutine[Any, Any, Response], Awaitable[Response]]]]], Union[Callable[[View, Request], Union[Coroutine[Any, Any, Response], Awaitable[Response]]], Callable[[View, Request, Any, Any], Union[Coroutine[Any, Any, Response], Awaitable[Response]]]]]

command(*options: Any, base: Optional[Type[faust.app.base._AppCommand]] = None, **kwargs: Any) → Callable[Callable, Type[faust.app.base._AppCommand]][source]

Decorate async def function to be used as CLI command.

Return type

Callable[[Callable], Type[_AppCommand]]

create_event(key: Union[bytes, faust.types.core._ModelT, Any, None], value: Union[bytes, faust.types.core._ModelT, Any], headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes], None], message: faust.types.tuples.Message) → faust.types.events.EventT[source]

Create new faust.Event object.

Return type

EventT[]

async start_client() → None[source]

Start the app in Client-Only mode necessary for RPC requests.

Notes

Once started as a client the app cannot be restarted as Server.

Return type

None

async maybe_start_client() → None[source]

Start the app in Client-Only mode if not started as Server.

Return type

None

trace(name: str, trace_enabled: bool = True, **extra_context: Any) → ContextManager[source]

Return new trace context to trace operation using OpenTracing.

Return type

ContextManager[+T_co]

traced(fun: Callable, name: str = None, sample_rate: float = 1.0, **context: Any) → Callable[source]

Decorate function to be traced using the OpenTracing API.

Return type

Callable

async send(channel: Union[faust.types.channels.ChannelT, str], key: Union[bytes, faust.types.core._ModelT, Any, None] = None, value: Union[bytes, faust.types.core._ModelT, Any] = None, partition: int = None, timestamp: float = None, headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes], None] = None, schema: faust.types.serializers.SchemaT = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, callback: Callable[faust.types.tuples.FutureMessage, Union[None, Awaitable[None]]] = None) → Awaitable[faust.types.tuples.RecordMetadata][source]

Send event to channel/topic.

Parameters
  • channel (Union[ChannelT[], str]) – Channel/topic or the name of a topic to send event to.

  • key (Union[bytes, _ModelT, Any, None]) – Message key.

  • value (Union[bytes, _ModelT, Any, None]) – Message value.

  • partition (Optional[int]) – Specific partition to send to. If not set the partition will be chosen by the partitioner.

  • timestamp (Optional[float]) – Epoch seconds (from Jan 1 1970 UTC) to use as the message timestamp. Defaults to current time.

  • headers (Union[List[Tuple[str, bytes]], Mapping[str, bytes], None]) – Mapping of key/value pairs, or iterable of key value pairs to use as headers for the message.

  • schema (Optional[SchemaT[~KT, ~VT]]) – Schema to use for serialization.

  • key_serializer (Union[CodecT, str, None]) – Serializer to use (if key is not model). Overrides schema if one is specified.

  • value_serializer (Union[CodecT, str, None]) – Serializer to use (if value is not model). Overrides schema if one is specified.

  • callback (Optional[Callable[[FutureMessage[]], Union[None, Awaitable[None]]]]) –

    Called after the message is fully delivered to the channel, but not to the consumer. Signature must be unary as the FutureMessage future is passed to it.

    The resulting faust.types.tuples.RecordMetadata object is then available as fut.result().

Return type

Awaitable[RecordMetadata]
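The callback contract described above (a unary function that receives a future and reads the metadata through fut.result()) can be sketched with a plain asyncio.Future. FakeRecordMetadata and its fields are hypothetical stand-ins chosen for this illustration; no message is actually sent.

```python
import asyncio
from typing import NamedTuple


class FakeRecordMetadata(NamedTuple):
    """Hypothetical stand-in for the delivery metadata."""
    topic: str
    partition: int
    offset: int


async def demo_callback() -> list:
    seen = []

    def on_delivered(fut: asyncio.Future) -> None:
        # Unary callback: it receives the future and reads the result from it.
        seen.append(fut.result())

    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    fut.add_done_callback(on_delivered)
    # Resolving the future plays the role of "message fully delivered".
    fut.set_result(FakeRecordMetadata("orders", 0, 42))
    await asyncio.sleep(0)  # yield once so the event loop runs the callback
    return seen


delivered = asyncio.run(demo_callback())
```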

in_transaction[source]

Return True if stream is using transactions.

LiveCheck(**kwargs: Any) → faust.app.base._LiveCheck[source]

Return new LiveCheck instance testing features for this app.

Return type

_LiveCheck

maybe_start_producer[source]

Ensure producer is started.

Return type

ProducerT[]

async commit(topics: AbstractSet[Union[str, faust.types.tuples.TP]]) → bool[source]

Commit offset for acked messages in the specified topics.

Warning

This will commit acked messages in all topics if the topics argument is passed in as None.

Return type

bool

async on_stop() → None[source]

Call when application stops.

Tip

Remember to call super if you override this method.

Return type

None

on_rebalance_start() → None[source]

Call when rebalancing starts.

Return type

None

on_rebalance_return() → None[source]
Return type

None

on_rebalance_end() → None[source]

Call when rebalancing is done.

Return type

None

FlowControlQueue(maxsize: int = None, *, clear_on_resume: bool = False, loop: asyncio.events.AbstractEventLoop = None) → mode.utils.queues.ThrowableQueue[source]

Like asyncio.Queue, but can be suspended/resumed.

Return type

ThrowableQueue

Worker(**kwargs: Any) → faust.app.base._Worker[source]

Return application worker instance.

Return type

_Worker

on_webserver_init(web: faust.types.web.Web) → None[source]

Call when the Web server is initializing.

Return type

None

property conf

Application configuration.

Return type

Settings

property producer

Message producer.

Return type

ProducerT[]

property consumer

Message consumer.

Return type

ConsumerT[]

property transport

Consumer message transport.

Return type

TransportT

logger = <Logger faust.app.base (WARNING)>
property producer_transport

Producer message transport.

Return type

TransportT

property cache

Cache backend.

Return type

CacheBackendT[]

tables[source]

Map of available tables, and the table manager service.

topics[source]

Topic Conductor.

This is the mediator that moves messages fetched by the Consumer into the streams.

It’s also a set of registered topics by string topic name, so you can check if a topic is being consumed from by doing topic in app.topics.

property monitor

Monitor keeps stats about what’s going on inside the worker.

Return type

Monitor[]

flow_control[source]

Flow control of streams.

This object controls flow into stream queues, and can also clear all buffers.

property http_client

HTTP client Session.

Return type

ClientSession

assignor[source]

Partition Assignor.

Responsible for partition assignment.

router[source]

Find the node partitioned data belongs to.

The router helps us route web requests to the wanted Faust node. If a topic is sharded by account_id, the router can send us to the Faust worker responsible for any account. Used by the @app.table_route decorator.
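The idea of routing a request to the worker that owns a key can be sketched as a two-step lookup: key to partition, then partition to worker. Everything below is an assumption made for illustration: the CRC32 hash, the partition_for and worker_for helpers, and the worker URLs are hypothetical, and the real partitioner may use a different hash.

```python
import zlib


def partition_for(key: str, partitions: int) -> int:
    """Map a key to a partition with a stable hash (CRC32, for illustration)."""
    return zlib.crc32(key.encode("utf-8")) % partitions


def worker_for(key: str, assignment: dict, partitions: int) -> str:
    """Look up which worker URL owns the partition the key falls into."""
    return assignment[partition_for(key, partitions)]


# Hypothetical assignment of two partitions to two workers.
assignment = {0: "http://worker-a:6066", 1: "http://worker-b:6066"}
owner = worker_for("account-42", assignment, partitions=2)
```

Because the hash is deterministic, every node computes the same owner for a given account_id and can forward the request accordingly.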

web[source]

Web driver.

serializers[source]

Return serializer registry.

property label

Return human readable description of application.

Return type

str

property shortlabel

Return short description of application.

Return type

str

class faust.GSSAPICredentials(*, kerberos_service_name: str = 'kafka', kerberos_domain_name: str = None, ssl_context: ssl.SSLContext = None, mechanism: Union[str, faust.types.auth.SASLMechanism] = None) → None[source]

Describe GSSAPI credentials over SASL.

protocol = 'SASL_PLAINTEXT'
mechanism = 'GSSAPI'
class faust.SASLCredentials(*, username: str = None, password: str = None, ssl_context: ssl.SSLContext = None, mechanism: Union[str, faust.types.auth.SASLMechanism] = None) → None[source]

Describe SASL credentials.

protocol = 'SASL_PLAINTEXT'
mechanism = 'PLAIN'
class faust.SSLCredentials(context: ssl.SSLContext = None, *, purpose: Any = None, cafile: Optional[str] = None, capath: Optional[str] = None, cadata: Optional[str] = None) → None[source]

Describe SSL credentials/settings.

protocol = 'SSL'
class faust.Channel(app: faust.types.app.AppT, *, schema: faust.types.serializers.SchemaT = None, key_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, value_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, is_iterator: bool = False, queue: mode.utils.queues.ThrowableQueue = None, maxsize: int = None, root: faust.types.channels.ChannelT = None, active_partitions: Set[faust.types.tuples.TP] = None, loop: asyncio.events.AbstractEventLoop = None) → None[source]

Create new channel.

Parameters
  • app (AppT[]) – The app that created this channel (app.channel())

  • schema (Optional[SchemaT[~KT, ~VT]]) – Schema used for serialization/deserialization

  • key_type (Union[Type[ModelT], Type[bytes], Type[str], None]) – The Model used for keys in this channel. (overrides schema if one is defined)

  • value_type (Union[Type[ModelT], Type[bytes], Type[str], None]) – The Model used for values in this channel. (overrides schema if one is defined)

  • maxsize (Optional[int]) – The maximum number of messages this channel can hold. If exceeded any new put call will block until a message is removed from the channel.

  • is_iterator (bool) – When streams iterate over a channel they will call stream.clone(is_iterator=True) so this attribute denotes that this channel instance is currently being iterated over.

  • active_partitions (Optional[Set[TP]]) – Set of active topic partitions this channel instance is assigned to.

  • loop (Optional[AbstractEventLoop]) – The asyncio event loop to use.
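The maxsize behaviour described above (a put call blocks until a message is removed) matches how a bounded asyncio.Queue behaves. The sketch below uses a plain queue as a stand-in for the channel buffer and records the order of operations; it does not use the Channel class itself.

```python
import asyncio


async def demo_bounded_channel() -> list:
    """Show that put() on a full bounded queue waits for a get()."""
    queue: asyncio.Queue = asyncio.Queue(maxsize=1)  # stand-in for a channel buffer
    log = []

    async def producer() -> None:
        for item in ("a", "b"):
            await queue.put(item)  # the second put waits until "a" is removed
            log.append(f"put {item}")

    task = asyncio.ensure_future(producer())
    await asyncio.sleep(0.01)      # give the producer time to fill the queue
    log.append("queue full")
    log.append(f"got {await queue.get()}")
    await task                     # the blocked put can now complete
    log.append(f"got {await queue.get()}")
    return log


order = asyncio.run(demo_bounded_channel())
```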

property queue

Return the underlying queue/buffer backing this channel.

Return type

ThrowableQueue

clone(*, is_iterator: bool = None, **kwargs: Any) → faust.types.channels.ChannelT[source]

Create clone of this channel.

Parameters

is_iterator (Optional[bool]) – Set to True if this is now a channel that is being iterated over.

Keyword Arguments

**kwargs – Any keyword arguments passed will override any of the arguments supported by Channel.__init__.

Return type

ChannelT[]
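The "keyword arguments override the constructor arguments" pattern used by clone can be sketched with a frozen dataclass. ChannelConfig and clone_config are hypothetical names for this illustration and only model two of the constructor arguments.

```python
from dataclasses import dataclass, replace
from typing import Optional


@dataclass(frozen=True)
class ChannelConfig:
    """Hypothetical subset of the arguments a channel is constructed with."""
    maxsize: Optional[int] = None
    is_iterator: bool = False


def clone_config(config: ChannelConfig, *, is_iterator: Optional[bool] = None,
                 **kwargs) -> ChannelConfig:
    """Return a copy of ``config`` with any given arguments overridden."""
    overrides = dict(kwargs)
    if is_iterator is not None:
        overrides["is_iterator"] = is_iterator
    return replace(config, **overrides)


base = ChannelConfig(maxsize=10)
iterating = clone_config(base, is_iterator=True)   # same settings, now iterating
smaller = clone_config(base, maxsize=5)            # override via **kwargs
```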

clone_using_queue(queue: asyncio.queues.Queue) → faust.types.channels.ChannelT[source]

Create clone of this channel using specific queue instance.

Return type

ChannelT[]

stream(**kwargs: Any) → faust.types.streams.StreamT[source]

Create stream reading from this channel.

Return type

StreamT[+T_co]

get_topic_name() → str[source]

Get the topic name, or raise if this is not a named channel.

Return type

str

async send(*, key: Union[bytes, faust.types.core._ModelT, Any, None] = None, value: Union[bytes, faust.types.core._ModelT, Any] = None, partition: int = None, timestamp: float = None, headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes], None] = None, schema: faust.types.serializers.SchemaT = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, callback: Callable[faust.types.tuples.FutureMessage, Union[None, Awaitable[None]]] = None, force: bool = False) → Awaitable[faust.types.tuples.RecordMetadata][source]

Send message to channel.

Return type

Awaitable[RecordMetadata]

send_soon(*, key: Union[bytes, faust.types.core._ModelT, Any, None] = None, value: Union[bytes, faust.types.core._ModelT, Any] = None, partition: int = None, timestamp: float = None, headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes], None] = None, schema: faust.types.serializers.SchemaT = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, callback: Callable[faust.types.tuples.FutureMessage, Union[None, Awaitable[None]]] = None, force: bool = False, eager_partitioning: bool = False) → faust.types.tuples.FutureMessage[source]

Produce message by adding to buffer.

This method is only supported by Topic.

Raises

NotImplementedError – always for in-memory channel.

Return type

FutureMessage[]

as_future_message(key: Union[bytes, faust.types.core._ModelT, Any, None] = None, value: Union[bytes, faust.types.core._ModelT, Any] = None, partition: int = None, timestamp: float = None, headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes], None] = None, schema: faust.types.serializers.SchemaT = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, callback: Callable[faust.types.tuples.FutureMessage, Union[None, Awaitable[None]]] = None, eager_partitioning: bool = False) → faust.types.tuples.FutureMessage[source]

Create promise that message will be transmitted.

Return type

FutureMessage[]

prepare_headers(headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes], None]) → Union[List[Tuple[str, bytes]], MutableMapping[str, bytes], None][source]

Prepare headers passed before publishing.

Return type

Union[List[Tuple[str, bytes]], MutableMapping[str, bytes], None]

async publish_message(fut: faust.types.tuples.FutureMessage, wait: bool = True) → Awaitable[faust.types.tuples.RecordMetadata][source]

Publish message to channel.

This is the interface used by topic.send(), etc. to actually publish the message on the channel after being buffered up or similar.

It takes a FutureMessage object, which contains all the information required to send the message, and acts as a promise that is resolved once the message has been fully transmitted.

Return type

Awaitable[RecordMetadata]

maybe_declare[source]

Declare/create this channel, but only if it doesn’t exist.

Return type

None

async declare() → None[source]

Declare/create this channel.

This is used to create this channel on a server, if that is required to operate it.

Return type

None

prepare_key(key: Union[bytes, faust.types.core._ModelT, Any, None], key_serializer: Union[faust.types.codecs.CodecT, str, None], schema: faust.types.serializers.SchemaT = None, headers: Union[List[Tuple[str, bytes]], MutableMapping[str, bytes], None] = None) → Tuple[Any, Union[List[Tuple[str, bytes]], MutableMapping[str, bytes], None]][source]

Prepare key before it is sent to this channel.

Topic uses this to implement serialization of keys sent to the channel.

Return type

Tuple[Any, Union[List[Tuple[str, bytes]], MutableMapping[str, bytes], None]]

prepare_value(value: Union[bytes, faust.types.core._ModelT, Any], value_serializer: Union[faust.types.codecs.CodecT, str, None], schema: faust.types.serializers.SchemaT = None, headers: Union[List[Tuple[str, bytes]], MutableMapping[str, bytes], None] = None) → Tuple[Any, Union[List[Tuple[str, bytes]], MutableMapping[str, bytes], None]][source]

Prepare value before it is sent to this channel.

Topic uses this to implement serialization of values sent to the channel.

Return type

Tuple[Any, Union[List[Tuple[str, bytes]], MutableMapping[str, bytes], None]]

async decode(message: faust.types.tuples.Message, *, propagate: bool = False) → faust.types.events.EventT[source]

Decode Message into Event.

Return type

EventT[]

async deliver(message: faust.types.tuples.Message) → None[source]

Deliver message to queue from consumer.

This is called by the consumer to deliver the message to the channel.

Return type

None

async put(value: Any) → None[source]

Put event onto this channel.

Return type

None

async get(*, timeout: Union[datetime.timedelta, float, str] = None) → Any[source]

Get the next Event received on this channel.

Return type

Any

empty() → bool[source]

Return True if the queue is empty.

Return type

bool

async on_key_decode_error(exc: Exception, message: faust.types.tuples.Message) → None[source]

Unable to decode the key of an item in the queue.

Return type

None

async on_value_decode_error(exc: Exception, message: faust.types.tuples.Message) → None[source]

Unable to decode the value of an item in the queue.

Return type

None

async on_decode_error(exc: Exception, message: faust.types.tuples.Message) → None[source]

Signal that there was an error reading an event in the queue.

When a message in the channel needs deserialization to be reconstructed back to its original form, we will sometimes see decoding/deserialization errors being raised, from missing fields or malformed payloads, and so on.

We will log the exception, but you can also override this to perform additional actions.

Admonition: Kafka

In the event a deserialization error occurs, we HAVE to commit the offset of the source message to continue processing the stream.

For this reason it is important that you keep a close eye on error logs. For ease of use, we suggest using log aggregation software, such as Sentry, to surface these errors to your operations team.

Return type

None
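The log-and-continue behaviour described above can be sketched with JSON payloads. This is an illustrative stand-in, assuming a hypothetical decode_all helper and list positions in place of real offsets; it is not Faust's decoding path.

```python
import json
import logging

logger = logging.getLogger("decode-sketch")


def decode_all(raw_messages):
    """Decode JSON payloads; log and skip any that fail, then keep going."""
    decoded, failed_offsets = [], []
    for offset, raw in enumerate(raw_messages):
        try:
            decoded.append(json.loads(raw))
        except ValueError as exc:  # json.JSONDecodeError subclasses ValueError
            logger.error("cannot decode message at offset %d: %r", offset, exc)
            # The bad message is recorded and skipped so processing can continue.
            failed_offsets.append(offset)
    return decoded, failed_offsets


values, failures = decode_all([b'{"a": 1}', b"not json", b"[2]"])
```

The skipped message is never retried in this sketch, which is why the surrounding text stresses watching the error logs.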

on_stop_iteration() → None[source]

Signal that iteration over this channel was stopped.

Tip

Remember to call super when overriding this method.

Return type

None

derive(**kwargs: Any) → faust.types.channels.ChannelT[source]

Derive new channel from this channel, using new configuration.

See faust.Topic.derive.

For local channels this will simply return the same channel.

Return type

ChannelT[]

async throw(exc: BaseException) → None[source]

Throw exception to be received by channel subscribers.

Tip

When you find yourself having to call this from a regular, non-async def function, you can use _throw() instead.

Return type

None

property subscriber_count

Return number of active subscribers to local channel.

Return type

int

property label

Short textual description of channel.

Return type

str

class faust.ChannelT(app: faust.types.channels._AppT, *, schema: faust.types.channels._SchemaT = None, key_type: faust.types.channels._ModelArg = None, value_type: faust.types.channels._ModelArg = None, is_iterator: bool = False, queue: mode.utils.queues.ThrowableQueue = None, maxsize: int = None, root: Optional[faust.types.channels.ChannelT] = None, active_partitions: Set[faust.types.tuples.TP] = None, loop: asyncio.events.AbstractEventLoop = None) → None[source]
abstract clone(*, is_iterator: bool = None, **kwargs: Any) → faust.types.channels.ChannelT[source]
Return type

ChannelT[]

abstract clone_using_queue(queue: asyncio.queues.Queue) → faust.types.channels.ChannelT[source]
Return type

ChannelT[]

abstract stream(**kwargs: Any) → faust.types.channels._StreamT[source]
Return type

_StreamT

abstract get_topic_name() → str[source]
Return type

str

abstract async send(*, key: Union[bytes, faust.types.core._ModelT, Any, None] = None, value: Union[bytes, faust.types.core._ModelT, Any] = None, partition: int = None, timestamp: float = None, headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes], None] = None, schema: faust.types.channels._SchemaT = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, callback: Callable[faust.types.tuples.FutureMessage, Union[None, Awaitable[None]]] = None, force: bool = False) → Awaitable[faust.types.tuples.RecordMetadata][source]
Return type

Awaitable[RecordMetadata]

abstract send_soon(*, key: Union[bytes, faust.types.core._ModelT, Any, None] = None, value: Union[bytes, faust.types.core._ModelT, Any] = None, partition: int = None, timestamp: float = None, headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes], None] = None, schema: faust.types.channels._SchemaT = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, callback: Callable[faust.types.tuples.FutureMessage, Union[None, Awaitable[None]]] = None, force: bool = False, eager_partitioning: bool = False) → faust.types.tuples.FutureMessage[source]
Return type

FutureMessage[]

abstract as_future_message(key: Union[bytes, faust.types.core._ModelT, Any, None] = None, value: Union[bytes, faust.types.core._ModelT, Any] = None, partition: int = None, timestamp: float = None, headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes], None] = None, schema: faust.types.channels._SchemaT = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, callback: Callable[faust.types.tuples.FutureMessage, Union[None, Awaitable[None]]] = None, eager_partitioning: bool = False) → faust.types.tuples.FutureMessage[source]
Return type

FutureMessage[]

abstract async publish_message(fut: faust.types.tuples.FutureMessage, wait: bool = True) → Awaitable[faust.types.tuples.RecordMetadata][source]
Return type

Awaitable[RecordMetadata]

maybe_declare[source]
Return type

None

abstract async declare() → None[source]
Return type

None

abstract prepare_key(key: Union[bytes, faust.types.core._ModelT, Any, None], key_serializer: Union[faust.types.codecs.CodecT, str, None], schema: faust.types.channels._SchemaT = None) → Any[source]
Return type

Any

abstract prepare_value(value: Union[bytes, faust.types.core._ModelT, Any], value_serializer: Union[faust.types.codecs.CodecT, str, None], schema: faust.types.channels._SchemaT = None) → Any[source]
Return type

Any

abstract async decode(message: faust.types.tuples.Message, *, propagate: bool = False) → faust.types.channels._EventT[source]
Return type

_EventT

abstract async deliver(message: faust.types.tuples.Message) → None[source]
Return type

None

abstract async put(value: Any) → None[source]
Return type

None

abstract async get(*, timeout: Union[datetime.timedelta, float, str] = None) → Any[source]
Return type

Any

abstract empty() → bool[source]
Return type

bool

abstract async on_key_decode_error(exc: Exception, message: faust.types.tuples.Message) → None[source]
Return type

None

abstract async on_value_decode_error(exc: Exception, message: faust.types.tuples.Message) → None[source]
Return type

None

abstract async on_decode_error(exc: Exception, message: faust.types.tuples.Message) → None[source]
Return type

None

abstract on_stop_iteration() → None[source]
Return type

None

abstract async throw(exc: BaseException) → None[source]
Return type

None

abstract derive(**kwargs: Any) → faust.types.channels.ChannelT[source]
Return type

ChannelT[]

abstract property subscriber_count
Return type

int

abstract property queue
Return type

ThrowableQueue

class faust.Event(app: faust.types.app.AppT, key: Union[bytes, faust.types.core._ModelT, Any, None], value: Union[bytes, faust.types.core._ModelT, Any], headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes], None], message: faust.types.tuples.Message) → None[source]

An event received on a channel.

Notes

  • Events have a key and a value:

    event.key, event.value
    
  • They also have a reference to the original message (if available), such as a Kafka record:

    event.message.offset

  • Iterating over channels/topics yields Event:

    async for event in channel:

  • Iterating over a stream (that in turn iterates over a channel) yields Event.value:

    async for value in channel.stream():  # value is event.value
        ...
    
  • If you only have a Stream object, you can also access underlying events by using Stream.events.

    For example:

    async for event in channel.stream().events():
        ...
    

    Also commonly used for finding the “current event” related to a value in the stream:

    stream = channel.stream()
    async for value in stream:
        event = stream.current_event
        message = event.message
        topic = event.message.topic
    

    You can retrieve the current event in a stream to:

    • Get access to the serialized key+value.

    • Get access to message properties, such as the topic and partition the value was received on, or its offset.

    If you want access to both key and value, you should use stream.items() instead.

    async for key, value in stream.items():
        ...
    

    stream.current_event can also be accessed, but you must take extreme care that you are using the correct stream object: methods such as .group_by(key) and .through(topic) return cloned stream objects.

    The best way to access the current_event in an agent is to use the ContextVar:

    from faust import current_event
    
    @app.agent(topic)
    async def process(stream):
        async for value in stream:
            event = current_event()
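
The isolation that a ContextVar provides can be illustrated without Faust. The following is a minimal sketch using only the standard library; `_current_event`, `process` and `main` are hypothetical names, and the variable is only a stand-in for whatever backs `faust.current_event()`.

```python
import asyncio
import contextvars

# Hypothetical stand-in for the context variable behind current_event().
_current_event = contextvars.ContextVar('current_event', default=None)


def current_event():
    """Return the event set by the calling task, or None."""
    return _current_event.get()


async def process(name, events):
    seen = []
    for event in events:
        _current_event.set(event)   # a stream would do this per event
        await asyncio.sleep(0)      # give the other task a chance to run
        seen.append((name, current_event()))
    return seen


async def main():
    # gather() wraps each coroutine in a task, and every task runs in its
    # own copy of the context, so the values never leak between them.
    return await asyncio.gather(
        process('a', ['a1', 'a2']),
        process('b', ['b1', 'b2']),
    )


if __name__ == '__main__':
    print(asyncio.run(main()))
```

Because each task reads back only the value it set itself, this approach does not depend on holding a reference to the right stream object.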
    
app
key
value
message
headers
acked
async send(channel: Union[str, faust.types.channels.ChannelT], key: Union[bytes, faust.types.core._ModelT, Any, None] = <object object>, value: Union[bytes, faust.types.core._ModelT, Any] = <object object>, partition: int = None, timestamp: float = None, headers: Any = <object object>, schema: faust.types.serializers.SchemaT = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, callback: Callable[faust.types.tuples.FutureMessage, Union[None, Awaitable[None]]] = None, force: bool = False) → Awaitable[faust.types.tuples.RecordMetadata][source]

Send object to channel.

Return type

Awaitable[RecordMetadata]

async forward(channel: Union[str, faust.types.channels.ChannelT], key: Union[bytes, faust.types.core._ModelT, Any, None] = <object object>, value: Union[bytes, faust.types.core._ModelT, Any] = <object object>, partition: int = None, timestamp: float = None, headers: Any = <object object>, schema: faust.types.serializers.SchemaT = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, callback: Callable[faust.types.tuples.FutureMessage, Union[None, Awaitable[None]]] = None, force: bool = False) → Awaitable[faust.types.tuples.RecordMetadata][source]

Forward original message (will not be reserialized).

Return type

Awaitable[RecordMetadata]

ack() → bool[source]

Acknowledge event as being processed by stream.

When the last stream processor acks the message, the offset in the source topic will be marked as safe-to-commit, and the worker will commit and advance the committed offset.

Return type

bool

class faust.EventT(app: faust.types.events._AppT, key: Union[bytes, faust.types.core._ModelT, Any, None], value: Union[bytes, faust.types.core._ModelT, Any], headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes], None], message: faust.types.tuples.Message) → None[source]
app
key
value
headers
message
acked
abstract async send(channel: Union[str, faust.types.events._ChannelT], key: Union[bytes, faust.types.core._ModelT, Any, None] = None, value: Union[bytes, faust.types.core._ModelT, Any] = None, partition: int = None, timestamp: float = None, headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes], None] = None, schema: faust.types.events._SchemaT = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, callback: Callable[faust.types.tuples.FutureMessage, Union[None, Awaitable[None]]] = None, force: bool = False) → Awaitable[faust.types.tuples.RecordMetadata][source]
Return type

Awaitable[RecordMetadata]

abstract async forward(channel: Union[str, faust.types.events._ChannelT], key: Any = None, value: Any = None, partition: int = None, timestamp: float = None, headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes], None] = None, schema: faust.types.events._SchemaT = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, callback: Callable[faust.types.tuples.FutureMessage, Union[None, Awaitable[None]]] = None, force: bool = False) → Awaitable[faust.types.tuples.RecordMetadata][source]
Return type

Awaitable[RecordMetadata]

abstract ack() → bool[source]
Return type

bool

class faust.ModelOptions(*args, **kwargs)[source]
serializer = None
include_metadata = True
polymorphic_fields = False
allow_blessed_key = False
isodates = False
decimals = False
validation = False
coerce = False
coercions = None
date_parser = None
fields = None

Flattened view of __annotations__ in MRO order.

Type

Index

fieldset = None

Set of required field names, for fast argument checking.

Type

Index

descriptors = None

Mapping of field name to field descriptor.

Type

Index

fieldpos = None

Positional argument index to field name. Used by Record.__init__ to map positional arguments to fields.

Type

Index

optionalset = None

Set of optional field names, for fast argument checking.

Type

Index

models = None

Mapping of fields that are ModelT

Type

Index

modelattrs = None
field_coerce = None

Mapping of fields that need to be coerced. Key is the name of the field, value is the coercion handler function.

Type

Index

defaults = None

Mapping of field names to default value.

initfield = None

Mapping of init field conversion callbacks.

polyindex = None

Index of field to polymorphic type

clone_defaults() → faust.types.models.ModelOptions[source]
Return type

ModelOptions
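
How the indexes above relate to each other can be sketched with a plain annotated class. This is only an illustration under stated assumptions: `build_indexes` is a hypothetical helper, `LogEvent` here is not a `faust.Record`, and the real `ModelOptions` may compute these attributes differently.

```python
from typing import Any, Dict


class LogEvent:
    # Plain annotated class for illustration; not a faust.Record.
    severity: str
    message: str
    optional_field: str = 'default value'


def build_indexes(cls: type) -> Dict[str, Any]:
    """Hypothetical helper deriving ModelOptions-like indexes from a class."""
    fields: Dict[str, Any] = {}
    # Walk the MRO base-first so inherited fields keep their position.
    for base in reversed(cls.__mro__):
        fields.update(getattr(base, '__annotations__', {}))
    # A field is optional when the class body assigns it a default.
    defaults = {name: getattr(cls, name)
                for name in fields if hasattr(cls, name)}
    optionalset = set(defaults)
    return {
        'fields': fields,
        'fieldset': set(fields) - optionalset,   # required field names
        'optionalset': optionalset,
        'fieldpos': dict(enumerate(fields)),     # position -> field name
        'defaults': defaults,
    }


indexes = build_indexes(LogEvent)
```

With such a `fieldpos` mapping, a constructor can translate positional arguments into field names by looking up each argument's index.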

class faust.Record → None[source]

Describes a model type that is a record (Mapping).

Examples

>>> class LogEvent(Record, serializer='json'):
...     severity: str
...     message: str
...     timestamp: float
...     optional_field: str = 'default value'
>>> event = LogEvent(
...     severity='error',
...     message='Broken pact',
...     timestamp=666.0,
... )
>>> event.severity
'error'
>>> serialized = event.dumps()
>>> serialized
'{"severity": "error", "message": "Broken pact", "timestamp": 666.0}'
>>> restored = LogEvent.loads(serialized)
>>> restored
<LogEvent: severity='error', message='Broken pact', timestamp=666.0>
>>> # You can also subclass a Record to create a new record
>>> # with additional fields
>>> class RemoteLogEvent(LogEvent):
...     url: str
>>> # You can also refer to record fields and pass them around:
>>> LogEvent.severity
<FieldDescriptor: LogEvent.severity (str)>
classmethod from_data(data: Mapping, *, preferred_type: Type[faust.types.models.ModelT] = None) → faust.models.record.Record[source]

Create model object from Python dictionary.

Return type

Record

to_representation() → Mapping[str, Any][source]

Convert model to its Python generic counterpart.

Records will be converted to dictionary.

Return type

Mapping[str, Any]

asdict() → Dict[str, Any][source]

Convert record to Python dictionary.

Return type

Dict[str, Any]

class faust.Monitor(*, max_avg_history: int = None, max_commit_latency_history: int = None, max_send_latency_history: int = None, max_assignment_latency_history: int = None, messages_sent: int = 0, tables: MutableMapping[str, faust.sensors.monitor.TableState] = None, messages_active: int = 0, events_active: int = 0, messages_received_total: int = 0, messages_received_by_topic: Counter[str] = None, events_total: int = 0, events_by_stream: Counter[faust.types.streams.StreamT] = None, events_by_task: Counter[_asyncio.Task] = None, events_runtime: Deque[float] = None, commit_latency: Deque[float] = None, send_latency: Deque[float] = None, assignment_latency: Deque[float] = None, events_s: int = 0, messages_s: int = 0, events_runtime_avg: float = 0.0, topic_buffer_full: Counter[faust.types.topics.TopicT] = None, rebalances: int = None, rebalance_return_latency: Deque[float] = None, rebalance_end_latency: Deque[float] = None, rebalance_return_avg: float = 0.0, rebalance_end_avg: float = 0.0, time: Callable[float] = <built-in function monotonic>, http_response_codes: Counter[http.HTTPStatus] = None, http_response_latency: Deque[float] = None, http_response_latency_avg: float = 0.0, **kwargs: Any) → None[source]

Default Faust Sensor.

This is the default sensor, recording statistics about events, etc.

send_errors = 0

Number of produce operations that ended in error.

assignments_completed = 0

Number of partition assignments completed.

assignments_failed = 0

Number of partition assignments that failed.

max_avg_history = 100

Max number of total run time values to keep to build average.

max_commit_latency_history = 30

Max number of commit latency numbers to keep.

max_send_latency_history = 30

Max number of send latency numbers to keep.

max_assignment_latency_history = 30

Max number of assignment latency numbers to keep.

rebalances = 0

Number of rebalances seen by this worker.

tables = None

Mapping of tables

commit_latency = None

Deque of commit latency values

send_latency = None

Deque of send latency values

assignment_latency = None

Deque of assignment latency values.

rebalance_return_latency = None

Deque of previous n rebalance return latencies.

rebalance_end_latency = None

Deque of previous n rebalance end latencies.

rebalance_return_avg = 0.0

Average rebalance return latency.

rebalance_end_avg = 0.0

Average rebalance end latency.

messages_active = 0

Number of messages currently being processed.

messages_received_total = 0

Number of messages processed in total.

messages_received_by_topic = None

Count of messages received by topic

messages_sent = 0

Number of messages sent in total.

messages_sent_by_topic = None

Number of messages sent by topic.

messages_s = 0

Number of messages being processed this second.

events_active = 0

Number of events currently being processed.

events_total = 0

Number of events processed in total.

events_by_task = None

Count of events processed by task

events_by_stream = None

Count of events processed by stream

events_s = 0

Number of events being processed this second.

events_runtime_avg = 0.0

Average event runtime over the last second.

events_runtime = None

Deque of run times used for averages

topic_buffer_full = None

Counter of times a topic's buffer was full

http_response_codes = None

Counter of returned HTTP status codes.

http_response_latency = None

Deque of previous n HTTP request->response latencies.

http_response_latency_avg = 0.0

Average request->response latency.

metric_counts = None

Arbitrary counts added by apps

tp_committed_offsets = None

Last committed offsets by TopicPartition

tp_read_offsets = None

Last read offsets by TopicPartition

tp_end_offsets = None

Log end offsets by TopicPartition

secs_since(start_time: float) → float[source]

Given timestamp start, return number of seconds since that time.

Return type

float

ms_since(start_time: float) → float[source]

Given timestamp start, return number of ms since that time.

Return type

float

logger = <Logger faust.sensors.monitor (WARNING)>
secs_to_ms(timestamp: float) → float[source]

Convert seconds to milliseconds.

Return type

float

asdict() → Mapping[source]

Return monitor state as dictionary.

Return type

Mapping[~KT, +VT_co]

on_message_in(tp: faust.types.tuples.TP, offset: int, message: faust.types.tuples.Message) → None[source]

Call before message is delegated to streams.

Return type

None

on_stream_event_in(tp: faust.types.tuples.TP, offset: int, stream: faust.types.streams.StreamT, event: faust.types.events.EventT) → Optional[Dict][source]

Call when stream starts processing an event.

Return type

Optional[Dict[~KT, ~VT]]

on_stream_event_out(tp: faust.types.tuples.TP, offset: int, stream: faust.types.streams.StreamT, event: faust.types.events.EventT, state: Dict = None) → None[source]

Call when stream is done processing an event.

Return type

None

on_topic_buffer_full(topic: faust.types.topics.TopicT) → None[source]

Call when conductor topic buffer is full and has to wait.

Return type

None

on_message_out(tp: faust.types.tuples.TP, offset: int, message: faust.types.tuples.Message) → None[source]

Call when message is fully acknowledged and can be committed.

Return type

None

on_table_get(table: faust.types.tables.CollectionT, key: Any) → None[source]

Call when value in table is retrieved.

Return type

None

on_table_set(table: faust.types.tables.CollectionT, key: Any, value: Any) → None[source]

Call when new value for key in table is set.

Return type

None

on_table_del(table: faust.types.tables.CollectionT, key: Any) → None[source]

Call when key in a table is deleted.

Return type

None

on_commit_initiated(consumer: faust.types.transports.ConsumerT) → Any[source]

Consumer is about to commit topic offset.

Return type

Any

on_commit_completed(consumer: faust.types.transports.ConsumerT, state: Any) → None[source]

Call when consumer commit offset operation completed.

Return type

None
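
The `on_*_initiated` hooks return a state value that is later passed to the matching `on_*_completed` hook. A minimal sketch of that hand-off, assuming the state is simply a monotonic timestamp; `CommitLatencySensor` is a hypothetical stand-in that does not subclass `faust.Sensor`:

```python
import time
from collections import deque
from typing import Any, Deque


class CommitLatencySensor:
    """Stand-in sensor recording commit latency (not a faust.Sensor)."""

    def __init__(self, max_history: int = 30) -> None:
        # A bounded deque keeps only the most recent measurements.
        self.commit_latency: Deque[float] = deque(maxlen=max_history)

    def on_commit_initiated(self, consumer: Any) -> Any:
        # Whatever is returned here comes back as ``state`` below.
        return time.monotonic()

    def on_commit_completed(self, consumer: Any, state: Any) -> None:
        self.commit_latency.append(time.monotonic() - state)


sensor = CommitLatencySensor(max_history=3)
for _ in range(5):
    state = sensor.on_commit_initiated(consumer=None)
    sensor.on_commit_completed(consumer=None, state=state)
```

After five commits only the last three latencies remain, which is the effect a setting such as max_commit_latency_history is described as having.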

on_send_initiated(producer: faust.types.transports.ProducerT, topic: str, message: faust.types.tuples.PendingMessage, keysize: int, valsize: int) → Any[source]

Call when message added to producer buffer.

Return type

Any

on_send_completed(producer: faust.types.transports.ProducerT, state: Any, metadata: faust.types.tuples.RecordMetadata) → None[source]

Call when producer finished sending message.

Return type

None

on_send_error(producer: faust.types.transports.ProducerT, exc: BaseException, state: Any) → None[source]

Call when producer was unable to publish message.

Return type

None

count(metric_name: str, count: int = 1) → None[source]

Count metric by name.

Return type

None

on_tp_commit(tp_offsets: MutableMapping[faust.types.tuples.TP, int]) → None[source]

Call when offset in topic partition is committed.

Return type

None

track_tp_end_offset(tp: faust.types.tuples.TP, offset: int) → None[source]

Track new topic partition end offset for monitoring lags.

Return type

None
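
One way to turn tracked offsets into a lag figure is to subtract the committed offset from the end offset for each topic partition. The sketch below assumes exactly that definition; `TP` is a local stand-in for the topic-partition tuple and `compute_lag` is a hypothetical helper, not part of Monitor.

```python
from typing import Dict, NamedTuple


class TP(NamedTuple):
    """Minimal (topic, partition) key used only for this sketch."""
    topic: str
    partition: int


def compute_lag(end_offsets: Dict[TP, int],
                committed_offsets: Dict[TP, int]) -> Dict[TP, int]:
    """Hypothetical helper: lag = end offset - committed offset, per TP."""
    return {
        # Partitions with no commit yet are treated as committed at 0.
        tp: max(end - committed_offsets.get(tp, 0), 0)
        for tp, end in end_offsets.items()
    }


tp_end_offsets = {TP('withdrawals', 0): 120, TP('withdrawals', 1): 80}
tp_committed_offsets = {TP('withdrawals', 0): 100}
lag = compute_lag(tp_end_offsets, tp_committed_offsets)
```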

on_assignment_start(assignor: faust.types.assignor.PartitionAssignorT) → Dict[source]

Partition assignor is starting to assign partitions.

Return type

Dict[~KT, ~VT]

on_assignment_error(assignor: faust.types.assignor.PartitionAssignorT, state: Dict, exc: BaseException) → None[source]

Partition assignor did not complete assignment due to error.

Return type

None

on_assignment_completed(assignor: faust.types.assignor.PartitionAssignorT, state: Dict) → None[source]

Partition assignor completed assignment.

Return type

None

on_rebalance_start(app: faust.types.app.AppT) → Dict[source]

Cluster rebalance in progress.

Return type

Dict[~KT, ~VT]

on_rebalance_return(app: faust.types.app.AppT, state: Dict) → None[source]

Consumer replied to the broker that assignment is done.

Return type

None

on_rebalance_end(app: faust.types.app.AppT, state: Dict) → None[source]

Cluster rebalance fully completed (including recovery).

Return type

None

on_web_request_start(app: faust.types.app.AppT, request: faust.web.base.Request, *, view: faust.web.views.View = None) → Dict[source]

Web server started working on request.

Return type

Dict[~KT, ~VT]

on_web_request_end(app: faust.types.app.AppT, request: faust.web.base.Request, response: Optional[faust.web.base.Response], state: Dict, *, view: faust.web.views.View = None) → None[source]

Web server finished working on request.

Return type

None

class faust.Sensor(*, beacon: mode.utils.types.trees.NodeT = None, loop: asyncio.events.AbstractEventLoop = None) → None[source]

Base class for sensors.

This sensor does not do anything at all, but can be subclassed to create new monitors.

on_message_in(tp: faust.types.tuples.TP, offset: int, message: faust.types.tuples.Message) → None[source]

Message received by a consumer.

Return type

None

on_stream_event_in(tp: faust.types.tuples.TP, offset: int, stream: faust.types.streams.StreamT, event: faust.types.events.EventT) → Optional[Dict][source]

Message sent to a stream as an event.

Return type

Optional[Dict[~KT, ~VT]]

on_stream_event_out(tp: faust.types.tuples.TP, offset: int, stream: faust.types.streams.StreamT, event: faust.types.events.EventT, state: Dict = None) → None[source]

Event was acknowledged by stream.

Notes

Acknowledged means a stream finished processing the event, but given that multiple streams may be handling the same event, the message cannot be committed before all streams have processed it. When all streams have acknowledged the event, it will go through on_message_out() just before offsets are committed.

Return type

None

on_message_out(tp: faust.types.tuples.TP, offset: int, message: faust.types.tuples.Message) → None[source]

All streams finished processing message.

Return type

None

on_topic_buffer_full(topic: faust.types.topics.TopicT) → None[source]

Topic buffer full so conductor had to wait.

Return type

None

on_table_get(table: faust.types.tables.CollectionT, key: Any) → None[source]

Key retrieved from table.

Return type

None

on_table_set(table: faust.types.tables.CollectionT, key: Any, value: Any) → None[source]

Value set for key in table.

Return type

None

on_table_del(table: faust.types.tables.CollectionT, key: Any) → None[source]

Key deleted from table.

Return type

None

on_commit_initiated(consumer: faust.types.transports.ConsumerT) → Any[source]

Consumer is about to commit topic offset.

Return type

Any

on_commit_completed(consumer: faust.types.transports.ConsumerT, state: Any) → None[source]

Consumer finished committing topic offset.

Return type

None

on_send_initiated(producer: faust.types.transports.ProducerT, topic: str, message: faust.types.tuples.PendingMessage, keysize: int, valsize: int) → Any[source]

About to send a message.

Return type

Any

on_send_completed(producer: faust.types.transports.ProducerT, state: Any, metadata: faust.types.tuples.RecordMetadata) → None[source]

Message successfully sent.

Return type

None

on_send_error(producer: faust.types.transports.ProducerT, exc: BaseException, state: Any) → None[source]

Error while sending message.

Return type

None

on_assignment_start(assignor: faust.types.assignor.PartitionAssignorT) → Dict[source]

Partition assignor is starting to assign partitions.

Return type

Dict[~KT, ~VT]

on_assignment_error(assignor: faust.types.assignor.PartitionAssignorT, state: Dict, exc: BaseException) → None[source]

Partition assignor did not complete assignment due to error.

Return type

None

on_assignment_completed(assignor: faust.types.assignor.PartitionAssignorT, state: Dict) → None[source]

Partition assignor completed assignment.

Return type

None

on_rebalance_start(app: faust.types.app.AppT) → Dict[source]

Cluster rebalance in progress.

Return type

Dict[~KT, ~VT]

on_rebalance_return(app: faust.types.app.AppT, state: Dict) → None[source]

Consumer replied to the broker that assignment is done.

Return type

None

on_rebalance_end(app: faust.types.app.AppT, state: Dict) → None[source]

Cluster rebalance fully completed (including recovery).

Return type

None

on_web_request_start(app: faust.types.app.AppT, request: faust.web.base.Request, *, view: faust.web.views.View = None) → Dict[source]

Web server started working on request.

Return type

Dict[~KT, ~VT]

on_web_request_end(app: faust.types.app.AppT, request: faust.web.base.Request, response: Optional[faust.web.base.Response], state: Dict, *, view: faust.web.views.View = None) → None[source]

Web server finished working on request.

Return type

None

asdict() → Mapping[source]

Convert sensor state to dictionary.

Return type

Mapping[~KT, +VT_co]

logger = <Logger faust.sensors.base (WARNING)>
class faust.Codec(children: Tuple[faust.types.codecs.CodecT, ...] = None, **kwargs: Any) → None[source]

Base class for codecs.

children = None

Next steps in the recursive codec chain. For example, x = pickle | binary returns a codec with children set to (pickle, binary).

nodes = None

Cached version of children, including this codec as the first node. A chain could be used instead, but that seems premature, so the list is simply copied.

kwargs = None

Subclasses can support keyword arguments; the base implementation of clone() uses this to preserve keyword arguments in copies.

dumps(obj: Any) → bytes[source]

Encode object obj.

Return type

bytes

loads(s: bytes) → Any[source]

Decode object from bytes.

Return type

Any

clone(*children: faust.types.codecs.CodecT) → faust.types.codecs.CodecT[source]

Create a clone of this codec, with optional children added.

Return type

CodecT
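
The interplay of children, nodes and clone() can be sketched with a simplified chainable codec. This is an illustration only: `MiniCodec`, `JsonCodec` and `Base64Codec` are hypothetical classes, and the `_dumps`/`_loads` hooks are an assumption about how a subclass might plug in its own encoding.

```python
import base64
import json
from typing import Any, Tuple


class MiniCodec:
    """Simplified chainable codec; a stand-in, not faust.Codec."""

    def __init__(self, children: Tuple['MiniCodec', ...] = ()) -> None:
        self.children = tuple(children)
        # This codec first, followed by the codecs chained after it.
        self.nodes = (self,) + self.children

    def _dumps(self, obj: Any) -> Any:
        return obj

    def _loads(self, data: Any) -> Any:
        return data

    def dumps(self, obj: Any) -> Any:
        # Encode by running every node in chain order.
        for node in self.nodes:
            obj = node._dumps(obj)
        return obj

    def loads(self, data: Any) -> Any:
        # Decode by undoing the chain in reverse order.
        for node in reversed(self.nodes):
            data = node._loads(data)
        return data

    def clone(self, *children: 'MiniCodec') -> 'MiniCodec':
        return type(self)(children=self.children + children)

    def __or__(self, other: 'MiniCodec') -> 'MiniCodec':
        return self.clone(other)


class JsonCodec(MiniCodec):
    def _dumps(self, obj: Any) -> bytes:
        return json.dumps(obj).encode()

    def _loads(self, data: bytes) -> Any:
        return json.loads(data.decode())


class Base64Codec(MiniCodec):
    def _dumps(self, obj: bytes) -> bytes:
        return base64.b64encode(obj)

    def _loads(self, data: bytes) -> bytes:
        return base64.b64decode(data)


chained = JsonCodec() | Base64Codec()
payload = chained.dumps({'a': 1})
```

Here `chained.loads(payload)` decodes base64 first and JSON second, so the original dictionary is recovered.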

class faust.Schema(*, key_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, value_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, allow_empty: bool = None) → None[source]
update(*, key_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, value_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, allow_empty: bool = None) → None[source]
Return type

None

loads_key(app: faust.types.app.AppT, message: faust.types.tuples.Message, *, loads: Callable = None, serializer: Union[faust.types.codecs.CodecT, str, None] = None) → KT[source]
Return type

~KT

loads_value(app: faust.types.app.AppT, message: faust.types.tuples.Message, *, loads: Callable = None, serializer: Union[faust.types.codecs.CodecT, str, None] = None) → VT[source]
Return type

~VT

dumps_key(app: faust.types.app.AppT, key: Union[bytes, faust.types.core._ModelT, Any, None], *, serializer: Union[faust.types.codecs.CodecT, str, None] = None, headers: Union[List[Tuple[str, bytes]], MutableMapping[str, bytes], None]) → Tuple[Any, Union[List[Tuple[str, bytes]], MutableMapping[str, bytes], None]][source]
Return type

Tuple[Any, Union[List[Tuple[str, bytes]], MutableMapping[str, bytes], None]]

dumps_value(app: faust.types.app.AppT, value: Union[bytes, faust.types.core._ModelT, Any], *, serializer: Union[faust.types.codecs.CodecT, str, None] = None, headers: Union[List[Tuple[str, bytes]], MutableMapping[str, bytes], None]) → Tuple[Any, Union[List[Tuple[str, bytes]], MutableMapping[str, bytes], None]][source]
Return type

Tuple[Any, Union[List[Tuple[str, bytes]], MutableMapping[str, bytes], None]]

on_dumps_key_prepare_headers(key: Union[bytes, faust.types.core._ModelT, Any], headers: Union[List[Tuple[str, bytes]], MutableMapping[str, bytes], None]) → Union[List[Tuple[str, bytes]], MutableMapping[str, bytes], None][source]
Return type

Union[List[Tuple[str, bytes]], MutableMapping[str, bytes], None]

on_dumps_value_prepare_headers(value: Union[bytes, faust.types.core._ModelT, Any], headers: Union[List[Tuple[str, bytes]], MutableMapping[str, bytes], None]) → Union[List[Tuple[str, bytes]], MutableMapping[str, bytes], None][source]
Return type

Union[List[Tuple[str, bytes]], MutableMapping[str, bytes], None]

async decode(app: faust.types.app.AppT, message: faust.types.tuples.Message, *, propagate: bool = False) → faust.types.events.EventT[source]

Decode message from topic (compiled function not cached).

Return type

EventT[]

compile(app: faust.types.app.AppT, *, on_key_decode_error: Callable[[Exception, faust.types.tuples.Message], Awaitable[None]] = <function _noop_decode_error>, on_value_decode_error: Callable[[Exception, faust.types.tuples.Message], Awaitable[None]] = <function _noop_decode_error>, default_propagate: bool = False) → Callable[..., Awaitable[faust.types.events.EventT]][source]

Compile function used to decode event.

Return type

Callable[…, Awaitable[EventT[]]]

class faust.Stream(channel: AsyncIterator[T_co], *, app: faust.types.app.AppT, processors: Iterable[Callable[T]] = None, combined: List[faust.types.streams.JoinableT] = None, on_start: Callable = None, join_strategy: faust.types.joins.JoinT = None, beacon: mode.utils.types.trees.NodeT = None, concurrency_index: int = None, prev: faust.types.streams.StreamT = None, active_partitions: Set[faust.types.tuples.TP] = None, enable_acks: bool = True, prefix: str = '', loop: asyncio.events.AbstractEventLoop = None) → None[source]

A stream: async iterator processing events in channels/topics.

logger = <Logger faust.streams (WARNING)>
mundane_level = 'debug'
get_active_stream() → faust.types.streams.StreamT[source]

Return the currently active stream.

A stream can be derived using Stream.group_by etc, so if this stream was used to create another derived stream, this function will return the stream being actively consumed from. E.g. in the example:

>>> @app.agent()
... async def agent(a):
...     a = a
...     b = a.group_by(Withdrawal.account_id)
...     c = b.through('backup_topic')
...     async for value in c:
...         ...

The return value of a.get_active_stream() would be c.

Notes

The chain of streams that leads to the active stream is decided by the _next attribute. To get to the active stream we just traverse this linked-list:

>>> def get_active_stream(self):
...     node = self
...     while node._next:
...         node = node._next
...     return node
Return type

StreamT[+T_co]

get_root_stream() → faust.types.streams.StreamT[source]

Get the root stream that this stream was derived from.

Return type

StreamT[+T_co]

add_processor(processor: Callable[T]) → None[source]

Add processor callback executed whenever a new event is received.

Processor functions can be async or non-async, must accept a single argument, and should return the value, mutated or not.

For example a processor handling a stream of numbers may modify the value:

def double(value: int) -> int:
    return value * 2

stream.add_processor(double)
Return type

None
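
Since processors may be either async or non-async, the caller has to await only those results that are awaitable. A minimal sketch of such a loop, assuming processors run in the order they were added; `apply_processors` and `add_one` are hypothetical names and this is not the Stream implementation:

```python
import asyncio
import inspect
from typing import Any, Callable, Iterable


async def apply_processors(value: Any,
                           processors: Iterable[Callable[[Any], Any]]) -> Any:
    """Feed value through each processor, awaiting async ones."""
    for processor in processors:
        value = processor(value)
        if inspect.isawaitable(value):
            value = await value
    return value


def double(value: int) -> int:
    return value * 2


async def add_one(value: int) -> int:
    return value + 1


if __name__ == '__main__':
    print(asyncio.run(apply_processors(10, [double, add_one])))
```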

info() → Mapping[str, Any][source]

Return stream settings as a dictionary.

Return type

Mapping[str, Any]

clone(**kwargs: Any) → faust.types.streams.StreamT[source]

Create a clone of this stream.

Notes

If the cloned stream is supposed to supersede this stream, like in group_by/through/etc., you should use _chain() instead so stream._next = cloned_stream is set and get_active_stream() returns the cloned stream.

Return type

StreamT[+T_co]

noack() → faust.types.streams.StreamT[source]

Create new stream where acks are manual.

Return type

StreamT[+T_co]

items() → AsyncIterator[Tuple[Union[bytes, faust.types.core._ModelT, Any, None], T_co]][source]

Iterate over the stream as key, value pairs.

Examples

@app.agent(topic)
async def mytask(stream):
    async for key, value in stream.items():
        print(key, value)
Return type

AsyncIterator[Tuple[Union[bytes, _ModelT, Any, None], +T_co]]

events() → AsyncIterable[faust.types.events.EventT][source]

Iterate over the stream as events exclusively.

This means the stream must be iterating over a channel, or at least an iterable of event objects.

Return type

AsyncIterable[EventT[]]

take(max_: int, within: Union[datetime.timedelta, float, str]) → AsyncIterable[Sequence[T_co]][source]

Buffer up to max_ values at a time and yield a list of buffered values.

Parameters

within (Union[timedelta, float, str]) – Timeout for when we give up waiting for another value, and process the values we have. Warning: If there’s no timeout (i.e. within=None), the agent is likely to stall and block buffered events for an unreasonable length of time(!).

Return type

AsyncIterable[Sequence[+T_co]]
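
The buffering behaviour can be approximated for any async iterable with plain asyncio. The sketch below is an assumption-laden illustration, not the Stream.take implementation: the module-level `take`, `_next_or_done` and `numbers` are hypothetical, and `within` is taken as seconds to wait for each next value.

```python
import asyncio
from typing import Any, AsyncIterable, AsyncIterator, List

_DONE = object()  # sentinel marking the end of the source


async def _next_or_done(iterator: AsyncIterator[Any]) -> Any:
    try:
        return await iterator.__anext__()
    except StopAsyncIteration:
        return _DONE


async def take(source: AsyncIterable[Any], max_: int,
               within: float) -> AsyncIterator[List[Any]]:
    """Yield lists of up to max_ values, flushing early on timeout."""
    iterator = source.__aiter__()
    buffer: List[Any] = []
    pending = None
    while True:
        if pending is None:
            pending = asyncio.ensure_future(_next_or_done(iterator))
        done, _ = await asyncio.wait({pending}, timeout=within)
        if not done:
            # Gave up waiting for another value: process what we have.
            if buffer:
                yield buffer
                buffer = []
            continue
        value = pending.result()
        pending = None
        if value is _DONE:
            break
        buffer.append(value)
        if len(buffer) >= max_:
            yield buffer
            buffer = []
    if buffer:
        yield buffer


async def numbers(n: int) -> AsyncIterator[int]:
    for i in range(n):
        yield i


async def collect() -> List[List[int]]:
    return [batch async for batch in take(numbers(5), max_=2, within=1.0)]
```

The pending read is kept across timeouts rather than cancelled, so a value that arrives late is not lost.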

enumerate(start: int = 0) → AsyncIterable[Tuple[int, T_co]][source]

Enumerate values received on this stream.

Unlike Python’s built-in enumerate, this works with async generators.

Return type

AsyncIterable[Tuple[int, +T_co]]
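
For comparison, an enumerate that works on async generators takes only a few lines. This is a generic sketch rather than the Stream method; `aenumerate` and `letters` are hypothetical names.

```python
import asyncio
from typing import Any, AsyncIterable, AsyncIterator, List, Tuple


async def aenumerate(source: AsyncIterable[Any],
                     start: int = 0) -> AsyncIterator[Tuple[int, Any]]:
    """Async counterpart of enumerate(): yield (index, value) pairs."""
    index = start
    async for value in source:
        yield index, value
        index += 1


async def letters() -> AsyncIterator[str]:
    for ch in 'abc':
        yield ch


async def collect() -> List[Tuple[int, str]]:
    return [pair async for pair in aenumerate(letters(), start=1)]
```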

through(channel: Union[str, faust.types.channels.ChannelT]) → faust.types.streams.StreamT[source]

Forward values in this stream to channel.

Send messages received on this stream to another channel, and return a new stream that consumes from that channel.

Notes

The messages are forwarded after any processors have been applied.

Example

topic = app.topic('foo')

@app.agent(topic)
async def mytask(stream):
    async for value in stream.through(app.topic('bar')):
        # value was first received in topic 'foo',
        # then forwarded and consumed from topic 'bar'
        print(value)
Return type

StreamT[+T_co]

echo(*channels: Union[str, faust.types.channels.ChannelT]) → faust.types.streams.StreamT[source]

Forward values to one or more channels.

Unlike through(), we don’t consume from these channels.

Return type

StreamT[+T_co]

group_by(key: Union[faust.types.models.FieldDescriptorT, Callable[T, Union[bytes, faust.types.core._ModelT, Any, None]]], *, name: str = None, topic: faust.types.topics.TopicT = None, partitions: int = None) → faust.types.streams.StreamT[source]

Create new stream that repartitions the stream using a new key.

Parameters
  • key (Union[FieldDescriptorT[~T], Callable[[~T], Union[bytes, _ModelT, Any, None]]]) –

    The key argument decides how the new key is generated, it can be a field descriptor, a callable, or an async callable.

    Note: The name argument must be provided if the key argument is a callable.

  • name (Optional[str]) – Suffix to use for repartitioned topics. This argument is required if key is a callable.

Examples

Using a field descriptor to use a field in the event as the new key:

s = withdrawals_topic.stream()
# values in this stream are of type Withdrawal
async for event in s.group_by(Withdrawal.account_id):
    ...

Using an async callable to extract a new key:

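import aiohttp

s = withdrawals_topic.stream()

async def get_key(withdrawal):
    # aiohttp requests go through a ClientSession
    async with aiohttp.ClientSession() as session:
        url = f'http://e.com/resolve_account/{withdrawal.account_id}'
        async with session.get(url) as response:
            return await response.text()

# name is required when key is a callable (suffix here is illustrative)
async for event in s.group_by(get_key, name='resolved_account'):
    ...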

Using a regular callable to extract a new key:

s = withdrawals_topic.stream()

def get_key(withdrawal):
    return withdrawal.account_id.upper()

# name is required when key is a callable (suffix here is illustrative)
async for event in s.group_by(get_key, name='account_id_upper'):
    ...
Return type

StreamT[+T_co]

filter(fun: Callable[T]) → faust.types.streams.StreamT[source]

Filter values from stream using callback.

The callback may be a traditional function, lambda function, or an async def function.

This method is useful for filtering events before repartitioning a stream.

Examples

>>> async for v in stream.filter(lambda v: v > 1000).group_by(...):
...     # do something
Return type

StreamT[+T_co]
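
Accepting both plain and async def callbacks comes down to awaiting the predicate result only when needed. A generic sketch over any async iterable, with hypothetical names `afilter`, `amounts` and `is_small`; it does not reproduce the Stream implementation:

```python
import asyncio
import inspect
from typing import Any, AsyncIterable, AsyncIterator, Callable, List


async def afilter(fun: Callable[[Any], Any],
                  source: AsyncIterable[Any]) -> AsyncIterator[Any]:
    """Yield only the values for which fun(value) is truthy."""
    async for value in source:
        keep = fun(value)
        if inspect.isawaitable(keep):   # async def predicate
            keep = await keep
        if keep:
            yield value


async def amounts() -> AsyncIterator[int]:
    for amount in (10, 2000, 500, 1500):
        yield amount


async def is_small(value: int) -> bool:
    return value < 1000


async def collect(fun: Callable[[Any], Any]) -> List[int]:
    return [v async for v in afilter(fun, amounts())]
```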

derive_topic(name: str, *, schema: faust.types.serializers.SchemaT = None, key_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, value_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, prefix: str = '', suffix: str = '') → faust.types.topics.TopicT[source]

Create Topic description derived from the K/V type of this stream.

Parameters
  • name (str) – Topic name.

  • key_type (Union[Type[ModelT], Type[bytes], Type[str], None]) – Specific key type to use for this topic. If not set, the key type of this stream will be used.

  • value_type (Union[Type[ModelT], Type[bytes], Type[str], None]) – Specific value type to use for this topic. If not set, the value type of this stream will be used.

Raises

ValueError – if the stream channel is not a topic.

Return type

TopicT[]

async throw(exc: BaseException) → None[source]

Send exception to stream iteration.

Return type

None

combine(*nodes: faust.types.streams.JoinableT, **kwargs: Any) → faust.types.streams.StreamT[source]

Combine streams and tables into joined stream.

Return type

StreamT[+T_co]

contribute_to_stream(active: faust.types.streams.StreamT) → None[source]

Add stream as node in joined stream.

Return type

None

async remove_from_stream(stream: faust.types.streams.StreamT) → None[source]

Remove as node in a joined stream.

Return type

None

join(*fields: faust.types.models.FieldDescriptorT) → faust.types.streams.StreamT[source]

Create stream where events are joined.

Return type

StreamT[+T_co]

left_join(*fields: faust.types.models.FieldDescriptorT) → faust.types.streams.StreamT[source]

Create stream where events are joined by LEFT JOIN.

Return type

StreamT[+T_co]

inner_join(*fields: faust.types.models.FieldDescriptorT) → faust.types.streams.StreamT[source]

Create stream where events are joined by INNER JOIN.

Return type

StreamT[+T_co]

outer_join(*fields: faust.types.models.FieldDescriptorT) → faust.types.streams.StreamT[source]

Create stream where events are joined by OUTER JOIN.

Return type

StreamT[+T_co]

async on_merge(value: T = None) → Optional[T][source]

Signal called when an event is to be joined.

Return type

Optional[~T]

async send(value: T_contra) → None[source]

Send value into stream locally (bypasses topic).

Return type

None

async on_start() → None[source]

Signal called when the stream starts.

Return type

None

async stop() → None[source]

Stop this stream.

Return type

None

async on_stop() → None[source]

Signal that the stream is stopping.

Return type

None

async ack(event: faust.types.events.EventT) → bool[source]

Ack event.

This will decrease the reference count of the event message by one, and when the reference count reaches zero, the worker will commit the offset so that the message will not be seen by a worker again.

Parameters

event (EventT[]) – Event to ack.

Return type

bool
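
The reference-counting behaviour described above can be sketched in plain Python. This is a simplified illustration, not Faust's code: the `Message` class, the `committed` list, and the meaning given to the boolean return value (True once the count reaches zero) are assumptions made for the example.

```python
from typing import List


class Message:
    # Hypothetical stand-in for a consumed message; not Faust's Message class.
    def __init__(self, offset: int, refcount: int) -> None:
        self.offset = offset
        self.refcount = refcount


# Offsets that have become eligible for commit (stand-in for the consumer).
committed: List[int] = []


def ack(message: Message) -> bool:
    # Decrease the reference count by one, never going below zero.
    message.refcount = max(message.refcount - 1, 0)
    if message.refcount == 0:
        # No stream holds the message any more: its offset can be committed.
        committed.append(message.offset)
        return True
    return False
```

With a message held by two streams, the first `ack` leaves `committed` empty and only the second one records the offset.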

property label

Return description of stream, used in graphs and logs.

Return type

str

shortlabel[source]

Return short description of stream.

class faust.StreamT(channel: AsyncIterator[T_co] = None, *, app: faust.types.streams._AppT = None, processors: Iterable[Callable[T]] = None, combined: List[faust.types.streams.JoinableT] = None, on_start: Callable = None, join_strategy: faust.types.streams._JoinT = None, beacon: mode.utils.types.trees.NodeT = None, concurrency_index: int = None, prev: Optional[faust.types.streams.StreamT] = None, active_partitions: Set[faust.types.tuples.TP] = None, enable_acks: bool = True, prefix: str = '', loop: asyncio.events.AbstractEventLoop = None) → None[source]
outbox = None
join_strategy = None
task_owner = None
current_event = None
active_partitions = None
concurrency_index = None
enable_acks = True
prefix = ''
abstract get_active_stream() → faust.types.streams.StreamT[source]
Return type

StreamT[+T_co]

abstract add_processor(processor: Callable[T]) → None[source]
Return type

None

abstract info() → Mapping[str, Any][source]
Return type

Mapping[str, Any]

abstract clone(**kwargs: Any) → faust.types.streams.StreamT[source]
Return type

StreamT[+T_co]

abstract async items() → AsyncIterator[Tuple[Union[bytes, faust.types.core._ModelT, Any, None], T_co]][source]
abstract async events() → AsyncIterable[faust.types.events.EventT][source]
abstract async take(max_: int, within: Union[datetime.timedelta, float, str]) → AsyncIterable[Sequence[T_co]][source]
abstract enumerate(start: int = 0) → AsyncIterable[Tuple[int, T_co]][source]
Return type

AsyncIterable[Tuple[int, +T_co]]

abstract through(channel: Union[str, faust.types.channels.ChannelT]) → faust.types.streams.StreamT[source]
Return type

StreamT[+T_co]

abstract echo(*channels: Union[str, faust.types.channels.ChannelT]) → faust.types.streams.StreamT[source]
Return type

StreamT[+T_co]

abstract group_by(key: Union[faust.types.models.FieldDescriptorT, Callable[T, Union[bytes, faust.types.core._ModelT, Any, None]]], *, name: str = None, topic: faust.types.topics.TopicT = None) → faust.types.streams.StreamT[source]
Return type

StreamT[+T_co]

abstract derive_topic(name: str, *, schema: faust.types.streams._SchemaT = None, key_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, value_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, prefix: str = '', suffix: str = '') → faust.types.topics.TopicT[source]
Return type

TopicT[]

abstract async throw(exc: BaseException) → None[source]
Return type

None

abstract async send(value: T_contra) → None[source]
Return type

None

abstract async ack(event: faust.types.events.EventT) → bool[source]
Return type

bool

faust.current_event() → Optional[faust.types.events.EventT][source]

Return the event currently being processed, or None.

Return type

Optional[EventT[]]

class faust.GlobalTable(app: faust.types.app.AppT, *, name: str = None, default: Callable[Any] = None, store: Union[str, yarl.URL] = None, schema: faust.types.serializers.SchemaT = None, key_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, value_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, partitions: int = None, window: faust.types.windows.WindowT = None, changelog_topic: faust.types.topics.TopicT = None, help: str = None, on_recover: Callable[Awaitable[None]] = None, on_changelog_event: Callable[faust.types.events.EventT, Awaitable[None]] = None, recovery_buffer_size: int = 1000, standby_buffer_size: int = None, extra_topic_configs: Mapping[str, Any] = None, recover_callbacks: Set[Callable[Awaitable[None]]] = None, options: Mapping[str, Any] = None, use_partitioner: bool = False, on_window_close: Callable[[Any, Any], None] = None, **kwargs: Any) → None[source]
logger = <Logger faust.tables.globaltable (WARNING)>
class faust.Table(app: faust.types.app.AppT, *, name: str = None, default: Callable[Any] = None, store: Union[str, yarl.URL] = None, schema: faust.types.serializers.SchemaT = None, key_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, value_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, partitions: int = None, window: faust.types.windows.WindowT = None, changelog_topic: faust.types.topics.TopicT = None, help: str = None, on_recover: Callable[Awaitable[None]] = None, on_changelog_event: Callable[faust.types.events.EventT, Awaitable[None]] = None, recovery_buffer_size: int = 1000, standby_buffer_size: int = None, extra_topic_configs: Mapping[str, Any] = None, recover_callbacks: Set[Callable[Awaitable[None]]] = None, options: Mapping[str, Any] = None, use_partitioner: bool = False, on_window_close: Callable[[Any, Any], None] = None, **kwargs: Any) → None[source]

Table (non-windowed).

class WindowWrapper(table: faust.types.tables.TableT, *, relative_to: Union[faust.types.tables._FieldDescriptorT, Callable[Optional[faust.types.events.EventT], Union[float, datetime.datetime]], datetime.datetime, float, None] = None, key_index: bool = False, key_index_table: faust.types.tables.TableT = None) → None

Windowed table wrapper.

A windowed table does not return concrete values when keys are accessed; instead, a WindowSet is returned so that the values can be further reduced to the wanted time period.

ValueType

alias of WindowSet

as_ansitable(title: str = '{table.name}', **kwargs: Any) → str

Draw table as a terminal ANSI table.

Return type

str

clone(relative_to: Union[faust.types.tables._FieldDescriptorT, Callable[Optional[faust.types.events.EventT], Union[float, datetime.datetime]], datetime.datetime, float, None]) → faust.types.tables.WindowWrapperT

Clone this table using a new time-relativity configuration.

Return type

WindowWrapperT[]

property get_relative_timestamp

Return the current handler for extracting event timestamp.

Return type

Optional[Callable[[Optional[EventT[]]], Union[float, datetime]]]

get_timestamp(event: faust.types.events.EventT = None) → float

Get timestamp from event.

Return type

float

items(event: faust.types.events.EventT = None) → ItemsView

Return table items view: iterate over (key, value) pairs.

Return type

ItemsView[~KT, +VT_co]

key_index = False
key_index_table = None
keys() → KeysView

Return table keys view: iterate over keys found in this table.

Return type

KeysView[~KT]

property name

Return the name of this table.

Return type

str

on_del_key(key: Any) → None

Call when a key is deleted from this table.

Return type

None

on_recover(fun: Callable[Awaitable[None]]) → Callable[Awaitable[None]]

Call after table recovery.

Return type

Callable[[], Awaitable[None]]

on_set_key(key: Any, value: Any) → None

Call when the value for a key in this table is set.

Return type

None

relative_to(ts: Union[faust.types.tables._FieldDescriptorT, Callable[Optional[faust.types.events.EventT], Union[float, datetime.datetime]], datetime.datetime, float, None]) → faust.types.tables.WindowWrapperT

Configure the time-relativity of this windowed table.

Return type

WindowWrapperT[]

relative_to_field(field: faust.types.models.FieldDescriptorT) → faust.types.tables.WindowWrapperT

Configure table to be time-relative to a field in the stream.

This means the window will use the timestamp from the event currently being processed in the stream.

Further it will not use the timestamp of the Kafka message, but a field in the value of the event.

For example a model field:

class Account(faust.Record):
    created: float

table = app.Table('foo').hopping(
    ...,
).relative_to_field(Account.created)
Return type

WindowWrapperT[]

relative_to_now() → faust.types.tables.WindowWrapperT

Configure table to be time-relative to the system clock.

Return type

WindowWrapperT[]

relative_to_stream() → faust.types.tables.WindowWrapperT

Configure table to be time-relative to the stream.

This means the window will use the timestamp from the event currently being processed in the stream.

Return type

WindowWrapperT[]
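
The three time-relativity options described above (system clock, stream event timestamp, and a field in the event value) differ only in where the timestamp comes from. The sketch below shows that difference with plain functions. The `Event` class, its `message_timestamp` attribute, and the `TimestampGetter` alias are hypothetical, and the functions only share their names with the Faust methods; they do not reproduce them.

```python
import time
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class Account:
    created: float  # timestamp stored in the event value


@dataclass
class Event:
    # Hypothetical event: a value plus the timestamp of the Kafka message.
    value: Account
    message_timestamp: float


TimestampGetter = Callable[[Optional[Event]], float]


def relative_to_now() -> TimestampGetter:
    # Ignore the event and read the system clock.
    return lambda event: time.time()


def relative_to_stream() -> TimestampGetter:
    # Use the timestamp of the message currently being processed.
    return lambda event: event.message_timestamp


def relative_to_field(field: str) -> TimestampGetter:
    # Use a field from the event value instead of the message timestamp.
    return lambda event: getattr(event.value, field)
```

For an event whose message timestamp is 250.0 and whose `created` field is 100.0, the stream-relative getter returns 250.0 while the field-relative getter returns 100.0.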

values(event: faust.types.events.EventT = None) → ValuesView

Return table values view: iterate over values in this table.

Return type

ValuesView[+VT_co]

using_window(window: faust.types.windows.WindowT, *, key_index: bool = False) → faust.types.tables.WindowWrapperT[source]

Wrap table using a specific window type.

Return type

WindowWrapperT[]

hopping(size: Union[datetime.timedelta, float, str], step: Union[datetime.timedelta, float, str], expires: Union[datetime.timedelta, float, str] = None, key_index: bool = False) → faust.types.tables.WindowWrapperT[source]

Wrap table in a hopping window.

Return type

WindowWrapperT[]

tumbling(size: Union[datetime.timedelta, float, str], expires: Union[datetime.timedelta, float, str] = None, key_index: bool = False) → faust.types.tables.WindowWrapperT[source]

Wrap table in a tumbling window.

Return type

WindowWrapperT[]

on_key_get(key: KT) → None[source]

Call when the value for a key in this table is retrieved.

Return type

None

on_key_set(key: KT, value: VT) → None[source]

Call when the value for a key in this table is set.

Return type

None

on_key_del(key: KT) → None[source]

Call when a key in this table is removed.

Return type

None

as_ansitable(title: str = '{table.name}', **kwargs: Any) → str[source]

Draw table as a terminal ANSI table.

Return type

str

logger = <Logger faust.tables.table (WARNING)>
class faust.SetGlobalTable(app: faust.types.app.AppT, *, start_manager: bool = False, manager_topic_name: str = None, manager_topic_suffix: str = None, **kwargs: Any) → None[source]
logger = <Logger faust.tables.sets (WARNING)>
class faust.SetTable(app: faust.types.app.AppT, *, start_manager: bool = False, manager_topic_name: str = None, manager_topic_suffix: str = None, **kwargs: Any) → None[source]

Table that maintains a dictionary of sets.

Manager

alias of SetTableManager

WindowWrapper

alias of SetWindowWrapper

logger = <Logger faust.tables.sets (WARNING)>
manager_topic_suffix = '-setmanager'
async on_start() → None[source]

Call when set table starts.

Return type

None

class faust.Topic(app: faust.types.app.AppT, *, topics: Sequence[str] = None, pattern: Union[str, Pattern[~AnyStr]] = None, schema: faust.types.serializers.SchemaT = None, key_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, value_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, is_iterator: bool = False, partitions: int = None, retention: Union[datetime.timedelta, float, str] = None, compacting: bool = None, deleting: bool = None, replicas: int = None, acks: bool = True, internal: bool = False, config: Mapping[str, Any] = None, queue: mode.utils.queues.ThrowableQueue = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, maxsize: int = None, root: faust.types.channels.ChannelT = None, active_partitions: Set[faust.types.tuples.TP] = None, allow_empty: bool = None, has_prefix: bool = False, loop: asyncio.events.AbstractEventLoop = None) → None[source]

Define new topic description.

Parameters
  • app (AppT[]) – App instance used to create this topic description.

  • topics (Optional[Sequence[str]]) – List of topic names.

  • partitions (Optional[int]) – Number of partitions for these topics. On declaration, topics are created using this. Note: If a message is produced before the topic is declared, and autoCreateTopics is enabled on the Kafka Server, the number of partitions used will be specified by the server configuration.

  • retention (Union[timedelta, float, str, None]) – Number of seconds (as float/timedelta) to keep messages in the topic before they can be expired by the server.

  • pattern (Union[str, Pattern[AnyStr], None]) – Regular expression evaluated to decide what topics to subscribe to. You cannot specify both topics and a pattern.

  • schema (Optional[SchemaT[~KT, ~VT]]) – Schema used for serialization/deserialization.

  • key_type (Union[Type[ModelT], Type[bytes], Type[str], None]) – How to deserialize keys for messages in this topic. Can be a faust.Model type, str, bytes, or None for “autodetect” (Overrides schema if one is defined).

  • value_type (Union[Type[ModelT], Type[bytes], Type[str], None]) – How to deserialize values for messages in this topic. Can be a faust.Model type, str, bytes, or None for “autodetect” (Overrides schema if one is defined).

  • active_partitions (Optional[Set[TP]]) – Set of faust.types.tuples.TP that this topic should be restricted to.

Raises

TypeError – if both topics and pattern are provided.

async send(*, key: Union[bytes, faust.types.core._ModelT, Any, None] = None, value: Union[bytes, faust.types.core._ModelT, Any] = None, partition: int = None, timestamp: float = None, headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes], None] = None, schema: faust.types.serializers.SchemaT = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, callback: Callable[faust.types.tuples.FutureMessage, Union[None, Awaitable[None]]] = None, force: bool = False) → Awaitable[faust.types.tuples.RecordMetadata][source]

Send message to topic.

Return type

Awaitable[RecordMetadata]

send_soon(*, key: Union[bytes, faust.types.core._ModelT, Any, None] = None, value: Union[bytes, faust.types.core._ModelT, Any] = None, partition: int = None, timestamp: float = None, headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes], None] = None, schema: faust.types.serializers.SchemaT = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, callback: Callable[faust.types.tuples.FutureMessage, Union[None, Awaitable[None]]] = None, force: bool = False, eager_partitioning: bool = False) → faust.types.tuples.FutureMessage[source]

Produce message by adding to buffer.

Notes

This method can be used by non-async def functions to produce messages.

Return type

FutureMessage[]

async put(event: faust.types.events.EventT) → None[source]

Put event directly onto the underlying queue of this topic.

This will only affect subscribers to a particular instance, in a particular process.

Return type

None

property pattern

Regular expression used by this topic (if any).

Return type

Optional[Pattern[AnyStr]]

property partitions

Return the number of configured partitions for this topic.

Notes

This is only active for internal topics, fully owned and managed by Faust itself.

We never touch the configuration of a topic that exists in Kafka, and Kafka will sometimes automatically create topics when they don’t exist. In this case the number of partitions for the automatically created topic will depend on the Kafka server configuration (num.partitions).

Always make sure your topics have the correct number of partitions.

Return type

Optional[int]

derive(**kwargs: Any) → faust.types.channels.ChannelT[source]

Create topic derived from the configuration of this topic.

Configuration will be copied from this topic, with any parameter passed as a keyword argument overriding the copied value.

See also

derive_topic(): for a list of supported keyword arguments.

Return type

ChannelT[]
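
The copy-with-overrides behaviour of derive() can be pictured with a small dataclass. This is a sketch under stated assumptions: `TopicDescription` and its three fields are a hypothetical, heavily reduced stand-in for a topic description, and `dataclasses.replace` is used here only to illustrate the idea, not because Faust is known to work this way.

```python
from dataclasses import dataclass, replace
from typing import Any, Optional, Tuple


@dataclass(frozen=True)
class TopicDescription:
    # Hypothetical, simplified stand-in for a topic description.
    topics: Tuple[str, ...]
    partitions: Optional[int] = None
    retention: Optional[float] = None

    def derive(self, **kwargs: Any) -> "TopicDescription":
        # Copy every field, replacing only those passed as keyword arguments.
        return replace(self, **kwargs)
```

Deriving with `partitions=16` from a description that has eight partitions yields a new object with 16 partitions and otherwise identical settings, while the original is left unchanged.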

derive_topic(*, topics: Sequence[str] = None, schema: faust.types.serializers.SchemaT = None, key_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, value_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, partitions: int = None, retention: Union[datetime.timedelta, float, str] = None, compacting: bool = None, deleting: bool = None, internal: bool = None, config: Mapping[str, Any] = None, prefix: str = '', suffix: str = '', **kwargs: Any) → faust.types.topics.TopicT[source]

Create new topic with configuration derived from this topic.

Return type

TopicT[]

get_topic_name() → str[source]

Return the main topic name of this topic description.

As topic descriptions can have multiple topic names, this only returns a name when the description contains exactly one topic name.

Raises
  • TypeError – if configured with a regular expression pattern.

  • ValueError – if configured with multiple topic names.

  • TypeError – if not configured with any names or patterns.

Return type

str
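
The three error cases listed above can be summarised as a small decision function. The sketch below is an illustration only: it is a free function rather than a method, the parameter names and error messages are assumptions, and the order in which the checks run is a guess.

```python
from typing import Optional, Sequence


def get_topic_name(topics: Optional[Sequence[str]], pattern: Optional[str]) -> str:
    # A pattern subscription has no single name to return.
    if pattern:
        raise TypeError("topic is configured with a pattern")
    # Neither names nor a pattern were configured.
    if not topics:
        raise TypeError("topic has no names or pattern")
    # More than one name: it is ambiguous which one is the main name.
    if len(topics) > 1:
        raise ValueError("topic has multiple names")
    return topics[0]
```

Only a description with exactly one topic name and no pattern returns a value; every other combination raises.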

async publish_message(fut: faust.types.tuples.FutureMessage, wait: bool = False) → Awaitable[faust.types.tuples.RecordMetadata][source]

Fulfill promise to publish message to topic.

Return type

Awaitable[RecordMetadata]

maybe_declare[source]

Declare/create this topic, only if it does not exist.

Return type

None

async declare() → None[source]

Declare/create this topic on the server.

Return type

None

class faust.TopicT(app: faust.types.topics._AppT, *, topics: Sequence[str] = None, pattern: Union[str, Pattern[~AnyStr]] = None, schema: faust.types.topics._SchemaT = None, key_type: faust.types.topics._ModelArg = None, value_type: faust.types.topics._ModelArg = None, is_iterator: bool = False, partitions: int = None, retention: Union[datetime.timedelta, float, str] = None, compacting: bool = None, deleting: bool = None, replicas: int = None, acks: bool = True, internal: bool = False, config: Mapping[str, Any] = None, queue: mode.utils.queues.ThrowableQueue = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, maxsize: int = None, root: faust.types.channels.ChannelT = None, active_partitions: Set[faust.types.tuples.TP] = None, allow_empty: bool = False, has_prefix: bool = False, loop: asyncio.events.AbstractEventLoop = None) → None[source]
topics = None

Iterable/Sequence of topic names to subscribe to.

retention = None

Topic retention setting: expiry time in seconds for messages in the topic.

compacting = None

Flag that when enabled means the topic can be “compacted”: if the topic is a log of key/value pairs, the broker can delete old values for the same key.

replicas = None

Number of replicas for topic.

config = None

Additional configuration as a mapping.

acks = None

Enable acks for this topic.

internal = None

Mark topic as internal: it’s owned by us and we are allowed to create or delete the topic as necessary.

has_prefix = False
abstract property pattern
Return type

Optional[Pattern[AnyStr]]

abstract property partitions
Return type

Optional[int]

abstract derive(**kwargs: Any) → faust.types.channels.ChannelT[source]
Return type

ChannelT[]

abstract derive_topic(*, topics: Sequence[str] = None, schema: faust.types.topics._SchemaT = None, key_type: faust.types.topics._ModelArg = None, value_type: faust.types.topics._ModelArg = None, partitions: int = None, retention: Union[datetime.timedelta, float, str] = None, compacting: bool = None, deleting: bool = None, internal: bool = False, config: Mapping[str, Any] = None, prefix: str = '', suffix: str = '', **kwargs: Any) → faust.types.topics.TopicT[source]
Return type

TopicT[]

class faust.Settings(id: str, *, debug: bool = None, version: int = None, broker: Union[str, yarl.URL, List[yarl.URL]] = None, broker_client_id: str = None, broker_request_timeout: Union[datetime.timedelta, float, str] = None, broker_credentials: Union[faust.types.auth.CredentialsT, ssl.SSLContext] = None, broker_commit_every: int = None, broker_commit_interval: Union[datetime.timedelta, float, str] = None, broker_commit_livelock_soft_timeout: Union[datetime.timedelta, float, str] = None, broker_session_timeout: Union[datetime.timedelta, float, str] = None, broker_heartbeat_interval: Union[datetime.timedelta, float, str] = None, broker_check_crcs: bool = None, broker_max_poll_records: int = None, broker_max_poll_interval: int = None, broker_consumer: Union[str, yarl.URL, List[yarl.URL]] = None, broker_producer: Union[str, yarl.URL, List[yarl.URL]] = None, agent_supervisor: Union[_T, str] = None, store: Union[str, yarl.URL] = None, cache: Union[str, yarl.URL] = None, web: Union[str, yarl.URL] = None, web_enabled: bool = True, processing_guarantee: Union[str, faust.types.enums.ProcessingGuarantee] = None, timezone: datetime.tzinfo = None, autodiscover: Union[bool, Iterable[str], Callable[Iterable[str]]] = None, origin: str = None, canonical_url: Union[str, yarl.URL] = None, datadir: Union[pathlib.Path, str] = None, tabledir: Union[pathlib.Path, str] = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, logging_config: Dict = None, loghandlers: List[logging.Handler] = None, table_cleanup_interval: Union[datetime.timedelta, float, str] = None, table_standby_replicas: int = None, table_key_index_size: int = None, topic_replication_factor: int = None, topic_partitions: int = None, topic_allow_declare: bool = None, topic_disable_leader: bool = None, id_format: str = None, reply_to: str = None, reply_to_prefix: str = None, reply_create_topic: bool = None, reply_expires: 
Union[datetime.timedelta, float, str] = None, ssl_context: ssl.SSLContext = None, stream_buffer_maxsize: int = None, stream_wait_empty: bool = None, stream_ack_cancelled_tasks: bool = None, stream_ack_exceptions: bool = None, stream_publish_on_commit: bool = None, stream_recovery_delay: Union[datetime.timedelta, float, str] = None, producer_linger_ms: int = None, producer_max_batch_size: int = None, producer_acks: int = None, producer_max_request_size: int = None, producer_compression_type: str = None, producer_partitioner: Union[_T, str] = None, producer_request_timeout: Union[datetime.timedelta, float, str] = None, producer_api_version: str = None, consumer_max_fetch_size: int = None, consumer_auto_offset_reset: str = None, web_bind: str = None, web_port: int = None, web_host: str = None, web_transport: Union[str, yarl.URL] = None, web_in_thread: bool = None, web_cors_options: Mapping[str, faust.types.web.ResourceOptions] = None, worker_redirect_stdouts: bool = None, worker_redirect_stdouts_level: Union[int, str] = None, Agent: Union[_T, str] = None, ConsumerScheduler: Union[_T, str] = None, Event: Union[_T, str] = None, Schema: Union[_T, str] = None, Stream: Union[_T, str] = None, Table: Union[_T, str] = None, SetTable: Union[_T, str] = None, GlobalTable: Union[_T, str] = None, SetGlobalTable: Union[_T, str] = None, TableManager: Union[_T, str] = None, Serializers: Union[_T, str] = None, Worker: Union[_T, str] = None, PartitionAssignor: Union[_T, str] = None, LeaderAssignor: Union[_T, str] = None, Router: Union[_T, str] = None, Topic: Union[_T, str] = None, HttpClient: Union[_T, str] = None, Monitor: Union[_T, str] = None, url: Union[str, yarl.URL] = None, **kwargs: Any) → None[source]
classmethod setting_names() → Set[str][source]
Return type

Set[str]

id_format = '{id}-v{self.version}'
debug = False
ssl_context = None
autodiscover = False
broker_client_id = 'faust-1.9.0'
timezone = datetime.timezone.utc
broker_commit_every = 10000
broker_check_crcs = True
broker_max_poll_interval = 1000.0
key_serializer = 'raw'
value_serializer = 'json'
table_standby_replicas = 1
table_key_index_size = 1000
topic_replication_factor = 1
topic_partitions = 8
topic_allow_declare = True
topic_disable_leader = False
reply_create_topic = False
logging_config = None
stream_buffer_maxsize = 4096
stream_wait_empty = True
stream_ack_cancelled_tasks = True
stream_ack_exceptions = True
stream_publish_on_commit = False
producer_linger_ms = 0
producer_max_batch_size = 16384
producer_acks = -1
producer_max_request_size = 1000000
producer_compression_type = None
producer_api_version = 'auto'
consumer_max_fetch_size = 4194304
consumer_auto_offset_reset = 'earliest'
web_bind = '0.0.0.0'
web_port = 6066
web_host = 'build-10233069-project-230058-faust'
web_in_thread = False
web_cors_options = None
worker_redirect_stdouts = True
worker_redirect_stdouts_level = 'WARN'
reply_to_prefix = 'f-reply-'
property name
Return type

str

property id
Return type

str

property origin
Return type

Optional[str]

property version
Return type

int

property broker
Return type

List[URL]

property broker_consumer
Return type

List[URL]

property broker_producer
Return type

List[URL]

property store
Return type

URL

property web
Return type

URL

property cache
Return type

URL

property canonical_url
Return type

URL

property datadir
Return type

Path

property appdir
Return type

Path

find_old_versiondirs() → Iterable[pathlib.Path][source]
Return type

Iterable[Path]

property tabledir
Return type

Path

property processing_guarantee
Return type

ProcessingGuarantee

property broker_credentials
Return type

Optional[CredentialsT]

property broker_request_timeout
Return type

float

property broker_session_timeout
Return type

float

property broker_heartbeat_interval
Return type

float

property broker_commit_interval
Return type

float

property broker_commit_livelock_soft_timeout
Return type

float

property broker_max_poll_records
Return type

Optional[int]

property producer_partitioner
Return type

Optional[Callable[[Optional[bytes], Sequence[int], Sequence[int]], int]]

property producer_request_timeout
Return type

float

property table_cleanup_interval
Return type

float

property reply_expires
Return type

float

property stream_recovery_delay
Return type

float

property agent_supervisor
Return type

Type[SupervisorStrategyT]

property web_transport
Return type

URL

property Agent
Return type

Type[AgentT[]]

property ConsumerScheduler
Return type

Type[SchedulingStrategyT]

property Event
Return type

Type[EventT[]]

property Schema
Return type

Type[SchemaT[~KT, ~VT]]

property Stream
Return type

Type[StreamT[+T_co]]

property Table
Return type

Type[TableT[~KT, ~VT]]

property SetTable
Return type

Type[TableT[~KT, ~VT]]

property GlobalTable
Return type

Type[GlobalTableT[]]

property SetGlobalTable
Return type

Type[GlobalTableT[]]

property TableManager
Return type

Type[TableManagerT[]]

property Serializers
Return type

Type[RegistryT]

property Worker
Return type

Type[_WorkerT]

property PartitionAssignor
Return type

Type[PartitionAssignorT]

property LeaderAssignor
Return type

Type[LeaderAssignorT[]]

property Router
Return type

Type[RouterT]

property Topic
Return type

Type[TopicT[]]

property HttpClient
Return type

Type[ClientSession]

property Monitor
Return type

Type[SensorT[]]

faust.HoppingWindow

alias of faust.windows._PyHoppingWindow

class faust.TumblingWindow(size: Union[datetime.timedelta, float, str], expires: Union[datetime.timedelta, float, str] = None) → None[source]

Tumbling window type.

Fixed-size, non-overlapping, gap-less windows.
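
The definition above (fixed-size, non-overlapping, gap-less) can be made concrete by computing which window a timestamp falls into. The sketch below also shows hopping windows, which have a fixed size but advance by a smaller step and therefore overlap. Both helper functions are hypothetical, windows are assumed to be aligned to timestamp zero, and half-open intervals `[start, end)` are used for simplicity; the exact range boundaries Faust uses may differ.

```python
import math
from typing import List, Tuple

WindowRange = Tuple[float, float]


def tumbling_range(timestamp: float, size: float) -> WindowRange:
    # Each timestamp belongs to exactly one window of length ``size``.
    start = math.floor(timestamp / size) * size
    return (start, start + size)


def hopping_ranges(timestamp: float, size: float, step: float) -> List[WindowRange]:
    # Windows start at multiples of ``step``; a window contains the
    # timestamp when start <= timestamp < start + size.
    ranges: List[WindowRange] = []
    start = math.floor(timestamp / step) * step
    while start > timestamp - size and start >= 0:
        ranges.append((start, start + size))
        start -= step
    return list(reversed(ranges))
```

With a size of 60 seconds, timestamps 125.0 and 179.0 share one tumbling window and 180.0 opens the next one, so consecutive windows tile the timeline. With a 30 second step, timestamp 125.0 falls into two hopping windows at once.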

faust.SlidingWindow

alias of faust.windows._PySlidingWindow

class faust.Window(*args, **kwargs)[source]

Base class for window types.

class faust.Worker(app: faust.types.app.AppT, *services: mode.types.services.ServiceT, sensors: Iterable[faust.types.sensors.SensorT] = None, debug: bool = False, quiet: bool = False, loglevel: Union[str, int] = None, logfile: Union[str, IO] = None, stdout: IO = <_io.TextIOWrapper name='<stdout>' mode='w' encoding='UTF-8'>, stderr: IO = <_io.TextIOWrapper name='<stderr>' mode='w' encoding='UTF-8'>, blocking_timeout: float = 10.0, workdir: Union[pathlib.Path, str] = None, console_port: int = 50101, loop: asyncio.events.AbstractEventLoop = None, redirect_stdouts: bool = None, redirect_stdouts_level: int = None, logging_config: Dict = None, **kwargs: Any) → None[source]

Worker.

See also

This is a subclass of mode.Worker.

Usage:

You can start a worker using:

  1. the faust worker program.

  2. instantiating Worker programmatically and calling execute_from_commandline():

    >>> worker = Worker(app)
    >>> worker.execute_from_commandline()
    
  3. or if you already have an event loop, calling await worker.start(), but in that case you are responsible for gracefully shutting down the event loop:

    import asyncio

    async def start_worker(worker: Worker) -> None:
        await worker.start()
    
    def manage_loop():
        loop = asyncio.get_event_loop()
        worker = Worker(app, loop=loop)
        try:
            loop.run_until_complete(start_worker(worker))
        finally:
            # Always stop the worker and shut down the loop, even on error.
            worker.stop_and_shutdown_loop()
    
Parameters
  • app (AppT[]) – The Faust app to start.

  • *services – Services to start with worker. This includes application instances to start.

  • sensors (Iterable[SensorT]) – List of sensors to include.

  • debug (bool) – Enables debugging mode [disabled by default].

  • quiet (bool) – Do not output anything to console [disabled by default].

  • loglevel (Union[str, int]) – Level to use for logging, can be string (one of: CRIT|ERROR|WARN|INFO|DEBUG), or integer.

  • logfile (Union[str, IO]) – Name of file or a stream to log to.

  • stdout (IO) – Standard out stream.

  • stderr (IO) – Standard err stream.

  • blocking_timeout (float) – When debug is enabled this sets the timeout for detecting that the event loop is blocked.

  • workdir (Union[str, Path]) – Custom working directory for the process that the worker will change into when started. This working directory change is permanent for the process, or until something else changes the working directory again.

  • loop (asyncio.AbstractEventLoop) – Custom event loop object.

logger = <Logger faust.worker (WARNING)>
app = None

The Faust app started by this worker.

sensors = None

Additional sensors to add to the Faust app.

workdir = None

Current working directory. Note that if passed as an argument to Worker, the worker will change to this directory when started.

spinner = None

Class that displays a terminal progress spinner (see progress).

async on_start() → None[source]

Signal called every time the worker starts.

Return type

None

async on_startup_finished() → None[source]

Signal called when worker has started.

Return type

None

on_init_dependencies() → Iterable[mode.types.services.ServiceT][source]

Return service dependencies that must start with the worker.

Return type

Iterable[ServiceT[]]

async on_first_start() → None[source]

Signal called the first time the worker starts.

“First time” means this callback is not called again if the worker is restarted because an exception was raised.

Return type

None

change_workdir(path: pathlib.Path) → None[source]

Change the current working directory (CWD).

Return type

None

autodiscover() → None[source]

Autodiscover modules and files to find @agent decorators, etc.

Return type

None

async on_execute() → None[source]

Signal called when the worker is about to start.

Return type

None

on_worker_shutdown() → None[source]

Signal called before the worker is shutting down.

Return type

None

on_setup_root_logger(logger: logging.Logger, level: int) → None[source]

Signal called when the root logger is being configured.

Return type

None

faust.uuid() → str[source]

Generate random UUID string.

Shortcut to str(uuid4()).

Return type

str
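
Since the description above states that this function is a shortcut to str(uuid4()), an equivalent helper can be written with the standard library alone. The sketch below defines a local function with the same name for illustration; it is not imported from Faust.

```python
from uuid import uuid4


def uuid() -> str:
    # Random (version 4) UUID rendered in its canonical string form.
    return str(uuid4())
```

Each call returns a new 36-character string made of hexadecimal digits and four hyphens, which makes it convenient for generating unique keys or identifiers.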