faust.types.agents¶
-
faust.types.agents.AgentErrorHandler¶ alias of
typing.Callable
-
faust.types.agents.AgentFun¶ alias of
typing.Callable
-
faust.types.agents.SinkT= typing.Union[_ForwardRef('AgentT'), faust.types.channels.ChannelT, typing.Callable[[typing.Any], typing.Union[typing.Awaitable, NoneType]]]¶ A sink can be – Agent, Channel or callable/async callable taking value as argument.
-
class
faust.types.agents.ActorT(agent: faust.types.agents.AgentT, stream: faust.types.streams.StreamT, it: _T, active_partitions: Set[faust.types.tuples.TP] = None, **kwargs) → None[source]¶ -
index= None¶ If multiple instance are started for concurrency, this is its index.
-
-
class
faust.types.agents.AsyncIterableActorT(agent: faust.types.agents.AgentT, stream: faust.types.streams.StreamT, it: _T, active_partitions: Set[faust.types.tuples.TP] = None, **kwargs) → None[source]¶ Used for agent function that yields.
-
class
faust.types.agents.AwaitableActorT(agent: faust.types.agents.AgentT, stream: faust.types.streams.StreamT, it: _T, active_partitions: Set[faust.types.tuples.TP] = None, **kwargs) → None[source]¶ Used for agent function that do not yield.
-
faust.types.agents.ActorRefT¶ alias of
faust.types.agents.ActorT
-
class
faust.types.agents.AgentT(fun: Callable[Union[typing.AsyncIterator, faust.types.streams.StreamT], Union[typing.Awaitable, typing.AsyncIterable]], *, name: str = None, app: faust.types.agents.AppT = None, channel: Union[str, faust.types.channels.ChannelT] = None, concurrency: int = 1, sink: Iterable[Union[_ForwardRef('AgentT'), faust.types.channels.ChannelT, typing.Callable[[typing.Any], typing.Union[typing.Awaitable, NoneType]]]] = None, on_error: Callable[[_ForwardRef('AgentT'), BaseException], Awaitable] = None, supervisor_strategy: Type[mode.types.supervisors.SupervisorStrategyT] = None, help: str = None, key_type: Union[typing.Type[faust.types.models.ModelT], typing.Type[bytes], typing.Type[str]] = None, value_type: Union[typing.Type[faust.types.models.ModelT], typing.Type[bytes], typing.Type[str]] = None, isolated_partitions: bool = False, **kwargs) → None[source]¶ -
test_context(channel: faust.types.channels.ChannelT = None, supervisor_strategy: mode.types.supervisors.SupervisorStrategyT = None, **kwargs) → faust.types.agents.AgentTestWrapperT[source]¶ Return type: AgentTestWrapperT[]
-
add_sink(sink: Union[_ForwardRef('AgentT'), faust.types.channels.ChannelT, typing.Callable[[typing.Any], typing.Union[typing.Awaitable, NoneType]]]) → None[source]¶ Return type: None
-
clone(*, cls: Type[_ForwardRef('AgentT')] = None, **kwargs) → faust.types.agents.AgentT[source]¶ Return type: AgentT[]
-
channel¶
-
channel_iterator¶
-
coroutine
ask(self, value: Union[bytes, faust.types.core.ModelT, typing.Any] = None, *, key: Union[bytes, faust.types.core.ModelT, typing.Any, NoneType] = None, partition: int = None, reply_to: Union[_ForwardRef('AgentT'), faust.types.channels.ChannelT, str] = None, correlation_id: str = None) → Any[source]¶ Return type: Any
-
coroutine
cast(self, value: Union[bytes, faust.types.core.ModelT, typing.Any] = None, *, key: Union[bytes, faust.types.core.ModelT, typing.Any, NoneType] = None, partition: int = None) → None[source]¶ Return type: None
-
coroutine
join(self, values: Union[typing.AsyncIterable[typing.Union[bytes, faust.types.core.ModelT, typing.Any]], typing.Iterable[typing.Union[bytes, faust.types.core.ModelT, typing.Any]]], key: Union[bytes, faust.types.core.ModelT, typing.Any, NoneType] = None, reply_to: Union[_ForwardRef('AgentT'), faust.types.channels.ChannelT, str] = None) → List[Any][source]¶ Return type: List[Any]
-
coroutine
kvjoin(self, items: Union[typing.AsyncIterable[typing.Tuple[typing.Union[bytes, faust.types.core.ModelT, typing.Any, NoneType], typing.Union[bytes, faust.types.core.ModelT, typing.Any]]], typing.Iterable[typing.Tuple[typing.Union[bytes, faust.types.core.ModelT, typing.Any, NoneType], typing.Union[bytes, faust.types.core.ModelT, typing.Any]]]], reply_to: Union[_ForwardRef('AgentT'), faust.types.channels.ChannelT, str] = None) → List[Any][source]¶ Return type: List[Any]
-
coroutine
kvmap(self, items: Union[typing.AsyncIterable[typing.Tuple[typing.Union[bytes, faust.types.core.ModelT, typing.Any, NoneType], typing.Union[bytes, faust.types.core.ModelT, typing.Any]]], typing.Iterable[typing.Tuple[typing.Union[bytes, faust.types.core.ModelT, typing.Any, NoneType], typing.Union[bytes, faust.types.core.ModelT, typing.Any]]]], reply_to: Union[_ForwardRef('AgentT'), faust.types.channels.ChannelT, str] = None) → AsyncIterator[str][source]¶
-
coroutine
map(self, values: Union[typing.AsyncIterable, typing.Iterable], key: Union[bytes, faust.types.core.ModelT, typing.Any, NoneType] = None, reply_to: Union[_ForwardRef('AgentT'), faust.types.channels.ChannelT, str] = None) → AsyncIterator[source]¶
-
coroutine
on_partitions_assigned(self, assigned: Set[faust.types.tuples.TP]) → None[source]¶ Return type: None
-
coroutine
on_partitions_revoked(self, revoked: Set[faust.types.tuples.TP]) → None[source]¶ Return type: None
-
coroutine
send(self, key: Union[bytes, faust.types.core.ModelT, typing.Any, NoneType] = None, value: Union[bytes, faust.types.core.ModelT, typing.Any] = None, partition: int = None, key_serializer: Union[faust.types.codecs.CodecT, str, NoneType] = None, value_serializer: Union[faust.types.codecs.CodecT, str, NoneType] = None, *, reply_to: Union[_ForwardRef('AgentT'), faust.types.channels.ChannelT, str] = None, correlation_id: str = None) → Awaitable[faust.types.tuples.RecordMetadata][source]¶ Return type: Awaitable[RecordMetadata]
-
-
class
faust.types.agents.AgentManagerT(*, beacon: mode.utils.types.trees.NodeT = None, loop: asyncio.events.AbstractEventLoop = None) → None[source]¶
-
class
faust.types.agents.AgentTestWrapperT(*args, original_channel: faust.types.channels.ChannelT = None, **kwargs) → None[source]¶ -
sent_offset= 0¶
-
processed_offset= 0¶
-
coroutine
put(self, value: Union[bytes, faust.types.core.ModelT, typing.Any] = None, key: Union[bytes, faust.types.core.ModelT, typing.Any, NoneType] = None, partition: int = None, key_serializer: Union[faust.types.codecs.CodecT, str, NoneType] = None, value_serializer: Union[faust.types.codecs.CodecT, str, NoneType] = None, *, reply_to: Union[_ForwardRef('AgentT'), faust.types.channels.ChannelT, str] = None, correlation_id: str = None, wait: bool = True) → faust.types.events.EventT[source]¶ Return type: EventT[]
-