"""Monitor using Statsd."""
import re
import typing
from typing import Any, Dict, Optional, Pattern, cast
from mode.utils.objects import cached_property
from faust.exceptions import ImproperlyConfigured
from faust.types import (
AppT,
CollectionT,
EventT,
Message,
PendingMessage,
RecordMetadata,
StreamT,
TP,
)
from faust.types.assignor import PartitionAssignorT
from faust.types.transports import ConsumerT, ProducerT
from .monitor import Monitor, TPOffsetMapping
try:
    import statsd
except ImportError:  # pragma: no cover
    # statsd is an optional dependency: keep the module importable and let
    # StatsdMonitor.__init__ raise ImproperlyConfigured when it is missing.
    statsd = None

if typing.TYPE_CHECKING:  # pragma: no cover
    from statsd import StatsClient
else:
    # Runtime placeholder so that annotations mentioning StatsClient
    # resolve without requiring statsd to be installed.
    class StatsClient: ...  # noqa
__all__ = ['StatsdMonitor']
# This regular expression is used to generate stream ids in Statsd.
# Every run of "<", ">", ":" and whitespace characters is replaced
# with RE_NORMALIZE_SUBSTITUTION, so for example
#   "Stream: <Topic: withdrawals>"
#     -> "Stream_Topic_withdrawals_"
#
# StatsdMonitor._stream_label() additionally removes the leading
# "Stream:", strips the surrounding underscores and lowercases the
# result, giving "topic_withdrawals" for the example above.
#
# See StatsdMonitor._normalize()
RE_NORMALIZE = re.compile(r'[\<\>:\s]+')
RE_NORMALIZE_SUBSTITUTION = '_'
class StatsdMonitor(Monitor):
    """Statsd Faust Sensor.

    This sensor records statistics to Statsd along with computing metrics
    for the stats server.

    Every hook delegates to the base :class:`Monitor` implementation and
    additionally emits a counter, gauge or timing through :attr:`client`.

    Arguments:
        host: Passed as ``host`` to ``statsd.StatsClient``.
        port: Passed as ``port`` to ``statsd.StatsClient``.
        prefix: Passed as ``prefix`` to ``statsd.StatsClient``.
        rate: Sample rate passed along with every counter and timing
            (gauges are sent without a rate).
        **kwargs: Forwarded unchanged to :class:`Monitor`.

    Raises:
        ImproperlyConfigured: If the ``statsd`` library is not installed.
    """

    host: str
    port: int
    prefix: str
    rate: float

    def __init__(self,
                 host: str = 'localhost',
                 port: int = 8125,
                 prefix: str = 'faust-app',
                 rate: float = 1.0,
                 **kwargs: Any) -> None:
        # Fail fast, before touching any instance state, when the
        # optional dependency is missing.
        if statsd is None:
            raise ImproperlyConfigured(
                'StatsdMonitor requires `pip install statsd`.')
        self.host = host
        self.port = port
        self.prefix = prefix
        self.rate = rate
        super().__init__(**kwargs)

    def _new_statsd_client(self) -> StatsClient:
        """Create a Statsd client from the configured host/port/prefix."""
        return statsd.StatsClient(
            host=self.host, port=self.port, prefix=self.prefix)

    def on_message_in(self, tp: TP, offset: int, message: Message) -> None:
        """Call before message is delegated to streams."""
        super().on_message_in(tp, offset, message)
        self.client.incr('messages_received', rate=self.rate)
        self.client.incr('messages_active', rate=self.rate)
        self.client.incr(
            f'topic.{tp.topic}.messages_received',
            rate=self.rate,
        )
        # Gauges carry an absolute value and are sent without a sample rate.
        self.client.gauge(f'read_offset.{tp.topic}.{tp.partition}', offset)

    def on_stream_event_in(self, tp: TP, offset: int, stream: StreamT,
                           event: EventT) -> Optional[Dict]:
        """Call when stream starts processing an event.

        Returns:
            The state returned by the base class, unchanged.
        """
        state = super().on_stream_event_in(tp, offset, stream, event)
        self.client.incr('events', rate=self.rate)
        self.client.incr(
            f'stream.{self._stream_label(stream)}.events',
            rate=self.rate,
        )
        self.client.incr('events_active', rate=self.rate)
        return state

    def _stream_label(self, stream: StreamT) -> str:
        """Return the Statsd-safe, lowercase id for ``stream``.

        The id is derived from ``stream.shortlabel``: a leading
        ``"Stream:"`` is removed, the remainder is normalized with
        :meth:`_normalize`, surrounding underscores are stripped and the
        result is lowercased.
        """
        label = stream.shortlabel
        prefix = 'Stream:'
        # NOTE: str.lstrip('Stream:') would strip any leading run of the
        # *characters* S, t, r, e, a, m and ':' (so it could also eat the
        # first letters of whatever follows); remove the exact prefix only.
        if label.startswith(prefix):
            label = label[len(prefix):]
        return self._normalize(label).strip('_').lower()

    def on_stream_event_out(self, tp: TP, offset: int, stream: StreamT,
                            event: EventT,
                            state: Optional[Dict] = None) -> None:
        """Call when stream is done processing an event."""
        super().on_stream_event_out(tp, offset, stream, event, state)
        self.client.decr('events_active', rate=self.rate)
        # NOTE(review): reads the most recent entry of ``events_runtime``;
        # presumably the base class call above appended it -- confirm the
        # behavior when ``state`` is None.
        self.client.timing(
            'events_runtime',
            self.secs_to_ms(self.events_runtime[-1]),
            rate=self.rate)

    def on_message_out(self,
                       tp: TP,
                       offset: int,
                       message: Message) -> None:
        """Call when message is fully acknowledged and can be committed."""
        super().on_message_out(tp, offset, message)
        self.client.decr('messages_active', rate=self.rate)

    def on_table_get(self, table: CollectionT, key: Any) -> None:
        """Call when value in table is retrieved."""
        super().on_table_get(table, key)
        self.client.incr(f'table.{table.name}.keys_retrieved', rate=self.rate)

    def on_table_set(self, table: CollectionT, key: Any, value: Any) -> None:
        """Call when new value for key in table is set."""
        super().on_table_set(table, key, value)
        self.client.incr(f'table.{table.name}.keys_updated', rate=self.rate)

    def on_table_del(self, table: CollectionT, key: Any) -> None:
        """Call when key in a table is deleted."""
        super().on_table_del(table, key)
        self.client.incr(f'table.{table.name}.keys_deleted', rate=self.rate)

    def on_commit_completed(self, consumer: ConsumerT, state: Any) -> None:
        """Call when consumer commit offset operation completed."""
        super().on_commit_completed(consumer, state)
        # ``state`` is handed to ms_since() as a float.
        self.client.timing(
            'commit_latency',
            self.ms_since(cast(float, state)),
            rate=self.rate)

    def on_send_initiated(self, producer: ProducerT, topic: str,
                          message: PendingMessage,
                          keysize: int, valsize: int) -> Any:
        """Call when message added to producer buffer.

        Returns:
            Whatever the base class returns for this hook.
        """
        self.client.incr(f'topic.{topic}.messages_sent', rate=self.rate)
        return super().on_send_initiated(
            producer, topic, message, keysize, valsize)

    def on_send_completed(self,
                          producer: ProducerT,
                          state: Any,
                          metadata: RecordMetadata) -> None:
        """Call when producer finished sending message."""
        super().on_send_completed(producer, state, metadata)
        self.client.incr('messages_sent', rate=self.rate)
        self.client.timing(
            'send_latency',
            self.ms_since(cast(float, state)),
            rate=self.rate)

    def on_send_error(self,
                      producer: ProducerT,
                      exc: BaseException,
                      state: Any) -> None:
        """Call when producer was unable to publish message."""
        super().on_send_error(producer, exc, state)
        self.client.incr('messages_sent_error', rate=self.rate)
        self.client.timing(
            'send_latency_for_error',
            self.ms_since(cast(float, state)),
            rate=self.rate)

    def on_assignment_error(self,
                            assignor: PartitionAssignorT,
                            state: Dict,
                            exc: BaseException) -> None:
        """Partition assignor did not complete assignor due to error."""
        super().on_assignment_error(assignor, state, exc)
        self.client.incr('assignments_error', rate=self.rate)
        # Same timing metric name as on_assignment_completed().
        self.client.timing(
            'assignment_latency',
            self.ms_since(state['time_start']),
            rate=self.rate)

    def on_assignment_completed(self,
                                assignor: PartitionAssignorT,
                                state: Dict) -> None:
        """Partition assignor completed assignment."""
        super().on_assignment_completed(assignor, state)
        self.client.incr('assignments_complete', rate=self.rate)
        self.client.timing(
            'assignment_latency',
            self.ms_since(state['time_start']),
            rate=self.rate)

    def on_rebalance_start(self, app: AppT) -> Dict:
        """Cluster rebalance in progress.

        Returns:
            The state returned by the base class, unchanged.
        """
        state = super().on_rebalance_start(app)
        self.client.incr('rebalances', rate=self.rate)
        return state

    def on_rebalance_return(self, app: AppT, state: Dict) -> None:
        """Consumer replied assignment is done to broker."""
        super().on_rebalance_return(app, state)
        # Move the rebalance from "in progress" to "recovering".
        self.client.decr('rebalances', rate=self.rate)
        self.client.incr('rebalances_recovering', rate=self.rate)
        # NOTE(review): presumably the base class call above stores
        # 'time_return' in ``state`` -- confirm.
        self.client.timing(
            'rebalance_return_latency',
            self.ms_since(state['time_return']),
            rate=self.rate)

    def on_rebalance_end(self, app: AppT, state: Dict) -> None:
        """Cluster rebalance fully completed (including recovery)."""
        super().on_rebalance_end(app, state)
        self.client.decr('rebalances_recovering', rate=self.rate)
        # NOTE(review): presumably the base class call above stores
        # 'time_end' in ``state`` -- confirm.
        self.client.timing(
            'rebalance_end_latency',
            self.ms_since(state['time_end']),
            rate=self.rate)

    def count(self, metric_name: str, count: int = 1) -> None:
        """Count metric by name."""
        super().count(metric_name, count=count)
        self.client.incr(metric_name, count=count, rate=self.rate)

    def on_tp_commit(self, tp_offsets: TPOffsetMapping) -> None:
        """Call when offset in topic partition is committed."""
        super().on_tp_commit(tp_offsets)
        # One gauge per topic partition in the mapping.
        for tp, offset in tp_offsets.items():
            metric_name = f'committed_offset.{tp.topic}.{tp.partition}'
            self.client.gauge(metric_name, offset)

    def track_tp_end_offset(self, tp: TP, offset: int) -> None:
        """Track new topic partition end offset for monitoring lags."""
        super().track_tp_end_offset(tp, offset)
        metric_name = f'end_offset.{tp.topic}.{tp.partition}'
        self.client.gauge(metric_name, offset)

    def _normalize(self, name: str,
                   *,
                   pattern: Pattern = RE_NORMALIZE,
                   substitution: str = RE_NORMALIZE_SUBSTITUTION) -> str:
        """Return ``name`` with every ``pattern`` match substituted.

        Arguments:
            name: The string to normalize.
            pattern: Compiled regular expression to match.
            substitution: Replacement for every match.
        """
        return pattern.sub(substitution, name)

    @cached_property
    def client(self) -> StatsClient:
        """Return statsd client."""
        return self._new_statsd_client()