faust.transport.consumer

Consumer - fetching messages and managing consumer state.

The Consumer is responsible for:

  • Holds reference to the transport that created it

  • … and the app via self.transport.app.

  • Has a callback that usually points back to Conductor.on_message.

  • Receives messages and calls the callback for every message received.

  • Keeps track of the message and its acked/unacked status.

  • The Conductor forwards the message to all Streams that subscribes to the topic the message was sent to.

    • Messages are reference counted, and the Conductor increases the reference count to the number of subscribed streams.

    • Stream.__aiter__ is set up in a way such that when what is iterating over the stream is finished with the message, a finally: block will decrease the reference count by one.

    • When the reference count for a message hits zero, the stream will call Consumer.ack(message), which will mark that topic + partition + offset combination as “committable”

    • If all the streams share the same key_type/value_type, the conductor will only deserialize the payload once.

  • Commits the offset at an interval

    • The Consumer has a background thread that periodically commits the offset.

    • If the consumer marked an offset as committable this thread will advance the committed offset.

    • To find the offset that it can safely advance to the commit thread will traverse the _acked mapping of TP to list of acked offsets, by finding a range of consecutive acked offsets (see note in _new_offset).

class faust.transport.consumer.Fetcher(app: faust.types.app.AppT, **kwargs: Any) → None[source]

Service fetching messages from Kafka.

logger = <Logger faust.transport.consumer (WARNING)>
async on_stop() → None[source]

Call when the fetcher is stopping.

Return type

None

class faust.transport.consumer.Consumer(transport: faust.types.transports.TransportT, callback: Callable[faust.types.tuples.Message, Awaitable], on_partitions_revoked: Callable[Set[faust.types.tuples.TP], Awaitable[None]], on_partitions_assigned: Callable[Set[faust.types.tuples.TP], Awaitable[None]], *, commit_interval: float = None, commit_livelock_soft_timeout: float = None, loop: asyncio.events.AbstractEventLoop = None, **kwargs: Any) → None[source]

Base Consumer.

logger = <Logger faust.transport.consumer (WARNING)>
consumer_stopped_errors = ()

Tuple of exception types that may be raised when the underlying consumer driver is stopped.

flow_active = True
on_init_dependencies() → Iterable[mode.types.services.ServiceT][source]

Return list of services this consumer depends on.

Return type

Iterable[ServiceT[]]

async on_restart() → None[source]

Call when the consumer is restarted.

Return type

None

async perform_seek() → None[source]

Seek all partitions to their current committed position.

Return type

None

abstract async seek_to_committed() → Mapping[faust.types.tuples.TP, int][source]

Seek all partitions to their committed offsets.

Return type

Mapping[TP, int]

async seek(partition: faust.types.tuples.TP, offset: int) → None[source]

Seek partition to specific offset.

Return type

None

stop_flow() → None[source]

Block consumer from processing any more messages.

Return type

None

resume_flow() → None[source]

Allow consumer to process messages.

Return type

None

pause_partitions(tps: Iterable[faust.types.tuples.TP]) → None[source]

Pause fetching from partitions.

Return type

None

resume_partitions(tps: Iterable[faust.types.tuples.TP]) → None[source]

Resume fetching from partitions.

Return type

None

async on_partitions_revoked(revoked: Set[faust.types.tuples.TP]) → None[source]

Call during rebalancing when partitions are being revoked.

Return type

None

async on_partitions_assigned(assigned: Set[faust.types.tuples.TP]) → None[source]

Call during rebalancing when partitions are being assigned.

Return type

None

getmany(timeout: float) → AsyncIterator[Tuple[faust.types.tuples.TP, faust.types.tuples.Message]][source]

Fetch batch of messages from server.

Return type

AsyncIterator[Tuple[TP, Message]]

track_message(message: faust.types.tuples.Message) → None[source]

Track message and mark it as pending ack.

Return type

None

ack(message: faust.types.tuples.Message) → bool[source]

Mark message as being acknowledged by stream.

Return type

bool

async wait_empty() → None[source]

Wait for all messages that started processing to be acked.

Return type

None

async commit_and_end_transactions() → None[source]

Commit all safe offsets and end transaction.

Return type

None

async on_stop() → None[source]

Call when consumer is stopping.

Return type

None

async commit(topics: AbstractSet[Union[str, faust.types.tuples.TP]] = None, start_new_transaction: bool = True) → bool[source]

Maybe commit the offset for all or specific topics.

Parameters

topics (Optional[AbstractSet[Union[str, TP]]]) – Set containing topics and/or TopicPartitions to commit.

Return type

bool

async maybe_wait_for_commit_to_finish() → bool[source]

Wait for any existing commit operation to finish.

Return type

bool

async force_commit(topics: AbstractSet[Union[str, faust.types.tuples.TP]] = None, start_new_transaction: bool = True) → bool[source]

Force offset commit.

Return type

bool

async on_task_error(exc: BaseException) → None[source]

Call when processing a message failed.

Return type

None

close() → None[source]

Close consumer for graceful shutdown.

Return type

None

property unacked

Return the set of currently unacknowledged messages. :rtype: Set[Message]