Conversation

@benwtrent
Member

Opening this PR for discussion. I took a stab at #12740

The idea is this:

  • Add a new task executor & a num-parallel-workers setting to IWC
  • Extend the kNN writer to accept the executor during merge
  • Throw if both executors (via format & writer) are provided

If this direction seems good, I can finish cleaning up things and write some tests.

@zhaih @jpountz y'all might be interested here.

@jpountz
Contributor

jpountz commented Feb 21, 2024

Thinking out loud: since merge schedulers already have the ability to merge concurrently (across multiple merges rather than within a merge though), it would be nice to fully encapsulate the merging concurrency there instead of having two sources of merging concurrency that are not aware of one another.

I'm also keen on keeping the codec API as simple as possible and adding the executor as a member of the MergeState rather than a new parameter of all codec write APIs.
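For illustration, a minimal sketch of that shape; the field name here is hypothetical, not the final API:

```java
import java.util.concurrent.Executor;

// Hypothetical sketch: MergeState carries the executor, so codec write APIs
// keep their existing signatures and formats opt in by reading the field.
public class MergeState {
  /** Executor for intra-merge parallelism; null means merge on the calling thread. */
  public final Executor intraMergeExecutor;

  MergeState(Executor intraMergeExecutor /* existing members (readers, infoStream, ...) elided */) {
    this.intraMergeExecutor = intraMergeExecutor;
  }
}
```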

@benwtrent
Member Author

it would be nice to fully encapsulate the merging concurrency there instead of having two sources of merging concurrency that are not aware of one another.

To take advantage of the executor, etc. it needs to be accessible from the scheduler. So, we would have to update the MergeScheduler to have some methods to return the executor for us to use and pass to MergeState (which is only created via the SegmentMerger object). This means that the scheduler for individual merges and the parallelism available to those individual merges are independent.

I don't know how they would ever be "aware" of one another.

I'm also keen on keeping the codec API as simple as possible and adding the executor as a member of the MergeState rather than a new parameter of all codec write APIs.

I updated the PR to do just this. It's a much cleaner API.

@zhaih
Contributor

zhaih commented Feb 22, 2024

+1 to moving the executor away from the Codec API (although it's me who placed it there LOL)

it would be nice to fully encapsulate the merging concurrency there instead of having two sources of merging concurrency that are not aware of one another.

I like the idea, but I'm a little bit worried about the situation where the HNSW merging threads take up most of the resources such that the original CMS threads cannot run. Since the numWorkers we configure is a per-merge limit, if there are several big merges happening at the same time, say n merges, they will use n * (numWorkers + 1) threads; the thread pool can run out of threads very quickly, resulting in smaller merges being blocked by large merges.

So I think it is still better to have two separate thread pools for inter-segment and intra-segment merging, but I wonder whether we could have a ThreadPoolManager which configures both the inter and intra pools, and CMS and HNSW merge (and any future merges) would source threads from this manager?

Also, I've heard of a use case where one CMS is shared across multiple IWs in the same process for better resource management; maybe this new ThreadPoolManager could be a solution for them as well?

@benwtrent
Member Author

So I think it is still better to have two separate thread pools for inter-segment and intra-segment merging, but I wonder whether we could have a ThreadPoolManager which configures both the inter and intra pools, and CMS and HNSW merge (and any future merges) would source threads from this manager?

Maybe? But this doesn't seem to simplify anything for users. They will still have two things to configure (the number of merges that can happen at a time, and the number of threads available to all those merges).

There is always the issue of best using resources and the potential of over-allocating the CPU cores. I don't think any finagling we do will ultimately change that.

Selfishly, I really don't want to mess with the CMS code at all. Even if we switch it to use an executor of some sort, the complexities of I/O throttling & executable queuing would still all exist.

@jpountz
Contributor

jpountz commented Feb 22, 2024

So, we would have to update the MergeScheduler to have some methods to return the executor for us to use and pass to MergeState (which is only created via the SegmentMerger object). This means that the scheduler for individual merges and the parallelism available to those individual merges are independent.

I agree with passing some executor from the scheduler to the MergeState, but I'm not sure I agree that this implies that inter-merge and intra-merge parallelism would be independent. For instance the merge scheduler could return a custom Executor that dynamically decides to run a new task in the current thread or to fork to a separate thread depending on how many threads are currently busy across all merges vs. the current value of ConcurrentMergeScheduler.maxThreadCount?

@benwtrent
Member Author

For instance the merge scheduler could return a custom Executor that dynamically decides to run a new task in the current thread or to fork to a separate thread depending on how many threads are currently busy across all merges vs. the current value of ConcurrentMergeScheduler.maxThreadCount?

This makes me think that instead of it being a separate executor, it should return a dynamic value for numParallelMergeWorkers, or return null if numParallelMergeWorkers == 1 so that there is no parallelism.

This would require some sort of "planning" on the merge scheduler (knowing how many are queued and how many numParallelMergeWorkers it provided to each executing merge action). I guess we could rely on an executor's getActiveCount() but that seems trappy.

This all implies that the configuration will actually be maxParallelMergeWorkers, and the MergeScheduler is free to provide any number of workers up to that limit.

The tricky part to me is determining 'how busy' the current merges are. The MergeScheduler could pass numParallelMergeWorkers to the SegmentMerger, but it could go completely unused. Maybe this is OK and we just assume it will be used.

Another confusion is determining the total number of threads allowed for merging (inter/intra). We could default this to maxParallelMergeWorkers * maxThreadCount. In this instance, maxParallelMergeWorkers == 1 would behave as things currently work. maxParallelMergeWorkers == 2 would mean that we potentially use twice as many resources, and the user should adjust maxThreadCount accordingly.

@benwtrent
Member Author

OK, my head is spinning a bit trying to grok the side-effects of the CMS. I think I understand most of it now.

Currently we have two adjustable parameters.

maxThreadCount dictates merge throughput, or how many merges we run at a time.

maxMergeCount dictates back pressure, or when we stop allowing merges to be queued at all & block upstream indexing from continuing.

Throughout the life of a merge, it can become (un)paused numerous times. This is controlled via the RateLimitedIndexOutput, which only rate-limits WRITING results, not reading or anything else related to threads.

Now we want to add a new configuration, maxMergeThreads, which will control merge latency by adding parallelism to each individual merge.

Since merge thread pausing only has to do with index output, I am not sure we need to add any individual thread throttling other than what’s already there. The Directory wrapper will pause/throttle all writing occurring for the merge. This is acceptable even if the merge is using multiple threads.

I also think that small merges (<50MB) should never be allowed to run over multiple threads. Similar to how we never throttle those because they are so small, the benefit in latency reduction will be minuscule, and we should reserve the extra thread usage for larger merges.

What I am stuck on is this:


  • Should we use a common pool for all merges? Thus restricting the total threads used by merging to be maxThreadCount + maxMergeThreads? This will simplify the logic in the CMS significantly as a single task executor can be used for all merges.
  • Or should we use an individual executor/pool per merge? Thus restricting total threads to maxThreadCount * maxMergeThreads (or some fraction of that)? This could get interesting… How do we determine how many threads each merge can get? Are we ok with creating a new task executor on every larger merge and then closing it?

What do you think @jpountz?

One other thing we need to fix is this idea of “numWorkers” in HNSW. It seems like it should just pick optimal slices given the number of vectors it has (similar to the multi-segment search stuff), chunk itself into those slices, and then be at the behest of the task executor. What say you @zhaih? It seems weird to use numWorkers as a way to say “only use this many threads” when we have no knowledge of how many threads the task executor actually has.

@jpountz
Contributor

jpountz commented Feb 23, 2024

Maybe some of these things are too ambitious, but ideally I'd like it to work this way.

ConcurrentMergeScheduler already tracks a maxMergeCount which controls the max number of running merges and a maxThreadCount that tracks the max number of threads that merges may use. Ideally I'd like maxThreadCount to include both threads used for inter-merge concurrency and intra-merge concurrency. So this is similar to your first suggestion except that I'm bounding the total number of threads to maxThreadCount rather than maxThreadCount + maxMergeCount.

Intra-merge concurrency would take advantage of the fact that there will sometimes be fewer active merges than threads to enable intra-merge concurrency. E.g. we could have a pool of threads for intra-merge concurrency that would try to ensure that its number of active threads is always less than or equal to max(0, maxThreadCount - mergeThreads.size()). For instance Executor#execute could be implemented such that it runs the runnable in the current thread if the number of active merges plus the number of active threads in the intra-merge thread pool is greater than or equal to maxThreadCount. Otherwise it would fork to the intra-merge thread pool.
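A minimal sketch of that execute() policy; the class and field names here are placeholders, not the PR's actual code:

```java
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntSupplier;

// Hypothetical intra-merge executor: fork only while the merge threads plus
// already-forked intra-merge threads stay under maxThreadCount; otherwise
// run the task inline on the calling merge thread.
class IntraMergeExecutor implements Executor {
  private final ExecutorService pool = Executors.newCachedThreadPool();
  private final AtomicInteger activeCount = new AtomicInteger();
  private final int maxThreadCount;
  private final IntSupplier activeMergeThreads; // e.g. mergeThreads::size in CMS

  IntraMergeExecutor(int maxThreadCount, IntSupplier activeMergeThreads) {
    this.maxThreadCount = maxThreadCount;
    this.activeMergeThreads = activeMergeThreads;
  }

  @Override
  public void execute(Runnable command) {
    int budget = maxThreadCount - activeMergeThreads.getAsInt();
    int active = activeCount.get();
    if (active < budget && activeCount.compareAndSet(active, active + 1)) {
      pool.execute(
          () -> {
            try {
              command.run();
            } finally {
              activeCount.decrementAndGet();
            }
          });
    } else {
      command.run(); // no spare budget: run inline on the caller
    }
  }
}
```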

Concurrent merging for vectors wants to know the number of available workers today, but maybe we can change the logic (like you suggested) to split the doc ID space into some number of slices, e.g. max(128, maxDoc / 2^16), and sequentially send these slices to Executor#execute (sometimes running in the same thread, sometimes forked to the intra-merge threadpool), except the last one, which would be forced to run in the current thread (like we used to do in IndexSearcher until recently).
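And a rough sketch of that slicing loop; SliceMerger and the exact slice-count heuristic are illustrative placeholders:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

class SlicedVectorMerge {
  interface SliceMerger {
    void merge(int fromDoc, int toDoc) throws Exception;
  }

  // Hypothetical: split [0, maxDoc) into slices, hand all but the last to the
  // intra-merge executor (which may run them inline or fork), and always run
  // the last slice on the calling thread.
  static void mergeInSlices(int maxDoc, Executor executor, SliceMerger merger) throws Exception {
    // Placeholder heuristic along the lines floated above, clamped to maxDoc.
    int numSlices = Math.max(1, Math.min(maxDoc, Math.max(128, maxDoc >>> 16)));
    int sliceSize = (maxDoc + numSlices - 1) / numSlices;
    List<Future<?>> futures = new ArrayList<>();
    for (int slice = 0; slice < numSlices; slice++) {
      int from = slice * sliceSize;
      int to = Math.min(maxDoc, from + sliceSize);
      FutureTask<Void> task =
          new FutureTask<>(
              () -> {
                merger.merge(from, to);
                return null;
              });
      if (slice == numSlices - 1) {
        task.run(); // keep the caller busy instead of idle-waiting
      } else {
        executor.execute(task);
      }
      futures.add(task);
    }
    for (Future<?> f : futures) {
      f.get(); // wait and propagate failures
    }
  }
}
```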

@benwtrent
Member Author

Intra-merge concurrency would take advantage of the fact that there will sometimes be fewer active merges than threads to enable intra-merge concurrency.

Sorry for being so dense, Executor#execute finally clicked for me. Executor#execute can check currentRunningThreads + mergeThreads.size(). If that is >= maxThreadCount, execute in the current thread; otherwise spawn or take a thread from a pool. This would then increment currentRunningThreads.

I will spin a little bit on this to see what I can come up with.

@zhaih
Contributor

zhaih commented Feb 23, 2024

The way HNSW concurrent merge is currently implemented is: each worker uses an AtomicInteger to coordinate and only does a small batch of work (1024 documents) at a time. The advantage is that we are able to load-balance between workers, and I remember this did bring some (5-10%) performance gain when I was testing it.
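Roughly, that coordination looks like the following paraphrase (not the actual HNSW merger code):

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntConsumer;

// Paraphrased sketch of the described scheme: workers claim 1024-doc batches
// from a shared counter until none remain, which load-balances automatically
// across however many worker threads actually show up.
class BatchedGraphMerge {
  private static final int BATCH_SIZE = 1024;
  private final AtomicInteger nextDoc = new AtomicInteger();
  private final int maxDoc;

  BatchedGraphMerge(int maxDoc) {
    this.maxDoc = maxDoc;
  }

  // Each worker runs this loop concurrently.
  void workerLoop(IntConsumer addDocToGraph) {
    for (;;) {
      int from = nextDoc.getAndAdd(BATCH_SIZE);
      if (from >= maxDoc) {
        return; // all batches claimed
      }
      int to = Math.min(maxDoc, from + BATCH_SIZE);
      for (int doc = from; doc < to; doc++) {
        addDocToGraph.accept(doc);
      }
    }
  }
}
```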

Maybe, instead of specifying a numWorkers per merge, we can default to an expected workload per thread (like 10K) and then allocate numWorkers dynamically? But still keep the current way of merging to preserve the performance?

One thing I'm worried about with putting everything into CMS is that we're binding intra-segment merging to CMS. To my understanding, using CMS means we're using background threads to merge, and merging becomes nondeterministic, such that some users still use SMS (SerialMS) or something similar to keep merging deterministic. But on the other hand, HNSW concurrent merge does not affect that aspect at all; no matter how many threads you're using, it won't affect the determinism of the merge result. So if we bind those two together, might we prevent some users from using intra-segment merges?

@benwtrent
Member Author

@zhaih I don't see any reason why we couldn't also extend SerialMS and allow multiple threads per merge.

We will have to update the base MergeScheduler class anyways, as the merges being run don't know anything about who kicked off the merge (and shouldn't).

The reason for all the CMS discussion is that it is the hardest to implement correctly (to me anyways...). For SerialMS, users could provide a number and it's a static executor with that number of threads.

But still keep the current way of merging to preserve the performance?

I really don't think users should configure numWorkers or workPerThread at all. I would much prefer that we supply good defaults and remove the configuration.

If we did the parallelism outside the codec to begin with, I don't think we would have added any configurable values to the HNSW codec.

So if we bind those two together, might we prevent some users from using intra-segment merges?

I am not thinking about binding them. I think that MergeScheduler itself should be extended to return a TaskExecutor (probably defaulting to null to indicate none, or maybe SameThreadExecutorService).

CMS is just the most difficult one to figure out.
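A rough sketch of that base-class hook; the class here is renamed, and the method name and default (null vs. a same-thread executor) were still under discussion at this point:

```java
import java.util.concurrent.Executor;
import org.apache.lucene.index.MergePolicy;

// Hypothetical sketch of the proposed MergeScheduler extension.
public abstract class MergeSchedulerSketch {
  /**
   * Executor for parallelism within a single merge. Returning null signals
   * "no intra-merge parallelism"; schedulers like CMS can override this.
   */
  public Executor getIntraMergeExecutor(MergePolicy.OneMerge merge) {
    return null;
  }
}
```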

@zhaih
Contributor

zhaih commented Feb 23, 2024

I am not thinking about binding them. I think that MergeScheduler itself should be extended to return a TaskExecutor (probably defaulting to null to indicate none, or maybe SameThreadExecutorService).

Sounds good

I really don't think users should configure numWorkers or workPerThread at all. I would much prefer us supply good defaults and remove configuration.

No, I'm not suggesting that either; I'm open to letting users specify it or providing a default. I just want to make sure the performance is not affected (by not pre-dividing the doc space across threads).

@benwtrent
Member Author

OK, I took a stab at this @jpountz; the latest commit has a POC. It still needs tests, but early performance testing shows indexing speed is better and forcemerge speed is just about the same. I am still using the numWorkers setting from HNSW to determine workers; this is mainly to avoid testing too many performance changes at once.

Contributor

@jpountz jpountz left a comment

I left some nitpicks but I find the change promising/exciting!

});
}

private class ScaledExecutor extends ThreadPoolExecutor {
Contributor

nit: I usually have a preference for composition over inheritance, i.e. could we wrap the thread-pool executor instead of extending it?

Member Author

Yeah no doubt.

Long.MAX_VALUE,
TimeUnit.NANOSECONDS,
new SynchronousQueue<>());
}
Contributor

Thinking out loud: for the case when tens of index writers are open in the same JVM, we may want to configure a timeout on threads in order to avoid spending too much heap on idle threads?

Member Author

Thinking out loud: for the case when tens of index writers are open in the same JVM, we may want to configure a timeout on threads in order to avoid spending too much heap on idle threads?

The timeout would only apply to non-core threads. But, your point is taken, it is possible for merges to be idle for a while and we don't want threads just sitting around taking up unnecessary space.

So, I can make core always 0, and rely on the pool dynamically adding threads up to max.
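Concretely, that amounts to something like the following, where maxThreadCount stands in for whatever cap the CMS settles on:

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class IdleFriendlyPool {
  // Sketch: corePoolSize 0 keeps no idle threads around; workers are spawned
  // on demand up to maxThreadCount and reclaimed after the keep-alive, so
  // tens of mostly-idle IndexWriters in one JVM don't pin parked threads.
  static ThreadPoolExecutor newPool(int maxThreadCount) {
    return new ThreadPoolExecutor(
        0, // corePoolSize: nothing retained while idle
        maxThreadCount,
        60L, TimeUnit.SECONDS, // reclaim idle workers
        new SynchronousQueue<>());
  }
}
```

With a SynchronousQueue, submissions beyond the max are rejected rather than queued, which pairs naturally with a run-in-the-caller fallback like the one discussed above.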

}
if (activeCount.compareAndSet(value, value + 1)) {
return true;
}
Contributor

If I read correctly, this tries to keep activeCount <= maxThreadCount. I was thinking we should try to keep activeCount <= maxThreadCount - mergeThreadCount(). Otherwise we're effectively using more than maxThreadCount for merging in total, which I find a bit surprising?

Member Author

@jpountz you are correct, my logic here isn't what it needs to be. I will fix it up.

* Provides an executor for parallelism during a single merge operation. By default, this method
* returns an executor that runs tasks in the calling thread.
*/
public Executor getInterMergeExecutor(OneMerge merge) {
Contributor

Should it be getIntraMergeExecutor?

Member Author

🤦 inter vs intra is just as bad as affect and effect.


@Override
public Executor getInterMergeExecutor(OneMerge merge) {
return null;
Contributor

nit: throw instead?

readers,
segmentInfo,
infoStream,
parallelMergeTaskExecutor == null ? null : new TaskExecutor(parallelMergeTaskExecutor));
Contributor

Nit: it's a bit weird to use a class from the search package for merging (TaskExecutor). Should merging get access to the raw Executor, which is a bit more flexible (I don't know if all formats will be able to split work into a list of tasks up-front)? Vectors could still wrap inside a TaskExecutor for convenience?

Contributor

it's a bit weird to use a class from the search package for merging (TaskExecutor)

+1. @jpountz, since we are now using TaskExecutor for both search and indexing use cases, does it maybe make sense to move it into the org.apache.lucene.util package?

@msokolov
Contributor

ooh exciting! I left some comments in a related issue that were maybe a little clueless given all the discussion here, which I missed until now. Still, I'm happy about the direction this seems to be going: moving the configuration out of the codec and into the merge scheduler, and coordinating thread usage across inter-/intra-segment merges.

Comment on lines 956 to 960
setMaximumPoolSize(Math.max(newMax, 1));
setCorePoolSize(newMax);
} else {
setCorePoolSize(newMax);
setMaximumPoolSize(Math.max(newMax, 1));
Contributor

Aren't those two branches the same?

Member Author

The order of setting the max is important.

But I am going to change this. I think the core thread count should be 0 and we shouldn't update it at all, allowing the thread pool to scale.

Comment on lines 978 to 987
synchronized (ConcurrentMergeScheduler.this) {
int max = maxThreadCount - mergeThreads.size();
int value = activeCount.get();
if (value < max) {
activeCount.incrementAndGet();
isThreadAvailable = true;
} else {
isThreadAvailable = false;
}
}
Member Author

One major decision I made here is that CMS thread throttling knows nothing about intra-merge threads.

This means, if there are intra-merge threads running, it is possible that concurrency creeps above maxThreadCount until the intra-merge thread(s) finish(es).

Making throttling & overall CMS behavior more aware of intra-merge thread usage will be a much trickier change. I wasn't sure it was worth it. I am open to opinions on this @msokolov @jpountz .

Contributor

I am going to just make stuff up here since I don't know this code well at all -- but I think I was assuming there was a ThreadPoolExecutor somewhere that was doling out threads for both of these operations. Now I'm realizing perhaps that's not the case and CMS is doing all that tracking on its own. I wonder if we should open a separate issue to migrate CMS to use JDK's thread pool abstractions? It could make some of this easier to handle.

Contributor

@benwtrent ++ There is a risk of letting bigger merges starve smaller merges from threads otherwise, and I'm not sure how we could fix it.

@benwtrent benwtrent marked this pull request as ready for review March 8, 2024 18:58
Contributor

@jpountz jpountz left a comment

I like the change a lot.

Currently this is only used by vectors. Could we also take advantage of this new executor to parallelize at the data-structure level and submit separate tasks for postings, points, norms, doc values, etc. from SegmentMerger.merge? Earlier experiments suggested that it did not help much because there is typically one bottleneck, but this would be good dogfooding for this change?

* returns `null` indicating that there is no parallelism during a single merge operation.
*/
public Executor getIntraMergeExecutor(OneMerge merge) {
return null;
Contributor

What about returning SameThreadExecutor by default instead?

Member Author

My concern is that certain things might actually be slower trying to do parallel work when there aren't any threads to be had. I suppose we will end up in this situation when all the merge threads are actually being used by other merges...

I need to benchmark this with vectors to see if things are measurably worse.
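For reference, a same-thread executor is a one-liner, so the default itself costs nothing; the open question is whether formats waste effort splitting work that then runs sequentially:

```java
import java.util.concurrent.Executor;

class SameThreadExecutors {
  // Tasks run inline on the caller; "parallel" code paths degrade to plain
  // sequential execution with no pool and no handoff.
  static final Executor SAME_THREAD = Runnable::run;
}
```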


private void updatePoolSize() {
executor.setMaximumPoolSize(Math.max(1, maxThreadCount));
}
Contributor

It feels like we don't actually need this since we already have control over the thread pool size via maxThreadCount. What about setting a high max thread count in the ctor and never updating it (i.e. making it a "cached" executor rather than a "scaled" executor)?

@benwtrent
Member Author

@jpountz ok, that naive attempt failed as norms & terms apparently need to be merged in order (one or the other would fail due to missing files...).

I am not sure if this is true with other things. But when I adjusted for this (latest commit) I did see a speed improvement.

My test was over IndexGeoNames; I did no segment merging while indexing and flushed every 12MB to ensure I got many segments. I also set CMS merge threads to 8.

Baseline forcemerge(1): 17793 ms
Candidate forcemerge(1): 13739 ms

});
}

private class ScaledExecutor implements Executor {
Contributor

nit: should it be called CachedExecutor now?


@Override
public void execute(Runnable command) {
assert mergeThreads.contains(Thread.currentThread()) : "caller is not a merge thread";
Contributor

I'd expect this assertion to no longer be valid, since SegmentMerger may fork into this executor for vectors, and then the task for vectors may want to further fork into a separate thread? So the caller may be either a MergeThread, or a thread from the wrapper thread pool?

mergeWithLogging(this::mergeNorms, segmentWriteState, segmentReadState, "norms", numMerged);
}

mergeWithLogging(this::mergeTerms, segmentWriteState, segmentReadState, "postings", numMerged);
Contributor

Can we have a parallel task that handles norms + terms so that the order is respected?

Member Author

OK, I added a commit that does this. The candidate merge time is now:

forceMerge took 4965 ms
vs. baseline (I re-ran to make sure)
forceMerge took 15783 ms

So, 3x faster? Seems pretty good. The major downside is that now memory usage on merge might increase as we are potentially doing all this merge activity at the same time. If we are cool with that, this seems like a nice speed improvement.

Contributor

Presumably, the worst-case scenario is not much worse than if you have many concurrent merges, so I think it's fine.
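The grouping amounts to something like this sketch; the merge* methods are stand-ins mirroring SegmentMerger's per-structure merges, and TaskExecutor#invokeAll runs the tasks and waits for them all:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import org.apache.lucene.search.TaskExecutor;

// Sketch of the grouping (merge* bodies stubbed): norms and terms stay
// ordered inside one task, while other structures merge in parallel.
class OrderedMergeTasks {
  void mergeNorms() {} // stand-ins for SegmentMerger's per-structure merges
  void mergeTerms() {}
  void mergeDocValues() {}
  void mergePoints() {}
  void mergeVectors() {}

  void merge(TaskExecutor taskExecutor) throws java.io.IOException {
    List<Callable<Void>> tasks = new ArrayList<>();
    tasks.add(() -> { mergeNorms(); mergeTerms(); return null; }); // order preserved
    tasks.add(() -> { mergeDocValues(); return null; });
    tasks.add(() -> { mergePoints(); return null; });
    tasks.add(() -> { mergeVectors(); return null; });
    taskExecutor.invokeAll(tasks); // run in parallel where possible, wait for all
  }
}
```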

@benwtrent benwtrent requested a review from zhaih March 14, 2024 17:42
Contributor

@zhaih zhaih left a comment

Very exciting PR!

TestSecrets.setConcurrentMergeSchedulerAccess(ConcurrentMergeScheduler::setSuppressExceptions);
}

private class CachedExecutor implements Executor {
Contributor

Can we have some simple javadoc explaining what this executor is doing? E.g. it will use a spare thread if we are within the configured max thread count, and will run directly on the caller thread if no extra thread is available?

@benwtrent benwtrent merged commit e3a34bf into apache:main Mar 14, 2024
@benwtrent benwtrent deleted the feature/add-task-executor-to-index-writer-config branch March 14, 2024 19:58
benwtrent added a commit that referenced this pull request Mar 14, 2024
…ngle merge action (#13124)

This commit adds a new interface to all MergeScheduler classes that allows the scheduler to provide an Executor for intra-merge parallelism. The first sub-class to satisfy this new interface is the ConcurrentMergeScheduler (CMS). In particular, the CMS will limit the amount of parallelism based on the current number of mergeThreads and the configured maxThread options.

Parallelism is now added by default to the SegmentMerger where each merging task is executed in parallel with the others.

Additionally, the Lucene99 HNSW codec will use the provided MergeScheduler executor if no executor is provided directly.

Relates to: #12740
Relates to: #9626
@mikemccand
Member

This looks like a great change! ... Lucene finally catching up to modern concurrency :) I have not looked closely yet.

But the nightly benchy is upset, prolly related to this, when running runFacetsBenchmarks.py. I haven't looked closely yet, and will be mostly offline the next couple of days (sorry for the hit&run!), but here's the exception that popped out:

Exception in thread "main" org.apache.lucene.store.AlreadyClosedException: this IndexWriter is closed
        at org.apache.lucene.index.IndexWriter.ensureOpen(IndexWriter.java:913)
        at org.apache.lucene.index.IndexWriter.ensureOpen(IndexWriter.java:926)
        at org.apache.lucene.index.IndexWriter.commit(IndexWriter.java:4070)
        at perf.facets.IndexFacets.main(IndexFacets.java:127)
Caused by: java.lang.RuntimeException: Only the merge owner thread can call pauseNanos(). This thread: pool-1-thread-1, owner thread: Thread[#290,Lucene Merge Thread #12,5,main]
        at org.apache.lucene.index.MergePolicy$OneMergeProgress.pauseNanos(MergePolicy.java:142)
        at org.apache.lucene.index.MergeRateLimiter.maybePause(MergeRateLimiter.java:147)
        at org.apache.lucene.index.MergeRateLimiter.pause(MergeRateLimiter.java:92)
        at org.apache.lucene.store.RateLimitedIndexOutput.checkRate(RateLimitedIndexOutput.java:86)
        at org.apache.lucene.store.RateLimitedIndexOutput.writeBytes(RateLimitedIndexOutput.java:55)
        at org.apache.lucene.store.ByteBuffersDataOutput.copyTo(ByteBuffersDataOutput.java:339)
        at org.apache.lucene.codecs.lucene90.blocktree.Lucene90BlockTreeTermsWriter$TermsWriter.writeBlock(Lucene90BlockTreeTermsWriter.java:1022)
        at org.apache.lucene.codecs.lucene90.blocktree.Lucene90BlockTreeTermsWriter$TermsWriter.writeBlocks(Lucene90BlockTreeTermsWriter.java:760)
        at org.apache.lucene.codecs.lucene90.blocktree.Lucene90BlockTreeTermsWriter$TermsWriter.pushTerm(Lucene90BlockTreeTermsWriter.java:1133)
        at org.apache.lucene.codecs.lucene90.blocktree.Lucene90BlockTreeTermsWriter$TermsWriter.write(Lucene90BlockTreeTermsWriter.java:1089)
        at org.apache.lucene.codecs.lucene90.blocktree.Lucene90BlockTreeTermsWriter.write(Lucene90BlockTreeTermsWriter.java:399)
        at org.apache.lucene.codecs.FieldsConsumer.merge(FieldsConsumer.java:95)
        at org.apache.lucene.codecs.perfield.PerFieldPostingsFormat$FieldsWriter.merge(PerFieldPostingsFormat.java:205)
        at org.apache.lucene.index.SegmentMerger.mergeTerms(SegmentMerger.java:236)
        at org.apache.lucene.index.SegmentMerger.mergeWithLogging(SegmentMerger.java:325)
        at org.apache.lucene.index.SegmentMerger.lambda$merge$0(SegmentMerger.java:147)
        at org.apache.lucene.search.TaskExecutor$TaskGroup.lambda$createTask$0(TaskExecutor.java:117)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
        at org.apache.lucene.index.ConcurrentMergeScheduler$CachedExecutor.lambda$execute$0(ConcurrentMergeScheduler.java:978)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
        at java.base/java.lang.Thread.run(Thread.java:1583)
        Suppressed: java.lang.RuntimeException: Only the merge owner thread can call pauseNanos(). This thread: pool-1-thread-2, owner thread: Thread[#290,Lucene Merge Thread #12,5,main]
                at org.apache.lucene.index.MergePolicy$OneMergeProgress.pauseNanos(MergePolicy.java:142)
                at org.apache.lucene.index.MergeRateLimiter.maybePause(MergeRateLimiter.java:147)
                at org.apache.lucene.index.MergeRateLimiter.pause(MergeRateLimiter.java:92)
                at org.apache.lucene.store.RateLimitedIndexOutput.checkRate(RateLimitedIndexOutput.java:86)
                at org.apache.lucene.store.RateLimitedIndexOutput.writeBytes(RateLimitedIndexOutput.java:55)
                at org.apache.lucene.store.DataOutput.writeBytes(DataOutput.java:54)
                at org.apache.lucene.util.packed.DirectWriter.flush(DirectWriter.java:97)
                at org.apache.lucene.util.packed.DirectWriter.add(DirectWriter.java:83)
                at org.apache.lucene.codecs.lucene90.Lucene90DocValuesConsumer.writeValuesSingleBlock(Lucene90DocValuesConsumer.java:349)
                at org.apache.lucene.codecs.lucene90.Lucene90DocValuesConsumer.writeValues(Lucene90DocValuesConsumer.java:328)
                at org.apache.lucene.codecs.lucene90.Lucene90DocValuesConsumer.doAddSortedNumericField(Lucene90DocValuesConsumer.java:707)
                at org.apache.lucene.codecs.lucene90.Lucene90DocValuesConsumer.addSortedSetField(Lucene90DocValuesConsumer.java:772)
                at org.apache.lucene.codecs.DocValuesConsumer.mergeSortedSetField(DocValuesConsumer.java:853)
                at org.apache.lucene.codecs.DocValuesConsumer.merge(DocValuesConsumer.java:148)
                at org.apache.lucene.codecs.perfield.PerFieldDocValuesFormat$FieldsWriter.merge(PerFieldDocValuesFormat.java:154)
                at org.apache.lucene.index.SegmentMerger.mergeDocValues(SegmentMerger.java:205)
                at org.apache.lucene.index.SegmentMerger.mergeWithLogging(SegmentMerger.java:325)
                at org.apache.lucene.index.SegmentMerger.lambda$merge$1(SegmentMerger.java:155)
                ... 6 more
                Suppressed: java.lang.RuntimeException: Only the merge owner thread can call pauseNanos(). This thread: pool-1-thread-2, owner thread: Thread[#290,Lucene Merge Thread #12,5,main]
                        at org.apache.lucene.index.MergePolicy$OneMergeProgress.pauseNanos(MergePolicy.java:142)
                        at org.apache.lucene.index.MergeRateLimiter.maybePause(MergeRateLimiter.java:147)
                        at org.apache.lucene.index.MergeRateLimiter.pause(MergeRateLimiter.java:92)
                        at org.apache.lucene.store.RateLimitedIndexOutput.checkRate(RateLimitedIndexOutput.java:86)
                        at org.apache.lucene.store.RateLimitedIndexOutput.writeByte(RateLimitedIndexOutput.java:48)
                        at org.apache.lucene.codecs.CodecUtil.writeBEInt(CodecUtil.java:654)
                        at org.apache.lucene.codecs.CodecUtil.writeFooter(CodecUtil.java:410)
                        at org.apache.lucene.codecs.lucene90.Lucene90DocValuesConsumer.close(Lucene90DocValuesConsumer.java:116)
                        at org.apache.lucene.codecs.perfield.PerFieldDocValuesFormat$ConsumerAndSuffix.close(PerFieldDocValuesFormat.java:86)
                        at org.apache.lucene.util.IOUtils.close(IOUtils.java:85)
                        at org.apache.lucene.codecs.perfield.PerFieldDocValuesFormat$FieldsWriter.close(PerFieldDocValuesFormat.java:249)
                        at org.apache.lucene.index.SegmentMerger.mergeDocValues(SegmentMerger.java:204)
                        ... 8 more
Traceback (most recent call last):
  File "/l/util.nightly/src/python/nightlyBench.py", line 1957, in <module>
    run()
  File "/l/util.nightly/src/python/nightlyBench.py", line 533, in run
    runFacetsBenchmark.run_benchmark(f'{constants.BASE_DIR}/{NIGHTLY_DIR}',
  File "/l/util.nightly/src/python/runFacetsBenchmark.py", line 56, in run_benchmark

@benwtrent
Member Author

Ah, that is likely due to this change. I bet throttling is occurring, but in one of the other thread-pool threads.

I didn't override the thread construction, so maybe overriding it is enough to fix this?

I can look more closely next week.

@benwtrent benwtrent restored the feature/add-task-executor-to-index-writer-config branch March 18, 2024 12:15
@benwtrent
Member Author

I am going to revert the change and open a new PR to iterate on a fix. RateLimitedIndexOutput isn't thread-safe, and our rate limiting assumes a single thread.

With this commit, more than one thread could be writing and that needs to be accounted for.
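The guard that trips is essentially a thread-confinement check; this is a paraphrase based on the exception message above, not the exact Lucene source:

```java
// Paraphrase of the guard seen in the stack trace: the merge's rate limiter
// only lets the thread that owns the merge pause, so any write that goes
// through RateLimitedIndexOutput from a pooled intra-merge thread blows up.
class OneMergeProgressSketch {
  private final Thread owner = Thread.currentThread(); // the merge thread

  void pauseNanos(long pauseNanos) {
    if (Thread.currentThread() != owner) {
      throw new RuntimeException(
          "Only the merge owner thread can call pauseNanos(). This thread: "
              + Thread.currentThread().getName());
    }
    // ... park for up to pauseNanos, accounting the paused time
  }
}
```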

benwtrent added a commit that referenced this pull request Mar 18, 2024
benwtrent added a commit that referenced this pull request Mar 21, 2024
…ngle merge action (#13190)

This commit adds a new interface to all MergeScheduler classes that allows the scheduler to provide an Executor for intra-merge parallelism. The first sub-class to satisfy this new interface is the ConcurrentMergeScheduler (CMS). In particular, the CMS will limit the amount of parallelism based on the current number of mergeThreads and the configured maxThread options. 

Parallelism is now added by default to the SegmentMerger where each merging task is executed in parallel with the others. 

Additionally, the Lucene99 HNSW codec will use the provided MergeScheduler executor if no executor is provided directly.

Relates to: #12740
Relates to: #9626

This is a take 2 of: #13124
benwtrent added a commit that referenced this pull request Mar 21, 2024
…ngle merge action (#13190)

This commit adds a new interface to all MergeScheduler classes that allows the scheduler to provide an Executor for intra-merge parallelism. The first sub-class to satisfy this new interface is the ConcurrentMergeScheduler (CMS). In particular, the CMS will limit the amount of parallelism based on the current number of mergeThreads and the configured maxThread options.

Parallelism is now added by default to the SegmentMerger where each merging task is executed in parallel with the others.

Additionally, the Lucene99 HNSW codec will use the provided MergeScheduler executor if no executor is provided directly.

Relates to: #12740
Relates to: #9626

This is a take 2 of: #13124
jpountz added a commit to jpountz/lucene that referenced this pull request Apr 30, 2024
Because of concurrent merging (apache#13124), multiple threads may be updating
(different) attributes concurrently, so we need to make reads and writes to
attributes thread-safe.
jpountz added a commit that referenced this pull request May 2, 2024
Because of concurrent merging (#13124), multiple threads may be updating
(different) attributes concurrently, so we need to make reads and writes to
attributes thread-safe.
mayya-sharipova added a commit to mayya-sharipova/elasticsearch that referenced this pull request May 6, 2024
Lucene PR apache/lucene#12660 introduced concurrent HNSW merge, and PR apache/lucene#13124 made ConcurrentMergeScheduler provide an intra-merge executor.

But to enable concurrent intra-merge for HNSW graphs we still need to provide numMergeWorkers to the codec definition. This PR does the following:

- provides numMergeWorkers to the HNSW vector codec definitions, where numMergeWorkers is set from the node's index.merge.scheduler.max_thread_count property
- enables concurrent merge only for force merge operations.
jpountz added a commit that referenced this pull request May 6, 2024
Because of concurrent merging (#13124), multiple threads may be updating
(different) attributes concurrently, so we need to make reads and writes to
attributes thread-safe.
sherman pushed a commit to sherman/lucene that referenced this pull request Jun 3, 2024
…ngle merge action (apache#13190)

This commit adds a new interface to all MergeScheduler classes that allows the scheduler to provide an Executor for intra-merge parallelism. The first sub-class to satisfy this new interface is the ConcurrentMergeScheduler (CMS). In particular, the CMS will limit the amount of parallelism based on the current number of mergeThreads and the configured maxThread options.

Parallelism is now added by default to the SegmentMerger where each merging task is executed in parallel with the others.

Additionally, the Lucene99 HNSW codec will use the provided MergeScheduler executor if no executor is provided directly.

Relates to: apache#12740
Relates to: apache#9626

This is a take 2 of: apache#13124