Skip to content

Commit ab63a17

Browse files
committed
initial commit
1 parent 24d4bec commit ab63a17

4 files changed

Lines changed: 335 additions & 57 deletions

File tree

src/result_processor.c

Lines changed: 206 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -44,13 +44,23 @@ static int UnlockSpec_and_ReturnRPResult(RedisSearchCtx *sctx, int result_status
4444
RedisSearchCtx_UnlockSpec(sctx);
4545
return result_status;
4646
}
47+
48+
// Maximum number of concurrent async reads
49+
#define ASYNC_POOL_SIZE 16
50+
4751
typedef struct {
4852
ResultProcessor base;
4953
QueryIterator *iterator;
5054
RedisSearchCtx *sctx;
5155
uint32_t timeoutLimiter; // counter to limit number of calls to TimedOut_WithCounter()
5256
uint32_t keySpaceVersion; // version of the Keyspace slot ranges used for filtering
5357
const RedisModuleSlotRangeArray *querySlots; // Query slots info, may be used for filtering
58+
// Async pool for disk-based indexes (NULL if not using disk)
59+
void *asyncPool;
60+
// Buffer to hold ready DMDs from async poll
61+
RSDocumentMetadata *readyDmds;
62+
uint16_t readyDmdsCount;
63+
uint16_t readyDmdsIndex;
5464
} RPQueryIterator;
5565

5666

@@ -91,91 +101,221 @@ static bool getDocumentMetadata(IndexSpec* spec, DocTable* docs, RedisSearchCtx
91101
return true;
92102
}
93103

104+
// Helper to get DMD from async pool ready buffer and set it up as RSDocumentMetadata
105+
static const RSDocumentMetadata* popReadyDmd(RPQueryIterator *self) {
106+
if (self->readyDmdsIndex >= self->readyDmdsCount) {
107+
return NULL; // No more ready DMDs
108+
}
109+
110+
RSDocumentMetadata *srcDmd = &self->readyDmds[self->readyDmdsIndex++];
111+
112+
// Allocate a new DMD and copy data from the buffer
113+
RSDocumentMetadata *dmd = (RSDocumentMetadata *)rm_calloc(1, sizeof(RSDocumentMetadata));
114+
dmd->ref_count = 1;
115+
dmd->keyPtr = srcDmd->keyPtr; // Transfer ownership of keyPtr
116+
dmd->score = srcDmd->score;
117+
dmd->flags = srcDmd->flags;
118+
dmd->maxTermFreq = srcDmd->maxTermFreq;
119+
dmd->docLen = srcDmd->docLen;
120+
dmd->id = srcDmd->id;
121+
122+
// Clear the source so it doesn't get double-freed
123+
srcDmd->keyPtr = NULL;
124+
125+
return dmd;
126+
}
127+
128+
/**
129+
* Validate DMD against sharding/slot filters.
130+
* Returns true if DMD is valid, false if it should be skipped.
131+
*/
132+
static bool validateDmdSlot(const RPQueryIterator *self, const RSDocumentMetadata *dmd) {
133+
// Check trimming (sharding migration)
134+
if (isTrimming && RedisModule_ShardingGetKeySlot) {
135+
RedisModuleString *key = RedisModule_CreateString(NULL, dmd->keyPtr, sdslen(dmd->keyPtr));
136+
int slot = RedisModule_ShardingGetKeySlot(key);
137+
RedisModule_FreeString(NULL, key);
138+
int firstSlot, lastSlot;
139+
RedisModule_ShardingGetSlotRange(&firstSlot, &lastSlot);
140+
if (firstSlot > slot || lastSlot < slot) {
141+
return false;
142+
}
143+
}
144+
145+
// Check query slots (internal command filtering)
146+
if (self->querySlots && (__atomic_load_n(&key_space_version, __ATOMIC_RELAXED) != self->keySpaceVersion)) {
147+
int slot = RedisModule_ClusterKeySlotC(dmd->keyPtr, sdslen(dmd->keyPtr));
148+
if (!SlotRangeArray_ContainsSlot(self->querySlots, slot)) {
149+
return false;
150+
}
151+
}
152+
153+
return true;
154+
}
155+
156+
/**
157+
* Set the search result data from a DMD.
158+
*/
159+
static void setSearchResult(ResultProcessor *base, SearchResult *res, QueryIterator *it,
160+
const RSDocumentMetadata *dmd, t_docId docId) {
161+
base->parent->totalResults++;
162+
SearchResult_SetDocId(res, docId);
163+
SearchResult_SetIndexResult(res, it->current);
164+
SearchResult_SetScore(res, 0);
165+
SearchResult_SetDocumentMetadata(res, dmd);
166+
RLookupRow_SetSortingVector(SearchResult_GetRowDataMut(res), dmd->sortVector);
167+
}
168+
169+
/**
170+
* Handle initial spec lock and iterator revalidation.
171+
* Returns true if we should goto validate_current (VALIDATE_MOVED case).
172+
*/
173+
static bool handleSpecLockAndRevalidate(RPQueryIterator *self) {
174+
RedisSearchCtx *sctx = self->sctx;
175+
QueryIterator *it = self->iterator;
176+
177+
if (sctx->flags != RS_CTX_UNSET) {
178+
return false;
179+
}
180+
181+
RedisSearchCtx_LockSpecRead(sctx);
182+
ValidateStatus rc = it->Revalidate(it);
183+
184+
if (rc == VALIDATE_ABORTED) {
185+
self->iterator->Free(self->iterator);
186+
self->iterator = NewEmptyIterator();
187+
} else if (rc == VALIDATE_MOVED && !it->atEOF) {
188+
return true; // Caller should validate current
189+
}
190+
191+
return false;
192+
}
193+
94194
/* Next implementation */
95195
static int rpQueryItNext(ResultProcessor *base, SearchResult *res) {
96196
RPQueryIterator *self = (RPQueryIterator *)base;
97197
QueryIterator *it = self->iterator;
98198
RedisSearchCtx *sctx = self->sctx;
99-
DocTable* docs = &self->sctx->spec->docs;
100199
const RSDocumentMetadata *dmd;
101-
if (sctx->flags == RS_CTX_UNSET) {
102-
// If we need to read the iterators and we didn't lock the spec yet, lock it now
103-
// and reopen the keys in the concurrent search context (iterators' validation)
104-
RedisSearchCtx_LockSpecRead(sctx);
105-
ValidateStatus rc = it->Revalidate(it);
106-
if (rc == VALIDATE_ABORTED) {
107-
// The iterator is no longer valid, we should not use it.
108-
self->iterator->Free(self->iterator);
109-
it = self->iterator = NewEmptyIterator(); // Replace with a new empty iterator
110-
} else if (rc == VALIDATE_MOVED && !it->atEOF) {
111-
// The iterator is still valid, but the current result has changed, or we are at EOF.
112-
// If we are at EOF, we can enter the loop and let it handle it. (reading again should be safe)
113-
goto validate_current;
200+
bool validateCurrent = false;
201+
202+
// Handle spec lock and revalidation
203+
if (handleSpecLockAndRevalidate(self)) {
204+
validateCurrent = true;
205+
it = self->iterator; // May have changed
206+
}
207+
208+
// For disk indexes with async pool, use async read path
209+
if (self->asyncPool) {
210+
IndexSpec* spec = sctx->spec;
211+
212+
while (1) {
213+
if (TimedOut_WithCounter(&sctx->time.timeout, &self->timeoutLimiter) == TIMED_OUT) {
214+
return UnlockSpec_and_ReturnRPResult(sctx, RS_RESULT_TIMEDOUT);
215+
}
216+
217+
// Try to return a ready DMD first
218+
dmd = popReadyDmd(self);
219+
if (dmd) {
220+
if (!validateDmdSlot(self, dmd)) {
221+
DMD_Return(dmd);
222+
continue;
223+
}
224+
setSearchResult(base, res, it, dmd, dmd->id);
225+
return RS_RESULT_OK;
226+
}
227+
228+
// No ready DMDs - fill the pool with more doc IDs
229+
bool poolHasSpace = true;
230+
while (poolHasSpace && !it->atEOF) {
231+
IteratorStatus rc = it->Read(it);
232+
if (rc == ITERATOR_EOF) {
233+
break;
234+
} else if (rc == ITERATOR_TIMEOUT) {
235+
return UnlockSpec_and_ReturnRPResult(sctx, RS_RESULT_TIMEDOUT);
236+
}
237+
238+
t_docId docId = it->current->docId;
239+
// Skip deleted documents (in-memory check, no IO)
240+
if (SearchDisk_DocIdDeleted(spec->diskSpec, docId)) {
241+
continue;
242+
}
243+
244+
if (!SearchDisk_AddAsyncRead(self->asyncPool, docId)) {
245+
poolHasSpace = false;
246+
}
247+
}
248+
249+
// Poll for results
250+
int timeout_ms = it->atEOF ? 1000 : 0;
251+
AsyncPollResult pollResult = SearchDisk_PollAsyncReads(
252+
self->asyncPool, timeout_ms, self->readyDmds, ASYNC_POOL_SIZE);
253+
254+
self->readyDmdsCount = pollResult.ready_count;
255+
self->readyDmdsIndex = 0;
256+
257+
if (pollResult.ready_count > 0) {
258+
continue;
259+
}
260+
261+
if (pollResult.pending_count == 0 && it->atEOF) {
262+
return UnlockSpec_and_ReturnRPResult(sctx, RS_RESULT_EOF);
263+
}
114264
}
115265
}
116266

117-
// Read from the root filter until we have a valid result
267+
// Non-disk path (original implementation)
268+
DocTable* docs = &sctx->spec->docs;
269+
118270
while (1) {
119-
// check for timeout in case we are encountering a lot of deleted documents
120271
if (TimedOut_WithCounter(&sctx->time.timeout, &self->timeoutLimiter) == TIMED_OUT) {
121272
return UnlockSpec_and_ReturnRPResult(sctx, RS_RESULT_TIMEDOUT);
122273
}
123-
IteratorStatus rc = it->Read(it);
124-
switch (rc) {
125-
case ITERATOR_EOF:
126-
// This means we are done!
127-
return UnlockSpec_and_ReturnRPResult(sctx, RS_RESULT_EOF);
128-
case ITERATOR_TIMEOUT:
129-
return UnlockSpec_and_ReturnRPResult(sctx, RS_RESULT_TIMEDOUT);
130-
default:
131-
RS_ASSERT(rc == ITERATOR_OK);
274+
275+
if (!validateCurrent) {
276+
IteratorStatus rc = it->Read(it);
277+
switch (rc) {
278+
case ITERATOR_EOF:
279+
return UnlockSpec_and_ReturnRPResult(sctx, RS_RESULT_EOF);
280+
case ITERATOR_TIMEOUT:
281+
return UnlockSpec_and_ReturnRPResult(sctx, RS_RESULT_TIMEDOUT);
282+
default:
283+
RS_ASSERT(rc == ITERATOR_OK);
284+
}
132285
}
286+
validateCurrent = false;
133287

134-
validate_current:
135-
IndexSpec* spec = self->sctx->spec;
288+
IndexSpec* spec = sctx->spec;
136289
if (!getDocumentMetadata(spec, docs, sctx, it, &dmd)) {
137290
continue;
138291
}
139292

140-
if (isTrimming && RedisModule_ShardingGetKeySlot) {
141-
RedisModuleString *key = RedisModule_CreateString(NULL, dmd->keyPtr, sdslen(dmd->keyPtr));
142-
int slot = RedisModule_ShardingGetKeySlot(key);
143-
RedisModule_FreeString(NULL, key);
144-
int firstSlot, lastSlot;
145-
RedisModule_ShardingGetSlotRange(&firstSlot, &lastSlot);
146-
if (firstSlot > slot || lastSlot < slot) {
147-
DMD_Return(dmd);
148-
continue;
149-
}
150-
}
151-
// querySlots presence would indicate that is internal command, if querySlots is NULL, we don't need to filter as we would be in standalone.
152-
if (self->querySlots && (__atomic_load_n(&key_space_version, __ATOMIC_RELAXED) != self->keySpaceVersion)) {
153-
RS_ASSERT(self->querySlots != NULL);
154-
int slot = RedisModule_ClusterKeySlotC(dmd->keyPtr, sdslen(dmd->keyPtr));
155-
if (!SlotRangeArray_ContainsSlot(self->querySlots, slot)) {
156-
DMD_Return(dmd);
157-
continue;
158-
}
293+
if (!validateDmdSlot(self, dmd)) {
294+
DMD_Return(dmd);
295+
continue;
159296
}
160297

161-
// Increment the total results barring deleted results
162-
base->parent->totalResults++;
163-
break;
298+
setSearchResult(base, res, it, dmd, it->lastDocId);
299+
return RS_RESULT_OK;
164300
}
165-
166-
// set the result data
167-
SearchResult_SetDocId(res, it->lastDocId);
168-
SearchResult_SetIndexResult(res, it->current);
169-
SearchResult_SetScore(res, 0);
170-
SearchResult_SetDocumentMetadata(res, dmd);
171-
RLookupRow_SetSortingVector(SearchResult_GetRowDataMut(res), dmd->sortVector);
172-
return RS_RESULT_OK;
173301
}
174302

175303
static void rpQueryItFree(ResultProcessor *iter) {
176304
RPQueryIterator *self = (RPQueryIterator *)iter;
177305
self->iterator->Free(self->iterator);
178306
rm_free((void *)self->querySlots);
307+
if (self->asyncPool) {
308+
SearchDisk_FreeAsyncReadPool(self->asyncPool);
309+
}
310+
if (self->readyDmds) {
311+
// Free any remaining DMDs that weren't consumed
312+
for (uint16_t i = self->readyDmdsIndex; i < self->readyDmdsCount; i++) {
313+
if (self->readyDmds[i].keyPtr) {
314+
sdsfree(self->readyDmds[i].keyPtr);
315+
}
316+
}
317+
rm_free(self->readyDmds);
318+
}
179319
rm_free(iter);
180320
}
181321

@@ -189,6 +329,15 @@ ResultProcessor *RPQueryIterator_New(QueryIterator *root, const RedisModuleSlotR
189329
ret->base.Free = rpQueryItFree;
190330
ret->sctx = sctx;
191331
ret->base.type = RP_INDEX;
332+
333+
// Initialize async pool for disk-based indexes if async IO is supported
334+
if (sctx->spec->diskSpec && SearchDisk_IsAsyncIOSupported()) {
335+
ret->asyncPool = SearchDisk_CreateAsyncReadPool(sctx->spec->diskSpec, ASYNC_POOL_SIZE);
336+
if (ret->asyncPool) {
337+
ret->readyDmds = rm_calloc(ASYNC_POOL_SIZE, sizeof(RSDocumentMetadata));
338+
}
339+
}
340+
192341
return &ret->base;
193342
}
194343

src/search_disk.c

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,33 @@ size_t SearchDisk_GetDeletedIds(RedisSearchDiskIndexSpec *handle, t_docId *buffe
120120
return disk->docTable.getDeletedIds(handle, buffer, buffer_size);
121121
}
122122

123+
void* SearchDisk_CreateAsyncReadPool(RedisSearchDiskIndexSpec *handle, uint16_t max_concurrent) {
124+
RS_ASSERT(disk && handle);
125+
return disk->docTable.createAsyncReadPool(handle, max_concurrent);
126+
}
127+
128+
bool SearchDisk_AddAsyncRead(void *pool, t_docId docId) {
129+
RS_ASSERT(disk && pool);
130+
return disk->docTable.addAsyncRead(pool, docId);
131+
}
132+
133+
AsyncPollResult SearchDisk_PollAsyncReads(void *pool, int timeout_ms, RSDocumentMetadata *results, uint16_t results_capacity) {
134+
RS_ASSERT(disk && pool);
135+
return disk->docTable.pollAsyncReads(pool, timeout_ms, results, results_capacity, &sdsnewlen);
136+
}
137+
138+
void SearchDisk_FreeAsyncReadPool(void *pool) {
139+
RS_ASSERT(disk);
140+
if (pool) {
141+
disk->docTable.freeAsyncReadPool(pool);
142+
}
143+
}
144+
145+
bool SearchDisk_IsAsyncIOSupported(void) {
146+
RS_ASSERT(disk);
147+
return disk->basic.isAsyncIOSupported();
148+
}
149+
123150
void SearchDisk_DeleteDocument(RedisSearchDiskIndexSpec *handle, const char *key, size_t keyLen, uint32_t *oldLen, t_docId *id) {
124151
RS_ASSERT(disk && handle);
125152
disk->index.deleteDocument(handle, key, keyLen, oldLen, id);

0 commit comments

Comments
 (0)