Conversation

@benwtrent
Member

This commit adds a new interface to all MergeScheduler classes that allows the scheduler to provide an Executor for intra-merge parallelism. The first sub-class to implement this new interface is the ConcurrentMergeScheduler (CMS). In particular, the CMS limits the amount of parallelism based on the current number of merge threads and the configured maxThreadCount setting.

Parallelism is now added by default to the SegmentMerger where each merging task is executed in parallel with the others.

Additionally, the Lucene99 HNSW codec will use the provided MergeScheduler executor if no executor is provided directly.

Relates to: #12740
Relates to: #9626

This is a take 2 of: #13124
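
Roughly, the shape of the hook being described looks like the sketch below. The class and method names are illustrative assumptions, not the exact API added by this PR:

    import java.util.concurrent.Executor;

    // Illustrative sketch only: the scheduler exposes an Executor that SegmentMerger
    // can use to run per-format merge sub-tasks; the default runs them inline on the
    // calling merge thread, so behavior without a concurrent scheduler is unchanged.
    abstract class SketchMergeScheduler {
      Executor intraMergeExecutor() {
        return Runnable::run; // no extra parallelism by default
      }
    }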

@benwtrent
Member Author

@dweiss @mikemccand I am currently iterating on how best to make RateLimitedIndexOutput, MergePolicy, and MergeRateLimiter thread-safe.

Right now, it is assumed that all interactions with these classes happen on the same thread; this obviously breaks when we add intra-merge parallelism.

I was hoping y'all had some thoughts on how these should all work together with intra-merge parallelism?

Is it enough to make these classes thread-safe and remove the assertions?

Do we want to somehow figure out if the ultimate calling thread was a MergeThread? (This is possible, but will require some wrangling on the tp-executor to keep track of which thread belongs where...)
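
One purely hypothetical way to do that wrangling (not something this PR implements): wrap the intra-merge Executor so worker threads can see which merge thread submitted the task.

    import java.util.concurrent.Executor;

    // Hypothetical sketch: propagate the submitting merge thread to intra-merge workers.
    final class MergeAwareExecutor implements Executor {
      static final ThreadLocal<Thread> SUBMITTING_MERGE_THREAD = new ThreadLocal<>();
      private final Executor delegate;

      MergeAwareExecutor(Executor delegate) {
        this.delegate = delegate;
      }

      @Override
      public void execute(Runnable task) {
        final Thread submitter = Thread.currentThread();
        delegate.execute(() -> {
          SUBMITTING_MERGE_THREAD.set(submitter);
          try {
            task.run();
          } finally {
            SUBMITTING_MERGE_THREAD.remove();
          }
        });
      }
    }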

Member Author

@benwtrent benwtrent left a comment

Some high level concerns and thoughts I had on the new changes.

@dweiss || @mikemccand y'all might be interested.

Comment on lines 91 to 98
if (bytes > currentMinPauseCheckBytes.get()) {
  shouldPause.set(true);
  currentMinPauseCheckBytes.set(rateLimiter.getMinPauseCheckBytes());
  localBytesSinceLastPause.set(bytes);
  return 0;
} else {
  shouldPause.set(false);
}
Member Author

I figured that if another thread reached this throttling point, paused, and reset the counter, then the current thread shouldn't pause. Throttle tracking is effectively global and each thread uses the same localBytesSinceLastPause, so work done by another thread could cause the current thread to pause, or another thread pausing could allow this thread to continue working.

final double secondsToPause = (bytes / 1024. / 1024.) / rate;

AtomicLong curPauseNSSetter = new AtomicLong();
lastNS.updateAndGet(
Member Author

We would need to lock here or do an updateAndGet. lastNS is only updated in this particular method, so using atomics seemed better to me.
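
For context, a minimal illustration of the lock-free pattern being discussed (not the actual MergeRateLimiter code): the shared timestamp advances with updateAndGet, and the pause to apply is captured through a one-element array because the update function may be retried under contention.

    import java.util.concurrent.atomic.AtomicLong;

    // Illustrative sketch of the updateAndGet pattern; names are assumptions.
    class PauseClock {
      private final AtomicLong lastNS = new AtomicLong();

      /** Returns how long the caller should pause, in nanoseconds (0 = no pause). */
      long requestPause(long curNS, long pauseNS) {
        long[] toPause = new long[1]; // written on the final (successful) attempt
        lastNS.updateAndGet(last -> {
          long target = Math.max(last, curNS) + pauseNS;
          toPause[0] = Math.max(0, target - curNS);
          return target;
        });
        return toPause[0];
      }
    }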

Contributor

If we used a lock for this whole method, then this may help pause all threads that share the same rate limiter, and effectively apply the rate limit across all threads that run the same merge, rather than applying the rate limit on a per-thread basis?

Member Author

So, the limits are on a per-RateLimitedIndexOutput basis, not necessarily per thread. Since all the RateLimitedIndexOutputs constructed share the same rate limiter, it might pause all threads?

But I don't think so: each thread could have its own RateLimitedIndexOutput tracking its own bytes locally, and while the combined total across all threads might exceed the throttling limit, the output isn't really throttled until a single thread exceeds the limit on its own.

Contributor

the output isn't really throttled until a single thread exceeds the limit.

This limitation feels ok to me, let's just add a comment about it? Intuitively, the write rate at merge time is rather bursty, so if the sum of the bytes written by all threads running this merge exceeds the limit, then there would often be one thread that exceeds the limit on its own as well.

Comment on lines +127 to +138
// Time we should sleep until; this is purely instantaneous
// rate (just adds seconds onto the last time we had paused to);
// maybe we should also offer decayed recent history one?
long targetNS = last + (long) (1000000000 * secondsToPause);
long curPauseNS = targetNS - curNS;
// We don't bother with thread pausing if the pause is smaller than 2 msec.
if (curPauseNS <= MIN_PAUSE_NS) {
  // Set to curNS, not targetNS, to enforce the instant rate, not
  // the "averaged over all history" rate:
  curPauseNSSetter.set(0);
  return curNS;
}
Member Author

It's possible that other threads request very short pauses, each updating lastNS to their respective curNS. I honestly don't know whether this is OK or not. But to handle this case, I decided to use updateAndGet to ensure that if lastNS changes, we account for it and update our pause request accordingly.

Contributor

It's hard for me to reason about this as well.

I'm wondering about keeping maybePause as-is and making MergeRateLimiter#pause synchronized, essentially trying to make the pausing logic behave as if threads were writing bytes sequentially rather than in parallel. (I'm considering making pause synchronized rather than maybePause so that System.nanoTime() is computed within the lock and the pausing logic accounts for the fact that some time may have been spent waiting on the lock already.)
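
A rough sketch of that alternative, with the nanoTime read taken inside the lock so that time already spent waiting on the lock counts toward the pause (names and structure are assumptions, not the PR's code):

    // Illustrative sketch of a synchronized pause; not the actual MergeRateLimiter.
    class SequentialRateLimiter {
      private long lastNS;

      synchronized long pause(double secondsToPause) throws InterruptedException {
        long curNS = System.nanoTime(); // read inside the lock
        long targetNS = Math.max(lastNS, curNS) + (long) (secondsToPause * 1_000_000_000L);
        long pauseNS = targetNS - curNS;
        lastNS = targetNS;
        if (pauseNS > 0) {
          // Sleeping here means the lock is held while pausing, which is what
          // serializes the writers, at the cost of the "scary" thread dumps
          // mentioned later in this thread.
          Thread.sleep(pauseNS / 1_000_000, (int) (pauseNS % 1_000_000));
        }
        return Math.max(0, pauseNS);
      }
    }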

Member

Sorry I'm trying to catch up here and will probably ask a bunch of stupid questions :) Thank you @benwtrent for persisting in this hairy logic!

(I'm considering making pause synchronized rather than maybePause so that System.nanoTime() is computed within the lock and the pausing logic accounts for the fact that some time may have been spent waiting on the lock already.)

Couldn't we move the curNS = System.nanoTime() inside the locked/updateAndGet'd part of maybePause? I like the thread safety that the updateAndGet is giving us here, ensuring we accurately account for all bytes written by N threads within a single merge. Also, I don't expect the added sync required here will hurt performance much: Lucene is doing much work to produce these bytes being written already, so conflict should be rare-ish.

Also, if we make pause sync'd, it will cause scary looking jstack thread dumps? Making it look like one thread is sleeping while holding a lock and blocking other threads (which indeed is what it'd be doing). Versus the thread stacks we'd see w/ the current approach that make it quite clear that all N threads are intentionally stalling in mergeProgress.pauseNanos? It would reduce the horror reaction we'd see peeking at thread dumps maybe ...

Member Author

Couldn't we move the curNS = System.nanoTime() inside the locked/updateAndGet'd part of maybePause?

Yes, we could; otherwise, if there are conflicts between threads (requiring the updateAndGet to run more than once), curNS drifts further into the past.

is giving us here, ensuring we accurately account for all bytes written by N threads within a single merge

That is not what it is doing. It is ensuring that the throttling timestamps stay in sync :/. Bytes being throttled are still per RateLimitedIndexOutput, which means they are per thread.

Member

is giving us here, ensuring we accurately account for all bytes written by N threads within a single merge

That is not what it is doing. It is ensuring that the throttling timestamps stay in sync :/. Bytes being throttled are still per RateLimitedIndexOutput, which means they are per thread.

Ahhhhhh gotchya. Each RateLimitedIndexOutput tracks its own bytesSinceLastPause and then invokes pause with its own private byte count, OK. So, yes, with the current approach we rate limit MB/sec per thread, not per merge. I think that's fine. Best effort!

@jpountz
Contributor

jpountz commented Mar 19, 2024

I understand that we need to make changes to account for the fact that multiple threads may be contributing to the same merge concurrently, but I would not expect RateLimitedIndexOutput to need to be thread-safe: the caller needs to manage synchronization themselves anyway if they want bytes to be written in the correct order?

@benwtrent
Member Author

@jpountz

the caller needs to manage synchronization themselves anyway if they want bytes to be written in the correct order?

The MT safety isn't really around the bytes; I agree with your assessment there. It's more around the throttling. The way I read the class, different threads could be writing to different files, but both could experience throttling, and we should account for that.

@dweiss
Contributor

dweiss commented Mar 19, 2024

I don't think rate limiting has to be exact - it's already allowed to be temporarily exceeded (see comments in writeBytes on RateLimitedIndexOutput).

@Override
public void close() {
  sync();
  try {
Contributor

should we call super.close() as well, to close MergeScheduler's SameThreadExecutorService which ConcurrentMergeScheduler uses for small merges?

It's not a big deal not to close it, but it would help catch if we ever send tasks to this executor after closing.


@benwtrent
Member Author

@mikemccand @jpountz

So, thinking about this more as I fell asleep.

This is how throttling will work as it is in this PR:

  • Throttling is per thread, meaning an intra-merge thread only gets throttled once the bytes it has written reach the configured rate limit
  • Consequently, we may get to rateLimitBytes*numIntraMergeIO bytes before a single thread gets throttled.

This makes us throttle less often relative to how many bytes are actually written. Maybe this is OK, as the throttling logic could "catch up" if things continue to get backed up? Throttling has always been a "best effort" thing anyway.

Even if we somehow made the RateLimiter throttle every thread (via some global semaphore or something...), we would still only throttle once one of the threads hits the byte throttling limit.

IMO, either we:

  • account for bytes globally (per directory) & throttle globally (global lock that pauses all threads)
  • accept that throttling is per thread and bytes written are measured per individual thread.

Selfishly, I wish for the second option as it's the simplest.

@mikemccand
Member

Phew, it took me two hours of strongly caffeinated time to catch up on this! Thank you @benwtrent.

What an awesome change, modernizing Lucene's merge concurrency so that intra-merge (just HNSW, but maybe other index parts soon) concurrency is enabled by default.

(Separately, the nightly benchy had a silly off-by-one bug preventing it from resuming after you reverted the previous PR ... I think I've fixed that bug now, and kicked off a one-off nightly benchy run that will hopefully finish late today).

Some high level questions (sorry if these were already asked/answered):

  • How do we control the risk that a massive merge with KNN vectors soaks up all available concurrency from the shared Executor for intra-merge concurrency (all threads doing HNSW merging) and then starves smaller merges that would finish quickly?
  • Maybe we don't need the IO write rate limiter anymore? It's a tricky setting because the mbPerSec you set is then multiplied by the number of concurrent merges that are running. It is a per-merge setting, not a global setting, so it's kinda trappy today.

Finally, I worry that the rate-limiter's simplistic "instantaneous" measure is making it effectively super buggy (there is a TODO about this), because merging that does a lot of CPU work and then writes a lot of bytes will be effectively throttled to far below the target mbPerSec in aggregate. A better solution might be something like the "burst IOPs bucket" that AWS (and likely other cloud providers) offer ("Burst IOPS is a feature of Amazon Web Services (AWS) EBS volume types that allows applications to store unused IOPS in a burst bucket and then drain them when needed" -- thank you Gemini for the summary). This is clearly not a blocker for this issue, but we really should (separately) fix it. I'll open a follow-on issue about this ... but to me it's another reason to maybe remove this feature entirely.
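
For illustration, here is a toy version of that burst-bucket idea (purely a sketch, not a proposal for Lucene's API): unused write budget accumulates up to a cap, so a CPU-heavy phase banks credit that a later IO burst can spend before throttling kicks in.

    // Toy token/burst bucket; all names and numbers are illustrative assumptions.
    class BurstBucket {
      private final double bytesPerSec;   // steady-state rate limit
      private final double maxBurstBytes; // how much unused budget can be banked
      private double available;
      private long lastNS = System.nanoTime();

      BurstBucket(double bytesPerSec, double maxBurstBytes) {
        this.bytesPerSec = bytesPerSec;
        this.maxBurstBytes = maxBurstBytes;
        this.available = maxBurstBytes;
      }

      /** Returns nanoseconds the caller should pause before writing {@code bytes}. */
      synchronized long acquire(long bytes) {
        long now = System.nanoTime();
        available = Math.min(maxBurstBytes, available + (now - lastNS) / 1e9 * bytesPerSec);
        lastNS = now;
        available -= bytes;
        return available >= 0 ? 0 : (long) (-available / bytesPerSec * 1e9);
      }
    }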

@mikemccand
Member

Selfishly, I wish for the second option as its the simplest.

+1 to keep with the second approach. It is best effort, and, the "instant vs burst-bucket" bug above is yet more weirdness about it :)

Another thought on the "instant vs burst-bucket" bug: I suppose in some case, e.g. bulk merging of stored fields (no deletions, so we are just copying byte blocks), there is very little CPU and tons of IO and in cases like that, the instant vs burst-bucket approaches would be essentially the same (both wind up throttling based on the instant rate since the burst bucket is quickly exhausted). I'll open a spinoff ...

@jpountz
Contributor

jpountz commented Mar 20, 2024

Maybe we don't need the IO write rate limiter anymore?

I'm curious what you have in mind. Are you considering throttling merges purely based on merge concurrency then? E.g. slow indexing -> single merge at a time, heavy indexing -> many parallel merges? This wouldn't sound crazy to me as I don't recall seeing indexing/merging saturate I/O.

@benwtrent benwtrent requested review from jpountz and mikemccand March 20, 2024 13:02
@jpountz
Contributor

jpountz commented Mar 20, 2024

Maybe we don't need the IO write rate limiter anymore?

Nevermind, I see that you added more details on #13193.

@benwtrent
Member Author

If y'all are good with it, I will merge this tomorrow.

@jpountz
Contributor

jpountz commented Mar 20, 2024

@benwtrent FYI I left a few minor comments: on ConcurrentMergeScheduler#close, and about adding a comment for the limitation you identified that a merge may not get throttled if the sum of bytes written exceeds the rate but none of the merge threads for the same merge exceed the rate.

@benwtrent
Member Author

@jpountz added comments to the creation of the ratelimiter in CMS and the limiter itself. Also updated the close interaction.

Contributor

@jpountz jpountz left a comment

Thanks for pushing on this change, I like it. The fact that the extra merge concurrency may not starve merging of threads is good. I'm curious how the nightly benchmarks will react to it, given the high number of CPU cores of beast3.

One question on my mind is whether this change should make us update the default number of merging threads that ConcurrentMergeScheduler configures (in a follow-up issue/PR).

public void close() {
sync();
public void close() throws IOException {
super.close();
Contributor

@jpountz jpountz Mar 21, 2024

nit: should we try to close as cleanly as possible in the event of an exception, e.g. doing something like below

    IOUtils.close(
        this::sync,
        super::close,
        intraMergeExecutor == null ? null : intraMergeExecutor::shutdown);

@mikemccand
Member

Thanks for pushing on this change, I like it. The fact that the extra merge concurrency may not starve merging of threads is good. I'm curious how the nightly benchmarks will react to it, given the high number of CPU cores of beast3.

Nightly benchy is still forcing its own thread pool into the HNSW indexing Codec component (Lucene99Hnsw*VectorsFormat) -- once this change is merged, I'll remove that so we switch to this awesome default approach. But maybe we won't see too much change in the nightly indexing throughput since it's already doing intra-merge concurrency "itself".

One question on my mind is whether this change should make us update the default number of merging threads that ConcurrentMergeScheduler configures (in a follow-up issue/PR).

+1 to explore this. Nightly benchy hardwires the maxMergeCount=16, maxThreadCount=12. Maybe they should be higher :)

But also note that the nightly benchy does not wait for merges on IndexWriter.close, so it's not really a fair test of merge performance.

Member

@mikemccand mikemccand left a comment

Thank you @benwtrent -- what an awesome improvement in Lucene's default concurrency!

@jpountz
Contributor

jpountz commented Mar 21, 2024

Nightly benchy hardwires the maxMergeCount=16, maxThreadCount=12

Do you remember how you came up with these numbers? Is there some reasoning behind them, or do they come from experimentation?

I'm also curious if you know where the current maxThreadCount = max(1, min(4, coreCount / 2)) is coming from. Is the 4 number trying to approximate the number of tiers of TieredMergePolicy (max seg size = 5GB, 500MB, 50MB, 5MB ~= 2MB = min seg size) in order to allow one merge on each tier to run concurrently?

@benwtrent
Member Author

@mikemccand

I'll remove that so we switch to this awesome default approach.

For now, you still need to tell the codec how many workers are available for it to use. But you should be able to pass null for the executor.
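
In other words, something along these lines (constructor parameters are from memory, so treat the exact signature as an assumption and check the javadocs):

    import java.util.concurrent.ExecutorService;
    import org.apache.lucene.codecs.KnnVectorsFormat;
    import org.apache.lucene.codecs.lucene99.Lucene99HnswVectorsFormat;

    // Sketch: request N intra-merge workers but pass null for the executor,
    // so the codec falls back to the executor supplied by the MergeScheduler.
    class HnswFormatExample {
      static KnnVectorsFormat newFormat() {
        int maxConn = 16, beamWidth = 100, numMergeWorkers = 8;
        ExecutorService mergeExec = null; // null => use MergeScheduler's intra-merge executor
        return new Lucene99HnswVectorsFormat(maxConn, beamWidth, numMergeWorkers, mergeExec);
      }
    }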

@benwtrent benwtrent merged commit 75e1ebc into apache:main Mar 21, 2024
@benwtrent benwtrent deleted the feature/add-task-executor-to-index-writer-config branch March 21, 2024 13:11
benwtrent added a commit that referenced this pull request Mar 21, 2024
…ngle merge action (#13190)

@dnhatn
Member

dnhatn commented Mar 24, 2024

Elasticsearch CI has uncovered an issue related to this change: the PerFieldDocValuesFormat and PerFieldPostingsFormat mutate and reset the fieldInfos of the mergeState while executing a merge. Consequently, other ongoing merges may fail to access some fieldInfos.

// Delegate the merge to the appropriate consumer
PerFieldMergeState pfMergeState = new PerFieldMergeState(mergeState);
try {
  for (Map.Entry<DocValuesConsumer, Collection<String>> e : consumersToField.entrySet()) {
    e.getKey().merge(pfMergeState.apply(e.getValue()));
  }
} finally {
  pfMergeState.reset();
}

PerFieldMergeState pfMergeState = new PerFieldMergeState(mergeState);
boolean success = false;
try {
  for (Map.Entry<PostingsFormat, FieldsGroup> ent : formatToGroups.entrySet()) {
    PostingsFormat format = ent.getKey();
    final FieldsGroup group = ent.getValue();
    FieldsConsumer consumer = format.fieldsConsumer(group.state);
    toClose.add(consumer);
    consumer.merge(pfMergeState.apply(group.fields), norms);
  }
  success = true;
} finally {
  pfMergeState.reset();
  if (!success) {
    IOUtils.closeWhileHandlingException(toClose);
  }
}
}

dnhatn added a commit that referenced this pull request Mar 25, 2024
The PerFieldDocValuesFormat and PerFieldPostingsFormat mutate and reset 
the fieldInfos of the mergeState during merges. Consequently, other 
running merge sub-tasks may fail to see some fieldInfos. This was
problematic since we introduced concurrency for merge sub-tasks.

Relates #13190
@mikemccand
Member

It looks like this awesome change was backported for 9.11.0? I'll add the milestone. So hard to remember to set the milestones on our issues/PRs...

@mikemccand mikemccand added this to the 9.11.0 milestone Apr 2, 2024
@benwtrent
Member Author

@mikemccand oh dang, I haven't been doing that. Thanks for picking up my slack!

sherman pushed a commit to sherman/lucene that referenced this pull request Jun 3, 2024
…ngle merge action (apache#13190)

@yupeng9

yupeng9 commented Nov 16, 2024

Curious, will this change improve the concurrent merge time of non-vector index structures like posting lists and point values?
