57 changes: 57 additions & 0 deletions include/fluent-bit/flb_atomic.h
@@ -0,0 +1,57 @@
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */

/* Fluent Bit
* ==========
* Copyright (C) 2015-2026 The Fluent Bit Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#ifndef FLB_ATOMIC_H
#define FLB_ATOMIC_H

/*
* Minimal relaxed-atomic helpers for scalar values that are read and written
* by more than one thread (e.g. counters and one-shot status fields shared
* between a threaded input worker and the main engine).
*
* Aligned word-sized loads/stores are already atomic on every platform Fluent
* Bit targets, but accessing them from multiple threads with plain operators is
* a C-level data race (undefined behavior, and flagged by ThreadSanitizer).
* These helpers make such accesses well defined. Relaxed ordering is used on
* purpose: callers only require atomicity of the individual value, not ordering
* relative to other memory (for ordered hand-offs use a mutex instead).
*
* The helpers are type-generic (int, size_t, uint64_t, ...).
*/

#if defined(__GNUC__) || defined(__clang__)

#define flb_atomic_load(ptr) __atomic_load_n((ptr), __ATOMIC_RELAXED)
#define flb_atomic_store(ptr, val) __atomic_store_n((ptr), (val), __ATOMIC_RELAXED)
#define flb_atomic_fetch_add(ptr, v) __atomic_fetch_add((ptr), (v), __ATOMIC_RELAXED)

#else

/*
* Fallback for compilers without the GCC/Clang atomic builtins. The accesses
* stay plain; on the hardware these compilers target an aligned word access is
* atomic, and the races guarded here are benign, so correctness is preserved.
*/
#define flb_atomic_load(ptr) (*(ptr))
#define flb_atomic_store(ptr, val) (*(ptr) = (val))
#define flb_atomic_fetch_add(ptr, v) (*(ptr) += (v))
Comment on lines +44 to +53

🩺 Stability & Availability | 🟠 Major | ⚡ Quick win

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -eu

# Verify whether this repository supports non-GCC/non-Clang C compilers and
# whether a platform-specific atomic backend is needed instead of a plain fallback.
{
  fd 'CMakeLists\.txt$|.*\.cmake$' .
  fd -e h -e c -e cpp .
} | xargs -r rg -n -C2 \
  'MSVC|_MSC_VER|CMAKE_C_COMPILER_ID|stdatomic|Interlocked|__atomic|ThreadSanitizer|fsanitize=thread'

Repository: fluent/fluent-bit

Length of output: 50374


🏁 Script executed:

#!/bin/bash
set -eu

# Inspect the atomic helper header, its call sites, and any platform/compiler
# gating around atomic support.
files=$(git ls-files \
  'include/fluent-bit/flb_atomic.h' \
  'include/**/*.h' \
  'src/**/*.c' \
  'CMakeLists.txt' \
  'cmake/**/*.cmake' \
  'lib/**/*.cmake' \
  2>/dev/null || true)

printf '%s\n' "$files" | rg -n 'flb_atomic|_MSC_VER|__atomic|stdatomic|Interlocked|atomic' -n -C 2 --no-heading || true

echo '--- HEADER ---'
sed -n '1,140p' include/fluent-bit/flb_atomic.h

echo '--- CALL SITES ---'
rg -n -C 2 'flb_atomic_(load|store|fetch_add)\s*\(' include src lib || true

echo '--- BUILD/GATING ---'
rg -n -C 2 '_MSC_VER|__clang__|__GNUC__|stdatomic|__atomic|Interlocked|CMAKE_C_COMPILER_ID|atomic' CMakeLists.txt cmake include src lib || true

Repository: fluent/fluent-bit

Length of output: 50374


🏁 Script executed:

#!/bin/bash
set -eu

echo '--- FILE ---'
sed -n '1,160p' include/fluent-bit/flb_atomic.h

echo '--- USAGES ---'
rg -n -C 2 'flb_atomic_(load|store|fetch_add)\s*\(' include src lib

echo '--- WINDOWS/COMPILER SUPPORT ---'
rg -n -C 2 '_MSC_VER|__clang__|__GNUC__|stdatomic|__atomic|Interlocked|CMAKE_C_COMPILER_ID' CMakeLists.txt cmake include src lib

Repository: fluent/fluent-bit

Length of output: 50374


🏁 Script executed:

#!/bin/bash
set -eu

echo '--- header ---'
sed -n '1,120p' include/fluent-bit/flb_atomic.h

echo '--- direct usages ---'
rg -n -C 2 'flb_atomic_(load|store|fetch_add)\s*\(' include src lib

echo '--- compiler support / fallbacks ---'
rg -n -C 2 '_MSC_VER|__clang__|__GNUC__|stdatomic|__atomic|Interlocked|CMAKE_C_COMPILER_ID|atomic' CMakeLists.txt cmake include src lib

Repository: fluent/fluent-bit

Length of output: 50374


🏁 Script executed:

#!/bin/bash
set -eu

echo '--- include/fluent-bit/flb_atomic.h ---'
sed -n '1,120p' include/fluent-bit/flb_atomic.h

echo '--- flb_atomic usages ---'
rg -n -C 2 'flb_atomic_(load|store|fetch_add)\s*\(' include src lib

echo '--- compiler/build gating near fluent-bit atomics ---'
rg -n -C 2 '_MSC_VER|__clang__|__GNUC__|__atomic|stdatomic|Interlocked|atomic' CMakeLists.txt cmake include src lib | rg -n 'flb_atomic|atomic' -C 1

Repository: fluent/fluent-bit

Length of output: 50374


🏁 Script executed:

#!/bin/bash
set -eu

echo 'HEADER:'
sed -n '1,120p' include/fluent-bit/flb_atomic.h

echo
echo 'USAGES:'
rg -n -C 2 'flb_atomic_(load|store|fetch_add)\s*\(' include src | head -n 200

echo
echo 'BUILD FILES:'
rg -n -C 2 'flb_atomic|__atomic|stdatomic|_MSC_VER|CMAKE_C_COMPILER_ID' CMakeLists.txt cmake include src | head -n 200

Repository: fluent/fluent-bit

Length of output: 16002


🏁 Script executed:

#!/bin/bash
set -eu

# Focus only on fluent-bit atomics to determine whether the plain fallback
# is actually used on shared, multi-threaded state and whether any non-GCC/
# non-Clang backend exists for this header.
echo '--- include/fluent-bit/flb_atomic.h ---'
sed -n '1,120p' include/fluent-bit/flb_atomic.h

echo
echo '--- flb_atomic call sites ---'
rg -n -C 2 'flb_atomic_(load|store|fetch_add)\s*\(' include src

echo
echo '--- fluent-bit build/compiler checks ---'
rg -n -C 2 'flb_atomic|__atomic|stdatomic|_MSC_VER|CMAKE_C_COMPILER_ID|MSVC|Interlocked' CMakeLists.txt cmake include src | head -n 200

Repository: fluent/fluent-bit

Length of output: 15416


Replace the plain-access fallback with a real atomic backend or a build error. flb_atomic_load/store/fetch_add() are used on shared fields across threads, so the #else branch reintroduces a data race and makes fetch_add a non-atomic RMW on non-GCC/Clang compilers such as MSVC.
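
A minimal sketch of the "build error" option, assuming no other atomic backend is wired in. It keeps the GCC/Clang builtins from the header and turns the plain-access branch into `#error`. The `demo_metric` struct and the `demo_*` helpers are hypothetical names used only for illustration; they are not part of Fluent Bit. The sketch also makes a second difference visible: `__atomic_fetch_add` returns the value held before the addition, whereas the fallback expression `(*(ptr) += (v))` evaluates to the value after it, so the two branches do not agree on the macro's result.

```c
#include <stddef.h>
#include <stdint.h>

#if defined(__GNUC__) || defined(__clang__)

#define flb_atomic_load(ptr)         __atomic_load_n((ptr), __ATOMIC_RELAXED)
#define flb_atomic_store(ptr, val)   __atomic_store_n((ptr), (val), __ATOMIC_RELAXED)
#define flb_atomic_fetch_add(ptr, v) __atomic_fetch_add((ptr), (v), __ATOMIC_RELAXED)

#else

/* Assumption: failing the build is preferred over silently racing. */
#error "flb_atomic.h: no atomic backend available for this compiler"

#endif

/* Hypothetical stand-in for a shared counter (not the real flb_metric). */
struct demo_metric {
    uint64_t val;
};

/* Adds v to the counter and returns the value held BEFORE the addition. */
static uint64_t demo_metric_sum(struct demo_metric *m, uint64_t v)
{
    return flb_atomic_fetch_add(&m->val, v);
}

/* Reads the counter with a relaxed atomic load. */
static uint64_t demo_metric_get(struct demo_metric *m)
{
    return flb_atomic_load(&m->val);
}
```

A real backend for another compiler would need its own branch ahead of the `#error`; which intrinsics it should use is left open here.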

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@include/fluent-bit/flb_atomic.h` around lines 44 - 53, The fallback in
flb_atomic.h currently maps flb_atomic_load, flb_atomic_store, and
flb_atomic_fetch_add to plain accesses, which is unsafe for shared cross-thread
state. Update the `#else` branch to use a real atomic implementation for
non-GCC/Clang compilers, or fail the build with an explicit error if no atomic
backend is available. Keep the fix scoped to the flb_atomic_* macros so all
existing call sites get proper atomic semantics.


#endif

#endif
4 changes: 3 additions & 1 deletion src/flb_engine.c
@@ -40,6 +40,7 @@
#include <fluent-bit/flb_utils.h>
#include <fluent-bit/flb_config.h>
#include <fluent-bit/flb_engine.h>
#include <fluent-bit/flb_atomic.h>
#include <fluent-bit/flb_event.h>
#include <fluent-bit/flb_engine_dispatch.h>
#include <fluent-bit/flb_network.h>
@@ -1134,7 +1135,8 @@ int flb_engine_start(struct flb_config *config)
return -2;
}

- config->grace_input = config->grace / 2;
+ /* published to the supervisor/main thread, which reads it concurrently */
+ flb_atomic_store(&config->grace_input, config->grace / 2);
Comment on lines +1138 to +1139

🩺 Stability & Availability | 🟡 Minor | ⚡ Quick win

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -eu

# Inspect whether flb_engine_started() is the synchronization point that allows
# flb_start() / the main thread to continue.
rg -n -C5 '\bflb_engine_started\s*\(|\bflb_start\s*\(' src include

Repository: fluent/fluent-bit

Length of output: 5072


🏁 Script executed:

#!/bin/bash
set -eu

# Inspect the startup handshake and the supervisor-side read of grace_input.
sed -n '760,820p' src/flb_engine.c
printf '\n---\n'
sed -n '1,120p' src/flb_lib.c
printf '\n---\n'
sed -n '1458,1485p' src/fluent-bit.c

Repository: fluent/fluent-bit

Length of output: 5961


🏁 Script executed:

#!/bin/bash
set -eu

# Inspect do_start() and the library-side wait for the engine-start notification.
sed -n '880,1045p' src/flb_lib.c
printf '\n---\n'
sed -n '1045,1125p' src/flb_lib.c
printf '\n---\n'
rg -n -C4 'ENGINE_STARTED|FLB_ENGINE_STARTED|ch_notif|flb_pipe_r|flb_pipe_w|started' src/flb_lib.c src/flb_engine.c src/fluent-bit.c

Repository: fluent/fluent-bit

Length of output: 13207


Move the grace_input store before the startup signal. flb_start() returns as soon as the main thread reads FLB_ENGINE_STARTED. Because the store currently runs after that signal is sent, src/fluent-bit.c can load a stale grace_input and publish the wrong supervisor grace window. Store it before calling flb_engine_started(config).
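
The ordering being asked for can be sketched as follows. This is an illustration under stated assumptions, not the engine's code: `demo_config`, `demo_engine_start`, `demo_wait_started`, and `demo_handshake` are hypothetical names, and a plain POSIX pipe stands in for the notification channel. The sketch assumes the pipe write/read pair is what orders the relaxed store before the reader's load; the atomics by themselves only make each access well defined.

```c
#include <unistd.h>

/* Hypothetical stand-in for the relevant parts of struct flb_config. */
struct demo_config {
    int grace;
    int grace_input;
    int ch_notif[2];   /* [0] = read end, [1] = write end */
};

/* Engine side: publish the value first, then send the "started" signal. */
static int demo_engine_start(struct demo_config *c)
{
    unsigned char started = 1;

    __atomic_store_n(&c->grace_input, c->grace / 2, __ATOMIC_RELAXED);
    return write(c->ch_notif[1], &started, 1) == 1 ? 0 : -1;
}

/* Main side: wait for the signal, and only then load the value. */
static int demo_wait_started(struct demo_config *c)
{
    unsigned char byte;

    if (read(c->ch_notif[0], &byte, 1) != 1) {
        return -1;
    }
    return __atomic_load_n(&c->grace_input, __ATOMIC_RELAXED);
}

/* Runs both halves in sequence and returns the grace_input that was seen. */
static int demo_handshake(int grace)
{
    struct demo_config c = { grace, 0, { -1, -1 } };
    int seen = -1;

    if (pipe(c.ch_notif) != 0) {
        return -1;
    }
    if (demo_engine_start(&c) == 0) {
        seen = demo_wait_started(&c);
    }
    close(c.ch_notif[0]);
    close(c.ch_notif[1]);
    return seen;
}
```

With the two statements in `demo_engine_start` swapped, a reader woken by the pipe could still observe the initial `0`, which is the stale value the comment describes.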

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/flb_engine.c` around lines 1138 - 1139, The grace window update in
flb_engine_started setup is happening too late, so the main thread can observe a
stale grace_input value after FLB_ENGINE_STARTED is published. Move the
flb_atomic_store for config->grace_input to occur before calling
flb_engine_started(config), keeping the update in the startup path around the
existing grace_input logic so src/fluent-bit.c reads the correct supervisor
grace window.

flb_info("[engine] Shutdown Grace Period=%d, Shutdown Input Grace Period=%d", config->grace, config->grace_input);

while (1) {
11 changes: 11 additions & 0 deletions src/flb_input.c
@@ -2484,6 +2484,17 @@ int flb_input_collector_fd(flb_pipefd_t fd, struct flb_config *config)

mk_list_foreach(head, &config->inputs) {
ins = mk_list_entry(head, struct flb_input_instance, _head);

/*
* Collectors of a threaded input are registered and dispatched in the
* input's own thread/event loop (see flb_input_thread.c), never through
* this main-thread handler. Skipping them avoids a benign data race with
* the worker thread that concurrently initializes those collector fds.
*/
if (flb_input_is_threaded(ins)) {
continue;
}

mk_list_foreach(head_coll, &ins->collectors) {
collector = mk_list_entry(head_coll, struct flb_input_collector, _head);
if (collector->fd_event == fd) {
12 changes: 9 additions & 3 deletions src/flb_metrics.c
@@ -27,6 +27,7 @@
#include <fluent-bit/flb_version.h>
#include <fluent-bit/flb_utils.h>
#include <fluent-bit/flb_metrics.h>
#include <fluent-bit/flb_atomic.h>
#include <msgpack.h>

static int id_exists(int id, struct flb_metrics *metrics)
@@ -181,7 +182,12 @@ int flb_metrics_sum(int id, size_t val, struct flb_metrics *metrics)
return -1;
}

- m->val += val;
+ /*
+ * The counter is summed from the owning input/output worker thread while the
+ * metrics exporter reads it from the main engine thread; use a relaxed
+ * atomic so the access is well defined (see flb_metrics_dump_values()).
+ */
+ flb_atomic_fetch_add(&m->val, val);
return 0;
}

@@ -214,7 +220,7 @@ int flb_metrics_print(struct flb_metrics *metrics)

mk_list_foreach(head, &metrics->list) {
m = mk_list_entry(head, struct flb_metric, _head);
- printf(", '%s' => %lu", m->title, m->val);
+ printf(", '%s' => %lu", m->title, (unsigned long) flb_atomic_load(&m->val));
}
printf("\n");

@@ -240,7 +246,7 @@ int flb_metrics_dump_values(char **out_buf, size_t *out_size,
m = mk_list_entry(head, struct flb_metric, _head);
msgpack_pack_str(&mp_pck, flb_sds_len(m->title));
msgpack_pack_str_body(&mp_pck, m->title, flb_sds_len(m->title));
- msgpack_pack_uint64(&mp_pck, m->val);
+ msgpack_pack_uint64(&mp_pck, flb_atomic_load(&m->val));
}

*out_buf = mp_sbuf.data;
4 changes: 3 additions & 1 deletion src/fluent-bit.c
@@ -60,6 +60,7 @@
#include <fluent-bit/flb_reload.h>
#include <fluent-bit/flb_config_format.h>
#include <fluent-bit/flb_supervisor.h>
#include <fluent-bit/flb_atomic.h>

#ifdef FLB_HAVE_MTRACE
#include <mcheck.h>
@@ -1473,8 +1474,9 @@ static int flb_main_run(int argc, char **argv)
ctx = flb_context_get();

if (ctx != NULL && ctx->config != NULL) {
+ /* grace_input is published by the engine thread (flb_engine_start) */
  flb_supervisor_child_update_grace(ctx->config->grace,
- ctx->config->grace_input);
+ flb_atomic_load(&ctx->config->grace_input));
Comment on lines +1477 to +1479

🩺 Stability & Availability | 🟠 Major | ⚡ Quick win

Use atomic loads for every supervisor grace publication.

This path is fixed, but the hot-reload paths later in this function still read ctx->config->grace_input directly at Lines 1518-1531. Those paths can reintroduce the same race when the engine thread publishes the value.

Suggested consistency fix
@@
     struct flb_cf_section *section;
     struct flb_cf *cf_opts;
     struct flb_cf_group *group = NULL;
     int supervisor_reload_notified = FLB_FALSE;
+    int grace_input;
@@
     if (ctx != NULL && ctx->config != NULL) {
         /* grace_input is published by the engine thread (flb_engine_start) */
-        flb_supervisor_child_update_grace(ctx->config->grace,
-                                          flb_atomic_load(&ctx->config->grace_input));
+        grace_input = flb_atomic_load(&ctx->config->grace_input);
+        flb_supervisor_child_update_grace(ctx->config->grace, grace_input);
@@
-                flb_supervisor_child_signal_shutdown(ctx->config->grace,
-                                                     ctx->config->grace_input);
+                grace_input = flb_atomic_load(&ctx->config->grace_input);
+                flb_supervisor_child_signal_shutdown(ctx->config->grace, grace_input);
@@
-                    flb_supervisor_child_update_grace(ctx->config->grace,
-                                                      ctx->config->grace_input);
+                    grace_input = flb_atomic_load(&ctx->config->grace_input);
+                    flb_supervisor_child_update_grace(ctx->config->grace, grace_input);
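
The pattern in the suggestion above, one atomic load into a local followed by the supervisor call, could also be factored into a single helper so that no call site reads the field directly. The sketch below is an assumption about how that might look: `demo_config`, `demo_supervisor_update`, and `demo_publish_grace` are hypothetical names, and the supervisor call is replaced by a stub that records its arguments.

```c
/* Hypothetical stand-in for the two grace fields of struct flb_config. */
struct demo_config {
    int grace;
    int grace_input;
};

/* Stub standing in for a supervisor call; it only records its arguments. */
static int demo_last_grace;
static int demo_last_grace_input;

static void demo_supervisor_update(int grace, int grace_input)
{
    demo_last_grace = grace;
    demo_last_grace_input = grace_input;
}

/*
 * Single place that reads grace_input: one relaxed atomic load into a local,
 * then the local is passed on. Startup and reload paths would all call this.
 */
static void demo_publish_grace(struct demo_config *config)
{
    int grace_input;

    grace_input = __atomic_load_n(&config->grace_input, __ATOMIC_RELAXED);
    demo_supervisor_update(config->grace, grace_input);
}
```

Routing every publication through one helper keeps the access pattern consistent, at the cost of one more function to maintain.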
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/fluent-bit.c` around lines 1477 - 1479, The supervisor grace publication
is only using an atomic load in the fixed path, but the later hot-reload paths
in the same function still read ctx->config->grace_input directly and can
reintroduce the race. Update those additional grace publication sites in the
function that handles hot reloads to use flb_atomic_load on
ctx->config->grace_input before calling flb_supervisor_child_update_grace,
keeping the access pattern consistent everywhere in this flow.

}

#ifdef FLB_HAVE_CHUNK_TRACE