Add new parallel merge task executor for parallel actions within a single merge action #13190
Conversation
@dweiss @mikemccand I am currently iterating on how best to make this work. Right now, it is assumed that the interactions with all of these are from the same thread; this obviously breaks when we add intra-merge parallelism. I was hoping y'all had some thoughts on how these should all work together with intra-merge parallelism. Is it enough to make these classes thread-safe and remove the assertions? Do we want to somehow figure out if the ultimate calling thread was a MergeThread? (This is possible, but will require some wrangling on the tp-executor to keep track of which thread belongs where...)
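One way to answer the "was the ultimate calling thread a MergeThread?" question is to tag the executor's threads with a dedicated subclass. A minimal sketch, assuming a hypothetical MergeTaskThread class (this is illustrative, not what the PR ended up doing):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical: a dedicated Thread subclass lets downstream code check
// whether the ambient thread is doing merge work, without any bookkeeping map.
final class MergeTaskThread extends Thread {
  MergeTaskThread(Runnable r) {
    super(r, "merge-task");
  }
}

final class TaggedExecutors {
  // Every pool thread is a MergeTaskThread instance.
  static ExecutorService newMergeTaskPool(int threads) {
    return Executors.newFixedThreadPool(threads, MergeTaskThread::new);
  }

  // Cheap check usable from assertions in rate limiters, outputs, etc.
  static boolean onMergeTaskThread() {
    return Thread.currentThread() instanceof MergeTaskThread;
  }
}
```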
benwtrent left a comment
Some high level concerns and thoughts I had on the new changes.
@dweiss || @mikemccand y'all might be interested.
```java
if (bytes > currentMinPauseCheckBytes.get()) {
  shouldPause.set(true);
  currentMinPauseCheckBytes.set(rateLimiter.getMinPauseCheckBytes());
  localBytesSinceLastPause.set(bytes);
  return 0;
} else {
  shouldPause.set(false);
}
```
I figured that if another thread reached this throttling check, paused, and reset the counter, then the current thread shouldn't pause. Throttle tracking is effectively global and each thread uses the same localBytesSinceLastPause, so work done by another thread could cause the current thread to pause, or another thread pausing could allow this thread to continue working.
```java
final double secondsToPause = (bytes / 1024. / 1024.) / rate;
// ...
AtomicLong curPauseNSSetter = new AtomicLong();
lastNS.updateAndGet(
```
We would need to lock here or do an updateAndGet. lastNS is only updated in this particular method, so using atomics seemed better to me.
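For illustration, a minimal sketch of the updateAndGet pattern being discussed, with pauseNS as a stand-in for the value derived from secondsToPause above (not the PR's exact code):

```java
import java.util.concurrent.atomic.AtomicLong;

class PauseHorizonSketch {
  static final long MIN_PAUSE_NS = 2_000_000; // "smaller than 2 msec" per the code comment
  final AtomicLong lastNS = new AtomicLong();

  // Returns how long the caller should pause; 0 if the pause is too short.
  long maybePause(long pauseNS) {
    final AtomicLong curPauseNSSetter = new AtomicLong();
    final long curNS = System.nanoTime();
    // Note: the update function may run more than once under contention;
    // the last (successful) attempt's values win.
    lastNS.updateAndGet(last -> {
      long targetNS = last + pauseNS;
      long curPauseNS = targetNS - curNS;
      if (curPauseNS <= MIN_PAUSE_NS) {
        curPauseNSSetter.set(0);
        return curNS; // enforce the instant rate, as in the original comment
      }
      curPauseNSSetter.set(curPauseNS);
      return targetNS; // publish the new pause horizon for other threads
    });
    return curPauseNSSetter.get();
  }
}
```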
If we used a lock for this whole method, then this may help pause all threads that share the same rate limiter, and effectively apply the rate limit across all threads that run the same merge, rather than applying the rate limit on a per-thread basis?
So, the limits are on a per-RateLimitedIndexOutput basis, not necessarily per thread. Since all of the RateLimitedIndexOutputs constructed share the same rate limiter, it might pause all threads?
But I don't think so: each thread could have its own RateLimitedIndexOutput that is tracking its own bytes locally, and while the combined total across all threads might exceed the throttling limit, the output isn't really throttled until a single thread exceeds the limit.
> the output isn't really throttled until a single thread exceeds the limit.
This limitation feels ok to me, let's just add a comment about it? Intuitively, the write rate at merge time is rather bursty, so if the sum of the bytes written by all threads running this merge exceeds the limit, then there would often be one thread that exceeds the limit on its own as well.
```java
// Time we should sleep until; this is purely instantaneous
// rate (just adds seconds onto the last time we had paused to);
// maybe we should also offer decayed recent history one?
long targetNS = last + (long) (1000000000 * secondsToPause);
long curPauseNS = targetNS - curNS;
// We don't bother with thread pausing if the pause is smaller than 2 msec.
if (curPauseNS <= MIN_PAUSE_NS) {
  // Set to curNS, not targetNS, to enforce the instant rate, not
  // the "averaged over all history" rate:
  curPauseNSSetter.set(0);
  return curNS;
}
```
It's possible that other threads request very short pauses, thus all updating lastNS to their respective curNS. I honestly don't know if this is OK or not. But to handle this case, I decided to use updateAndGet to ensure that if lastNS is changed, we account for it and update our pause request accordingly.
It's hard for me to reason about this as well.
I'm wondering about keeping maybePause as-is and making MergeRateLimiter#pause synchronized, essentially trying to make the pausing logic behave as if threads were writing bytes sequentially rather than in parallel. (I'm considering making pause synchronized rather than maybePause so that System.nanoTime() is computed within the lock and the pausing logic accounts for the fact that some time may have been spent waiting on the lock already.)
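A rough sketch of that alternative, assuming simplified names and a plain Thread.sleep for the pause (illustrative only, not the final code):

```java
// Serializing pause() makes concurrent writers behave as if they wrote bytes
// sequentially; reading System.nanoTime() inside the lock credits time already
// spent waiting on the lock toward the requested pause.
class SynchronizedPauseSketch {
  private long lastNS;

  synchronized long pause(long bytes, double mbPerSec) throws InterruptedException {
    long curNS = System.nanoTime(); // read inside the lock, per the suggestion
    double secondsToPause = (bytes / 1024. / 1024.) / mbPerSec;
    long targetNS = lastNS + (long) (1_000_000_000 * secondsToPause);
    long curPauseNS = targetNS - curNS;
    if (curPauseNS <= 0) {
      lastNS = curNS; // enforce the instant rate, not the all-history average
      return 0;
    }
    lastNS = targetNS;
    Thread.sleep(curPauseNS / 1_000_000, (int) (curPauseNS % 1_000_000));
    return curPauseNS;
  }
}
```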
Sorry I'm trying to catch up here and will probably ask a bunch of stupid questions :) Thank you @benwtrent for persisting in this hairy logic!
> (I'm considering making pause synchronized rather than maybePause so that System.nanoTime() is computed within the lock and the pausing logic accounts for the fact that some time may have been spent waiting on the lock already.)
Couldn't we move the curNS = System.nanoTime() inside the locked/updateAndGet'd part of maybePause? I like the thread safety that the updateAndGet is giving us here, ensuring we accurately account for all bytes written by N threads within a single merge. Also, I don't expect the added sync required here will hurt performance much: Lucene is doing much work to produce these bytes being written already, so conflict should be rare-ish.
Also, if we make pause sync'd, it will cause scary-looking jstack thread dumps, making it look like one thread is sleeping while holding a lock and blocking other threads (which indeed is what it'd be doing), versus the thread stacks we'd see with the current approach, which make it quite clear that all N threads are intentionally stalling in mergeProgress.pauseNanos. It would reduce the horror reaction we'd see peeking at thread dumps maybe ...
> Couldn't we move the curNS = System.nanoTime() inside the locked/updateAndGet'd part of maybePause?
Yes, we could; otherwise, if there are conflicts between threads (thus having to do the updateAndGet more than once), the curNS gets further in the past.
> is giving us here, ensuring we accurately account for all bytes written by N threads within a single merge
That is not what it is doing. It is ensuring that the throttling timestamps stay in sync :/. Bytes being throttled are still per RateLimitedIndexOutput, which means they are per thread.
> > is giving us here, ensuring we accurately account for all bytes written by N threads within a single merge
>
> That is not what it is doing. It is ensuring that the throttling timestamps stay in sync :/. Bytes being throttled are still per RateLimitedIndexOutput, which means they are per thread.
Ahhhhhh gotchya. Each RateLimitedIndexOutput tracks its own bytesSinceLastPause and then invokes pause with its own private byte count, OK. So, yes, with the current approach we rate limit MB/sec per thread, not per merge. I think that's fine. Best effort!
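To make the per-thread accounting concrete, a small hypothetical sketch (invented names, not Lucene's actual classes): each output keeps a private byte counter and only hands its own count to the shared limiter.

```java
import java.util.concurrent.atomic.AtomicLong;

class RateLimitedOutputSketch {
  static final long MIN_PAUSE_CHECK_BYTES = 1 << 20; // illustrative threshold
  private final AtomicLong totalPausedNS; // shared across the whole merge
  private long bytesSinceLastPause;       // private to this output/thread

  RateLimitedOutputSketch(AtomicLong totalPausedNS) {
    this.totalPausedNS = totalPausedNS;
  }

  void writeBytes(byte[] b, int len) {
    bytesSinceLastPause += len;
    if (bytesSinceLastPause > MIN_PAUSE_CHECK_BYTES) {
      // Only this output's private count reaches the limiter, which is why
      // the effective limit is MB/sec per thread, not per merge.
      totalPausedNS.addAndGet(pause(bytesSinceLastPause));
      bytesSinceLastPause = 0;
    }
    // ... forward b[0..len) to the wrapped IndexOutput ...
  }

  private long pause(long bytes) {
    return 0; // stand-in for the shared rate limiter's pause logic
  }
}
```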
I understand that we need to make changes to account for the fact that multiple threads may be contributing to the same merge concurrently, but I would not expect …
The MT safety isn't really around the bytes; I agree with your assessment there. It's more around the throttling. The way I read the class, different threads could be writing to different files, but both could experience throttling, and we should account for that.
I don't think rate limiting has to be exact - it's already allowed to be temporarily exceeded (see the comments in writeBytes on RateLimitedIndexOutput).
```java
@Override
public void close() {
  sync();
  try {
```
Should we call super.close() as well, to close MergeScheduler's SameThreadExecutorService, which ConcurrentMergeScheduler uses for small merges?
It's not a big deal not to close it, but it would help catch cases where we send tasks to this executor after closing.
So, thinking about this more as I fell asleep. This is how throttling will work as it is in this PR: …
This makes us throttle less often given how many bytes are actually written. Maybe this is OK, as the throttling logic could "catch up" if things continue to get backed up? Throttling has always been a "best effort" thing anyway. Even if we somehow made the RateLimiter throttle every thread (via some global semaphore or something...), we would still only throttle once one of the multiple threads hit the byte throttling limit. IMO, either we: …
Selfishly, I wish for the second option, as it's the simplest.
Phew, it took me two hours of strongly caffeinated time to catch up on this! Thank you @benwtrent. What an awesome change, modernizing Lucene's merge concurrency so that intra-merge concurrency (just HNSW, but maybe other index parts soon) is enabled by default. (Separately, the nightly benchy had a silly off-by-one bug preventing it from resuming after you reverted the previous PR ... I think I've fixed that bug now, and kicked off a one-off nightly benchy run that will hopefully finish late today.) Some high-level questions (sorry if these were already asked/answered): …
Finally, I worry that the rate limiter's simplistic "instantaneous" measure is making it effectively super buggy (there is a TODO about this), because merging that does a lot of CPU work and then writes a lot of bytes will be effectively throttled to far below the target …
+1 to keep with the second approach. It is best effort, and the "instant vs burst-bucket" bug above is yet more weirdness about it :) Another thought on the "instant vs burst-bucket" bug: I suppose in some cases, e.g. bulk merging of stored fields (no deletions, so we are just copying byte blocks), there is very little CPU and tons of IO, and in cases like that the instant and burst-bucket approaches would be essentially the same (both wind up throttling based on the instant rate since the burst bucket is quickly exhausted). I'll open a spinoff ...
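For readers catching up, a sketch of the distinction, assuming a hypothetical burst-bucket limiter (invented here purely for contrast; the rate limiter under discussion uses the instantaneous approach):

```java
// Instantaneous: the pause is purely proportional to the bytes of the current
// write, so CPU-heavy gaps between writes earn no credit. Burst-bucket: idle
// or CPU-bound time accrues write credit (up to a cap), so a bursty write
// after heavy CPU work can pass without pausing.
class BurstBucketSketch {
  private final double bytesPerNS;
  private final double maxCreditBytes;
  private double creditBytes;
  private long lastRefillNS = System.nanoTime();

  BurstBucketSketch(double mbPerSec, double maxCreditBytes) {
    this.bytesPerNS = mbPerSec * 1024 * 1024 / 1e9;
    this.maxCreditBytes = maxCreditBytes;
  }

  /** Nanoseconds the caller should pause before writing {@code bytes}. */
  long pauseNSFor(long bytes) {
    long now = System.nanoTime();
    creditBytes = Math.min(maxCreditBytes, creditBytes + (now - lastRefillNS) * bytesPerNS);
    lastRefillNS = now;
    if (creditBytes >= bytes) {
      creditBytes -= bytes; // burst fully covered by accrued credit
      return 0;
    }
    long pauseNS = (long) ((bytes - creditBytes) / bytesPerNS);
    creditBytes = 0;
    return pauseNS;
  }
}
```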
I'm curious what you have in mind. Are you considering throttling merges purely based on merge concurrency then? E.g. slow indexing -> a single merge at a time, heavy indexing -> many parallel merges? This wouldn't sound crazy to me, as I don't recall seeing indexing/merging saturate I/O.
Nevermind, I see that you added more details on #13193.
If y'all are good with it, I will merge this tomorrow.
@benwtrent FYI I left a few minor comments: on …
@jpountz added comments to the creation of the rate limiter in CMS and to the limiter itself. Also updated the close interaction.
jpountz left a comment
Thanks for pushing on this change, I like it. The fact that the extra merge concurrency may not starve merging of threads is good. I'm curious how the nightly benchmarks will react to it, given the high number of CPU cores of beast3.
One question on my mind is whether this change should make us update the default number of merging threads that ConcurrentMergeScheduler configures (in a follow-up issue/PR).
```diff
- public void close() {
-   sync();
+ public void close() throws IOException {
+   super.close();
```
nit: should we try to close as cleanly as possible in the event of an exception, e.g. doing something like below?

```java
IOUtils.close(
    this::sync,
    super::close,
    intraMergeExecutor == null ? null : intraMergeExecutor::shutdown);
```
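(For context: Lucene's IOUtils.close attempts every given closer even when an earlier one throws, then rethrows the first exception it hit, so the executor shutdown would still run if sync or super.close failed.)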
Nightly benchy is still forcing its own thread pool into the HNSW indexing Codec component (…)
+1 to explore this. Nightly benchy hardwires maxMergeCount=16, maxThreadCount=12. Maybe they should be higher :) But also note that the nightly benchy does not wait for merges on …
mikemccand left a comment
Thank you @benwtrent -- what an awesome improvement in Lucene's default concurrency!
Do you remember how you came up with these numbers? Is there some reasoning behind them, or do they come from experimentation? I'm also curious if you know where the current …
For now, you still need to tell the codec how many workers are available for it to use. But you should be able to pass …
Add new parallel merge task executor for parallel actions within a single merge action (#13190)

This commit adds a new interface to all MergeScheduler classes that allows the scheduler to provide an Executor for intra-merge parallelism. The first sub-class to satisfy this new interface is the ConcurrentMergeScheduler (CMS). In particular, the CMS will limit the amount of parallelism based on the current number of mergeThreads and the configured maxThread options.

Parallelism is now added by default to the SegmentMerger, where each merging task is executed in parallel with the others.

Additionally, the Lucene99 HNSW codec will use the provided MergeScheduler executor if no executor is provided directly.

Relates to: #12740
Relates to: #9626

This is a take 2 of: #13124
Elasticsearch CI has uncovered an issue related to this change. The PerFieldDocValuesFormat and PerFieldPostingsFormat mutate and reset the fieldInfos of the mergeState while executing a merge. Consequently, other ongoing merge sub-tasks may fail to access some fieldInfos.
lucene/lucene/core/src/java/org/apache/lucene/codecs/perfield/PerFieldDocValuesFormat.java, lines 150 to 158 in e278833
lucene/lucene/core/src/java/org/apache/lucene/codecs/perfield/PerFieldPostingsFormat.java, lines 196 to 214 in 04f335a
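To illustrate the hazard with invented names (this is not Lucene code, just the shape of the race):

```java
import java.util.List;

class SharedMergeState {
  volatile List<String> fieldInfos;
  SharedMergeState(List<String> fieldInfos) { this.fieldInfos = fieldInfos; }
}

class FieldInfosRaceSketch {
  // One per-field sub-task narrows the shared view to its own fields...
  static void perFieldSubTask(SharedMergeState state, List<String> subset) {
    state.fieldInfos = subset; // mutate-and-reset, as the formats did
    // ... merge only `subset` ...
  }

  // ...while another concurrent sub-task iterates the shared view and can
  // silently miss fields that were present when the merge started.
  static void otherSubTask(SharedMergeState state) {
    for (String fieldInfo : state.fieldInfos) {
      System.out.println("merging field " + fieldInfo);
    }
  }
}
```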
The PerFieldDocValuesFormat and PerFieldPostingsFormat mutate and reset the fieldInfos of the mergeState during merges. Consequently, other running merge sub-tasks may fail to see some fieldInfos. This was problematic since we introduced concurrency for merge sub-tasks. Relates #13190
It looks like this awesome change was backported for 9.11.0? I'll add the milestone. So hard to remember to set the milestones on our issues/PRs...
@mikemccand oh dang, I haven't been doing that. Thanks for picking up my slack!
Curious, will this change improve the concurrent merge time of non-vector index structures like posting lists and point values?
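As a closing illustration of the mechanism the commit message above describes, a rough sketch of a scheduler handing out an intra-merge executor that degrades to inline execution when merge threads are saturated (names and logic are simplified stand-ins, not the committed API):

```java
import java.util.concurrent.Executor;

abstract class MergeSchedulerSketch {
  // Executor a merge may use for its sub-tasks; may simply run them inline.
  abstract Executor intraMergeExecutor();
}

class ConcurrentMergeSchedulerSketch extends MergeSchedulerSketch {
  private final int maxThreadCount;
  private volatile int activeMergeThreads; // updated as merges start/finish

  ConcurrentMergeSchedulerSketch(int maxThreadCount) {
    this.maxThreadCount = maxThreadCount;
  }

  @Override
  Executor intraMergeExecutor() {
    if (activeMergeThreads >= maxThreadCount) {
      // Thread budget already spent on whole merges: run sub-tasks inline so
      // intra-merge parallelism never starves concurrent merges of threads.
      return Runnable::run;
    }
    return task -> new Thread(task, "intra-merge").start(); // simplified pool
  }
}
```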