@@ -21,6 +21,7 @@ typedef struct LOG_FORWARDER {
2121 SPINLOCK spinlock ;
2222 int pipe_fds [2 ]; // Pipe for notifications
2323 bool running ;
24+ volatile bool initialized ; // Set to true by the thread on its first main-loop iteration; read/written only via __atomic builtins
2425} LOG_FORWARDER ;
2526
2627static void log_forwarder_thread_func (void * arg );
@@ -71,9 +72,27 @@ LOG_FORWARDER *log_forwarder_start(void) {
7172 nd_log (NDLS_COLLECTORS , NDLP_ERR , "Log forwarder: Failed to set non-blocking mode" );
7273
7374 lf -> running = true;
75+ __atomic_store_n (& lf -> initialized , false, __ATOMIC_RELEASE );
76+
7477 lf -> thread = nd_thread_create ("log-fw" , NETDATA_THREAD_OPTION_DEFAULT , log_forwarder_thread_func , lf );
7578
76- nd_log (NDLS_COLLECTORS , NDLP_INFO , "Log forwarder: created thread pointer: %p" , lf -> thread );
79+ if (!lf -> thread ) {
80+ nd_log (NDLS_COLLECTORS , NDLP_ERR , "Log forwarder: nd_thread_create() failed!" );
81+ close (lf -> pipe_fds [PIPE_READ ]);
82+ close (lf -> pipe_fds [PIPE_WRITE ]);
83+ freez (lf );
84+ return NULL ;
85+ }
86+
87+ // Wait for the thread to signal it's initialized
88+ size_t retries = 0 ;
89+ while (!__atomic_load_n (& lf -> initialized , __ATOMIC_ACQUIRE ) && retries < 100 ) { // 100 * 10ms = 1 second max
90+ sleep_usec (10 * USEC_PER_MS ); // 10ms
91+ retries ++ ;
92+ }
93+
94+ if (!__atomic_load_n (& lf -> initialized , __ATOMIC_ACQUIRE ))
95+ nd_log (NDLS_COLLECTORS , NDLP_WARNING , "Log forwarder: thread initialization timeout" );
7796
7897 return lf ;
7998}
@@ -84,7 +103,8 @@ static inline void mark_all_entries_for_deletion_unsafe(LOG_FORWARDER *lf) {
84103}
85104
86105void log_forwarder_stop (LOG_FORWARDER * lf ) {
87- if (!lf || !lf -> running ) return ;
106+ if (!lf || !lf -> running )
107+ return ;
88108
89109 // Signal the thread to stop
90110 spinlock_lock (& lf -> spinlock );
@@ -96,21 +116,25 @@ void log_forwarder_stop(LOG_FORWARDER *lf) {
96116
97117 lf -> running = false;
98118 mark_all_entries_for_deletion_unsafe (lf );
99-
100- // Send a byte to the pipe to wake up the thread
101- // char ch = 0;
102- // if(write(lf->pipe_fds[PIPE_WRITE], &ch, 1) <= 0) { ; }
103- close (lf -> pipe_fds [PIPE_WRITE ]); // force it to quit
104119 spinlock_unlock (& lf -> spinlock );
105120
121+ // Wake up the thread by writing to the pipe (don't close it yet - let the thread clean up)
122+ char ch = 0 ;
123+ ssize_t written = write (lf -> pipe_fds [PIPE_WRITE ], & ch , 1 );
124+ (void )written ;
125+
106126 // Wait for the thread to finish
107- nd_log (NDLS_COLLECTORS , NDLP_INFO , "Log forwarder: stopping thread pointer: %p" , lf -> thread );
108- if (nd_thread_join (lf -> thread ) == 0 ) {
109- lf -> thread = NULL ;
110- freez (lf );
127+ // Note: nd_thread_join() handles the Windows/MSYS2 EINVAL case internally
128+ int join_result = nd_thread_join (lf -> thread );
129+ if (join_result != 0 ) {
130+ nd_log (NDLS_COLLECTORS , NDLP_ERR ,
131+ "Log forwarder: nd_thread_join() failed with error %d" , join_result );
111132 }
112- else
113- nd_log (NDLS_COLLECTORS , NDLP_ERR , "Log forwarder: not freeing lf due to nd_thread_join() error." );
133+
134+ // Always clean up - if join failed, the thread has still exited
135+ lf -> thread = NULL ;
136+ close (lf -> pipe_fds [PIPE_WRITE ]);
137+ freez (lf );
114138}
115139
116140// --------------------------------------------------------------------------------------------------------------------
@@ -235,6 +259,13 @@ static void log_forwarder_thread_func(void *arg) {
235259
236260 while (1 ) {
237261 spinlock_lock (& lf -> spinlock );
262+
263+ // Signal initialization on first iteration after acquiring spinlock
264+ // This ensures the thread is truly ready and in its main loop
265+ if (!__atomic_load_n (& lf -> initialized , __ATOMIC_ACQUIRE )) {
266+ __atomic_store_n (& lf -> initialized , true, __ATOMIC_RELEASE );
267+ }
268+
238269 if (!lf -> running ) {
239270 spinlock_unlock (& lf -> spinlock );
240271 break ;
@@ -263,17 +294,35 @@ static void log_forwarder_thread_func(void *arg) {
263294
264295 if (ret > 0 ) {
265296 // Check the notification pipe
266- if (pfds [0 ].revents & POLLIN ) {
267- // Read and discard the data
268- char buf [ 256 ];
269- ssize_t bytes_read = read ( lf -> pipe_fds [ PIPE_READ ], buf , sizeof ( buf ) );
270- // Ignore the data; proceed regardless of the result
271- if ( bytes_read == -1 ) {
272- if ( errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR ) {
273- // Handle read error if necessary
274- nd_log ( NDLS_COLLECTORS , NDLP_ERR , "Log forwarder: Failed to read from notification pipe" );
297+ if (pfds [0 ].revents & ( POLLIN | POLLERR | POLLHUP | POLLNVAL ) ) {
298+ if ( pfds [ 0 ]. revents & ( POLLERR | POLLHUP | POLLNVAL )) {
299+ // Pipe error - check if we should exit
300+ spinlock_lock ( & lf -> spinlock );
301+ bool should_exit = ! lf -> running ;
302+ spinlock_unlock ( & lf -> spinlock );
303+
304+ if ( should_exit ) {
305+ // Expected during shutdown
275306 break ;
276307 }
308+
309+ nd_log (NDLS_COLLECTORS , NDLP_ERR ,
310+ "Log forwarder: pipe error (revents=0x%x) but still running" ,
311+ (unsigned int ) pfds [0 ].revents );
312+ }
313+
314+ if (pfds [0 ].revents & POLLIN ) {
315+ // Read and discard the data
316+ char buf [256 ];
317+ ssize_t bytes_read = read (lf -> pipe_fds [PIPE_READ ], buf , sizeof (buf ));
318+ // Ignore the data; proceed regardless of the result
319+ if (bytes_read == -1 ) {
320+ if (errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR ) {
321+ // Handle read error if necessary
322+ nd_log (NDLS_COLLECTORS , NDLP_ERR , "Log forwarder: Failed to read from notification pipe" );
323+ break ;
324+ }
325+ }
277326 }
278327 }
279328
@@ -326,8 +375,6 @@ static void log_forwarder_thread_func(void *arg) {
326375 nd_log (NDLS_COLLECTORS , NDLP_ERR , "Log forwarder: poll() error" );
327376 }
328377
329- nd_log (NDLS_COLLECTORS , NDLP_ERR , "Log forwarder: exiting..." );
330-
331378 spinlock_lock (& lf -> spinlock );
332379 mark_all_entries_for_deletion_unsafe (lf );
333380 log_forwarder_remove_deleted_unsafe (lf );
0 commit comments