4040#include "asm_state_machine.h"
4141
4242#define REFCOUNT_INCR_MSG (caller , refcount ) \
43- RS_DEBUG_LOG_FMT("%s: increased refCount to == %d", caller, refcount);
43+ RS_DEBUG_LOG_FMT("%s: increased refCount to == %d", caller, refcount)
4444#define REFCOUNT_DECR_MSG (caller , refcount ) \
45- RS_DEBUG_LOG_FMT("%s: decreased refCount to == %d", caller, refcount);
45+ RS_DEBUG_LOG_FMT("%s: decreased refCount to == %d", caller, refcount)
4646
4747#define CEIL_DIV (a , b ) ((a + b - 1) / b)
4848
@@ -61,6 +61,7 @@ long long timeout_g = 5000; // unused value. will be set in MR_Init
6161
6262/* MapReduce context for a specific command's execution */
6363typedef struct MRCtx {
64+ _Atomic (int ) refcount ;
6465 int numReplied ;
6566 int numExpected ;
6667 int numErrored ;
@@ -89,6 +90,7 @@ typedef struct MRCtx {
8990 _Atomic (bool ) timedOut ;
9091 _Atomic (bool ) reducing ;
9192 bool reducerDone ;
93+ MRCtxFreePrivDataCB freePrivDataCB ;
9294 pthread_mutex_t reducingLock ;
9395 pthread_cond_t reducingCond ;
9496} MRCtx ;
@@ -103,6 +105,7 @@ typedef struct {
103105MRCtx * MR_CreateCtx (RedisModuleCtx * ctx , RedisModuleBlockedClient * bc , void * privdata , int replyCap ) {
104106 RS_ASSERT (cluster_g );
105107 MRCtx * ret = rm_calloc (1 , sizeof (MRCtx ));
108+ atomic_init (& ret -> refcount , 1 );
106109 ret -> numReplied = 0 ;
107110 ret -> numErrored = 0 ;
108111 ret -> numExpected = 0 ;
@@ -120,6 +123,7 @@ MRCtx *MR_CreateCtx(RedisModuleCtx *ctx, RedisModuleBlockedClient *bc, void *pri
120123 atomic_init (& ret -> timedOut , false);
121124 atomic_init (& ret -> reducing , false);
122125 ret -> reducerDone = false;
126+ ret -> freePrivDataCB = NULL ;
123127 pthread_mutex_init (& ret -> reducingLock , NULL );
124128 pthread_cond_init (& ret -> reducingCond , NULL );
125129
@@ -130,7 +134,14 @@ QueryError *MRCtx_GetStatus(MRCtx *ctx) {
130134 return & ctx -> status ;
131135}
132136
133- void MRCtx_Free (MRCtx * ctx ) {
137+ void MRCtx_SetFreePrivDataCB (MRCtx * ctx , MRCtxFreePrivDataCB cb ) {
138+ ctx -> freePrivDataCB = cb ;
139+ }
140+
141+ static void MRCtx_FreeInternal (MRCtx * ctx ) {
142+ if (ctx -> freePrivDataCB ) {
143+ ctx -> freePrivDataCB (ctx );
144+ }
134145
135146 MRCommand_Free (& ctx -> cmd );
136147 QueryError_ClearError (& ctx -> status );
@@ -151,6 +162,22 @@ void MRCtx_Free(MRCtx *ctx) {
151162 rm_free (ctx );
152163}
153164
165+ void MRCtx_IncrRef (MRCtx * ctx ) {
166+ int refcount = atomic_fetch_add (& ctx -> refcount , 1 ) + 1 ;
167+ REFCOUNT_INCR_MSG ("MRCtx_IncrRef" , refcount );
168+ }
169+
170+ void MRCtx_DecrRef (MRCtx * ctx ) {
171+ int prev_refcount = atomic_fetch_sub (& ctx -> refcount , 1 );
172+ RS_ASSERT (prev_refcount > 0 );
173+
174+ int refcount = prev_refcount - 1 ;
175+ REFCOUNT_DECR_MSG ("MRCtx_DecrRef" , refcount );
176+ if (refcount == 0 ) {
177+ MRCtx_FreeInternal (ctx );
178+ }
179+ }
180+
154181/* Get the user stored private data from the context */
155182void * MRCtx_GetPrivData (struct MRCtx * ctx ) {
156183 return ctx -> privdata ;
@@ -160,10 +187,6 @@ int MRCtx_GetNumReplied(struct MRCtx *ctx) {
160187 return ctx -> numReplied ;
161188}
162189
163- void MRCtx_RequestCompleted (struct MRCtx * ctx ) {
164- IORuntimeCtx_RequestCompleted (ctx -> ioRuntime );
165- }
166-
167190MRReply * * MRCtx_GetReplies (struct MRCtx * ctx ) {
168191 return ctx -> replies ;
169192}
@@ -217,10 +240,11 @@ void MRCtx_WaitForReducerComplete(struct MRCtx *ctx) {
217240}
218241
219242static void freePrivDataCB (RedisModuleCtx * ctx , void * p ) {
243+ UNUSED (ctx );
220244 if (p ) {
221245 MRCtx * mc = p ;
222- IORuntimeCtx_RequestCompleted ( mc -> ioRuntime );
223- MRCtx_Free (mc );
246+ /* RQ completion is owned by the libuv fanout-completion paths. */
247+ MRCtx_DecrRef (mc );
224248 }
225249}
226250
@@ -242,6 +266,7 @@ static int unblockHandler(RedisModuleCtx *ctx, RedisModuleString **argv, int arg
242266/* The callback called from each fanout request to aggregate their replies */
243267static void fanoutCallback (redisAsyncContext * c , void * r , void * privdata ) {
244268 MRCtx * ctx = privdata ;
269+ IORuntimeCtx * ioRuntime = ctx -> ioRuntime ;
245270
246271 // Check if timed out - discard reply.
247272 // Currently, timeout checks are relevant only for Coordinator FT.SEARCH fanouts.
@@ -262,16 +287,25 @@ static void fanoutCallback(redisAsyncContext *c, void *r, void *privdata) {
262287 ctx -> replies [ctx -> numReplied ++ ] = r ;
263288 }
264289
265- // If we've received the last reply - unblock the client
290+ // If we've received the last reply, the fanout/network phase is complete.
291+ // Release the RQ slot here before unblocking or handing off to reduction.
266292 if (ctx -> numReplied + ctx -> numErrored == ctx -> numExpected ) {
267293 if (!timedOut && ctx -> fn ) {
268294 ctx -> fn (ctx , ctx -> numReplied , ctx -> replies );
295+ // `ctx->fn` may hand off to an async reducer that can unblock and free `ctx`
296+ // before this libuv callback is scheduled again. Complete the RQ request via
297+ // the saved ioRuntime instead of reading more state from `ctx` after the handoff.
298+ IORuntimeCtx_RequestCompleted (ioRuntime );
269299 } else {
270- RedisModuleBlockedClient * bc = ctx -> bc ;
271- RS_ASSERT (bc );
272- RedisModule_BlockedClientMeasureTimeEnd (bc );
273- RedisModule_UnblockClient (bc , ctx );
300+ IORuntimeCtx_RequestCompleted (ioRuntime );
301+ if (!timedOut ) {
302+ RedisModuleBlockedClient * bc = ctx -> bc ;
303+ RS_ASSERT (bc );
304+ RedisModule_BlockedClientMeasureTimeEnd (bc );
305+ RedisModule_UnblockClient (bc , ctx );
306+ }
274307 }
308+ MRCtx_DecrRef (ctx );
275309 }
276310}
277311
@@ -289,10 +323,15 @@ static void uvFanoutRequest(void *p) {
289323 mrctx -> numExpected = MRCluster_FanoutCommand (ioRuntime , & mrctx -> cmd , fanoutCallback , mrctx );
290324
291325 if (mrctx -> numExpected == 0 ) {
292- RedisModuleBlockedClient * bc = mrctx -> bc ;
293- RS_ASSERT (bc );
294- RedisModule_BlockedClientMeasureTimeEnd (bc );
295- RedisModule_UnblockClient (bc , mrctx );
326+ // No shard command was sent, so fanoutCallback() will never fire.
327+ IORuntimeCtx_RequestCompleted (ioRuntime );
328+ if (!MRCtx_IsTimedOut (mrctx )) {
329+ RedisModuleBlockedClient * bc = mrctx -> bc ;
330+ RS_ASSERT (bc );
331+ RedisModule_BlockedClientMeasureTimeEnd (bc );
332+ RedisModule_UnblockClient (bc , mrctx );
333+ }
334+ MRCtx_DecrRef (mrctx );
296335 }
297336}
298337
@@ -309,7 +348,7 @@ int MR_Fanout(struct MRCtx *mrctx, MRReduceFunc reducer, MRCommand cmd, bool blo
309348 mrctx -> reducer = reducer ;
310349 mrctx -> cmd = cmd ;
311350
312-
351+ MRCtx_IncrRef ( mrctx );
313352 IORuntimeCtx_Schedule (mrctx -> ioRuntime , uvFanoutRequest , mrctx );
314353 return REDIS_OK ;
315354}
0 commit comments