Add stream consumer group lag tracking and reporting #9127
oranagra merged 54 commits into redis:unstable
Conversation
oranagra
left a comment
i'm not certain i understand what happens on upgrade.
does the new feature work well after loading an rdb from an old version?
p.s. didn't review the tests.
Refactors to rely on rdbver passed to `rdbLoadObject` et al
|
@oranagra thank you for the CR. I addressed most of the findings, with the exception of the ongoing question about getting the first non-tombstone entry from the stream and the associated work/nesting. We can consider (a) uniting the tip and edge getters, (b) keeping it as is, or (c) just maintaining 'first_id'. The latter avoids seeking/iterating but will make XTRIM/XDEL/XADD work a little harder. As for legacy persistence, "it should work" and there's a test that uses a v5 stream DUMP payload to check that migration is successful (ref: https://github.com/redis/redis/pull/9127/files#diff-4c5f2c034539b46fa4bebee128f4409782f376d5f419ce96a6728fdc10fbd900R703-R726).
oranagra
left a comment
Looks like the top comment is outdated.
at least about the new command arguments, but please verify the other aspects.
Add a comma, this would have resulted in missing newline in the message. Forgot to add in #9127
Upstream ref: redis/redis#9127. Update commands.json from unstable (pre 7.0 RC2). Co-authored-by: Oran Agra <[email protected]>
si is initialized by streamIteratorStart(), we should call streamIteratorStop() on it when done. regression introduced in #9127 (redis 7.0)
```c
/* Save the group's logical reads counter. */
if ((n = rdbSaveLen(rdb,cg->entries_read)) == -1) {
    raxStop(&ri);
    return -1;
}
```
`entries_read` can be `SCG_INVALID_ENTRIES_READ`, which is -1.
Here `rdbSaveLen` takes a `uint64_t` input, so what is actually written is another value, such as 18446744073709551615.
Then, when we load this `entries_read` back, we convert it from `uint64_t` to `long long`, and it just happens to be -1 again, so everything was fine. Do we need to make changes here?
@guybe7 recently pointed out to me that RM_SaveSigned has a similar problem (and that RDB_MODULE_OPCODE_SINT is unused).
the implication is that unlike rdbEncodeInteger (used for values of keys), these are less efficient in their file size consumption.
fixing these would mean a new rdb format, and for new versions to be able to properly handle the old one.
since unstable already has an rdb version bump, we can try tackling these two issues if we want.
not a real priority IMHO.
> since unstable already has an rdb version bump

The most recent one was in #11099, released in 7.2. Yes, it is not a real priority. I have some spare time these days, so just ping me if you want me to fix it.
the main benefit for that would be disk space reduction for modules that heavily use RM_SaveSigned.
if you have time you can try to see if this gets complicated (including tests for backwards compatibility).
Do we need to propagate group->entries_read in streamPropagateXCLAIM?
master:
replica:
i don't recall if we discussed and dismissed it, or simply overlooked it. @guybe7 @itamarhaber please ack.
ohh, i forgot to mention that we propagate entries_read in streamPropagateGroupID, so when we use NOACK, the entries-read is fine:
replica:
so i think it is an oversight. But fixing it means we have to add a new option to XCLAIM, and an old server will not recognize it.
i don't like to have XCLAIM fail on the replica (and have its other responsibilities skipped too).
i don't like either of them, but i suppose option 1 is the better of the two.
I don't like these options either, but I didn't think of any other good ones. Depending on your ideas and how you want to proceed, I can do the coding.
@oranagra @enjoy-binbin yes, it feels like a bug. I suggest calling streamPropagateGroupID unconditionally.
thanks, i tried it and it is OK now. Is there any other impact (according to the comments)? want me to submit the PR?

```diff
         /* Group last ID should be propagated only if NOACK was
          * specified, otherwise the last id will be included
          * in the propagation of XCLAIM itself. */
-        if (noack) propagate_last_id = 1;
+        propagate_last_id = 1;
```

PR: #12898
@enjoy-binbin yes, thanks. Just don't forget to update the comment there.
…onditionally (#12898) In XREADGROUP ACK, because streamPropagateXCLAIM does not propagate entries-read, entries-read will be inconsistent between master and replicas. I.e. if no entries were claimed, it would have propagated correctly, but if some were claimed, then the entries-read field would be inconsistent on the replica. The fix was suggested by guybe7: call streamPropagateGroupID unconditionally, so that we normalize entries_read on the replicas. In the past, we would only set propagate_last_id when NOACK was specified. And in #9127, XCLAIM did not propagate entries_read in ACK, which would cause entries_read to be inconsistent between master and replicas. Another approach is to add another arg to XCLAIM and let it propagate entries_read, but we decided not to use it, because we want minimal damage in case there's an old target and a new source (in the worst-case scenario, the new source doesn't recognize XGROUP SETID ... ENTRIES READ and the lag is lost; if we change XCLAIM, the damage is much more severe). In this patch, if the user uses XREADGROUP .. COUNT 1 there will be an additional overhead of MULTI, EXEC and XGROUP SETID. We assume the extra commands in the COUNT 1 case (a 4x factor, changing from one XCLAIM to MULTI+XCLAIM+XSETID+EXEC) are probably ok, since reading just one entry is in any case very inefficient (a client round trip per record), so we're hoping it's not a common case. Issue was introduced in #9127.
Adds the ability to track the lag of a consumer group (CG), that is, the number of entries yet-to-be-delivered from the stream.
The proposed constant-time solution is in the spirit of "best-effort."
Partially addresses #8737.
Description of approach
We add a new "entries_added" property to the stream. This starts at 0 for a new stream and is incremented by 1 with every
`XADD`. It is essentially an all-time counter of the entries added to the stream. Given the stream's length and this counter value, we can trivially find the logical "entries_added" counter of the first ID if and only if the stream is contiguous. A fragmented stream contains one or more tombstones generated by
`XDEL`s. The new "xdel_max_id" stream property tracks the latest tombstone. The CG also tracks its last delivered ID as an "entries_read" counter and increments it independently when delivering new messages, unless this read counter is invalid (-1 means an invalid offset). When the CG's counter is available, the reported lag is the difference between the added and read counters.
Lastly, this also adds a "first_id" field to the stream structure in order to make looking it up cheaper in most cases.
Limitations
There are two cases in which the mechanism isn't able to track the lag. In these cases,
`XINFO` replies with `null` in the "lag" field. The first case is when a CG is created with an arbitrary last delivered ID that isn't "0-0", nor the first or the last entry of the stream. In this case, it is impossible to obtain a valid read counter (short of an O(N) operation). The second case is when there are one or more tombstones fragmenting the stream's entries range.
In both cases, given enough time and assuming that the consumers are active (reading and acking) and advancing, the CG should be able to catch up with the tip of the stream and report zero lag. Once that's achieved, lag tracking would resume as normal (until the next tombstone is set).
API changes
- `XGROUP CREATE` gains the optional named argument `[ENTRIESREAD entries-read]` for explicitly specifying the new CG's counter.
- `XGROUP SETID` gains an optional positional argument `[ENTRIESREAD entries-read]` for specifying the CG's counter.
- `XINFO` reports the maximal tombstone ID, the recorded first entry ID, and the total number of entries added to the stream.
- `XINFO` reports the current lag and logical read counter of CGs.
- `XSETID` is an internal command that's used in replication/AOF. It gains the optional positional arguments `[ENTRIESADDED entries-added] [MAXDELETEDID max-deleted-entry-id]` for propagating the stream's entries-added counter and maximal tombstone ID.

The generic unsolved problem
The current stream implementation doesn't provide an efficient way to obtain the approximate/exact size of a range of entries. While it could've been nice to have that ability (#5813) in general, let alone specifically in the context of CGs, the risk and complexities involved in such implementation are in all likelihood prohibitive.
A refactoring note
`streamGetEdgeID` has been refactored to accommodate both the existing seek of any entry as well as seeking non-deleted entries (the addition of the `skip_tombstones` argument). Furthermore, this refactoring also migrated the seek logic to use the `streamIterator` (rather than `raxIterator`) that was, in turn, extended with the `skip_tombstones` Boolean struct field to control the emission of these.

TODO
- `uint64_t` overflow WRT `offset++`
- `uint64_t` to `long long` (replies) castings
- `XGROUP CREATE|SETID .... [COMPUTE-OFFSET cursor]` for arbitrary IDs at the cost of O(N)
- `XGROUP LAG <key> [group ...]` command for reduced verbosity compared to `XINFO STREAM ... FULL` and `XINFO GROUPS`
- `XGROUP SETID` extended form