Skip to content

Commit 3e8e30e

Browse files
feat(kafka): expose more kafka_franz parameters
1 parent 641fabf commit 3e8e30e

3 files changed

Lines changed: 231 additions & 38 deletions

File tree

internal/impl/kafka/franz_writer.go

Lines changed: 132 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -37,11 +37,17 @@ const (
3737
// Producer fields
3838
kfwFieldPartitioner = "partitioner"
3939
kfwFieldIdempotentWrite = "idempotent_write"
40+
kfwFieldAcks = "acks"
4041
kfwFieldCompression = "compression"
4142
kfwFieldAllowAutoTopicCreation = "allow_auto_topic_creation"
42-
kfwFieldTimeout = "timeout"
43-
kfwFieldMaxMessageBytes = "max_message_bytes"
44-
kfwFieldBrokerWriteMaxBytes = "broker_write_max_bytes"
43+
kfwFieldTimeout = "timeout"
44+
kfwFieldMaxMessageBytes = "max_message_bytes"
45+
kfwFieldBrokerWriteMaxBytes = "broker_write_max_bytes"
46+
kfwFieldMaxBufferedRecords = "max_buffered_records"
47+
kfwFieldMaxBufferedBytes = "max_buffered_bytes"
48+
kfwFieldMaxInFlightRequestsPerBrkr = "max_in_flight_requests"
49+
kfwFieldRecordRetries = "record_retries"
50+
kfwFieldRecordDeliveryTimeout = "record_delivery_timeout"
4551
)
4652

4753
// FranzProducerLimitsFields returns a slice of fields specifically for
@@ -68,6 +74,42 @@ func FranzProducerLimitsFields() []*service.ConfigField {
6874
Default("100MiB").
6975
Example("128MB").
7076
Example("50mib"),
77+
service.NewIntField(kfwFieldMaxBufferedRecords).
78+
Description("The maximum number of records the client will buffer in memory before blocking. " +
79+
"When this limit is reached, `Produce()` calls will block until buffered records are delivered and space frees up. " +
80+
"Increase this value for high-throughput pipelines to avoid back-pressure stalls.").
81+
Advanced().
82+
Default(10000),
83+
service.NewStringField(kfwFieldMaxBufferedBytes).
84+
Description("The maximum number of bytes the client will buffer in memory before blocking. " +
85+
"When this limit is reached, `Produce()` calls will block until buffered records are delivered. " +
86+
"Set to `0` to disable the byte-level limit (only `max_buffered_records` applies). " +
87+
"This limit is checked after `max_buffered_records`.").
88+
Advanced().
89+
Default("0").
90+
Example("256MB").
91+
Example("50mib"),
92+
service.NewIntField(kfwFieldMaxInFlightRequestsPerBrkr).
93+
Description("The maximum number of produce requests in flight per broker connection. " +
94+
"When `idempotent_write` is enabled, this is capped at 5 by the Kafka protocol (and at 1 for Kafka < v1.0.0). " +
95+
"When `idempotent_write` is disabled, higher values improve throughput by pipelining requests but may cause out-of-order delivery.").
96+
Advanced().
97+
Default(1),
98+
service.NewIntField(kfwFieldRecordRetries).
99+
Description("The maximum number of times a record produce is retried on failure before the record is failed. " +
100+
"When a record fails, all records buffered in the same partition are also failed to preserve gapless ordering. " +
101+
"Set to `0` for unlimited retries (the default). " +
102+
"With `idempotent_write` enabled, retries are only enforced when safe to do so without creating invalid sequence numbers.").
103+
Advanced().
104+
Default(0),
105+
service.NewDurationField(kfwFieldRecordDeliveryTimeout).
106+
Description("The maximum time a record can sit in the producer buffer before it is failed, roughly equivalent to Kafka's `delivery.timeout.ms`. " +
107+
"This is evaluated before writing a request or after a produce response. " +
108+
"When a record times out, all records in the same partition are also failed. " +
109+
"Set to `0s` for no timeout (the default). " +
110+
"With `idempotent_write` enabled, timeouts are only enforced when safe to do so without creating invalid sequence numbers.").
111+
Advanced().
112+
Default("0s"),
71113
}
72114
}
73115

@@ -84,16 +126,25 @@ func FranzProducerFields() []*service.ConfigField {
84126
}).
85127
Description("Override the default murmur2 hashing partitioner.").
86128
Advanced().Optional(),
87-
service.NewBoolField(kfwFieldIdempotentWrite).
88-
Description("Enable the idempotent write producer option. " +
89-
"When enabled, the producer initializes a producer ID and uses it to guarantee exactly-once semantics per partition (no duplicates on retries). " +
90-
"This requires the `IDEMPOTENT_WRITE` permission on the `CLUSTER` resource. " +
91-
"If your cluster does not grant this permission or uses ACLs restrictively, disable this option. " +
92-
"Note: Idempotent writes are strictly a win for data integrity but may be unavailable in restricted environments " +
93-
"(e.g., some managed Kafka services, Redpanda with strict ACLs). " +
94-
"Disabling this option is safe and only affects retry behavior—duplicates may occur on producer retries, but the pipeline will continue to function normally.").
95-
Default(true).
96-
Advanced(),
129+
service.NewBoolField(kfwFieldIdempotentWrite).
130+
Description("Enable the idempotent write producer option. " +
131+
"When enabled, the producer initializes a producer ID and uses it to guarantee exactly-once semantics per partition (no duplicates on retries). " +
132+
"This requires the `IDEMPOTENT_WRITE` permission on the `CLUSTER` resource. " +
133+
"If your cluster does not grant this permission or uses ACLs restrictively, disable this option. " +
134+
"Note: Idempotent writes are strictly a win for data integrity but may be unavailable in restricted environments " +
135+
"(e.g., some managed Kafka services, Redpanda with strict ACLs). " +
136+
"Disabling this option is safe and only affects retry behavior—duplicates may occur on producer retries, but the pipeline will continue to function normally.").
137+
Default(true).
138+
Advanced(),
139+
service.NewStringAnnotatedEnumField(kfwFieldAcks, map[string]string{
140+
"all": "Wait for all in-sync replicas to acknowledge (acks=-1). Required when idempotent_write is enabled.",
141+
"none": "Do not wait for any acknowledgement (acks=0). Highest throughput but messages may be lost.",
142+
"leader": "Wait for the leader broker to acknowledge (acks=1). Messages are lost if the leader fails before replication.",
143+
}).
144+
Description("The number of acknowledgements the leader broker must receive from ISR brokers before responding to the produce request. " +
145+
"When `idempotent_write` is enabled this must be set to `all`.").
146+
Default("all").
147+
Advanced(),
97148
service.NewStringEnumField(kfwFieldCompression, "lz4", "snappy", "gzip", "none", "zstd").
98149
Description("Optionally set an explicit compression type. The default preference is to use snappy when the broker supports it, and fall back to none if not.").
99150
Optional().
@@ -144,6 +195,52 @@ func FranzProducerLimitsOptsFromConfig(conf *service.ParsedConfig) ([]kgo.Opt, e
144195
}
145196
opts = append(opts, kgo.ProduceRequestTimeout(timeout))
146197

198+
maxBufferedRecords, err := conf.FieldInt(kfwFieldMaxBufferedRecords)
199+
if err != nil {
200+
return nil, err
201+
}
202+
if maxBufferedRecords < 1 {
203+
return nil, fmt.Errorf("invalid max_buffered_records %d, must be at least 1", maxBufferedRecords)
204+
}
205+
opts = append(opts, kgo.MaxBufferedRecords(maxBufferedRecords))
206+
207+
maxBufferedBytesStr, err := conf.FieldString(kfwFieldMaxBufferedBytes)
208+
if err != nil {
209+
return nil, err
210+
}
211+
maxBufferedBytes, err := humanize.ParseBytes(maxBufferedBytesStr)
212+
if err != nil {
213+
return nil, fmt.Errorf("failed to parse max_buffered_bytes: %w", err)
214+
}
215+
if maxBufferedBytes > 0 {
216+
opts = append(opts, kgo.MaxBufferedBytes(int(maxBufferedBytes)))
217+
}
218+
219+
maxInFlightRequests, err := conf.FieldInt(kfwFieldMaxInFlightRequestsPerBrkr)
220+
if err != nil {
221+
return nil, err
222+
}
223+
if maxInFlightRequests < 1 {
224+
return nil, fmt.Errorf("invalid max_in_flight_requests %d, must be at least 1", maxInFlightRequests)
225+
}
226+
opts = append(opts, kgo.MaxProduceRequestsInflightPerBroker(maxInFlightRequests))
227+
228+
recordRetries, err := conf.FieldInt(kfwFieldRecordRetries)
229+
if err != nil {
230+
return nil, err
231+
}
232+
if recordRetries > 0 {
233+
opts = append(opts, kgo.RecordRetries(recordRetries))
234+
}
235+
236+
recordDeliveryTimeout, err := conf.FieldDuration(kfwFieldRecordDeliveryTimeout)
237+
if err != nil {
238+
return nil, err
239+
}
240+
if recordDeliveryTimeout > 0 {
241+
opts = append(opts, kgo.RecordDeliveryTimeout(recordDeliveryTimeout))
242+
}
243+
147244
return opts, nil
148245
}
149246

@@ -211,10 +308,31 @@ func FranzProducerOptsFromConfig(conf *service.ParsedConfig) ([]kgo.Opt, error)
211308
if err != nil {
212309
return nil, err
213310
}
311+
312+
acksStr, err := conf.FieldString(kfwFieldAcks)
313+
if err != nil {
314+
return nil, err
315+
}
316+
317+
if idempotentWrite && acksStr != "all" {
318+
return nil, fmt.Errorf("idempotent_write requires acks to be \"all\", got %q", acksStr)
319+
}
320+
214321
if !idempotentWrite {
215322
opts = append(opts, kgo.DisableIdempotentWrite())
216323
}
217324

325+
switch acksStr {
326+
case "all":
327+
opts = append(opts, kgo.RequiredAcks(kgo.AllISRAcks()))
328+
case "leader":
329+
opts = append(opts, kgo.RequiredAcks(kgo.LeaderAck()))
330+
case "none":
331+
opts = append(opts, kgo.RequiredAcks(kgo.NoAck()))
332+
default:
333+
return nil, fmt.Errorf("unknown acks value: %q", acksStr)
334+
}
335+
218336
allowAutoTopicCreation, err := conf.FieldBool(kfwFieldAllowAutoTopicCreation)
219337
if err != nil {
220338
return nil, err
@@ -275,6 +393,7 @@ func FranzWriterConfigLints() string {
275393
this.partitioner == "manual" && this.partition.or("") == "" => "a partition must be specified when the partitioner is set to manual"
276394
this.partitioner != "manual" && this.partition.or("") != "" => "a partition cannot be specified unless the partitioner is set to manual"
277395
this.timestamp.or("") != "" && this.timestamp_ms.or("") != "" => "both timestamp and timestamp_ms cannot be specified simultaneously"
396+
this.idempotent_write == true && this.acks.or("all") != "all" => "idempotent_write requires acks to be set to all"
278397
}`
279398
}
280399

internal/impl/kafka/output_kafka_franz_test.go

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,72 @@ kafka_franz:
6767
`,
6868
errContains: "a partition cannot be specified unless the partitioner is set to manual",
6969
},
70+
{
71+
name: "idempotent write with acks all",
72+
conf: `
73+
kafka_franz:
74+
seed_brokers: [ foo:1234 ]
75+
topic: foo
76+
idempotent_write: true
77+
acks: all
78+
`,
79+
},
80+
{
81+
name: "idempotent write with acks leader",
82+
conf: `
83+
kafka_franz:
84+
seed_brokers: [ foo:1234 ]
85+
topic: foo
86+
idempotent_write: true
87+
acks: leader
88+
`,
89+
errContains: "idempotent_write requires acks to be set to all",
90+
},
91+
{
92+
name: "idempotent write with acks none",
93+
conf: `
94+
kafka_franz:
95+
seed_brokers: [ foo:1234 ]
96+
topic: foo
97+
idempotent_write: true
98+
acks: none
99+
`,
100+
errContains: "idempotent_write requires acks to be set to all",
101+
},
102+
{
103+
name: "non-idempotent with acks leader",
104+
conf: `
105+
kafka_franz:
106+
seed_brokers: [ foo:1234 ]
107+
topic: foo
108+
idempotent_write: false
109+
acks: leader
110+
`,
111+
},
112+
{
113+
name: "non-idempotent with acks none",
114+
conf: `
115+
kafka_franz:
116+
seed_brokers: [ foo:1234 ]
117+
topic: foo
118+
idempotent_write: false
119+
acks: none
120+
`,
121+
},
122+
{
123+
name: "custom producer limits",
124+
conf: `
125+
kafka_franz:
126+
seed_brokers: [ foo:1234 ]
127+
topic: foo
128+
idempotent_write: false
129+
max_buffered_records: 50000
130+
max_buffered_bytes: "128MB"
131+
max_in_flight_requests: 5
132+
record_retries: 10
133+
record_delivery_timeout: "30s"
134+
`,
135+
},
70136
}
71137

72138
for _, test := range testCases {

internal/impl/kafka/output_redpanda.go

Lines changed: 33 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -108,39 +108,47 @@ func init() {
108108
return
109109
}
110110

111-
if connDetails.IsConfigured() {
112-
var client *kgo.Client
113-
var clientMut sync.Mutex
114-
115-
output, err = NewFranzWriterFromConfig(
116-
conf,
117-
NewFranzWriterHooks(
118-
func(ctx context.Context, fn FranzSharedClientUseFn) error {
111+
if connDetails.IsConfigured() {
112+
var client *kgo.Client
113+
var clientMut sync.RWMutex
114+
115+
output, err = NewFranzWriterFromConfig(
116+
conf,
117+
NewFranzWriterHooks(
118+
func(ctx context.Context, fn FranzSharedClientUseFn) error {
119+
clientMut.RLock()
120+
c := client
121+
clientMut.RUnlock()
122+
123+
if c == nil {
119124
clientMut.Lock()
120-
defer clientMut.Unlock()
121-
122125
if client == nil {
123126
var err error
124127
if client, err = NewFranzClient(ctx, append(connDetails.FranzOpts(), producerOpts...)...); err != nil {
128+
clientMut.Unlock()
125129
return err
126130
}
127131
}
128-
return fn(&FranzSharedClientInfo{
129-
Client: client,
130-
ConnDetails: connDetails,
131-
})
132-
}).WithYieldClientFn(
133-
func(context.Context) error {
134-
clientMut.Lock()
135-
defer clientMut.Unlock()
136-
137-
if client == nil {
138-
return nil
139-
}
140-
client.Close()
141-
client = nil
132+
c = client
133+
clientMut.Unlock()
134+
}
135+
136+
return fn(&FranzSharedClientInfo{
137+
Client: c,
138+
ConnDetails: connDetails,
139+
})
140+
}).WithYieldClientFn(
141+
func(context.Context) error {
142+
clientMut.Lock()
143+
defer clientMut.Unlock()
144+
145+
if client == nil {
142146
return nil
143-
}))
147+
}
148+
client.Close()
149+
client = nil
150+
return nil
151+
}))
144152
} else {
145153
mgr.Logger().Info("Connection fields omitted, falling back to common redpanda config.")
146154

0 commit comments

Comments
 (0)