Skip to content

Commit 02afd66

Browse files
committed
Merge remote-tracking branch 'origin/master' into index-spi-all-types
2 parents 9ccbdda + 11272b7 commit 02afd66

File tree

36 files changed

+2700
-1549
lines changed

36 files changed

+2700
-1549
lines changed

pinot-broker/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,5 +153,10 @@
153153
<artifactId>mockito-core</artifactId>
154154
<scope>test</scope>
155155
</dependency>
156+
<dependency>
157+
<groupId>com.mercateo</groupId>
158+
<artifactId>test-clock</artifactId>
159+
<scope>test</scope>
160+
</dependency>
156161
</dependencies>
157162
</project>

pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BalancedInstanceSelector.java

Lines changed: 39 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,14 @@
1818
*/
1919
package org.apache.pinot.broker.routing.instanceselector;
2020

21+
import java.time.Clock;
22+
import java.util.ArrayList;
2123
import java.util.HashMap;
2224
import java.util.List;
2325
import java.util.Map;
2426
import javax.annotation.Nullable;
27+
import org.apache.helix.store.zk.ZkHelixPropertyStore;
28+
import org.apache.helix.zookeeper.datamodel.ZNRecord;
2529
import org.apache.pinot.broker.routing.adaptiveserverselector.AdaptiveServerSelector;
2630
import org.apache.pinot.common.metrics.BrokerMetrics;
2731
import org.apache.pinot.common.utils.HashUtil;
@@ -37,40 +41,54 @@
3741
* Step1: Process seg1. Fetch server rankings. Pick the best server.
3842
* Step2: Process seg2. Fetch server rankings (could have changed or not since Step 1). Pick the best server.
3943
* Step3: Process seg3. Fetch server rankings (could have changed or not since Step 2). Pick the best server.
40-
44+
*
4145
* <p>If AdaptiveServerSelection is disabled, the selection algorithm will always evenly distribute the traffic to all
4246
* replicas of each segment, and will try to select different replica id for each segment. The algorithm is very
4347
* light-weight and will do best effort to balance the number of segments served by each selected server instance.
4448
*/
4549
public class BalancedInstanceSelector extends BaseInstanceSelector {
4650

47-
public BalancedInstanceSelector(String tableNameWithType, BrokerMetrics brokerMetrics,
48-
@Nullable AdaptiveServerSelector adaptiveServerSelector) {
49-
super(tableNameWithType, brokerMetrics, adaptiveServerSelector);
51+
public BalancedInstanceSelector(String tableNameWithType, ZkHelixPropertyStore<ZNRecord> propertyStore,
52+
BrokerMetrics brokerMetrics, @Nullable AdaptiveServerSelector adaptiveServerSelector, Clock clock) {
53+
super(tableNameWithType, propertyStore, brokerMetrics, adaptiveServerSelector, clock);
5054
}
5155

5256
@Override
53-
Map<String, String> select(List<String> segments, int requestId,
54-
Map<String, List<String>> segmentToEnabledInstancesMap, Map<String, String> queryOptions) {
57+
Map<String, String> select(List<String> segments, int requestId, SegmentStates segmentStates,
58+
Map<String, String> queryOptions) {
5559
Map<String, String> segmentToSelectedInstanceMap = new HashMap<>(HashUtil.getHashMapCapacity(segments.size()));
56-
for (String segment : segments) {
57-
List<String> enabledInstances = segmentToEnabledInstancesMap.get(segment);
58-
// NOTE: enabledInstances can be null when there is no enabled instances for the segment, or the instance selector
59-
// has not been updated (we update all components for routing in sequence)
60-
if (enabledInstances == null) {
61-
continue;
60+
if (_adaptiveServerSelector != null) {
61+
for (String segment : segments) {
62+
List<SegmentInstanceCandidate> candidates = segmentStates.getCandidates(segment);
63+
// NOTE: candidates can be null when there is no enabled instances for the segment, or the instance selector has
64+
// not been updated (we update all components for routing in sequence)
65+
if (candidates == null) {
66+
continue;
67+
}
68+
List<String> candidateInstances = new ArrayList<>(candidates.size());
69+
for (SegmentInstanceCandidate candidate : candidates) {
70+
candidateInstances.add(candidate.getInstance());
71+
}
72+
String selectedInstance = _adaptiveServerSelector.select(candidateInstances);
73+
if (candidates.get(candidateInstances.indexOf(selectedInstance)).isOnline()) {
74+
segmentToSelectedInstanceMap.put(segment, selectedInstance);
75+
}
6276
}
63-
64-
String selectedServer;
65-
if (_adaptiveServerSelector != null) {
66-
selectedServer = _adaptiveServerSelector.select(enabledInstances);
67-
} else {
68-
selectedServer = enabledInstances.get(requestId++ % enabledInstances.size());
77+
} else {
78+
for (String segment : segments) {
79+
List<SegmentInstanceCandidate> candidates = segmentStates.getCandidates(segment);
80+
// NOTE: candidates can be null when there is no enabled instances for the segment, or the instance selector has
81+
// not been updated (we update all components for routing in sequence)
82+
if (candidates == null) {
83+
continue;
84+
}
85+
int selectedIdx = requestId++ % candidates.size();
86+
SegmentInstanceCandidate selectedCandidate = candidates.get(selectedIdx);
87+
if (selectedCandidate.isOnline()) {
88+
segmentToSelectedInstanceMap.put(segment, selectedCandidate.getInstance());
89+
}
6990
}
70-
71-
segmentToSelectedInstanceMap.put(segment, selectedServer);
7291
}
73-
7492
return segmentToSelectedInstanceMap;
7593
}
7694
}

0 commit comments

Comments
 (0)