Source code for faust.livecheck.signals

"""LiveCheck Signals - Test communication and synchronization."""
import asyncio
import typing

from time import monotonic
from typing import Any, Dict, Generic, Tuple, Type, TypeVar, cast
from mode import Seconds, want_seconds

from faust.models import maybe_model

from .exceptions import TestTimeout
from .locals import current_test_stack
from .models import SignalEvent

# ``Case`` is imported only while a static type checker is running; at
# runtime a placeholder class with the same name is defined instead, so
# annotations such as ``case: _Case`` used below still evaluate.
# NOTE(review): presumably this avoids a circular import with ``.case``
# -- confirm.
if typing.TYPE_CHECKING:
    from .case import Case as _Case
else:
    class _Case: ...  # noqa

# Names exported by ``from <this module> import *``.
__all__ = ['BaseSignal', 'Signal']

# Type variable for the value carried by a signal (see ``BaseSignal[VT]``).
VT = TypeVar('VT')


class BaseSignal(Generic[VT]):
    """Abstract, transport-agnostic foundation for LiveCheck signals.

    A signal is identified by its ``name`` together with the name of the
    test case it belongs to.  Resolved events are stored on the case's
    app (``case.app._resolved_signals``), indexed by
    ``(signal name, case name, key)``.

    Subclasses are expected to implement :meth:`send` and :meth:`wait`.
    """

    name: str
    case: _Case
    index: int

    def __init__(self,
                 name: str = '',
                 case: _Case = None,
                 index: int = -1) -> None:
        self.name = name
        # ``case`` may be ``None`` at construction time; the cast only
        # tells the type checker to treat the attribute as a ``_Case``.
        self.case = cast(_Case, case)
        self.index = index

    async def send(self,
                   value: VT = None,
                   *,
                   key: Any = None,
                   force: bool = False) -> None:
        """Tell the test that this signal has completed.

        Not implemented by the base class.
        """
        raise NotImplementedError()

    async def wait(self,
                   *,
                   key: Any = None,
                   timeout: Seconds = None) -> VT:
        """Block until this signal has completed.

        Not implemented by the base class.
        """
        raise NotImplementedError()

    async def resolve(self, key: Any, event: SignalEvent) -> None:
        """Record ``event`` as the value for ``key`` and wake up waiters."""
        self._set_current_value(key, event)
        self._wakeup_resolvers()

    def __set_name__(self, owner: Type, name: str) -> None:
        # An explicitly given name takes precedence over the attribute
        # name this signal was assigned to on the owning class.
        if self.name:
            return
        self.name = name

    def _wakeup_resolvers(self) -> None:
        # Set the app-level event that ``_wait_for_resolved`` waits on.
        can_resolve = self.case.app._can_resolve
        can_resolve.set()

    async def _wait_for_resolved(self, *, timeout: float = None) -> None:
        # Clear the event first, so only resolutions that happen after
        # this point end the wait.
        livecheck = self.case.app
        livecheck._can_resolve.clear()
        await livecheck.wait(livecheck._can_resolve, timeout=timeout)

    def _get_current_value(self, key: Any) -> SignalEvent:
        # Lookup by index key; propagates whatever the mapping raises for
        # a missing key (callers in this file catch ``KeyError``).
        resolved = self.case.app._resolved_signals
        return resolved[self._index_key(key)]

    def _index_key(self, key: Any) -> Tuple[str, str, Any]:
        # Index used for the app's resolved-signals mapping.
        return (self.name, self.case.name, key)

    def _set_current_value(self, key: Any, event: SignalEvent) -> None:
        resolved = self.case.app._resolved_signals
        resolved[self._index_key(key)] = event

    def clone(self, **kwargs: Any) -> 'BaseSignal':
        """Return a new signal of the same type.

        The constructor arguments come from :meth:`_asdict`; any keyword
        arguments given here override them.
        """
        options = dict(self._asdict())
        options.update(kwargs)
        return type(self)(**options)

    def _asdict(self, **kwargs: Any) -> Dict:
        # Constructor arguments needed to re-create this signal.
        return dict(name=self.name, case=self.case, index=self.index)

    def __repr__(self) -> str:
        cls_name = type(self).__name__
        return f'<{cls_name}: {self.name}>'
class Signal(BaseSignal[VT]):
    """Signal for test case using Kafka.

    Used to wait for something to happen elsewhere.

    Events are published through ``case.app.bus`` and read back from the
    app's resolved-signals mapping (see :meth:`BaseSignal.resolve`).
    """

    # What do we use for this? Kafka? some other mechanism?
    # I'm thinking separate Kafka cluster, with a single
    # topic for each test app.

    async def send(self,
                   value: VT = None,
                   *,
                   key: Any = None,
                   force: bool = False) -> None:
        """Notify test that this signal is now complete.

        Arguments:
            value: Payload to attach to the signal event.
            key: Key identifying the test execution.  Defaults to the id
                of the currently executing test.
            force: Send even when no test is currently executing; in that
                case ``key`` must be provided.
        """
        current_test = current_test_stack.top
        if current_test is None:
            if not force:
                # Not running inside a test: nothing to notify.
                return
            # Without a current test there is no id to fall back to.
            assert key
        else:
            key = key if key is not None else current_test.id
        await self.case.app.bus.send(
            key=key,
            value=SignalEvent(
                signal_name=self.name,
                case_name=self.case.name,
                key=key,
                value=value,
            ),
        )

    async def wait(self,
                   *,
                   key: Any = None,
                   timeout: Seconds = None) -> VT:
        """Wait for signal to be completed.

        Arguments:
            key: Key to wait for.  Defaults to the id of the test that is
                currently executing for this case.
            timeout: Maximum time to wait (converted with
                ``want_seconds``); ``None`` waits without a deadline.

        Returns:
            The value of the received event, passed through
            ``maybe_model``.

        Raises:
            RuntimeError: If the case has no current execution.
            TestTimeout: If the signal did not arrive within ``timeout``.
            asyncio.CancelledError: If the app is stopping while waiting.
        """
        # wait for key to arrive in consumer
        runner = self.case.current_execution
        if runner is None:
            raise RuntimeError('No test executing.')
        test = runner.test
        assert test
        k: Any = test.id if key is None else key
        timeout_s = want_seconds(timeout)
        await runner.on_signal_wait(self, timeout=timeout_s)
        time_start = monotonic()
        event = await self._wait_for_message_by_key(key=k, timeout=timeout_s)
        time_end = monotonic()
        await runner.on_signal_received(
            self,
            time_start=time_start,
            time_end=time_end,
        )
        self._verify_event(event, k, self.name, self.case.name)
        return cast(VT, maybe_model(event.value))

    def _verify_event(self,
                      ev: SignalEvent,
                      key: Any,
                      name: str,
                      case: str) -> None:
        # Sanity checks: the event must belong to this key/signal/case.
        assert ev.key == key, f'{ev.key!r} == {key!r}'
        assert ev.signal_name == name, f'{ev.signal_name!r} == {name!r}'
        assert ev.case_name == case, f'{ev.case_name!r} == {case!r}'

    async def _wait_for_message_by_key(
            self,
            key: Any,
            *,
            timeout: float = None,
            max_interval: float = 2.0) -> SignalEvent:
        """Wait until an event for ``key`` has been resolved.

        Arguments:
            key: Key of the event to wait for.
            timeout: Total time budget in seconds; ``None`` disables the
                deadline.
            max_interval: Upper bound, in seconds, for a single wait
                between checks when a timeout is set.

        Raises:
            TestTimeout: If the time budget is exhausted.
            asyncio.CancelledError: If the app should stop.
        """
        app = self.case.app
        time_start = monotonic()
        remaining = timeout
        # See if the key is already there.
        try:
            return self._get_current_value(key)
        except KeyError:
            pass
        # If not, wait for it to arrive.
        while not app.should_stop:
            if timeout is not None:
                # Elapsed time is measured from ``time_start`` and is
                # therefore cumulative, so it must be subtracted from the
                # original timeout.  Subtracting it from the previous
                # ``remaining`` would count earlier iterations again and
                # time out too early.
                remaining = timeout - (monotonic() - time_start)
            try:
                if remaining is not None and remaining <= 0.0:
                    # Out of time: one last check before giving up.
                    try:
                        return self._get_current_value(key)
                    except KeyError:
                        raise asyncio.TimeoutError() from None
                max_wait = None
                if remaining is not None:
                    max_wait = min(remaining, max_interval)
                await self._wait_for_resolved(timeout=max_wait)
            except asyncio.TimeoutError:
                msg = f'Timed out waiting for signal {self.name} ({timeout})'
                raise TestTimeout(msg) from None
            if app.should_stop:
                break
            try:
                val = self._get_current_value(key)
                return val
            except KeyError:
                # Some other signal/key was resolved; keep waiting.
                pass
        raise asyncio.CancelledError()