Source code for faust.types.topics

import abc
import asyncio
import typing
from typing import Any, Mapping, Optional, Pattern, Sequence, Set, Union

from mode import Seconds
from mode.utils.queues import ThrowableQueue

from .channels import ChannelT
from .codecs import CodecArg
from .tuples import TP

if typing.TYPE_CHECKING:
    from .app import AppT
    from .models import ModelArg
else:
    class AppT: ...             # noqa
    class ModelArg: ...         # noqa

__all__ = ['TopicT']


[docs]class TopicT(ChannelT): #: Iterable/Sequence of topic names to subscribe to. topics: Sequence[str] #: or instead of ``topics``, a regular expression used #: to match topics we want to subscribe to. pattern: Optional[Pattern] #: Topic retention setting: expiry time in seconds #: for messages in the topic. retention: Optional[Seconds] #: Flag that when enabled means the topic can be "compacted": #: if the topic is a log of key/value pairs, the broker can delete #: old values for the same key. compacting: Optional[bool] deleting: Optional[bool] #: Number of replicas for topic. replicas: Optional[int] #: Additional configuration as a mapping. config: Optional[Mapping[str, Any]] #: Enable acks for this topic. acks: bool #: Mark topic as internal: it's owned by us and we are allowed #: to create or delete the topic as necessary. internal: bool active_partitions: Optional[Set[TP]] @abc.abstractmethod def __init__(self, app: AppT, *, topics: Sequence[str] = None, pattern: Union[str, Pattern] = None, key_type: ModelArg = None, value_type: ModelArg = None, is_iterator: bool = False, partitions: int = None, retention: Seconds = None, compacting: bool = None, deleting: bool = None, replicas: int = None, acks: bool = True, internal: bool = False, config: Mapping[str, Any] = None, queue: ThrowableQueue = None, key_serializer: CodecArg = None, value_serializer: CodecArg = None, maxsize: int = None, root: ChannelT = None, active_partitions: Set[TP] = None, allow_empty: bool = False, loop: asyncio.AbstractEventLoop = None) -> None: ... @property @abc.abstractmethod def pattern(self) -> Optional[Pattern]: ... @pattern.setter def pattern(self, pattern: Union[str, Pattern]) -> None: ... @property @abc.abstractmethod def partitions(self) -> Optional[int]: ... @partitions.setter def partitions(self, partitions: int) -> None: ...
[docs] @abc.abstractmethod def derive(self, **kwargs: Any) -> ChannelT: ...
[docs] @abc.abstractmethod def derive_topic(self, *, topics: Sequence[str] = None, key_type: ModelArg = None, value_type: ModelArg = None, partitions: int = None, retention: Seconds = None, compacting: bool = None, deleting: bool = None, internal: bool = False, config: Mapping[str, Any] = None, prefix: str = '', suffix: str = '', **kwargs: Any) -> 'TopicT': ...