Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
bf3607f
[add] enabled background and reply time tracking on blocked clients w…
filipecosta90 Jul 3, 2020
ef5f268
[add] updated towards peer review on latency histogram workflow and b…
filipecosta90 Jul 8, 2020
00492ae
[fix] fixes per PR review
filipecosta90 Jul 17, 2020
e50dd24
[wip] unified common code into updateStatsOnUnblock
filipecosta90 Jan 17, 2021
12a7788
Merge branch 'unstable' into slowlog.blocking.commands
filipecosta90 Jan 17, 2021
cd726c8
[add] removed spiruous unrelated changes
filipecosta90 Jan 17, 2021
a82f627
[add] removed spiruous unrelated changes
filipecosta90 Jan 17, 2021
dfc0d4f
[fix] Fixed incompatible pointer passing on updateStatsOnUnblock
filipecosta90 Jan 17, 2021
bc11a38
[fix] Moved module's updateStatsOnUnblock() outside of reply callback…
filipecosta90 Jan 17, 2021
1d8498f
[fix] Moved module's updateStatsOnUnblock() outside of reply callback…
filipecosta90 Jan 17, 2021
1e0fc66
[fix] Fixes per PR review: replacing ustime() usage for monotime meth…
filipecosta90 Jan 18, 2021
a702ce0
[add] swapping ustime for monotonic clock on call()
filipecosta90 Jan 18, 2021
a1f4c6d
[fix] Fixes per PR review: using monotonic clock to track blocked cli…
filipecosta90 Jan 19, 2021
83c4dd4
[fix] Fixes per PR review: change time_t to long within updateStatsOn…
filipecosta90 Jan 19, 2021
ee77cfd
[fix] Fixes per PR review: Introduced RedisModule_MeasureTimeStart() …
filipecosta90 Jan 20, 2021
2289e12
[fix] Fixes per PR review: enable tracking time on timeout. Added tes…
filipecosta90 Jan 21, 2021
6a70eea
[fix] Fixed latency monitor tracking. Included latency monitor test
filipecosta90 Jan 21, 2021
b452dee
[add] Made reference to RM_MeasureTimeStart() function description th…
filipecosta90 Jan 21, 2021
97f55ec
[fix] Fixes per PR review: using the latency monitor only for main th…
filipecosta90 Jan 24, 2021
6a7b334
[fix] Fixes per PR review: renamed methods to include blocked client …
filipecosta90 Jan 24, 2021
d2bd4b2
[fix] Fixed flaky valgrind tests on module blocked client
filipecosta90 Jan 24, 2021
fd93a06
[fix] Fixes per PR review: moved BlockedClientMeasureTime* to inside …
filipecosta90 Jan 24, 2021
73da9d8
[fix] Fixes per PR review: client duration should be long.
filipecosta90 Jan 28, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions runtest-moduleapi
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ $TCLSH tests/test_helper.tcl \
--single unit/moduleapi/hooks \
--single unit/moduleapi/misc \
--single unit/moduleapi/blockonkeys \
--single unit/moduleapi/blockonbackground \
--single unit/moduleapi/scan \
--single unit/moduleapi/datatype \
--single unit/moduleapi/auth \
Expand Down
29 changes: 29 additions & 0 deletions src/blocked.c
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@
*/

#include "server.h"
#include "slowlog.h"
#include "latency.h"
#include "monotonic.h"

int serveClientBlockedOnList(client *receiver, robj *key, robj *dstkey, redisDb *db, robj *value, int wherefrom, int whereto);
int getListPositionFromObjectOrReply(client *c, robj *arg, int *position);
Expand Down Expand Up @@ -97,6 +100,20 @@ void blockClient(client *c, int btype) {
}
}

/* This function is called after a client has finished a blocking operation
* in order to update the total command duration, log the command into
* the Slow log if needed, and log the reply duration event if needed. */
void updateStatsOnUnblock(client *c, long blocked_us, long reply_us){
const ustime_t total_cmd_duration = c->duration + blocked_us + reply_us;
c->lastcmd->microseconds += total_cmd_duration;
/* Log the command into the Slow log if needed. */
if (!(c->lastcmd->flags & CMD_SKIP_SLOWLOG)) {
slowlogPushEntryIfNeeded(c,c->argv,c->argc,total_cmd_duration);
/* Log the reply duration event. */
latencyAddSampleIfNeeded("command-unblocking",reply_us/1000);
}
}

/* This function is called in the beforeSleep() function of the event loop
* in order to process the pending input buffer of clients that were
* unblocked after a blocking operation. */
Expand Down Expand Up @@ -264,6 +281,8 @@ void serveClientsBlockedOnListKey(robj *o, readyList *rl) {
if (dstkey) incrRefCount(dstkey);
unblockClient(receiver);

monotime replyTimer;
elapsedStart(&replyTimer);
if (serveClientBlockedOnList(receiver,
rl->key,dstkey,rl->db,value,
wherefrom, whereto) == C_ERR)
Expand All @@ -272,6 +291,7 @@ void serveClientsBlockedOnListKey(robj *o, readyList *rl) {
* to also undo the POP operation. */
listTypePush(o,value,wherefrom);
}
updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer));

if (dstkey) decrRefCount(dstkey);
decrRefCount(value);
Expand Down Expand Up @@ -316,7 +336,10 @@ void serveClientsBlockedOnSortedSetKey(robj *o, readyList *rl) {
receiver->lastcmd->proc == bzpopminCommand)
? ZSET_MIN : ZSET_MAX;
unblockClient(receiver);
monotime replyTimer;
elapsedStart(&replyTimer);
genericZpopCommand(receiver,&rl->key,1,where,1,NULL);
updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer));
zcard--;

/* Replicate the command. */
Expand Down Expand Up @@ -406,6 +429,8 @@ void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) {
}
}

monotime replyTimer;
elapsedStart(&replyTimer);
/* Emit the two elements sub-array consisting of
* the name of the stream and the data we
* extracted from it. Wrapped in a single-item
Expand All @@ -425,6 +450,7 @@ void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) {
streamReplyWithRange(receiver,s,&start,NULL,
receiver->bpop.xread_count,
0, group, consumer, noack, &pi);
updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer));

/* Note that after we unblock the client, 'gt'
* and other receiver->bpop stuff are no longer
Expand Down Expand Up @@ -471,7 +497,10 @@ void serveClientsBlockedOnKeyByModule(readyList *rl) {
* different modules with different triggers to consider if a key
* is ready or not. This means we can't exit the loop but need
* to continue after the first failure. */
monotime replyTimer;
elapsedStart(&replyTimer);
if (!moduleTryServeClientBlockedOnKey(receiver, rl->key)) continue;
updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer));

moduleUnblockClient(receiver);
}
Expand Down
51 changes: 51 additions & 0 deletions src/module.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@

#include "server.h"
#include "cluster.h"
#include "slowlog.h"
#include "rdb.h"
#include "monotonic.h"
#include <dlfcn.h>
#include <sys/stat.h>
#include <sys/wait.h>
Expand Down Expand Up @@ -252,6 +254,9 @@ typedef struct RedisModuleBlockedClient {
int dbid; /* Database number selected by the original client. */
int blocked_on_keys; /* If blocked via RM_BlockClientOnKeys(). */
int unblocked; /* Already on the moduleUnblocked list. */
monotime background_timer; /* Timer tracking the start of background work */
uint64_t background_duration; /* Current command background time duration.
Used for measuring latency of blocking cmds */
} RedisModuleBlockedClient;

static pthread_mutex_t moduleUnblockedClientsMutex = PTHREAD_MUTEX_INITIALIZER;
Expand Down Expand Up @@ -900,6 +905,30 @@ long long RM_Milliseconds(void) {
return mstime();
}

/* Mark a point in time that will be used as the start time to calculate
* the elapsed execution time when RM_BlockedClientMeasureTimeEnd() is called.
* Within the same command, you can call multiple times
* RM_BlockedClientMeasureTimeStart() and RM_BlockedClientMeasureTimeEnd()
* to accummulate indepedent time intervals to the background duration.
* This method always return REDISMODULE_OK. */
int RM_BlockedClientMeasureTimeStart(RedisModuleBlockedClient *bc) {
elapsedStart(&(bc->background_timer));
return REDISMODULE_OK;
}

/* Mark a point in time that will be used as the end time
* to calculate the elapsed execution time.
* On success REDISMODULE_OK is returned.
* This method only returns REDISMODULE_ERR if no start time was
* previously defined ( meaning RM_BlockedClientMeasureTimeStart was not called ). */
int RM_BlockedClientMeasureTimeEnd(RedisModuleBlockedClient *bc) {
// If the counter is 0 then we haven't called RM_BlockedClientMeasureTimeStart
if (!bc->background_timer)
return REDISMODULE_ERR;
bc->background_duration += elapsedUs(bc->background_timer);
return REDISMODULE_OK;
}

/* Set flags defining capabilities or behavior bit flags.
*
* REDISMODULE_OPTIONS_HANDLE_IO_ERRORS:
Expand Down Expand Up @@ -4573,6 +4602,7 @@ RedisModuleBlockedClient *moduleBlockClient(RedisModuleCtx *ctx, RedisModuleCmdF
bc->dbid = c->db->id;
bc->blocked_on_keys = keys != NULL;
bc->unblocked = 0;
bc->background_duration = 0;
c->bpop.timeout = timeout;

if (islua || ismulti) {
Expand Down Expand Up @@ -4646,6 +4676,11 @@ int moduleTryServeClientBlockedOnKey(client *c, robj *key) {
*
* In these cases, a call to RedisModule_BlockClient() will **not** block the
* client, but instead produce a specific error reply.
*
* Measuring background time: By default the time spent in the blocked command
* is not account for the total command duration. To include such time you should
* use RM_BlockedClientMeasureTimeStart() and RM_BlockedClientMeasureTimeEnd() one,
* or multiple times within the blocking command background work.
*/
RedisModuleBlockedClient *RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms) {
return moduleBlockClient(ctx,reply_callback,timeout_callback,free_privdata,timeout_ms, NULL,0,NULL);
Expand Down Expand Up @@ -4840,6 +4875,7 @@ void moduleHandleBlockedClients(void) {
* was blocked on keys (RM_BlockClientOnKeys()), because we already
* called such callback in moduleTryServeClientBlockedOnKey() when
* the key was signaled as ready. */
uint64_t reply_us = 0;
if (c && !bc->blocked_on_keys && bc->reply_callback) {
RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
ctx.flags |= REDISMODULE_CTX_BLOCKED_REPLY;
Expand All @@ -4848,9 +4884,19 @@ void moduleHandleBlockedClients(void) {
ctx.module = bc->module;
ctx.client = bc->client;
ctx.blocked_client = bc;
monotime replyTimer;
elapsedStart(&replyTimer);
bc->reply_callback(&ctx,(void**)c->argv,c->argc);
reply_us = elapsedUs(replyTimer);
moduleFreeContext(&ctx);
}
/* Update stats now that we've finished the blocking operation.
* This needs to be out of the reply callback above given that a
* module might not define any callback and still do blocking ops.
*/
if (c && !bc->blocked_on_keys) {
updateStatsOnUnblock(c, bc->background_duration, reply_us);
}

/* Free privdata if any. */
if (bc->privdata && bc->free_privdata) {
Expand Down Expand Up @@ -4914,6 +4960,9 @@ void moduleBlockedClientTimedOut(client *c) {
ctx.blocked_privdata = bc->privdata;
bc->timeout_callback(&ctx,(void**)c->argv,c->argc);
moduleFreeContext(&ctx);
if (!bc->blocked_on_keys) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@madolson added the background time tracking on timeout as well for modules (tested on test named "check blocked command that uses RedisModule_MeasureTimeStart() is tracking background time even in timeout" ).
With regards replyToBlockedClientTimedOut on lists, sorted sets or streams, given that timeout occurs on keys there is no reason to track that time correct?

updateStatsOnUnblock(c, bc->background_duration, 0);
}
/* For timeout events, we do not want to call the disconnect callback,
* because the blocked client will be automatically disconnected in
* this case, and the user can still hook using the timeout callback. */
Expand Down Expand Up @@ -8552,6 +8601,8 @@ void moduleRegisterCoreAPI(void) {
REGISTER_API(GetBlockedClientPrivateData);
REGISTER_API(AbortBlock);
REGISTER_API(Milliseconds);
REGISTER_API(BlockedClientMeasureTimeStart);
REGISTER_API(BlockedClientMeasureTimeEnd);
REGISTER_API(GetThreadSafeContext);
REGISTER_API(GetDetachedThreadSafeContext);
REGISTER_API(FreeThreadSafeContext);
Expand Down
4 changes: 4 additions & 0 deletions src/redismodule.h
Original file line number Diff line number Diff line change
Expand Up @@ -744,6 +744,8 @@ REDISMODULE_API int (*RedisModule_IsBlockedTimeoutRequest)(RedisModuleCtx *ctx)
REDISMODULE_API void * (*RedisModule_GetBlockedClientPrivateData)(RedisModuleCtx *ctx) REDISMODULE_ATTR;
REDISMODULE_API RedisModuleBlockedClient * (*RedisModule_GetBlockedClientHandle)(RedisModuleCtx *ctx) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_AbortBlock)(RedisModuleBlockedClient *bc) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_BlockedClientMeasureTimeStart)(RedisModuleBlockedClient *bc) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_BlockedClientMeasureTimeEnd)(RedisModuleBlockedClient *bc) REDISMODULE_ATTR;
REDISMODULE_API RedisModuleCtx * (*RedisModule_GetThreadSafeContext)(RedisModuleBlockedClient *bc) REDISMODULE_ATTR;
REDISMODULE_API RedisModuleCtx * (*RedisModule_GetDetachedThreadSafeContext)(RedisModuleCtx *ctx) REDISMODULE_ATTR;
REDISMODULE_API void (*RedisModule_FreeThreadSafeContext)(RedisModuleCtx *ctx) REDISMODULE_ATTR;
Expand Down Expand Up @@ -1006,6 +1008,8 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int
REDISMODULE_GET_API(GetBlockedClientPrivateData);
REDISMODULE_GET_API(GetBlockedClientHandle);
REDISMODULE_GET_API(AbortBlock);
REDISMODULE_GET_API(BlockedClientMeasureTimeStart);
REDISMODULE_GET_API(BlockedClientMeasureTimeEnd);
REDISMODULE_GET_API(SetDisconnectCallback);
REDISMODULE_GET_API(SubscribeToKeyspaceEvents);
REDISMODULE_GET_API(NotifyKeyspaceEvent);
Expand Down
12 changes: 8 additions & 4 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -3557,7 +3557,7 @@ void preventCommandReplication(client *c) {
*/
void call(client *c, int flags) {
long long dirty;
ustime_t start, duration;
monotime call_timer;
int client_old_flags = c->flags;
struct redisCommand *real_cmd = c->cmd;
static long long prev_err_count;
Expand All @@ -3583,9 +3583,10 @@ void call(client *c, int flags) {
dirty = server.dirty;
prev_err_count = server.stat_total_error_replies;
updateCachedTime(0);
start = server.ustime;
elapsedStart(&call_timer);
c->cmd->proc(c);
duration = ustime()-start;
const long duration = elapsedUs(call_timer);
c->duration = duration;
dirty = server.dirty-dirty;
if (dirty < 0) dirty = 0;

Expand Down Expand Up @@ -3629,7 +3630,10 @@ void call(client *c, int flags) {
* arguments. */
robj **argv = c->original_argv ? c->original_argv : c->argv;
int argc = c->original_argv ? c->original_argc : c->argc;
slowlogPushEntryIfNeeded(c,argv,argc,duration);
/* If the client is blocked we will handle slowlog when it is unblocked . */
if (!(c->flags & CLIENT_BLOCKED)) {
slowlogPushEntryIfNeeded(c,argv,argc,duration);
}
}
freeClientOriginalArgv(c);

Expand Down
2 changes: 2 additions & 0 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -870,6 +870,7 @@ typedef struct client {
size_t sentlen; /* Amount of bytes already sent in the current
buffer or object being sent. */
time_t ctime; /* Client creation time. */
long duration; /* Current command duration. Used for measuring latency of blocking/non-blocking cmds */
time_t lastinteraction; /* Time of the last interaction, used for timeout */
time_t obuf_soft_limit_reached_time;
uint64_t flags; /* Client flags: CLIENT_* macros. */
Expand Down Expand Up @@ -2358,6 +2359,7 @@ void disconnectAllBlockedClients(void);
void handleClientsBlockedOnKeys(void);
void signalKeyAsReady(redisDb *db, robj *key, int type);
void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeout, robj *target, struct listPos *listpos, streamID *ids);
void updateStatsOnUnblock(client *c, long blocked_us, long reply_us);

/* timeout.c -- Blocked clients timeout and connections timeout. */
void addClientToTimeoutTable(client *c);
Expand Down
1 change: 1 addition & 0 deletions tests/modules/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ TEST_MODULES = \
misc.so \
hooks.so \
blockonkeys.so \
blockonbackground.so \
scan.so \
datatype.so \
auth.so \
Expand Down
Loading