faust.types.agents

faust.types.agents.AgentErrorHandler

alias of typing.Callable

faust.types.agents.AgentFun

alias of typing.Callable

faust.types.agents.SinkT = typing.Union[_ForwardRef('AgentT'), faust.types.channels.ChannelT, typing.Callable[[typing.Any], typing.Union[typing.Awaitable, NoneType]]]

A sink can be an Agent, a Channel, or a callable/async callable taking the value as argument.

class faust.types.agents.ActorT(agent: faust.types.agents.AgentT, stream: faust.types.streams.StreamT, it: _T, active_partitions: Set[faust.types.tuples.TP] = None, **kwargs: Any) → None[source]
index = None

If multiple instances are started for concurrency, this is the index of this instance.

abstract cancel() → None[source]
Return type

None

abstract async on_isolated_partition_revoked(tp: faust.types.tuples.TP) → None[source]
Return type

None

abstract async on_isolated_partition_assigned(tp: faust.types.tuples.TP) → None[source]
Return type

None

class faust.types.agents.AsyncIterableActorT(agent: faust.types.agents.AgentT, stream: faust.types.streams.StreamT, it: _T, active_partitions: Set[faust.types.tuples.TP] = None, **kwargs: Any) → None[source]

Used for agent functions that yield.

class faust.types.agents.AwaitableActorT(agent: faust.types.agents.AgentT, stream: faust.types.streams.StreamT, it: _T, active_partitions: Set[faust.types.tuples.TP] = None, **kwargs: Any) → None[source]

Used for agent functions that do not yield.

faust.types.agents.ActorRefT

alias of faust.types.agents.ActorT

class faust.types.agents.AgentT(fun: Callable[faust.types.streams.StreamT, Union[Coroutine[[Any, Any], None], Awaitable[None], AsyncIterable]], *, name: str = None, app: faust.types.agents._AppT = None, channel: Union[str, faust.types.channels.ChannelT] = None, concurrency: int = 1, sink: Iterable[Union[AgentT, faust.types.channels.ChannelT, Callable[Any, Optional[Awaitable]]]] = None, on_error: Callable[[AgentT, BaseException], Awaitable] = None, supervisor_strategy: Type[mode.types.supervisors.SupervisorStrategyT] = None, help: str = None, schema: faust.types.serializers.SchemaT = None, key_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, value_type: Union[Type[faust.types.models.ModelT], Type[bytes], Type[str]] = None, isolated_partitions: bool = False, **kwargs: Any) → None[source]
abstract test_context(channel: faust.types.channels.ChannelT = None, supervisor_strategy: mode.types.supervisors.SupervisorStrategyT = None, **kwargs: Any) → faust.types.agents.AgentTestWrapperT[source]
Return type

AgentTestWrapperT[]

abstract add_sink(sink: Union[AgentT, faust.types.channels.ChannelT, Callable[Any, Optional[Awaitable]]]) → None[source]
Return type

None

abstract stream(**kwargs: Any) → faust.types.streams.StreamT[source]
Return type

StreamT[+T_co]

abstract async on_partitions_assigned(assigned: Set[faust.types.tuples.TP]) → None[source]
Return type

None

abstract async on_partitions_revoked(revoked: Set[faust.types.tuples.TP]) → None[source]
Return type

None

abstract async cast(value: Union[bytes, faust.types.core._ModelT, Any] = None, *, key: Union[bytes, faust.types.core._ModelT, Any, None] = None, partition: int = None, timestamp: float = None, headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes], None] = None) → None[source]
Return type

None

abstract async ask(value: Union[bytes, faust.types.core._ModelT, Any] = None, *, key: Union[bytes, faust.types.core._ModelT, Any, None] = None, partition: int = None, timestamp: float = None, headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes], None] = None, reply_to: Union[AgentT, faust.types.channels.ChannelT, str] = None, correlation_id: str = None) → Any[source]
Return type

Any

abstract async send(*, key: Union[bytes, faust.types.core._ModelT, Any, None] = None, value: Union[bytes, faust.types.core._ModelT, Any] = None, partition: int = None, timestamp: float = None, headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes], None] = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, reply_to: Union[AgentT, faust.types.channels.ChannelT, str] = None, correlation_id: str = None) → Awaitable[faust.types.tuples.RecordMetadata][source]
Return type

Awaitable[RecordMetadata]

abstract async map(values: Union[AsyncIterable, Iterable], key: Union[bytes, faust.types.core._ModelT, Any, None] = None, reply_to: Union[AgentT, faust.types.channels.ChannelT, str] = None) → AsyncIterator[source]
abstract async kvmap(items: Union[AsyncIterable[Tuple[Union[bytes, faust.types.core._ModelT, Any, None], Union[bytes, faust.types.core._ModelT, Any]]], Iterable[Tuple[Union[bytes, faust.types.core._ModelT, Any, None], Union[bytes, faust.types.core._ModelT, Any]]]], reply_to: Union[AgentT, faust.types.channels.ChannelT, str] = None) → AsyncIterator[str][source]
abstract async join(values: Union[AsyncIterable[Union[bytes, faust.types.core._ModelT, Any]], Iterable[Union[bytes, faust.types.core._ModelT, Any]]], key: Union[bytes, faust.types.core._ModelT, Any, None] = None, reply_to: Union[AgentT, faust.types.channels.ChannelT, str] = None) → List[Any][source]
Return type

List[Any]

abstract async kvjoin(items: Union[AsyncIterable[Tuple[Union[bytes, faust.types.core._ModelT, Any, None], Union[bytes, faust.types.core._ModelT, Any]]], Iterable[Tuple[Union[bytes, faust.types.core._ModelT, Any, None], Union[bytes, faust.types.core._ModelT, Any]]]], reply_to: Union[AgentT, faust.types.channels.ChannelT, str] = None) → List[Any][source]
Return type

List[Any]

abstract info() → Mapping[source]
Return type

Mapping[~KT, +VT_co]

abstract clone(*, cls: Type[AgentT] = None, **kwargs: Any) → faust.types.agents.AgentT[source]
Return type

AgentT[]

abstract get_topic_names() → Iterable[str][source]
Return type

Iterable[str]

abstract property channel
Return type

ChannelT[]

abstract property channel_iterator
Return type

AsyncIterator[+T_co]

class faust.types.agents.AgentManagerT(*, beacon: mode.utils.types.trees.NodeT = None, loop: asyncio.events.AbstractEventLoop = None) → None[source]
abstract async on_rebalance(revoked: Set[faust.types.tuples.TP], newly_assigned: Set[faust.types.tuples.TP]) → None[source]
Return type

None

class faust.types.agents.AgentTestWrapperT(*args: Any, original_channel: faust.types.channels.ChannelT = None, **kwargs: Any) → None[source]
sent_offset = 0
processed_offset = 0
abstract async put(value: Union[bytes, faust.types.core._ModelT, Any] = None, key: Union[bytes, faust.types.core._ModelT, Any, None] = None, partition: int = None, timestamp: float = None, headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes], None] = None, key_serializer: Union[faust.types.codecs.CodecT, str, None] = None, value_serializer: Union[faust.types.codecs.CodecT, str, None] = None, *, reply_to: Union[AgentT, faust.types.channels.ChannelT, str] = None, correlation_id: str = None, wait: bool = True) → faust.types.events.EventT[source]
Return type

EventT[]

abstract to_message(key: Union[bytes, faust.types.core._ModelT, Any, None], value: Union[bytes, faust.types.core._ModelT, Any], *, partition: int = 0, offset: int = 0, timestamp: float = None, timestamp_type: int = 0, headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes], None] = None) → faust.types.tuples.Message[source]
Return type

Message

abstract async throw(exc: BaseException) → None[source]
Return type

None