Add new parallel merge task executor for parallel actions within a single merge action #13124
Conversation
Add new parallel merge task executor for parallel actions within a single merge action
Thinking out loud: since merge schedulers already have the ability to merge concurrently (across multiple merges rather than within a merge, though), it would be nice to fully encapsulate the merging concurrency there instead of having two sources of merging concurrency that are not aware of one another. I'm also keen on keeping the codec API as simple as possible, and adding the executor as a member of the …
To take advantage of the executor, etc., it needs to be accessible from the scheduler. So, we would have to update the … I don't know how they would ever be "aware" of one another.
I updated the PR to do just this. It's a much cleaner API.
+1 to moving the executor away from the Codec API (although it's me who placed it there LOL)
I like the idea, but I'm a little bit worried about the situation where the HNSW merging threads take most of the resources such that the original CMS threads cannot be executed. As the …

So I think it is still better to have two separate thread pools for inter-segment merge and intra-segment merge, but I wonder whether we can have a …

Also, I heard there's a use case where multiple IWs in the same process currently share one CMS for better resource management; this new …
Maybe? But this doesn't seem to simplify anything for users. They will still need to configure two things (the number of merges that can happen at a time, and the number of threads available to all those merges). There is always the issue of best using resources and the potential of over-allocating CPU cores. I don't think any finagling we do will ultimately change that. Selfishly, I really don't want to mess with the CMS code at all. Even if we switch it to use an executor of some sort, the complexities of I/O throttling & executable queuing would still all exist.
I agree with passing some executor from the scheduler to the …
This makes me think that instead of it being a separate executor, it should return a dynamic value for …

This would require some sort of "planning" on the merge scheduler (knowing how many are queued and how many …). This all implies that the configuration will actually …

The tricky part to me is determining 'how busy' the current merges are. The MergeScheduler could pass …

Another confusion is determining the total number of threads allowed for merging (inter/intra). We could default this to …
OK, my head is spinning a bit trying to grok the side-effects of the CMS. I think I understand most of it now. Currently we have two adjustable parameters.
Throughout the life of a merge, it can become (un)paused numerous times; this is controlled via the …

Now we want to add a new configuration, …

Since merge thread pausing only has to do with index output, I am not sure we need to add any individual thread throttling other than what's already there. The Directory wrapper will pause/throttle all writing occurring for the merge. This is acceptable even if the merge is using multiple threads.

I also think that small merges (<50MB) should never be allowed to run over multiple threads. Similar to how we never throttle those because they are so small, the benefit in latency reduction will be minuscule, and we should reserve the extra thread usage for larger merges.

What I am stuck on is this:
What do you think @jpountz? One other thing we need to fix is this idea of "numWorkers" in HNSW. It seems like it should just pick optimal slices given the number of vectors it has (similar to the multi-segment search stuff), chunk itself into those slices, and then be at the behest of the task executor. What say you @zhaih? It seems weird to use …
Maybe some of these things are too ambitious, but ideally I'd like it to work this way.
Intra-merge concurrency would take advantage of the fact that there will sometimes be fewer active merges than threads to enable intra-merge concurrency. E.g. we could have a pool of threads for intra-merge concurrency that would try to ensure that its number of active threads is always less than or equal to …

Concurrent merging for vectors wants to know the number of available workers today, but maybe we can change the logic (like you suggested) to split the doc ID space into some number of slices, e.g. max(128, maxDoc / 2^16), and sequentially send these slices to …
Sorry for being so dense, I will spin a little bit on this to see what I can come up with.
So the current way the HNSW concurrent merge is implemented is: each worker uses an AtomicInteger to coordinate and only does a small batch of work (1024 documents) each time. The advantage is that we are able to load-balance between workers, and I remember this did bring some (5-10%) performance gain when I was testing it. Maybe, instead of specifying a numWorkers per merge, we could default to an expected workload per thread (like 10K) and then allocate numWorkers dynamically? But still keep the current way of merging to keep the performance?

One thing I'm worried about with putting everything into the CMS is that we're binding intra-segment merging to the CMS. To my understanding, using the CMS means we're using background threads to merge and merging becomes non-deterministic, so some users still use SMS (SerialMS) or something similar to keep merging deterministic. But on the other hand, the HNSW concurrent merge does not affect that aspect at all: no matter how many threads you use, it won't affect the determinism of the merge result. So if we bind those two together, would we potentially prevent some users from using intra-segment merges?
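For readers who haven't seen that code, here is a minimal, self-contained sketch of the batching scheme described above (illustrative only; the class and the per-document work are made up and this is not the actual HNSW merger code):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class BatchedMergeWorkersSketch {
  private static final int BATCH_SIZE = 1024; // small batch per claim, as described above

  /** Each worker repeatedly claims the next batch of doc IDs until the doc space is exhausted. */
  static void mergeInParallel(int maxDoc, int numWorkers, ExecutorService executor) throws Exception {
    AtomicInteger nextDoc = new AtomicInteger(0);
    List<Callable<Void>> workers = new ArrayList<>();
    for (int i = 0; i < numWorkers; i++) {
      workers.add(() -> {
        int start;
        while ((start = nextDoc.getAndAdd(BATCH_SIZE)) < maxDoc) {
          int end = Math.min(start + BATCH_SIZE, maxDoc);
          for (int doc = start; doc < end; doc++) {
            // hypothetical per-document merge work, e.g. adding one vector to the graph
          }
        }
        return null;
      });
    }
    executor.invokeAll(workers); // blocks until all workers have drained the doc space
  }

  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    try {
      mergeInParallel(100_000, 4, pool);
    } finally {
      pool.shutdown();
    }
  }
}
```

Because workers claim small batches dynamically rather than pre-dividing the doc space, a slow worker never leaves a large fixed slice unprocessed, which is where the load-balancing gain comes from.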
@zhaih I don't see any reason why we cannot also extend the SerialMS and allow multiple threads per merge run. We will have to update the base … The reason for all the CMS discussion is that it is the hardest to implement correctly (to me anyways...). For SerialMS, users could provide a number and it's a static executor with that number of threads; see the sketch after this comment.
I really don't think users should configure numWorkers or workPerThread at all. I would much prefer us to supply good defaults and remove configuration. If we had done the parallelism outside the codec to begin with, I don't think we would have added any configurable values to the HNSW codec.
I am not thinking about binding them. I think that CMS is just the most difficult one to figure out.
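A rough sketch of what the SerialMS idea could look like (purely hypothetical: it assumes the getIntraMergeExecutor-style hook being proposed in this PR and is not the actual SerialMergeScheduler API):

```java
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical: merges themselves still run one at a time (serially), but each merge may
// fan out its per-structure work onto a fixed, user-sized pool.
class ParallelSerialMergeSchedulerSketch /* extends SerialMergeScheduler */ {
  private final ExecutorService intraMergePool;

  ParallelSerialMergeSchedulerSketch(int threadsPerMerge) {
    this.intraMergePool = Executors.newFixedThreadPool(threadsPerMerge);
  }

  public Executor getIntraMergeExecutor(/* OneMerge merge */) {
    return intraMergePool;
  }

  public void close() {
    intraMergePool.shutdown();
  }
}
```

This would keep merge ordering deterministic while still allowing intra-merge parallelism, which addresses the determinism concern above.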
Sounds good
No, I'm not suggesting that either. I'm open to letting the user specify or to providing a default; I just want to make sure the performance is not affected (by not pre-dividing the doc space across threads).
OK, I took a stab at this @jpountz; the latest commit has a POC. Need to test. But early performance testing shows indexing speed is better and forcemerge speed is just about the same. I am still using the numWorkers setting from HNSW to determine workers. This is mainly to prevent testing too many performance changes at a time.
jpountz
left a comment
I left some nitpicks but I find the change promising/exciting!
private class ScaledExecutor extends ThreadPoolExecutor {
nit: I usually have a preference for composition over inheritance, i.e. could we wrap the thread-pool executor instead of extending it?
Yeah no doubt.
    Long.MAX_VALUE,
    TimeUnit.NANOSECONDS,
    new SynchronousQueue<>());
}
Thinking out loud: for the case when tens of index writers are open in the same JVM, we may want to configure a timeout on threads in order to avoid spending too much heap on idle threads?
The timeout would only apply to non-core threads. But, your point is taken, it is possible for merges to be idle for a while and we don't want threads just sitting around taking up unnecessary space.
So, I can make core always 0, and rely on the pool dynamically adding threads up to max.
}
if (activeCount.compareAndSet(value, value + 1)) {
  return true;
}
If I read correctly, this tries to keep activeCount <= maxThreadCount. I was thinking we should try to keep activeCount <= maxThreadCount - mergeThreadCount(). Otherwise we're effectively using more than maxThreadCount for merging in total, which I find a bit surprising?
@jpountz you are correct, my logic here isn't what it needs to be. I will fix it up.
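For clarity, the invariant being discussed is roughly the following. This is a standalone illustration (not the CMS code itself; the class and method names are made up), showing that intra-merge threads are only granted while merge threads plus intra-merge threads stay within maxThreadCount:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;

class ThreadBudgetSketch {
  private final int maxThreadCount;
  private final List<Thread> mergeThreads = new CopyOnWriteArrayList<>();
  private final AtomicInteger activeCount = new AtomicInteger();

  ThreadBudgetSketch(int maxThreadCount) {
    this.maxThreadCount = maxThreadCount;
  }

  synchronized boolean tryReserveIntraMergeThread() {
    int budget = maxThreadCount - mergeThreads.size(); // capacity left after whole-merge threads
    if (activeCount.get() < budget) {
      activeCount.incrementAndGet(); // reserve a slot for one intra-merge task
      return true;
    }
    return false; // caller should run the task inline on its own thread instead
  }

  synchronized void releaseIntraMergeThread() {
    activeCount.decrementAndGet();
  }
}
```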
 * Provides an executor for parallelism during a single merge operation. By default, this method
 * returns an executor that runs tasks in the calling thread.
 */
public Executor getInterMergeExecutor(OneMerge merge) {
Should it be getIntraMergeExecutor?
🤦 inter vs intra is just as bad as affect and effect.
@Override
public Executor getInterMergeExecutor(OneMerge merge) {
  return null;
nit: throw instead?
readers,
segmentInfo,
infoStream,
parallelMergeTaskExecutor == null ? null : new TaskExecutor(parallelMergeTaskExecutor));
Nit: it's a bit weird to use a class from the search package for merging (TaskExecutor). Should merging get access to the raw Executor, which is a bit more flexible (I don't know if all formats will be able to split work into a list of tasks up-front)? Vectors could still wrap inside a TaskExecutor for convenience?
it's a bit weird to use a class from the search package for merging (TaskExecutor)
+1 … @jpountz since we are now using TaskExecutor in both search and indexing use cases, does it make sense to move it into the org.apache.lucene.util package maybe?
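A small sketch of the "merging gets the raw Executor, vectors wrap it" idea (hedged: it assumes the org.apache.lucene.search.TaskExecutor API of recent Lucene versions, with a public Executor-taking constructor and invokeAll over Callables; the method and task list here are illustrative):

```java
import java.io.IOException;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import org.apache.lucene.search.TaskExecutor;

class VectorMergeSketch {
  // The merge scheduler hands the codec a plain Executor...
  static void mergeVectors(Executor intraMergeExecutor, List<Callable<Void>> perSliceTasks)
      throws IOException {
    // ...and the vectors format wraps it for convenience when it wants fork/join semantics.
    TaskExecutor taskExecutor = new TaskExecutor(intraMergeExecutor);
    taskExecutor.invokeAll(perSliceTasks); // waits for all slices to finish
  }
}
```

Keeping the raw Executor at the merging API boundary leaves formats free to submit work incrementally rather than as an up-front list of tasks.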
ooh exciting! I left some comments in a related issue that were maybe a little clueless given all the discussion here that I missed until now. Still I'm happy about the direction this seems to be going re: moving the configuration out of codec and into merge policy, and coordinating thread usage across inter/intra segment merges
  setMaximumPoolSize(Math.max(newMax, 1));
  setCorePoolSize(newMax);
} else {
  setCorePoolSize(newMax);
  setMaximumPoolSize(Math.max(newMax, 1));
Aren't those two branches the same?
The order of setting the max is important.
But I am gonna change this. I think the core thread count should be 0 and we shouldn't update it at all and allow the threadpool to scale.
synchronized (ConcurrentMergeScheduler.this) {
  int max = maxThreadCount - mergeThreads.size();
  int value = activeCount.get();
  if (value < max) {
    activeCount.incrementAndGet();
    isThreadAvailable = true;
  } else {
    isThreadAvailable = false;
  }
}
One major decision I made here is that CMS thread throttling knows nothing about intra-merge threads.
This means, if there are intra-merge threads running, it is possible that concurrency creeps above maxThreadCount until the intra-merge thread(s) finish(es).
Making throttling & overall CMS behavior more aware of intra-merge thread usage will be a much trickier change. I wasn't sure it was worth it. I am open to opinions on this @msokolov @jpountz .
I am going to just make stuff up here since I don't know this code well at all -- but I think I was assuming there was a ThreadPoolExecutor somewhere that was doling out threads for both of these operations. Now I'm realizing perhaps that's not the case and CMS is doing all that tracking on its own. I wonder if we should open a separate issue to migrate CMS to use JDK's thread pool abstractions? It could make some of this easier to handle.
@benwtrent ++ There is a risk of letting bigger merges starve smaller merges from threads otherwise, and I'm not sure how we could fix it.
jpountz
left a comment
I like the change a lot.
Currently this is only used by vectors. Could we also take advantage of this new executor to parallelize at the data-structure level and submit separate tasks for postings, points, norms, doc values, etc. from SegmentMerger.merge? Earlier experiments suggested that it did not help much because there is typically one bottleneck, but this would be good dogfooding for this change?
 * returns `null` indicating that there is no parallelism during a single merge operation.
 */
public Executor getIntraMergeExecutor(OneMerge merge) {
  return null;
What about returning SameThreadExecutor by default instead?
My concern is that certain things might actually be slower trying to do parallel work when there aren't any threads to be had. I suppose we will end up in this situation when all the merge threads are actually being used by other merges...
I need to benchmark this with vectors to see if things are measurably worse.
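A minimal sketch of the suggestion (names illustrative, not the committed API): the base scheduler returns an executor that runs each task inline, so codec code can always "fork" without null checks, and only the CMS override actually hands out extra threads.

```java
import java.util.concurrent.Executor;

abstract class MergeSchedulerDefaultSketch {
  private static final Executor SAME_THREAD = Runnable::run;

  /** Default: no intra-merge parallelism; every submitted task runs on the calling merge thread. */
  public Executor getIntraMergeExecutor(/* MergePolicy.OneMerge merge */) {
    return SAME_THREAD;
  }
}
```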
private void updatePoolSize() {
  executor.setMaximumPoolSize(Math.max(1, maxThreadCount));
}
It feels like we don't actually need this since we already have control over the thread pool size via maxThreadCount. What about setting a high max thread count in the ctor and never updating it (i.e. making it a "cached" executor rather than a "scaled" executor)?
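A sketch of the "cached" construction being suggested here (the CallerRunsPolicy fallback is my assumption for a self-contained example; in this PR, submissions are gated on the thread budget instead):

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class CachedIntraMergePoolSketch {
  static ThreadPoolExecutor newPool(int maxThreadCount) {
    return new ThreadPoolExecutor(
        0,                             // no core threads: nothing sits idle when no merges run
        Math.max(1, maxThreadCount),   // high-water mark set once in the ctor, never updated
        1, TimeUnit.MINUTES,           // idle threads die, easing the many-IndexWriters concern
        new SynchronousQueue<>(),      // hand-off queue, like Executors.newCachedThreadPool
        new ThreadPoolExecutor.CallerRunsPolicy()); // if saturated, run on the submitting thread
  }
}
```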
@jpountz ok, that naive attempt failed, as norms & terms apparently need to be merged in order (one or the other would fail due to missing files...). I am not sure if this is true for other things. But when I adjusted for this (latest commit) I did see a speed improvement. My test was over IndexGeoNames; I did no segment merging and flushed every 12MB to ensure I got many segments. I also set CMS merge threads to …

Baseline forcemerge(1): 17793 ms
private class ScaledExecutor implements Executor {
nit: should it be called CachedExecutor now?
@Override
public void execute(Runnable command) {
  assert mergeThreads.contains(Thread.currentThread()) : "caller is not a merge thread";
I'd expect this assertion to no longer be valid, since SegmentMerger may fork into this executor for vectors, and then the task for vectors may want to further fork into a separate thread? So the caller may be either a MergeThread, or a thread from the wrapper thread pool?
mergeWithLogging(this::mergeNorms, segmentWriteState, segmentReadState, "norms", numMerged);
…
mergeWithLogging(this::mergeTerms, segmentWriteState, segmentReadState, "postings", numMerged);
Can we have a parallel task that handles norms + terms so that the order is respected?
OK, I added a commit that does this. The candidate merge time is now:
forceMerge took 4965 ms
vs. baseline (I re-ran to make sure)
forceMerge took 15783 ms
So, 3x faster? Seems pretty good. The major downside is that now memory usage on merge might increase as we are potentially doing all this merge activity at the same time. If we are cool with that, this seems like a nice speed improvement.
Presumably, the worst-case scenario is not much worse than if you have many concurrent merges, so I think it's fine.
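An illustrative sketch of the "one task for norms + terms, everything else in parallel" structure discussed above (method names are stand-ins, not the actual SegmentMerger API):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;

class SegmentMergerSketch {
  void mergeAll(ExecutorService executor) throws Exception {
    List<Callable<Void>> tasks = new ArrayList<>();
    // norms and postings share files, so they run sequentially inside a single task...
    tasks.add(() -> { mergeNorms(); mergeTerms(); return null; });
    // ...while independent structures are merged as separate parallel tasks.
    tasks.add(() -> { mergeDocValues(); return null; });
    tasks.add(() -> { mergePoints(); return null; });
    tasks.add(() -> { mergeVectors(); return null; });
    executor.invokeAll(tasks); // waits for every structure to finish before the merge completes
  }

  void mergeNorms() {}
  void mergeTerms() {}
  void mergeDocValues() {}
  void mergePoints() {}
  void mergeVectors() {}
}
```

The memory trade-off mentioned above follows directly from this shape: all the per-structure merge buffers can now be live at the same time within one merge.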
zhaih
left a comment
Very exciting PR!
TestSecrets.setConcurrentMergeSchedulerAccess(ConcurrentMergeScheduler::setSuppressExceptions);
}

private class CachedExecutor implements Executor {
Can we have some simple javadoc explaining what this executor is doing? E.g. it will use a spare thread if we are within the configured max thread count, and will run directly on the caller thread if no extra thread is available?
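Something along these lines, perhaps (suggested wording only, not the committed javadoc):

```java
/**
 * An Executor for intra-merge tasks. If the scheduler still has spare capacity (running merge
 * threads plus intra-merge threads below {@code maxThreadCount}), the task is handed to a cached
 * thread pool; otherwise it runs directly on the calling merge thread, so a merge never blocks
 * waiting for a thread that will never be granted.
 */
```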
Add new parallel merge task executor for parallel actions within a single merge action (#13124)

This commit adds a new interface to all MergeScheduler classes that allows the scheduler to provide an Executor for intra-merge parallelism. The first sub-class to satisfy this new interface is the ConcurrentMergeScheduler (CMS). In particular, the CMS will limit the amount of parallelism based on the current number of mergeThreads and the configured maxThread options.

Parallelism is now added by default to the SegmentMerger where each merging task is executed in parallel with the others. Additionally, the Lucene99 HNSW codec will use the provided MergeScheduler executor if no executor is provided directly.

Relates to: #12740
Relates to: #9626
This looks like a great change! ... Lucene finally catching up to modern concurrency :) I have not looked closely yet. But nightly benchy is upset, prolly related to this, when running …
Ah, that is likely due to this change. I bet throttling is occurring but in one of the other threadpool threads. I didn't override the thread construction, so maybe that is enough? I can look more closely next week
I am going to revert the change and open a new PR to iterate on a fix. With this commit, more than one thread could be writing, and that needs to be accounted for.
Add new parallel merge task executor for parallel actions within a single merge action (#13190)

This commit adds a new interface to all MergeScheduler classes that allows the scheduler to provide an Executor for intra-merge parallelism. The first sub-class to satisfy this new interface is the ConcurrentMergeScheduler (CMS). In particular, the CMS will limit the amount of parallelism based on the current number of mergeThreads and the configured maxThread options.

Parallelism is now added by default to the SegmentMerger where each merging task is executed in parallel with the others. Additionally, the Lucene99 HNSW codec will use the provided MergeScheduler executor if no executor is provided directly.

Relates to: #12740
Relates to: #9626

This is a take 2 of: #13124
Because of concurrent merging (#13124), multiple threads may be updating (different) attributes concurrently, so we need to make reads and writes to attributes thread-safe.
Lucene PR apache/lucene#12660 introduced concurrent HNSW merge, and PR apache/lucene#13124 made ConcurrentMergeScheduler provide an intra-merge executor. But to enable concurrent intra-merge for HNSW graphs we still need to provide numMergeWorker to the codec definition. This PR does the following:
- provides numMergeWorker to the HNSW vectors codec definitions, where numMergeWorkers is set from the node's index.merge.scheduler.max_thread_count property
- enables concurrent merge only for force-merge operations.
Opening this PR for discussion. I took a stab at #12740
The idea is this:
If this direction seems good, I can finish cleaning up things and write some tests.
@zhaih @jpountz y'all might be interested here.
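For context, from a user's point of view the intent looks roughly like this (a sketch of the direction only, not a guarantee of the final API; the knobs shown are the existing CMS ones):

```java
import org.apache.lucene.index.ConcurrentMergeScheduler;
import org.apache.lucene.index.IndexWriterConfig;

class IntraMergeConfigSketch {
  static IndexWriterConfig newConfig() {
    ConcurrentMergeScheduler cms = new ConcurrentMergeScheduler();
    cms.setMaxMergesAndThreads(6, 4); // existing knobs: max concurrent merges / max merge threads
    // Intra-merge parallelism (HNSW graph merge, per-structure SegmentMerger tasks) borrows
    // spare threads from this same budget instead of being configured on the codec.
    return new IndexWriterConfig().setMergeScheduler(cms);
  }
}
```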