
Add stream consumer group lag tracking and reporting#9127

Merged
oranagra merged 54 commits into redis:unstable from itamarhaber:stream-cglag
Feb 23, 2022

Conversation

@itamarhaber (Member) commented Jun 22, 2021

Adds the ability to track the lag of a consumer group (CG), that is, the number of entries yet-to-be-delivered from the stream.

The proposed constant-time solution is in the spirit of "best-effort."

Partially addresses #8737.

Description of approach

We add a new "entries_added" property to the stream. This starts at 0 for a new stream and is incremented by 1 with every XADD. It is essentially an all-time counter of the entries added to the stream.

Given the stream's length and this counter value, we can trivially find the logical "entries_added" counter of the first ID if and only if the stream is contiguous. A fragmented stream contains one or more tombstones generated by XDELs. The new "xdel_max_id" stream property tracks the latest tombstone.

The CG also tracks the logical position of its last delivered ID as an "entries_read" counter, incrementing it independently when delivering new messages, unless this read counter is invalid (-1 denotes an invalid offset). When the CG's counter is available, the reported lag is the difference between the added and read counters.

Lastly, this also adds a "first_id" field to the stream structure in order to make looking it up cheaper in most cases.
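The bookkeeping above can be sketched in Python (an illustrative model with hypothetical names, not the actual C implementation; IDs are modeled as (ms, seq) tuples and (0, 0) means "none"):

```python
# Illustrative model of the new stream counters: entries_added only ever
# grows, while XDEL records the latest tombstone and keeps the cached
# first ID up to date.

class Stream:
    def __init__(self):
        self.entries = {}             # id -> fields
        self.entries_added = 0        # all-time XADD counter
        self.max_deleted_id = (0, 0)  # latest tombstone
        self.first_id = (0, 0)        # cached first entry ID
        self.last_id = (0, 0)

    def xadd(self, entry_id, fields):
        self.entries[entry_id] = fields
        self.entries_added += 1       # incremented on every XADD, never decremented
        self.last_id = entry_id
        if len(self.entries) == 1:
            self.first_id = entry_id

    def xdel(self, entry_id):
        if self.entries.pop(entry_id, None) is None:
            return 0
        self.max_deleted_id = max(self.max_deleted_id, entry_id)
        if entry_id == self.first_id:  # keep the cached first ID accurate
            self.first_id = min(self.entries) if self.entries else (0, 0)
        return 1
```

For example, adding two entries and deleting the first leaves entries_added at 2 while first_id advances to the second entry, which is exactly what lets the lag stay computable in constant time.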

Limitations

There are two cases in which the mechanism isn't able to track the lag. In these cases, XINFO replies with null in the "lag" field.

The first case is when a CG is created with an arbitrary last delivered ID that is neither "0-0" nor the first or last entry of the stream. In this case, it is impossible to obtain a valid read counter (short of an O(N) operation). The second case is when one or more tombstones fragment the stream's entries range.

In both cases, given enough time and assuming that the consumers are active (reading and acking) and advancing, the CG should be able to catch up with the tip of the stream and report zero lag. Once that's achieved, lag tracking resumes as normal (until the next tombstone is set).
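The reporting rule described above can be summarized as a small Python sketch (a hedged model, not the actual C logic; the invalid counter, -1 in the C code, is modeled as None):

```python
# Hedged sketch of the lag reporting rule: a valid counter yields an exact
# difference, an invalid counter yields None (XINFO replies with null),
# and catching up with the stream's tip re-synchronizes tracking.

def consumer_group_lag(entries_added, entries_read, at_stream_tip):
    if at_stream_tip:
        # Once the group reaches the last entry, lag is zero by definition
        # and the read counter can be reset to entries_added.
        return 0
    if entries_read is None:
        return None  # lag cannot be tracked; reported as null
    return entries_added - entries_read
```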

API changes

  • XGROUP CREATE added with the optional named argument [ENTRIESREAD entries-read] for explicitly specifying the new CG's counter.
  • XGROUP SETID added with an optional positional argument [ENTRIESREAD entries-read] for specifying the CG's counter.
  • XINFO reports the maximal tombstone ID, the recorded first entry ID, and total number of entries added to the stream.
  • XINFO reports the current lag and logical read counter of CGs.
  • XSETID is an internal command that's used in replication/aof. It has been added with the optional positional arguments [ENTRIESADDED entries-added] [MAXDELETEDID max-deleted-entry-id] for propagating the CG's offset and maximal tombstone ID of the stream.

The generic unsolved problem

The current stream implementation doesn't provide an efficient way to obtain the approximate/exact size of a range of entries. While it would have been nice to have that ability (#5813) in general, let alone specifically in the context of CGs, the risks and complexities involved in such an implementation are in all likelihood prohibitive.

A refactoring note

streamGetEdgeID has been refactored to accommodate both the existing seek of any entry and seeking non-deleted entries (via the new skip_tombstones argument). Furthermore, this refactoring migrated the seek logic to use the streamIterator (rather than raxIterator), which was, in turn, extended with a skip_tombstones Boolean struct field to control whether tombstoned entries are emitted.
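A rough Python analogue of what the skip_tombstones flag does (hypothetical code; the real iterator is C over a rax, while here tombstoned IDs are tracked in a separate set for visibility):

```python
# Illustrative iterator sketch: yield entries in ID order, optionally
# skipping tombstoned (deleted) IDs -- analogous in spirit to the
# streamIterator's skip_tombstones field.

def stream_iterate(entries, tombstones, skip_tombstones=False):
    for entry_id in sorted(entries):
        if skip_tombstones and entry_id in tombstones:
            continue
        yield entry_id, entries[entry_id]
```

With this flag, finding the first non-deleted entry is just taking the first item of the skipping iteration, which is the edge-seeking case the refactoring adds.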

TODO

  • (reserved for future use)
  • Rationalize about uint64_t overflow WRT offset++
  • Rationalize about uint64_t to long long (replies) castings
  • Consider adding scan-like XGROUP CREATE|SETID .... [COMPUTE-OFFSET cursor] for arbitrary IDs at the cost of O(N)
  • Consider adding an XGROUPLAG <key> [group ...] command for reduced verbosity compared to XINFO STREAM ... FULL and XINFO GROUPS
  • Handle RDB persistence
  • Test migration from older persistence (rdb_ver < 10)
  • See if the Module API needs attention: none at this point.
  • More tests:
    • (reserved for future use)
    • The 'first_id' stream field isn't exposed so it isn't covered by tests - fixed.
    • AOF rewrite
    • XGROUP SETID extended form
  • Documentation: Documents consumer group lag redis-doc#1694

@itamarhaber itamarhaber requested a review from guybe7 June 22, 2021 10:04
@itamarhaber itamarhaber added the state:major-decision Requires core team consensus label Jun 25, 2021
@itamarhaber itamarhaber marked this pull request as draft June 27, 2021 11:50
Member

@oranagra oranagra left a comment


i'm not certain i understand what happens on upgrade.
does the new feature work well after loading an rdb from an old version?

p.s. didn't review the tests.

@itamarhaber
Member Author

@oranagra thank you for the CR. I addressed most of the findings, with the exception of the ongoing question about getting the first non-tombstone entry from the stream and the associated work/nesting. We can consider a) uniting tip and edge getters, b) keeping as is, or c) just maintaining 'first_id'. The latter avoids seeking/iterating but will make XTRIM/XDEL/XADD work a little harder.

As for legacy persistence, "it should work" and there's a test that uses a v5 stream DUMP payload to check that migration is successful (ref: https://github.com/redis/redis/pull/9127/files#diff-4c5f2c034539b46fa4bebee128f4409782f376d5f419ce96a6728fdc10fbd900R703-R726).

Member

@oranagra oranagra left a comment


Looks like the top comment is outdated, at least about the new command arguments, but please verify other aspects.

@oranagra oranagra changed the title Adds consumer group lag Add stream consumer group lag tracking and reporting Feb 23, 2022
@oranagra oranagra merged commit c81c7f5 into redis:unstable Feb 23, 2022
enjoy-binbin added a commit to enjoy-binbin/redis that referenced this pull request Feb 24, 2022
Add a comma, this would have resulted in missing newline in the message.
Forgot to add in redis#9127
oranagra pushed a commit that referenced this pull request Feb 24, 2022
Add a comma, this would have resulted in missing newline in the message.
Forgot to add in #9127
oranagra added a commit to redis/redis-doc that referenced this pull request Feb 27, 2022
Upstream ref: redis/redis#9127
update commands .json from unstable (pre 7.0 RC2)

Co-authored-by: Oran Agra <[email protected]>
@oranagra oranagra mentioned this pull request Feb 28, 2022
oranagra pushed a commit that referenced this pull request May 22, 2022
si is initialized by streamIteratorStart(), we should call
streamIteratorStop() on it when done.

regression introduced in #9127 (redis 7.0)
enjoy-binbin pushed a commit to enjoy-binbin/redis that referenced this pull request Jul 31, 2023
si is initialized by streamIteratorStart(), we should call
streamIteratorStop() on it when done.

regression introduced in redis#9127 (redis 7.0)
Comment on lines +1037 to +1041
/* Save the group's logical reads counter. */
if ((n = rdbSaveLen(rdb,cg->entries_read)) == -1) {
raxStop(&ri);
return -1;
}
Contributor

@enjoy-binbin enjoy-binbin Dec 27, 2023


entries_read can be SCG_INVALID_ENTRIES_READ, which is -1,
and here rdbSaveLen takes a uint64_t input, so what is actually written here is another value, such as 18446744073709551615.

and then when we load this entries_read back, we convert it from uint64_t to long long, and it just happens to be -1 again, so everything was fine. Do we need to make changes here?
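The round-trip described here can be demonstrated with an illustrative Python snippet mimicking the C cast semantics (the function names are hypothetical; the real code is rdbSaveLen/rdbLoadLen on a uint64_t):

```python
# Demo of the cast round-trip: a signed -1 pushed through a uint64_t
# becomes 2**64 - 1 on disk, and reading it back into a signed 64-bit
# value (two's complement) restores -1.

MASK64 = (1 << 64) - 1

def save_as_uint64(value):
    # what the save path effectively receives for entries_read == -1
    return value & MASK64

def load_as_int64(raw):
    # uint64_t -> long long reinterpretation
    return raw - (1 << 64) if raw >= (1 << 63) else raw

raw = save_as_uint64(-1)
print(raw)                  # 18446744073709551615
print(load_as_int64(raw))   # -1
```

So the value written is indeed 18446744073709551615, and it only reads back as -1 because the load path reinterprets the same 64 bits as signed.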

Member


@guybe7 recently pointed out to me that RM_SaveSigned has a similar problem (and that RDB_MODULE_OPCODE_SINT is unused).
the implication is that unlike rdbEncodeInteger (used for values of keys), these are less efficient in their file size consumption.

fixing these would mean a new rdb format, and for new versions to be able to properly handle the old one.
since unstable already has an rdb version bump, we can try tackling these two issues if we want.
not a real priority IMHO.

Contributor


since unstable already has an rdb version bump

The most recent one was in #11099, released in 7.2. Yes, it is not a real priority; I have some spare time these days, just ping me if you want me to fix it.

Member


the main benefit for that would be disk space reduction for modules that heavily use RM_SaveSigned.
if you have time you can try to see if this gets complicated (including tests for backwards compatibility).

@enjoy-binbin
Contributor

Do we need to propagate group->entries_read in streamPropagateXCLAIM?
The following sequence will cause the entries-read of the replica to always be null:

master:

xadd mystream0 * a b c d e f
xgroup create  mystream0 group1 $
xreadgroup group group1 ryan count 1 streams mystream0 >
xadd mystream0 * a1 b1 a1 b2
xadd mystream0 * name v1 name v1
xreadgroup group group1 ryan count 1 streams mystream0 >
xreadgroup group group1 ryan count 1 streams mystream0 >

127.0.0.1:6379> xinfo groups mystream0
1)  1) "name"
    2) "group1"
    3) "consumers"
    4) (integer) 1
    5) "pending"
    6) (integer) 2
    7) "last-delivered-id"
    8) "1703685291682-0"
    9) "entries-read"
   10) (integer) 3
   11) "lag"
   12) (integer) 0

replica:

127.0.0.1:7000> xinfo groups mystream0
1)  1) "name"
    2) "group1"
    3) "consumers"
    4) (integer) 1
    5) "pending"
    6) (integer) 2
    7) "last-delivered-id"
    8) "1703685291682-0"
    9) "entries-read"
   10) (nil)
   11) "lag"
   12) (integer) 0

@oranagra
Copy link
Member

i don't recall if we discussed and dismissed it, or simply overlooked it.
considering that XREADGROUP is a write command and propagates commands to the replica anyway, i suppose it should handle this as well.

@guybe7 @itamarhaber please ack.

@enjoy-binbin
Contributor

ohh, i forgot to mention that we propagate entries_read in streamPropagateGroupID, so when we use NOACK, the entries-read is fine:

xadd mystream0 * a b c d e f
xgroup create  mystream0 group1 $
xreadgroup group group1 ryan count 1 streams mystream0 >
xadd mystream0 * a1 b1 a1 b2
xadd mystream0 * name v1 name v1
xreadgroup group group1 ryan count 1 NOACK streams mystream0 >      # NOACK
xreadgroup group group1 ryan count 1 NOACK streams mystream0 >      # NOACK

127.0.0.1:6379> xinfo groups mystream0
1)  1) "name"
    2) "group1"
    3) "consumers"
    4) (integer) 1
    5) "pending"
    6) (integer) 2
    7) "last-delivered-id"
    8) "1703685291682-0"
    9) "entries-read"
   10) (integer) 3
   11) "lag"
   12) (integer) 0

replica:

127.0.0.1:7000> xinfo groups mystream0
1)  1) "name"
    2) "group1"
    3) "consumers"
    4) (integer) 0
    5) "pending"
    6) (integer) 0
    7) "last-delivered-id"
    8) "1703691495978-0"
    9) "entries-read"
   10) (integer) 3
   11) "lag"
   12) (integer) 0

so i think it is an oversight. But fixing it means we have to add a new option to XCLAIM, and an old server will not recognize it.

@oranagra
Member

i don't like to have XCLAIM fail on the replica (and have its other responsibilities skipped too).
the two options i see are:

  1. send an additional command just for that purpose, and let it fail,
  2. start negotiating version / capabilities between the master and replica.

i don't like either of them, but i suppose option 1 is the better one of the two.
i see that when this PR was merged it was easy because XSETID used to ignore excessive arguments (and it no longer does).
any other options you see?

@enjoy-binbin
Contributor

I don't like these options either. I didn't think of any other good options. Depending on your ideas and how you need to proceed, I can do the coding.

@guybe7
Collaborator

guybe7 commented Dec 28, 2023

@oranagra @enjoy-binbin yes, it feels like a bug

I suggest calling streamPropagateGroupID unconditionally, not only if NOACK was provided.
That will normalize entries_read on the replica

@enjoy-binbin
Contributor

enjoy-binbin commented Dec 28, 2023

I suggest calling streamPropagateGroupID unconditionally, not only if NOACK was provided.
That will normalize entries_read on the replica

thanks, i tried it and it is OK now. Is there any other impact (according to the comments)? want me to submit the PR?

             /* Group last ID should be propagated only if NOACK was
              * specified, otherwise the last id will be included
              * in the propagation of XCLAIM itself. */
-            if (noack) propagate_last_id = 1;
+            propagate_last_id = 1;

PR: #12898

@guybe7
Collaborator

guybe7 commented Dec 28, 2023

@enjoy-binbin yes, thanks

just don't forget to update the comment there
and add tests

enjoy-binbin added a commit to enjoy-binbin/redis that referenced this pull request Dec 28, 2023
…onditionally

In XREADGROUP ACK, because streamPropagateXCLAIM does not propagate
entries-read, entries-read will be inconsistent between master and replicas.

The fix was suggested by guybe7, call streamPropagateGroupID unconditionally,
so that we will normalize entries_read on the replicas.

Issue was introduced in redis#9127.
oranagra pushed a commit that referenced this pull request Feb 29, 2024
…onditionally (#12898)

In XREADGROUP ACK, because streamPropagateXCLAIM does not propagate entries-read, entries-read will be inconsistent between master and replicas. I.e. if no entries were claimed, it would have propagated correctly, but if some were claimed, then the entries-read field would be inconsistent on the replica.

The fix was suggested by guybe7: call streamPropagateGroupID unconditionally, so that we will normalize entries_read on the replicas. In the past, we would only set propagate_last_id when NOACK was specified. And in #9127, XCLAIM did not propagate entries_read in ACK, which would cause entries_read to be inconsistent between master and replicas.

Another approach is to add another arg to XCLAIM and let it propagate entries_read, but we decided not to use it, because we want minimal damage in case there's an old target and a new source (in the worst-case scenario, the new source doesn't recognize XGROUP SETID ... ENTRIES READ and the lag is lost; if we change XCLAIM, the damage is much more severe).

In this patch, if the user uses XREADGROUP .. COUNT 1 there will be an additional overhead of MULTI, EXEC and XGROUPSETID. We assume the extra command in the case of COUNT 1 (a 4x factor, changing from one XCLAIM to MULTI+XCLAIM+XSETID+EXEC) is probably ok, since reading just one entry is in any case very inefficient (a client round trip per record), so we're hoping it's not a common case.

Issue was introduced in #9127.
funny-dog pushed a commit to funny-dog/redis that referenced this pull request Sep 17, 2025
…onditionally (redis#12898)

Labels

state:major-decision Requires core team consensus
