Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
56 commits
Select commit Hold shift + click to select a range
36b38dc
reprocess command when client is unblocked on keys
ranshid May 9, 2022
402e4e0
pr fixes:
ranshid Oct 2, 2022
c116025
pr fixes #2:
ranshid Oct 3, 2022
b49b482
pr fixes #3:
ranshid Oct 12, 2022
7fbdc6a
fix lst ist test failures
ranshid Oct 13, 2022
c5564e4
pr fix
ranshid Oct 13, 2022
e93ad2f
fix spelling errors
ranshid Oct 13, 2022
624e946
Merge remote-tracking branch 'redis/unstable' into oss-client-blocking
ranshid Oct 19, 2022
f263ce2
introduce several changes:
ranshid Oct 24, 2022
78c0d4d
refactor test according to: https://github.com/redis/redis/pull/11012…
ranshid Oct 25, 2022
3625ca0
pr fixes
ranshid Oct 26, 2022
c5104f2
fix small indentation issue
ranshid Nov 20, 2022
44bc6a8
Merge remote-tracking branch 'redis/unstable' into oss-client-blocking
ranshid Nov 21, 2022
e4e91d7
Update src/blocked.c
ranshid Nov 21, 2022
2cf4de0
Update tests/unit/type/list.tcl
ranshid Nov 21, 2022
984154b
pr-fix: bring back readyList struct definition to server.h
ranshid Nov 21, 2022
7c2a1e6
fix test race condition in "Blocking XREADGROUP for stream key that has
ranshid Nov 21, 2022
ae0da3c
fix spellckeck
ranshid Nov 21, 2022
47279ee
pr fixes:
ranshid Nov 23, 2022
9250f3e
Merge remote-tracking branch 'redis/unstable' into oss-client-blocking
ranshid Nov 29, 2022
1e2530b
skip test "Unblock fairness is kept during nested unblock" in cluste…
ranshid Nov 29, 2022
145b9b8
fix names of new blocking stats
ranshid Nov 29, 2022
7750d0f
Update src/blocked.c
ranshid Nov 29, 2022
5deb48c
Update src/server.c
ranshid Nov 29, 2022
6f1aaa5
Update tests/unit/type/list.tcl
ranshid Nov 29, 2022
5d06d3d
fix test after stats names change
ranshid Nov 29, 2022
49f1296
fix failing test
ranshid Nov 29, 2022
58b21bc
check if client was evicted after reprocessing
ranshid Nov 29, 2022
bdce45f
introduce LOOKUP_NOEFFECTS
ranshid Nov 29, 2022
e61f16e
fix compilation issue
ranshid Nov 29, 2022
6ae5c00
better align list test for cluster mode
ranshid Nov 29, 2022
3debb34
pr fixes:
ranshid Nov 29, 2022
4b3a91c
do not lookup the command again when reprocessing + command duration
ranshid Nov 29, 2022
68e9de9
zero duration on client creation and client reset
ranshid Nov 30, 2022
a97bc0c
Merge remote-tracking branch 'redis/unstable' into oss-client-blocking
ranshid Dec 5, 2022
62055a7
Some more changes:
ranshid Dec 8, 2022
29002f0
fix test failure in modulesapi:
ranshid Dec 11, 2022
85c6564
Update src/blocked.c
ranshid Dec 11, 2022
69b1358
pr changes
ranshid Dec 12, 2022
f386420
pr changes:
ranshid Dec 12, 2022
ab1252f
pr fixes:
ranshid Dec 13, 2022
3adc264
retry
ranshid Dec 13, 2022
e1269ab
deprecate SLOWLOG and STATS command flags
ranshid Dec 27, 2022
9c2b60a
make explicit check during call if current_client is AOF
ranshid Dec 28, 2022
d2a79f8
Merge remote-tracking branch 'redis/unstable' into oss-client-blocking
ranshid Dec 29, 2022
14e6a1e
pr fixes:
ranshid Dec 29, 2022
924246b
fix unittest race condition
ranshid Dec 29, 2022
47e8f1a
Update tests/unit/type/list.tcl
ranshid Dec 29, 2022
07685b3
LAST pr fixes
ranshid Dec 30, 2022
390ee7b
report blocked commandsto monitor only once and only when it was first
ranshid Jan 1, 2023
9f23b38
Update src/server.c
ranshid Jan 1, 2023
a235707
Update src/server.c
ranshid Jan 1, 2023
606818a
Update src/server.c
ranshid Jan 1, 2023
55c6216
add test for monitor log blocked command
ranshid Jan 1, 2023
0d54e97
fix test issue
ranshid Jan 1, 2023
5237d3f
remove debug print
ranshid Jan 1, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
784 changes: 299 additions & 485 deletions src/blocked.c

Large diffs are not rendered by default.

12 changes: 6 additions & 6 deletions src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -7215,10 +7215,10 @@ void clusterRedirectClient(client *c, clusterNode *n, int hashslot, int error_co
* returns 1. Otherwise 0 is returned and no operation is performed. */
int clusterRedirectBlockedClientIfNeeded(client *c) {
if (c->flags & CLIENT_BLOCKED &&
(c->btype == BLOCKED_LIST ||
c->btype == BLOCKED_ZSET ||
c->btype == BLOCKED_STREAM ||
c->btype == BLOCKED_MODULE))
(c->bstate.btype == BLOCKED_LIST ||
c->bstate.btype == BLOCKED_ZSET ||
c->bstate.btype == BLOCKED_STREAM ||
c->bstate.btype == BLOCKED_MODULE))
{
dictEntry *de;
dictIterator *di;
Expand All @@ -7234,11 +7234,11 @@ int clusterRedirectBlockedClientIfNeeded(client *c) {

/* If the client is blocked on module, but not on a specific key,
* don't unblock it (except for the CLUSTER_FAIL case above). */
if (c->btype == BLOCKED_MODULE && !moduleClientIsBlockedOnKeys(c))
if (c->bstate.btype == BLOCKED_MODULE && !moduleClientIsBlockedOnKeys(c))
return 0;

/* All keys must belong to the same slot, so check first key only. */
di = dictGetIterator(c->bpop.keys);
di = dictGetIterator(c->bstate.keys);
if ((de = dictNext(di)) != NULL) {
robj *key = dictGetKey(de);
int slot = keyHashSlot((char*)key->ptr, sdslen(key->ptr));
Expand Down
2 changes: 1 addition & 1 deletion src/db.c
Original file line number Diff line number Diff line change
Expand Up @@ -1147,7 +1147,7 @@ void shutdownCommand(client *c) {
return;
}

blockClient(c, BLOCKED_SHUTDOWN);
blockClientShutdown(c);
if (prepareForShutdown(flags) == C_OK) exit(0);
/* If we're here, then shutdown is ongoing (the client is still blocked) or
* failed (the client has received an error). */
Expand Down
9 changes: 9 additions & 0 deletions src/dict.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,15 @@ typedef void (dictScanBucketFunction)(dict *d, dictEntry **bucketref);
#define dictSetDoubleVal(entry, _val_) \
do { (entry)->v.d = _val_; } while(0)

#define dictIncrSignedIntegerVal(entry, _val_) \
((entry)->v.s64 += _val_)

#define dictIncrUnsignedIntegerVal(entry, _val_) \
((entry)->v.u64 += _val_)

#define dictIncrDoubleVal(entry, _val_) \
((entry)->v.d += _val_)

#define dictFreeKey(d, entry) \
if ((d)->type->keyDestructor) \
(d)->type->keyDestructor((d), (entry)->key)
Expand Down
31 changes: 15 additions & 16 deletions src/module.c
Original file line number Diff line number Diff line change
Expand Up @@ -6158,7 +6158,7 @@ RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const ch
server.replication_allowed = replicate && server.replication_allowed;

/* Run the command */
int call_flags = CMD_CALL_SLOWLOG | CMD_CALL_STATS | CMD_CALL_FROM_MODULE;
int call_flags = CMD_CALL_FROM_MODULE;
if (replicate) {
if (!(flags & REDISMODULE_ARGV_NO_AOF))
call_flags |= CMD_CALL_PROPAGATE_AOF;
Expand Down Expand Up @@ -7282,7 +7282,7 @@ void RM_LatencyAddSample(const char *event, mstime_t latency) {
* The structure RedisModuleBlockedClient will be always deallocated when
* running the list of clients blocked by a module that need to be unblocked. */
void unblockClientFromModule(client *c) {
RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle;
RedisModuleBlockedClient *bc = c->bstate.module_blocked_handle;

/* Call the disconnection callback if any. Note that
* bc->disconnect_callback is set to NULL if the client gets disconnected
Expand Down Expand Up @@ -7346,8 +7346,8 @@ RedisModuleBlockedClient *moduleBlockClient(RedisModuleCtx *ctx, RedisModuleCmdF
int islua = scriptIsRunning();
int ismulti = server.in_exec;

c->bpop.module_blocked_handle = zmalloc(sizeof(RedisModuleBlockedClient));
RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle;
c->bstate.module_blocked_handle = zmalloc(sizeof(RedisModuleBlockedClient));
RedisModuleBlockedClient *bc = c->bstate.module_blocked_handle;
ctx->module->blocked_clients++;

/* We need to handle the invalid operation of calling modules blocking
Expand All @@ -7371,16 +7371,16 @@ RedisModuleBlockedClient *moduleBlockClient(RedisModuleCtx *ctx, RedisModuleCmdF
bc->unblocked = 0;
bc->background_timer = 0;
bc->background_duration = 0;
c->bpop.timeout = timeout;
c->bstate.timeout = timeout;

if (islua || ismulti) {
c->bpop.module_blocked_handle = NULL;
c->bstate.module_blocked_handle = NULL;
addReplyError(c, islua ?
"Blocking module command called from Lua script" :
"Blocking module command called from transaction");
} else {
if (keys) {
blockForKeys(c,BLOCKED_MODULE,keys,numkeys,-1,timeout,NULL,NULL,NULL,flags&REDISMODULE_BLOCK_UNBLOCK_DELETED);
blockForKeys(c,BLOCKED_MODULE,keys,numkeys,timeout,flags&REDISMODULE_BLOCK_UNBLOCK_DELETED);
} else {
blockClient(c,BLOCKED_MODULE);
}
Expand All @@ -7397,7 +7397,7 @@ RedisModuleBlockedClient *moduleBlockClient(RedisModuleCtx *ctx, RedisModuleCmdF
* This function returns 1 if client was served (and should be unblocked) */
int moduleTryServeClientBlockedOnKey(client *c, robj *key) {
int served = 0;
RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle;
RedisModuleBlockedClient *bc = c->bstate.module_blocked_handle;

/* Protect against re-processing: don't serve clients that are already
* in the unblocking list for any reason (including RM_UnblockClient()
Expand Down Expand Up @@ -7566,14 +7566,14 @@ int moduleUnblockClientByHandle(RedisModuleBlockedClient *bc, void *privdata) {
/* This API is used by the Redis core to unblock a client that was blocked
* by a module. */
void moduleUnblockClient(client *c) {
RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle;
RedisModuleBlockedClient *bc = c->bstate.module_blocked_handle;
moduleUnblockClientByHandle(bc,NULL);
}

/* Return true if the client 'c' was blocked by a module using
* RM_BlockClientOnKeys(). */
int moduleClientIsBlockedOnKeys(client *c) {
RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle;
RedisModuleBlockedClient *bc = c->bstate.module_blocked_handle;
return bc->blocked_on_keys;
}

Expand Down Expand Up @@ -7740,10 +7740,10 @@ void moduleHandleBlockedClients(void) {
* moduleBlockedClientTimedOut().
*/
int moduleBlockedClientMayTimeout(client *c) {
if (c->btype != BLOCKED_MODULE)
if (c->bstate.btype != BLOCKED_MODULE)
return 1;

RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle;
RedisModuleBlockedClient *bc = c->bstate.module_blocked_handle;
return (bc && bc->timeout_callback != NULL);
}

Expand All @@ -7752,7 +7752,7 @@ int moduleBlockedClientMayTimeout(client *c) {
* does not need to do any cleanup. Eventually the module will call the
* API to unblock the client and the memory will be released. */
void moduleBlockedClientTimedOut(client *c) {
RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle;
RedisModuleBlockedClient *bc = c->bstate.module_blocked_handle;

/* Protect against re-processing: don't serve clients that are already
* in the unblocking list for any reason (including RM_UnblockClient()
Expand All @@ -7767,9 +7767,8 @@ void moduleBlockedClientTimedOut(client *c) {
long long prev_error_replies = server.stat_total_error_replies;
bc->timeout_callback(&ctx,(void**)c->argv,c->argc);
moduleFreeContext(&ctx);
if (!bc->blocked_on_keys) {
updateStatsOnUnblock(c, bc->background_duration, 0, server.stat_total_error_replies != prev_error_replies);
}
updateStatsOnUnblock(c, bc->background_duration, 0, server.stat_total_error_replies != prev_error_replies);

/* For timeout events, we do not want to call the disconnect callback,
* because the blocked client will be automatically disconnected in
* this case, and the user can still hook using the timeout callback. */
Expand Down
22 changes: 8 additions & 14 deletions src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ client *createClient(connection *conn) {
c->flags = 0;
c->slot = -1;
c->ctime = c->lastinteraction = server.unixtime;
c->duration = 0;
clientSetDefaultAuth(c);
c->replstate = REPL_STATE_NONE;
c->repl_start_cmd_stream_on_ack = 0;
Expand All @@ -184,15 +185,7 @@ client *createClient(connection *conn) {
c->obuf_soft_limit_reached_time = 0;
listSetFreeMethod(c->reply,freeClientReplyValue);
listSetDupMethod(c->reply,dupClientReplyValue);
c->btype = BLOCKED_NONE;
c->bpop.timeout = 0;
c->bpop.keys = dictCreate(&objectKeyHeapPointerValueDictType);
c->bpop.target = NULL;
c->bpop.xread_group = NULL;
c->bpop.xread_consumer = NULL;
c->bpop.xread_group_noack = 0;
c->bpop.numreplicas = 0;
c->bpop.reploffset = 0;
initClientBlockingState(c);
c->woff = 0;
c->watched_keys = listCreate();
c->pubsub_channels = dictCreate(&objectKeyPointerValueDictType);
Expand Down Expand Up @@ -1588,7 +1581,7 @@ void freeClient(client *c) {

/* Deallocate structures used to block on blocking ops. */
if (c->flags & CLIENT_BLOCKED) unblockClient(c);
dictRelease(c->bpop.keys);
dictRelease(c->bstate.keys);

/* UNWATCH all the keys */
unwatchAllKeys(c);
Expand Down Expand Up @@ -2033,6 +2026,8 @@ void resetClient(client *c) {
c->multibulklen = 0;
c->bulklen = -1;
c->slot = -1;
c->duration = 0;
c->flags &= ~CLIENT_EXECUTING_COMMAND;

if (c->deferred_reply_errors)
listRelease(c->deferred_reply_errors);
Expand Down Expand Up @@ -3142,12 +3137,11 @@ NULL
* it also doesn't expect to be unblocked by CLIENT UNBLOCK */
if (target && target->flags & CLIENT_BLOCKED && moduleBlockedClientMayTimeout(target)) {
if (unblock_error)
addReplyError(target,
unblockClientOnError(target,
"-UNBLOCKED client unblocked via CLIENT UNBLOCK");
else
replyToBlockedClientTimedOut(target);
unblockClient(target);
updateStatsOnUnblock(target, 0, 0, 1);
unblockClientOnTimeout(target);

addReply(c,shared.cone);
} else {
addReply(c,shared.czero);
Expand Down
16 changes: 6 additions & 10 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -3480,11 +3480,7 @@ void waitCommand(client *c) {

/* Otherwise block the client and put it into our list of clients
* waiting for ack from slaves. */
c->bpop.timeout = timeout;
c->bpop.reploffset = offset;
c->bpop.numreplicas = numreplicas;
listAddNodeHead(server.clients_waiting_acks,c);
blockClient(c,BLOCKED_WAIT);
blockForReplication(c,timeout,offset,numreplicas);

/* Make sure that the server will send an ACK request to all the slaves
* before returning to the event loop. */
Expand Down Expand Up @@ -3518,16 +3514,16 @@ void processClientsWaitingReplicas(void) {
* offset and number of replicas, we remember it so the next client
* may be unblocked without calling replicationCountAcksByOffset()
* if the requested offset / replicas were equal or less. */
if (last_offset && last_offset >= c->bpop.reploffset &&
last_numreplicas >= c->bpop.numreplicas)
if (last_offset && last_offset >= c->bstate.reploffset &&
last_numreplicas >= c->bstate.numreplicas)
{
unblockClient(c);
addReplyLongLong(c,last_numreplicas);
} else {
int numreplicas = replicationCountAcksByOffset(c->bpop.reploffset);
int numreplicas = replicationCountAcksByOffset(c->bstate.reploffset);

if (numreplicas >= c->bpop.numreplicas) {
last_offset = c->bpop.reploffset;
if (numreplicas >= c->bstate.numreplicas) {
last_offset = c->bstate.reploffset;
last_numreplicas = numreplicas;
unblockClient(c);
addReplyLongLong(c,numreplicas);
Expand Down
2 changes: 1 addition & 1 deletion src/script.c
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,7 @@ void scriptCall(scriptRunCtx *run_ctx, sds *err) {
goto error;
}

int call_flags = CMD_CALL_SLOWLOG | CMD_CALL_STATS;
int call_flags = CMD_CALL_NONE;
if (run_ctx->repl_flags & PROPAGATE_AOF) {
call_flags |= CMD_CALL_PROPAGATE_AOF;
}
Expand Down
2 changes: 1 addition & 1 deletion src/script_lua.c
Original file line number Diff line number Diff line change
Expand Up @@ -981,7 +981,7 @@ static int luaRedisGenericCommand(lua_State *lua, int raise_error) {
c->argc = c->argv_len = 0;
c->user = NULL;
c->argv = NULL;
freeClientArgv(c);
resetClient(c);
inuse--;

if (raise_error) {
Expand Down
Loading