faust.agents.replies

Agent replies: sending replies and waiting for them.

class faust.agents.replies.ReplyPromise(reply_to: str, correlation_id: str, **kwargs: Any) → None[source]

A reply promise can be awaited to wait until the result is ready.

fulfill(correlation_id: str, value: Any) → None[source]

Fulfill promise: a reply was received.

Return type

None
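The awaitable-promise idea can be illustrated with a minimal sketch built on a plain asyncio future. This is not Faust's implementation: SketchPromise, demo_promise, the "reply-topic" name, and the mismatch check are all hypothetical and exist only for illustration.

```python
import asyncio
from typing import Any


class SketchPromise:
    """Hypothetical stand-in for a reply promise; not Faust's class."""

    def __init__(self, reply_to: str, correlation_id: str) -> None:
        self.reply_to = reply_to
        self.correlation_id = correlation_id
        # Must be constructed while an event loop is running.
        self._future = asyncio.get_running_loop().create_future()

    def fulfill(self, correlation_id: str, value: Any) -> None:
        # Assumption: a reply with a different correlation id is an error.
        if correlation_id != self.correlation_id:
            raise ValueError("correlation id does not match this promise")
        self._future.set_result(value)

    def __await__(self):
        # Awaiting the promise delegates to the underlying future.
        return self._future.__await__()


async def demo_promise() -> Any:
    promise = SketchPromise("reply-topic", "id-1")
    # Simulate a reply arriving on a later loop iteration.
    asyncio.get_running_loop().call_soon(promise.fulfill, "id-1", 42)
    return await promise


if __name__ == "__main__":
    print(asyncio.run(demo_promise()))
```

The caller only awaits the promise; whichever component receives the reply is the one that calls fulfill().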

class faust.agents.replies.BarrierState(reply_to: str, **kwargs: Any) → None[source]

State of pending/complete barrier.

A barrier is a synchronization primitive that waits until a group of coroutines has completed.

size = 0

This is the size while the messages are still being sent. It’s a tentative total that is added to until the total is finalized.

total = 0

This is the actual total when all messages have been sent. It’s set by finalize().

fulfilled = 0

The number of results we have received.

pending = None

Set of pending replies that this barrier is composed of.

add(p: faust.agents.replies.ReplyPromise) → None[source]

Add promise to barrier.

Note

You can only add promises before the barrier is finalized using finalize().

Return type

None

finalize() → None[source]

Finalize this barrier.

After finalization you cannot grow or shrink the size of the barrier.

Return type

None

fulfill(correlation_id: str, value: Any) → None[source]

Fulfill one of the promises in this barrier.

Once all promises in this barrier are fulfilled, the barrier is ready.

Return type

None
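The interplay between size, total, fulfilled, and pending described above can be sketched with a small counter-based model. This is a simplified illustration under stated assumptions, not Faust's code: SketchBarrier is hypothetical, it tracks plain correlation ids instead of ReplyPromise objects, and the finalized flag, results dict, and ready property are invented for the example.

```python
from typing import Any, Dict, Set


class SketchBarrier:
    """Hypothetical model of the barrier counters; not Faust's class."""

    def __init__(self) -> None:
        self.size = 0        # tentative total while messages are being sent
        self.total = 0       # actual total, fixed by finalize()
        self.fulfilled = 0   # number of results received so far
        self.pending: Set[str] = set()
        self.results: Dict[str, Any] = {}
        self.finalized = False

    def add(self, correlation_id: str) -> None:
        # Promises can only be added before the barrier is finalized.
        if self.finalized:
            raise RuntimeError("barrier is already finalized")
        self.pending.add(correlation_id)
        self.size += 1

    def finalize(self) -> None:
        # Freeze the tentative size as the real total.
        self.total = self.size
        self.finalized = True

    def fulfill(self, correlation_id: str, value: Any) -> None:
        self.pending.remove(correlation_id)
        self.results[correlation_id] = value
        self.fulfilled += 1

    @property
    def ready(self) -> bool:
        # Ready only once the total is known and every reply has arrived.
        return self.finalized and self.fulfilled >= self.total
```

Keeping size and total separate lets replies arrive while requests are still being sent: the barrier cannot report readiness until finalize() has fixed the total.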

get_nowait() → faust.agents.replies.ReplyTuple[source]

Return next reply, or raise asyncio.QueueEmpty.

Return type

ReplyTuple

iterate() → AsyncIterator[faust.agents.replies.ReplyTuple][source]

Iterate over results as they arrive.

Return type

AsyncIterator[ReplyTuple]
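Consuming results as they arrive, with a non-blocking read that raises asyncio.QueueEmpty, can be sketched with an asyncio.Queue. This is an assumption about the general pattern, not a description of how Faust implements it: iterate_replies, demo_iterate, and the (correlation_id, value) tuple shape are hypothetical.

```python
import asyncio
from typing import Any, AsyncIterator, List, Tuple

Reply = Tuple[str, Any]  # hypothetical (correlation_id, value) pair


async def iterate_replies(queue: asyncio.Queue, total: int) -> AsyncIterator[Reply]:
    # Yield each reply as soon as it is available, stopping after `total`.
    for _ in range(total):
        yield await queue.get()


async def demo_iterate() -> Tuple[bool, List[Reply]]:
    queue: asyncio.Queue = asyncio.Queue()

    # A non-blocking read on an empty queue raises asyncio.QueueEmpty.
    empty_seen = False
    try:
        queue.get_nowait()
    except asyncio.QueueEmpty:
        empty_seen = True

    async def producer() -> None:
        # Simulate replies trickling in over several loop iterations.
        for i in range(3):
            await asyncio.sleep(0)
            queue.put_nowait((f"id-{i}", i * 10))

    task = asyncio.create_task(producer())
    seen = [reply async for reply in iterate_replies(queue, total=3)]
    await task
    return empty_seen, seen


if __name__ == "__main__":
    print(asyncio.run(demo_iterate()))
```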

class faust.agents.replies.ReplyConsumer(app: faust.types.app.AppT, **kwargs: Any) → None[source]

Consumer responsible for redelegating received replies.

logger = <Logger faust.agents.replies (WARNING)>

async on_start() → None[source]

Call when reply consumer starts.

Return type

None

async add(correlation_id: str, promise: faust.agents.replies.ReplyPromise) → None[source]

Register a promise so that its reply is tracked when it arrives.

Return type

None
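Registering promises by correlation id and handing each incoming reply to whoever is waiting for it can be sketched as a small in-memory registry. This is only an illustration of the pattern: SketchReplyRouter, its deliver() method, and demo_router are hypothetical names, plain asyncio futures stand in for reply promises, and no Kafka topic is involved.

```python
import asyncio
from collections import defaultdict
from typing import Any, DefaultDict, List, Tuple


class SketchReplyRouter:
    """Hypothetical registry mapping correlation ids to waiting futures."""

    def __init__(self) -> None:
        self._waiting: DefaultDict[str, List[asyncio.Future]] = defaultdict(list)

    async def add(self, correlation_id: str, future: asyncio.Future) -> None:
        # Start tracking: remember who is waiting for this correlation id.
        self._waiting[correlation_id].append(future)

    def deliver(self, correlation_id: str, value: Any) -> int:
        # Hand the reply to every waiter; return how many were waiting.
        futures = self._waiting.pop(correlation_id, [])
        for future in futures:
            if not future.done():
                future.set_result(value)
        return len(futures)


async def demo_router() -> Tuple[int, int, Any]:
    router = SketchReplyRouter()
    future = asyncio.get_running_loop().create_future()
    await router.add("id-1", future)
    unknown = router.deliver("other", "ignored")  # nobody is waiting for this id
    matched = router.deliver("id-1", "pong")
    return unknown, matched, await future


if __name__ == "__main__":
    print(asyncio.run(demo_router()))
```

In this sketch a reply with an unknown correlation id is silently dropped; a real consumer may choose to log or buffer such replies instead.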