Source code for faust.types.channels

import abc
import asyncio
import typing
from typing import Any, AsyncIterator, Awaitable, Optional, Set

from mode import Seconds
from mode.utils.futures import stampede
from mode.utils.queues import ThrowableQueue

from .codecs import CodecArg
from .core import HeadersArg, K, V
from .tuples import (
    FutureMessage,
    Message,
    MessageSentCallback,
    RecordMetadata,
    TP,
)

if typing.TYPE_CHECKING:
    # Seen by static type checkers only: the real interface types.
    from .app import AppT as _AppT
    from .events import EventT as _EventT
    from .models import ModelArg as _ModelArg
    from .serializers import SchemaT as _SchemaT
    from .streams import StreamT as _StreamT
else:
    # Runtime branch: the sibling modules above are not imported, so each
    # name is bound to an empty placeholder class.  This keeps unquoted
    # annotations that mention these names evaluable when this module is
    # executed.
    class _AppT:
        ...

    class _EventT:
        ...

    class _ModelArg:
        ...

    class _SchemaT:
        ...

    class _StreamT:
        ...


class ChannelT(AsyncIterator):
    """Abstract interface describing a channel.

    This class only declares attributes and abstract method signatures;
    every method body is a stub (``...``).  Concrete behaviour is supplied
    by implementations defined elsewhere.  The class derives from
    ``AsyncIterator`` and declares ``__aiter__``/``__anext__`` as abstract.
    """

    # Attributes an implementation is expected to expose.
    app: _AppT
    schema: _SchemaT
    key_type: Optional[_ModelArg]
    value_type: Optional[_ModelArg]
    loop: Optional[asyncio.AbstractEventLoop]
    maxsize: Optional[int]
    active_partitions: Optional[Set[TP]]

    @abc.abstractmethod
    def __init__(self,
                 app: _AppT,
                 *,
                 schema: Optional[_SchemaT] = None,
                 key_type: Optional[_ModelArg] = None,
                 value_type: Optional[_ModelArg] = None,
                 is_iterator: bool = False,
                 queue: Optional[ThrowableQueue] = None,
                 maxsize: Optional[int] = None,
                 root: Optional['ChannelT'] = None,
                 active_partitions: Optional[Set[TP]] = None,
                 loop: Optional[asyncio.AbstractEventLoop] = None) -> None:
        """Declare the constructor signature.

        ``app`` is the only positional argument; everything else is
        keyword-only and optional.  How each option is used is left to
        the implementation.
        """
        ...

    @abc.abstractmethod
    def clone(self, *,
              is_iterator: Optional[bool] = None,
              **kwargs: Any) -> 'ChannelT':
        """Return a ``ChannelT``; extra ``kwargs`` are implementation-defined."""
        ...

    @abc.abstractmethod
    def clone_using_queue(self, queue: asyncio.Queue) -> 'ChannelT':
        """Return a ``ChannelT`` given an ``asyncio.Queue``."""
        ...

    @abc.abstractmethod
    def stream(self, **kwargs: Any) -> _StreamT:
        """Return a stream object; ``kwargs`` are implementation-defined."""
        ...

    @abc.abstractmethod
    def get_topic_name(self) -> str:
        """Return the topic name as a string."""
        ...

    @abc.abstractmethod
    async def send(self,
                   *,
                   key: Optional[K] = None,
                   value: Optional[V] = None,
                   partition: Optional[int] = None,
                   timestamp: Optional[float] = None,
                   headers: Optional[HeadersArg] = None,
                   schema: Optional[_SchemaT] = None,
                   key_serializer: Optional[CodecArg] = None,
                   value_serializer: Optional[CodecArg] = None,
                   callback: Optional[MessageSentCallback] = None,
                   force: bool = False) -> Awaitable[RecordMetadata]:
        """Coroutine accepting keyword-only message fields.

        Per the annotation, awaiting this coroutine yields an awaitable of
        ``RecordMetadata``.
        """
        ...

    @abc.abstractmethod
    def send_soon(self,
                  *,
                  key: Optional[K] = None,
                  value: Optional[V] = None,
                  partition: Optional[int] = None,
                  timestamp: Optional[float] = None,
                  headers: Optional[HeadersArg] = None,
                  schema: Optional[_SchemaT] = None,
                  key_serializer: Optional[CodecArg] = None,
                  value_serializer: Optional[CodecArg] = None,
                  callback: Optional[MessageSentCallback] = None,
                  force: bool = False,
                  eager_partitioning: bool = False) -> FutureMessage:
        """Synchronous counterpart of ``send`` by signature.

        Takes the same keyword-only fields plus ``eager_partitioning`` and
        returns a ``FutureMessage``.
        """
        ...

    @abc.abstractmethod
    def as_future_message(
            self,
            key: Optional[K] = None,
            value: Optional[V] = None,
            partition: Optional[int] = None,
            timestamp: Optional[float] = None,
            headers: Optional[HeadersArg] = None,
            schema: Optional[_SchemaT] = None,
            key_serializer: Optional[CodecArg] = None,
            value_serializer: Optional[CodecArg] = None,
            callback: Optional[MessageSentCallback] = None,
            eager_partitioning: bool = False) -> FutureMessage:
        """Return a ``FutureMessage`` built from the given message fields.

        Unlike ``send``/``send_soon``, the arguments here may be passed
        positionally.
        """
        ...

    @abc.abstractmethod
    async def publish_message(self,
                              fut: FutureMessage,
                              wait: bool = True) -> Awaitable[RecordMetadata]:
        """Coroutine taking a ``FutureMessage`` and a ``wait`` flag."""
        ...

    # NOTE(review): ``stampede`` comes from ``mode.utils.futures`` and is
    # applied on top of ``abstractmethod``; its exact effect is not visible
    # here -- confirm against the mode documentation.
    @stampede
    @abc.abstractmethod
    async def maybe_declare(self) -> None:
        """Coroutine with no arguments and no return value."""
        ...

    @abc.abstractmethod
    async def declare(self) -> None:
        """Coroutine with no arguments and no return value."""
        ...

    @abc.abstractmethod
    def prepare_key(self,
                    key: K,
                    key_serializer: CodecArg,
                    schema: Optional[_SchemaT] = None) -> Any:
        """Return a value derived from ``key`` and ``key_serializer``."""
        ...

    @abc.abstractmethod
    def prepare_value(self,
                      value: V,
                      value_serializer: CodecArg,
                      schema: Optional[_SchemaT] = None) -> Any:
        """Return a value derived from ``value`` and ``value_serializer``."""
        ...

    @abc.abstractmethod
    async def decode(self, message: Message, *,
                     propagate: bool = False) -> _EventT:
        """Coroutine turning a ``Message`` into an event object."""
        ...

    @abc.abstractmethod
    async def deliver(self, message: Message) -> None:
        """Coroutine accepting a ``Message``; returns nothing."""
        ...

    @abc.abstractmethod
    async def put(self, value: Any) -> None:
        """Coroutine accepting an arbitrary value; returns nothing."""
        ...

    @abc.abstractmethod
    async def get(self, *, timeout: Optional[Seconds] = None) -> Any:
        """Coroutine returning a value, with an optional ``timeout``."""
        ...

    @abc.abstractmethod
    def empty(self) -> bool:
        """Return a boolean."""
        ...

    @abc.abstractmethod
    async def on_key_decode_error(self, exc: Exception,
                                  message: Message) -> None:
        """Coroutine hook receiving an exception and the related message."""
        ...

    @abc.abstractmethod
    async def on_value_decode_error(self, exc: Exception,
                                    message: Message) -> None:
        """Coroutine hook receiving an exception and the related message."""
        ...

    @abc.abstractmethod
    async def on_decode_error(self, exc: Exception,
                              message: Message) -> None:
        """Coroutine hook receiving an exception and the related message."""
        ...

    @abc.abstractmethod
    def on_stop_iteration(self) -> None:
        """Synchronous hook with no arguments and no return value."""
        ...

    @abc.abstractmethod
    def __aiter__(self) -> 'ChannelT':
        """Return a ``ChannelT`` to iterate over asynchronously."""
        ...

    @abc.abstractmethod
    async def __anext__(self) -> _EventT:
        """Coroutine returning the next event object."""
        ...

    @abc.abstractmethod
    async def throw(self, exc: BaseException) -> None:
        """Coroutine accepting an exception instance; returns nothing."""
        ...

    @abc.abstractmethod
    def _throw(self, exc: BaseException) -> None:
        """Non-public synchronous method with the same arguments as ``throw``."""
        ...

    @abc.abstractmethod
    def derive(self, **kwargs: Any) -> 'ChannelT':
        """Return a ``ChannelT``; ``kwargs`` are implementation-defined."""
        ...

    @property
    @abc.abstractmethod
    def subscriber_count(self) -> int:
        """Read-only integer property."""
        ...

    @property
    @abc.abstractmethod
    def queue(self) -> ThrowableQueue:
        """Read-only property returning a ``ThrowableQueue``."""
        ...