Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,22 @@ private static int GetTopicPartitionCount(string topic, ClientConfig config)

private static Partition GetPartition(ClientConfig config, string topic)
{
var numPartitions = TopicPartitions.GetOrAdd(topic, t => GetTopicPartitionCount(topic, config));
// Only cache the partition count once metadata reports a non-zero
// value, otherwise a transient propagation lag would pin every
// produce on this topic to partition 0 for the rest of the run.
if (!TopicPartitions.TryGetValue(topic, out var numPartitions) || numPartitions <= 0)
{
numPartitions = GetTopicPartitionCount(topic, config);
if (numPartitions > 0)
{
TopicPartitions[topic] = numPartitions;
}
else
{
return new Partition(0);
}
}

var partition = LastUsedPartition.GetOrAdd(topic, t => 0);
if (partition >= numPartitions)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ public static async Task<bool> TryCreateTopic(
ClientConfig config)
{
var attempts = 3;
bool? created = null;
do
{
using var adminClient = new AdminClientBuilder(config).Build();
Expand All @@ -37,20 +38,63 @@ await adminClient.CreateTopicsAsync(new List<TopicSpecification> {
});

Console.WriteLine($"Topic created");
return true;
created = true;
break;
}
catch (CreateTopicsException e)
{
if (e.Results[0].Error.Code == ErrorCode.TopicAlreadyExists)
{
Console.WriteLine("Topic already exists");
return false;
created = false;
break;
}

Console.WriteLine($"An error occured creating topic {topicName}: {e.Results[0].Error.Reason}");
}
} while (!TopicExists(topicName, config) && attempts-- > 0);
throw new Exception("Unable to create topic");

if (created is null)
{
throw new Exception("Unable to create topic");
}

// The create call returns as soon as the broker acks the request, but
// metadata propagation can lag behind. Wait until the topic is visible
// with the expected partition count, otherwise downstream callers that
// immediately query metadata (e.g. to pick a partition) can race and
// see a topic with zero partitions.
if (!await WaitForTopicMetadata(topicName, numPartitions, config, TimeSpan.FromSeconds(30)))
{
// Broker metadata propagation timed out — that's an infrastructure
// problem, not a tracer bug. Exit with the well-known skip code (13)
// so the test harness skips rather than fails this run.
Console.WriteLine($"Topic {topicName} metadata did not propagate with {numPartitions} partitions");
Console.WriteLine("Exiting with skip code (13)");
Environment.Exit(13);
}

return created.Value;
}

private static async Task<bool> WaitForTopicMetadata(string topicName, int expectedPartitions, ClientConfig config, TimeSpan timeout)
{
var deadline = DateTime.UtcNow + timeout;
while (DateTime.UtcNow < deadline)
{
using var adminClient = new AdminClientBuilder(config).Build();
var metadata = adminClient.GetMetadata(topicName, TimeSpan.FromSeconds(5));
var topic = metadata.Topics.FirstOrDefault(t => string.Equals(t.Topic, topicName, StringComparison.Ordinal));
if (topic != null && topic.Error.Code == ErrorCode.NoError && topic.Partitions.Count == expectedPartitions)
{
return true;
}

Console.WriteLine($"Waiting for topic {topicName} metadata (partitions: {topic?.Partitions.Count ?? 0}/{expectedPartitions})...");
await Task.Delay(TimeSpan.FromMilliseconds(500));
}

return false;
}

/// <summary>
Expand Down
Loading