diff --git a/include/fluent-bit/flb_atomic.h b/include/fluent-bit/flb_atomic.h new file mode 100644 index 00000000000..337e54cfba0 --- /dev/null +++ b/include/fluent-bit/flb_atomic.h @@ -0,0 +1,57 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2026 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_ATOMIC_H +#define FLB_ATOMIC_H + +/* + * Minimal relaxed-atomic helpers for scalar values that are read and written + * by more than one thread (e.g. counters and one-shot status fields shared + * between a threaded input worker and the main engine). + * + * Aligned word-sized loads/stores are already atomic on every platform Fluent + * Bit targets, but accessing them from multiple threads with plain operators is + * a C-level data race (undefined behavior, and flagged by ThreadSanitizer). + * These helpers make such accesses well defined. Relaxed ordering is used on + * purpose: callers only require atomicity of the individual value, not ordering + * relative to other memory (for ordered hand-offs use a mutex instead). + * + * The helpers are type-generic (int, size_t, uint64_t, ...). + */ + +#if defined(__GNUC__) || defined(__clang__) + +#define flb_atomic_load(ptr) __atomic_load_n((ptr), __ATOMIC_RELAXED) +#define flb_atomic_store(ptr, val) __atomic_store_n((ptr), (val), __ATOMIC_RELAXED) +#define flb_atomic_fetch_add(ptr, v) __atomic_fetch_add((ptr), (v), __ATOMIC_RELAXED) + +#else + +/* + * Fallback for compilers without the GCC/Clang atomic builtins. The accesses + * stay plain; on the hardware these compilers target an aligned word access is + * atomic, and the races guarded here are benign, so correctness is preserved. + */ +#define flb_atomic_load(ptr) (*(ptr)) +#define flb_atomic_store(ptr, val) (*(ptr) = (val)) +#define flb_atomic_fetch_add(ptr, v) (*(ptr) += (v)) + +#endif + +#endif diff --git a/src/flb_engine.c b/src/flb_engine.c index 63178da843c..2aed4f6ab7f 100644 --- a/src/flb_engine.c +++ b/src/flb_engine.c @@ -40,6 +40,7 @@ #include #include #include +#include #include #include #include @@ -1134,7 +1135,8 @@ int flb_engine_start(struct flb_config *config) return -2; } - config->grace_input = config->grace / 2; + /* published to the supervisor/main thread, which reads it concurrently */ + flb_atomic_store(&config->grace_input, config->grace / 2); flb_info("[engine] Shutdown Grace Period=%d, Shutdown Input Grace Period=%d", config->grace, config->grace_input); while (1) { diff --git a/src/flb_input.c b/src/flb_input.c index e990eee238c..2496c526869 100644 --- a/src/flb_input.c +++ b/src/flb_input.c @@ -2484,6 +2484,17 @@ int flb_input_collector_fd(flb_pipefd_t fd, struct flb_config *config) mk_list_foreach(head, &config->inputs) { ins = mk_list_entry(head, struct flb_input_instance, _head); + + /* + * Collectors of a threaded input are registered and dispatched in the + * input's own thread/event loop (see flb_input_thread.c), never through + * this main-thread handler. Skipping them avoids a benign data race with + * the worker thread that concurrently initializes those collector fds. + */ + if (flb_input_is_threaded(ins)) { + continue; + } + mk_list_foreach(head_coll, &ins->collectors) { collector = mk_list_entry(head_coll, struct flb_input_collector, _head); if (collector->fd_event == fd) { diff --git a/src/flb_metrics.c b/src/flb_metrics.c index 4b9a17473a7..7e01ff1047e 100644 --- a/src/flb_metrics.c +++ b/src/flb_metrics.c @@ -27,6 +27,7 @@ #include #include #include +#include #include static int id_exists(int id, struct flb_metrics *metrics) @@ -181,7 +182,12 @@ int flb_metrics_sum(int id, size_t val, struct flb_metrics *metrics) return -1; } - m->val += val; + /* + * The counter is summed from the owning input/output worker thread while the + * metrics exporter reads it from the main engine thread; use a relaxed + * atomic so the access is well defined (see flb_metrics_dump_values()). + */ + flb_atomic_fetch_add(&m->val, val); return 0; } @@ -214,7 +220,7 @@ int flb_metrics_print(struct flb_metrics *metrics) mk_list_foreach(head, &metrics->list) { m = mk_list_entry(head, struct flb_metric, _head); - printf(", '%s' => %lu", m->title, m->val); + printf(", '%s' => %lu", m->title, (unsigned long) flb_atomic_load(&m->val)); } printf("\n"); @@ -240,7 +246,7 @@ int flb_metrics_dump_values(char **out_buf, size_t *out_size, m = mk_list_entry(head, struct flb_metric, _head); msgpack_pack_str(&mp_pck, flb_sds_len(m->title)); msgpack_pack_str_body(&mp_pck, m->title, flb_sds_len(m->title)); - msgpack_pack_uint64(&mp_pck, m->val); + msgpack_pack_uint64(&mp_pck, flb_atomic_load(&m->val)); } *out_buf = mp_sbuf.data; diff --git a/src/fluent-bit.c b/src/fluent-bit.c index 14e5f5682a4..f5bfcf44ee4 100644 --- a/src/fluent-bit.c +++ b/src/fluent-bit.c @@ -60,6 +60,7 @@ #include #include #include +#include #ifdef FLB_HAVE_MTRACE #include @@ -1473,8 +1474,9 @@ static int flb_main_run(int argc, char **argv) ctx = flb_context_get(); if (ctx != NULL && ctx->config != NULL) { + /* grace_input is published by the engine thread (flb_engine_start) */ flb_supervisor_child_update_grace(ctx->config->grace, - ctx->config->grace_input); + flb_atomic_load(&ctx->config->grace_input)); } #ifdef FLB_HAVE_CHUNK_TRACE