Add modules API for streams#8288
Conversation
85ba0d1 to
18707cf
Compare
|
@zuiderkwast thanks for this PR, haven't reviewed the code yet, but i wanna mention my initial thoughts.
pinging @MeirShpilraien and @gkorland in case they can share some thoughts on this. |
oranagra
left a comment
There was a problem hiding this comment.
i did a quick review of the code (haven't reviewed the test yet).
i added some minor comments.
let's discuss the rest of the API and plans for the near future before we dive into the implementation details.
18707cf to
3d3bd71
Compare
Sure, leave it open until we have the rest of the design in place. I created this PR to initiate a discussion.
Yes, it is. Any module currently using RM_Call() will be faster with each native API function available. My associates gave me this wishlist with the following priority:
XLEN is already supported as RedisModule_ValueLength().
Sounds good. I might start drafting a design for read, range and trim too as separate PRs (but I'd prefer to wait for @guybe7's "Trim by MINID" PR to be merged before I start implementing a trim API). |
|
I have a feeling that one Issue or PR would be better to discuss these API since changes in one may reflect the other, and they can share some flags / type. |
Type RedisModuleStreamID and functions RM_StreamParseID() and RM_StreamFormatID() for conversion from/to RedisModuleString.
Similar to XADD without trimming.
3d3bd71 to
094b5b9
Compare
|
PR updated to include adding, iterating and trimming. Please review mainly the API design. |
oranagra
left a comment
There was a problem hiding this comment.
other than Add, Trim and Iterator, do we need to expose any other APIs for streams?
i suppose the iterator can also be useful for single "get", but maybe we wanna expose a specific easy to use API for that?
what about deletion?
if we put aside consumer groups, i suppose with the addition of an explicit "read" (if we chose to add it), and a "del" we got streams covered, right?
Sure we can, but combined with your suggestion for iterating over the fields, what should "get" return?
If anyone needs it, it's always possible to use Call. But sure, I can add StreamDelete if you want. Or maybe as a flag to the iterator, deleting as we go?
AFAIK yes. |
good point. i guess an iterator is enough. come to think of it, the XREAD and XRANGE are also iterator based.
Ideally, we'll have a delete API that takes ID, can can be used both without iteration, and as part of an iteration.
is that right? or am i missing something (sorry busy with other things so i can't afford to dive into the code and plan right now) |
This was already implemented when adding to empty keys. This commit unblocks readers when adding to an existing stream. This is consistent with other datatypes: RM_ListPush unblocks clients blocked on BLPOP and similar for ZSet.
Note: RM_StreamIteratorNextField() has no safe-guard for the number of times it's being called. Fetching more than numfields is documented as UB and an example of how to iterate properly is added.
Standalone delete. Fails if an iterator has been started. I'm not sure, but I suspect that deleting may mess up an existing iterator, so it's forbidden.
Flags for add, iterator and trim all start from 1.
|
@zuiderkwast seems good.. what else is missing before this is ready to be merged? |
TODO:
If you agree with the above... |
|
the above seem fine, but i need to look at the details / implementation to be able to judge better. |
errno is set whenever ERR is returned. Details are documented at every stream function.
REDISMODULE_EOF is defined to EOF (from stdio.h) defined in C99 to be a negative number, so there's no risk of a clash with OK or ERR. If a user checks for EOF (instead of REDISMODULE_EOF), it also works although in the docs we mention only REDISMODULE_EOF.
src/module.c
Outdated
| /* Retrieves the next field of the current stream ID and its corresponding value | ||
| * in a stream iteration. This function should be called directly after calling | ||
| * RedisModule_StreamIteratorNextID() and not more than `numfields` times. If | ||
| * called more times than the number of fields, the behaviour is undefined. |
There was a problem hiding this comment.
i'm not comfortable with this undefined behavior.
i think we should check si->lp_ele (maybe add an streamIteratorIsValidField, or modify streamIteratorGetField to return an error), and handle that too (possibly setting errno to ENOENT).
There was a problem hiding this comment.
I'll try this. I was planning to store a field counter of the current entry in the key struct and use it to return an error in NextField(), but if it can be done in streamIterator itself, it's better of course.
There was a problem hiding this comment.
Solved. I followed my original idea and stored a counter in the key struct.
There was a problem hiding this comment.
i'm curious, did you try the si->lp_ele idea and found some problem with it, or just didn't want to do any changes in t_stream.c ?
There was a problem hiding this comment.
The latter. I haven't spent enough time to understand the internals of t_stream.c and with the union in the key struct, it doesn't really cost anything extra to put it there... Do you prefer the si->lp_ele solution?
There was a problem hiding this comment.
I'm looking at this.
streamIteratorGetFieldreturnsvoid. It would need to returnintto indicate if there was a field or not.- I can't see the two places where you want to nullify
si->lp_ele. lpNext() and lpPrev() already return NULL when running out of range. Maybe you can enlighten me?
I suppose this can also be changed as a separate improvement after merging this PR. I don't feel very confident about the listpack and rax structure.
There was a problem hiding this comment.
Would the issue in streamIteratorGetField() be that if the first lpNext() returns NULL since it is running out of range, the second lpNext() will get NULL in its listpack pointer, which triggers an assert in lpNext().
I guess streamIteratorGetField() needs to bail out after attempting to get the missing fieldptr, and then it might need to nullify the returned field and value ptrs before returning. This can later be checked in RM_StreamIteratorNextField() to make sure that more fields existed.
There was a problem hiding this comment.
the two places i meant that i think streamIteratorGetID should nullify si->lp_ele are:
Line 1121 in 9e56d39
Line 1132 in 9e56d39
i think that if we do that, and change streamIteratorGetField to return an error when si->lp_ele is NULL on entry, should make all of this safe.
i'm ok leaving this issue for some future day, and also ok to change the PR to use that instead of counting the fields.
up to you if you wanna try it.
There was a problem hiding this comment.
OK, thanks. I tried it, but GetField() doens't reach si->lp_ele == NULL after the last field in an entry, so it just continues returning fields from the next stream entry. My guess that we need a numfieldsleft counter for this, either in the streamIterator or in the API...
Btw, this PR reached 💯 comments. :)
There was a problem hiding this comment.
ohh, right. for some reason i thought the next entry is in the next rax node.
sorry.
A union allows the same space to be used for other things when the key is not a zset. Some checks are added in zset functions to make sure the fields are not accessed if the key is not a zset.
A field counter is added in the key struct.
Benchmark: 1M StreamAdd() with a client blocking on XREAD took
Before this commit: 790ms
After this commit: 660ms
This is 16% faster than before.
oranagra
left a comment
There was a problem hiding this comment.
i really don't like all the change to zset.. but i guess there's no other way. 8-(
Well, if we allow anonymous unions and structs here (which are standard in C11) we can use a union without changing the reference to the zset fields... |
This reverts commit 7f01891.
After reverting the EOF commit, now ERR is returned on end-of-iterator. Set errno to ENOENT where missing in NextID(). Remove code setting ID to 0-0 and numfields to 0 when iterator reaches the end. (There's no reason to do that.) Add note about error handling in NextID() and NextField() docs. Fix postponed signalKeyAsReady logic.
errno = EFBIG -- "write at a position past the maximum allowed offset"
Rationale: from the write() man page:
EFBIG An attempt was made to write a file that exceeds
the implementation-defined maximum file size or
the process's file size limit, or to write at a
position past the maximum allowed offset.
src/module.c
Outdated
| if (id) { | ||
| id->ms = 0; | ||
| id->seq = 0; | ||
| } | ||
| if (numfields) *numfields = 0; |
There was a problem hiding this comment.
i think it's a good idea to also zero the output arguments, not just rely on the return value.
There was a problem hiding this comment.
OK, I'll put this back again, tomorrow. (For reference: First I added this but then I removed it again because I thought it would be unexpected to touch the output args when ERR is returned.)
[Edit] I think it can be useful not to zero them actually. Let's say a module iterates over the stream, saves the last id and later continues iterating from there, like in this pseudo-code:
int consume_stream() {
static RedisModuleStreamID id = {0, 0};
RedisModule_StreamIteratorStart(key, REDISMODULE_STREAM_EXCLUSIVE, &id, NULL);
while(RedisModule_IteratorNextID(key, &id, &numfields) == REDISMODULE_OK) {
...
}
RedisModule_BlockClientOnKeys(..., consume_stream, ...);
}In a way it's similar to for (i = 0; i < n; i++) { ... } -- you'd expect i == n afterwards and some code even relies on it.
Anyway, if we zero the args, I think it should at least be documented to avoid any confusion if anyone tries to do like in consume_stream() above.
Sadly we can't assume C11 is supported. Redis 6.0 went that way, and we had to revert that in 6.2. |
|
@redis/core-team please approve the creation of new module API for stream manipulation and iteration (handing anything other than consumer groups). |
Useful especially if automatic memory is used.
|
Thanks a lot, especially @oranagra, for your fast and useful responses! I makes me enjoy working with Redis. :-) See you in a different PR. Next, I'm considering picking something from the [Meta] Modules API issue. |
|
thank you Viktor for taking the initiative and doing all the work. |
APIs added for these stream operations: add, delete, iterate and trim (by ID or maxlength). The functions are prefixed by RM_Stream. * RM_StreamAdd * RM_StreamDelete * RM_StreamIteratorStart * RM_StreamIteratorStop * RM_StreamIteratorNextID * RM_StreamIteratorNextField * RM_StreamIteratorDelete * RM_StreamTrimByLength * RM_StreamTrimByID The type RedisModuleStreamID is added and functions for converting from and to RedisModuleString. * RM_CreateStringFromStreamID * RM_StringToStreamID Whenever the stream functions return REDISMODULE_ERR, errno is set to provide additional error information. Refactoring: The zset iterator fields in the RedisModuleKey struct are wrapped in a union, to allow the same space to be used for type- specific info for streams and allow future use for other key types.
APIs added for these stream operations: add, delete, iterate and
trim (by ID or maxlength). The functions are prefixed by RM_Stream.
The type RedisModuleStreamID is added and functions for converting
from and to RedisModuleString.
Whenever the stream functions return REDISMODULE_ERR, errno is set to
provide additional error information.
Refactoring: The zset iterator fields in the RedisModuleKey struct
are wrapped in a union, to allow the same space to be used for type-
specific info for streams and allow future use for other key types.
Fixes #5760.