faust.transport.drivers.aiokafka

Message transport using aiokafka.

class faust.transport.drivers.aiokafka.Consumer(*args: Any, **kwargs: Any) → None[source]

Kafka consumer using aiokafka.

logger = <Logger faust.transport.drivers.aiokafka (WARNING)>

RebalanceListener

alias of ConsumerRebalanceListener

consumer_stopped_errors = (<class 'aiokafka.errors.ConsumerStoppedError'>,)

async create_topic(topic: str, partitions: int, replication: int, *, config: Mapping[str, Any] = None, timeout: Union[datetime.timedelta, float, str] = 30.0, retention: Union[datetime.timedelta, float, str] = None, compacting: bool = None, deleting: bool = None, ensure_created: bool = False) → None[source]

Create/declare topic on server.

Return type

None

async on_stop() → None[source]

Call when consumer is stopping.

Return type

None

class faust.transport.drivers.aiokafka.Producer(transport: faust.types.transports.TransportT, loop: asyncio.events.AbstractEventLoop = None, **kwargs: Any) → None[source]

Kafka producer using aiokafka.

logger = <Logger faust.transport.drivers.aiokafka (WARNING)>

allow_headers = True

async begin_transaction(transactional_id: str) → None[source]

Begin transaction by id.

Return type

None

async commit_transaction(transactional_id: str) → None[source]

Commit transaction by id.

Return type

None

async abort_transaction(transactional_id: str) → None[source]

Abort and rollback transaction by id.

Return type

None

async stop_transaction(transactional_id: str) → None[source]

Stop transaction by id.

Return type

None

async maybe_begin_transaction(transactional_id: str) → None[source]

Begin transaction (if one does not already exist).

Return type

None

async commit_transactions(tid_to_offset_map: Mapping[str, Mapping[faust.types.tuples.TP, int]], group_id: str, start_new_transaction: bool = True) → None[source]

Commit transactions.

Return type

None

async create_topic(topic: str, partitions: int, replication: int, *, config: Mapping[str, Any] = None, timeout: Union[datetime.timedelta, float, str] = 20.0, retention: Union[datetime.timedelta, float, str] = None, compacting: bool = None, deleting: bool = None, ensure_created: bool = False) → None[source]

Create/declare topic on server.

Return type

None

async on_start() → None[source]

Call when producer starts.

Return type

None

async on_stop() → None[source]

Call when producer stops.

Return type

None

async send(topic: str, key: Optional[bytes], value: Optional[bytes], partition: Optional[int], timestamp: Optional[float], headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes], None], *, transactional_id: str = None) → Awaitable[faust.types.tuples.RecordMetadata][source]

Schedule message to be transmitted by producer.

Return type

Awaitable[RecordMetadata]

async send_and_wait(topic: str, key: Optional[bytes], value: Optional[bytes], partition: Optional[int], timestamp: Optional[float], headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes], None], *, transactional_id: str = None) → faust.types.tuples.RecordMetadata[source]

Send message and wait for it to be transmitted.

Return type

RecordMetadata

async flush() → None[source]

Wait for producer to finish transmitting all buffered messages.

Return type

None

key_partition(topic: str, key: bytes) → faust.types.tuples.TP[source]

Hash key to determine partition destination.

Return type

TP

supports_headers() → bool[source]

Return True if message headers are supported.

Return type

bool

class faust.transport.drivers.aiokafka.Transport(*args: Any, **kwargs: Any) → None[source]

Kafka transport using aiokafka.

class Consumer(*args: Any, **kwargs: Any) → None

Kafka consumer using aiokafka.

RebalanceListener

alias of ConsumerRebalanceListener

consumer_stopped_errors = (<class 'aiokafka.errors.ConsumerStoppedError'>,)

async create_topic(topic: str, partitions: int, replication: int, *, config: Mapping[str, Any] = None, timeout: Union[datetime.timedelta, float, str] = 30.0, retention: Union[datetime.timedelta, float, str] = None, compacting: bool = None, deleting: bool = None, ensure_created: bool = False) → None

Create/declare topic on server.

Return type

None

logger = <Logger faust.transport.drivers.aiokafka (WARNING)>

async on_stop() → None

Call when consumer is stopping.

Return type

None

class Producer(transport: faust.types.transports.TransportT, loop: asyncio.events.AbstractEventLoop = None, **kwargs: Any) → None

Kafka producer using aiokafka.

async abort_transaction(transactional_id: str) → None

Abort and rollback transaction by id.

Return type

None

allow_headers = True

async begin_transaction(transactional_id: str) → None

Begin transaction by id.

Return type

None

async commit_transaction(transactional_id: str) → None

Commit transaction by id.

Return type

None

async commit_transactions(tid_to_offset_map: Mapping[str, Mapping[faust.types.tuples.TP, int]], group_id: str, start_new_transaction: bool = True) → None

Commit transactions.

Return type

None

async create_topic(topic: str, partitions: int, replication: int, *, config: Mapping[str, Any] = None, timeout: Union[datetime.timedelta, float, str] = 20.0, retention: Union[datetime.timedelta, float, str] = None, compacting: bool = None, deleting: bool = None, ensure_created: bool = False) → None

Create/declare topic on server.

Return type

None

async flush() → None

Wait for producer to finish transmitting all buffered messages.

Return type

None

key_partition(topic: str, key: bytes) → faust.types.tuples.TP

Hash key to determine partition destination.

Return type

TP

logger = <Logger faust.transport.drivers.aiokafka (WARNING)>

async maybe_begin_transaction(transactional_id: str) → None

Begin transaction (if one does not already exist).

Return type

None

async on_start() → None

Call when producer starts.

Return type

None

async on_stop() → None

Call when producer stops.

Return type

None

async send(topic: str, key: Optional[bytes], value: Optional[bytes], partition: Optional[int], timestamp: Optional[float], headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes], None], *, transactional_id: str = None) → Awaitable[faust.types.tuples.RecordMetadata]

Schedule message to be transmitted by producer.

Return type

Awaitable[RecordMetadata]

async send_and_wait(topic: str, key: Optional[bytes], value: Optional[bytes], partition: Optional[int], timestamp: Optional[float], headers: Union[List[Tuple[str, bytes]], Mapping[str, bytes], None], *, transactional_id: str = None) → faust.types.tuples.RecordMetadata

Send message and wait for it to be transmitted.

Return type

RecordMetadata

async stop_transaction(transactional_id: str) → None

Stop transaction by id.

Return type

None

supports_headers() → bool

Return True if message headers are supported.

Return type

bool

default_port = 9092

driver_version = 'aiokafka=1.1.3'