faust.agents.agent

Agent implementation.

class faust.agents.agent.Agent(fun: Callable[Union[typing.AsyncIterator, faust.types.streams.StreamT], Union[typing.Awaitable, typing.AsyncIterable]], *, app: faust.types.app.AppT, name: str = None, channel: Union[str, faust.types.channels.ChannelT] = None, concurrency: int = 1, sink: Iterable[Union[_ForwardRef('AgentT'), faust.types.channels.ChannelT, typing.Callable[[typing.Any], typing.Union[typing.Awaitable, NoneType]]]] = None, on_error: Callable[[_ForwardRef('AgentT'), BaseException], Awaitable] = None, supervisor_strategy: Type[mode.types.supervisors.SupervisorStrategyT] = None, help: str = None, key_type: Union[typing.Type[faust.types.models.ModelT], typing.Type[bytes], typing.Type[str]] = None, value_type: Union[typing.Type[faust.types.models.ModelT], typing.Type[bytes], typing.Type[str]] = None, isolated_partitions: bool = False, **kwargs) → None[source]

Agent.

This is the type of object returned by the @app.agent decorator.

cancel() → None[source]
Return type:None
info() → Mapping[source]
Return type:Mapping[~KT, +VT_co]
clone(*, cls: Type[faust.types.agents.AgentT] = None, **kwargs) → faust.types.agents.AgentT[source]
Return type:AgentT[]
test_context(channel: faust.types.channels.ChannelT = None, supervisor_strategy: mode.types.supervisors.SupervisorStrategyT = None, **kwargs) → faust.types.agents.AgentTestWrapperT[source]
Return type:AgentTestWrapperT[]
actor_from_stream(stream: faust.types.streams.StreamT) → faust.types.agents.ActorT[Union[typing.AsyncIterable, typing.Awaitable]][source]
Return type:ActorT[]
add_sink(sink: Union[_ForwardRef('AgentT'), faust.types.channels.ChannelT, typing.Callable[[typing.Any], typing.Union[typing.Awaitable, NoneType]]]) → None[source]
Return type:None
stream(active_partitions: Set[faust.types.tuples.TP] = None, **kwargs) → faust.types.streams.StreamT[source]
Return type:StreamT[+T_co]
get_topic_names() → Iterable[str][source]
Return type:Iterable[str]
coroutine ask(self, value: Union[bytes, faust.types.core.ModelT, typing.Any] = None, *, key: Union[bytes, faust.types.core.ModelT, typing.Any, NoneType] = None, partition: int = None, reply_to: Union[_ForwardRef('AgentT'), faust.types.channels.ChannelT, str] = None, correlation_id: str = None) → Any[source]
Return type:Any
coroutine ask_nowait(self, value: Union[bytes, faust.types.core.ModelT, typing.Any] = None, *, key: Union[bytes, faust.types.core.ModelT, typing.Any, NoneType] = None, partition: int = None, reply_to: Union[_ForwardRef('AgentT'), faust.types.channels.ChannelT, str] = None, correlation_id: str = None, force: bool = False) → faust.agents.replies.ReplyPromise[source]
Return type:ReplyPromise
coroutine cast(self, value: Union[bytes, faust.types.core.ModelT, typing.Any] = None, *, key: Union[bytes, faust.types.core.ModelT, typing.Any, NoneType] = None, partition: int = None) → None[source]
Return type:None
channel
coroutine join(self, values: Union[typing.AsyncIterable[typing.Union[bytes, faust.types.core.ModelT, typing.Any]], typing.Iterable[typing.Union[bytes, faust.types.core.ModelT, typing.Any]]], key: Union[bytes, faust.types.core.ModelT, typing.Any, NoneType] = None, reply_to: Union[_ForwardRef('AgentT'), faust.types.channels.ChannelT, str] = None) → List[Any][source]
Return type:List[Any]
coroutine kvjoin(self, items: Union[typing.AsyncIterable[typing.Tuple[typing.Union[bytes, faust.types.core.ModelT, typing.Any, NoneType], typing.Union[bytes, faust.types.core.ModelT, typing.Any]]], typing.Iterable[typing.Tuple[typing.Union[bytes, faust.types.core.ModelT, typing.Any, NoneType], typing.Union[bytes, faust.types.core.ModelT, typing.Any]]]], reply_to: Union[_ForwardRef('AgentT'), faust.types.channels.ChannelT, str] = None) → List[Any][source]
Return type:List[Any]
kvmap(items: Union[typing.AsyncIterable[typing.Tuple[typing.Union[bytes, faust.types.core.ModelT, typing.Any, NoneType], typing.Union[bytes, faust.types.core.ModelT, typing.Any]]], typing.Iterable[typing.Tuple[typing.Union[bytes, faust.types.core.ModelT, typing.Any, NoneType], typing.Union[bytes, faust.types.core.ModelT, typing.Any]]]], reply_to: Union[_ForwardRef('AgentT'), faust.types.channels.ChannelT, str] = None) → AsyncIterator[str][source]
Return type:AsyncIterator[str]
logger = <Logger faust.agents.agent (WARNING)>
map(values: Union[typing.AsyncIterable, typing.Iterable], key: Union[bytes, faust.types.core.ModelT, typing.Any, NoneType] = None, reply_to: Union[_ForwardRef('AgentT'), faust.types.channels.ChannelT, str] = None) → AsyncIterator[source]
Return type:AsyncIterator[+T_co]
coroutine on_isolated_partitions_assigned(self, assigned: Set[faust.types.tuples.TP]) → None[source]
Return type:None
coroutine on_isolated_partitions_revoked(self, revoked: Set[faust.types.tuples.TP]) → None[source]
Return type:None
coroutine on_partitions_assigned(self, assigned: Set[faust.types.tuples.TP]) → None[source]
Return type:None
coroutine on_partitions_revoked(self, revoked: Set[faust.types.tuples.TP]) → None[source]
Return type:None
coroutine on_shared_partitions_assigned(self, assigned: Set[faust.types.tuples.TP]) → None[source]
Return type:None
coroutine on_shared_partitions_revoked(self, revoked: Set[faust.types.tuples.TP]) → None[source]
Return type:None
coroutine send(self, key: Union[bytes, faust.types.core.ModelT, typing.Any, NoneType] = None, value: Union[bytes, faust.types.core.ModelT, typing.Any] = None, partition: int = None, key_serializer: Union[faust.types.codecs.CodecT, str, NoneType] = None, value_serializer: Union[faust.types.codecs.CodecT, str, NoneType] = None, callback: Callable[faust.types.tuples.FutureMessage, Union[NoneType, typing.Awaitable[NoneType]]] = None, *, reply_to: Union[_ForwardRef('AgentT'), faust.types.channels.ChannelT, str] = None, correlation_id: str = None, force: bool = False) → Awaitable[faust.types.tuples.RecordMetadata][source]

Send message to topic used by agent.

Return type:Awaitable[RecordMetadata]
channel_iterator
label