Skip to content

Commit 3e65147

Browse files
authored
Improve agent startup on windows (#21125)
* Exclude Windows from temporary spawn server initialization and cleanup
* Additional checks to isolate windows deadlock issue
* Switch to sleep_usec
* Cast revents to unsigned int for log clarity in log forwarder
1 parent 7c19589 commit 3e65147

File tree

3 files changed

+97
-28
lines changed

3 files changed

+97
-28
lines changed

src/daemon/main.c

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -859,7 +859,6 @@ int netdata_main(int argc, char **argv) {
859859

860860
// ----------------------------------------------------------------------------------------------------------------
861861
delta_startup_time("temp spawn server");
862-
863862
netdata_main_spawn_server_init("init", argc, (const char **)argv);
864863

865864
// ----------------------------------------------------------------------------------------------------------------
@@ -982,7 +981,6 @@ int netdata_main(int argc, char **argv) {
982981

983982
// ----------------------------------------------------------------------------------------------------------------
984983
delta_startup_time("stop temporary spawn server");
985-
986984
// stop the old server and later start a new one under the new permissions
987985
netdata_main_spawn_server_cleanup();
988986

src/libnetdata/spawn_server/log-forwarder.c

Lines changed: 71 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ typedef struct LOG_FORWARDER {
2121
SPINLOCK spinlock;
2222
int pipe_fds[2]; // Pipe for notifications
2323
bool running;
24+
volatile bool initialized; // Thread has fully initialized (atomic)
2425
} LOG_FORWARDER;
2526

2627
static void log_forwarder_thread_func(void *arg);
@@ -71,9 +72,27 @@ LOG_FORWARDER *log_forwarder_start(void) {
7172
nd_log(NDLS_COLLECTORS, NDLP_ERR, "Log forwarder: Failed to set non-blocking mode");
7273

7374
lf->running = true;
75+
__atomic_store_n(&lf->initialized, false, __ATOMIC_RELEASE);
76+
7477
lf->thread = nd_thread_create("log-fw", NETDATA_THREAD_OPTION_DEFAULT, log_forwarder_thread_func, lf);
7578

76-
nd_log(NDLS_COLLECTORS, NDLP_INFO, "Log forwarder: created thread pointer: %p", lf->thread);
79+
if(!lf->thread) {
80+
nd_log(NDLS_COLLECTORS, NDLP_ERR, "Log forwarder: nd_thread_create() failed!");
81+
close(lf->pipe_fds[PIPE_READ]);
82+
close(lf->pipe_fds[PIPE_WRITE]);
83+
freez(lf);
84+
return NULL;
85+
}
86+
87+
// Wait for the thread to signal it's initialized
88+
size_t retries = 0;
89+
while (!__atomic_load_n(&lf->initialized, __ATOMIC_ACQUIRE) && retries < 100) { // 100 * 10ms = 1 second max
90+
sleep_usec(10 * USEC_PER_MS); // 10ms
91+
retries++;
92+
}
93+
94+
if (!__atomic_load_n(&lf->initialized, __ATOMIC_ACQUIRE))
95+
nd_log(NDLS_COLLECTORS, NDLP_WARNING, "Log forwarder: thread initialization timeout");
7796

7897
return lf;
7998
}
@@ -84,7 +103,8 @@ static inline void mark_all_entries_for_deletion_unsafe(LOG_FORWARDER *lf) {
84103
}
85104

86105
void log_forwarder_stop(LOG_FORWARDER *lf) {
87-
if(!lf || !lf->running) return;
106+
if(!lf || !lf->running)
107+
return;
88108

89109
// Signal the thread to stop
90110
spinlock_lock(&lf->spinlock);
@@ -96,21 +116,25 @@ void log_forwarder_stop(LOG_FORWARDER *lf) {
96116

97117
lf->running = false;
98118
mark_all_entries_for_deletion_unsafe(lf);
99-
100-
// Send a byte to the pipe to wake up the thread
101-
// char ch = 0;
102-
// if(write(lf->pipe_fds[PIPE_WRITE], &ch, 1) <= 0) { ; }
103-
close(lf->pipe_fds[PIPE_WRITE]); // force it to quit
104119
spinlock_unlock(&lf->spinlock);
105120

121+
// Wake up the thread by writing to the pipe (don't close it yet - let the thread clean up)
122+
char ch = 0;
123+
ssize_t written = write(lf->pipe_fds[PIPE_WRITE], &ch, 1);
124+
(void)written;
125+
106126
// Wait for the thread to finish
107-
nd_log(NDLS_COLLECTORS, NDLP_INFO, "Log forwarder: stopping thread pointer: %p", lf->thread);
108-
if(nd_thread_join(lf->thread) == 0) {
109-
lf->thread = NULL;
110-
freez(lf);
127+
// Note: nd_thread_join() handles the Windows/MSYS2 EINVAL case internally
128+
int join_result = nd_thread_join(lf->thread);
129+
if(join_result != 0) {
130+
nd_log(NDLS_COLLECTORS, NDLP_ERR,
131+
"Log forwarder: nd_thread_join() failed with error %d", join_result);
111132
}
112-
else
113-
nd_log(NDLS_COLLECTORS, NDLP_ERR, "Log forwarder: not freeing lf due to nd_thread_join() error.");
133+
134+
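// Always clean up, on the assumption that the thread has exited even if the join failed.
// NOTE(review): nd_thread_join() can return non-zero when the thread never reached the
// FINISHED state; in that case the thread may still be using lf - confirm this is safe.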
135+
lf->thread = NULL;
136+
close(lf->pipe_fds[PIPE_WRITE]);
137+
freez(lf);
114138
}
115139

116140
// --------------------------------------------------------------------------------------------------------------------
@@ -235,6 +259,13 @@ static void log_forwarder_thread_func(void *arg) {
235259

236260
while (1) {
237261
spinlock_lock(&lf->spinlock);
262+
263+
// Signal initialization on first iteration after acquiring spinlock
264+
// This ensures the thread is truly ready and in its main loop
265+
if(!__atomic_load_n(&lf->initialized, __ATOMIC_ACQUIRE)) {
266+
__atomic_store_n(&lf->initialized, true, __ATOMIC_RELEASE);
267+
}
268+
238269
if (!lf->running) {
239270
spinlock_unlock(&lf->spinlock);
240271
break;
@@ -263,17 +294,35 @@ static void log_forwarder_thread_func(void *arg) {
263294

264295
if (ret > 0) {
265296
// Check the notification pipe
266-
if (pfds[0].revents & POLLIN) {
267-
// Read and discard the data
268-
char buf[256];
269-
ssize_t bytes_read = read(lf->pipe_fds[PIPE_READ], buf, sizeof(buf));
270-
// Ignore the data; proceed regardless of the result
271-
if (bytes_read == -1) {
272-
if (errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) {
273-
// Handle read error if necessary
274-
nd_log(NDLS_COLLECTORS, NDLP_ERR, "Log forwarder: Failed to read from notification pipe");
297+
if (pfds[0].revents & (POLLIN | POLLERR | POLLHUP | POLLNVAL)) {
298+
if (pfds[0].revents & (POLLERR | POLLHUP | POLLNVAL)) {
299+
// Pipe error - check if we should exit
300+
spinlock_lock(&lf->spinlock);
301+
bool should_exit = !lf->running;
302+
spinlock_unlock(&lf->spinlock);
303+
304+
if (should_exit) {
305+
// Expected during shutdown
275306
break;
276307
}
308+
309+
nd_log(NDLS_COLLECTORS, NDLP_ERR,
310+
"Log forwarder: pipe error (revents=0x%x) but still running",
311+
(unsigned int) pfds[0].revents);
312+
}
313+
314+
if (pfds[0].revents & POLLIN) {
315+
// Read and discard the data
316+
char buf[256];
317+
ssize_t bytes_read = read(lf->pipe_fds[PIPE_READ], buf, sizeof(buf));
318+
// Ignore the data; proceed regardless of the result
319+
if (bytes_read == -1) {
320+
if (errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) {
321+
// Handle read error if necessary
322+
nd_log(NDLS_COLLECTORS, NDLP_ERR, "Log forwarder: Failed to read from notification pipe");
323+
break;
324+
}
325+
}
277326
}
278327
}
279328

@@ -326,8 +375,6 @@ static void log_forwarder_thread_func(void *arg) {
326375
nd_log(NDLS_COLLECTORS, NDLP_ERR, "Log forwarder: poll() error");
327376
}
328377

329-
nd_log(NDLS_COLLECTORS, NDLP_ERR, "Log forwarder: exiting...");
330-
331378
spinlock_lock(&lf->spinlock);
332379
mark_all_entries_for_deletion_unsafe(lf);
333380
log_forwarder_remove_deleted_unsafe(lf);

src/libnetdata/threads/threads.c

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -462,15 +462,39 @@ int nd_thread_join(ND_THREAD *nti) {
462462
return 0;
463463

464464
int ret;
465+
465466
if((ret = uv_thread_join(&nti->thread))) {
466467
// we can't join the thread
467468

468469
nd_log(NDLS_DAEMON, NDLP_WARNING,
469470
"cannot join thread. uv_thread_join() failed with code %d. (tag=%s)",
470471
ret, nti->tag);
472+
473+
// On Windows/MSYS2, if the thread exited very quickly, uv_thread_join() can fail with EINVAL (-22)
474+
// because the thread handle becomes invalid before the join executes. However, the thread may
475+
// still be finishing its cleanup. Wait for it to reach FINISHED state before cleaning up.
476+
if(ret == -22) { // UV_EINVAL
477+
nd_log(NDLS_DAEMON, NDLP_INFO,
478+
"thread '%s' join returned EINVAL, waiting for thread to finish...", nti->tag);
479+
480+
// Spin-wait for the thread to mark itself as finished
481+
size_t retries = 0;
482+
while(!nd_thread_status_check(nti, NETDATA_THREAD_STATUS_FINISHED) && retries < 1000) {
483+
sleep_usec(1 * USEC_PER_MS); // 1ms
484+
retries++;
485+
}
486+
487+
if (nd_thread_status_check(nti, NETDATA_THREAD_STATUS_FINISHED)) {
488+
nd_log(NDLS_DAEMON, NDLP_INFO, "thread '%s' confirmed finished, cleaning up structure", nti->tag);
489+
ret = 0;
490+
} else {
491+
nd_log(NDLS_DAEMON, NDLP_ERR, "thread '%s' did not reach FINISHED state after 1 second", nti->tag);
492+
}
493+
}
471494
}
472-
else {
473-
// we successfully joined the thread
495+
496+
if(ret == 0) {
497+
// we successfully joined the thread (or cleaned up after Windows fast-exit)
474498
nd_thread_status_set(nti, NETDATA_THREAD_STATUS_JOINED);
475499

476500
spinlock_lock(&threads_globals.running.spinlock);

0 commit comments

Comments
 (0)