
Conversation

Contributor

@jasperjiaguo jasperjiaguo commented May 20, 2022

We found that during segment push of large, daily-refreshed datasets, disk usage is quite spiky. It essentially maxes out the disk throughput (a few GB/s for SSDs), during which query latency can be impacted. However, the average disk throughput at minute granularity is only a few hundred MB/s, which means that if we make better use of the disk, the contention on bandwidth may be mitigated. Meanwhile, the current offline->online process for an offline segment is basically download -> untar -> MMAP -> download -> untar -> …. This can potentially be improved in the following ways:

  1. If we combine download and untar into a single streamed step and write the untarred segments directly to disk, we can avoid unnecessary reads and writes of the intermediate tarball.

  2. If we cap the download speed on top of limiting the number of segments downloaded in parallel (see Set max number of parallel segment downloads per table in pinot-server #8694), we may improve query performance during refresh.
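Point 1 can be sketched as a streamed copy: the downloaded bytes are decompressed on the fly and written straight to their final location, so the tarball is never materialized on disk. The sketch below is a minimal, dependency-free illustration using plain GZIP; the class and method names are hypothetical, and the actual change additionally parses tar entries (via a tar stream wrapper such as commons-compress) rather than writing a single file.

```java
import java.io.*;
import java.util.zip.GZIPInputStream;

public class StreamedDownloadUntar {
    // Sketch: decompress the download stream directly into the destination,
    // skipping the "write tarball, then read it back to untar" round trip.
    // Plain GZIP keeps the example dependency-free; the real implementation
    // also iterates tar entries inside the compressed stream.
    public static long streamDecompressTo(InputStream download, File dest) throws IOException {
        long total = 0;
        byte[] buf = new byte[8192];
        try (InputStream gz = new GZIPInputStream(download);
             OutputStream out = new BufferedOutputStream(new FileOutputStream(dest))) {
            int n;
            while ((n = gz.read(buf)) != -1) {
                out.write(buf, 0, n);  // bytes go straight to the final file
                total += n;
            }
        }
        return total;
    }
}
```

Compared with the old path, each downloaded byte is written to disk once instead of twice (tarball, then untarred file).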

Test Table (2.7GB/segment, 114 segments):
CPU profiling using Top/ODP given the same downloading time
Screen Shot 2022-06-09 at 4 56 22 PM

Disk profiling using PCP in 50ms granularity given the same downloading time:
Screen Shot 2022-06-09 at 4 57 07 PM

IO size captured with PCP w.r.t. different buffer size:
Screen Shot 2022-06-09 at 4 59 35 PM

Findings & Conclusions:

  • Experiments show that with Jack's change to control download parallelism, we can control the per-table stream rate.
  • This change makes the disk write rate curve nearly constant even at 50ms granularity, which avoids disk saturation.
  • The streamed download-untar feature bypasses writing the tarball to disk, which reduces disk IO to 2/3 of the original in the tested case.
  • ODP profiling shows the new download code has minimal CPU impact: RateLimiter: [0.583%] / FileDescriptor#sync: [0.583%], and top profiling also indicates similar or lower CPU utilization.
  • The rate limiter can also effectively control the network rx rate.
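The rate-limited disk write described in the findings can be sketched as follows. This is an illustrative, dependency-free version (the PR itself uses Guava's RateLimiter, as shown in the ODP profile above); the class and method names here are hypothetical. Syncing the file descriptor after each chunk is what prevents writes from accumulating in the page cache and bursting to disk.

```java
import java.io.*;

public class RateLimitedDiskCopy {
    // Sketch of a rate-limited stream-to-disk copy: write in chunks, sync
    // each chunk so the OS page cache cannot absorb a burst, and sleep
    // whenever the cumulative rate would exceed maxBytesPerSec.
    // maxBytesPerSec <= 0 means "no limit" in this sketch.
    public static long copyWithRateLimit(InputStream in, FileOutputStream out, long maxBytesPerSec)
            throws IOException, InterruptedException {
        byte[] buf = new byte[64 * 1024];
        long total = 0;
        long startNanos = System.nanoTime();
        int n;
        while ((n = in.read(buf)) != -1) {
            out.write(buf, 0, n);
            total += n;
            if (maxBytesPerSec > 0) {
                out.getFD().sync();  // force the bytes to disk before pacing
                long dueNanos = total * 1_000_000_000L / maxBytesPerSec;
                long elapsed = System.nanoTime() - startNanos;
                if (elapsed < dueNanos) {
                    Thread.sleep((dueNanos - elapsed) / 1_000_000L);
                }
            }
        }
        return total;
    }
}
```

This produces the flat write-rate curve observed in the 50ms-granularity PCP profile: the disk sees a steady stream of small, paced writes instead of a saturating burst.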

Release notes
Configs:

pinot.server.instance.segment.stream.download.untar.rate.limit.bytes.per.sec
Rate limit for the streamed server segment download-untar: caps the rate at which the download-untar stream is written to disk, in bytes per second.
-1 means no disk write limit; 0 limits writing to the min(untar, download) rate.

pinot.server.instance.segment.stream.download.untar
Whether to use streamed server segment download-untar.
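For illustration, a server configuration enabling the feature might look like the following (the 100 MB/s value is just an example, not a recommendation):

```properties
# Enable streamed download-untar for offline segments
pinot.server.instance.segment.stream.download.untar=true
# Cap the streamed untar disk write rate at ~100 MB/s (104857600 bytes/sec)
# -1 = no disk write limit; 0 = limited only by the min(untar, download) rate
pinot.server.instance.segment.stream.download.untar.rate.limit.bytes.per.sec=104857600
```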

@jasperjiaguo jasperjiaguo changed the title [WIP] Streamed segment download & untar with rate limiter to control disk usage feature [WIP][Not ready for review] Streamed segment download & untar with rate limiter to control disk usage feature May 20, 2022
@jasperjiaguo jasperjiaguo marked this pull request as draft May 20, 2022 23:17
@jasperjiaguo jasperjiaguo force-pushed the limit_download_rate branch from f888915 to 713de1c Compare May 21, 2022 05:54
@codecov-commenter

codecov-commenter commented May 21, 2022

Codecov Report

❌ Patch coverage is 23.25581% with 99 lines in your changes missing coverage. Please review.
✅ Project coverage is 69.61%. Comparing base (d1d2ddb) to head (2f0ad7e).
⚠️ Report is 5260 commits behind head on master.

Files with missing lines Patch % Lines
.../pinot/core/data/manager/BaseTableDataManager.java 17.07% 32 Missing and 2 partials ⚠️
...pinot/common/utils/fetcher/HttpSegmentFetcher.java 3.44% 28 Missing ⚠️
...ache/pinot/common/utils/TarGzCompressionUtils.java 15.78% 15 Missing and 1 partial ⚠️
...org/apache/pinot/common/utils/http/HttpClient.java 0.00% 11 Missing ⚠️
...ent/local/data/manager/TableDataManagerParams.java 70.00% 6 Missing ⚠️
...ot/common/utils/fetcher/SegmentFetcherFactory.java 0.00% 2 Missing ⚠️
...e/pinot/common/utils/FileUploadDownloadClient.java 0.00% 1 Missing ⚠️
...pinot/common/utils/fetcher/BaseSegmentFetcher.java 0.00% 1 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master    #8753      +/-   ##
============================================
- Coverage     69.80%   69.61%   -0.20%     
- Complexity     4659     4680      +21     
============================================
  Files          1741     1805      +64     
  Lines         91476    94006    +2530     
  Branches      13677    13992     +315     
============================================
+ Hits          63854    65441    +1587     
- Misses        23199    24019     +820     
- Partials       4423     4546     +123     
Flag Coverage Δ
integration1 26.30% <12.40%> (-0.78%) ⬇️
integration2 24.99% <12.40%> (-0.69%) ⬇️
unittests1 66.36% <21.25%> (+0.14%) ⬆️
unittests2 15.45% <0.00%> (+1.31%) ⬆️

Flags with carried forward coverage won't be shown.


@jasperjiaguo jasperjiaguo force-pushed the limit_download_rate branch 10 times, most recently from 9f7f905 to ef72a3e Compare May 27, 2022 06:02
@jasperjiaguo jasperjiaguo force-pushed the limit_download_rate branch 6 times, most recently from d3ff77f to 9572b3e Compare June 7, 2022 16:26
Contributor

Curious why this change?

Contributor Author

Because we need to get the FileDescriptor from the FileOutputStream in streamCopyWithRateLimiter. Creating a FileOutputStream from outputFile should be safe here.

Contributor

Is there a way to do a sanity check / validation when the streamed untar finishes, to ensure the download was correct and complete?

Contributor Author

Discussed offline. The untar library will fail on checksum and throw an IOException if files are corrupted during transmission.

@jasperjiaguo jasperjiaguo changed the title [WIP][Not ready for review] Streamed segment download & untar with rate limiter to control disk usage feature Streamed segment download & untar with rate limiter to control disk usage feature Jun 8, 2022
@jasperjiaguo jasperjiaguo marked this pull request as ready for review June 8, 2022 19:04
@jasperjiaguo jasperjiaguo force-pushed the limit_download_rate branch from 1c2751c to c4288ff Compare June 8, 2022 21:39
fd.sync();
}
}
return count;
Member

Log one more message to show that the copy is done.

Contributor Author
@jasperjiaguo jasperjiaguo Jun 9, 2022

This function is actually called per file, so I removed all the logging here as it would be too noisy. We already have similar logging in private File downloadAndStreamUntarRateLimit(String segmentName, SegmentZKMetadata zkMetadata, File tempRootDir, long maxDownloadRateInByte), which should be enough.

@jasperjiaguo jasperjiaguo changed the title Streamed segment download & untar with rate limiter to control disk usage feature Streamed segment download & untar with rate limiter to control disk usage Jun 9, 2022
@siddharthteotia
Contributor

@jasperjiaguo - let's add a brief summary of the improvement numbers we saw from internal performance test on prod data.

@siddharthteotia siddharthteotia merged commit 90fa0f9 into apache:master Jun 10, 2022
Member
@jackjlli jackjlli left a comment

Minor but LGTM. These can be addressed in later PR.

public static long copyWithRateLimiter(InputStream inputStream, FileOutputStream outputStream,
long maxStreamRateInByte)
throws IOException {
Preconditions.checkState(inputStream != null, "inputStream is null");
Member

Preconditions.checkNotNull() might be better. Same for the following line.

});
}

public File fetchUntarSegmentToLocalStreamed(URI uri, File dest, long rateLimit)
Member

Rename the last param here. Same for the other method.
Also, add an @Override annotation here.

@Override
public File fetchUntarSegmentToLocalStreamed(URI downloadURI, File dest, long maxStreamRateInByte)
throws Exception {
// Create a RoundRobinURIProvider to round robin IP addresses when retry uploading. Otherwise may always try to
Member
@jackjlli jackjlli Jun 9, 2022

I think most of the logic here is the same as the one in fetchSegmentToLocal() method. It'd be good to check if we can extract the same logic to avoid duplicating the code.

}

public TableDataManagerParams(InstanceDataManagerConfig instanceDataManagerConfig) {
_maxParallelSegmentDownloads = instanceDataManagerConfig.getMaxParallelSegmentDownloads();
Member

Change it to sth like:
this(instanceDataManagerConfig.getMaxParallelSegmentDownloads(), instanceDataManagerConfig.isStreamSegmentDownloadUntar(), instanceDataManagerConfig.getStreamSegmentDownloadUntarRateLimit());

Member

Also, rename the variable streamSegmentDownloadUntarRateLimit.

return _streamSegmentDownloadUntarRateLimitBytesPerSec;
}

public void setStreamSegmentDownloadUntar(boolean streamSegmentDownloadUntar) {
Member

You might not need all the setter methods here.

throws Exception {
File tempRootDir = getTmpSegmentDataDir("tmp-" + segmentName + "-" + UUID.randomUUID());
FileUtils.forceMkdir(tempRootDir);
if (_isStreamSegmentDownloadUntar && zkMetadata.getCrypterName() == null) {
Member

Does that mean we cannot stream & untar if the segment is encrypted? If so, call it out in a comment in this method.

Member

Also, can this if statement be put into the try block, so that you don't have to duplicate the code for deleting tempRootDir?

FileUtils.moveDirectory(untaredSegDir, indexDir);
return indexDir;
} catch (Exception e) {
LOGGER.error("Failed to move segment: {} of table: {}", segmentName, _tableNameWithType);
Member

It'd be good to print out the exact exception thrown in this error message, if it doesn't get printed somewhere in the caller above.

@jasperjiaguo
Contributor Author

Minor but LGTM. These can be addressed in later PR.

Thanks, will address these in a follow-up PR

dongxiaoman pushed a commit to dongxiaoman/incubator-pinot that referenced this pull request Jun 13, 2022
…sage (apache#8753)

* Add streamed download untar with ratelimit for segment download and refresh

* address comments

* address comments

* address comments

* address comments

* address comments

* address comments

* address comments
@Jackie-Jiang Jackie-Jiang added the release-notes label Jun 13, 2022
@Jackie-Jiang
Contributor

@jasperjiaguo This is a great enhancement. I labeled the PR with release-notes; can you please add a release-notes section to the PR description listing the new configs and metrics added in this PR? Also, can you please help update the documentation?

@jasperjiaguo
Contributor Author

@jasperjiaguo This is a great enhancement. I labeled the PR with release-notes; can you please add a release-notes section to the PR description listing the new configs and metrics added in this PR? Also, can you please help update the documentation?

@Jackie-Jiang added release notes for config. Will add them to documentation later.

* RateLimit limits the untar rate
* <p>For security reason, the untarred files must reside in the output directory.
*/
public static List<File> untarRateLimit(InputStream inputStream, File outputDir, long rateLimit)
Contributor

for the rateLimit argument, can we include the units in the argument name to make code more readable?
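The security note in the untarRateLimit javadoc above ("the untarred files must reside in the output directory") can be illustrated with a canonical-path containment check: a crafted tar entry name such as "../../etc/passwd" must be rejected before anything is written. The class and method names below are hypothetical, not the PR's actual code.

```java
import java.io.File;
import java.io.IOException;

public class UntarContainmentCheck {
    // Hypothetical illustration of the "must reside in the output directory"
    // rule: resolve the tar entry against the output directory and reject any
    // entry whose canonical path escapes it (path traversal protection).
    public static File resolveEntry(File outputDir, String entryName) throws IOException {
        File target = new File(outputDir, entryName);
        String root = outputDir.getCanonicalPath() + File.separator;
        if (!target.getCanonicalPath().startsWith(root)) {
            throw new IOException("Tar entry escapes output directory: " + entryName);
        }
        return target;
    }
}
```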

}
}

public static long copyLarge(InputStream inputStream, FileOutputStream outputStream, long rateLimit)
Contributor

Should we assert that rateLimit is not NO_LIMIT?

// Temporary exception
// 404 is treated as a temporary exception, as the downloadURI may be backed by multiple hosts,
// if a single host is down, we can retry with another host.
_logger.warn("Got temporary error status code: {} while downloading segment from: {} to: {}", statusCode, uri,
Contributor

is it useful to log the retry attempt here?

throws Exception {
File tempRootDir = getTmpSegmentDataDir("tmp-" + segmentName + "-" + UUID.randomUUID());
FileUtils.forceMkdir(tempRootDir);
if (_isSegmentDownloadUntarStreamed && zkMetadata.getCrypterName() == null) {
Contributor

So it seems possible to configure both streaming and encryption, which is not a valid combination, but we don't log anything and just silently default to non-streaming. Should we log that in the else branch below?


Labels

enhancement, release-notes

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants