-
Notifications
You must be signed in to change notification settings - Fork 1.2k
[core] limit parallelly read file memory usage, extract some methods #1072
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[core] limit parallelly read file memory usage, extract some methods #1072
Conversation
| // however entry.bucket() was computed against the old numOfBuckets | ||
| // and thus the filtered manifest entries might be empty | ||
| // which renders the bucket check invalid | ||
| if (filterByBucket(file) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
'if' statement can be simplified
| } | ||
|
|
||
| private <T extends AbstractManifestEntry> Pair<Long, List<T>> doPlan( | ||
| Function<List<ManifestFileMeta>, List<T>> processor, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
readManifestFile?
| doPlan( | ||
| // how to process entry files | ||
| entries -> | ||
| entries.parallelStream() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we also put these logical into doPlan?
Only filterByStats is special, we can just instanceof ManifestEntry in filterByStats?
|
|
||
| private <T extends AbstractManifestEntry> Pair<Long, List<T>> doPlan( | ||
| Function<List<ManifestFileMeta>, List<T>> processor, | ||
| Function<T, Boolean> postFilterProcessor) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we don't need this one, postFilter can be applied to SimpleManifestEntry too.
If there is level filter, throw exception.
2f01378 to
eed1bc3
Compare
|
fixed comment |
| if (entry instanceof ManifestEntry) { | ||
| return filterByStats((ManifestEntry) entry); | ||
| } else { | ||
| throw new RuntimeException("only complete manifest entry could be filter by stats"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return true;
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done this
| // reduce memory usage by batch iterable process, the cached result in memory will be 2 * | ||
| // queueSize | ||
| public static <T, U> Iterable<T> parallelismBatchIterable( | ||
| Function<List<U>, List<T>> processor, List<U> input, int queueSize) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
default value of queueSize can be COMMON_IO_FORK_JOIN_POOL thread number * 2.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done this! set queueSize to FileUtils.COMMON_IO_FORK_JOIN_POOL.getParallelism() * 2
| if (index < activeList.size()) { | ||
| next = activeList.get(index++); | ||
| if (index == activeList.size()) { | ||
| activeList = null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You implement a wrong iterator.
Test should cover multiple invoking for hasNext .
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done this. added tests for multiple invoking hasNext method
| activeList = batch.get(); | ||
| if (stack.size() > 0) { | ||
| batch = | ||
| CompletableFuture.supplyAsync( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't need to produce more elements when the consumer not finish this batch.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done this
ece57b0 to
adf8b8e
Compare
| int size = input.size(); | ||
| int num = size / queueSize; | ||
|
|
||
| for (int i = 0; i < num; i++) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lists.partition?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
| if (entry instanceof ManifestEntry) { | ||
| return filterByStats((ManifestEntry) entry); | ||
| } | ||
| return true; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add comments here: filterByStats is an action that is completed as much as possible and does not have an impact if it is not done.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
adf8b8e to
0faf3b7
Compare
|
fixed comment |
| public static <T, U> Iterable<T> parallelismBatchIterable( | ||
| Function<List<U>, List<T>> processor, List<U> input) { | ||
| // default queueSize | ||
| int queueSize = FileUtils.COMMON_IO_FORK_JOIN_POOL.getParallelism() * 2; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe default queueSize should be smaller , After testing it in the outdoor environment ,it will oom and set smaller than 80 is Ok, how about not *2
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Introduce an option: SCAN_MANIFEST_PARALLELISM, default is none (will be COMMON_IO_FORK_JOIN_POOL.getParallelism()).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done this
352903e to
bb6f277
Compare
|
fixed comment |
|
test passed in my local environment? |
| () -> processor.apply(stack.poll()), | ||
| FileUtils.COMMON_IO_FORK_JOIN_POOL) | ||
| .get(); | ||
| } catch (InterruptedException e) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just
} catch (Exception e) {
throw new RuntimeException(e);
}
?
- Swallow
InterruptedExceptionis not good. - Exception message is confused,
"should never get here", IOException will go here.
bb6f277 to
f0c4135
Compare
|
|
||
| private void advanceIfNeeded() { | ||
| if ((activeList == null || index >= activeList.size()) | ||
| while ((activeList == null || index >= activeList.size()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add a unit test for this?
JingsongLi
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
…methods (apache#1072)" This reverts commit 5d23c7d.
…everted code, we don't need AbstractEntry, but we still need memory control) (apache#1072)
[core] limit parallelly read file memory usage, extract some methods(#1061)
Purpose
Tests
(List UT and IT cases to verify this change)
API and Format
(Does this change affect API or storage format)
Documentation
(Does this change introduce a new feature)