faust.transport.base
¶
Base message transport implementation.
The Transport is responsible for:
Holds reference to the app that created it.
Creates new consumers/producers.
To see a reference transport implementation go to:
faust/transport/drivers/aiokafka.py
-
class
faust.transport.base.
Conductor
(app: faust.types.app.AppT, **kwargs: Any) → None[source]¶ Manages the channels that subscribe to topics.
Consumes messages from topic using a single consumer.
Forwards messages to all channels subscribing to a topic.
-
logger
= <Logger faust.transport.conductor (WARNING)>¶
-
async
commit
(topics: AbstractSet[Union[str, faust.types.tuples.TP]]) → bool[source]¶ Commit offsets in topics.
- Return type
-
acks_enabled_for
(topic: str) → bool[source]¶ Return
True
if acks are enabled for topic by name.- Return type
-
class
faust.transport.base.
Consumer
(transport: faust.types.transports.TransportT, callback: Callable[faust.types.tuples.Message, Awaitable], on_partitions_revoked: Callable[Set[faust.types.tuples.TP], Awaitable[None]], on_partitions_assigned: Callable[Set[faust.types.tuples.TP], Awaitable[None]], *, commit_interval: float = None, commit_livelock_soft_timeout: float = None, loop: asyncio.events.AbstractEventLoop = None, **kwargs: Any) → None[source]¶ Base Consumer.
-
logger
= <Logger faust.transport.consumer (WARNING)>¶
-
consumer_stopped_errors
= ()¶ Tuple of exception types that may be raised when the underlying consumer driver is stopped.
-
flow_active
= True¶
-
on_init_dependencies
() → Iterable[mode.types.services.ServiceT][source]¶ Return list of services this consumer depends on.
-
async
perform_seek
() → None[source]¶ Seek all partitions to their current committed position.
- Return type
None
-
abstract async
seek_to_committed
() → Mapping[faust.types.tuples.TP, int][source]¶ Seek all partitions to their committed offsets.
-
async
seek
(partition: faust.types.tuples.TP, offset: int) → None[source]¶ Seek partition to specific offset.
- Return type
None
-
pause_partitions
(tps: Iterable[faust.types.tuples.TP]) → None[source]¶ Pause fetching from partitions.
- Return type
None
-
resume_partitions
(tps: Iterable[faust.types.tuples.TP]) → None[source]¶ Resume fetching from partitions.
- Return type
None
-
async
on_partitions_revoked
(revoked: Set[faust.types.tuples.TP]) → None[source]¶ Call during rebalancing when partitions are being revoked.
- Return type
None
-
async
on_partitions_assigned
(assigned: Set[faust.types.tuples.TP]) → None[source]¶ Call during rebalancing when partitions are being assigned.
- Return type
None
-
getmany
(timeout: float) → AsyncIterator[Tuple[faust.types.tuples.TP, faust.types.tuples.Message]][source]¶ Fetch batch of messages from server.
- Return type
AsyncIterator
[Tuple
[TP
,Message
]]
-
track_message
(message: faust.types.tuples.Message) → None[source]¶ Track message and mark it as pending ack.
- Return type
None
-
ack
(message: faust.types.tuples.Message) → bool[source]¶ Mark message as being acknowledged by stream.
- Return type
-
async
wait_empty
() → None[source]¶ Wait for all messages that started processing to be acked.
- Return type
None
-
async
commit_and_end_transactions
() → None[source]¶ Commit all safe offsets and end transaction.
- Return type
None
-
async
commit
(topics: AbstractSet[Union[str, faust.types.tuples.TP]] = None, start_new_transaction: bool = True) → bool[source]¶ Maybe commit the offset for all or specific topics.
-
async
maybe_wait_for_commit_to_finish
() → bool[source]¶ Wait for any existing commit operation to finish.
- Return type
-
async
force_commit
(topics: AbstractSet[Union[str, faust.types.tuples.TP]] = None, start_new_transaction: bool = True) → bool[source]¶ Force offset commit.
- Return type
-
-
class
faust.transport.base.
Fetcher
(app: faust.types.app.AppT, **kwargs: Any) → None[source]¶ Service fetching messages from Kafka.
-
logger
= <Logger faust.transport.consumer (WARNING)>¶
-
-
class
faust.transport.base.
Producer
(transport: faust.types.transports.TransportT, loop: asyncio.events.AbstractEventLoop = None, **kwargs: Any) → None[source]¶ Base Producer.
-
async
send
(topic: str, key: Optional[bytes], value: Optional[bytes], partition: Optional[int], timestamp: Optional[float], headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes], None], *, transactional_id: str = None) → Awaitable[faust.types.tuples.RecordMetadata][source]¶ Schedule message to be sent by producer.
- Return type
-
async
send_and_wait
(topic: str, key: Optional[bytes], value: Optional[bytes], partition: Optional[int], timestamp: Optional[float], headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes], None], *, transactional_id: str = None) → faust.types.tuples.RecordMetadata[source]¶ Send message and wait for it to be transmitted.
- Return type
-
async
create_topic
(topic: str, partitions: int, replication: int, *, config: Mapping[str, Any] = None, timeout: Union[datetime.timedelta, float, str] = 1000.0, retention: Union[datetime.timedelta, float, str] = None, compacting: bool = None, deleting: bool = None, ensure_created: bool = False) → None[source]¶ Create/declare topic on server.
- Return type
None
-
key_partition
(topic: str, key: bytes) → faust.types.tuples.TP[source]¶ Hash key to determine partition.
- Return type
-
async
begin_transaction
(transactional_id: str) → None[source]¶ Begin transaction by id.
- Return type
None
-
async
commit_transaction
(transactional_id: str) → None[source]¶ Commit transaction by id.
- Return type
None
-
async
abort_transaction
(transactional_id: str) → None[source]¶ Abort and rollback transaction by id.
- Return type
None
-
async
stop_transaction
(transactional_id: str) → None[source]¶ Stop transaction by id.
- Return type
None
-
async
maybe_begin_transaction
(transactional_id: str) → None[source]¶ Begin transaction by id, if not already started.
- Return type
None
-
async
commit_transactions
(tid_to_offset_map: Mapping[str, Mapping[faust.types.tuples.TP, int]], group_id: str, start_new_transaction: bool = True) → None[source]¶ Commit transactions.
- Return type
None
-
logger
= <Logger faust.transport.producer (WARNING)>¶
-
async
-
class
faust.transport.base.
Transport
(url: List[yarl.URL], app: faust.types.app.AppT, loop: asyncio.events.AbstractEventLoop = None) → None[source]¶ Message transport implementation.
-
class
Consumer
(transport: faust.types.transports.TransportT, callback: Callable[faust.types.tuples.Message, Awaitable], on_partitions_revoked: Callable[Set[faust.types.tuples.TP], Awaitable[None]], on_partitions_assigned: Callable[Set[faust.types.tuples.TP], Awaitable[None]], *, commit_interval: float = None, commit_livelock_soft_timeout: float = None, loop: asyncio.events.AbstractEventLoop = None, **kwargs: Any) → None¶ Base Consumer.
-
ack
(message: faust.types.tuples.Message) → bool¶ Mark message as being acknowledged by stream.
- Return type
-
close
() → None¶ Close consumer for graceful shutdown.
- Return type
None
-
async
commit
(topics: AbstractSet[Union[str, faust.types.tuples.TP]] = None, start_new_transaction: bool = True) → bool¶ Maybe commit the offset for all or specific topics.
-
async
commit_and_end_transactions
() → None¶ Commit all safe offsets and end transaction.
- Return type
None
-
consumer_stopped_errors
= ()¶
-
flow_active
= True¶
-
async
force_commit
(topics: AbstractSet[Union[str, faust.types.tuples.TP]] = None, start_new_transaction: bool = True) → bool¶ Force offset commit.
- Return type
-
getmany
(timeout: float) → AsyncIterator[Tuple[faust.types.tuples.TP, faust.types.tuples.Message]]¶ Fetch batch of messages from server.
- Return type
AsyncIterator
[Tuple
[TP
,Message
]]
-
logger
= <Logger faust.transport.consumer (WARNING)>¶
-
async
maybe_wait_for_commit_to_finish
() → bool¶ Wait for any existing commit operation to finish.
- Return type
-
on_init_dependencies
() → Iterable[mode.types.services.ServiceT]¶ Return list of services this consumer depends on.
-
async
on_partitions_assigned
(assigned: Set[faust.types.tuples.TP]) → None¶ Call during rebalancing when partitions are being assigned.
- Return type
None
-
async
on_partitions_revoked
(revoked: Set[faust.types.tuples.TP]) → None¶ Call during rebalancing when partitions are being revoked.
- Return type
None
-
async
on_restart
() → None¶ Call when the consumer is restarted.
- Return type
None
-
async
on_stop
() → None¶ Call when consumer is stopping.
- Return type
None
-
async
on_task_error
(exc: BaseException) → None¶ Call when processing a message failed.
- Return type
None
-
pause_partitions
(tps: Iterable[faust.types.tuples.TP]) → None¶ Pause fetching from partitions.
- Return type
None
-
async
perform_seek
() → None¶ Seek all partitions to their current committed position.
- Return type
None
-
resume_flow
() → None¶ Allow consumer to process messages.
- Return type
None
-
resume_partitions
(tps: Iterable[faust.types.tuples.TP]) → None¶ Resume fetching from partitions.
- Return type
None
-
async
seek
(partition: faust.types.tuples.TP, offset: int) → None¶ Seek partition to specific offset.
- Return type
None
-
abstract async
seek_to_committed
() → Mapping[faust.types.tuples.TP, int]¶ Seek all partitions to their committed offsets.
-
stop_flow
() → None¶ Block consumer from processing any more messages.
- Return type
None
-
track_message
(message: faust.types.tuples.Message) → None¶ Track message and mark it as pending ack.
- Return type
None
-
async
wait_empty
() → None¶ Wait for all messages that started processing to be acked.
- Return type
None
-
-
class
Producer
(transport: faust.types.transports.TransportT, loop: asyncio.events.AbstractEventLoop = None, **kwargs: Any) → None¶ Base Producer.
-
async
abort_transaction
(transactional_id: str) → None¶ Abort and rollback transaction by id.
- Return type
None
-
async
begin_transaction
(transactional_id: str) → None¶ Begin transaction by id.
- Return type
None
-
async
commit_transaction
(transactional_id: str) → None¶ Commit transaction by id.
- Return type
None
-
async
commit_transactions
(tid_to_offset_map: Mapping[str, Mapping[faust.types.tuples.TP, int]], group_id: str, start_new_transaction: bool = True) → None¶ Commit transactions.
- Return type
None
-
async
create_topic
(topic: str, partitions: int, replication: int, *, config: Mapping[str, Any] = None, timeout: Union[datetime.timedelta, float, str] = 1000.0, retention: Union[datetime.timedelta, float, str] = None, compacting: bool = None, deleting: bool = None, ensure_created: bool = False) → None¶ Create/declare topic on server.
- Return type
None
-
async
flush
() → None¶ Flush all in-flight messages.
- Return type
None
-
key_partition
(topic: str, key: bytes) → faust.types.tuples.TP¶ Hash key to determine partition.
- Return type
-
logger
= <Logger faust.transport.producer (WARNING)>¶
-
async
maybe_begin_transaction
(transactional_id: str) → None¶ Begin transaction by id, if not already started.
- Return type
None
-
async
on_start
() → None¶ Service is starting.
- Return type
None
-
async
send
(topic: str, key: Optional[bytes], value: Optional[bytes], partition: Optional[int], timestamp: Optional[float], headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes], None], *, transactional_id: str = None) → Awaitable[faust.types.tuples.RecordMetadata]¶ Schedule message to be sent by producer.
- Return type
-
async
send_and_wait
(topic: str, key: Optional[bytes], value: Optional[bytes], partition: Optional[int], timestamp: Optional[float], headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes], None], *, transactional_id: str = None) → faust.types.tuples.RecordMetadata¶ Send message and wait for it to be transmitted.
- Return type
-
send_soon
(fut: faust.types.tuples.FutureMessage) → None¶ - Return type
None
-
async
stop_transaction
(transactional_id: str) → None¶ Stop transaction by id.
- Return type
None
-
async
-
class
TransactionManager
(transport: faust.types.transports.TransportT, *, consumer: faust.types.transports.ConsumerT, producer: faust.types.transports.ProducerT, **kwargs: Any) → None¶ Manage producer transactions.
-
async
commit
(offsets: Mapping[faust.types.tuples.TP, int], start_new_transaction: bool = True) → bool¶ Commit offsets for partitions.
- Return type
-
async
create_topic
(topic: str, partitions: int, replication: int, *, config: Mapping[str, Any] = None, timeout: Union[datetime.timedelta, float, str] = 30.0, retention: Union[datetime.timedelta, float, str] = None, compacting: bool = None, deleting: bool = None, ensure_created: bool = False) → None¶ Create/declare topic on server.
- Return type
None
-
async
flush
() → None¶ Wait for producer to transmit all pending messages.
- Return type
None
-
logger
= <Logger faust.transport.consumer (WARNING)>¶
-
async
on_partitions_revoked
(revoked: Set[faust.types.tuples.TP]) → None¶ Call when the cluster is rebalancing and partitions are revoked.
- Return type
None
-
async
on_rebalance
(assigned: Set[faust.types.tuples.TP], revoked: Set[faust.types.tuples.TP], newly_assigned: Set[faust.types.tuples.TP]) → None¶ Call when the cluster is rebalancing.
- Return type
None
-
async
send
(topic: str, key: Optional[bytes], value: Optional[bytes], partition: Optional[int], timestamp: Optional[float], headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes], None], *, transactional_id: str = None) → Awaitable[faust.types.tuples.RecordMetadata]¶ Schedule message to be sent by producer.
- Return type
-
async
send_and_wait
(topic: str, key: Optional[bytes], value: Optional[bytes], partition: Optional[int], timestamp: Optional[float], headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes], None], *, transactional_id: str = None) → faust.types.tuples.RecordMetadata¶ Send message and wait for it to be transmitted.
- Return type
-
transactional_id_format
= '{group_id}-{tpg.group}-{tpg.partition}'¶
-
async
-
class
Conductor
(app: faust.types.app.AppT, **kwargs: Any) → None¶ Manages the channels that subscribe to topics.
Consumes messages from topic using a single consumer.
Forwards messages to all channels subscribing to a topic.
-
acks_enabled_for
(topic: str) → bool¶ Return
True
if acks are enabled for topic by name.- Return type
-
add
(topic: Any) → None¶ Register topic to be subscribed.
- Return type
None
-
clear
() → None¶ Clear all subscriptions.
- Return type
None
-
async
commit
(topics: AbstractSet[Union[str, faust.types.tuples.TP]]) → bool¶ Commit offsets in topics.
- Return type
-
discard
(topic: Any) → None¶ Unregister topic from conductor.
- Return type
None
-
logger
= <Logger faust.transport.conductor (WARNING)>¶
-
async
maybe_wait_for_subscriptions
() → None¶ - Return type
None
-
async
on_client_only_start
() → None¶ - Return type
None
-
async
on_partitions_assigned
(assigned: Set[faust.types.tuples.TP]) → None¶ Call when cluster is rebalancing and partitions are assigned.
- Return type
None
-
async
wait_for_subscriptions
() → None¶ Wait for consumer to be subscribed.
- Return type
None
-
class
Fetcher
(app: faust.types.app.AppT, **kwargs: Any) → None¶ Service fetching messages from Kafka.
-
logger
= <Logger faust.transport.consumer (WARNING)>¶
-
async
on_stop
() → None¶ Call when the fetcher is stopping.
- Return type
None
-
-
create_consumer
(callback: Callable[faust.types.tuples.Message, Awaitable], **kwargs: Any) → faust.types.transports.ConsumerT[source]¶ Create new consumer.
- Return type
-
create_producer
(**kwargs: Any) → faust.types.transports.ProducerT[source]¶ Create new producer.
- Return type
-
create_transaction_manager
(consumer: faust.types.transports.ConsumerT, producer: faust.types.transports.ProducerT, **kwargs: Any) → faust.types.transports.TransactionManagerT[source]¶ Create new transaction manager.
- Return type
-
class