@@ -26,6 +26,7 @@ typedef struct {
2626 pthread_mutex_t * mutex ; // Mutex for array access and completion tracking
2727 pthread_cond_t * completionCond ; // Condition variable for completion signaling
2828 int numShards ; // Total number of expected shards
29+ bool initialized ; // Whether numShards has been set by the IO thread
2930} processCursorMappingCallbackContext ;
3031
3132void CursorMapping_Release (CursorMapping * mapping ) {
@@ -189,6 +190,19 @@ static void processCursorMappingCallback(MRIteratorCallbackCtx *ctx, MRReply *re
189190 MRReply_Free (rep );
190191}
191192
193+ // Init callback for the private data, so that numShards is set to the actual number of shards in the cluster, and the expected responses.
194+ static void processCursorMappingInit (void * privateData , MRIterator * it ) {
195+ processCursorMappingCallbackContext * ctx = (processCursorMappingCallbackContext * )privateData ;
196+ int actualNumShards = (int )MRIterator_GetNumShards (it );
197+ pthread_mutex_lock (ctx -> mutex );
198+ ctx -> numShards = actualNumShards ;
199+ ctx -> initialized = true;
200+ ctx -> errors = array_new (QueryError , actualNumShards );
201+ // Signal so the coordinator can re-check the wait condition.
202+ pthread_cond_signal (ctx -> completionCond );
203+ pthread_mutex_unlock (ctx -> mutex );
204+ }
205+
192206static inline void cleanupCtx (processCursorMappingCallbackContext * ctx ) {
193207 pthread_mutex_destroy (ctx -> mutex );
194208 pthread_cond_destroy (ctx -> completionCond );
@@ -200,7 +214,7 @@ static inline void cleanupCtx(processCursorMappingCallbackContext *ctx) {
200214 rm_free (ctx );
201215}
202216
203- bool ProcessHybridCursorMappings (const MRCommand * cmd , int numShards , StrongRef searchMappingsRef , StrongRef vsimMappingsRef , QueryError * status , const RSOomPolicy oomPolicy ) {
217+ bool ProcessHybridCursorMappings (const MRCommand * cmd , StrongRef searchMappingsRef , StrongRef vsimMappingsRef , QueryError * status , const RSOomPolicy oomPolicy ) {
204218 CursorMappings * searchMappings = StrongRef_Get (searchMappingsRef );
205219 CursorMappings * vsimMappings = StrongRef_Get (vsimMappingsRef );
206220 RS_ASSERT (array_len (searchMappings -> mappings ) == 0 && array_len (vsimMappings -> mappings ) == 0 );
@@ -215,18 +229,22 @@ bool ProcessHybridCursorMappings(const MRCommand *cmd, int numShards, StrongRef
215229 pthread_cond_init (ctx -> completionCond , NULL );
216230
217231 // Setup callback context
218- * ctx = (processCursorMappingCallbackContext ){
232+ * ctx = (processCursorMappingCallbackContext ) {
219233 .searchMappings = StrongRef_Clone (searchMappingsRef ),
220234 .vsimMappings = StrongRef_Clone (vsimMappingsRef ),
221- .errors = array_new ( QueryError , numShards ) ,
235+ .errors = NULL ,
222236 .responseCount = 0 ,
223237 .mutex = ctx -> mutex ,
224238 .completionCond = ctx -> completionCond ,
225- .numShards = numShards
226- };
239+ .numShards = 0 ,
240+ .initialized = false
241+ };
227242
228243 // Start iteration (ctx is cleaned up manually in cleanupCtx, no destructor needed)
229- MRIterator * it = MR_IterateWithPrivateData (cmd , processCursorMappingCallback , ctx , NULL , NULL , iterStartCb , NULL );
244+ // processCursorMappingInit is called from iterStartCb to update ctx->numShards
245+ // with the actual shard count from the live topology, preventing use-after-free
246+ // when topology changes during shard migration.
247+ MRIterator * it = MR_IterateWithPrivateData (cmd , processCursorMappingCallback , ctx , NULL , processCursorMappingInit , iterStartCb , NULL );
230248 if (!it ) {
231249 // Cleanup on error
232250 QueryError_SetWithoutUserDataFmt (status , QUERY_ERROR_CODE_GENERIC , "Failed to communicate with shards" );
@@ -235,8 +253,8 @@ bool ProcessHybridCursorMappings(const MRCommand *cmd, int numShards, StrongRef
235253 }
236254 // Wait for all callbacks to complete
237255 pthread_mutex_lock (ctx -> mutex );
238- // initialize count with response counts in case some shards already sent a response
239- for ( size_t count = ctx -> responseCount ; count < numShards ; count = ctx -> responseCount ) {
256+ // Wait until the IO thread has initialized numShards and all responses arrive.
257+ while (! ctx -> initialized || ctx -> responseCount < ctx -> numShards ) {
240258 pthread_cond_wait (ctx -> completionCond , ctx -> mutex );
241259 }
242260 pthread_mutex_unlock (ctx -> mutex );
0 commit comments