Source code for faust.assignor.copartitioned_assignor

"""Copartitioned Assignor."""
from itertools import cycle
from math import ceil
from typing import Iterable, Iterator, MutableMapping, Optional, Sequence, Set
from mode.utils.typing import Counter
from .client_assignment import CopartitionedAssignment

__all__ = ['CopartitionedAssignor']


class CopartitionedAssignor:
    """Copartitioned Assignor.

    All copartitioned topics must have the same number of partitions.

    The assignment is sticky and uses the following heuristics:

    - Maintain existing assignments as long as they are within capacity
      for each client.
    - Assign actives to standbys when possible (within capacity).
    - Assign in order to fill the capacity of the clients.

    We optimize for not over-utilizing resources, instead of
    under-utilizing them.  This results in a balanced assignment when
    capacity is the default value, which is
    ``ceil(num partitions / num clients)``.

    Notes:
        Currently we raise an exception if the number of clients is not
        enough for the desired `replication`.
    """

    capacity: int
    num_partitions: int
    replicas: int
    topics: Set[str]
    _num_clients: int
    _client_assignments: MutableMapping[str, CopartitionedAssignment]

    def __init__(self,
                 topics: Iterable[str],
                 cluster_asgn: MutableMapping[str, CopartitionedAssignment],
                 num_partitions: int,
                 replicas: int,
                 capacity: Optional[int] = None) -> None:
        self._num_clients = len(cluster_asgn)
        assert self._num_clients, 'Should assign to at least 1 client'
        self.num_partitions = num_partitions
        self.replicas = min(replicas, self._num_clients - 1)
        self.capacity = (
            int(ceil(float(self.num_partitions) / self._num_clients))
            if capacity is None else capacity
        )
        self.topics = set(topics)
        assert self.capacity * self._num_clients >= self.num_partitions, \
            'Not enough capacity'
        self._client_assignments = cluster_asgn
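    # Illustration of the capacity math above (an editor's example with
    # made-up numbers, not from the original source): with
    # num_partitions=8 and 4 clients, capacity defaults to
    # ceil(8 / 4) == 2, so the assertion 2 * 4 >= 8 holds exactly and
    # actives balance at two per client.  With replicas=3 but only 3
    # clients, replication is capped at min(3, 3 - 1) == 2 standbys
    # per partition.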
    def get_assignment(self) -> MutableMapping[str, CopartitionedAssignment]:
        for copartitioned in self._client_assignments.values():
            copartitioned.unassign_extras(self.capacity, self.replicas)
        self._assign(active=True)
        self._assign(active=False)
        return self._client_assignments
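    # A hedged reading of the postcondition (per the asserts in _assign
    # below): after get_assignment() returns, every partition is held by
    # exactly one active client and by self.replicas standby clients, and
    # no client exceeds its per-client limit of ``capacity`` actives or
    # ``capacity * replicas`` standbys.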
    def _all_assigned(self, active: bool) -> bool:
        assigned_counts = self._assigned_partition_counts(active)
        total_assigns = self._total_assigns_per_partition(active)
        return all(
            assigned_counts[partition] == total_assigns
            for partition in range(self.num_partitions)
        )

    def _assign(self, active: bool) -> None:
        self._unassign_overassigned(active)
        unassigned = self._get_unassigned(active)
        self._assign_round_robin(unassigned, active)
        assert self._all_assigned(active)

    def _assigned_partition_counts(self, active: bool) -> Counter[int]:
        return Counter(
            partition
            for copartitioned in self._client_assignments.values()
            for partition in copartitioned.get_assigned_partitions(active)
        )

    def _get_client_limit(self, active: bool) -> int:
        return self.capacity * self._total_assigns_per_partition(active)

    def _total_assigns_per_partition(self, active: bool) -> int:
        return 1 if active else self.replicas

    def _unassign_overassigned(self, active: bool) -> None:
        # There are cases when multiple clients could have the same
        # assignment (zombies).  We need to handle that appropriately.
        partition_counts = self._assigned_partition_counts(active)
        total_assigns = self._total_assigns_per_partition(active=active)
        for partition in range(self.num_partitions):
            extras = partition_counts[partition] - total_assigns
            for _ in range(extras):
                assgn = next(
                    assgn for assgn in self._client_assignments.values()
                    if assgn.partition_assigned(partition, active=active)
                )
                assgn.unassign_partition(partition, active=active)

    def _get_unassigned(self, active: bool) -> Sequence[int]:
        partition_counts = self._assigned_partition_counts(active)
        total_assigns = self._total_assigns_per_partition(active=active)
        assert all(
            partition_counts[partition] <= total_assigns
            for partition in range(self.num_partitions)
        )
        return [
            partition
            for partition in range(self.num_partitions)
            for _ in range(total_assigns - partition_counts[partition])
        ]

    def _can_assign(self, assignment: CopartitionedAssignment,
                    partition: int, active: bool) -> bool:
        return (
            not self._client_exhausted(assignment, active) and
            assignment.can_assign(partition, active)
        )

    def _client_exhausted(self, assignment: CopartitionedAssignment,
                          active: bool,
                          client_limit: Optional[int] = None) -> bool:
        if client_limit is None:
            client_limit = self._get_client_limit(active)
        return assignment.num_assigned(active) == client_limit

    def _find_promotable_standby(
            self,
            partition: int,
            candidates: Iterator[CopartitionedAssignment],
    ) -> Optional[CopartitionedAssignment]:
        # Round robin to find a standby until we make a full cycle.
        for _ in range(self._num_clients):
            assignment = next(candidates)
            can_assign = (
                assignment.partition_assigned(partition, active=False) and
                self._can_assign(assignment, partition, active=True)
            )
            if can_assign:
                return assignment
        return None

    def _find_round_robin_assignable(
            self,
            partition: int,
            candidates: Iterator[CopartitionedAssignment],
            active: bool,
    ) -> Optional[CopartitionedAssignment]:
        # Round robin and assign until we make a full circle.
        for _ in range(self._num_clients):
            assignment = next(candidates)
            if self._can_assign(assignment, partition, active):
                return assignment
        return None

    def _assign_round_robin(self, unassigned: Iterable[int],
                            active: bool) -> None:
        # We do round robin assignment as follows:
        # - For actives, we first try to assign to a standby.
        # - For standbys, we offset the round robin start to evenly
        #   distribute standbys for colocated actives.
        # - We do round robin.
        # - If no assignment is found, it must be a standby and the only
        #   unfilled client(s) must be actives/standbys for the partition.
        # - If no assignment is found, we unassign an arbitrary partition
        #   from a filled assignment such that the partition can be
        #   assigned to it.
        # - This guarantees eventual assignment of all partitions.
        client_limit = self._get_client_limit(active)
        candidates = cycle(self._client_assignments.values())
        unassigned = list(unassigned)
        while unassigned:
            partition = unassigned.pop(0)
            assign_to = None
            if active:
                # For actives we first try to find a standby to assign to.
                assign_to = self._find_promotable_standby(partition,
                                                          candidates)
                if assign_to is not None:
                    # Unassign the standby which will be promoted.
                    assign_to.unassign_partition(partition, active=False)
            else:
                # For standbys we offset the round robin start to shuffle
                # the assignment of standbys.
                for _ in range(partition):
                    next(candidates)
            assert assign_to is None or active
            assign_to = assign_to or self._find_round_robin_assignable(
                partition, candidates, active)
            # If round robin assignment didn't work, then we must be
            # assigning a standby and the only un-exhausted clients
            # are actives for the partition.
            assert (
                assign_to is not None or (
                    not active and
                    all(
                        assgn.partition_assigned(partition, active=True) or
                        assgn.partition_assigned(partition, active=False) or
                        self._client_exhausted(assgn, active, client_limit)
                        for assgn in self._client_assignments.values()
                    )
                )
            )
            # If round robin didn't work, we free up the first full standby
            # assignment to which the partition can be assigned.
            if assign_to is None:
                assign_to = next(
                    assignment
                    for assignment in self._client_assignments.values()
                    if (self._client_exhausted(assignment, active) and
                        assignment.can_assign(partition, active))
                )  # By the above assertion, should never throw an error.
                unassigned_partition = assign_to.pop_partition(active)
                unassigned.append(unassigned_partition)
            # Assign the partition.
            assign_to.assign_partition(partition, active)
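
# ----------------------------------------------------------------------
# Usage sketch (an editor's illustration, not part of the original
# module).  It assumes CopartitionedAssignment() can be constructed with
# just a ``topics`` keyword and exposes ``actives``/``standbys`` sets;
# both points are inferred from how the class is used above, not
# guaranteed by this module.
if __name__ == '__main__':  # pragma: no cover
    example_topics = {'orders', 'orders-changelog'}
    # Three clients starting from empty assignments.
    cluster = {
        'client-{0}'.format(i): CopartitionedAssignment(topics=example_topics)
        for i in range(3)
    }
    assignor = CopartitionedAssignor(
        topics=example_topics,
        cluster_asgn=cluster,
        num_partitions=6,
        replicas=1,
    )
    # capacity defaults to ceil(6 / 3) == 2 actives per client; each
    # partition also gets one standby on a different client.
    for client, asgn in assignor.get_assignment().items():
        print(client, sorted(asgn.actives), sorted(asgn.standbys))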