Skip to content

Commit 7ec2d1d

Browse files
committed
Kafka performance improvements and bug fixes
1 parent 4374a65 commit 7ec2d1d

6 files changed

Lines changed: 109 additions & 45 deletions

File tree

src/Motor.Extensions.Hosting.Kafka/KafkaMessageConsumer.cs

Lines changed: 52 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
using System;
22
using System.Collections.Generic;
3-
using System.Linq;
43
using System.Text.Json;
54
using System.Threading;
65
using System.Threading.Channels;
@@ -34,6 +33,7 @@ public sealed class KafkaMessageConsumer<TData> : IMessageConsumer<TData>, IDisp
3433
private readonly IMetricFamily<ISummary>? _consumerLagSummary;
3534
private readonly ILogger<KafkaMessageConsumer<TData>> _logger;
3635
private readonly IHostApplicationLifetime _applicationLifetime;
36+
private readonly AsyncPolicy<ProcessedMessageStatus> _retryPolicy;
3737
private IConsumer<string?, byte[]>? _consumer;
3838
private readonly CancellationTokenSource _internalCts = new();
3939

@@ -69,6 +69,13 @@ CloudEventFormatter cloudEventFormatter
6969
_options.MaxConcurrentMessages
7070
);
7171
_timer = new Timer(HandleCommitTimer);
72+
73+
_retryPolicy = Policy
74+
.HandleResult<ProcessedMessageStatus>(status => status == ProcessedMessageStatus.TemporaryFailure)
75+
.WaitAndRetryAsync(
76+
_options.RetriesOnTemporaryFailure,
77+
retryAttempt => _options.RetryBasePeriod * Math.Pow(2, retryAttempt)
78+
);
7279
}
7380

7481
public Func<
@@ -160,36 +167,41 @@ private void WriteLog(LogMessage logMessage)
160167
case SyslogLevel.Alert:
161168
case SyslogLevel.Critical:
162169
_logger.LogCritical(
163-
$"{logMessage.Message} -(Facility: {{facility}}, Name: {{name}})",
170+
"{Message} -(Facility: {Facility}, Name: {Name})",
171+
logMessage.Message,
164172
logMessage.Facility,
165173
logMessage.Name
166174
);
167175
break;
168176
case SyslogLevel.Error:
169177
_logger.LogError(
170-
$"{logMessage.Message} -(Facility: {{facility}}, Name: {{name}})",
178+
"{Message} -(Facility: {Facility}, Name: {Name})",
179+
logMessage.Message,
171180
logMessage.Facility,
172181
logMessage.Name
173182
);
174183
break;
175184
case SyslogLevel.Warning:
176185
_logger.LogWarning(
177-
$"{logMessage.Message} -(Facility: {{facility}}, Name: {{name}})",
186+
"{Message} -(Facility: {Facility}, Name: {Name})",
187+
logMessage.Message,
178188
logMessage.Facility,
179189
logMessage.Name
180190
);
181191
break;
182192
case SyslogLevel.Notice:
183193
case SyslogLevel.Info:
184194
_logger.LogInformation(
185-
$"{logMessage.Message} -(Facility: {{facility}}, Name: {{name}})",
195+
"{Message} -(Facility: {Facility}, Name: {Name})",
196+
logMessage.Message,
186197
logMessage.Facility,
187198
logMessage.Name
188199
);
189200
break;
190201
case SyslogLevel.Debug:
191202
_logger.LogDebug(
192-
$"{logMessage.Message} -(Facility: {{facility}}, Name: {{name}})",
203+
"{Message} -(Facility: {Facility}, Name: {Name})",
204+
logMessage.Message,
193205
logMessage.Facility,
194206
logMessage.Name
195207
);
@@ -201,26 +213,37 @@ private void WriteLog(LogMessage logMessage)
201213

202214
private void WriteStatistics(string json)
203215
{
204-
var partitionConsumerLags = JsonSerializer
205-
.Deserialize<KafkaStatistics>(json)
206-
?.Topics?.Select(t => t.Value)
207-
.SelectMany(t => t.Partitions ?? new Dictionary<string, KafkaStatisticsPartition>())
208-
.Select(t => (Parition: t.Key.ToString(), t.Value.ConsumerLag));
209-
if (partitionConsumerLags is null)
216+
using var document = JsonDocument.Parse(json);
217+
var root = document.RootElement;
218+
219+
if (!root.TryGetProperty("topics", out var topics))
210220
{
211221
return;
212222
}
213223

214-
foreach (var (partition, consumerLag) in partitionConsumerLags)
224+
foreach (var topic in topics.EnumerateObject())
215225
{
216-
var lag = consumerLag;
217-
if (lag == -1)
226+
if (!topic.Value.TryGetProperty("partitions", out var partitions))
218227
{
219-
lag = 0;
228+
continue;
220229
}
221230

222-
_consumerLagSummary?.WithLabels(_options.Topic, partition)?.Observe(lag);
223-
_consumerLagGauge?.WithLabels(_options.Topic, partition)?.Set(lag);
231+
foreach (var partition in partitions.EnumerateObject())
232+
{
233+
if (!partition.Value.TryGetProperty("consumer_lag", out var consumerLagElement))
234+
{
235+
continue;
236+
}
237+
238+
var lag = consumerLagElement.GetInt64();
239+
if (lag == -1)
240+
{
241+
lag = 0;
242+
}
243+
244+
_consumerLagSummary?.WithLabels(_options.Topic, partition.Name)?.Observe(lag);
245+
_consumerLagGauge?.WithLabels(_options.Topic, partition.Name)?.Set(lag);
246+
}
224247
}
225248
}
226249

@@ -241,13 +264,7 @@ CancellationToken token
241264
);
242265
var cloudEvent = KafkaMessageToCloudEvent(msg.Message);
243266

244-
var retryPolicy = Policy
245-
.HandleResult<ProcessedMessageStatus>(status => status == ProcessedMessageStatus.TemporaryFailure)
246-
.WaitAndRetryAsync(
247-
_options.RetriesOnTemporaryFailure,
248-
retryAttempt => _options.RetryBasePeriod * Math.Pow(2, retryAttempt)
249-
);
250-
var status = await retryPolicy.ExecuteAsync(
267+
var status = await _retryPolicy.ExecuteAsync(
251268
(cancellationToken) => ConsumeCallbackAsync!.Invoke(cloudEvent, cancellationToken),
252269
token
253270
);
@@ -272,6 +289,7 @@ CancellationToken token
272289
private readonly Timer _timer;
273290
private readonly object _commitLock = new();
274291
private bool _pendingCommit;
292+
private int _messagesSinceLastCommit;
275293

276294
private async Task ExecuteCommitLoopAsync(CancellationToken cancellationToken)
277295
{
@@ -281,7 +299,7 @@ private async Task ExecuteCommitLoopAsync(CancellationToken cancellationToken)
281299
{
282300
try
283301
{
284-
var result = await PeekAndAwaitProcessedMessages(cancellationToken);
302+
var result = await ReadAndAwaitProcessedMessage(cancellationToken);
285303

286304
if (IsIrrecoverableFailure(result.ProcessedMessageStatus))
287305
{
@@ -290,16 +308,16 @@ private async Task ExecuteCommitLoopAsync(CancellationToken cancellationToken)
290308
break;
291309
}
292310

293-
// Remove message from channel, when Task is successfully completed
294-
await _processedMessages.Reader.ReadAsync(cancellationToken);
295-
296311
lock (_commitLock)
297312
{
298313
_consumer?.StoreOffset(result.ConsumeResult);
299314
_pendingCommit = true;
315+
_messagesSinceLastCommit++;
300316
}
301317

302-
if ((result.ConsumeResult.Offset.Value + 1) % _options.CommitPeriod == 0)
318+
// Use message count since last commit instead of offset-based check.
319+
// This works correctly across multiple partitions with non-contiguous offsets.
320+
if (_messagesSinceLastCommit >= _options.CommitPeriod)
303321
{
304322
Commit();
305323
RestartCommitTimer();
@@ -314,17 +332,11 @@ private async Task ExecuteCommitLoopAsync(CancellationToken cancellationToken)
314332
StopCommitTimer();
315333
}
316334

317-
private async Task<ConsumeResultAndProcessedMessageStatus> PeekAndAwaitProcessedMessages(
335+
private async Task<ConsumeResultAndProcessedMessageStatus> ReadAndAwaitProcessedMessage(
318336
CancellationToken cancellationToken
319337
)
320338
{
321-
await _processedMessages.Reader.WaitToReadAsync(cancellationToken);
322-
323-
if (!_processedMessages.Reader.TryPeek(out var consumeAndProcessTask))
324-
{
325-
throw new InvalidOperationException("Awaited channel data has been removed by another consumer");
326-
}
327-
339+
var consumeAndProcessTask = await _processedMessages.Reader.ReadAsync(cancellationToken);
328340
return await consumeAndProcessTask;
329341
}
330342

@@ -338,6 +350,7 @@ private void Commit()
338350
}
339351

340352
_pendingCommit = false;
353+
_messagesSinceLastCommit = 0;
341354
try
342355
{
343356
_consumer?.Commit();
@@ -386,7 +399,7 @@ private bool IsIrrecoverableFailure(ProcessedMessageStatus status)
386399
default:
387400
_logger.LogCritical(
388401
LogEvents.UnknownProcessedMessageStatus,
389-
"Unknown processed message status {status}",
402+
"Unknown processed message status {Status}",
390403
status
391404
);
392405
return true;

src/Motor.Extensions.Hosting.Kafka/KafkaMessagePublisher.cs

Lines changed: 42 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
using CloudNative.CloudEvents.Extensions;
66
using CloudNative.CloudEvents.Kafka;
77
using Confluent.Kafka;
8+
using Microsoft.Extensions.Logging;
89
using Microsoft.Extensions.Options;
910
using Motor.Extensions.Hosting.Abstractions;
1011
using Motor.Extensions.Hosting.CloudEvents;
@@ -19,24 +20,54 @@ public class KafkaMessagePublisher<TOutput> : IRawMessagePublisher<TOutput>, IDi
1920
private readonly IProducer<string?, byte[]> _producer;
2021
private readonly KafkaPublisherOptions<TOutput> _options;
2122
private readonly PublisherOptions _publisherOptions;
23+
private readonly ILogger<KafkaMessagePublisher<TOutput>> _logger;
2224

2325
public KafkaMessagePublisher(
2426
IOptions<KafkaPublisherOptions<TOutput>> options,
2527
CloudEventFormatter cloudEventFormatter,
26-
IOptions<PublisherOptions> publisherOptions
28+
IOptions<PublisherOptions> publisherOptions,
29+
ILogger<KafkaMessagePublisher<TOutput>> logger
2730
)
2831
{
2932
_cloudEventFormatter = cloudEventFormatter;
3033
_options = options.Value ?? throw new ArgumentNullException(nameof(options));
3134
_publisherOptions = publisherOptions.Value ?? throw new ArgumentNullException(nameof(publisherOptions));
35+
_logger = logger;
3236
_producer = new ProducerBuilder<string?, byte[]>(_options).Build();
3337
}
3438

35-
public async Task PublishMessageAsync(MotorCloudEvent<byte[]> motorCloudEvent, CancellationToken token = default)
39+
public Task PublishMessageAsync(MotorCloudEvent<byte[]> motorCloudEvent, CancellationToken token = default)
3640
{
3741
var topic = motorCloudEvent.GetKafkaTopic() ?? _options.Topic;
3842
var message = CloudEventToKafkaMessage(motorCloudEvent);
39-
await _producer.ProduceAsync(topic, message, token);
43+
44+
// Use Produce with a TaskCompletionSource for pipelining instead of
45+
// awaiting ProduceAsync per message. This allows librdkafka to batch
46+
// multiple messages into a single broker request, significantly
47+
// improving throughput.
48+
var tcs = new TaskCompletionSource<DeliveryResult<string?, byte[]>>(TaskCreationOptions.RunContinuationsAsynchronously);
49+
50+
try
51+
{
52+
_producer.Produce(topic, message, deliveryReport =>
53+
{
54+
if (deliveryReport.Error.IsError)
55+
{
56+
tcs.SetException(new ProduceException<string?, byte[]>(
57+
deliveryReport.Error, deliveryReport));
58+
}
59+
else
60+
{
61+
tcs.SetResult(deliveryReport);
62+
}
63+
});
64+
}
65+
catch (ProduceException<string?, byte[]> ex)
66+
{
67+
tcs.SetException(ex);
68+
}
69+
70+
return tcs.Task;
4071
}
4172

4273
public Message<string?, byte[]> CloudEventToKafkaMessage(MotorCloudEvent<byte[]> motorCloudEvent)
@@ -57,6 +88,14 @@ public async Task PublishMessageAsync(MotorCloudEvent<byte[]> motorCloudEvent, C
5788

5889
public void Dispose()
5990
{
91+
try
92+
{
93+
_producer.Flush(TimeSpan.FromSeconds(10));
94+
}
95+
catch (Exception e)
96+
{
97+
_logger.LogWarning(e, "Error flushing producer during dispose");
98+
}
6099
_producer.Dispose();
61100
}
62101
}

src/Motor.Extensions.Hosting.Kafka/LogEvents.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ public static class LogEvents
1414
public static readonly EventId UnknownProcessedMessageStatus = new(7, nameof(UnknownProcessedMessageStatus));
1515

1616
public static readonly EventId MessageHandlingUnexpectedException = new(
17-
7,
17+
8,
1818
nameof(MessageHandlingUnexpectedException)
1919
);
2020
}

src/Motor.Extensions.Hosting.Kafka/Options/KafkaPublisherOptions.cs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,5 +4,14 @@ namespace Motor.Extensions.Hosting.Kafka.Options;
44

55
public class KafkaPublisherOptions<T> : ProducerConfig
66
{
7+
public KafkaPublisherOptions()
8+
{
9+
// Allow librdkafka to batch messages for better throughput.
10+
// LingerMs controls how long to wait for additional messages before sending a batch.
11+
LingerMs ??= 5;
12+
// Enable Snappy compression for better network throughput with low CPU overhead.
13+
CompressionType ??= Confluent.Kafka.CompressionType.Snappy;
14+
}
15+
716
public string? Topic { get; set; }
817
}

test/Motor.Extensions.Hosting.Kafka_IntegrationTest/KafkaExtensionTests.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
using CloudNative.CloudEvents.SystemTextJson;
99
using Confluent.Kafka;
1010
using Microsoft.Extensions.Hosting;
11+
using Microsoft.Extensions.Logging;
1112
using Microsoft.Extensions.Options;
1213
using Moq;
1314
using Motor.Extensions.Hosting.Abstractions;
@@ -519,7 +520,8 @@ private KafkaMessagePublisher<T> GetPublisher<T>(string topic)
519520
{
520521
var options = Options.Create(GetPublisherConfig<T>(topic));
521522
var publisherOptions = Options.Create(new PublisherOptions());
522-
return new KafkaMessagePublisher<T>(options, new JsonEventFormatter(), publisherOptions);
523+
return new KafkaMessagePublisher<T>(options, new JsonEventFormatter(), publisherOptions,
524+
Mock.Of<ILogger<KafkaMessagePublisher<T>>>());
523525
}
524526

525527
private KafkaPublisherOptions<T> GetPublisherConfig<T>(string topic)

test/Motor.Extensions.Hosting.Kafka_UnitTest/KafkaMessageTests.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,8 @@ private static KafkaMessagePublisher<TData> GetKafkaPublisher<TData>(
9696
return new KafkaMessagePublisher<TData>(
9797
Options.Create(options),
9898
new JsonEventFormatter(),
99-
Options.Create(new PublisherOptions { CloudEventFormat = format })
99+
Options.Create(new PublisherOptions { CloudEventFormat = format }),
100+
Mock.Of<ILogger<KafkaMessagePublisher<TData>>>()
100101
);
101102
}
102103

0 commit comments

Comments
 (0)