@@ -1581,84 +1581,57 @@ private static Map<String, Map<String, String>> getNextStrictReplicaGroupAssignment(
       Logger tableRebalanceLogger) {
     Map<String, Map<String, String>> nextAssignment = new TreeMap<>();
     Map<String, Integer> numSegmentsToOffloadMap = getNumSegmentsToOffloadMap(currentAssignment, targetAssignment);
-    Map<Integer, Map<Set<String>, Map<String, Map<String, String>>>>
-        partitionIdToAssignedInstancesToCurrentAssignmentMap;
-    if (batchSizePerServer == RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER) {
-      // Don't calculate the partition id to assigned instances to current assignment mapping if batching is disabled
-      // since we want to update the next assignment based on all partitions in this case. Use partitionId as 0
-      // and a dummy set for the assigned instances.
-      partitionIdToAssignedInstancesToCurrentAssignmentMap = new TreeMap<>();
-      partitionIdToAssignedInstancesToCurrentAssignmentMap.put(0, new HashMap<>());
-      partitionIdToAssignedInstancesToCurrentAssignmentMap.get(0).put(Set.of(""), currentAssignment);
-    } else {
-      partitionIdToAssignedInstancesToCurrentAssignmentMap =
-          getPartitionIdToAssignedInstancesToCurrentAssignmentMap(currentAssignment, segmentPartitionIdMap,
-              partitionIdFetcher);
-    }
     Map<Pair<Set<String>, Set<String>>, Set<String>> assignmentMap = new HashMap<>();
     Map<Set<String>, Set<String>> availableInstancesMap = new HashMap<>();
-
     Map<String, Integer> serverToNumSegmentsAddedSoFar = new HashMap<>();
-    for (Map<Set<String>, Map<String, Map<String, String>>> assignedInstancesToCurrentAssignment
-        : partitionIdToAssignedInstancesToCurrentAssignmentMap.values()) {
-      boolean anyServerExhaustedBatchSize = false;
-      if (batchSizePerServer != RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER) {
-        // The number of segments for a given partition, accumulated as we iterate over the assigned instances
-        Map<String, Integer> serverToNumSegmentsToBeAddedForPartitionMap = new HashMap<>();
-
-        // Check if the servers of the first assignment for each unique set of assigned instances have any space left
-        // to move this partition. If so, mark the partition as to be moved; otherwise, mark the partition
-        // as a whole as not movable.
-        for (Map<String, Map<String, String>> curAssignment : assignedInstancesToCurrentAssignment.values()) {
-          Map.Entry<String, Map<String, String>> firstEntry = curAssignment.entrySet().iterator().next();
-          // It is enough to check whether any server for one segment is above the limit, since all segments
-          // in curAssignment will have the same assigned instances list
-          Map<String, String> firstEntryInstanceStateMap = firstEntry.getValue();
-          SingleSegmentAssignment firstAssignment =
-              getNextSingleSegmentAssignment(firstEntryInstanceStateMap, targetAssignment.get(firstEntry.getKey()),
-                  minAvailableReplicas, lowDiskMode, numSegmentsToOffloadMap, assignmentMap);
-          Set<String> serversAdded = getServersAddedInSingleSegmentAssignment(firstEntryInstanceStateMap,
-              firstAssignment._instanceStateMap);
-          for (String server : serversAdded) {
-            // Case I: We already exceeded the batchSizePerServer for this server, cannot add any more segments
-            if (serverToNumSegmentsAddedSoFar.getOrDefault(server, 0) >= batchSizePerServer) {
-              anyServerExhaustedBatchSize = true;
-              break;
-            }

-            // All segments assigned to the current instances will be moved, so track segments to be added for the given
-            // server based on this
-            serverToNumSegmentsToBeAddedForPartitionMap.put(server,
-                serverToNumSegmentsToBeAddedForPartitionMap.getOrDefault(server, 0) + curAssignment.size());
-          }
-          if (anyServerExhaustedBatchSize) {
+    if (batchSizePerServer == RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER) {
+      // Directly update the nextAssignment with anyServerExhaustedBatchSize = false and return if batching is disabled
+      updateNextAssignmentForPartitionIdStrictReplicaGroup(currentAssignment, targetAssignment, nextAssignment,
+          false, minAvailableReplicas, lowDiskMode, numSegmentsToOffloadMap, assignmentMap,
+          availableInstancesMap, serverToNumSegmentsAddedSoFar);
+      return nextAssignment;
+    }
+
+    // Batching is enabled, calculate the Pair(current instances, target instances) -> partitionId -> currentAssignment
+    Map<Pair<Set<String>, Set<String>>, Map<Integer, Map<String, Map<String, String>>>>
+        currentAndTargetInstancesToPartitionIdToCurrentAssignmentMap =
+        getCurrentAndTargetInstancesToPartitionIdToCurrentAssignmentMap(currentAssignment, targetAssignment,
+            segmentPartitionIdMap, partitionIdFetcher);
+
+    // Iterating over the unique pairs of current and target instances
+    for (Map<Integer, Map<String, Map<String, String>>> partitionIdToCurrentAssignment
+        : currentAndTargetInstancesToPartitionIdToCurrentAssignmentMap.values()) {
+      // Check if the servers of the first assignment for each unique partition have any space left to move the
+      // segments assigned to the partition and unique assigned instances as a whole. If so, mark the partition
+      // as to be moved; otherwise, mark the partition as a whole as not movable.
+      // Iterating over the partitionIds with the same unique pair of current and target instances
+      for (Map<String, Map<String, String>> curAssignment : partitionIdToCurrentAssignment.values()) {
+        Map.Entry<String, Map<String, String>> firstEntry = curAssignment.entrySet().iterator().next();
+        // It is enough to check whether any server for one segment is above the limit, since all segments
+        // in curAssignment will have the same current and target instances and the same partitionId
+        Map<String, String> firstEntryInstanceStateMap = firstEntry.getValue();
+        SingleSegmentAssignment firstAssignment =
+            getNextSingleSegmentAssignment(firstEntryInstanceStateMap, targetAssignment.get(firstEntry.getKey()),
+                minAvailableReplicas, lowDiskMode, numSegmentsToOffloadMap, assignmentMap);
+        Set<String> serversAdded = getServersAddedInSingleSegmentAssignment(firstEntryInstanceStateMap,
+            firstAssignment._instanceStateMap);
+        boolean anyServerExhaustedBatchSize = false;
+        for (String server : serversAdded) {
+          int segmentsAddedToServerSoFar = serverToNumSegmentsAddedSoFar.getOrDefault(server, 0);
+          // Case I: We already exceeded the batchSizePerServer for this server, cannot add any more segments
+          // Case II: We have not yet exceeded the batchSizePerServer for this server, but we don't have sufficient
+          // space to host the segments for this assignment on the server, and we have allocated some partitions so
+          // far. If the batchSizePerServer is less than the number of segments in a given partitionId, we must host
+          // at least 1 partition and exceed the batchSizePerServer to ensure progress is made. Thus, performing this
+          // check only if segmentsAddedToServerSoFar > 0 is necessary.
+          if ((segmentsAddedToServerSoFar >= batchSizePerServer)
+              || (segmentsAddedToServerSoFar > 0
+              && (segmentsAddedToServerSoFar + curAssignment.size()) > batchSizePerServer)) {
+            anyServerExhaustedBatchSize = true;
             break;
           }
         }
-
-        // Case II: We have not yet exceeded the batchSizePerServer for any server, but we don't have sufficient
-        // space to host the segments for this assignment on some server, and we have allocated some partitions so
-        // far. If the batchSizePerServer is less than the number of segments in a given partitionId, we must host
-        // at least 1 partition and exceed the batchSizePerServer to ensure progress is made. Thus, performing this
-        // check only if segmentsAddedToServerSoFar > 0 is necessary.
-        if (!anyServerExhaustedBatchSize) {
-          for (Map.Entry<String, Integer> serverToNumSegmentsToAdd
-              : serverToNumSegmentsToBeAddedForPartitionMap.entrySet()) {
-            int segmentsAddedToServerSoFar =
-                serverToNumSegmentsAddedSoFar.getOrDefault(serverToNumSegmentsToAdd.getKey(), 0);
-            if (segmentsAddedToServerSoFar > 0
-                && (segmentsAddedToServerSoFar + serverToNumSegmentsToAdd.getValue()) > batchSizePerServer) {
-              anyServerExhaustedBatchSize = true;
-              break;
-            }
-          }
-        }
-      }
-      // TODO: Consider whether we should process the nextAssignment for each unique assigned instances rather than the
-      //       full partition to get a more granular number of segment moves in each step. For now since we expect
-      //       strict replica groups to mostly be used for tables like upserts which require a full partition to be
-      //       moved, we move a full partition at a time.
-      for (Map<String, Map<String, String>> curAssignment : assignedInstancesToCurrentAssignment.values()) {
         updateNextAssignmentForPartitionIdStrictReplicaGroup(curAssignment, targetAssignment, nextAssignment,
             anyServerExhaustedBatchSize, minAvailableReplicas, lowDiskMode, numSegmentsToOffloadMap, assignmentMap,
             availableInstancesMap, serverToNumSegmentsAddedSoFar);
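
Note on the new budget check: the sketch below is a minimal, self-contained illustration of the Case I / Case II logic above, not the Pinot API; the class name BatchBudgetSketch, the method exhaustsBudget, and the parameter partitionSegmentCount are hypothetical stand-ins. It shows why a server that has received no segments yet may exceed batchSizePerServer: an oversized partition must land somewhere for the rebalance to make progress.

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class BatchBudgetSketch {
  // Returns true when moving a whole partition (partitionSegmentCount segments that
  // must move together) would violate the per-server batch budget.
  static boolean exhaustsBudget(Set<String> serversAdded, int partitionSegmentCount,
      Map<String, Integer> addedSoFar, int batchSizePerServer) {
    for (String server : serversAdded) {
      int soFar = addedSoFar.getOrDefault(server, 0);
      // Case I: the server already reached its budget, so nothing more may be added.
      // Case II: this partition would overflow the budget, but it is only rejected when
      // the server has already received segments; an empty server must accept at least
      // one partition (even an oversized one) so the rebalance always makes progress.
      if (soFar >= batchSizePerServer
          || (soFar > 0 && soFar + partitionSegmentCount > batchSizePerServer)) {
        return true;
      }
    }
    return false;
  }

  public static void main(String[] args) {
    Map<String, Integer> addedSoFar = new HashMap<>(Map.of("server1", 3));
    int batchSizePerServer = 5;
    // server1 already holds 3 of its budget of 5; a 4-segment partition overflows -> deferred (Case II).
    System.out.println(exhaustsBudget(Set.of("server1"), 4, addedSoFar, batchSizePerServer)); // true
    // server2 has nothing yet, so even an 8-segment partition is accepted to guarantee progress.
    System.out.println(exhaustsBudget(Set.of("server2"), 8, addedSoFar, batchSizePerServer)); // false
  }
}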
@@ -1728,46 +1701,49 @@ private static void checkIfAnyServersAssignedMoreSegmentsThanBatchSize(int batchSizePerServer,
       Map<String, Integer> serverToNumSegmentsAddedSoFar, Logger tableRebalanceLogger) {
     int maxSegmentsAddedToAnyServer = serverToNumSegmentsAddedSoFar.isEmpty() ? 0
         : Collections.max(serverToNumSegmentsAddedSoFar.values());
-    if (batchSizePerServer != RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER
-        && maxSegmentsAddedToAnyServer > batchSizePerServer) {
+    if (maxSegmentsAddedToAnyServer > batchSizePerServer) {
       tableRebalanceLogger.warn("Found at least one server with {} segments added which is larger than "
           + "batchSizePerServer: {}. This is expected for strictReplicaGroup based assignment that needs to move a "
          + "full partition to maintain consistency for queries.", maxSegmentsAddedToAnyServer, batchSizePerServer);
     }
   }

   /**
-   * Create a mapping of partitionId to the mapping of assigned instances to the current assignment of segments that
-   * belong to that partitionId and assigned instances. This is to be used for batching purposes for StrictReplicaGroup
-   * routing, for all segment assignment types: RealtimeSegmentAssignment, StrictRealtimeSegmentAssignment and
-   * OfflineSegmentAssignment
+   * Create a mapping of Pair(currentInstances, targetInstances) to partitionId to the current assignment of segments.
+   * This is to be used for batching purposes for StrictReplicaGroup routing, for all segment assignment types:
+   * RealtimeSegmentAssignment, StrictRealtimeSegmentAssignment and OfflineSegmentAssignment
    * @param currentAssignment the current assignment
+   * @param targetAssignment the target assignment
    * @param segmentPartitionIdMap cache to store the partition ids to avoid fetching ZK segment metadata
    * @param partitionIdFetcher function to fetch the partition id
-   * @return a mapping from partitionId to the assigned instances to the segment assignment map of all segments that
-   *         map to that partitionId and assigned instances
+   * @return a mapping from Pair(currentInstances, targetInstances) to the partitionId to the segment assignment map of
+   *         all segments that fall in that category
    */
-  private static Map<Integer, Map<Set<String>, Map<String, Map<String, String>>>>
-      getPartitionIdToAssignedInstancesToCurrentAssignmentMap(Map<String, Map<String, String>> currentAssignment,
-          Object2IntOpenHashMap<String> segmentPartitionIdMap, PartitionIdFetcher partitionIdFetcher) {
-    Map<Integer, Map<Set<String>, Map<String, Map<String, String>>>>
-        partitionIdToAssignedInstancesToCurrentAssignmentMap = new TreeMap<>();
+  private static Map<Pair<Set<String>, Set<String>>, Map<Integer, Map<String, Map<String, String>>>>
+      getCurrentAndTargetInstancesToPartitionIdToCurrentAssignmentMap(Map<String, Map<String, String>> currentAssignment,
+          Map<String, Map<String, String>> targetAssignment, Object2IntOpenHashMap<String> segmentPartitionIdMap,
+          PartitionIdFetcher partitionIdFetcher) {
+    Map<Pair<Set<String>, Set<String>>, Map<Integer, Map<String, Map<String, String>>>>
+        currentAndTargetInstancesToPartitionIdToCurrentAssignmentMap = new HashMap<>();

     for (Map.Entry<String, Map<String, String>> assignment : currentAssignment.entrySet()) {
       String segmentName = assignment.getKey();
-      Map<String, String> instanceStateMap = assignment.getValue();
-      Collection<String> segmentStates = instanceStateMap.values();
+      Map<String, String> currentInstanceStateMap = assignment.getValue();
+      Map<String, String> targetInstanceStateMap = targetAssignment.get(segmentName);

+      Collection<String> segmentStates = currentInstanceStateMap.values();
       boolean isConsuming = segmentStates.stream().noneMatch(state -> state.equals(SegmentStateModel.ONLINE))
           && segmentStates.stream().anyMatch(state -> state.equals(SegmentStateModel.CONSUMING));
       int partitionId =
           segmentPartitionIdMap.computeIfAbsent(segmentName, v -> partitionIdFetcher.fetch(segmentName, isConsuming));
-      Set<String> assignedInstances = instanceStateMap.keySet();
-      partitionIdToAssignedInstancesToCurrentAssignmentMap.computeIfAbsent(partitionId, k -> new HashMap<>())
-          .computeIfAbsent(assignedInstances, k -> new TreeMap<>()).put(segmentName, instanceStateMap);
+      Pair<Set<String>, Set<String>> currentAndTargetInstances =
+          Pair.of(currentInstanceStateMap.keySet(), targetInstanceStateMap.keySet());
+      currentAndTargetInstancesToPartitionIdToCurrentAssignmentMap
+          .computeIfAbsent(currentAndTargetInstances, k -> new TreeMap<>())
+          .computeIfAbsent(partitionId, k -> new TreeMap<>()).put(segmentName, currentInstanceStateMap);
     }

-    return partitionIdToAssignedInstancesToCurrentAssignmentMap;
+    return currentAndTargetInstancesToPartitionIdToCurrentAssignmentMap;
   }

   @VisibleForTesting
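
Note on the new grouping helper: the sketch below is a standalone approximation of getCurrentAndTargetInstancesToPartitionIdToCurrentAssignmentMap, assuming the Pair here is org.apache.commons.lang3.tuple.Pair (it implements equals/hashCode, so it can key a HashMap); the class GroupingSketch and the partitionIdOf map are hypothetical stand-ins for the real method and its PartitionIdFetcher. Segments are only batched together when they share both the current and the target instance set and the same partition id.

import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

import org.apache.commons.lang3.tuple.Pair;

public class GroupingSketch {
  // Groups segments by (current instances, target instances), then by partition id.
  static Map<Pair<Set<String>, Set<String>>, Map<Integer, Map<String, Map<String, String>>>> group(
      Map<String, Map<String, String>> currentAssignment, Map<String, Map<String, String>> targetAssignment,
      Map<String, Integer> partitionIdOf) {
    Map<Pair<Set<String>, Set<String>>, Map<Integer, Map<String, Map<String, String>>>> grouped = new HashMap<>();
    for (Map.Entry<String, Map<String, String>> entry : currentAssignment.entrySet()) {
      String segment = entry.getKey();
      // Segments can only move as one unit if they share BOTH the current and the target instance sets.
      Pair<Set<String>, Set<String>> key = Pair.of(entry.getValue().keySet(), targetAssignment.get(segment).keySet());
      grouped.computeIfAbsent(key, k -> new TreeMap<>())
          .computeIfAbsent(partitionIdOf.get(segment), k -> new TreeMap<>())
          .put(segment, entry.getValue());
    }
    return grouped;
  }

  public static void main(String[] args) {
    Map<String, Map<String, String>> current = Map.of(
        "seg__0__0", Map.of("s1", "ONLINE", "s2", "ONLINE"),
        "seg__0__1", Map.of("s1", "ONLINE", "s2", "ONLINE"));
    Map<String, Map<String, String>> target = Map.of(
        "seg__0__0", Map.of("s2", "ONLINE", "s3", "ONLINE"),
        "seg__0__1", Map.of("s2", "ONLINE", "s3", "ONLINE"));
    Map<String, Integer> partitionIds = Map.of("seg__0__0", 0, "seg__0__1", 0);
    // Both segments land under the same (current, target) pair and partition 0,
    // so the rebalancer would move them together in one batch step.
    System.out.println(group(current, target, partitionIds));
  }
}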