Skip to content

Commit ef816f8

Browse files
authored
Rework maintenance thread (netdata#20694)
* Call maintenance functions from the metadata event loop * Remove the service (MAINTENANCE) thread * Call run_maintenace() every 10 seconds
1 parent d22056f commit ef816f8

File tree

8 files changed

+57
-125
lines changed

8 files changed

+57
-125
lines changed

src/daemon/daemon-service.c

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -96,14 +96,10 @@ void service_signal_exit(SERVICE_TYPE service) {
9696
}
9797

9898
static void service_to_buffer(BUFFER *wb, SERVICE_TYPE service) {
99-
if(service & SERVICE_MAINTENANCE)
100-
buffer_strcat(wb, "MAINTENANCE ");
10199
if(service & SERVICE_COLLECTORS)
102100
buffer_strcat(wb, "COLLECTORS ");
103101
if(service & SERVICE_REPLICATION)
104102
buffer_strcat(wb, "REPLICATION ");
105-
if(service & ABILITY_DATA_QUERIES)
106-
buffer_strcat(wb, "DATA_QUERIES ");
107103
if(service & ABILITY_WEB_REQUESTS)
108104
buffer_strcat(wb, "WEB_REQUESTS ");
109105
if(service & SERVICE_WEB_SERVER)

src/daemon/daemon-service.h

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -6,22 +6,20 @@
66
#include "libnetdata/libnetdata.h"
77

88
typedef enum {
9-
ABILITY_DATA_QUERIES = (1 << 0),
10-
ABILITY_WEB_REQUESTS = (1 << 1),
11-
ABILITY_STREAMING_CONNECTIONS = (1 << 2),
12-
SERVICE_MAINTENANCE = (1 << 3),
13-
SERVICE_COLLECTORS = (1 << 4),
14-
SERVICE_REPLICATION = (1 << 5),
15-
SERVICE_WEB_SERVER = (1 << 6),
16-
SERVICE_ACLK = (1 << 7),
17-
SERVICE_HEALTH = (1 << 8),
18-
SERVICE_STREAMING = (1 << 9),
19-
SERVICE_STREAMING_CONNECTOR = (1 << 10),
20-
SERVICE_CONTEXT = (1 << 11),
21-
SERVICE_ANALYTICS = (1 << 12),
22-
SERVICE_EXPORTERS = (1 << 13),
23-
SERVICE_HTTPD = (1 << 14),
24-
SERVICE_SYSTEMD = (1 << 15),
9+
ABILITY_WEB_REQUESTS = (1 << 0),
10+
ABILITY_STREAMING_CONNECTIONS = (1 << 1),
11+
SERVICE_COLLECTORS = (1 << 2),
12+
SERVICE_REPLICATION = (1 << 3),
13+
SERVICE_WEB_SERVER = (1 << 4),
14+
SERVICE_ACLK = (1 << 5),
15+
SERVICE_HEALTH = (1 << 6),
16+
SERVICE_STREAMING = (1 << 7),
17+
SERVICE_STREAMING_CONNECTOR = (1 << 8),
18+
SERVICE_CONTEXT = (1 << 9),
19+
SERVICE_ANALYTICS = (1 << 10),
20+
SERVICE_EXPORTERS = (1 << 11),
21+
SERVICE_HTTPD = (1 << 12),
22+
SERVICE_SYSTEMD = (1 << 13),
2523
} SERVICE_TYPE;
2624

2725
typedef void (*force_quit_t)(void *data);

src/daemon/daemon-shutdown.c

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -204,14 +204,13 @@ static void netdata_cleanup_and_exit(EXIT_REASON reason, bool abnormal, bool exi
204204
webrtc_close_all_connections();
205205
watcher_step_complete(WATCHER_STEP_ID_CLOSE_WEBRTC_CONNECTIONS);
206206

207-
service_signal_exit(SERVICE_MAINTENANCE | ABILITY_DATA_QUERIES | ABILITY_WEB_REQUESTS | SERVICE_ACLK |
208-
ABILITY_STREAMING_CONNECTIONS | SERVICE_SYSTEMD);
207+
service_signal_exit(ABILITY_WEB_REQUESTS | SERVICE_ACLK | ABILITY_STREAMING_CONNECTIONS | SERVICE_SYSTEMD);
209208

210209
service_signal_exit(SERVICE_EXPORTERS | SERVICE_HEALTH | SERVICE_WEB_SERVER | SERVICE_HTTPD);
211210

212211
watcher_step_complete(WATCHER_STEP_ID_DISABLE_MAINTENANCE_NEW_QUERIES_NEW_WEB_REQUESTS_NEW_STREAMING_CONNECTIONS);
213212

214-
service_wait_exit(SERVICE_MAINTENANCE | SERVICE_SYSTEMD, 5 * USEC_PER_SEC);
213+
service_wait_exit(SERVICE_SYSTEMD, 5 * USEC_PER_SEC);
215214
watcher_step_complete(WATCHER_STEP_ID_STOP_MAINTENANCE_THREAD);
216215

217216
service_wait_exit(SERVICE_EXPORTERS | SERVICE_HEALTH | SERVICE_WEB_SERVER | SERVICE_HTTPD, 3 * USEC_PER_SEC);

src/daemon/libuv_workers.c

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,16 @@ static void register_libuv_worker_jobs_internal(void) {
8686
// netdatacli
8787
worker_register_job_name(UV_EVENT_SCHEDULE_CMD, "schedule command");
8888

89+
// maintenance
90+
worker_register_job_name(UV_EVENT_CLEANUP_OBSOLETE_CHARTS, "cleanup obsolete charts");
91+
worker_register_job_name(UV_EVENT_ARCHIVE_CHART_DIMENSIONS, "archive chart dimensions");
92+
worker_register_job_name(UV_EVENT_ARCHIVE_DIMENSION, "archive dimension");
93+
worker_register_job_name(UV_EVENT_CLEANUP_ORPHAN_HOSTS, "cleanup orphan hosts");
94+
worker_register_job_name(UV_EVENT_CLEANUP_OBSOLETE_CHARTS_ON_HOSTS, "cleanup obsolete charts on all hosts");
95+
worker_register_job_name(UV_EVENT_FREE_HOST, "free host");
96+
worker_register_job_name(UV_EVENT_FREE_CHART, "free chart");
97+
worker_register_job_name(UV_EVENT_FREE_DIMENSION, "free dimension");
98+
8999
// make sure we have the right thread id
90100
gettid_uncached();
91101

src/daemon/libuv_workers.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,16 @@ enum event_loop_job {
8282
UV_EVENT_CREATE_NODE_INSTANCE,
8383
UV_EVENT_UNREGISTER_NODE,
8484

85+
// maintenance
86+
UV_EVENT_CLEANUP_OBSOLETE_CHARTS,
87+
UV_EVENT_ARCHIVE_CHART_DIMENSIONS,
88+
UV_EVENT_ARCHIVE_DIMENSION,
89+
UV_EVENT_CLEANUP_ORPHAN_HOSTS,
90+
UV_EVENT_CLEANUP_OBSOLETE_CHARTS_ON_HOSTS,
91+
UV_EVENT_FREE_HOST,
92+
UV_EVENT_FREE_CHART,
93+
UV_EVENT_FREE_DIMENSION,
94+
8595
// netdatacli
8696
UV_EVENT_SCHEDULE_CMD,
8797
};

src/daemon/service.c

Lines changed: 11 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -2,24 +2,6 @@
22

33
#include "common.h"
44

5-
/* Run service jobs every X seconds */
6-
#define SERVICE_HEARTBEAT 10
7-
8-
#define WORKER_JOB_CHILD_CHART_OBSOLETION_CHECK 1
9-
#define WORKER_JOB_CLEANUP_OBSOLETE_CHARTS 2
10-
#define WORKER_JOB_ARCHIVE_CHART 3
11-
#define WORKER_JOB_ARCHIVE_CHART_DIMENSIONS 4
12-
#define WORKER_JOB_ARCHIVE_DIMENSION 5
13-
#define WORKER_JOB_CLEANUP_ORPHAN_HOSTS 6
14-
#define WORKER_JOB_CLEANUP_OBSOLETE_CHARTS_ON_HOSTS 7
15-
#define WORKER_JOB_FREE_HOST 9
16-
#define WORKER_JOB_FREE_CHART 12
17-
#define WORKER_JOB_FREE_DIMENSION 15
18-
#define WORKER_JOB_PGC_MAIN_EVICT 16
19-
#define WORKER_JOB_PGC_MAIN_FLUSH 17
20-
#define WORKER_JOB_PGC_OPEN_EVICT 18
21-
#define WORKER_JOB_PGC_OPEN_FLUSH 19
22-
235
static bool svc_rrddim_obsolete_to_archive(RRDDIM *rd) {
246
RRDSET *st = rd->rrdset;
257

@@ -32,7 +14,7 @@ static bool svc_rrddim_obsolete_to_archive(RRDDIM *rd) {
3214
else
3315
return false;
3416

35-
worker_is_busy(WORKER_JOB_ARCHIVE_DIMENSION);
17+
worker_is_busy(UV_EVENT_ARCHIVE_DIMENSION);
3618

3719
if (rd->rrd_memory_mode == RRD_DB_MODE_DBENGINE) {
3820
if (!rrddim_finalize_collection_and_check_retention(rd)) {
@@ -41,7 +23,7 @@ static bool svc_rrddim_obsolete_to_archive(RRDDIM *rd) {
4123
}
4224
}
4325

44-
worker_is_busy(WORKER_JOB_FREE_DIMENSION);
26+
worker_is_busy(UV_EVENT_FREE_DIMENSION);
4527
rrddim_free(st, rd);
4628
return true;
4729
}
@@ -50,7 +32,7 @@ static inline bool svc_rrdset_archive_obsolete_dimensions(RRDSET *st, bool all_d
5032
if(!all_dimensions && !rrdset_flag_check(st, RRDSET_FLAG_OBSOLETE_DIMENSIONS))
5133
return true;
5234

53-
worker_is_busy(WORKER_JOB_ARCHIVE_CHART_DIMENSIONS);
35+
worker_is_busy(UV_EVENT_ARCHIVE_CHART_DIMENSIONS);
5436

5537
rrdset_flag_clear(st, RRDSET_FLAG_OBSOLETE_DIMENSIONS);
5638

@@ -69,14 +51,9 @@ static inline bool svc_rrdset_archive_obsolete_dimensions(RRDSET *st, bool all_d
6951
if(rd->collector.last_collected_time.tv_sec + rrdset_free_obsolete_time_s < now) {
7052
size_t references = dictionary_acquired_item_references(rd_dfe.item);
7153
if(references == 1) {
72-
// netdata_log_info("Removing obsolete dimension 'host:%s/chart:%s/dim:%s'",
73-
// rrdhost_hostname(st->rrdhost), rrdset_id(st), rrddim_id(rd));
7454
if(svc_rrddim_obsolete_to_archive(rd))
7555
dim_archives++;
7656
}
77-
// else
78-
// netdata_log_info("Cannot remove obsolete dimension 'host:%s/chart:%s/dim:%s'",
79-
// rrdhost_hostname(st->rrdhost), rrdset_id(st), rrddim_id(rd));
8057
}
8158
}
8259
}
@@ -109,7 +86,7 @@ static inline void svc_rrdhost_cleanup_charts_marked_obsolete(RRDHOST *host) {
10986
if(!rrdhost_flag_check(host, RRDHOST_FLAG_PENDING_OBSOLETE_CHARTS|RRDHOST_FLAG_PENDING_OBSOLETE_DIMENSIONS))
11087
return;
11188

112-
worker_is_busy(WORKER_JOB_CLEANUP_OBSOLETE_CHARTS);
89+
worker_is_busy(UV_EVENT_CLEANUP_OBSOLETE_CHARTS);
11390

11491
rrdhost_flag_clear(host, RRDHOST_FLAG_PENDING_OBSOLETE_CHARTS|RRDHOST_FLAG_PENDING_OBSOLETE_DIMENSIONS);
11592

@@ -140,7 +117,7 @@ static inline void svc_rrdhost_cleanup_charts_marked_obsolete(RRDHOST *host) {
140117
if(svc_rrdset_archive_obsolete_dimensions(st, true)) {
141118
full_archives++;
142119

143-
worker_is_busy(WORKER_JOB_FREE_CHART);
120+
worker_is_busy(UV_EVENT_FREE_CHART);
144121
rrdset_free(st);
145122
}
146123
else
@@ -170,16 +147,12 @@ void svc_rrdhost_obsolete_all_charts(RRDHOST *host) {
170147
}
171148

172149
static void svc_rrd_cleanup_obsolete_charts_from_all_hosts() {
173-
worker_is_busy(WORKER_JOB_CLEANUP_OBSOLETE_CHARTS_ON_HOSTS);
150+
worker_is_busy(UV_EVENT_CLEANUP_OBSOLETE_CHARTS_ON_HOSTS);
174151

175152
rrd_rdlock();
176153

177154
RRDHOST *host;
178155
rrdhost_foreach_read(host) {
179-
180-
if (!service_running(SERVICE_MAINTENANCE))
181-
break;
182-
183156
if(rrdhost_receiver_replicating_charts(host) || rrdhost_sender_replicating_charts(host))
184157
continue;
185158

@@ -205,7 +178,7 @@ static void svc_rrd_cleanup_obsolete_charts_from_all_hosts() {
205178
}
206179

207180
static void svc_rrdhost_cleanup_orphan_hosts(RRDHOST *protected_host) {
208-
worker_is_busy(WORKER_JOB_CLEANUP_ORPHAN_HOSTS);
181+
worker_is_busy(UV_EVENT_CLEANUP_ORPHAN_HOSTS);
209182

210183
time_t now = now_realtime_sec();
211184

@@ -233,7 +206,7 @@ static void svc_rrdhost_cleanup_orphan_hosts(RRDHOST *protected_host) {
233206
continue;
234207
}
235208

236-
worker_is_busy(WORKER_JOB_FREE_HOST);
209+
worker_is_busy(UV_EVENT_FREE_HOST);
237210

238211
if (delete) {
239212
netdata_log_info("Host '%s' with machine guid '%s' is archived, ephemeral clean up.", rrdhost_hostname(host), host->machine_guid);
@@ -248,62 +221,7 @@ static void svc_rrdhost_cleanup_orphan_hosts(RRDHOST *protected_host) {
248221
rrd_wrunlock();
249222
}
250223

251-
static void service_main_cleanup(void *pptr)
252-
{
253-
struct netdata_static_thread *static_thread = CLEANUP_FUNCTION_GET_PTR(pptr);
254-
if(!static_thread) return;
255-
256-
static_thread->enabled = NETDATA_MAIN_THREAD_EXITING;
257-
258-
worker_unregister();
259-
260-
static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
261-
}
262-
263-
/*
264-
* The service thread.
265-
*/
266-
void *service_main(void *ptr)
267-
{
268-
worker_register("SERVICE");
269-
worker_register_job_name(WORKER_JOB_CHILD_CHART_OBSOLETION_CHECK, "child chart obsoletion check");
270-
worker_register_job_name(WORKER_JOB_CLEANUP_OBSOLETE_CHARTS, "cleanup obsolete charts");
271-
worker_register_job_name(WORKER_JOB_ARCHIVE_CHART, "archive chart");
272-
worker_register_job_name(WORKER_JOB_ARCHIVE_CHART_DIMENSIONS, "archive chart dimensions");
273-
worker_register_job_name(WORKER_JOB_ARCHIVE_DIMENSION, "archive dimension");
274-
worker_register_job_name(WORKER_JOB_CLEANUP_ORPHAN_HOSTS, "cleanup orphan hosts");
275-
worker_register_job_name(WORKER_JOB_CLEANUP_OBSOLETE_CHARTS_ON_HOSTS, "cleanup obsolete charts on all hosts");
276-
worker_register_job_name(WORKER_JOB_FREE_HOST, "free host");
277-
worker_register_job_name(WORKER_JOB_FREE_CHART, "free chart");
278-
worker_register_job_name(WORKER_JOB_FREE_DIMENSION, "free dimension");
279-
worker_register_job_name(WORKER_JOB_PGC_MAIN_EVICT, "main cache evictions");
280-
worker_register_job_name(WORKER_JOB_PGC_MAIN_FLUSH, "main cache flushes");
281-
worker_register_job_name(WORKER_JOB_PGC_OPEN_EVICT, "open cache evictions");
282-
worker_register_job_name(WORKER_JOB_PGC_OPEN_FLUSH, "open cache flushes");
283-
284-
CLEANUP_FUNCTION_REGISTER(service_main_cleanup) cleanup_ptr = ptr;
285-
286-
heartbeat_t hb;
287-
heartbeat_init(&hb, USEC_PER_SEC);
288-
usec_t step = USEC_PER_SEC * SERVICE_HEARTBEAT;
289-
usec_t real_step = USEC_PER_SEC;
290-
291-
netdata_log_debug(D_SYSTEM, "Service thread starts");
292-
293-
while (service_running(SERVICE_MAINTENANCE)) {
294-
worker_is_idle();
295-
heartbeat_next(&hb);
296-
if (real_step < step) {
297-
real_step += USEC_PER_SEC;
298-
continue;
299-
}
300-
real_step = USEC_PER_SEC;
301-
302-
svc_rrd_cleanup_obsolete_charts_from_all_hosts();
303-
304-
if (service_running(SERVICE_MAINTENANCE))
305-
svc_rrdhost_cleanup_orphan_hosts(localhost);
306-
}
307-
308-
return NULL;
224+
// Run one maintenance pass: first clean up obsolete charts on all hosts,
// then clean up orphan hosts, passing localhost as the protected host.
// NOTE(review): the identifier is spelled "run_maintenace" (sic); the forward
// declaration in sqlite_metadata.c uses the same spelling, so any rename must
// change both places together.
void run_maintenace() {
225+
svc_rrd_cleanup_obsolete_charts_from_all_hosts();
226+
svc_rrdhost_cleanup_orphan_hosts(localhost);
309227
}

src/daemon/static_threads.c

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -97,15 +97,6 @@ const struct netdata_static_thread static_threads_common[] = {
9797
.init_routine = NULL,
9898
.start_routine = pluginsd_main
9999
},
100-
{
101-
.name = "SERVICE",
102-
.config_section = NULL,
103-
.config_name = NULL,
104-
.enabled = 1,
105-
.thread = NULL,
106-
.init_routine = NULL,
107-
.start_routine = service_main
108-
},
109100
{
110101
.name = "STATSD_FLUSH",
111102
.config_section = NULL,

src/database/sqlite/sqlite_metadata.c

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2429,11 +2429,21 @@ static void store_hosts_metadata(struct meta_config_s *config, BUFFER *work_buff
24292429
}
24302430
}
24312431

2432+
#define SERVICE_HEARTBEAT 10
2433+
void run_maintenace();
2434+
24322435
// Worker thread to scan hosts for pending metadata to store
24332436
static void start_metadata_hosts(uv_work_t *req)
24342437
{
2438+
static time_t next_maintenance_check = 0;
24352439
register_libuv_worker_jobs();
24362440

2441+
time_t now = now_realtime_sec();
2442+
if (now> next_maintenance_check) {
2443+
run_maintenace();
2444+
next_maintenance_check = now + SERVICE_HEARTBEAT;
2445+
}
2446+
24372447
worker_data_t *worker = req->data;
24382448
struct meta_config_s *config = worker->config;
24392449

0 commit comments

Comments
 (0)