"""LiveCheck - Faust Application."""
import asyncio
from datetime import timedelta
from typing import (
Any,
Callable,
ClassVar,
Dict,
Iterable,
List,
Optional,
Tuple,
Type,
)
from mode.utils.compat import want_bytes
from mode.utils.objects import annotations, cached_property, qualname
from mode.utils.times import Seconds
import faust
from faust.app.base import SCAN_CATEGORIES
from faust.sensors.base import Sensor
from faust.types import AgentT, AppT, EventT, StreamT, TP, TopicT
from faust.utils import venusian
from . import patches
from .case import Case
from .exceptions import LiveCheckError
from .locals import current_test, current_test_stack
from .models import SignalEvent, TestExecution, TestReport
from .signals import BaseSignal, Signal
__all__ = ['LiveCheck']
SCAN_CASE = 'livecheck.case'
#: alias for mypy bug
_Case = Case
patches.patch_all() # XXX
class LiveCheckSensor(Sensor):
def on_stream_event_in(self,
tp: TP,
offset: int,
stream: StreamT,
event: EventT) -> Optional[Dict]:
"""Call when stream starts processing event."""
test = TestExecution.from_headers(event.headers)
if test is not None:
stream.current_test = test
current_test_stack.push_without_automatic_cleanup(test)
return None
def on_stream_event_out(self,
tp: TP,
offset: int,
stream: StreamT,
event: EventT,
state: Dict = None) -> None:
"""Call when stream is finished handling event."""
has_active_test = getattr(stream, 'current_test', None)
if has_active_test:
stream.current_test = None
current_test_stack.pop()
[docs]class LiveCheck(faust.App):
"""LiveCheck application."""
SCAN_CATEGORIES = list(SCAN_CATEGORIES) + [SCAN_CASE]
Signal: ClassVar[Type[BaseSignal]]
Signal = Signal
Case: ClassVar[Type[_Case]]
Case = _Case
#: Number of concurrent actors processing signal events.
bus_concurrency: int = 30
#: Number of concurrent actors executing test cases.
test_concurrency: int = 100
#: Unset this if you don't want reports to be sent to
#: the :attr:`report_topic_name` topic.
send_reports: bool = True
test_topic_name: str = 'livecheck'
bus_topic_name: str = 'livecheck-bus'
report_topic_name: str = 'livecheck-report'
cases: Dict[str, _Case]
_resolved_signals: Dict[Tuple[str, str, Any], SignalEvent]
[docs] @classmethod
def for_app(cls, app: AppT, *,
prefix: str = 'livecheck-',
web_port: int = 9999,
test_topic_name: str = None,
bus_topic_name: str = None,
report_topic_name: str = None,
bus_concurrency: int = None,
test_concurrency: int = None,
send_reports: bool = None,
**kwargs: Any) -> 'LiveCheck':
"""Create LiveCheck application targeting specific app.
The target app will be used to configure the LiveCheck app.
"""
app_id, passed_kwargs = app._default_options
livecheck_id = f'{prefix}{app_id}'
override = {
'web_port': web_port,
'test_topic_name': test_topic_name,
'bus_topic_name': bus_topic_name,
'report_topic_name': report_topic_name,
'bus_concurrency': bus_concurrency,
'test_concurrency': test_concurrency,
'send_reports': send_reports,
**kwargs}
options = {**passed_kwargs, **override}
livecheck_app = cls(livecheck_id, **options)
livecheck_app._contribute_to_app(app)
return livecheck_app
def _contribute_to_app(self, app: AppT) -> None:
from .patches.aiohttp import LiveCheckMiddleware
app.web.web_app.middlewares.append(LiveCheckMiddleware())
app.sensors.add(LiveCheckSensor())
app.livecheck = self
def __init__(self,
id: str,
*,
test_topic_name: str = None,
bus_topic_name: str = None,
report_topic_name: str = None,
bus_concurrency: int = None,
test_concurrency: int = None,
send_reports: bool = None,
**kwargs: Any) -> None:
super().__init__(id, **kwargs)
if test_topic_name is not None:
self.test_topic_name = test_topic_name
if bus_topic_name is not None:
self.bus_topic_name = bus_topic_name
if report_topic_name is not None:
self.report_topic_name = report_topic_name
if bus_concurrency is not None:
self.bus_concurrency = bus_concurrency
if test_concurrency is not None:
self.test_concurrency = test_concurrency
if send_reports is not None:
self.send_reports = send_reports
self.cases = {}
self._resolved_signals = {}
patches.patch_all()
self._apply_monkeypatches()
self._connect_signals()
@property
def current_test(self) -> Optional[TestExecution]:
"""Return the current test context (if any)."""
return current_test()
@cached_property
def _can_resolve(self) -> asyncio.Event:
return asyncio.Event()
def _apply_monkeypatches(self) -> None:
patches.patch_all()
def _connect_signals(self) -> None:
AppT.on_produce_message.connect(self.on_produce_attach_test_headers)
[docs] def case(self, *,
name: str = None,
probability: float = None,
warn_stalled_after: Seconds = timedelta(minutes=30),
active: bool = None,
test_expires: Seconds = None,
frequency: Seconds = None,
max_history: int = None,
max_consecutive_failures: int = None,
url_timeout_total: float = None,
url_timeout_connect: float = None,
url_error_retries: float = None,
url_error_delay_min: float = None,
url_error_delay_backoff: float = None,
url_error_delay_max: float = None,
base: Type[_Case] = Case) -> Callable[[Type], _Case]:
"""Decorate class to be used as a test case.
Returns:
:class:`faust.livecheck.Case`.
"""
base_case = base
def _inner(cls: Type) -> _Case:
case_cls = type(cls.__name__, (cls, base_case), {
'__module__': cls.__module__,
'app': self,
})
signal_types = dict(self._extract_signals(case_cls, base_case))
signals = []
for i, (attr_name, attr_type) in enumerate(signal_types.items()):
signal = getattr(case_cls, attr_name, None)
if signal is None:
signal = attr_type(name=attr_name, index=i + 1)
setattr(case_cls, attr_name, signal)
signals.append(signal)
else:
signal.index = i + 1
case = self.add_case(case_cls(
app=self,
name=self._prepare_case_name(name or qualname(cls)),
active=active,
probability=probability,
warn_stalled_after=warn_stalled_after,
signals=signals,
test_expires=test_expires,
frequency=frequency,
max_history=max_history,
max_consecutive_failures=max_consecutive_failures,
url_timeout_total=url_timeout_total,
url_timeout_connect=url_timeout_connect,
url_error_retries=url_error_retries,
url_error_delay_min=url_error_delay_min,
url_error_delay_backoff=url_error_delay_backoff,
url_error_delay_max=url_error_delay_max,
))
venusian.attach(case, category=SCAN_CASE)
return case
return _inner
def _extract_signals(
self,
case_cls: Type[_Case],
base_case: Type[_Case]) -> Iterable[Tuple[str, Type[BaseSignal]]]:
fields, defaults = annotations(
case_cls,
stop=base_case,
skip_classvar=True,
localns={case_cls.__name__: case_cls},
)
for attr_name, attr_type in fields.items():
actual_type = getattr(attr_type, '__origin__', attr_type)
if actual_type is None: # Python <3.7
actual_type = attr_type
try:
if issubclass(actual_type, BaseSignal):
yield attr_name, attr_type
except TypeError:
pass
[docs] def add_case(self, case: _Case) -> _Case:
"""Add and register new test case."""
self.cases[case.name] = case
return case
[docs] async def post_report(self, report: TestReport) -> None:
"""Publish test report to reporting topic."""
key = None
if report.test is not None:
key = report.test.id
await self.reports.send(key=key, value=report)
[docs] async def on_start(self) -> None:
"""Call when LiveCheck application starts."""
await super().on_start()
self._install_bus_agent()
self._install_test_execution_agent()
[docs] async def on_started(self) -> None:
"""Call when LiveCheck application is fully started."""
await super().on_started()
for case in self.cases.values():
await self.add_runtime_dependency(case)
def _install_bus_agent(self) -> AgentT:
return self.agent(
channel=self.bus,
concurrency=self.bus_concurrency,
)(self._populate_signals)
def _install_test_execution_agent(self) -> AgentT:
return self.agent(
channel=self.pending_tests,
concurrency=self.test_concurrency,
)(self._execute_tests)
async def _populate_signals(self, events: StreamT[SignalEvent]) -> None:
async for test_id, event in events.items():
event.case_name = self._prepare_case_name(event.case_name)
try:
case = self.cases[event.case_name]
except KeyError:
self.log.error('Received signal %r for unregistered case %r',
event, (test_id, event.case_name))
else:
await case.resolve_signal(test_id, event)
async def _execute_tests(self, tests: StreamT[TestExecution]) -> None:
async for test_id, test in tests.items():
test.case_name = self._prepare_case_name(test.case_name)
try:
case = self.cases[test.case_name]
except KeyError:
self.log.error('Unregistered test case %r with id %r: %r',
test.case_name, test_id, test)
else:
try:
await case.execute(test)
except LiveCheckError:
pass
def _prepare_case_name(self, name: str) -> str:
if name.startswith('__main__.'):
if not self.conf.origin:
raise RuntimeError('LiveCheck app missing origin argument')
return self.conf.origin + name[8:]
return name
[docs] @cached_property
def bus(self) -> TopicT:
"""Topic used for signal communication."""
return self.topic(
self.bus_topic_name,
key_type=str,
value_type=SignalEvent,
)
[docs] @cached_property
def pending_tests(self) -> TopicT:
"""Topic used to keep pending test executions."""
return self.topic(
self.test_topic_name,
key_type=str,
value_type=TestExecution,
)
[docs] @cached_property
def reports(self) -> TopicT:
"""Topic used to log test reports."""
return self.topic(
self.report_topic_name,
key_type=str,
value_type=TestReport,
)