faust.sensors.monitor¶
Monitor - sensor tracking metrics.
-
class
faust.sensors.monitor.TableState(table: faust.types.tables.CollectionT, *, keys_retrieved: int = 0, keys_updated: int = 0, keys_deleted: int = 0) → None[source]¶ Represents the current state of a table.
-
table= None¶
-
keys_retrieved= 0¶ Number of times a key has been retrieved from this table.
-
keys_updated= 0¶ Number of times a key has been created/changed in this table.
-
keys_deleted= 0¶ Number of times a key has been deleted from this table.
-
-
class
faust.sensors.monitor.Monitor(*, max_avg_history: int = None, max_commit_latency_history: int = None, max_send_latency_history: int = None, max_assignment_latency_history: int = None, messages_sent: int = 0, tables: MutableMapping[str, faust.sensors.monitor.TableState] = None, messages_active: int = 0, events_active: int = 0, messages_received_total: int = 0, messages_received_by_topic: Counter[str] = None, events_total: int = 0, events_by_stream: Counter[faust.types.streams.StreamT] = None, events_by_task: Counter[_asyncio.Task] = None, events_runtime: Deque[float] = None, commit_latency: Deque[float] = None, send_latency: Deque[float] = None, assignment_latency: Deque[float] = None, events_s: int = 0, messages_s: int = 0, events_runtime_avg: float = 0.0, topic_buffer_full: Counter[faust.types.topics.TopicT] = None, rebalances: int = None, rebalance_return_latency: Deque[float] = None, rebalance_end_latency: Deque[float] = None, rebalance_return_avg: float = 0.0, rebalance_end_avg: float = 0.0, time: Callable[float] = <built-in function monotonic>, http_response_codes: Counter[http.HTTPStatus] = None, http_response_latency: Deque[float] = None, http_response_latency_avg: float = 0.0, **kwargs: Any) → None[source]¶ Default Faust Sensor.
This is the default sensor, recording statistics about events, etc.
-
send_errors= 0¶ Number of produce operations that ended in error.
-
assignments_completed= 0¶ Number of partition assignments completed.
-
assignments_failed= 0¶ Number of partitions assignments that failed.
-
max_avg_history= 100¶ Max number of total run time values to keep to build average.
-
max_commit_latency_history= 30¶ Max number of commit latency numbers to keep.
-
max_send_latency_history= 30¶ Max number of send latency numbers to keep.
-
max_assignment_latency_history= 30¶ Max number of assignment latency numbers to keep.
-
rebalances= 0¶ Number of rebalances seen by this worker.
-
tables= None¶ Mapping of tables
-
commit_latency= None¶ Deque of commit latency values
-
send_latency= None¶ Deque of send latency values
-
assignment_latency= None¶ Deque of assignment latency values.
-
rebalance_return_latency= None¶ Deque of previous n rebalance return latencies.
-
rebalance_end_latency= None¶ Deque of previous n rebalance end latencies.
-
rebalance_return_avg= 0.0¶ Average rebalance return latency.
-
rebalance_end_avg= 0.0¶ Average rebalance end latency.
-
messages_active= 0¶ Number of messages currently being processed.
-
messages_received_total= 0¶ Number of messages processed in total.
-
messages_received_by_topic= None¶ Count of messages received by topic
-
messages_sent= 0¶ Number of messages sent in total.
-
messages_sent_by_topic= None¶ Number of messages sent by topic.
-
messages_s= 0¶ Number of messages being processed this second.
-
events_active= 0¶ Number of events currently being processed.
-
events_total= 0¶ Number of events processed in total.
-
events_by_task= None¶ Count of events processed by task
-
events_by_stream= None¶ Count of events processed by stream
-
events_s= 0¶ Number of events being processed this second.
-
events_runtime_avg= 0.0¶ Average event runtime over the last second.
-
events_runtime= None¶ Deque of run times used for averages
-
topic_buffer_full= None¶ Counter of times a topics buffer was full
-
http_response_codes= None¶ Counter of returned HTTP status codes.
-
http_response_latency= None¶ Deque of previous n HTTP request->response latencies.
-
http_response_latency_avg= 0.0¶ Average request->response latency.
-
metric_counts= None¶ Arbitrary counts added by apps
-
tp_committed_offsets= None¶ Last committed offsets by TopicPartition
-
tp_read_offsets= None¶ Last read offsets by TopicPartition
-
tp_end_offsets= None¶ Log end offsets by TopicPartition
-
secs_since(start_time: float) → float[source]¶ Given timestamp start, return number of seconds since that time.
- Return type
-
ms_since(start_time: float) → float[source]¶ Given timestamp start, return number of ms since that time.
- Return type
-
logger= <Logger faust.sensors.monitor (WARNING)>¶
-
on_message_in(tp: faust.types.tuples.TP, offset: int, message: faust.types.tuples.Message) → None[source]¶ Call before message is delegated to streams.
- Return type
None
-
on_stream_event_in(tp: faust.types.tuples.TP, offset: int, stream: faust.types.streams.StreamT, event: faust.types.events.EventT) → Optional[Dict][source]¶ Call when stream starts processing an event.
-
on_stream_event_out(tp: faust.types.tuples.TP, offset: int, stream: faust.types.streams.StreamT, event: faust.types.events.EventT, state: Dict = None) → None[source]¶ Call when stream is done processing an event.
- Return type
None
-
on_topic_buffer_full(topic: faust.types.topics.TopicT) → None[source]¶ Call when conductor topic buffer is full and has to wait.
- Return type
None
-
on_message_out(tp: faust.types.tuples.TP, offset: int, message: faust.types.tuples.Message) → None[source]¶ Call when message is fully acknowledged and can be committed.
- Return type
None
-
on_table_get(table: faust.types.tables.CollectionT, key: Any) → None[source]¶ Call when value in table is retrieved.
- Return type
None
-
on_table_set(table: faust.types.tables.CollectionT, key: Any, value: Any) → None[source]¶ Call when new value for key in table is set.
- Return type
None
-
on_table_del(table: faust.types.tables.CollectionT, key: Any) → None[source]¶ Call when key in a table is deleted.
- Return type
None
-
on_commit_initiated(consumer: faust.types.transports.ConsumerT) → Any[source]¶ Consumer is about to commit topic offset.
- Return type
-
on_commit_completed(consumer: faust.types.transports.ConsumerT, state: Any) → None[source]¶ Call when consumer commit offset operation completed.
- Return type
None
-
on_send_initiated(producer: faust.types.transports.ProducerT, topic: str, message: faust.types.tuples.PendingMessage, keysize: int, valsize: int) → Any[source]¶ Call when message added to producer buffer.
- Return type
-
on_send_completed(producer: faust.types.transports.ProducerT, state: Any, metadata: faust.types.tuples.RecordMetadata) → None[source]¶ Call when producer finished sending message.
- Return type
None
-
on_send_error(producer: faust.types.transports.ProducerT, exc: BaseException, state: Any) → None[source]¶ Call when producer was unable to publish message.
- Return type
None
-
on_tp_commit(tp_offsets: MutableMapping[faust.types.tuples.TP, int]) → None[source]¶ Call when offset in topic partition is committed.
- Return type
None
-
track_tp_end_offset(tp: faust.types.tuples.TP, offset: int) → None[source]¶ Track new topic partition end offset for monitoring lags.
- Return type
None
-
on_assignment_start(assignor: faust.types.assignor.PartitionAssignorT) → Dict[source]¶ Partition assignor is starting to assign partitions.
- Return type
Dict[~KT, ~VT]
-
on_assignment_error(assignor: faust.types.assignor.PartitionAssignorT, state: Dict, exc: BaseException) → None[source]¶ Partition assignor did not complete assignor due to error.
- Return type
None
-
on_assignment_completed(assignor: faust.types.assignor.PartitionAssignorT, state: Dict) → None[source]¶ Partition assignor completed assignment.
- Return type
None
-
on_rebalance_start(app: faust.types.app.AppT) → Dict[source]¶ Cluster rebalance in progress.
- Return type
Dict[~KT, ~VT]
-
on_rebalance_return(app: faust.types.app.AppT, state: Dict) → None[source]¶ Consumer replied assignment is done to broker.
- Return type
None
-
on_rebalance_end(app: faust.types.app.AppT, state: Dict) → None[source]¶ Cluster rebalance fully completed (including recovery).
- Return type
None
-