[2.10] MOD-11751: Support WITHCOUNT keyword in FT.AGGREGATE#7656
[2.10] MOD-11751: Support WITHCOUNT keyword in FT.AGGREGATE#7656
Conversation
This reverts commit 97635ce.
PR #7202 Backport Diff: Master vs 2.10This document shows the differences between PR #7202 in master and the backport to 2.10 branch ( OverviewThe backport introduces the WITHCOUNT feature to 2.10, enabling accurate DetailsNew Files in 2.10
Note: MRCommand Structure ChangeAdded int16_t targetShard; // NEW: tracks which shard this command targets
int targetSlot; // existing fieldThis field is needed by ShardResponseBarrier to:
Table of Contents
rpnet.hdiff --git a/src/coord/rpnet.h b/coord/src/rpnet.h
index 3d2edf2fd..11000e8cc 100644
--- a/src/coord/rpnet.h
+++ b/coord/src/rpnet.h
@@ -14,7 +14,6 @@
#include "result_processor.h"
#include "rmr/rmr.h"
#include "aggregate/aggregate.h"
-#include "hybrid/hybrid_cursor_mappings.h"
#ifdef __cplusplus
extern "C" {
@@ -57,9 +56,6 @@ typedef struct {
MRCommand cmd;
AREQ *areq;
- // NEW: Direct cursor mappings (no more dispatcher context)
- StrongRef mappings; // Single mapping array per RPNet
-
// profile vars
arrayof(MRReply *) shardsProfile;
@@ -73,11 +69,10 @@ typedef struct {
void rpnetFree(ResultProcessor *rp);
-RPNet *RPNet_New(const MRCommand *cmd, int (*nextFunc)(ResultProcessor *, SearchResult *));
+RPNet *RPNet_New(const MRCommand *cmd);
void RPNet_resetCurrent(RPNet *nc);
int rpnetNext(ResultProcessor *self, SearchResult *r);
int rpnetNext_EOF(ResultProcessor *self, SearchResult *r);
-int rpnetNext_StartWithMappings(ResultProcessor *rp, SearchResult *r);
// Get the next reply from the channel.
// Return RS_RESULT_OK if there is a next reply to process, RS_RESULT_EOF if there are no more repliesrpnet.cdiff --git a/src/coord/rpnet.c b/coord/src/rpnet.c
index 7244d4e50..05696b462 100644
--- a/src/coord/rpnet.c
+++ b/coord/src/rpnet.c
@@ -12,47 +12,48 @@
#include "rmr/reply.h"
#include "rmr/rmr.h"
#include "hiredis/sds.h"
-#include "coord/dist_utils.h"
+#include "dist_utils.h"
+#include "coord/src/coord_module.h"
#define CURSOR_EOF 0
-static RSValue *MRReply_ToValue(MRReply *r) {
- if (!r) return RSValue_NullStatic();
+RSValue *MRReply_ToValue(MRReply *r) {
+ if (!r) return RS_NullVal();
RSValue *v = NULL;
switch (MRReply_Type(r)) {
case MR_REPLY_STATUS:
case MR_REPLY_STRING: {
size_t l;
const char *s = MRReply_String(r, &l);
- v = RSValue_NewCopiedString(s, l);
+ v = RS_NewCopiedString(s, l);
break;
}
case MR_REPLY_ERROR: {
double d = 42;
MRReply_ToDouble(r, &d);
- v = RSValue_NewNumber(d);
+ v = RS_NumVal(d);
break;
}
case MR_REPLY_INTEGER:
- v = RSValue_NewNumber((double)MRReply_Integer(r));
+ v = RS_NumVal((double)MRReply_Integer(r));
break;
case MR_REPLY_DOUBLE:
- v = RSValue_NewNumber(MRReply_Double(r));
+ v = RS_NumVal(MRReply_Double(r));
break;
case MR_REPLY_MAP: {
size_t n = MRReply_Length(r);
RS_LOG_ASSERT(n % 2 == 0, "map of odd length");
- size_t map_len = n / 2;
- RSValueMap map = RSValueMap_AllocUninit(map_len);
- for (size_t i = 0; i < map_len; i++) {
- MRReply *e_k = MRReply_ArrayElement(r, i * 2);
- RS_LOG_ASSERT(MRReply_Type(e_k) == MR_REPLY_STRING, "non-string map key");
- MRReply *e_v = MRReply_ArrayElement(r, (i * 2) + 1);
- RSValueMap_SetEntry(&map, i, MRReply_ToValue(e_k), MRReply_ToValue(e_v));
+ RSValue **map = rm_malloc(n * sizeof(*map));
+ for (size_t i = 0; i < n; ++i) {
+ MRReply *e = MRReply_ArrayElement(r, i);
+ if (i % 2 == 0) {
+ RS_LOG_ASSERT(MRReply_Type(e) == MR_REPLY_STRING, "non-string map key");
+ }
+ map[i] = MRReply_ToValue(e);
}
- v = RSValue_NewMap(map);
+ v = RSValue_NewMap(map, n / 2);
break;
}
case MR_REPLY_ARRAY: {
@@ -65,10 +66,10 @@ static RSValue *MRReply_ToValue(MRReply *r) {
break;
}
case MR_REPLY_NIL:
- v = RSValue_NullStatic();
+ v = RS_NullVal();
break;
default:
- v = RSValue_NullStatic();
+ v = RS_NullVal();
break;
}
return v;
@@ -178,7 +179,7 @@ static struct timespec *getAbsTimeout(RPNet *nc) {
if (!nc->areq || !nc->areq->sctx) {
return NULL;
}
- return (struct timespec *)&nc->areq->sctx->time.timeout;
+ return (struct timespec *)&nc->areq->sctx->timeout;
}rpnet.c - Error Handling and Timeout@@ -192,10 +193,10 @@ static bool shardResponseBarrier_HandleTimeout(RPNet *nc) {
// cleanup pending replies
shardResponseBarrier_PendingReplies_Free(nc);
- // Set error in AREQ context
+ // Set timeout error in AREQ context
QueryError_SetError(
- AREQ_QueryProcessingCtx(nc->areq)->err,
- QUERY_ERROR_CODE_TIMED_OUT,
+ nc->areq->qiter.err,
+ QUERY_ETIMEDOUT,
"ShardResponseBarrier: Timeout while waiting for first responses from all shards");
return true;
}
@@ -237,7 +238,7 @@ int getNextReply(RPNet *nc) {
while ((numShards = atomic_load(&nc->shardResponseBarrier->numShards)) == 0 ||
atomic_load(&nc->shardResponseBarrier->numResponded) < numShards) {
// Check for timeout to avoid blocking indefinitely
- if (nc->areq && nc->areq->sctx && TimedOut(&nc->areq->sctx->time.timeout)) {
+ if (nc->areq && nc->areq->sctx && TimedOut(&nc->areq->sctx->timeout)) {
break;
}rpnet.c - Profile Handling Removed in 2.10@@ -256,12 +257,6 @@ int getNextReply(RPNet *nc) {
// Check for errors
if (shardResponseBarrier_HandleError(nc)) {
nc->waitedForAllShards = true;
- // If for profiling, clone and append the error
- if (nc->cmd.forProfiling) {
- // Clone the error and append it to the profile
- MRReply *error = MRReply_Clone(nc->current.root);
- array_append(nc->shardsProfile, error);
- }
return RS_RESULT_OK;
}
}
@@ -293,109 +291,43 @@ int getNextReply(RPNet *nc) {
}
// Check if an error was returned
- if (MRReply_Type(root) == MR_REPLY_ERROR) {
+ if(MRReply_Type(root) == MR_REPLY_ERROR) {
nc->current.root = root;
- // If for profiling, clone and append the error
- if (nc->cmd.forProfiling) {
- // Clone the error and append it to the profile
- MRReply *error = MRReply_Clone(root);
- array_append(nc->shardsProfile, error);
- }
return RS_RESULT_OK;
}
-
- // For profile command, extract the profile data from the reply
- if (nc->cmd.forProfiling) {
- // if the cursor id is 0, this is the last reply from this shard, and it has the profile data
- if (CURSOR_EOF == MRReply_Integer(MRReply_ArrayElement(root, 1))) {
- MRReply *profile_data;
- if (nc->cmd.protocol == 3) {
- // RESP3 profile extraction...
- profile_data = MRReply_TakeMapElement(data, "profile");
- } else {
- // RESP2 profile extraction...
- profile_data = MRReply_TakeArrayElement(root, 2);
- }
- array_append(nc->shardsProfile, profile_data);
- }
- }rpnet.c - Error Code Handling (Master vs 2.10) // If an error was returned, propagate it
if (nc->current.root && MRReply_Type(nc->current.root) == MR_REPLY_ERROR) {
- QueryErrorCode errCode = QueryError_GetCodeFromMessage(MRReply_String(nc->current.root, NULL));
- // TODO - use should_return_error after it is changed to support RequestConfig ptr
- if (errCode == QUERY_ERROR_CODE_GENERIC ||
- ((errCode == QUERY_ERROR_CODE_TIMED_OUT) && nc -> areq -> reqConfig.timeoutPolicy == TimeoutPolicy_Fail) ||
- ((errCode == QUERY_ERROR_CODE_OUT_OF_MEMORY) && nc -> areq -> reqConfig.oomPolicy == OomPolicy_Fail)) {
- // We need to pass the reply string as the error message, since the error code might be generic
- QueryError_SetError(AREQ_QueryProcessingCtx(nc->areq)->err, errCode, MRReply_String(nc->current.root, NULL));
+ const char *strErr = MRReply_String(nc->current.root, NULL);
+ if (!strErr
+ || strcmp(strErr, "Timeout limit was reached")
+ || nc->areq->reqConfig.timeoutPolicy == TimeoutPolicy_Fail) {
+ QueryError_SetError(nc->areq->qiter.err, QUERY_EGENERIC, strErr);
return RS_RESULT_ERROR;
} else {
- // Handle shards returning error unexpectedly
- // Might be from different Timeout/OOM policy (See MOD-10774)
- // Free the error reply before we override it and continue
MRReply_Free(nc->current.root);
- // Set it as NULL avoid another free
- nc->current.root = NULL;
+ RPNet_resetCurrent(nc);
}
}rpnet.c - Warning Handling- if (!strcmp(warning_str, QueryError_Strerror(QUERY_ERROR_CODE_TIMED_OUT))) {
+ if (!strcmp(warning_str, QueryError_Strerror(QUERY_ETIMEDOUT))) {
timed_out = true;
} else if (!strcmp(warning_str, QUERY_WMAXPREFIXEXPANSIONS)) {
- QueryError_SetReachedMaxPrefixExpansionsWarning(AREQ_QueryProcessingCtx(nc->areq)->err);
- } else if (!strcmp(warning_str, QUERY_WOOM_SHARD)) {
- QueryError_SetQueryOOMWarning(AREQ_QueryProcessingCtx(nc->areq)->err);
+ nc->areq->qiter.err->reachedMaxPrefixExpansions = true;
}
if (!strcmp(warning_str, QUERY_WINDEXING_FAILURE)) {
- AREQ_QueryProcessingCtx(nc->areq)->bgScanOOM = true;
+ nc->areq->qiter.bgScanOOM = true;
}rpnet.c - RPNet_New and rpnetNext_StartWithMappings-RPNet *RPNet_New(const MRCommand *cmd, int (*nextFunc)(ResultProcessor *, SearchResult *)) {
+RPNet *RPNet_New(const MRCommand *cmd) {
RPNet *nc = rm_calloc(1, sizeof(*nc));
nc->cmd = *cmd; // Take ownership of the command's internal allocations
nc->areq = NULL;
nc->shardsProfile = NULL;
nc->base.Free = rpnetFree;
- nc->base.Next = nextFunc;
+ nc->base.Next = rpnetNext_Start;
nc->base.type = RP_NETWORK;
return nc;
}
-/**
- * Start function for RPNet with cursor mappings
- * Replaces rpnetNext_StartDispatcher
- */
-int rpnetNext_StartWithMappings(ResultProcessor *rp, SearchResult *r) {
- RPNet *nc = (RPNet *)rp;
- // ... entire function removed (hybrid cursor mappings not in 2.10)
-}
-int rpnetNext_EOF(ResultProcessor *self, SearchResult *r) {
- return RS_RESULT_EOF;
-}
+// Note: rpnetNext_EOF declaration exists in header but implementation missing in 2.10command.h / command.cAdded diff --git a/coord/src/rmr/command.h b/coord/src/rmr/command.h
+#define INVALID_SHARD -1
+
typedef struct {
/* The command args starting from the command itself */
const char **strs;
size_t *lens;
uint32_t num;
+ /* if not -1, this value indicate to which shard the command should be sent */
+ int16_t targetShard;
+
/* if not -1, this value indicate to which slot the command should be sent */
int targetSlot;diff --git a/coord/src/rmr/command.c b/coord/src/rmr/command.c
static void MRCommand_Init(MRCommand *cmd, size_t len) {
cmd->strs = rm_malloc(sizeof(*cmd->strs) * len);
cmd->lens = rm_malloc(sizeof(*cmd->lens) * len);
cmd->targetSlot = -1;
+ cmd->targetShard = INVALID_SHARD;
// ...
}
MRCommand MRCommand_Copy(const MRCommand *cmd) {
MRCommand ret;
MRCommand_Init(&ret, cmd->num);
+ ret.targetShard = cmd->targetShard;
ret.protocol = cmd->protocol;
// ...
}rmr.h / rmr.cAdded iterator private data support and timed operations for ShardResponseBarrier: diff --git a/coord/src/rmr/rmr.h b/coord/src/rmr/rmr.h
+// Get private data from iterator callback context
+void *MRIteratorCallback_GetPrivateData(MRIteratorCallbackCtx *ctx);
+// Pop with timeout support
+MRReply *MRIterator_NextWithTimeout(MRIterator *it, const struct timespec *abstime, bool *timedOut);
+// Create iterator with private data for barrier support
+MRIterator *MR_IterateWithPrivateData(const MRCommand *cmd, MRIteratorCallback cb, void *cbPrivateData,
+ void (*cbPrivateDataDestructor)(void *),
+ void (*cbPrivateDataInit)(void *, MRIterator *),
+ void (*iterStartCb)(void *), StrongRef *iterStartCbPrivateData);diff --git a/coord/src/rmr/rmr.c b/coord/src/rmr/rmr.c
struct MRIteratorCtx {
// ...
+ void (*privateDataDestructor)(void *); // Destructor for privateData
+ void (*privateDataInit)(void *, MRIterator *); // Init callback for privateData
};
struct MRIteratorCallbackCtx {
MRIterator *it;
MRCommand cmd;
+ void *privateData;
};
+void *MRIteratorCallback_GetPrivateData(MRIteratorCallbackCtx *ctx) {
+ return ctx->privateData;
+}
void iterStartCb(void *p) {
MRIterator *it = p;
// ...
+ // Call privateData init callback if set (e.g., to initialize ShardResponseBarrier)
+ void *privateData = MRIteratorCallback_GetPrivateData(&it->cbxs[0]);
+ if (privateData && it->ctx.privateDataInit) {
+ it->ctx.privateDataInit(privateData, it);
+ }
// Initialize targetShard for each command
cmd->targetSlot = cluster_g->topo->shards[0].startSlot;
+ cmd->targetShard = 0;
for (size_t i = 1; i < len; i++) {
// ...
it->cbxs[i].cmd.targetSlot = cluster_g->topo->shards[i].startSlot;
+ it->cbxs[i].cmd.targetShard = i;
+ it->cbxs[i].privateData = MRIteratorCallback_GetPrivateData(&it->cbxs[0]);
}
}
+MRReply *MRIterator_NextWithTimeout(MRIterator *it, const struct timespec *abstime, bool *timedOut) {
+ return MRChannel_PopWithTimeout(it->ctx.chan, abstime, timedOut);
+}chan.h / chan.cAdded timed wait support for channel operations (needed by ShardResponseBarrier): diff --git a/coord/src/rmr/chan.h b/coord/src/rmr/chan.h
+// Pop with timeout - returns NULL and sets timedOut=true if timeout expires
+void *MRChannel_PopWithTimeout(MRChannel *chan, const struct timespec *abstimeMono, bool *timedOut);diff --git a/coord/src/rmr/chan.c b/coord/src/rmr/chan.c
+#include <errno.h>
+#include <time.h>
+#include "search_ctx.h"
+#include "util/timeout.h"
MRChannel *MR_NewChannel() {
MRChannel *chan = rm_malloc(sizeof(*chan));
// ...
+#if defined(__APPLE__) && defined(__MACH__)
+ // macOS doesn't support pthread_condattr_setclock, use default clock
pthread_cond_init(&chan->cond, NULL);
+#else
+ // Initialize with CLOCK_MONOTONIC for use with pthread_cond_timedwait
+ pthread_condattr_t cond_attr;
+ pthread_condattr_init(&cond_attr);
+ pthread_condattr_setclock(&cond_attr, CLOCK_MONOTONIC);
+ pthread_cond_init(&chan->cond, &cond_attr);
+ pthread_condattr_destroy(&cond_attr);
+#endif
// ...
}
+// Platform-specific timed wait on condition variable
+// Returns: true if timed out, false if signaled
+static bool condTimedWait(pthread_cond_t *cond, pthread_mutex_t *lock,
+ const struct timespec *abstimeMono) {
+ struct timespec nowRaw, remaining;
+ clock_gettime(CLOCK_MONOTONIC_RAW, &nowRaw);
+ rs_timerremaining((struct timespec *)abstimeMono, &nowRaw, &remaining);
+ if (remaining.tv_sec == 0 && remaining.tv_nsec == 0) {
+ return true; // timed out
+ }
+#if defined(__APPLE__) && defined(__MACH__)
+ return pthread_cond_timedwait_relative_np(cond, lock, &remaining) == ETIMEDOUT;
+#else
+ struct timespec nowMono, absMono;
+ clock_gettime(CLOCK_MONOTONIC, &nowMono);
+ rs_timeradd(&nowMono, &remaining, &absMono);
+ return pthread_cond_timedwait(cond, lock, &absMono) == ETIMEDOUT;
+#endif
+}
+void *MRChannel_PopWithTimeout(MRChannel *chan, const struct timespec *abstimeMono, bool *timedOut) {
+ *timedOut = false;
+ pthread_mutex_lock(&chan->lock);
+ while (!chan->size) {
+ if (!chan->wait) {
+ pthread_mutex_unlock(&chan->lock);
+ return NULL;
+ }
+ if (condTimedWait(&chan->cond, &chan->lock, abstimeMono)) {
+ *timedOut = true;
+ pthread_mutex_unlock(&chan->lock);
+ return NULL;
+ }
+ }
+ return popHeadAndUnlock(chan);
+}aggregate.hAdded new query flags and helper macros for WITHCOUNT support: diff --git a/src/aggregate/aggregate.h b/src/aggregate/aggregate.h
typedef enum {
// ... existing flags ...
+ // The query has an explicit SORTBY x - sort by a field
+ QEXEC_F_HAS_SORTBY = 0x8000000,
+
+ // The query should use a depleter in the pipeline (for FT.AGGREGATE)
+ QEXEC_F_HAS_DEPLETER = 0x10000000,
+
+ // The query has an explicit WITHCOUNT (for FT.AGGREGATE)
+ QEXEC_F_HAS_WITHCOUNT = 0x20000000,
+
+ // The query has an explicit GROUPBY (for FT.AGGREGATE)
+ QEXEC_F_HAS_GROUPBY = 0x40000000,
} QEFlags;
+#define IsAggregate(r) ((r)->reqflags & QEXEC_F_IS_AGGREGATE)
+#define HasDepleter(r) ((r)->reqflags & QEXEC_F_HAS_DEPLETER)
+#define HasWithCount(r) ((r)->reqflags & QEXEC_F_HAS_WITHCOUNT)
+#define IsCursor(r) ((r)->reqflags & QEXEC_F_IS_CURSOR)
+#define HasSortBy(r) ((r)->reqflags & QEXEC_F_HAS_SORTBY)
+#define HasGroupBy(r) ((r)->reqflags & QEXEC_F_HAS_GROUPBY)
+static inline QEFlags AREQ_RequestFlags(const AREQ *req) {
+ return (QEFlags)req->reqflags;
+}
+
+static inline void AREQ_AddRequestFlags(AREQ *req, QEFlags flags) {
+ req->reqflags = (QEFlags)(req->reqflags | flags);
+}
+
+static inline void AREQ_RemoveRequestFlags(AREQ *req, QEFlags flags) {
+ req->reqflags = (QEFlags)(req->reqflags & ~flags);
+}
+
+#define REQFLAGS_AddFlags(reqflags, flags) (*(reqflags) |= (flags))
+#define REQFLAGS_RemoveFlags(reqflags, flags) (*(reqflags) &= ~(flags))result_processor.h / result_processor.cAdded diff --git a/src/result_processor.h b/src/result_processor.h
typedef enum {
// ... existing types ...
+ RP_DEPLETER,
RP_MAX
} ResultProcessorType;
+ResultProcessor *RPDepleter_New();diff --git a/src/result_processor.c b/src/result_processor.c
static char *RPTypeLookup[RP_MAX] = {"Index", "Loader", "Threadsafe-Loader", "Scorer",
"Sorter", "Counter", "Pager/Limiter", "Highlighter",
"Grouper", "Projector", "Filter", "Profile",
- "Network", "Metrics Applier", "Key Name Loader", "Score Max Normalizer"};
+ "Network", "Metrics Applier", "Key Name Loader", "Score Max Normalizer",
+ "Depleter"};
+/*******************************************************************************************************************
+ * Depleter Result Processor
+ *
+ * The RPDepleter result processor consumes all results from its upstream
+ * processor synchronously, storing them in an internal array. It then yields
+ * results one by one from this array.
+ *******************************************************************************************************************/
+typedef struct {
+ ResultProcessor base;
+ arrayof(SearchResult *) results;
+ size_t cur_idx;
+ RPStatus last_rc;
+ uint32_t depleted_results;
+} RPDepleter;
+
+static void RPDepleter_Deplete(RPDepleter *self) {
+ RPStatus rc;
+ SearchResult *r = rm_calloc(1, sizeof(*r));
+ while ((rc = self->base.upstream->Next(self->base.upstream, r)) == RS_RESULT_OK) {
+ array_append(self->results, r);
+ r = rm_calloc(1, sizeof(*r));
+ self->depleted_results++;
+ }
+ SearchResult_Destroy(r);
+ rm_free(r);
+ self->last_rc = rc;
+}
+
+static int RPDepleter_Next_Yield(ResultProcessor *base, SearchResult *r) {
+ RPDepleter *self = (RPDepleter *)base;
+ if (self->cur_idx >= array_len(self->results)) {
+ int ret = self->last_rc;
+ self->last_rc = RS_RESULT_EOF;
+ return ret;
+ }
+ SearchResult *current = self->results[self->cur_idx];
+ SearchResult_Override(r, current);
+ rm_free(current);
+ self->results[self->cur_idx] = NULL;
+ self->cur_idx++;
+ return RS_RESULT_OK;
+}
+
+static int RPDepleter_Next_Accumulate(ResultProcessor *base, SearchResult *r) {
+ RPDepleter *self = (RPDepleter *)base;
+ RPDepleter_Deplete(self);
+ self->base.Next = RPDepleter_Next_Yield;
+ return RPDepleter_Next_Yield(base, r);
+}
+
+static void RPDepleter_Free(ResultProcessor *base) {
+ RPDepleter *self = (RPDepleter *)base;
+ array_free_ex(self->results, srDtor(*(SearchResult**)ptr));
+ rm_free(self);
+}
+
+ResultProcessor *RPDepleter_New() {
+ RPDepleter *ret = rm_calloc(1, sizeof(*ret));
+ ret->results = array_new(SearchResult*, 0);
+ ret->base.Next = RPDepleter_Next_Accumulate;
+ ret->base.Free = RPDepleter_Free;
+ ret->base.type = RP_DEPLETER;
+ ret->depleted_results = 0;
+ return &ret->base;
+}test_aggregate.pyRemoved:
|
| Component | What was added |
|---|---|
| MRCommand.targetShard | New field to track which shard a command targets |
| MR_IterateWithPrivateData() | Iterator creation with private data for barrier callbacks |
| MRIteratorCallback_GetPrivateData() | Access private data from callback context |
| MRIterator_NextWithTimeout() | Timed pop from iterator channel |
| MRChannel_PopWithTimeout() | Timed channel pop with platform-specific clock handling |
| condTimedWait() | Platform-specific (macOS/Linux) timed condition wait |
| RPDepleter | New result processor for synchronous depletion |
| QEXEC_F_HAS_WITHCOUNT | Query flag for WITHCOUNT |
| QEXEC_F_HAS_DEPLETER | Query flag for depleter usage |
| QEXEC_F_HAS_SORTBY | Query flag for explicit SORTBY |
| QEXEC_F_HAS_GROUPBY | Query flag for explicit GROUPBY |
| HasWithCount(), HasDepleter(), etc. | Helper macros for flag checking |
| AREQ_RequestFlags(), AREQ_AddRequestFlags() | Flag manipulation helpers |
Code Adaptations
| Category | Master | 2.10 Backport | Notes |
|---|---|---|---|
| RSValue API | RSValue_NullStatic(), RSValue_NewCopiedString(), RSValue_NewNumber() |
RS_NullVal(), RS_NewCopiedString(), RS_NumVal() |
Adapted to use 2.10's existing APIs |
| Map API | RSValueMap_AllocUninit() + RSValueMap_SetEntry() |
rm_malloc() + RSValue_NewMap(map, n/2) |
Different allocation pattern in 2.10 |
| Error API | AREQ_QueryProcessingCtx(nc->areq)->err |
nc->areq->qiter.err |
Different access pattern |
| Error Codes | QUERY_ERROR_CODE_TIMED_OUT, QUERY_ERROR_CODE_OUT_OF_MEMORY |
QUERY_ETIMEDOUT (no OOM handling) |
Different enum names |
| Timeout Access | nc->areq->sctx->time.timeout |
nc->areq->sctx->timeout |
Different struct nesting |
| Error Detection | QueryError_GetCodeFromMessage() |
strcmp(strErr, "Timeout limit was reached") |
Function doesn't exist in 2.10 |
| Warning Setters | QueryError_SetReachedMaxPrefixExpansionsWarning() |
Direct field: err->reachedMaxPrefixExpansions = true |
Helper doesn't exist in 2.10 |
Features Not Included in Backport
| Feature | Reason |
|---|---|
| Profile error cloning | Master clones errors into shardsProfile; 2.10 does not |
| Hybrid Cursors | rpnetNext_StartWithMappings(), StrongRef mappings - master-only feature |
| OOM policy handling | QUERY_ERROR_CODE_OUT_OF_MEMORY, QueryError_SetQueryOOMWarning() |
| PR #6880 Fix | rpevalNext_filter decrementing totalResults on FILTER - not backported |
| rpnetNext_EOF | Declaration exists but implementation missing (dead code, not used) |
Test Changes
| Change | Notes |
|---|---|
WITHCOUNT tests use unstable_features |
Feature behind ENABLE_UNSTABLE_FEATURES flag in 2.10 |
_test_withcount() removed from some tests |
testParseTime, testFilter (cluster), testFilterCase2 |
| Barrier delay tests removed | Require PAUSE_BEFORE_RP_N debug feature not in 2.10 |
testDefaultValues, testeAggregateBadApplyFunction |
Never existed in 2.10 (test features added after branch) |
Known Limitations
total_resultsaccuracy with FILTER is not guaranteed (PR MOD-11416: Fixtotal_resultAccuracy When Using FILTER #6880 not backported)- WITHCOUNT is behind
ENABLE_UNSTABLE_FEATURESflag in 2.10
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## 2.10 #7656 +/- ##
==========================================
+ Coverage 89.31% 89.37% +0.05%
==========================================
Files 207 210 +3
Lines 35504 35796 +292
==========================================
+ Hits 31711 31991 +280
- Misses 3793 3805 +12
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
Manual backport #7202 to 2.10
Differences:
FT.AGGREGATE + WITHCOUNTis guarded byENABLE_UNSTABLE_FEATUREStotal_resultAccuracy When Using FILTER #6880 is not included with 2.10 branch.More detailed review can be seen here
Note
Add WITHCOUNT to FT.AGGREGATE, using a coordinator-side barrier and pipeline changes to compute accurate total_results, with new RP and infra updates plus tests.
ShardResponseBarrierto wait for first replies from all shards and accumulatetotal_results(handles errors/timeouts).rpnet.*anddist_utils.*; movenetCursorCallback/getCursorCommandand addtargetShardtoMRCommand.MRIterator_NextWithTimeout, timedMRChannel_PopWithTimeout.QEXEC_F_HAS_WITHCOUNT,QEXEC_F_HAS_DEPLETER,QEXEC_F_HAS_SORTBY,QEXEC_F_HAS_GROUPBY; validateWITHCOUNTvsWITHCURSOR.RPDepleterto fully consume upstream when needed for accurate counts; adjust arrange/pager building accordingly.RP_DEPLETERtype.WITHCOUNTand barrier tests (timeouts, concurrency, errors, KNN, pagination, profiles); update existing aggregate tests to toggle unstable features.REJSON_BRANCHto2.8in build; fixarray_delto usememmove; remove unusedstrtolower.Written by Cursor Bugbot for commit 31c1b34. This will update automatically on new commits. Configure here.