faust.transport.drivers.aiokafka

Message transport using aiokafka.

class faust.transport.drivers.aiokafka.Consumer(transport: faust.types.transports.TransportT, callback: Callable[faust.types.tuples.Message, Awaitable], on_partitions_revoked: Callable[Set[faust.types.tuples.TP], Awaitable[NoneType]], on_partitions_assigned: Callable[Set[faust.types.tuples.TP], Awaitable[NoneType]], *, commit_interval: float = None, commit_livelock_soft_timeout: float = None, loop: asyncio.events.AbstractEventLoop = None, **kwargs) → None

Kafka consumer using aiokafka.

logger = <Logger faust.transport.drivers.aiokafka (WARNING)>
RebalanceListener

alias of ConsumerRebalanceListener

fetch_timeout = 10.0
consumer_stopped_errors = (<class 'aiokafka.errors.ConsumerStoppedError'>,)
on_init() → None
Return type: None
assignment() → Set[faust.types.tuples.TP]
Return type: Set[TP]
highwater(tp: faust.types.tuples.TP) → int
Return type: int
close() → None
Return type: None
coroutine create_topic(self, topic: str, partitions: int, replication: int, *, config: Mapping[str, Any] = None, timeout: Union[datetime.timedelta, float, str] = 30.0, retention: Union[datetime.timedelta, float, str] = None, compacting: bool = None, deleting: bool = None, ensure_created: bool = False) → None
Return type: None
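The timeout and retention arguments of create_topic accept a datetime.timedelta or a number of seconds (the signature also allows str, whose format is not described here). A minimal sketch of normalizing such a value to float seconds follows. The helper name to_seconds is hypothetical and is not part of faust; string input is deliberately left out.

```python
from datetime import timedelta
from typing import Optional, Union

# Only the timedelta and numeric forms are covered in this sketch.
Seconds = Union[timedelta, float, int]


def to_seconds(value: Optional[Seconds]) -> Optional[float]:
    """Normalize a timedelta or a number to float seconds (hypothetical helper)."""
    if value is None:
        return None
    if isinstance(value, timedelta):
        return value.total_seconds()
    return float(value)
```

With this helper, to_seconds(timedelta(minutes=1)) and to_seconds(60) describe the same duration, which is the convenience the Union type in the signature suggests.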
coroutine earliest_offsets(self, *partitions) → MutableMapping[faust.types.tuples.TP, int]
Return type: MutableMapping[TP, int]
getmany(timeout: float) → AsyncIterator[Tuple[faust.types.tuples.TP, faust.types.tuples.Message]]
Return type: AsyncIterator[Tuple[TP, Message]]
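Because getmany returns an asynchronous iterator of (TP, Message) pairs, it is consumed with async for. The sketch below shows that consumption pattern only. TP and Message are simplified stand-ins for the faust types, and fake_getmany is a hypothetical in-memory source, so nothing here talks to Kafka.

```python
import asyncio
from collections import defaultdict
from typing import AsyncIterator, Dict, List, NamedTuple, Tuple


class TP(NamedTuple):
    """Stand-in for faust.types.tuples.TP (topic, partition)."""
    topic: str
    partition: int


class Message(NamedTuple):
    """Simplified stand-in for faust.types.tuples.Message."""
    offset: int
    value: bytes


async def fake_getmany(timeout: float) -> AsyncIterator[Tuple[TP, Message]]:
    """Hypothetical source yielding (TP, Message) pairs, shaped like getmany()."""
    records = [
        (TP("orders", 0), Message(0, b"a")),
        (TP("orders", 1), Message(0, b"b")),
        (TP("orders", 0), Message(1, b"c")),
    ]
    for tp, message in records:
        await asyncio.sleep(0)  # yield control, as a real fetch would
        yield tp, message


async def collect(timeout: float) -> Dict[TP, List[Message]]:
    """Group the messages from one getmany-style pass by topic partition."""
    by_partition: Dict[TP, List[Message]] = defaultdict(list)
    async for tp, message in fake_getmany(timeout):
        by_partition[tp].append(message)
    return dict(by_partition)
```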
coroutine highwaters(self, *partitions) → MutableMapping[faust.types.tuples.TP, int]
Return type: MutableMapping[TP, int]
coroutine on_restart(self) → None

Called every time the service is restarted.

Return type: None
coroutine on_start(self) → None

Called every time before the service is started or restarted.

Return type: None
coroutine on_stop(self) → None

Called every time before the service is stopped or restarted.

Return type: None
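The three lifecycle hooks above can be illustrated with a small stand-in class that records when each hook fires. SketchService is hypothetical and is not faust's service implementation. In particular, the ordering used in restart (stop, then on_restart, then start) is an assumption made for the sketch.

```python
import asyncio
from typing import List


class SketchService:
    """Hypothetical stand-in for a service base class; not faust's code."""

    def __init__(self) -> None:
        self.calls: List[str] = []

    async def on_start(self) -> None:
        self.calls.append("on_start")

    async def on_stop(self) -> None:
        self.calls.append("on_stop")

    async def on_restart(self) -> None:
        self.calls.append("on_restart")

    async def start(self) -> None:
        await self.on_start()  # hook runs before the service counts as started
        self.calls.append("started")

    async def stop(self) -> None:
        await self.on_stop()  # hook runs before the service counts as stopped
        self.calls.append("stopped")

    async def restart(self) -> None:
        # Assumed ordering: stop, signal the restart, then start again.
        await self.stop()
        await self.on_restart()
        await self.start()


async def demo() -> List[str]:
    service = SketchService()
    await service.start()
    await service.restart()
    return service.calls
```

Under these assumptions a restart triggers all three hooks, which matches the descriptions stating that on_start and on_stop also run on restart.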
coroutine pause_partitions(self, tps: Iterable[faust.types.tuples.TP]) → None
Return type: None
coroutine perform_seek(self) → None
Return type: None
coroutine position(self, tp: faust.types.tuples.TP) → Union[int, NoneType]
Return type: Optional[int]
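One common use of highwater and position together is estimating consumer lag per partition. The sketch below assumes that the highwater value is the Kafka high-water mark for the partition and that the position is the next offset to be fetched; neither meaning is stated on this page. TP is a stand-in for faust.types.tuples.TP, and the function works on plain mappings rather than on a live Consumer.

```python
from typing import Mapping, NamedTuple, Optional


class TP(NamedTuple):
    """Stand-in for faust.types.tuples.TP (topic, partition)."""
    topic: str
    partition: int


def lag(
    tp: TP,
    highwaters: Mapping[TP, int],
    positions: Mapping[TP, Optional[int]],
) -> Optional[int]:
    """Return how far the position trails the high-water mark, if known."""
    position = positions.get(tp)
    if position is None:
        # position() is Optional[int]; no position means lag is unknown.
        return None
    return max(highwaters[tp] - position, 0)
```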
coroutine resume_partitions(self, tps: Iterable[faust.types.tuples.TP]) → None
Return type: None
coroutine seek(self, partition: faust.types.tuples.TP, offset: int) → None
Return type: None
coroutine subscribe(self, topics: Iterable[str]) → None
Return type: None
class faust.transport.drivers.aiokafka.Producer(transport: faust.types.transports.TransportT, loop: asyncio.events.AbstractEventLoop = None, **kwargs) → None

Kafka producer using aiokafka.

logger = <Logger faust.transport.drivers.aiokafka (WARNING)>
on_init() → None
Return type: None
key_partition(topic: str, key: bytes) → faust.types.tuples.TP
Return type: TP
coroutine create_topic(self, topic: str, partitions: int, replication: int, *, config: Mapping[str, Any] = None, timeout: Union[datetime.timedelta, float, str] = 20.0, retention: Union[datetime.timedelta, float, str] = None, compacting: bool = None, deleting: bool = None, ensure_created: bool = False) → None
Return type: None
coroutine on_restart(self) → None

Called every time the service is restarted.

Return type: None
coroutine on_start(self) → None

Called every time before the service is started or restarted.

Return type: None
coroutine on_stop(self) → None

Called every time before the service is stopped or restarted.

Return type: None
coroutine send(self, topic: str, key: Union[bytes, NoneType], value: Union[bytes, NoneType], partition: Union[int, NoneType]) → Awaitable[faust.types.tuples.RecordMetadata]
Return type: Awaitable[RecordMetadata]
coroutine send_and_wait(self, topic: str, key: Union[bytes, NoneType], value: Union[bytes, NoneType], partition: Union[int, NoneType]) → faust.types.tuples.RecordMetadata
Return type: RecordMetadata
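The two signatures above differ in what awaiting them yields: send resolves to another awaitable that completes with the RecordMetadata, while send_and_wait resolves to the RecordMetadata directly. The sketch below mimics that shape with an in-memory stand-in. SketchProducer and its offset counter are hypothetical, RecordMetadata is a simplified stand-in for the faust type, and no Kafka broker is involved.

```python
import asyncio
from typing import Awaitable, NamedTuple, Optional


class RecordMetadata(NamedTuple):
    """Simplified stand-in for faust.types.tuples.RecordMetadata."""
    topic: str
    partition: int
    offset: int


class SketchProducer:
    """Hypothetical in-memory producer mirroring the two call shapes."""

    def __init__(self) -> None:
        self._next_offset = 0

    async def send(
        self,
        topic: str,
        key: Optional[bytes],
        value: Optional[bytes],
        partition: Optional[int],
    ) -> Awaitable[RecordMetadata]:
        # Hand back a future immediately; it completes on a later loop tick.
        offset = self._next_offset
        self._next_offset += 1
        loop = asyncio.get_running_loop()
        fut: "asyncio.Future[RecordMetadata]" = loop.create_future()
        loop.call_soon(fut.set_result, RecordMetadata(topic, partition or 0, offset))
        return fut

    async def send_and_wait(
        self,
        topic: str,
        key: Optional[bytes],
        value: Optional[bytes],
        partition: Optional[int],
    ) -> RecordMetadata:
        # Same as send(), but also waits for the delivery result.
        pending = await self.send(topic, key, value, partition)
        return await pending


async def demo():
    producer = SketchProducer()
    pending = await producer.send("orders", b"k", b"v1", None)  # first await: enqueue
    first = await pending                                        # second await: result
    second = await producer.send_and_wait("orders", b"k", b"v2", None)
    return first, second
```

In this sketch the caller of send can enqueue several messages before awaiting any of the returned futures, whereas send_and_wait blocks the calling coroutine until each result is available.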
class faust.transport.drivers.aiokafka.Transport(*args, **kwargs) → None

Kafka transport using aiokafka.

class Consumer(transport: faust.types.transports.TransportT, callback: Callable[faust.types.tuples.Message, Awaitable], on_partitions_revoked: Callable[Set[faust.types.tuples.TP], Awaitable[NoneType]], on_partitions_assigned: Callable[Set[faust.types.tuples.TP], Awaitable[NoneType]], *, commit_interval: float = None, commit_livelock_soft_timeout: float = None, loop: asyncio.events.AbstractEventLoop = None, **kwargs) → None

Kafka consumer using aiokafka.

RebalanceListener

alias of ConsumerRebalanceListener

assignment() → Set[faust.types.tuples.TP]
Return type: Set[TP]
close() → None
Return type: None
consumer_stopped_errors = (<class 'aiokafka.errors.ConsumerStoppedError'>,)
coroutine create_topic(self, topic: str, partitions: int, replication: int, *, config: Mapping[str, Any] = None, timeout: Union[datetime.timedelta, float, str] = 30.0, retention: Union[datetime.timedelta, float, str] = None, compacting: bool = None, deleting: bool = None, ensure_created: bool = False) → None
Return type: None
coroutine earliest_offsets(self, *partitions) → MutableMapping[faust.types.tuples.TP, int]
Return type: MutableMapping[TP, int]
fetch_timeout = 10.0
getmany(timeout: float) → AsyncIterator[Tuple[faust.types.tuples.TP, faust.types.tuples.Message]]
Return type: AsyncIterator[Tuple[TP, Message]]
highwater(tp: faust.types.tuples.TP) → int
Return type: int
coroutine highwaters(self, *partitions) → MutableMapping[faust.types.tuples.TP, int]
Return type: MutableMapping[TP, int]
logger = <Logger faust.transport.drivers.aiokafka (WARNING)>
on_init() → None
Return type: None
coroutine on_restart(self) → None

Called every time the service is restarted.

Return type: None
coroutine on_start(self) → None

Called every time before the service is started or restarted.

Return type: None
coroutine on_stop(self) → None

Called every time before the service is stopped or restarted.

Return type: None
coroutine pause_partitions(self, tps: Iterable[faust.types.tuples.TP]) → None
Return type: None
coroutine perform_seek(self) → None
Return type: None
coroutine position(self, tp: faust.types.tuples.TP) → Union[int, NoneType]
Return type: Optional[int]
coroutine resume_partitions(self, tps: Iterable[faust.types.tuples.TP]) → None
Return type: None
coroutine seek(self, partition: faust.types.tuples.TP, offset: int) → None
Return type: None
coroutine subscribe(self, topics: Iterable[str]) → None
Return type: None
class Producer(transport: faust.types.transports.TransportT, loop: asyncio.events.AbstractEventLoop = None, **kwargs) → None

Kafka producer using aiokafka.

coroutine create_topic(self, topic: str, partitions: int, replication: int, *, config: Mapping[str, Any] = None, timeout: Union[datetime.timedelta, float, str] = 20.0, retention: Union[datetime.timedelta, float, str] = None, compacting: bool = None, deleting: bool = None, ensure_created: bool = False) → None
Return type: None
key_partition(topic: str, key: bytes) → faust.types.tuples.TP
Return type: TP
logger = <Logger faust.transport.drivers.aiokafka (WARNING)>
on_init() → None
Return type: None
coroutine on_restart(self) → None

Called every time the service is restarted.

Return type: None
coroutine on_start(self) → None

Called every time before the service is started or restarted.

Return type: None
coroutine on_stop(self) → None

Called every time before the service is stopped or restarted.

Return type: None
coroutine send(self, topic: str, key: Union[bytes, NoneType], value: Union[bytes, NoneType], partition: Union[int, NoneType]) → Awaitable[faust.types.tuples.RecordMetadata]
Return type: Awaitable[RecordMetadata]
coroutine send_and_wait(self, topic: str, key: Union[bytes, NoneType], value: Union[bytes, NoneType], partition: Union[int, NoneType]) → faust.types.tuples.RecordMetadata
Return type: RecordMetadata
default_port = 9092
driver_version = 'aiokafka=0.4.18'
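The default_port attribute suggests that a broker URL without an explicit port falls back to 9092. The sketch below shows one way such a fallback could work using only the standard library. The function broker_address and the localhost fallback are hypothetical, and how the Transport actually applies default_port is not described on this page.

```python
from urllib.parse import urlsplit

DEFAULT_PORT = 9092  # mirrors Transport.default_port above


def broker_address(url: str, default_port: int = DEFAULT_PORT) -> str:
    """Return host:port for a broker URL, filling in a missing port (sketch)."""
    parts = urlsplit(url)
    host = parts.hostname or "localhost"  # assumed fallback, for illustration
    port = parts.port or default_port
    return f"{host}:{port}"
```

For example, broker_address("kafka://broker.example") yields "broker.example:9092", while an explicit port in the URL is kept as given.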