Skip to content

[2.10] MOD-11751: Support WITHCOUNT keyword in FT.AGGREGATE#7656

Merged
oshadmi merged 17 commits into2.10from
nafraf_backport-7202-to-2.10
Dec 7, 2025
Merged

[2.10] MOD-11751: Support WITHCOUNT keyword in FT.AGGREGATE#7656
oshadmi merged 17 commits into2.10from
nafraf_backport-7202-to-2.10

Conversation

@nafraf
Copy link
Collaborator

@nafraf nafraf commented Dec 4, 2025

Manual backport #7202 to 2.10

Differences:

  • Creation of targetShard
  • Coordinator files are in different directory.
  • Profile test were modified because the profile output only returns the output of one shard.
  • Tests using debug command were removed (unsupported in 2.10)
  • FT.AGGREGATE + WITHCOUNT is guarded by ENABLE_UNSTABLE_FEATURES
  • Some tests with FILTER were removed since PR MOD-11416: Fix total_result Accuracy When Using FILTER #6880 is not included with 2.10 branch.

More detailed review can be seen here


Note

Add WITHCOUNT to FT.AGGREGATE, using a coordinator-side barrier and pipeline changes to compute accurate total_results, with new RP and infra updates plus tests.

  • Aggregate/Coordinator (core):
    • Introduce ShardResponseBarrier to wait for first replies from all shards and accumulate total_results (handles errors/timeouts).
    • Refactor network processing into rpnet.* and dist_utils.*; move netCursorCallback/getCursorCommand and add targetShard to MRCommand.
    • Enhance iterator/channel: private-data lifecycle hooks, MRIterator_NextWithTimeout, timed MRChannel_PopWithTimeout.
  • Query Pipeline:
    • Add flags QEXEC_F_HAS_WITHCOUNT, QEXEC_F_HAS_DEPLETER, QEXEC_F_HAS_SORTBY, QEXEC_F_HAS_GROUPBY; validate WITHCOUNT vs WITHCURSOR.
    • New RPDepleter to fully consume upstream when needed for accurate counts; adjust arrange/pager building accordingly.
    • Optimizer and arrange logic tweaks; profile updated to include RP_DEPLETER type.
  • Tests:
    • Add comprehensive WITHCOUNT and barrier tests (timeouts, concurrency, errors, KNN, pagination, profiles); update existing aggregate tests to toggle unstable features.
  • Misc/Infra:
    • Default REJSON_BRANCH to 2.8 in build; fix array_del to use memmove; remove unused strtolower.

Written by Cursor Bugbot for commit 31c1b34. This will update automatically on new commits. Configure here.

@oshadmi oshadmi self-requested a review December 7, 2025 07:56
@oshadmi oshadmi marked this pull request as ready for review December 7, 2025 09:04
@oshadmi
Copy link
Collaborator

oshadmi commented Dec 7, 2025

PR #7202 Backport Diff: Master vs 2.10

This document shows the differences between PR #7202 in master and the backport to 2.10 branch (nafraf_backport-7202-to-2.10).

Overview

The backport introduces the WITHCOUNT feature to 2.10, enabling accurate total_results counting in FT.AGGREGATE queries via the ShardResponseBarrier mechanism.

Details

New Files in 2.10

File Description
coord/src/rpnet.c Network result processor with ShardResponseBarrier (extracted from dist_aggregate.c + new barrier logic)
coord/src/rpnet.h Header for RPNet
coord/src/dist_utils.c / dist_utils.h Distributed utility functions
tests/pytests/test_aggregate_barrier.py ShardResponseBarrier tests
tests/pytests/test_aggregate_count.py WITHCOUNT functionality tests

Note: rpnet.c and rpnet.h are new files. The base RPNet functionality (RPNet struct, getNextReply(), rpnetNext(), MRReply_ToValue(), etc.) was extracted from coord/src/dist_aggregate.c, then the ShardResponseBarrier logic was added on top.

MRCommand Structure Change

Added targetShard field to MRCommand struct in coord/src/rmr/command.h:

int16_t targetShard;  // NEW: tracks which shard this command targets
int targetSlot;       // existing field

This field is needed by ShardResponseBarrier to:

  • Track which shard responded in the notify callback
  • Log which shard had issues extracting total_results
  • Initialize each command with its shard index in MR_Iterate()

Table of Contents


rpnet.h

diff --git a/src/coord/rpnet.h b/coord/src/rpnet.h
index 3d2edf2fd..11000e8cc 100644
--- a/src/coord/rpnet.h
+++ b/coord/src/rpnet.h
@@ -14,7 +14,6 @@
 #include "result_processor.h"
 #include "rmr/rmr.h"
 #include "aggregate/aggregate.h"
-#include "hybrid/hybrid_cursor_mappings.h"
 
 #ifdef __cplusplus
 extern "C" {
@@ -57,9 +56,6 @@ typedef struct {
   MRCommand cmd;
   AREQ *areq;
 
-  // NEW: Direct cursor mappings (no more dispatcher context)
-  StrongRef mappings;  // Single mapping array per RPNet
-
   // profile vars
   arrayof(MRReply *) shardsProfile;
 
@@ -73,11 +69,10 @@ typedef struct {
 
 
 void rpnetFree(ResultProcessor *rp);
-RPNet *RPNet_New(const MRCommand *cmd, int (*nextFunc)(ResultProcessor *, SearchResult *));
+RPNet *RPNet_New(const MRCommand *cmd);
 void RPNet_resetCurrent(RPNet *nc);
 int rpnetNext(ResultProcessor *self, SearchResult *r);
 int rpnetNext_EOF(ResultProcessor *self, SearchResult *r);
-int rpnetNext_StartWithMappings(ResultProcessor *rp, SearchResult *r);
 
 // Get the next reply from the channel.
 // Return RS_RESULT_OK if there is a next reply to process, RS_RESULT_EOF if there are no more replies

rpnet.c

diff --git a/src/coord/rpnet.c b/coord/src/rpnet.c
index 7244d4e50..05696b462 100644
--- a/src/coord/rpnet.c
+++ b/coord/src/rpnet.c
@@ -12,47 +12,48 @@
 #include "rmr/reply.h"
 #include "rmr/rmr.h"
 #include "hiredis/sds.h"
-#include "coord/dist_utils.h"
+#include "dist_utils.h"
+#include "coord/src/coord_module.h"
 
 
 #define CURSOR_EOF 0
 
 
-static RSValue *MRReply_ToValue(MRReply *r) {
-  if (!r) return RSValue_NullStatic();
+RSValue *MRReply_ToValue(MRReply *r) {
+  if (!r) return RS_NullVal();
   RSValue *v = NULL;
   switch (MRReply_Type(r)) {
     case MR_REPLY_STATUS:
     case MR_REPLY_STRING: {
       size_t l;
       const char *s = MRReply_String(r, &l);
-      v = RSValue_NewCopiedString(s, l);
+      v = RS_NewCopiedString(s, l);
       break;
     }
     case MR_REPLY_ERROR: {
       double d = 42;
       MRReply_ToDouble(r, &d);
-      v = RSValue_NewNumber(d);
+      v = RS_NumVal(d);
       break;
     }
     case MR_REPLY_INTEGER:
-      v = RSValue_NewNumber((double)MRReply_Integer(r));
+      v = RS_NumVal((double)MRReply_Integer(r));
       break;
     case MR_REPLY_DOUBLE:
-      v = RSValue_NewNumber(MRReply_Double(r));
+      v = RS_NumVal(MRReply_Double(r));
       break;
     case MR_REPLY_MAP: {
       size_t n = MRReply_Length(r);
       RS_LOG_ASSERT(n % 2 == 0, "map of odd length");
-      size_t map_len = n / 2;
-      RSValueMap map = RSValueMap_AllocUninit(map_len);
-      for (size_t i = 0; i < map_len; i++) {
-        MRReply *e_k = MRReply_ArrayElement(r, i * 2);
-        RS_LOG_ASSERT(MRReply_Type(e_k) == MR_REPLY_STRING, "non-string map key");
-        MRReply *e_v = MRReply_ArrayElement(r, (i * 2) + 1);
-        RSValueMap_SetEntry(&map, i,  MRReply_ToValue(e_k), MRReply_ToValue(e_v));
+      RSValue **map = rm_malloc(n * sizeof(*map));
+      for (size_t i = 0; i < n; ++i) {
+        MRReply *e = MRReply_ArrayElement(r, i);
+        if (i % 2 == 0) {
+          RS_LOG_ASSERT(MRReply_Type(e) == MR_REPLY_STRING, "non-string map key");
+        }
+        map[i] = MRReply_ToValue(e);
       }
-      v = RSValue_NewMap(map);
+      v = RSValue_NewMap(map, n / 2);
       break;
     }
     case MR_REPLY_ARRAY: {
@@ -65,10 +66,10 @@ static RSValue *MRReply_ToValue(MRReply *r) {
       break;
     }
     case MR_REPLY_NIL:
-      v = RSValue_NullStatic();
+      v = RS_NullVal();
       break;
     default:
-      v = RSValue_NullStatic();
+      v = RS_NullVal();
       break;
   }
   return v;
@@ -178,7 +179,7 @@ static struct timespec *getAbsTimeout(RPNet *nc) {
   if (!nc->areq || !nc->areq->sctx) {
     return NULL;
   }
-  return (struct timespec *)&nc->areq->sctx->time.timeout;
+  return (struct timespec *)&nc->areq->sctx->timeout;
 }

rpnet.c - Error Handling and Timeout

@@ -192,10 +193,10 @@ static bool shardResponseBarrier_HandleTimeout(RPNet *nc) {
     // cleanup pending replies
     shardResponseBarrier_PendingReplies_Free(nc);

-    // Set error in AREQ context
+    // Set timeout error in AREQ context
     QueryError_SetError(
-      AREQ_QueryProcessingCtx(nc->areq)->err,
-      QUERY_ERROR_CODE_TIMED_OUT,
+      nc->areq->qiter.err,
+      QUERY_ETIMEDOUT,
       "ShardResponseBarrier: Timeout while waiting for first responses from all shards");
     return true;
   }
@@ -237,7 +238,7 @@ int getNextReply(RPNet *nc) {
     while ((numShards = atomic_load(&nc->shardResponseBarrier->numShards)) == 0 ||
            atomic_load(&nc->shardResponseBarrier->numResponded) < numShards) {
       // Check for timeout to avoid blocking indefinitely
-      if (nc->areq && nc->areq->sctx && TimedOut(&nc->areq->sctx->time.timeout)) {
+      if (nc->areq && nc->areq->sctx && TimedOut(&nc->areq->sctx->timeout)) {
         break;
       }

rpnet.c - Profile Handling Removed in 2.10

@@ -256,12 +257,6 @@ int getNextReply(RPNet *nc) {
       // Check for errors
       if (shardResponseBarrier_HandleError(nc)) {
         nc->waitedForAllShards = true;
-        // If for profiling, clone and append the error
-        if (nc->cmd.forProfiling) {
-          // Clone the error and append it to the profile
-          MRReply *error = MRReply_Clone(nc->current.root);
-          array_append(nc->shardsProfile, error);
-        }
         return RS_RESULT_OK;
       }
     }
@@ -293,109 +291,43 @@ int getNextReply(RPNet *nc) {
   }

   // Check if an error was returned
-  if (MRReply_Type(root) == MR_REPLY_ERROR) {
+  if(MRReply_Type(root) == MR_REPLY_ERROR) {
     nc->current.root = root;
-    // If for profiling, clone and append the error
-    if (nc->cmd.forProfiling) {
-      // Clone the error and append it to the profile
-      MRReply *error = MRReply_Clone(root);
-      array_append(nc->shardsProfile, error);
-    }
     return RS_RESULT_OK;
   }
-
-  // For profile command, extract the profile data from the reply
-  if (nc->cmd.forProfiling) {
-    // if the cursor id is 0, this is the last reply from this shard, and it has the profile data
-    if (CURSOR_EOF == MRReply_Integer(MRReply_ArrayElement(root, 1))) {
-      MRReply *profile_data;
-      if (nc->cmd.protocol == 3) {
-        // RESP3 profile extraction...
-        profile_data = MRReply_TakeMapElement(data, "profile");
-      } else {
-        // RESP2 profile extraction...
-        profile_data = MRReply_TakeArrayElement(root, 2);
-      }
-      array_append(nc->shardsProfile, profile_data);
-    }
-  }

rpnet.c - Error Code Handling (Master vs 2.10)

     // If an error was returned, propagate it
     if (nc->current.root && MRReply_Type(nc->current.root) == MR_REPLY_ERROR) {
-      QueryErrorCode errCode = QueryError_GetCodeFromMessage(MRReply_String(nc->current.root, NULL));
-      // TODO - use should_return_error after it is changed to support RequestConfig ptr
-      if (errCode == QUERY_ERROR_CODE_GENERIC ||
-          ((errCode == QUERY_ERROR_CODE_TIMED_OUT) && nc -> areq -> reqConfig.timeoutPolicy == TimeoutPolicy_Fail) ||
-          ((errCode == QUERY_ERROR_CODE_OUT_OF_MEMORY) && nc -> areq -> reqConfig.oomPolicy == OomPolicy_Fail)) {
-        // We need to pass the reply string as the error message, since the error code might be generic
-        QueryError_SetError(AREQ_QueryProcessingCtx(nc->areq)->err, errCode,  MRReply_String(nc->current.root, NULL));
+      const char *strErr = MRReply_String(nc->current.root, NULL);
+      if (!strErr
+          || strcmp(strErr, "Timeout limit was reached")
+          || nc->areq->reqConfig.timeoutPolicy == TimeoutPolicy_Fail) {
+        QueryError_SetError(nc->areq->qiter.err, QUERY_EGENERIC, strErr);
         return RS_RESULT_ERROR;
       } else {
-        // Handle shards returning error unexpectedly
-        // Might be from different Timeout/OOM policy (See MOD-10774)
-        // Free the error reply before we override it and continue
         MRReply_Free(nc->current.root);
-        // Set it as NULL avoid another free
-        nc->current.root = NULL;
+        RPNet_resetCurrent(nc);
       }
     }

rpnet.c - Warning Handling

-          if (!strcmp(warning_str, QueryError_Strerror(QUERY_ERROR_CODE_TIMED_OUT))) {
+          if (!strcmp(warning_str, QueryError_Strerror(QUERY_ETIMEDOUT))) {
             timed_out = true;
           } else if (!strcmp(warning_str, QUERY_WMAXPREFIXEXPANSIONS)) {
-            QueryError_SetReachedMaxPrefixExpansionsWarning(AREQ_QueryProcessingCtx(nc->areq)->err);
-          } else if (!strcmp(warning_str, QUERY_WOOM_SHARD)) {
-            QueryError_SetQueryOOMWarning(AREQ_QueryProcessingCtx(nc->areq)->err);
+            nc->areq->qiter.err->reachedMaxPrefixExpansions = true;
           }
           if (!strcmp(warning_str, QUERY_WINDEXING_FAILURE)) {
-            AREQ_QueryProcessingCtx(nc->areq)->bgScanOOM = true;
+              nc->areq->qiter.bgScanOOM = true;
           }

rpnet.c - RPNet_New and rpnetNext_StartWithMappings

-RPNet *RPNet_New(const MRCommand *cmd, int (*nextFunc)(ResultProcessor *, SearchResult *)) {
+RPNet *RPNet_New(const MRCommand *cmd) {
   RPNet *nc = rm_calloc(1, sizeof(*nc));
   nc->cmd = *cmd; // Take ownership of the command's internal allocations
   nc->areq = NULL;
   nc->shardsProfile = NULL;
   nc->base.Free = rpnetFree;
-  nc->base.Next = nextFunc;
+  nc->base.Next = rpnetNext_Start;
   nc->base.type = RP_NETWORK;
   return nc;
 }

-/**
- * Start function for RPNet with cursor mappings
- * Replaces rpnetNext_StartDispatcher
- */
-int rpnetNext_StartWithMappings(ResultProcessor *rp, SearchResult *r) {
-    RPNet *nc = (RPNet *)rp;
-    // ... entire function removed (hybrid cursor mappings not in 2.10)
-}

-int rpnetNext_EOF(ResultProcessor *self, SearchResult *r) {
-  return RS_RESULT_EOF;
-}
+// Note: rpnetNext_EOF declaration exists in header but implementation missing in 2.10

command.h / command.c

Added targetShard field to track which shard each command targets:

diff --git a/coord/src/rmr/command.h b/coord/src/rmr/command.h
+#define INVALID_SHARD -1
+
 typedef struct {
   /* The command args starting from the command itself */
   const char **strs;
   size_t *lens;
   uint32_t num;

+  /* if not -1, this value indicate to which shard the command should be sent */
+  int16_t targetShard;
+
   /* if not -1, this value indicate to which slot the command should be sent */
   int targetSlot;
diff --git a/coord/src/rmr/command.c b/coord/src/rmr/command.c
 static void MRCommand_Init(MRCommand *cmd, size_t len) {
   cmd->strs = rm_malloc(sizeof(*cmd->strs) * len);
   cmd->lens = rm_malloc(sizeof(*cmd->lens) * len);
   cmd->targetSlot = -1;
+  cmd->targetShard = INVALID_SHARD;
   // ...
 }

 MRCommand MRCommand_Copy(const MRCommand *cmd) {
   MRCommand ret;
   MRCommand_Init(&ret, cmd->num);
+  ret.targetShard = cmd->targetShard;
   ret.protocol = cmd->protocol;
   // ...
 }

rmr.h / rmr.c

Added iterator private data support and timed operations for ShardResponseBarrier:

diff --git a/coord/src/rmr/rmr.h b/coord/src/rmr/rmr.h
+// Get private data from iterator callback context
+void *MRIteratorCallback_GetPrivateData(MRIteratorCallbackCtx *ctx);

+// Pop with timeout support
+MRReply *MRIterator_NextWithTimeout(MRIterator *it, const struct timespec *abstime, bool *timedOut);

+// Create iterator with private data for barrier support
+MRIterator *MR_IterateWithPrivateData(const MRCommand *cmd, MRIteratorCallback cb, void *cbPrivateData,
+                                      void (*cbPrivateDataDestructor)(void *),
+                                      void (*cbPrivateDataInit)(void *, MRIterator *),
+                                      void (*iterStartCb)(void *), StrongRef *iterStartCbPrivateData);
diff --git a/coord/src/rmr/rmr.c b/coord/src/rmr/rmr.c
 struct MRIteratorCtx {
   // ...
+  void (*privateDataDestructor)(void *);  // Destructor for privateData
+  void (*privateDataInit)(void *, MRIterator *);  // Init callback for privateData
 };

 struct MRIteratorCallbackCtx {
   MRIterator *it;
   MRCommand cmd;
+  void *privateData;
 };

+void *MRIteratorCallback_GetPrivateData(MRIteratorCallbackCtx *ctx) {
+  return ctx->privateData;
+}

 void iterStartCb(void *p) {
   MRIterator *it = p;
   // ...
+  // Call privateData init callback if set (e.g., to initialize ShardResponseBarrier)
+  void *privateData = MRIteratorCallback_GetPrivateData(&it->cbxs[0]);
+  if (privateData && it->ctx.privateDataInit) {
+    it->ctx.privateDataInit(privateData, it);
+  }

   // Initialize targetShard for each command
   cmd->targetSlot = cluster_g->topo->shards[0].startSlot;
+  cmd->targetShard = 0;
   for (size_t i = 1; i < len; i++) {
     // ...
     it->cbxs[i].cmd.targetSlot = cluster_g->topo->shards[i].startSlot;
+    it->cbxs[i].cmd.targetShard = i;
+    it->cbxs[i].privateData = MRIteratorCallback_GetPrivateData(&it->cbxs[0]);
   }
 }

+MRReply *MRIterator_NextWithTimeout(MRIterator *it, const struct timespec *abstime, bool *timedOut) {
+  return MRChannel_PopWithTimeout(it->ctx.chan, abstime, timedOut);
+}

chan.h / chan.c

Added timed wait support for channel operations (needed by ShardResponseBarrier):

diff --git a/coord/src/rmr/chan.h b/coord/src/rmr/chan.h
+// Pop with timeout - returns NULL and sets timedOut=true if timeout expires
+void *MRChannel_PopWithTimeout(MRChannel *chan, const struct timespec *abstimeMono, bool *timedOut);
diff --git a/coord/src/rmr/chan.c b/coord/src/rmr/chan.c
+#include <errno.h>
+#include <time.h>
+#include "search_ctx.h"
+#include "util/timeout.h"

 MRChannel *MR_NewChannel() {
   MRChannel *chan = rm_malloc(sizeof(*chan));
   // ...
+#if defined(__APPLE__) && defined(__MACH__)
+  // macOS doesn't support pthread_condattr_setclock, use default clock
   pthread_cond_init(&chan->cond, NULL);
+#else
+  // Initialize with CLOCK_MONOTONIC for use with pthread_cond_timedwait
+  pthread_condattr_t cond_attr;
+  pthread_condattr_init(&cond_attr);
+  pthread_condattr_setclock(&cond_attr, CLOCK_MONOTONIC);
+  pthread_cond_init(&chan->cond, &cond_attr);
+  pthread_condattr_destroy(&cond_attr);
+#endif
   // ...
 }

+// Platform-specific timed wait on condition variable
+// Returns: true if timed out, false if signaled
+static bool condTimedWait(pthread_cond_t *cond, pthread_mutex_t *lock,
+                          const struct timespec *abstimeMono) {
+  struct timespec nowRaw, remaining;
+  clock_gettime(CLOCK_MONOTONIC_RAW, &nowRaw);
+  rs_timerremaining((struct timespec *)abstimeMono, &nowRaw, &remaining);
+  if (remaining.tv_sec == 0 && remaining.tv_nsec == 0) {
+    return true;  // timed out
+  }
+#if defined(__APPLE__) && defined(__MACH__)
+  return pthread_cond_timedwait_relative_np(cond, lock, &remaining) == ETIMEDOUT;
+#else
+  struct timespec nowMono, absMono;
+  clock_gettime(CLOCK_MONOTONIC, &nowMono);
+  rs_timeradd(&nowMono, &remaining, &absMono);
+  return pthread_cond_timedwait(cond, lock, &absMono) == ETIMEDOUT;
+#endif
+}

+void *MRChannel_PopWithTimeout(MRChannel *chan, const struct timespec *abstimeMono, bool *timedOut) {
+  *timedOut = false;
+  pthread_mutex_lock(&chan->lock);
+  while (!chan->size) {
+    if (!chan->wait) {
+      pthread_mutex_unlock(&chan->lock);
+      return NULL;
+    }
+    if (condTimedWait(&chan->cond, &chan->lock, abstimeMono)) {
+      *timedOut = true;
+      pthread_mutex_unlock(&chan->lock);
+      return NULL;
+    }
+  }
+  return popHeadAndUnlock(chan);
+}

aggregate.h

Added new query flags and helper macros for WITHCOUNT support:

diff --git a/src/aggregate/aggregate.h b/src/aggregate/aggregate.h
 typedef enum {
   // ... existing flags ...

+  // The query has an explicit SORTBY x - sort by a field
+  QEXEC_F_HAS_SORTBY = 0x8000000,
+
+  // The query should use a depleter in the pipeline (for FT.AGGREGATE)
+  QEXEC_F_HAS_DEPLETER = 0x10000000,
+
+  // The query has an explicit WITHCOUNT (for FT.AGGREGATE)
+  QEXEC_F_HAS_WITHCOUNT = 0x20000000,
+
+  // The query has an explicit GROUPBY (for FT.AGGREGATE)
+  QEXEC_F_HAS_GROUPBY = 0x40000000,

 } QEFlags;

+#define IsAggregate(r) ((r)->reqflags & QEXEC_F_IS_AGGREGATE)
+#define HasDepleter(r) ((r)->reqflags & QEXEC_F_HAS_DEPLETER)
+#define HasWithCount(r) ((r)->reqflags & QEXEC_F_HAS_WITHCOUNT)
+#define IsCursor(r) ((r)->reqflags & QEXEC_F_IS_CURSOR)
+#define HasSortBy(r) ((r)->reqflags & QEXEC_F_HAS_SORTBY)
+#define HasGroupBy(r) ((r)->reqflags & QEXEC_F_HAS_GROUPBY)

+static inline QEFlags AREQ_RequestFlags(const AREQ *req) {
+  return (QEFlags)req->reqflags;
+}
+
+static inline void AREQ_AddRequestFlags(AREQ *req, QEFlags flags) {
+  req->reqflags = (QEFlags)(req->reqflags | flags);
+}
+
+static inline void AREQ_RemoveRequestFlags(AREQ *req, QEFlags flags) {
+  req->reqflags = (QEFlags)(req->reqflags & ~flags);
+}
+
+#define REQFLAGS_AddFlags(reqflags, flags) (*(reqflags) |= (flags))
+#define REQFLAGS_RemoveFlags(reqflags, flags) (*(reqflags) &= ~(flags))

result_processor.h / result_processor.c

Added RPDepleter result processor for synchronous result depletion:

diff --git a/src/result_processor.h b/src/result_processor.h
 typedef enum {
   // ... existing types ...
+  RP_DEPLETER,
   RP_MAX
 } ResultProcessorType;

+ResultProcessor *RPDepleter_New();
diff --git a/src/result_processor.c b/src/result_processor.c
 static char *RPTypeLookup[RP_MAX] = {"Index",   "Loader",    "Threadsafe-Loader", "Scorer",
                                      "Sorter",  "Counter",   "Pager/Limiter",     "Highlighter",
                                      "Grouper", "Projector", "Filter",            "Profile",
-                                     "Network", "Metrics Applier", "Key Name Loader", "Score Max Normalizer"};
+                                     "Network", "Metrics Applier", "Key Name Loader", "Score Max Normalizer",
+                                     "Depleter"};

+/*******************************************************************************************************************
+ *  Depleter Result Processor
+ *
+ *  The RPDepleter result processor consumes all results from its upstream
+ *  processor synchronously, storing them in an internal array. It then yields
+ *  results one by one from this array.
+ *******************************************************************************************************************/
+typedef struct {
+  ResultProcessor base;
+  arrayof(SearchResult *) results;
+  size_t cur_idx;
+  RPStatus last_rc;
+  uint32_t depleted_results;
+} RPDepleter;
+
+static void RPDepleter_Deplete(RPDepleter *self) {
+  RPStatus rc;
+  SearchResult *r = rm_calloc(1, sizeof(*r));
+  while ((rc = self->base.upstream->Next(self->base.upstream, r)) == RS_RESULT_OK) {
+    array_append(self->results, r);
+    r = rm_calloc(1, sizeof(*r));
+    self->depleted_results++;
+  }
+  SearchResult_Destroy(r);
+  rm_free(r);
+  self->last_rc = rc;
+}
+
+static int RPDepleter_Next_Yield(ResultProcessor *base, SearchResult *r) {
+  RPDepleter *self = (RPDepleter *)base;
+  if (self->cur_idx >= array_len(self->results)) {
+    int ret = self->last_rc;
+    self->last_rc = RS_RESULT_EOF;
+    return ret;
+  }
+  SearchResult *current = self->results[self->cur_idx];
+  SearchResult_Override(r, current);
+  rm_free(current);
+  self->results[self->cur_idx] = NULL;
+  self->cur_idx++;
+  return RS_RESULT_OK;
+}
+
+static int RPDepleter_Next_Accumulate(ResultProcessor *base, SearchResult *r) {
+  RPDepleter *self = (RPDepleter *)base;
+  RPDepleter_Deplete(self);
+  self->base.Next = RPDepleter_Next_Yield;
+  return RPDepleter_Next_Yield(base, r);
+}
+
+static void RPDepleter_Free(ResultProcessor *base) {
+  RPDepleter *self = (RPDepleter *)base;
+  array_free_ex(self->results, srDtor(*(SearchResult**)ptr));
+  rm_free(self);
+}
+
+ResultProcessor *RPDepleter_New() {
+  RPDepleter *ret = rm_calloc(1, sizeof(*ret));
+  ret->results = array_new(SearchResult*, 0);
+  ret->base.Next = RPDepleter_Next_Accumulate;
+  ret->base.Free = RPDepleter_Free;
+  ret->base.type = RP_DEPLETER;
+  ret->depleted_results = 0;
+  return &ret->base;
+}

test_aggregate.py

Removed: distro import and Alpine Linux handling

-import distro

Changed: _test_withcount now uses unstable_features context manager

 def _test_withcount(env, cmd:list, limit=10000):
-    cmd_withcount = cmd.copy()
-    # ...
-    res_withcount = env.cmd(*cmd_withcount)
-    env.assertEqual(res_withcount[0], len(res_withcount[1:]))
+    with unstable_features(env):
+        cmd_withcount = cmd.copy()
+        # ...
+        res_withcount = env.cmd(*cmd_withcount)
+        env.assertEqual(res_withcount[0], len(res_withcount[1:]))

Added: enable_unstable_features in test classes

 class TestAggregate():
     def __init__(self):
         self.env = Env()
+        enable_unstable_features(self.env)
         add_values(self.env)

 class TestAggregateSecondUseCases():
     def __init__(self):
         self.env = Env()
+        enable_unstable_features(self.env)
         add_values(self.env, 2)

Removed: _test_withcount calls in several tests

     def testFilter(self):
         # ...
         if self.env.isCluster():
             res = self.env.cmd(*cmd)
-            _test_withcount(self.env, cmd)
             self.env.assertEqual([...])

     def testFilterCase2(self):
         # ...
         res = self.env.cmd(*cmd)
-        _test_withcount(self.env, cmd)
         if self.env.isCluster():

     def testParseTime(self):
         # ... simplified, removed _test_withcount calls
-        self.env.assertEqual(expected, res[1])
-        _test_withcount(self.env, cmd)
+        self.env.assertEqual(['brand', '', 'count', '1518', ...], res[1])

Not in 2.10: testDefaultValues and testeAggregateBadApplyFunction

These tests exist in master but were never in 2.10 - they test functionality added after the 2.10 branch:

Since the underlying fixes aren't in 2.10, these tests don't apply.

Changed: Float modulo result expectation

         res = self.env.cmd('ft.aggregate', 'games', '*',
                                        'APPLY', '547758.3 % 5.1')
-        self.env.assertEqual(res[1][1], '3.00000000008')
+        self.env.assertEqual(res[1][1], '3')

Changed: config_cmd() to 'ft.config'

-        self.env.expect(config_cmd(), 'set', 'MAXSEARCHRESULTS', -1).ok()
+        self.env.expect('ft.config', 'set', 'MAXSEARCHRESULTS', -1).ok()

Changed: f-strings to % formatting

-            expected = f"{row['title']}|{row['brand']}|Mark|{float(row['price']):g}"
+            expected = '%s|%s|%s|%g' % (
+                row['title'], row['brand'], 'Mark', float(row['price']))

test_aggregate_barrier.py

Removed: Tests using PAUSE_BEFORE_RP_N debug feature

-def _test_barrier_waits_for_delayed_shard(protocol):
-    """Test using PAUSE_BEFORE_RP_N to simulate delayed shards"""
-    env = Env(moduleArgs='DEFAULT_DIALECT 2 WORKERS 1', protocol=protocol)
-    # ... ~60 lines
-
-@skip(cluster=False)
-def test_barrier_waits_for_delayed_shard_resp2():
-    _test_barrier_waits_for_delayed_shard(2)
-
-@skip(cluster=False)
-def test_barrier_waits_for_delayed_shard_resp3():
-    _test_barrier_waits_for_delayed_shard(3)

-def _test_barrier_all_shards_delayed_then_resume(protocol):
-    """Test with all shards paused then resumed together"""
-    # ... ~40 lines
-
-@skip(cluster=False)
-def test_barrier_all_shards_delayed_then_resume_resp2():
-    _test_barrier_all_shards_delayed_then_resume(2)
-
-@skip(cluster=False)
-def test_barrier_all_shards_delayed_then_resume_resp3():
-    _test_barrier_all_shards_delayed_then_resume(3)

Added: enable_unstable_features in remaining tests

 def _test_barrier_waits_for_delayed_unbalanced_shard(protocol):
     env = Env(moduleArgs='DEFAULT_DIALECT 2', protocol=protocol, shardsCount=3)
+    enable_unstable_features(env)

 def _test_barrier_concurrent_queries(protocol):
     env = Env(moduleArgs='DEFAULT_DIALECT 2', protocol=protocol)
+    enable_unstable_features(env)

 def _test_barrier_handles_empty_results(protocol):
     env = Env(moduleArgs='DEFAULT_DIALECT 2', protocol=protocol)
+    enable_unstable_features(env)

 # ... and similar for other test functions

Changed: Config command usage

-    config_cmd = ['CONFIG', 'SET', 'search-on-timeout', 'FAIL']
+    config_commad = [config_cmd(), 'SET', 'ON_TIMEOUT', 'FAIL']
     query_result = []
-    verify_command_OK_on_all_shards(env, *config_cmd)
+    verify_command_OK_on_all_shards(env, *config_commad)

Summary of Key Differences

New Infrastructure Added by Backport

Component What was added
MRCommand.targetShard New field to track which shard a command targets
MR_IterateWithPrivateData() Iterator creation with private data for barrier callbacks
MRIteratorCallback_GetPrivateData() Access private data from callback context
MRIterator_NextWithTimeout() Timed pop from iterator channel
MRChannel_PopWithTimeout() Timed channel pop with platform-specific clock handling
condTimedWait() Platform-specific (macOS/Linux) timed condition wait
RPDepleter New result processor for synchronous depletion
QEXEC_F_HAS_WITHCOUNT Query flag for WITHCOUNT
QEXEC_F_HAS_DEPLETER Query flag for depleter usage
QEXEC_F_HAS_SORTBY Query flag for explicit SORTBY
QEXEC_F_HAS_GROUPBY Query flag for explicit GROUPBY
HasWithCount(), HasDepleter(), etc. Helper macros for flag checking
AREQ_RequestFlags(), AREQ_AddRequestFlags() Flag manipulation helpers

Code Adaptations

Category Master 2.10 Backport Notes
RSValue API RSValue_NullStatic(), RSValue_NewCopiedString(), RSValue_NewNumber() RS_NullVal(), RS_NewCopiedString(), RS_NumVal() Adapted to use 2.10's existing APIs
Map API RSValueMap_AllocUninit() + RSValueMap_SetEntry() rm_malloc() + RSValue_NewMap(map, n/2) Different allocation pattern in 2.10
Error API AREQ_QueryProcessingCtx(nc->areq)->err nc->areq->qiter.err Different access pattern
Error Codes QUERY_ERROR_CODE_TIMED_OUT, QUERY_ERROR_CODE_OUT_OF_MEMORY QUERY_ETIMEDOUT (no OOM handling) Different enum names
Timeout Access nc->areq->sctx->time.timeout nc->areq->sctx->timeout Different struct nesting
Error Detection QueryError_GetCodeFromMessage() strcmp(strErr, "Timeout limit was reached") Function doesn't exist in 2.10
Warning Setters QueryError_SetReachedMaxPrefixExpansionsWarning() Direct field: err->reachedMaxPrefixExpansions = true Helper doesn't exist in 2.10

Features Not Included in Backport

Feature Reason
Profile error cloning Master clones errors into shardsProfile; 2.10 does not
Hybrid Cursors rpnetNext_StartWithMappings(), StrongRef mappings - master-only feature
OOM policy handling QUERY_ERROR_CODE_OUT_OF_MEMORY, QueryError_SetQueryOOMWarning()
PR #6880 Fix rpevalNext_filter decrementing totalResults on FILTER - not backported
rpnetNext_EOF Declaration exists but implementation missing (dead code, not used)

Test Changes

Change Notes
WITHCOUNT tests use unstable_features Feature behind ENABLE_UNSTABLE_FEATURES flag in 2.10
_test_withcount() removed from some tests testParseTime, testFilter (cluster), testFilterCase2
Barrier delay tests removed Require PAUSE_BEFORE_RP_N debug feature not in 2.10
testDefaultValues, testeAggregateBadApplyFunction Never existed in 2.10 (test features added after branch)

Known Limitations

@codecov
Copy link

codecov bot commented Dec 7, 2025

Codecov Report

❌ Patch coverage is 94.55185% with 31 lines in your changes missing coverage. Please review.
✅ Project coverage is 89.37%. Comparing base (0911151) to head (31c1b34).
⚠️ Report is 2 commits behind head on 2.10.

Files with missing lines Patch % Lines
coord/src/rpnet.c 95.74% 12 Missing ⚠️
coord/src/dist_utils.c 91.66% 9 Missing ⚠️
coord/src/rmr/chan.c 86.48% 5 Missing ⚠️
src/aggregate/aggregate_request.c 95.08% 3 Missing ⚠️
src/util/timeout.h 60.00% 2 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             2.10    #7656      +/-   ##
==========================================
+ Coverage   89.31%   89.37%   +0.05%     
==========================================
  Files         207      210       +3     
  Lines       35504    35796     +292     
==========================================
+ Hits        31711    31991     +280     
- Misses       3793     3805      +12     
Flag Coverage Δ
flow 83.99% <94.55%> (-0.03%) ⬇️
unit 42.12% <7.38%> (-0.27%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@oshadmi oshadmi added this pull request to the merge queue Dec 7, 2025
@github-merge-queue github-merge-queue bot removed this pull request from the merge queue due to a conflict with the base branch Dec 7, 2025
@oshadmi oshadmi added this pull request to the merge queue Dec 7, 2025
Merged via the queue into 2.10 with commit 6144fa2 Dec 7, 2025
22 checks passed
@oshadmi oshadmi deleted the nafraf_backport-7202-to-2.10 branch December 7, 2025 10:36
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants