@@ -44,13 +44,23 @@ static int UnlockSpec_and_ReturnRPResult(RedisSearchCtx *sctx, int result_status
4444 RedisSearchCtx_UnlockSpec (sctx );
4545 return result_status ;
4646}
47+
// Capacity of the async read pool: used as the pool size when the pool is
// created, as the length of the ready-DMD buffer, and as the maximum number of
// results requested from a single poll.
#define ASYNC_POOL_SIZE 16
50+
4751typedef struct {
4852 ResultProcessor base ;
4953 QueryIterator * iterator ;
5054 RedisSearchCtx * sctx ;
5155 uint32_t timeoutLimiter ; // counter to limit number of calls to TimedOut_WithCounter()
5256 uint32_t keySpaceVersion ; // version of the Keyspace slot ranges used for filtering
5357 const RedisModuleSlotRangeArray * querySlots ; // Query slots info, may be used for filtering
58+ // Async pool for disk-based indexes (NULL if not using disk)
59+ void * asyncPool ;
60+ // Buffer to hold ready DMDs from async poll
61+ RSDocumentMetadata * readyDmds ;
62+ uint16_t readyDmdsCount ;
63+ uint16_t readyDmdsIndex ;
5464} RPQueryIterator ;
5565
5666
@@ -91,91 +101,221 @@ static bool getDocumentMetadata(IndexSpec* spec, DocTable* docs, RedisSearchCtx
91101 return true;
92102}
93103
104+ // Helper to get DMD from async pool ready buffer and set it up as RSDocumentMetadata
105+ static const RSDocumentMetadata * popReadyDmd (RPQueryIterator * self ) {
106+ if (self -> readyDmdsIndex >= self -> readyDmdsCount ) {
107+ return NULL ; // No more ready DMDs
108+ }
109+
110+ RSDocumentMetadata * srcDmd = & self -> readyDmds [self -> readyDmdsIndex ++ ];
111+
112+ // Allocate a new DMD and copy data from the buffer
113+ RSDocumentMetadata * dmd = (RSDocumentMetadata * )rm_calloc (1 , sizeof (RSDocumentMetadata ));
114+ dmd -> ref_count = 1 ;
115+ dmd -> keyPtr = srcDmd -> keyPtr ; // Transfer ownership of keyPtr
116+ dmd -> score = srcDmd -> score ;
117+ dmd -> flags = srcDmd -> flags ;
118+ dmd -> maxTermFreq = srcDmd -> maxTermFreq ;
119+ dmd -> docLen = srcDmd -> docLen ;
120+ dmd -> id = srcDmd -> id ;
121+
122+ // Clear the source so it doesn't get double-freed
123+ srcDmd -> keyPtr = NULL ;
124+
125+ return dmd ;
126+ }
127+
128+ /**
129+ * Validate DMD against sharding/slot filters.
130+ * Returns true if DMD is valid, false if it should be skipped.
131+ */
132+ static bool validateDmdSlot (const RPQueryIterator * self , const RSDocumentMetadata * dmd ) {
133+ // Check trimming (sharding migration)
134+ if (isTrimming && RedisModule_ShardingGetKeySlot ) {
135+ RedisModuleString * key = RedisModule_CreateString (NULL , dmd -> keyPtr , sdslen (dmd -> keyPtr ));
136+ int slot = RedisModule_ShardingGetKeySlot (key );
137+ RedisModule_FreeString (NULL , key );
138+ int firstSlot , lastSlot ;
139+ RedisModule_ShardingGetSlotRange (& firstSlot , & lastSlot );
140+ if (firstSlot > slot || lastSlot < slot ) {
141+ return false;
142+ }
143+ }
144+
145+ // Check query slots (internal command filtering)
146+ if (self -> querySlots && (__atomic_load_n (& key_space_version , __ATOMIC_RELAXED ) != self -> keySpaceVersion )) {
147+ int slot = RedisModule_ClusterKeySlotC (dmd -> keyPtr , sdslen (dmd -> keyPtr ));
148+ if (!SlotRangeArray_ContainsSlot (self -> querySlots , slot )) {
149+ return false;
150+ }
151+ }
152+
153+ return true;
154+ }
155+
156+ /**
157+ * Set the search result data from a DMD.
158+ */
159+ static void setSearchResult (ResultProcessor * base , SearchResult * res , QueryIterator * it ,
160+ const RSDocumentMetadata * dmd , t_docId docId ) {
161+ base -> parent -> totalResults ++ ;
162+ SearchResult_SetDocId (res , docId );
163+ SearchResult_SetIndexResult (res , it -> current );
164+ SearchResult_SetScore (res , 0 );
165+ SearchResult_SetDocumentMetadata (res , dmd );
166+ RLookupRow_SetSortingVector (SearchResult_GetRowDataMut (res ), dmd -> sortVector );
167+ }
168+
169+ /**
170+ * Handle initial spec lock and iterator revalidation.
171+ * Returns true if we should goto validate_current (VALIDATE_MOVED case).
172+ */
173+ static bool handleSpecLockAndRevalidate (RPQueryIterator * self ) {
174+ RedisSearchCtx * sctx = self -> sctx ;
175+ QueryIterator * it = self -> iterator ;
176+
177+ if (sctx -> flags != RS_CTX_UNSET ) {
178+ return false;
179+ }
180+
181+ RedisSearchCtx_LockSpecRead (sctx );
182+ ValidateStatus rc = it -> Revalidate (it );
183+
184+ if (rc == VALIDATE_ABORTED ) {
185+ self -> iterator -> Free (self -> iterator );
186+ self -> iterator = NewEmptyIterator ();
187+ } else if (rc == VALIDATE_MOVED && !it -> atEOF ) {
188+ return true; // Caller should validate current
189+ }
190+
191+ return false;
192+ }
193+
94194/* Next implementation */
95195static int rpQueryItNext (ResultProcessor * base , SearchResult * res ) {
96196 RPQueryIterator * self = (RPQueryIterator * )base ;
97197 QueryIterator * it = self -> iterator ;
98198 RedisSearchCtx * sctx = self -> sctx ;
99- DocTable * docs = & self -> sctx -> spec -> docs ;
100199 const RSDocumentMetadata * dmd ;
101- if (sctx -> flags == RS_CTX_UNSET ) {
102- // If we need to read the iterators and we didn't lock the spec yet, lock it now
103- // and reopen the keys in the concurrent search context (iterators' validation)
104- RedisSearchCtx_LockSpecRead (sctx );
105- ValidateStatus rc = it -> Revalidate (it );
106- if (rc == VALIDATE_ABORTED ) {
107- // The iterator is no longer valid, we should not use it.
108- self -> iterator -> Free (self -> iterator );
109- it = self -> iterator = NewEmptyIterator (); // Replace with a new empty iterator
110- } else if (rc == VALIDATE_MOVED && !it -> atEOF ) {
111- // The iterator is still valid, but the current result has changed, or we are at EOF.
112- // If we are at EOF, we can enter the loop and let it handle it. (reading again should be safe)
113- goto validate_current ;
200+ bool validateCurrent = false;
201+
202+ // Handle spec lock and revalidation
203+ if (handleSpecLockAndRevalidate (self )) {
204+ validateCurrent = true;
205+ it = self -> iterator ; // May have changed
206+ }
207+
208+ // For disk indexes with async pool, use async read path
209+ if (self -> asyncPool ) {
210+ IndexSpec * spec = sctx -> spec ;
211+
212+ while (1 ) {
213+ if (TimedOut_WithCounter (& sctx -> time .timeout , & self -> timeoutLimiter ) == TIMED_OUT ) {
214+ return UnlockSpec_and_ReturnRPResult (sctx , RS_RESULT_TIMEDOUT );
215+ }
216+
217+ // Try to return a ready DMD first
218+ dmd = popReadyDmd (self );
219+ if (dmd ) {
220+ if (!validateDmdSlot (self , dmd )) {
221+ DMD_Return (dmd );
222+ continue ;
223+ }
224+ setSearchResult (base , res , it , dmd , dmd -> id );
225+ return RS_RESULT_OK ;
226+ }
227+
228+ // No ready DMDs - fill the pool with more doc IDs
229+ bool poolHasSpace = true;
230+ while (poolHasSpace && !it -> atEOF ) {
231+ IteratorStatus rc = it -> Read (it );
232+ if (rc == ITERATOR_EOF ) {
233+ break ;
234+ } else if (rc == ITERATOR_TIMEOUT ) {
235+ return UnlockSpec_and_ReturnRPResult (sctx , RS_RESULT_TIMEDOUT );
236+ }
237+
238+ t_docId docId = it -> current -> docId ;
239+ // Skip deleted documents (in-memory check, no IO)
240+ if (SearchDisk_DocIdDeleted (spec -> diskSpec , docId )) {
241+ continue ;
242+ }
243+
244+ if (!SearchDisk_AddAsyncRead (self -> asyncPool , docId )) {
245+ poolHasSpace = false;
246+ }
247+ }
248+
249+ // Poll for results
250+ int timeout_ms = it -> atEOF ? 1000 : 0 ;
251+ AsyncPollResult pollResult = SearchDisk_PollAsyncReads (
252+ self -> asyncPool , timeout_ms , self -> readyDmds , ASYNC_POOL_SIZE );
253+
254+ self -> readyDmdsCount = pollResult .ready_count ;
255+ self -> readyDmdsIndex = 0 ;
256+
257+ if (pollResult .ready_count > 0 ) {
258+ continue ;
259+ }
260+
261+ if (pollResult .pending_count == 0 && it -> atEOF ) {
262+ return UnlockSpec_and_ReturnRPResult (sctx , RS_RESULT_EOF );
263+ }
114264 }
115265 }
116266
117- // Read from the root filter until we have a valid result
267+ // Non-disk path (original implementation)
268+ DocTable * docs = & sctx -> spec -> docs ;
269+
118270 while (1 ) {
119- // check for timeout in case we are encountering a lot of deleted documents
120271 if (TimedOut_WithCounter (& sctx -> time .timeout , & self -> timeoutLimiter ) == TIMED_OUT ) {
121272 return UnlockSpec_and_ReturnRPResult (sctx , RS_RESULT_TIMEDOUT );
122273 }
123- IteratorStatus rc = it -> Read (it );
124- switch (rc ) {
125- case ITERATOR_EOF :
126- // This means we are done!
127- return UnlockSpec_and_ReturnRPResult (sctx , RS_RESULT_EOF );
128- case ITERATOR_TIMEOUT :
129- return UnlockSpec_and_ReturnRPResult (sctx , RS_RESULT_TIMEDOUT );
130- default :
131- RS_ASSERT (rc == ITERATOR_OK );
274+
275+ if (!validateCurrent ) {
276+ IteratorStatus rc = it -> Read (it );
277+ switch (rc ) {
278+ case ITERATOR_EOF :
279+ return UnlockSpec_and_ReturnRPResult (sctx , RS_RESULT_EOF );
280+ case ITERATOR_TIMEOUT :
281+ return UnlockSpec_and_ReturnRPResult (sctx , RS_RESULT_TIMEDOUT );
282+ default :
283+ RS_ASSERT (rc == ITERATOR_OK );
284+ }
132285 }
286+ validateCurrent = false;
133287
134- validate_current :
135- IndexSpec * spec = self -> sctx -> spec ;
288+ IndexSpec * spec = sctx -> spec ;
136289 if (!getDocumentMetadata (spec , docs , sctx , it , & dmd )) {
137290 continue ;
138291 }
139292
140- if (isTrimming && RedisModule_ShardingGetKeySlot ) {
141- RedisModuleString * key = RedisModule_CreateString (NULL , dmd -> keyPtr , sdslen (dmd -> keyPtr ));
142- int slot = RedisModule_ShardingGetKeySlot (key );
143- RedisModule_FreeString (NULL , key );
144- int firstSlot , lastSlot ;
145- RedisModule_ShardingGetSlotRange (& firstSlot , & lastSlot );
146- if (firstSlot > slot || lastSlot < slot ) {
147- DMD_Return (dmd );
148- continue ;
149- }
150- }
151- // querySlots presence would indicate that is internal command, if querySlots is NULL, we don't need to filter as we would be in standalone.
152- if (self -> querySlots && (__atomic_load_n (& key_space_version , __ATOMIC_RELAXED ) != self -> keySpaceVersion )) {
153- RS_ASSERT (self -> querySlots != NULL );
154- int slot = RedisModule_ClusterKeySlotC (dmd -> keyPtr , sdslen (dmd -> keyPtr ));
155- if (!SlotRangeArray_ContainsSlot (self -> querySlots , slot )) {
156- DMD_Return (dmd );
157- continue ;
158- }
293+ if (!validateDmdSlot (self , dmd )) {
294+ DMD_Return (dmd );
295+ continue ;
159296 }
160297
161- // Increment the total results barring deleted results
162- base -> parent -> totalResults ++ ;
163- break ;
298+ setSearchResult (base , res , it , dmd , it -> lastDocId );
299+ return RS_RESULT_OK ;
164300 }
165-
166- // set the result data
167- SearchResult_SetDocId (res , it -> lastDocId );
168- SearchResult_SetIndexResult (res , it -> current );
169- SearchResult_SetScore (res , 0 );
170- SearchResult_SetDocumentMetadata (res , dmd );
171- RLookupRow_SetSortingVector (SearchResult_GetRowDataMut (res ), dmd -> sortVector );
172- return RS_RESULT_OK ;
173301}
174302
175303static void rpQueryItFree (ResultProcessor * iter ) {
176304 RPQueryIterator * self = (RPQueryIterator * )iter ;
177305 self -> iterator -> Free (self -> iterator );
178306 rm_free ((void * )self -> querySlots );
307+ if (self -> asyncPool ) {
308+ SearchDisk_FreeAsyncReadPool (self -> asyncPool );
309+ }
310+ if (self -> readyDmds ) {
311+ // Free any remaining DMDs that weren't consumed
312+ for (uint16_t i = self -> readyDmdsIndex ; i < self -> readyDmdsCount ; i ++ ) {
313+ if (self -> readyDmds [i ].keyPtr ) {
314+ sdsfree (self -> readyDmds [i ].keyPtr );
315+ }
316+ }
317+ rm_free (self -> readyDmds );
318+ }
179319 rm_free (iter );
180320}
181321
@@ -189,6 +329,15 @@ ResultProcessor *RPQueryIterator_New(QueryIterator *root, const RedisModuleSlotR
189329 ret -> base .Free = rpQueryItFree ;
190330 ret -> sctx = sctx ;
191331 ret -> base .type = RP_INDEX ;
332+
333+ // Initialize async pool for disk-based indexes if async IO is supported
334+ if (sctx -> spec -> diskSpec && SearchDisk_IsAsyncIOSupported ()) {
335+ ret -> asyncPool = SearchDisk_CreateAsyncReadPool (sctx -> spec -> diskSpec , ASYNC_POOL_SIZE );
336+ if (ret -> asyncPool ) {
337+ ret -> readyDmds = rm_calloc (ASYNC_POOL_SIZE , sizeof (RSDocumentMetadata ));
338+ }
339+ }
340+
192341 return & ret -> base ;
193342}
194343
0 commit comments