faust.sensors
¶
Sensors.
-
class
faust.sensors.
Sensor
(*, beacon: mode.utils.types.trees.NodeT = None, loop: asyncio.events.AbstractEventLoop = None) → None[source]¶ Base class for sensors.
This sensor does not do anything at all, but can be subclassed to create new monitors.
-
on_message_in
(tp: faust.types.tuples.TP, offset: int, message: faust.types.tuples.Message) → None[source]¶ Message received by a consumer.
- Return type
None
-
on_stream_event_in
(tp: faust.types.tuples.TP, offset: int, stream: faust.types.streams.StreamT, event: faust.types.events.EventT) → Optional[Dict][source]¶ Message sent to a stream as an event.
-
on_stream_event_out
(tp: faust.types.tuples.TP, offset: int, stream: faust.types.streams.StreamT, event: faust.types.events.EventT, state: Dict = None) → None[source]¶ Event was acknowledged by stream.
Notes
Acknowledged means a stream finished processing the event, but given that multiple streams may be handling the same event, the message cannot be committed before all streams have processed it. When all streams have acknowledged the event, it will go through
on_message_out()
just before offsets are committed.- Return type
None
-
on_message_out
(tp: faust.types.tuples.TP, offset: int, message: faust.types.tuples.Message) → None[source]¶ All streams finished processing message.
- Return type
None
-
on_topic_buffer_full
(topic: faust.types.topics.TopicT) → None[source]¶ Topic buffer full so conductor had to wait.
- Return type
None
-
on_table_get
(table: faust.types.tables.CollectionT, key: Any) → None[source]¶ Key retrieved from table.
- Return type
None
-
on_table_set
(table: faust.types.tables.CollectionT, key: Any, value: Any) → None[source]¶ Value set for key in table.
- Return type
None
-
on_table_del
(table: faust.types.tables.CollectionT, key: Any) → None[source]¶ Key deleted from table.
- Return type
None
-
on_commit_initiated
(consumer: faust.types.transports.ConsumerT) → Any[source]¶ Consumer is about to commit topic offset.
- Return type
-
on_commit_completed
(consumer: faust.types.transports.ConsumerT, state: Any) → None[source]¶ Consumer finished committing topic offset.
- Return type
None
-
on_send_initiated
(producer: faust.types.transports.ProducerT, topic: str, message: faust.types.tuples.PendingMessage, keysize: int, valsize: int) → Any[source]¶ About to send a message.
- Return type
-
on_send_completed
(producer: faust.types.transports.ProducerT, state: Any, metadata: faust.types.tuples.RecordMetadata) → None[source]¶ Message successfully sent.
- Return type
None
-
on_send_error
(producer: faust.types.transports.ProducerT, exc: BaseException, state: Any) → None[source]¶ Error while sending message.
- Return type
None
-
on_assignment_start
(assignor: faust.types.assignor.PartitionAssignorT) → Dict[source]¶ Partition assignor is starting to assign partitions.
- Return type
Dict
[~KT, ~VT]
-
on_assignment_error
(assignor: faust.types.assignor.PartitionAssignorT, state: Dict, exc: BaseException) → None[source]¶ Partition assignor did not complete assignor due to error.
- Return type
None
-
on_assignment_completed
(assignor: faust.types.assignor.PartitionAssignorT, state: Dict) → None[source]¶ Partition assignor completed assignment.
- Return type
None
-
on_rebalance_start
(app: faust.types.app.AppT) → Dict[source]¶ Cluster rebalance in progress.
- Return type
Dict
[~KT, ~VT]
-
on_rebalance_return
(app: faust.types.app.AppT, state: Dict) → None[source]¶ Consumer replied assignment is done to broker.
- Return type
None
-
on_rebalance_end
(app: faust.types.app.AppT, state: Dict) → None[source]¶ Cluster rebalance fully completed (including recovery).
- Return type
None
-
on_web_request_start
(app: faust.types.app.AppT, request: faust.web.base.Request, *, view: faust.web.views.View = None) → Dict[source]¶ Web server started working on request.
- Return type
Dict
[~KT, ~VT]
-
on_web_request_end
(app: faust.types.app.AppT, request: faust.web.base.Request, response: Optional[faust.web.base.Response], state: Dict, *, view: faust.web.views.View = None) → None[source]¶ Web server finished working on request.
- Return type
None
-
logger
= <Logger faust.sensors.base (WARNING)>¶
-
-
class
faust.sensors.
SensorDelegate
(app: faust.types.app.AppT) → None[source]¶ A class that delegates sensor methods to a list of sensors.
-
on_message_in
(tp: faust.types.tuples.TP, offset: int, message: faust.types.tuples.Message) → None[source]¶ Call before message is delegated to streams.
- Return type
None
-
on_stream_event_in
(tp: faust.types.tuples.TP, offset: int, stream: faust.types.streams.StreamT, event: faust.types.events.EventT) → Optional[Dict][source]¶ Call when stream starts processing an event.
-
on_stream_event_out
(tp: faust.types.tuples.TP, offset: int, stream: faust.types.streams.StreamT, event: faust.types.events.EventT, state: Dict = None) → None[source]¶ Call when stream is done processing an event.
- Return type
None
-
on_topic_buffer_full
(topic: faust.types.topics.TopicT) → None[source]¶ Call when conductor topic buffer is full and has to wait.
- Return type
None
-
on_message_out
(tp: faust.types.tuples.TP, offset: int, message: faust.types.tuples.Message) → None[source]¶ Call when message is fully acknowledged and can be committed.
- Return type
None
-
on_table_get
(table: faust.types.tables.CollectionT, key: Any) → None[source]¶ Call when value in table is retrieved.
- Return type
None
-
on_table_set
(table: faust.types.tables.CollectionT, key: Any, value: Any) → None[source]¶ Call when new value for key in table is set.
- Return type
None
-
on_table_del
(table: faust.types.tables.CollectionT, key: Any) → None[source]¶ Call when key in a table is deleted.
- Return type
None
-
on_commit_initiated
(consumer: faust.types.transports.ConsumerT) → Any[source]¶ Call when consumer commit offset operation starts.
- Return type
-
on_commit_completed
(consumer: faust.types.transports.ConsumerT, state: Any) → None[source]¶ Call when consumer commit offset operation completed.
- Return type
None
-
on_send_initiated
(producer: faust.types.transports.ProducerT, topic: str, message: faust.types.tuples.PendingMessage, keysize: int, valsize: int) → Any[source]¶ Call when message added to producer buffer.
- Return type
-
on_send_completed
(producer: faust.types.transports.ProducerT, state: Any, metadata: faust.types.tuples.RecordMetadata) → None[source]¶ Call when producer finished sending message.
- Return type
None
-
on_send_error
(producer: faust.types.transports.ProducerT, exc: BaseException, state: Any) → None[source]¶ Call when producer was unable to publish message.
- Return type
None
-
on_assignment_start
(assignor: faust.types.assignor.PartitionAssignorT) → Dict[source]¶ Partition assignor is starting to assign partitions.
- Return type
Dict
[~KT, ~VT]
-
on_assignment_error
(assignor: faust.types.assignor.PartitionAssignorT, state: Dict, exc: BaseException) → None[source]¶ Partition assignor did not complete assignor due to error.
- Return type
None
-
on_assignment_completed
(assignor: faust.types.assignor.PartitionAssignorT, state: Dict) → None[source]¶ Partition assignor completed assignment.
- Return type
None
-
on_rebalance_start
(app: faust.types.app.AppT) → Dict[source]¶ Cluster rebalance in progress.
- Return type
Dict
[~KT, ~VT]
-
on_rebalance_return
(app: faust.types.app.AppT, state: Dict) → None[source]¶ Consumer replied assignment is done to broker.
- Return type
None
-
on_rebalance_end
(app: faust.types.app.AppT, state: Dict) → None[source]¶ Cluster rebalance fully completed (including recovery).
- Return type
None
-
-
class
faust.sensors.
Monitor
(*, max_avg_history: int = None, max_commit_latency_history: int = None, max_send_latency_history: int = None, max_assignment_latency_history: int = None, messages_sent: int = 0, tables: MutableMapping[str, faust.sensors.monitor.TableState] = None, messages_active: int = 0, events_active: int = 0, messages_received_total: int = 0, messages_received_by_topic: Counter[str] = None, events_total: int = 0, events_by_stream: Counter[faust.types.streams.StreamT] = None, events_by_task: Counter[_asyncio.Task] = None, events_runtime: Deque[float] = None, commit_latency: Deque[float] = None, send_latency: Deque[float] = None, assignment_latency: Deque[float] = None, events_s: int = 0, messages_s: int = 0, events_runtime_avg: float = 0.0, topic_buffer_full: Counter[faust.types.topics.TopicT] = None, rebalances: int = None, rebalance_return_latency: Deque[float] = None, rebalance_end_latency: Deque[float] = None, rebalance_return_avg: float = 0.0, rebalance_end_avg: float = 0.0, time: Callable[float] = <built-in function monotonic>, http_response_codes: Counter[http.HTTPStatus] = None, http_response_latency: Deque[float] = None, http_response_latency_avg: float = 0.0, **kwargs: Any) → None[source]¶ Default Faust Sensor.
This is the default sensor, recording statistics about events, etc.
-
send_errors
= 0¶ Number of produce operations that ended in error.
-
assignments_completed
= 0¶ Number of partition assignments completed.
-
assignments_failed
= 0¶ Number of partitions assignments that failed.
-
max_avg_history
= 100¶ Max number of total run time values to keep to build average.
-
max_commit_latency_history
= 30¶ Max number of commit latency numbers to keep.
-
max_send_latency_history
= 30¶ Max number of send latency numbers to keep.
-
max_assignment_latency_history
= 30¶ Max number of assignment latency numbers to keep.
-
rebalances
= 0¶ Number of rebalances seen by this worker.
-
tables
= None¶ Mapping of tables
-
commit_latency
= None¶ Deque of commit latency values
-
send_latency
= None¶ Deque of send latency values
-
assignment_latency
= None¶ Deque of assignment latency values.
-
rebalance_return_latency
= None¶ Deque of previous n rebalance return latencies.
-
rebalance_end_latency
= None¶ Deque of previous n rebalance end latencies.
-
rebalance_return_avg
= 0.0¶ Average rebalance return latency.
-
rebalance_end_avg
= 0.0¶ Average rebalance end latency.
-
messages_active
= 0¶ Number of messages currently being processed.
-
messages_received_total
= 0¶ Number of messages processed in total.
-
messages_received_by_topic
= None¶ Count of messages received by topic
-
messages_sent
= 0¶ Number of messages sent in total.
-
messages_sent_by_topic
= None¶ Number of messages sent by topic.
-
messages_s
= 0¶ Number of messages being processed this second.
-
events_active
= 0¶ Number of events currently being processed.
-
events_total
= 0¶ Number of events processed in total.
-
events_by_task
= None¶ Count of events processed by task
-
events_by_stream
= None¶ Count of events processed by stream
-
events_s
= 0¶ Number of events being processed this second.
-
events_runtime_avg
= 0.0¶ Average event runtime over the last second.
-
events_runtime
= None¶ Deque of run times used for averages
-
topic_buffer_full
= None¶ Counter of times a topics buffer was full
-
http_response_codes
= None¶ Counter of returned HTTP status codes.
-
http_response_latency
= None¶ Deque of previous n HTTP request->response latencies.
-
http_response_latency_avg
= 0.0¶ Average request->response latency.
-
metric_counts
= None¶ Arbitrary counts added by apps
-
tp_committed_offsets
= None¶ Last committed offsets by TopicPartition
-
tp_read_offsets
= None¶ Last read offsets by TopicPartition
-
tp_end_offsets
= None¶ Log end offsets by TopicPartition
-
secs_since
(start_time: float) → float[source]¶ Given timestamp start, return number of seconds since that time.
- Return type
-
ms_since
(start_time: float) → float[source]¶ Given timestamp start, return number of ms since that time.
- Return type
-
logger
= <Logger faust.sensors.monitor (WARNING)>¶
-
on_message_in
(tp: faust.types.tuples.TP, offset: int, message: faust.types.tuples.Message) → None[source]¶ Call before message is delegated to streams.
- Return type
None
-
on_stream_event_in
(tp: faust.types.tuples.TP, offset: int, stream: faust.types.streams.StreamT, event: faust.types.events.EventT) → Optional[Dict][source]¶ Call when stream starts processing an event.
-
on_stream_event_out
(tp: faust.types.tuples.TP, offset: int, stream: faust.types.streams.StreamT, event: faust.types.events.EventT, state: Dict = None) → None[source]¶ Call when stream is done processing an event.
- Return type
None
-
on_topic_buffer_full
(topic: faust.types.topics.TopicT) → None[source]¶ Call when conductor topic buffer is full and has to wait.
- Return type
None
-
on_message_out
(tp: faust.types.tuples.TP, offset: int, message: faust.types.tuples.Message) → None[source]¶ Call when message is fully acknowledged and can be committed.
- Return type
None
-
on_table_get
(table: faust.types.tables.CollectionT, key: Any) → None[source]¶ Call when value in table is retrieved.
- Return type
None
-
on_table_set
(table: faust.types.tables.CollectionT, key: Any, value: Any) → None[source]¶ Call when new value for key in table is set.
- Return type
None
-
on_table_del
(table: faust.types.tables.CollectionT, key: Any) → None[source]¶ Call when key in a table is deleted.
- Return type
None
-
on_commit_initiated
(consumer: faust.types.transports.ConsumerT) → Any[source]¶ Consumer is about to commit topic offset.
- Return type
-
on_commit_completed
(consumer: faust.types.transports.ConsumerT, state: Any) → None[source]¶ Call when consumer commit offset operation completed.
- Return type
None
-
on_send_initiated
(producer: faust.types.transports.ProducerT, topic: str, message: faust.types.tuples.PendingMessage, keysize: int, valsize: int) → Any[source]¶ Call when message added to producer buffer.
- Return type
-
on_send_completed
(producer: faust.types.transports.ProducerT, state: Any, metadata: faust.types.tuples.RecordMetadata) → None[source]¶ Call when producer finished sending message.
- Return type
None
-
on_send_error
(producer: faust.types.transports.ProducerT, exc: BaseException, state: Any) → None[source]¶ Call when producer was unable to publish message.
- Return type
None
-
on_tp_commit
(tp_offsets: MutableMapping[faust.types.tuples.TP, int]) → None[source]¶ Call when offset in topic partition is committed.
- Return type
None
-
track_tp_end_offset
(tp: faust.types.tuples.TP, offset: int) → None[source]¶ Track new topic partition end offset for monitoring lags.
- Return type
None
-
on_assignment_start
(assignor: faust.types.assignor.PartitionAssignorT) → Dict[source]¶ Partition assignor is starting to assign partitions.
- Return type
Dict
[~KT, ~VT]
-
on_assignment_error
(assignor: faust.types.assignor.PartitionAssignorT, state: Dict, exc: BaseException) → None[source]¶ Partition assignor did not complete assignor due to error.
- Return type
None
-
on_assignment_completed
(assignor: faust.types.assignor.PartitionAssignorT, state: Dict) → None[source]¶ Partition assignor completed assignment.
- Return type
None
-
on_rebalance_start
(app: faust.types.app.AppT) → Dict[source]¶ Cluster rebalance in progress.
- Return type
Dict
[~KT, ~VT]
-
on_rebalance_return
(app: faust.types.app.AppT, state: Dict) → None[source]¶ Consumer replied assignment is done to broker.
- Return type
None
-
on_rebalance_end
(app: faust.types.app.AppT, state: Dict) → None[source]¶ Cluster rebalance fully completed (including recovery).
- Return type
None
-
-
class
faust.sensors.
TableState
(table: faust.types.tables.CollectionT, *, keys_retrieved: int = 0, keys_updated: int = 0, keys_deleted: int = 0) → None[source]¶ Represents the current state of a table.
-
table
= None¶
-
keys_retrieved
= 0¶ Number of times a key has been retrieved from this table.
-
keys_updated
= 0¶ Number of times a key has been created/changed in this table.
-
keys_deleted
= 0¶ Number of times a key has been deleted from this table.
-