Source code for faust.assignor.cluster_assignment

"""Cluster assignment."""
from typing import List, MutableMapping, Sequence, Set, cast
from faust.models import Record
from .client_assignment import (
    ClientAssignment,
    ClientMetadata,
    CopartitionedAssignment,
)

# Public names exported by this module.
__all__ = ['CopartMapping', 'ClusterAssignment']

# Mutable mapping from a string key to a CopartitionedAssignment; this is
# the return type of ``ClusterAssignment.copartitioned_assignments``, where
# the keys are the client names found in ``ClusterAssignment.assignments``.
CopartMapping = MutableMapping[str, CopartitionedAssignment]


class ClusterAssignment(Record,
                        serializer='json',
                        include_metadata=False,
                        namespace='@ClusterAssignment'):
    """State of the cluster: per-client subscriptions and assignments.

    Attributes:
        subscriptions: Maps a client name to the sequence of topic names
            that client subscribes to.
        assignments: Maps a client name to its ``ClientAssignment``.
    """

    # Both fields default to ``None`` so they are optional at construction
    # time, but ``__post_init__`` replaces ``None`` with an empty dict:
    # they should never be left set to ``None``.
    subscriptions: MutableMapping[str, Sequence[str]] = cast(
        MutableMapping[str, Sequence[str]], None)
    assignments: MutableMapping[str, ClientAssignment] = cast(
        MutableMapping[str, ClientAssignment], None)

    def __post_init__(self) -> None:
        """Give every mapping field left as ``None`` its own empty dict."""
        for field_name in ('subscriptions', 'assignments'):
            if getattr(self, field_name) is None:
                setattr(self, field_name, {})

    def topics(self) -> Set[str]:
        """Return the set of all topics subscribed to in the cluster."""
        all_topics: Set[str] = set()
        for subscribed in self.subscriptions.values():
            all_topics.update(subscribed)
        return all_topics

    def add_client(self, client: str, subscription: List[str],
                   metadata: ClientMetadata) -> None:
        """Register (or overwrite) a client's subscription and assignment.

        Arguments:
            client: Name used as the key in both mappings.
            subscription: Topics the client subscribes to; stored as a
                new list, so the caller's list is not shared.
            metadata: Client metadata whose ``assignment`` attribute is
                stored as the client's assignment.
        """
        self.subscriptions[client] = [*subscription]
        self.assignments[client] = metadata.assignment

    def copartitioned_assignments(
            self, copartitioned_topics: Set[str]) -> CopartMapping:
        """Return copartitioned assignments for fully subscribed clients.

        Only clients whose subscription contains every topic in
        ``copartitioned_topics`` are considered.

        Arguments:
            copartitioned_topics: The set of topics that are copartitioned.

        Returns:
            Mapping of client name to the result of calling
            ``copartitioned_assignment(copartitioned_topics)`` on that
            client's assignment.
        """
        # First pass: find the clients subscribed to all of the topics.
        eligible: Set[str] = set()
        for client, subscribed in self.subscriptions.items():
            if copartitioned_topics.issubset(subscribed):
                eligible.add(client)

        # Second pass: project the assignment of each eligible client.
        mapping: CopartMapping = {}
        for client, assignment in self.assignments.items():
            if client not in eligible:
                continue
            mapping[client] = assignment.copartitioned_assignment(
                copartitioned_topics)
        return mapping