
Bug fixes related to Timeout with return partial results policy#5556

Merged
meiravgri merged 26 commits into meiravg_debug_query_command from meiravg_nonstrict_timeout_bug_fixes
Feb 16, 2025

Conversation


@meiravgri meiravgri commented Jan 23, 2025

This PR addresses multiple bugs that occur when the timeout policy is return partial results.

MOD-8483 Bug 1: Cursor with SORTBY is never depleted

When a cursor read with a sorter encounters a timeout during the sorter’s accumulation phase, the sorter switches to yield mode and sets its timedOut flag.
After this, it begins emptying its heap, returning the results collected up to the timeout.
During this process, each returned result has the status RS_RESULT_OK.
Once the heap is emptied, the sorter returns RS_RESULT_TIMEDOUT, and the current read command is done, with a valid cursor id.

return self->timedOut ? RS_RESULT_TIMEDOUT : RS_RESULT_EOF;

At this point, every subsequent FT.CURSOR READ call receives RS_RESULT_TIMEDOUT, even though no more results are available. Since the cursor is only considered depleted when RS_RESULT_EOF is returned, it remains open indefinitely despite having no results left to provide.

Fix:

To address this issue, the timedOut flag is now reset after the timeout status is returned. This ensures that subsequent FT.CURSOR READ commands return RS_RESULT_EOF, allowing the cursor to be properly marked as depleted once all results have been processed.

MOD-8482 Bug 2: Serializing empty result with resp3

In sendChunk_Resp3, during the initial execution of the query pipeline, if a timeout occurred before any results were successfully collected, the return code (rc) was incorrectly changed from RS_RESULT_TIMEDOUT to RS_RESULT_OK. This caused an empty result to be serialized and incorrectly returned as a valid result.

Additionally, in standalone mode (SA), the RP_INDEX result processor checks for timeouts every 100 results. If a timeout occurred during the first pipeline run, the timeout counter of RP_INDEX was reset to 0, allowing up to 99 valid results to be returned before the next timeout check, thereby violating the timeout limit.

For subsequent pipeline runs, the rc for RS_RESULT_TIMEDOUT was handled correctly and not altered.

Fix:

The fix ensures that the rc is not changed when a timeout occurs.

MOD-8515 Bug 3: FT.AGGREGATE WITHCURSOR hangs when shards return 0 results, but cursor is not depleted

When shards return an empty results array (length 0), MRIteratorCallback_AddReply is not called to push the result to the channel, leaving the channel waiting indefinitely in MRChannel_Pop.

Additionally, since the cursor is not depleted the MRIteratorCallback_Done function—which decreases the pending count and calls MRChannel_Unblock to wake the waiting MRChannel_Pop—is skipped.
Instead, MRIteratorCallback_ProcessDone is called, completing the results collection from the shard but leaving MRChannel_Pop in a hanging state.

Fix:

Move MRChannel_Unblock call to MRIteratorCallback_ProcessDone, ensuring it is executed even when the cursor is not depleted. This change releases the conditional wait in MRChannel_Pop, preventing the hang.
Additionally, when no results are returned from the shards, we avoid returning EOF. Instead, a valid cursor ID is returned gracefully, allowing the next FT.CURSOR READ to attempt to collect results.

MOD-8606 Bug 4: RP_Pager incorrectly updates its counter when no results are received

RP_Pager decremented its remaining counter on every call, before checking whether the upstream actually returned a result. When used with cursors, this causes the counter to be decremented for each FT.CURSOR READ that times out. As a result, the total number of results returned across all cursor reads is less than the requested LIMIT, reduced by exactly the number of FT.CURSOR READ commands that encountered a timeout.
For example:

FT.AGGREGATE WITHCURSOR count 500000 LIMIT 0 500 TIMEOUT 1
- First read: 399 results returned (timed out).
  Expected pager remaining: 101, but it is incorrectly set to 100.
- Next read: the pager counter reaches 0 after 100 results and returns EOF.
  Cursor ID is set to 0 since EOF was reached.
Total results: 499 instead of the requested 500.

Fix:

Ensure that the counter is only updated when the upstream RPs return RS_RESULT_OK. This prevents erroneous decrements when no results are received, maintaining the correct count and ensuring the LIMIT is respected.

MOD-8794 Bug 5: freeFlag race condition

There is a race condition on it->ctx.freeFlag, checked in MRIterator_Release to trigger the iterator release process, in the following scenario:

shard thread: a shard decreases the inProcess atomic counter; the counter reaches 0.

coord thread: the coordinator (reader) reaches MR_ManuallyTriggerNextIfNeeded; since inProcess == 0 and it->ctx.pending > 0, it sets it->ctx.freeFlag = false and triggers another round of cursor commands to the shards.

shard thread: since inProcess == 0, it calls MRIterator_Release, which sets it->ctx.freeFlag = true and returns immediately because the flag was false when it was checked.

Now it->ctx.freeFlag == true, so the next shard to reach MRIterator_Release will release the iterator prematurely.

Fix:

Move the it->ctx.freeFlag = false reset from the coordinator (reader) thread to the uv thread, ensuring it happens sequentially after all the shards (writers) are done.

MOD-8792 (Not related to timeout) Wrong string length of command arg in FlatSearchCommandHandler

MRCommand_Insert expects to receive the length of the string written into n_prefixes.

Instead, the code passed the size of the pointer variable:

  char *n_prefixes;
  rm_asprintf(&n_prefixes, "%u", array_len(prefixes));
  // BUG: sizeof(n_prefixes) is the size of the pointer variable, not the string length
  MRCommand_Insert(cmd, arg_pos++, n_prefixes, sizeof(n_prefixes) - 1);
  rm_free(n_prefixes);

Fix:

rm_asprintf returns the number of bytes printed (excluding the null terminator).

Use this value to pass the length of the command string:

  char *n_prefixes;
  int cmd_str_len = rm_asprintf(&n_prefixes, "%u", array_len(prefixes));
  MRCommand_Insert(cmd, arg_pos++, n_prefixes, cmd_str_len);
  rm_free(n_prefixes);


codecov bot commented Jan 23, 2025

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 87.85%. Comparing base (f173dd3) to head (9f3e807).
Report is 15 commits behind head on meiravg_debug_query_command.

Additional details and impacted files
@@                       Coverage Diff                       @@
##           meiravg_debug_query_command    #5556      +/-   ##
===============================================================
+ Coverage                        87.63%   87.85%   +0.22%     
===============================================================
  Files                              196      196              
  Lines                            35592    35603      +11     
===============================================================
+ Hits                             31190    31278      +88     
+ Misses                            4402     4325      -77     


@meiravgri meiravgri changed the title Meiravg_nonstrict_timeout_bug_fixes Bug fixes related to Timeout with return partial results polocy Jan 26, 2025
@meiravgri meiravgri requested review from GuyAv46 and raz-mon January 26, 2025 09:56
Comment on lines 478 to 481
// Unblock the channel before calling `ProcessDone`, as it may trigger the freeing of the iterator
MRChannel_Unblock(ctx->it->ctx.chan);
MRIterator_Release(ctx->it);
Collaborator

  1. Comment is incorrect
  2. As discussed, this is a race condition and we should unblock the channel after attempting to release the iterator (setting the freeing flag). IMO the call to unblock should be part of the release flow, but we can also make the MRIterator_Release call return a value to indicate whether we freed the iterator or not, and call the unblock accordingly

Collaborator

@raz-mon raz-mon left a comment

GREAT work!
Partial review (since more changes are on the way)

@meiravgri meiravgri changed the title Bug fixes related to Timeout with return partial results polocy Bug fixes related to Timeout with return partial results policy Feb 8, 2025
@meiravgri meiravgri requested review from GuyAv46 and raz-mon February 10, 2025 14:32
Collaborator

@GuyAv46 GuyAv46 left a comment

Very nice

pthread_mutex_lock(&chan->lock);
while (!chan->size) {
if (!chan->wait) {
chan->wait = true; // reset the flag
Collaborator

Since we now necessarily only unblock a single reader, should we change the broadcast call to a signal in the unblock function?

Collaborator Author

what are the risks? what is the benefit?

Collaborator

future-proofing, if we ever want to have multiple readers. We now unblock a single reader, so why wake everyone up?

Collaborator Author

ok

// Unblock the channel before calling `ProcessDone`, as it may trigger the freeing of the iterator
MRChannel_Unblock(ctx->it->ctx.chan);
}
RS_LOG_ASSERT(pending >= 0, "Pending should not reach a negative value");
Collaborator

Can we replace this with a simple assert call?

Collaborator Author

what's the difference?

Collaborator

Hopefully, soon, we will have dev-time and prod-time assertions, this one should be a dev-time assertion IMO, so no need for a redis module API assertion

Collaborator Author

why do you think this is a dev-time assertion?
I suggest replacing it with a simple assert + log message so we won't crash (here) on production, WDYT?


# Ensure the cursor is properly depleted after one FT.CURSOR READ
iter = 0
while (cursor):
Collaborator

why use a while loop if we expect a single read?

Collaborator Author

To simulate the scenario we don't expect and have debugging info if it happens

Collaborator

so why not assert that we get cursor id 0 from the read, and add meaningful info if it fails?

@meiravgri meiravgri requested a review from alonre24 February 11, 2025 15:58
Collaborator Author

meiravgri commented Feb 12, 2025

Running on all Linux platforms:
https://github.com/RediSearch/RediSearch/actions/runs/13267001852/job/37036891672
test_resp3:test_error_with_partial_results fails as a result of fixing MOD-8482 Bug 2: serializing an empty result with RESP3.

(FAIL):	0 > 0	test_resp3.py:1510 
# `FT.AGGREGATE`
  res = conn.execute_command(
    'FT.AGGREGATE', 'idx', '*', 'TIMEOUT', '1'
  )

  # Assert that we got results
  env.assertGreater(len(res['results']), 0)

The test is flaky because the timeout is not always triggered; it depends on the timing of the query execution.
When the timeout does occur (as expected), the query may return 0 results. Prior to fixing MOD-8482, even if no documents were returned from the shards, an invalid result was still serialized, making it seem like at least one result was returned.

Now, after the bug fix, when the timeout happens, no results are returned, causing the test to fail as it expects the results count to be greater than 0.

I'll fix the test to enforce timeout by calling the debug query mechanism.

fix bug 1:
aggregate_exec:sendChunk_Resp3
Don't change rc upon timeout, to avoid returning empty results.

bug fix 2:
reset the sorter timeout so we return EOF after depleting the heap.

bug fix 3:
if the shards didn't send any reply (currently assumed possible only due to timeout), but they did finish collecting the results, then inProcess is set to 0 while pending is still larger than 0, and the cursor is not depleted.
in this case, we unblock the channel and signal it to proceed. we rely on the coordinator to check the timeout; if it has timed out, it will return.

to allow that, getNextReply returns pending (the number of shards that potentially have results) instead of 0. in this case rpnet won't return EOF, but timeout.

enhancement:
rpnetNext
cache the results length instead of getting it each time
before this fix self->remaining was decreased and then base->upstream->Next(base->upstream, r) was returned
but the ->Next call could have returned no results, so the counter was wrongly updated for it.
This caused a wrong number of accumulated results returned by the cursor when a timeout was reached in one or more of the cursor's read commands. In each timed-out read we lost one result.
that will add the timeout RP only if the query runs in a shard

add test_timeout
add to debug tests:
TeststrictPolicy
testInternalOnly

add test_cursors:testTimeoutPartialWithEmptyResults

rename parseDebugParams->parseDebugParamsCount
… N==0 to avoid coordinator infinite loop

fix missing return in debug commands
the problem was that we were trying to decrease &ctx->it->ctx.inProcess after the shard had decided to release the iterator (MRIterator_Release had freed it)
The solution is to change the MRIterator_Release signature. It now returns whether it freed the iterator; if it was released, return
…itions with the shards.

ensure the sequence of returning from MRChannel_Pop and re-blocking the channel
is atomic under the channel guard

remove MRChannel_Block
…imulate well the scenario:

i.e. the shard returns a timeout error string, but the coordinator clock is not set to time out.
Trying to only return an error from the shards without any special changes in the coordinator: RPNetNext returns an error (and exits the while loop) only if the reply contains a timeout AND the policy is to fail. otherwise, we enter another while-loop iteration without processing the reply, overriding it with the next loop. This is a leak.
Another approach I took: in the callback passed to the result processor I tried to first call getNextReply before rpnetNext checks the timeout, but it didn't work for a reason I couldn't figure out (the shards didn't shut down; might be a bug in RLTest).

Returning a generic query code doesn't simulate the required scenario.
remove sortable
@meiravgri meiravgri force-pushed the meiravg_nonstrict_timeout_bug_fixes branch from 1e9e5a5 to be8eae2 Compare February 12, 2025 11:43
Collaborator

@raz-mon raz-mon left a comment

Really nice!
Kudos for the super clear docs all around!
Bravo 👏


# This simulates a scenario where shards return empty results due to timeout (so the cursor is still valid),
# but the coordinator managed to call 'getNextReply' and waits for replies in MRChannel_Pop, before it checked timeout.
# Note: An empty reply does not wake up the coordinator.
Collaborator

Now it does, use did for clarity

Collaborator

OK it's clear from the rest as well

Collaborator Author

It still doesn't. The channel will be unblocked because all the shards are done sending the current batch of results.

simplify testCursorDepletionNonStrictTimeoutPolicySortby test
@meiravgri meiravgri requested a review from GuyAv46 February 16, 2025 12:58
@meiravgri meiravgri merged commit fa5b61a into meiravg_debug_query_command Feb 16, 2025
6 checks passed
@meiravgri meiravgri deleted the meiravg_nonstrict_timeout_bug_fixes branch February 16, 2025 15:40
github-merge-queue bot pushed a commit that referenced this pull request Mar 6, 2025
* reset self->timedOut
add test

* revert original test

* support empty results (to be reverted)

* revert getting empty result

fix bug 1:
aggregate_exec:sendChunk_Resp3
Don't change rc upon timeout, to avoid returning empty results.

bug fix 2:
reset the sorter timeout so we return EOF after depleting the heap.

bug fix 3:
if the shards didn't send any reply (currently assumed possible only due to timeout), but they did finish collecting the results, then inProcess is set to 0 while pending is still larger than 0, and the cursor is not depleted.
in this case, we unblock the channel and signal it to proceed. we rely on the coordinator to check the timeout; if it has timed out, it will return.

to allow that, getNextReply returns pending (the number of shards that potentially have results) instead of 0. in this case rpnet won't return EOF, but timeout.

enhancement:
rpnetNext
cache the results length instead of getting it each time

* revert result len

* revert resp2 reply length

* Add query debug commands:
API:
_FT.DEBUG <query>  <DEBUG_TYPE> <DEBUG_TYPE_ARGS> ... DEBUG_PARAMS_COUNT 2

query can be either FT.AGGREGATE ... or FT.SEARCH ... (with all supported args)

DEBUG_TYPE Currently supports TIMEOUT_AFTER_N, followed by the number of results to return before timeout is returned, including 0.

can be called both in SA and cluster:
_FT.DEBUG FT.AGGREGATE
_FT.DEBUG FT.SEARCH

Can be called only if num shards is 1:
_FT.DEBUG _FT.AGGREGATE
_FT.DEBUG _FT.SEARCH

Changes taking place in production functions:
module.c
* check in `DistSearchCommand` (FT.SEARCH) and `DistAggregateCommand` (FT.AGGREGATE) whether it's a debug command
* if debug and 1 shard -> call DEBUG_RSSearchCommand or DEBUG_RSAggregateCommand for search and aggregate, respectively.
* if debug and cluster: call DEBUG_DistSearchCommandHandler or DEBUG_RSExecDistAggregate for search and aggregate, respectively.

* split FlatSearchCommandHandler into:
  * CreateReq
  * prepareCommand
  * MR_CreateCtx
  * MRCtx_SetReduceFunction
  * MR_Fanout

* DEBUG_FlatSearchCommandHandler flow:
  * CreateReq
  * prepareCommand
  * add _FT.DEBUG as first arg
  * add debug params at the end

* expose DistAggregateCommand and DistSearchCommand so they can be called from debug_commands.c

aggregate_exec.c:
split execCommandCommon into
  * prepareRequest
  * buildPipelineAndExecute

check if AREQ->flags & QEXEC_F_DEBUG in prepareExecutionPlan
change arg withProfile to execOptions
change profile options to bit flags
remove NO_PROFILE and replace it with a flag indicating this is a profile command, EXEC_WITH_PROFILE
then, if it's a profile, check whether the EXEC_WITH_PROFILE_LIMITED bit is set
parseProfile also gets execOptions

* DEBUG_* commands are defined in debug_commands.h

New:
RPTimeoutAfterCount_New
a result processor that simulates a timeout after n results.

AREQ_Debug_New creates an AREQ_Debug holding the request by value, hence they are freed together.
new AREQ flag: QEXEC_F_DEBUG

* fix rppagerNext_Limit counter update

before this fix self->remaining was decreased and then base->upstream->Next(base->upstream, r) was returned
but the ->Next call could have returned no results, so the counter was wrongly updated for it.
This caused a wrong number of accumulated results returned by the cursor when a timeout was reached in one or more of the cursor's read commands. In each timed-out read we lost one result.

* finalize test debug command

* add GetNumShards_UnSafe to read the number of shards
use it in the debug command instead of having an if in the production function

* add sanity test

* add optional parameter: [INTERNAL_ONLY]
that will add the timeout RP only if the query runs in a shard

add test_timeout
add to debug tests:
TeststrictPolicy
testInternalOnly

add test_cursors:testTimeoutPartialWithEmptyResults

rename parseDebugParams->parseDebugParamsCount

* finalize tests

* fix comment

* remove bug fixes
1. sorter
2. pager
3. resp3 change rc in RS_RESULT_TIMEDOUT
4. coord hanging

tests:
revert test cursors with _FT.DEBUG QUERY
remove new test_timeout

* add timeout build mechanism

* set non-strict mode in debug tests
isClusterCoord to check if we are the coordinator in cluster mode

* REVERT timeout on build

* Bug fixes related to Timeout with return partial results policy (#5556)


* disable testTimeoutPartialWithEmptyResults test

* enforce timeout when calling INTERNAL_ONLY with TIMEOUT_AFTER_N where N==0 to avoid coordinator infinite loop

fix missing return in debug commands

* fix tests: explicitly set timeout policy to on_timeout return

* fix use after free in rmr.c:MRIteratorCallback_ProcessDone

the problem was that we were trying to decrease &ctx->it->ctx.inProcess after the shard had decided to release the iterator (MRIterator_Release had freed it)
The solution is to change the MRIterator_Release signature. It now returns whether it freed the iterator; if it was released, return

* return false if free flag is set to false

* move reader reset of the freeFlag to the uv thread to avoid race conditions with the shards.

ensure the sequence of returning from MRChannel_Pop and re-blocking the channel
is atomic under the channel guard

remove MRChannel_Block

* fix leak

* simulate timeout during build - TO BE REVERTED as currently doesn't simulate well the scenario:
i.e. the shard returns a timeout error string, but the coordinator clock is not set to time out.
Trying to only return an error from the shards without any special changes in the coordinator: RPNetNext returns an error (and exits the while loop) only if the reply contains a timeout AND the policy is to fail. otherwise, we enter another while-loop iteration without processing the reply, overriding it with the next loop. This is a leak.
Another approach I took: in the callback passed to the result processor I tried to first call getNextReply before rpnetNext checks the timeout, but it didn't work for a reason I couldn't figure out (the shards didn't shut down; might be a bug in RLTest).

Returning a generic query code doesn't simulate the required scenario.

* REVERT timeout on build

* enable test_cursors:testTimeoutPartialWithEmptyResults

skip testEmptyResult in coord

* fix comment

remove sortable

* stable test_error_with_partial_results by using query debug


* signal one reader

simplify testCursorDepletionNonStrictTimeoutPolicySortby test

* fix typo

* revert force failure

* fix spelling errors

* review fixes:

remove load from AggregateDebug test

introduce DistAggregateCleanups to handle `goto err` in DEBUG_RSExecDistAggregate and RSExecDistAggregate

comments in PipelineAddTimeoutAfterCount

* move debug things to a header

* elaborate comment

* more details in aggregate_debug.h
reference the cursor test to the header

modify and add some comments

* fix comment in rp
use cursor to parse debug args

fix

* disable internal only when used with 0 results and no user cursor

* remove DUMMY_DEBUG_OPTION

* fix
meiravgri added a commit that referenced this pull request Mar 6, 2025
* reset self->timedOut
add test

* revert otiginal test

* suuport empty results (to bereverted)

* revert getting empty result

fix bug 1:
aggregate_exec:sendChunk_Resp3
Don't change rc upon timeout, to avoid returning empty results.

bug fix2:
reset sorter timeout so we retrun rof after depleting the heap.

bug fix3:
if shards didn't send any reply (curerntly assuming only possible due to timeout), but they did finish collecting the results, than inprocess would set to 0, but pending is still larger than 0, and the cursor is not depleted.
in this case, we unblock the chanel, signle it to proceed. we rely on the coordinator to check the timeout. If it's timeout it will return.

to allow that, get next reply returns pending (number of shard that potentially have results) instead of 0. in this case rpnet won't return EOF, but timeout.

ehancement:
rpnetNext
cache results len instead of getting it each

* revert result len

* revert resp2 reply length

* Add query debug commands:
API:
_FT.DEBUG <query>  <DEBUG_TYPE> <DEBUG_TYPE_ARGS> ... DEBUG_PARAMS_COUNT 2

query can be either FT.AGGREGATE ... or FT.SEARCH ... (with all supported args)

DEBUG_TYPE Currently supports TIMEOUT_AFTER_N, followed by the number of results to return before timeout is returned, including 0.

can be called both in SA and cluster:
_FT.DEBUG FT.AGGREGATE
_FT.DEBUG FT.SEARCH

Can be called only if num shards is 1:
_FT.DEBUG _FT.AGGREGATE
_FT.DEBUG _FT.SEARCH

Changes taking place in production functions:
module.c
*if in `DistSearchCommand` (ft.search) and `DistAggregateCommand` (ft.aggregate) to check if it's a debug command
* if debug and 1 shard -> call DEBUG_RSSearchCommand or DEBUG_RSAggregateCommand for search and aggregate, respectively.
*if debug and cluster: call DEBUG_DistSearchCommandHandler or DEBUG_RSExecDistAggregate for search and aggregate, respectively.

* split FlatSearchCommandHandler into:
  * CreateReq
  * prepareCommand
  * MR_CreateCtx
  * MRCtx_SetReduceFunction
  * MR_Fanout

* DEBUG_FlatSearchCommandHandler flow;
  * CreateReq
  * prepareCommand
  * add _FT.DEBUG as first arg
  * add debug params at the end

* expose DistAggregateCommandand DistSearchCommand so it can be called from debug_commands.c

aggregate_exec.c:
split execCommandCommon into
  * prepareRequest
  * buildPipelineAndExecute

check if AREQ->flags & QEXEC_F_DEBUG in prepareExecutionPlan
change arg withProfile to execOptions
change profile options to bit flags
remove NO_PROFILE and replace with a flag indicating this is a profile command EXEC_WITH_PROFILE
than if its profile, checking if EXEC_WITH_PROFILE_LIMITED bit is set.
parseProfile also gets execOptions

*DEBUG_* command are defined in debug_commands.h

New:
RPTimeoutAfterCount_New
a result processor sumulates timeout after n results.

AREQ_Debug_New create AREQ_Debug holding a request by value. hence freed toghther.
new AREQ flag : QEXEC_F_DEBUG

* fix rppagerNext_Limit counter update

before this fix self->remaining was decreased and than base->upstream->Next(base->upstream, r) was returned
but the ->Next call could have returned no results, so the counter was wrongly updated for it.
This caused wrong number of accumelated results retuned by cursor, when timeout was reached in one or more of the cursor's read command. In each timedout read we lost one result.

* finalize test debug comman

* add GetNumShards_UnSafe to read number of shards
use in in debug command instead of havong an if in the production function

* add sanity test

* add optional paramter: [INTERNAL_ONLY]
that will add the timeout Rp only if the query runs in a shard

add tst_timeout
add to debug tests:
TeststrictPolicy
testInternalOnly

add test_cursors:testTimeoutPartialWithEmptyResults

rename parseDebugParams->parseDebugParamsCount

* finalize tests

* fix comment

* remove bug fixes
1. sorter
2. pager
3. resp3 change rc in RS_RESULT_TIMEDOUT
4. coord hangign

tests:
revert rest cursors with _FT.DEBUG QUERY
remove new test_timeout

* add timeout build mechanism

* set non strict mode to debug tests
isClusterCoord to check if we are the coordinator in cluster mode

* REVERT timeout on build

* Bug fixes related to Timeout with return partial results policy (#5556)

* reset self->timedOut
add test

* revert otiginal test

* suuport empty results (to bereverted)

* revert getting empty result

fix bug 1:
aggregate_exec:sendChunk_Resp3
Don't change rc upon timeout, to avoid returning empty results.

bug fix2:
reset sorter timeout so we retrun rof after depleting the heap.

bug fix3:
if shards didn't send any reply (curerntly assuming only possible due to timeout), but they did finish collecting the results, than inprocess would set to 0, but pending is still larger than 0, and the cursor is not depleted.
in this case, we unblock the chanel, signle it to proceed. we rely on the coordinator to check the timeout. If it's timeout it will return.

to allow that, get next reply returns pending (number of shard that potentially have results) instead of 0. in this case rpnet won't return EOF, but timeout.

ehancement:
rpnetNext
cache results len instead of getting it each

* revert result len

* revert resp2 reply length

* fix rppagerNext_Limit counter update

Before this fix, self->remaining was decreased and then base->upstream->Next(base->upstream, r) was returned,
but the ->Next call could have returned no results, so the counter was wrongly updated for it.
This caused a wrong number of accumulated results returned by the cursor when a timeout was reached in one or more of the cursor's read commands. In each timed-out read we lost one result.
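The corrected counter update can be sketched like this, with a stub upstream standing in for the real result-processor chain (all names here are illustrative):

```c
#include <assert.h>

typedef enum { RS_RESULT_OK, RS_RESULT_TIMEDOUT } RSResultCode;

typedef struct { int remaining; } Pager;

/* Stand-in upstream that yields results until it "times out". */
static RSResultCode upstreamNext(int *callsLeft) {
  return (*callsLeft)-- > 0 ? RS_RESULT_OK : RS_RESULT_TIMEDOUT;
}

/* Fixed counter update: decrement `remaining` only after the upstream
 * actually produced a result. Before the fix, the decrement happened
 * unconditionally, losing one result per timed-out read. */
static RSResultCode pagerNextLimit(Pager *self, int *callsLeft) {
  RSResultCode rc = upstreamNext(callsLeft);
  if (rc == RS_RESULT_OK) {
    self->remaining--;
  }
  return rc;
}
```

With the old ordering, the timed-out read below would have left `remaining` at 2 instead of 3, silently dropping a result.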

* add optional parameter: [INTERNAL_ONLY]
that will add the timeout RP only if the query runs on a shard

add test_timeout
add to debug tests:
TeststrictPolicy
testInternalOnly

add test_cursors:testTimeoutPartialWithEmptyResults

rename parseDebugParams->parseDebugParamsCount

* finalize tests

* fix comment

* disable testTimeoutPartialWithEmptyResults test

* enforce timeout when calling INTERNAL_ONLY with TIMEOUT_AFTER_N where N==0 to avoid coordinator infinite loop

fix missing return in debug commands

* fix tests: explicitly set the timeout policy to on_timeout return

* fix use after free in rmr.c:MRIteratorCallback_ProcessDone

The problem was that we were trying to decrease &ctx->it->ctx.inProcess after the shard decided to release the iterator (MRIterator_Release released it).
The solution is to change MRIterator_Release's signature: now it returns whether it freed the iterator or not; if it was released, return.
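The changed contract can be sketched as follows; the refcount field and the processDone helper are hypothetical simplifications of the real rmr.c code:

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>

typedef struct {
  int refcount;
} MRIterator;

/* Sketch of the new signature: the release reports whether it freed
 * the iterator, so the caller knows not to touch it afterwards. */
static bool MRIterator_Release(MRIterator *it) {
  if (--it->refcount == 0) {
    free(it);
    return true;   /* freed: the caller must not dereference it again */
  }
  return false;
}

/* Caller side of the fix: skip the counter update once the iterator
 * is gone, instead of reading through a dangling pointer. */
static void processDone(MRIterator *it, int *inProcess) {
  if (MRIterator_Release(it)) {
    return;        /* iterator freed; nothing safe left to update */
  }
  (*inProcess)--;  /* iterator still alive, update is safe */
}
```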

* return false if free flag is set to false

* move the reader's reset of the freeFlag to the uv thread to avoid race conditions with the shards.

ensure the sequence of returning from MRChannel_Pop and re-blocking the channel
is atomic under the channel guard

remove MRChannel_Block
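The atomicity point can be sketched with a minimal channel: Pop takes an item and, when the queue drains, re-arms the blocked state inside the same critical section, so no producer can slip in between (the race that the separate MRChannel_Block call allowed). The fields and semantics here are assumptions for illustration, not the real MRChannel layout:

```c
#include <assert.h>
#include <pthread.h>

typedef struct {
  pthread_mutex_t lock;
  int items;    /* queued replies */
  int blocked;  /* 1 when readers must wait for a new signal */
} MRChannel;

/* Pop and re-block in one critical section, under the channel guard. */
static int MRChannel_Pop(MRChannel *ch) {
  pthread_mutex_lock(&ch->lock);
  int got = 0;
  if (ch->items > 0) {
    ch->items--;
    got = 1;
  }
  if (ch->items == 0) {
    ch->blocked = 1;  /* no window for a producer between pop and block */
  }
  pthread_mutex_unlock(&ch->lock);
  return got;
}
```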

* fix leak

* simulate timeout during build - TO BE REVERTED as it currently doesn't simulate the scenario well:
i.e. the shard returns a timeout error string, but the coordinator clock is not set to time out.
Trying to only return an error from the shards without any special changes in the coordinator: RPNetNext returns an error (and exits the while loop) only if the reply contains a timeout AND the policy is to fail. Otherwise, we enter another while-loop iteration without processing the reply, overriding it in the next loop. This is a leak.
Another approach I took: in the callback passed to the result processor, I tried to first call getNextReply before rpnetNext checks the timeout, but it didn't work for a reason I couldn't figure out (the shards didn't shut down; might be a bug in RLTest).

Returning a generic query code doesn't simulate the required scenario.

* REVERT timeout on build

* enable test_cursors:testTimeoutPartialWithEmptyResults

skip testEmptyResult in coord

* fix comment

remove sortable

* stabilize test_error_with_partial_results by using query debug

* move the reader's reset of the freeFlag to the uv thread to avoid race conditions with the shards.

ensure the sequence of returning from MRChannel_Pop and re-blocking the channel
is atomic under the channel guard

remove MRChannel_Block

* signal one reader

simplify testCursorDepletionNonStrictTimeoutPolicySortby test

* fix typo

* revert force failure

* fix spelling errors

* review fixes:

remove load from AggregateDebug test

introduce DistAggregateCleanups to handle `goto err` in DEBUG_RSExecDistAggregate and RSExecDistAggregate

comments in PipelineAddTimeoutAfterCount

* move debug things to a header

* elaborate comment

* more details in aggregate_debug.h
reference the cursor test to the header

modify and add some comments

* fix comment in rp
use cursor to parse debug args

fix

* disable internal only when used with 0 results and no user cursor

* remove DUMMY_DEBUG_OPTION

* fix

(cherry picked from commit d45701e)
github-merge-queue bot pushed a commit that referenced this pull request Mar 9, 2025
* Introducing query debug mechanism 🎉 (#5554)

* reset self->timedOut
add test

* revert original test

* support empty results (to be reverted)

* revert getting empty result

fix bug 1:
aggregate_exec:sendChunk_Resp3
Don't change rc upon timeout, to avoid returning empty results.

fix bug 2:
reset the sorter's timeout flag so we return EOF after depleting the heap.

fix bug 3:
If the shards didn't send any reply (currently assumed to be possible only due to a timeout), but they did finish collecting the results, then inProcess would be set to 0 while pending is still larger than 0, and the cursor is not depleted.
In this case, we unblock the channel and signal it to proceed. We rely on the coordinator to check the timeout; if it has timed out, it will return.

To allow that, getNextReply returns pending (the number of shards that potentially have results) instead of 0; in this case rpnet won't return EOF, but timeout.

enhancement:
rpnetNext:
cache the results length instead of getting it on each call

* revert result len

* revert resp2 reply length

* Add query debug commands:
API:
_FT.DEBUG <query>  <DEBUG_TYPE> <DEBUG_TYPE_ARGS> ... DEBUG_PARAMS_COUNT 2

query can be either FT.AGGREGATE ... or FT.SEARCH ... (with all supported args)

DEBUG_TYPE currently supports TIMEOUT_AFTER_N, followed by the number of results to return before a timeout is returned, including 0.

can be called both in SA and cluster:
_FT.DEBUG FT.AGGREGATE
_FT.DEBUG FT.SEARCH

Can be called only if num shards is 1:
_FT.DEBUG _FT.AGGREGATE
_FT.DEBUG _FT.SEARCH

Changes taking place in production functions:
module.c:
* an if in `DistSearchCommand` (FT.SEARCH) and `DistAggregateCommand` (FT.AGGREGATE) to check if it's a debug command
* if debug and 1 shard -> call DEBUG_RSSearchCommand or DEBUG_RSAggregateCommand for search and aggregate, respectively.
* if debug and cluster: call DEBUG_DistSearchCommandHandler or DEBUG_RSExecDistAggregate for search and aggregate, respectively.

* split FlatSearchCommandHandler into:
  * CreateReq
  * prepareCommand
  * MR_CreateCtx
  * MRCtx_SetReduceFunction
  * MR_Fanout

* DEBUG_FlatSearchCommandHandler flow:
  * CreateReq
  * prepareCommand
  * add _FT.DEBUG as first arg
  * add debug params at the end

* expose DistAggregateCommand and DistSearchCommand so they can be called from debug_commands.c

aggregate_exec.c:
split execCommandCommon into
  * prepareRequest
  * buildPipelineAndExecute

check if AREQ->flags & QEXEC_F_DEBUG in prepareExecutionPlan
change arg withProfile to execOptions
change profile options to bit flags
remove NO_PROFILE and replace it with a flag, EXEC_WITH_PROFILE, indicating this is a profile command;
then, if it's a profile, check whether the EXEC_WITH_PROFILE_LIMITED bit is set.
parseProfile also gets execOptions

* DEBUG_* commands are defined in debug_commands.h

New:
RPTimeoutAfterCount_New
a result processor that simulates a timeout after n results.

AREQ_Debug_New creates an AREQ_Debug holding a request by value, hence freed together.
new AREQ flag: QEXEC_F_DEBUG
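A minimal sketch of what such a timeout-simulating result processor might look like; the real RPTimeoutAfterCount_New wires into the result-processor chain, which is elided here, and the names below are simplified:

```c
#include <assert.h>

typedef enum { RS_RESULT_OK, RS_RESULT_TIMEDOUT } RSResultCode;

/* Debug-only processor: pass through N results, then report a timeout,
 * letting tests exercise the partial-results timeout paths deterministically. */
typedef struct {
  int remaining;               /* N from TIMEOUT_AFTER_N */
} RPTimeoutAfterCount;

static RSResultCode rpTimeoutAfterCountNext(RPTimeoutAfterCount *self) {
  if (self->remaining == 0) {
    return RS_RESULT_TIMEDOUT; /* simulate the timeout */
  }
  self->remaining--;
  return RS_RESULT_OK;         /* forward an upstream result */
}
```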

* fix rppagerNext_Limit counter update

Before this fix, self->remaining was decreased and then base->upstream->Next(base->upstream, r) was returned,
but the ->Next call could have returned no results, so the counter was wrongly updated for it.
This caused a wrong number of accumulated results returned by the cursor when a timeout was reached in one or more of the cursor's read commands. In each timed-out read we lost one result.

* finalize debug command test

* add GetNumShards_UnSafe to read number of shards
use it in the debug command instead of having an if in the production function

* add sanity test

* add optional parameter: [INTERNAL_ONLY]
that will add the timeout RP only if the query runs on a shard

add test_timeout
add to debug tests:
TeststrictPolicy
testInternalOnly

add test_cursors:testTimeoutPartialWithEmptyResults

rename parseDebugParams->parseDebugParamsCount

* finalize tests

* fix comment

* remove bug fixes
1. sorter
2. pager
3. resp3 change rc in RS_RESULT_TIMEDOUT
4. coord hanging

tests:
revert test cursors with _FT.DEBUG QUERY
remove new test_timeout

* add timeout build mechanism

* set non-strict mode in debug tests
add isClusterCoord to check if we are the coordinator in cluster mode

* REVERT timeout on build

* Bug fixes related to Timeout with return partial results policy (#5556)

* reset self->timedOut
add test

* revert original test

* support empty results (to be reverted)

* revert getting empty result

fix bug 1:
aggregate_exec:sendChunk_Resp3
Don't change rc upon timeout, to avoid returning empty results.

fix bug 2:
reset the sorter's timeout flag so we return EOF after depleting the heap.

fix bug 3:
If the shards didn't send any reply (currently assumed to be possible only due to a timeout), but they did finish collecting the results, then inProcess would be set to 0 while pending is still larger than 0, and the cursor is not depleted.
In this case, we unblock the channel and signal it to proceed. We rely on the coordinator to check the timeout; if it has timed out, it will return.

To allow that, getNextReply returns pending (the number of shards that potentially have results) instead of 0; in this case rpnet won't return EOF, but timeout.

enhancement:
rpnetNext:
cache the results length instead of getting it on each call

* revert result len

* revert resp2 reply length

* fix rppagerNext_Limit counter update

Before this fix, self->remaining was decreased and then base->upstream->Next(base->upstream, r) was returned,
but the ->Next call could have returned no results, so the counter was wrongly updated for it.
This caused a wrong number of accumulated results returned by the cursor when a timeout was reached in one or more of the cursor's read commands. In each timed-out read we lost one result.

* add optional parameter: [INTERNAL_ONLY]
that will add the timeout RP only if the query runs on a shard

add test_timeout
add to debug tests:
TeststrictPolicy
testInternalOnly

add test_cursors:testTimeoutPartialWithEmptyResults

rename parseDebugParams->parseDebugParamsCount

* finalize tests

* fix comment

* disable testTimeoutPartialWithEmptyResults test

* enforce timeout when calling INTERNAL_ONLY with TIMEOUT_AFTER_N where N==0 to avoid coordinator infinite loop

fix missing return in debug commands

* fix tests: explicitly set the timeout policy to on_timeout return

* fix use after free in rmr.c:MRIteratorCallback_ProcessDone

The problem was that we were trying to decrease &ctx->it->ctx.inProcess after the shard decided to release the iterator (MRIterator_Release released it).
The solution is to change MRIterator_Release's signature: now it returns whether it freed the iterator or not; if it was released, return.

* return false if free flag is set to false

* move the reader's reset of the freeFlag to the uv thread to avoid race conditions with the shards.

ensure the sequence of returning from MRChannel_Pop and re-blocking the channel
is atomic under the channel guard

remove MRChannel_Block

* fix leak

* simulate timeout during build - TO BE REVERTED as it currently doesn't simulate the scenario well:
i.e. the shard returns a timeout error string, but the coordinator clock is not set to time out.
Trying to only return an error from the shards without any special changes in the coordinator: RPNetNext returns an error (and exits the while loop) only if the reply contains a timeout AND the policy is to fail. Otherwise, we enter another while-loop iteration without processing the reply, overriding it in the next loop. This is a leak.
Another approach I took: in the callback passed to the result processor, I tried to first call getNextReply before rpnetNext checks the timeout, but it didn't work for a reason I couldn't figure out (the shards didn't shut down; might be a bug in RLTest).

Returning a generic query code doesn't simulate the required scenario.

* REVERT timeout on build

* enable test_cursors:testTimeoutPartialWithEmptyResults

skip testEmptyResult in coord

* fix comment

remove sortable

* stabilize test_error_with_partial_results by using query debug

* move the reader's reset of the freeFlag to the uv thread to avoid race conditions with the shards.

ensure the sequence of returning from MRChannel_Pop and re-blocking the channel
is atomic under the channel guard

remove MRChannel_Block

* signal one reader

simplify testCursorDepletionNonStrictTimeoutPolicySortby test

* fix typo

* revert force failure

* fix spelling errors

* review fixes:

remove load from AggregateDebug test

introduce DistAggregateCleanups to handle `goto err` in DEBUG_RSExecDistAggregate and RSExecDistAggregate

comments in PipelineAddTimeoutAfterCount

* move debug things to a header

* elaborate comment

* more details in aggregate_debug.h
reference the cursor test to the header

modify and add some comments

* fix comment in rp
use cursor to parse debug args

fix

* disable internal only when used with 0 results and no user cursor

* remove DUMMY_DEBUG_OPTION

* fix

(cherry picked from commit d45701e)

* fix