import abc
import typing
from typing import Any, Dict, Iterable, Optional
from mode import ServiceT
from . import web
from .assignor import PartitionAssignorT
from .events import EventT
from .streams import StreamT
from .tables import CollectionT
from .topics import TopicT
from .transports import ConsumerT, ProducerT
from .tuples import Message, PendingMessage, RecordMetadata, TP
if typing.TYPE_CHECKING:
from .app import AppT as _AppT
else:
class _AppT: ... # noqa
__all__ = ['SensorInterfaceT', 'SensorT', 'SensorDelegateT']
[docs]class SensorInterfaceT(abc.ABC):
[docs] @abc.abstractmethod
def on_message_in(self, tp: TP, offset: int, message: Message) -> None:
...
[docs] @abc.abstractmethod
def on_stream_event_in(self, tp: TP, offset: int, stream: StreamT,
event: EventT) -> Optional[Dict]:
...
[docs] @abc.abstractmethod
def on_stream_event_out(self, tp: TP, offset: int, stream: StreamT,
event: EventT, state: Dict = None) -> None:
...
[docs] @abc.abstractmethod
def on_topic_buffer_full(self, topic: TopicT) -> None:
...
[docs] @abc.abstractmethod
def on_message_out(self,
tp: TP,
offset: int,
message: Message) -> None:
...
[docs] @abc.abstractmethod
def on_table_get(self, table: CollectionT, key: Any) -> None:
...
[docs] @abc.abstractmethod
def on_table_set(self, table: CollectionT, key: Any, value: Any) -> None:
...
[docs] @abc.abstractmethod
def on_table_del(self, table: CollectionT, key: Any) -> None:
...
[docs] @abc.abstractmethod
def on_commit_initiated(self, consumer: ConsumerT) -> Any:
...
[docs] @abc.abstractmethod
def on_commit_completed(self, consumer: ConsumerT, state: Any) -> None:
...
[docs] @abc.abstractmethod
def on_send_initiated(self, producer: ProducerT, topic: str,
message: PendingMessage,
keysize: int, valsize: int) -> Any:
...
[docs] @abc.abstractmethod
def on_send_completed(self,
producer: ProducerT,
state: Any,
metadata: RecordMetadata) -> None:
...
[docs] @abc.abstractmethod
def on_send_error(self,
producer: ProducerT,
exc: BaseException,
state: Any) -> None:
...
[docs] @abc.abstractmethod
def on_assignment_start(self, assignor: PartitionAssignorT) -> Dict:
...
[docs] @abc.abstractmethod
def on_assignment_error(self,
assignor: PartitionAssignorT,
state: Dict,
exc: BaseException) -> None:
...
[docs] @abc.abstractmethod
def on_assignment_completed(self,
assignor: PartitionAssignorT,
state: Dict) -> None:
...
[docs] @abc.abstractmethod
def on_rebalance_start(self, app: _AppT) -> Dict:
...
[docs] @abc.abstractmethod
def on_rebalance_return(self, app: _AppT, state: Dict) -> None:
...
[docs] @abc.abstractmethod
def on_rebalance_end(self, app: _AppT, state: Dict) -> None:
...
[docs] @abc.abstractmethod
def on_web_request_start(self, app: _AppT, request: web.Request, *,
view: web.View = None) -> Dict:
...
[docs] @abc.abstractmethod
def on_web_request_end(self,
app: _AppT,
request: web.Request,
response: Optional[web.Response],
state: Dict,
*,
view: web.View = None) -> None:
...
[docs]class SensorT(SensorInterfaceT, ServiceT):
...
[docs]class SensorDelegateT(SensorInterfaceT, Iterable):
# Delegate calls to many sensors.
[docs] @abc.abstractmethod
def add(self, sensor: SensorT) -> None:
...
[docs] @abc.abstractmethod
def remove(self, sensor: SensorT) -> None:
...