From ee305221877381bdf7dee099a9949c537e40dc14 Mon Sep 17 00:00:00 2001 From: Jonny Olliff-Lee Date: Tue, 21 Apr 2026 14:45:41 +0100 Subject: [PATCH 01/16] fix: resolve producer lookup failure when posting Reply messages When posting a Reply, the message mapper sets the header topic to the dynamic reply address (a GUID queue name). The outbox dispatcher then fails to find the producer because it's registered under the static publication topic (e.g. "Reply"), not the GUID. Store the publication topic in the message header Bag during the wrap pipeline when a topic mismatch is detected, then use it as the lookup key in all three dispatch methods (sync, async, bulk). Co-Authored-By: Claude (claude-opus-4-6) --- src/Paramore.Brighter/Message.cs | 6 ++ .../OutboxProducerMediator.cs | 18 +++- src/Paramore.Brighter/WrapPipeline.cs | 2 + src/Paramore.Brighter/WrapPipelineAsync.cs | 2 + ..._Reply_Message_To_The_Command_Processor.cs | 94 ++++++++++++++++++ ..._Message_To_The_Command_Processor_Async.cs | 95 +++++++++++++++++++ .../MyResponseMessageMapperAsync.cs | 36 +++++++ .../When_Wrapping_A_Reply_Message_Mapper.cs | 52 ++++++++++ ...en_Wrapping_A_Reply_Message_MapperAsync.cs | 54 +++++++++++ 9 files changed, 356 insertions(+), 3 deletions(-) create mode 100644 tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Posting_A_Reply_Message_To_The_Command_Processor.cs create mode 100644 tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Posting_A_Reply_Message_To_The_Command_Processor_Async.cs create mode 100644 tests/Paramore.Brighter.Core.Tests/CommandProcessors/TestDoubles/MyResponseMessageMapperAsync.cs create mode 100644 tests/Paramore.Brighter.Core.Tests/MessageSerialisation/When_Wrapping_A_Reply_Message_Mapper.cs create mode 100644 tests/Paramore.Brighter.Core.Tests/MessageSerialisation/When_Wrapping_A_Reply_Message_MapperAsync.cs diff --git a/src/Paramore.Brighter/Message.cs b/src/Paramore.Brighter/Message.cs index e1544e0a23..185005f515 100644 --- a/src/Paramore.Brighter/Message.cs +++ b/src/Paramore.Brighter/Message.cs @@ -50,6 +50,12 @@ public class Message : IEquatable /// public const string RejectionReasonHeaderName = "RejectionReason"; + /// + /// Tag name for the producer topic header, used when the message mapper overrides the topic (e.g. for Reply messages) + /// so that the outbox dispatcher can still find the correct producer + /// + public const string ProducerTopicHeaderName = "ProducerTopic"; + /// /// Gets the header. /// diff --git a/src/Paramore.Brighter/OutboxProducerMediator.cs b/src/Paramore.Brighter/OutboxProducerMediator.cs index 782a9ba515..d514339fc1 100644 --- a/src/Paramore.Brighter/OutboxProducerMediator.cs +++ b/src/Paramore.Brighter/OutboxProducerMediator.cs @@ -774,6 +774,17 @@ private bool ConfigurePublisherCallbackMaybe(IAmAMessageProducerSync producer, R return false; } + private static RoutingKey GetProducerLookupTopic(Message message) + { + if (message.Header.Bag.TryGetValue(Message.ProducerTopicHeaderName, out var producerTopic) + && producerTopic is string topic) + { + return new RoutingKey(topic); + } + + return message.Header.Topic; + } + private void Dispatch(IEnumerable posts, RequestContext requestContext, Dictionary? args = null) { var parentSpan = requestContext.Span; @@ -785,7 +796,7 @@ private void Dispatch(IEnumerable posts, RequestContext requestContext, { Log.DecoupledInvocationOfMessage(s_logger, message.Header.Topic, message.Id); - var producer = _producerRegistry.LookupBy(message.Header.Topic, message.Header.Type, requestContext); + var producer = _producerRegistry.LookupBy(GetProducerLookupTopic(message), message.Header.Type, requestContext); var span = _tracer?.CreateProducerSpan(producer.Publication, message, requestContext.Span, _instrumentationOptions); producer.Span = span; @@ -843,7 +854,8 @@ private async Task BulkDispatchAsync( foreach (var topicBatch in messagesByTopic) { - var producer = _producerRegistry.LookupBy(topicBatch.Key); + var firstMessage = topicBatch.First(); + var producer = _producerRegistry.LookupBy(GetProducerLookupTopic(firstMessage)); var span = _tracer?.CreateProducerSpan(producer.Publication, null, requestContext.Span, _instrumentationOptions); @@ -917,7 +929,7 @@ private async Task DispatchAsync( { Log.DecoupledInvocationOfMessage(s_logger, message.Header.Topic, message.Id); - var producer = _producerRegistry.LookupBy(message.Header.Topic, message.Header.Type, requestContext); + var producer = _producerRegistry.LookupBy(GetProducerLookupTopic(message), message.Header.Type, requestContext); var span = _tracer?.CreateProducerSpan(producer.Publication, message, parentSpan, _instrumentationOptions); producer.Span = span; diff --git a/src/Paramore.Brighter/WrapPipeline.cs b/src/Paramore.Brighter/WrapPipeline.cs index 037f05c0a9..f1067ae68f 100644 --- a/src/Paramore.Brighter/WrapPipeline.cs +++ b/src/Paramore.Brighter/WrapPipeline.cs @@ -93,6 +93,8 @@ public Message Wrap(TRequest request, RequestContext requestContext, Publication if (message.Header.Topic != publication.Topic) { Log.DifferentPublicationAndMessageTopic(s_logger, publication.Topic?.Value ?? string.Empty, message.Header.Topic); + if (publication.Topic is not null) + message.Header.Bag[Message.ProducerTopicHeaderName] = publication.Topic.Value; } BrighterTracer.WriteMapperEvent(message, publication, requestContext.Span, MessageMapper.GetType().Name, false, _instrumentationOptions, true); diff --git a/src/Paramore.Brighter/WrapPipelineAsync.cs b/src/Paramore.Brighter/WrapPipelineAsync.cs index d1d39f6990..da7025bb6b 100644 --- a/src/Paramore.Brighter/WrapPipelineAsync.cs +++ b/src/Paramore.Brighter/WrapPipelineAsync.cs @@ -100,6 +100,8 @@ public async Task WrapAsync(TRequest request, RequestContext requestCon if (message.Header.Topic != publication.Topic) { Log.DifferentPublicationAndMessageTopic(s_logger, publication.Topic?.Value ?? string.Empty, message.Header.Topic); + if (publication.Topic is not null) + message.Header.Bag[Message.ProducerTopicHeaderName] = publication.Topic.Value; } BrighterTracer.WriteMapperEvent(message, publication, requestContext.Span, MessageMapper.GetType().Name, true, _instrumentationOptions, true); diff --git a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Posting_A_Reply_Message_To_The_Command_Processor.cs b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Posting_A_Reply_Message_To_The_Command_Processor.cs new file mode 100644 index 0000000000..b8eb9abc73 --- /dev/null +++ b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Posting_A_Reply_Message_To_The_Command_Processor.cs @@ -0,0 +1,94 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Transactions; +using Microsoft.Extensions.Time.Testing; +using Paramore.Brighter.Core.Tests.CommandProcessors.TestDoubles; +using Paramore.Brighter.Extensions; +using Paramore.Brighter.Observability; +using Polly.Registry; +using Xunit; + +namespace Paramore.Brighter.Core.Tests.CommandProcessors.Post +{ + public class CommandProcessorPostReplyTests + { + private const string ProducerTopic = "Reply"; + private readonly CommandProcessor _commandProcessor; + private readonly MyResponse _myResponse; + private readonly InMemoryOutbox _outbox; + private readonly InternalBus _internalBus = new(); + + public CommandProcessorPostReplyTests() + { + var timeProvider = new FakeTimeProvider(); + var producerRoutingKey = new RoutingKey(ProducerTopic); + + var replyTopic = new RoutingKey(Uuid.NewAsString()); + var replyAddress = new ReplyAddress(replyTopic, Uuid.NewAsString()); + _myResponse = new MyResponse(replyAddress) { ReplyValue = "Hello World" }; + + InMemoryMessageProducer messageProducer = new(_internalBus, + new Publication + { + Topic = producerRoutingKey, + RequestType = typeof(MyResponse) + }); + + var messageMapperRegistry = new MessageMapperRegistry( + new SimpleMessageMapperFactory(_ => new MyResponseMessageMapper()), + null); + messageMapperRegistry.Register(); + + var resiliencePipelineRegistry = new ResiliencePipelineRegistry() + .AddBrighterDefault(); + + var producerRegistry = new ProducerRegistry( + new Dictionary + { + { producerRoutingKey, messageProducer } + }); + + var tracer = new BrighterTracer(timeProvider); + _outbox = new InMemoryOutbox(timeProvider) { Tracer = tracer }; + + IAmAnOutboxProducerMediator bus = new OutboxProducerMediator( + producerRegistry, + resiliencePipelineRegistry, + messageMapperRegistry, + new EmptyMessageTransformerFactory(), + new EmptyMessageTransformerFactoryAsync(), + tracer, + new FindPublicationByPublicationTopicOrRequestType(), + _outbox + ); + + _commandProcessor = new CommandProcessor( + new InMemoryRequestContextFactory(), + new DefaultPolicy(), + resiliencePipelineRegistry, + bus, + new InMemorySchedulerFactory() + ); + } + + [Fact] + public void When_Posting_A_Reply_Message_To_The_Command_Processor() + { + //act - post a Reply whose mapper sets a dynamic topic (reply address) + //different from the producer's registered topic + _commandProcessor.Post(_myResponse); + + //assert - message was dispatched to the reply topic, not the producer topic + var messages = _internalBus.Stream(_myResponse.SendersAddress.Topic).ToArray(); + Assert.Single(messages); + + //assert - message was stored in the outbox + var outboxMessage = _outbox.Get(_myResponse.Id, new RequestContext()); + Assert.NotNull(outboxMessage); + + //assert - message topic is the reply address + Assert.Equal(_myResponse.SendersAddress.Topic, outboxMessage.Header.Topic); + } + } +} diff --git a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Posting_A_Reply_Message_To_The_Command_Processor_Async.cs b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Posting_A_Reply_Message_To_The_Command_Processor_Async.cs new file mode 100644 index 0000000000..f81995e40c --- /dev/null +++ b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Posting_A_Reply_Message_To_The_Command_Processor_Async.cs @@ -0,0 +1,95 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using System.Transactions; +using Microsoft.Extensions.Time.Testing; +using Paramore.Brighter.Core.Tests.CommandProcessors.TestDoubles; +using Paramore.Brighter.Extensions; +using Paramore.Brighter.Observability; +using Polly.Registry; +using Xunit; + +namespace Paramore.Brighter.Core.Tests.CommandProcessors.Post +{ + public class CommandProcessorPostReplyAsyncTests + { + private const string ProducerTopic = "Reply"; + private readonly CommandProcessor _commandProcessor; + private readonly MyResponse _myResponse; + private readonly InMemoryOutbox _outbox; + private readonly InternalBus _internalBus = new(); + + public CommandProcessorPostReplyAsyncTests() + { + var timeProvider = new FakeTimeProvider(); + var producerRoutingKey = new RoutingKey(ProducerTopic); + + var replyTopic = new RoutingKey(Uuid.NewAsString()); + var replyAddress = new ReplyAddress(replyTopic, Uuid.NewAsString()); + _myResponse = new MyResponse(replyAddress) { ReplyValue = "Hello World" }; + + InMemoryMessageProducer messageProducer = new(_internalBus, + new Publication + { + Topic = producerRoutingKey, + RequestType = typeof(MyResponse) + }); + + var messageMapperRegistry = new MessageMapperRegistry( + null, + new SimpleMessageMapperFactoryAsync(_ => new MyResponseMessageMapperAsync())); + messageMapperRegistry.RegisterAsync(); + + var resiliencePipelineRegistry = new ResiliencePipelineRegistry() + .AddBrighterDefault(); + + var producerRegistry = new ProducerRegistry( + new Dictionary + { + { producerRoutingKey, messageProducer } + }); + + var tracer = new BrighterTracer(timeProvider); + _outbox = new InMemoryOutbox(timeProvider) { Tracer = tracer }; + + IAmAnOutboxProducerMediator bus = new OutboxProducerMediator( + producerRegistry, + resiliencePipelineRegistry, + messageMapperRegistry, + new EmptyMessageTransformerFactory(), + new EmptyMessageTransformerFactoryAsync(), + tracer, + new FindPublicationByPublicationTopicOrRequestType(), + _outbox + ); + + _commandProcessor = new CommandProcessor( + new InMemoryRequestContextFactory(), + new DefaultPolicy(), + resiliencePipelineRegistry, + bus, + new InMemorySchedulerFactory() + ); + } + + [Fact] + public async Task When_Posting_A_Reply_Message_To_The_Command_Processor_Async() + { + //act - post a Reply whose mapper sets a dynamic topic (reply address) + //different from the producer's registered topic + await _commandProcessor.PostAsync(_myResponse); + + //assert - message was dispatched to the reply topic, not the producer topic + var messages = _internalBus.Stream(_myResponse.SendersAddress.Topic).ToArray(); + Assert.Single(messages); + + //assert - message was stored in the outbox + var outboxMessage = await _outbox.GetAsync(_myResponse.Id, new RequestContext()); + Assert.NotNull(outboxMessage); + + //assert - message topic is the reply address + Assert.Equal(_myResponse.SendersAddress.Topic, outboxMessage.Header.Topic); + } + } +} diff --git a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/TestDoubles/MyResponseMessageMapperAsync.cs b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/TestDoubles/MyResponseMessageMapperAsync.cs new file mode 100644 index 0000000000..8ba58ce7c1 --- /dev/null +++ b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/TestDoubles/MyResponseMessageMapperAsync.cs @@ -0,0 +1,36 @@ +using System.Text.Json; +using System.Threading; +using System.Threading.Tasks; +using Paramore.Brighter.Extensions; +using Paramore.Brighter.JsonConverters; + +namespace Paramore.Brighter.Core.Tests.CommandProcessors.TestDoubles +{ + public class MyResponseMessageMapperAsync : IAmAMessageMapperAsync + { + public IRequestContext Context { get; set; } + + public Task MapToMessageAsync(MyResponse request, Publication publication, CancellationToken cancellationToken = default) + { + var header = new MessageHeader( + messageId: request.Id, + topic: request.SendersAddress.Topic, + messageType: request.RequestToMessageType(), + correlationId: request.SendersAddress.CorrelationId); + + var body = new MessageBody(JsonSerializer.Serialize(new MyResponseObject(request.Id.ToString(), request.ReplyValue), JsonSerialisationOptions.Options)); + var message = new Message(header, body); + return Task.FromResult(message); + } + + public Task MapToRequestAsync(Message message, CancellationToken cancellationToken = default) + { + var replyAddress = new ReplyAddress(topic: message.Header.ReplyTo, correlationId: message.Header.CorrelationId); + var command = new MyResponse(replyAddress); + var messageBody = JsonSerializer.Deserialize(message.Body.Value, JsonSerialisationOptions.Options); + command.Id = messageBody.Id; + command.ReplyValue = messageBody.ReplyValue; + return Task.FromResult(command); + } + } +} diff --git a/tests/Paramore.Brighter.Core.Tests/MessageSerialisation/When_Wrapping_A_Reply_Message_Mapper.cs b/tests/Paramore.Brighter.Core.Tests/MessageSerialisation/When_Wrapping_A_Reply_Message_Mapper.cs new file mode 100644 index 0000000000..f207b25a91 --- /dev/null +++ b/tests/Paramore.Brighter.Core.Tests/MessageSerialisation/When_Wrapping_A_Reply_Message_Mapper.cs @@ -0,0 +1,52 @@ +using Paramore.Brighter.Core.Tests.CommandProcessors.TestDoubles; +using Xunit; + +namespace Paramore.Brighter.Core.Tests.MessageSerialisation; + +public class ReplyMessageWrapRequestTests +{ + private readonly TransformPipelineBuilder _pipelineBuilder; + private readonly MyResponse _myResponse; + private readonly Publication _publication; + + public ReplyMessageWrapRequestTests() + { + //arrange + TransformPipelineBuilder.ClearPipelineCache(); + + var mapperRegistry = new MessageMapperRegistry( + new SimpleMessageMapperFactory(_ => new MyResponseMessageMapper()), + null); + mapperRegistry.Register(); + + var replyTopic = new RoutingKey(Uuid.NewAsString()); + var replyAddress = new ReplyAddress(replyTopic, Uuid.NewAsString()); + _myResponse = new MyResponse(replyAddress) { ReplyValue = "Hello World" }; + + var messageTransformerFactory = new SimpleMessageTransformerFactory(_ => null); + + _publication = new Publication + { + Topic = new RoutingKey("Reply"), + RequestType = typeof(MyResponse) + }; + + _pipelineBuilder = new TransformPipelineBuilder(mapperRegistry, messageTransformerFactory); + } + + [Fact] + public void When_Wrapping_A_Reply_Message_Mapper() + { + //act + var transformPipeline = _pipelineBuilder.BuildWrapPipeline(); + var message = transformPipeline.Wrap(_myResponse, new RequestContext(), _publication); + + //assert - message topic is the reply address, not the publication topic + Assert.Equal(_myResponse.SendersAddress.Topic, message.Header.Topic); + Assert.NotEqual(_publication.Topic, message.Header.Topic); + + //assert - publication topic stored in bag for producer lookup + Assert.True(message.Header.Bag.ContainsKey(Message.ProducerTopicHeaderName)); + Assert.Equal(_publication.Topic!.Value, message.Header.Bag[Message.ProducerTopicHeaderName]); + } +} diff --git a/tests/Paramore.Brighter.Core.Tests/MessageSerialisation/When_Wrapping_A_Reply_Message_MapperAsync.cs b/tests/Paramore.Brighter.Core.Tests/MessageSerialisation/When_Wrapping_A_Reply_Message_MapperAsync.cs new file mode 100644 index 0000000000..a05e7243a6 --- /dev/null +++ b/tests/Paramore.Brighter.Core.Tests/MessageSerialisation/When_Wrapping_A_Reply_Message_MapperAsync.cs @@ -0,0 +1,54 @@ +using System.Threading.Tasks; +using Paramore.Brighter.Core.Tests.CommandProcessors.TestDoubles; +using Paramore.Brighter.Observability; +using Xunit; + +namespace Paramore.Brighter.Core.Tests.MessageSerialisation; + +public class AsyncReplyMessageWrapRequestTests +{ + private readonly TransformPipelineBuilderAsync _pipelineBuilder; + private readonly MyResponse _myResponse; + private readonly Publication _publication; + + public AsyncReplyMessageWrapRequestTests() + { + //arrange + TransformPipelineBuilder.ClearPipelineCache(); + + var mapperRegistry = new MessageMapperRegistry( + null, + new SimpleMessageMapperFactoryAsync(_ => new MyResponseMessageMapperAsync())); + mapperRegistry.RegisterAsync(); + + var replyTopic = new RoutingKey(Uuid.NewAsString()); + var replyAddress = new ReplyAddress(replyTopic, Uuid.NewAsString()); + _myResponse = new MyResponse(replyAddress) { ReplyValue = "Hello World" }; + + var messageTransformerFactory = new SimpleMessageTransformerFactoryAsync(_ => null); + + _publication = new Publication + { + Topic = new RoutingKey("Reply"), + RequestType = typeof(MyResponse) + }; + + _pipelineBuilder = new TransformPipelineBuilderAsync(mapperRegistry, messageTransformerFactory, InstrumentationOptions.All); + } + + [Fact] + public async Task When_Wrapping_A_Reply_Message_Mapper() + { + //act + var transformPipeline = _pipelineBuilder.BuildWrapPipeline(); + var message = await transformPipeline.WrapAsync(_myResponse, new RequestContext(), _publication); + + //assert - message topic is the reply address, not the publication topic + Assert.Equal(_myResponse.SendersAddress.Topic, message.Header.Topic); + Assert.NotEqual(_publication.Topic, message.Header.Topic); + + //assert - publication topic stored in bag for producer lookup + Assert.True(message.Header.Bag.ContainsKey(Message.ProducerTopicHeaderName)); + Assert.Equal(_publication.Topic!.Value, message.Header.Bag[Message.ProducerTopicHeaderName]); + } +} From 7fe500feda77ef10480295fa38095ea9534ffbfb Mon Sep 17 00:00:00 2001 From: Jonny Olliff-Lee Date: Tue, 21 Apr 2026 15:03:34 +0100 Subject: [PATCH 02/16] test: cover bulk-dispatch + null-publication-topic for reply producer lookup - bulk dispatch reply messages via outbox sweep (firstMessage bag lookup) - wrap with null publication.Topic skips bag write (guard pin, sync + async) Co-Authored-By: Claude (claude-opus-4-7) --- ...n_Bulk_Dispatching_Reply_Messages_Async.cs | 110 ++++++++++++++++++ ...en_Wrapping_With_Null_Publication_Topic.cs | 55 +++++++++ ...pping_With_Null_Publication_Topic_Async.cs | 53 +++++++++ 3 files changed, 218 insertions(+) create mode 100644 tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Bulk_Dispatching_Reply_Messages_Async.cs create mode 100644 tests/Paramore.Brighter.Core.Tests/MessageSerialisation/When_Wrapping_With_Null_Publication_Topic.cs create mode 100644 tests/Paramore.Brighter.Core.Tests/MessageSerialisation/When_Wrapping_With_Null_Publication_Topic_Async.cs diff --git a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Bulk_Dispatching_Reply_Messages_Async.cs b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Bulk_Dispatching_Reply_Messages_Async.cs new file mode 100644 index 0000000000..d753bf3429 --- /dev/null +++ b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Bulk_Dispatching_Reply_Messages_Async.cs @@ -0,0 +1,110 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using System.Transactions; +using Microsoft.Extensions.Time.Testing; +using Paramore.Brighter.Core.Tests.CommandProcessors.TestDoubles; +using Paramore.Brighter.Extensions; +using Paramore.Brighter.Observability; +using Polly.Registry; +using Xunit; + +namespace Paramore.Brighter.Core.Tests.CommandProcessors.Post +{ + // Gap: BulkDispatchAsync groups by Header.Topic (the mapper-set reply topic) but + // looks up the producer using the first message's bag. The single-message Dispatch + // paths are covered; this pins the bulk-outbox-clear path so a future refactor of + // GetProducerLookupTopic or the firstMessage assumption cannot silently regress and + // fail to locate the producer when draining the outbox via the bulk sweep. + public class CommandProcessorBulkDispatchReplyAsyncTests + { + private const string ProducerTopic = "Reply"; + private readonly CommandProcessor _commandProcessor; + private readonly IAmAnOutboxProducerMediator _mediator; + private readonly MyResponse _replyOne; + private readonly MyResponse _replyTwo; + private readonly InternalBus _internalBus = new(); + private readonly RoutingKey _replyTopic; + + public CommandProcessorBulkDispatchReplyAsyncTests() + { + var timeProvider = new FakeTimeProvider(); + var producerRoutingKey = new RoutingKey(ProducerTopic); + + _replyTopic = new RoutingKey(Uuid.NewAsString()); + var replyAddress = new ReplyAddress(_replyTopic, Uuid.NewAsString()); + _replyOne = new MyResponse(replyAddress) { ReplyValue = "Hello" }; + _replyTwo = new MyResponse(replyAddress) { ReplyValue = "World" }; + + InMemoryMessageProducer messageProducer = new(_internalBus, + new Publication + { + Topic = producerRoutingKey, + RequestType = typeof(MyResponse) + }); + + var messageMapperRegistry = new MessageMapperRegistry( + null, + new SimpleMessageMapperFactoryAsync(_ => new MyResponseMessageMapperAsync())); + messageMapperRegistry.RegisterAsync(); + + var resiliencePipelineRegistry = new ResiliencePipelineRegistry() + .AddBrighterDefault(); + + var producerRegistry = new ProducerRegistry( + new Dictionary + { + { producerRoutingKey, messageProducer } + }); + + var tracer = new BrighterTracer(timeProvider); + var outbox = new InMemoryOutbox(timeProvider) { Tracer = tracer }; + + _mediator = new OutboxProducerMediator( + producerRegistry, + resiliencePipelineRegistry, + messageMapperRegistry, + new EmptyMessageTransformerFactory(), + new EmptyMessageTransformerFactoryAsync(), + tracer, + new FindPublicationByPublicationTopicOrRequestType(), + outbox, + maxOutStandingMessages: -1 + ); + + _commandProcessor = new CommandProcessor( + new InMemoryRequestContextFactory(), + new DefaultPolicy(), + resiliencePipelineRegistry, + _mediator, + new InMemorySchedulerFactory() + ); + } + + [Fact] + public async Task When_Bulk_Dispatching_Reply_Messages_Async() + { + //arrange - deposit two replies whose mapper sets Header.Topic to the reply + //address, not the registered producer topic + var context = new RequestContext(); + await _commandProcessor.DepositPostAsync([_replyOne, _replyTwo], context); + + //act - drain via the bulk path (exercised by background outbox sweeps) + await _mediator.ClearOutstandingFromOutboxAsync( + amountToClear: 10, + minimumAge: TimeSpan.Zero, + useBulk: true, + requestContext: context); + + //allow background clear to run + await Task.Delay(500); + + //assert - messages landed on the reply topic. If producer lookup had used + //Header.Topic (reply address) instead of the bag's ProducerTopic, LookupBy + //would have thrown and nothing would arrive on the bus. + var messages = _internalBus.Stream(_replyTopic).ToArray(); + Assert.Equal(2, messages.Length); + } + } +} diff --git a/tests/Paramore.Brighter.Core.Tests/MessageSerialisation/When_Wrapping_With_Null_Publication_Topic.cs b/tests/Paramore.Brighter.Core.Tests/MessageSerialisation/When_Wrapping_With_Null_Publication_Topic.cs new file mode 100644 index 0000000000..4f13cd354c --- /dev/null +++ b/tests/Paramore.Brighter.Core.Tests/MessageSerialisation/When_Wrapping_With_Null_Publication_Topic.cs @@ -0,0 +1,55 @@ +using Paramore.Brighter.Core.Tests.CommandProcessors.TestDoubles; +using Xunit; + +namespace Paramore.Brighter.Core.Tests.MessageSerialisation; + +// Gap: WrapPipeline.Wrap now writes publication.Topic into Header.Bag when the +// mapper-set topic differs from the publication topic. The fix guards against a +// null publication.Topic before writing. Without this test, a future refactor that +// drops the null-check would NRE at runtime for publications that legitimately +// have no topic (dynamic routing scenarios). +public class WrapNullPublicationTopicTests +{ + private readonly TransformPipelineBuilder _pipelineBuilder; + private readonly MyResponse _myResponse; + private readonly Publication _publication; + + public WrapNullPublicationTopicTests() + { + TransformPipelineBuilder.ClearPipelineCache(); + + var mapperRegistry = new MessageMapperRegistry( + new SimpleMessageMapperFactory(_ => new MyResponseMessageMapper()), + null); + mapperRegistry.Register(); + + var replyTopic = new RoutingKey(Uuid.NewAsString()); + var replyAddress = new ReplyAddress(replyTopic, Uuid.NewAsString()); + _myResponse = new MyResponse(replyAddress) { ReplyValue = "Hello World" }; + + var messageTransformerFactory = new SimpleMessageTransformerFactory(_ => null); + + //publication with no topic — mapper still sets its own topic from the reply address + _publication = new Publication + { + Topic = null, + RequestType = typeof(MyResponse) + }; + + _pipelineBuilder = new TransformPipelineBuilder(mapperRegistry, messageTransformerFactory); + } + + [Fact] + public void When_Wrapping_With_Null_Publication_Topic() + { + //act + var transformPipeline = _pipelineBuilder.BuildWrapPipeline(); + var message = transformPipeline.Wrap(_myResponse, new RequestContext(), _publication); + + //assert - topic came from the mapper + Assert.Equal(_myResponse.SendersAddress.Topic, message.Header.Topic); + + //assert - no ProducerTopic entry was written to the bag (the guard held) + Assert.False(message.Header.Bag.ContainsKey(Message.ProducerTopicHeaderName)); + } +} diff --git a/tests/Paramore.Brighter.Core.Tests/MessageSerialisation/When_Wrapping_With_Null_Publication_Topic_Async.cs b/tests/Paramore.Brighter.Core.Tests/MessageSerialisation/When_Wrapping_With_Null_Publication_Topic_Async.cs new file mode 100644 index 0000000000..4d15fff484 --- /dev/null +++ b/tests/Paramore.Brighter.Core.Tests/MessageSerialisation/When_Wrapping_With_Null_Publication_Topic_Async.cs @@ -0,0 +1,53 @@ +using System.Threading.Tasks; +using Paramore.Brighter.Core.Tests.CommandProcessors.TestDoubles; +using Paramore.Brighter.Observability; +using Xunit; + +namespace Paramore.Brighter.Core.Tests.MessageSerialisation; + +// Async twin of WrapNullPublicationTopicTests — pins the null-publication-topic +// guard in WrapPipelineAsync so refactors cannot silently NRE. +public class AsyncWrapNullPublicationTopicTests +{ + private readonly TransformPipelineBuilderAsync _pipelineBuilder; + private readonly MyResponse _myResponse; + private readonly Publication _publication; + + public AsyncWrapNullPublicationTopicTests() + { + TransformPipelineBuilder.ClearPipelineCache(); + + var mapperRegistry = new MessageMapperRegistry( + null, + new SimpleMessageMapperFactoryAsync(_ => new MyResponseMessageMapperAsync())); + mapperRegistry.RegisterAsync(); + + var replyTopic = new RoutingKey(Uuid.NewAsString()); + var replyAddress = new ReplyAddress(replyTopic, Uuid.NewAsString()); + _myResponse = new MyResponse(replyAddress) { ReplyValue = "Hello World" }; + + var messageTransformerFactory = new SimpleMessageTransformerFactoryAsync(_ => null); + + _publication = new Publication + { + Topic = null, + RequestType = typeof(MyResponse) + }; + + _pipelineBuilder = new TransformPipelineBuilderAsync(mapperRegistry, messageTransformerFactory, InstrumentationOptions.All); + } + + [Fact] + public async Task When_Wrapping_With_Null_Publication_Topic_Async() + { + //act + var transformPipeline = _pipelineBuilder.BuildWrapPipeline(); + var message = await transformPipeline.WrapAsync(_myResponse, new RequestContext(), _publication); + + //assert - topic came from the mapper + Assert.Equal(_myResponse.SendersAddress.Topic, message.Header.Topic); + + //assert - no ProducerTopic entry was written to the bag (the guard held) + Assert.False(message.Header.Bag.ContainsKey(Message.ProducerTopicHeaderName)); + } +} From 1f6457243b762514db7bd4ef81091b590416ea47 Mon Sep 17 00:00:00 2001 From: Jonny Olliff-Lee Date: Tue, 21 Apr 2026 15:32:56 +0100 Subject: [PATCH 03/16] refactor: address PR #4070 review feedback - deterministic poll-until in bulk dispatch test (drop Task.Delay) - document firstMessage invariant in BulkDispatchAsync - clarify ProducerTopicHeaderName xml doc (bag key, not header) - braces around single-line if in WrapPipeline + WrapPipelineAsync - inline note on Log.DecoupledInvocationOfMessage divergence from lookup Co-Authored-By: Claude (claude-opus-4-7) --- src/Paramore.Brighter/Message.cs | 6 ++++-- src/Paramore.Brighter/OutboxProducerMediator.cs | 9 +++++++++ src/Paramore.Brighter/WrapPipeline.cs | 2 ++ src/Paramore.Brighter/WrapPipelineAsync.cs | 2 ++ .../Post/When_Bulk_Dispatching_Reply_Messages_Async.cs | 8 ++++++-- 5 files changed, 23 insertions(+), 4 deletions(-) diff --git a/src/Paramore.Brighter/Message.cs b/src/Paramore.Brighter/Message.cs index 185005f515..6d18c360e8 100644 --- a/src/Paramore.Brighter/Message.cs +++ b/src/Paramore.Brighter/Message.cs @@ -51,8 +51,10 @@ public class Message : IEquatable public const string RejectionReasonHeaderName = "RejectionReason"; /// - /// Tag name for the producer topic header, used when the message mapper overrides the topic (e.g. for Reply messages) - /// so that the outbox dispatcher can still find the correct producer + /// Bag key (not a typed header property — lives in ) + /// carrying the publication topic when the message mapper overrides + /// (e.g. for Reply messages whose topic is the reply address). + /// Read by the outbox dispatcher so it can still locate the registered producer. /// public const string ProducerTopicHeaderName = "ProducerTopic"; diff --git a/src/Paramore.Brighter/OutboxProducerMediator.cs b/src/Paramore.Brighter/OutboxProducerMediator.cs index d514339fc1..5764a31a68 100644 --- a/src/Paramore.Brighter/OutboxProducerMediator.cs +++ b/src/Paramore.Brighter/OutboxProducerMediator.cs @@ -794,6 +794,9 @@ private void Dispatch(IEnumerable posts, RequestContext requestContext, if (_outBox is null) throw new ArgumentException(NoSyncOutboxError); foreach (var message in posts) { + // Log the wire topic (Header.Topic) — where the message is going. Producer + // lookup uses GetProducerLookupTopic, which may differ from Header.Topic when + // a mapper overrode it (e.g. Reply messages routed to a dynamic reply address). Log.DecoupledInvocationOfMessage(s_logger, message.Header.Topic, message.Id); var producer = _producerRegistry.LookupBy(GetProducerLookupTopic(message), message.Header.Type, requestContext); @@ -854,6 +857,9 @@ private async Task BulkDispatchAsync( foreach (var topicBatch in messagesByTopic) { + // Messages in this batch share Header.Topic (the group key). They therefore + // share the same ProducerTopic bag entry (set by WrapPipelineAsync when the + // mapper overrode the topic), so firstMessage is representative of the batch. var firstMessage = topicBatch.First(); var producer = _producerRegistry.LookupBy(GetProducerLookupTopic(firstMessage)); var span = _tracer?.CreateProducerSpan(producer.Publication, null, requestContext.Span, @@ -927,6 +933,9 @@ private async Task DispatchAsync( if (_asyncOutbox is null) throw new ArgumentException(NoAsyncOutboxError); foreach (var message in posts) { + // Log the wire topic (Header.Topic) — where the message is going. Producer + // lookup uses GetProducerLookupTopic, which may differ from Header.Topic when + // a mapper overrode it (e.g. Reply messages routed to a dynamic reply address). Log.DecoupledInvocationOfMessage(s_logger, message.Header.Topic, message.Id); var producer = _producerRegistry.LookupBy(GetProducerLookupTopic(message), message.Header.Type, requestContext); diff --git a/src/Paramore.Brighter/WrapPipeline.cs b/src/Paramore.Brighter/WrapPipeline.cs index f1067ae68f..91cdc768da 100644 --- a/src/Paramore.Brighter/WrapPipeline.cs +++ b/src/Paramore.Brighter/WrapPipeline.cs @@ -94,7 +94,9 @@ public Message Wrap(TRequest request, RequestContext requestContext, Publication { Log.DifferentPublicationAndMessageTopic(s_logger, publication.Topic?.Value ?? string.Empty, message.Header.Topic); if (publication.Topic is not null) + { message.Header.Bag[Message.ProducerTopicHeaderName] = publication.Topic.Value; + } } BrighterTracer.WriteMapperEvent(message, publication, requestContext.Span, MessageMapper.GetType().Name, false, _instrumentationOptions, true); diff --git a/src/Paramore.Brighter/WrapPipelineAsync.cs b/src/Paramore.Brighter/WrapPipelineAsync.cs index da7025bb6b..d5b43a84eb 100644 --- a/src/Paramore.Brighter/WrapPipelineAsync.cs +++ b/src/Paramore.Brighter/WrapPipelineAsync.cs @@ -101,7 +101,9 @@ public async Task WrapAsync(TRequest request, RequestContext requestCon { Log.DifferentPublicationAndMessageTopic(s_logger, publication.Topic?.Value ?? string.Empty, message.Header.Topic); if (publication.Topic is not null) + { message.Header.Bag[Message.ProducerTopicHeaderName] = publication.Topic.Value; + } } BrighterTracer.WriteMapperEvent(message, publication, requestContext.Span, MessageMapper.GetType().Name, true, _instrumentationOptions, true); diff --git a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Bulk_Dispatching_Reply_Messages_Async.cs b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Bulk_Dispatching_Reply_Messages_Async.cs index d753bf3429..75517569b3 100644 --- a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Bulk_Dispatching_Reply_Messages_Async.cs +++ b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Bulk_Dispatching_Reply_Messages_Async.cs @@ -97,8 +97,12 @@ await _mediator.ClearOutstandingFromOutboxAsync( useBulk: true, requestContext: context); - //allow background clear to run - await Task.Delay(500); + //poll until the background clear drains the outbox (bounded to avoid flake in CI) + var deadline = DateTimeOffset.UtcNow + TimeSpan.FromSeconds(5); + while (_internalBus.Stream(_replyTopic).Count() < 2 && DateTimeOffset.UtcNow < deadline) + { + await Task.Delay(25); + } //assert - messages landed on the reply topic. If producer lookup had used //Header.Topic (reply address) instead of the bag's ProducerTopic, LookupBy From 839c4606c8d21f3fa51fe4b0b2fd49913ea6bd1f Mon Sep 17 00:00:00 2001 From: Jonny Olliff-Lee Date: Tue, 21 Apr 2026 15:54:51 +0100 Subject: [PATCH 04/16] refactor: strip producer-topic bag entry + address second review pass - strip ProducerTopicHeaderName from Header.Bag after lookup in Dispatch, DispatchAsync, and BulkDispatchAsync so transports that serialise the bag (AMQP headers, SNS/SQS attributes) don't leak internal topology - namespace bag key as "paramore.brighter.ProducerTopic" to eliminate collision risk with user-defined bag entries - rename async wrap-reply test to end with _Async per suite convention - extend reply post + bulk dispatch tests to pin the strip behaviour Co-Authored-By: Claude (claude-opus-4-7) --- Brighter.slnx | 5 +++++ src/Paramore.Brighter/Message.cs | 2 +- src/Paramore.Brighter/OutboxProducerMediator.cs | 17 +++++++++++++++++ ...hen_Bulk_Dispatching_Reply_Messages_Async.cs | 5 +++++ ..._A_Reply_Message_To_The_Command_Processor.cs | 4 ++++ ...ly_Message_To_The_Command_Processor_Async.cs | 4 ++++ ...When_Wrapping_A_Reply_Message_MapperAsync.cs | 2 +- 7 files changed, 37 insertions(+), 2 deletions(-) diff --git a/Brighter.slnx b/Brighter.slnx index d9e0dd4139..4b4a27cbb2 100644 --- a/Brighter.slnx +++ b/Brighter.slnx @@ -34,6 +34,11 @@ + + + + + diff --git a/src/Paramore.Brighter/Message.cs b/src/Paramore.Brighter/Message.cs index 6d18c360e8..b8e591cdd5 100644 --- a/src/Paramore.Brighter/Message.cs +++ b/src/Paramore.Brighter/Message.cs @@ -56,7 +56,7 @@ public class Message : IEquatable /// (e.g. for Reply messages whose topic is the reply address). /// Read by the outbox dispatcher so it can still locate the registered producer. /// - public const string ProducerTopicHeaderName = "ProducerTopic"; + public const string ProducerTopicHeaderName = "paramore.brighter.ProducerTopic"; /// /// Gets the header. diff --git a/src/Paramore.Brighter/OutboxProducerMediator.cs b/src/Paramore.Brighter/OutboxProducerMediator.cs index 5764a31a68..c544e88c82 100644 --- a/src/Paramore.Brighter/OutboxProducerMediator.cs +++ b/src/Paramore.Brighter/OutboxProducerMediator.cs @@ -785,6 +785,17 @@ private static RoutingKey GetProducerLookupTopic(Message message) return message.Header.Topic; } + /// + /// Removes the internal bag entry so it is + /// not serialised onto the wire by transport adapters that include Header.Bag in + /// the message envelope (AMQP headers, SNS/SQS attributes, etc.). Call after producer + /// lookup and immediately before dispatch. + /// + private static void StripProducerLookupTopic(Message message) + { + message.Header.Bag.Remove(Message.ProducerTopicHeaderName); + } + private void Dispatch(IEnumerable posts, RequestContext requestContext, Dictionary? args = null) { var parentSpan = requestContext.Span; @@ -800,6 +811,7 @@ private void Dispatch(IEnumerable posts, RequestContext requestContext, Log.DecoupledInvocationOfMessage(s_logger, message.Header.Topic, message.Id); var producer = _producerRegistry.LookupBy(GetProducerLookupTopic(message), message.Header.Type, requestContext); + StripProducerLookupTopic(message); var span = _tracer?.CreateProducerSpan(producer.Publication, message, requestContext.Span, _instrumentationOptions); producer.Span = span; @@ -862,6 +874,10 @@ private async Task BulkDispatchAsync( // mapper overrode the topic), so firstMessage is representative of the batch. var firstMessage = topicBatch.First(); var producer = _producerRegistry.LookupBy(GetProducerLookupTopic(firstMessage)); + foreach (var m in topicBatch) + { + StripProducerLookupTopic(m); + } var span = _tracer?.CreateProducerSpan(producer.Publication, null, requestContext.Span, _instrumentationOptions); @@ -939,6 +955,7 @@ private async Task DispatchAsync( Log.DecoupledInvocationOfMessage(s_logger, message.Header.Topic, message.Id); var producer = _producerRegistry.LookupBy(GetProducerLookupTopic(message), message.Header.Type, requestContext); + StripProducerLookupTopic(message); var span = _tracer?.CreateProducerSpan(producer.Publication, message, parentSpan, _instrumentationOptions); producer.Span = span; diff --git a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Bulk_Dispatching_Reply_Messages_Async.cs b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Bulk_Dispatching_Reply_Messages_Async.cs index 75517569b3..57448e00cc 100644 --- a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Bulk_Dispatching_Reply_Messages_Async.cs +++ b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Bulk_Dispatching_Reply_Messages_Async.cs @@ -109,6 +109,11 @@ await _mediator.ClearOutstandingFromOutboxAsync( //would have thrown and nothing would arrive on the bus. var messages = _internalBus.Stream(_replyTopic).ToArray(); Assert.Equal(2, messages.Length); + + //assert - internal ProducerTopic bag entry was stripped on every message + //in the batch so transports that serialise Header.Bag don't leak it + Assert.All(messages, m => + Assert.False(m.Header.Bag.ContainsKey(Message.ProducerTopicHeaderName))); } } } diff --git a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Posting_A_Reply_Message_To_The_Command_Processor.cs b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Posting_A_Reply_Message_To_The_Command_Processor.cs index b8eb9abc73..8e89df510f 100644 --- a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Posting_A_Reply_Message_To_The_Command_Processor.cs +++ b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Posting_A_Reply_Message_To_The_Command_Processor.cs @@ -89,6 +89,10 @@ public void When_Posting_A_Reply_Message_To_The_Command_Processor() //assert - message topic is the reply address Assert.Equal(_myResponse.SendersAddress.Topic, outboxMessage.Header.Topic); + + //assert - internal ProducerTopic bag entry was stripped before dispatch so + //transports that serialise Header.Bag don't leak it onto the wire + Assert.False(messages[0].Header.Bag.ContainsKey(Message.ProducerTopicHeaderName)); } } } diff --git a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Posting_A_Reply_Message_To_The_Command_Processor_Async.cs b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Posting_A_Reply_Message_To_The_Command_Processor_Async.cs index f81995e40c..824ec9470c 100644 --- a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Posting_A_Reply_Message_To_The_Command_Processor_Async.cs +++ b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Posting_A_Reply_Message_To_The_Command_Processor_Async.cs @@ -90,6 +90,10 @@ public async Task When_Posting_A_Reply_Message_To_The_Command_Processor_Async() //assert - message topic is the reply address Assert.Equal(_myResponse.SendersAddress.Topic, outboxMessage.Header.Topic); + + //assert - internal ProducerTopic bag entry was stripped before dispatch so + //transports that serialise Header.Bag don't leak it onto the wire + Assert.False(messages[0].Header.Bag.ContainsKey(Message.ProducerTopicHeaderName)); } } } diff --git a/tests/Paramore.Brighter.Core.Tests/MessageSerialisation/When_Wrapping_A_Reply_Message_MapperAsync.cs b/tests/Paramore.Brighter.Core.Tests/MessageSerialisation/When_Wrapping_A_Reply_Message_MapperAsync.cs index a05e7243a6..8ea68fae62 100644 --- a/tests/Paramore.Brighter.Core.Tests/MessageSerialisation/When_Wrapping_A_Reply_Message_MapperAsync.cs +++ b/tests/Paramore.Brighter.Core.Tests/MessageSerialisation/When_Wrapping_A_Reply_Message_MapperAsync.cs @@ -37,7 +37,7 @@ public AsyncReplyMessageWrapRequestTests() } [Fact] - public async Task When_Wrapping_A_Reply_Message_Mapper() + public async Task When_Wrapping_A_Reply_Message_Mapper_Async() { //act var transformPipeline = _pipelineBuilder.BuildWrapPipeline(); From 7c47c08c977118abac648727a8fd1b762bbb1a66 Mon Sep 17 00:00:00 2001 From: Jonny Olliff-Lee Date: Tue, 21 Apr 2026 15:59:53 +0100 Subject: [PATCH 05/16] refactor: extract strip helper overload to keep BulkDispatchAsync complexity flat Moves the per-message strip loop into an IEnumerable overload of StripProducerLookupTopic so the added foreach no longer counts against BulkDispatchAsync's cyclomatic complexity (CodeScene delta warning). Co-Authored-By: Claude (claude-opus-4-7) --- src/Paramore.Brighter/OutboxProducerMediator.cs | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/src/Paramore.Brighter/OutboxProducerMediator.cs b/src/Paramore.Brighter/OutboxProducerMediator.cs index c544e88c82..a1b87dadb6 100644 --- a/src/Paramore.Brighter/OutboxProducerMediator.cs +++ b/src/Paramore.Brighter/OutboxProducerMediator.cs @@ -796,6 +796,14 @@ private static void StripProducerLookupTopic(Message message) message.Header.Bag.Remove(Message.ProducerTopicHeaderName); } + private static void StripProducerLookupTopic(IEnumerable messages) + { + foreach (var m in messages) + { + StripProducerLookupTopic(m); + } + } + private void Dispatch(IEnumerable posts, RequestContext requestContext, Dictionary? args = null) { var parentSpan = requestContext.Span; @@ -874,10 +882,7 @@ private async Task BulkDispatchAsync( // mapper overrode the topic), so firstMessage is representative of the batch. var firstMessage = topicBatch.First(); var producer = _producerRegistry.LookupBy(GetProducerLookupTopic(firstMessage)); - foreach (var m in topicBatch) - { - StripProducerLookupTopic(m); - } + StripProducerLookupTopic(topicBatch); var span = _tracer?.CreateProducerSpan(producer.Publication, null, requestContext.Span, _instrumentationOptions); From ae70780bf1947908086e0d71fe48653bdb138765 Mon Sep 17 00:00:00 2001 From: Jonny Olliff-Lee Date: Tue, 21 Apr 2026 19:33:22 +0100 Subject: [PATCH 06/16] =?UTF-8?q?refactor:=20tighten=20review=20feedback?= =?UTF-8?q?=20=E2=80=94=20batch=20invariant=20assert,=20doc=20trims,=20tes?= =?UTF-8?q?t=20cleanup?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Debug.Assert in BulkDispatchAsync that every message in a topicBatch resolves to the same producer-lookup topic (makes the firstMessage assumption explicit and cheaply catchable) - drop the poll loop in the bulk dispatch test; ClearOutstandingFromOutboxAsync awaits the dispatch so it completes before return - shorten StripProducerLookupTopic xml doc to a one-line comment - add a brief note in GetProducerLookupTopic explaining the null-topic fallback Co-Authored-By: Claude (claude-opus-4-7) --- src/Paramore.Brighter/OutboxProducerMediator.cs | 16 +++++++++------- ...When_Bulk_Dispatching_Reply_Messages_Async.cs | 14 +++++--------- 2 files changed, 14 insertions(+), 16 deletions(-) diff --git a/src/Paramore.Brighter/OutboxProducerMediator.cs b/src/Paramore.Brighter/OutboxProducerMediator.cs index a1b87dadb6..c7f9cf6c04 100644 --- a/src/Paramore.Brighter/OutboxProducerMediator.cs +++ b/src/Paramore.Brighter/OutboxProducerMediator.cs @@ -776,6 +776,8 @@ private bool ConfigurePublisherCallbackMaybe(IAmAMessageProducerSync producer, R private static RoutingKey GetProducerLookupTopic(Message message) { + // Falls back to Header.Topic when the bag entry is absent — reproducing the + // pre-fix behaviour, so publications with a null Topic remain a lookup failure. if (message.Header.Bag.TryGetValue(Message.ProducerTopicHeaderName, out var producerTopic) && producerTopic is string topic) { @@ -785,12 +787,8 @@ private static RoutingKey GetProducerLookupTopic(Message message) return message.Header.Topic; } - /// - /// Removes the internal bag entry so it is - /// not serialised onto the wire by transport adapters that include Header.Bag in - /// the message envelope (AMQP headers, SNS/SQS attributes, etc.). Call after producer - /// lookup and immediately before dispatch. - /// + // Strip the internal ProducerTopic bag entry so transports that serialise Header.Bag + // (AMQP, SNS/SQS) don't leak it on the wire. Call after lookup, before dispatch. private static void StripProducerLookupTopic(Message message) { message.Header.Bag.Remove(Message.ProducerTopicHeaderName); @@ -881,7 +879,11 @@ private async Task BulkDispatchAsync( // share the same ProducerTopic bag entry (set by WrapPipelineAsync when the // mapper overrode the topic), so firstMessage is representative of the batch. var firstMessage = topicBatch.First(); - var producer = _producerRegistry.LookupBy(GetProducerLookupTopic(firstMessage)); + var producerLookupTopic = GetProducerLookupTopic(firstMessage); + Debug.Assert( + topicBatch.All(m => GetProducerLookupTopic(m) == producerLookupTopic), + "all messages in a topicBatch must share the same producer-lookup topic"); + var producer = _producerRegistry.LookupBy(producerLookupTopic); StripProducerLookupTopic(topicBatch); var span = _tracer?.CreateProducerSpan(producer.Publication, null, requestContext.Span, _instrumentationOptions); diff --git a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Bulk_Dispatching_Reply_Messages_Async.cs b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Bulk_Dispatching_Reply_Messages_Async.cs index 57448e00cc..9589c1e0ea 100644 --- a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Bulk_Dispatching_Reply_Messages_Async.cs +++ b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Bulk_Dispatching_Reply_Messages_Async.cs @@ -90,25 +90,21 @@ public async Task When_Bulk_Dispatching_Reply_Messages_Async() var context = new RequestContext(); await _commandProcessor.DepositPostAsync([_replyOne, _replyTwo], context); - //act - drain via the bulk path (exercised by background outbox sweeps) + //act - drain via the bulk path (exercised by background outbox sweeps). + //ClearOutstandingFromOutboxAsync awaits BackgroundDispatchUsingAsync which + //awaits BulkDispatchAsync, so dispatch is complete when this returns. await _mediator.ClearOutstandingFromOutboxAsync( amountToClear: 10, minimumAge: TimeSpan.Zero, useBulk: true, requestContext: context); - //poll until the background clear drains the outbox (bounded to avoid flake in CI) - var deadline = DateTimeOffset.UtcNow + TimeSpan.FromSeconds(5); - while (_internalBus.Stream(_replyTopic).Count() < 2 && DateTimeOffset.UtcNow < deadline) - { - await Task.Delay(25); - } - //assert - messages landed on the reply topic. If producer lookup had used //Header.Topic (reply address) instead of the bag's ProducerTopic, LookupBy //would have thrown and nothing would arrive on the bus. var messages = _internalBus.Stream(_replyTopic).ToArray(); - Assert.Equal(2, messages.Length); + Assert.True(messages.Length == 2, + $"expected 2 reply messages on the bus after bulk dispatch, got {messages.Length} — bulk dispatch or producer lookup failed"); //assert - internal ProducerTopic bag entry was stripped on every message //in the batch so transports that serialise Header.Bag don't leak it From 8176c8839a4ed378c9756d89535980312343fefa Mon Sep 17 00:00:00 2001 From: Jonny Olliff-Lee Date: Tue, 21 Apr 2026 19:44:45 +0100 Subject: [PATCH 07/16] fix: remove uncommitted sample projects from Brighter.slnx The slnx was accidentally swept into an earlier commit with references to samples/TaskQueue/ASBRequestReply/ projects that were never committed to the branch, breaking CI build. Co-Authored-By: Claude (claude-opus-4-7) --- Brighter.slnx | 5 ----- 1 file changed, 5 deletions(-) diff --git a/Brighter.slnx b/Brighter.slnx index 4b4a27cbb2..d9e0dd4139 100644 --- a/Brighter.slnx +++ b/Brighter.slnx @@ -34,11 +34,6 @@ - - - - - From 06a7a6ba265b36a1fd1a7d052b40aeb8c48e67b0 Mon Sep 17 00:00:00 2001 From: Jonny Olliff-Lee Date: Wed, 22 Apr 2026 08:13:44 +0100 Subject: [PATCH 08/16] refactor: group bulk dispatch by (wire, producer) topic pair MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Debug.Assert is stripped in Release, so it could not enforce the single- producer invariant at runtime. Group by the composite key instead — this guarantees every batch resolves to one producer without any assertion. Addresses Copilot review feedback on PR #4070. Co-Authored-By: Claude (claude-opus-4-7) --- .../OutboxProducerMediator.cs | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) diff --git a/src/Paramore.Brighter/OutboxProducerMediator.cs b/src/Paramore.Brighter/OutboxProducerMediator.cs index c7f9cf6c04..37902a9710 100644 --- a/src/Paramore.Brighter/OutboxProducerMediator.cs +++ b/src/Paramore.Brighter/OutboxProducerMediator.cs @@ -871,19 +871,14 @@ private async Task BulkDispatchAsync( try { if (_asyncOutbox is null) throw new ArgumentException(NoAsyncOutboxError); - var messagesByTopic = posts.GroupBy(m => m.Header.Topic); + // Group by (wire topic, producer-lookup topic) so a batch is guaranteed to + // resolve to a single producer — messages with the same wire topic but + // different ProducerTopic bag values land in separate batches. + var messagesByTopic = posts.GroupBy(m => (WireTopic: m.Header.Topic, LookupTopic: GetProducerLookupTopic(m))); foreach (var topicBatch in messagesByTopic) { - // Messages in this batch share Header.Topic (the group key). They therefore - // share the same ProducerTopic bag entry (set by WrapPipelineAsync when the - // mapper overrode the topic), so firstMessage is representative of the batch. - var firstMessage = topicBatch.First(); - var producerLookupTopic = GetProducerLookupTopic(firstMessage); - Debug.Assert( - topicBatch.All(m => GetProducerLookupTopic(m) == producerLookupTopic), - "all messages in a topicBatch must share the same producer-lookup topic"); - var producer = _producerRegistry.LookupBy(producerLookupTopic); + var producer = _producerRegistry.LookupBy(topicBatch.Key.LookupTopic); StripProducerLookupTopic(topicBatch); var span = _tracer?.CreateProducerSpan(producer.Publication, null, requestContext.Span, _instrumentationOptions); @@ -891,14 +886,14 @@ private async Task BulkDispatchAsync( if (span is not null) { producer.Span = span; - producerSpans.TryAdd(topicBatch.Key, span); + producerSpans.TryAdd(topicBatch.Key.WireTopic, span); } if (producer is IAmABulkMessageProducerAsync bulkMessageProducer and not ISupportPublishConfirmation) { var messages = topicBatch.ToArray(); - Log.BulkDispatchingMessages(s_logger, messages.Length, topicBatch.Key); + Log.BulkDispatchingMessages(s_logger, messages.Length, topicBatch.Key.WireTopic); foreach (var batch in await bulkMessageProducer.CreateBatchesAsync(messages, cancellationToken)) { From 20c1c3610626b4110aa5f69d7853cafcd45a6a5f Mon Sep 17 00:00:00 2001 From: Jonny Olliff-Lee Date: Wed, 22 Apr 2026 08:48:29 +0100 Subject: [PATCH 09/16] =?UTF-8?q?refactor:=20address=20claude-bot=20review?= =?UTF-8?q?=20=E2=80=94=20stale=20comment,=20span=20key=20collision,=20out?= =?UTF-8?q?box=20note?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - rewrite the stale class-level comment in When_Bulk_Dispatching_Reply_Messages_Async to match the current composite-key grouping - disambiguate producerSpans key with the composite (WireTopic, LookupTopic) so two batches sharing a WireTopic but differing in LookupTopic can both end their spans cleanly - document on StripProducerLookupTopic that the mutation is harmless for DB-backed outboxes but affects InMemoryOutbox retries (primarily dev/test scope) Co-Authored-By: Claude (claude-opus-4-7) --- src/Paramore.Brighter/OutboxProducerMediator.cs | 9 ++++++++- .../Post/When_Bulk_Dispatching_Reply_Messages_Async.cs | 9 ++++----- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/src/Paramore.Brighter/OutboxProducerMediator.cs b/src/Paramore.Brighter/OutboxProducerMediator.cs index 37902a9710..fe0c69cf56 100644 --- a/src/Paramore.Brighter/OutboxProducerMediator.cs +++ b/src/Paramore.Brighter/OutboxProducerMediator.cs @@ -789,6 +789,10 @@ private static RoutingKey GetProducerLookupTopic(Message message) // Strip the internal ProducerTopic bag entry so transports that serialise Header.Bag // (AMQP, SNS/SQS) don't leak it on the wire. Call after lookup, before dispatch. + // Persistent outboxes (SQL, Mongo, Dynamo) re-hydrate a fresh object per drain so + // this mutation is harmless; InMemoryOutbox stores the reference, so a dispatch that + // fails after strip and then retries via the outbox will fall back to Header.Topic + // and miss the producer — acceptable since InMemoryOutbox is primarily dev/test. private static void StripProducerLookupTopic(Message message) { message.Header.Bag.Remove(Message.ProducerTopicHeaderName); @@ -886,7 +890,10 @@ private async Task BulkDispatchAsync( if (span is not null) { producer.Span = span; - producerSpans.TryAdd(topicBatch.Key.WireTopic, span); + // Compose the dictionary key from both grouping keys so two + // batches that share a WireTopic but differ by LookupTopic + // don't collide (their spans must both be ended later). + producerSpans.TryAdd($"{topicBatch.Key.WireTopic}|{topicBatch.Key.LookupTopic}", span); } if (producer is IAmABulkMessageProducerAsync bulkMessageProducer and not ISupportPublishConfirmation) diff --git a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Bulk_Dispatching_Reply_Messages_Async.cs b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Bulk_Dispatching_Reply_Messages_Async.cs index 9589c1e0ea..22decd9862 100644 --- a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Bulk_Dispatching_Reply_Messages_Async.cs +++ b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Bulk_Dispatching_Reply_Messages_Async.cs @@ -12,11 +12,10 @@ namespace Paramore.Brighter.Core.Tests.CommandProcessors.Post { - // Gap: BulkDispatchAsync groups by Header.Topic (the mapper-set reply topic) but - // looks up the producer using the first message's bag. The single-message Dispatch - // paths are covered; this pins the bulk-outbox-clear path so a future refactor of - // GetProducerLookupTopic or the firstMessage assumption cannot silently regress and - // fail to locate the producer when draining the outbox via the bulk sweep. + // Pins the bulk-outbox-clear path: reply messages drained via BulkDispatchAsync + // are grouped by (WireTopic, LookupTopic) and dispatched to the producer resolved + // from LookupTopic. Prevents regression to a Header.Topic-only lookup that would + // fail to locate the producer for reply messages during outbox sweeps. public class CommandProcessorBulkDispatchReplyAsyncTests { private const string ProducerTopic = "Reply"; From da71167466f16f5ed175f5957e872f6beb63f1dc Mon Sep 17 00:00:00 2001 From: DevJonny Date: Fri, 24 Apr 2026 21:17:54 +0100 Subject: [PATCH 10/16] fix: restore producer topic on dispatch failure and use collision-free span key MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses copilot review feedback on #4070: - StripProducerLookupTopic now returns the prior bag value so each dispatch path restores it in a finally block when dispatch did not succeed. InMemoryOutbox stores the Message by reference, so without this a post-strip send failure would leave the outbox entry missing ProducerTopic and retry would fall back to Header.Topic. - BulkDispatchAsync's producerSpans key is now Guid.NewGuid().ToString() rather than "{WireTopic}|{LookupTopic}" — routing keys may legally contain '|', which could collide and drop spans. Co-Authored-By: Claude (claude-opus-4-7) --- .../OutboxProducerMediator.cs | 253 +++++++++++------- 1 file changed, 152 insertions(+), 101 deletions(-) diff --git a/src/Paramore.Brighter/OutboxProducerMediator.cs b/src/Paramore.Brighter/OutboxProducerMediator.cs index fe0c69cf56..e19882d0d1 100644 --- a/src/Paramore.Brighter/OutboxProducerMediator.cs +++ b/src/Paramore.Brighter/OutboxProducerMediator.cs @@ -788,21 +788,37 @@ private static RoutingKey GetProducerLookupTopic(Message message) } // Strip the internal ProducerTopic bag entry so transports that serialise Header.Bag - // (AMQP, SNS/SQS) don't leak it on the wire. Call after lookup, before dispatch. - // Persistent outboxes (SQL, Mongo, Dynamo) re-hydrate a fresh object per drain so - // this mutation is harmless; InMemoryOutbox stores the reference, so a dispatch that - // fails after strip and then retries via the outbox will fall back to Header.Topic - // and miss the producer — acceptable since InMemoryOutbox is primarily dev/test. - private static void StripProducerLookupTopic(Message message) + // (AMQP, SNS/SQS) don't leak it on the wire. Returns the prior bag value (or null) + // so callers can restore it if dispatch fails — important for InMemoryOutbox, which + // stores the message by reference and would otherwise lose the producer hint on retry. + private static object? StripProducerLookupTopic(Message message) { + message.Header.Bag.TryGetValue(Message.ProducerTopicHeaderName, out var saved); message.Header.Bag.Remove(Message.ProducerTopicHeaderName); + return saved; } - private static void StripProducerLookupTopic(IEnumerable messages) + private static void RestoreProducerLookupTopic(Message message, object? saved) { + if (saved is not null) + message.Header.Bag[Message.ProducerTopicHeaderName] = saved; + } + + private static List<(Message Message, object? Saved)> StripProducerLookupTopic(IEnumerable messages) + { + var saved = new List<(Message, object?)>(); foreach (var m in messages) { - StripProducerLookupTopic(m); + saved.Add((m, StripProducerLookupTopic(m))); + } + return saved; + } + + private static void RestoreProducerLookupTopic(IEnumerable<(Message Message, object? Saved)> saved) + { + foreach (var (message, value) in saved) + { + RestoreProducerLookupTopic(message, value); } } @@ -821,39 +837,51 @@ private void Dispatch(IEnumerable posts, RequestContext requestContext, Log.DecoupledInvocationOfMessage(s_logger, message.Header.Topic, message.Id); var producer = _producerRegistry.LookupBy(GetProducerLookupTopic(message), message.Header.Type, requestContext); - StripProducerLookupTopic(message); - var span = _tracer?.CreateProducerSpan(producer.Publication, message, requestContext.Span, - _instrumentationOptions); - producer.Span = span; - if (span != null) producerSpans.TryAdd(message.Id, span); - - if (producer is IAmAMessageProducerSync producerSync) + var savedProducerTopic = StripProducerLookupTopic(message); + var dispatched = false; + try { - if (producer is ISupportPublishConfirmation) - { - //mark dispatch handled by a callback - set in constructor - ExecuteWithResiliencePipeline( - () => { producerSync.Send(message); }, - requestContext); - } - else + var span = _tracer?.CreateProducerSpan(producer.Publication, message, requestContext.Span, + _instrumentationOptions); + producer.Span = span; + if (span != null) producerSpans.TryAdd(message.Id, span); + + if (producer is IAmAMessageProducerSync producerSync) { - var sent = ExecuteWithResiliencePipeline( - () => { producerSync.Send(message); }, - requestContext - ); - if (sent) + if (producer is ISupportPublishConfirmation) + { + //mark dispatch handled by a callback - set in constructor ExecuteWithResiliencePipeline( - () => _outBox.MarkDispatched(message.Id, requestContext, _timeProvider.GetUtcNow(), args), + () => { producerSync.Send(message); }, + requestContext); + dispatched = true; + } + else + { + var sent = ExecuteWithResiliencePipeline( + () => { producerSync.Send(message); }, requestContext ); + if (sent) + { + ExecuteWithResiliencePipeline( + () => _outBox.MarkDispatched(message.Id, requestContext, _timeProvider.GetUtcNow(), args), + requestContext + ); + dispatched = true; + } + } } - } - else - throw new InvalidOperationException("No sync message producer defined."); + else + throw new InvalidOperationException("No sync message producer defined."); - Activity.Current = parentSpan; - producer.Span = null; + Activity.Current = parentSpan; + producer.Span = null; + } + finally + { + if (!dispatched) RestoreProducerLookupTopic(message, savedProducerTopic); + } } } finally @@ -883,57 +911,70 @@ private async Task BulkDispatchAsync( foreach (var topicBatch in messagesByTopic) { var producer = _producerRegistry.LookupBy(topicBatch.Key.LookupTopic); - StripProducerLookupTopic(topicBatch); - var span = _tracer?.CreateProducerSpan(producer.Publication, null, requestContext.Span, - _instrumentationOptions); - - if (span is not null) - { - producer.Span = span; - // Compose the dictionary key from both grouping keys so two - // batches that share a WireTopic but differ by LookupTopic - // don't collide (their spans must both be ended later). - producerSpans.TryAdd($"{topicBatch.Key.WireTopic}|{topicBatch.Key.LookupTopic}", span); - } - - if (producer is IAmABulkMessageProducerAsync bulkMessageProducer and not ISupportPublishConfirmation) + var savedBatchTopics = StripProducerLookupTopic(topicBatch); + var batchDispatched = false; + try { - var messages = topicBatch.ToArray(); + var span = _tracer?.CreateProducerSpan(producer.Publication, null, requestContext.Span, + _instrumentationOptions); - Log.BulkDispatchingMessages(s_logger, messages.Length, topicBatch.Key.WireTopic); + if (span is not null) + { + producer.Span = span; + // Key is only used for uniqueness until EndSpans runs; a Uuid avoids + // any risk of collision from composing topic strings. + producerSpans.TryAdd(Uuid.NewAsString(), span); + } - foreach (var batch in await bulkMessageProducer.CreateBatchesAsync(messages, cancellationToken)) + if (producer is IAmABulkMessageProducerAsync bulkMessageProducer and not ISupportPublishConfirmation) { - var sent = await ExecuteWithResiliencePipelineAsync( - async _ => await bulkMessageProducer.SendAsync(batch, cancellationToken) - .ConfigureAwait(continueOnCapturedContext), - requestContext, - continueOnCapturedContext, - cancellationToken - ) - .ConfigureAwait(continueOnCapturedContext); + var messages = topicBatch.ToArray(); - if (producer is not ISupportPublishConfirmation && sent) + Log.BulkDispatchingMessages(s_logger, messages.Length, topicBatch.Key.WireTopic); + + var allSent = true; + foreach (var batch in await bulkMessageProducer.CreateBatchesAsync(messages, cancellationToken)) { - foreach (var successfulMessage in batch.Ids()) - { - await ExecuteWithResiliencePipelineAsync(async _ => - await _asyncOutbox.MarkDispatchedAsync( - successfulMessage, requestContext, _timeProvider.GetUtcNow(), - cancellationToken: cancellationToken - ), + var sent = await ExecuteWithResiliencePipelineAsync( + async _ => await bulkMessageProducer.SendAsync(batch, cancellationToken) + .ConfigureAwait(continueOnCapturedContext), requestContext, - cancellationToken: cancellationToken - ); + continueOnCapturedContext, + cancellationToken + ) + .ConfigureAwait(continueOnCapturedContext); + + if (producer is not ISupportPublishConfirmation && sent) + { + foreach (var successfulMessage in batch.Ids()) + { + await ExecuteWithResiliencePipelineAsync(async _ => + await _asyncOutbox.MarkDispatchedAsync( + successfulMessage, requestContext, _timeProvider.GetUtcNow(), + cancellationToken: cancellationToken + ), + requestContext, + cancellationToken: cancellationToken + ); + } } - } - if (!sent) TripTopic(batch.RoutingKey); + if (!sent) + { + allSent = false; + TripTopic(batch.RoutingKey); + } + } + batchDispatched = allSent; + } + else + { + throw new InvalidOperationException("No async bulk message producer defined."); } } - else + finally { - throw new InvalidOperationException("No async bulk message producer defined."); + if (!batchDispatched) RestoreProducerLookupTopic(savedBatchTopics); } } } @@ -964,40 +1005,50 @@ private async Task DispatchAsync( Log.DecoupledInvocationOfMessage(s_logger, message.Header.Topic, message.Id); var producer = _producerRegistry.LookupBy(GetProducerLookupTopic(message), message.Header.Type, requestContext); - StripProducerLookupTopic(message); - var span = _tracer?.CreateProducerSpan(producer.Publication, message, parentSpan, - _instrumentationOptions); - producer.Span = span; - if (span != null) producerSpans.TryAdd(message.Id, span); - - if (producer is IAmAMessageProducerAsync producerAsync) + var savedProducerTopic = StripProducerLookupTopic(message); + var dispatched = false; + try { - var sent = await ExecuteWithResiliencePipelineAsync( - async _ => await producerAsync.SendAsync(message, cancellationToken) - .ConfigureAwait(continueOnCapturedContext), - requestContext, - continueOnCapturedContext, - cancellationToken - ) - .ConfigureAwait(continueOnCapturedContext); + var span = _tracer?.CreateProducerSpan(producer.Publication, message, parentSpan, + _instrumentationOptions); + producer.Span = span; + if (span != null) producerSpans.TryAdd(message.Id, span); - if (producer is not ISupportPublishConfirmation && sent) + if (producer is IAmAMessageProducerAsync producerAsync) { - await ExecuteWithResiliencePipelineAsync( - async _ => await _asyncOutbox.MarkDispatchedAsync( - message.Id, requestContext, _timeProvider.GetUtcNow(), + var sent = await ExecuteWithResiliencePipelineAsync( + async _ => await producerAsync.SendAsync(message, cancellationToken) + .ConfigureAwait(continueOnCapturedContext), + requestContext, + continueOnCapturedContext, + cancellationToken + ) + .ConfigureAwait(continueOnCapturedContext); + + if (producer is not ISupportPublishConfirmation && sent) + { + await ExecuteWithResiliencePipelineAsync( + async _ => await _asyncOutbox.MarkDispatchedAsync( + message.Id, requestContext, _timeProvider.GetUtcNow(), + cancellationToken: cancellationToken + ), + requestContext, cancellationToken: cancellationToken - ), - requestContext, - cancellationToken: cancellationToken - ); - } + ); + } + + dispatched = sent || producer is ISupportPublishConfirmation; + + if(!sent) TripTopic(message.Header.Topic); - if(!sent) TripTopic(message.Header.Topic); - + } + else + throw new InvalidOperationException("No async message producer defined."); + } + finally + { + if (!dispatched) RestoreProducerLookupTopic(message, savedProducerTopic); } - else - throw new InvalidOperationException("No async message producer defined."); } } finally From 9bfd39871f13cfa8f5d5158b0df883335afb700a Mon Sep 17 00:00:00 2001 From: DevJonny Date: Sat, 25 Apr 2026 19:38:31 +0100 Subject: [PATCH 11/16] docs: clarify GetProducerLookupTopic comment for reply vs publication paths Per review feedback: the original comment focused on the fallback being "pre-fix behaviour," which obscured *why* the bag entry exists. Rewrite to call out the Reply path (mapper rewrites Header.Topic to a dynamic reply address) and the normal-publication path (no bag entry, falls back to Header.Topic). Co-Authored-By: Claude (claude-opus-4-7) --- src/Paramore.Brighter/OutboxProducerMediator.cs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/Paramore.Brighter/OutboxProducerMediator.cs b/src/Paramore.Brighter/OutboxProducerMediator.cs index e19882d0d1..b5fc760bc3 100644 --- a/src/Paramore.Brighter/OutboxProducerMediator.cs +++ b/src/Paramore.Brighter/OutboxProducerMediator.cs @@ -776,8 +776,11 @@ private bool ConfigurePublisherCallbackMaybe(IAmAMessageProducerSync producer, R private static RoutingKey GetProducerLookupTopic(Message message) { - // Falls back to Header.Topic when the bag entry is absent — reproducing the - // pre-fix behaviour, so publications with a null Topic remain a lookup failure. + // Reply messages set the ProducerTopic bag entry so the dispatcher can locate + // the registered producer even though Header.Topic has been rewritten to the + // dynamic reply address. Normal publications don't carry the bag entry, so we + // fall back to Header.Topic — and a null/empty Header.Topic remains a lookup + // failure, matching pre-fix behaviour. if (message.Header.Bag.TryGetValue(Message.ProducerTopicHeaderName, out var producerTopic) && producerTopic is string topic) { From 66fc585e06ce800eb454e4c8bb7c344b7837d11c Mon Sep 17 00:00:00 2001 From: DevJonny Date: Sat, 25 Apr 2026 19:40:09 +0100 Subject: [PATCH 12/16] refactor: move producer-topic stripping from mediator to transports MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Per review feedback (iancooper): the OutboxProducerMediator was stripping the ProducerTopic bag entry around dispatch and restoring it on failure. That mutated the message reference held by InMemoryOutbox, so on retry the producer hint was gone. Move stripping responsibility to the transport's wire-conversion step: - MessageHeader.LocalHeaderNames: static set of bag keys that are internal to Brighter and must not be serialised onto the wire. Pre-populated with Message.ProducerTopicHeaderName; extensible from downstream code. - MessageHeader.StripLocalHeaders(): instance method removing those keys from the bag (provided as a transport convenience). - AzureServiceBusMessagePublisher: skip LocalHeaderNames when copying Header.Bag into ApplicationProperties. - OutboxProducerMediator: drop StripProducerLookupTopic / RestoreProducerLookupTopic and the dispatched/batchDispatched book- keeping that existed only to drive the restore. The bag entry now survives a successful dispatch — InMemoryOutbox-by-reference keeps the producer hint for retries. Tests updated: reply-message Post tests now assert the bag entry survives dispatch (it's the transport's job to omit it on the wire). New unit tests cover MessageHeader.StripLocalHeaders and the ASB publisher's local-header skip. Note: only the ASB transport has been migrated. Other transports that serialise Header.Bag (RMQ, SNS/SQS, Kafka, …) will now leak the paramore.brighter.ProducerTopic entry on the wire — benign for receivers but a behaviour change. Follow-up work to apply the same pattern to those transports is documented on LocalHeaderNames. Co-Authored-By: Claude (claude-opus-4-7) --- .../AzureServiceBusMessagePublisher.cs | 4 +- src/Paramore.Brighter/MessageHeader.cs | 28 +- .../OutboxProducerMediator.cs | 250 +++++++----------- ...Converting_A_Message_With_Local_Headers.cs | 32 +++ ...n_Bulk_Dispatching_Reply_Messages_Async.cs | 7 +- ..._Reply_Message_To_The_Command_Processor.cs | 7 +- ..._Message_To_The_Command_Processor_Async.cs | 7 +- ...ing_Local_Headers_From_A_Message_Header.cs | 43 +++ 8 files changed, 207 insertions(+), 171 deletions(-) create mode 100644 tests/Paramore.Brighter.AzureServiceBus.Tests/MessagingGateway/When_Converting_A_Message_With_Local_Headers.cs create mode 100644 tests/Paramore.Brighter.Core.Tests/MessageSerialisation/When_Stripping_Local_Headers_From_A_Message_Header.cs diff --git a/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusMessagePublisher.cs b/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusMessagePublisher.cs index 7014952014..9705a59465 100644 --- a/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusMessagePublisher.cs +++ b/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusMessagePublisher.cs @@ -62,7 +62,9 @@ private static void AddBrighterHeaders(Message message, ServiceBusMessage azureS if (message.Header.Bag.TryGetValue(ASBConstants.SessionIdKey, out object? value)) azureServiceBusMessage.SessionId = value.ToString(); - foreach (var header in message.Header.Bag.Where(h => !ASBConstants.ReservedHeaders.Contains(h.Key))) + foreach (var header in message.Header.Bag.Where(h => + !ASBConstants.ReservedHeaders.Contains(h.Key) + && !MessageHeader.LocalHeaderNames.Contains(h.Key))) { azureServiceBusMessage.ApplicationProperties[header.Key] = header.Value; } diff --git a/src/Paramore.Brighter/MessageHeader.cs b/src/Paramore.Brighter/MessageHeader.cs index b9ba0aca4c..d3c31fe3f8 100644 --- a/src/Paramore.Brighter/MessageHeader.cs +++ b/src/Paramore.Brighter/MessageHeader.cs @@ -87,7 +87,33 @@ public class MessageHeader : IEquatable /// The default Brighter source /// public const string DefaultSource = "http://goparamore.io"; - + + /// + /// Bag keys that are internal to Brighter — set by the framework so a downstream + /// component (e.g. the outbox dispatcher) can read them, but not intended to travel + /// over the wire. Transports that copy into their wire format must + /// skip these keys (or call on a copy of the header). + /// + /// + /// Pre-populated with . Add to this set + /// from extension code if you introduce additional local-only bag keys. + /// + public static readonly HashSet LocalHeaderNames = new(StringComparer.Ordinal) + { + Message.ProducerTopicHeaderName + }; + + /// + /// Removes every entry from . + /// Call on a wire-bound copy of the header to ensure local-only bag entries + /// are not serialised onto the transport. + /// + public void StripLocalHeaders() + { + foreach (var name in LocalHeaderNames) + Bag.Remove(name); + } + /// /// A property bag that can be used for extended header attributes. /// Use camelCase for the key names if you intend to read it yourself, as when converted to and from Json serializers will tend convert the property diff --git a/src/Paramore.Brighter/OutboxProducerMediator.cs b/src/Paramore.Brighter/OutboxProducerMediator.cs index b5fc760bc3..9bb9aea95d 100644 --- a/src/Paramore.Brighter/OutboxProducerMediator.cs +++ b/src/Paramore.Brighter/OutboxProducerMediator.cs @@ -790,41 +790,6 @@ private static RoutingKey GetProducerLookupTopic(Message message) return message.Header.Topic; } - // Strip the internal ProducerTopic bag entry so transports that serialise Header.Bag - // (AMQP, SNS/SQS) don't leak it on the wire. Returns the prior bag value (or null) - // so callers can restore it if dispatch fails — important for InMemoryOutbox, which - // stores the message by reference and would otherwise lose the producer hint on retry. - private static object? StripProducerLookupTopic(Message message) - { - message.Header.Bag.TryGetValue(Message.ProducerTopicHeaderName, out var saved); - message.Header.Bag.Remove(Message.ProducerTopicHeaderName); - return saved; - } - - private static void RestoreProducerLookupTopic(Message message, object? saved) - { - if (saved is not null) - message.Header.Bag[Message.ProducerTopicHeaderName] = saved; - } - - private static List<(Message Message, object? Saved)> StripProducerLookupTopic(IEnumerable messages) - { - var saved = new List<(Message, object?)>(); - foreach (var m in messages) - { - saved.Add((m, StripProducerLookupTopic(m))); - } - return saved; - } - - private static void RestoreProducerLookupTopic(IEnumerable<(Message Message, object? Saved)> saved) - { - foreach (var (message, value) in saved) - { - RestoreProducerLookupTopic(message, value); - } - } - private void Dispatch(IEnumerable posts, RequestContext requestContext, Dictionary? args = null) { var parentSpan = requestContext.Span; @@ -840,51 +805,40 @@ private void Dispatch(IEnumerable posts, RequestContext requestContext, Log.DecoupledInvocationOfMessage(s_logger, message.Header.Topic, message.Id); var producer = _producerRegistry.LookupBy(GetProducerLookupTopic(message), message.Header.Type, requestContext); - var savedProducerTopic = StripProducerLookupTopic(message); - var dispatched = false; - try - { - var span = _tracer?.CreateProducerSpan(producer.Publication, message, requestContext.Span, - _instrumentationOptions); - producer.Span = span; - if (span != null) producerSpans.TryAdd(message.Id, span); + var span = _tracer?.CreateProducerSpan(producer.Publication, message, requestContext.Span, + _instrumentationOptions); + producer.Span = span; + if (span != null) producerSpans.TryAdd(message.Id, span); - if (producer is IAmAMessageProducerSync producerSync) + if (producer is IAmAMessageProducerSync producerSync) + { + if (producer is ISupportPublishConfirmation) + { + //mark dispatch handled by a callback - set in constructor + ExecuteWithResiliencePipeline( + () => { producerSync.Send(message); }, + requestContext); + } + else { - if (producer is ISupportPublishConfirmation) + var sent = ExecuteWithResiliencePipeline( + () => { producerSync.Send(message); }, + requestContext + ); + if (sent) { - //mark dispatch handled by a callback - set in constructor ExecuteWithResiliencePipeline( - () => { producerSync.Send(message); }, - requestContext); - dispatched = true; - } - else - { - var sent = ExecuteWithResiliencePipeline( - () => { producerSync.Send(message); }, + () => _outBox.MarkDispatched(message.Id, requestContext, _timeProvider.GetUtcNow(), args), requestContext ); - if (sent) - { - ExecuteWithResiliencePipeline( - () => _outBox.MarkDispatched(message.Id, requestContext, _timeProvider.GetUtcNow(), args), - requestContext - ); - dispatched = true; - } } } - else - throw new InvalidOperationException("No sync message producer defined."); - - Activity.Current = parentSpan; - producer.Span = null; - } - finally - { - if (!dispatched) RestoreProducerLookupTopic(message, savedProducerTopic); } + else + throw new InvalidOperationException("No sync message producer defined."); + + Activity.Current = parentSpan; + producer.Span = null; } } finally @@ -914,70 +868,58 @@ private async Task BulkDispatchAsync( foreach (var topicBatch in messagesByTopic) { var producer = _producerRegistry.LookupBy(topicBatch.Key.LookupTopic); - var savedBatchTopics = StripProducerLookupTopic(topicBatch); - var batchDispatched = false; - try + var span = _tracer?.CreateProducerSpan(producer.Publication, null, requestContext.Span, + _instrumentationOptions); + + if (span is not null) { - var span = _tracer?.CreateProducerSpan(producer.Publication, null, requestContext.Span, - _instrumentationOptions); + producer.Span = span; + // Key is only used for uniqueness until EndSpans runs; a Uuid avoids + // any risk of collision from composing topic strings. + producerSpans.TryAdd(Uuid.NewAsString(), span); + } - if (span is not null) - { - producer.Span = span; - // Key is only used for uniqueness until EndSpans runs; a Uuid avoids - // any risk of collision from composing topic strings. - producerSpans.TryAdd(Uuid.NewAsString(), span); - } + if (producer is IAmABulkMessageProducerAsync bulkMessageProducer and not ISupportPublishConfirmation) + { + var messages = topicBatch.ToArray(); - if (producer is IAmABulkMessageProducerAsync bulkMessageProducer and not ISupportPublishConfirmation) - { - var messages = topicBatch.ToArray(); + Log.BulkDispatchingMessages(s_logger, messages.Length, topicBatch.Key.WireTopic); - Log.BulkDispatchingMessages(s_logger, messages.Length, topicBatch.Key.WireTopic); + foreach (var batch in await bulkMessageProducer.CreateBatchesAsync(messages, cancellationToken)) + { + var sent = await ExecuteWithResiliencePipelineAsync( + async _ => await bulkMessageProducer.SendAsync(batch, cancellationToken) + .ConfigureAwait(continueOnCapturedContext), + requestContext, + continueOnCapturedContext, + cancellationToken + ) + .ConfigureAwait(continueOnCapturedContext); - var allSent = true; - foreach (var batch in await bulkMessageProducer.CreateBatchesAsync(messages, cancellationToken)) + if (producer is not ISupportPublishConfirmation && sent) { - var sent = await ExecuteWithResiliencePipelineAsync( - async _ => await bulkMessageProducer.SendAsync(batch, cancellationToken) - .ConfigureAwait(continueOnCapturedContext), - requestContext, - continueOnCapturedContext, - cancellationToken - ) - .ConfigureAwait(continueOnCapturedContext); - - if (producer is not ISupportPublishConfirmation && sent) + foreach (var successfulMessage in batch.Ids()) { - foreach (var successfulMessage in batch.Ids()) - { - await ExecuteWithResiliencePipelineAsync(async _ => - await _asyncOutbox.MarkDispatchedAsync( - successfulMessage, requestContext, _timeProvider.GetUtcNow(), - cancellationToken: cancellationToken - ), - requestContext, - cancellationToken: cancellationToken - ); - } + await ExecuteWithResiliencePipelineAsync(async _ => + await _asyncOutbox.MarkDispatchedAsync( + successfulMessage, requestContext, _timeProvider.GetUtcNow(), + cancellationToken: cancellationToken + ), + requestContext, + cancellationToken: cancellationToken + ); } + } - if (!sent) - { - allSent = false; - TripTopic(batch.RoutingKey); - } + if (!sent) + { + TripTopic(batch.RoutingKey); } - batchDispatched = allSent; - } - else - { - throw new InvalidOperationException("No async bulk message producer defined."); } } - finally + else { - if (!batchDispatched) RestoreProducerLookupTopic(savedBatchTopics); + throw new InvalidOperationException("No async bulk message producer defined."); } } } @@ -1008,50 +950,38 @@ private async Task DispatchAsync( Log.DecoupledInvocationOfMessage(s_logger, message.Header.Topic, message.Id); var producer = _producerRegistry.LookupBy(GetProducerLookupTopic(message), message.Header.Type, requestContext); - var savedProducerTopic = StripProducerLookupTopic(message); - var dispatched = false; - try + var span = _tracer?.CreateProducerSpan(producer.Publication, message, parentSpan, + _instrumentationOptions); + producer.Span = span; + if (span != null) producerSpans.TryAdd(message.Id, span); + + if (producer is IAmAMessageProducerAsync producerAsync) { - var span = _tracer?.CreateProducerSpan(producer.Publication, message, parentSpan, - _instrumentationOptions); - producer.Span = span; - if (span != null) producerSpans.TryAdd(message.Id, span); + var sent = await ExecuteWithResiliencePipelineAsync( + async _ => await producerAsync.SendAsync(message, cancellationToken) + .ConfigureAwait(continueOnCapturedContext), + requestContext, + continueOnCapturedContext, + cancellationToken + ) + .ConfigureAwait(continueOnCapturedContext); - if (producer is IAmAMessageProducerAsync producerAsync) + if (producer is not ISupportPublishConfirmation && sent) { - var sent = await ExecuteWithResiliencePipelineAsync( - async _ => await producerAsync.SendAsync(message, cancellationToken) - .ConfigureAwait(continueOnCapturedContext), - requestContext, - continueOnCapturedContext, - cancellationToken - ) - .ConfigureAwait(continueOnCapturedContext); - - if (producer is not ISupportPublishConfirmation && sent) - { - await ExecuteWithResiliencePipelineAsync( - async _ => await _asyncOutbox.MarkDispatchedAsync( - message.Id, requestContext, _timeProvider.GetUtcNow(), - cancellationToken: cancellationToken - ), - requestContext, + await ExecuteWithResiliencePipelineAsync( + async _ => await _asyncOutbox.MarkDispatchedAsync( + message.Id, requestContext, _timeProvider.GetUtcNow(), cancellationToken: cancellationToken - ); - } - - dispatched = sent || producer is ISupportPublishConfirmation; - - if(!sent) TripTopic(message.Header.Topic); - + ), + requestContext, + cancellationToken: cancellationToken + ); } - else - throw new InvalidOperationException("No async message producer defined."); - } - finally - { - if (!dispatched) RestoreProducerLookupTopic(message, savedProducerTopic); + + if(!sent) TripTopic(message.Header.Topic); } + else + throw new InvalidOperationException("No async message producer defined."); } } finally diff --git a/tests/Paramore.Brighter.AzureServiceBus.Tests/MessagingGateway/When_Converting_A_Message_With_Local_Headers.cs b/tests/Paramore.Brighter.AzureServiceBus.Tests/MessagingGateway/When_Converting_A_Message_With_Local_Headers.cs new file mode 100644 index 0000000000..1f48af50de --- /dev/null +++ b/tests/Paramore.Brighter.AzureServiceBus.Tests/MessagingGateway/When_Converting_A_Message_With_Local_Headers.cs @@ -0,0 +1,32 @@ +using System; +using Paramore.Brighter.MessagingGateway.AzureServiceBus; +using Xunit; + +namespace Paramore.Brighter.AzureServiceBus.Tests.MessagingGateway; + +[Trait("Category", "ASB")] +[Trait("Fragile", "CI")] +public class AzureServiceBusMessagePublisherLocalHeaderTests +{ + [Fact] + public void When_Converting_A_Message_The_ProducerTopic_Local_Header_Is_Stripped() + { + var header = new MessageHeader( + messageId: Guid.NewGuid().ToString(), + topic: new RoutingKey("reply.address"), + messageType: MessageType.MT_COMMAND); + header.Bag[Message.ProducerTopicHeaderName] = "the.real.producer.topic"; + header.Bag["customer.header"] = "should.survive"; + + var message = new Message(header, new MessageBody("body")); + + var asbMessage = AzureServiceBusMessagePublisher.ConvertToServiceBusMessage(message); + + // local header is stripped from the wire form... + Assert.False(asbMessage.ApplicationProperties.ContainsKey(Message.ProducerTopicHeaderName)); + // ...but the original message keeps it (so InMemoryOutbox-by-reference retries still work) + Assert.True(message.Header.Bag.ContainsKey(Message.ProducerTopicHeaderName)); + // unrelated bag entries still travel on the wire + Assert.True(asbMessage.ApplicationProperties.ContainsKey("customer.header")); + } +} diff --git a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Bulk_Dispatching_Reply_Messages_Async.cs b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Bulk_Dispatching_Reply_Messages_Async.cs index 22decd9862..4e69c72c57 100644 --- a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Bulk_Dispatching_Reply_Messages_Async.cs +++ b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Bulk_Dispatching_Reply_Messages_Async.cs @@ -105,10 +105,11 @@ await _mediator.ClearOutstandingFromOutboxAsync( Assert.True(messages.Length == 2, $"expected 2 reply messages on the bus after bulk dispatch, got {messages.Length} — bulk dispatch or producer lookup failed"); - //assert - internal ProducerTopic bag entry was stripped on every message - //in the batch so transports that serialise Header.Bag don't leak it + //assert - the ProducerTopic bag entry survives bulk dispatch so an InMemoryOutbox + //(which holds the message by reference) keeps the producer hint for retries. + //Wire-format stripping is the transport's responsibility (see MessageHeader.LocalHeaderNames). Assert.All(messages, m => - Assert.False(m.Header.Bag.ContainsKey(Message.ProducerTopicHeaderName))); + Assert.True(m.Header.Bag.ContainsKey(Message.ProducerTopicHeaderName))); } } } diff --git a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Posting_A_Reply_Message_To_The_Command_Processor.cs b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Posting_A_Reply_Message_To_The_Command_Processor.cs index 8e89df510f..f8ca0378ac 100644 --- a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Posting_A_Reply_Message_To_The_Command_Processor.cs +++ b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Posting_A_Reply_Message_To_The_Command_Processor.cs @@ -90,9 +90,10 @@ public void When_Posting_A_Reply_Message_To_The_Command_Processor() //assert - message topic is the reply address Assert.Equal(_myResponse.SendersAddress.Topic, outboxMessage.Header.Topic); - //assert - internal ProducerTopic bag entry was stripped before dispatch so - //transports that serialise Header.Bag don't leak it onto the wire - Assert.False(messages[0].Header.Bag.ContainsKey(Message.ProducerTopicHeaderName)); + //assert - the ProducerTopic bag entry survives dispatch so an InMemoryOutbox + //(which holds the message by reference) keeps the producer hint for retries. + //Wire-format stripping is the transport's responsibility (see MessageHeader.LocalHeaderNames). + Assert.True(messages[0].Header.Bag.ContainsKey(Message.ProducerTopicHeaderName)); } } } diff --git a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Posting_A_Reply_Message_To_The_Command_Processor_Async.cs b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Posting_A_Reply_Message_To_The_Command_Processor_Async.cs index 824ec9470c..2aa0383274 100644 --- a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Posting_A_Reply_Message_To_The_Command_Processor_Async.cs +++ b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Posting_A_Reply_Message_To_The_Command_Processor_Async.cs @@ -91,9 +91,10 @@ public async Task When_Posting_A_Reply_Message_To_The_Command_Processor_Async() //assert - message topic is the reply address Assert.Equal(_myResponse.SendersAddress.Topic, outboxMessage.Header.Topic); - //assert - internal ProducerTopic bag entry was stripped before dispatch so - //transports that serialise Header.Bag don't leak it onto the wire - Assert.False(messages[0].Header.Bag.ContainsKey(Message.ProducerTopicHeaderName)); + //assert - the ProducerTopic bag entry survives dispatch so an InMemoryOutbox + //(which holds the message by reference) keeps the producer hint for retries. + //Wire-format stripping is the transport's responsibility (see MessageHeader.LocalHeaderNames). + Assert.True(messages[0].Header.Bag.ContainsKey(Message.ProducerTopicHeaderName)); } } } diff --git a/tests/Paramore.Brighter.Core.Tests/MessageSerialisation/When_Stripping_Local_Headers_From_A_Message_Header.cs b/tests/Paramore.Brighter.Core.Tests/MessageSerialisation/When_Stripping_Local_Headers_From_A_Message_Header.cs new file mode 100644 index 0000000000..6bd19d81cf --- /dev/null +++ b/tests/Paramore.Brighter.Core.Tests/MessageSerialisation/When_Stripping_Local_Headers_From_A_Message_Header.cs @@ -0,0 +1,43 @@ +using Xunit; + +namespace Paramore.Brighter.Core.Tests.MessageSerialisation; + +public class MessageHeaderStripLocalHeadersTests +{ + [Fact] + public void When_Stripping_Local_Headers_The_ProducerTopic_Bag_Entry_Is_Removed_And_Other_Entries_Survive() + { + var header = new MessageHeader( + messageId: "id-1", + topic: new RoutingKey("a.topic"), + messageType: MessageType.MT_COMMAND); + header.Bag[Message.ProducerTopicHeaderName] = "lookup.topic"; + header.Bag["user.key"] = "user.value"; + + header.StripLocalHeaders(); + + Assert.False(header.Bag.ContainsKey(Message.ProducerTopicHeaderName)); + Assert.True(header.Bag.ContainsKey("user.key")); + } + + [Fact] + public void When_Stripping_Local_Headers_With_No_Local_Bag_Entries_Then_The_Bag_Is_Unchanged() + { + var header = new MessageHeader( + messageId: "id-2", + topic: new RoutingKey("a.topic"), + messageType: MessageType.MT_EVENT); + header.Bag["user.key"] = "user.value"; + + header.StripLocalHeaders(); + + Assert.Single(header.Bag); + Assert.Equal("user.value", header.Bag["user.key"]); + } + + [Fact] + public void When_The_Producer_Topic_Header_Name_Is_In_The_Local_Header_Names_Set() + { + Assert.Contains(Message.ProducerTopicHeaderName, MessageHeader.LocalHeaderNames); + } +} From ed04c52050b594a7fbfa95e515a2b6fa0a12d09a Mon Sep 17 00:00:00 2001 From: DevJonny Date: Sat, 25 Apr 2026 19:52:04 +0100 Subject: [PATCH 13/16] refactor: skip MessageHeader.LocalHeaderNames in remaining transports MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ASB stripped the ProducerTopic local header on the wire already (prior commit). Apply the same pattern to every other transport that copies Header.Bag onto its wire format so paramore.brighter.ProducerTopic doesn't leak. Inline-filter transports (existing skip-set extended with LocalHeaderNames): - RMQ.Async / RMQ.Sync RmqMessagePublisher - Kafka KafkaDefaultMessageHeaderBuilder - GcpPubSub Parser - RocketMQ RocketMqMessageProducer - MessageScheduler.Azure AzureServiceBusScheduler Whole-bag-as-JSON transports (use new MessageHeader.BagWithoutLocalHeaders): - AWSSQS V3 + V4 SnsMessagePublisher - AWSSQS V3 + V4 SqsMessageSender - Redis RedisMessagePublisher MessageHeader gains BagWithoutLocalHeaders() — returns a new dictionary copy minus LocalHeaderNames — so transports that serialise the bag in one shot (SNS / SQS / Redis emit it as a single JSON property) can hand the filtered view to the serialiser without mutating the original header (preserves InMemoryOutbox-by-reference retries). Co-Authored-By: Claude (claude-opus-4-7) --- .../AzureServiceBusScheduler.cs | 4 +++- .../SnsMessagePublisher.cs | 2 +- .../SqsMessageSender.cs | 2 +- .../SnsMessagePublisher.cs | 2 +- .../SqsMessageSender.cs | 2 +- .../Parser.cs | 2 +- .../KafkaDefaultMessageHeaderBuilder.cs | 3 ++- .../RmqMessagePublisher.cs | 2 +- .../RmqMessagePublisher.cs | 2 +- .../RedisMessagePublisher.cs | 2 +- .../RocketMqMessageProducer.cs | 4 +++- src/Paramore.Brighter/MessageHeader.cs | 21 ++++++++++++++++++- 12 files changed, 36 insertions(+), 12 deletions(-) diff --git a/src/Paramore.Brighter.MessageScheduler.Azure/AzureServiceBusScheduler.cs b/src/Paramore.Brighter.MessageScheduler.Azure/AzureServiceBusScheduler.cs index 367eb0e2a2..ad66a82278 100644 --- a/src/Paramore.Brighter.MessageScheduler.Azure/AzureServiceBusScheduler.cs +++ b/src/Paramore.Brighter.MessageScheduler.Azure/AzureServiceBusScheduler.cs @@ -131,7 +131,9 @@ private static ServiceBusMessage ConvertToServiceBusMessage(Message message) message.Header.HandledCount); azureServiceBusMessage.ApplicationProperties.Add(ASBConstants.ReplyToHeaderBagKey, message.Header.ReplyTo); - foreach (var header in message.Header.Bag.Where(h => !ASBConstants.ReservedHeaders.Contains(h.Key))) + foreach (var header in message.Header.Bag.Where(h => + !ASBConstants.ReservedHeaders.Contains(h.Key) + && !MessageHeader.LocalHeaderNames.Contains(h.Key))) { azureServiceBusMessage.ApplicationProperties.Add(header.Key, header.Value); } diff --git a/src/Paramore.Brighter.MessagingGateway.AWSSQS.V4/SnsMessagePublisher.cs b/src/Paramore.Brighter.MessagingGateway.AWSSQS.V4/SnsMessagePublisher.cs index 7bde2d7a12..217fb8bfb8 100644 --- a/src/Paramore.Brighter.MessagingGateway.AWSSQS.V4/SnsMessagePublisher.cs +++ b/src/Paramore.Brighter.MessagingGateway.AWSSQS.V4/SnsMessagePublisher.cs @@ -109,7 +109,7 @@ private Dictionary BuildMessageAttributes(Message message.Header.Bag[HeaderNames.HandledCount] = message.Header.HandledCount.ToString(CultureInfo.InvariantCulture); - var bagJson = JsonSerializer.Serialize(message.Header.Bag, JsonSerialisationOptions.Options); + var bagJson = JsonSerializer.Serialize(message.Header.BagWithoutLocalHeaders(), JsonSerialisationOptions.Options); messageAttributes[HeaderNames.Bag] = new MessageAttributeValue { StringValue = Convert.ToString(bagJson), DataType = "String" }; return messageAttributes; diff --git a/src/Paramore.Brighter.MessagingGateway.AWSSQS.V4/SqsMessageSender.cs b/src/Paramore.Brighter.MessagingGateway.AWSSQS.V4/SqsMessageSender.cs index 5fa2940b39..d20aec54f9 100644 --- a/src/Paramore.Brighter.MessagingGateway.AWSSQS.V4/SqsMessageSender.cs +++ b/src/Paramore.Brighter.MessagingGateway.AWSSQS.V4/SqsMessageSender.cs @@ -127,7 +127,7 @@ private void SetMessageAttributes(SendMessageRequest request, Message message) message.Header.Bag[HeaderNames.HandledCount] = message.Header.HandledCount.ToString(CultureInfo.InvariantCulture); - var bagJson = System.Text.Json.JsonSerializer.Serialize(message.Header.Bag, JsonSerialisationOptions.Options); + var bagJson = System.Text.Json.JsonSerializer.Serialize(message.Header.BagWithoutLocalHeaders(), JsonSerialisationOptions.Options); messageAttributes[HeaderNames.Bag] = new() { StringValue = bagJson, DataType = "String" }; request.MessageAttributes = messageAttributes; } diff --git a/src/Paramore.Brighter.MessagingGateway.AWSSQS/SnsMessagePublisher.cs b/src/Paramore.Brighter.MessagingGateway.AWSSQS/SnsMessagePublisher.cs index cc049be643..13a5818b6b 100644 --- a/src/Paramore.Brighter.MessagingGateway.AWSSQS/SnsMessagePublisher.cs +++ b/src/Paramore.Brighter.MessagingGateway.AWSSQS/SnsMessagePublisher.cs @@ -109,7 +109,7 @@ private Dictionary BuildMessageAttributes(Message message.Header.Bag[HeaderNames.HandledCount] = message.Header.HandledCount.ToString(CultureInfo.InvariantCulture); - var bagJson = JsonSerializer.Serialize(message.Header.Bag, JsonSerialisationOptions.Options); + var bagJson = JsonSerializer.Serialize(message.Header.BagWithoutLocalHeaders(), JsonSerialisationOptions.Options); messageAttributes[HeaderNames.Bag] = new MessageAttributeValue { StringValue = Convert.ToString(bagJson), DataType = "String" }; return messageAttributes; diff --git a/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageSender.cs b/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageSender.cs index b0470210a8..87d14d42fd 100644 --- a/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageSender.cs +++ b/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageSender.cs @@ -129,7 +129,7 @@ private void SetMessageAttributes(SendMessageRequest request, Message message) message.Header.Bag[HeaderNames.HandledCount] = message.Header.HandledCount.ToString(CultureInfo.InvariantCulture); - var bagJson = System.Text.Json.JsonSerializer.Serialize(message.Header.Bag, JsonSerialisationOptions.Options); + var bagJson = System.Text.Json.JsonSerializer.Serialize(message.Header.BagWithoutLocalHeaders(), JsonSerialisationOptions.Options); messageAttributes[HeaderNames.Bag] = new() { StringValue = bagJson, DataType = "String" }; request.MessageAttributes = messageAttributes; } diff --git a/src/Paramore.Brighter.MessagingGateway.GcpPubSub/Parser.cs b/src/Paramore.Brighter.MessagingGateway.GcpPubSub/Parser.cs index 0b245d1f1f..9846a303a3 100644 --- a/src/Paramore.Brighter.MessagingGateway.GcpPubSub/Parser.cs +++ b/src/Paramore.Brighter.MessagingGateway.GcpPubSub/Parser.cs @@ -349,7 +349,7 @@ private static void AddHeaders(MapField headers, Message message message.Header.Bag.Each(header => { - if (!headers.ContainsKey(header.Key)) + if (!headers.ContainsKey(header.Key) && !MessageHeader.LocalHeaderNames.Contains(header.Key)) { headers.Add(header.Key, header.Value.ToString()!); } diff --git a/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaDefaultMessageHeaderBuilder.cs b/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaDefaultMessageHeaderBuilder.cs index 422d3f2506..43fe804f40 100644 --- a/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaDefaultMessageHeaderBuilder.cs +++ b/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaDefaultMessageHeaderBuilder.cs @@ -113,7 +113,8 @@ private void AddCLoudEventsOptionalHeaders(Headers headers, Message message) private void AddUserDefinedBagHeaders(Headers headers, Message message) { message.Header.Bag - .Where(x => !BrighterDefinedHeaders.HeadersToReset.Contains(x.Key)) + .Where(x => !BrighterDefinedHeaders.HeadersToReset.Contains(x.Key) + && !MessageHeader.LocalHeaderNames.Contains(x.Key)) .Each(header => AddUserDefinedBagHeader(headers, header.Key, header.Value)); } diff --git a/src/Paramore.Brighter.MessagingGateway.RMQ.Async/RmqMessagePublisher.cs b/src/Paramore.Brighter.MessagingGateway.RMQ.Async/RmqMessagePublisher.cs index 4333ee1565..e4f67988df 100644 --- a/src/Paramore.Brighter.MessagingGateway.RMQ.Async/RmqMessagePublisher.cs +++ b/src/Paramore.Brighter.MessagingGateway.RMQ.Async/RmqMessagePublisher.cs @@ -228,7 +228,7 @@ private static void AddUserDefinedHeaders(Message message, Dictionary { - if (!_headersToReset.Contains(header.Key)) + if (!_headersToReset.Contains(header.Key) && !MessageHeader.LocalHeaderNames.Contains(header.Key)) { headers[header.Key] = header.Value; } diff --git a/src/Paramore.Brighter.MessagingGateway.RMQ.Sync/RmqMessagePublisher.cs b/src/Paramore.Brighter.MessagingGateway.RMQ.Sync/RmqMessagePublisher.cs index 120cbb5c0f..2667ff0fe7 100644 --- a/src/Paramore.Brighter.MessagingGateway.RMQ.Sync/RmqMessagePublisher.cs +++ b/src/Paramore.Brighter.MessagingGateway.RMQ.Sync/RmqMessagePublisher.cs @@ -194,7 +194,7 @@ private static void AddUserDefinedHeaders(Message message, Dictionary { - if (!_headersToReset.Contains(header.Key)) + if (!_headersToReset.Contains(header.Key) && !MessageHeader.LocalHeaderNames.Contains(header.Key)) { headers[header.Key] = header.Value; } diff --git a/src/Paramore.Brighter.MessagingGateway.Redis/RedisMessagePublisher.cs b/src/Paramore.Brighter.MessagingGateway.Redis/RedisMessagePublisher.cs index 46083294bf..8104be6113 100644 --- a/src/Paramore.Brighter.MessagingGateway.Redis/RedisMessagePublisher.cs +++ b/src/Paramore.Brighter.MessagingGateway.Redis/RedisMessagePublisher.cs @@ -131,7 +131,7 @@ private static void WriteHandledCount(MessageHeader messageHeader, Dictionary headers) { - var flatBag = JsonSerializer.Serialize(messageHeader.Bag, JsonSerialisationOptions.Options); + var flatBag = JsonSerializer.Serialize(messageHeader.BagWithoutLocalHeaders(), JsonSerialisationOptions.Options); headers.Add(HeaderNames.BAG, flatBag); } diff --git a/src/Paramore.Brighter.MessagingGateway.RocketMQ/RocketMqMessageProducer.cs b/src/Paramore.Brighter.MessagingGateway.RocketMQ/RocketMqMessageProducer.cs index 372fe9b041..aa51cf0f9b 100644 --- a/src/Paramore.Brighter.MessagingGateway.RocketMQ/RocketMqMessageProducer.cs +++ b/src/Paramore.Brighter.MessagingGateway.RocketMQ/RocketMqMessageProducer.cs @@ -146,7 +146,9 @@ private async Task SendWithDelayAsync(Message message, TimeSpan? delay, bool use } foreach (var (key, val) in message.Header.Bag - .Where(x => x.Key != HeaderNames.Keys && x.Key != HeaderNames.Tag)) + .Where(x => x.Key != HeaderNames.Keys + && x.Key != HeaderNames.Tag + && !MessageHeader.LocalHeaderNames.Contains(x.Key))) { builder.AddProperty(key, val.ToString()); } diff --git a/src/Paramore.Brighter/MessageHeader.cs b/src/Paramore.Brighter/MessageHeader.cs index d3c31fe3f8..9112014c94 100644 --- a/src/Paramore.Brighter/MessageHeader.cs +++ b/src/Paramore.Brighter/MessageHeader.cs @@ -92,7 +92,9 @@ public class MessageHeader : IEquatable /// Bag keys that are internal to Brighter — set by the framework so a downstream /// component (e.g. the outbox dispatcher) can read them, but not intended to travel /// over the wire. Transports that copy into their wire format must - /// skip these keys (or call on a copy of the header). + /// skip these keys: either filter inline (see ASB / RMQ / Kafka publishers) or + /// call when serialising the bag in one go + /// (see SNS / SQS / Redis publishers). /// /// /// Pre-populated with . Add to this set @@ -114,6 +116,23 @@ public void StripLocalHeaders() Bag.Remove(name); } + /// + /// Returns a new dictionary containing every entry whose key + /// is not in . For transports that serialise the + /// whole bag in one go (e.g. SNS/SQS/Redis emit it as a single JSON property), + /// pass this to the serialiser instead of directly. + /// + public Dictionary BagWithoutLocalHeaders() + { + var copy = new Dictionary(Bag.Count); + foreach (var kv in Bag) + { + if (!LocalHeaderNames.Contains(kv.Key)) + copy[kv.Key] = kv.Value; + } + return copy; + } + /// /// A property bag that can be used for extended header attributes. /// Use camelCase for the key names if you intend to read it yourself, as when converted to and from Json serializers will tend convert the property From 736612c3e7b38ed5c8a7040a5d33386a30beb180 Mon Sep 17 00:00:00 2001 From: DevJonny Date: Sat, 25 Apr 2026 20:26:52 +0100 Subject: [PATCH 14/16] refactor: protect LocalHeaderNames behind IsLocalHeader / RegisterLocalHeader MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The first cut exposed LocalHeaderNames as a public mutable HashSet so transports could call .Contains directly. That let any caller remove the framework-essential ProducerTopicHeaderName entry, and read/write races were possible if a registration ran while a publisher was iterating. Hide the storage and expose two operations: - IsLocalHeader(name): lock-free read against a copy-on-write snapshot (Volatile.Read of the field). This is on the per-bag-entry hot path. - RegisterLocalHeader(name): CAS loop that swaps in a new HashSet containing the additional key. Idempotent. Expected to be called once at startup from extension code. The field itself becomes private; the snapshot is never mutated in place after publication, so readers can iterate it freely without locking. StripLocalHeaders and BagWithoutLocalHeaders snapshot once via Volatile.Read at the start. ImmutableHashSet was the natural fit but isn't in the netstandard2.0 BCL and Brighter doesn't reference the package — emulating copy-on-write with HashSet keeps the same semantics without adding a dependency. Call sites in the 9 transports + the local-header tests switch to MessageHeader.IsLocalHeader. New test covers RegisterLocalHeader idempotency and that custom keys are honoured by BagWithoutLocalHeaders. Co-Authored-By: Claude (claude-opus-4-7) --- .../AzureServiceBusScheduler.cs | 2 +- .../AzureServiceBusMessagePublisher.cs | 2 +- .../Parser.cs | 2 +- .../KafkaDefaultMessageHeaderBuilder.cs | 2 +- .../RmqMessagePublisher.cs | 2 +- .../RmqMessagePublisher.cs | 2 +- .../RocketMqMessageProducer.cs | 2 +- src/Paramore.Brighter/MessageHeader.cs | 59 ++++++++++++++----- ...n_Bulk_Dispatching_Reply_Messages_Async.cs | 2 +- ..._Reply_Message_To_The_Command_Processor.cs | 2 +- ..._Message_To_The_Command_Processor_Async.cs | 2 +- ...ing_Local_Headers_From_A_Message_Header.cs | 27 ++++++++- 12 files changed, 80 insertions(+), 26 deletions(-) diff --git a/src/Paramore.Brighter.MessageScheduler.Azure/AzureServiceBusScheduler.cs b/src/Paramore.Brighter.MessageScheduler.Azure/AzureServiceBusScheduler.cs index ad66a82278..aa79ccb6d6 100644 --- a/src/Paramore.Brighter.MessageScheduler.Azure/AzureServiceBusScheduler.cs +++ b/src/Paramore.Brighter.MessageScheduler.Azure/AzureServiceBusScheduler.cs @@ -133,7 +133,7 @@ private static ServiceBusMessage ConvertToServiceBusMessage(Message message) foreach (var header in message.Header.Bag.Where(h => !ASBConstants.ReservedHeaders.Contains(h.Key) - && !MessageHeader.LocalHeaderNames.Contains(h.Key))) + && !MessageHeader.IsLocalHeader(h.Key))) { azureServiceBusMessage.ApplicationProperties.Add(header.Key, header.Value); } diff --git a/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusMessagePublisher.cs b/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusMessagePublisher.cs index 9705a59465..54699eb392 100644 --- a/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusMessagePublisher.cs +++ b/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusMessagePublisher.cs @@ -64,7 +64,7 @@ private static void AddBrighterHeaders(Message message, ServiceBusMessage azureS foreach (var header in message.Header.Bag.Where(h => !ASBConstants.ReservedHeaders.Contains(h.Key) - && !MessageHeader.LocalHeaderNames.Contains(h.Key))) + && !MessageHeader.IsLocalHeader(h.Key))) { azureServiceBusMessage.ApplicationProperties[header.Key] = header.Value; } diff --git a/src/Paramore.Brighter.MessagingGateway.GcpPubSub/Parser.cs b/src/Paramore.Brighter.MessagingGateway.GcpPubSub/Parser.cs index 9846a303a3..7b1a8e162d 100644 --- a/src/Paramore.Brighter.MessagingGateway.GcpPubSub/Parser.cs +++ b/src/Paramore.Brighter.MessagingGateway.GcpPubSub/Parser.cs @@ -349,7 +349,7 @@ private static void AddHeaders(MapField headers, Message message message.Header.Bag.Each(header => { - if (!headers.ContainsKey(header.Key) && !MessageHeader.LocalHeaderNames.Contains(header.Key)) + if (!headers.ContainsKey(header.Key) && !MessageHeader.IsLocalHeader(header.Key)) { headers.Add(header.Key, header.Value.ToString()!); } diff --git a/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaDefaultMessageHeaderBuilder.cs b/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaDefaultMessageHeaderBuilder.cs index 43fe804f40..a396237615 100644 --- a/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaDefaultMessageHeaderBuilder.cs +++ b/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaDefaultMessageHeaderBuilder.cs @@ -114,7 +114,7 @@ private void AddUserDefinedBagHeaders(Headers headers, Message message) { message.Header.Bag .Where(x => !BrighterDefinedHeaders.HeadersToReset.Contains(x.Key) - && !MessageHeader.LocalHeaderNames.Contains(x.Key)) + && !MessageHeader.IsLocalHeader(x.Key)) .Each(header => AddUserDefinedBagHeader(headers, header.Key, header.Value)); } diff --git a/src/Paramore.Brighter.MessagingGateway.RMQ.Async/RmqMessagePublisher.cs b/src/Paramore.Brighter.MessagingGateway.RMQ.Async/RmqMessagePublisher.cs index e4f67988df..a89829e950 100644 --- a/src/Paramore.Brighter.MessagingGateway.RMQ.Async/RmqMessagePublisher.cs +++ b/src/Paramore.Brighter.MessagingGateway.RMQ.Async/RmqMessagePublisher.cs @@ -228,7 +228,7 @@ private static void AddUserDefinedHeaders(Message message, Dictionary { - if (!_headersToReset.Contains(header.Key) && !MessageHeader.LocalHeaderNames.Contains(header.Key)) + if (!_headersToReset.Contains(header.Key) && !MessageHeader.IsLocalHeader(header.Key)) { headers[header.Key] = header.Value; } diff --git a/src/Paramore.Brighter.MessagingGateway.RMQ.Sync/RmqMessagePublisher.cs b/src/Paramore.Brighter.MessagingGateway.RMQ.Sync/RmqMessagePublisher.cs index 2667ff0fe7..d45bd500fe 100644 --- a/src/Paramore.Brighter.MessagingGateway.RMQ.Sync/RmqMessagePublisher.cs +++ b/src/Paramore.Brighter.MessagingGateway.RMQ.Sync/RmqMessagePublisher.cs @@ -194,7 +194,7 @@ private static void AddUserDefinedHeaders(Message message, Dictionary { - if (!_headersToReset.Contains(header.Key) && !MessageHeader.LocalHeaderNames.Contains(header.Key)) + if (!_headersToReset.Contains(header.Key) && !MessageHeader.IsLocalHeader(header.Key)) { headers[header.Key] = header.Value; } diff --git a/src/Paramore.Brighter.MessagingGateway.RocketMQ/RocketMqMessageProducer.cs b/src/Paramore.Brighter.MessagingGateway.RocketMQ/RocketMqMessageProducer.cs index aa51cf0f9b..5585e6120f 100644 --- a/src/Paramore.Brighter.MessagingGateway.RocketMQ/RocketMqMessageProducer.cs +++ b/src/Paramore.Brighter.MessagingGateway.RocketMQ/RocketMqMessageProducer.cs @@ -148,7 +148,7 @@ private async Task SendWithDelayAsync(Message message, TimeSpan? delay, bool use foreach (var (key, val) in message.Header.Bag .Where(x => x.Key != HeaderNames.Keys && x.Key != HeaderNames.Tag - && !MessageHeader.LocalHeaderNames.Contains(x.Key))) + && !MessageHeader.IsLocalHeader(x.Key))) { builder.AddProperty(key, val.ToString()); } diff --git a/src/Paramore.Brighter/MessageHeader.cs b/src/Paramore.Brighter/MessageHeader.cs index 9112014c94..1fd8b3003c 100644 --- a/src/Paramore.Brighter/MessageHeader.cs +++ b/src/Paramore.Brighter/MessageHeader.cs @@ -26,6 +26,7 @@ THE SOFTWARE. */ using System; using System.Collections.Generic; using System.Net.Mime; +using System.Threading; using System.Text.Json.Serialization; using Paramore.Brighter.JsonConverters; using Paramore.Brighter.NJsonConverters; @@ -92,42 +93,72 @@ public class MessageHeader : IEquatable /// Bag keys that are internal to Brighter — set by the framework so a downstream /// component (e.g. the outbox dispatcher) can read them, but not intended to travel /// over the wire. Transports that copy into their wire format must - /// skip these keys: either filter inline (see ASB / RMQ / Kafka publishers) or - /// call when serialising the bag in one go - /// (see SNS / SQS / Redis publishers). + /// skip keys for which returns true: either filter inline + /// (see ASB / RMQ / Kafka publishers) or call + /// when serialising the bag in one go (see SNS / SQS / Redis publishers). /// /// - /// Pre-populated with . Add to this set - /// from extension code if you introduce additional local-only bag keys. + /// Pre-populated with . Use + /// from extension code to add additional keys. + /// The backing is treated as a copy-on-write snapshot — + /// never mutated in place after publication — so reads are lock-free on the + /// message hot path; registrations are expected at startup and use a CAS loop + /// that allocates a new snapshot. /// - public static readonly HashSet LocalHeaderNames = new(StringComparer.Ordinal) + private static HashSet s_localHeaderNames = new(StringComparer.Ordinal) { Message.ProducerTopicHeaderName }; /// - /// Removes every entry from . - /// Call on a wire-bound copy of the header to ensure local-only bag entries - /// are not serialised onto the transport. + /// Returns true when is a local-only bag key that must + /// not travel over the wire. + /// + public static bool IsLocalHeader(string name) + => Volatile.Read(ref s_localHeaderNames).Contains(name); + + /// + /// Adds to the set of local bag keys. Idempotent. + /// Call once at startup from extension code that introduces a local-only bag key. + /// + public static void RegisterLocalHeader(string name) + { + while (true) + { + var snapshot = Volatile.Read(ref s_localHeaderNames); + if (snapshot.Contains(name)) + return; + var updated = new HashSet(snapshot, StringComparer.Ordinal) { name }; + if (ReferenceEquals(Interlocked.CompareExchange(ref s_localHeaderNames, updated, snapshot), snapshot)) + return; + } + } + + /// + /// Removes every local-header entry (see ) from + /// . Call on a wire-bound copy of the header to ensure + /// local-only bag entries are not serialised onto the transport. /// public void StripLocalHeaders() { - foreach (var name in LocalHeaderNames) + var locals = Volatile.Read(ref s_localHeaderNames); + foreach (var name in locals) Bag.Remove(name); } /// /// Returns a new dictionary containing every entry whose key - /// is not in . For transports that serialise the - /// whole bag in one go (e.g. SNS/SQS/Redis emit it as a single JSON property), - /// pass this to the serialiser instead of directly. + /// is not a local header (see ). For transports that + /// serialise the whole bag in one go (e.g. SNS/SQS/Redis emit it as a single JSON + /// property), pass this to the serialiser instead of directly. /// public Dictionary BagWithoutLocalHeaders() { + var locals = Volatile.Read(ref s_localHeaderNames); var copy = new Dictionary(Bag.Count); foreach (var kv in Bag) { - if (!LocalHeaderNames.Contains(kv.Key)) + if (!locals.Contains(kv.Key)) copy[kv.Key] = kv.Value; } return copy; diff --git a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Bulk_Dispatching_Reply_Messages_Async.cs b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Bulk_Dispatching_Reply_Messages_Async.cs index 4e69c72c57..eb34b42ca8 100644 --- a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Bulk_Dispatching_Reply_Messages_Async.cs +++ b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Bulk_Dispatching_Reply_Messages_Async.cs @@ -107,7 +107,7 @@ await _mediator.ClearOutstandingFromOutboxAsync( //assert - the ProducerTopic bag entry survives bulk dispatch so an InMemoryOutbox //(which holds the message by reference) keeps the producer hint for retries. - //Wire-format stripping is the transport's responsibility (see MessageHeader.LocalHeaderNames). + //Wire-format stripping is the transport's responsibility (see MessageHeader.IsLocalHeader). Assert.All(messages, m => Assert.True(m.Header.Bag.ContainsKey(Message.ProducerTopicHeaderName))); } diff --git a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Posting_A_Reply_Message_To_The_Command_Processor.cs b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Posting_A_Reply_Message_To_The_Command_Processor.cs index f8ca0378ac..6cc7ee7c70 100644 --- a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Posting_A_Reply_Message_To_The_Command_Processor.cs +++ b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Posting_A_Reply_Message_To_The_Command_Processor.cs @@ -92,7 +92,7 @@ public void When_Posting_A_Reply_Message_To_The_Command_Processor() //assert - the ProducerTopic bag entry survives dispatch so an InMemoryOutbox //(which holds the message by reference) keeps the producer hint for retries. - //Wire-format stripping is the transport's responsibility (see MessageHeader.LocalHeaderNames). + //Wire-format stripping is the transport's responsibility (see MessageHeader.IsLocalHeader). Assert.True(messages[0].Header.Bag.ContainsKey(Message.ProducerTopicHeaderName)); } } diff --git a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Posting_A_Reply_Message_To_The_Command_Processor_Async.cs b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Posting_A_Reply_Message_To_The_Command_Processor_Async.cs index 2aa0383274..14ca40c60d 100644 --- a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Posting_A_Reply_Message_To_The_Command_Processor_Async.cs +++ b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Posting_A_Reply_Message_To_The_Command_Processor_Async.cs @@ -93,7 +93,7 @@ public async Task When_Posting_A_Reply_Message_To_The_Command_Processor_Async() //assert - the ProducerTopic bag entry survives dispatch so an InMemoryOutbox //(which holds the message by reference) keeps the producer hint for retries. - //Wire-format stripping is the transport's responsibility (see MessageHeader.LocalHeaderNames). + //Wire-format stripping is the transport's responsibility (see MessageHeader.IsLocalHeader). Assert.True(messages[0].Header.Bag.ContainsKey(Message.ProducerTopicHeaderName)); } } diff --git a/tests/Paramore.Brighter.Core.Tests/MessageSerialisation/When_Stripping_Local_Headers_From_A_Message_Header.cs b/tests/Paramore.Brighter.Core.Tests/MessageSerialisation/When_Stripping_Local_Headers_From_A_Message_Header.cs index 6bd19d81cf..1b6c59d72a 100644 --- a/tests/Paramore.Brighter.Core.Tests/MessageSerialisation/When_Stripping_Local_Headers_From_A_Message_Header.cs +++ b/tests/Paramore.Brighter.Core.Tests/MessageSerialisation/When_Stripping_Local_Headers_From_A_Message_Header.cs @@ -36,8 +36,31 @@ public void When_Stripping_Local_Headers_With_No_Local_Bag_Entries_Then_The_Bag_ } [Fact] - public void When_The_Producer_Topic_Header_Name_Is_In_The_Local_Header_Names_Set() + public void When_The_Producer_Topic_Header_Name_Is_A_Local_Header() { - Assert.Contains(Message.ProducerTopicHeaderName, MessageHeader.LocalHeaderNames); + Assert.True(MessageHeader.IsLocalHeader(Message.ProducerTopicHeaderName)); + } + + [Fact] + public void When_RegisterLocalHeader_Adds_A_Custom_Key_It_Is_Recognised_And_Is_Idempotent() + { + const string customKey = "custom.local.header." + nameof(MessageHeaderStripLocalHeadersTests); + + MessageHeader.RegisterLocalHeader(customKey); + MessageHeader.RegisterLocalHeader(customKey); // idempotent + + Assert.True(MessageHeader.IsLocalHeader(customKey)); + + var header = new MessageHeader( + messageId: "id-3", + topic: new RoutingKey("a.topic"), + messageType: MessageType.MT_COMMAND); + header.Bag[customKey] = "value"; + header.Bag["user.key"] = "user.value"; + + var wireBag = header.BagWithoutLocalHeaders(); + + Assert.False(wireBag.ContainsKey(customKey)); + Assert.True(wireBag.ContainsKey("user.key")); } } From f16577f8e8e8a120f4b959d657ee8cd56dbaf30e Mon Sep 17 00:00:00 2001 From: DevJonny Date: Sat, 25 Apr 2026 20:30:54 +0100 Subject: [PATCH 15/16] refactor: drop unused MessageHeader.StripLocalHeaders instance method MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit No transport calls it — they all use IsLocalHeader for inline filtering or BagWithoutLocalHeaders for whole-bag JSON serialisation. Keeping the method around invites callers to mutate the message's bag in place, which would re-introduce the InMemoryOutbox-by-reference regression that moving stripping out of the mediator just fixed. Test class renamed MessageHeaderStripLocalHeadersTests → MessageHeaderLocalHeadersTests and the two strip-specific cases rewritten against BagWithoutLocalHeaders (also asserts the original header is untouched, pinning the InMemoryOutbox-by-reference invariant). Co-Authored-By: Claude (claude-opus-4-7) --- src/Paramore.Brighter/MessageHeader.cs | 12 ---------- ...ing_Local_Headers_From_A_Message_Header.cs | 22 ++++++++++--------- 2 files changed, 12 insertions(+), 22 deletions(-) diff --git a/src/Paramore.Brighter/MessageHeader.cs b/src/Paramore.Brighter/MessageHeader.cs index 1fd8b3003c..6395b743a2 100644 --- a/src/Paramore.Brighter/MessageHeader.cs +++ b/src/Paramore.Brighter/MessageHeader.cs @@ -134,18 +134,6 @@ public static void RegisterLocalHeader(string name) } } - /// - /// Removes every local-header entry (see ) from - /// . Call on a wire-bound copy of the header to ensure - /// local-only bag entries are not serialised onto the transport. - /// - public void StripLocalHeaders() - { - var locals = Volatile.Read(ref s_localHeaderNames); - foreach (var name in locals) - Bag.Remove(name); - } - /// /// Returns a new dictionary containing every entry whose key /// is not a local header (see ). For transports that diff --git a/tests/Paramore.Brighter.Core.Tests/MessageSerialisation/When_Stripping_Local_Headers_From_A_Message_Header.cs b/tests/Paramore.Brighter.Core.Tests/MessageSerialisation/When_Stripping_Local_Headers_From_A_Message_Header.cs index 1b6c59d72a..a6bc94339e 100644 --- a/tests/Paramore.Brighter.Core.Tests/MessageSerialisation/When_Stripping_Local_Headers_From_A_Message_Header.cs +++ b/tests/Paramore.Brighter.Core.Tests/MessageSerialisation/When_Stripping_Local_Headers_From_A_Message_Header.cs @@ -2,10 +2,10 @@ namespace Paramore.Brighter.Core.Tests.MessageSerialisation; -public class MessageHeaderStripLocalHeadersTests +public class MessageHeaderLocalHeadersTests { [Fact] - public void When_Stripping_Local_Headers_The_ProducerTopic_Bag_Entry_Is_Removed_And_Other_Entries_Survive() + public void When_BagWithoutLocalHeaders_Removes_Local_Entries_And_Other_Entries_Survive() { var header = new MessageHeader( messageId: "id-1", @@ -14,14 +14,16 @@ public void When_Stripping_Local_Headers_The_ProducerTopic_Bag_Entry_Is_Removed_ header.Bag[Message.ProducerTopicHeaderName] = "lookup.topic"; header.Bag["user.key"] = "user.value"; - header.StripLocalHeaders(); + var wireBag = header.BagWithoutLocalHeaders(); - Assert.False(header.Bag.ContainsKey(Message.ProducerTopicHeaderName)); - Assert.True(header.Bag.ContainsKey("user.key")); + Assert.False(wireBag.ContainsKey(Message.ProducerTopicHeaderName)); + Assert.True(wireBag.ContainsKey("user.key")); + // original is untouched — InMemoryOutbox-by-reference keeps the local entry for retries + Assert.True(header.Bag.ContainsKey(Message.ProducerTopicHeaderName)); } [Fact] - public void When_Stripping_Local_Headers_With_No_Local_Bag_Entries_Then_The_Bag_Is_Unchanged() + public void When_BagWithoutLocalHeaders_With_No_Local_Entries_Returns_Equivalent_Copy() { var header = new MessageHeader( messageId: "id-2", @@ -29,10 +31,10 @@ public void When_Stripping_Local_Headers_With_No_Local_Bag_Entries_Then_The_Bag_ messageType: MessageType.MT_EVENT); header.Bag["user.key"] = "user.value"; - header.StripLocalHeaders(); + var wireBag = header.BagWithoutLocalHeaders(); - Assert.Single(header.Bag); - Assert.Equal("user.value", header.Bag["user.key"]); + Assert.Single(wireBag); + Assert.Equal("user.value", wireBag["user.key"]); } [Fact] @@ -44,7 +46,7 @@ public void When_The_Producer_Topic_Header_Name_Is_A_Local_Header() [Fact] public void When_RegisterLocalHeader_Adds_A_Custom_Key_It_Is_Recognised_And_Is_Idempotent() { - const string customKey = "custom.local.header." + nameof(MessageHeaderStripLocalHeadersTests); + const string customKey = "custom.local.header." + nameof(MessageHeaderLocalHeadersTests); MessageHeader.RegisterLocalHeader(customKey); MessageHeader.RegisterLocalHeader(customKey); // idempotent From b4406b9f7f6629f48845660e9811f1432756454a Mon Sep 17 00:00:00 2001 From: DevJonny Date: Sat, 25 Apr 2026 20:46:42 +0100 Subject: [PATCH 16/16] =?UTF-8?q?test:=20address=20review=20feedback=20?= =?UTF-8?q?=E2=80=94=20drop=20Fragile=20trait,=20document=20converter=20co?= =?UTF-8?q?ntract,=20pin=20fallback?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three issues from claude[bot]'s latest review: 1. The new ASB unit test carried [Trait("Fragile", "CI")], copied by mistake from the live-ASB integration tests. The CI pipeline uses "Fragile!=CI" as its filter, so the trait skipped the only transport-level pin for local-header filtering. Drop it. 2. GetProducerLookupTopic uses `producerTopic is string topic`. Concern was that persistent outboxes round-trip Header.Bag through JSON, so values would come back as JsonElement and the cast would silently fail — reproducing the bug the PR fixes. Investigation shows Brighter's bag round-trip uses JsonSerialisationOptions.Options, which composes DictionaryStringObjectJsonConverter with ObjectToInferredTypesConverter to preserve string runtime types. Verified RelationDatabaseOutbox (covers MsSql, PostgreSQL, MySql, Sqlite, Spanner), MongoDb, and DynamoDB all use those options on deserialise. Document the contract on GetProducerLookupTopic and add BagStringValueRoundTripTests to pin the converter behaviour. If a future change drops one of the converters, the round-trip test fails loudly instead of GetProducerLookupTopic regressing silently. 3. Add WrapMatchingPublicationTopicTests to pin the fallback path: when a mapper does NOT override Header.Topic, no ProducerTopic bag entry is written and producer lookup falls back to Header.Topic. The existing test trio (reply-mapper-overrides, null-publication-topic, matching-topic) now covers all three branches of WrapPipeline.Wrap. Co-Authored-By: Claude (claude-opus-4-7) --- .../OutboxProducerMediator.cs | 8 ++++ ...Converting_A_Message_With_Local_Headers.cs | 1 - ...ound_Trip_Through_Brighter_Json_Options.cs | 34 ++++++++++++++ ...age_Whose_Topic_Matches_The_Publication.cs | 45 +++++++++++++++++++ 4 files changed, 87 insertions(+), 1 deletion(-) create mode 100644 tests/Paramore.Brighter.Core.Tests/MessageSerialisation/When_Bag_String_Values_Round_Trip_Through_Brighter_Json_Options.cs create mode 100644 tests/Paramore.Brighter.Core.Tests/MessageSerialisation/When_Wrapping_A_Message_Whose_Topic_Matches_The_Publication.cs diff --git a/src/Paramore.Brighter/OutboxProducerMediator.cs b/src/Paramore.Brighter/OutboxProducerMediator.cs index 9bb9aea95d..a7bb1a58e7 100644 --- a/src/Paramore.Brighter/OutboxProducerMediator.cs +++ b/src/Paramore.Brighter/OutboxProducerMediator.cs @@ -781,6 +781,14 @@ private static RoutingKey GetProducerLookupTopic(Message message) // dynamic reply address. Normal publications don't carry the bag entry, so we // fall back to Header.Topic — and a null/empty Header.Topic remains a lookup // failure, matching pre-fix behaviour. + // + // The `is string` cast is safe across persistent outboxes (SQL family, + // Mongo, DynamoDB) because Brighter's bag round-trip uses + // JsonSerialisationOptions.Options, which registers DictionaryStringObjectJsonConverter + // + ObjectToInferredTypesConverter — together they preserve the string runtime + // type through serialise/deserialise rather than handing back JsonElement. + // See When_Bag_String_Values_Round_Trip_Through_Brighter_Json_Options for + // a regression pin on that contract. if (message.Header.Bag.TryGetValue(Message.ProducerTopicHeaderName, out var producerTopic) && producerTopic is string topic) { diff --git a/tests/Paramore.Brighter.AzureServiceBus.Tests/MessagingGateway/When_Converting_A_Message_With_Local_Headers.cs b/tests/Paramore.Brighter.AzureServiceBus.Tests/MessagingGateway/When_Converting_A_Message_With_Local_Headers.cs index 1f48af50de..1702223bed 100644 --- a/tests/Paramore.Brighter.AzureServiceBus.Tests/MessagingGateway/When_Converting_A_Message_With_Local_Headers.cs +++ b/tests/Paramore.Brighter.AzureServiceBus.Tests/MessagingGateway/When_Converting_A_Message_With_Local_Headers.cs @@ -5,7 +5,6 @@ namespace Paramore.Brighter.AzureServiceBus.Tests.MessagingGateway; [Trait("Category", "ASB")] -[Trait("Fragile", "CI")] public class AzureServiceBusMessagePublisherLocalHeaderTests { [Fact] diff --git a/tests/Paramore.Brighter.Core.Tests/MessageSerialisation/When_Bag_String_Values_Round_Trip_Through_Brighter_Json_Options.cs b/tests/Paramore.Brighter.Core.Tests/MessageSerialisation/When_Bag_String_Values_Round_Trip_Through_Brighter_Json_Options.cs new file mode 100644 index 0000000000..f764888933 --- /dev/null +++ b/tests/Paramore.Brighter.Core.Tests/MessageSerialisation/When_Bag_String_Values_Round_Trip_Through_Brighter_Json_Options.cs @@ -0,0 +1,34 @@ +using System.Collections.Generic; +using System.Text.Json; +using Paramore.Brighter.JsonConverters; +using Xunit; + +namespace Paramore.Brighter.Core.Tests.MessageSerialisation; + +// Regression pin for the contract that OutboxProducerMediator.GetProducerLookupTopic +// relies on: when Header.Bag is round-tripped through Brighter's JsonSerialisationOptions +// the values come back as their runtime types (string here), NOT as JsonElement. +// If a future change drops DictionaryStringObjectJsonConverter or +// ObjectToInferredTypesConverter from the options, the `producerTopic is string` cast in +// GetProducerLookupTopic would silently fail for persistent outboxes (SQL, Mongo, +// DynamoDB) and reply-message dispatch would regress to the bug this PR fixes. +public class BagStringValueRoundTripTests +{ + [Fact] + public void When_Round_Tripping_A_Bag_String_Value_It_Survives_As_String_Not_JsonElement() + { + var bag = new Dictionary + { + [Message.ProducerTopicHeaderName] = "the.publication.topic", + ["customer.header"] = "customer.value" + }; + + var json = JsonSerializer.Serialize(bag, JsonSerialisationOptions.Options); + var roundTripped = JsonSerializer.Deserialize>(json, JsonSerialisationOptions.Options); + + Assert.NotNull(roundTripped); + Assert.IsType(roundTripped![Message.ProducerTopicHeaderName]); + Assert.Equal("the.publication.topic", roundTripped[Message.ProducerTopicHeaderName]); + Assert.IsType(roundTripped["customer.header"]); + } +} diff --git a/tests/Paramore.Brighter.Core.Tests/MessageSerialisation/When_Wrapping_A_Message_Whose_Topic_Matches_The_Publication.cs b/tests/Paramore.Brighter.Core.Tests/MessageSerialisation/When_Wrapping_A_Message_Whose_Topic_Matches_The_Publication.cs new file mode 100644 index 0000000000..6ce0c9c5ce --- /dev/null +++ b/tests/Paramore.Brighter.Core.Tests/MessageSerialisation/When_Wrapping_A_Message_Whose_Topic_Matches_The_Publication.cs @@ -0,0 +1,45 @@ +using Paramore.Brighter.Core.Tests.CommandProcessors.TestDoubles; +using Xunit; + +namespace Paramore.Brighter.Core.Tests.MessageSerialisation; + +// Fallback path for OutboxProducerMediator.GetProducerLookupTopic: when a mapper does +// NOT override Header.Topic (the normal-publication case), no ProducerTopic bag entry +// is written. Producer lookup then falls back to Header.Topic. Without this pin, a +// future change that always wrote the bag entry would leave persistent outbox rows +// across a rolling upgrade with stale producer hints — exactly the failure mode the +// pre-fix bug had, just inverted. +public class WrapMatchingPublicationTopicTests +{ + [Fact] + public void When_The_Mapper_Topic_Matches_The_Publication_No_Producer_Topic_Bag_Entry_Is_Written() + { + TransformPipelineBuilder.ClearPipelineCache(); + + var mapperRegistry = new MessageMapperRegistry( + new SimpleMessageMapperFactory(_ => new MyCommandMessageMapper()), + null); + mapperRegistry.Register(); + + var publicationTopic = new RoutingKey("normal.publication.topic"); + var publication = new Publication + { + Topic = publicationTopic, + RequestType = typeof(MyCommand) + }; + + var pipelineBuilder = new TransformPipelineBuilder( + mapperRegistry, + new SimpleMessageTransformerFactory(_ => null)); + + var message = pipelineBuilder + .BuildWrapPipeline() + .Wrap(new MyCommand(), new RequestContext(), publication); + + // mapper produced a topic identical to publication.Topic — fallback path + Assert.Equal(publicationTopic, message.Header.Topic); + + // no bag entry → producer lookup falls back to Header.Topic + Assert.False(message.Header.Bag.ContainsKey(Message.ProducerTopicHeaderName)); + } +}