out_kafka: partition OTLP logs by resource #11786

Open

edsiper wants to merge 5 commits into master from out_kafka-otlp-partition

Conversation

@edsiper (Member) commented May 7, 2026

This PR introduces a new config option to the Kafka output plugin to split OTLP log records by resource:

  • Add otlp_logs_partition_by_resource to out_kafka for otlp_json and otlp_proto log output.
  • When enabled, OTLP log chunks are split into separate Kafka messages by resource.
  • Partitioned OTLP log messages use a hash of the resource attributes as the Kafka message key.
  • Default behavior remains unchanged: OTLP log resources in the same flush chunk are emitted in one Kafka message.
  • Add integration coverage for both OTLP JSON and OTLP protobuf Kafka output.

Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.
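A minimal configuration sketch showing the new option (broker address, topic name, and match pattern are placeholders; the actual example configs ship under tests/integration/scenarios/out_kafka/config/):

```yaml
pipeline:
  outputs:
    - name: kafka
      match: '*'
      brokers: localhost:9092
      topics: otlp-logs
      format: otlp_json
      otlp_logs_partition_by_resource: true
```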

Summary by CodeRabbit

  • New Features

  • Opt-in partitioning of OTLP logs by resource when sending to Kafka (JSON and Protobuf) via the otlp_logs_partition_by_resource setting; produces separate messages per resource and derives message keys when resource attributes are available.
  • Tests

    • Integration tests validating partitioned OTLP log output, message splitting by resource, and message key/record correctness across formats.
  • Configuration

    • Added example configs demonstrating enabling partitioning and static message key usage.

edsiper added 2 commits May 7, 2026 16:23
Signed-off-by: Eduardo Silva <eduardo@chronosphere.io>
Signed-off-by: Eduardo Silva <eduardo@chronosphere.io>
@coderabbitai Bot commented May 7, 2026

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 5e6c1175-70e8-4ab0-aff9-2bf74f2ed962

📥 Commits

Reviewing files that changed from the base of the PR and between 35e338a and 223f713.

📒 Files selected for processing (1)
  • plugins/out_kafka/kafka.c
🚧 Files skipped from review as they are similar to previous changes (1)
  • plugins/out_kafka/kafka.c

📝 Walkthrough

Walkthrough

Adds configurable OTLP log partitioning by resource for the Kafka output plugin: new config flag, partition data structures, msgpack/hash helpers, keyed raw producer, partitioned production flow wired into OTLP JSON/proto flush paths, config registration, and integration tests/configs.

Changes

OTLP Log Partitioning by Resource

| Layer / File(s) | Summary |
| --- | --- |
| Configuration Schema (plugins/out_kafka/kafka_config.h) | struct flb_out_kafka extended with an otlp_logs_partition_by_resource boolean. |
| Partition Data Structures (plugins/out_kafka/kafka.c) | Adds a partition struct and OTLP log options initialization to hold per-resource identity/hash, an optional key, and a msgpack sbuffer. |
| Msgpack Inspection & Hashing (plugins/out_kafka/kafka.c) | Adds msgpack helpers for map/object lookup, string/int64 extraction, and stable 64-bit hashing for resource identity. |
| Resource-Identity Hashing (plugins/out_kafka/kafka.c) | Adds helpers deriving a stable 64-bit identity from OTLP resource objects (schema_url and attributes). |
| Partitioned OTLP Log Producer (plugins/out_kafka/kafka.c) | Buffers decoded OTLP log records per resource, converts each partition buffer to OTLP JSON/proto, and produces each partition payload with the derived key when available. |
| Keyed Raw Payload Producer (plugins/out_kafka/kafka.c) | Refactors raw payload production to add a key-aware producer with retry control; produce_raw_payload(...) delegates to it when there is no key. |
| Flush Path Integration (plugins/out_kafka/kafka.c) | Routes the OTLP JSON and proto flush paths to produce_partitioned_otlp_logs(...) when otlp_logs_partition_by_resource is enabled. |
| Configuration Registration (plugins/out_kafka/kafka.c) | Registers the otlp_logs_partition_by_resource plugin option in the config map. |
| Tests / Integration Configs (tests/integration/scenarios/out_kafka/config/out_kafka_otlp_json_partition_by_resource.yaml, tests/integration/scenarios/out_kafka/config/out_kafka_otlp_proto_partition_by_resource.yaml, tests/integration/scenarios/out_kafka/tests/test_out_kafka_001.py) | Adds two scenario YAMLs enabling partitioning and a parametrized test asserting two logs are split into two Kafka messages with non-empty derived keys and matching resource attributes. |

Sequence Diagram(s)

```mermaid
sequenceDiagram
  participant FlushCaller
  participant Decoder
  participant PartitionMap
  participant Converter
  participant KafkaProducer
  FlushCaller->>Decoder: decode OTLP logs chunk
  Decoder->>PartitionMap: compute resource hash, assign record
  PartitionMap->>PartitionMap: buffer record per resource
  PartitionMap->>Converter: convert each partition buffer to OTLP JSON/proto
  Converter->>KafkaProducer: produce_raw_payload_with_key(payload, key)
  KafkaProducer-->>FlushCaller: produce result
```

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs

  • fluent/fluent-bit#11624: Builds on prior OTLP Kafka support and conversion paths extended here with partitioning and keyed production.
  • fluent/fluent-bit#11781: Implements stable hashing and resource grouping for OTLP logs; related to the resource-hash/grouping logic added here.

Suggested labels

opentelemetry

Suggested reviewers

  • cosmo0920
  • celalettin1286

Poem

🐇 I hop through msgpack, keen and spry,

I hash each resource, give keys a try,
Buffers tuck records, one per name,
To Kafka they bound — one message, one frame,
A rabbit cheers, partitions multiply!

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
| --- | --- | --- | --- |
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 0.00%, which is insufficient; the required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |
✅ Passed checks (4 passed)
| Check name | Status | Explanation |
| --- | --- | --- |
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |
| Title check | ✅ Passed | The title accurately and concisely summarizes the main change: adding resource-based partitioning for OTLP logs in the Kafka output plugin. |
| Linked Issues check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |
| Out of Scope Changes check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |


@chatgpt-codex-connector Bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: b5ae018aa4


Comment thread: plugins/out_kafka/kafka.c (Outdated)
Comment on lines +1083 to +1087

```c
ret = produce_raw_payload_with_key(payload,
                                   flb_sds_len(payload),
                                   key,
                                   key_len,
                                   ctx);
```

P1: Avoid retrying after partially producing partitions

When otlp_logs_partition_by_resource creates multiple resource partitions, this loop produces them one by one and immediately returns FLB_RETRY/FLB_ERROR if a later rd_kafka_produce fails (for example, the local Kafka queue becomes full). Any earlier partitions have already been enqueued, but the engine will retry the original chunk as a whole, so those resource payloads are sent again and consumers receive duplicates whenever only a suffix of the partition list fails.



@coderabbitai Bot left a comment


Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@plugins/out_kafka/kafka.c`:
- Around line 875-908: The local variable scope_id in get_otlp_group_resource is
assigned but never used, causing -Werror with -Wunused-variable; remove the
unused variable and call msgpack_map_get_int64 without capturing the value (pass
NULL if the API supports it) or, alternatively, keep the variable but explicitly
mark it used (e.g., (void)scope_id) after the call; update the call site where
msgpack_map_get_int64(&group_metadata->via.map, "scope_id", &scope_id) to either
msgpack_map_get_int64(&group_metadata->via.map, "scope_id", NULL) or keep
&scope_id and add (void)scope_id to suppress the unused-variable warning in
get_otlp_group_resource.

ℹ️ Review info

📥 Commits

Reviewing files that changed from the base of the PR and between 0512101 and b5ae018.

📒 Files selected for processing (5)
  • plugins/out_kafka/kafka.c
  • plugins/out_kafka/kafka_config.h
  • tests/integration/scenarios/out_kafka/config/out_kafka_otlp_json_partition_by_resource.yaml
  • tests/integration/scenarios/out_kafka/config/out_kafka_otlp_proto_partition_by_resource.yaml
  • tests/integration/scenarios/out_kafka/tests/test_out_kafka_001.py

Comment thread plugins/out_kafka/kafka.c
Signed-off-by: Eduardo Silva <eduardo@chronosphere.io>

@coderabbitai Bot left a comment


Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
plugins/out_kafka/kafka.c (1)

557-579: ⚠️ Potential issue | 🔴 Critical | ⚡ Quick win

Bound the queue-full loop even when engine retry is disabled.

After the first partition is accepted, allow_engine_retry becomes false, so Lines 557-559 never fire and the QUEUE_FULL path loops forever. If Kafka stays backed up after a partial send, this flush can block indefinitely instead of failing fast without duplicating already-enqueued partitions.

Suggested fix:

```diff
 retry:
-    if (allow_engine_retry == FLB_TRUE &&
-        ctx->queue_full_retries > 0 &&
-        queue_full_retries >= ctx->queue_full_retries) {
+    if (ctx->queue_full_retries > 0 &&
+        queue_full_retries >= ctx->queue_full_retries) {
         ctx->blocked = FLB_FALSE;
-        return FLB_RETRY;
+        return allow_engine_retry == FLB_TRUE ? FLB_RETRY : FLB_ERROR;
     }
```
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@plugins/out_kafka/kafka.c` around lines 557 - 579, The queue-full retry loop
can spin forever when allow_engine_retry becomes false after a partial send;
modify the QUEUE_FULL handling in the rd_kafka_produce branch to always check
and increment queue_full_retries and compare against ctx->queue_full_retries (or
a safe max) before retrying, regardless of allow_engine_retry, and if the max is
reached set ctx->blocked = FLB_FALSE and return FLB_RETRY (or fail fast) instead
of looping; update the logic around rd_kafka_produce,
RD_KAFKA_RESP_ERR__QUEUE_FULL, queue_full_retries and ctx->blocked so the loop
is bounded even when allow_engine_retry == FLB_FALSE.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@plugins/out_kafka/kafka.c`:
- Around line 541-548: The code currently treats key == NULL as equivalent to
ctx->message_key, which causes partitions that should be unkeyed to inherit the
static key; change the logic so that a NULL key is left NULL for partitions that
cannot derive a resource hash: keep the existing wrapper behavior for the
default-key path, but when invoking the partitioned-send path ensure you pass
use_default_key = FLB_FALSE whenever partition->has_key != FLB_TRUE (so key
remains NULL and message_key/message_key_len are not overwritten by
ctx->message_key); update the branch that sets message_key/message_key_len (and
the call site that selects between wrapper vs partitioned path) to honor
use_default_key and partition->has_key accordingly.


ℹ️ Review info

📥 Commits

Reviewing files that changed from the base of the PR and between b5ae018 and bbf385b.

📒 Files selected for processing (1)
  • plugins/out_kafka/kafka.c

Comment thread plugins/out_kafka/kafka.c
Signed-off-by: Eduardo Silva <eduardo@chronosphere.io>

@coderabbitai Bot left a comment


Actionable comments posted: 1

🧹 Nitpick comments (3)
plugins/out_kafka/kafka.c (3)

1427-1432: ⚡ Quick win

Config entry — consider tightening the description.

The description is clear, but it does not mention the behavioral side-effect that message_key / message_key_field are effectively superseded for OTLP log chunks when this flag is enabled (each partition is keyed by the resource-attributes hash, or unkeyed when no resource info is available). A single sentence here will save a lot of support questions.

📝 Suggested wording tweak:

```diff
     "When using format otlp_json or otlp_proto, split OTLP log payloads by "
-    "resource and use a hash of the resource attributes as the Kafka message key."
+    "resource and use a hash of the resource attributes as the Kafka message key. "
+    "When enabled, this overrides 'message_key' for OTLP log chunks; partitions "
+    "without a derivable resource are produced without a key."
     },
```
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@plugins/out_kafka/kafka.c` around lines 1427 - 1432, Update the config
description for otlp_logs_partition_by_resource in struct flb_out_kafka to
explicitly state the behavioral side-effect: when enabled (for formats
otlp_json/otlp_proto) OTLP log chunks are partitioned by resource and the Kafka
message key is derived from a hash of the resource attributes — this effectively
supersedes message_key and message_key_field for those OTLP log chunks, and if
no resource information exists the messages will be unkeyed. Keep the sentence
concise and add it to the existing description string.

974-1088: ⚡ Quick win

Consider hoisting default_logs_body_keys and the option setup.

default_logs_body_keys and the flb_opentelemetry_otlp_logs_options initialization block are now duplicated across produce_partitioned_otlp_logs (974, 1084-1088), produce_otlp_json (1165, 1176-1180), and produce_otlp_proto (1227, 1236-1240). A small file-scope helper (or a single static const array + a init_otlp_logs_options(...) helper) would eliminate the drift risk if the defaults ever evolve.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@plugins/out_kafka/kafka.c` around lines 974 - 1088, Hoist the duplicated
default_logs_body_keys and options initialization into a single shared helper:
create a file-scope static const char *default_logs_body_keys[] and an init
helper (e.g., init_otlp_logs_options(struct flb_opentelemetry_otlp_logs_options
*opts)) that sets logs_require_otel_metadata, logs_body_keys,
logs_body_key_count and logs_body_key_attributes; then replace the duplicate
blocks in produce_partitioned_otlp_logs, produce_otlp_json, and
produce_otlp_proto to call init_otlp_logs_options(&options) and remove their
local duplicates so all three functions use the same centralized defaults.

558-560: 💤 Low value

Consider <= 0 for the unlimited-retries guard.

queue_full_retries is documented as "0 or false = no limit". If a user passes a negative value (allowed by FLB_CONFIG_MAP_INT), the partitioned path will not apply the FLB_KAFKA_PARTIAL_QUEUE_FULL_RETRIES ceiling because the comparison is == 0, leaving the partitioned producer to spin without a fallback bound. Tightening to <= 0 mirrors the documented "no limit" semantics and makes the partial-retry safety net unconditional.

♻️ Suggested change:

```diff
-    if (queue_full_retry_limit == 0 && allow_engine_retry == FLB_FALSE) {
+    if (queue_full_retry_limit <= 0 && allow_engine_retry == FLB_FALSE) {
         queue_full_retry_limit = FLB_KAFKA_PARTIAL_QUEUE_FULL_RETRIES;
     }
```
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@plugins/out_kafka/kafka.c` around lines 558 - 560, The guard that sets
queue_full_retry_limit only when queue_full_retry_limit == 0 should be tightened
to handle negative values too: change the conditional that checks
queue_full_retry_limit (together with allow_engine_retry) so it uses <= 0
instead of == 0, ensuring the FLB_KAFKA_PARTIAL_QUEUE_FULL_RETRIES ceiling is
applied when queue_full_retry_limit is unset or negative; update the condition
referencing queue_full_retry_limit, allow_engine_retry and
FLB_KAFKA_PARTIAL_QUEUE_FULL_RETRIES accordingly.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@plugins/out_kafka/kafka.c`:
- Around line 1090-1148: Summarize and document the at-most-once durability
trade-off introduced by otlp_logs_partition_by_resource: explain that when
partitioning OTLP logs by resource the send loop (see
produce_raw_payload_with_key_retry_control usage) intentionally returns
FLB_ERROR (not FLB_RETRY) for failures after one or more partitions have already
been produced, to avoid duplicating already-enqueued partitions on engine
replay; update the user-facing docs/feature doc to clearly state that enabling
otlp_logs_partition_by_resource may result in partial delivery under broker
back-pressure (reduced durability compared to unpartitioned mode), describe the
observable behavior and suggested mitigations (e.g., disable partitioning or
configure topic/producer to reduce back-pressure), and reference the
partitioned-produce behavior so users can make an informed choice.


ℹ️ Review info

📥 Commits

Reviewing files that changed from the base of the PR and between bbf385b and 35e338a.

📒 Files selected for processing (1)
  • plugins/out_kafka/kafka.c

Comment thread plugins/out_kafka/kafka.c
Signed-off-by: Eduardo Silva <eduardo@chronosphere.io>
