faust.sensors

class faust.sensors.Monitor(*, max_avg_history: int = 100, max_commit_latency_history: int = 30, max_send_latency_history: int = 30, messages_sent: int = 0, tables: MutableMapping[str, faust.sensors.monitor.TableState] = None, messages_active: int = 0, events_active: int = 0, messages_received_total: int = 0, messages_received_by_topic: Counter[str] = None, events_total: int = 0, events_by_stream: Counter[faust.types.streams.StreamT] = None, events_by_task: Counter[_asyncio.Task] = None, events_runtime: List[float] = None, commit_latency: List[float] = None, send_latency: List[float] = None, events_s: int = 0, messages_s: int = 0, events_runtime_avg: float = 0.0, topic_buffer_full: Counter[faust.types.topics.TopicT] = None, **kwargs) → None[source]

Default Faust Sensor.

This is the default sensor, recording statistics about events, etc.

max_avg_history = 0

Max number of total run time values to keep to build average.

max_commit_latency_history = 0

Max number of commit latency numbers to keep.

max_send_latency_history = 0

Max number of send latency numbers to keep.

tables = None

Mapping of tables

commit_latency = None

List of commit latency values

send_latency = None

List of send latency values

messages_active = 0

Number of messages currently being processed.

messages_received_total = 0

Number of messages processed in total.

messages_received_by_topic = None

Count of messages received by topic

messages_sent = 0

Number of messages sent in total.

messages_sent_by_topic = None

Number of messages sent by topic.

messages_s = 0

Number of messages being processed this second.

events_active = 0

Number of events currently being processed.

events_total = 0

Number of events processed in total.

events_by_task = None

Count of events processed by task

events_by_stream = None

Count of events processed by stream

events_s = 0

Number of events being processed this second.

events_runtime_avg = 0.0

Average event runtime over the last second.

events_runtime = None

List of run times used for averages

topic_buffer_full = None

Counter of times a topics buffer was full

metric_counts = None

Arbitrary counts added by apps

tp_committed_offsets = None

Last committed offsets by TopicPartition

tp_read_offsets = None

Last read offsets by TopicPartition

tp_end_offsets = None

Log end offsets by TopicPartition

asdict() → Mapping[source]
Return type:Mapping[~KT, +VT_co]
logger = <Logger faust.sensors.monitor (WARNING)>
on_message_in(tp: faust.types.tuples.TP, offset: int, message: faust.types.tuples.Message) → None[source]

Message received by a consumer.

Return type:None
on_stream_event_in(tp: faust.types.tuples.TP, offset: int, stream: faust.types.streams.StreamT, event: faust.types.events.EventT) → None[source]

Message sent to a stream as an event.

Return type:None
on_stream_event_out(tp: faust.types.tuples.TP, offset: int, stream: faust.types.streams.StreamT, event: faust.types.events.EventT) → None[source]

Event was acknowledged by stream.

Notes

Acknowledged means a stream finished processing the event, but given that multiple streams may be handling the same event, the message cannot be committed before all streams have processed it. When all streams have acknowledged the event, it will go through on_message_out() just before offsets are committed.

Return type:None
on_topic_buffer_full(topic: faust.types.topics.TopicT) → None[source]

Topic buffer full so conductor had to wait.

Return type:None
on_message_out(tp: faust.types.tuples.TP, offset: int, message: faust.types.tuples.Message) → None[source]

All streams finished processing message.

Return type:None
on_table_get(table: faust.types.tables.CollectionT, key: Any) → None[source]

Key retrieved from table.

Return type:None
on_table_set(table: faust.types.tables.CollectionT, key: Any, value: Any) → None[source]

Value set for key in table.

Return type:None
on_table_del(table: faust.types.tables.CollectionT, key: Any) → None[source]

Key deleted from table.

Return type:None
on_commit_initiated(consumer: faust.types.transports.ConsumerT) → Any[source]

Consumer is about to commit topic offset.

Return type:Any
on_commit_completed(consumer: faust.types.transports.ConsumerT, state: Any) → None[source]

Consumer finished committing topic offset.

Return type:None
on_send_initiated(producer: faust.types.transports.ProducerT, topic: str, keysize: int, valsize: int) → Any[source]

About to send a message.

Return type:Any
on_send_completed(producer: faust.types.transports.ProducerT, state: Any) → None[source]

Message successfully sent.

Return type:None
count(metric_name: str, count: int = 1) → None[source]
Return type:None
on_tp_commit(tp_offsets: MutableMapping[faust.types.tuples.TP, int]) → None[source]
Return type:None
track_tp_end_offset(tp: faust.types.tuples.TP, offset: int) → None[source]
Return type:None
class faust.sensors.Sensor(*, beacon: mode.utils.types.trees.NodeT = None, loop: asyncio.events.AbstractEventLoop = None) → None[source]

Base class for sensors.

This sensor does not do anything at all, but can be subclassed to create new monitors.

on_message_in(tp: faust.types.tuples.TP, offset: int, message: faust.types.tuples.Message) → None[source]

Message received by a consumer.

Return type:None
on_stream_event_in(tp: faust.types.tuples.TP, offset: int, stream: faust.types.streams.StreamT, event: faust.types.events.EventT) → None[source]

Message sent to a stream as an event.

Return type:None
on_stream_event_out(tp: faust.types.tuples.TP, offset: int, stream: faust.types.streams.StreamT, event: faust.types.events.EventT) → None[source]

Event was acknowledged by stream.

Notes

Acknowledged means a stream finished processing the event, but given that multiple streams may be handling the same event, the message cannot be committed before all streams have processed it. When all streams have acknowledged the event, it will go through on_message_out() just before offsets are committed.

Return type:None
on_message_out(tp: faust.types.tuples.TP, offset: int, message: faust.types.tuples.Message) → None[source]

All streams finished processing message.

Return type:None
on_topic_buffer_full(topic: faust.types.topics.TopicT) → None[source]

Topic buffer full so conductor had to wait.

Return type:None
on_table_get(table: faust.types.tables.CollectionT, key: Any) → None[source]

Key retrieved from table.

Return type:None
on_table_set(table: faust.types.tables.CollectionT, key: Any, value: Any) → None[source]

Value set for key in table.

Return type:None
on_table_del(table: faust.types.tables.CollectionT, key: Any) → None[source]

Key deleted from table.

Return type:None
on_commit_initiated(consumer: faust.types.transports.ConsumerT) → Any[source]

Consumer is about to commit topic offset.

Return type:Any
on_commit_completed(consumer: faust.types.transports.ConsumerT, state: Any) → None[source]

Consumer finished committing topic offset.

Return type:None
on_send_initiated(producer: faust.types.transports.ProducerT, topic: str, keysize: int, valsize: int) → Any[source]

About to send a message.

Return type:Any
on_send_completed(producer: faust.types.transports.ProducerT, state: Any) → None[source]

Message successfully sent.

Return type:None
logger = <Logger faust.sensors.base (WARNING)>
class faust.sensors.SensorDelegate(app: faust.types.app.AppT) → None[source]

A class that delegates sensor methods to a list of sensors.

add(sensor: faust.types.sensors.SensorT) → None[source]
Return type:None
remove(sensor: faust.types.sensors.SensorT) → None[source]
Return type:None
on_message_in(tp: faust.types.tuples.TP, offset: int, message: faust.types.tuples.Message) → None[source]
Return type:None
on_stream_event_in(tp: faust.types.tuples.TP, offset: int, stream: faust.types.streams.StreamT, event: faust.types.events.EventT) → None[source]
Return type:None
on_stream_event_out(tp: faust.types.tuples.TP, offset: int, stream: faust.types.streams.StreamT, event: faust.types.events.EventT) → None[source]
Return type:None
on_topic_buffer_full(topic: faust.types.topics.TopicT) → None[source]
Return type:None
on_message_out(tp: faust.types.tuples.TP, offset: int, message: faust.types.tuples.Message) → None[source]
Return type:None
on_table_get(table: faust.types.tables.CollectionT, key: Any) → None[source]
Return type:None
on_table_set(table: faust.types.tables.CollectionT, key: Any, value: Any) → None[source]
Return type:None
on_table_del(table: faust.types.tables.CollectionT, key: Any) → None[source]
Return type:None
on_commit_initiated(consumer: faust.types.transports.ConsumerT) → Any[source]
Return type:Any
on_commit_completed(consumer: faust.types.transports.ConsumerT, state: Any) → None[source]
Return type:None
on_send_initiated(producer: faust.types.transports.ProducerT, topic: str, keysize: int, valsize: int) → Any[source]
Return type:Any
on_send_completed(producer: faust.types.transports.ProducerT, state: Any) → None[source]
Return type:None
class faust.sensors.TableState(table: faust.types.tables.CollectionT, *, keys_retrieved: int = 0, keys_updated: int = 0, keys_deleted: int = 0) → None[source]

Represents the current state of a table.

table = None
keys_retrieved = 0

Number of times a key has been retrieved from this table.

keys_updated = 0

Number of times a key has been created/changed in this table.

keys_deleted = 0

Number of times a key has been deleted from this table.

asdict() → Mapping[source]
Return type:Mapping[~KT, +VT_co]