-
Notifications
You must be signed in to change notification settings - Fork 1.4k
Adaptive Server Selection #9311
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Adaptive Server Selection #9311
Conversation
a5a3cfe to
d975dbf
Compare
Codecov Report
@@ Coverage Diff @@
## master #9311 +/- ##
============================================
- Coverage 69.73% 61.31% -8.42%
+ Complexity 5017 4563 -454
============================================
Files 1873 1885 +12
Lines 99598 100958 +1360
Branches 15163 15357 +194
============================================
- Hits 69453 61904 -7549
- Misses 25217 34368 +9151
+ Partials 4928 4686 -242
Flags with carried forward coverage won't be shown. Click here to find out more.
📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more |
d975dbf to
7faa610
Compare
7faa610 to
a66b86e
Compare
32bf5a3 to
9178dd9
Compare
pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerDebug.java
Show resolved
Hide resolved
...main/java/org/apache/pinot/broker/routing/adaptiveserverselector/AdaptiveServerSelector.java
Outdated
Show resolved
Hide resolved
...va/org/apache/pinot/broker/routing/adaptiveserverselector/AdaptiveServerSelectorFactory.java
Show resolved
Hide resolved
...va/org/apache/pinot/broker/routing/adaptiveserverselector/AdaptiveServerSelectorFactory.java
Outdated
Show resolved
Hide resolved
...va/org/apache/pinot/broker/routing/adaptiveserverselector/AdaptiveServerSelectorFactory.java
Show resolved
Hide resolved
...er/src/main/java/org/apache/pinot/broker/routing/adaptiveserverselector/LatencySelector.java
Outdated
Show resolved
Hide resolved
...main/java/org/apache/pinot/broker/routing/adaptiveserverselector/AdaptiveServerSelector.java
Outdated
Show resolved
Hide resolved
...er/src/main/java/org/apache/pinot/broker/routing/adaptiveserverselector/LatencySelector.java
Show resolved
Hide resolved
...er/src/main/java/org/apache/pinot/broker/routing/adaptiveserverselector/LatencySelector.java
Show resolved
Hide resolved
...main/java/org/apache/pinot/broker/routing/adaptiveserverselector/NumInFlightReqSelector.java
Outdated
Show resolved
Hide resolved
pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
Outdated
Show resolved
Hide resolved
...main/java/org/apache/pinot/broker/routing/adaptiveserverselector/NumInFlightReqSelector.java
Outdated
Show resolved
Hide resolved
...ector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotServerDataFetcher.scala
Show resolved
Hide resolved
...ot-driver/src/main/java/org/apache/pinot/connector/presto/PinotScatterGatherQueryClient.java
Show resolved
Hide resolved
| ServerRoutingInstance serverRoutingInstance = entry.getKey(); | ||
| ServerChannels serverChannels = serverRoutingInstance.isTlsEnabled() ? _serverChannelsTls : _serverChannels; | ||
| try { | ||
| _serverRoutingStatsManager.recordStatsAfterQuerySubmission(requestId, serverRoutingInstance.getInstanceId()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Apologies for asking again if you clarified this in the internal review. Shouldn't this be after line 126 ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Clarified with comments in code.
pinot-core/src/main/java/org/apache/pinot/core/transport/QueryResponse.java
Outdated
Show resolved
Hide resolved
.../main/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsEntry.java
Outdated
Show resolved
Hide resolved
...ain/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManager.java
Outdated
Show resolved
Hide resolved
...ain/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManager.java
Outdated
Show resolved
Hide resolved
...ain/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManager.java
Show resolved
Hide resolved
25aba36 to
f80ff7e
Compare
...main/java/org/apache/pinot/broker/routing/adaptiveserverselector/AdaptiveServerSelector.java
Outdated
Show resolved
Hide resolved
|
|
||
| // No stats for this server. That means this server hasn't received any queries yet. | ||
| if (score == null) { | ||
| int randIdx = _random.nextInt(serverCandidates.size()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(nit) How about if we simply pick the server that has score == null? This gives more determinism and is less likely to create thundering herd problem, because the next call will also pick a null (if any). The current solution is fine for not too large serverCandidates.size(). The expectation of initializing all servers is size*\sum_{j=1...size} 1/j (see this). My only concern is that if the serverCandidates.size() could get large for balanced assignment, and we become unfortunate with random numbers.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In my initial testing, I observed thundering herd problem when selecting the server with score = null (Thundering herd happened because we routed a great number of queries to the server with the null score before any stats was updated). But for now this should be fine because - We could route in a biased fashion initially. However, we will eventually recover?
I understand that this can cause problems with large number of servers. Let's discuss offline on how to solve this (in a subsequent PR).
Let me know your thoughts.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see... are you saying that after picking a score == null but before the stats is even updated by the recordStatsAfterQuerySubmission() later there can be multiple other queries routed to the same server?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's right. One solution could be pick a random server among the servers that have null score.
pinot-common/src/main/java/org/apache/pinot/common/utils/ExponentialMovingAverage.java
Outdated
Show resolved
Hide resolved
f80ff7e to
7ec0a47
Compare
...ain/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManager.java
Show resolved
Hide resolved
...ain/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManager.java
Show resolved
Hide resolved
...ain/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManager.java
Show resolved
Hide resolved
...ain/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManager.java
Show resolved
Hide resolved
...ain/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManager.java
Show resolved
Hide resolved
...ain/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManager.java
Outdated
Show resolved
Hide resolved
| } | ||
|
|
||
| public void updateNumInFlightRequestsForResponseArrival() { | ||
| --_numInFlightRequests; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are we intentionally not updating _inFlighRequestsEMA here? If a bunch of queries are fired together, the _inFlighRequestsEMA will still remain pretty high after the responses are back, until the next query gets routed right? Are we expecting this behavior?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. It is intentional. The reasoning is that we want a snapshot of the queue at regular intervals and just updating at querySubmit should be good enough. In the example mentioned, the ema will decrease if one of the two events happen:
- The next query after burst is submitted.
- Autodecay kicks in.
jasperjiaguo
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Jackie-Jiang
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry I didn't get a chance to review it before merged. Have some minor comments.
Can you please update the PR description with a release-note section of the new added rest API and an example of table config to enable the feature? Also add a page for this new feature in the Pinot documentation
| */ | ||
| abstract Map<String, String> select(List<String> segments, int requestId, | ||
| Map<String, List<String>> segmentToEnabledInstancesMap, Map<String, String> queryOptions); | ||
| Map<String, List<String>> segmentToEnabledInstancesMap, Map<String, String> queryOptions, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need to change this. We should make _adaptiveServerSelector protected so that it can be accessed by the subclasses
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. Done.
| public ReplicaGroupInstanceSelector(String tableNameWithType, BrokerMetrics brokerMetrics) { | ||
| super(tableNameWithType, brokerMetrics); | ||
| public ReplicaGroupInstanceSelector(String tableNameWithType, BrokerMetrics brokerMetrics, | ||
| AdaptiveServerSelector adaptiveServerSelector) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(minor) Annotate with @Nullable
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
| @Nullable AdaptiveServerSelector adaptiveServerSelector) { | ||
| Map<String, String> segmentToSelectedInstanceMap = new HashMap<>(HashUtil.getHashMapCapacity(segments.size())); | ||
|
|
||
| List<String> serverRankList = new ArrayList<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will create an extra list even when adaptive server selection is not enabled. Let's avoid that by wrapping the new logic under the if check.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
| continue; | ||
| } | ||
|
|
||
| String selectedServer = enabledInstances.get(requestId++ % enabledInstances.size()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Put this in the else clause
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
| @Nullable String retentionPeriod) { | ||
| try { | ||
| LOGGER.info("Trying to delete segments: {} from table: {} ", segmentNames, tableNameWithType); | ||
| LOGGER.info("Trying to delete segments: {} from table: {} ", segmentNames, tableNameWithType); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Typo?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
| private final ConcurrentHashMap<ServerRoutingInstance, ServerResponse> _responseMap; | ||
| private final CountDownLatch _countDownLatch; | ||
| private final long _maxEndTimeMs; | ||
| private final long _queryTimeoutMs; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(nit) Rename to _timeoutMs
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
| } | ||
|
|
||
| @Override | ||
| public long getTimeOutMs() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(nit) Rename to getTimeoutMs()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
|
@Jackie-Jiang Thanks for the comments. I've addressed them in #9462. Please review. I'll add the following soon.
|
* Implementation for AdaptiveServerSelection feature * Add tests for Adaptive Server Selection feature * Fix flaky unit test * Address review comments * Fix test and indentation.
|
|
||
| if (_autoDecayWindowMs > 0) { | ||
| // Create a timer to automatically decay the average if updates are not performed in the last _autoDecayWindowMs. | ||
| Timer timer = new Timer(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it possible to use a ScheduledThreadPoolExecutor instead of a Timer for tracking EWMA? We were testing out this change on a fairly large cluster (200 hosts) and noticed a large uptick in JVM thread count when turning on stats collection. I think expected thread growth equals # of servers * 2.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll open a PR
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. Thanks @dmyang.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For future reference - timer fix is merged in PR #9697
label = feature
OSS issue #8618
Design Doc
PR Description
Based on the segment assignment strategies used, we use one of the following instance selectors today:
Irrespective of which instance selection mechanism is used, we use a round robin approach. The round-robin approach is not sensitive to changes in the system like server slowness, GC on servers, etc. Hence we could end up overloading some servers especially in cases where they are underperforming thereby leading to higher latencies for queries.
This PR provides the implementation for the Adaptive Server Selection feature at brokers. When a query is received, we could use one of the implemented Adaptive Selectors (NumInFlightRequests, Latency, Hybrid) to efficiently route queries to the best server instead of using a naive round robin approach.
Test Results
Some of the important and interesting test results are documented here