Skip to content

input: fix threaded input conditional routing for logs#11783

Merged
edsiper merged 15 commits intomasterfrom
threaded-cond-routing
May 7, 2026
Merged

input: fix threaded input conditional routing for logs#11783
edsiper merged 15 commits intomasterfrom
threaded-cond-routing

Conversation

@edsiper
Copy link
Copy Markdown
Member

@edsiper edsiper commented May 7, 2026

This PR started by enabling conditional routing for threaded input plugins. Threaded inputs previously fell back to normal routing because conditional routing needs route-specific chunks and route-mask updates, while threaded log ingestion first copied data through the input ring buffer where chunk state is not available.

The PR now routes threaded conditional log buffers safely by enqueueing already-processed buffers with a routing flag. The ring-buffer collector performs the conditional split on the main ingestion side through a local append path, so route-specific chunks and route masks are updated where chunk state is owned safely.

Primary Changes

  • Adds local raw chunk append and routed-log ring-buffer enqueue APIs.
  • Routes flagged threaded log buffers through flb_input_log_append_processed().
  • Preserves input processor behavior before routing, including processor-mutated fields.
  • Adds Python integration coverage for threaded conditional match and default route fallback.
  • Replaces brittle trace-log assertions with observable stdout routing assertions.

Additional Fixes Found During Validation

  • Fixed OpenTelemetry grouped log reassembly after conditional splits so resource/scope identity does not collide across grouped OTLP inputs.
  • Preserved OTLP group metadata after conditional split, including chunks containing multiple groups.
  • Preserved resource and scope schema URLs across OTLP input/output paths.
  • Fixed native OTLP trace flag handling and tightened trace/span ID validation.
  • Fixed protobuf/json OTLP log ingestion edge cases: empty requests, null resource/scope entries, empty scopes, malformed scope entries, and scope-only schema URL
    handling.
  • Fixed ScopeLogs.schema_url leak found by strict valgrind in real grouped-routing tests.
  • Fixed scope-map encoding when only dropped_attributes_count is set.
  • Fixed output grouping hash helpers to return an explicit error sentinel on msgpack serialization failure instead of merging under hash 0.
  • Stabilized grouped OTLP integration assertions by matching decoded payloads by request path rather than list index.
  • Fixed integration harness port allocation races that caused unrelated OpenTelemetry flakes.

Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.

edsiper added 4 commits May 6, 2026 22:25
Signed-off-by: Eduardo Silva <eduardo@chronosphere.io>
Signed-off-by: Eduardo Silva <eduardo@chronosphere.io>
Signed-off-by: Eduardo Silva <eduardo@chronosphere.io>
Signed-off-by: Eduardo Silva <eduardo@chronosphere.io>
@edsiper edsiper requested a review from cosmo0920 as a code owner May 7, 2026 04:27
@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented May 7, 2026

Review Change Stack

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Use the checkboxes below for quick actions:

  • ▶️ Resume reviews
  • 🔍 Trigger review
📝 Walkthrough

Walkthrough

Adds ring-buffer flags and local/threaded append/enqueue APIs, routes flagged log ring-buffer items to a processed-log append path, refactors input-log append splitting for local vs threaded appends, strengthens OTLP grouping/validation, and adds integration scenarios, tests, and test-harness port allocation.

Changes

Threaded Conditional Routing

Layer / File(s) Summary
Data Shape / Headers
include/fluent-bit/flb_input_chunk.h, include/fluent-bit/flb_input_log.h, src/flb_input_chunk.c
Adds flags to ring-buffer payload and declares flb_input_chunk_append_raw_local(...), flb_input_chunk_ring_buffer_enqueue_log_routing(...), and flb_input_log_append_processed(...).
Ring Buffer Enqueue / Collector
src/flb_input_chunk.c
Threaded append enqueues with records=0; new enqueue variant sets LOG_ROUTING flag; collector dispatches flagged log items to processed-log append.
Input Log Append & Routing
src/flb_input_log.c
Introduces input_log_append_processed_internal, direct-route metadata writers, input_chunk_apply_base_direct_routes; split_and_append_route_payloads gains local_append and threaded conditional path uses log-routing enqueue.
OTLP Grouping & Validation
plugins/out_opentelemetry/opentelemetry_logs.c, plugins/in_opentelemetry/opentelemetry_logs.c, src/opentelemetry/flb_opentelemetry_logs.c
Adds msgpack hashing/identity for resource/scope grouping, stricter trace/span id validation, schema_url/scope emission fixes, encoder null-check behavior, and safer cleanup.
Plugin Config Cleanup
plugins/out_opentelemetry/opentelemetry_conf.c
Removes explicit oauth2 config destroy and documents config-map ownership of strings.
Integration Tests & Test Harness
tests/integration/scenarios/..., tests/integration/.../tests/*, tests/integration/src/utils/test_service.py
Adds/updates YAML scenarios and pytest tests validating threaded/non-threaded conditional routing and grouped OTLP metadata; test harness prevents port collisions.

Sequence Diagram(s)

sequenceDiagram
  participant Input as Threaded Input
  participant Ring as RingBuffer
  participant Collector as RingBufferCollector
  participant Log as flb_input_log
  participant Output as OutputPlugin

  Input->>Ring: enqueue item (records=0, flags)
  Ring->>Collector: collector reads item
  alt flags include LOG_ROUTING and event==LOG
    Collector->>Log: flb_input_log_append_processed
    Log->>Output: per-route payload append (local or chunked)
  else
    Collector->>Collector: input_chunk_append_raw (standard flow)
    Collector->>Output: normal chunk write
  end
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

Suggested reviewers

  • cosmo0920
  • pwhelan

"🐇 I packed raw bits in little crates,
flags on their backs to steer their fates,
threaded hops now send each routed line,
chunks find homes in order and in time.
Hop, hop, the logs are routed fine!"

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 6.67% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Title check ✅ Passed The title directly and accurately summarizes the main change: enabling threaded input conditional routing for logs by fixing a previous limitation where it was not supported.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch threaded-cond-routing

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@src/flb_input_chunk.c`:
- Around line 3132-3142: The code unconditionally drops cr even if appending
fails; modify the calls to flb_input_log_append_processed(...) and
input_chunk_append_raw(...) to capture their return values, and if the return
indicates failure (non-zero), log an error and re-enqueue cr back onto the
collector ring using the same ring enqueue API used where chunks are originally
queued (do not free or discard cr on failure); only free/acknowledge cr when the
append succeeds.

In `@tests/integration/scenarios/out_stdout/tests/test_out_stdout_001.py`:
- Around line 238-262: The negative "not in log_text" assertions in
test_out_stdout_threaded_input_conditional_route_matches and
test_out_stdout_threaded_input_conditional_default_route are racing against
later log lines because they use the initial snapshot from
wait_for_log_contains; after starting and stopping the Service
(Service(...).start()/stop()) capture the full final log (e.g. call a method
like service.get_logs() or service.read_all_logs() to obtain the complete log
after stop) and perform all negative assertions against that full_log instead of
the early log_text returned by wait_for_log_contains; update references in the
two test functions where log_text is used for negative checks to use full_log
(while keeping the initial wait_for_log_contains for the positive presence
checks).
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: b221b922-22fe-4d1f-ae55-68fad59652d9

📥 Commits

Reviewing files that changed from the base of the PR and between 54a9ebc and 4915f8e.

📒 Files selected for processing (7)
  • include/fluent-bit/flb_input_chunk.h
  • include/fluent-bit/flb_input_log.h
  • src/flb_input_chunk.c
  • src/flb_input_log.c
  • tests/integration/scenarios/out_stdout/config/out_stdout_threaded_conditional_default_route.yaml
  • tests/integration/scenarios/out_stdout/config/out_stdout_threaded_conditional_routing.yaml
  • tests/integration/scenarios/out_stdout/tests/test_out_stdout_001.py

Comment thread src/flb_input_chunk.c
Comment on lines +3132 to +3142
if (cr->event_type == FLB_INPUT_LOGS &&
(cr->flags & FLB_INPUT_CHUNK_RAW_LOG_ROUTING) != 0) {
flb_input_log_append_processed(cr->ins, cr->records,
cr->tag, tag_len,
cr->buf_data, cr->buf_size);
}
else {
input_chunk_append_raw(cr->ins, cr->event_type, cr->records,
cr->tag, tag_len,
cr->buf_data, cr->buf_size);
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Handle append failures in ring-buffer collector to avoid silent data loss.

The collector discards cr regardless of append result. If append fails, dequeued records are lost without explicit failure handling.

🛠️ Suggested fix
 void flb_input_chunk_ring_buffer_collector(struct flb_config *ctx, void *data)
 {
     int ret;
+    int append_ret;
     int tag_len = 0;
@@
-                if (cr->event_type == FLB_INPUT_LOGS &&
-                    (cr->flags & FLB_INPUT_CHUNK_RAW_LOG_ROUTING) != 0) {
-                    flb_input_log_append_processed(cr->ins, cr->records,
-                                                   cr->tag, tag_len,
-                                                   cr->buf_data, cr->buf_size);
-                }
-                else {
-                    input_chunk_append_raw(cr->ins, cr->event_type, cr->records,
-                                           cr->tag, tag_len,
-                                           cr->buf_data, cr->buf_size);
-                }
+                if (cr->event_type == FLB_INPUT_LOGS &&
+                    (cr->flags & FLB_INPUT_CHUNK_RAW_LOG_ROUTING) != 0) {
+                    append_ret = flb_input_log_append_processed(cr->ins, cr->records,
+                                                                cr->tag, tag_len,
+                                                                cr->buf_data, cr->buf_size);
+                }
+                else {
+                    append_ret = input_chunk_append_raw(cr->ins, cr->event_type, cr->records,
+                                                        cr->tag, tag_len,
+                                                        cr->buf_data, cr->buf_size);
+                }
+
+                if (append_ret != 0) {
+                    flb_plg_error(ins, "failed to append dequeued ring-buffer payload");
+                }
                 destroy_chunk_raw(cr);
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/flb_input_chunk.c` around lines 3132 - 3142, The code unconditionally
drops cr even if appending fails; modify the calls to
flb_input_log_append_processed(...) and input_chunk_append_raw(...) to capture
their return values, and if the return indicates failure (non-zero), log an
error and re-enqueue cr back onto the collector ring using the same ring enqueue
API used where chunks are originally queued (do not free or discard cr on
failure); only free/acknowledge cr when the append succeeds.

Comment thread tests/integration/scenarios/out_stdout/tests/test_out_stdout_001.py
@edsiper edsiper added this to the Fluent Bit v5.0.5 milestone May 7, 2026
edsiper added 2 commits May 6, 2026 22:39
Signed-off-by: Eduardo Silva <eduardo@chronosphere.io>
Signed-off-by: Eduardo Silva <eduardo@chronosphere.io>
Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@src/flb_input_log.c`:
- Around line 1585-1592: Threaded inputs are using the enqueue-path guarded by
flb_input_is_threaded and input_has_conditional_routes but
input_has_conditional_routes also returns true for route->per_record_routing
even though the splitter only builds payloads when route->condition is present;
this lets per_record_routing records be marked handled without ever being
split/routed. Fix by excluding per-record routing from this threaded path:
change the condition around flb_input_chunk_ring_buffer_enqueue_log_routing
(where flb_input_is_threaded(ins) and input_has_conditional_routes(ins) are
checked) to additionally ensure per-record routing is not present (e.g., add a
check like "&& !input_has_per_record_routing(ins)" or modify
input_has_conditional_routes to ignore route->per_record_routing), or
alternatively implement the missing split logic used by the splitter so
route->per_record_routing paths produce payloads before calling
flb_input_chunk_ring_buffer_enqueue_log_routing; update references to
flb_input_is_threaded, input_has_conditional_routes,
flb_input_chunk_ring_buffer_enqueue_log_routing, and route->per_record_routing
accordingly.
- Around line 1073-1110: The code clears the original routes mask then returns
when has_routes == FLB_FALSE, leaving the just-appended base chunk with no
active routes; change the early-return to remove or skip that base chunk so it
isn't persisted with empty routing: when has_routes is FLB_FALSE call the chunk
cleanup routine (e.g. flb_input_chunk_destroy or the module's chunk release
function) for the chunk created earlier instead of returning 0, and ensure you
do not call flb_input_chunk_get_real_size,
flb_input_chunk_update_output_instances or
input_chunk_write_direct_route_metadata for that chunk; locate this logic around
the memset of chunk->routes_mask, the has_routes flag, flb_routes_mask_set_bit,
and the subsequent chunk_size/metadata calls to apply the fix.

In `@tests/integration/scenarios/out_stdout/tests/test_out_stdout_001.py`:
- Around line 138-141: The helper _assert_stdout_tag_contains currently only
checks the tag exists and that needles appear somewhere in the whole log; change
it to locate the specific stdout block for the given tag (find the tag header
matching f"] {tag}:"), then extract the substring from that header to the next
stdout header (or end of log) and assert each needle occurs within that
extracted block so you validate the payload is emitted under the correct tag
rather than anywhere in the full log.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: c567d946-0a71-41dd-a8a3-0dc761e84c97

📥 Commits

Reviewing files that changed from the base of the PR and between 4915f8e and 1326f37.

📒 Files selected for processing (4)
  • src/flb_input_log.c
  • tests/integration/scenarios/out_stdout/config/out_stdout_conditional_corner_non_threaded.yaml
  • tests/integration/scenarios/out_stdout/config/out_stdout_conditional_corner_threaded.yaml
  • tests/integration/scenarios/out_stdout/tests/test_out_stdout_001.py
✅ Files skipped from review due to trivial changes (1)
  • tests/integration/scenarios/out_stdout/config/out_stdout_conditional_corner_non_threaded.yaml

Comment thread src/flb_input_log.c
Comment on lines +1073 to +1110
mask_size = flb_routes_mask_get_size(ins->config->router);
memset(chunk->routes_mask, 0, sizeof(flb_route_mask_element) * mask_size);

has_routes = FLB_FALSE;
cfl_list_foreach(head, &ins->routes_direct) {
route_path = cfl_list_entry(head, struct flb_router_path, _head);

if (!route_path->ins) {
continue;
}

if (route_path->route &&
(route_path->route->condition || route_path->route->per_record_routing)) {
continue;
}

flb_routes_mask_set_bit(chunk->routes_mask,
route_path->ins->id,
ins->config->router);
has_routes = FLB_TRUE;
}

if (has_routes == FLB_FALSE) {
return 0;
}

if (chunk_size == -1) {
chunk_size = flb_input_chunk_get_real_size(chunk);
if (chunk_size > 0) {
chunk_size_sz = (size_t) chunk_size;
}
}

if (chunk_size_sz > 0) {
flb_input_chunk_update_output_instances(chunk, chunk_size_sz);
}

return input_chunk_write_direct_route_metadata(ins, chunk);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Don't leave an empty-route base chunk behind.

After memset() clears the original mask, this returns success when has_routes == FLB_FALSE without removing the just-appended base chunk. For inputs whose direct routes are all conditional, that leaves a persisted chunk with no active routes, so it cannot be scheduled and its stored route metadata/accounting can drift from the live state. Please destroy or skip the base chunk when no non-conditional routes remain. As per coding guidelines, "Final chunk release happens only when all active routes are resolved" and "Backlog-loaded chunks must preserve route state and accounting parity with live-ingested chunks."

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/flb_input_log.c` around lines 1073 - 1110, The code clears the original
routes mask then returns when has_routes == FLB_FALSE, leaving the just-appended
base chunk with no active routes; change the early-return to remove or skip that
base chunk so it isn't persisted with empty routing: when has_routes is
FLB_FALSE call the chunk cleanup routine (e.g. flb_input_chunk_destroy or the
module's chunk release function) for the chunk created earlier instead of
returning 0, and ensure you do not call flb_input_chunk_get_real_size,
flb_input_chunk_update_output_instances or
input_chunk_write_direct_route_metadata for that chunk; locate this logic around
the memset of chunk->routes_mask, the has_routes flag, flb_routes_mask_set_bit,
and the subsequent chunk_size/metadata calls to apply the fix.

Comment thread src/flb_input_log.c
Comment on lines +1585 to +1592
if (flb_input_is_threaded(ins) == FLB_TRUE &&
input_has_conditional_routes(ins) == FLB_TRUE) {
ret = flb_input_chunk_ring_buffer_enqueue_log_routing(ins,
FLB_INPUT_LOGS,
records,
tag, tag_len,
out_buf, out_size);
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

per_record_routing is being sent through a path that never builds payloads.

input_has_conditional_routes() also returns true for route->per_record_routing, so threaded inputs with that flag now take this enqueue path. But the splitter only creates route payloads when route->condition is present, which means per_record_routing records can be marked as handled here and then have their base route bits stripped later without ever producing a routed chunk. Please exclude per_record_routing from this new threaded path until it is explicitly supported here, or add the missing split logic.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/flb_input_log.c` around lines 1585 - 1592, Threaded inputs are using the
enqueue-path guarded by flb_input_is_threaded and input_has_conditional_routes
but input_has_conditional_routes also returns true for route->per_record_routing
even though the splitter only builds payloads when route->condition is present;
this lets per_record_routing records be marked handled without ever being
split/routed. Fix by excluding per-record routing from this threaded path:
change the condition around flb_input_chunk_ring_buffer_enqueue_log_routing
(where flb_input_is_threaded(ins) and input_has_conditional_routes(ins) are
checked) to additionally ensure per-record routing is not present (e.g., add a
check like "&& !input_has_per_record_routing(ins)" or modify
input_has_conditional_routes to ignore route->per_record_routing), or
alternatively implement the missing split logic used by the splitter so
route->per_record_routing paths produce payloads before calling
flb_input_chunk_ring_buffer_enqueue_log_routing; update references to
flb_input_is_threaded, input_has_conditional_routes,
flb_input_chunk_ring_buffer_enqueue_log_routing, and route->per_record_routing
accordingly.

Comment on lines +138 to +141
def _assert_stdout_tag_contains(log_text, tag, *needles):
assert f"] {tag}:" in log_text, f"Could not find stdout tag {tag!r}"
for needle in needles:
assert needle in log_text
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Scope the helper to the matching stdout emission.

_assert_stdout_tag_contains() currently proves that the tag exists somewhere and that each needle exists somewhere in the full log. In the new corner-case test, that can pass even when the payload only appears under the base tag instead of the routed tag being asserted. Match a per-tag stdout block/window and assert the needles inside that block.

Possible tightening
 def _assert_stdout_tag_contains(log_text, tag, *needles):
-    assert f"] {tag}:" in log_text, f"Could not find stdout tag {tag!r}"
-    for needle in needles:
-        assert needle in log_text
+    lines = log_text.splitlines()
+
+    for index, line in enumerate(lines):
+        if f"] {tag}:" not in line:
+            continue
+
+        window = "\n".join(lines[index : index + 3])
+        if all(needle in window for needle in needles):
+            return
+
+    raise AssertionError(f"Could not find stdout payload for tag {tag!r}")
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tests/integration/scenarios/out_stdout/tests/test_out_stdout_001.py` around
lines 138 - 141, The helper _assert_stdout_tag_contains currently only checks
the tag exists and that needles appear somewhere in the whole log; change it to
locate the specific stdout block for the given tag (find the tag header matching
f"] {tag}:"), then extract the substring from that header to the next stdout
header (or end of log) and assert each needle occurs within that extracted block
so you validate the payload is emitted under the correct tag rather than
anywhere in the full log.

edsiper added 2 commits May 6, 2026 23:00
Signed-off-by: Eduardo Silva <eduardo@chronosphere.io>
Signed-off-by: Eduardo Silva <eduardo@chronosphere.io>
edsiper added 4 commits May 7, 2026 09:28
Signed-off-by: Eduardo Silva <eduardo@chronosphere.io>
Signed-off-by: Eduardo Silva <eduardo@chronosphere.io>
Signed-off-by: Eduardo Silva <eduardo@chronosphere.io>
Signed-off-by: Eduardo Silva <eduardo@chronosphere.io>
Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@plugins/in_opentelemetry/opentelemetry_logs.c`:
- Around line 482-483: The scope map gate currently only checks scope->name,
scope->version, scope->n_attributes and scope_has_schema_url, which lets a scope
with only scope->dropped_attributes_count set fall through to the empty-map
branch; update the conditional that guards the scope-map branch (the if that
references scope and scope->name/version/n_attributes/scope_has_schema_url) to
also test scope->dropped_attributes_count (e.g., treat any non-zero
dropped_attributes_count as a populated field) so scopes with only
dropped_attributes_count are handled by the scope-map logic.

In `@plugins/out_opentelemetry/opentelemetry_logs.c`:
- Around line 175-197: The helper msgpack_object_hash (and the other two similar
helpers in the other blocks) must not return 0 on msgpack_pack_* failure because
callers treat 0 as a valid hash; change these helpers to return a dedicated
error sentinel (e.g., UINT64_MAX) when
msgpack_pack_object()/msgpack_pack_array() fails, and update otel_process_logs()
to check for that sentinel and treat it as a serialization/allocation error
(skip grouping/propagate error or trigger retry) instead of merging under hash
0; make sure to include the appropriate header for UINT64_MAX and adjust all
call sites to handle the sentinel.

In
`@tests/integration/scenarios/out_opentelemetry/tests/test_out_opentelemetry_001.py`:
- Around line 389-395: The helper _log_payloads_by_request_path is incorrectly
pairing logs_seen and requests_seen by list index which is flaky; instead parse
each entry in logs_seen (using json_format.MessageToJson on each log) to extract
the request path or identifier and build a lookup dict keyed by that path, then
iterate requests_seen and pull the matching decoded log from that lookup
(asserting the path exists) so pairing is done by request.path not by index;
update references in _log_payloads_by_request_path to perform this match-by-path
logic rather than enumerate pairing.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: fc4cfd04-f54d-4b80-8105-b0abd6a759b2

📥 Commits

Reviewing files that changed from the base of the PR and between 22086e7 and 04510b6.

📒 Files selected for processing (6)
  • plugins/in_opentelemetry/opentelemetry_logs.c
  • plugins/out_opentelemetry/opentelemetry_logs.c
  • src/opentelemetry/flb_opentelemetry_logs.c
  • tests/integration/scenarios/in_opentelemetry/tests/test_in_opentelemetry_001.py
  • tests/integration/scenarios/out_opentelemetry/tests/test_out_opentelemetry_001.py
  • tests/integration/src/utils/test_service.py

Comment thread plugins/in_opentelemetry/opentelemetry_logs.c Outdated
Comment thread plugins/out_opentelemetry/opentelemetry_logs.c
Comment thread tests/integration/scenarios/out_opentelemetry/tests/test_out_opentelemetry_001.py Outdated
edsiper added 3 commits May 7, 2026 09:50
Signed-off-by: Eduardo Silva <eduardo@chronosphere.io>
Signed-off-by: Eduardo Silva <eduardo@chronosphere.io>
Signed-off-by: Eduardo Silva <eduardo@chronosphere.io>
Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick comments (1)
tests/integration/scenarios/out_opentelemetry/tests/test_out_opentelemetry_001.py (1)

433-435: 💤 Low value

zip() without strict=True — though guarded by the preceding length assertion.

Line 433 already asserts len(scope_logs) == len(scopes), so silent truncation cannot happen in practice. Still, adding strict=True makes the intent explicit and satisfies the Ruff B905 warning.

♻️ Proposed fix
-    for scope_log, expected in zip(scope_logs, scopes):
+    for scope_log, expected in zip(scope_logs, scopes, strict=True):
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In
`@tests/integration/scenarios/out_opentelemetry/tests/test_out_opentelemetry_001.py`
around lines 433 - 435, The loop using zip over scope_logs and scopes should be
made explicit to avoid silent truncation: update the for loop that iterates "for
scope_log, expected in zip(scope_logs, scopes)" to pass the strict=True argument
to zip so the lengths must match (this addresses Ruff B905); keep the preceding
length assertion or remove it if redundant, and ensure the change is applied
where scope_logs and scopes are zipped in the test.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Nitpick comments:
In
`@tests/integration/scenarios/out_opentelemetry/tests/test_out_opentelemetry_001.py`:
- Around line 433-435: The loop using zip over scope_logs and scopes should be
made explicit to avoid silent truncation: update the for loop that iterates "for
scope_log, expected in zip(scope_logs, scopes)" to pass the strict=True argument
to zip so the lengths must match (this addresses Ruff B905); keep the preceding
length assertion or remove it if redundant, and ensure the change is applied
where scope_logs and scopes are zipped in the test.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: bee3a56b-fcdf-4065-bf03-f948fbfb3e33

📥 Commits

Reviewing files that changed from the base of the PR and between 04510b6 and 83b1710.

📒 Files selected for processing (3)
  • plugins/in_opentelemetry/opentelemetry_logs.c
  • plugins/out_opentelemetry/opentelemetry_logs.c
  • tests/integration/scenarios/out_opentelemetry/tests/test_out_opentelemetry_001.py
🚧 Files skipped from review as they are similar to previous changes (2)
  • plugins/in_opentelemetry/opentelemetry_logs.c
  • plugins/out_opentelemetry/opentelemetry_logs.c

@edsiper edsiper merged commit 0512101 into master May 7, 2026
55 of 56 checks passed
@edsiper edsiper deleted the threaded-cond-routing branch May 7, 2026 16:50
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant