# App.transport creates Kafka consumer and producer.
import abc
import asyncio
import ssl
import typing
from typing import (
AbstractSet,
Any,
AsyncIterator,
Awaitable,
Callable,
ClassVar,
Iterable,
Iterator,
List,
Mapping,
MutableSet,
Optional,
Sequence,
Set,
Tuple,
Type,
Union,
no_type_check,
)
from mode import Seconds, ServiceT
from yarl import URL
from .core import HeadersArg
from .channels import ChannelT
from .tuples import Message, RecordMetadata, TP
if typing.TYPE_CHECKING:
from .app import AppT as _AppT
else:
class _AppT: ... # noqa
__all__ = [
'ConsumerCallback',
'TPorTopicSet',
'PartitionsRevokedCallback',
'PartitionsAssignedCallback',
'PartitionerT',
'ConsumerT',
'ProducerT',
'ConductorT',
'TransactionManagerT',
'TransportT',
]
#: Callback called by :class:`faust.transport.base.Consumer` whenever
#: a message is received.
ConsumerCallback = Callable[[Message], Awaitable]
#: Argument to Consumer.commit to specify topics/tps to commit.
TPorTopic = Union[str, TP]
TPorTopicSet = AbstractSet[TPorTopic]
#: Callback (:keyword:`async def`) called when consumer partitions are revoked.
PartitionsRevokedCallback = Callable[[Set[TP]], Awaitable[None]]
#: Callback (:keyword:`async def`) called when consumer
#: partitions are assigned.
PartitionsAssignedCallback = Callable[[Set[TP]], Awaitable[None]]
PartitionerT = Callable[
[Optional[bytes], # key to hash (or None)
Sequence[int], # all partitions
Sequence[int]], # available partitions
int,
]
[docs]class ProducerT(ServiceT):
#: The transport that created this Producer.
transport: 'TransportT'
client_id: str
linger_ms: int
max_batch_size: int
acks: int
max_request_size: int
compression_type: Optional[str]
ssl_context: Optional[ssl.SSLContext]
partitioner: Optional[PartitionerT]
request_timeout: float
@abc.abstractmethod
def __init__(self, transport: 'TransportT',
loop: asyncio.AbstractEventLoop = None,
**kwargs: Any) -> None:
...
[docs] @abc.abstractmethod
async def send(self, topic: str, key: Optional[bytes],
value: Optional[bytes],
partition: Optional[int],
timestamp: Optional[float],
headers: Optional[HeadersArg],
*,
transactional_id: str = None) -> Awaitable[RecordMetadata]:
...
[docs] @abc.abstractmethod
async def send_and_wait(self, topic: str, key: Optional[bytes],
value: Optional[bytes],
partition: Optional[int],
timestamp: Optional[float],
headers: Optional[HeadersArg],
*,
transactional_id: str = None) -> RecordMetadata:
...
[docs] @abc.abstractmethod
async def create_topic(self,
topic: str,
partitions: int,
replication: int,
*,
config: Mapping[str, Any] = None,
timeout: Seconds = 1000.0,
retention: Seconds = None,
compacting: bool = None,
deleting: bool = None,
ensure_created: bool = False) -> None:
...
[docs] @abc.abstractmethod
def key_partition(self, topic: str, key: bytes) -> TP:
...
[docs] @abc.abstractmethod
async def flush(self) -> None:
...
[docs] @abc.abstractmethod
async def begin_transaction(self, transactional_id: str) -> None:
...
[docs] @abc.abstractmethod
async def commit_transaction(self, transactional_id: str) -> None:
...
[docs] @abc.abstractmethod
async def abort_transaction(self, transactional_id: str) -> None:
...
[docs] @abc.abstractmethod
async def stop_transaction(self, transactional_id: str) -> None:
...
[docs] @abc.abstractmethod
async def maybe_begin_transaction(self, transactional_id: str) -> None:
...
[docs] @abc.abstractmethod
async def commit_transactions(
self,
tid_to_offset_map: Mapping[str, Mapping[TP, int]],
group_id: str,
start_new_transaction: bool = True) -> None:
...
[docs]class TransactionManagerT(ProducerT):
consumer: 'ConsumerT'
producer: 'ProducerT'
@abc.abstractmethod
def __init__(self,
transport: 'TransportT',
loop: asyncio.AbstractEventLoop = None,
*,
consumer: 'ConsumerT',
producer: 'ProducerT',
**kwargs: Any) -> None:
...
[docs] @abc.abstractmethod
async def on_partitions_revoked(self, revoked: Set[TP]) -> None:
...
[docs] @abc.abstractmethod
async def on_rebalance(self,
assigned: Set[TP],
revoked: Set[TP],
newly_assigned: Set[TP]) -> None:
...
[docs] @abc.abstractmethod
async def commit(self, offsets: Mapping[TP, int],
start_new_transaction: bool = True) -> bool:
...
[docs] async def begin_transaction(self, transactional_id: str) -> None:
raise NotImplementedError()
[docs] async def commit_transaction(self, transactional_id: str) -> None:
raise NotImplementedError()
[docs] async def abort_transaction(self, transactional_id: str) -> None:
raise NotImplementedError()
[docs] async def stop_transaction(self, transactional_id: str) -> None:
raise NotImplementedError()
[docs] async def maybe_begin_transaction(self, transactional_id: str) -> None:
raise NotImplementedError()
[docs] async def commit_transactions(
self,
tid_to_offset_map: Mapping[str, Mapping[TP, int]],
group_id: str,
start_new_transaction: bool = True) -> None:
raise NotImplementedError()
class SchedulingStrategyT:
@abc.abstractmethod
def __init__(self) -> None:
...
@abc.abstractmethod
def iterate(self, records: Mapping[TP, List]) -> Iterator[Tuple[TP, Any]]:
...
[docs]class ConsumerT(ServiceT):
#: The transport that created this Consumer.
transport: 'TransportT'
transactions: TransactionManagerT
#: How often we commit topic offsets.
#: See :setting:`broker_commit_interval`.
commit_interval: float
#: Set of topic names that are considered "randomly assigned".
#: This means we don't crash if it's not part of our assignment.
#: Used by e.g. the leader assignor service.
randomly_assigned_topics: Set[str]
in_transaction: bool
scheduler: SchedulingStrategyT
@abc.abstractmethod
def __init__(self,
transport: 'TransportT',
callback: ConsumerCallback,
on_partitions_revoked: PartitionsRevokedCallback,
on_partitions_assigned: PartitionsAssignedCallback,
*,
commit_interval: float = None,
loop: asyncio.AbstractEventLoop = None,
**kwargs: Any) -> None:
self._on_partitions_revoked: PartitionsRevokedCallback
self._on_partitions_assigned: PartitionsAssignedCallback
[docs] @abc.abstractmethod
async def create_topic(self,
topic: str,
partitions: int,
replication: int,
*,
config: Mapping[str, Any] = None,
timeout: Seconds = 1000.0,
retention: Seconds = None,
compacting: bool = None,
deleting: bool = None,
ensure_created: bool = False) -> None:
...
[docs] @abc.abstractmethod
async def subscribe(self, topics: Iterable[str]) -> None:
...
[docs] @abc.abstractmethod
@no_type_check
async def getmany(self,
timeout: float) -> AsyncIterator[Tuple[TP, Message]]:
...
[docs] @abc.abstractmethod
def track_message(self, message: Message) -> None:
...
[docs] @abc.abstractmethod
def ack(self, message: Message) -> bool:
...
[docs] @abc.abstractmethod
async def wait_empty(self) -> None:
...
[docs] @abc.abstractmethod
def assignment(self) -> Set[TP]:
...
[docs] @abc.abstractmethod
def highwater(self, tp: TP) -> int:
...
[docs] @abc.abstractmethod
def stop_flow(self) -> None:
...
[docs] @abc.abstractmethod
def resume_flow(self) -> None:
...
[docs] @abc.abstractmethod
def pause_partitions(self, tps: Iterable[TP]) -> None:
...
[docs] @abc.abstractmethod
def resume_partitions(self, tps: Iterable[TP]) -> None:
...
[docs] @abc.abstractmethod
async def position(self, tp: TP) -> Optional[int]:
...
[docs] @abc.abstractmethod
async def seek(self, partition: TP, offset: int) -> None:
...
[docs] @abc.abstractmethod
async def seek_wait(self, partitions: Mapping[TP, int]) -> None:
...
[docs] @abc.abstractmethod
async def commit(self,
topics: TPorTopicSet = None,
start_new_transaction: bool = True) -> bool:
...
[docs] @abc.abstractmethod
async def on_task_error(self, exc: BaseException) -> None:
...
[docs] @abc.abstractmethod
async def earliest_offsets(self, *partitions: TP) -> Mapping[TP, int]:
...
[docs] @abc.abstractmethod
async def highwaters(self, *partitions: TP) -> Mapping[TP, int]:
...
[docs] @abc.abstractmethod
def topic_partitions(self, topic: str) -> Optional[int]:
...
[docs] @abc.abstractmethod
def key_partition(self,
topic: str,
key: Optional[bytes],
partition: int = None) -> Optional[int]:
...
[docs] @abc.abstractmethod
def close(self) -> None:
...
@property
@abc.abstractmethod
def unacked(self) -> Set[Message]:
...
[docs]class ConductorT(ServiceT, MutableSet[ChannelT]):
# The topic conductor delegates messages from the Consumer
# to the various Topic instances subscribed to a topic.
app: _AppT
@abc.abstractmethod
def __init__(self, app: _AppT, **kwargs: Any) -> None:
...
[docs] @abc.abstractmethod
def acks_enabled_for(self, topic: str) -> bool:
...
[docs] @abc.abstractmethod
async def commit(self, topics: TPorTopicSet) -> bool:
...
[docs] @abc.abstractmethod
async def wait_for_subscriptions(self) -> None:
...
[docs] @abc.abstractmethod
async def maybe_wait_for_subscriptions(self) -> None:
...
[docs] @abc.abstractmethod
async def on_partitions_assigned(self, assigned: Set[TP]) -> None:
...
[docs]class TransportT(abc.ABC):
#: The Consumer class used for this type of transport.
Consumer: ClassVar[Type[ConsumerT]]
#: The Producer class used for this type of transport.
Producer: ClassVar[Type[ProducerT]]
#: The TransactionManager class used for managing multiple transactions.
TransactionManager: ClassVar[Type[TransactionManagerT]]
#: The Conductor class used to delegate messages from Consumer to streams.
Conductor: ClassVar[Type[ConductorT]]
#: The Fetcher service used for this type of transport.
Fetcher: ClassVar[Type[ServiceT]]
#: The :class:`faust.App` that created this transport.
app: _AppT
#: The URL to use for this transport (e.g. kafka://localhost).
url: List[URL]
#: String identifying the underlying driver used for this transport.
#: E.g. for :pypi:`aiokafka` this could be ``aiokafka 0.4.1``.
driver_version: str
loop: asyncio.AbstractEventLoop
@abc.abstractmethod
def __init__(self,
url: List[URL],
app: _AppT,
loop: asyncio.AbstractEventLoop = None) -> None:
...
[docs] @abc.abstractmethod
def create_consumer(self, callback: ConsumerCallback,
**kwargs: Any) -> ConsumerT:
...
[docs] @abc.abstractmethod
def create_producer(self, **kwargs: Any) -> ProducerT:
...
[docs] @abc.abstractmethod
def create_transaction_manager(self,
consumer: ConsumerT,
producer: ProducerT,
**kwargs: Any) -> TransactionManagerT:
...
[docs] @abc.abstractmethod
def create_conductor(self, **kwargs: Any) -> ConductorT:
...