faust.transport.base
¶
Base message transport implementation.
The Transport is responsible for:
Holds reference to the app that created it.
Creates new consumers/producers.
To see a reference transport implementation go to:
faust/transport/drivers/aiokafka.py
-
class
faust.transport.base.
Transport
(url: List[yarl.URL], app: faust.types.app.AppT, loop: asyncio.events.AbstractEventLoop = None) → None[source]¶ Message transport implementation.
-
class
Consumer
(transport: faust.types.transports.TransportT, callback: Callable[faust.types.tuples.Message, Awaitable], on_partitions_revoked: Callable[Set[faust.types.tuples.TP], Awaitable[None]], on_partitions_assigned: Callable[Set[faust.types.tuples.TP], Awaitable[None]], *, commit_interval: float = None, commit_livelock_soft_timeout: float = None, loop: asyncio.events.AbstractEventLoop = None, **kwargs) → None¶ Base Consumer.
-
ack
(message: faust.types.tuples.Message) → bool¶ Mark message as being acknowledged by stream.
- Return type
-
close
() → None¶ Close consumer for graceful shutdown.
- Return type
None
-
consumer_stopped_errors
= ()¶
-
flow_active
= True¶
-
getmany
(timeout: float) → AsyncIterator[Tuple[faust.types.tuples.TP, faust.types.tuples.Message]]¶ Fetch batch of messages from server.
- Return type
AsyncIterator
[Tuple
[TP
,Message
]]
-
logger
= <Logger faust.transport.consumer (WARNING)>¶
-
on_init_dependencies
() → Iterable[mode.types.services.ServiceT]¶ Return list of services this consumer depends on.
-
pause_partitions
(tps: Iterable[faust.types.tuples.TP]) → None¶ Pause fetching from partitions.
- Return type
None
-
resume_flow
() → None¶ Allow consumer to process messages.
- Return type
None
-
resume_partitions
(tps: Iterable[faust.types.tuples.TP]) → None¶ Resume fetching from partitions.
- Return type
None
-
stop_flow
() → None¶ Block consumer from processing any more messages.
- Return type
None
-
track_message
(message: faust.types.tuples.Message) → None¶ Track message and mark it as pending ack.
- Return type
None
-
-
class
Producer
(transport: faust.types.transports.TransportT, loop: asyncio.events.AbstractEventLoop = None, **kwargs) → None¶ Base Producer.
-
key_partition
(topic: str, key: bytes) → faust.types.tuples.TP¶ Hash key to determine partition.
- Return type
-
logger
= <Logger faust.transport.producer (WARNING)>¶
-
send_soon
(fut: faust.types.tuples.FutureMessage) → None¶ - Return type
None
-
-
class
TransactionManager
(transport: faust.types.transports.TransportT, *, consumer: faust.types.transports.ConsumerT, producer: faust.types.transports.ProducerT, **kwargs) → None¶ Manage producer transactions.
-
logger
= <Logger faust.transport.consumer (WARNING)>¶
-
transactional_id_format
= '{group_id}-{tpg.group}-{tpg.partition}'¶
-
-
class
Conductor
(app: faust.types.app.AppT, **kwargs) → None¶ Manages the channels that subscribe to topics.
Consumes messages from topic using a single consumer.
Forwards messages to all channels subscribing to a topic.
-
acks_enabled_for
(topic: str) → bool¶ Return
True
if acks are enabled for topic by name.- Return type
-
add
(topic: Any) → None¶ Register topic to be subscribed.
- Return type
None
-
clear
() → None¶ Clear all subscriptions.
- Return type
None
-
discard
(topic: Any) → None¶ Unregister topic from conductor.
- Return type
None
-
logger
= <Logger faust.transport.conductor (WARNING)>¶
-
class
Fetcher
(app: faust.types.app.AppT, **kwargs) → None¶ Service fetching messages from Kafka.
-
logger
= <Logger faust.transport.consumer (WARNING)>¶
-
-
create_consumer
(callback: Callable[faust.types.tuples.Message, Awaitable], **kwargs) → faust.types.transports.ConsumerT[source]¶ Create new consumer.
- Return type
-
create_producer
(**kwargs) → faust.types.transports.ProducerT[source]¶ Create new producer.
- Return type
-
create_transaction_manager
(consumer: faust.types.transports.ConsumerT, producer: faust.types.transports.ProducerT, **kwargs) → faust.types.transports.TransactionManagerT[source]¶ Create new transaction manager.
- Return type
-
class
-
class
faust.transport.base.
Conductor
(app: faust.types.app.AppT, **kwargs) → None[source]¶ Manages the channels that subscribe to topics.
Consumes messages from topic using a single consumer.
Forwards messages to all channels subscribing to a topic.
-
logger
= <Logger faust.transport.conductor (WARNING)>¶
-
class
faust.transport.base.
Consumer
(transport: faust.types.transports.TransportT, callback: Callable[faust.types.tuples.Message, Awaitable], on_partitions_revoked: Callable[Set[faust.types.tuples.TP], Awaitable[None]], on_partitions_assigned: Callable[Set[faust.types.tuples.TP], Awaitable[None]], *, commit_interval: float = None, commit_livelock_soft_timeout: float = None, loop: asyncio.events.AbstractEventLoop = None, **kwargs) → None[source]¶ Base Consumer.
-
logger
= <Logger faust.transport.consumer (WARNING)>¶
-
consumer_stopped_errors
= ()¶ Tuple of exception types that may be raised when the underlying consumer driver is stopped.
-
flow_active
= True¶
-
on_init_dependencies
() → Iterable[mode.types.services.ServiceT][source]¶ Return list of services this consumer depends on.
-
pause_partitions
(tps: Iterable[faust.types.tuples.TP]) → None[source]¶ Pause fetching from partitions.
- Return type
None
-
resume_partitions
(tps: Iterable[faust.types.tuples.TP]) → None[source]¶ Resume fetching from partitions.
- Return type
None
-
getmany
(timeout: float) → AsyncIterator[Tuple[faust.types.tuples.TP, faust.types.tuples.Message]][source]¶ Fetch batch of messages from server.
- Return type
AsyncIterator
[Tuple
[TP
,Message
]]
-
track_message
(message: faust.types.tuples.Message) → None[source]¶ Track message and mark it as pending ack.
- Return type
None
-
-
class
faust.transport.base.
Fetcher
(app: faust.types.app.AppT, **kwargs) → None[source]¶ Service fetching messages from Kafka.
-
logger
= <Logger faust.transport.consumer (WARNING)>¶
-
-
class
faust.transport.base.
Producer
(transport: faust.types.transports.TransportT, loop: asyncio.events.AbstractEventLoop = None, **kwargs) → None[source]¶ Base Producer.
-
key_partition
(topic: str, key: bytes) → faust.types.tuples.TP[source]¶ Hash key to determine partition.
- Return type
-
logger
= <Logger faust.transport.producer (WARNING)>¶
-