Source code for faust.types.topics

import abc
import asyncio
import typing
from typing import Any, Mapping, Optional, Pattern, Sequence, Set, Union

from mode import Seconds
from mode.utils.queues import ThrowableQueue

from .channels import ChannelT
from .codecs import CodecArg
from .tuples import TP

if typing.TYPE_CHECKING:
    from .app import AppT as _AppT
    from .models import ModelArg as _ModelArg
    from .serializers import SchemaT as _SchemaT
else:
    class _AppT: ...             # noqa
    class _ModelArg: ...         # noqa
    class _SchemaT: ...          # noqa

__all__ = ['TopicT']


[docs]class TopicT(ChannelT): #: Iterable/Sequence of topic names to subscribe to. topics: Sequence[str] #: Topic retention setting: expiry time in seconds #: for messages in the topic. retention: Optional[Seconds] #: Flag that when enabled means the topic can be "compacted": #: if the topic is a log of key/value pairs, the broker can delete #: old values for the same key. compacting: Optional[bool] deleting: Optional[bool] #: Number of replicas for topic. replicas: Optional[int] #: Additional configuration as a mapping. config: Optional[Mapping[str, Any]] #: Enable acks for this topic. acks: bool #: Mark topic as internal: it's owned by us and we are allowed #: to create or delete the topic as necessary. internal: bool has_prefix: bool = False active_partitions: Optional[Set[TP]] @abc.abstractmethod def __init__(self, app: _AppT, *, topics: Sequence[str] = None, pattern: Union[str, Pattern] = None, schema: _SchemaT = None, key_type: _ModelArg = None, value_type: _ModelArg = None, is_iterator: bool = False, partitions: int = None, retention: Seconds = None, compacting: bool = None, deleting: bool = None, replicas: int = None, acks: bool = True, internal: bool = False, config: Mapping[str, Any] = None, queue: ThrowableQueue = None, key_serializer: CodecArg = None, value_serializer: CodecArg = None, maxsize: int = None, root: ChannelT = None, active_partitions: Set[TP] = None, allow_empty: bool = False, has_prefix: bool = False, loop: asyncio.AbstractEventLoop = None) -> None: ... @property @abc.abstractmethod def pattern(self) -> Optional[Pattern]: ... @pattern.setter def pattern(self, pattern: Union[str, Pattern]) -> None: ... @property @abc.abstractmethod def partitions(self) -> Optional[int]: ... @partitions.setter def partitions(self, partitions: int) -> None: ...
[docs] @abc.abstractmethod def derive(self, **kwargs: Any) -> ChannelT: ...
[docs] @abc.abstractmethod def derive_topic(self, *, topics: Sequence[str] = None, schema: _SchemaT = None, key_type: _ModelArg = None, value_type: _ModelArg = None, partitions: int = None, retention: Seconds = None, compacting: bool = None, deleting: bool = None, internal: bool = False, config: Mapping[str, Any] = None, prefix: str = '', suffix: str = '', **kwargs: Any) -> 'TopicT': ...