Source code for faust.assignor.leader_assignor
"""Leader assignor."""
from typing import Any
from mode import Service
from mode.utils.objects import cached_property
from faust.types import AppT, TP, TopicT
from faust.types.assignor import LeaderAssignorT
__all__ = ['LeaderAssignor']
[docs]class LeaderAssignor(Service, LeaderAssignorT):
"""Leader assignor, ensures election of a leader."""
def __init__(self, app: AppT, **kwargs: Any) -> None:
Service.__init__(self, **kwargs)
self.app = app
[docs] async def on_start(self) -> None:
if not self.app.conf.topic_disable_leader:
await self._enable_leader_topic()
async def _enable_leader_topic(self) -> None:
leader_topic = self._leader_topic
await leader_topic.maybe_declare()
self.app.topics.add(leader_topic)
self.app.consumer.randomly_assigned_topics.add(
leader_topic.get_topic_name())
@cached_property
def _leader_topic(self) -> TopicT:
return self.app.topic(
self._leader_topic_name,
partitions=1,
acks=False,
internal=True,
)
@cached_property
def _leader_topic_name(self) -> str:
return f'{self.app.conf.id}-__assignor-__leader'
@cached_property
def _leader_tp(self) -> TP:
return TP(self._leader_topic_name, 0)
[docs] def is_leader(self) -> bool:
return self._leader_tp in self.app.consumer.assignment()