faust.transport.base
Base message transport implementation.
The Transport is responsible for:
- Holding a reference to the app that created it.
- Creating new consumers/producers.
For a reference transport implementation, see faust/transport/drivers/aiokafka.py.
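The two responsibilities above can be illustrated with a minimal sketch. It does not use Faust itself: SketchTransport, SketchConsumer, SketchProducer and create_producer are hypothetical names introduced here, and the sketch only assumes that a transport keeps the app it was given and instantiates consumer/producer classes on request.

```python
from typing import Any, Callable


class SketchConsumer:
    """Hypothetical stand-in for a consumer; not Faust's Consumer."""

    def __init__(self, transport: "SketchTransport",
                 callback: Callable[[Any], Any]) -> None:
        self.transport = transport
        self.callback = callback


class SketchProducer:
    """Hypothetical stand-in for a producer; not Faust's Producer."""

    def __init__(self, transport: "SketchTransport") -> None:
        self.transport = transport


class SketchTransport:
    # The concrete classes are class attributes, so a driver could swap
    # them by subclassing (an assumption of this sketch).
    Consumer = SketchConsumer
    Producer = SketchProducer

    def __init__(self, url: str, app: Any) -> None:
        self.url = url
        self.app = app  # reference to the app that created the transport

    def create_consumer(self, callback: Callable[[Any], Any]) -> SketchConsumer:
        return self.Consumer(self, callback)

    def create_producer(self) -> SketchProducer:
        # Hypothetical counterpart to create_consumer.
        return self.Producer(self)
```

A driver in this sketch would subclass SketchTransport and point Consumer and Producer at its own classes.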
-
class
faust.transport.base.
Transport
(url: Union[str, yarl.URL], app: faust.types.app.AppT, loop: asyncio.events.AbstractEventLoop = None) → None[source]¶ Message transport implementation.
-
class
Consumer
(transport: faust.types.transports.TransportT, callback: Callable[faust.types.tuples.Message, Awaitable], on_partitions_revoked: Callable[Set[faust.types.tuples.TP], Awaitable[NoneType]], on_partitions_assigned: Callable[Set[faust.types.tuples.TP], Awaitable[NoneType]], *, commit_interval: float = None, commit_livelock_soft_timeout: float = None, loop: asyncio.events.AbstractEventLoop = None, **kwargs) → None¶ Base Consumer.
-
close
() → None¶ Return type: None
-
coroutine
commit
(self, topics: AbstractSet[Union[str, faust.types.tuples.TP]] = None) → bool¶ Maybe commit the offset for all or specific topics.
Parameters: topics (Optional[AbstractSet[Union[str, TP]]]) – Set containing topics and/or TopicPartitions to commit.
Return type: bool
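To make the topics parameter concrete, the following sketch shows one way a set mixing topic names and topic partitions could be matched against the current assignment. It is an illustration only, not Faust's commit logic: select_partitions is a hypothetical helper, and TP is redefined locally as a (topic, partition) named tuple.

```python
from typing import AbstractSet, NamedTuple, Optional, Set, Union


class TP(NamedTuple):
    """Local topic/partition pair used only by this sketch."""
    topic: str
    partition: int


def select_partitions(
        assignment: AbstractSet[TP],
        topics: Optional[AbstractSet[Union[str, TP]]] = None) -> Set[TP]:
    """Return the assigned partitions selected by ``topics``.

    None selects everything; a str selects every partition of that
    topic; a TP selects exactly that partition.
    """
    if topics is None:
        return set(assignment)
    return {tp for tp in assignment
            if tp in topics or tp.topic in topics}
```

For example, passing {"orders"} selects every assigned partition of the orders topic, while passing {TP("users", 0)} selects only that one partition.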
-
consumer_stopped_errors
= ()¶
-
coroutine
force_commit
(self, topics: AbstractSet[Union[str, faust.types.tuples.TP]] = None) → bool¶ Return type: bool
-
logger
= <Logger faust.transport.consumer (WARNING)>¶
-
coroutine
on_partitions_assigned
(self, assigned: Set[faust.types.tuples.TP]) → None¶ Return type: None
-
coroutine
on_partitions_revoked
(self, revoked: Set[faust.types.tuples.TP]) → None¶ Return type: None
-
coroutine
on_stop
(self) → None¶ Called every time before the service is stopped/restarted.
Return type: None
-
coroutine
on_task_error
(self, exc: BaseException) → None¶ Return type: None
-
track_message
(message: faust.types.tuples.Message) → None¶ Return type: None
-
unacked
¶
-
coroutine
wait_empty
(self) → None¶ Wait for all messages that started processing to be acked.
Return type: None
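The relationship between track_message, unacked and wait_empty can be sketched with plain asyncio. This is a simplified model, not Faust's implementation: AckTracker and its ack method are hypothetical, and messages are represented as strings.

```python
import asyncio
from typing import Set


class AckTracker:
    """Hypothetical model of tracking in-flight messages until acked."""

    def __init__(self) -> None:
        self.unacked: Set[str] = set()
        self._empty = asyncio.Event()
        self._empty.set()  # nothing is in flight yet

    def track_message(self, message: str) -> None:
        self.unacked.add(message)
        self._empty.clear()

    def ack(self, message: str) -> None:
        self.unacked.discard(message)
        if not self.unacked:
            self._empty.set()  # wake up anyone blocked in wait_empty()

    async def wait_empty(self) -> None:
        await self._empty.wait()


async def demo() -> bool:
    tracker = AckTracker()
    tracker.track_message("m1")
    tracker.track_message("m2")
    waiter = asyncio.ensure_future(tracker.wait_empty())
    await asyncio.sleep(0)  # let the waiter start and block
    still_waiting = not waiter.done()
    tracker.ack("m1")
    tracker.ack("m2")
    await waiter  # completes once both messages are acked
    return still_waiting and not tracker.unacked
```

In this model a caller that is about to stop or commit can await wait_empty() to be sure no message is still being processed.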
-
-
class
Producer
(transport: faust.types.transports.TransportT, loop: asyncio.events.AbstractEventLoop = None, **kwargs) → None¶ Base Producer.
-
coroutine
create_topic
(self, topic: str, partitions: int, replication: int, *, config: Mapping[str, Any] = None, timeout: Union[datetime.timedelta, float, str] = 1000.0, retention: Union[datetime.timedelta, float, str] = None, compacting: bool = None, deleting: bool = None, ensure_created: bool = False) → None¶ Return type: None
-
logger
= <Logger faust.transport.producer (WARNING)>¶
-
coroutine
send
(self, topic: str, key: Union[bytes, NoneType], value: Union[bytes, NoneType], partition: Union[int, NoneType]) → Awaitable[faust.types.tuples.RecordMetadata]¶ Return type: Awaitable[RecordMetadata]
-
coroutine
send_and_wait
(self, topic: str, key: Union[bytes, NoneType], value: Union[bytes, NoneType], partition: Union[int, NoneType]) → faust.types.tuples.RecordMetadata¶ Return type: RecordMetadata
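The signatures of send and send_and_wait differ only in their return type: the first yields an awaitable for the record metadata, the second the metadata itself. The sketch below shows one plausible reading of that difference using asyncio futures. It is an assumption, not Faust's code: SketchProducer, SketchMetadata and _deliver are hypothetical, and delivery is simulated in memory.

```python
import asyncio
from typing import Awaitable, NamedTuple, Optional


class SketchMetadata(NamedTuple):
    """Hypothetical, simplified delivery report."""
    topic: str
    partition: int
    offset: int


class SketchProducer:
    def __init__(self) -> None:
        self._next_offset = 0

    async def _deliver(self, meta: SketchMetadata) -> SketchMetadata:
        await asyncio.sleep(0)  # stand-in for the network round trip
        return meta

    async def send(self, topic: str, key: Optional[bytes],
                   value: Optional[bytes],
                   partition: Optional[int]) -> Awaitable[SketchMetadata]:
        # Hand the message off and return a future for the delivery
        # report without waiting for it.
        meta = SketchMetadata(topic, 0 if partition is None else partition,
                              self._next_offset)
        self._next_offset += 1
        return asyncio.ensure_future(self._deliver(meta))

    async def send_and_wait(self, topic: str, key: Optional[bytes],
                            value: Optional[bytes],
                            partition: Optional[int]) -> SketchMetadata:
        # Same as send(), but also wait for the delivery report.
        fut = await self.send(topic, key, value, partition)
        return await fut


async def demo():
    producer = SketchProducer()
    pending = await producer.send("orders", b"k", b"v1", None)
    second = await producer.send_and_wait("orders", b"k", b"v2", None)
    first = await pending  # resolve the earlier send when convenient
    return first, second
```

Under this reading, send suits pipelining many messages, and send_and_wait suits callers that need the delivery report before continuing.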
-
coroutine
-
class
Conductor
(app: faust.types.app.AppT, **kwargs) → None¶ Manages the channels that subscribe to topics.
- Consumes messages from topic using a single consumer.
- Forwards messages to all channels subscribing to a topic.
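The two points above describe a fan-out: one consumer reads each message once, and every channel subscribed to the message's topic receives it. A minimal sketch with asyncio queues standing in for channels follows. SketchConductor is hypothetical; in particular, its add returns a queue for convenience, unlike the add(topic) → None documented here, and on_message is an assumed name.

```python
import asyncio
from collections import defaultdict
from typing import Any, DefaultDict, List


class SketchConductor:
    """Hypothetical fan-out from one consumer to many channels."""

    def __init__(self) -> None:
        self._subscribers: DefaultDict[str, List[asyncio.Queue]] = (
            defaultdict(list))

    def add(self, topic: str) -> asyncio.Queue:
        # Register a new channel (here: a queue) for the topic.
        channel: asyncio.Queue = asyncio.Queue()
        self._subscribers[topic].append(channel)
        return channel

    async def on_message(self, topic: str, value: Any) -> None:
        # Called once per consumed message; forward it to every
        # channel subscribed to the topic.
        for channel in self._subscribers.get(topic, ()):
            await channel.put(value)


async def demo():
    conductor = SketchConductor()
    a = conductor.add("orders")
    b = conductor.add("orders")
    c = conductor.add("users")
    await conductor.on_message("orders", b"o1")
    return a.get_nowait(), b.get_nowait(), c.empty()
```

Both channels subscribed to orders receive the message, while the users channel stays empty.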
-
add
(topic: Any) → None¶ Add an element.
Return type: None
-
clear
() → None¶ This is slow (creates N new iterators!) but effective.
Return type: None
-
coroutine
commit
(self, topics: AbstractSet[Union[str, faust.types.tuples.TP]]) → bool¶ Return type: bool
-
discard
(topic: Any) → None¶ Remove an element. Do not raise an exception if absent.
Return type: None
-
logger
= <Logger faust.transport.conductor (WARNING)>¶
-
coroutine
on_partitions_assigned
(self, assigned: Set[faust.types.tuples.TP]) → None¶ Return type: None
-
coroutine
on_partitions_revoked
(self, revoked: Set[faust.types.tuples.TP]) → None¶ Return type: None
-
coroutine
wait_for_subscriptions
(self) → None¶ Return type: None
-
class
Fetcher
(app: faust.types.app.AppT, **kwargs) → None¶
-
logger
= <Logger faust.transport.consumer (WARNING)>¶
-
coroutine
on_stop
(self) → None¶ Called every time before the service is stopped/restarted.
Return type: None
-
-
create_consumer
(callback: Callable[faust.types.tuples.Message, Awaitable], **kwargs) → faust.types.transports.ConsumerT[source]¶ Return type: ConsumerT
-
create_conductor
(**kwargs) → faust.types.transports.ConductorT[source]¶ Return type: ConductorT
-
class
-
class
faust.transport.base.
Conductor
(app: faust.types.app.AppT, **kwargs) → None[source]¶ Manages the channels that subscribe to topics.
- Consumes messages from topic using a single consumer.
- Forwards messages to all channels subscribing to a topic.
-
logger
= <Logger faust.transport.conductor (WARNING)>¶
-
coroutine
commit
(self, topics: AbstractSet[Union[str, faust.types.tuples.TP]]) → bool[source]¶ Return type: bool
-
coroutine
on_partitions_assigned
(self, assigned: Set[faust.types.tuples.TP]) → None[source]¶ Return type: None
-
coroutine
on_partitions_revoked
(self, revoked: Set[faust.types.tuples.TP]) → None[source]¶ Return type: None
-
class
faust.transport.base.
Consumer
(transport: faust.types.transports.TransportT, callback: Callable[faust.types.tuples.Message, Awaitable], on_partitions_revoked: Callable[Set[faust.types.tuples.TP], Awaitable[NoneType]], on_partitions_assigned: Callable[Set[faust.types.tuples.TP], Awaitable[NoneType]], *, commit_interval: float = None, commit_livelock_soft_timeout: float = None, loop: asyncio.events.AbstractEventLoop = None, **kwargs) → None[source]¶ Base Consumer.
-
logger
= <Logger faust.transport.consumer (WARNING)>¶
-
consumer_stopped_errors
= ()¶ Tuple of exception types that may be raised when the underlying consumer driver is stopped.
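A tuple of exception types like this can be used directly in an except clause, which lets a driver subclass declare which errors mean "the driver was stopped". The sketch below illustrates that pattern only. DriverStopped, BaseSketchConsumer, DriverConsumer and poll_once are hypothetical names, and how Faust actually consumes this attribute is not shown here.

```python
from typing import Any, Callable, Optional, Tuple, Type


class DriverStopped(Exception):
    """Hypothetical error raised by a driver once it has been stopped."""


class BaseSketchConsumer:
    # The base class declares no such errors; an empty tuple in an
    # except clause matches nothing.
    consumer_stopped_errors: Tuple[Type[BaseException], ...] = ()


class DriverConsumer(BaseSketchConsumer):
    # A driver lists the exception types its client raises when stopped.
    consumer_stopped_errors = (DriverStopped,)


def poll_once(consumer: BaseSketchConsumer,
              fetch: Callable[[], Any]) -> Optional[Any]:
    """Fetch one item, treating 'driver stopped' errors as no data."""
    try:
        return fetch()
    except consumer.consumer_stopped_errors:
        return None
```

With the empty default the error propagates unchanged, so only drivers that opt in have their shutdown errors suppressed.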
-
coroutine
commit
(self, topics: AbstractSet[Union[str, faust.types.tuples.TP]] = None) → bool[source]¶ Maybe commit the offset for all or specific topics.
Parameters: topics (Optional[AbstractSet[Union[str, TP]]]) – Set containing topics and/or TopicPartitions to commit.
Return type: bool
-
coroutine
force_commit
(self, topics: AbstractSet[Union[str, faust.types.tuples.TP]] = None) → bool[source]¶ Return type: bool
-
coroutine
on_partitions_assigned
(self, assigned: Set[faust.types.tuples.TP]) → None[source]¶ Return type: None
-
coroutine
on_partitions_revoked
(self, revoked: Set[faust.types.tuples.TP]) → None[source]¶ Return type: None
-
coroutine
on_stop
(self) → None[source]¶ Called every time before the service is stopped/restarted.
Return type: None
-
coroutine
wait_empty
(self) → None[source]¶ Wait for all messages that started processing to be acked.
Return type: None
-
unacked
¶
-
-
class
faust.transport.base.
Fetcher
(app: faust.types.app.AppT, **kwargs) → None[source]¶
-
logger
= <Logger faust.transport.consumer (WARNING)>¶
-
-
class
faust.transport.base.
Producer
(transport: faust.types.transports.TransportT, loop: asyncio.events.AbstractEventLoop = None, **kwargs) → None[source]¶ Base Producer.
-
coroutine
create_topic
(self, topic: str, partitions: int, replication: int, *, config: Mapping[str, Any] = None, timeout: Union[datetime.timedelta, float, str] = 1000.0, retention: Union[datetime.timedelta, float, str] = None, compacting: bool = None, deleting: bool = None, ensure_created: bool = False) → None[source]¶ Return type: None
-
logger
= <Logger faust.transport.producer (WARNING)>¶
-
coroutine
send
(self, topic: str, key: Union[bytes, NoneType], value: Union[bytes, NoneType], partition: Union[int, NoneType]) → Awaitable[faust.types.tuples.RecordMetadata][source]¶ Return type: Awaitable[RecordMetadata]
-
coroutine
send_and_wait
(self, topic: str, key: Union[bytes, NoneType], value: Union[bytes, NoneType], partition: Union[int, NoneType]) → faust.types.tuples.RecordMetadata[source]¶ Return type: RecordMetadata
-
coroutine