# Source code for faust.sensors.monitor

"""Monitor - sensor tracking metrics."""
import asyncio

from collections import deque
from http import HTTPStatus
from statistics import median
from time import monotonic
from typing import (
    Any,
    Callable,
    Dict,
    Mapping,
    MutableMapping,
    Optional,
    Tuple,
    cast,
)

from mode import Service, label
from mode.utils.objects import KeywordReduce
from mode.utils.typing import Counter, Deque

from faust import web
from faust.types import AppT, CollectionT, EventT, StreamT, TopicT
from faust.types.assignor import PartitionAssignorT
from faust.types.tuples import Message, PendingMessage, RecordMetadata, TP
from faust.types.transports import ConsumerT, ProducerT
from faust.utils.functional import deque_pushpopmax

from .base import Sensor

__all__ = ['TableState', 'Monitor']

# Default bound for the rolling deques that feed the median-based
# "average" statistics (event runtime, rebalance and HTTP latencies).
MAX_AVG_HISTORY: int = 100

# Default bounds for the individual latency deques.
MAX_COMMIT_LATENCY_HISTORY: int = 30
MAX_SEND_LATENCY_HISTORY: int = 30
MAX_ASSIGNMENT_LATENCY_HISTORY: int = 30

# Offset keyed by topic partition: ``TP -> offset``.
TPOffsetMapping = MutableMapping[TP, int]
# Offset keyed by partition number within one topic.
PartitionOffsetMapping = MutableMapping[int, int]
# Nested form used for reporting: ``topic -> partition -> offset``.
TPOffsetDict = MutableMapping[str, PartitionOffsetMapping]


class TableState(KeywordReduce):
    """Represents the current state of a table.

    Holds the per-table access counters that the monitor sensor
    increments whenever a key is read, written, or deleted.
    """

    #: The table this object records statistics for.
    # The attribute is never None at runtime, but it has to exist on the
    # class so that Sphinx picks it up -- hence the cast of None.
    table: CollectionT = cast(CollectionT, None)

    #: Number of times a key has been retrieved from this table.
    keys_retrieved: int = 0

    #: Number of times a key has been created/changed in this table.
    keys_updated: int = 0

    #: Number of times a key has been deleted from this table.
    keys_deleted: int = 0

    def __init__(self,
                 table: CollectionT,
                 *,
                 keys_retrieved: int = 0,
                 keys_updated: int = 0,
                 keys_deleted: int = 0) -> None:
        # Store the table first, then seed the three counters.
        self.table: CollectionT = table
        self.keys_retrieved = keys_retrieved
        self.keys_updated = keys_updated
        self.keys_deleted = keys_deleted

    def asdict(self) -> Mapping:
        """Return table state as dictionary.

        Only the three counters are included; the table object is not.
        """
        return dict(
            keys_retrieved=self.keys_retrieved,
            keys_updated=self.keys_updated,
            keys_deleted=self.keys_deleted,
        )

    def __reduce_keywords__(self) -> Mapping:
        """Return the counters plus the table as keyword arguments.

        Consumed by ``KeywordReduce`` -- presumably passed back to
        ``__init__`` when the object is unpickled (verify against mode).
        """
        keywords = dict(self.asdict())
        keywords['table'] = self.table
        return keywords
class Monitor(Sensor, KeywordReduce):
    """Default Faust Sensor.

    This is the default sensor, recording statistics about events, etc.

    Counters are updated directly by the sensor hooks
    (``on_message_in``, ``on_stream_event_in``, ``on_send_initiated``, ...).
    Per-second rates (``events_s``, ``messages_s``) and the median-based
    ``*_avg`` values are refreshed by the :meth:`_sampler` background task,
    which runs on a 1.0 second timer.
    """

    #: Max number of total run time values to keep to build average.
    max_avg_history: int = MAX_AVG_HISTORY

    #: Max number of commit latency numbers to keep.
    max_commit_latency_history: int = MAX_COMMIT_LATENCY_HISTORY

    #: Max number of send latency numbers to keep.
    max_send_latency_history: int = MAX_SEND_LATENCY_HISTORY

    #: Max number of assignment latency numbers to keep.
    max_assignment_latency_history: int = MAX_ASSIGNMENT_LATENCY_HISTORY

    #: Mapping of table name to :class:`TableState`.
    tables: MutableMapping[str, TableState] = cast(
        MutableMapping[str, TableState], None)

    #: Number of messages currently being processed.
    messages_active: int = 0

    #: Number of messages processed in total.
    messages_received_total: int = 0

    #: Count of messages received by topic.
    messages_received_by_topic: Counter[str] = cast(Counter[str], None)

    #: Number of messages received during the last sampling interval.
    messages_s: int = 0

    #: Number of messages sent in total.
    messages_sent: int = 0

    #: Number of messages sent by topic.
    messages_sent_by_topic: Counter[str] = cast(Counter[str], None)

    #: Number of events currently being processed.
    events_active: int = 0

    #: Number of events processed in total.
    events_total: int = 0

    #: Number of events processed during the last sampling interval.
    events_s: int = 0

    #: Count of events processed by stream (keyed by stream object;
    #: labelled in :meth:`asdict`).
    events_by_stream: Counter[StreamT] = cast(Counter[StreamT], None)

    #: Count of events processed by task (keyed by the stream's
    #: ``task_owner``; labelled in :meth:`asdict`).
    events_by_task: Counter[asyncio.Task] = cast(
        Counter[asyncio.Task], None)

    #: Median of the runtimes kept in :attr:`events_runtime`
    #: (refreshed by the sampler).
    events_runtime_avg: float = 0.0

    #: Deque of run times used for averages.
    events_runtime: Deque[float] = cast(Deque[float], None)

    #: Deque of commit latency values.
    commit_latency: Deque[float] = cast(Deque[float], None)

    #: Deque of send latency values.
    send_latency: Deque[float] = cast(Deque[float], None)

    #: Deque of assignment latency values.
    assignment_latency: Deque[float] = cast(Deque[float], None)

    #: Counter of times a topic's buffer was full.
    topic_buffer_full: Counter[TopicT] = cast(Counter[TopicT], None)

    #: Arbitrary counts added by apps.
    metric_counts: Counter[str] = cast(Counter[str], None)

    #: Last committed offsets by TopicPartition.
    tp_committed_offsets: TPOffsetMapping = cast(TPOffsetMapping, None)

    #: Last read offsets by TopicPartition.
    tp_read_offsets: TPOffsetMapping = cast(TPOffsetMapping, None)

    #: Log end offsets by TopicPartition.
    tp_end_offsets: TPOffsetMapping = cast(TPOffsetMapping, None)

    #: Number of produce operations that ended in error.
    send_errors = 0

    #: Number of partition assignments completed.
    assignments_completed = 0

    #: Number of partitions assignments that failed.
    assignments_failed = 0

    #: Number of rebalances seen by this worker.
    rebalances = 0

    #: Deque of previous n rebalance return latencies.
    rebalance_return_latency: Deque[float] = cast(Deque[float], None)

    #: Deque of previous n rebalance end latencies.
    rebalance_end_latency: Deque[float] = cast(Deque[float], None)

    #: Median of :attr:`rebalance_return_latency` (refreshed by sampler).
    rebalance_return_avg: float = .0

    #: Median of :attr:`rebalance_end_latency` (refreshed by sampler).
    rebalance_end_avg: float = .0

    #: Counter of returned HTTP status codes.
    #: Non-standard codes (not members of HTTPStatus) are stored as ints.
    http_response_codes: Counter[HTTPStatus] = cast(Counter[HTTPStatus], None)

    #: Deque of previous n HTTP request->response latencies.
    http_response_latency: Deque[float] = cast(Deque[float], None)

    #: Median of :attr:`http_response_latency` (refreshed by sampler).
    http_response_latency_avg: float = .0

    def __init__(self,
                 *,
                 max_avg_history: int = None,
                 max_commit_latency_history: int = None,
                 max_send_latency_history: int = None,
                 max_assignment_latency_history: int = None,
                 messages_sent: int = 0,
                 tables: MutableMapping[str, TableState] = None,
                 messages_active: int = 0,
                 events_active: int = 0,
                 messages_received_total: int = 0,
                 messages_received_by_topic: Counter[str] = None,
                 events_total: int = 0,
                 events_by_stream: Counter[StreamT] = None,
                 events_by_task: Counter[asyncio.Task] = None,
                 events_runtime: Deque[float] = None,
                 commit_latency: Deque[float] = None,
                 send_latency: Deque[float] = None,
                 assignment_latency: Deque[float] = None,
                 events_s: int = 0,
                 messages_s: int = 0,
                 events_runtime_avg: float = 0.0,
                 topic_buffer_full: Counter[TopicT] = None,
                 rebalances: int = None,
                 rebalance_return_latency: Deque[float] = None,
                 rebalance_end_latency: Deque[float] = None,
                 rebalance_return_avg: float = 0.0,
                 rebalance_end_avg: float = 0.0,
                 time: Callable[[], float] = monotonic,
                 http_response_codes: Counter[HTTPStatus] = None,
                 http_response_latency: Deque[float] = None,
                 http_response_latency_avg: float = 0.0,
                 **kwargs: Any) -> None:
        """Create the monitor, optionally seeded with previous statistics.

        Every container argument (counters, deques, ``tables``) defaults to
        a fresh empty container when omitted or ``None``. A container that
        *is* passed is adopted as-is. ``time`` is the clock used for all
        timings (defaults to :func:`time.monotonic`). Remaining ``kwargs``
        are forwarded to ``Service.__init__``.
        """
        # History bounds: only override the class defaults when given.
        if max_avg_history is not None:
            self.max_avg_history = max_avg_history
        if max_commit_latency_history is not None:
            self.max_commit_latency_history = max_commit_latency_history
        if max_send_latency_history is not None:
            self.max_send_latency_history = max_send_latency_history
        if max_assignment_latency_history is not None:
            self.max_assignment_latency_history = (
                max_assignment_latency_history)
        if rebalances is not None:
            self.rebalances = rebalances

        self.tables = {} if tables is None else tables

        # Latency deques.
        self.commit_latency = (
            deque() if commit_latency is None else commit_latency)
        self.send_latency = deque() if send_latency is None else send_latency
        self.assignment_latency = (
            deque() if assignment_latency is None else assignment_latency)
        self.rebalance_return_latency = (
            deque() if rebalance_return_latency is None
            else rebalance_return_latency)
        self.rebalance_end_latency = (
            deque() if rebalance_end_latency is None
            else rebalance_end_latency)
        self.rebalance_return_avg = rebalance_return_avg
        self.rebalance_end_avg = rebalance_end_avg

        # Message statistics.
        self.messages_active = messages_active
        self.messages_received_total = messages_received_total
        self.messages_received_by_topic = (
            Counter() if messages_received_by_topic is None
            else messages_received_by_topic)
        self.messages_sent = messages_sent
        self.messages_sent_by_topic = Counter()
        self.messages_s = messages_s

        # Event statistics.
        self.events_active = events_active
        self.events_total = events_total
        self.events_by_task = (
            Counter() if events_by_task is None else events_by_task)
        self.events_by_stream = (
            Counter() if events_by_stream is None else events_by_stream)
        self.events_s = events_s
        self.events_runtime_avg = events_runtime_avg
        self.events_runtime = (
            deque() if events_runtime is None else events_runtime)

        self.topic_buffer_full = (
            Counter() if topic_buffer_full is None else topic_buffer_full)
        self.time: Callable[[], float] = time

        # HTTP statistics.
        self.http_response_codes = (
            Counter() if http_response_codes is None
            else http_response_codes)
        self.http_response_latency = (
            deque() if http_response_latency is None
            else http_response_latency)
        self.http_response_latency_avg = http_response_latency_avg

        self.metric_counts = Counter()
        self.tp_committed_offsets = {}
        self.tp_read_offsets = {}
        self.tp_end_offsets = {}
        Service.__init__(self, **kwargs)

    def secs_since(self, start_time: float) -> float:
        """Given timestamp start, return number of seconds since that time."""
        return self.time() - start_time

    def ms_since(self, start_time: float) -> float:
        """Given timestamp start, return number of ms since that time."""
        return self.secs_to_ms(self.secs_since(start_time))

    def secs_to_ms(self, timestamp: float) -> float:
        """Convert seconds to milliseconds."""
        return timestamp * 1000.

    @Service.task
    async def _sampler(self) -> None:
        # Background task: refresh rates/medians on a 1.0s timer.
        prev_message_total = self.messages_received_total
        prev_event_total = self.events_total
        async for sleep_time in self.itertimer(1.0, name='Monitor.sampler'):
            prev_event_total, prev_message_total = self._sample(
                prev_event_total, prev_message_total)

    def _sample(self,
                prev_event_total: int,
                prev_message_total: int) -> Tuple[int, int]:
        """Refresh derived statistics for one sampling interval.

        Arguments:
            prev_event_total: ``events_total`` at the previous sample.
            prev_message_total: ``messages_received_total`` at the
                previous sample.

        Returns:
            ``(events_total, messages_received_total)`` now, to be passed
            back in on the next call.
        """
        # Update median event runtime.
        if self.events_runtime:
            self.events_runtime_avg = median(self.events_runtime)

        # Update events/s (delta since the previous sample).
        self.events_s, prev_event_total = (
            self.events_total - prev_event_total,
            self.events_total,
        )

        # Update messages/s (delta since the previous sample).
        self.messages_s, prev_message_total = (
            self.messages_received_total - prev_message_total,
            self.messages_received_total)

        if self.rebalance_return_latency:
            self.rebalance_return_avg = median(self.rebalance_return_latency)
        if self.rebalance_end_latency:
            self.rebalance_end_avg = median(self.rebalance_end_latency)
        if self.http_response_latency:
            self.http_response_latency_avg = median(
                self.http_response_latency)

        return prev_event_total, prev_message_total

    def asdict(self) -> Mapping:
        """Return monitor state as dictionary."""
        return {
            'messages_active': self.messages_active,
            'messages_received_total': self.messages_received_total,
            'messages_sent': self.messages_sent,
            'messages_sent_by_topic': self.messages_sent_by_topic,
            'messages_s': self.messages_s,
            'messages_received_by_topic': self.messages_received_by_topic,
            'events_active': self.events_active,
            'events_total': self.events_total,
            'events_s': self.events_s,
            'events_runtime_avg': self.events_runtime_avg,
            'events_by_task': self._events_by_task_dict(),
            'events_by_stream': self._events_by_stream_dict(),
            'commit_latency': self.commit_latency,
            'send_latency': self.send_latency,
            'send_errors': self.send_errors,
            'assignment_latency': self.assignment_latency,
            'assignments_completed': self.assignments_completed,
            'assignments_failed': self.assignments_failed,
            'topic_buffer_full': self._topic_buffer_full_dict(),
            'tables': {
                name: table.asdict() for name, table in self.tables.items()
            },
            'metric_counts': self._metric_counts_dict(),
            'topic_committed_offsets': self._tp_committed_offsets_dict(),
            'topic_read_offsets': self._tp_read_offsets_dict(),
            'topic_end_offsets': self._tp_end_offsets_dict(),
            'rebalances': self.rebalances,
            'rebalance_return_latency': self.rebalance_return_latency,
            'rebalance_end_latency': self.rebalance_end_latency,
            'rebalance_return_avg': self.rebalance_return_avg,
            'rebalance_end_avg': self.rebalance_end_avg,
            'http_response_codes': self._http_response_codes_dict(),
            'http_response_latency': self.http_response_latency,
            'http_response_latency_avg': self.http_response_latency_avg,
        }

    def _events_by_stream_dict(self) -> MutableMapping[str, int]:
        return {label(stream): count
                for stream, count in self.events_by_stream.items()}

    def _events_by_task_dict(self) -> MutableMapping[str, int]:
        return {label(task): count
                for task, count in self.events_by_task.items()}

    def _topic_buffer_full_dict(self) -> MutableMapping[str, int]:
        return {label(topic): count
                for topic, count in self.topic_buffer_full.items()}

    def _metric_counts_dict(self) -> MutableMapping[str, int]:
        return {key: count for key, count in self.metric_counts.items()}

    def _http_response_codes_dict(self) -> MutableMapping[int, int]:
        # int() works for both HTTPStatus members and plain-int codes.
        return {int(code): count
                for code, count in self.http_response_codes.items()}

    def _tp_committed_offsets_dict(self) -> TPOffsetDict:
        return self._tp_offsets_as_dict(self.tp_committed_offsets)

    def _tp_read_offsets_dict(self) -> TPOffsetDict:
        return self._tp_offsets_as_dict(self.tp_read_offsets)

    def _tp_end_offsets_dict(self) -> TPOffsetDict:
        return self._tp_offsets_as_dict(self.tp_end_offsets)

    @classmethod
    def _tp_offsets_as_dict(cls,
                            tp_offsets: TPOffsetMapping) -> TPOffsetDict:
        """Regroup ``{TP: offset}`` into ``{topic: {partition: offset}}``."""
        topic_partition_offsets: TPOffsetDict = {}
        for tp, offset in tp_offsets.items():
            partitions = topic_partition_offsets.setdefault(tp.topic, {})
            partitions[tp.partition] = offset
        return topic_partition_offsets

    def on_message_in(self, tp: TP, offset: int, message: Message) -> None:
        """Call before message is delegated to streams."""
        # WARNING: Sensors must never keep a reference to the Message,
        #          as this means the message won't go out of scope!
        self.messages_received_total += 1
        self.messages_active += 1
        self.messages_received_by_topic[tp.topic] += 1
        self.tp_read_offsets[tp] = offset
        message.time_in = self.time()

    def on_stream_event_in(self, tp: TP, offset: int, stream: StreamT,
                           event: EventT) -> Optional[Dict]:
        """Call when stream starts processing an event.

        Returns:
            State dict (``time_in``, ``time_out``, ``time_total``) that is
            expected back in :meth:`on_stream_event_out`.
        """
        self.events_total += 1
        self.events_by_stream[stream] += 1
        self.events_by_task[stream.task_owner] += 1
        self.events_active += 1
        return {
            'time_in': self.time(),
            'time_out': None,
            'time_total': None,
        }

    def on_stream_event_out(self, tp: TP, offset: int, stream: StreamT,
                            event: EventT, state: Dict = None) -> None:
        """Call when stream is done processing an event.

        Fills in ``time_out``/``time_total`` on ``state`` and records the
        runtime in :attr:`events_runtime` (bounded by ``max_avg_history``).
        """
        # NOTE(review): events_active is only decremented when state is
        # provided; confirm callers always pass the state returned by
        # on_stream_event_in, otherwise the gauge drifts upwards.
        if state is not None:
            time_out = self.time()
            time_in = state['time_in']
            time_total = time_out - time_in
            self.events_active -= 1
            state.update(
                time_out=time_out,
                time_total=time_total,
            )
            deque_pushpopmax(
                self.events_runtime, time_total, self.max_avg_history)

    def on_topic_buffer_full(self, topic: TopicT) -> None:
        """Call when conductor topic buffer is full and has to wait."""
        self.topic_buffer_full[topic] += 1

    def on_message_out(self,
                       tp: TP,
                       offset: int,
                       message: Message) -> None:
        """Call when message is fully acknowledged and can be committed."""
        self.messages_active -= 1
        time_out = message.time_out = self.time()
        time_in = message.time_in
        if time_in is not None:
            message.time_total = time_out - time_in

    def on_table_get(self, table: CollectionT, key: Any) -> None:
        """Call when value in table is retrieved."""
        self._table_or_create(table).keys_retrieved += 1

    def on_table_set(self, table: CollectionT, key: Any, value: Any) -> None:
        """Call when new value for key in table is set."""
        self._table_or_create(table).keys_updated += 1

    def on_table_del(self, table: CollectionT, key: Any) -> None:
        """Call when key in a table is deleted."""
        self._table_or_create(table).keys_deleted += 1

    def _table_or_create(self, table: CollectionT) -> TableState:
        # Lazily create the TableState the first time a table is seen.
        try:
            return self.tables[table.name]
        except KeyError:
            state = self.tables[table.name] = TableState(table)
            return state

    def on_commit_initiated(self, consumer: ConsumerT) -> Any:
        """Consumer is about to commit topic offset.

        Returns the start time, passed back to :meth:`on_commit_completed`.
        """
        return self.time()

    def on_commit_completed(self, consumer: ConsumerT, state: Any) -> None:
        """Call when consumer commit offset operation completed."""
        latency = self.time() - cast(float, state)
        deque_pushpopmax(
            self.commit_latency, latency, self.max_commit_latency_history,
        )

    def on_send_initiated(self, producer: ProducerT, topic: str,
                          message: PendingMessage,
                          keysize: int, valsize: int) -> Any:
        """Call when message added to producer buffer.

        Returns the start time, passed back to :meth:`on_send_completed`.
        """
        self.messages_sent += 1
        self.messages_sent_by_topic[topic] += 1
        return self.time()

    def on_send_completed(self,
                          producer: ProducerT,
                          state: Any,
                          metadata: RecordMetadata) -> None:
        """Call when producer finished sending message."""
        latency = self.time() - cast(float, state)
        deque_pushpopmax(
            self.send_latency, latency, self.max_send_latency_history)

    def on_send_error(self,
                      producer: ProducerT,
                      exc: BaseException,
                      state: Any) -> None:
        """Call when producer was unable to publish message."""
        self.send_errors += 1

    def count(self, metric_name: str, count: int = 1) -> None:
        """Count metric by name."""
        self.metric_counts[metric_name] += count

    def on_tp_commit(self, tp_offsets: TPOffsetMapping) -> None:
        """Call when offset in topic partition is committed."""
        self.tp_committed_offsets.update(tp_offsets)

    def track_tp_end_offset(self, tp: TP, offset: int) -> None:
        """Track new topic partition end offset for monitoring lags."""
        self.tp_end_offsets[tp] = offset

    def on_assignment_start(self,
                            assignor: PartitionAssignorT) -> Dict:
        """Partition assignor is starting to assign partitions."""
        return {'time_start': self.time()}

    def on_assignment_error(self,
                            assignor: PartitionAssignorT,
                            state: Dict,
                            exc: BaseException) -> None:
        """Partition assignor did not complete assignor due to error."""
        time_total = self.time() - state['time_start']
        deque_pushpopmax(
            self.assignment_latency, time_total,
            self.max_assignment_latency_history)
        self.assignments_failed += 1

    def on_assignment_completed(self,
                                assignor: PartitionAssignorT,
                                state: Dict) -> None:
        """Partition assignor completed assignment."""
        time_total = self.time() - state['time_start']
        deque_pushpopmax(
            self.assignment_latency, time_total,
            self.max_assignment_latency_history)
        self.assignments_completed += 1

    def on_rebalance_start(self, app: AppT) -> Dict:
        """Cluster rebalance in progress."""
        self.rebalances = app.rebalancing_count
        return {'time_start': self.time()}

    def on_rebalance_return(self, app: AppT, state: Dict) -> None:
        """Consumer replied assignment is done to broker."""
        time_start = state['time_start']
        time_return = self.time()
        latency_return = time_return - time_start
        state.update(
            time_return=time_return,
            latency_return=latency_return,
        )
        deque_pushpopmax(
            self.rebalance_return_latency, latency_return,
            self.max_avg_history)

    def on_rebalance_end(self, app: AppT, state: Dict) -> None:
        """Cluster rebalance fully completed (including recovery)."""
        time_start = state['time_start']
        time_end = self.time()
        latency_end = time_end - time_start
        state.update(
            time_end=time_end,
            latency_end=latency_end,
        )
        deque_pushpopmax(
            self.rebalance_end_latency, latency_end, self.max_avg_history)

    def on_web_request_start(self,
                             app: AppT,
                             request: web.Request,
                             *,
                             view: web.View = None) -> Dict:
        """Web server started working on request."""
        return {'time_start': self.time()}

    def on_web_request_end(self,
                           app: AppT,
                           request: web.Request,
                           response: Optional[web.Response],
                           state: Dict,
                           *,
                           view: web.View = None) -> None:
        """Web server finished working on request.

        A missing response (``None``) is recorded as status 500.
        Status codes that are not members of :class:`http.HTTPStatus`
        (e.g. 499) are recorded as plain ints instead of raising
        :exc:`ValueError` out of the sensor hook.
        """
        status_code = self._http_status(
            response.status if response is not None else 500)
        time_start = state['time_start']
        time_end = self.time()
        latency_end = time_end - time_start
        state.update(
            time_end=time_end,
            latency_end=latency_end,
            status_code=status_code,
        )
        deque_pushpopmax(
            self.http_response_latency, latency_end, self.max_avg_history)
        # HTTPStatus is an int subclass, so a plain-int key hashes and
        # compares equal to the matching enum member.
        self.http_response_codes[cast(HTTPStatus, status_code)] += 1

    @staticmethod
    def _http_status(code: int) -> int:
        """Return ``HTTPStatus(code)``, or ``code`` unchanged if unknown."""
        try:
            return HTTPStatus(code)
        except ValueError:
            # Non-standard status codes are valid on the wire but have
            # no HTTPStatus member.
            return code