Conversation
Signed-off-by: Eduardo Silva <eduardo@chronosphere.io>
📝 Walkthrough
Adds configurable OTLP log partitioning by resource for the Kafka output plugin: a new config flag, partition data structures, msgpack/hash helpers, a keyed raw producer, a partitioned production flow wired into the OTLP JSON/proto flush paths, config registration, and integration tests/configs.
Sequence Diagram(s)
sequenceDiagram
participant FlushCaller
participant Decoder
participant PartitionMap
participant Converter
participant KafkaProducer
FlushCaller->>Decoder: decode OTLP logs chunk
Decoder->>PartitionMap: compute resource hash, assign record
PartitionMap->>PartitionMap: buffer record per resource
PartitionMap->>Converter: convert each partition buffer to OTLP JSON/proto
Converter->>KafkaProducer: produce_raw_payload_with_key(payload, key)
KafkaProducer-->>FlushCaller: produce result
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks: ✅ 4 passed | ❌ 1 failed (1 warning)
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: b5ae018aa4
    ret = produce_raw_payload_with_key(payload,
                                       flb_sds_len(payload),
                                       key,
                                       key_len,
                                       ctx);
Avoid retrying after partially producing partitions
When otlp_logs_partition_by_resource creates multiple resource partitions, this loop produces them one by one and immediately returns FLB_RETRY/FLB_ERROR if a later rd_kafka_produce fails (for example, the local Kafka queue becomes full). Any earlier partitions have already been enqueued, but the engine will retry the original chunk as a whole, so those resource payloads are sent again and consumers receive duplicates whenever only a suffix of the partition list fails.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@plugins/out_kafka/kafka.c`:
- Around line 875-908: The local variable scope_id in get_otlp_group_resource is
assigned but never used, causing -Werror with -Wunused-variable; remove the
unused variable and call msgpack_map_get_int64 without capturing the value (pass
NULL if the API supports it) or, alternatively, keep the variable but explicitly
mark it used (e.g., (void)scope_id) after the call; update the call site where
msgpack_map_get_int64(&group_metadata->via.map, "scope_id", &scope_id) to either
msgpack_map_get_int64(&group_metadata->via.map, "scope_id", NULL) or keep
&scope_id and add (void)scope_id to suppress the unused-variable warning in
get_otlp_group_resource.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 5137eee9-650d-43f2-8ff9-6ab599b64c29
📒 Files selected for processing (5)
- plugins/out_kafka/kafka.c
- plugins/out_kafka/kafka_config.h
- tests/integration/scenarios/out_kafka/config/out_kafka_otlp_json_partition_by_resource.yaml
- tests/integration/scenarios/out_kafka/config/out_kafka_otlp_proto_partition_by_resource.yaml
- tests/integration/scenarios/out_kafka/tests/test_out_kafka_001.py
Signed-off-by: Eduardo Silva <eduardo@chronosphere.io>
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
plugins/out_kafka/kafka.c (1)
557-579: ⚠️ Potential issue | 🔴 Critical | ⚡ Quick win — Bound the queue-full loop even when engine retry is disabled.
After the first partition is accepted, `allow_engine_retry` becomes false, so lines 557-559 never fire and the `QUEUE_FULL` path loops forever. If Kafka stays backed up after a partial send, this flush can block indefinitely instead of failing fast without duplicating already-enqueued partitions.
Suggested fix
 retry:
-    if (allow_engine_retry == FLB_TRUE &&
-        ctx->queue_full_retries > 0 &&
-        queue_full_retries >= ctx->queue_full_retries) {
+    if (ctx->queue_full_retries > 0 &&
+        queue_full_retries >= ctx->queue_full_retries) {
         ctx->blocked = FLB_FALSE;
-        return FLB_RETRY;
+        return allow_engine_retry == FLB_TRUE ? FLB_RETRY : FLB_ERROR;
     }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@plugins/out_kafka/kafka.c` around lines 557 - 579, The queue-full retry loop can spin forever when allow_engine_retry becomes false after a partial send; modify the QUEUE_FULL handling in the rd_kafka_produce branch to always check and increment queue_full_retries and compare against ctx->queue_full_retries (or a safe max) before retrying, regardless of allow_engine_retry, and if the max is reached set ctx->blocked = FLB_FALSE and return FLB_RETRY (or fail fast) instead of looping; update the logic around rd_kafka_produce, RD_KAFKA_RESP_ERR__QUEUE_FULL, queue_full_retries and ctx->blocked so the loop is bounded even when allow_engine_retry == FLB_FALSE.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@plugins/out_kafka/kafka.c`:
- Around line 541-548: The code currently treats key == NULL as equivalent to
ctx->message_key, which causes partitions that should be unkeyed to inherit the
static key; change the logic so that a NULL key is left NULL for partitions that
cannot derive a resource hash: keep the existing wrapper behavior for the
default-key path, but when invoking the partitioned-send path ensure you pass
use_default_key = FLB_FALSE whenever partition->has_key != FLB_TRUE (so key
remains NULL and message_key/message_key_len are not overwritten by
ctx->message_key); update the branch that sets message_key/message_key_len (and
the call site that selects between wrapper vs partitioned path) to honor
use_default_key and partition->has_key accordingly.
Signed-off-by: Eduardo Silva <eduardo@chronosphere.io>
Actionable comments posted: 1
🧹 Nitpick comments (3)
plugins/out_kafka/kafka.c (3)
1427-1432: ⚡ Quick win — Config entry: consider tightening the description.
The description is clear, but it does not mention the behavioral side-effect that `message_key`/`message_key_field` are effectively superseded for OTLP log chunks when this flag is enabled (each partition is keyed by the resource-attributes hash, or unkeyed when no resource info is available). A single sentence here will save a lot of support questions.
📝 Suggested wording tweak
"When using format otlp_json or otlp_proto, split OTLP log payloads by " - "resource and use a hash of the resource attributes as the Kafka message key." + "resource and use a hash of the resource attributes as the Kafka message key. " + "When enabled, this overrides 'message_key' for OTLP log chunks; partitions " + "without a derivable resource are produced without a key." },🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@plugins/out_kafka/kafka.c` around lines 1427 - 1432, Update the config description for otlp_logs_partition_by_resource in struct flb_out_kafka to explicitly state the behavioral side-effect: when enabled (for formats otlp_json/otlp_proto) OTLP log chunks are partitioned by resource and the Kafka message key is derived from a hash of the resource attributes — this effectively supersedes message_key and message_key_field for those OTLP log chunks, and if no resource information exists the messages will be unkeyed. Keep the sentence concise and add it to the existing description string.
974-1088: ⚡ Quick win — Consider hoisting `default_logs_body_keys` and the option setup.
`default_logs_body_keys` and the `flb_opentelemetry_otlp_logs_options` initialization block are now duplicated across `produce_partitioned_otlp_logs` (974, 1084-1088), `produce_otlp_json` (1165, 1176-1180), and `produce_otlp_proto` (1227, 1236-1240). A small file-scope helper (or a single `static const` array + an `init_otlp_logs_options(...)` helper) would eliminate the drift risk if the defaults ever evolve.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@plugins/out_kafka/kafka.c` around lines 974 - 1088, Hoist the duplicated default_logs_body_keys and options initialization into a single shared helper: create a file-scope static const char *default_logs_body_keys[] and an init helper (e.g., init_otlp_logs_options(struct flb_opentelemetry_otlp_logs_options *opts)) that sets logs_require_otel_metadata, logs_body_keys, logs_body_key_count and logs_body_key_attributes; then replace the duplicate blocks in produce_partitioned_otlp_logs, produce_otlp_json, and produce_otlp_proto to call init_otlp_logs_options(&options) and remove their local duplicates so all three functions use the same centralized defaults.
558-560: 💤 Low value — Consider `<= 0` for the unlimited-retries guard.
`queue_full_retries` is documented as "0 or false = no limit". If a user passes a negative value (allowed by `FLB_CONFIG_MAP_INT`), the partitioned path will not apply the `FLB_KAFKA_PARTIAL_QUEUE_FULL_RETRIES` ceiling because the comparison is `== 0`, leaving the partitioned producer to spin without a fallback bound. Tightening to `<= 0` mirrors the documented "no limit" semantics and makes the partial-retry safety net unconditional.
♻️ Suggested change
-    if (queue_full_retry_limit == 0 && allow_engine_retry == FLB_FALSE) {
+    if (queue_full_retry_limit <= 0 && allow_engine_retry == FLB_FALSE) {
         queue_full_retry_limit = FLB_KAFKA_PARTIAL_QUEUE_FULL_RETRIES;
     }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@plugins/out_kafka/kafka.c` around lines 558 - 560, The guard that sets queue_full_retry_limit only when queue_full_retry_limit == 0 should be tightened to handle negative values too: change the conditional that checks queue_full_retry_limit (together with allow_engine_retry) so it uses <= 0 instead of == 0, ensuring the FLB_KAFKA_PARTIAL_QUEUE_FULL_RETRIES ceiling is applied when queue_full_retry_limit is unset or negative; update the condition referencing queue_full_retry_limit, allow_engine_retry and FLB_KAFKA_PARTIAL_QUEUE_FULL_RETRIES accordingly.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@plugins/out_kafka/kafka.c`:
- Around line 1090-1148: Summarize and document the at-most-once durability
trade-off introduced by otlp_logs_partition_by_resource: explain that when
partitioning OTLP logs by resource the send loop (see
produce_raw_payload_with_key_retry_control usage) intentionally returns
FLB_ERROR (not FLB_RETRY) for failures after one or more partitions have already
been produced, to avoid duplicating already-enqueued partitions on engine
replay; update the user-facing docs/feature doc to clearly state that enabling
otlp_logs_partition_by_resource may result in partial delivery under broker
back-pressure (reduced durability compared to unpartitioned mode), describe the
observable behavior and suggested mitigations (e.g., disable partitioning or
configure topic/producer to reduce back-pressure), and reference the
partitioned-produce behavior so users can make an informed choice.
Signed-off-by: Eduardo Silva <eduardo@chronosphere.io>
This PR introduces a new config option to Kafka output to split OTLP log records by resource:
`otlp_logs_partition_by_resource` to `out_kafka` for `otlp_json` and `otlp_proto` log output.
Fluent Bit is licensed under Apache 2.0; by submitting this pull request I understand that this code will be released under the terms of that license.
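For reference, enabling the option in a YAML pipeline might look like the sketch below. Broker, topic, and match values are placeholders; only the `format` and `otlp_logs_partition_by_resource` keys come from this PR:

```yaml
# Hypothetical config sketch; not copied from the PR's integration tests.
pipeline:
  outputs:
    - name: kafka
      match: '*'
      brokers: localhost:9092
      topics: otlp-logs
      format: otlp_json
      otlp_logs_partition_by_resource: true
```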
Summary by CodeRabbit
New Features
Tests
Configuration