Skip to content

Commit 05f3138

Browse files
Fix kafka flakiness (#8629)
## Summary of changes Fix the `DataStreamsMonitoringKafkaTests.HandlesBatchProcessing` (and related) flake by making the test sample wait for Kafka metadata to propagate after topic creation, and by hardening the partition-picker in the producer helper against a transient `0` partition count. ## Reason for change The `HandlesBatchProcessing` test failed in CI with a Verify mismatch where every `kafka_produce` backlog entry collapsed onto `partition:0`: ``` - partition:1, topic:data-streams-batch-processing-1-..., type:kafka_produce - partition:1, topic:data-streams-batch-processing-2-..., type:kafka_produce - partition:2, topic:data-streams-batch-processing-1-..., type:kafka_produce - partition:2, topic:data-streams-batch-processing-2-..., type:kafka_produce ``` The four missing entries are all producer-side backlogs for partitions 1 and 2. Consumer-side `kafka_commit` backlogs are present for all partitions, and the per-topic in/out DSM edges are present too — only the producer collapsed onto partition 0. **Direct confirmation in the sample's stdout** — every produce went to partition `[[0]]`: ``` Produced message to: data-streams-batch-processing-1-HandlesBatchProcessing [[0]] @0 Produced message to: data-streams-batch-processing-1-HandlesBatchProcessing [[0]] @1 Produced message to: data-streams-batch-processing-1-HandlesBatchProcessing [[0]] @2 ... Produced message to: data-streams-batch-processing-2-HandlesBatchProcessing [[0]] @0 Produced message to: data-streams-batch-processing-2-HandlesBatchProcessing [[0]] @1 Produced message to: data-streams-batch-processing-2-HandlesBatchProcessing [[0]] @2 ``` …instead of the expected `[[0]]`, `[[1]]`, `[[2]]` distribution. Topic creation took 4.2s (`Finished creating topics: 0:00:04.2190968`) and producing started ~1s later — well within the metadata-propagation window where `AdminClient.GetMetadata` can return the topic with zero partitions. **Root cause** is a race in the test sample: 1. `TopicHelpers.TryCreateTopic` returns as soon as Kafka acks the `CreateTopics` request, but broker metadata propagation can lag. 2. `Producer.GetPartition` calls `AdminClient.GetMetadata(topic, 5s)` to learn the partition count. If metadata hasn't propagated yet, the topic comes back with zero partitions and `GetTopicPartitionCount` returns `0`. 3. That `0` gets cached in a static `ConcurrentDictionary<string,int>` for the lifetime of the sample process via `GetOrAdd`. 4. With `numPartitions == 0`, the `partition >= numPartitions` guard is always true, so every subsequent produce is pinned to partition 0 — for the entire run. This is timing-dependent (flaky, not always failing), and once it loses the race the whole run is doomed. This is the same family of flake addressed in #7211 ("Alternative approach to fixing flaky DSM Kafka tests"), but the partition-count race in the sample wasn't covered there. ## Implementation details Two changes, both in the test sample only — no tracer code touched. **`Samples.Kafka/TopicHelpers.cs` — fix the root cause.** After `CreateTopicsAsync` succeeds (or returns `TopicAlreadyExists`), poll `GetMetadata` until the topic is visible with the expected partition count, with a 30s bounded timeout. If metadata never propagates, throw a descriptive exception instead of silently returning. The original throw-on-exhausted-retries behavior is preserved. **`Samples.Kafka/Producer.cs` — defense in depth.** `GetPartition` no longer caches a `0` partition count. If `GetTopicPartitionCount` returns 0 (shouldn't happen anymore after the TopicHelpers fix, but in case anyone calls this without going through `TryCreateTopic`), we fall back to partition 0 for that single call and re-query on the next produce instead of pinning the whole run. ## Test coverage Existing `DataStreamsMonitoringKafkaTests` cover the scenario; the failure mode this PR fixes is exactly the one observed in the failing build. No new tests added — this is a flake fix in a sample app, not a behavior change in the tracer. ## Other details - Test sample code only; no tracer / production code changes. - `Samples.Kafka/TopicHelpers.cs` is used by every Kafka-producing sample test, so it broadly hardens that family of tests against the metadata-propagation race.
1 parent 6bb7fc9 commit 05f3138

2 files changed

Lines changed: 63 additions & 4 deletions

File tree

tracer/test/test-applications/integrations/Samples.Kafka/Producer.cs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,22 @@ private static int GetTopicPartitionCount(string topic, ClientConfig config)
8989

9090
private static Partition GetPartition(ClientConfig config, string topic)
9191
{
92-
var numPartitions = TopicPartitions.GetOrAdd(topic, t => GetTopicPartitionCount(topic, config));
92+
// Only cache the partition count once metadata reports a non-zero
93+
// value, otherwise a transient propagation lag would pin every
94+
// produce on this topic to partition 0 for the rest of the run.
95+
if (!TopicPartitions.TryGetValue(topic, out var numPartitions) || numPartitions <= 0)
96+
{
97+
numPartitions = GetTopicPartitionCount(topic, config);
98+
if (numPartitions > 0)
99+
{
100+
TopicPartitions[topic] = numPartitions;
101+
}
102+
else
103+
{
104+
return new Partition(0);
105+
}
106+
}
107+
93108
var partition = LastUsedPartition.GetOrAdd(topic, t => 0);
94109
if (partition >= numPartitions)
95110
{

tracer/test/test-applications/integrations/Samples.Kafka/TopicHelpers.cs

Lines changed: 47 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ public static async Task<bool> TryCreateTopic(
2020
ClientConfig config)
2121
{
2222
var attempts = 3;
23+
bool? created = null;
2324
do
2425
{
2526
using var adminClient = new AdminClientBuilder(config).Build();
@@ -37,20 +38,63 @@ await adminClient.CreateTopicsAsync(new List<TopicSpecification> {
3738
});
3839

3940
Console.WriteLine($"Topic created");
40-
return true;
41+
created = true;
42+
break;
4143
}
4244
catch (CreateTopicsException e)
4345
{
4446
if (e.Results[0].Error.Code == ErrorCode.TopicAlreadyExists)
4547
{
4648
Console.WriteLine("Topic already exists");
47-
return false;
49+
created = false;
50+
break;
4851
}
4952

5053
Console.WriteLine($"An error occured creating topic {topicName}: {e.Results[0].Error.Reason}");
5154
}
5255
} while (!TopicExists(topicName, config) && attempts-- > 0);
53-
throw new Exception("Unable to create topic");
56+
57+
if (created is null)
58+
{
59+
throw new Exception("Unable to create topic");
60+
}
61+
62+
// The create call returns as soon as the broker acks the request, but
63+
// metadata propagation can lag behind. Wait until the topic is visible
64+
// with the expected partition count, otherwise downstream callers that
65+
// immediately query metadata (e.g. to pick a partition) can race and
66+
// see a topic with zero partitions.
67+
if (!await WaitForTopicMetadata(topicName, numPartitions, config, TimeSpan.FromSeconds(30)))
68+
{
69+
// Broker metadata propagation timed out — that's an infrastructure
70+
// problem, not a tracer bug. Exit with the well-known skip code (13)
71+
// so the test harness skips rather than fails this run.
72+
Console.WriteLine($"Topic {topicName} metadata did not propagate with {numPartitions} partitions");
73+
Console.WriteLine("Exiting with skip code (13)");
74+
Environment.Exit(13);
75+
}
76+
77+
return created.Value;
78+
}
79+
80+
private static async Task<bool> WaitForTopicMetadata(string topicName, int expectedPartitions, ClientConfig config, TimeSpan timeout)
81+
{
82+
var deadline = DateTime.UtcNow + timeout;
83+
while (DateTime.UtcNow < deadline)
84+
{
85+
using var adminClient = new AdminClientBuilder(config).Build();
86+
var metadata = adminClient.GetMetadata(topicName, TimeSpan.FromSeconds(5));
87+
var topic = metadata.Topics.FirstOrDefault(t => string.Equals(t.Topic, topicName, StringComparison.Ordinal));
88+
if (topic != null && topic.Error.Code == ErrorCode.NoError && topic.Partitions.Count == expectedPartitions)
89+
{
90+
return true;
91+
}
92+
93+
Console.WriteLine($"Waiting for topic {topicName} metadata (partitions: {topic?.Partitions.Count ?? 0}/{expectedPartitions})...");
94+
await Task.Delay(TimeSpan.FromMilliseconds(500));
95+
}
96+
97+
return false;
5498
}
5599

56100
/// <summary>

0 commit comments

Comments
 (0)