Source code for faust.sensors.monitor

"""Monitor - sensor tracking metrics."""
import asyncio
import statistics
from time import monotonic
from typing import Any, Callable, List, Mapping, MutableMapping, cast

from mode import Service, ServiceT, label
from mode.proxy import ServiceProxy
from mode.utils.compat import Counter
from mode.utils.objects import KeywordReduce, cached_property

from faust.types import CollectionT, EventT, Message, StreamT, TP, TopicT
from faust.types.transports import ConsumerT, ProducerT

from .base import Sensor

__all__ = [
    'TableState',
    'Monitor',
    'MonitorService',
]

MAX_AVG_HISTORY = 100
MAX_COMMIT_LATENCY_HISTORY = 30
MAX_SEND_LATENCY_HISTORY = 30

TPOffsetMapping = MutableMapping[TP, int]
PartitionOffsetMapping = MutableMapping[int, int]
TPOffsetDict = MutableMapping[str, PartitionOffsetMapping]


class TableState(KeywordReduce):
    """Represents the current state of a table."""

    #: The table this object records statistics for.
    # The attribute here cannot be None, but needs to exist on the
    # class so that Sphinx finds it, so we cast the None to its type.
    table: CollectionT = cast(CollectionT, None)

    #: Number of times a key has been retrieved from this table.
    keys_retrieved: int = 0

    #: Number of times a key has been created/changed in this table.
    keys_updated: int = 0

    #: Number of times a key has been deleted from this table.
    keys_deleted: int = 0

    def __init__(self,
                 table: CollectionT,
                 *,
                 keys_retrieved: int = 0,
                 keys_updated: int = 0,
                 keys_deleted: int = 0) -> None:
        self.table: CollectionT = table
        self.keys_retrieved = keys_retrieved
        self.keys_updated = keys_updated
        self.keys_deleted = keys_deleted

    def asdict(self) -> Mapping:
        return {
            'keys_retrieved': self.keys_retrieved,
            'keys_updated': self.keys_updated,
            'keys_deleted': self.keys_deleted,
        }

    def __reduce_keywords__(self) -> Mapping:
        return {**self.asdict(), 'table': self.table}
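
# A minimal usage sketch (not part of this module): Monitor, defined
# below, lazily creates one TableState per table name the first time a
# table callback fires.  ``orders_table`` is a hypothetical table object
# exposing a ``.name`` attribute::
#
#     monitor = Monitor()
#     monitor.on_table_set(orders_table, 'order-1', {'quantity': 3})
#     monitor.on_table_get(orders_table, 'order-1')
#     state = monitor.tables[orders_table.name]
#     assert state.keys_updated == 1
#     assert state.keys_retrieved == 1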


class Monitor(ServiceProxy, Sensor, KeywordReduce):
    """Default Faust Sensor.

    This is the default sensor, recording statistics about
    events, etc.
    """

    #: Max number of total run time values to keep to build average.
    max_avg_history: int = 0

    #: Max number of commit latency numbers to keep.
    max_commit_latency_history: int = 0

    #: Max number of send latency numbers to keep.
    max_send_latency_history: int = 0

    #: Mapping of tables.
    tables: MutableMapping[str, TableState] = cast(
        MutableMapping[str, TableState], None)

    #: Number of messages currently being processed.
    messages_active: int = 0

    #: Number of messages processed in total.
    messages_received_total: int = 0

    #: Count of messages received by topic.
    messages_received_by_topic: Counter[str] = cast(Counter[str], None)

    #: Number of messages being processed this second.
    messages_s: int = 0

    #: Number of messages sent in total.
    messages_sent: int = 0

    #: Number of messages sent by topic.
    messages_sent_by_topic: Counter[str] = cast(Counter[str], None)

    #: Number of events currently being processed.
    events_active: int = 0

    #: Number of events processed in total.
    events_total: int = 0

    #: Number of events being processed this second.
    events_s: int = 0

    #: Count of events processed by stream.
    events_by_stream: Counter[StreamT] = cast(Counter[StreamT], None)

    #: Count of events processed by task.
    events_by_task: Counter[asyncio.Task] = cast(
        Counter[asyncio.Task], None)

    #: Average event runtime over the last second.
    events_runtime_avg: float = 0.0

    #: List of run times used for averages.
    events_runtime: List[float] = cast(List[float], None)

    #: List of commit latency values.
    commit_latency: List[float] = cast(List[float], None)

    #: List of send latency values.
    send_latency: List[float] = cast(List[float], None)

    #: Counter of times a topic's buffer was full.
    topic_buffer_full: Counter[TopicT] = cast(Counter[TopicT], None)

    #: Arbitrary counts added by apps.
    metric_counts: Counter[str] = cast(Counter[str], None)

    #: Last committed offsets by TopicPartition.
    tp_committed_offsets: TPOffsetMapping = cast(TPOffsetMapping, None)

    #: Last read offsets by TopicPartition.
    tp_read_offsets: TPOffsetMapping = cast(TPOffsetMapping, None)

    #: Log end offsets by TopicPartition.
    tp_end_offsets: TPOffsetMapping = cast(TPOffsetMapping, None)

    def __init__(self,
                 *,
                 max_avg_history: int = MAX_AVG_HISTORY,
                 max_commit_latency_history: int = MAX_COMMIT_LATENCY_HISTORY,
                 max_send_latency_history: int = MAX_SEND_LATENCY_HISTORY,
                 messages_sent: int = 0,
                 tables: MutableMapping[str, TableState] = None,
                 messages_active: int = 0,
                 events_active: int = 0,
                 messages_received_total: int = 0,
                 messages_received_by_topic: Counter[str] = None,
                 events_total: int = 0,
                 events_by_stream: Counter[StreamT] = None,
                 events_by_task: Counter[asyncio.Task] = None,
                 events_runtime: List[float] = None,
                 commit_latency: List[float] = None,
                 send_latency: List[float] = None,
                 events_s: int = 0,
                 messages_s: int = 0,
                 events_runtime_avg: float = 0.0,
                 topic_buffer_full: Counter[TopicT] = None,
                 **kwargs: Any) -> None:
        self.max_avg_history = max_avg_history
        self.max_commit_latency_history = max_commit_latency_history
        self.max_send_latency_history = max_send_latency_history
        self.tables = {} if tables is None else tables
        self.commit_latency = [] if commit_latency is None else commit_latency
        self.send_latency = [] if send_latency is None else send_latency
        self.messages_active = messages_active
        self.messages_received_total = messages_received_total
        # Use counters given to the constructor when provided, instead
        # of silently discarding them.
        self.messages_received_by_topic = (
            Counter() if messages_received_by_topic is None
            else messages_received_by_topic)
        self.messages_sent = messages_sent
        self.messages_sent_by_topic = Counter()
        self.messages_s = messages_s
        self.events_active = events_active
        self.events_total = events_total
        self.events_by_task = (
            Counter() if events_by_task is None else events_by_task)
        self.events_by_stream = (
            Counter() if events_by_stream is None else events_by_stream)
        self.events_s = events_s
        self.events_runtime_avg = events_runtime_avg
        self.events_runtime = [] if events_runtime is None else events_runtime
        self.topic_buffer_full = (
            Counter() if topic_buffer_full is None else topic_buffer_full)
        self.time: Callable[[], float] = monotonic
        self.metric_counts = Counter()
        self.tp_committed_offsets = {}
        self.tp_read_offsets = {}
        self.tp_end_offsets = {}

    def asdict(self) -> Mapping:
        return {
            'messages_active': self.messages_active,
            'messages_received_total': self.messages_received_total,
            'messages_sent': self.messages_sent,
            'messages_sent_by_topic': self.messages_sent_by_topic,
            'messages_s': self.messages_s,
            'messages_received_by_topic': self.messages_received_by_topic,
            'events_active': self.events_active,
            'events_total': self.events_total,
            'events_s': self.events_s,
            'events_runtime_avg': self.events_runtime_avg,
            'events_by_task': self._events_by_task_dict(),
            'events_by_stream': self._events_by_stream_dict(),
            'commit_latency': self.commit_latency,
            'send_latency': self.send_latency,
            'topic_buffer_full': self._topic_buffer_full_dict(),
            'tables': {
                name: table.asdict()
                for name, table in self.tables.items()
            },
            'metric_counts': self._metric_counts_dict(),
            'topic_committed_offsets': self._tp_committed_offsets_dict(),
            'topic_read_offsets': self._tp_read_offsets_dict(),
            'topic_end_offsets': self._tp_end_offsets_dict(),
        }

    def _events_by_stream_dict(self) -> MutableMapping[str, int]:
        return {label(stream): count
                for stream, count in self.events_by_stream.items()}

    def _events_by_task_dict(self) -> MutableMapping[str, int]:
        return {label(task): count
                for task, count in self.events_by_task.items()}

    def _topic_buffer_full_dict(self) -> MutableMapping[str, int]:
        return {label(topic): count
                for topic, count in self.topic_buffer_full.items()}

    def _metric_counts_dict(self) -> MutableMapping[str, int]:
        return {key: count for key, count in self.metric_counts.items()}

    def _tp_committed_offsets_dict(self) -> TPOffsetDict:
        return self._tp_offsets_as_dict(self.tp_committed_offsets)

    def _tp_read_offsets_dict(self) -> TPOffsetDict:
        return self._tp_offsets_as_dict(self.tp_read_offsets)

    def _tp_end_offsets_dict(self) -> TPOffsetDict:
        return self._tp_offsets_as_dict(self.tp_end_offsets)

    @classmethod
    def _tp_offsets_as_dict(cls, tp_offsets: TPOffsetMapping) -> TPOffsetDict:
        topic_partition_offsets: TPOffsetDict = {}
        for tp, offset in tp_offsets.items():
            partition_offsets = topic_partition_offsets.get(tp.topic) or {}
            partition_offsets[tp.partition] = offset
            topic_partition_offsets[tp.topic] = partition_offsets
        return topic_partition_offsets

    def _cleanup(self) -> None:
        self._cleanup_max_avg_history()
        self._cleanup_commit_latency_history()
        self._cleanup_send_latency_history()

    def _cleanup_max_avg_history(self) -> None:
        max_history = self.max_avg_history
        events_runtime = self.events_runtime
        if max_history is not None and len(events_runtime) > max_history:
            events_runtime[:len(events_runtime) - max_history] = []

    def _cleanup_commit_latency_history(self) -> None:
        max_history = self.max_commit_latency_history
        commit_latency = self.commit_latency
        if max_history is not None and len(commit_latency) > max_history:
            commit_latency[:len(commit_latency) - max_history] = []

    def _cleanup_send_latency_history(self) -> None:
        max_history = self.max_send_latency_history
        send_latency = self.send_latency
        if max_history is not None and len(send_latency) > max_history:
            send_latency[:len(send_latency) - max_history] = []
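
    # A doctest-style sketch of the conversion ``_tp_offsets_as_dict``
    # performs: flat TP -> offset pairs are regrouped by topic, then by
    # partition, so the result serializes cleanly to JSON::
    #
    #     >>> Monitor._tp_offsets_as_dict(
    #     ...     {TP('orders', 0): 3001, TP('orders', 1): 2993})
    #     {'orders': {0: 3001, 1: 2993}}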

    def on_message_in(self, tp: TP, offset: int, message: Message) -> None:
        # WARNING: Sensors must never keep a reference to the Message,
        #          as this means the message won't go out of scope!
        self.messages_received_total += 1
        self.messages_active += 1
        self.messages_received_by_topic[tp.topic] += 1
        self.tp_read_offsets[tp] = offset
        message.time_in = self.time()

    def on_stream_event_in(self, tp: TP, offset: int,
                           stream: StreamT, event: EventT) -> None:
        self.events_total += 1
        self.events_by_stream[stream] += 1
        self.events_by_task[stream.task_owner] += 1
        self.events_active += 1
        event.message.stream_meta[id(stream)] = {
            'time_in': self.time(),
            'time_out': None,
            'time_total': None,
        }

    def on_stream_event_out(self, tp: TP, offset: int,
                            stream: StreamT, event: EventT) -> None:
        time_out = self.time()
        state = event.message.stream_meta[id(stream)]
        time_in = state['time_in']
        time_total = time_out - time_in
        self.events_active -= 1
        state.update(
            time_out=time_out,
            time_total=time_total,
        )
        self.events_runtime.append(time_total)
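
    # Timing sketch: the ``on_stream_event_in``/``on_stream_event_out``
    # pair above brackets each event, so once an event has left a stream
    # its per-stream timing can be read back from the message
    # (``event`` and ``stream`` being whatever the callbacks received)::
    #
    #     meta = event.message.stream_meta[id(stream)]
    #     runtime = meta['time_total']  # time_out - time_in, in seconds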

    def on_topic_buffer_full(self, topic: TopicT) -> None:
        self.topic_buffer_full[topic] += 1

    def on_message_out(self, tp: TP, offset: int, message: Message) -> None:
        self.messages_active -= 1
        time_out = message.time_out = self.time()
        time_in = message.time_in
        if time_in is not None:
            message.time_total = time_out - time_in

    def on_table_get(self, table: CollectionT, key: Any) -> None:
        self._table_or_create(table).keys_retrieved += 1

    def on_table_set(self, table: CollectionT, key: Any, value: Any) -> None:
        self._table_or_create(table).keys_updated += 1

    def on_table_del(self, table: CollectionT, key: Any) -> None:
        self._table_or_create(table).keys_deleted += 1

    def _table_or_create(self, table: CollectionT) -> TableState:
        try:
            return self.tables[table.name]
        except KeyError:
            state = self.tables[table.name] = TableState(table)
            return state

    def on_commit_initiated(self, consumer: ConsumerT) -> Any:
        return self.time()

    def on_commit_completed(self, consumer: ConsumerT, state: Any) -> None:
        self.commit_latency.append(self.time() - cast(float, state))

    def on_send_initiated(self, producer: ProducerT, topic: str,
                          keysize: int, valsize: int) -> Any:
        self.messages_sent += 1
        self.messages_sent_by_topic[topic] += 1
        return self.time()

    def on_send_completed(self, producer: ProducerT, state: Any) -> None:
        self.send_latency.append(self.time() - cast(float, state))
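
    # The commit/send hooks use a state-token pattern: ``on_*_initiated``
    # returns an opaque value (here a monotonic timestamp) that the
    # transport later hands back to ``on_*_completed``, so latency
    # tracking needs no per-request bookkeeping inside the sensor.
    # Sketch, with a hypothetical ``producer``::
    #
    #     state = monitor.on_send_initiated(
    #         producer, 'orders', keysize=12, valsize=48)
    #     ...  # the send completes
    #     monitor.on_send_completed(producer, state)
    #     elapsed = monitor.send_latency[-1]  # seconds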

    def count(self, metric_name: str, count: int = 1) -> None:
        self.metric_counts[metric_name] += count
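
    # ``count`` feeds the free-form ``metric_counts`` counter, so an
    # application can record its own metrics wherever it can reach the
    # monitor, e.g. through a hypothetical ``app`` instance::
    #
    #     app.monitor.count('db_requests')
    #     app.monitor.count('records_written', count=100)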

    def on_tp_commit(self, tp_offsets: TPOffsetMapping) -> None:
        self.tp_committed_offsets.update(tp_offsets)

    def track_tp_end_offset(self, tp: TP, offset: int) -> None:
        self.tp_end_offsets[tp] = offset

    @cached_property
    def _service(self) -> ServiceT:
        return MonitorService(self)


class MonitorService(Service):
    """Service responsible for starting/stopping a sensor."""

    # Users may pass a custom monitor to the app, for example::
    #
    #     app = faust.App(monitor=StatsdMonitor(prefix='word-count'))
    #
    # When they do, it's important to remember that the app is created
    # during module import, and that Service.__init__ creates the asyncio
    # event loop.  To stop that from happening we use ServiceProxy to
    # split this into Monitor/MonitorService, so that instantiating
    # Monitor will not create the service; instead the service is created
    # lazily when first needed.

    def __init__(self, monitor: Monitor, **kwargs: Any) -> None:
        self.monitor: Monitor = monitor
        super().__init__(**kwargs)

    @Service.task
    async def _sampler(self) -> None:
        monitor = self.monitor
        median = statistics.median
        prev_message_total = monitor.messages_received_total
        prev_event_total = monitor.events_total
        while not self.should_stop:
            await self.sleep(1.0)

            # Update average event runtime.
            if monitor.events_runtime:
                monitor.events_runtime_avg = median(monitor.events_runtime)

            # Update events/s
            monitor.events_s, prev_event_total = (
                monitor.events_total - prev_event_total,
                monitor.events_total,
            )

            # Update messages/s
            monitor.messages_s, prev_message_total = (
                monitor.messages_received_total - prev_message_total,
                monitor.messages_received_total,
            )

            # Cleanup old history entries.
            monitor._cleanup()
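

# Sketch of extending the default sensor (assumes a ``faust`` import and
# app setup like the comment in MonitorService above): subclass Monitor,
# override the callbacks you care about, and call ``super()`` so the
# statistics above keep accumulating::
#
#     class LoggingMonitor(Monitor):
#
#         def on_topic_buffer_full(self, topic: TopicT) -> None:
#             super().on_topic_buffer_full(topic)
#             print(f'buffer full: {topic}')
#
#     app = faust.App('myapp', monitor=LoggingMonitor())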