Skip to content

fix(kafka): support SASL_PLAINTEXT in watermill publisher#4633

Open
chitender wants to merge 3 commits into
openmeterio:mainfrom
chitender:fix/kafka-sasl-plaintext
Open

fix(kafka): support SASL_PLAINTEXT in watermill publisher#4633
chitender wants to merge 3 commits into
openmeterio:mainfrom
chitender:fix/kafka-sasl-plaintext

Conversation

@chitender

@chitender chitender commented Jul 1, 2026

Copy link
Copy Markdown

The Sarama-based event publisher only enabled SASL when securityProtocol was SASL_SSL, coupling SASL auth with TLS. This made SASL_PLAINTEXT brokers fail with "client has run out of available brokers to talk to: EOF" since the client opened an unauthenticated connection that brokers rejected.

Enable SASL for both SASL_SSL and SASL_PLAINTEXT, gating TLS setup behind SASL_SSL only. This matches the confluent-kafka-go path used by the sink.

Notes for reviewer
▎ Added a table-driven test for createKafkaConfig covering PLAINTEXT, SASL_PLAINTEXT, SASL_SSL, and SASL_PLAINTEXT+SCRAM. Verified in production: OpenMeter pods connect successfully to SASL_PLAINTEXT + PLAIN brokers with this change.

Summary by CodeRabbit

  • New Features

    • Kafka connections now support SASL authentication for both secure and plaintext security protocols.
    • TLS is enabled only for the secure SASL-over-TLS option, based on the configured connection mode.
  • Bug Fixes

    • Kafka authentication settings are applied consistently across SASL mechanisms and credentials.
    • Configuration validation now enforces required SASL settings (mechanism, username, password) for SASL security protocols.
  • Tests

    • Added and expanded unit tests covering SASL_PLAINTEXT and SASL_SSL scenarios, including SCRAM mechanisms.

The Sarama-based event publisher only enabled SASL when securityProtocol
was SASL_SSL, coupling SASL auth with TLS. This made SASL_PLAINTEXT brokers
fail with "client has run out of available brokers to talk to: EOF" since
the client opened an unauthenticated connection that brokers rejected.

Enable SASL for both SASL_SSL and SASL_PLAINTEXT, gating TLS setup behind
SASL_SSL only. This matches the confluent-kafka-go path used by the sink.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@chitender chitender requested a review from a team as a code owner July 1, 2026 15:29
@coderabbitai

coderabbitai Bot commented Jul 1, 2026

Copy link
Copy Markdown
Contributor

Review Change Stack

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: fd580add-3c44-4f41-8466-b153006dbd93

📥 Commits

Reviewing files that changed from the base of the PR and between 0399967 and e4d04a2.

📒 Files selected for processing (3)
  • app/config/ingest.go
  • app/config/ingest_test.go
  • openmeter/watermill/driver/kafka/broker_test.go
🚧 Files skipped from review as they are similar to previous changes (1)
  • openmeter/watermill/driver/kafka/broker_test.go

📝 Walkthrough

Walkthrough

Kafka config validation now requires SASL fields for SASL protocols, and broker config generation now supports SASL_PLAINTEXT as well as SASL_SSL. Tests were added for both validation rules and Sarama config wiring.

Changes

Kafka SASL config handling

Layer / File(s) Summary
SASL validation rules
app/config/ingest.go, app/config/ingest_test.go
KafkaConfiguration.Validate() now rejects SASL configs that omit mechanism, username, or password, and the new table-driven test covers valid and invalid protocol/credential combinations.
Broker SASL/TLS wiring
openmeter/watermill/driver/kafka/broker.go, openmeter/watermill/driver/kafka/broker_test.go
createKafkaConfig now enables SASL for SASL_SSL and SASL_PLAINTEXT, limits TLS to SASL_SSL, and the new test verifies credential propagation, mechanism mapping, and SCRAM generator selection.

Estimated code review effort: 2 (Simple) | ~15 minutes

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title clearly matches the main change: adding SASL_PLAINTEXT support to the Watermill Kafka publisher.
Docstring Coverage ✅ Passed No functions found in the changed files to evaluate docstring coverage. Skipping docstring coverage check.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.
✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands.

@greptile-apps

greptile-apps Bot commented Jul 1, 2026

Copy link
Copy Markdown
Contributor

Greptile Summary

This PR updates Kafka SASL handling for the Watermill publisher. The main changes are:

  • SASL now turns on for both SASL_SSL and SASL_PLAINTEXT.
  • TLS setup stays limited to SASL_SSL.
  • Ingest Kafka config now validates required SASL fields.
  • Tests cover plaintext, SASL plaintext, SASL SSL, and SCRAM cases.

Confidence Score: 4/5

This is close, but one validation gap should be fixed before merging.

  • The publisher path now validates SASL fields before creating the Sarama config.
  • The same empty SASL configuration can still pass through the sink Kafka config path.
  • That path can still fail later during broker authentication instead of failing at startup.

Shared Kafka config validation used by the sink path.

Important Files Changed

Filename Overview
app/config/ingest.go Adds SASL field validation for ingest Kafka config, but the same check is not applied to the shared sink Kafka config path.
app/config/ingest_test.go Adds validation tests for plaintext and SASL Kafka configurations.
openmeter/watermill/driver/kafka/broker.go Enables Sarama SASL for both SASL protocols and keeps TLS limited to SASL_SSL.
openmeter/watermill/driver/kafka/broker_test.go Adds tests for SASL and TLS behavior across supported Kafka protocol combinations.

Fix All in Claude Code Fix All in Codex

Prompt To Fix All With AI
Fix the following 1 code review issue. Work through them one at a time, proposing concise fixes.

---

### Issue 1 of 1
app/config/ingest.go:97
**Share SASL validation**

This check only runs for the ingest `KafkaConfiguration`, but the sink Kafka config uses the same SASL fields through the shared Kafka config type and still does not reject empty mechanism, username, or password. When the sink is configured with `SASL_SSL` or `SASL_PLAINTEXT` and the SASL fields are left empty, startup validation can still pass and Kafka authentication fails later with the same opaque broker connection error. Please enforce this SASL-field invariant in the shared Kafka validation path used by the sink as well.

Reviews (3): Last reviewed commit: "fix(kafka): validate SASL credentials an..." | Re-trigger Greptile

Comment on lines +87 to +88
switch o.KafkaConfig.SecurityProtocol {
case "SASL_SSL", "SASL_PLAINTEXT":

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 SASL Config Allows Empty Mechanism

When securityProtocol is SASL_PLAINTEXT but saslMechanisms or credentials are left at their default empty values, this new branch enables Sarama SASL with empty auth fields. That config passes the existing validation and only fails later during broker connection, so a bad SASL_PLAINTEXT config reports as a runtime Kafka auth/connection failure instead of a clear startup validation error.

Prompt To Fix With AI
This is a comment left during a code review.
Path: openmeter/watermill/driver/kafka/broker.go
Line: 87-88

Comment:
**SASL Config Allows Empty Mechanism**

When `securityProtocol` is `SASL_PLAINTEXT` but `saslMechanisms` or credentials are left at their default empty values, this new branch enables Sarama SASL with empty auth fields. That config passes the existing validation and only fails later during broker connection, so a bad SASL_PLAINTEXT config reports as a runtime Kafka auth/connection failure instead of a clear startup validation error.

How can I resolve this? If you propose a fix, please make it concise.

Fix in Claude Code Fix in Codex

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick comments (1)
openmeter/watermill/driver/kafka/broker_test.go (1)

55-67: 📐 Maintainability & Code Quality | 🔵 Trivial | ⚡ Quick win

Add rows for the untested SCRAM paths.

Nice table setup. One small gap: this only exercises SCRAM-SHA-512 over SASL_PLAINTEXT; a broken SCRAM-SHA-256 branch, or a regression in existing SASL_SSL + SCRAM behavior, would still pass.

Suggested extra cases
 		{
 			name: "SASL_PLAINTEXT with SCRAM wires up a SCRAM client generator",
 			kafkaConfig: config.KafkaConfiguration{
 				Broker:           "localhost:9092",
 				SecurityProtocol: "SASL_PLAINTEXT",
 				SaslMechanisms:   sarama.SASLTypeSCRAMSHA512,
 				SaslUsername:     "user",
 				SaslPassword:     "pass",
 			},
 			expectSASLEnabled:    true,
 			expectTLSEnabled:     false,
 			expectSCRAMGenerator: true,
 		},
+		{
+			name: "SASL_PLAINTEXT with SCRAM-SHA-256 wires up a SCRAM client generator",
+			kafkaConfig: config.KafkaConfiguration{
+				Broker:           "localhost:9092",
+				SecurityProtocol: "SASL_PLAINTEXT",
+				SaslMechanisms:   sarama.SASLTypeSCRAMSHA256,
+				SaslUsername:     "user",
+				SaslPassword:     "pass",
+			},
+			expectSASLEnabled:    true,
+			expectTLSEnabled:     false,
+			expectSCRAMGenerator: true,
+		},
+		{
+			name: "SASL_SSL with SCRAM wires up a SCRAM client generator",
+			kafkaConfig: config.KafkaConfiguration{
+				Broker:           "localhost:9092",
+				SecurityProtocol: "SASL_SSL",
+				SaslMechanisms:   sarama.SASLTypeSCRAMSHA512,
+				SaslUsername:     "user",
+				SaslPassword:     "pass",
+			},
+			expectSASLEnabled:    true,
+			expectTLSEnabled:     true,
+			expectSCRAMGenerator: true,
+		},

As per path instructions, tests should be comprehensive and cover the changes.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@openmeter/watermill/driver/kafka/broker_test.go` around lines 55 - 67, Add
the missing table-driven test cases in broker_test.go so the SCRAM coverage
includes the other untested paths, not just SASL_PLAINTEXT with
sarama.SASLTypeSCRAMSHA512. Extend the existing KafkaConfiguration scenarios to
verify SASL_SSL with SCRAM still wires up the SCRAM client generator, and add a
separate case for SASL_PLAINTEXT/SASL_SSL using sarama.SASLTypeSCRAMSHA256 so
both SCRAM branches are exercised. Keep the assertions aligned with the current
expectSASLEnabled, expectTLSEnabled, and expectSCRAMGenerator checks.

Source: Path instructions

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Nitpick comments:
In `@openmeter/watermill/driver/kafka/broker_test.go`:
- Around line 55-67: Add the missing table-driven test cases in broker_test.go
so the SCRAM coverage includes the other untested paths, not just SASL_PLAINTEXT
with sarama.SASLTypeSCRAMSHA512. Extend the existing KafkaConfiguration
scenarios to verify SASL_SSL with SCRAM still wires up the SCRAM client
generator, and add a separate case for SASL_PLAINTEXT/SASL_SSL using
sarama.SASLTypeSCRAMSHA256 so both SCRAM branches are exercised. Keep the
assertions aligned with the current expectSASLEnabled, expectTLSEnabled, and
expectSCRAMGenerator checks.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: 24ea5a65-90e3-4cff-b628-bd2c6819d57e

📥 Commits

Reviewing files that changed from the base of the PR and between ede95f6 and 0399967.

📒 Files selected for processing (2)
  • openmeter/watermill/driver/kafka/broker.go
  • openmeter/watermill/driver/kafka/broker_test.go

chitender and others added 2 commits July 2, 2026 12:39
Address PR review feedback:
- Reject SASL_SSL/SASL_PLAINTEXT configs missing mechanism or credentials at
  startup validation instead of failing later with an opaque broker connection
  error (Greptile P2).
- Add table rows for SCRAM-SHA-256 over SASL_PLAINTEXT and SCRAM over SASL_SSL
  so both SCRAM branches and the SASL_SSL+SCRAM path are exercised (CodeRabbit).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@chitender

Copy link
Copy Markdown
Author

Thanks for the reviews! Addressed both points in the latest commit:

  • Greptile (P2 — SASL config allows empty mechanism/credentials): Added startup validation in KafkaConfiguration.Validate(). When securityProtocol is SASL_SSL or SASL_PLAINTEXT, the SASL mechanism, username, and password are now required, so a misconfiguration fails fast with a clear error instead of surfacing later as an opaque broker connection failure.
  • CodeRabbit (nitpick — SCRAM test coverage): Extended the table-driven test with SCRAM-SHA-256 over SASL_PLAINTEXT and SCRAM over SASL_SSL (which also asserts TLS stays enabled), so both SCRAM branches and the SASL_SSL+SCRAM path are exercised.

Also added a TestKafkaConfigurationValidate test covering the new validation (valid plaintext, valid SASL, and missing-credential cases).

Comment thread app/config/ingest.go
// When a SASL security protocol is selected, the mechanism and credentials must be provided.
// Otherwise SASL is enabled with empty auth fields and the misconfiguration only surfaces later
// as an opaque broker connection failure instead of a clear startup validation error.
if c.SecurityProtocol == "SASL_SSL" || c.SecurityProtocol == "SASL_PLAINTEXT" {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Share SASL validation

This check only runs for the ingest KafkaConfiguration, but the sink Kafka config uses the same SASL fields through the shared Kafka config type and still does not reject empty mechanism, username, or password. When the sink is configured with SASL_SSL or SASL_PLAINTEXT and the SASL fields are left empty, startup validation can still pass and Kafka authentication fails later with the same opaque broker connection error. Please enforce this SASL-field invariant in the shared Kafka validation path used by the sink as well.

Prompt To Fix With AI
This is a comment left during a code review.
Path: app/config/ingest.go
Line: 97

Comment:
**Share SASL validation**

This check only runs for the ingest `KafkaConfiguration`, but the sink Kafka config uses the same SASL fields through the shared Kafka config type and still does not reject empty mechanism, username, or password. When the sink is configured with `SASL_SSL` or `SASL_PLAINTEXT` and the SASL fields are left empty, startup validation can still pass and Kafka authentication fails later with the same opaque broker connection error. Please enforce this SASL-field invariant in the shared Kafka validation path used by the sink as well.

How can I resolve this? If you propose a fix, please make it concise.

Fix in Claude Code Fix in Codex

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant