faust.agents¶
-
class
faust.agents.Agent(fun: Callable[Union[AsyncIterator, faust.types.streams.StreamT], Union[Awaitable, AsyncIterable]], *, app: faust.types.app.AppT, name: str = None, channel: Union[str, faust.types.channels.ChannelT] = None, concurrency: int = 1, sink: Iterable[Union[AgentT, faust.types.channels.ChannelT, Callable[Any, Optional[Awaitable]]]] = None, on_error: Callable[[AgentT, BaseException], Awaitable] = None, supervisor_strategy: Type[mode.types.supervisors.SupervisorStrategyT] = None, help: str = None, key_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, value_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, isolated_partitions: bool = False, **kwargs) → None[source]¶ Agent.
This is the type of object returned by the
@app.agentdecorator.-
supervisor= None¶
-
clone(*, cls: Type[faust.types.agents.AgentT] = None, **kwargs) → faust.types.agents.AgentT[source]¶ Return type: AgentT[]
-
test_context(channel: faust.types.channels.ChannelT = None, supervisor_strategy: mode.types.supervisors.SupervisorStrategyT = None, on_error: Callable[[AgentT, BaseException], Awaitable] = None, **kwargs) → faust.types.agents.AgentTestWrapperT[source]¶ Return type: AgentTestWrapperT[]
-
actor_from_stream(stream: faust.types.streams.StreamT) → faust.types.agents.ActorT[Union[AsyncIterable, Awaitable]][source]¶ Return type: ActorT[]
-
add_sink(sink: Union[AgentT, faust.types.channels.ChannelT, Callable[Any, Optional[Awaitable]]]) → None[source]¶ Return type: None
-
stream(channel: faust.types.channels.ChannelT = None, active_partitions: Set[faust.types.tuples.TP] = None, **kwargs) → faust.types.streams.StreamT[source]¶ Return type: StreamT[+T_co]
-
coroutine
ask(self, value: Union[bytes, faust.types.core.ModelT, Any] = None, *, key: Union[bytes, faust.types.core.ModelT, Any, None] = None, partition: int = None, reply_to: Union[AgentT, faust.types.channels.ChannelT, str] = None, correlation_id: str = None) → Any[source]¶ Return type: Any
-
coroutine
ask_nowait(self, value: Union[bytes, faust.types.core.ModelT, Any] = None, *, key: Union[bytes, faust.types.core.ModelT, Any, None] = None, partition: int = None, reply_to: Union[AgentT, faust.types.channels.ChannelT, str] = None, correlation_id: str = None, force: bool = False) → faust.agents.replies.ReplyPromise[source]¶ Return type: ReplyPromise
-
coroutine
cast(self, value: Union[bytes, faust.types.core.ModelT, Any] = None, *, key: Union[bytes, faust.types.core.ModelT, Any, None] = None, partition: int = None) → None[source]¶ Return type: None
-
coroutine
join(self, values: Union[AsyncIterable[Union[bytes, faust.types.core.ModelT, Any]], Iterable[Union[bytes, faust.types.core.ModelT, Any]]], key: Union[bytes, faust.types.core.ModelT, Any, None] = None, reply_to: Union[AgentT, faust.types.channels.ChannelT, str] = None) → List[Any][source]¶ Return type: List[Any]
-
coroutine
kvjoin(self, items: Union[AsyncIterable[Tuple[Union[bytes, faust.types.core.ModelT, Any, None], Union[bytes, faust.types.core.ModelT, Any]]], Iterable[Tuple[Union[bytes, faust.types.core.ModelT, Any, None], Union[bytes, faust.types.core.ModelT, Any]]]], reply_to: Union[AgentT, faust.types.channels.ChannelT, str] = None) → List[Any][source]¶ Return type: List[Any]
-
kvmap(items: Union[AsyncIterable[Tuple[Union[bytes, faust.types.core.ModelT, Any, None], Union[bytes, faust.types.core.ModelT, Any]]], Iterable[Tuple[Union[bytes, faust.types.core.ModelT, Any, None], Union[bytes, faust.types.core.ModelT, Any]]]], reply_to: Union[AgentT, faust.types.channels.ChannelT, str] = None) → AsyncIterator[str][source]¶ Return type: AsyncIterator[str]
-
logger= <Logger faust.agents.agent (WARNING)>¶
-
map(values: Union[AsyncIterable, Iterable], key: Union[bytes, faust.types.core.ModelT, Any, None] = None, reply_to: Union[AgentT, faust.types.channels.ChannelT, str] = None) → AsyncIterator[source]¶ Return type: AsyncIterator[+T_co]
-
coroutine
on_isolated_partitions_assigned(self, assigned: Set[faust.types.tuples.TP]) → None[source]¶ Return type: None
-
coroutine
on_isolated_partitions_revoked(self, revoked: Set[faust.types.tuples.TP]) → None[source]¶ Return type: None
-
coroutine
on_partitions_assigned(self, assigned: Set[faust.types.tuples.TP]) → None[source]¶ Return type: None
-
coroutine
on_partitions_revoked(self, revoked: Set[faust.types.tuples.TP]) → None[source]¶ Return type: None
Return type: None
Return type: None
-
coroutine
on_start(self) → None[source]¶ Called every time before the service is started/restarted.
Return type: None
-
coroutine
on_stop(self) → None[source]¶ Called every time before the service is stopped/restarted.
Return type: None
-
coroutine
send(self, *, key: Union[bytes, faust.types.core.ModelT, Any, None] = None, value: Union[bytes, faust.types.core.ModelT, Any] = None, partition: int = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, callback: Callable[faust.types.tuples.FutureMessage, Union[None, Awaitable[None]]] = None, reply_to: Union[AgentT, faust.types.channels.ChannelT, str] = None, correlation_id: str = None, force: bool = False) → Awaitable[faust.types.tuples.RecordMetadata][source]¶ Send message to topic used by agent.
Return type: Awaitable[RecordMetadata]
-
channel_iterator¶ Return type: AsyncIterator[+T_co]
-
-
faust.agents.AgentFun¶ alias of
typing.Callable
-
class
faust.agents.AgentT(fun: Callable[Union[AsyncIterator, faust.types.streams.StreamT], Union[Awaitable, AsyncIterable]], *, name: str = None, app: faust.types.agents.AppT = None, channel: Union[str, faust.types.channels.ChannelT] = None, concurrency: int = 1, sink: Iterable[Union[AgentT, faust.types.channels.ChannelT, Callable[Any, Optional[Awaitable]]]] = None, on_error: Callable[[AgentT, BaseException], Awaitable] = None, supervisor_strategy: Type[mode.types.supervisors.SupervisorStrategyT] = None, help: str = None, key_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, value_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, isolated_partitions: bool = False, **kwargs) → None[source]¶ -
test_context(channel: faust.types.channels.ChannelT = None, supervisor_strategy: mode.types.supervisors.SupervisorStrategyT = None, **kwargs) → faust.types.agents.AgentTestWrapperT[source]¶ Return type: AgentTestWrapperT[]
-
add_sink(sink: Union[AgentT, faust.types.channels.ChannelT, Callable[Any, Optional[Awaitable]]]) → None[source]¶ Return type: None
-
clone(*, cls: Type[AgentT] = None, **kwargs) → faust.types.agents.AgentT[source]¶ Return type: AgentT[]
-
channel_iterator¶ Return type: AsyncIterator[+T_co]
-
coroutine
ask(self, value: Union[bytes, faust.types.core.ModelT, Any] = None, *, key: Union[bytes, faust.types.core.ModelT, Any, None] = None, partition: int = None, reply_to: Union[AgentT, faust.types.channels.ChannelT, str] = None, correlation_id: str = None) → Any[source]¶ Return type: Any
-
coroutine
cast(self, value: Union[bytes, faust.types.core.ModelT, Any] = None, *, key: Union[bytes, faust.types.core.ModelT, Any, None] = None, partition: int = None) → None[source]¶ Return type: None
-
coroutine
join(self, values: Union[AsyncIterable[Union[bytes, faust.types.core.ModelT, Any]], Iterable[Union[bytes, faust.types.core.ModelT, Any]]], key: Union[bytes, faust.types.core.ModelT, Any, None] = None, reply_to: Union[AgentT, faust.types.channels.ChannelT, str] = None) → List[Any][source]¶ Return type: List[Any]
-
coroutine
kvjoin(self, items: Union[AsyncIterable[Tuple[Union[bytes, faust.types.core.ModelT, Any, None], Union[bytes, faust.types.core.ModelT, Any]]], Iterable[Tuple[Union[bytes, faust.types.core.ModelT, Any, None], Union[bytes, faust.types.core.ModelT, Any]]]], reply_to: Union[AgentT, faust.types.channels.ChannelT, str] = None) → List[Any][source]¶ Return type: List[Any]
-
coroutine
kvmap(self, items: Union[AsyncIterable[Tuple[Union[bytes, faust.types.core.ModelT, Any, None], Union[bytes, faust.types.core.ModelT, Any]]], Iterable[Tuple[Union[bytes, faust.types.core.ModelT, Any, None], Union[bytes, faust.types.core.ModelT, Any]]]], reply_to: Union[AgentT, faust.types.channels.ChannelT, str] = None) → AsyncIterator[str][source]¶
-
coroutine
map(self, values: Union[AsyncIterable, Iterable], key: Union[bytes, faust.types.core.ModelT, Any, None] = None, reply_to: Union[AgentT, faust.types.channels.ChannelT, str] = None) → AsyncIterator[source]¶
-
coroutine
on_partitions_assigned(self, assigned: Set[faust.types.tuples.TP]) → None[source]¶ Return type: None
-
coroutine
on_partitions_revoked(self, revoked: Set[faust.types.tuples.TP]) → None[source]¶ Return type: None
-
coroutine
send(self, *, key: Union[bytes, faust.types.core.ModelT, Any, None] = None, value: Union[bytes, faust.types.core.ModelT, Any] = None, partition: int = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, reply_to: Union[AgentT, faust.types.channels.ChannelT, str] = None, correlation_id: str = None) → Awaitable[faust.types.tuples.RecordMetadata][source]¶ Return type: Awaitable[RecordMetadata]
-
-
class
faust.agents.AgentManager(app: faust.types.app.AppT, **kwargs) → None[source]¶ Agent manager.
-
logger= <Logger faust.agents.manager (WARNING)>¶
-
coroutine
on_rebalance(self, revoked: Set[faust.types.tuples.TP], newly_assigned: Set[faust.types.tuples.TP]) → None[source]¶ Return type: None
-
coroutine
on_start(self) → None[source]¶ Called every time before the service is started/restarted.
Return type: None
-
-
class
faust.agents.AgentManagerT(*, beacon: mode.utils.types.trees.NodeT = None, loop: asyncio.events.AbstractEventLoop = None) → None[source]¶
-
class
faust.agents.ReplyConsumer(app: faust.types.app.AppT, **kwargs) → None[source]¶ Consumer responsible for redelegation of replies received.
-
coroutine
add(self, correlation_id: str, promise: faust.agents.replies.ReplyPromise) → None[source]¶ Return type: None
-
logger= <Logger faust.agents.replies (WARNING)>¶
-
coroutine