-
Notifications
You must be signed in to change notification settings - Fork 1.9k
Harden data races between threaded input workers and the main engine #12007
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
9c9338c
ce41a50
57d54ca
144901f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,57 @@ | ||
| /* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ | ||
|
|
||
| /* Fluent Bit | ||
| * ========== | ||
| * Copyright (C) 2015-2026 The Fluent Bit Authors | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| #ifndef FLB_ATOMIC_H | ||
| #define FLB_ATOMIC_H | ||
|
|
||
| /* | ||
| * Minimal relaxed-atomic helpers for scalar values that are read and written | ||
| * by more than one thread (e.g. counters and one-shot status fields shared | ||
| * between a threaded input worker and the main engine). | ||
| * | ||
| * Aligned word-sized loads/stores are already atomic on every platform Fluent | ||
| * Bit targets, but accessing them from multiple threads with plain operators is | ||
| * a C-level data race (undefined behavior, and flagged by ThreadSanitizer). | ||
| * These helpers make such accesses well defined. Relaxed ordering is used on | ||
| * purpose: callers only require atomicity of the individual value, not ordering | ||
| * relative to other memory (for ordered hand-offs use a mutex instead). | ||
| * | ||
| * The helpers are type-generic (int, size_t, uint64_t, ...). | ||
| */ | ||
|
|
||
| #if defined(__GNUC__) || defined(__clang__) | ||
|
|
||
| #define flb_atomic_load(ptr) __atomic_load_n((ptr), __ATOMIC_RELAXED) | ||
| #define flb_atomic_store(ptr, val) __atomic_store_n((ptr), (val), __ATOMIC_RELAXED) | ||
| #define flb_atomic_fetch_add(ptr, v) __atomic_fetch_add((ptr), (v), __ATOMIC_RELAXED) | ||
|
|
||
| #else | ||
|
|
||
| /* | ||
| * Fallback for compilers without the GCC/Clang atomic builtins. The accesses | ||
| * stay plain; on the hardware these compilers target an aligned word access is | ||
| * atomic, and the races guarded here are benign, so correctness is preserved. | ||
| */ | ||
| #define flb_atomic_load(ptr) (*(ptr)) | ||
| #define flb_atomic_store(ptr, val) (*(ptr) = (val)) | ||
| #define flb_atomic_fetch_add(ptr, v) (*(ptr) += (v)) | ||
|
|
||
| #endif | ||
|
|
||
| #endif | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -40,6 +40,7 @@ | |
| #include <fluent-bit/flb_utils.h> | ||
| #include <fluent-bit/flb_config.h> | ||
| #include <fluent-bit/flb_engine.h> | ||
| #include <fluent-bit/flb_atomic.h> | ||
| #include <fluent-bit/flb_event.h> | ||
| #include <fluent-bit/flb_engine_dispatch.h> | ||
| #include <fluent-bit/flb_network.h> | ||
|
|
@@ -1134,7 +1135,8 @@ int flb_engine_start(struct flb_config *config) | |
| return -2; | ||
| } | ||
|
|
||
| config->grace_input = config->grace / 2; | ||
| /* published to the supervisor/main thread, which reads it concurrently */ | ||
| flb_atomic_store(&config->grace_input, config->grace / 2); | ||
|
Comment on lines
+1138
to
+1139
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. 🩺 Stability & Availability | 🟡 Minor | ⚡ Quick win 🧩 Analysis chain 🏁 Script executed: #!/bin/bash
set -eu
# Inspect whether flb_engine_started() is the synchronization point that allows
# flb_start() / the main thread to continue.
rg -n -C5 '\bflb_engine_started\s*\(|\bflb_start\s*\(' src includeRepository: fluent/fluent-bit Length of output: 5072 🏁 Script executed: #!/bin/bash
set -eu
# Inspect the startup handshake and the supervisor-side read of grace_input.
sed -n '760,820p' src/flb_engine.c
printf '\n---\n'
sed -n '1,120p' src/flb_lib.c
printf '\n---\n'
sed -n '1458,1485p' src/fluent-bit.cRepository: fluent/fluent-bit Length of output: 5961 🏁 Script executed: #!/bin/bash
set -eu
# Inspect the startup handshake and the supervisor-side read of grace_input.
sed -n '760,820p' src/flb_engine.c
printf '\n---\n'
sed -n '1,120p' src/flb_lib.c
printf '\n---\n'
sed -n '1458,1485p' src/fluent-bit.cRepository: fluent/fluent-bit Length of output: 5961 🏁 Script executed: #!/bin/bash
set -eu
# Inspect do_start() and the library-side wait for the engine-start notification.
sed -n '880,1045p' src/flb_lib.c
printf '\n---\n'
sed -n '1045,1125p' src/flb_lib.c
printf '\n---\n'
rg -n -C4 'ENGINE_STARTED|FLB_ENGINE_STARTED|ch_notif|flb_pipe_r|flb_pipe_w|started' src/flb_lib.c src/flb_engine.c src/fluent-bit.cRepository: fluent/fluent-bit Length of output: 13207 🏁 Script executed: #!/bin/bash
set -eu
# Inspect do_start() and the library-side wait for the engine-start notification.
sed -n '880,1045p' src/flb_lib.c
printf '\n---\n'
sed -n '1045,1125p' src/flb_lib.c
printf '\n---\n'
rg -n -C4 'ENGINE_STARTED|FLB_ENGINE_STARTED|ch_notif|flb_pipe_r|flb_pipe_w|started' src/flb_lib.c src/flb_engine.c src/fluent-bit.cRepository: fluent/fluent-bit Length of output: 13207 Move the 🤖 Prompt for AI Agents |
||
| flb_info("[engine] Shutdown Grace Period=%d, Shutdown Input Grace Period=%d", config->grace, config->grace_input); | ||
|
|
||
| while (1) { | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -60,6 +60,7 @@ | |||||||||||||||||||||||||||||||||||||||
| #include <fluent-bit/flb_reload.h> | ||||||||||||||||||||||||||||||||||||||||
| #include <fluent-bit/flb_config_format.h> | ||||||||||||||||||||||||||||||||||||||||
| #include <fluent-bit/flb_supervisor.h> | ||||||||||||||||||||||||||||||||||||||||
| #include <fluent-bit/flb_atomic.h> | ||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
| #ifdef FLB_HAVE_MTRACE | ||||||||||||||||||||||||||||||||||||||||
| #include <mcheck.h> | ||||||||||||||||||||||||||||||||||||||||
|
|
@@ -1473,8 +1474,9 @@ static int flb_main_run(int argc, char **argv) | |||||||||||||||||||||||||||||||||||||||
| ctx = flb_context_get(); | ||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
| if (ctx != NULL && ctx->config != NULL) { | ||||||||||||||||||||||||||||||||||||||||
| /* grace_input is published by the engine thread (flb_engine_start) */ | ||||||||||||||||||||||||||||||||||||||||
| flb_supervisor_child_update_grace(ctx->config->grace, | ||||||||||||||||||||||||||||||||||||||||
| ctx->config->grace_input); | ||||||||||||||||||||||||||||||||||||||||
| flb_atomic_load(&ctx->config->grace_input)); | ||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+1477
to
+1479
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. 🩺 Stability & Availability | 🟠 Major | ⚡ Quick win Use atomic loads for every supervisor grace publication. This path is fixed, but the hot-reload paths later in this function still read `ctx->config->grace_input` with a plain (non-atomic) access.
Suggested consistency fix
@@
struct flb_cf_section *section;
struct flb_cf *cf_opts;
struct flb_cf_group *group = NULL;
int supervisor_reload_notified = FLB_FALSE;
+ int grace_input;
@@
if (ctx != NULL && ctx->config != NULL) {
/* grace_input is published by the engine thread (flb_engine_start) */
- flb_supervisor_child_update_grace(ctx->config->grace,
- flb_atomic_load(&ctx->config->grace_input));
+ grace_input = flb_atomic_load(&ctx->config->grace_input);
+ flb_supervisor_child_update_grace(ctx->config->grace, grace_input);
@@
- flb_supervisor_child_signal_shutdown(ctx->config->grace,
- ctx->config->grace_input);
+ grace_input = flb_atomic_load(&ctx->config->grace_input);
+ flb_supervisor_child_signal_shutdown(ctx->config->grace, grace_input);
@@
- flb_supervisor_child_update_grace(ctx->config->grace,
- ctx->config->grace_input);
+ grace_input = flb_atomic_load(&ctx->config->grace_input);
+ flb_supervisor_child_update_grace(ctx->config->grace, grace_input);📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
| #ifdef FLB_HAVE_CHUNK_TRACE | ||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🩺 Stability & Availability | 🟠 Major | ⚡ Quick win
🧩 Analysis chain
🏁 Script executed:
Repository: fluent/fluent-bit
Length of output: 50374
🏁 Script executed:
Repository: fluent/fluent-bit
Length of output: 50374
🏁 Script executed:
Repository: fluent/fluent-bit
Length of output: 50374
🏁 Script executed:
Repository: fluent/fluent-bit
Length of output: 50374
🏁 Script executed:
Repository: fluent/fluent-bit
Length of output: 50374
🏁 Script executed:
Repository: fluent/fluent-bit
Length of output: 50374
🏁 Script executed:
Repository: fluent/fluent-bit
Length of output: 16002
🏁 Script executed:
Repository: fluent/fluent-bit
Length of output: 15416
Replace the plain-access fallback with a real atomic backend or a build error.
`flb_atomic_load/store/fetch_add()` are used on shared fields across threads, so the `#else` branch reintroduces a data race and makes `fetch_add` a non-atomic RMW on non-GCC/Clang compilers such as MSVC. 🤖 Prompt for AI Agents