import abc
import asyncio
import typing
from typing import (
Any,
AsyncIterable,
AsyncIterator,
Awaitable,
Callable,
Iterable,
List,
Mapping,
Optional,
Sequence,
Set,
Tuple,
TypeVar,
Union,
no_type_check,
)
from mode import Seconds, ServiceT
from mode.utils.trees import NodeT
from .channels import ChannelT
from .core import K
from .events import EventT
from .models import FieldDescriptorT, ModelArg
from .topics import TopicT
from .tuples import TP
if typing.TYPE_CHECKING:
from .app import AppT as _AppT
from .join import JoinT as _JoinT
from .serializers import SchemaT as _SchemaT
else:
class _AppT: ... # noqa
class _JoinT: ... # noqa
class _SchemaT: ... # noqa
__all__ = [
'Processor',
'GroupByKeyArg',
'StreamT',
'T',
'T_co',
'T_contra',
]
# Used for typing StreamT[Withdrawal]
T = TypeVar('T')
T_co = TypeVar('T_co', covariant=True)
T_contra = TypeVar('T_contra', contravariant=True)
Processor = Callable[[T], Union[T, Awaitable[T]]]
#: Type of the `key` argument to `Stream.group_by()`
GroupByKeyArg = Union[FieldDescriptorT, Callable[[T], K]]
class JoinableT(abc.ABC):
@abc.abstractmethod
def combine(self, *nodes: 'JoinableT', **kwargs: Any) -> 'StreamT':
...
@abc.abstractmethod
def join(self, *fields: FieldDescriptorT) -> 'StreamT':
...
@abc.abstractmethod
def left_join(self, *fields: FieldDescriptorT) -> 'StreamT':
...
@abc.abstractmethod
def inner_join(self, *fields: FieldDescriptorT) -> 'StreamT':
...
@abc.abstractmethod
def outer_join(self, *fields: FieldDescriptorT) -> 'StreamT':
...
@abc.abstractmethod
def __and__(self, other: Any) -> Any:
...
@abc.abstractmethod
def contribute_to_stream(self, active: 'StreamT') -> None:
...
@abc.abstractmethod
async def remove_from_stream(self, stream: 'StreamT') -> None:
...
@abc.abstractmethod
def _human_channel(self) -> str:
...
[docs]class StreamT(AsyncIterable[T_co], JoinableT, ServiceT):
app: _AppT
channel: AsyncIterator[T_co]
outbox: Optional[asyncio.Queue] = None
join_strategy: Optional[_JoinT] = None
task_owner: Optional[asyncio.Task] = None
current_event: Optional[EventT] = None
active_partitions: Optional[Set[TP]] = None
concurrency_index: Optional[int] = None
enable_acks: bool = True
prefix: str = ''
# List of combined streams/tables after ret = (s1 & s2) combined them.
# AFter this ret.combined == [s1, s2]
combined: List[JoinableT]
# group_by, through, etc. sets this, and it means the
# active stream (the one the agent would be reading from) can be found
# by walking the path of links::
# >>> node = stream
# >>> while node._next:
# ... node = node._next
# which is also what .get_active_stream() gives
_next: Optional['StreamT'] = None
_prev: Optional['StreamT'] = None
@abc.abstractmethod
def __init__(self,
channel: AsyncIterator[T_co] = None,
*,
app: _AppT = None,
processors: Iterable[Processor[T]] = None,
combined: List[JoinableT] = None,
on_start: Callable = None,
join_strategy: _JoinT = None,
beacon: NodeT = None,
concurrency_index: int = None,
prev: 'StreamT' = None,
active_partitions: Set[TP] = None,
enable_acks: bool = True,
prefix: str = '',
loop: asyncio.AbstractEventLoop = None) -> None:
...
[docs] @abc.abstractmethod
def get_active_stream(self) -> 'StreamT':
...
[docs] @abc.abstractmethod
def add_processor(self, processor: Processor[T]) -> None:
...
[docs] @abc.abstractmethod
def info(self) -> Mapping[str, Any]:
...
[docs] @abc.abstractmethod
def clone(self, **kwargs: Any) -> 'StreamT':
...
[docs] @abc.abstractmethod
@no_type_check
async def items(self) -> AsyncIterator[Tuple[K, T_co]]:
...
[docs] @abc.abstractmethod
@no_type_check
async def events(self) -> AsyncIterable[EventT]:
...
[docs] @abc.abstractmethod
@no_type_check
async def take(self, max_: int,
within: Seconds) -> AsyncIterable[Sequence[T_co]]:
...
[docs] @abc.abstractmethod
def enumerate(self, start: int = 0) -> AsyncIterable[Tuple[int, T_co]]:
...
[docs] @abc.abstractmethod
def through(self, channel: Union[str, ChannelT]) -> 'StreamT':
...
[docs] @abc.abstractmethod
def echo(self, *channels: Union[str, ChannelT]) -> 'StreamT':
...
[docs] @abc.abstractmethod
def group_by(self,
key: GroupByKeyArg,
*,
name: str = None,
topic: TopicT = None) -> 'StreamT':
...
[docs] @abc.abstractmethod
def derive_topic(self,
name: str,
*,
schema: _SchemaT = None,
key_type: ModelArg = None,
value_type: ModelArg = None,
prefix: str = '',
suffix: str = '') -> TopicT:
...
[docs] @abc.abstractmethod
async def throw(self, exc: BaseException) -> None:
...
[docs] @abc.abstractmethod
async def send(self, value: T_contra) -> None:
...
@abc.abstractmethod
def __copy__(self) -> 'StreamT':
...
@abc.abstractmethod
def __iter__(self) -> Any:
...
@abc.abstractmethod
def __next__(self) -> T:
...
@abc.abstractmethod
def __aiter__(self) -> AsyncIterator[T_co]:
...
[docs] @abc.abstractmethod
async def ack(self, event: EventT) -> bool:
...