Source code for faust.cli.agents

"""Program ``faust agents`` used to list agents."""
from operator import attrgetter
from typing import Any, Callable, Optional, Sequence, Type, cast
from faust.types import AgentT
from .base import AppCommand, option


[docs]class agents(AppCommand): """List agents.""" title = 'Agents' headers = ['name', 'topic', 'help'] sortkey = attrgetter('name') options = [ option( '--local/--no-local', help='Include agents using a local channel'), ]
[docs] async def run(self, local: bool) -> None: """Dump list of available agents in this application.""" self.say( self.tabulate( [ self.agent_to_row(agent) for agent in self.agents(local=local) ], headers=self.headers, title=self.title, ))
[docs] def agents(self, *, local: bool = False) -> Sequence[AgentT]: """Convert list of agents to terminal table rows.""" sortkey = cast(Callable[[Type[AgentT]], Any], self.sortkey) return [ agent for agent in sorted(self.app.agents.values(), key=sortkey) if self._maybe_topic(agent) or local ]
[docs] def agent_to_row(self, agent: AgentT) -> Sequence[str]: """Convert agent fields to terminal table row.""" return [ self.bold_tail(self._name(agent)), self._topic(agent), self.dark(self._help(agent)), ]
def _name(self, agent: AgentT) -> str: return '@' + self.abbreviate_fqdn(agent.name) def _maybe_topic(self, agent: AgentT) -> Optional[str]: try: return agent.channel.get_topic_name() except NotImplementedError: return None def _topic(self, agent: AgentT) -> str: return self._maybe_topic(agent) or '<LOCAL>' def _help(self, agent: AgentT) -> str: return agent.help or '<N/A>'