@@ -26,6 +26,7 @@ typedef struct {
2626 pthread_mutex_t * mutex ; // Mutex for array access and completion tracking
2727 pthread_cond_t * completionCond ; // Condition variable for completion signaling
2828 int numShards ; // Total number of expected shards
29+ bool initialized ; // Whether numShards has been set by the IO thread
2930} processCursorMappingCallbackContext ;
3031
3132void CursorMapping_Release (CursorMapping * mapping ) {
@@ -191,6 +192,19 @@ static void processCursorMappingCallback(MRIteratorCallbackCtx *ctx, MRReply *re
191192 MRReply_Free (rep );
192193}
193194
195+ // Init callback for the private data, so that numShards is set to the actual number of shards in the cluster, and the expected responses.
196+ static void processCursorMappingInit (void * privateData , MRIterator * it ) {
197+ processCursorMappingCallbackContext * ctx = (processCursorMappingCallbackContext * )privateData ;
198+ int actualNumShards = (int )MRIterator_GetNumShards (it );
199+ pthread_mutex_lock (ctx -> mutex );
200+ ctx -> numShards = actualNumShards ;
201+ ctx -> initialized = true;
202+ ctx -> errors = array_new (QueryError , actualNumShards );
203+ // Signal so the coordinator can re-check the wait condition.
204+ pthread_cond_signal (ctx -> completionCond );
205+ pthread_mutex_unlock (ctx -> mutex );
206+ }
207+
194208static inline void cleanupCtx (processCursorMappingCallbackContext * ctx ) {
195209 pthread_mutex_destroy (ctx -> mutex );
196210 pthread_cond_destroy (ctx -> completionCond );
@@ -202,7 +216,7 @@ static inline void cleanupCtx(processCursorMappingCallbackContext *ctx) {
202216 rm_free (ctx );
203217}
204218
205- bool ProcessHybridCursorMappings (const MRCommand * cmd , int numShards , StrongRef searchMappingsRef , StrongRef vsimMappingsRef , QueryError * status , const RSOomPolicy oomPolicy ) {
219+ bool ProcessHybridCursorMappings (const MRCommand * cmd , StrongRef searchMappingsRef , StrongRef vsimMappingsRef , QueryError * status , const RSOomPolicy oomPolicy ) {
206220 CursorMappings * searchMappings = StrongRef_Get (searchMappingsRef );
207221 CursorMappings * vsimMappings = StrongRef_Get (vsimMappingsRef );
208222 RS_ASSERT (array_len (searchMappings -> mappings ) == 0 && array_len (vsimMappings -> mappings ) == 0 );
@@ -217,18 +231,22 @@ bool ProcessHybridCursorMappings(const MRCommand *cmd, int numShards, StrongRef
217231 pthread_cond_init (ctx -> completionCond , NULL );
218232
219233 // Setup callback context
220- * ctx = (processCursorMappingCallbackContext ){
234+ * ctx = (processCursorMappingCallbackContext ) {
221235 .searchMappings = StrongRef_Clone (searchMappingsRef ),
222236 .vsimMappings = StrongRef_Clone (vsimMappingsRef ),
223- .errors = array_new ( QueryError , numShards ) ,
237+ .errors = NULL ,
224238 .responseCount = 0 ,
225239 .mutex = ctx -> mutex ,
226240 .completionCond = ctx -> completionCond ,
227- .numShards = numShards
228- };
241+ .numShards = 0 ,
242+ .initialized = false
243+ };
229244
230245 // Start iteration (ctx is cleaned up manually in cleanupCtx, no destructor needed)
231- MRIterator * it = MR_IterateWithPrivateData (cmd , processCursorMappingCallback , ctx , NULL , NULL , iterStartCb , NULL );
246+ // processCursorMappingInit is called from iterStartCb to update ctx->numShards
247+ // with the actual shard count from the live topology, preventing use-after-free
248+ // when topology changes during shard migration.
249+ MRIterator * it = MR_IterateWithPrivateData (cmd , processCursorMappingCallback , ctx , NULL , processCursorMappingInit , iterStartCb , NULL );
232250 if (!it ) {
233251 // Cleanup on error
234252 QueryError_SetWithoutUserDataFmt (status , QUERY_ERROR_CODE_GENERIC , "Failed to communicate with shards" );
@@ -237,8 +255,8 @@ bool ProcessHybridCursorMappings(const MRCommand *cmd, int numShards, StrongRef
237255 }
238256 // Wait for all callbacks to complete
239257 pthread_mutex_lock (ctx -> mutex );
240- // initialize count with response counts in case some shards already sent a response
241- for ( size_t count = ctx -> responseCount ; count < numShards ; count = ctx -> responseCount ) {
258+ // Wait until the IO thread has initialized numShards and all responses arrive.
259+ while (! ctx -> initialized || ctx -> responseCount < ctx -> numShards ) {
242260 pthread_cond_wait (ctx -> completionCond , ctx -> mutex );
243261 }
244262 pthread_mutex_unlock (ctx -> mutex );
0 commit comments