import abc
import asyncio
import typing
from typing import Any, AsyncIterator, Awaitable, Optional, Set
from mode import Seconds
from mode.utils.futures import stampede
from mode.utils.queues import ThrowableQueue
from .codecs import CodecArg
from .core import K, V
from .tuples import (
FutureMessage,
Message,
MessageSentCallback,
RecordMetadata,
TP,
)
# The names below are only needed for annotations.  Static type checkers
# get the real definitions; at runtime (``typing.TYPE_CHECKING`` is False)
# empty placeholder classes are defined instead, so annotations that
# mention these names can still be evaluated without importing the modules.
# NOTE(review): presumably this avoids circular imports -- confirm.
if typing.TYPE_CHECKING:
    from .app import AppT
    from .events import EventT
    from .models import ModelArg
    from .streams import StreamT
    from .transports import ConsumerT, TPorTopicSet
else:
    # Every name imported above needs a runtime placeholder: ``AppT`` is
    # used in annotations that are evaluated when the class body executes,
    # so leaving it out raises ``NameError`` at import time.
    class AppT: ...  # noqa
    class EventT: ...  # noqa
    class ModelArg: ...  # noqa
    class StreamT: ...  # noqa
    class ConsumerT: ...  # noqa
    class TPorTopicSet: ...  # noqa
class ChannelT(AsyncIterator):
    """Abstract interface of a channel.

    A channel is an asynchronous iterator (``__aiter__``/``__anext__``)
    that also declares operations for sending, publishing, decoding and
    delivering messages, plus queue-like ``put``/``get``/``empty``.

    Every method and property declared here is abstract; this class only
    fixes the signatures, the behavior is defined by implementing classes.
    """

    # Attributes every channel exposes.  They mirror the constructor
    # arguments of the same name.
    app: AppT
    key_type: Optional[ModelArg]
    value_type: Optional[ModelArg]
    loop: Optional[asyncio.AbstractEventLoop]
    maxsize: Optional[int]
    active_partitions: Optional[Set[TP]]

    @abc.abstractmethod
    def __init__(self,
                 app: AppT,
                 *,
                 key_type: Optional[ModelArg] = None,
                 value_type: Optional[ModelArg] = None,
                 is_iterator: bool = False,
                 queue: Optional[ThrowableQueue] = None,
                 maxsize: Optional[int] = None,
                 root: Optional['ChannelT'] = None,
                 active_partitions: Optional[Set[TP]] = None,
                 loop: Optional[asyncio.AbstractEventLoop] = None) -> None:
        """Declare the constructor arguments accepted by a channel.

        ``app`` is the only positional argument.  All other options are
        keyword-only and default to ``None``, except ``is_iterator``
        which defaults to ``False``.
        """
        ...

    @abc.abstractmethod
    def clone(self, *,
              is_iterator: Optional[bool] = None,
              **kwargs: Any) -> 'ChannelT':
        """Return a ``ChannelT`` given ``is_iterator`` and extra options.

        NOTE(review): presumably a copy of this channel with the given
        options overridden -- confirm against the implementation.
        """
        ...

    @abc.abstractmethod
    def clone_using_queue(self, queue: asyncio.Queue) -> 'ChannelT':
        """Return a ``ChannelT`` for the given ``queue``.

        NOTE(review): presumably a clone of this channel backed by
        ``queue`` -- confirm against the implementation.
        """
        ...

    @abc.abstractmethod
    def stream(self, **kwargs: Any) -> StreamT:
        """Return a ``StreamT``; accepted keyword options are up to the
        implementation."""
        ...

    @abc.abstractmethod
    def get_topic_name(self) -> str:
        """Return a topic name as a string."""
        ...

    @abc.abstractmethod
    async def send(self,
                   *,
                   key: Optional[K] = None,
                   value: Optional[V] = None,
                   partition: Optional[int] = None,
                   key_serializer: Optional[CodecArg] = None,
                   value_serializer: Optional[CodecArg] = None,
                   callback: Optional[MessageSentCallback] = None,
                   force: bool = False) -> Awaitable[RecordMetadata]:
        """Send a message described by the keyword-only arguments.

        All arguments are optional.  The coroutine is annotated as
        returning an awaitable of ``RecordMetadata``, i.e. the value
        obtained by awaiting ``send()`` is itself annotated as awaitable.
        """
        ...

    @abc.abstractmethod
    def as_future_message(
            self,
            key: Optional[K] = None,
            value: Optional[V] = None,
            partition: Optional[int] = None,
            key_serializer: Optional[CodecArg] = None,
            value_serializer: Optional[CodecArg] = None,
            callback: Optional[MessageSentCallback] = None) -> FutureMessage:
        """Return a ``FutureMessage`` for the given message fields.

        Unlike :meth:`send`, the arguments may be passed positionally.
        """
        ...

    @abc.abstractmethod
    async def publish_message(self, fut: FutureMessage,
                              wait: bool = True) -> Awaitable[RecordMetadata]:
        """Publish the message represented by ``fut``.

        ``wait`` defaults to ``True``; its exact effect is defined by the
        implementation.
        """
        ...

    # NOTE(review): ``stampede`` wraps the abstract method here -- confirm
    # that the wrapper still marks ``maybe_declare`` as abstract.
    @stampede
    @abc.abstractmethod
    async def maybe_declare(self) -> None:
        """Conditionally declare the channel (see :meth:`declare`).

        The condition is decided by the implementation.
        """
        ...

    @abc.abstractmethod
    async def declare(self) -> None:
        """Declare the channel; what this entails is up to the
        implementation."""
        ...

    @abc.abstractmethod
    def prepare_key(self, key: K, key_serializer: CodecArg) -> Any:
        """Return the prepared form of ``key`` given ``key_serializer``."""
        ...

    @abc.abstractmethod
    def prepare_value(self, value: V, value_serializer: CodecArg) -> Any:
        """Return the prepared form of ``value`` given
        ``value_serializer``."""
        ...

    @abc.abstractmethod
    async def decode(self, message: Message, *,
                     propagate: bool = False) -> EventT:
        """Decode ``message`` and return the result as an ``EventT``.

        ``propagate`` is keyword-only and defaults to ``False``.
        NOTE(review): presumably controls whether decode errors are
        re-raised -- confirm against the implementation.
        """
        ...

    @abc.abstractmethod
    async def deliver(self, message: Message) -> None:
        """Deliver ``message`` to this channel."""
        ...

    @abc.abstractmethod
    async def put(self, value: Any) -> None:
        """Put ``value`` onto the channel."""
        ...

    @abc.abstractmethod
    async def get(self, *, timeout: Optional[Seconds] = None) -> Any:
        """Get a value from the channel.

        ``timeout`` is keyword-only and optional.
        """
        ...

    @abc.abstractmethod
    def empty(self) -> bool:
        """Return a boolean telling whether the channel is empty."""
        ...

    @abc.abstractmethod
    async def on_key_decode_error(self, exc: Exception,
                                  message: Message) -> None:
        """Callback receiving the exception and message for a key decode
        error."""
        ...

    @abc.abstractmethod
    async def on_value_decode_error(self, exc: Exception,
                                    message: Message) -> None:
        """Callback receiving the exception and message for a value decode
        error."""
        ...

    @abc.abstractmethod
    async def on_decode_error(self, exc: Exception, message: Message) -> None:
        """Callback receiving the exception and message for a decode
        error."""
        ...

    @abc.abstractmethod
    def on_stop_iteration(self) -> None:
        """Callback for the end of iteration.

        NOTE(review): exactly when this is invoked is not visible here --
        confirm against the implementation.
        """
        ...

    @abc.abstractmethod
    def __aiter__(self) -> 'ChannelT':
        """Return a ``ChannelT`` to iterate over asynchronously."""
        ...

    @abc.abstractmethod
    async def __anext__(self) -> EventT:
        """Return the next item of the asynchronous iteration as an
        ``EventT``."""
        ...

    @abc.abstractmethod
    async def throw(self, exc: BaseException) -> None:
        """Throw the exception ``exc`` into the channel."""
        ...

    @abc.abstractmethod
    def derive(self, **kwargs: Any) -> 'ChannelT':
        """Return a ``ChannelT`` derived using the given keyword options."""
        ...

    @property
    @abc.abstractmethod
    def subscriber_count(self) -> int:
        """Subscriber count of the channel, as an integer."""
        ...

    @property
    @abc.abstractmethod
    def queue(self) -> ThrowableQueue:
        """The ``ThrowableQueue`` associated with this channel."""
        ...