1919import static org .apache .lucene .search .DocIdSetIterator .NO_MORE_DOCS ;
2020
2121import java .io .IOException ;
22+ import java .util .ArrayList ;
2223import java .util .Arrays ;
2324import java .util .Comparator ;
2425import java .util .List ;
2526import java .util .Objects ;
2627import java .util .concurrent .ExecutionException ;
27- import java .util .concurrent .Executor ;
2828import java .util .concurrent .FutureTask ;
2929import org .apache .lucene .codecs .KnnVectorsReader ;
3030import org .apache .lucene .index .FieldInfo ;
@@ -81,11 +81,12 @@ public Query rewrite(IndexSearcher indexSearcher) throws IOException {
8181 filterWeight = null ;
8282 }
8383
84- Executor executor = indexSearcher .getExecutor ();
84+ SliceExecutor sliceExecutor = indexSearcher .getSliceExecutor ();
85+ // in case of parallel execution, the leaf results are not ordered by leaf context's ordinal
8586 TopDocs [] perLeafResults =
86- (executor == null )
87+ (sliceExecutor == null )
8788 ? sequentialSearch (reader .leaves (), filterWeight )
88- : parallelSearch (reader . leaves (), filterWeight , executor );
89+ : parallelSearch (indexSearcher . getSlices (), filterWeight , sliceExecutor );
8990
9091 // Merge sort the results
9192 TopDocs topK = TopDocs .merge (k , perLeafResults );
@@ -109,27 +110,40 @@ private TopDocs[] sequentialSearch(
109110 }
110111
111112 private TopDocs [] parallelSearch (
112- List <LeafReaderContext > leafReaderContexts , Weight filterWeight , Executor executor ) {
113- List <FutureTask <TopDocs >> tasks =
114- leafReaderContexts .stream ()
115- .map (ctx -> new FutureTask <>(() -> searchLeaf (ctx , filterWeight )))
116- .toList ();
113+ IndexSearcher .LeafSlice [] slices , Weight filterWeight , SliceExecutor sliceExecutor ) {
114+
115+ List <FutureTask <TopDocs []>> tasks = new ArrayList <>(slices .length );
116+ int segmentsCount = 0 ;
117+ for (IndexSearcher .LeafSlice slice : slices ) {
118+ segmentsCount += slice .leaves .length ;
119+ tasks .add (
120+ new FutureTask <>(
121+ () -> {
122+ TopDocs [] results = new TopDocs [slice .leaves .length ];
123+ int i = 0 ;
124+ for (LeafReaderContext context : slice .leaves ) {
125+ results [i ++] = searchLeaf (context , filterWeight );
126+ }
127+ return results ;
128+ }));
129+ }
117130
118- SliceExecutor sliceExecutor = new SliceExecutor (executor );
119131 sliceExecutor .invokeAll (tasks );
120132
121- return tasks .stream ()
122- .map (
123- task -> {
124- try {
125- return task .get ();
126- } catch (ExecutionException e ) {
127- throw new RuntimeException (e .getCause ());
128- } catch (InterruptedException e ) {
129- throw new ThreadInterruptedException (e );
130- }
131- })
132- .toArray (TopDocs []::new );
133+ TopDocs [] topDocs = new TopDocs [segmentsCount ];
134+ int i = 0 ;
135+ for (FutureTask <TopDocs []> task : tasks ) {
136+ try {
137+ for (TopDocs docs : task .get ()) {
138+ topDocs [i ++] = docs ;
139+ }
140+ } catch (ExecutionException e ) {
141+ throw new RuntimeException (e .getCause ());
142+ } catch (InterruptedException e ) {
143+ throw new ThreadInterruptedException (e );
144+ }
145+ }
146+ return topDocs ;
133147 }
134148
135149 private TopDocs searchLeaf (LeafReaderContext ctx , Weight filterWeight ) throws IOException {
0 commit comments