Skip to content

Commit 455c5a8

Browse files
committed
Fix TSAN warnings on the repl backlog
1 parent 875b6a7 commit 455c5a8

File tree

2 files changed

+5
-5
lines changed

2 files changed

+5
-5
lines changed

src/networking.cpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2079,8 +2079,6 @@ int handleClientsWithPendingWrites(int iel, int aof_state) {
20792079
* that may trigger write error or recreate handler. */
20802080
if ((flags & CLIENT_PROTECTED) && !(flags & CLIENT_SLAVE)) continue;
20812081

2082-
//std::unique_lock<decltype(c->lock)> lock(c->lock);
2083-
20842082
/* Don't write to clients that are going to be closed anyway. */
20852083
if (c->flags & CLIENT_CLOSE_ASAP) continue;
20862084

@@ -2098,6 +2096,7 @@ int handleClientsWithPendingWrites(int iel, int aof_state) {
20982096

20992097
/* If after the synchronous writes above we still have data to
21002098
* output to the client, we need to install the writable handler. */
2099+
std::unique_lock<decltype(c->lock)> lock(c->lock);
21012100
if (clientHasPendingReplies(c)) {
21022101
if (connSetWriteHandlerWithBarrier(c->conn, sendReplyToClient, ae_flags, true) == C_ERR) {
21032102
freeClientAsync(c);

src/replication.cpp

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -264,9 +264,8 @@ void resizeReplicationBacklog(long long newsize) {
264264
newsize = CONFIG_REPL_BACKLOG_MIN_SIZE;
265265
if (g_pserver->repl_backlog_size == newsize) return;
266266

267-
std::unique_lock<fastlock> repl_backlog_lock (g_pserver->repl_backlog_lock);
268-
269267
if (g_pserver->repl_backlog != NULL) {
268+
std::unique_lock<fastlock> repl_backlog_lock(g_pserver->repl_backlog_lock);
270269
/* What we actually do is to flush the old buffer and realloc a new
271270
* empty one. It will refill with new data incrementally.
272271
* The reason is that copying a few gigabytes adds latency and even
@@ -357,7 +356,7 @@ void freeReplicationBacklog(void) {
357356
void feedReplicationBacklog(const void *ptr, size_t len) {
358357
serverAssert(GlobalLocksAcquired());
359358
const unsigned char *p = (const unsigned char*)ptr;
360-
359+
std::unique_lock<fastlock> repl_backlog_lock(g_pserver->repl_backlog_lock, std::defer_lock);
361360

362361
if (g_pserver->repl_batch_idxStart >= 0) {
363362
/* We are lower bounded by the lowest replica offset, or the batch offset start if not applicable */
@@ -417,6 +416,8 @@ void feedReplicationBacklog(const void *ptr, size_t len) {
417416
// We need to update a few variables or later asserts will notice we dropped data
418417
g_pserver->repl_batch_offStart = g_pserver->master_repl_offset + len;
419418
g_pserver->repl_lowest_off = -1;
419+
if (!repl_backlog_lock.owns_lock())
420+
repl_backlog_lock.lock(); // we need to acquire the lock if we'll be overwriting data that writeToClient may be reading
420421
}
421422
}
422423
}

0 commit comments

Comments
 (0)