faust.sensors.monitor

Monitor - sensor tracking metrics.

class faust.sensors.monitor.TableState(table: faust.types.tables.CollectionT, *, keys_retrieved: int = 0, keys_updated: int = 0, keys_deleted: int = 0) → None[source]

Represents the current state of a table.

table = None
keys_retrieved = 0

Number of times a key has been retrieved from this table.

keys_updated = 0

Number of times a key has been created/changed in this table.

keys_deleted = 0

Number of times a key has been deleted from this table.

asdict() → Mapping[source]

Return table state as dictionary.

Return type

Mapping[~KT, +VT_co]

class faust.sensors.monitor.Monitor(*, max_avg_history: int = None, max_commit_latency_history: int = None, max_send_latency_history: int = None, max_assignment_latency_history: int = None, messages_sent: int = 0, tables: MutableMapping[str, faust.sensors.monitor.TableState] = None, messages_active: int = 0, events_active: int = 0, messages_received_total: int = 0, messages_received_by_topic: Counter[str] = None, events_total: int = 0, events_by_stream: Counter[faust.types.streams.StreamT] = None, events_by_task: Counter[_asyncio.Task] = None, events_runtime: Deque[float] = None, commit_latency: Deque[float] = None, send_latency: Deque[float] = None, assignment_latency: Deque[float] = None, events_s: int = 0, messages_s: int = 0, events_runtime_avg: float = 0.0, topic_buffer_full: Counter[faust.types.topics.TopicT] = None, rebalances: int = None, rebalance_return_latency: Deque[float] = None, rebalance_end_latency: Deque[float] = None, rebalance_return_avg: float = 0.0, rebalance_end_avg: float = 0.0, time: Callable[float] = <built-in function monotonic>, http_response_codes: Counter[http.HTTPStatus] = None, http_response_latency: Deque[float] = None, http_response_latency_avg: float = 0.0, **kwargs: Any) → None[source]

Default Faust Sensor.

This is the default sensor, recording statistics about events, etc.

send_errors = 0

Number of produce operations that ended in error.

assignments_completed = 0

Number of partition assignments completed.

assignments_failed = 0

Number of partition assignments that failed.

max_avg_history = 100

Max number of total run time values to keep to build average.

max_commit_latency_history = 30

Max number of commit latency numbers to keep.

max_send_latency_history = 30

Max number of send latency numbers to keep.

max_assignment_latency_history = 30

Max number of assignment latency numbers to keep.

rebalances = 0

Number of rebalances seen by this worker.

tables = None

Mapping of tables.

commit_latency = None

Deque of commit latency values.

send_latency = None

Deque of send latency values.

assignment_latency = None

Deque of assignment latency values.

rebalance_return_latency = None

Deque of previous n rebalance return latencies.

rebalance_end_latency = None

Deque of previous n rebalance end latencies.

rebalance_return_avg = 0.0

Average rebalance return latency.

rebalance_end_avg = 0.0

Average rebalance end latency.

messages_active = 0

Number of messages currently being processed.

messages_received_total = 0

Number of messages processed in total.

messages_received_by_topic = None

Count of messages received by topic.

messages_sent = 0

Number of messages sent in total.

messages_sent_by_topic = None

Number of messages sent by topic.

messages_s = 0

Number of messages being processed this second.

events_active = 0

Number of events currently being processed.

events_total = 0

Number of events processed in total.

events_by_task = None

Count of events processed by task.

events_by_stream = None

Count of events processed by stream.

events_s = 0

Number of events being processed this second.

events_runtime_avg = 0.0

Average event runtime over the last second.

events_runtime = None

Deque of run times used for averages.

topic_buffer_full = None

Counter of times a topic's buffer was full.

http_response_codes = None

Counter of returned HTTP status codes.

http_response_latency = None

Deque of previous n HTTP request->response latencies.

http_response_latency_avg = 0.0

Average request->response latency.

metric_counts = None

Arbitrary counts added by apps.

tp_committed_offsets = None

Last committed offsets by TopicPartition.

tp_read_offsets = None

Last read offsets by TopicPartition.

tp_end_offsets = None

Log end offsets by TopicPartition.

secs_since(start_time: float) → float[source]

Given timestamp start_time, return number of seconds since that time.

Return type

float

ms_since(start_time: float) → float[source]

Given timestamp start_time, return number of milliseconds since that time.

Return type

float

logger = <Logger faust.sensors.monitor (WARNING)>
secs_to_ms(timestamp: float) → float[source]

Convert seconds to milliseconds.

Return type

float

asdict() → Mapping[source]

Return monitor state as dictionary.

Return type

Mapping[~KT, +VT_co]

on_message_in(tp: faust.types.tuples.TP, offset: int, message: faust.types.tuples.Message) → None[source]

Call before message is delegated to streams.

Return type

None

on_stream_event_in(tp: faust.types.tuples.TP, offset: int, stream: faust.types.streams.StreamT, event: faust.types.events.EventT) → Optional[Dict][source]

Call when stream starts processing an event.

Return type

Optional[Dict[~KT, ~VT]]

on_stream_event_out(tp: faust.types.tuples.TP, offset: int, stream: faust.types.streams.StreamT, event: faust.types.events.EventT, state: Dict = None) → None[source]

Call when stream is done processing an event.

Return type

None

on_topic_buffer_full(topic: faust.types.topics.TopicT) → None[source]

Call when conductor topic buffer is full and has to wait.

Return type

None

on_message_out(tp: faust.types.tuples.TP, offset: int, message: faust.types.tuples.Message) → None[source]

Call when message is fully acknowledged and can be committed.

Return type

None

on_table_get(table: faust.types.tables.CollectionT, key: Any) → None[source]

Call when value in table is retrieved.

Return type

None

on_table_set(table: faust.types.tables.CollectionT, key: Any, value: Any) → None[source]

Call when new value for key in table is set.

Return type

None

on_table_del(table: faust.types.tables.CollectionT, key: Any) → None[source]

Call when key in a table is deleted.

Return type

None

on_commit_initiated(consumer: faust.types.transports.ConsumerT) → Any[source]

Consumer is about to commit topic offset.

Return type

Any

on_commit_completed(consumer: faust.types.transports.ConsumerT, state: Any) → None[source]

Call when consumer commit offset operation completed.

Return type

None

on_send_initiated(producer: faust.types.transports.ProducerT, topic: str, message: faust.types.tuples.PendingMessage, keysize: int, valsize: int) → Any[source]

Call when message added to producer buffer.

Return type

Any

on_send_completed(producer: faust.types.transports.ProducerT, state: Any, metadata: faust.types.tuples.RecordMetadata) → None[source]

Call when producer finished sending message.

Return type

None

on_send_error(producer: faust.types.transports.ProducerT, exc: BaseException, state: Any) → None[source]

Call when producer was unable to publish message.

Return type

None

count(metric_name: str, count: int = 1) → None[source]

Count metric by name.

Return type

None

on_tp_commit(tp_offsets: MutableMapping[faust.types.tuples.TP, int]) → None[source]

Call when offset in topic partition is committed.

Return type

None

track_tp_end_offset(tp: faust.types.tuples.TP, offset: int) → None[source]

Track new topic partition end offset for monitoring lags.

Return type

None

on_assignment_start(assignor: faust.types.assignor.PartitionAssignorT) → Dict[source]

Partition assignor is starting to assign partitions.

Return type

Dict[~KT, ~VT]

on_assignment_error(assignor: faust.types.assignor.PartitionAssignorT, state: Dict, exc: BaseException) → None[source]

Partition assignor did not complete assignment due to error.

Return type

None

on_assignment_completed(assignor: faust.types.assignor.PartitionAssignorT, state: Dict) → None[source]

Partition assignor completed assignment.

Return type

None

on_rebalance_start(app: faust.types.app.AppT) → Dict[source]

Cluster rebalance in progress.

Return type

Dict[~KT, ~VT]

on_rebalance_return(app: faust.types.app.AppT, state: Dict) → None[source]

Consumer replied to the broker that assignment is done.

Return type

None

on_rebalance_end(app: faust.types.app.AppT, state: Dict) → None[source]

Cluster rebalance fully completed (including recovery).

Return type

None

on_web_request_start(app: faust.types.app.AppT, request: faust.web.base.Request, *, view: faust.web.views.View = None) → Dict[source]

Web server started working on request.

Return type

Dict[~KT, ~VT]

on_web_request_end(app: faust.types.app.AppT, request: faust.web.base.Request, response: Optional[faust.web.base.Response], state: Dict, *, view: faust.web.views.View = None) → None[source]

Web server finished working on request.

Return type

None