Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
193 changes: 188 additions & 5 deletions plugins/in_process_exporter_metrics/pe_process.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
#include "pe.h"
#include "pe_utils.h"

#include <cmetrics/cmt_map.h>

#include <unistd.h>
#include <dirent.h>

Expand Down Expand Up @@ -378,6 +380,7 @@ static int process_proc_thread_io(struct flb_pe *ctx, uint64_t ts,
}

static int process_proc_thread_status(struct flb_pe *ctx, uint64_t ts,
flb_sds_t process_name,
flb_sds_t thread_name, flb_sds_t thread_id,
struct flb_slist_entry *thread)
{
Expand Down Expand Up @@ -450,7 +453,7 @@ static int process_proc_thread_status(struct flb_pe *ctx, uint64_t ts,
/* Collect the number of minor page faults per process */
if (pe_utils_str_to_uint64(status, &val) != -1) {
cmt_counter_set(ctx->thread_context_switches, ts, val,
4, (char *[]){ name, thread_name, thread_id, "voluntary_ctxt_switches" });
4, (char *[]){ process_name, thread_name, thread_id, "voluntary_ctxt_switches" });
}
flb_sds_destroy(status);
}
Expand All @@ -465,7 +468,7 @@ static int process_proc_thread_status(struct flb_pe *ctx, uint64_t ts,
/* Collect the number of minor page faults per process */
if (pe_utils_str_to_uint64(status, &val) != -1) {
cmt_counter_set(ctx->thread_context_switches, ts, val,
4, (char *[]){ name, thread_name, thread_id, "nonvoluntary_ctxt_switches" });
4, (char *[]){ process_name, thread_name, thread_id, "nonvoluntary_ctxt_switches" });
}
flb_sds_destroy(status);
}
Expand All @@ -478,12 +481,15 @@ static int process_proc_thread_status(struct flb_pe *ctx, uint64_t ts,
return 0;
}

static int process_thread_update(struct flb_pe *ctx, uint64_t ts, flb_sds_t pid, flb_sds_t name)
static int process_thread_update(struct flb_pe *ctx, uint64_t ts, flb_sds_t pid,
flb_sds_t name, struct flb_hash_table *active_tids,
int *active_index_complete)
{
int ret;
flb_sds_t tmp = NULL;
flb_sds_t thread_name = NULL;
flb_sds_t tid_str = NULL;
flb_sds_t active_key = NULL;
uint64_t val;
const char *pattern = "/[0-9]*";
struct mk_list *head;
Expand Down Expand Up @@ -535,6 +541,24 @@ static int process_thread_update(struct flb_pe *ctx, uint64_t ts, flb_sds_t pid,
continue;
}

active_key = flb_sds_create(name);
if (!active_key ||
flb_sds_cat_safe(&active_key, "|", 1) == -1 ||
flb_sds_cat_safe(&active_key, thread_name, strlen(thread_name)) == -1 ||
flb_sds_cat_safe(&active_key, "|", 1) == -1 ||
flb_sds_cat_safe(&active_key, tid_str, strlen(tid_str)) == -1 ) {
Comment thread
coderabbitai[bot] marked this conversation as resolved.
*active_index_complete = FLB_FALSE;
flb_sds_destroy(active_key);
active_key = NULL;
flb_free(thread_name);
continue;
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.
if (flb_hash_table_add(active_tids, active_key, (int) flb_sds_len(active_key), NULL, 0) < 0) {
Comment thread
coderabbitai[bot] marked this conversation as resolved.
*active_index_complete = FLB_FALSE;
}
flb_sds_destroy(active_key);
active_key = NULL;

/* split with the close parenthesis.
* The entry of processes stat will start after that. */
tmp = strstr(entry->str, ")");
Expand Down Expand Up @@ -590,7 +614,7 @@ static int process_thread_update(struct flb_pe *ctx, uint64_t ts, flb_sds_t pid,
}

ret = process_proc_thread_status(ctx, ts,
thread_name, tid_str,
name, thread_name, tid_str,
thread);
if (ret == -1) {
goto cleanup;
Expand Down Expand Up @@ -922,6 +946,77 @@ static int process_proc_boot_time(struct flb_pe *ctx, uint64_t *out_boot_time)
return 0;
}

static char *get_metric_label_value(struct cmt_metric *metric, int index)
{
int i = 0;
struct cfl_list *head;
struct cmt_map_label *label;

cfl_list_foreach(head, &metric->labels) {
label = cfl_list_entry(head, struct cmt_map_label, _head);
if (i == index) {
return label->name;
}
i++;
}
return NULL;
}

/*
* Build a composite lookup key from labels[0..id_label_index] (inclusive),
* joined by '|'. This mirrors the key stored in active_ids so the check
* matches the exact emitted identity (name, pid[, ppid] or
* name, threadname, tid) rather than just the bare id.
*/
static void purge_stale_metrics(struct cmt_map *map,
int id_label_index,
struct flb_hash_table *active_ids)
{
int i;
char *label_val;
void *out_buf;
size_t out_size;
flb_sds_t key;
struct cfl_list *tmp;
struct cfl_list *head;
struct cmt_metric *metric;

cfl_list_foreach_safe(head, tmp, &map->metrics) {
metric = cfl_list_entry(head, struct cmt_metric, _head);
key = NULL;
for (i = 0; i <= id_label_index; i++) {
label_val = get_metric_label_value(metric, i);
if (!label_val) {
flb_sds_destroy(key);
key = NULL;
break;
}
if (!key) {
key = flb_sds_create(label_val);
if (!key) {
break;
}
}
else {
if (flb_sds_cat_safe(&key, "|", 1) == -1 ||
flb_sds_cat_safe(&key, label_val, strlen(label_val)) == -1) {
flb_sds_destroy(key);
key = NULL;
break;
}
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}
if (!key) {
continue;
}
if (flb_hash_table_get(active_ids, key, (int) flb_sds_len(key),
&out_buf, &out_size) < 0) {
cmt_map_metric_destroy(metric);
}
flb_sds_destroy(key);
}
}

static int process_update(struct flb_pe *ctx)
{
int ret;
Expand All @@ -944,6 +1039,10 @@ static int process_update(struct flb_pe *ctx)
struct proc_state pstate;
uint64_t boot_time = 0;
int include_flag = FLB_FALSE;
struct flb_hash_table *active_pids = NULL;
struct flb_hash_table *active_tids = NULL;
int active_index_complete = FLB_TRUE;
flb_sds_t active_key = NULL;

mk_list_init(&procfs_list);

Expand All @@ -965,6 +1064,18 @@ static int process_update(struct flb_pe *ctx)
boot_time = 0;
}

active_pids = flb_hash_table_create(FLB_HASH_TABLE_EVICT_NONE, 512, -1);
if (!active_pids) {
flb_slist_destroy(&procfs_list);
return -1;
}
active_tids = flb_hash_table_create(FLB_HASH_TABLE_EVICT_NONE, 1024, -1);
if (!active_tids) {
flb_hash_table_destroy(active_pids);
flb_slist_destroy(&procfs_list);
return -1;
}

/* PID entries */
mk_list_foreach(head, &procfs_list) {
process = mk_list_entry(head, struct flb_slist_entry, _head);
Expand Down Expand Up @@ -1018,17 +1129,35 @@ static int process_update(struct flb_pe *ctx)
continue;
}

active_key = flb_sds_create(name);
if (!active_key ||
flb_sds_cat_safe(&active_key, "|", 1) == -1 ||
flb_sds_cat_safe(&active_key, pid_str, strlen(pid_str)) == -1) {
active_index_complete = FLB_FALSE;
flb_sds_destroy(active_key);
active_key = NULL;
}
else {
if (flb_hash_table_add(active_pids, active_key, (int) flb_sds_len(active_key), NULL, 0) < 0) {
active_index_complete = FLB_FALSE;
}
flb_sds_destroy(active_key);
active_key = NULL;
}

mk_list_init(&split_list);

/* split with the close parenthesis.
* The entry of processes stat will start after that. */
tmp = strstr(entry->str, ")");
if (tmp == NULL) {
active_index_complete = FLB_FALSE;
goto cleanup;
}

ret = flb_slist_split_string(&split_list, tmp+2, ' ', -1);
if (ret == -1) {
active_index_complete = FLB_FALSE;
goto cleanup;
}

Expand All @@ -1041,6 +1170,24 @@ static int process_update(struct flb_pe *ctx)
entry = flb_slist_entry_get(&split_list, 1);
ppid_str = entry->str;

active_key = flb_sds_create(name);
if (!active_key ||
flb_sds_cat_safe(&active_key, "|", 1) == -1 ||
flb_sds_cat_safe(&active_key, pid_str, strlen(pid_str)) == -1 ||
flb_sds_cat_safe(&active_key, "|", 1) == -1 ||
flb_sds_cat_safe(&active_key, ppid_str, strlen(ppid_str)) == -1) {
active_index_complete = FLB_FALSE;
flb_sds_destroy(active_key);
active_key = NULL;
}
else {
if (flb_hash_table_add(active_pids, active_key, (int) flb_sds_len(active_key), NULL, 0) < 0) {
active_index_complete = FLB_FALSE;
}
flb_sds_destroy(active_key);
active_key = NULL;
}

/* State */
if (ctx->enabled_flag & METRIC_STATE) {
/* node_processes_state
Expand Down Expand Up @@ -1154,8 +1301,9 @@ static int process_update(struct flb_pe *ctx)

/* Collect the states of threads */
if (ctx->enabled_flag & METRIC_THREAD) {
ret = process_thread_update(ctx, ts, pid_str, name);
ret = process_thread_update(ctx, ts, pid_str, name, active_tids, &active_index_complete);
if (ret == -1) {
active_index_complete = FLB_FALSE;
flb_plg_debug(ctx->ins, "collect thread procfs is failed on the pid = %s", pid_str);
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}
Expand All @@ -1175,6 +1323,41 @@ static int process_update(struct flb_pe *ctx)

flb_slist_destroy(&procfs_list);

if (active_index_complete == FLB_TRUE) {
/* Remove metrics for processes that are no longer running.
* Metrics with ppid at label[2]: key = name:pid:ppid (id_label_index=2).
* Metrics without ppid (context_switches, thread_wchan): key = name:pid
* (id_label_index=1); active_pids also holds name:pid entries for these. */
purge_stale_metrics(ctx->cpu_seconds->map, 2, active_pids);
purge_stale_metrics(ctx->read_bytes->map, 2, active_pids);
purge_stale_metrics(ctx->write_bytes->map, 2, active_pids);
purge_stale_metrics(ctx->major_page_faults->map, 2, active_pids);
purge_stale_metrics(ctx->minor_page_faults->map, 2, active_pids);
purge_stale_metrics(ctx->memory_bytes->map, 2, active_pids);
purge_stale_metrics(ctx->open_fds->map, 2, active_pids);
purge_stale_metrics(ctx->fd_ratio->map, 2, active_pids);
purge_stale_metrics(ctx->start_time->map, 2, active_pids);
purge_stale_metrics(ctx->num_threads->map, 2, active_pids);
purge_stale_metrics(ctx->states->map, 2, active_pids);
purge_stale_metrics(ctx->context_switches->map, 1, active_pids);
purge_stale_metrics(ctx->thread_wchan->map, 1, active_pids);

/* Remove metrics for threads that are no longer running.
* Thread metrics: labels = {name, threadname, tid, ...},
* key = name:threadname:tid (id_label_index=2). */
purge_stale_metrics(ctx->thread_cpu_seconds->map, 2, active_tids);
purge_stale_metrics(ctx->thread_io_bytes->map, 2, active_tids);
purge_stale_metrics(ctx->thread_major_page_faults->map, 2, active_tids);
purge_stale_metrics(ctx->thread_minor_page_faults->map, 2, active_tids);
purge_stale_metrics(ctx->thread_context_switches->map, 2, active_tids);
} else {
flb_plg_warn(ctx->ins,
"skipping stale metric purge because active ID tracking is incomplete");
}

flb_hash_table_destroy(active_pids);
flb_hash_table_destroy(active_tids);

return 0;
}

Expand Down
Loading