faust.livecheck

LiveCheck - End-to-end testing of asynchronous systems.

class faust.livecheck.LiveCheck(id: str, *, test_topic_name: str = None, bus_topic_name: str = None, report_topic_name: str = None, bus_concurrency: int = None, test_concurrency: int = None, send_reports: bool = None, **kwargs) → None[source]

LiveCheck application.

SCAN_CATEGORIES = ['faust.agent', 'faust.command', 'faust.page', 'faust.service', 'faust.task', 'livecheck.case']
class Signal(name: str = '', case: faust.livecheck.signals._Case = None, index: int = -1) → None

Signal for test case using Kafka.

Used to wait for something to happen elsewhere.

class Case(*, app: faust.livecheck.case._LiveCheck, name: str, probability: float = None, warn_stalled_after: Union[datetime.timedelta, float, str] = None, active: bool = None, signals: Iterable[faust.livecheck.signals.BaseSignal] = None, test_expires: Union[datetime.timedelta, float, str] = None, frequency: Union[datetime.timedelta, float, str] = None, realtime_logs: bool = None, max_history: int = None, max_consecutive_failures: int = None, url_timeout_total: float = None, url_timeout_connect: float = None, url_error_retries: int = None, url_error_delay_min: float = None, url_error_delay_backoff: float = None, url_error_delay_max: float = None, **kwargs) → None

LiveCheck test case.

Runner

alias of faust.livecheck.runners.TestRunner

active = True
consecutive_failures = 0
property current_execution

Return the currently executing TestRunner in this task.

Return type

Optional[TestRunner]

property current_test

Return the currently active test in this task (if any).

Return type

Optional[TestExecution]

frequency = None
frequency_avg = None
property label

Return human-readable label for this test case.

Return type

str

last_fail = None
last_test_received = None
latency_avg = None
logger = <Logger faust.livecheck.case (WARNING)>
max_consecutive_failures = 30
max_history = 100
maybe_trigger(id: str = None, *args, **kwargs) → AsyncGenerator[Optional[faust.livecheck.models.TestExecution], None]

Schedule test execution, or not, based on probability setting.

Return type

AsyncGenerator[Optional[TestExecution], None]

probability = 0.5
realtime_logs = False
runtime_avg = None
property seconds_since_last_fail

Return number of seconds since any test failed.

Return type

Optional[float]

state = 'INIT'
state_transition_delay = 60.0
test_expires = datetime.timedelta(0, 10800)
total_failures = 0
url_error_delay_backoff = 1.5
url_error_delay_max = 5.0
url_error_delay_min = 0.5
url_error_retries = 10
url_timeout_connect = None
url_timeout_total = 300.0
warn_stalled_after = 1800.0
classmethod for_app(app: faust.types.app.AppT, *, prefix: str = 'livecheck-', web_port: int = 9999, test_topic_name: str = None, bus_topic_name: str = None, report_topic_name: str = None, bus_concurrency: int = None, test_concurrency: int = None, send_reports: bool = None, **kwargs) → faust.livecheck.app.LiveCheck[source]

Create LiveCheck application targeting specific app.

The target app will be used to configure the LiveCheck app.

Return type

LiveCheck[]

test_topic_name = 'livecheck'
bus_topic_name = 'livecheck-bus'
report_topic_name = 'livecheck-report'
bus_concurrency = 30

Number of concurrent actors processing signal events.

test_concurrency = 100

Number of concurrent actors executing test cases.

send_reports = True

Unset this if you don’t want reports to be sent to the report_topic_name topic.

property current_test

Return the current test context (if any).

Return type

Optional[TestExecution]

on_produce_attach_test_headers(sender: faust.types.app.AppT, key: bytes = None, value: bytes = None, partition: int = None, timestamp: float = None, headers: List[Tuple[str, bytes]] = None, **kwargs) → None[source]

Attach test headers to Kafka produce requests.

Return type

None

case(*, name: str = None, probability: float = None, warn_stalled_after: Union[datetime.timedelta, float, str] = datetime.timedelta(0, 1800), active: bool = None, test_expires: Union[datetime.timedelta, float, str] = None, frequency: Union[datetime.timedelta, float, str] = None, max_history: int = None, max_consecutive_failures: int = None, url_timeout_total: float = None, url_timeout_connect: float = None, url_error_retries: float = None, url_error_delay_min: float = None, url_error_delay_backoff: float = None, url_error_delay_max: float = None, base: Type[faust.livecheck.case.Case] = faust.livecheck.case.Case) → Callable[Type, faust.livecheck.case.Case][source]

Decorate class to be used as a test case.

Return type

Callable[[Type[+CT_co]], Case[]]

Returns

faust.livecheck.Case.

add_case(case: faust.livecheck.case.Case) → faust.livecheck.case.Case[source]

Add and register new test case.

Return type

Case[]

logger = <Logger faust.livecheck.app (WARNING)>
bus[source]

Topic used for signal communication.

pending_tests[source]

Topic used to keep pending test executions.

reports[source]

Topic used to log test reports.

class faust.livecheck.Case(*, app: faust.livecheck.case._LiveCheck, name: str, probability: float = None, warn_stalled_after: Union[datetime.timedelta, float, str] = None, active: bool = None, signals: Iterable[faust.livecheck.signals.BaseSignal] = None, test_expires: Union[datetime.timedelta, float, str] = None, frequency: Union[datetime.timedelta, float, str] = None, realtime_logs: bool = None, max_history: int = None, max_consecutive_failures: int = None, url_timeout_total: float = None, url_timeout_connect: float = None, url_error_retries: int = None, url_error_delay_min: float = None, url_error_delay_backoff: float = None, url_error_delay_max: float = None, **kwargs) → None[source]

LiveCheck test case.

Runner

alias of faust.livecheck.runners.TestRunner

state = 'INIT'

Current state of this test.

last_test_received = None

The warn_stalled_after timer uses this to keep track of either when a test was last received, or the last time the timer timed out.

last_fail = None

Timestamp of when the suite last failed.

runtime_avg = None
latency_avg = None
frequency_avg = None
state_transition_delay = 60.0
consecutive_failures = 0
total_failures = 0
name = None

Name of the test. If not set, this will be generated from the subclass name.

active = True
probability = 0.5

Probability of test running when live traffic is going through.

warn_stalled_after = 1800.0

Timeout, in seconds, after which we warn that nothing is being processed.

test_expires = datetime.timedelta(0, 10800)
frequency = None

How often we execute the test using fake data (define Case.make_fake_request()).

Set to None if production traffic is frequent enough to satisfy warn_stalled_after.

realtime_logs = False
max_history = 100

Max items to store in latency_history and runtime_history.

max_consecutive_failures = 30
url_timeout_total = 300.0
url_timeout_connect = None
url_error_retries = 10
url_error_delay_min = 0.5
url_error_delay_backoff = 1.5
url_error_delay_max = 5.0
maybe_trigger(id: str = None, *args, **kwargs) → AsyncGenerator[Optional[faust.livecheck.models.TestExecution], None][source]

Schedule test execution, or not, based on probability setting.

Return type

AsyncGenerator[Optional[TestExecution], None]

logger = <Logger faust.livecheck.case (WARNING)>
property seconds_since_last_fail

Return number of seconds since any test failed.

Return type

Optional[float]

property current_test

Return the currently active test in this task (if any).

Return type

Optional[TestExecution]

property current_execution

Return the currently executing TestRunner in this task.

Return type

Optional[TestRunner]

property label

Return human-readable label for this test case.

Return type

str

class faust.livecheck.Signal(name: str = '', case: faust.livecheck.signals._Case = None, index: int = -1) → None[source]

Signal for test case using Kafka.

Used to wait for something to happen elsewhere.

class faust.livecheck.TestRunner(case: faust.livecheck.runners._Case, test: faust.livecheck.models.TestExecution, started: float) → None[source]

Execute and keep track of test instance.

state = 'INIT'
report = None
error = None
log_info(msg: str, *args) → None[source]

Log information related to the current execution.

Return type

None

end() → None[source]

End test execution.

Return type

None

faust.livecheck.current_test() → Optional[faust.livecheck.models.TestExecution][source]

Return information about the current test (if any).

Return type

Optional[TestExecution]