Skip to content

Commit c643895

Browse files
authored
Improve stale replication (#21357)
* Implement stuck replication detection and response handling
* Fix checks and commit response for not found chart
* Initialize replication stuck detection counter and refine replication completion conditions
* Refine streaming response handling to prevent data loss during buffer overflow
1 parent bc24088 commit c643895

File tree

4 files changed

+171
-1
lines changed

4 files changed

+171
-1
lines changed

src/database/rrdset-index-id.c

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,9 @@ static void rrdset_insert_callback(const DICTIONARY_ITEM *item __maybe_unused, v
103103

104104
rw_spinlock_init(&st->alerts.spinlock);
105105

106+
// Initialize replication stuck detection counter
107+
st->replication_empty_response_count = 0;
108+
106109
// initialize the db tiers
107110
{
108111
for(size_t tier = 0; tier < nd_profile.storage_tiers; tier++) {

src/database/rrdset.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,9 @@ struct rrdset {
228228
} replay;
229229
#endif // NETDATA_LOG_REPLICATION_REQUESTS
230230

231+
// Replication stuck detection - outside debug flag for production safety
232+
uint8_t replication_empty_response_count; // track consecutive empty responses
233+
231234
SPINLOCK destroy_lock;
232235
};
233236

src/plugins.d/pluginsd_replication.c

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -359,6 +359,9 @@ ALWAYS_INLINE PARSER_RC pluginsd_replay_rrdset_collection_state(char **words, si
359359
ALWAYS_INLINE PARSER_RC pluginsd_replay_end(char **words, size_t num_words, PARSER *parser) {
360360
if (num_words < 7) { // accepts 7, but the 7th is optional
361361
nd_log(NDLS_DAEMON, NDLP_ERR, "REPLAY: malformed " PLUGINSD_KEYWORD_REPLAY_END " command");
362+
RRDSET *st = pluginsd_get_scope_chart(parser);
363+
if(st)
364+
st->replication_empty_response_count = 0;
362365
return PARSER_RC_ERROR;
363366
}
364367

@@ -402,6 +405,10 @@ ALWAYS_INLINE PARSER_RC pluginsd_replay_end(char **words, size_t num_words, PARS
402405

403406
parser->user.data_collections_count++;
404407

408+
// Reset empty response counter when we receive actual data
409+
if(parser->user.replay.rset_enabled && st)
410+
st->replication_empty_response_count = 0;
411+
405412
if(parser->user.replay.rset_enabled && st->rrdhost->receiver) {
406413
time_t now = now_realtime_sec();
407414
time_t started = st->rrdhost->receiver->replication.first_time_s;
@@ -433,6 +440,9 @@ ALWAYS_INLINE PARSER_RC pluginsd_replay_end(char **words, size_t num_words, PARS
433440
st->replay.log_next_data_collection = true;
434441
#endif
435442

443+
if(start_streaming)
444+
st->replication_empty_response_count = 0;
445+
436446
if (start_streaming) {
437447
#ifdef REPLICATION_TRACKING
438448
st->stream.rcv.who = REPLAY_WHO_FINISHED;
@@ -465,6 +475,111 @@ ALWAYS_INLINE PARSER_RC pluginsd_replay_end(char **words, size_t num_words, PARS
465475
return PARSER_RC_OK;
466476
}
467477

478+
// ========================================================================
479+
// SAFETY NET: Detect stuck replication loops
480+
// ========================================================================
481+
//
482+
// We received start_streaming=false, which means we need to send another
483+
// replication request. However, we need to detect if we're stuck in an
484+
// infinite retry loop where no progress is being made.
485+
//
486+
// This can happen when:
487+
// 1. Parent already has newer data than child
488+
// 2. Child keeps splitting responses due to buffer constraints
489+
// 3. Network issues causing repeated empty/failed responses
490+
491+
// Check parent's current retention to detect if we're already caught up
492+
time_t local_first_entry = 0, local_last_entry = 0;
493+
rrdset_get_retention_of_tier_for_collected_chart(
494+
st, &local_first_entry, &local_last_entry, now_realtime_sec(), 0);
495+
496+
// Detect suspicious pattern: parent requested data but is already caught up
497+
// This indicates we're in a loop where child keeps splitting responses
498+
// even though parent doesn't need more data.
499+
bool parent_already_caught_up = (local_last_entry >= last_entry_child);
500+
bool requested_non_empty_range = (first_entry_requested != 0 || last_entry_requested != 0);
501+
bool is_suspicious_response = (requested_non_empty_range && parent_already_caught_up);
502+
503+
bool should_check_for_stuck_replication = false;
504+
505+
// Track consecutive suspicious responses - applies to all builds
506+
if(is_suspicious_response) {
507+
st->replication_empty_response_count++;
508+
// After 3 consecutive suspicious responses, we need to investigate
509+
if(st->replication_empty_response_count >= 3) {
510+
should_check_for_stuck_replication = true;
511+
}
512+
} else {
513+
// Reset counter if this was a legitimate response (parent still catching up)
514+
st->replication_empty_response_count = 0;
515+
}
516+
517+
if (should_check_for_stuck_replication) {
518+
// We already have local_first_entry and local_last_entry from above
519+
520+
// Check multiple conditions to ensure we're truly stuck:
521+
//
522+
// Condition 1: Parent has data that covers or exceeds child's retention
523+
// (We already checked this in parent_already_caught_up, but verify again)
524+
bool parent_has_equal_or_newer_data = (local_last_entry >= last_entry_child);
525+
526+
// Calculate the gap for logging purposes
527+
time_t gap_to_child = (last_entry_child > local_last_entry) ?
528+
(last_entry_child - local_last_entry) : 0;
529+
530+
// Condition 2: Parent's data is reasonably recent
531+
time_t wall_clock = now_realtime_sec();
532+
bool parent_data_is_recent = (local_last_entry > 0 &&
533+
(wall_clock - local_last_entry) < 300);
534+
535+
// Only finish replication if parent has equal or newer data than child
536+
// Do NOT terminate if there's any gap, as that would cause data loss
537+
if (parent_has_equal_or_newer_data) {
538+
539+
// Log with appropriate level based on confidence
540+
ND_LOG_FIELD_PRIORITY level = (parent_has_equal_or_newer_data && parent_data_is_recent) ?
541+
NDLP_INFO : NDLP_WARNING;
542+
543+
nd_log(NDLS_DAEMON, level,
544+
"PLUGINSD REPLAY: 'host:%s/chart:%s' detected stuck replication loop. "
545+
"Parent last entry: %llu, Child last entry: %llu, Gap: %llu seconds, "
546+
"Empty responses: %u. Forcing replication to finish.",
547+
rrdhost_hostname(host), rrdset_id(st),
548+
(unsigned long long)local_last_entry,
549+
(unsigned long long)last_entry_child,
550+
(unsigned long long)gap_to_child,
551+
(unsigned int)st->replication_empty_response_count
552+
);
553+
554+
st->replication_empty_response_count = 0;
555+
556+
// IMPORTANT: Mark as finished and decrement counter NOW, before sending final request.
557+
// This prevents infinite loops even if child continues to respond with start_streaming=false.
558+
// The next REPLAY_END will see FINISHED flag and handle accordingly.
559+
RRDSET_FLAGS old = rrdset_flag_set_and_clear(
560+
st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED,
561+
RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS | RRDSET_FLAG_SYNC_CLOCK);
562+
563+
if(!(old & RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED)) {
564+
if(rrdhost_receiver_replicating_charts_minus_one(st->rrdhost) == 0)
565+
pulse_host_status(host, PULSE_HOST_STATUS_RCV_RUNNING, 0);
566+
}
567+
568+
pluginsd_clear_scope_chart(parser, PLUGINSD_KEYWORD_REPLAY_END);
569+
host->stream.rcv.status.replication.percent = 100.0;
570+
worker_set_metric(WORKER_RECEIVER_JOB_REPLICATION_COMPLETION, host->stream.rcv.status.replication.percent);
571+
572+
// Send one final request to notify child. If child responds with start_streaming=true,
573+
// it will start streaming. If it responds with start_streaming=false, the next
574+
// REPLAY_END will see the FINISHED flag and log a warning but not loop forever.
575+
bool ok = replicate_chart_request(send_to_plugin, parser, host, st,
576+
first_entry_child, last_entry_child, child_world_time,
577+
0, 0); // prev_wanted = 0,0 to trigger empty request path
578+
579+
return ok ? PARSER_RC_OK : PARSER_RC_ERROR;
580+
}
581+
}
582+
468583
#ifdef REPLICATION_TRACKING
469584
st->stream.rcv.who = REPLAY_WHO_ME;
470585
#endif

src/streaming/stream-replication-sender.c

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -442,6 +442,16 @@ static bool replication_query_execute(BUFFER *wb, struct replication_query *q, s
442442

443443
if(buffer_strlen(wb) > max_msg_size && last_end_time_in_buffer) {
444444
q->query.before = last_end_time_in_buffer;
445+
446+
// CRITICAL: When splitting a response due to buffer overflow, we MUST set
447+
// enable_streaming=false to prevent data loss. If we send start_streaming=true
448+
// with partial data, the parent will think replication is complete when we've
449+
// actually truncated the requested interval, causing permanent data loss.
450+
//
451+
// The parent-side stuck detection breaks any resulting infinite retry loop by:
452+
// 1. Detecting when no progress is made after multiple rounds
453+
// 2. Explicitly marking replication as FINISHED before sending final request
454+
// 3. Handling the child's response appropriately whether it says true or false
445455
q->query.enable_streaming = false;
446456

447457
internal_error(
@@ -1198,8 +1208,47 @@ static bool replication_execute_request(struct replication_request *rq, bool wor
11981208
if(!rq->st) {
11991209
__atomic_add_fetch(&replication_globals.atomic.error_not_found, 1, __ATOMIC_RELAXED);
12001210
nd_log(NDLS_DAEMON, NDLP_ERR,
1201-
"STREAM SND REPLAY ERROR: 'host:%s/chart:%s' not found",
1211+
"STREAM SND REPLAY ERROR: 'host:%s/chart:%s' not found, sending empty response to unblock parent",
12021212
rrdhost_hostname(rq->sender->host), string2str(rq->chart_id));
1213+
1214+
// CRITICAL: Parent is waiting for a response! We MUST send REPLAY_END even if chart not found
1215+
// Otherwise parent will wait forever with chart stuck in replicating state.
1216+
// Send empty response with start_streaming=true to finish replication for this non-existent chart.
1217+
BUFFER *wb = sender_thread_buffer(rq->sender, REPLICATION_THREAD_BUFFER_INITIAL_SIZE);
1218+
1219+
bool with_slots = (rq->sender->capabilities & STREAM_CAP_SLOTS) ? true : false;
1220+
NUMBER_ENCODING integer_encoding = (rq->sender->capabilities & STREAM_CAP_IEEE754) ?
1221+
NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL;
1222+
1223+
buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN, sizeof(PLUGINSD_KEYWORD_REPLAY_BEGIN) - 1);
1224+
if(with_slots) {
1225+
buffer_fast_strcat(wb, " "PLUGINSD_KEYWORD_SLOT":", sizeof(PLUGINSD_KEYWORD_SLOT) - 1 + 2);
1226+
buffer_print_uint64_encoded(wb, integer_encoding, 0); // slot 0 for unknown chart
1227+
}
1228+
buffer_fast_strcat(wb, " '", 2);
1229+
buffer_fast_strcat(wb, string2str(rq->chart_id), string_strlen(rq->chart_id));
1230+
buffer_fast_strcat(wb, "'\n", 2);
1231+
1232+
// Send REPLAY_END with empty data and start_streaming=true to unblock parent
1233+
buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_END " ", sizeof(PLUGINSD_KEYWORD_REPLAY_END) - 1 + 1);
1234+
buffer_print_int64_encoded(wb, integer_encoding, 0); // update_every
1235+
buffer_fast_strcat(wb, " ", 1);
1236+
buffer_print_uint64_encoded(wb, integer_encoding, 0); // db_first_entry
1237+
buffer_fast_strcat(wb, " ", 1);
1238+
buffer_print_uint64_encoded(wb, integer_encoding, 0); // db_last_entry
1239+
buffer_fast_strcat(wb, " true ", 7); // start_streaming=true (force finish)
1240+
buffer_print_uint64_encoded(wb, integer_encoding, 0); // after
1241+
buffer_fast_strcat(wb, " ", 1);
1242+
buffer_print_uint64_encoded(wb, integer_encoding, 0); // before
1243+
buffer_fast_strcat(wb, " ", 1);
1244+
buffer_print_uint64_encoded(wb, integer_encoding, now_realtime_sec()); // wall_clock_time
1245+
buffer_fast_strcat(wb, "\n", 1);
1246+
1247+
sender_commit(rq->sender, wb, STREAM_TRAFFIC_TYPE_REPLICATION);
1248+
__atomic_add_fetch(&rq->sender->host->stream.snd.status.replication.counter_out, 1, __ATOMIC_RELAXED);
1249+
replication_replied_add();
1250+
1251+
ret = true; // Consider this a successful response
12031252
goto cleanup;
12041253
}
12051254
}

0 commit comments

Comments
 (0)