faust.transport.consumer
Consumer - fetching messages and managing consumer state.
The Consumer has the following responsibilities:

- Holds a reference to the transport that created it, and to the app via self.transport.app.
- Has a callback that usually points back to Conductor.on_message.
- Receives messages and calls the callback for every message received.
- Keeps track of each message and its acked/unacked status.
    - The Conductor forwards the message to all Streams that subscribe to the topic the message was sent to.
    - Messages are reference counted, and the Conductor increases the reference count to the number of subscribed streams.
    - Stream.__aiter__ is set up so that when whatever is iterating over the stream is finished with the message, a finally: block decreases the reference count by one.
    - When the reference count for a message hits zero, the stream calls Consumer.ack(message), which marks that topic + partition + offset combination as "committable".
    - If all the streams share the same key_type/value_type, the Conductor deserializes the payload only once.
- Commits the offset at an interval.
    - The Consumer has a background thread that periodically commits the offset.
    - If the consumer marked an offset as committable, this thread advances the committed offset.
    - To find the offset that it can safely advance to, the commit thread traverses the _acked mapping of TP to list of acked offsets, looking for a range of consecutive acked offsets (see the note in _new_offset).
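
The acknowledgement flow described above can be illustrated with a small, self-contained sketch. It is not Faust's implementation: SketchMessage, SketchConsumer, stream_handle and conductor_forward are hypothetical names introduced here, and the TP tuple is only a stand-in for faust.types.tuples.TP. The sketch assumes streams are plain callables and that offsets are acked in memory only.

```python
from typing import Callable, Dict, List, NamedTuple, Optional


class TP(NamedTuple):
    """Topic + partition pair (stand-in for faust.types.tuples.TP)."""
    topic: str
    partition: int


class SketchMessage:
    """Hypothetical reference-counted message."""

    def __init__(self, tp: TP, offset: int) -> None:
        self.tp = tp
        self.offset = offset
        self.refcount = 0


class SketchConsumer:
    """Hypothetical consumer tracking acked offsets per TP."""

    def __init__(self) -> None:
        self.acked: Dict[TP, List[int]] = {}

    def ack(self, message: SketchMessage) -> None:
        # Mark this topic + partition + offset as committable.
        self.acked.setdefault(message.tp, []).append(message.offset)

    def new_offset(self, tp: TP) -> Optional[int]:
        # Return the last offset in the first run of consecutive acked
        # offsets, or None if nothing has been acked for this TP.
        offsets = sorted(self.acked.get(tp, []))
        if not offsets:
            return None
        last = offsets[0]
        for offset in offsets[1:]:
            if offset != last + 1:
                break  # gap found: later offsets are not yet safe
            last = offset
        return last


def stream_handle(message: SketchMessage, consumer: SketchConsumer,
                  handler: Callable[[SketchMessage], None]) -> None:
    try:
        handler(message)
    finally:
        # Runs even if the handler raises, mirroring the finally: block.
        message.refcount -= 1
        if message.refcount == 0:
            consumer.ack(message)


def conductor_forward(message: SketchMessage, consumer: SketchConsumer,
                      handlers: List[Callable[[SketchMessage], None]]) -> None:
    # One reference per subscribed stream.
    message.refcount += len(handlers)
    for handler in handlers:
        stream_handle(message, consumer, handler)


consumer = SketchConsumer()
tp = TP("orders", 0)
seen: List[SketchMessage] = []
for offset in (34, 35, 36, 40, 41):
    conductor_forward(SketchMessage(tp, offset), consumer,
                      [seen.append, seen.append])
print(consumer.new_offset(tp))  # 36
```

With acked offsets 34, 35, 36, 40 and 41, the scan stops at the gap after 36, so 36 is the highest offset the sketch considers safe. Offsets 40 and 41 stay pending until 37 through 39 are acked.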

class faust.transport.consumer.Fetcher(app: faust.types.app.AppT, **kwargs) → None
    Service fetching messages from Kafka.

    logger = <Logger faust.transport.consumer (WARNING)>

class faust.transport.consumer.Consumer(transport: faust.types.transports.TransportT, callback: Callable[[faust.types.tuples.Message], Awaitable], on_partitions_revoked: Callable[[Set[faust.types.tuples.TP]], Awaitable[None]], on_partitions_assigned: Callable[[Set[faust.types.tuples.TP]], Awaitable[None]], *, commit_interval: float = None, commit_livelock_soft_timeout: float = None, loop: asyncio.events.AbstractEventLoop = None, **kwargs) → None
    Base Consumer.

    logger = <Logger faust.transport.consumer (WARNING)>

    consumer_stopped_errors = ()
        Tuple of exception types that may be raised when the underlying consumer driver is stopped.

    flow_active = True

    on_init_dependencies() → Iterable[mode.types.services.ServiceT]
        Return list of service dependencies for this service.

    getmany(timeout: float) → AsyncIterator[Tuple[faust.types.tuples.TP, faust.types.tuples.Message]]
        Return type: AsyncIterator[Tuple[TP, Message]]

    coroutine commit(self, topics: AbstractSet[Union[str, faust.types.tuples.TP]] = None, start_new_transaction: bool = True) → bool
        Maybe commit the offset for all or specific topics.

    coroutine force_commit(self, topics: AbstractSet[Union[str, faust.types.tuples.TP]] = None, start_new_transaction: bool = True) → bool
        Return type: bool

    coroutine on_partitions_assigned(self, assigned: Set[faust.types.tuples.TP]) → None
        Return type: None

    coroutine on_partitions_revoked(self, revoked: Set[faust.types.tuples.TP]) → None
        Call during rebalancing when partitions are being revoked.
        Return type: None

    coroutine seek(self, partition: faust.types.tuples.TP, offset: int) → None
        Return type: None