Skip to content

Commit c915715

Browse files
kafka(franz): expose acks, max_buffered_records, max_buffered_bytes, max_in_flight_requests, record_retries, record_delivery_timeout
1 parent 641fabf commit c915715

2 files changed

Lines changed: 201 additions & 13 deletions

File tree

internal/impl/kafka/franz_writer.go

Lines changed: 135 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -37,11 +37,17 @@ const (
3737
// Producer fields
3838
kfwFieldPartitioner = "partitioner"
3939
kfwFieldIdempotentWrite = "idempotent_write"
40+
kfwFieldAcks = "acks"
4041
kfwFieldCompression = "compression"
4142
kfwFieldAllowAutoTopicCreation = "allow_auto_topic_creation"
42-
kfwFieldTimeout = "timeout"
43-
kfwFieldMaxMessageBytes = "max_message_bytes"
44-
kfwFieldBrokerWriteMaxBytes = "broker_write_max_bytes"
43+
kfwFieldTimeout = "timeout"
44+
kfwFieldMaxMessageBytes = "max_message_bytes"
45+
kfwFieldBrokerWriteMaxBytes = "broker_write_max_bytes"
46+
kfwFieldMaxBufferedRecords = "max_buffered_records"
47+
kfwFieldMaxBufferedBytes = "max_buffered_bytes"
48+
kfwFieldMaxInFlightRequestsPerBrkr = "max_in_flight_requests"
49+
kfwFieldRecordRetries = "record_retries"
50+
kfwFieldRecordDeliveryTimeout = "record_delivery_timeout"
4551
)
4652

4753
// FranzProducerLimitsFields returns a slice of fields specifically for
@@ -68,6 +74,42 @@ func FranzProducerLimitsFields() []*service.ConfigField {
6874
Default("100MiB").
6975
Example("128MB").
7076
Example("50mib"),
77+
service.NewIntField(kfwFieldMaxBufferedRecords).
78+
Description("The maximum number of records the client will buffer in memory before blocking. " +
79+
"When this limit is reached, `Produce()` calls will block until buffered records are delivered and space frees up. " +
80+
"Increase this value for high-throughput pipelines to avoid back-pressure stalls.").
81+
Advanced().
82+
Default(10000),
83+
service.NewStringField(kfwFieldMaxBufferedBytes).
84+
Description("The maximum number of bytes the client will buffer in memory before blocking. " +
85+
"When this limit is reached, `Produce()` calls will block until buffered records are delivered. " +
86+
"Set to `0` to disable the byte-level limit (only `max_buffered_records` applies). " +
87+
"This limit is checked after `max_buffered_records`.").
88+
Advanced().
89+
Default("0").
90+
Example("256MB").
91+
Example("50mib"),
92+
service.NewIntField(kfwFieldMaxInFlightRequestsPerBrkr).
93+
Description("The maximum number of produce requests in flight per broker connection. " +
94+
"When `idempotent_write` is enabled, this is capped at 5 by the Kafka protocol (and at 1 for Kafka < v1.0.0). " +
95+
"When `idempotent_write` is disabled, higher values improve throughput by pipelining requests but may cause out-of-order delivery.").
96+
Advanced().
97+
Default(1),
98+
service.NewIntField(kfwFieldRecordRetries).
99+
Description("The maximum number of times a record produce is retried on failure before the record is failed. " +
100+
"When a record fails, all records buffered in the same partition are also failed to preserve gapless ordering. " +
101+
"Set to `0` for unlimited retries (the default). " +
102+
"With `idempotent_write` enabled, retries are only enforced when safe to do so without creating invalid sequence numbers.").
103+
Advanced().
104+
Default(0),
105+
service.NewDurationField(kfwFieldRecordDeliveryTimeout).
106+
Description("The maximum time a record can sit in the producer buffer before it is failed, roughly equivalent to Kafka's `delivery.timeout.ms`. " +
107+
"This is evaluated before writing a request or after a produce response. " +
108+
"When a record times out, all records in the same partition are also failed. " +
109+
"Set to `0s` for no timeout (the default). " +
110+
"With `idempotent_write` enabled, timeouts are only enforced when safe to do so without creating invalid sequence numbers.").
111+
Advanced().
112+
Default("0s"),
71113
}
72114
}
73115

@@ -84,16 +126,25 @@ func FranzProducerFields() []*service.ConfigField {
84126
}).
85127
Description("Override the default murmur2 hashing partitioner.").
86128
Advanced().Optional(),
87-
service.NewBoolField(kfwFieldIdempotentWrite).
88-
Description("Enable the idempotent write producer option. " +
89-
"When enabled, the producer initializes a producer ID and uses it to guarantee exactly-once semantics per partition (no duplicates on retries). " +
90-
"This requires the `IDEMPOTENT_WRITE` permission on the `CLUSTER` resource. " +
91-
"If your cluster does not grant this permission or uses ACLs restrictively, disable this option. " +
92-
"Note: Idempotent writes are strictly a win for data integrity but may be unavailable in restricted environments " +
93-
"(e.g., some managed Kafka services, Redpanda with strict ACLs). " +
94-
"Disabling this option is safe and only affects retry behavior—duplicates may occur on producer retries, but the pipeline will continue to function normally.").
95-
Default(true).
96-
Advanced(),
129+
service.NewBoolField(kfwFieldIdempotentWrite).
130+
Description("Enable the idempotent write producer option. " +
131+
"When enabled, the producer initializes a producer ID and uses it to guarantee exactly-once semantics per partition (no duplicates on retries). " +
132+
"This requires the `IDEMPOTENT_WRITE` permission on the `CLUSTER` resource. " +
133+
"If your cluster does not grant this permission or uses ACLs restrictively, disable this option. " +
134+
"Note: Idempotent writes are strictly a win for data integrity but may be unavailable in restricted environments " +
135+
"(e.g., some managed Kafka services, Redpanda with strict ACLs). " +
136+
"Disabling this option is safe and only affects retry behavior—duplicates may occur on producer retries, but the pipeline will continue to function normally.").
137+
Default(true).
138+
Advanced(),
139+
service.NewStringAnnotatedEnumField(kfwFieldAcks, map[string]string{
140+
"all": "Wait for all in-sync replicas to acknowledge (acks=-1). Required when idempotent_write is enabled.",
141+
"none": "Do not wait for any acknowledgement (acks=0). Highest throughput but messages may be lost.",
142+
"leader": "Wait for the leader broker to acknowledge (acks=1). Messages are lost if the leader fails before replication.",
143+
}).
144+
Description("The number of acknowledgements the leader broker must receive from ISR brokers before responding to the produce request. " +
145+
"When `idempotent_write` is enabled this must be set to `all`.").
146+
Default("all").
147+
Advanced(),
97148
service.NewStringEnumField(kfwFieldCompression, "lz4", "snappy", "gzip", "none", "zstd").
98149
Description("Optionally set an explicit compression type. The default preference is to use snappy when the broker supports it, and fall back to none if not.").
99150
Optional().
@@ -144,6 +195,55 @@ func FranzProducerLimitsOptsFromConfig(conf *service.ParsedConfig) ([]kgo.Opt, e
144195
}
145196
opts = append(opts, kgo.ProduceRequestTimeout(timeout))
146197

198+
maxBufferedRecords, err := conf.FieldInt(kfwFieldMaxBufferedRecords)
199+
if err != nil {
200+
return nil, err
201+
}
202+
if maxBufferedRecords < 1 {
203+
return nil, fmt.Errorf("invalid max_buffered_records %d, must be at least 1", maxBufferedRecords)
204+
}
205+
opts = append(opts, kgo.MaxBufferedRecords(maxBufferedRecords))
206+
207+
maxBufferedBytesStr, err := conf.FieldString(kfwFieldMaxBufferedBytes)
208+
if err != nil {
209+
return nil, err
210+
}
211+
maxBufferedBytes, err := humanize.ParseBytes(maxBufferedBytesStr)
212+
if err != nil {
213+
return nil, fmt.Errorf("failed to parse max_buffered_bytes: %w", err)
214+
}
215+
if maxBufferedBytes > uint64(math.MaxInt) {
216+
return nil, fmt.Errorf("invalid max_buffered_bytes, must not exceed %v", math.MaxInt)
217+
}
218+
if maxBufferedBytes > 0 {
219+
opts = append(opts, kgo.MaxBufferedBytes(int(maxBufferedBytes)))
220+
}
221+
222+
maxInFlightRequests, err := conf.FieldInt(kfwFieldMaxInFlightRequestsPerBrkr)
223+
if err != nil {
224+
return nil, err
225+
}
226+
if maxInFlightRequests < 1 {
227+
return nil, fmt.Errorf("invalid max_in_flight_requests %d, must be at least 1", maxInFlightRequests)
228+
}
229+
opts = append(opts, kgo.MaxProduceRequestsInflightPerBroker(maxInFlightRequests))
230+
231+
recordRetries, err := conf.FieldInt(kfwFieldRecordRetries)
232+
if err != nil {
233+
return nil, err
234+
}
235+
if recordRetries > 0 {
236+
opts = append(opts, kgo.RecordRetries(recordRetries))
237+
}
238+
239+
recordDeliveryTimeout, err := conf.FieldDuration(kfwFieldRecordDeliveryTimeout)
240+
if err != nil {
241+
return nil, err
242+
}
243+
if recordDeliveryTimeout > 0 {
244+
opts = append(opts, kgo.RecordDeliveryTimeout(recordDeliveryTimeout))
245+
}
246+
147247
return opts, nil
148248
}
149249

@@ -211,10 +311,31 @@ func FranzProducerOptsFromConfig(conf *service.ParsedConfig) ([]kgo.Opt, error)
211311
if err != nil {
212312
return nil, err
213313
}
314+
315+
acksStr, err := conf.FieldString(kfwFieldAcks)
316+
if err != nil {
317+
return nil, err
318+
}
319+
320+
if idempotentWrite && acksStr != "all" {
321+
return nil, fmt.Errorf("idempotent_write requires acks to be \"all\", got %q", acksStr)
322+
}
323+
214324
if !idempotentWrite {
215325
opts = append(opts, kgo.DisableIdempotentWrite())
216326
}
217327

328+
switch acksStr {
329+
case "all":
330+
opts = append(opts, kgo.RequiredAcks(kgo.AllISRAcks()))
331+
case "leader":
332+
opts = append(opts, kgo.RequiredAcks(kgo.LeaderAck()))
333+
case "none":
334+
opts = append(opts, kgo.RequiredAcks(kgo.NoAck()))
335+
default:
336+
return nil, fmt.Errorf("unknown acks value: %q", acksStr)
337+
}
338+
218339
allowAutoTopicCreation, err := conf.FieldBool(kfwFieldAllowAutoTopicCreation)
219340
if err != nil {
220341
return nil, err
@@ -275,6 +396,7 @@ func FranzWriterConfigLints() string {
275396
this.partitioner == "manual" && this.partition.or("") == "" => "a partition must be specified when the partitioner is set to manual"
276397
this.partitioner != "manual" && this.partition.or("") != "" => "a partition cannot be specified unless the partitioner is set to manual"
277398
this.timestamp.or("") != "" && this.timestamp_ms.or("") != "" => "both timestamp and timestamp_ms cannot be specified simultaneously"
399+
this.idempotent_write == true && this.acks.or("all") != "all" => "idempotent_write requires acks to be set to all"
278400
}`
279401
}
280402

internal/impl/kafka/output_kafka_franz_test.go

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,72 @@ kafka_franz:
6767
`,
6868
errContains: "a partition cannot be specified unless the partitioner is set to manual",
6969
},
70+
{
71+
name: "idempotent write with acks all",
72+
conf: `
73+
kafka_franz:
74+
seed_brokers: [ foo:1234 ]
75+
topic: foo
76+
idempotent_write: true
77+
acks: all
78+
`,
79+
},
80+
{
81+
name: "idempotent write with acks leader",
82+
conf: `
83+
kafka_franz:
84+
seed_brokers: [ foo:1234 ]
85+
topic: foo
86+
idempotent_write: true
87+
acks: leader
88+
`,
89+
errContains: "idempotent_write requires acks to be set to all",
90+
},
91+
{
92+
name: "idempotent write with acks none",
93+
conf: `
94+
kafka_franz:
95+
seed_brokers: [ foo:1234 ]
96+
topic: foo
97+
idempotent_write: true
98+
acks: none
99+
`,
100+
errContains: "idempotent_write requires acks to be set to all",
101+
},
102+
{
103+
name: "non-idempotent with acks leader",
104+
conf: `
105+
kafka_franz:
106+
seed_brokers: [ foo:1234 ]
107+
topic: foo
108+
idempotent_write: false
109+
acks: leader
110+
`,
111+
},
112+
{
113+
name: "non-idempotent with acks none",
114+
conf: `
115+
kafka_franz:
116+
seed_brokers: [ foo:1234 ]
117+
topic: foo
118+
idempotent_write: false
119+
acks: none
120+
`,
121+
},
122+
{
123+
name: "custom producer limits",
124+
conf: `
125+
kafka_franz:
126+
seed_brokers: [ foo:1234 ]
127+
topic: foo
128+
idempotent_write: false
129+
max_buffered_records: 50000
130+
max_buffered_bytes: "128MB"
131+
max_in_flight_requests: 5
132+
record_retries: 10
133+
record_delivery_timeout: "30s"
134+
`,
135+
},
70136
}
71137

72138
for _, test := range testCases {

0 commit comments

Comments
 (0)