Sensors - Monitors and Statistics¶
Basics¶
Sensors record information about events in a Faust application as they happen.
You can define custom sensors to record information that you care about, just add it to the list of application sensors. There’s also a default sensor called the “monitor” that record the runtime of messages and events as they go through the worker, the latency of publishing messages, the latency of committing Kafka offsets, and so on.
The web server uses this monitor to present graphs and statistics about your instance, and there’s also a versions of the monitor available that forwards statistics to StatsD, and Datadog.
Monitor¶
The faust.Monitor
is a built-in sensor that captures information like:
Average message processing time (when all agents have processed a message).
Average event processing time (from an event received by an agent to the event is acked.)
The total number of events processed every second.
The total number of events processed every second listed by topic.
The total number of events processed every second listed by agent.
The total number of records written to tables.
Duration of Kafka topic commit operations (latency).
Duration of producing messages (latency).
You can access the state of the monitor, while the worker is running,
in app.monitor
:
@app.agent(app.topic('topic'))
def mytask(events):
async for event in events:
# emit how many events are being processed every second.
print(app.monitor.events_s)
Monitor API Reference¶
Class: Monitor
¶
Monitor Attributes¶
-
class
faust.
Monitor
[source] -
messages_active
Number of messages currently being processed.
-
messages_received_total
Number of messages processed in total.
-
messages_received_by_topic
Count of messages received by topic
-
messages_s
Number of messages being processed this second.
-
events_active
Number of events currently being processed.
-
events_total
Number of events processed in total.
-
events_s
Number of events being processed this second.
-
events_by_stream
Count of events processed by stream
-
events_by_task
Count of events processed by task
-
events_runtime
Deque of run times used for averages
-
events_runtime_avg
Average event runtime over the last second.
-
tables
Mapping of tables
-
commit_latency
Deque of commit latency values
-
send_latency
Deque of send latency values
-
messages_sent
Number of messages sent in total.
-
messages_sent_by_topic
Number of messages sent by topic.
-
Class: TableState
¶
-
class
faust.sensors.
TableState
-
TableState.
table
= None
-
TableState.
keys_retrieved
= 0 Number of times a key has been retrieved from this table.
-
TableState.
keys_updated
= 0 Number of times a key has been created/changed in this table.
-
TableState.
keys_deleted
= 0 Number of times a key has been deleted from this table.
-
Sensor API Reference¶
This reference describes the sensor interface and is useful when you want to build custom sensors.
Methods¶
Message Callbacks¶
-
class
faust.
Sensor
[source] -
on_message_in
(tp: faust.types.tuples.TP, offset: int, message: faust.types.tuples.Message) → None[source] Message received by a consumer.
- Return type
None
-
on_message_out
(tp: faust.types.tuples.TP, offset: int, message: faust.types.tuples.Message) → None[source] All streams finished processing message.
- Return type
None
-
Event Callbacks¶
-
class
faust.
Sensor
[source] -
on_stream_event_in
(tp: faust.types.tuples.TP, offset: int, stream: faust.types.streams.StreamT, event: faust.types.events.EventT) → Optional[Dict][source] Message sent to a stream as an event.
-
on_stream_event_out
(tp: faust.types.tuples.TP, offset: int, stream: faust.types.streams.StreamT, event: faust.types.events.EventT, state: Dict = None) → None[source] Event was acknowledged by stream.
Notes
Acknowledged means a stream finished processing the event, but given that multiple streams may be handling the same event, the message cannot be committed before all streams have processed it. When all streams have acknowledged the event, it will go through
on_message_out()
just before offsets are committed.- Return type
None
-
Table Callbacks¶
-
class
faust.
Sensor
[source] -
on_table_get
(table: faust.types.tables.CollectionT, key: Any) → None[source] Key retrieved from table.
- Return type
None
-
on_table_set
(table: faust.types.tables.CollectionT, key: Any, value: Any) → None[source] Value set for key in table.
- Return type
None
-
on_table_del
(table: faust.types.tables.CollectionT, key: Any) → None[source] Key deleted from table.
- Return type
None
-
Operation Callbacks¶
-
class
faust.
Sensor
[source] -
on_commit_initiated
(consumer: faust.types.transports.ConsumerT) → Any[source] Consumer is about to commit topic offset.
- Return type
-
on_commit_completed
(consumer: faust.types.transports.ConsumerT, state: Any) → None[source] Consumer finished committing topic offset.
- Return type
None
-
on_send_initiated
(producer: faust.types.transports.ProducerT, topic: str, message: faust.types.tuples.PendingMessage, keysize: int, valsize: int) → Any[source] About to send a message.
- Return type
-
on_send_completed
(producer: faust.types.transports.ProducerT, state: Any, metadata: faust.types.tuples.RecordMetadata) → None[source] Message successfully sent.
- Return type
None
-