faust.transport.consumer
¶
Consumer
The Consumer is responsible for:
Holds reference to the transport that created it
… and the app via
self.transport.app
.Has a callback that usually points back to
Conductor.on_message
.Receives messages and calls the callback for every message received.
Keeps track of the message and it’s acked/unacked status.
The Conductor forwards the message to all Streams that subscribes to the topic the message was sent to.
- Messages are reference counted, and the Conductor increases the reference count to the number of subscribed streams.
- Stream.__aiter__ is set up in a way such that when what is iterating over the stream is finished with the message, a finally: block will decrease the reference count by one.
- When the reference count for a message hits zero, the stream will call
Consumer.ack(message)
, which will mark that tp+offset combination as “commitable”- If all the streams share the same key_type/value_type, the conductor will only deserialize the payload once.
Commits the offset at an interval
- The Consumer has a background thread that periodically commits the offset.
- If the consumer marked an offset as committable this thread will advance the comitted offset.
- To find the offset that it can safely advance to the commit thread will traverse the _acked mapping of TP to list of acked offsets, by finding a range of consecutive acked offsets (see note in _new_offset).
-
class
faust.transport.consumer.
Fetcher
(app: faust.types.app.AppT, **kwargs) → None[source]¶ -
logger
= <Logger faust.transport.consumer (WARNING)>¶
-
-
class
faust.transport.consumer.
Consumer
(transport: faust.types.transports.TransportT, callback: Callable[faust.types.tuples.Message, Awaitable], on_partitions_revoked: Callable[Set[faust.types.tuples.TP], Awaitable[NoneType]], on_partitions_assigned: Callable[Set[faust.types.tuples.TP], Awaitable[NoneType]], *, commit_interval: float = None, commit_livelock_soft_timeout: float = None, loop: asyncio.events.AbstractEventLoop = None, **kwargs) → None[source]¶ Base Consumer.
-
logger
= <Logger faust.transport.consumer (WARNING)>¶
-
consumer_stopped_errors
= ()¶ Tuple of exception types that may be raised when the underlying consumer driver is stopped.
-
coroutine
commit
(self, topics: AbstractSet[Union[str, faust.types.tuples.TP]] = None) → bool[source]¶ Maybe commit the offset for all or specific topics.
Parameters: topics ( Optional
[AbstractSet
[Union
[str
,TP
]]]) – Set containing topics and/or TopicPartitions to commit.Return type: bool
-
coroutine
force_commit
(self, topics: AbstractSet[Union[str, faust.types.tuples.TP]] = None) → bool[source]¶ Return type: bool
-
coroutine
on_partitions_assigned
(self, assigned: Set[faust.types.tuples.TP]) → None[source]¶ Return type: None
-
coroutine
on_partitions_revoked
(self, revoked: Set[faust.types.tuples.TP]) → None[source]¶ Return type: None
-
coroutine
on_stop
(self) → None[source]¶ Called every time before the service is stopped/restarted.
Return type: None
-
coroutine
wait_empty
(self) → None[source]¶ Wait for all messages that started processing to be acked.
Return type: None
-
unacked
¶
-