Skip to content

Conversation

@vvivekiyer
Copy link
Contributor

@vvivekiyer vvivekiyer commented Aug 31, 2022

label = feature
OSS issue #8618
Design Doc

PR Description

Based on the segment assignment strategies used, we use one of the following instance selectors today:

  • BalancedInstanceSelector
  • ReplicaGroupInstanceSelector
  • StrictReplicaGroupInstanceSelector

Irrespective of which instance selection mechanism is used, we use a round robin approach. The round-robin approach is not sensitive to changes in the system like server slowness, GC on servers, etc. Hence we could end up overloading some servers especially in cases where they are underperforming thereby leading to higher latencies for queries.

This PR provides the implementation for the Adaptive Server Selection feature at brokers. When a query is received, we could use one of the implemented Adaptive Selectors (NumInFlightRequests, Latency, Hybrid) to efficiently route queries to the best server instead of using a naive round robin approach.

Test Results

Some of the important and interesting test results are documented here

@vvivekiyer vvivekiyer changed the title Adaptive Server Selection to improve Query Processing Resiliency [WIP] Adaptive Server Selection to improve Query Processing Resiliency Aug 31, 2022
@vvivekiyer vvivekiyer force-pushed the AdaptiveServerSelectionOSS branch from a5a3cfe to d975dbf Compare August 31, 2022 21:36
@codecov-commenter
Copy link

codecov-commenter commented Aug 31, 2022

Codecov Report

Merging #9311 (3e84a61) into master (1c9528c) will decrease coverage by 8.41%.
The diff coverage is 52.38%.

@@             Coverage Diff              @@
##             master    #9311      +/-   ##
============================================
- Coverage     69.73%   61.31%   -8.42%     
+ Complexity     5017     4563     -454     
============================================
  Files          1873     1885      +12     
  Lines         99598   100958    +1360     
  Branches      15163    15357     +194     
============================================
- Hits          69453    61904    -7549     
- Misses        25217    34368    +9151     
+ Partials       4928     4686     -242     
Flag Coverage Δ
integration1 25.91% <13.75%> (-0.23%) ⬇️
integration2 ?
unittests1 67.13% <77.35%> (+0.07%) ⬆️
unittests2 ?

Flags with carried forward coverage won't be shown. Click here to find out more.

Impacted Files Coverage Δ
...e/pinot/broker/api/resources/PinotBrokerDebug.java 82.75% <0.00%> (-2.96%) ⬇️
...routing/adaptiveserverselector/HybridSelector.java 0.00% <0.00%> (ø)
...outing/adaptiveserverselector/LatencySelector.java 0.00% <0.00%> (ø)
...adaptiveserverselector/NumInFlightReqSelector.java 0.00% <0.00%> (ø)
...instanceselector/ReplicaGroupInstanceSelector.java 0.00% <0.00%> (-100.00%) ⬇️
...ceselector/StrictReplicaGroupInstanceSelector.java 0.00% <0.00%> (-92.43%) ⬇️
...org/apache/pinot/core/transport/QueryResponse.java 100.00% <ø> (ø)
...va/org/apache/pinot/spi/utils/CommonConstants.java 25.00% <0.00%> (-2.70%) ⬇️
...ting/instanceselector/InstanceSelectorFactory.java 37.50% <33.33%> (-50.00%) ⬇️
...eserverselector/AdaptiveServerSelectorFactory.java 38.09% <38.09%> (ø)
... and 512 more

📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more

@vvivekiyer vvivekiyer force-pushed the AdaptiveServerSelectionOSS branch from d975dbf to 7faa610 Compare September 1, 2022 00:12
@vvivekiyer vvivekiyer changed the title [WIP] Adaptive Server Selection to improve Query Processing Resiliency Adaptive Server Selection to improve Query Processing Resiliency Sep 1, 2022
@vvivekiyer vvivekiyer marked this pull request as ready for review September 1, 2022 04:46
@siddharthteotia siddharthteotia changed the title Adaptive Server Selection to improve Query Processing Resiliency Adaptive Server Selection Sep 1, 2022
@vvivekiyer vvivekiyer force-pushed the AdaptiveServerSelectionOSS branch from 32bf5a3 to 9178dd9 Compare September 6, 2022 18:20
ServerRoutingInstance serverRoutingInstance = entry.getKey();
ServerChannels serverChannels = serverRoutingInstance.isTlsEnabled() ? _serverChannelsTls : _serverChannels;
try {
_serverRoutingStatsManager.recordStatsAfterQuerySubmission(requestId, serverRoutingInstance.getInstanceId());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Apologies for asking again if you clarified this in the internal review. Shouldn't this be after line 126 ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Clarified with comments in code.

@vvivekiyer vvivekiyer force-pushed the AdaptiveServerSelectionOSS branch 2 times, most recently from 25aba36 to f80ff7e Compare September 20, 2022 23:37

// No stats for this server. That means this server hasn't received any queries yet.
if (score == null) {
int randIdx = _random.nextInt(serverCandidates.size());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(nit) How about if we simply pick the server that has score == null? This gives more determinism and is less likely to create thundering herd problem, because the next call will also pick a null (if any). The current solution is fine for not too large serverCandidates.size(). The expectation of initializing all servers is size*\sum_{j=1...size} 1/j (see this). My only concern is that if the serverCandidates.size() could get large for balanced assignment, and we become unfortunate with random numbers.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In my initial testing, I observed thundering herd problem when selecting the server with score = null (Thundering herd happened because we routed a great number of queries to the server with the null score before any stats was updated). But for now this should be fine because - We could route in a biased fashion initially. However, we will eventually recover?

I understand that this can cause problems with large number of servers. Let's discuss offline on how to solve this (in a subsequent PR).

Let me know your thoughts.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see... are you saying that after picking a score == null but before the stats is even updated by the recordStatsAfterQuerySubmission() later there can be multiple other queries routed to the same server?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's right. One solution could be pick a random server among the servers that have null score.

@vvivekiyer vvivekiyer force-pushed the AdaptiveServerSelectionOSS branch from f80ff7e to 7ec0a47 Compare September 21, 2022 05:54
}

public void updateNumInFlightRequestsForResponseArrival() {
--_numInFlightRequests;
Copy link
Contributor

@jasperjiaguo jasperjiaguo Sep 21, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we intentionally not updating _inFlighRequestsEMA here? If a bunch of queries are fired together, the _inFlighRequestsEMA will still remain pretty high after the responses are back, until the next query gets routed right? Are we expecting this behavior?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. It is intentional. The reasoning is that we want a snapshot of the queue at regular intervals and just updating at querySubmit should be good enough. In the example mentioned, the ema will decrease if one of the two events happen:

  1. The next query after burst is submitted.
  2. Autodecay kicks in.

Copy link
Contributor

@jasperjiaguo jasperjiaguo left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@siddharthteotia siddharthteotia merged commit 65f1f22 into apache:master Sep 21, 2022
@Jackie-Jiang Jackie-Jiang added the release-notes Referenced by PRs that need attention when compiling the next release notes label Sep 21, 2022
Copy link
Contributor

@Jackie-Jiang Jackie-Jiang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry I didn't get a chance to review it before merged. Have some minor comments.

Can you please update the PR description with a release-note section of the new added rest API and an example of table config to enable the feature? Also add a page for this new feature in the Pinot documentation

*/
abstract Map<String, String> select(List<String> segments, int requestId,
Map<String, List<String>> segmentToEnabledInstancesMap, Map<String, String> queryOptions);
Map<String, List<String>> segmentToEnabledInstancesMap, Map<String, String> queryOptions,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to change this. We should make _adaptiveServerSelector protected so that it can be accessed by the subclasses

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. Done.

public ReplicaGroupInstanceSelector(String tableNameWithType, BrokerMetrics brokerMetrics) {
super(tableNameWithType, brokerMetrics);
public ReplicaGroupInstanceSelector(String tableNameWithType, BrokerMetrics brokerMetrics,
AdaptiveServerSelector adaptiveServerSelector) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(minor) Annotate with @Nullable

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

@Nullable AdaptiveServerSelector adaptiveServerSelector) {
Map<String, String> segmentToSelectedInstanceMap = new HashMap<>(HashUtil.getHashMapCapacity(segments.size()));

List<String> serverRankList = new ArrayList<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will create an extra list even when adaptive server selection is not enabled. Let's avoid that by wrapping the new logic under the if check.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

continue;
}

String selectedServer = enabledInstances.get(requestId++ % enabledInstances.size());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Put this in the else clause

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

@Nullable String retentionPeriod) {
try {
LOGGER.info("Trying to delete segments: {} from table: {} ", segmentNames, tableNameWithType);
LOGGER.info("Trying to delete segments: {} from table: {} ", segmentNames, tableNameWithType);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

private final ConcurrentHashMap<ServerRoutingInstance, ServerResponse> _responseMap;
private final CountDownLatch _countDownLatch;
private final long _maxEndTimeMs;
private final long _queryTimeoutMs;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(nit) Rename to _timeoutMs

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

}

@Override
public long getTimeOutMs() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(nit) Rename to getTimeoutMs()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

@vvivekiyer
Copy link
Contributor Author

@Jackie-Jiang Thanks for the comments. I've addressed them in #9462. Please review.

I'll add the following soon.

  1. Release-note in PR description.
  2. Example of configs to use this feature.
  3. Pinot docs.

@vvivekiyer vvivekiyer deleted the AdaptiveServerSelectionOSS branch October 2, 2022 07:07
61yao pushed a commit to 61yao/pinot that referenced this pull request Oct 3, 2022
* Implementation for AdaptiveServerSelection feature

* Add tests for Adaptive Server Selection feature

* Fix flaky unit test

* Address review comments

* Fix test and indentation.
@vvivekiyer
Copy link
Contributor Author


if (_autoDecayWindowMs > 0) {
// Create a timer to automatically decay the average if updates are not performed in the last _autoDecayWindowMs.
Timer timer = new Timer();
Copy link

@dmyang dmyang Oct 26, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to use a ScheduledThreadPoolExecutor instead of a Timer for tracking EWMA? We were testing out this change on a fairly large cluster (200 hosts) and noticed a large uptick in JVM thread count when turning on stats collection. I think expected thread growth equals # of servers * 2.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll open a PR

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. Thanks @dmyang.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For future reference - timer fix is merged in PR #9697

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

feature release-notes Referenced by PRs that need attention when compiling the next release notes

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants