fix(kafka): support SASL_PLAINTEXT in watermill publisher#4633
fix(kafka): support SASL_PLAINTEXT in watermill publisher#4633chitender wants to merge 3 commits into
Conversation
The Sarama-based event publisher only enabled SASL when securityProtocol was SASL_SSL, coupling SASL auth with TLS. This made SASL_PLAINTEXT brokers fail with "client has run out of available brokers to talk to: EOF" since the client opened an unauthenticated connection that brokers rejected. Enable SASL for both SASL_SSL and SASL_PLAINTEXT, gating TLS setup behind SASL_SSL only. This matches the confluent-kafka-go path used by the sink. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
|
No actionable comments were generated in the recent review. 🎉 ℹ️ Recent review info⚙️ Run configurationConfiguration used: Path: .coderabbit.yaml Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (3)
🚧 Files skipped from review as they are similar to previous changes (1)
📝 WalkthroughWalkthroughKafka config validation now requires SASL fields for SASL protocols, and broker config generation now supports SASL_PLAINTEXT as well as SASL_SSL. Tests were added for both validation rules and Sarama config wiring. ChangesKafka SASL config handling
Estimated code review effort: 2 (Simple) | ~15 minutes 🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
Greptile SummaryThis PR updates Kafka SASL handling for the Watermill publisher. The main changes are:
Confidence Score: 4/5This is close, but one validation gap should be fixed before merging.
Shared Kafka config validation used by the sink path. Important Files Changed
Prompt To Fix All With AIFix the following 1 code review issue. Work through them one at a time, proposing concise fixes.
---
### Issue 1 of 1
app/config/ingest.go:97
**Share SASL validation**
This check only runs for the ingest `KafkaConfiguration`, but the sink Kafka config uses the same SASL fields through the shared Kafka config type and still does not reject empty mechanism, username, or password. When the sink is configured with `SASL_SSL` or `SASL_PLAINTEXT` and the SASL fields are left empty, startup validation can still pass and Kafka authentication fails later with the same opaque broker connection error. Please enforce this SASL-field invariant in the shared Kafka validation path used by the sink as well.
Reviews (3): Last reviewed commit: "fix(kafka): validate SASL credentials an..." | Re-trigger Greptile |
| switch o.KafkaConfig.SecurityProtocol { | ||
| case "SASL_SSL", "SASL_PLAINTEXT": |
There was a problem hiding this comment.
SASL Config Allows Empty Mechanism
When securityProtocol is SASL_PLAINTEXT but saslMechanisms or credentials are left at their default empty values, this new branch enables Sarama SASL with empty auth fields. That config passes the existing validation and only fails later during broker connection, so a bad SASL_PLAINTEXT config reports as a runtime Kafka auth/connection failure instead of a clear startup validation error.
Prompt To Fix With AI
This is a comment left during a code review.
Path: openmeter/watermill/driver/kafka/broker.go
Line: 87-88
Comment:
**SASL Config Allows Empty Mechanism**
When `securityProtocol` is `SASL_PLAINTEXT` but `saslMechanisms` or credentials are left at their default empty values, this new branch enables Sarama SASL with empty auth fields. That config passes the existing validation and only fails later during broker connection, so a bad SASL_PLAINTEXT config reports as a runtime Kafka auth/connection failure instead of a clear startup validation error.
How can I resolve this? If you propose a fix, please make it concise.There was a problem hiding this comment.
🧹 Nitpick comments (1)
openmeter/watermill/driver/kafka/broker_test.go (1)
55-67: 📐 Maintainability & Code Quality | 🔵 Trivial | ⚡ Quick winAdd rows for the untested SCRAM paths.
Nice table setup. One small gap: this only exercises SCRAM-SHA-512 over
SASL_PLAINTEXT; a broken SCRAM-SHA-256 branch, or a regression in existingSASL_SSL+ SCRAM behavior, would still pass.Suggested extra cases
{ name: "SASL_PLAINTEXT with SCRAM wires up a SCRAM client generator", kafkaConfig: config.KafkaConfiguration{ Broker: "localhost:9092", SecurityProtocol: "SASL_PLAINTEXT", SaslMechanisms: sarama.SASLTypeSCRAMSHA512, SaslUsername: "user", SaslPassword: "pass", }, expectSASLEnabled: true, expectTLSEnabled: false, expectSCRAMGenerator: true, }, + { + name: "SASL_PLAINTEXT with SCRAM-SHA-256 wires up a SCRAM client generator", + kafkaConfig: config.KafkaConfiguration{ + Broker: "localhost:9092", + SecurityProtocol: "SASL_PLAINTEXT", + SaslMechanisms: sarama.SASLTypeSCRAMSHA256, + SaslUsername: "user", + SaslPassword: "pass", + }, + expectSASLEnabled: true, + expectTLSEnabled: false, + expectSCRAMGenerator: true, + }, + { + name: "SASL_SSL with SCRAM wires up a SCRAM client generator", + kafkaConfig: config.KafkaConfiguration{ + Broker: "localhost:9092", + SecurityProtocol: "SASL_SSL", + SaslMechanisms: sarama.SASLTypeSCRAMSHA512, + SaslUsername: "user", + SaslPassword: "pass", + }, + expectSASLEnabled: true, + expectTLSEnabled: true, + expectSCRAMGenerator: true, + },As per path instructions, tests should be comprehensive and cover the changes.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@openmeter/watermill/driver/kafka/broker_test.go` around lines 55 - 67, Add the missing table-driven test cases in broker_test.go so the SCRAM coverage includes the other untested paths, not just SASL_PLAINTEXT with sarama.SASLTypeSCRAMSHA512. Extend the existing KafkaConfiguration scenarios to verify SASL_SSL with SCRAM still wires up the SCRAM client generator, and add a separate case for SASL_PLAINTEXT/SASL_SSL using sarama.SASLTypeSCRAMSHA256 so both SCRAM branches are exercised. Keep the assertions aligned with the current expectSASLEnabled, expectTLSEnabled, and expectSCRAMGenerator checks.Source: Path instructions
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Nitpick comments:
In `@openmeter/watermill/driver/kafka/broker_test.go`:
- Around line 55-67: Add the missing table-driven test cases in broker_test.go
so the SCRAM coverage includes the other untested paths, not just SASL_PLAINTEXT
with sarama.SASLTypeSCRAMSHA512. Extend the existing KafkaConfiguration
scenarios to verify SASL_SSL with SCRAM still wires up the SCRAM client
generator, and add a separate case for SASL_PLAINTEXT/SASL_SSL using
sarama.SASLTypeSCRAMSHA256 so both SCRAM branches are exercised. Keep the
assertions aligned with the current expectSASLEnabled, expectTLSEnabled, and
expectSCRAMGenerator checks.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 24ea5a65-90e3-4cff-b628-bd2c6819d57e
📒 Files selected for processing (2)
openmeter/watermill/driver/kafka/broker.goopenmeter/watermill/driver/kafka/broker_test.go
Address PR review feedback: - Reject SASL_SSL/SASL_PLAINTEXT configs missing mechanism or credentials at startup validation instead of failing later with an opaque broker connection error (Greptile P2). - Add table rows for SCRAM-SHA-256 over SASL_PLAINTEXT and SCRAM over SASL_SSL so both SCRAM branches and the SASL_SSL+SCRAM path are exercised (CodeRabbit). Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
|
Thanks for the reviews! Addressed both points in the latest commit:
Also added a |
| // When a SASL security protocol is selected, the mechanism and credentials must be provided. | ||
| // Otherwise SASL is enabled with empty auth fields and the misconfiguration only surfaces later | ||
| // as an opaque broker connection failure instead of a clear startup validation error. | ||
| if c.SecurityProtocol == "SASL_SSL" || c.SecurityProtocol == "SASL_PLAINTEXT" { |
There was a problem hiding this comment.
This check only runs for the ingest KafkaConfiguration, but the sink Kafka config uses the same SASL fields through the shared Kafka config type and still does not reject empty mechanism, username, or password. When the sink is configured with SASL_SSL or SASL_PLAINTEXT and the SASL fields are left empty, startup validation can still pass and Kafka authentication fails later with the same opaque broker connection error. Please enforce this SASL-field invariant in the shared Kafka validation path used by the sink as well.
Prompt To Fix With AI
This is a comment left during a code review.
Path: app/config/ingest.go
Line: 97
Comment:
**Share SASL validation**
This check only runs for the ingest `KafkaConfiguration`, but the sink Kafka config uses the same SASL fields through the shared Kafka config type and still does not reject empty mechanism, username, or password. When the sink is configured with `SASL_SSL` or `SASL_PLAINTEXT` and the SASL fields are left empty, startup validation can still pass and Kafka authentication fails later with the same opaque broker connection error. Please enforce this SASL-field invariant in the shared Kafka validation path used by the sink as well.
How can I resolve this? If you propose a fix, please make it concise.
The Sarama-based event publisher only enabled SASL when securityProtocol was SASL_SSL, coupling SASL auth with TLS. This made SASL_PLAINTEXT brokers fail with "client has run out of available brokers to talk to: EOF" since the client opened an unauthenticated connection that brokers rejected.
Enable SASL for both SASL_SSL and SASL_PLAINTEXT, gating TLS setup behind SASL_SSL only. This matches the confluent-kafka-go path used by the sink.
Notes for reviewer
▎ Added a table-driven test for createKafkaConfig covering PLAINTEXT, SASL_PLAINTEXT, SASL_SSL, and SASL_PLAINTEXT+SCRAM. Verified in production: OpenMeter pods connect successfully to SASL_PLAINTEXT + PLAIN brokers with this change.
Summary by CodeRabbit
New Features
Bug Fixes
Tests