faust.sensors
-
class faust.sensors.Monitor(*, max_avg_history: int = 100, max_commit_latency_history: int = 30, max_send_latency_history: int = 30, messages_sent: int = 0, tables: MutableMapping[str, faust.sensors.monitor.TableState] = None, messages_active: int = 0, events_active: int = 0, messages_received_total: int = 0, messages_received_by_topic: Counter[str] = None, events_total: int = 0, events_by_stream: Counter[faust.types.streams.StreamT] = None, events_by_task: Counter[_asyncio.Task] = None, events_runtime: List[float] = None, commit_latency: List[float] = None, send_latency: List[float] = None, events_s: int = 0, messages_s: int = 0, events_runtime_avg: float = 0.0, topic_buffer_full: Counter[faust.types.topics.TopicT] = None, **kwargs) → None
Default Faust Sensor.
This is the default sensor. It records statistics about messages, events, tables, commits, and sends.
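As a rough illustration of how a bounded history such as `max_avg_history` can feed an average such as `events_runtime_avg`, the sketch below keeps only the most recent N run times. `push_bounded` and `RuntimeStats` are hypothetical names, not part of Faust, and the sketch recomputes the average on every sample, which may differ from how the real Monitor schedules its averaging.

```python
from statistics import mean
from typing import List


def push_bounded(history: List[float], value: float, max_len: int) -> None:
    """Append value, then drop the oldest entries beyond max_len."""
    history.append(value)
    excess = len(history) - max_len
    if excess > 0:
        del history[:excess]


class RuntimeStats:
    """Hypothetical stand-in that mirrors three Monitor attribute names."""

    def __init__(self, max_avg_history: int = 100) -> None:
        self.max_avg_history = max_avg_history
        self.events_runtime: List[float] = []
        self.events_runtime_avg: float = 0.0

    def record(self, runtime: float) -> None:
        # Keep a bounded window and average over what is left in it.
        push_bounded(self.events_runtime, runtime, self.max_avg_history)
        self.events_runtime_avg = mean(self.events_runtime)


stats = RuntimeStats(max_avg_history=3)
for runtime in (0.1, 0.2, 0.3, 0.4):
    stats.record(runtime)
# Only the three most recent run times remain in stats.events_runtime.
```

Bounding the list keeps memory use constant regardless of how long the worker runs, at the cost of the average only reflecting recent activity.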
-
max_avg_history = 0
Max number of total run time values to keep to build average.
-
max_commit_latency_history = 0
Max number of commit latency numbers to keep.
-
max_send_latency_history = 0
Max number of send latency numbers to keep.
-
tables = None
Mapping of tables.
-
commit_latency = None
List of commit latency values.
-
send_latency = None
List of send latency values.
-
messages_active = 0
Number of messages currently being processed.
-
messages_received_total = 0
Number of messages processed in total.
-
messages_received_by_topic = None
Count of messages received by topic.
-
messages_sent = 0
Number of messages sent in total.
-
messages_sent_by_topic = None
Number of messages sent by topic.
-
messages_s = 0
Number of messages being processed this second.
-
events_active = 0
Number of events currently being processed.
-
events_total = 0
Number of events processed in total.
-
events_by_task = None
Count of events processed by task.
-
events_by_stream = None
Count of events processed by stream.
-
events_s = 0
Number of events being processed this second.
-
events_runtime_avg = 0.0
Average event runtime over the last second.
-
events_runtime = None
List of run times used for averages.
-
topic_buffer_full = None
Counter of times a topic's buffer was full.
-
metric_counts = None
Arbitrary counts added by apps.
-
tp_committed_offsets = None
Last committed offsets by TopicPartition.
-
tp_read_offsets = None
Last read offsets by TopicPartition.
-
tp_end_offsets = None
Log end offsets by TopicPartition.
-
logger = <Logger faust.sensors.monitor (WARNING)>
-
on_message_in(tp: faust.types.tuples.TP, offset: int, message: faust.types.tuples.Message) → None
Message received by a consumer.
Return type: None
-
on_stream_event_in(tp: faust.types.tuples.TP, offset: int, stream: faust.types.streams.StreamT, event: faust.types.events.EventT) → None
Message sent to a stream as an event.
Return type: None
-
on_stream_event_out(tp: faust.types.tuples.TP, offset: int, stream: faust.types.streams.StreamT, event: faust.types.events.EventT) → None
Event was acknowledged by stream.
Notes
Acknowledged means a stream finished processing the event, but given that multiple streams may be handling the same event, the message cannot be committed before all streams have processed it. When all streams have acknowledged the event, it will go through on_message_out() just before offsets are committed.
Return type: None
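The ordering described in the note can be modelled with a simple reference count. This is a toy sketch under stated assumptions: `AckTracker` and its key tuple are hypothetical, every stream is assumed to receive the event before any stream acknowledges it, and nothing here claims to reproduce how Faust tracks acknowledgements internally.

```python
from collections import Counter
from typing import List, Tuple

Key = Tuple[str, int, int]  # hypothetical (topic, partition, offset) key


class AckTracker:
    """Fires on_message_out only after every stream has acknowledged."""

    def __init__(self) -> None:
        self.pending: Counter = Counter()
        self.completed: List[Key] = []

    def on_stream_event_in(self, key: Key) -> None:
        # One more stream is now processing this message.
        self.pending[key] += 1

    def on_stream_event_out(self, key: Key) -> None:
        # One stream acknowledged; the message is done when none remain.
        self.pending[key] -= 1
        if self.pending[key] == 0:
            del self.pending[key]
            self.on_message_out(key)

    def on_message_out(self, key: Key) -> None:
        self.completed.append(key)


tracker = AckTracker()
msg = ("orders", 0, 42)
tracker.on_stream_event_in(msg)   # delivered to stream A
tracker.on_stream_event_in(msg)   # delivered to stream B
tracker.on_stream_event_out(msg)  # stream A acknowledges
after_first_ack = list(tracker.completed)
tracker.on_stream_event_out(msg)  # stream B acknowledges
```

After the first acknowledgement nothing has completed yet; the message only reaches `on_message_out` once the second stream is finished.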
-
on_topic_buffer_full(topic: faust.types.topics.TopicT) → None
Topic buffer full so conductor had to wait.
Return type: None
-
on_message_out(tp: faust.types.tuples.TP, offset: int, message: faust.types.tuples.Message) → None
All streams finished processing message.
Return type: None
-
on_table_get(table: faust.types.tables.CollectionT, key: Any) → None
Key retrieved from table.
Return type: None
-
on_table_set(table: faust.types.tables.CollectionT, key: Any, value: Any) → None
Value set for key in table.
Return type: None
-
on_table_del(table: faust.types.tables.CollectionT, key: Any) → None
Key deleted from table.
Return type: None
-
on_commit_initiated(consumer: faust.types.transports.ConsumerT) → Any
Consumer is about to commit topic offset.
Return type: Any
-
on_commit_completed(consumer: faust.types.transports.ConsumerT, state: Any) → None
Consumer finished committing topic offset.
Return type: None
-
on_send_initiated(producer: faust.types.transports.ProducerT, topic: str, keysize: int, valsize: int) → Any
About to send a message.
Return type: Any
-
on_send_completed(producer: faust.types.transports.ProducerT, state: Any) → None
Message successfully sent.
Return type: None
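The `*_initiated` hooks return a value of type `Any` and the matching `*_completed` hooks accept a `state` argument, which suggests a token-passing pattern. One plausible use, sketched below, is to return a start timestamp and compute latency on completion. `LatencyRecorder` is a hypothetical class and the choice of `time.monotonic()` as the token is an assumption; only the hook names and parameters come from the signatures above.

```python
import time
from typing import Any, List


class LatencyRecorder:
    """Hypothetical sensor-like class that measures send latency."""

    def __init__(self, max_send_latency_history: int = 30) -> None:
        self.max_send_latency_history = max_send_latency_history
        self.send_latency: List[float] = []
        self.messages_sent = 0

    def on_send_initiated(self, producer: Any, topic: str,
                          keysize: int, valsize: int) -> Any:
        # The return value is handed back later as `state`.
        self.messages_sent += 1
        return time.monotonic()

    def on_send_completed(self, producer: Any, state: Any) -> None:
        # `state` is whatever on_send_initiated returned for this send.
        self.send_latency.append(time.monotonic() - state)
        excess = len(self.send_latency) - self.max_send_latency_history
        if excess > 0:
            del self.send_latency[:excess]


recorder = LatencyRecorder(max_send_latency_history=2)
for _ in range(3):
    token = recorder.on_send_initiated(None, "orders", keysize=3, valsize=10)
    recorder.on_send_completed(None, token)
```

Passing the token through the caller keeps the sensor free of per-request bookkeeping: it does not need a dictionary of in-flight sends. The same shape would apply to `on_commit_initiated` and `on_commit_completed`.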
-
-
class faust.sensors.Sensor(*, beacon: mode.utils.types.trees.NodeT = None, loop: asyncio.events.AbstractEventLoop = None) → None
Base class for sensors.
This sensor does not do anything at all, but can be subclassed to create new monitors.
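A minimal sketch of the subclassing idea follows. To keep it runnable without Faust installed, it uses stand-ins: the local `Sensor` class and the `TP` named tuple are hypothetical substitutes for `faust.sensors.Sensor` and `faust.types.tuples.TP`, and `CountingSensor` is an invented example. In a real application the class would derive from the Faust base class instead.

```python
from collections import Counter, namedtuple
from typing import Any

# Hypothetical stand-in for faust.types.tuples.TP (topic, partition).
TP = namedtuple("TP", ["topic", "partition"])


class Sensor:
    """Stand-in for faust.sensors.Sensor: every hook is a no-op."""

    def on_message_in(self, tp: Any, offset: int, message: Any) -> None:
        pass

    def on_table_set(self, table: Any, key: Any, value: Any) -> None:
        pass


class CountingSensor(Sensor):
    """Hypothetical sensor that overrides only the hook it cares about."""

    def __init__(self) -> None:
        self.messages_by_topic: Counter = Counter()

    def on_message_in(self, tp: Any, offset: int, message: Any) -> None:
        self.messages_by_topic[tp.topic] += 1


sensor = CountingSensor()
sensor.on_message_in(TP("orders", 0), 1, b"a")
sensor.on_message_in(TP("orders", 1), 7, b"b")
sensor.on_message_in(TP("payments", 0), 3, b"c")
sensor.on_table_set("totals", "k", 1)  # inherited no-op, nothing recorded
```

Because the base hooks do nothing, a subclass only has to implement the callbacks relevant to the metric it collects.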
-
on_message_in(tp: faust.types.tuples.TP, offset: int, message: faust.types.tuples.Message) → None
Message received by a consumer.
Return type: None
-
on_stream_event_in(tp: faust.types.tuples.TP, offset: int, stream: faust.types.streams.StreamT, event: faust.types.events.EventT) → None
Message sent to a stream as an event.
Return type: None
-
on_stream_event_out(tp: faust.types.tuples.TP, offset: int, stream: faust.types.streams.StreamT, event: faust.types.events.EventT) → None
Event was acknowledged by stream.
Notes
Acknowledged means a stream finished processing the event, but given that multiple streams may be handling the same event, the message cannot be committed before all streams have processed it. When all streams have acknowledged the event, it will go through on_message_out() just before offsets are committed.
Return type: None
-
on_message_out(tp: faust.types.tuples.TP, offset: int, message: faust.types.tuples.Message) → None
All streams finished processing message.
Return type: None
-
on_topic_buffer_full(topic: faust.types.topics.TopicT) → None
Topic buffer full so conductor had to wait.
Return type: None
-
on_table_get(table: faust.types.tables.CollectionT, key: Any) → None
Key retrieved from table.
Return type: None
-
on_table_set(table: faust.types.tables.CollectionT, key: Any, value: Any) → None
Value set for key in table.
Return type: None
-
on_table_del(table: faust.types.tables.CollectionT, key: Any) → None
Key deleted from table.
Return type: None
-
on_commit_initiated(consumer: faust.types.transports.ConsumerT) → Any
Consumer is about to commit topic offset.
Return type: Any
-
on_commit_completed(consumer: faust.types.transports.ConsumerT, state: Any) → None
Consumer finished committing topic offset.
Return type: None
-
on_send_initiated(producer: faust.types.transports.ProducerT, topic: str, keysize: int, valsize: int) → Any
About to send a message.
Return type: Any
-
on_send_completed(producer: faust.types.transports.ProducerT, state: Any) → None
Message successfully sent.
Return type: None
-
logger = <Logger faust.sensors.base (WARNING)>
-
-
class faust.sensors.SensorDelegate(app: faust.types.app.AppT) → None
A class that delegates sensor methods to a list of sensors.
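Delegation to a list of sensors can be pictured as a plain fan-out loop. The sketch below is illustrative only: `DelegateSketch`, its `add` method, and `RecordingSensor` are hypothetical names, and the real `SensorDelegate` takes an `app` argument that is omitted here.

```python
from typing import Any, List, Tuple


class RecordingSensor:
    """Hypothetical sensor that remembers which messages it was told about."""

    def __init__(self) -> None:
        self.seen: List[Tuple[Any, int]] = []

    def on_message_in(self, tp: Any, offset: int, message: Any) -> None:
        self.seen.append((tp, offset))


class DelegateSketch:
    """Forwards each hook call to every registered sensor, in order."""

    def __init__(self) -> None:
        self._sensors: List[RecordingSensor] = []

    def add(self, sensor: RecordingSensor) -> None:
        self._sensors.append(sensor)

    def on_message_in(self, tp: Any, offset: int, message: Any) -> None:
        for sensor in self._sensors:
            sensor.on_message_in(tp, offset, message)


delegate = DelegateSketch()
first, second = RecordingSensor(), RecordingSensor()
delegate.add(first)
delegate.add(second)
delegate.on_message_in(("orders", 0), 5, b"payload")
```

Callers only talk to the delegate, so sensors can be added without changing the code that emits the events.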
-
on_message_in(tp: faust.types.tuples.TP, offset: int, message: faust.types.tuples.Message) → None
Return type: None
-
on_stream_event_in(tp: faust.types.tuples.TP, offset: int, stream: faust.types.streams.StreamT, event: faust.types.events.EventT) → None
Return type: None
-
on_stream_event_out(tp: faust.types.tuples.TP, offset: int, stream: faust.types.streams.StreamT, event: faust.types.events.EventT) → None
Return type: None
-
on_message_out(tp: faust.types.tuples.TP, offset: int, message: faust.types.tuples.Message) → None
Return type: None
-
on_table_set(table: faust.types.tables.CollectionT, key: Any, value: Any) → None
Return type: None
-
on_commit_completed(consumer: faust.types.transports.ConsumerT, state: Any) → None
Return type: None
-
-
class faust.sensors.TableState(table: faust.types.tables.CollectionT, *, keys_retrieved: int = 0, keys_updated: int = 0, keys_deleted: int = 0) → None
Represents the current state of a table.
-
table = None
-
keys_retrieved = 0
Number of times a key has been retrieved from this table.
-
keys_updated = 0
Number of times a key has been created/changed in this table.
-
keys_deleted = 0
Number of times a key has been deleted from this table.
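To show how the three counters might be driven by the `on_table_get`, `on_table_set`, and `on_table_del` hooks, here is a small sketch. `TableStateSketch` and `TableStats` are hypothetical, tables are represented by their name as a plain string, and keying the mapping by table name is an assumption based on the `tables: MutableMapping[str, TableState]` annotation.

```python
from dataclasses import dataclass
from typing import Any, Dict


@dataclass
class TableStateSketch:
    """Hypothetical mirror of the TableState fields listed above."""
    table: Any
    keys_retrieved: int = 0
    keys_updated: int = 0
    keys_deleted: int = 0


class TableStats:
    """Keeps one state object per table and bumps counters from the hooks."""

    def __init__(self) -> None:
        self.tables: Dict[str, TableStateSketch] = {}

    def _state(self, table: str) -> TableStateSketch:
        # Create the per-table state lazily on first use.
        if table not in self.tables:
            self.tables[table] = TableStateSketch(table)
        return self.tables[table]

    def on_table_get(self, table: str, key: Any) -> None:
        self._state(table).keys_retrieved += 1

    def on_table_set(self, table: str, key: Any, value: Any) -> None:
        self._state(table).keys_updated += 1

    def on_table_del(self, table: str, key: Any) -> None:
        self._state(table).keys_deleted += 1


table_stats = TableStats()
table_stats.on_table_set("totals", "alice", 10)
table_stats.on_table_set("totals", "alice", 12)
table_stats.on_table_get("totals", "alice")
table_stats.on_table_del("totals", "alice")
totals = table_stats.tables["totals"]
```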
-