faust.agents.replies

Agent replies: waiting for replies, sending them, etc.

class faust.agents.replies.ReplyPromise(reply_to: str, correlation_id: str, **kwargs) → None[source]

Reply promise can be await-ed to wait until result ready.

fulfill(correlation_id: str, value: Any) → None[source]
Return type:None
class faust.agents.replies.BarrierState(reply_to: str, **kwargs) → None[source]

State of pending/complete barrier.

A barrier is a synchronization primitive that will wait until a group of coroutines have completed.

size = 0

This is the size while the messages are being sent. (it’s a tentative total, added to until the total is finalized).

total = 0

This is the actual total when all messages have been sent. It’s set by finalize().

fulfilled = 0

The number of results we have received.

pending = None

Set of pending replies that this barrier is composed of.

add(p: faust.agents.replies.ReplyPromise) → None[source]
Return type:None
finalize() → None[source]
Return type:None
fulfill(correlation_id: str, value: Any) → None[source]
Return type:None
get_nowait() → faust.agents.replies.ReplyTuple[source]

Return next reply, or raise asyncio.QueueEmpty.

Return type:ReplyTuple
iterate() → AsyncIterator[faust.agents.replies.ReplyTuple][source]

Iterate over results as arrive.

Return type:AsyncIterator[ReplyTuple]
class faust.agents.replies.ReplyConsumer(app: faust.types.app.AppT, **kwargs) → None[source]

Consumer responsible for redelegation of replies received.

coroutine add(self, correlation_id: str, promise: faust.agents.replies.ReplyPromise) → None[source]
Return type:None
logger = <Logger faust.agents.replies (WARNING)>
coroutine on_start(self) → None[source]

Called every time before the service is started/restarted.

Return type:None