Source code for faust.types.sensors

import abc
import typing
from typing import Any, Iterable

from mode import ServiceT

from .events import EventT
from .streams import StreamT
from .tables import CollectionT
from .topics import TopicT
from .transports import ConsumerT, ProducerT
from .tuples import Message, TP

if typing.TYPE_CHECKING:
    from .app import AppT
else:
    class AppT: ...  # noqa

__all__ = ['SensorInterfaceT', 'SensorT', 'SensorDelegateT']


[docs]class SensorInterfaceT(abc.ABC):
[docs] @abc.abstractmethod def on_message_in(self, tp: TP, offset: int, message: Message) -> None: ...
[docs] @abc.abstractmethod def on_stream_event_in(self, tp: TP, offset: int, stream: StreamT, event: EventT) -> None: ...
[docs] @abc.abstractmethod def on_stream_event_out(self, tp: TP, offset: int, stream: StreamT, event: EventT) -> None: ...
[docs] @abc.abstractmethod def on_topic_buffer_full(self, topic: TopicT) -> None: ...
[docs] @abc.abstractmethod def on_message_out(self, tp: TP, offset: int, message: Message) -> None: ...
[docs] @abc.abstractmethod def on_table_get(self, table: CollectionT, key: Any) -> None: ...
[docs] @abc.abstractmethod def on_table_set(self, table: CollectionT, key: Any, value: Any) -> None: ...
[docs] @abc.abstractmethod def on_table_del(self, table: CollectionT, key: Any) -> None: ...
[docs] @abc.abstractmethod def on_commit_initiated(self, consumer: ConsumerT) -> Any: ...
[docs] @abc.abstractmethod def on_commit_completed(self, consumer: ConsumerT, state: Any) -> None: ...
[docs] @abc.abstractmethod def on_send_initiated(self, producer: ProducerT, topic: str, keysize: int, valsize: int) -> Any: ...
[docs] @abc.abstractmethod def on_send_completed(self, producer: ProducerT, state: Any) -> None: ...
[docs]class SensorT(SensorInterfaceT, ServiceT): ...
[docs]class SensorDelegateT(SensorInterfaceT, Iterable): # Delegate calls to many sensors.
[docs] @abc.abstractmethod def add(self, sensor: SensorT) -> None: ...
[docs] @abc.abstractmethod def remove(self, sensor: SensorT) -> None: ...