faust.transport.drivers.aiokafka
¶
Message transport using aiokafka.
-
class
faust.transport.drivers.aiokafka.
Consumer
(*args: Any, **kwargs: Any) → None[source]¶ Kafka consumer using aiokafka.
-
logger
= <Logger faust.transport.drivers.aiokafka (WARNING)>¶
-
RebalanceListener
¶ alias of
ConsumerRebalanceListener
-
consumer_stopped_errors
= (<class 'aiokafka.errors.ConsumerStoppedError'>,)¶
-
async
create_topic
(topic: str, partitions: int, replication: int, *, config: Mapping[str, Any] = None, timeout: Union[datetime.timedelta, float, str] = 30.0, retention: Union[datetime.timedelta, float, str] = None, compacting: bool = None, deleting: bool = None, ensure_created: bool = False) → None[source]¶ Create/declare topic on server.
- Return type
None
-
-
class
faust.transport.drivers.aiokafka.
Producer
(transport: faust.types.transports.TransportT, loop: asyncio.events.AbstractEventLoop = None, **kwargs: Any) → None[source]¶ Kafka producer using aiokafka.
-
logger
= <Logger faust.transport.drivers.aiokafka (WARNING)>¶
-
allow_headers
= True¶
-
async
begin_transaction
(transactional_id: str) → None[source]¶ Begin transaction by id.
- Return type
None
-
async
commit_transaction
(transactional_id: str) → None[source]¶ Commit transaction by id.
- Return type
None
-
async
abort_transaction
(transactional_id: str) → None[source]¶ Abort and rollback transaction by id.
- Return type
None
-
async
stop_transaction
(transactional_id: str) → None[source]¶ Stop transaction by id.
- Return type
None
-
async
maybe_begin_transaction
(transactional_id: str) → None[source]¶ Begin transaction (if one does not already exist).
- Return type
None
-
async
commit_transactions
(tid_to_offset_map: Mapping[str, Mapping[faust.types.tuples.TP, int]], group_id: str, start_new_transaction: bool = True) → None[source]¶ Commit transactions.
- Return type
None
-
async
create_topic
(topic: str, partitions: int, replication: int, *, config: Mapping[str, Any] = None, timeout: Union[datetime.timedelta, float, str] = 20.0, retention: Union[datetime.timedelta, float, str] = None, compacting: bool = None, deleting: bool = None, ensure_created: bool = False) → None[source]¶ Create/declare topic on server.
- Return type
None
-
async
send
(topic: str, key: Optional[bytes], value: Optional[bytes], partition: Optional[int], timestamp: Optional[float], headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes], None], *, transactional_id: str = None) → Awaitable[faust.types.tuples.RecordMetadata][source]¶ Schedule message to be transmitted by producer.
- Return type
-
async
send_and_wait
(topic: str, key: Optional[bytes], value: Optional[bytes], partition: Optional[int], timestamp: Optional[float], headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes], None], *, transactional_id: str = None) → faust.types.tuples.RecordMetadata[source]¶ Send message and wait for it to be transmitted.
- Return type
-
async
flush
() → None[source]¶ Wait for producer to finish transmitting all buffered messages.
- Return type
None
-
-
class
faust.transport.drivers.aiokafka.
Transport
(*args: Any, **kwargs: Any) → None[source]¶ Kafka transport using aiokafka.
-
class
Consumer
(*args: Any, **kwargs: Any) → None¶ Kafka consumer using aiokafka.
-
RebalanceListener
¶ alias of
ConsumerRebalanceListener
-
consumer_stopped_errors
= (<class 'aiokafka.errors.ConsumerStoppedError'>,)¶
-
async
create_topic
(topic: str, partitions: int, replication: int, *, config: Mapping[str, Any] = None, timeout: Union[datetime.timedelta, float, str] = 30.0, retention: Union[datetime.timedelta, float, str] = None, compacting: bool = None, deleting: bool = None, ensure_created: bool = False) → None¶ Create/declare topic on server.
- Return type
None
-
logger
= <Logger faust.transport.drivers.aiokafka (WARNING)>¶
-
async
on_stop
() → None¶ Call when consumer is stopping.
- Return type
None
-
-
class
Producer
(transport: faust.types.transports.TransportT, loop: asyncio.events.AbstractEventLoop = None, **kwargs: Any) → None¶ Kafka producer using aiokafka.
-
async
abort_transaction
(transactional_id: str) → None¶ Abort and rollback transaction by id.
- Return type
None
-
allow_headers
= True¶
-
async
begin_transaction
(transactional_id: str) → None¶ Begin transaction by id.
- Return type
None
-
async
commit_transaction
(transactional_id: str) → None¶ Commit transaction by id.
- Return type
None
-
async
commit_transactions
(tid_to_offset_map: Mapping[str, Mapping[faust.types.tuples.TP, int]], group_id: str, start_new_transaction: bool = True) → None¶ Commit transactions.
- Return type
None
-
async
create_topic
(topic: str, partitions: int, replication: int, *, config: Mapping[str, Any] = None, timeout: Union[datetime.timedelta, float, str] = 20.0, retention: Union[datetime.timedelta, float, str] = None, compacting: bool = None, deleting: bool = None, ensure_created: bool = False) → None¶ Create/declare topic on server.
- Return type
None
-
async
flush
() → None¶ Wait for producer to finish transmitting all buffered messages.
- Return type
None
-
key_partition
(topic: str, key: bytes) → faust.types.tuples.TP¶ Hash key to determine partition destination.
- Return type
-
logger
= <Logger faust.transport.drivers.aiokafka (WARNING)>¶
-
async
maybe_begin_transaction
(transactional_id: str) → None¶ Begin transaction (if one does not already exist).
- Return type
None
-
async
on_start
() → None¶ Call when producer starts.
- Return type
None
-
async
on_stop
() → None¶ Call when producer stops.
- Return type
None
-
async
send
(topic: str, key: Optional[bytes], value: Optional[bytes], partition: Optional[int], timestamp: Optional[float], headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes], None], *, transactional_id: str = None) → Awaitable[faust.types.tuples.RecordMetadata]¶ Schedule message to be transmitted by producer.
- Return type
-
async
send_and_wait
(topic: str, key: Optional[bytes], value: Optional[bytes], partition: Optional[int], timestamp: Optional[float], headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes], None], *, transactional_id: str = None) → faust.types.tuples.RecordMetadata¶ Send message and wait for it to be transmitted.
- Return type
-
async
stop_transaction
(transactional_id: str) → None¶ Stop transaction by id.
- Return type
None
-
async
-
default_port
= 9092¶
-
driver_version
= 'aiokafka=1.1.3'¶
-
class