faust.agents

Agents.

class faust.agents.Agent(fun: Callable[faust.types.streams.StreamT, Union[Coroutine[[Any, Any], None], Awaitable[None], AsyncIterable]], *, app: faust.types.app.AppT, name: str = None, channel: Union[str, faust.types.channels.ChannelT] = None, concurrency: int = 1, sink: Iterable[Union[AgentT, faust.types.channels.ChannelT, Callable[Any, Optional[Awaitable]]]] = None, on_error: Callable[[AgentT, BaseException], Awaitable] = None, supervisor_strategy: Type[mode.types.supervisors.SupervisorStrategyT] = None, help: str = None, schema: faust.types.serializers.SchemaT = None, key_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, value_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, isolated_partitions: bool = False, use_reply_headers: bool = None, **kwargs: Any) → None[source]

Agent.

This is the type of object returned by the @app.agent decorator.

supervisor = None
on_init_dependencies() → Iterable[mode.types.services.ServiceT][source]

Return the list of service dependencies required to start the agent.

Return type

Iterable[ServiceT[]]

async on_start() → None[source]

Call when an agent starts.

Return type

None

async on_stop() → None[source]

Call when an agent stops.

Return type

None

cancel() → None[source]

Cancel agent and its actor instances running in this process.

Return type

None

async on_partitions_revoked(revoked: Set[faust.types.tuples.TP]) → None[source]

Call when partitions are revoked.

Return type

None

async on_partitions_assigned(assigned: Set[faust.types.tuples.TP]) → None[source]

Call when partitions are assigned.

Return type

None

async on_isolated_partitions_revoked(revoked: Set[faust.types.tuples.TP]) → None[source]

Call when isolated partitions are revoked.

Return type

None

async on_isolated_partitions_assigned(assigned: Set[faust.types.tuples.TP]) → None[source]

Call when isolated partitions are assigned.

Return type

None

async on_shared_partitions_revoked(revoked: Set[faust.types.tuples.TP]) → None[source]

Call when non-isolated partitions are revoked.

Return type

None

async on_shared_partitions_assigned(assigned: Set[faust.types.tuples.TP]) → None[source]

Call when non-isolated partitions are assigned.

Return type

None

info() → Mapping[source]

Return agent attributes as a dictionary.

Return type

Mapping[~KT, +VT_co]

clone(*, cls: Type[faust.types.agents.AgentT] = None, **kwargs: Any) → faust.types.agents.AgentT[source]

Create clone of this agent object.

Keyword arguments can be passed to override any argument supported by Agent.__init__.

Return type

AgentT[]

test_context(channel: faust.types.channels.ChannelT = None, supervisor_strategy: mode.types.supervisors.SupervisorStrategyT = None, on_error: Callable[[AgentT, BaseException], Awaitable] = None, **kwargs: Any) → faust.types.agents.AgentTestWrapperT[source]

Create new unit-testing wrapper for this agent.

Return type

AgentTestWrapperT[]

actor_from_stream(stream: Optional[faust.types.streams.StreamT], *, index: int = None, active_partitions: Set[faust.types.tuples.TP] = None, channel: faust.types.channels.ChannelT = None) → faust.types.agents.ActorT[Union[AsyncIterable, Awaitable]][source]

Create new actor from stream.

Return type

ActorT[]

add_sink(sink: Union[AgentT, faust.types.channels.ChannelT, Callable[Any, Optional[Awaitable]]]) → None[source]

Add new sink to further handle results from this agent.

Return type

None

stream(channel: faust.types.channels.ChannelT = None, active_partitions: Set[faust.types.tuples.TP] = None, **kwargs: Any) → faust.types.streams.StreamT[source]

Create underlying stream used by this agent.

Return type

StreamT[+T_co]

async cast(value: Union[bytes, faust.types.core._ModelT, Any] = None, *, key: Union[bytes, faust.types.core._ModelT, Any, None] = None, partition: int = None, timestamp: float = None, headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes], None] = None) → None[source]

RPC operation: like ask() but do not expect reply.

Cast here is like “casting a spell”, and will not expect a reply back from the agent.

Return type

None

async ask(value: Union[bytes, faust.types.core._ModelT, Any] = None, *, key: Union[bytes, faust.types.core._ModelT, Any, None] = None, partition: int = None, timestamp: float = None, headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes], None] = None, reply_to: Union[AgentT, faust.types.channels.ChannelT, str] = None, correlation_id: str = None) → Any[source]

RPC operation: ask agent for result of processing value.

This version will wait until the result is available and return the processed value.

Return type

Any

async ask_nowait(value: Union[bytes, faust.types.core._ModelT, Any] = None, *, key: Union[bytes, faust.types.core._ModelT, Any, None] = None, partition: int = None, timestamp: float = None, headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes], None] = None, reply_to: Union[AgentT, faust.types.channels.ChannelT, str] = None, correlation_id: str = None, force: bool = False) → faust.agents.replies.ReplyPromise[source]

RPC operation: ask agent for result of processing value.

This version does not wait for the result to arrive, but instead returns a promise of future evaluation.

Return type

ReplyPromise

async send(*, key: Union[bytes, faust.types.core._ModelT, Any, None] = None, value: Union[bytes, faust.types.core._ModelT, Any] = None, partition: int = None, timestamp: float = None, headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes], None] = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, callback: Callable[faust.types.tuples.FutureMessage, Union[None, Awaitable[None]]] = None, reply_to: Union[AgentT, faust.types.channels.ChannelT, str] = None, correlation_id: str = None, force: bool = False) → Awaitable[faust.types.tuples.RecordMetadata][source]

Send message to topic used by agent.

Return type

Awaitable[RecordMetadata]

map(values: Union[AsyncIterable, Iterable], key: Union[bytes, faust.types.core._ModelT, Any, None] = None, reply_to: Union[AgentT, faust.types.channels.ChannelT, str] = None) → AsyncIterator[source]

RPC map operation on a list of values.

A map operation iterates over results as they arrive. See join() and kvjoin() if you want them in order.

Return type

AsyncIterator[+T_co]

kvmap(items: Union[AsyncIterable[Tuple[Union[bytes, faust.types.core._ModelT, Any, None], Union[bytes, faust.types.core._ModelT, Any]]], Iterable[Tuple[Union[bytes, faust.types.core._ModelT, Any, None], Union[bytes, faust.types.core._ModelT, Any]]]], reply_to: Union[AgentT, faust.types.channels.ChannelT, str] = None) → AsyncIterator[str][source]

RPC map operation on a list of (key, value) pairs.

A map operation iterates over results as they arrive. See join() and kvjoin() if you want them in order.

Return type

AsyncIterator[str]

async join(values: Union[AsyncIterable[Union[bytes, faust.types.core._ModelT, Any]], Iterable[Union[bytes, faust.types.core._ModelT, Any]]], key: Union[bytes, faust.types.core._ModelT, Any, None] = None, reply_to: Union[AgentT, faust.types.channels.ChannelT, str] = None) → List[Any][source]

RPC map operation on a list of values.

A join returns the results in order, and only returns once all values have been processed.

Return type

List[Any]

async kvjoin(items: Union[AsyncIterable[Tuple[Union[bytes, faust.types.core._ModelT, Any, None], Union[bytes, faust.types.core._ModelT, Any]]], Iterable[Tuple[Union[bytes, faust.types.core._ModelT, Any, None], Union[bytes, faust.types.core._ModelT, Any]]]], reply_to: Union[AgentT, faust.types.channels.ChannelT, str] = None) → List[Any][source]

RPC map operation on a list of (key, value) pairs.

A join returns the results in order, and only returns once all values have been processed.

Return type

List[Any]

get_topic_names() → Iterable[str][source]

Return list of topic names this agent subscribes to.

Return type

Iterable[str]

property channel

Return channel used by agent.

Return type

ChannelT[]

property channel_iterator

Return channel agent iterates over.

Return type

AsyncIterator[+T_co]

property label

Return human-readable description of agent.

Return type

str

property shortlabel

Return short description of agent.

Return type

str

logger = <Logger faust.agents.agent (WARNING)>
faust.agents.AgentFun

alias of typing.Callable

class faust.agents.AgentT(fun: Callable[faust.types.streams.StreamT, Union[Coroutine[[Any, Any], None], Awaitable[None], AsyncIterable]], *, name: str = None, app: faust.types.agents._AppT = None, channel: Union[str, faust.types.channels.ChannelT] = None, concurrency: int = 1, sink: Iterable[Union[AgentT, faust.types.channels.ChannelT, Callable[Any, Optional[Awaitable]]]] = None, on_error: Callable[[AgentT, BaseException], Awaitable] = None, supervisor_strategy: Type[mode.types.supervisors.SupervisorStrategyT] = None, help: str = None, schema: faust.types.serializers.SchemaT = None, key_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, value_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, isolated_partitions: bool = False, **kwargs: Any) → None[source]
abstract test_context(channel: faust.types.channels.ChannelT = None, supervisor_strategy: mode.types.supervisors.SupervisorStrategyT = None, **kwargs: Any) → faust.types.agents.AgentTestWrapperT[source]
Return type

AgentTestWrapperT[]

abstract add_sink(sink: Union[AgentT, faust.types.channels.ChannelT, Callable[Any, Optional[Awaitable]]]) → None[source]
Return type

None

abstract stream(**kwargs: Any) → faust.types.streams.StreamT[source]
Return type

StreamT[+T_co]

abstract async on_partitions_assigned(assigned: Set[faust.types.tuples.TP]) → None[source]
Return type

None

abstract async on_partitions_revoked(revoked: Set[faust.types.tuples.TP]) → None[source]
Return type

None

abstract async cast(value: Union[bytes, faust.types.core._ModelT, Any] = None, *, key: Union[bytes, faust.types.core._ModelT, Any, None] = None, partition: int = None, timestamp: float = None, headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes], None] = None) → None[source]
Return type

None

abstract async ask(value: Union[bytes, faust.types.core._ModelT, Any] = None, *, key: Union[bytes, faust.types.core._ModelT, Any, None] = None, partition: int = None, timestamp: float = None, headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes], None] = None, reply_to: Union[AgentT, faust.types.channels.ChannelT, str] = None, correlation_id: str = None) → Any[source]
Return type

Any

abstract async send(*, key: Union[bytes, faust.types.core._ModelT, Any, None] = None, value: Union[bytes, faust.types.core._ModelT, Any] = None, partition: int = None, timestamp: float = None, headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes], None] = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, reply_to: Union[AgentT, faust.types.channels.ChannelT, str] = None, correlation_id: str = None) → Awaitable[faust.types.tuples.RecordMetadata][source]
Return type

Awaitable[RecordMetadata]

abstract async map(values: Union[AsyncIterable, Iterable], key: Union[bytes, faust.types.core._ModelT, Any, None] = None, reply_to: Union[AgentT, faust.types.channels.ChannelT, str] = None) → AsyncIterator[source]
abstract async kvmap(items: Union[AsyncIterable[Tuple[Union[bytes, faust.types.core._ModelT, Any, None], Union[bytes, faust.types.core._ModelT, Any]]], Iterable[Tuple[Union[bytes, faust.types.core._ModelT, Any, None], Union[bytes, faust.types.core._ModelT, Any]]]], reply_to: Union[AgentT, faust.types.channels.ChannelT, str] = None) → AsyncIterator[str][source]
abstract async join(values: Union[AsyncIterable[Union[bytes, faust.types.core._ModelT, Any]], Iterable[Union[bytes, faust.types.core._ModelT, Any]]], key: Union[bytes, faust.types.core._ModelT, Any, None] = None, reply_to: Union[AgentT, faust.types.channels.ChannelT, str] = None) → List[Any][source]
Return type

List[Any]

abstract async kvjoin(items: Union[AsyncIterable[Tuple[Union[bytes, faust.types.core._ModelT, Any, None], Union[bytes, faust.types.core._ModelT, Any]]], Iterable[Tuple[Union[bytes, faust.types.core._ModelT, Any, None], Union[bytes, faust.types.core._ModelT, Any]]]], reply_to: Union[AgentT, faust.types.channels.ChannelT, str] = None) → List[Any][source]
Return type

List[Any]

abstract info() → Mapping[source]
Return type

Mapping[~KT, +VT_co]

abstract clone(*, cls: Type[AgentT] = None, **kwargs: Any) → faust.types.agents.AgentT[source]
Return type

AgentT[]

abstract get_topic_names() → Iterable[str][source]
Return type

Iterable[str]

abstract property channel
Return type

ChannelT[]

abstract property channel_iterator
Return type

AsyncIterator[+T_co]

faust.agents.current_agent() → Optional[faust.types.agents.AgentT][source]
Return type

Optional[AgentT[]]

class faust.agents.AgentManager(app: faust.types.app.AppT, **kwargs: Any) → None[source]

Agent manager.

async on_start() → None[source]

Call when agents are being started.

Return type

None

service_reset() → None[source]

Reset service state on restart.

Return type

None

async on_stop() → None[source]

Call when agents are being stopped.

Return type

None

async stop() → None[source]

Stop all running agents.

Return type

None

cancel() → None[source]

Cancel all running agents.

Return type

None

update_topic_index() → None[source]

Update indices.

Return type

None

async on_rebalance(revoked: Set[faust.types.tuples.TP], newly_assigned: Set[faust.types.tuples.TP]) → None[source]

Call when a rebalance is needed.

Return type

None

logger = <Logger faust.agents.manager (WARNING)>
class faust.agents.AgentManagerT(*, beacon: mode.utils.types.trees.NodeT = None, loop: asyncio.events.AbstractEventLoop = None) → None[source]
abstract async on_rebalance(revoked: Set[faust.types.tuples.TP], newly_assigned: Set[faust.types.tuples.TP]) → None[source]
Return type

None

class faust.agents.ReplyConsumer(app: faust.types.app.AppT, **kwargs: Any) → None[source]

Consumer responsible for redelegating the replies it receives.

logger = <Logger faust.agents.replies (WARNING)>
async on_start() → None[source]

Call when reply consumer starts.

Return type

None

async add(correlation_id: str, promise: faust.agents.replies.ReplyPromise) → None[source]

Register promise to start tracking when its reply arrives.

Return type

None