Skip to content

Commit 9f2f1bb

Browse files
authored
MOD-14268: Fix coordinator deadlock under mixed FT.SEARCH + FT.AGGREGATE load (#8774)
* Move MRCtx_RequestCompleted(ctx) to the libuv fanout-completion paths
* fanoutCallback: use IORuntimeCtx for request completion
* Enhance MRCtx reference counting and memory management
* Remove MRCtx_RequestCompleted()
* Test for mixed search and aggregate burst without deadlock
* searchResultReducer: improve blocked client handling and cleanup logic
* Fix RETURN-STRICT partial-timeout crash when reducer exits before req->rctx init
* Refactor pause constants in coordinator reduce logic
* Align DistSearchFreePrivData to freePrivDataCB
* Enhance debug command documentation and improve test assertions for burst queries
* Improve searchResultReducer to handle timeouts during reply processing
* Refactor searchResultReducer and DistSearchMRCtxFreePrivData to ensure proper memory management for cached results; enhance timeout handling in tests to validate shard document counts.
* Ensure postProcess is called on early exits in searchResultReducer
* Simplify cached result cleanup in search reducer and DistSearchMRCtxFreePrivData
1 parent eac6943 commit 9f2f1bb

8 files changed

Lines changed: 500 additions & 99 deletions

File tree

src/coord/rmr/rmr.c

Lines changed: 58 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,9 @@
4040
#include "asm_state_machine.h"
4141

4242
#define REFCOUNT_INCR_MSG(caller, refcount) \
43-
RS_DEBUG_LOG_FMT("%s: increased refCount to == %d", caller, refcount);
43+
RS_DEBUG_LOG_FMT("%s: increased refCount to == %d", caller, refcount)
4444
#define REFCOUNT_DECR_MSG(caller, refcount) \
45-
RS_DEBUG_LOG_FMT("%s: decreased refCount to == %d", caller, refcount);
45+
RS_DEBUG_LOG_FMT("%s: decreased refCount to == %d", caller, refcount)
4646

4747
#define CEIL_DIV(a, b) ((a + b - 1) / b)
4848

@@ -61,6 +61,7 @@ long long timeout_g = 5000; // unused value. will be set in MR_Init
6161

6262
/* MapReduce context for a specific command's execution */
6363
typedef struct MRCtx {
64+
_Atomic(int) refcount;
6465
int numReplied;
6566
int numExpected;
6667
int numErrored;
@@ -89,6 +90,7 @@ typedef struct MRCtx {
8990
_Atomic(bool) timedOut;
9091
_Atomic(bool) reducing;
9192
bool reducerDone;
93+
MRCtxFreePrivDataCB freePrivDataCB;
9294
pthread_mutex_t reducingLock;
9395
pthread_cond_t reducingCond;
9496
} MRCtx;
@@ -103,6 +105,7 @@ typedef struct {
103105
MRCtx *MR_CreateCtx(RedisModuleCtx *ctx, RedisModuleBlockedClient *bc, void *privdata, int replyCap) {
104106
RS_ASSERT(cluster_g);
105107
MRCtx *ret = rm_calloc(1, sizeof(MRCtx));
108+
atomic_init(&ret->refcount, 1);
106109
ret->numReplied = 0;
107110
ret->numErrored = 0;
108111
ret->numExpected = 0;
@@ -120,6 +123,7 @@ MRCtx *MR_CreateCtx(RedisModuleCtx *ctx, RedisModuleBlockedClient *bc, void *pri
120123
atomic_init(&ret->timedOut, false);
121124
atomic_init(&ret->reducing, false);
122125
ret->reducerDone = false;
126+
ret->freePrivDataCB = NULL;
123127
pthread_mutex_init(&ret->reducingLock, NULL);
124128
pthread_cond_init(&ret->reducingCond, NULL);
125129

@@ -130,7 +134,14 @@ QueryError *MRCtx_GetStatus(MRCtx *ctx) {
130134
return &ctx->status;
131135
}
132136

133-
void MRCtx_Free(MRCtx *ctx) {
137+
void MRCtx_SetFreePrivDataCB(MRCtx *ctx, MRCtxFreePrivDataCB cb) {
138+
ctx->freePrivDataCB = cb;
139+
}
140+
141+
static void MRCtx_FreeInternal(MRCtx *ctx) {
142+
if (ctx->freePrivDataCB) {
143+
ctx->freePrivDataCB(ctx);
144+
}
134145

135146
MRCommand_Free(&ctx->cmd);
136147
QueryError_ClearError(&ctx->status);
@@ -151,6 +162,22 @@ void MRCtx_Free(MRCtx *ctx) {
151162
rm_free(ctx);
152163
}
153164

165+
void MRCtx_IncrRef(MRCtx *ctx) {
166+
int refcount = atomic_fetch_add(&ctx->refcount, 1) + 1;
167+
REFCOUNT_INCR_MSG("MRCtx_IncrRef", refcount);
168+
}
169+
170+
void MRCtx_DecrRef(MRCtx *ctx) {
171+
int prev_refcount = atomic_fetch_sub(&ctx->refcount, 1);
172+
RS_ASSERT(prev_refcount > 0);
173+
174+
int refcount = prev_refcount - 1;
175+
REFCOUNT_DECR_MSG("MRCtx_DecrRef", refcount);
176+
if (refcount == 0) {
177+
MRCtx_FreeInternal(ctx);
178+
}
179+
}
180+
154181
/* Get the user stored private data from the context */
155182
void *MRCtx_GetPrivData(struct MRCtx *ctx) {
156183
return ctx->privdata;
@@ -160,10 +187,6 @@ int MRCtx_GetNumReplied(struct MRCtx *ctx) {
160187
return ctx->numReplied;
161188
}
162189

163-
void MRCtx_RequestCompleted(struct MRCtx *ctx) {
164-
IORuntimeCtx_RequestCompleted(ctx->ioRuntime);
165-
}
166-
167190
MRReply** MRCtx_GetReplies(struct MRCtx *ctx) {
168191
return ctx->replies;
169192
}
@@ -217,10 +240,11 @@ void MRCtx_WaitForReducerComplete(struct MRCtx *ctx) {
217240
}
218241

219242
static void freePrivDataCB(RedisModuleCtx *ctx, void *p) {
243+
UNUSED(ctx);
220244
if (p) {
221245
MRCtx *mc = p;
222-
IORuntimeCtx_RequestCompleted(mc->ioRuntime);
223-
MRCtx_Free(mc);
246+
/* RQ completion is owned by the libuv fanout-completion paths. */
247+
MRCtx_DecrRef(mc);
224248
}
225249
}
226250

@@ -242,6 +266,7 @@ static int unblockHandler(RedisModuleCtx *ctx, RedisModuleString **argv, int arg
242266
/* The callback called from each fanout request to aggregate their replies */
243267
static void fanoutCallback(redisAsyncContext *c, void *r, void *privdata) {
244268
MRCtx *ctx = privdata;
269+
IORuntimeCtx *ioRuntime = ctx->ioRuntime;
245270

246271
// Check if timed out - discard reply.
247272
// Currently, timeout checks are relevant only for Coordinator FT.SEARCH fanouts.
@@ -262,16 +287,25 @@ static void fanoutCallback(redisAsyncContext *c, void *r, void *privdata) {
262287
ctx->replies[ctx->numReplied++] = r;
263288
}
264289

265-
// If we've received the last reply - unblock the client
290+
// If we've received the last reply, the fanout/network phase is complete.
291+
// Release the RQ slot here: before unblocking the client, or right after `ctx->fn` returns on the reducer path.
266292
if (ctx->numReplied + ctx->numErrored == ctx->numExpected) {
267293
if (!timedOut && ctx->fn) {
268294
ctx->fn(ctx, ctx->numReplied, ctx->replies);
295+
// `ctx->fn` may hand off to an async reducer that can unblock and free `ctx`
296+
// before this libuv callback is scheduled again. Complete the RQ request via
297+
// the saved ioRuntime instead of reading more state from `ctx` after the handoff.
298+
IORuntimeCtx_RequestCompleted(ioRuntime);
269299
} else {
270-
RedisModuleBlockedClient *bc = ctx->bc;
271-
RS_ASSERT(bc);
272-
RedisModule_BlockedClientMeasureTimeEnd(bc);
273-
RedisModule_UnblockClient(bc, ctx);
300+
IORuntimeCtx_RequestCompleted(ioRuntime);
301+
if (!timedOut) {
302+
RedisModuleBlockedClient *bc = ctx->bc;
303+
RS_ASSERT(bc);
304+
RedisModule_BlockedClientMeasureTimeEnd(bc);
305+
RedisModule_UnblockClient(bc, ctx);
306+
}
274307
}
308+
MRCtx_DecrRef(ctx);
275309
}
276310
}
277311

@@ -289,10 +323,15 @@ static void uvFanoutRequest(void *p) {
289323
mrctx->numExpected = MRCluster_FanoutCommand(ioRuntime, &mrctx->cmd, fanoutCallback, mrctx);
290324

291325
if (mrctx->numExpected == 0) {
292-
RedisModuleBlockedClient *bc = mrctx->bc;
293-
RS_ASSERT(bc);
294-
RedisModule_BlockedClientMeasureTimeEnd(bc);
295-
RedisModule_UnblockClient(bc, mrctx);
326+
// No shard command was sent, so fanoutCallback() will never fire.
327+
IORuntimeCtx_RequestCompleted(ioRuntime);
328+
if (!MRCtx_IsTimedOut(mrctx)) {
329+
RedisModuleBlockedClient *bc = mrctx->bc;
330+
RS_ASSERT(bc);
331+
RedisModule_BlockedClientMeasureTimeEnd(bc);
332+
RedisModule_UnblockClient(bc, mrctx);
333+
}
334+
MRCtx_DecrRef(mrctx);
296335
}
297336
}
298337

@@ -309,7 +348,7 @@ int MR_Fanout(struct MRCtx *mrctx, MRReduceFunc reducer, MRCommand cmd, bool blo
309348
mrctx->reducer = reducer;
310349
mrctx->cmd = cmd;
311350

312-
351+
MRCtx_IncrRef(mrctx);
313352
IORuntimeCtx_Schedule(mrctx->ioRuntime, uvFanoutRequest, mrctx);
314353
return REDIS_OK;
315354
}

src/coord/rmr/rmr.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ void iterCursorMappingCb(void *p);
3333

3434
/* Prototype for all reduce functions */
3535
typedef int (*MRReduceFunc)(struct MRCtx *ctx, int count, MRReply **replies);
36+
typedef void (*MRCtxFreePrivDataCB)(struct MRCtx *ctx);
3637

3738
/* Fanout map - send the same command to all the shards, sending the collective
3839
* reply to the reducer callback */
@@ -85,14 +86,16 @@ void *MRCtx_GetPrivData(struct MRCtx *ctx);
8586

8687
struct RedisModuleCtx *MRCtx_GetRedisCtx(struct MRCtx *ctx);
8788
int MRCtx_GetNumReplied(struct MRCtx *ctx);
88-
void MRCtx_RequestCompleted(struct MRCtx *ctx);
8989
MRReply** MRCtx_GetReplies(struct MRCtx *ctx);
9090
RedisModuleBlockedClient *MRCtx_GetBlockedClient(struct MRCtx *ctx);
9191
void MRCtx_SetReduceFunction(struct MRCtx *ctx, MRReduceFunc fn);
9292

9393
int MRCtx_GetCommandProtocol(struct MRCtx *ctx);
9494

9595
QueryError *MRCtx_GetStatus(struct MRCtx *ctx);
96+
void MRCtx_IncrRef(struct MRCtx *ctx);
97+
void MRCtx_DecrRef(struct MRCtx *ctx);
98+
void MRCtx_SetFreePrivDataCB(struct MRCtx *ctx, MRCtxFreePrivDataCB cb);
9699

97100
/* Set the blocked client for the context (used when MRCtx is created before blocking) */
98101
void MRCtx_SetBlockedClient(struct MRCtx *ctx, RedisModuleBlockedClient *bc);
@@ -104,9 +107,6 @@ bool MRCtx_TryClaimReducing(struct MRCtx *ctx);
104107
void MRCtx_SignalReducerComplete(struct MRCtx *ctx);
105108
void MRCtx_WaitForReducerComplete(struct MRCtx *ctx);
106109

107-
/* Free the MapReduce context */
108-
void MRCtx_Free(struct MRCtx *ctx);
109-
110110
/* Create a new MapReduce context with the given private data. In a redis module
111111
* this should be the RedisModuleCtx */
112112
struct MRCtx *MR_CreateCtx(struct RedisModuleCtx *ctx, struct RedisModuleBlockedClient *bc, void *privdata, int replyCap);

src/debug_commands.c

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2193,8 +2193,12 @@ DEBUG_COMMAND(getIsRPPaused) {
21932193
#ifdef ENABLE_ASSERT
21942194
/**
21952195
* FT.DEBUG QUERY_CONTROLLER SET_PAUSE_BEFORE_REDUCE <N>
2196-
* N=0: no pause
2197-
* N=-1: pause after the last result is reduced
2196+
* COORD_REDUCE_NO_PAUSE (0): no pause
2197+
* COORD_REDUCE_PAUSE_BEFORE_REDUCER_INIT (-2): pause after acquiring the
2198+
* REDUCING state but before reducer context setup (used to test the
2199+
* edge case where the background reducer starts, but a timeout fires
2200+
* before it can finish setting up req->rctx)
2201+
* COORD_REDUCE_PAUSE_AFTER_LAST_RESULT (-1): pause after the last result is reduced
21982202
* N>0: pause before the Nth result is reduced (1-based)
21992203
*/
22002204
DEBUG_COMMAND(setPauseBeforeReduce) {

src/debug_commands.h

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,11 +61,17 @@ void QueryDebugCtx_SetDebugRP(ResultProcessor* debugRP);
6161
bool QueryDebugCtx_HasDebugRP(void);
6262

6363
#ifdef ENABLE_ASSERT
64+
// Named sentinel values for the pauseBeforeN field of CoordReduceDebugCtx
65+
#define COORD_REDUCE_NO_PAUSE 0 // Disable pause (no pause point set)
66+
#define COORD_REDUCE_PAUSE_AFTER_LAST_RESULT (-1) // Pause after the last result is reduced
67+
#define COORD_REDUCE_PAUSE_BEFORE_REDUCER_INIT (-2) // Pause after claiming reducing but before reducer context init
68+
6469
// Struct used for debugging coordinator reduction (pause mid-reduce)
6570
// Only available in debug builds to avoid affecting release performance
6671
typedef struct CoordReduceDebugCtx {
6772
atomic_bool pause; // Atomic bool to wait for the resume command
68-
atomic_int pauseBeforeN; // N value: 0=no pause, -1=pause after last, N>0=pause before Nth result
73+
atomic_int pauseBeforeN; // COORD_REDUCE_NO_PAUSE, COORD_REDUCE_PAUSE_BEFORE_REDUCER_INIT,
74+
// COORD_REDUCE_PAUSE_AFTER_LAST_RESULT, or N>0 to pause before the Nth result
6975
atomic_int reduceCount; // Counter of results reduced so far
7076
} CoordReduceDebugCtx;
7177

0 commit comments

Comments
 (0)