faust.transport.base

Base message transport implementation.

The Transport is responsible for:

  • Holding a reference to the app that created it.

  • Creating new consumers/producers.

For a reference transport implementation, see: faust/transport/drivers/aiokafka.py

class faust.transport.base.Conductor(app: faust.types.app.AppT, **kwargs: Any) → None[source]

Manages the channels that subscribe to topics.

  • Consumes messages from topic using a single consumer.

  • Forwards messages to all channels subscribing to a topic.

logger = <Logger faust.transport.conductor (WARNING)>
async commit(topics: AbstractSet[Union[str, faust.types.tuples.TP]]) → bool[source]

Commit offsets in topics.

Return type

bool

acks_enabled_for(topic: str) → bool[source]

Return True if acks are enabled for topic by name.

Return type

bool

async wait_for_subscriptions() → None[source]

Wait for consumer to be subscribed.

Return type

None

async maybe_wait_for_subscriptions() → None[source]
Return type

None

async on_partitions_assigned(assigned: Set[faust.types.tuples.TP]) → None[source]

Call when cluster is rebalancing and partitions are assigned.

Return type

None

async on_client_only_start() → None[source]
Return type

None

clear() → None[source]

Clear all subscriptions.

Return type

None

add(topic: Any) → None[source]

Register topic to be subscribed.

Return type

None

discard(topic: Any) → None[source]

Unregister topic from conductor.

Return type

None

property label

Return label for use in logs.

Return type

str

property shortlabel

Return short label for use in logs.

Return type

str

class faust.transport.base.Consumer(transport: faust.types.transports.TransportT, callback: Callable[faust.types.tuples.Message, Awaitable], on_partitions_revoked: Callable[Set[faust.types.tuples.TP], Awaitable[None]], on_partitions_assigned: Callable[Set[faust.types.tuples.TP], Awaitable[None]], *, commit_interval: float = None, commit_livelock_soft_timeout: float = None, loop: asyncio.events.AbstractEventLoop = None, **kwargs: Any) → None[source]

Base Consumer.

logger = <Logger faust.transport.consumer (WARNING)>
consumer_stopped_errors = ()

Tuple of exception types that may be raised when the underlying consumer driver is stopped.

flow_active = True
on_init_dependencies() → Iterable[mode.types.services.ServiceT][source]

Return list of services this consumer depends on.

Return type

Iterable[ServiceT]

async on_restart() → None[source]

Call when the consumer is restarted.

Return type

None

async perform_seek() → None[source]

Seek all partitions to their current committed position.

Return type

None

abstract async seek_to_committed() → Mapping[faust.types.tuples.TP, int][source]

Seek all partitions to their committed offsets.

Return type

Mapping[TP, int]

async seek(partition: faust.types.tuples.TP, offset: int) → None[source]

Seek partition to specific offset.

Return type

None

stop_flow() → None[source]

Block consumer from processing any more messages.

Return type

None

resume_flow() → None[source]

Allow consumer to process messages.

Return type

None

pause_partitions(tps: Iterable[faust.types.tuples.TP]) → None[source]

Pause fetching from partitions.

Return type

None

resume_partitions(tps: Iterable[faust.types.tuples.TP]) → None[source]

Resume fetching from partitions.

Return type

None

async on_partitions_revoked(revoked: Set[faust.types.tuples.TP]) → None[source]

Call during rebalancing when partitions are being revoked.

Return type

None

async on_partitions_assigned(assigned: Set[faust.types.tuples.TP]) → None[source]

Call during rebalancing when partitions are being assigned.

Return type

None

getmany(timeout: float) → AsyncIterator[Tuple[faust.types.tuples.TP, faust.types.tuples.Message]][source]

Fetch batch of messages from server.

Return type

AsyncIterator[Tuple[TP, Message]]

track_message(message: faust.types.tuples.Message) → None[source]

Track message and mark it as pending ack.

Return type

None

ack(message: faust.types.tuples.Message) → bool[source]

Mark message as being acknowledged by stream.

Return type

bool

async wait_empty() → None[source]

Wait for all messages that started processing to be acked.

Return type

None

async commit_and_end_transactions() → None[source]

Commit all safe offsets and end transaction.

Return type

None

async on_stop() → None[source]

Call when consumer is stopping.

Return type

None

async commit(topics: AbstractSet[Union[str, faust.types.tuples.TP]] = None, start_new_transaction: bool = True) → bool[source]

Maybe commit the offset for all or specific topics.

Parameters

topics (Optional[AbstractSet[Union[str, TP]]]) – Set containing topics and/or TopicPartitions to commit.

Return type

bool

async maybe_wait_for_commit_to_finish() → bool[source]

Wait for any existing commit operation to finish.

Return type

bool

async force_commit(topics: AbstractSet[Union[str, faust.types.tuples.TP]] = None, start_new_transaction: bool = True) → bool[source]

Force offset commit.

Return type

bool

async on_task_error(exc: BaseException) → None[source]

Call when processing a message failed.

Return type

None

close() → None[source]

Close consumer for graceful shutdown.

Return type

None

property unacked

Return the set of currently unacknowledged messages.

Return type

Set[Message]

class faust.transport.base.Fetcher(app: faust.types.app.AppT, **kwargs: Any) → None[source]

Service fetching messages from Kafka.

logger = <Logger faust.transport.consumer (WARNING)>
async on_stop() → None[source]

Call when the fetcher is stopping.

Return type

None

class faust.transport.base.Producer(transport: faust.types.transports.TransportT, loop: asyncio.events.AbstractEventLoop = None, **kwargs: Any) → None[source]

Base Producer.

async on_start() → None[source]

Service is starting.

Return type

None

async send(topic: str, key: Optional[bytes], value: Optional[bytes], partition: Optional[int], timestamp: Optional[float], headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes], None], *, transactional_id: str = None) → Awaitable[faust.types.tuples.RecordMetadata][source]

Schedule message to be sent by producer.

Return type

Awaitable[RecordMetadata]

send_soon(fut: faust.types.tuples.FutureMessage) → None[source]
Return type

None

async send_and_wait(topic: str, key: Optional[bytes], value: Optional[bytes], partition: Optional[int], timestamp: Optional[float], headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes], None], *, transactional_id: str = None) → faust.types.tuples.RecordMetadata[source]

Send message and wait for it to be transmitted.

Return type

RecordMetadata

async flush() → None[source]

Flush all in-flight messages.

Return type

None

async create_topic(topic: str, partitions: int, replication: int, *, config: Mapping[str, Any] = None, timeout: Union[datetime.timedelta, float, str] = 1000.0, retention: Union[datetime.timedelta, float, str] = None, compacting: bool = None, deleting: bool = None, ensure_created: bool = False) → None[source]

Create/declare topic on server.

Return type

None

key_partition(topic: str, key: bytes) → faust.types.tuples.TP[source]

Hash key to determine partition.

Return type

TP

async begin_transaction(transactional_id: str) → None[source]

Begin transaction by id.

Return type

None

async commit_transaction(transactional_id: str) → None[source]

Commit transaction by id.

Return type

None

async abort_transaction(transactional_id: str) → None[source]

Abort and rollback transaction by id.

Return type

None

async stop_transaction(transactional_id: str) → None[source]

Stop transaction by id.

Return type

None

async maybe_begin_transaction(transactional_id: str) → None[source]

Begin transaction by id, if not already started.

Return type

None

async commit_transactions(tid_to_offset_map: Mapping[str, Mapping[faust.types.tuples.TP, int]], group_id: str, start_new_transaction: bool = True) → None[source]

Commit transactions.

Return type

None

logger = <Logger faust.transport.producer (WARNING)>
supports_headers() → bool[source]

Return True if headers are supported by this transport.

Return type

bool

class faust.transport.base.Transport(url: List[yarl.URL], app: faust.types.app.AppT, loop: asyncio.events.AbstractEventLoop = None) → None[source]

Message transport implementation.

class Consumer(transport: faust.types.transports.TransportT, callback: Callable[faust.types.tuples.Message, Awaitable], on_partitions_revoked: Callable[Set[faust.types.tuples.TP], Awaitable[None]], on_partitions_assigned: Callable[Set[faust.types.tuples.TP], Awaitable[None]], *, commit_interval: float = None, commit_livelock_soft_timeout: float = None, loop: asyncio.events.AbstractEventLoop = None, **kwargs: Any) → None

Base Consumer.

ack(message: faust.types.tuples.Message) → bool

Mark message as being acknowledged by stream.

Return type

bool

close() → None

Close consumer for graceful shutdown.

Return type

None

async commit(topics: AbstractSet[Union[str, faust.types.tuples.TP]] = None, start_new_transaction: bool = True) → bool

Maybe commit the offset for all or specific topics.

Parameters

topics (Optional[AbstractSet[Union[str, TP]]]) – Set containing topics and/or TopicPartitions to commit.

Return type

bool

async commit_and_end_transactions() → None

Commit all safe offsets and end transaction.

Return type

None

consumer_stopped_errors = ()
flow_active = True
async force_commit(topics: AbstractSet[Union[str, faust.types.tuples.TP]] = None, start_new_transaction: bool = True) → bool

Force offset commit.

Return type

bool

getmany(timeout: float) → AsyncIterator[Tuple[faust.types.tuples.TP, faust.types.tuples.Message]]

Fetch batch of messages from server.

Return type

AsyncIterator[Tuple[TP, Message]]

logger = <Logger faust.transport.consumer (WARNING)>
async maybe_wait_for_commit_to_finish() → bool

Wait for any existing commit operation to finish.

Return type

bool

on_init_dependencies() → Iterable[mode.types.services.ServiceT]

Return list of services this consumer depends on.

Return type

Iterable[ServiceT]

async on_partitions_assigned(assigned: Set[faust.types.tuples.TP]) → None

Call during rebalancing when partitions are being assigned.

Return type

None

async on_partitions_revoked(revoked: Set[faust.types.tuples.TP]) → None

Call during rebalancing when partitions are being revoked.

Return type

None

async on_restart() → None

Call when the consumer is restarted.

Return type

None

async on_stop() → None

Call when consumer is stopping.

Return type

None

async on_task_error(exc: BaseException) → None

Call when processing a message failed.

Return type

None

pause_partitions(tps: Iterable[faust.types.tuples.TP]) → None

Pause fetching from partitions.

Return type

None

async perform_seek() → None

Seek all partitions to their current committed position.

Return type

None

resume_flow() → None

Allow consumer to process messages.

Return type

None

resume_partitions(tps: Iterable[faust.types.tuples.TP]) → None

Resume fetching from partitions.

Return type

None

async seek(partition: faust.types.tuples.TP, offset: int) → None

Seek partition to specific offset.

Return type

None

abstract async seek_to_committed() → Mapping[faust.types.tuples.TP, int]

Seek all partitions to their committed offsets.

Return type

Mapping[TP, int]

stop_flow() → None

Block consumer from processing any more messages.

Return type

None

track_message(message: faust.types.tuples.Message) → None

Track message and mark it as pending ack.

Return type

None

property unacked

Return the set of currently unacknowledged messages.

Return type

Set[Message]

async wait_empty() → None

Wait for all messages that started processing to be acked.

Return type

None

class Producer(transport: faust.types.transports.TransportT, loop: asyncio.events.AbstractEventLoop = None, **kwargs: Any) → None

Base Producer.

async abort_transaction(transactional_id: str) → None

Abort and rollback transaction by id.

Return type

None

async begin_transaction(transactional_id: str) → None

Begin transaction by id.

Return type

None

async commit_transaction(transactional_id: str) → None

Commit transaction by id.

Return type

None

async commit_transactions(tid_to_offset_map: Mapping[str, Mapping[faust.types.tuples.TP, int]], group_id: str, start_new_transaction: bool = True) → None

Commit transactions.

Return type

None

async create_topic(topic: str, partitions: int, replication: int, *, config: Mapping[str, Any] = None, timeout: Union[datetime.timedelta, float, str] = 1000.0, retention: Union[datetime.timedelta, float, str] = None, compacting: bool = None, deleting: bool = None, ensure_created: bool = False) → None

Create/declare topic on server.

Return type

None

async flush() → None

Flush all in-flight messages.

Return type

None

key_partition(topic: str, key: bytes) → faust.types.tuples.TP

Hash key to determine partition.

Return type

TP

logger = <Logger faust.transport.producer (WARNING)>
async maybe_begin_transaction(transactional_id: str) → None

Begin transaction by id, if not already started.

Return type

None

async on_start() → None

Service is starting.

Return type

None

async send(topic: str, key: Optional[bytes], value: Optional[bytes], partition: Optional[int], timestamp: Optional[float], headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes], None], *, transactional_id: str = None) → Awaitable[faust.types.tuples.RecordMetadata]

Schedule message to be sent by producer.

Return type

Awaitable[RecordMetadata]

async send_and_wait(topic: str, key: Optional[bytes], value: Optional[bytes], partition: Optional[int], timestamp: Optional[float], headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes], None], *, transactional_id: str = None) → faust.types.tuples.RecordMetadata

Send message and wait for it to be transmitted.

Return type

RecordMetadata

send_soon(fut: faust.types.tuples.FutureMessage) → None
Return type

None

async stop_transaction(transactional_id: str) → None

Stop transaction by id.

Return type

None

supports_headers() → bool

Return True if headers are supported by this transport.

Return type

bool

class TransactionManager(transport: faust.types.transports.TransportT, *, consumer: faust.types.transports.ConsumerT, producer: faust.types.transports.ProducerT, **kwargs: Any) → None

Manage producer transactions.

async commit(offsets: Mapping[faust.types.tuples.TP, int], start_new_transaction: bool = True) → bool

Commit offsets for partitions.

Return type

bool

async create_topic(topic: str, partitions: int, replication: int, *, config: Mapping[str, Any] = None, timeout: Union[datetime.timedelta, float, str] = 30.0, retention: Union[datetime.timedelta, float, str] = None, compacting: bool = None, deleting: bool = None, ensure_created: bool = False) → None

Create/declare topic on server.

Return type

None

async flush() → None

Wait for producer to transmit all pending messages.

Return type

None

key_partition(topic: str, key: bytes) → faust.types.tuples.TP
Return type

TP

logger = <Logger faust.transport.consumer (WARNING)>
async on_partitions_revoked(revoked: Set[faust.types.tuples.TP]) → None

Call when the cluster is rebalancing and partitions are revoked.

Return type

None

async on_rebalance(assigned: Set[faust.types.tuples.TP], revoked: Set[faust.types.tuples.TP], newly_assigned: Set[faust.types.tuples.TP]) → None

Call when the cluster is rebalancing.

Return type

None

async send(topic: str, key: Optional[bytes], value: Optional[bytes], partition: Optional[int], timestamp: Optional[float], headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes], None], *, transactional_id: str = None) → Awaitable[faust.types.tuples.RecordMetadata]

Schedule message to be sent by producer.

Return type

Awaitable[RecordMetadata]

async send_and_wait(topic: str, key: Optional[bytes], value: Optional[bytes], partition: Optional[int], timestamp: Optional[float], headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes], None], *, transactional_id: str = None) → faust.types.tuples.RecordMetadata

Send message and wait for it to be transmitted.

Return type

RecordMetadata

supports_headers() → bool

Return True if the Kafka server supports headers.

Return type

bool

transactional_id_format = '{group_id}-{tpg.group}-{tpg.partition}'
class Conductor(app: faust.types.app.AppT, **kwargs: Any) → None

Manages the channels that subscribe to topics.

  • Consumes messages from topic using a single consumer.

  • Forwards messages to all channels subscribing to a topic.

acks_enabled_for(topic: str) → bool

Return True if acks are enabled for topic by name.

Return type

bool

add(topic: Any) → None

Register topic to be subscribed.

Return type

None

clear() → None

Clear all subscriptions.

Return type

None

async commit(topics: AbstractSet[Union[str, faust.types.tuples.TP]]) → bool

Commit offsets in topics.

Return type

bool

discard(topic: Any) → None

Unregister topic from conductor.

Return type

None

property label

Return label for use in logs.

Return type

str

logger = <Logger faust.transport.conductor (WARNING)>
async maybe_wait_for_subscriptions() → None
Return type

None

async on_client_only_start() → None
Return type

None

async on_partitions_assigned(assigned: Set[faust.types.tuples.TP]) → None

Call when cluster is rebalancing and partitions are assigned.

Return type

None

property shortlabel

Return short label for use in logs.

Return type

str

async wait_for_subscriptions() → None

Wait for consumer to be subscribed.

Return type

None

class Fetcher(app: faust.types.app.AppT, **kwargs: Any) → None

Service fetching messages from Kafka.

logger = <Logger faust.transport.consumer (WARNING)>
async on_stop() → None

Call when the fetcher is stopping.

Return type

None

create_consumer(callback: Callable[faust.types.tuples.Message, Awaitable], **kwargs: Any) → faust.types.transports.ConsumerT[source]

Create new consumer.

Return type

ConsumerT

create_producer(**kwargs: Any) → faust.types.transports.ProducerT[source]

Create new producer.

Return type

ProducerT

create_transaction_manager(consumer: faust.types.transports.ConsumerT, producer: faust.types.transports.ProducerT, **kwargs: Any) → faust.types.transports.TransactionManagerT[source]

Create new transaction manager.

Return type

TransactionManagerT

create_conductor(**kwargs: Any) → faust.types.transports.ConductorT[source]

Create new consumer conductor.

Return type

ConductorT