diff --git a/plugins/in_process_exporter_metrics/pe_process.c b/plugins/in_process_exporter_metrics/pe_process.c index 904d3e3ab47..713e9852a79 100644 --- a/plugins/in_process_exporter_metrics/pe_process.c +++ b/plugins/in_process_exporter_metrics/pe_process.c @@ -23,6 +23,8 @@ #include "pe.h" #include "pe_utils.h" +#include + #include #include @@ -378,6 +380,7 @@ static int process_proc_thread_io(struct flb_pe *ctx, uint64_t ts, } static int process_proc_thread_status(struct flb_pe *ctx, uint64_t ts, + flb_sds_t process_name, flb_sds_t thread_name, flb_sds_t thread_id, struct flb_slist_entry *thread) { @@ -450,7 +453,7 @@ static int process_proc_thread_status(struct flb_pe *ctx, uint64_t ts, /* Collect the number of minor page faults per process */ if (pe_utils_str_to_uint64(status, &val) != -1) { cmt_counter_set(ctx->thread_context_switches, ts, val, - 4, (char *[]){ name, thread_name, thread_id, "voluntary_ctxt_switches" }); + 4, (char *[]){ process_name, thread_name, thread_id, "voluntary_ctxt_switches" }); } flb_sds_destroy(status); } @@ -465,7 +468,7 @@ static int process_proc_thread_status(struct flb_pe *ctx, uint64_t ts, /* Collect the number of minor page faults per process */ if (pe_utils_str_to_uint64(status, &val) != -1) { cmt_counter_set(ctx->thread_context_switches, ts, val, - 4, (char *[]){ name, thread_name, thread_id, "nonvoluntary_ctxt_switches" }); + 4, (char *[]){ process_name, thread_name, thread_id, "nonvoluntary_ctxt_switches" }); } flb_sds_destroy(status); } @@ -478,12 +481,15 @@ static int process_proc_thread_status(struct flb_pe *ctx, uint64_t ts, return 0; } -static int process_thread_update(struct flb_pe *ctx, uint64_t ts, flb_sds_t pid, flb_sds_t name) +static int process_thread_update(struct flb_pe *ctx, uint64_t ts, flb_sds_t pid, + flb_sds_t name, struct flb_hash_table *active_tids, + int *active_index_complete) { int ret; flb_sds_t tmp = NULL; flb_sds_t thread_name = NULL; flb_sds_t tid_str = NULL; + flb_sds_t active_key = NULL; uint64_t val; const char *pattern = "/[0-9]*"; struct mk_list *head; @@ -535,6 +541,24 @@ static int process_thread_update(struct flb_pe *ctx, uint64_t ts, flb_sds_t pid, continue; } + active_key = flb_sds_create(name); + if (!active_key || + flb_sds_cat_safe(&active_key, "|", 1) == -1 || + flb_sds_cat_safe(&active_key, thread_name, strlen(thread_name)) == -1 || + flb_sds_cat_safe(&active_key, "|", 1) == -1 || + flb_sds_cat_safe(&active_key, tid_str, strlen(tid_str)) == -1 ) { + *active_index_complete = FLB_FALSE; + flb_sds_destroy(active_key); + active_key = NULL; + flb_free(thread_name); + continue; + } + if (flb_hash_table_add(active_tids, active_key, (int) flb_sds_len(active_key), NULL, 0) < 0) { + *active_index_complete = FLB_FALSE; + } + flb_sds_destroy(active_key); + active_key = NULL; + /* split with the close parenthesis. * The entry of processes stat will start after that. */ tmp = strstr(entry->str, ")"); @@ -590,7 +614,7 @@ static int process_thread_update(struct flb_pe *ctx, uint64_t ts, flb_sds_t pid, } ret = process_proc_thread_status(ctx, ts, - thread_name, tid_str, + name, thread_name, tid_str, thread); if (ret == -1) { goto cleanup; @@ -922,6 +946,77 @@ static int process_proc_boot_time(struct flb_pe *ctx, uint64_t *out_boot_time) return 0; } +static char *get_metric_label_value(struct cmt_metric *metric, int index) +{ + int i = 0; + struct cfl_list *head; + struct cmt_map_label *label; + + cfl_list_foreach(head, &metric->labels) { + label = cfl_list_entry(head, struct cmt_map_label, _head); + if (i == index) { + return label->name; + } + i++; + } + return NULL; +} + +/* + * Build a composite lookup key from labels[0..id_label_index] (inclusive), + * joined by '|'. This mirrors the key stored in active_ids so the check + * matches the exact emitted identity (name, pid[, ppid] or + * name, threadname, tid) rather than just the bare id. + */ +static void purge_stale_metrics(struct cmt_map *map, + int id_label_index, + struct flb_hash_table *active_ids) +{ + int i; + char *label_val; + void *out_buf; + size_t out_size; + flb_sds_t key; + struct cfl_list *tmp; + struct cfl_list *head; + struct cmt_metric *metric; + + cfl_list_foreach_safe(head, tmp, &map->metrics) { + metric = cfl_list_entry(head, struct cmt_metric, _head); + key = NULL; + for (i = 0; i <= id_label_index; i++) { + label_val = get_metric_label_value(metric, i); + if (!label_val) { + flb_sds_destroy(key); + key = NULL; + break; + } + if (!key) { + key = flb_sds_create(label_val); + if (!key) { + break; + } + } + else { + if (flb_sds_cat_safe(&key, "|", 1) == -1 || + flb_sds_cat_safe(&key, label_val, strlen(label_val)) == -1) { + flb_sds_destroy(key); + key = NULL; + break; + } + } + } + if (!key) { + continue; + } + if (flb_hash_table_get(active_ids, key, (int) flb_sds_len(key), + &out_buf, &out_size) < 0) { + cmt_map_metric_destroy(metric); + } + flb_sds_destroy(key); + } +} + static int process_update(struct flb_pe *ctx) { int ret; @@ -944,6 +1039,10 @@ static int process_update(struct flb_pe *ctx) struct proc_state pstate; uint64_t boot_time = 0; int include_flag = FLB_FALSE; + struct flb_hash_table *active_pids = NULL; + struct flb_hash_table *active_tids = NULL; + int active_index_complete = FLB_TRUE; + flb_sds_t active_key = NULL; mk_list_init(&procfs_list); @@ -965,6 +1064,18 @@ static int process_update(struct flb_pe *ctx) boot_time = 0; } + active_pids = flb_hash_table_create(FLB_HASH_TABLE_EVICT_NONE, 512, -1); + if (!active_pids) { + flb_slist_destroy(&procfs_list); + return -1; + } + active_tids = flb_hash_table_create(FLB_HASH_TABLE_EVICT_NONE, 1024, -1); + if (!active_tids) { + flb_hash_table_destroy(active_pids); + flb_slist_destroy(&procfs_list); + return -1; + } + /* PID entries */ mk_list_foreach(head, &procfs_list) { process = mk_list_entry(head, struct flb_slist_entry, _head); @@ -1018,17 +1129,35 @@ static int process_update(struct flb_pe *ctx) continue; } + active_key = flb_sds_create(name); + if (!active_key || + flb_sds_cat_safe(&active_key, "|", 1) == -1 || + flb_sds_cat_safe(&active_key, pid_str, strlen(pid_str)) == -1) { + active_index_complete = FLB_FALSE; + flb_sds_destroy(active_key); + active_key = NULL; + } + else { + if (flb_hash_table_add(active_pids, active_key, (int) flb_sds_len(active_key), NULL, 0) < 0) { + active_index_complete = FLB_FALSE; + } + flb_sds_destroy(active_key); + active_key = NULL; + } + mk_list_init(&split_list); /* split with the close parenthesis. * The entry of processes stat will start after that. */ tmp = strstr(entry->str, ")"); if (tmp == NULL) { + active_index_complete = FLB_FALSE; goto cleanup; } ret = flb_slist_split_string(&split_list, tmp+2, ' ', -1); if (ret == -1) { + active_index_complete = FLB_FALSE; goto cleanup; } @@ -1041,6 +1170,24 @@ static int process_update(struct flb_pe *ctx) entry = flb_slist_entry_get(&split_list, 1); ppid_str = entry->str; + active_key = flb_sds_create(name); + if (!active_key || + flb_sds_cat_safe(&active_key, "|", 1) == -1 || + flb_sds_cat_safe(&active_key, pid_str, strlen(pid_str)) == -1 || + flb_sds_cat_safe(&active_key, "|", 1) == -1 || + flb_sds_cat_safe(&active_key, ppid_str, strlen(ppid_str)) == -1) { + active_index_complete = FLB_FALSE; + flb_sds_destroy(active_key); + active_key = NULL; + } + else { + if (flb_hash_table_add(active_pids, active_key, (int) flb_sds_len(active_key), NULL, 0) < 0) { + active_index_complete = FLB_FALSE; + } + flb_sds_destroy(active_key); + active_key = NULL; + } + /* State */ if (ctx->enabled_flag & METRIC_STATE) { /* node_processes_state @@ -1154,8 +1301,9 @@ static int process_update(struct flb_pe *ctx) /* Collect the states of threads */ if (ctx->enabled_flag & METRIC_THREAD) { - ret = process_thread_update(ctx, ts, pid_str, name); + ret = process_thread_update(ctx, ts, pid_str, name, active_tids, &active_index_complete); if (ret == -1) { + active_index_complete = FLB_FALSE; flb_plg_debug(ctx->ins, "collect thread procfs is failed on the pid = %s", pid_str); } } @@ -1175,6 +1323,41 @@ static int process_update(struct flb_pe *ctx) flb_slist_destroy(&procfs_list); + if (active_index_complete == FLB_TRUE) { + /* Remove metrics for processes that are no longer running. + * Metrics with ppid at label[2]: key = name:pid:ppid (id_label_index=2). + * Metrics without ppid (context_switches, thread_wchan): key = name:pid + * (id_label_index=1); active_pids also holds name:pid entries for these. */ + purge_stale_metrics(ctx->cpu_seconds->map, 2, active_pids); + purge_stale_metrics(ctx->read_bytes->map, 2, active_pids); + purge_stale_metrics(ctx->write_bytes->map, 2, active_pids); + purge_stale_metrics(ctx->major_page_faults->map, 2, active_pids); + purge_stale_metrics(ctx->minor_page_faults->map, 2, active_pids); + purge_stale_metrics(ctx->memory_bytes->map, 2, active_pids); + purge_stale_metrics(ctx->open_fds->map, 2, active_pids); + purge_stale_metrics(ctx->fd_ratio->map, 2, active_pids); + purge_stale_metrics(ctx->start_time->map, 2, active_pids); + purge_stale_metrics(ctx->num_threads->map, 2, active_pids); + purge_stale_metrics(ctx->states->map, 2, active_pids); + purge_stale_metrics(ctx->context_switches->map, 1, active_pids); + purge_stale_metrics(ctx->thread_wchan->map, 1, active_pids); + + /* Remove metrics for threads that are no longer running. + * Thread metrics: labels = {name, threadname, tid, ...}, + * key = name:threadname:tid (id_label_index=2). */ + purge_stale_metrics(ctx->thread_cpu_seconds->map, 2, active_tids); + purge_stale_metrics(ctx->thread_io_bytes->map, 2, active_tids); + purge_stale_metrics(ctx->thread_major_page_faults->map, 2, active_tids); + purge_stale_metrics(ctx->thread_minor_page_faults->map, 2, active_tids); + purge_stale_metrics(ctx->thread_context_switches->map, 2, active_tids); + } else { + flb_plg_warn(ctx->ins, + "skipping stale metric purge because active ID tracking is incomplete"); + } + + flb_hash_table_destroy(active_pids); + flb_hash_table_destroy(active_tids); + return 0; }