faust.transport.base
¶
Base message transport implementation.
The Transport is responsible for:
Holds reference to the app that created it.
Creates new consumers/producers.
To see a reference transport implementation go to:
faust/transport/drivers/aiokafka.py
-
class
faust.transport.base.
Transport
(url: List[yarl.URL], app: faust.types.app.AppT, loop: asyncio.events.AbstractEventLoop = None) → None[source]¶ Message transport implementation.
-
class
Consumer
(transport: faust.types.transports.TransportT, callback: Callable[faust.types.tuples.Message, Awaitable], on_partitions_revoked: Callable[Set[faust.types.tuples.TP], Awaitable[None]], on_partitions_assigned: Callable[Set[faust.types.tuples.TP], Awaitable[None]], *, commit_interval: float = None, commit_livelock_soft_timeout: float = None, loop: asyncio.events.AbstractEventLoop = None, **kwargs) → None¶ Base Consumer.
-
close
() → None¶ - Return type
None
-
coroutine
commit
(self, topics: AbstractSet[Union[str, faust.types.tuples.TP]] = None, start_new_transaction: bool = True) → bool¶ Maybe commit the offset for all or specific topics.
-
coroutine
commit_and_end_transactions
(self) → None¶ - Return type
None
-
consumer_stopped_errors
= ()¶
-
flow_active
= True¶
-
coroutine
force_commit
(self, topics: AbstractSet[Union[str, faust.types.tuples.TP]] = None, start_new_transaction: bool = True) → bool¶ - Return type
-
getmany
(timeout: float) → AsyncIterator[Tuple[faust.types.tuples.TP, faust.types.tuples.Message]]¶ - Return type
AsyncIterator
[Tuple
[TP
,Message
]]
-
logger
= <Logger faust.transport.consumer (WARNING)>¶
-
on_init_dependencies
() → Iterable[mode.types.services.ServiceT]¶ Return list of service dependencies for this service.
-
coroutine
on_partitions_assigned
(self, assigned: Set[faust.types.tuples.TP]) → None¶ - Return type
None
-
coroutine
on_partitions_revoked
(self, revoked: Set[faust.types.tuples.TP]) → None¶ Call during rebalancing when partitions are being revoked.
- Return type
None
-
coroutine
on_restart
(self) → None¶ Service is being restarted.
- Return type
None
-
coroutine
on_stop
(self) → None¶ Service is being stopped/restarted.
- Return type
None
-
coroutine
on_task_error
(self, exc: BaseException) → None¶ - Return type
None
-
pause_partitions
(tps: Iterable[faust.types.tuples.TP]) → None¶ - Return type
None
-
coroutine
perform_seek
(self) → None¶ - Return type
None
-
resume_flow
() → None¶ - Return type
None
-
resume_partitions
(tps: Iterable[faust.types.tuples.TP]) → None¶ - Return type
None
-
coroutine
seek
(self, partition: faust.types.tuples.TP, offset: int) → None¶ - Return type
None
-
coroutine
seek_to_committed
(self) → Mapping[faust.types.tuples.TP, int]¶
-
stop_flow
() → None¶ - Return type
None
-
track_message
(message: faust.types.tuples.Message) → None¶ - Return type
None
-
coroutine
wait_empty
(self) → None¶ Wait for all messages that started processing to be acked.
- Return type
None
-
-
class
Producer
(transport: faust.types.transports.TransportT, loop: asyncio.events.AbstractEventLoop = None, **kwargs) → None¶ Base Producer.
-
coroutine
abort_transaction
(self, transactional_id: str) → None¶ - Return type
None
-
coroutine
begin_transaction
(self, transactional_id: str) → None¶ - Return type
None
-
coroutine
commit_transaction
(self, transactional_id: str) → None¶ - Return type
None
-
coroutine
commit_transactions
(self, tid_to_offset_map: Mapping[str, Mapping[faust.types.tuples.TP, int]], group_id: str, start_new_transaction: bool = True) → None¶ - Return type
None
-
coroutine
create_topic
(self, topic: str, partitions: int, replication: int, *, config: Mapping[str, Any] = None, timeout: Union[datetime.timedelta, float, str] = 1000.0, retention: Union[datetime.timedelta, float, str] = None, compacting: bool = None, deleting: bool = None, ensure_created: bool = False) → None¶ - Return type
None
-
coroutine
flush
(self) → None¶ - Return type
None
-
logger
= <Logger faust.transport.producer (WARNING)>¶
-
coroutine
maybe_begin_transaction
(self, transactional_id: str) → None¶ - Return type
None
-
coroutine
send
(self, topic: str, key: Optional[bytes], value: Optional[bytes], partition: Optional[int], timestamp: Optional[float], headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes], None], *, transactional_id: str = None) → Awaitable[faust.types.tuples.RecordMetadata]¶ - Return type
-
coroutine
send_and_wait
(self, topic: str, key: Optional[bytes], value: Optional[bytes], partition: Optional[int], timestamp: Optional[float], headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes], None], *, transactional_id: str = None) → faust.types.tuples.RecordMetadata¶ - Return type
-
coroutine
stop_transaction
(self, transactional_id: str) → None¶ - Return type
None
-
coroutine
-
class
TransactionManager
(transport: faust.types.transports.TransportT, *, consumer: faust.types.transports.ConsumerT, producer: faust.types.transports.ProducerT, **kwargs) → None¶ -
coroutine
commit
(self, offsets: Mapping[faust.types.tuples.TP, int], start_new_transaction: bool = True) → bool¶ - Return type
-
coroutine
create_topic
(self, topic: str, partitions: int, replication: int, *, config: Mapping[str, Any] = None, timeout: Union[datetime.timedelta, float, str] = 30.0, retention: Union[datetime.timedelta, float, str] = None, compacting: bool = None, deleting: bool = None, ensure_created: bool = False) → None¶ - Return type
None
-
coroutine
flush
(self) → None¶ - Return type
None
-
logger
= <Logger faust.transport.consumer (WARNING)>¶
-
coroutine
on_partitions_revoked
(self, revoked: Set[faust.types.tuples.TP]) → None¶ - Return type
None
-
coroutine
on_rebalance
(self, assigned: Set[faust.types.tuples.TP], revoked: Set[faust.types.tuples.TP], newly_assigned: Set[faust.types.tuples.TP]) → None¶ - Return type
None
-
coroutine
send
(self, topic: str, key: Optional[bytes], value: Optional[bytes], partition: Optional[int], timestamp: Optional[float], headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes], None], *, transactional_id: str = None) → Awaitable[faust.types.tuples.RecordMetadata]¶ - Return type
-
coroutine
send_and_wait
(self, topic: str, key: Optional[bytes], value: Optional[bytes], partition: Optional[int], timestamp: Optional[float], headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes], None], *, transactional_id: str = None) → faust.types.tuples.RecordMetadata¶ - Return type
-
transactional_id_format
= '{tpg.group}-{tpg.partition}'¶
-
coroutine
-
class
Conductor
(app: faust.types.app.AppT, **kwargs) → None¶ Manages the channels that subscribe to topics.
Consumes messages from topic using a single consumer.
Forwards messages to all channels subscribing to a topic.
-
add
(topic: Any) → None¶ Add an element.
- Return type
None
-
clear
() → None¶ This is slow (creates N new iterators!) but effective.
- Return type
None
-
coroutine
commit
(self, topics: AbstractSet[Union[str, faust.types.tuples.TP]]) → bool¶ - Return type
-
discard
(topic: Any) → None¶ Remove an element. Do not raise an exception if absent.
- Return type
None
-
logger
= <Logger faust.transport.conductor (WARNING)>¶
-
coroutine
on_partitions_assigned
(self, assigned: Set[faust.types.tuples.TP]) → None¶ - Return type
None
-
coroutine
wait_for_subscriptions
(self) → None¶ - Return type
None
-
class
Fetcher
(app: faust.types.app.AppT, **kwargs) → None¶ Service fetching messages from Kafka.
-
logger
= <Logger faust.transport.consumer (WARNING)>¶
-
coroutine
on_stop
(self) → None¶ Service is being stopped/restarted.
- Return type
None
-
-
create_consumer
(callback: Callable[faust.types.tuples.Message, Awaitable], **kwargs) → faust.types.transports.ConsumerT[source]¶ - Return type
-
create_transaction_manager
(consumer: faust.types.transports.ConsumerT, producer: faust.types.transports.ProducerT, **kwargs) → faust.types.transports.TransactionManagerT[source]¶ - Return type
-
class
-
class
faust.transport.base.
Conductor
(app: faust.types.app.AppT, **kwargs) → None[source]¶ Manages the channels that subscribe to topics.
Consumes messages from topic using a single consumer.
Forwards messages to all channels subscribing to a topic.
-
logger
= <Logger faust.transport.conductor (WARNING)>¶
-
coroutine
commit
(self, topics: AbstractSet[Union[str, faust.types.tuples.TP]]) → bool[source]¶ - Return type
-
coroutine
on_partitions_assigned
(self, assigned: Set[faust.types.tuples.TP]) → None[source]¶ - Return type
None
-
class
faust.transport.base.
Consumer
(transport: faust.types.transports.TransportT, callback: Callable[faust.types.tuples.Message, Awaitable], on_partitions_revoked: Callable[Set[faust.types.tuples.TP], Awaitable[None]], on_partitions_assigned: Callable[Set[faust.types.tuples.TP], Awaitable[None]], *, commit_interval: float = None, commit_livelock_soft_timeout: float = None, loop: asyncio.events.AbstractEventLoop = None, **kwargs) → None[source]¶ Base Consumer.
-
logger
= <Logger faust.transport.consumer (WARNING)>¶
-
consumer_stopped_errors
= ()¶ Tuple of exception types that may be raised when the underlying consumer driver is stopped.
-
flow_active
= True¶
-
on_init_dependencies
() → Iterable[mode.types.services.ServiceT][source]¶ Return list of service dependencies for this service.
-
getmany
(timeout: float) → AsyncIterator[Tuple[faust.types.tuples.TP, faust.types.tuples.Message]][source]¶ - Return type
AsyncIterator
[Tuple
[TP
,Message
]]
-
coroutine
commit
(self, topics: AbstractSet[Union[str, faust.types.tuples.TP]] = None, start_new_transaction: bool = True) → bool[source]¶ Maybe commit the offset for all or specific topics.
-
coroutine
force_commit
(self, topics: AbstractSet[Union[str, faust.types.tuples.TP]] = None, start_new_transaction: bool = True) → bool[source]¶ - Return type
-
coroutine
on_partitions_assigned
(self, assigned: Set[faust.types.tuples.TP]) → None[source]¶ - Return type
None
-
coroutine
on_partitions_revoked
(self, revoked: Set[faust.types.tuples.TP]) → None[source]¶ Call during rebalancing when partitions are being revoked.
- Return type
None
-
coroutine
seek
(self, partition: faust.types.tuples.TP, offset: int) → None[source]¶ - Return type
None
-
-
class
faust.transport.base.
Fetcher
(app: faust.types.app.AppT, **kwargs) → None[source]¶ Service fetching messages from Kafka.
-
logger
= <Logger faust.transport.consumer (WARNING)>¶
-
-
class
faust.transport.base.
Producer
(transport: faust.types.transports.TransportT, loop: asyncio.events.AbstractEventLoop = None, **kwargs) → None[source]¶ Base Producer.
-
coroutine
commit_transactions
(self, tid_to_offset_map: Mapping[str, Mapping[faust.types.tuples.TP, int]], group_id: str, start_new_transaction: bool = True) → None[source]¶ - Return type
None
-
coroutine
create_topic
(self, topic: str, partitions: int, replication: int, *, config: Mapping[str, Any] = None, timeout: Union[datetime.timedelta, float, str] = 1000.0, retention: Union[datetime.timedelta, float, str] = None, compacting: bool = None, deleting: bool = None, ensure_created: bool = False) → None[source]¶ - Return type
None
-
logger
= <Logger faust.transport.producer (WARNING)>¶
-
coroutine
send
(self, topic: str, key: Optional[bytes], value: Optional[bytes], partition: Optional[int], timestamp: Optional[float], headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes], None], *, transactional_id: str = None) → Awaitable[faust.types.tuples.RecordMetadata][source]¶ - Return type
-
coroutine