Add modules API for streams #8288

Merged
oranagra merged 21 commits into redis:unstable from zuiderkwast:stream-module-api
Jan 28, 2021

@zuiderkwast
Contributor

@zuiderkwast zuiderkwast commented Jan 5, 2021

APIs added for these stream operations: add, delete, iterate and
trim (by ID or maxlength). The functions are prefixed by RM_Stream.

The type RedisModuleStreamID is added and functions for converting
from and to RedisModuleString.

Whenever the stream functions return REDISMODULE_ERR, errno is set to
provide additional error information.

Refactoring: The zset iterator fields in the RedisModuleKey struct
are wrapped in a union, to allow the same space to be used for type-
specific info for streams and allow future use for other key types.

Fixes #5760.

Contributor

@bjosv bjosv left a comment

Nice!

@oranagra
Member

oranagra commented Jan 5, 2021

@zuiderkwast thanks for this PR, haven't reviewed the code yet, but i wanna mention my initial thoughts.

  1. i'm a bit uncomfortable to merge an API for just "add" before we have a design for the rest of the stream APIs. my fear is that when we'll add the rest later, we'll have some realizations and will want to retroactively change something in an API that's already released.
  2. i wonder if the "add" API is indeed useful on its own? i.e. if there are a lot of use cases for modules that would just like to write into a stream, and don't care much about reading (or in which performance of writing is more critical than reading).

pinging @MeirShpilraien and @gkorland in case they can share some thoughts on this.

Member

@oranagra oranagra left a comment

i did a quick review of the code (haven't reviewed the test yet).
i added some minor comments.
let's discuss the rest of the API and plans for the near future before we dive into the implementation details.

@zuiderkwast
Contributor Author

> i'm a bit uncomfortable to merge an API for just "add" before we have a design for the rest of the stream APIs.

Sure, leave it open until we have the rest of the design in place. I created this PR to initiate a discussion.

> i wonder if the "add" API is indeed useful on its own?

Yes, it is. Any module currently using RM_Call() will be faster with each native API function available. My associates gave me this wishlist with the following priority:

  1. XADD "Missing and very much needed, high overhead, called a lot."
  2. XREAD/XRANGE
  3. XTRIM

XLEN is already supported as RedisModule_ValueLength().

> let's discuss the rest of the API and plans for the near future before we dive into the implementation details.

Sounds good.

I might start drafting a design for read, range and trim too as separate PRs (but I'd prefer to wait for @guybe7's "Trim by MINID" PR to be merged before I start implementing a trim API).

@oranagra
Member

oranagra commented Jan 6, 2021

I have a feeling that one issue or PR would be better for discussing these APIs, since changes in one may affect the others, and they can share some flags / types.
Also, the code is not so huge that it's too large to review in one go.
The way i see it, the only reason to split them into separate PRs is if one is easier and more urgently needed than others, and we'd like to merge it before others are ready.

Type RedisModuleStreamID and functions RM_StreamParseID() and
RM_StreamFormatID() for conversion from/to RedisModuleString.
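
For readers unfamiliar with stream IDs: an ID is a pair of unsigned 64-bit integers written as `ms-seq`. The following is only a minimal sketch of that round trip on plain C strings. `ToyStreamID`, `toy_parse_id` and `toy_format_id` are hypothetical names made up for illustration; they are not the module API, which converts to and from RedisModuleString, and this toy parser requires both parts to be present.

```c
#include <assert.h>
#include <errno.h>
#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for RedisModuleStreamID (two 64-bit parts). */
typedef struct { uint64_t ms; uint64_t seq; } ToyStreamID;

/* Parse one decimal number that must end exactly at expected_end.
 * Returns 0 on success, -1 on malformed or out-of-range input. */
static int toy_parse_u64(const char *s, const char *expected_end, uint64_t *out) {
    if (*s < '0' || *s > '9') return -1;   /* rejects empty, sign, whitespace */
    char *end;
    errno = 0;
    unsigned long long v = strtoull(s, &end, 10);
    if (errno == ERANGE || end != expected_end) return -1;
    *out = (uint64_t)v;
    return 0;
}

/* Parse "ms-seq". Returns 0 on success, -1 on malformed input.
 * On failure *id is left untouched. */
int toy_parse_id(const char *s, ToyStreamID *id) {
    assert(s != NULL && id != NULL);
    const char *dash = strchr(s, '-');
    if (dash == NULL) return -1;
    ToyStreamID tmp;
    if (toy_parse_u64(s, dash, &tmp.ms) != 0) return -1;
    if (toy_parse_u64(dash + 1, s + strlen(s), &tmp.seq) != 0) return -1;
    *id = tmp;
    return 0;
}

/* Format as "ms-seq"; return value follows snprintf semantics. */
int toy_format_id(const ToyStreamID *id, char *buf, size_t buflen) {
    return snprintf(buf, buflen, "%" PRIu64 "-%" PRIu64, id->ms, id->seq);
}
```

Parsing into a temporary and assigning only on success keeps the output argument unchanged on error, which is one possible convention and not necessarily the one the real functions follow.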
Similar to XADD without trimming.
@zuiderkwast zuiderkwast changed the title from "Module API: RM_StreamAdd()" to "Modules API for streams" Jan 12, 2021
@zuiderkwast
Contributor Author

PR updated to include adding, iterating and trimming. Please review mainly the API design.

Member

@oranagra oranagra left a comment

other than Add, Trim and Iterator, do we need to expose any other APIs for streams?

i suppose the iterator can also be useful for single "get", but maybe we wanna expose a specific easy to use API for that?

what about deletion?

if we put aside consumer groups, i suppose with the addition of an explicit "read" (if we chose to add it), and a "del" we got streams covered, right?

@zuiderkwast
Contributor Author

> i suppose the iterator can also be useful for single "get", but maybe we wanna expose a specific easy to use API for that?

Sure we can, but combined with your suggestion for iterating over the fields, what should "get" return?

> what about deletion?

If anyone needs it, it's always possible to use Call. But sure, I can add StreamDelete if you want. Or maybe as a flag to the iterator, deleting as we go?

> if we put aside consumer groups, i suppose with the addition of an explicit "read" (if we chose to add it), and a "del" we got streams covered, right?

AFAIK yes.

@oranagra
Member

> Sure we can, but combined with your suggestion for iterating over the fields, what should "get" return?

good point. i guess an iterator is enough. come to think of it, the XREAD and XRANGE are also iterator based.

> If anyone needs it, it's always possible to use Call. But sure, I can add StreamDelete if you want. Or maybe as a flag to the iterator, deleting as we go?

Ideally, we'll have a delete API that takes an ID and can be used both without iteration and as part of an iteration.
but IIRC the iterator is actually holding the current node, so we can't delete it before doing next.
this leaves us with two slightly ugly options.

  1. tell the user that if they want to delete during iteration, they need to remember the last ID, do a Next, and only then delete the previous ID.
  2. add a separate API that does "Delete and Step". (in addition to a Next and a stand-alone Del).

is that right? or am i missing something (sorry busy with other things so i can't afford to dive into the code and plan right now)
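
Option 1 above can be sketched on a toy linked list, assuming (as the comment recalls) that the iterator keeps a pointer to the node it last returned. Everything here (`ToyStream`, `ToyIter`, `toy_next`, `toy_delete`, `toy_delete_even`) is hypothetical and only models the ordering problem; it is not the stream implementation or the module API.

```c
#include <assert.h>
#include <stdlib.h>

typedef struct ToyNode { unsigned long id; struct ToyNode *next; } ToyNode;
typedef struct { ToyNode *head; } ToyStream;
typedef struct { ToyStream *s; ToyNode *cur; int started; } ToyIter;

/* Append an entry at the tail. */
void toy_append(ToyStream *s, unsigned long id) {
    ToyNode *n = malloc(sizeof *n);
    assert(n != NULL);
    n->id = id;
    n->next = NULL;
    ToyNode **pp = &s->head;
    while (*pp) pp = &(*pp)->next;
    *pp = n;
}

/* Stand-alone delete by ID; returns 1 if an entry was removed. */
int toy_delete(ToyStream *s, unsigned long id) {
    for (ToyNode **pp = &s->head; *pp; pp = &(*pp)->next) {
        if ((*pp)->id == id) {
            ToyNode *dead = *pp;
            *pp = dead->next;
            free(dead);
            return 1;
        }
    }
    return 0;
}

/* Step to the next entry. The iterator keeps a pointer to the node it
 * just returned, so that node must not be freed before the next step. */
int toy_next(ToyIter *it, unsigned long *id) {
    ToyNode *n = it->started ? (it->cur ? it->cur->next : NULL) : it->s->head;
    it->started = 1;
    it->cur = n;
    if (n == NULL) return 0;
    *id = n->id;
    return 1;
}

/* Option 1: delete every even ID while iterating, always one step
 * behind the iterator. Returns the number of deleted entries. */
int toy_delete_even(ToyStream *s) {
    ToyIter it = { s, NULL, 0 };
    unsigned long id, pending = 0;
    int have_pending = 0, deleted = 0;
    while (toy_next(&it, &id)) {
        /* The iterator has moved on, so the remembered ID is safe to drop. */
        if (have_pending) { deleted += toy_delete(s, pending); have_pending = 0; }
        if (id % 2 == 0) { pending = id; have_pending = 1; }
    }
    if (have_pending) deleted += toy_delete(s, pending);
    return deleted;
}
```

The bookkeeping (`pending`, `have_pending`) is exactly the burden option 1 puts on the module author, which is the argument for a combined "delete and step" call in option 2.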

This was already implemented when adding to empty keys. This commit
unblocks readers when adding to an existing stream.

This is consistent with other datatypes: RM_ListPush unblocks clients
blocked on BLPOP and similar for ZSet.

Note: RM_StreamIteratorNextField() has no safeguard for the number
of times it's being called. Fetching more than numfields is
documented as undefined behaviour and an example of how to iterate
properly is added.

Standalone delete. Fails if an iterator has been started.

I'm not sure, but I suspect that deleting may mess up an existing
iterator, so it's forbidden.

Flags for add, iterator and trim all start from 1.
@oranagra
Member

@zuiderkwast seems good.. what else is missing before this is ready to be merged?

@zuiderkwast
Contributor Author

zuiderkwast commented Jan 20, 2021

> @zuiderkwast seems good.. what else is missing before this is ready to be merged?

TODO:

  • set errno (EOF vs ERR with ENOENT for end-of-iterator TBD)
  • RM_StreamIteratorDelete() – requires that the current stream ID is stored somewhere, probably in the key. (It can share space in the RedisModuleKey struct by making a union { zset iterator stuff ; stream iterator stuff })
  • Out-of-bounds checking for RM_StreamIteratorNextField() – requires storing numfields and a field counter somewhere, in the key struct.
  • Optimization: Call signalKeyAsReady() on RM_CloseKey() instead of on every StreamAdd().

If you agree with the above...

@oranagra
Member

the above seem fine, but i need to look at the details / implementation to be able to judge better.
i posted a comment about signalKeyAsReady above which i think we also still need to discuss.

errno is set whenever ERR is returned. Details are documented at
every stream function.

REDISMODULE_EOF is defined as EOF (from stdio.h), which C99 defines to
be a negative number, so there's no risk of a clash with OK or ERR.

If a user checks for EOF (instead of REDISMODULE_EOF), it also works,
although in the docs we mention only REDISMODULE_EOF.

src/module.c Outdated
/* Retrieves the next field of the current stream ID and its corresponding value
* in a stream iteration. This function should be called directly after calling
* RedisModule_StreamIteratorNextID() and not more than `numfields` times. If
* called more times than the number of fields, the behaviour is undefined.
Member

i'm not comfortable with this undefined behavior.
i think we should check si->lp_ele (maybe add a streamIteratorIsValidField, or modify streamIteratorGetField to return an error), and handle that too (possibly setting errno to ENOENT).

Contributor Author

I'll try this. I was planning to store a field counter of the current entry in the key struct and use it to return an error in NextField(), but if it can be done in streamIterator itself, it's better of course.

Contributor Author

Solved. I followed my original idea and stored a counter in the key struct.

Member

i'm curious, did you try the si->lp_ele idea and find some problem with it, or did you just not want to make any changes in t_stream.c?

Contributor Author

The latter. I haven't spent enough time to understand the internals of t_stream.c and with the union in the key struct, it doesn't really cost anything extra to put it there... Do you prefer the si->lp_ele solution?

Contributor Author

I'm looking at this.

  • streamIteratorGetField returns void. It would need to return int to indicate if there was a field or not.
  • I can't see the two places where you want to nullify si->lp_ele. lpNext() and lpPrev() already return NULL when running out of range. Maybe you can enlighten me?

I suppose this can also be changed as a separate improvement after merging this PR. I don't feel very confident about the listpack and rax structure.

Contributor

Would the issue in streamIteratorGetField() be that if the first lpNext() returns NULL because it has run out of range, the second lpNext() gets NULL as its listpack pointer, which triggers an assert in lpNext()?
I guess streamIteratorGetField() needs to bail out after attempting to get the missing fieldptr, and then it might need to nullify the returned field and value ptrs before returning. This can later be checked in RM_StreamIteratorNextField() to make sure that more fields existed.

Member

the two places i meant, where i think streamIteratorGetID should nullify si->lp_ele, are:

return 0; /* We are already out of range. */

return 0; /* We are already out of range. */

i think that if we do that, and change streamIteratorGetField to return an error when si->lp_ele is NULL on entry, that should make all of this safe.

i'm ok leaving this issue for some future day, and also ok to change the PR to use that instead of counting the fields.
up to you if you wanna try it.

Contributor Author

@zuiderkwast zuiderkwast Jan 27, 2021

OK, thanks. I tried it, but GetField() doesn't reach si->lp_ele == NULL after the last field in an entry, so it just continues returning fields from the next stream entry. My guess is that we need a numfieldsleft counter for this, either in the streamIterator or in the API...
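
To illustrate why a counter helps, here is a minimal sketch on a toy flat array in which several entries sit back to back, so running past the last field of one entry lands on the data of the next one. The layout, `ToyIter`, `toy_next_id` and `toy_next_field` are hypothetical and much simpler than the real listpack encoding; setting errno to ENOENT follows the suggestion made earlier in this thread.

```c
#include <assert.h>
#include <errno.h>
#include <stddef.h>

enum { TOY_OK = 0, TOY_ERR = 1 };

/* Toy layout, NOT the real listpack encoding: each entry is stored as
 * id, numfields, then numfields (field, value) pairs, in one flat array. */
typedef struct {
    const long *data;
    size_t len;
    size_t pos;         /* index of the next unread element */
    long fields_left;   /* fields of the current entry not yet fetched */
} ToyIter;

/* Move to the next entry and report its ID and number of fields. */
int toy_next_id(ToyIter *it, long *id, long *numfields) {
    assert(it != NULL && id != NULL);
    /* Skip any fields of the current entry the caller did not fetch. */
    it->pos += 2 * (size_t)it->fields_left;
    it->fields_left = 0;
    if (it->pos + 2 > it->len) { errno = ENOENT; return TOY_ERR; }
    *id = it->data[it->pos++];
    it->fields_left = it->data[it->pos++];
    if (numfields) *numfields = it->fields_left;
    return TOY_OK;
}

/* Fetch the next field/value pair of the current entry. The counter,
 * not the underlying storage, decides when the entry is exhausted. */
int toy_next_field(ToyIter *it, long *field, long *value) {
    assert(it != NULL && field != NULL && value != NULL);
    if (it->fields_left == 0) { errno = ENOENT; return TOY_ERR; }
    *field = it->data[it->pos++];
    *value = it->data[it->pos++];
    it->fields_left--;
    return TOY_OK;
}
```

Without the `fields_left` check, an extra call to `toy_next_field` after the first entry would happily return the next entry's ID and field count as if they were a field and a value.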

Btw, this PR reached 💯 comments. :)

Member

ohh, right. for some reason i thought the next entry is in the next rax node.
sorry.

A union allows the same space to be used for other things when the
key is not a zset. Some checks are added in zset functions to make
sure the fields are not accessed if the key is not a zset.
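
A rough sketch of the idea, assuming a much smaller struct than the real RedisModuleKey: the type-specific iterator state lives in a named union, and accessors check the key type before reading their own member. `ToyKey`, the field names and both accessor functions are hypothetical.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

enum ToyKeyType { TOY_KEY_ZSET, TOY_KEY_STREAM };

/* Hypothetical, heavily simplified stand-in for the key struct. The
 * union is named (`u`), so members are reached as key->u.zset.xxx or
 * key->u.stream.xxx, which works in C99. */
typedef struct {
    enum ToyKeyType type;
    union {
        struct { uint32_t range_type; void *current; int end_reached; } zset;
        struct { uint64_t cur_ms, cur_seq; int64_t fields_left; } stream;
    } u;
} ToyKey;

/* Type check before touching zset-only state.
 * Returns -1 if the key is not a zset. */
int toy_zset_end_reached(const ToyKey *key) {
    assert(key != NULL);
    if (key->type != TOY_KEY_ZSET) return -1;  /* do not read u.zset */
    return key->u.zset.end_reached;
}

/* Same pattern for stream-only state.
 * Returns -1 if the key is not a stream. */
int64_t toy_stream_fields_left(const ToyKey *key) {
    assert(key != NULL);
    if (key->type != TOY_KEY_STREAM) return -1;
    return key->u.stream.fields_left;
}
```

Both members share the same storage, so the struct grows only to the size of the larger one; the price is that every access site has to spell out the union member, which is the churn in the zset code.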
A field counter is added in the key struct.
Benchmark: 1M StreamAdd() with a client blocking on XREAD took

    Before this commit: 790ms
    After this commit:  660ms

This is 16% faster than before.
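
For reference, the 16% figure corresponds to the drop in run time, (790 - 660) / 790. A small sketch of the arithmetic, with hypothetical helper names; it also shows the alternative reading as a throughput gain, which gives a different number for the same measurements.

```c
#include <assert.h>

/* Percentage by which run time dropped: (before - after) / before. */
double time_reduction_pct(double before_ms, double after_ms) {
    assert(before_ms > 0.0);
    return 100.0 * (before_ms - after_ms) / before_ms;
}

/* Percentage by which throughput rose for the same amount of work:
 * before / after - 1. */
double throughput_gain_pct(double before_ms, double after_ms) {
    assert(after_ms > 0.0);
    return 100.0 * (before_ms / after_ms - 1.0);
}
```

With 790 ms before and 660 ms after, the first function yields roughly 16.5 and the second roughly 19.7.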
@zuiderkwast zuiderkwast marked this pull request as ready for review January 21, 2021 16:09
Member

@oranagra oranagra left a comment

i really don't like all the change to zset.. but i guess there's no other way. 8-(

@zuiderkwast
Contributor Author

> i really don't like all the change to zset.. but i guess there's no other way. 8-(

Well, if we allow anonymous unions and structs here (which are standard in C11) we can use a union without changing the reference to the zset fields...

After reverting the EOF commit, now ERR is returned on end-of-iterator.

Set errno to ENOENT where missing in NextID().

Remove code setting ID to 0-0 and numfields to 0 when iterator reaches
the end. (There's no reason to do that.)

Add note about error handling in NextID() and NextField() docs.

Fix postponed signalKeyAsReady logic.

errno = EFBIG -- "write at a position past the maximum allowed offset"

Rationale: from the write() man page:

       EFBIG  An attempt was made to write a file that exceeds
              the implementation-defined maximum file size or
              the process's file size limit, or to write at a
              position past the maximum allowed offset.
src/module.c Outdated
Comment on lines 3394 to 3398
    if (id) {
        id->ms = 0;
        id->seq = 0;
    }
    if (numfields) *numfields = 0;
Member

i think it's a good idea to also zero the output arguments, not just rely on the return value.

Contributor Author

@zuiderkwast zuiderkwast Jan 24, 2021

OK, I'll put this back again, tomorrow. (For reference: First I added this but then I removed it again because I thought it would be unexpected to touch the output args when ERR is returned.)

[Edit] I think it can be useful not to zero them actually. Let's say a module iterates over the stream, saves the last id and later continues iterating from there, like in this pseudo-code:

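int consume_stream() {
    /* Persists across calls, so iteration resumes after the last ID seen. */
    static RedisModuleStreamID id = {0, 0};
    long numfields;
    /* `key` is assumed to be an already opened stream key. */
    RedisModule_StreamIteratorStart(key, REDISMODULE_STREAM_EXCLUSIVE, &id, NULL);
    while (RedisModule_StreamIteratorNextID(key, &id, &numfields) == REDISMODULE_OK) {
        ...
    }
    RedisModule_StreamIteratorStop(key);
    RedisModule_BlockClientOnKeys(..., consume_stream, ...);
    return REDISMODULE_OK;
}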

In a way it's similar to for (i = 0; i < n; i++) { ... } -- you'd expect i == n afterwards and some code even relies on it.

Anyway, if we zero the args, I think it should at least be documented to avoid any confusion if anyone tries to do like in consume_stream() above.
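
The resume pattern can be checked on a toy model, assuming an iterator that leaves its output argument untouched when it reaches the end. All names here (`ToyIter`, `toy_iter_start_after`, `toy_iter_next`, `toy_consume`) are hypothetical and the "stream" is just a sorted array of IDs.

```c
#include <assert.h>
#include <stddef.h>

enum { TOY_OK = 0, TOY_ERR = 1 };

typedef struct { const unsigned long *ids; size_t len; size_t pos; } ToyIter;

/* Start at the first ID strictly greater than `after` (exclusive start). */
void toy_iter_start_after(ToyIter *it, const unsigned long *ids, size_t len,
                          unsigned long after) {
    assert(it != NULL);
    it->ids = ids;
    it->len = len;
    it->pos = 0;
    while (it->pos < len && ids[it->pos] <= after) it->pos++;
}

/* On end-of-iterator, return TOY_ERR and leave *id untouched. */
int toy_iter_next(ToyIter *it, unsigned long *id) {
    if (it->pos >= it->len) return TOY_ERR;
    *id = it->ids[it->pos++];
    return TOY_OK;
}

/* Consume everything after *last_seen. Because the final failing call
 * does not overwrite it, *last_seen ends up holding the last ID that
 * was consumed, ready for the next call. Returns the number consumed. */
size_t toy_consume(const unsigned long *ids, size_t len, unsigned long *last_seen) {
    ToyIter it;
    size_t n = 0;
    toy_iter_start_after(&it, ids, len, *last_seen);
    while (toy_iter_next(&it, last_seen) == TOY_OK) n++;
    return n;
}
```

If `toy_iter_next` zeroed `*id` on failure, every call to `toy_consume` would restart from the beginning, which is the confusion the comment above warns about.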

@oranagra
Member

> Well, if we allow anonymous unions and structs here (which are standard in C11) we can use a union without changing the reference to the zset fields...

Sadly we can't assume C11 is supported. Redis 6.0 went that way, and we had to revert that in 6.2.
The other option is to create preprocessor replacements that will expand these member names, but that's ugly.

@oranagra
Member

@redis/core-team please approve the creation of new module API for stream manipulation and iteration (handling anything other than consumer groups).
@zuiderkwast please make sure that the top comment is up to date (will be used as commit comment too).
please make sure to run the new tests with --valgrind.

@zuiderkwast zuiderkwast changed the title from "Modules API for streams" to "Add modules API for streams" Jan 25, 2021
@oranagra oranagra added the approval-needed (Waiting for core team approval to be merged), release-notes (indication that this issue needs to be mentioned in the release notes) and state:major-decision (Requires core team consensus) labels Jan 26, 2021
@oranagra oranagra merged commit 4355145 into redis:unstable Jan 28, 2021
@zuiderkwast
Contributor Author

Thanks a lot, especially @oranagra, for your fast and useful responses! It makes me enjoy working with Redis. :-) See you in a different PR. Next, I'm considering picking something from the [Meta] Modules API issue.

@oranagra
Member

thank you Viktor for taking the initiative and doing all the work.

@oranagra oranagra mentioned this pull request Jan 31, 2021
@zuiderkwast zuiderkwast deleted the stream-module-api branch February 8, 2021 03:33
JackieXie168 pushed a commit to JackieXie168/redis that referenced this pull request Mar 2, 2021
APIs added for these stream operations: add, delete, iterate and
trim (by ID or maxlength). The functions are prefixed by RM_Stream.

* RM_StreamAdd
* RM_StreamDelete
* RM_StreamIteratorStart
* RM_StreamIteratorStop
* RM_StreamIteratorNextID
* RM_StreamIteratorNextField
* RM_StreamIteratorDelete
* RM_StreamTrimByLength
* RM_StreamTrimByID

The type RedisModuleStreamID is added and functions for converting
from and to RedisModuleString.

* RM_CreateStringFromStreamID
* RM_StringToStreamID

Whenever the stream functions return REDISMODULE_ERR, errno is set to
provide additional error information.

Refactoring: The zset iterator fields in the RedisModuleKey struct
are wrapped in a union, to allow the same space to be used for type-
specific info for streams and allow future use for other key types.
Development

Successfully merging this pull request may close these issues.

Stream API in Module

6 participants