# Source code for faust.web.apps.stats

"""HTTP endpoint showing statistics from the Faust monitor."""
from collections import defaultdict
from typing import List, MutableMapping, Set
from faust import web
from faust.types.tuples import TP

# Public names exported by this module.
__all__ = ['Assignment', 'Stats', 'blueprint']

# Shape returned by ``Assignment._topic_grouped``:
# topic name -> list of partition numbers.
TPMap = MutableMapping[str, List[int]]


# Blueprint named 'monitor'; the views in this module register their
# routes on it through ``@blueprint.route``.
blueprint = web.Blueprint('monitor')


@blueprint.route('/', name='index')
class Stats(web.View):
    """View exposing the app's sensor statistics as JSON."""

    async def get(self, request: web.Request) -> web.Response:
        """Return a JSON response with one entry per sensor.

        Each sensor in ``self.app.sensors`` is serialized with its
        ``asdict()`` method and stored under the key ``Sensor<N>``, where
        ``N`` is the sensor's zero-based position in that iteration.
        """
        payload = {}
        for position, sensor in enumerate(self.app.sensors):
            payload[f'Sensor{position}'] = sensor.asdict()
        return self.json(payload)
@blueprint.route('/assignment/', name='assignment')
class Assignment(web.View):
    """View exposing the current partition assignment as JSON."""

    @classmethod
    def _topic_grouped(cls, assignment: Set[TP]) -> TPMap:
        """Group topic partitions by topic name.

        The assignment is walked in sorted order, so the resulting
        mapping and the partition list under each topic are built
        deterministically for a given input set.

        Returns a plain ``dict`` mapping each topic to the list of its
        partition numbers.
        """
        by_topic: MutableMapping[str, List[int]] = {}
        for topic_partition in sorted(assignment):
            partitions = by_topic.setdefault(topic_partition.topic, [])
            partitions.append(topic_partition.partition)
        return by_topic

    async def get(self, request: web.Request) -> web.Response:
        """Return a JSON response describing the current assignment.

        The response has two keys, ``actives`` and ``standbys``, each
        holding the topic-grouped result of the matching
        ``assigned_actives()`` / ``assigned_standbys()`` call on the
        app's assignor.
        """
        assignor = self.app.assignor
        # Actives are fetched before standbys, matching the key order
        # of the response body.
        actives = self._topic_grouped(assignor.assigned_actives())
        standbys = self._topic_grouped(assignor.assigned_standbys())
        return self.json({
            'actives': actives,
            'standbys': standbys,
        })