Skip to content

Commit 10bebde

Browse files
authored
Parallelize knn query rewrite across slices rather than segments (apache#12325)
The concurrent query rewrite for the knn vector query introduced with apache#12160 submits one task per segment to the executor. To align this with the IndexSearcher's parallel behaviour, we should instead parallelize across slices. Also, we can reuse the same slice executor instance that the index searcher already holds; that way, a QueueSizeBasedExecutor is used when a thread pool executor is provided.
1 parent c188d47 commit 10bebde

File tree

2 files changed

+40
-22
lines changed

2 files changed

+40
-22
lines changed

lucene/core/src/java/org/apache/lucene/search/AbstractKnnVectorQuery.java

Lines changed: 36 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,12 @@
1919
import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS;
2020

2121
import java.io.IOException;
22+
import java.util.ArrayList;
2223
import java.util.Arrays;
2324
import java.util.Comparator;
2425
import java.util.List;
2526
import java.util.Objects;
2627
import java.util.concurrent.ExecutionException;
27-
import java.util.concurrent.Executor;
2828
import java.util.concurrent.FutureTask;
2929
import org.apache.lucene.codecs.KnnVectorsReader;
3030
import org.apache.lucene.index.FieldInfo;
@@ -81,11 +81,12 @@ public Query rewrite(IndexSearcher indexSearcher) throws IOException {
8181
filterWeight = null;
8282
}
8383

84-
Executor executor = indexSearcher.getExecutor();
84+
SliceExecutor sliceExecutor = indexSearcher.getSliceExecutor();
85+
// in case of parallel execution, the leaf results are not ordered by leaf context's ordinal
8586
TopDocs[] perLeafResults =
86-
(executor == null)
87+
(sliceExecutor == null)
8788
? sequentialSearch(reader.leaves(), filterWeight)
88-
: parallelSearch(reader.leaves(), filterWeight, executor);
89+
: parallelSearch(indexSearcher.getSlices(), filterWeight, sliceExecutor);
8990

9091
// Merge sort the results
9192
TopDocs topK = TopDocs.merge(k, perLeafResults);
@@ -109,27 +110,40 @@ private TopDocs[] sequentialSearch(
109110
}
110111

111112
private TopDocs[] parallelSearch(
112-
List<LeafReaderContext> leafReaderContexts, Weight filterWeight, Executor executor) {
113-
List<FutureTask<TopDocs>> tasks =
114-
leafReaderContexts.stream()
115-
.map(ctx -> new FutureTask<>(() -> searchLeaf(ctx, filterWeight)))
116-
.toList();
113+
IndexSearcher.LeafSlice[] slices, Weight filterWeight, SliceExecutor sliceExecutor) {
114+
115+
List<FutureTask<TopDocs[]>> tasks = new ArrayList<>(slices.length);
116+
int segmentsCount = 0;
117+
for (IndexSearcher.LeafSlice slice : slices) {
118+
segmentsCount += slice.leaves.length;
119+
tasks.add(
120+
new FutureTask<>(
121+
() -> {
122+
TopDocs[] results = new TopDocs[slice.leaves.length];
123+
int i = 0;
124+
for (LeafReaderContext context : slice.leaves) {
125+
results[i++] = searchLeaf(context, filterWeight);
126+
}
127+
return results;
128+
}));
129+
}
117130

118-
SliceExecutor sliceExecutor = new SliceExecutor(executor);
119131
sliceExecutor.invokeAll(tasks);
120132

121-
return tasks.stream()
122-
.map(
123-
task -> {
124-
try {
125-
return task.get();
126-
} catch (ExecutionException e) {
127-
throw new RuntimeException(e.getCause());
128-
} catch (InterruptedException e) {
129-
throw new ThreadInterruptedException(e);
130-
}
131-
})
132-
.toArray(TopDocs[]::new);
133+
TopDocs[] topDocs = new TopDocs[segmentsCount];
134+
int i = 0;
135+
for (FutureTask<TopDocs[]> task : tasks) {
136+
try {
137+
for (TopDocs docs : task.get()) {
138+
topDocs[i++] = docs;
139+
}
140+
} catch (ExecutionException e) {
141+
throw new RuntimeException(e.getCause());
142+
} catch (InterruptedException e) {
143+
throw new ThreadInterruptedException(e);
144+
}
145+
}
146+
return topDocs;
133147
}
134148

135149
private TopDocs searchLeaf(LeafReaderContext ctx, Weight filterWeight) throws IOException {

lucene/core/src/java/org/apache/lucene/search/IndexSearcher.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -962,6 +962,10 @@ public Executor getExecutor() {
962962
return executor;
963963
}
964964

965+
SliceExecutor getSliceExecutor() {
966+
return sliceExecutor;
967+
}
968+
965969
/**
966970
* Thrown when an attempt is made to add more than {@link #getMaxClauseCount()} clauses. This
967971
* typically happens if a PrefixQuery, FuzzyQuery, WildcardQuery, or TermRangeQuery is expanded to

0 commit comments

Comments
 (0)