input: fix threaded input conditional routing for logs#11783
input: fix threaded input conditional routing for logs#11783
Conversation
Signed-off-by: Eduardo Silva <eduardo@chronosphere.io>
Signed-off-by: Eduardo Silva <eduardo@chronosphere.io>
Signed-off-by: Eduardo Silva <eduardo@chronosphere.io>
Signed-off-by: Eduardo Silva <eduardo@chronosphere.io>
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
📝 WalkthroughWalkthroughAdds ring-buffer flags and local/threaded append/enqueue APIs, routes flagged log ring-buffer items to a processed-log append path, refactors input-log append splitting for local vs threaded appends, strengthens OTLP grouping/validation, and adds integration scenarios, tests, and test-harness port allocation. ChangesThreaded Conditional Routing
Sequence Diagram(s)sequenceDiagram
participant Input as Threaded Input
participant Ring as RingBuffer
participant Collector as RingBufferCollector
participant Log as flb_input_log
participant Output as OutputPlugin
Input->>Ring: enqueue item (records=0, flags)
Ring->>Collector: collector reads item
alt flags include LOG_ROUTING and event==LOG
Collector->>Log: flb_input_log_append_processed
Log->>Output: per-route payload append (local or chunked)
else
Collector->>Collector: input_chunk_append_raw (standard flow)
Collector->>Output: normal chunk write
end
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Possibly related PRs
Suggested reviewers
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/flb_input_chunk.c`:
- Around line 3132-3142: The code unconditionally drops cr even if appending
fails; modify the calls to flb_input_log_append_processed(...) and
input_chunk_append_raw(...) to capture their return values, and if the return
indicates failure (non-zero), log an error and re-enqueue cr back onto the
collector ring using the same ring enqueue API used where chunks are originally
queued (do not free or discard cr on failure); only free/acknowledge cr when the
append succeeds.
In `@tests/integration/scenarios/out_stdout/tests/test_out_stdout_001.py`:
- Around line 238-262: The negative "not in log_text" assertions in
test_out_stdout_threaded_input_conditional_route_matches and
test_out_stdout_threaded_input_conditional_default_route are racing against
later log lines because they use the initial snapshot from
wait_for_log_contains; after starting and stopping the Service
(Service(...).start()/stop()) capture the full final log (e.g. call a method
like service.get_logs() or service.read_all_logs() to obtain the complete log
after stop) and perform all negative assertions against that full_log instead of
the early log_text returned by wait_for_log_contains; update references in the
two test functions where log_text is used for negative checks to use full_log
(while keeping the initial wait_for_log_contains for the positive presence
checks).
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: b221b922-22fe-4d1f-ae55-68fad59652d9
📒 Files selected for processing (7)
include/fluent-bit/flb_input_chunk.hinclude/fluent-bit/flb_input_log.hsrc/flb_input_chunk.csrc/flb_input_log.ctests/integration/scenarios/out_stdout/config/out_stdout_threaded_conditional_default_route.yamltests/integration/scenarios/out_stdout/config/out_stdout_threaded_conditional_routing.yamltests/integration/scenarios/out_stdout/tests/test_out_stdout_001.py
| if (cr->event_type == FLB_INPUT_LOGS && | ||
| (cr->flags & FLB_INPUT_CHUNK_RAW_LOG_ROUTING) != 0) { | ||
| flb_input_log_append_processed(cr->ins, cr->records, | ||
| cr->tag, tag_len, | ||
| cr->buf_data, cr->buf_size); | ||
| } | ||
| else { | ||
| input_chunk_append_raw(cr->ins, cr->event_type, cr->records, | ||
| cr->tag, tag_len, | ||
| cr->buf_data, cr->buf_size); | ||
| } |
There was a problem hiding this comment.
Handle append failures in ring-buffer collector to avoid silent data loss.
The collector discards cr regardless of append result. If append fails, dequeued records are lost without explicit failure handling.
🛠️ Suggested fix
void flb_input_chunk_ring_buffer_collector(struct flb_config *ctx, void *data)
{
int ret;
+ int append_ret;
int tag_len = 0;
@@
- if (cr->event_type == FLB_INPUT_LOGS &&
- (cr->flags & FLB_INPUT_CHUNK_RAW_LOG_ROUTING) != 0) {
- flb_input_log_append_processed(cr->ins, cr->records,
- cr->tag, tag_len,
- cr->buf_data, cr->buf_size);
- }
- else {
- input_chunk_append_raw(cr->ins, cr->event_type, cr->records,
- cr->tag, tag_len,
- cr->buf_data, cr->buf_size);
- }
+ if (cr->event_type == FLB_INPUT_LOGS &&
+ (cr->flags & FLB_INPUT_CHUNK_RAW_LOG_ROUTING) != 0) {
+ append_ret = flb_input_log_append_processed(cr->ins, cr->records,
+ cr->tag, tag_len,
+ cr->buf_data, cr->buf_size);
+ }
+ else {
+ append_ret = input_chunk_append_raw(cr->ins, cr->event_type, cr->records,
+ cr->tag, tag_len,
+ cr->buf_data, cr->buf_size);
+ }
+
+ if (append_ret != 0) {
+ flb_plg_error(ins, "failed to append dequeued ring-buffer payload");
+ }
destroy_chunk_raw(cr);🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/flb_input_chunk.c` around lines 3132 - 3142, The code unconditionally
drops cr even if appending fails; modify the calls to
flb_input_log_append_processed(...) and input_chunk_append_raw(...) to capture
their return values, and if the return indicates failure (non-zero), log an
error and re-enqueue cr back onto the collector ring using the same ring enqueue
API used where chunks are originally queued (do not free or discard cr on
failure); only free/acknowledge cr when the append succeeds.
Signed-off-by: Eduardo Silva <eduardo@chronosphere.io>
Signed-off-by: Eduardo Silva <eduardo@chronosphere.io>
There was a problem hiding this comment.
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/flb_input_log.c`:
- Around line 1585-1592: Threaded inputs are using the enqueue-path guarded by
flb_input_is_threaded and input_has_conditional_routes but
input_has_conditional_routes also returns true for route->per_record_routing
even though the splitter only builds payloads when route->condition is present;
this lets per_record_routing records be marked handled without ever being
split/routed. Fix by excluding per-record routing from this threaded path:
change the condition around flb_input_chunk_ring_buffer_enqueue_log_routing
(where flb_input_is_threaded(ins) and input_has_conditional_routes(ins) are
checked) to additionally ensure per-record routing is not present (e.g., add a
check like "&& !input_has_per_record_routing(ins)" or modify
input_has_conditional_routes to ignore route->per_record_routing), or
alternatively implement the missing split logic used by the splitter so
route->per_record_routing paths produce payloads before calling
flb_input_chunk_ring_buffer_enqueue_log_routing; update references to
flb_input_is_threaded, input_has_conditional_routes,
flb_input_chunk_ring_buffer_enqueue_log_routing, and route->per_record_routing
accordingly.
- Around line 1073-1110: The code clears the original routes mask then returns
when has_routes == FLB_FALSE, leaving the just-appended base chunk with no
active routes; change the early-return to remove or skip that base chunk so it
isn't persisted with empty routing: when has_routes is FLB_FALSE call the chunk
cleanup routine (e.g. flb_input_chunk_destroy or the module's chunk release
function) for the chunk created earlier instead of returning 0, and ensure you
do not call flb_input_chunk_get_real_size,
flb_input_chunk_update_output_instances or
input_chunk_write_direct_route_metadata for that chunk; locate this logic around
the memset of chunk->routes_mask, the has_routes flag, flb_routes_mask_set_bit,
and the subsequent chunk_size/metadata calls to apply the fix.
In `@tests/integration/scenarios/out_stdout/tests/test_out_stdout_001.py`:
- Around line 138-141: The helper _assert_stdout_tag_contains currently only
checks the tag exists and that needles appear somewhere in the whole log; change
it to locate the specific stdout block for the given tag (find the tag header
matching f"] {tag}:"), then extract the substring from that header to the next
stdout header (or end of log) and assert each needle occurs within that
extracted block so you validate the payload is emitted under the correct tag
rather than anywhere in the full log.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: c567d946-0a71-41dd-a8a3-0dc761e84c97
📒 Files selected for processing (4)
src/flb_input_log.ctests/integration/scenarios/out_stdout/config/out_stdout_conditional_corner_non_threaded.yamltests/integration/scenarios/out_stdout/config/out_stdout_conditional_corner_threaded.yamltests/integration/scenarios/out_stdout/tests/test_out_stdout_001.py
✅ Files skipped from review due to trivial changes (1)
- tests/integration/scenarios/out_stdout/config/out_stdout_conditional_corner_non_threaded.yaml
| mask_size = flb_routes_mask_get_size(ins->config->router); | ||
| memset(chunk->routes_mask, 0, sizeof(flb_route_mask_element) * mask_size); | ||
|
|
||
| has_routes = FLB_FALSE; | ||
| cfl_list_foreach(head, &ins->routes_direct) { | ||
| route_path = cfl_list_entry(head, struct flb_router_path, _head); | ||
|
|
||
| if (!route_path->ins) { | ||
| continue; | ||
| } | ||
|
|
||
| if (route_path->route && | ||
| (route_path->route->condition || route_path->route->per_record_routing)) { | ||
| continue; | ||
| } | ||
|
|
||
| flb_routes_mask_set_bit(chunk->routes_mask, | ||
| route_path->ins->id, | ||
| ins->config->router); | ||
| has_routes = FLB_TRUE; | ||
| } | ||
|
|
||
| if (has_routes == FLB_FALSE) { | ||
| return 0; | ||
| } | ||
|
|
||
| if (chunk_size == -1) { | ||
| chunk_size = flb_input_chunk_get_real_size(chunk); | ||
| if (chunk_size > 0) { | ||
| chunk_size_sz = (size_t) chunk_size; | ||
| } | ||
| } | ||
|
|
||
| if (chunk_size_sz > 0) { | ||
| flb_input_chunk_update_output_instances(chunk, chunk_size_sz); | ||
| } | ||
|
|
||
| return input_chunk_write_direct_route_metadata(ins, chunk); |
There was a problem hiding this comment.
Don't leave an empty-route base chunk behind.
After memset() clears the original mask, this returns success when has_routes == FLB_FALSE without removing the just-appended base chunk. For inputs whose direct routes are all conditional, that leaves a persisted chunk with no active routes, so it cannot be scheduled and its stored route metadata/accounting can drift from the live state. Please destroy or skip the base chunk when no non-conditional routes remain. As per coding guidelines, "Final chunk release happens only when all active routes are resolved" and "Backlog-loaded chunks must preserve route state and accounting parity with live-ingested chunks."
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/flb_input_log.c` around lines 1073 - 1110, The code clears the original
routes mask then returns when has_routes == FLB_FALSE, leaving the just-appended
base chunk with no active routes; change the early-return to remove or skip that
base chunk so it isn't persisted with empty routing: when has_routes is
FLB_FALSE call the chunk cleanup routine (e.g. flb_input_chunk_destroy or the
module's chunk release function) for the chunk created earlier instead of
returning 0, and ensure you do not call flb_input_chunk_get_real_size,
flb_input_chunk_update_output_instances or
input_chunk_write_direct_route_metadata for that chunk; locate this logic around
the memset of chunk->routes_mask, the has_routes flag, flb_routes_mask_set_bit,
and the subsequent chunk_size/metadata calls to apply the fix.
| if (flb_input_is_threaded(ins) == FLB_TRUE && | ||
| input_has_conditional_routes(ins) == FLB_TRUE) { | ||
| ret = flb_input_chunk_ring_buffer_enqueue_log_routing(ins, | ||
| FLB_INPUT_LOGS, | ||
| records, | ||
| tag, tag_len, | ||
| out_buf, out_size); | ||
| } |
There was a problem hiding this comment.
per_record_routing is being sent through a path that never builds payloads.
input_has_conditional_routes() also returns true for route->per_record_routing, so threaded inputs with that flag now take this enqueue path. But the splitter only creates route payloads when route->condition is present, which means per_record_routing records can be marked as handled here and then have their base route bits stripped later without ever producing a routed chunk. Please exclude per_record_routing from this new threaded path until it is explicitly supported here, or add the missing split logic.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/flb_input_log.c` around lines 1585 - 1592, Threaded inputs are using the
enqueue-path guarded by flb_input_is_threaded and input_has_conditional_routes
but input_has_conditional_routes also returns true for route->per_record_routing
even though the splitter only builds payloads when route->condition is present;
this lets per_record_routing records be marked handled without ever being
split/routed. Fix by excluding per-record routing from this threaded path:
change the condition around flb_input_chunk_ring_buffer_enqueue_log_routing
(where flb_input_is_threaded(ins) and input_has_conditional_routes(ins) are
checked) to additionally ensure per-record routing is not present (e.g., add a
check like "&& !input_has_per_record_routing(ins)" or modify
input_has_conditional_routes to ignore route->per_record_routing), or
alternatively implement the missing split logic used by the splitter so
route->per_record_routing paths produce payloads before calling
flb_input_chunk_ring_buffer_enqueue_log_routing; update references to
flb_input_is_threaded, input_has_conditional_routes,
flb_input_chunk_ring_buffer_enqueue_log_routing, and route->per_record_routing
accordingly.
| def _assert_stdout_tag_contains(log_text, tag, *needles): | ||
| assert f"] {tag}:" in log_text, f"Could not find stdout tag {tag!r}" | ||
| for needle in needles: | ||
| assert needle in log_text |
There was a problem hiding this comment.
Scope the helper to the matching stdout emission.
_assert_stdout_tag_contains() currently proves that the tag exists somewhere and that each needle exists somewhere in the full log. In the new corner-case test, that can pass even when the payload only appears under the base tag instead of the routed tag being asserted. Match a per-tag stdout block/window and assert the needles inside that block.
Possible tightening
def _assert_stdout_tag_contains(log_text, tag, *needles):
- assert f"] {tag}:" in log_text, f"Could not find stdout tag {tag!r}"
- for needle in needles:
- assert needle in log_text
+ lines = log_text.splitlines()
+
+ for index, line in enumerate(lines):
+ if f"] {tag}:" not in line:
+ continue
+
+ window = "\n".join(lines[index : index + 3])
+ if all(needle in window for needle in needles):
+ return
+
+ raise AssertionError(f"Could not find stdout payload for tag {tag!r}")🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@tests/integration/scenarios/out_stdout/tests/test_out_stdout_001.py` around
lines 138 - 141, The helper _assert_stdout_tag_contains currently only checks
the tag exists and that needles appear somewhere in the whole log; change it to
locate the specific stdout block for the given tag (find the tag header matching
f"] {tag}:"), then extract the substring from that header to the next stdout
header (or end of log) and assert each needle occurs within that extracted block
so you validate the payload is emitted under the correct tag rather than
anywhere in the full log.
Signed-off-by: Eduardo Silva <eduardo@chronosphere.io>
Signed-off-by: Eduardo Silva <eduardo@chronosphere.io>
Signed-off-by: Eduardo Silva <eduardo@chronosphere.io>
Signed-off-by: Eduardo Silva <eduardo@chronosphere.io>
Signed-off-by: Eduardo Silva <eduardo@chronosphere.io>
Signed-off-by: Eduardo Silva <eduardo@chronosphere.io>
There was a problem hiding this comment.
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@plugins/in_opentelemetry/opentelemetry_logs.c`:
- Around line 482-483: The scope map gate currently only checks scope->name,
scope->version, scope->n_attributes and scope_has_schema_url, which lets a scope
with only scope->dropped_attributes_count set fall through to the empty-map
branch; update the conditional that guards the scope-map branch (the if that
references scope and scope->name/version/n_attributes/scope_has_schema_url) to
also test scope->dropped_attributes_count (e.g., treat any non-zero
dropped_attributes_count as a populated field) so scopes with only
dropped_attributes_count are handled by the scope-map logic.
In `@plugins/out_opentelemetry/opentelemetry_logs.c`:
- Around line 175-197: The helper msgpack_object_hash (and the other two similar
helpers in the other blocks) must not return 0 on msgpack_pack_* failure because
callers treat 0 as a valid hash; change these helpers to return a dedicated
error sentinel (e.g., UINT64_MAX) when
msgpack_pack_object()/msgpack_pack_array() fails, and update otel_process_logs()
to check for that sentinel and treat it as a serialization/allocation error
(skip grouping/propagate error or trigger retry) instead of merging under hash
0; make sure to include the appropriate header for UINT64_MAX and adjust all
call sites to handle the sentinel.
In
`@tests/integration/scenarios/out_opentelemetry/tests/test_out_opentelemetry_001.py`:
- Around line 389-395: The helper _log_payloads_by_request_path is incorrectly
pairing logs_seen and requests_seen by list index which is flaky; instead parse
each entry in logs_seen (using json_format.MessageToJson on each log) to extract
the request path or identifier and build a lookup dict keyed by that path, then
iterate requests_seen and pull the matching decoded log from that lookup
(asserting the path exists) so pairing is done by request.path not by index;
update references in _log_payloads_by_request_path to perform this match-by-path
logic rather than enumerate pairing.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: fc4cfd04-f54d-4b80-8105-b0abd6a759b2
📒 Files selected for processing (6)
plugins/in_opentelemetry/opentelemetry_logs.cplugins/out_opentelemetry/opentelemetry_logs.csrc/opentelemetry/flb_opentelemetry_logs.ctests/integration/scenarios/in_opentelemetry/tests/test_in_opentelemetry_001.pytests/integration/scenarios/out_opentelemetry/tests/test_out_opentelemetry_001.pytests/integration/src/utils/test_service.py
Signed-off-by: Eduardo Silva <eduardo@chronosphere.io>
Signed-off-by: Eduardo Silva <eduardo@chronosphere.io>
Signed-off-by: Eduardo Silva <eduardo@chronosphere.io>
There was a problem hiding this comment.
🧹 Nitpick comments (1)
tests/integration/scenarios/out_opentelemetry/tests/test_out_opentelemetry_001.py (1)
433-435: 💤 Low value
zip()withoutstrict=True— though guarded by the preceding length assertion.Line 433 already asserts
len(scope_logs) == len(scopes), so silent truncation cannot happen in practice. Still, addingstrict=Truemakes the intent explicit and satisfies the Ruff B905 warning.♻️ Proposed fix
- for scope_log, expected in zip(scope_logs, scopes): + for scope_log, expected in zip(scope_logs, scopes, strict=True):🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@tests/integration/scenarios/out_opentelemetry/tests/test_out_opentelemetry_001.py` around lines 433 - 435, The loop using zip over scope_logs and scopes should be made explicit to avoid silent truncation: update the for loop that iterates "for scope_log, expected in zip(scope_logs, scopes)" to pass the strict=True argument to zip so the lengths must match (this addresses Ruff B905); keep the preceding length assertion or remove it if redundant, and ensure the change is applied where scope_logs and scopes are zipped in the test.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Nitpick comments:
In
`@tests/integration/scenarios/out_opentelemetry/tests/test_out_opentelemetry_001.py`:
- Around line 433-435: The loop using zip over scope_logs and scopes should be
made explicit to avoid silent truncation: update the for loop that iterates "for
scope_log, expected in zip(scope_logs, scopes)" to pass the strict=True argument
to zip so the lengths must match (this addresses Ruff B905); keep the preceding
length assertion or remove it if redundant, and ensure the change is applied
where scope_logs and scopes are zipped in the test.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: bee3a56b-fcdf-4065-bf03-f948fbfb3e33
📒 Files selected for processing (3)
plugins/in_opentelemetry/opentelemetry_logs.cplugins/out_opentelemetry/opentelemetry_logs.ctests/integration/scenarios/out_opentelemetry/tests/test_out_opentelemetry_001.py
🚧 Files skipped from review as they are similar to previous changes (2)
- plugins/in_opentelemetry/opentelemetry_logs.c
- plugins/out_opentelemetry/opentelemetry_logs.c
This PR started by enabling conditional routing for threaded input plugins. Threaded inputs previously fell back to normal routing because conditional routing needs route-specific chunks and route-mask updates, while threaded log ingestion first copied data through the input ring buffer where chunk state is not available.
The PR now routes threaded conditional log buffers safely by enqueueing already-processed buffers with a routing flag. The ring-buffer collector performs the conditional split on the main ingestion side through a local append path, so route-specific chunks and route masks are updated where chunk state is owned safely.
Primary Changes
Additional Fixes Found During Validation
handling.
Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.