Streamed segment download & untar with rate limiter to control disk usage #8753
Conversation
Force-pushed from f888915 to 713de1c
Codecov Report

@@             Coverage Diff             @@
##             master    #8753     +/-  ##
============================================
- Coverage     69.80%   69.61%    -0.20%
- Complexity     4659     4680      +21
============================================
  Files          1741     1805      +64
  Lines         91476    94006    +2530
  Branches      13677    13992     +315
============================================
+ Hits          63854    65441    +1587
- Misses        23199    24019     +820
- Partials       4423     4546     +123
Force-pushed from 9f7f905 to ef72a3e
Force-pushed from d3ff77f to 9572b3e
Curious, why this change?
Because we need to get the FileDescriptor from the FileOutputStream in streamCopyWithRateLimiter. Creating a FileOutputStream from outputFile should be safe here.
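A minimal sketch of the pattern being discussed (file names are made up for the example): the FileDescriptor is only reachable through a FileOutputStream, and sync() forces the OS to flush the file's buffered writes to disk, which is why the code constructs the FileOutputStream directly from the output file.

```java
import java.io.File;
import java.io.FileDescriptor;
import java.io.FileOutputStream;
import java.io.IOException;

public class SyncExample {
  public static void main(String[] args) throws IOException {
    File out = File.createTempFile("segment", ".tmp");
    try (FileOutputStream fos = new FileOutputStream(out)) {
      fos.write("data".getBytes());
      // The FileDescriptor is only exposed by the stream itself.
      FileDescriptor fd = fos.getFD();
      fd.sync(); // ask the OS to flush dirty pages for this file to disk
    }
    System.out.println(out.length()); // 4
    out.delete();
  }
}
```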
Is there a way to do a sanity check / validation when streaming untar finishes to ensure the download was correct / complete ?
Discussed offline. The untar library will fail on the checksum and report an IOException if files are corrupted during transmission.
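A rough illustration of that failure mode using the JDK's gzip layer (not Pinot's actual untar code path): the decompressor validates the trailing checksum, so bytes corrupted in transit surface as an IOException once the stream is drained.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class CrcCheckExample {
  /** Returns true if the gzip layer reports the (deliberate) corruption. */
  static boolean detectsCorruption() throws IOException {
    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    try (GZIPOutputStream gz = new GZIPOutputStream(buf)) {
      gz.write("segment payload".getBytes());
    }
    byte[] bytes = buf.toByteArray();
    bytes[bytes.length - 1] ^= 0xFF; // corrupt the gzip trailer (stored CRC/size)
    try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(bytes))) {
      while (in.read() != -1) { // draining the stream triggers trailer validation
      }
      return false; // corruption slipped through
    } catch (IOException e) {
      return true; // e.g. "Corrupt GZIP trailer"
    }
  }

  public static void main(String[] args) throws IOException {
    System.out.println(detectsCorruption()); // true
  }
}
```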
Force-pushed from 1c2751c to c4288ff
      fd.sync();
    }
  }
  return count;
Log one more message to show that the copy is done.
This function is actually called per file, so I removed all the logging here as it would be too noisy. We already have similar logging in private File downloadAndStreamUntarRateLimit(String segmentName, SegmentZKMetadata zkMetadata, File tempRootDir, long maxDownloadRateInByte), which should be enough.
@jasperjiaguo - let's add a brief summary of the improvement numbers we saw from the internal performance tests on prod data.
jackjlli left a comment:
Minor, but LGTM. These can be addressed in a later PR.
public static long copyWithRateLimiter(InputStream inputStream, FileOutputStream outputStream,
    long maxStreamRateInByte)
    throws IOException {
  Preconditions.checkState(inputStream != null, "inputStream is null");
Preconditions.checkNotNull() might be better. Same for the following line.
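For context, here is a self-contained sketch of what a rate-limited stream copy can look like (an illustration, not the PR's actual implementation; the PR uses its own rate limiter): sleep whenever the bytes written so far run ahead of the budget implied by elapsed time.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

public class RateLimitedCopy {
  /** Copies in to out at no more than maxRateBytesPerSec (must be > 0). */
  public static long copyWithRateLimiter(InputStream in, OutputStream out, long maxRateBytesPerSec)
      throws IOException {
    byte[] buffer = new byte[8192];
    long total = 0;
    long startNanos = System.nanoTime();
    int n;
    while ((n = in.read(buffer)) != -1) {
      out.write(buffer, 0, n);
      total += n;
      // Sleep until elapsed time catches up with the byte budget.
      long expectedNanos = total * 1_000_000_000L / maxRateBytesPerSec;
      long sleepNanos = expectedNanos - (System.nanoTime() - startNanos);
      if (sleepNanos > 0) {
        try {
          Thread.sleep(sleepNanos / 1_000_000, (int) (sleepNanos % 1_000_000));
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          throw new IOException("Interrupted while rate limiting", e);
        }
      }
    }
    return total;
  }

  public static void main(String[] args) throws IOException {
    byte[] data = new byte[64 * 1024];
    ByteArrayOutputStream sink = new ByteArrayOutputStream();
    long start = System.nanoTime();
    long copied = copyWithRateLimiter(new ByteArrayInputStream(data), sink, 128 * 1024);
    double seconds = (System.nanoTime() - start) / 1e9;
    // 64 KB at a 128 KB/s cap should take roughly half a second.
    System.out.println(copied + " bytes in ~" + String.format("%.1f", seconds) + "s");
  }
}
```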
  });
}

public File fetchUntarSegmentToLocalStreamed(URI uri, File dest, long rateLimit)
Rename the last param here; same for the other method.
Also, add an @Override annotation here.
@Override
public File fetchUntarSegmentToLocalStreamed(URI downloadURI, File dest, long maxStreamRateInByte)
    throws Exception {
  // Create a RoundRobinURIProvider to round robin IP addresses when retry uploading. Otherwise may always try to
I think most of the logic here is the same as the one in fetchSegmentToLocal() method. It'd be good to check if we can extract the same logic to avoid duplicating the code.
}

public TableDataManagerParams(InstanceDataManagerConfig instanceDataManagerConfig) {
  _maxParallelSegmentDownloads = instanceDataManagerConfig.getMaxParallelSegmentDownloads();
Change it to something like:
this(instanceDataManagerConfig.getMaxParallelSegmentDownloads(), instanceDataManagerConfig.isStreamSegmentDownloadUntar(), instanceDataManagerConfig.getStreamSegmentDownloadUntarRateLimit());
Also, rename the variable streamSegmentDownloadUntarRateLimit.
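The constructor delegation suggested above can be sketched like this (the Config interface is a made-up stub standing in for InstanceDataManagerConfig):

```java
public class TableDataManagerParamsSketch {
  // Hypothetical stub with only the getters used here.
  interface Config {
    int getMaxParallelSegmentDownloads();
    boolean isStreamSegmentDownloadUntar();
    long getStreamSegmentDownloadUntarRateLimit();
  }

  final int _maxParallelSegmentDownloads;
  final boolean _streamSegmentDownloadUntar;
  final long _streamSegmentDownloadUntarRateLimit;

  // Primary constructor: the only place fields are assigned.
  public TableDataManagerParamsSketch(int maxParallelSegmentDownloads, boolean streamSegmentDownloadUntar,
      long streamSegmentDownloadUntarRateLimit) {
    _maxParallelSegmentDownloads = maxParallelSegmentDownloads;
    _streamSegmentDownloadUntar = streamSegmentDownloadUntar;
    _streamSegmentDownloadUntarRateLimit = streamSegmentDownloadUntarRateLimit;
  }

  // Config-based constructor delegates via this(...) instead of repeating the assignments.
  public TableDataManagerParamsSketch(Config config) {
    this(config.getMaxParallelSegmentDownloads(), config.isStreamSegmentDownloadUntar(),
        config.getStreamSegmentDownloadUntarRateLimit());
  }

  public static void main(String[] args) {
    Config config = new Config() {
      public int getMaxParallelSegmentDownloads() { return 4; }
      public boolean isStreamSegmentDownloadUntar() { return true; }
      public long getStreamSegmentDownloadUntarRateLimit() { return 100_000_000L; }
    };
    TableDataManagerParamsSketch params = new TableDataManagerParamsSketch(config);
    System.out.println(params._maxParallelSegmentDownloads); // 4
  }
}
```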
  return _streamSegmentDownloadUntarRateLimitBytesPerSec;
}

public void setStreamSegmentDownloadUntar(boolean streamSegmentDownloadUntar) {
You might not need all the setter methods here.
    throws Exception {
  File tempRootDir = getTmpSegmentDataDir("tmp-" + segmentName + "-" + UUID.randomUUID());
  FileUtils.forceMkdir(tempRootDir);
  if (_isStreamSegmentDownloadUntar && zkMetadata.getCrypterName() == null) {
Does that mean we cannot stream & untar if the segment is encrypted? If so, note it in a comment in this method.
Also, can this if statement be put into the try block, so that you don't have to duplicate the code for deleting tempRootDir?
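The restructuring being suggested can be sketched as follows (hypothetical names; the point is a single cleanup site in finally instead of per-branch deletes of the temp directory):

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

public class CleanupExample {
  public static void main(String[] args) throws IOException {
    File tempRootDir = Files.createTempDirectory("tmp-segment-").toFile();
    try {
      // Both the streamed and the non-streamed branch live inside the try,
      // so any exception still reaches the cleanup below.
      boolean streamed = true;
      if (streamed) {
        System.out.println("streamed download-untar");
      } else {
        System.out.println("download, then untar");
      }
    } finally {
      // Single cleanup site instead of deleting tempRootDir in each branch.
      Files.deleteIfExists(tempRootDir.toPath());
    }
    System.out.println(tempRootDir.exists()); // false
  }
}
```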
  FileUtils.moveDirectory(untaredSegDir, indexDir);
  return indexDir;
} catch (Exception e) {
  LOGGER.error("Failed to move segment: {} of table: {}", segmentName, _tableNameWithType);
It'd be good to print the exact exception thrown in this error message, if it isn't already printed by an upper caller.
Thanks, will address these in a follow-up PR.
…sage (apache#8753)
* Add streamed download untar with ratelimit for segment download and refresh
* address comments (×7)
@jasperjiaguo This is a great enhancement. I labeled the PR with
@Jackie-Jiang Added release notes for the configs. Will add them to the documentation later.
 * RateLimit limits the untar rate
 * <p>For security reason, the untarred files must reside in the output directory.
 */
public static List<File> untarRateLimit(InputStream inputStream, File outputDir, long rateLimit)
For the rateLimit argument, can we include the units in the argument name to make the code more readable?
  }
}

public static long copyLarge(InputStream inputStream, FileOutputStream outputStream, long rateLimit)
Should we assert that rateLimit is not NO_LIMIT?
// Temporary exception
// 404 is treated as a temporary exception, as the downloadURI may be backed by multiple hosts;
// if a single host is down, we can retry with another host.
_logger.warn("Got temporary error status code: {} while downloading segment from: {} to: {}", statusCode, uri,
is it useful to log the retry attempt here?
    throws Exception {
  File tempRootDir = getTmpSegmentDataDir("tmp-" + segmentName + "-" + UUID.randomUUID());
  FileUtils.forceMkdir(tempRootDir);
  if (_isSegmentDownloadUntarStreamed && zkMetadata.getCrypterName() == null) {
So it seems possible to configure both streaming and encryption, which is not a valid combination, but we don't log anything and just default to non-streaming. Should we log that in the else branch below?
We found that during the segment push of large, daily-refreshed datasets, the disk usage is pretty spiky. It can basically max out the disk throughput (a few GB/s for SSD), during which query latency can be impacted. However, the average disk throughput is only at the few-hundred-MB/s level at minute granularity, which means that if we can make better use of the disk, the contention on bandwidth may be mitigated. Meanwhile, the current offline->online process for an offline segment is basically download -> untar -> MMAP -> download -> untar -> .... This can potentially be improved in the following ways:
1. If we can combine download -> untar and directly write the untarred segments to disk, we can save some unnecessary reads and writes.
2. If we can cap the download speed on top of limiting the number of parallel segment downloads (see "Set max number of parallel segment downloads per table in pinot-server" #8694), we may improve query performance during refresh.
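The first idea can be sketched with the JDK's gzip layer standing in for the real tar.gz handling (the PR itself streams through a tar decoder; this dependency-free illustration only shows the pipelining, with an in-memory buffer playing the role of the network stream):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class StreamedUntarSketch {
  public static void main(String[] args) throws IOException {
    // Simulate the "network" payload: a compressed segment.
    ByteArrayOutputStream compressed = new ByteArrayOutputStream();
    try (GZIPOutputStream gz = new GZIPOutputStream(compressed)) {
      gz.write("segment data".getBytes());
    }
    // Old flow: write the compressed bytes to disk, then read them back and
    // decompress (two disk writes + one disk read per segment).
    // Streamed flow sketched here: decompress directly off the download
    // stream, so only the final decompressed bytes ever touch disk.
    File dest = File.createTempFile("segment", ".data");
    try (InputStream network = new ByteArrayInputStream(compressed.toByteArray());
         GZIPInputStream unzipped = new GZIPInputStream(network);
         FileOutputStream out = new FileOutputStream(dest)) {
      unzipped.transferTo(out); // this copy is where a rate limiter would sit
    }
    System.out.println(new String(Files.readAllBytes(dest.toPath()))); // segment data
    dest.delete();
  }
}
```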
Test Table (2.7GB/segment, 114 segments): [chart]
CPU profiling using Top/ODP given the same downloading time
Disk profiling using PCP at 50ms granularity given the same downloading time: [chart]
IO size captured with PCP w.r.t. different buffer sizes: [chart]
Findings & Conclusions:
release notes
Configs:
- pinot.server.instance.segment.stream.download.untar.rate.limit.bytes.per.sec: rate limit for streamed server segment download-untar; limits the rate at which the download-untar stream is written to disk, in bytes per second. -1 means no disk write limit; 0 limits writing to the min(untar, download) rate.
- pinot.server.instance.segment.stream.download.untar: whether to use streamed server segment download-untar.
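As an illustration, the two keys might be set together in a server config like this (the values are made up for the example, not recommendations):

```properties
# Enable streamed download-untar for segment downloads
pinot.server.instance.segment.stream.download.untar=true
# Cap the disk write rate of the streamed untar at ~200 MB/s (in bytes/sec)
pinot.server.instance.segment.stream.download.untar.rate.limit.bytes.per.sec=209715200
```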