Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
ee30522
fix: resolve producer lookup failure when posting Reply messages
Jonny-Freemarket Apr 21, 2026
7fe500f
test: cover bulk-dispatch + null-publication-topic for reply producer…
Jonny-Freemarket Apr 21, 2026
1f64572
refactor: address PR #4070 review feedback
Jonny-Freemarket Apr 21, 2026
839c460
refactor: strip producer-topic bag entry + address second review pass
Jonny-Freemarket Apr 21, 2026
7c47c08
refactor: extract strip helper overload to keep BulkDispatchAsync com…
Jonny-Freemarket Apr 21, 2026
ae70780
refactor: tighten review feedback — batch invariant assert, doc trims…
Jonny-Freemarket Apr 21, 2026
8176c88
fix: remove uncommitted sample projects from Brighter.slnx
Jonny-Freemarket Apr 21, 2026
06a7a6b
refactor: group bulk dispatch by (wire, producer) topic pair
Jonny-Freemarket Apr 22, 2026
20c1c36
refactor: address claude-bot review — stale comment, span key collisi…
Jonny-Freemarket Apr 22, 2026
da71167
fix: restore producer topic on dispatch failure and use collision-fre…
DevJonny Apr 24, 2026
9bfd398
docs: clarify GetProducerLookupTopic comment for reply vs publication…
DevJonny Apr 25, 2026
66fc585
refactor: move producer-topic stripping from mediator to transports
DevJonny Apr 25, 2026
ed04c52
refactor: skip MessageHeader.LocalHeaderNames in remaining transports
DevJonny Apr 25, 2026
736612c
refactor: protect LocalHeaderNames behind IsLocalHeader / RegisterLoc…
DevJonny Apr 25, 2026
f16577f
refactor: drop unused MessageHeader.StripLocalHeaders instance method
DevJonny Apr 25, 2026
b4406b9
test: address review feedback — drop Fragile trait, document converte…
DevJonny Apr 25, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,9 @@ private static ServiceBusMessage ConvertToServiceBusMessage(Message message)
message.Header.HandledCount);
azureServiceBusMessage.ApplicationProperties.Add(ASBConstants.ReplyToHeaderBagKey, message.Header.ReplyTo);

foreach (var header in message.Header.Bag.Where(h => !ASBConstants.ReservedHeaders.Contains(h.Key)))
foreach (var header in message.Header.Bag.Where(h =>
!ASBConstants.ReservedHeaders.Contains(h.Key)
&& !MessageHeader.IsLocalHeader(h.Key)))
{
azureServiceBusMessage.ApplicationProperties.Add(header.Key, header.Value);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ private Dictionary<string, MessageAttributeValue> BuildMessageAttributes(Message

message.Header.Bag[HeaderNames.HandledCount] = message.Header.HandledCount.ToString(CultureInfo.InvariantCulture);

var bagJson = JsonSerializer.Serialize(message.Header.Bag, JsonSerialisationOptions.Options);
var bagJson = JsonSerializer.Serialize(message.Header.BagWithoutLocalHeaders(), JsonSerialisationOptions.Options);
messageAttributes[HeaderNames.Bag] = new MessageAttributeValue { StringValue = Convert.ToString(bagJson), DataType = "String" };

return messageAttributes;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ private void SetMessageAttributes(SendMessageRequest request, Message message)

message.Header.Bag[HeaderNames.HandledCount] = message.Header.HandledCount.ToString(CultureInfo.InvariantCulture);

var bagJson = System.Text.Json.JsonSerializer.Serialize(message.Header.Bag, JsonSerialisationOptions.Options);
var bagJson = System.Text.Json.JsonSerializer.Serialize(message.Header.BagWithoutLocalHeaders(), JsonSerialisationOptions.Options);
messageAttributes[HeaderNames.Bag] = new() { StringValue = bagJson, DataType = "String" };
request.MessageAttributes = messageAttributes;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ private Dictionary<string, MessageAttributeValue> BuildMessageAttributes(Message

message.Header.Bag[HeaderNames.HandledCount] = message.Header.HandledCount.ToString(CultureInfo.InvariantCulture);

var bagJson = JsonSerializer.Serialize(message.Header.Bag, JsonSerialisationOptions.Options);
var bagJson = JsonSerializer.Serialize(message.Header.BagWithoutLocalHeaders(), JsonSerialisationOptions.Options);
messageAttributes[HeaderNames.Bag] = new MessageAttributeValue { StringValue = Convert.ToString(bagJson), DataType = "String" };

return messageAttributes;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ private void SetMessageAttributes(SendMessageRequest request, Message message)

message.Header.Bag[HeaderNames.HandledCount] = message.Header.HandledCount.ToString(CultureInfo.InvariantCulture);

var bagJson = System.Text.Json.JsonSerializer.Serialize(message.Header.Bag, JsonSerialisationOptions.Options);
var bagJson = System.Text.Json.JsonSerializer.Serialize(message.Header.BagWithoutLocalHeaders(), JsonSerialisationOptions.Options);
messageAttributes[HeaderNames.Bag] = new() { StringValue = bagJson, DataType = "String" };
request.MessageAttributes = messageAttributes;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,9 @@ private static void AddBrighterHeaders(Message message, ServiceBusMessage azureS
if (message.Header.Bag.TryGetValue(ASBConstants.SessionIdKey, out object? value))
azureServiceBusMessage.SessionId = value.ToString();

foreach (var header in message.Header.Bag.Where(h => !ASBConstants.ReservedHeaders.Contains(h.Key)))
foreach (var header in message.Header.Bag.Where(h =>
!ASBConstants.ReservedHeaders.Contains(h.Key)
&& !MessageHeader.IsLocalHeader(h.Key)))
{
azureServiceBusMessage.ApplicationProperties[header.Key] = header.Value;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ private static void AddHeaders(MapField<string, string> headers, Message message

message.Header.Bag.Each(header =>
{
if (!headers.ContainsKey(header.Key))
if (!headers.ContainsKey(header.Key) && !MessageHeader.IsLocalHeader(header.Key))
{
headers.Add(header.Key, header.Value.ToString()!);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,8 @@ private void AddCLoudEventsOptionalHeaders(Headers headers, Message message)
private void AddUserDefinedBagHeaders(Headers headers, Message message)
{
message.Header.Bag
.Where(x => !BrighterDefinedHeaders.HeadersToReset.Contains(x.Key))
.Where(x => !BrighterDefinedHeaders.HeadersToReset.Contains(x.Key)
&& !MessageHeader.IsLocalHeader(x.Key))
.Each(header => AddUserDefinedBagHeader(headers, header.Key, header.Value));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ private static void AddUserDefinedHeaders(Message message, Dictionary<string, ob
{
message.Header.Bag.Each(header =>
{
if (!_headersToReset.Contains(header.Key))
if (!_headersToReset.Contains(header.Key) && !MessageHeader.IsLocalHeader(header.Key))
{
headers[header.Key] = header.Value;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ private static void AddUserDefinedHeaders(Message message, Dictionary<string, ob
{
message.Header.Bag.Each(header =>
{
if (!_headersToReset.Contains(header.Key))
if (!_headersToReset.Contains(header.Key) && !MessageHeader.IsLocalHeader(header.Key))
{
headers[header.Key] = header.Value;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ private static void WriteHandledCount(MessageHeader messageHeader, Dictionary<st

private static void WriteMessageBag(MessageHeader messageHeader, Dictionary<string, string> headers)
{
var flatBag = JsonSerializer.Serialize(messageHeader.Bag, JsonSerialisationOptions.Options);
var flatBag = JsonSerializer.Serialize(messageHeader.BagWithoutLocalHeaders(), JsonSerialisationOptions.Options);
headers.Add(HeaderNames.BAG, flatBag);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,9 @@ private async Task SendWithDelayAsync(Message message, TimeSpan? delay, bool use
}

foreach (var (key, val) in message.Header.Bag
.Where(x => x.Key != HeaderNames.Keys && x.Key != HeaderNames.Tag))
.Where(x => x.Key != HeaderNames.Keys
&& x.Key != HeaderNames.Tag
&& !MessageHeader.IsLocalHeader(x.Key)))
Comment on lines +149 to +151
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❌ Getting worse: Complex Method
SendWithDelayAsync increases in cyclomatic complexity from 28 to 29, threshold = 9

Suppress

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Acknowledged. The +1 comes from adding && !MessageHeader.IsLocalHeader(x.Key) to the existing bag-filter Where in SendWithDelayAsync — the smallest change that keeps the local header off the wire on the RocketMQ producer.

SendWithDelayAsync was already at cyclomatic complexity 28 (threshold 9) before this PR; refactoring it is out of scope for a Reply-message bug fix. Leaving this thread open so it stays visible for a future cleanup.

Comment on lines +149 to +151
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❌ Getting worse: Complex Conditional
SendWithDelayAsync increases from 2 complex conditionals with 4 branches to 3 complex conditionals with 6 branches, threshold = 2

Suppress

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same root cause as the cyclomatic-complexity thread: the new !MessageHeader.IsLocalHeader(x.Key) clause adds one branch to an existing condition. SendWithDelayAsync was already over the threshold pre-PR. Refactoring is out of scope here; leaving open as a follow-up signal.

{
builder.AddProperty(key, val.ToString());
}
Expand Down
8 changes: 8 additions & 0 deletions src/Paramore.Brighter/Message.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,14 @@ public class Message : IEquatable<Message>
/// </summary>
public const string RejectionReasonHeaderName = "RejectionReason";

/// <summary>
/// Bag key (not a typed header property — lives in <see cref="MessageHeader.Bag"/>)
/// carrying the publication topic when the message mapper overrides <see cref="MessageHeader.Topic"/>
/// (e.g. for Reply messages whose topic is the reply address).
/// Read by the outbox dispatcher so it can still locate the registered producer.
/// </summary>
public const string ProducerTopicHeaderName = "paramore.brighter.ProducerTopic";

/// <summary>
/// Gets the header.
/// </summary>
Expand Down
66 changes: 65 additions & 1 deletion src/Paramore.Brighter/MessageHeader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ THE SOFTWARE. */
using System;
using System.Collections.Generic;
using System.Net.Mime;
using System.Threading;
using System.Text.Json.Serialization;
using Paramore.Brighter.JsonConverters;
using Paramore.Brighter.NJsonConverters;
Expand Down Expand Up @@ -87,7 +88,70 @@ public class MessageHeader : IEquatable<MessageHeader>
/// The default Brighter source
/// </summary>
public const string DefaultSource = "http://goparamore.io";


/// <summary>
/// Bag keys that are internal to Brighter — set by the framework so a downstream
/// component (e.g. the outbox dispatcher) can read them, but not intended to travel
/// over the wire. Transports that copy <see cref="Bag"/> into their wire format must
/// skip keys for which <see cref="IsLocalHeader"/> returns true: either filter inline
/// (see ASB / RMQ / Kafka publishers) or call <see cref="BagWithoutLocalHeaders"/>
/// when serialising the bag in one go (see SNS / SQS / Redis publishers).
/// </summary>
/// <remarks>
/// Pre-populated with <see cref="Message.ProducerTopicHeaderName"/>. Use
/// <see cref="RegisterLocalHeader"/> from extension code to add additional keys.
/// The backing <see cref="HashSet{T}"/> is treated as a copy-on-write snapshot —
/// never mutated in place after publication — so reads are lock-free on the
/// message hot path; registrations are expected at startup and use a CAS loop
/// that allocates a new snapshot.
/// </remarks>
private static HashSet<string> s_localHeaderNames = new(StringComparer.Ordinal)
{
Message.ProducerTopicHeaderName
};

/// <summary>
/// Returns true when <paramref name="name"/> is a local-only bag key that must
/// not travel over the wire.
/// </summary>
public static bool IsLocalHeader(string name)
=> Volatile.Read(ref s_localHeaderNames).Contains(name);

/// <summary>
/// Adds <paramref name="name"/> to the set of local bag keys. Idempotent.
/// Call once at startup from extension code that introduces a local-only bag key.
/// </summary>
public static void RegisterLocalHeader(string name)
{
while (true)
{
var snapshot = Volatile.Read(ref s_localHeaderNames);
if (snapshot.Contains(name))
return;
var updated = new HashSet<string>(snapshot, StringComparer.Ordinal) { name };
if (ReferenceEquals(Interlocked.CompareExchange(ref s_localHeaderNames, updated, snapshot), snapshot))
return;
}
}

/// <summary>
/// Returns a new dictionary containing every <see cref="Bag"/> entry whose key
/// is not a local header (see <see cref="IsLocalHeader"/>). For transports that
/// serialise the whole bag in one go (e.g. SNS/SQS/Redis emit it as a single JSON
/// property), pass this to the serialiser instead of <see cref="Bag"/> directly.
/// </summary>
public Dictionary<string, object> BagWithoutLocalHeaders()
{
var locals = Volatile.Read(ref s_localHeaderNames);
var copy = new Dictionary<string, object>(Bag.Count);
foreach (var kv in Bag)
{
if (!locals.Contains(kv.Key))
copy[kv.Key] = kv.Value;
}
return copy;
}

/// <summary>
/// A property bag that can be used for extended header attributes.
/// Use camelCase for the key names if you intend to read it yourself, as when converted to and from Json serializers will tend convert the property
Expand Down
55 changes: 47 additions & 8 deletions src/Paramore.Brighter/OutboxProducerMediator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -774,6 +774,30 @@ private bool ConfigurePublisherCallbackMaybe(IAmAMessageProducerSync producer, R
return false;
}

private static RoutingKey GetProducerLookupTopic(Message message)
{
// Reply messages set the ProducerTopic bag entry so the dispatcher can locate
// the registered producer even though Header.Topic has been rewritten to the
// dynamic reply address. Normal publications don't carry the bag entry, so we
// fall back to Header.Topic — and a null/empty Header.Topic remains a lookup
// failure, matching pre-fix behaviour.
//
// The `is string` cast is safe across persistent outboxes (SQL family,
// Mongo, DynamoDB) because Brighter's bag round-trip uses
// JsonSerialisationOptions.Options, which registers DictionaryStringObjectJsonConverter
// + ObjectToInferredTypesConverter — together they preserve the string runtime
// type through serialise/deserialise rather than handing back JsonElement.
// See When_Bag_String_Values_Round_Trip_Through_Brighter_Json_Options for
// a regression pin on that contract.
if (message.Header.Bag.TryGetValue(Message.ProducerTopicHeaderName, out var producerTopic)
&& producerTopic is string topic)
{
return new RoutingKey(topic);
}

return message.Header.Topic;
}

private void Dispatch(IEnumerable<Message> posts, RequestContext requestContext, Dictionary<string, object>? args = null)
{
var parentSpan = requestContext.Span;
Expand All @@ -783,9 +807,12 @@ private void Dispatch(IEnumerable<Message> posts, RequestContext requestContext,
if (_outBox is null) throw new ArgumentException(NoSyncOutboxError);
foreach (var message in posts)
{
// Log the wire topic (Header.Topic) — where the message is going. Producer
// lookup uses GetProducerLookupTopic, which may differ from Header.Topic when
// a mapper overrode it (e.g. Reply messages routed to a dynamic reply address).
Log.DecoupledInvocationOfMessage(s_logger, message.Header.Topic, message.Id);

var producer = _producerRegistry.LookupBy(message.Header.Topic, message.Header.Type, requestContext);
var producer = _producerRegistry.LookupBy(GetProducerLookupTopic(message), message.Header.Type, requestContext);
var span = _tracer?.CreateProducerSpan(producer.Publication, message, requestContext.Span,
_instrumentationOptions);
producer.Span = span;
Expand All @@ -807,10 +834,12 @@ private void Dispatch(IEnumerable<Message> posts, RequestContext requestContext,
requestContext
);
if (sent)
{
ExecuteWithResiliencePipeline(
() => _outBox.MarkDispatched(message.Id, requestContext, _timeProvider.GetUtcNow(), args),
requestContext
);
}
}
}
else
Expand Down Expand Up @@ -839,25 +868,30 @@ private async Task BulkDispatchAsync(
try
{
if (_asyncOutbox is null) throw new ArgumentException(NoAsyncOutboxError);
var messagesByTopic = posts.GroupBy(m => m.Header.Topic);
// Group by (wire topic, producer-lookup topic) so a batch is guaranteed to
// resolve to a single producer — messages with the same wire topic but
// different ProducerTopic bag values land in separate batches.
var messagesByTopic = posts.GroupBy(m => (WireTopic: m.Header.Topic, LookupTopic: GetProducerLookupTopic(m)));

foreach (var topicBatch in messagesByTopic)
{
var producer = _producerRegistry.LookupBy(topicBatch.Key);
var producer = _producerRegistry.LookupBy(topicBatch.Key.LookupTopic);
var span = _tracer?.CreateProducerSpan(producer.Publication, null, requestContext.Span,
_instrumentationOptions);

if (span is not null)
{
producer.Span = span;
producerSpans.TryAdd(topicBatch.Key, span);
// Key is only used for uniqueness until EndSpans runs; a Uuid avoids
// any risk of collision from composing topic strings.
producerSpans.TryAdd(Uuid.NewAsString(), span);
}

if (producer is IAmABulkMessageProducerAsync bulkMessageProducer and not ISupportPublishConfirmation)
{
var messages = topicBatch.ToArray();

Log.BulkDispatchingMessages(s_logger, messages.Length, topicBatch.Key);
Log.BulkDispatchingMessages(s_logger, messages.Length, topicBatch.Key.WireTopic);

foreach (var batch in await bulkMessageProducer.CreateBatchesAsync(messages, cancellationToken))
{
Expand Down Expand Up @@ -885,7 +919,10 @@ await _asyncOutbox.MarkDispatchedAsync(
}
}

if (!sent) TripTopic(batch.RoutingKey);
if (!sent)
{
TripTopic(batch.RoutingKey);
}
}
}
else
Expand Down Expand Up @@ -915,9 +952,12 @@ private async Task DispatchAsync(
if (_asyncOutbox is null) throw new ArgumentException(NoAsyncOutboxError);
foreach (var message in posts)
{
// Log the wire topic (Header.Topic) — where the message is going. Producer
// lookup uses GetProducerLookupTopic, which may differ from Header.Topic when
// a mapper overrode it (e.g. Reply messages routed to a dynamic reply address).
Log.DecoupledInvocationOfMessage(s_logger, message.Header.Topic, message.Id);

var producer = _producerRegistry.LookupBy(message.Header.Topic, message.Header.Type, requestContext);
var producer = _producerRegistry.LookupBy(GetProducerLookupTopic(message), message.Header.Type, requestContext);
var span = _tracer?.CreateProducerSpan(producer.Publication, message, parentSpan,
_instrumentationOptions);
producer.Span = span;
Expand Down Expand Up @@ -947,7 +987,6 @@ await ExecuteWithResiliencePipelineAsync(
}

if(!sent) TripTopic(message.Header.Topic);

}
else
throw new InvalidOperationException("No async message producer defined.");
Expand Down
4 changes: 4 additions & 0 deletions src/Paramore.Brighter/WrapPipeline.cs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ public Message Wrap(TRequest request, RequestContext requestContext, Publication
if (message.Header.Topic != publication.Topic)
{
Log.DifferentPublicationAndMessageTopic(s_logger, publication.Topic?.Value ?? string.Empty, message.Header.Topic);
if (publication.Topic is not null)
{
message.Header.Bag[Message.ProducerTopicHeaderName] = publication.Topic.Value;
}
}

BrighterTracer.WriteMapperEvent(message, publication, requestContext.Span, MessageMapper.GetType().Name, false, _instrumentationOptions, true);
Expand Down
4 changes: 4 additions & 0 deletions src/Paramore.Brighter/WrapPipelineAsync.cs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ public async Task<Message> WrapAsync(TRequest request, RequestContext requestCon
if (message.Header.Topic != publication.Topic)
{
Log.DifferentPublicationAndMessageTopic(s_logger, publication.Topic?.Value ?? string.Empty, message.Header.Topic);
if (publication.Topic is not null)
{
message.Header.Bag[Message.ProducerTopicHeaderName] = publication.Topic.Value;
}
}

BrighterTracer.WriteMapperEvent(message, publication, requestContext.Span, MessageMapper.GetType().Name, true, _instrumentationOptions, true);
Expand Down
Loading
Loading