Source code for faust.types.channels

import abc
import asyncio
import typing
from typing import Any, AsyncIterator, Awaitable, Optional, Set

from mode import Seconds
from mode.utils.futures import stampede
from mode.utils.queues import ThrowableQueue

from .codecs import CodecArg
from .core import K, V
from .tuples import (
    FutureMessage,
    Message,
    MessageSentCallback,
    RecordMetadata,
    TP,
)

# The real types are imported only while a static type checker is running
# (``typing.TYPE_CHECKING`` is False at runtime).
# NOTE(review): presumably this avoids circular imports between the
# ``faust.types`` submodules -- confirm.
if typing.TYPE_CHECKING:
    from .app import AppT
    from .events import EventT
    from .models import ModelArg
    from .streams import StreamT
    from .transports import ConsumerT, TPorTopicSet
else:
    # Runtime placeholders: empty classes bound to the same names, so the
    # annotations further down in this module can be evaluated without
    # executing the imports above.
    class AppT: ...  # noqa

    class EventT: ...  # noqa

    class ModelArg: ...  # noqa

    class StreamT: ...  # noqa

    class ConsumerT: ...  # noqa

    class TPorTopicSet: ...  # noqa
class ChannelT(AsyncIterator):
    """Abstract interface describing a channel.

    This class only declares attributes and abstract members; every
    method body is ``...`` and concrete behavior lives in implementations
    defined elsewhere.  The interface is an ``AsyncIterator``:
    ``__aiter__`` is declared to return a ``ChannelT`` and ``__anext__``
    to return an ``EventT``.

    NOTE(review): the one-line summaries on the members below are inferred
    from names and signatures only -- confirm against the concrete
    implementation before relying on them.
    """

    # Declared attributes (annotations only; no values are assigned here).
    app: AppT
    key_type: Optional[ModelArg]
    value_type: Optional[ModelArg]
    loop: Optional[asyncio.AbstractEventLoop]
    maxsize: Optional[int]
    active_partitions: Optional[Set[TP]]

    @abc.abstractmethod
    def __init__(self,
                 app: AppT,
                 *,
                 key_type: Optional[ModelArg] = None,
                 value_type: Optional[ModelArg] = None,
                 is_iterator: bool = False,
                 queue: Optional[ThrowableQueue] = None,
                 maxsize: Optional[int] = None,
                 root: Optional['ChannelT'] = None,
                 active_partitions: Optional[Set[TP]] = None,
                 loop: Optional[asyncio.AbstractEventLoop] = None) -> None:
        # Everything after ``app`` is keyword-only and optional.
        ...

    @abc.abstractmethod
    def clone(self, *,
              is_iterator: Optional[bool] = None,
              **kwargs: Any) -> 'ChannelT':
        """Return a ``ChannelT`` (presumably a copy of this channel)."""
        ...

    @abc.abstractmethod
    def clone_using_queue(self, queue: asyncio.Queue) -> 'ChannelT':
        """Return a ``ChannelT`` given an ``asyncio.Queue``."""
        ...

    @abc.abstractmethod
    def stream(self, **kwargs: Any) -> StreamT:
        """Return a ``StreamT`` for this channel."""
        ...

    @abc.abstractmethod
    def get_topic_name(self) -> str:
        """Return a topic name as a string."""
        ...

    @abc.abstractmethod
    async def send(self,
                   *,
                   key: Optional[K] = None,
                   value: Optional[V] = None,
                   partition: Optional[int] = None,
                   key_serializer: Optional[CodecArg] = None,
                   value_serializer: Optional[CodecArg] = None,
                   callback: Optional[MessageSentCallback] = None,
                   force: bool = False) -> Awaitable[RecordMetadata]:
        """Send a key/value pair; all arguments are keyword-only.

        Declared as a coroutine whose result is itself annotated as
        ``Awaitable[RecordMetadata]``.
        """
        ...

    @abc.abstractmethod
    def as_future_message(
            self,
            key: Optional[K] = None,
            value: Optional[V] = None,
            partition: Optional[int] = None,
            key_serializer: Optional[CodecArg] = None,
            value_serializer: Optional[CodecArg] = None,
            callback: Optional[MessageSentCallback] = None) -> FutureMessage:
        """Build a ``FutureMessage`` from the given message fields."""
        ...

    @abc.abstractmethod
    async def publish_message(self,
                              fut: FutureMessage,
                              wait: bool = True) -> Awaitable[RecordMetadata]:
        """Publish a previously built ``FutureMessage``."""
        ...

    # ``stampede`` (from ``mode.utils.futures``) is applied on top of the
    # abstract coroutine.
    # NOTE(review): presumably it coalesces concurrent callers into a single
    # execution -- confirm against mode's documentation.
    @stampede
    @abc.abstractmethod
    async def maybe_declare(self) -> None:
        ...

    @abc.abstractmethod
    async def declare(self) -> None:
        ...

    @abc.abstractmethod
    def prepare_key(self, key: K, key_serializer: CodecArg) -> Any:
        """Return the prepared form of ``key`` for ``key_serializer``."""
        ...

    @abc.abstractmethod
    def prepare_value(self, value: V, value_serializer: CodecArg) -> Any:
        """Return the prepared form of ``value`` for ``value_serializer``."""
        ...

    @abc.abstractmethod
    async def decode(self, message: Message, *,
                     propagate: bool = False) -> EventT:
        """Turn a ``Message`` into an ``EventT``."""
        ...

    @abc.abstractmethod
    async def deliver(self, message: Message) -> None:
        ...

    @abc.abstractmethod
    async def put(self, value: Any) -> None:
        ...

    @abc.abstractmethod
    async def get(self, *, timeout: Optional[Seconds] = None) -> Any:
        ...

    @abc.abstractmethod
    def empty(self) -> bool:
        ...

    @abc.abstractmethod
    async def on_key_decode_error(self, exc: Exception,
                                  message: Message) -> None:
        ...

    @abc.abstractmethod
    async def on_value_decode_error(self, exc: Exception,
                                    message: Message) -> None:
        ...

    @abc.abstractmethod
    async def on_decode_error(self, exc: Exception,
                              message: Message) -> None:
        ...

    @abc.abstractmethod
    def on_stop_iteration(self) -> None:
        ...

    @abc.abstractmethod
    def __aiter__(self) -> 'ChannelT':
        ...

    @abc.abstractmethod
    async def __anext__(self) -> EventT:
        ...

    @abc.abstractmethod
    async def throw(self, exc: BaseException) -> None:
        ...

    @abc.abstractmethod
    def derive(self, **kwargs: Any) -> 'ChannelT':
        """Return a ``ChannelT`` built from keyword overrides."""
        ...

    @property
    @abc.abstractmethod
    def subscriber_count(self) -> int:
        ...

    @property
    @abc.abstractmethod
    def queue(self) -> ThrowableQueue:
        ...