diff --git a/src/Paramore.Brighter.MessageScheduler.Azure/AzureServiceBusScheduler.cs b/src/Paramore.Brighter.MessageScheduler.Azure/AzureServiceBusScheduler.cs index 367eb0e2a2..aa79ccb6d6 100644 --- a/src/Paramore.Brighter.MessageScheduler.Azure/AzureServiceBusScheduler.cs +++ b/src/Paramore.Brighter.MessageScheduler.Azure/AzureServiceBusScheduler.cs @@ -131,7 +131,9 @@ private static ServiceBusMessage ConvertToServiceBusMessage(Message message) message.Header.HandledCount); azureServiceBusMessage.ApplicationProperties.Add(ASBConstants.ReplyToHeaderBagKey, message.Header.ReplyTo); - foreach (var header in message.Header.Bag.Where(h => !ASBConstants.ReservedHeaders.Contains(h.Key))) + foreach (var header in message.Header.Bag.Where(h => + !ASBConstants.ReservedHeaders.Contains(h.Key) + && !MessageHeader.IsLocalHeader(h.Key))) { azureServiceBusMessage.ApplicationProperties.Add(header.Key, header.Value); } diff --git a/src/Paramore.Brighter.MessagingGateway.AWSSQS.V4/SnsMessagePublisher.cs b/src/Paramore.Brighter.MessagingGateway.AWSSQS.V4/SnsMessagePublisher.cs index 7bde2d7a12..217fb8bfb8 100644 --- a/src/Paramore.Brighter.MessagingGateway.AWSSQS.V4/SnsMessagePublisher.cs +++ b/src/Paramore.Brighter.MessagingGateway.AWSSQS.V4/SnsMessagePublisher.cs @@ -109,7 +109,7 @@ private Dictionary BuildMessageAttributes(Message message.Header.Bag[HeaderNames.HandledCount] = message.Header.HandledCount.ToString(CultureInfo.InvariantCulture); - var bagJson = JsonSerializer.Serialize(message.Header.Bag, JsonSerialisationOptions.Options); + var bagJson = JsonSerializer.Serialize(message.Header.BagWithoutLocalHeaders(), JsonSerialisationOptions.Options); messageAttributes[HeaderNames.Bag] = new MessageAttributeValue { StringValue = Convert.ToString(bagJson), DataType = "String" }; return messageAttributes; diff --git a/src/Paramore.Brighter.MessagingGateway.AWSSQS.V4/SqsMessageSender.cs b/src/Paramore.Brighter.MessagingGateway.AWSSQS.V4/SqsMessageSender.cs index 5fa2940b39..d20aec54f9 100644 --- a/src/Paramore.Brighter.MessagingGateway.AWSSQS.V4/SqsMessageSender.cs +++ b/src/Paramore.Brighter.MessagingGateway.AWSSQS.V4/SqsMessageSender.cs @@ -127,7 +127,7 @@ private void SetMessageAttributes(SendMessageRequest request, Message message) message.Header.Bag[HeaderNames.HandledCount] = message.Header.HandledCount.ToString(CultureInfo.InvariantCulture); - var bagJson = System.Text.Json.JsonSerializer.Serialize(message.Header.Bag, JsonSerialisationOptions.Options); + var bagJson = System.Text.Json.JsonSerializer.Serialize(message.Header.BagWithoutLocalHeaders(), JsonSerialisationOptions.Options); messageAttributes[HeaderNames.Bag] = new() { StringValue = bagJson, DataType = "String" }; request.MessageAttributes = messageAttributes; } diff --git a/src/Paramore.Brighter.MessagingGateway.AWSSQS/SnsMessagePublisher.cs b/src/Paramore.Brighter.MessagingGateway.AWSSQS/SnsMessagePublisher.cs index cc049be643..13a5818b6b 100644 --- a/src/Paramore.Brighter.MessagingGateway.AWSSQS/SnsMessagePublisher.cs +++ b/src/Paramore.Brighter.MessagingGateway.AWSSQS/SnsMessagePublisher.cs @@ -109,7 +109,7 @@ private Dictionary BuildMessageAttributes(Message message.Header.Bag[HeaderNames.HandledCount] = message.Header.HandledCount.ToString(CultureInfo.InvariantCulture); - var bagJson = JsonSerializer.Serialize(message.Header.Bag, JsonSerialisationOptions.Options); + var bagJson = JsonSerializer.Serialize(message.Header.BagWithoutLocalHeaders(), JsonSerialisationOptions.Options); messageAttributes[HeaderNames.Bag] = new MessageAttributeValue { StringValue = Convert.ToString(bagJson), DataType = "String" }; return messageAttributes; diff --git a/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageSender.cs b/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageSender.cs index b0470210a8..87d14d42fd 100644 --- a/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageSender.cs +++ b/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageSender.cs @@ -129,7 +129,7 @@ private void SetMessageAttributes(SendMessageRequest request, Message message) message.Header.Bag[HeaderNames.HandledCount] = message.Header.HandledCount.ToString(CultureInfo.InvariantCulture); - var bagJson = System.Text.Json.JsonSerializer.Serialize(message.Header.Bag, JsonSerialisationOptions.Options); + var bagJson = System.Text.Json.JsonSerializer.Serialize(message.Header.BagWithoutLocalHeaders(), JsonSerialisationOptions.Options); messageAttributes[HeaderNames.Bag] = new() { StringValue = bagJson, DataType = "String" }; request.MessageAttributes = messageAttributes; } diff --git a/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusMessagePublisher.cs b/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusMessagePublisher.cs index 7014952014..54699eb392 100644 --- a/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusMessagePublisher.cs +++ b/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusMessagePublisher.cs @@ -62,7 +62,9 @@ private static void AddBrighterHeaders(Message message, ServiceBusMessage azureS if (message.Header.Bag.TryGetValue(ASBConstants.SessionIdKey, out object? value)) azureServiceBusMessage.SessionId = value.ToString(); - foreach (var header in message.Header.Bag.Where(h => !ASBConstants.ReservedHeaders.Contains(h.Key))) + foreach (var header in message.Header.Bag.Where(h => + !ASBConstants.ReservedHeaders.Contains(h.Key) + && !MessageHeader.IsLocalHeader(h.Key))) { azureServiceBusMessage.ApplicationProperties[header.Key] = header.Value; } diff --git a/src/Paramore.Brighter.MessagingGateway.GcpPubSub/Parser.cs b/src/Paramore.Brighter.MessagingGateway.GcpPubSub/Parser.cs index 0b245d1f1f..7b1a8e162d 100644 --- a/src/Paramore.Brighter.MessagingGateway.GcpPubSub/Parser.cs +++ b/src/Paramore.Brighter.MessagingGateway.GcpPubSub/Parser.cs @@ -349,7 +349,7 @@ private static void AddHeaders(MapField headers, Message message message.Header.Bag.Each(header => { - if (!headers.ContainsKey(header.Key)) + if (!headers.ContainsKey(header.Key) && !MessageHeader.IsLocalHeader(header.Key)) { headers.Add(header.Key, header.Value.ToString()!); } diff --git a/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaDefaultMessageHeaderBuilder.cs b/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaDefaultMessageHeaderBuilder.cs index 422d3f2506..a396237615 100644 --- a/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaDefaultMessageHeaderBuilder.cs +++ b/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaDefaultMessageHeaderBuilder.cs @@ -113,7 +113,8 @@ private void AddCLoudEventsOptionalHeaders(Headers headers, Message message) private void AddUserDefinedBagHeaders(Headers headers, Message message) { message.Header.Bag - .Where(x => !BrighterDefinedHeaders.HeadersToReset.Contains(x.Key)) + .Where(x => !BrighterDefinedHeaders.HeadersToReset.Contains(x.Key) + && !MessageHeader.IsLocalHeader(x.Key)) .Each(header => AddUserDefinedBagHeader(headers, header.Key, header.Value)); } diff --git a/src/Paramore.Brighter.MessagingGateway.RMQ.Async/RmqMessagePublisher.cs b/src/Paramore.Brighter.MessagingGateway.RMQ.Async/RmqMessagePublisher.cs index 4333ee1565..a89829e950 100644 --- a/src/Paramore.Brighter.MessagingGateway.RMQ.Async/RmqMessagePublisher.cs +++ b/src/Paramore.Brighter.MessagingGateway.RMQ.Async/RmqMessagePublisher.cs @@ -228,7 +228,7 @@ private static void AddUserDefinedHeaders(Message message, Dictionary { - if (!_headersToReset.Contains(header.Key)) + if (!_headersToReset.Contains(header.Key) && !MessageHeader.IsLocalHeader(header.Key)) { headers[header.Key] = header.Value; } diff --git a/src/Paramore.Brighter.MessagingGateway.RMQ.Sync/RmqMessagePublisher.cs b/src/Paramore.Brighter.MessagingGateway.RMQ.Sync/RmqMessagePublisher.cs index 120cbb5c0f..d45bd500fe 100644 --- a/src/Paramore.Brighter.MessagingGateway.RMQ.Sync/RmqMessagePublisher.cs +++ b/src/Paramore.Brighter.MessagingGateway.RMQ.Sync/RmqMessagePublisher.cs @@ -194,7 +194,7 @@ private static void AddUserDefinedHeaders(Message message, Dictionary { - if (!_headersToReset.Contains(header.Key)) + if (!_headersToReset.Contains(header.Key) && !MessageHeader.IsLocalHeader(header.Key)) { headers[header.Key] = header.Value; } diff --git a/src/Paramore.Brighter.MessagingGateway.Redis/RedisMessagePublisher.cs b/src/Paramore.Brighter.MessagingGateway.Redis/RedisMessagePublisher.cs index 46083294bf..8104be6113 100644 --- a/src/Paramore.Brighter.MessagingGateway.Redis/RedisMessagePublisher.cs +++ b/src/Paramore.Brighter.MessagingGateway.Redis/RedisMessagePublisher.cs @@ -131,7 +131,7 @@ private static void WriteHandledCount(MessageHeader messageHeader, Dictionary headers) { - var flatBag = JsonSerializer.Serialize(messageHeader.Bag, JsonSerialisationOptions.Options); + var flatBag = JsonSerializer.Serialize(messageHeader.BagWithoutLocalHeaders(), JsonSerialisationOptions.Options); headers.Add(HeaderNames.BAG, flatBag); } diff --git a/src/Paramore.Brighter.MessagingGateway.RocketMQ/RocketMqMessageProducer.cs b/src/Paramore.Brighter.MessagingGateway.RocketMQ/RocketMqMessageProducer.cs index 372fe9b041..5585e6120f 100644 --- a/src/Paramore.Brighter.MessagingGateway.RocketMQ/RocketMqMessageProducer.cs +++ b/src/Paramore.Brighter.MessagingGateway.RocketMQ/RocketMqMessageProducer.cs @@ -146,7 +146,9 @@ private async Task SendWithDelayAsync(Message message, TimeSpan? delay, bool use } foreach (var (key, val) in message.Header.Bag - .Where(x => x.Key != HeaderNames.Keys && x.Key != HeaderNames.Tag)) + .Where(x => x.Key != HeaderNames.Keys + && x.Key != HeaderNames.Tag + && !MessageHeader.IsLocalHeader(x.Key))) { builder.AddProperty(key, val.ToString()); } diff --git a/src/Paramore.Brighter/Message.cs b/src/Paramore.Brighter/Message.cs index e1544e0a23..b8e591cdd5 100644 --- a/src/Paramore.Brighter/Message.cs +++ b/src/Paramore.Brighter/Message.cs @@ -50,6 +50,14 @@ public class Message : IEquatable /// public const string RejectionReasonHeaderName = "RejectionReason"; + /// + /// Bag key (not a typed header property — lives in ) + /// carrying the publication topic when the message mapper overrides + /// (e.g. for Reply messages whose topic is the reply address). + /// Read by the outbox dispatcher so it can still locate the registered producer. + /// + public const string ProducerTopicHeaderName = "paramore.brighter.ProducerTopic"; + /// /// Gets the header. /// diff --git a/src/Paramore.Brighter/MessageHeader.cs b/src/Paramore.Brighter/MessageHeader.cs index b9ba0aca4c..6395b743a2 100644 --- a/src/Paramore.Brighter/MessageHeader.cs +++ b/src/Paramore.Brighter/MessageHeader.cs @@ -26,6 +26,7 @@ THE SOFTWARE. */ using System; using System.Collections.Generic; using System.Net.Mime; +using System.Threading; using System.Text.Json.Serialization; using Paramore.Brighter.JsonConverters; using Paramore.Brighter.NJsonConverters; @@ -87,7 +88,70 @@ public class MessageHeader : IEquatable /// The default Brighter source /// public const string DefaultSource = "http://goparamore.io"; - + + /// + /// Bag keys that are internal to Brighter — set by the framework so a downstream + /// component (e.g. the outbox dispatcher) can read them, but not intended to travel + /// over the wire. Transports that copy into their wire format must + /// skip keys for which returns true: either filter inline + /// (see ASB / RMQ / Kafka publishers) or call + /// when serialising the bag in one go (see SNS / SQS / Redis publishers). + /// + /// + /// Pre-populated with . Use + /// from extension code to add additional keys. + /// The backing is treated as a copy-on-write snapshot — + /// never mutated in place after publication — so reads are lock-free on the + /// message hot path; registrations are expected at startup and use a CAS loop + /// that allocates a new snapshot. + /// + private static HashSet s_localHeaderNames = new(StringComparer.Ordinal) + { + Message.ProducerTopicHeaderName + }; + + /// + /// Returns true when is a local-only bag key that must + /// not travel over the wire. + /// + public static bool IsLocalHeader(string name) + => Volatile.Read(ref s_localHeaderNames).Contains(name); + + /// + /// Adds to the set of local bag keys. Idempotent. + /// Call once at startup from extension code that introduces a local-only bag key. + /// + public static void RegisterLocalHeader(string name) + { + while (true) + { + var snapshot = Volatile.Read(ref s_localHeaderNames); + if (snapshot.Contains(name)) + return; + var updated = new HashSet(snapshot, StringComparer.Ordinal) { name }; + if (ReferenceEquals(Interlocked.CompareExchange(ref s_localHeaderNames, updated, snapshot), snapshot)) + return; + } + } + + /// + /// Returns a new dictionary containing every entry whose key + /// is not a local header (see ). For transports that + /// serialise the whole bag in one go (e.g. SNS/SQS/Redis emit it as a single JSON + /// property), pass this to the serialiser instead of directly. + /// + public Dictionary BagWithoutLocalHeaders() + { + var locals = Volatile.Read(ref s_localHeaderNames); + var copy = new Dictionary(Bag.Count); + foreach (var kv in Bag) + { + if (!locals.Contains(kv.Key)) + copy[kv.Key] = kv.Value; + } + return copy; + } + /// /// A property bag that can be used for extended header attributes. /// Use camelCase for the key names if you intend to read it yourself, as when converted to and from Json serializers will tend convert the property diff --git a/src/Paramore.Brighter/OutboxProducerMediator.cs b/src/Paramore.Brighter/OutboxProducerMediator.cs index 782a9ba515..a7bb1a58e7 100644 --- a/src/Paramore.Brighter/OutboxProducerMediator.cs +++ b/src/Paramore.Brighter/OutboxProducerMediator.cs @@ -774,6 +774,30 @@ private bool ConfigurePublisherCallbackMaybe(IAmAMessageProducerSync producer, R return false; } + private static RoutingKey GetProducerLookupTopic(Message message) + { + // Reply messages set the ProducerTopic bag entry so the dispatcher can locate + // the registered producer even though Header.Topic has been rewritten to the + // dynamic reply address. Normal publications don't carry the bag entry, so we + // fall back to Header.Topic — and a null/empty Header.Topic remains a lookup + // failure, matching pre-fix behaviour. + // + // The `is string` cast is safe across persistent outboxes (SQL family, + // Mongo, DynamoDB) because Brighter's bag round-trip uses + // JsonSerialisationOptions.Options, which registers DictionaryStringObjectJsonConverter + // + ObjectToInferredTypesConverter — together they preserve the string runtime + // type through serialise/deserialise rather than handing back JsonElement. + // See When_Bag_String_Values_Round_Trip_Through_Brighter_Json_Options for + // a regression pin on that contract. + if (message.Header.Bag.TryGetValue(Message.ProducerTopicHeaderName, out var producerTopic) + && producerTopic is string topic) + { + return new RoutingKey(topic); + } + + return message.Header.Topic; + } + private void Dispatch(IEnumerable posts, RequestContext requestContext, Dictionary? args = null) { var parentSpan = requestContext.Span; @@ -783,9 +807,12 @@ private void Dispatch(IEnumerable posts, RequestContext requestContext, if (_outBox is null) throw new ArgumentException(NoSyncOutboxError); foreach (var message in posts) { + // Log the wire topic (Header.Topic) — where the message is going. Producer + // lookup uses GetProducerLookupTopic, which may differ from Header.Topic when + // a mapper overrode it (e.g. Reply messages routed to a dynamic reply address). Log.DecoupledInvocationOfMessage(s_logger, message.Header.Topic, message.Id); - var producer = _producerRegistry.LookupBy(message.Header.Topic, message.Header.Type, requestContext); + var producer = _producerRegistry.LookupBy(GetProducerLookupTopic(message), message.Header.Type, requestContext); var span = _tracer?.CreateProducerSpan(producer.Publication, message, requestContext.Span, _instrumentationOptions); producer.Span = span; @@ -807,10 +834,12 @@ private void Dispatch(IEnumerable posts, RequestContext requestContext, requestContext ); if (sent) + { ExecuteWithResiliencePipeline( () => _outBox.MarkDispatched(message.Id, requestContext, _timeProvider.GetUtcNow(), args), requestContext ); + } } } else @@ -839,25 +868,30 @@ private async Task BulkDispatchAsync( try { if (_asyncOutbox is null) throw new ArgumentException(NoAsyncOutboxError); - var messagesByTopic = posts.GroupBy(m => m.Header.Topic); + // Group by (wire topic, producer-lookup topic) so a batch is guaranteed to + // resolve to a single producer — messages with the same wire topic but + // different ProducerTopic bag values land in separate batches. + var messagesByTopic = posts.GroupBy(m => (WireTopic: m.Header.Topic, LookupTopic: GetProducerLookupTopic(m))); foreach (var topicBatch in messagesByTopic) { - var producer = _producerRegistry.LookupBy(topicBatch.Key); + var producer = _producerRegistry.LookupBy(topicBatch.Key.LookupTopic); var span = _tracer?.CreateProducerSpan(producer.Publication, null, requestContext.Span, _instrumentationOptions); if (span is not null) { producer.Span = span; - producerSpans.TryAdd(topicBatch.Key, span); + // Key is only used for uniqueness until EndSpans runs; a Uuid avoids + // any risk of collision from composing topic strings. + producerSpans.TryAdd(Uuid.NewAsString(), span); } if (producer is IAmABulkMessageProducerAsync bulkMessageProducer and not ISupportPublishConfirmation) { var messages = topicBatch.ToArray(); - Log.BulkDispatchingMessages(s_logger, messages.Length, topicBatch.Key); + Log.BulkDispatchingMessages(s_logger, messages.Length, topicBatch.Key.WireTopic); foreach (var batch in await bulkMessageProducer.CreateBatchesAsync(messages, cancellationToken)) { @@ -885,7 +919,10 @@ await _asyncOutbox.MarkDispatchedAsync( } } - if (!sent) TripTopic(batch.RoutingKey); + if (!sent) + { + TripTopic(batch.RoutingKey); + } } } else @@ -915,9 +952,12 @@ private async Task DispatchAsync( if (_asyncOutbox is null) throw new ArgumentException(NoAsyncOutboxError); foreach (var message in posts) { + // Log the wire topic (Header.Topic) — where the message is going. Producer + // lookup uses GetProducerLookupTopic, which may differ from Header.Topic when + // a mapper overrode it (e.g. Reply messages routed to a dynamic reply address). Log.DecoupledInvocationOfMessage(s_logger, message.Header.Topic, message.Id); - var producer = _producerRegistry.LookupBy(message.Header.Topic, message.Header.Type, requestContext); + var producer = _producerRegistry.LookupBy(GetProducerLookupTopic(message), message.Header.Type, requestContext); var span = _tracer?.CreateProducerSpan(producer.Publication, message, parentSpan, _instrumentationOptions); producer.Span = span; @@ -947,7 +987,6 @@ await ExecuteWithResiliencePipelineAsync( } if(!sent) TripTopic(message.Header.Topic); - } else throw new InvalidOperationException("No async message producer defined."); diff --git a/src/Paramore.Brighter/WrapPipeline.cs b/src/Paramore.Brighter/WrapPipeline.cs index 037f05c0a9..91cdc768da 100644 --- a/src/Paramore.Brighter/WrapPipeline.cs +++ b/src/Paramore.Brighter/WrapPipeline.cs @@ -93,6 +93,10 @@ public Message Wrap(TRequest request, RequestContext requestContext, Publication if (message.Header.Topic != publication.Topic) { Log.DifferentPublicationAndMessageTopic(s_logger, publication.Topic?.Value ?? string.Empty, message.Header.Topic); + if (publication.Topic is not null) + { + message.Header.Bag[Message.ProducerTopicHeaderName] = publication.Topic.Value; + } } BrighterTracer.WriteMapperEvent(message, publication, requestContext.Span, MessageMapper.GetType().Name, false, _instrumentationOptions, true); diff --git a/src/Paramore.Brighter/WrapPipelineAsync.cs b/src/Paramore.Brighter/WrapPipelineAsync.cs index d1d39f6990..d5b43a84eb 100644 --- a/src/Paramore.Brighter/WrapPipelineAsync.cs +++ b/src/Paramore.Brighter/WrapPipelineAsync.cs @@ -100,6 +100,10 @@ public async Task WrapAsync(TRequest request, RequestContext requestCon if (message.Header.Topic != publication.Topic) { Log.DifferentPublicationAndMessageTopic(s_logger, publication.Topic?.Value ?? string.Empty, message.Header.Topic); + if (publication.Topic is not null) + { + message.Header.Bag[Message.ProducerTopicHeaderName] = publication.Topic.Value; + } } BrighterTracer.WriteMapperEvent(message, publication, requestContext.Span, MessageMapper.GetType().Name, true, _instrumentationOptions, true); diff --git a/tests/Paramore.Brighter.AzureServiceBus.Tests/MessagingGateway/When_Converting_A_Message_With_Local_Headers.cs b/tests/Paramore.Brighter.AzureServiceBus.Tests/MessagingGateway/When_Converting_A_Message_With_Local_Headers.cs new file mode 100644 index 0000000000..1702223bed --- /dev/null +++ b/tests/Paramore.Brighter.AzureServiceBus.Tests/MessagingGateway/When_Converting_A_Message_With_Local_Headers.cs @@ -0,0 +1,31 @@ +using System; +using Paramore.Brighter.MessagingGateway.AzureServiceBus; +using Xunit; + +namespace Paramore.Brighter.AzureServiceBus.Tests.MessagingGateway; + +[Trait("Category", "ASB")] +public class AzureServiceBusMessagePublisherLocalHeaderTests +{ + [Fact] + public void When_Converting_A_Message_The_ProducerTopic_Local_Header_Is_Stripped() + { + var header = new MessageHeader( + messageId: Guid.NewGuid().ToString(), + topic: new RoutingKey("reply.address"), + messageType: MessageType.MT_COMMAND); + header.Bag[Message.ProducerTopicHeaderName] = "the.real.producer.topic"; + header.Bag["customer.header"] = "should.survive"; + + var message = new Message(header, new MessageBody("body")); + + var asbMessage = AzureServiceBusMessagePublisher.ConvertToServiceBusMessage(message); + + // local header is stripped from the wire form... + Assert.False(asbMessage.ApplicationProperties.ContainsKey(Message.ProducerTopicHeaderName)); + // ...but the original message keeps it (so InMemoryOutbox-by-reference retries still work) + Assert.True(message.Header.Bag.ContainsKey(Message.ProducerTopicHeaderName)); + // unrelated bag entries still travel on the wire + Assert.True(asbMessage.ApplicationProperties.ContainsKey("customer.header")); + } +} diff --git a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Bulk_Dispatching_Reply_Messages_Async.cs b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Bulk_Dispatching_Reply_Messages_Async.cs new file mode 100644 index 0000000000..eb34b42ca8 --- /dev/null +++ b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Bulk_Dispatching_Reply_Messages_Async.cs @@ -0,0 +1,115 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using System.Transactions; +using Microsoft.Extensions.Time.Testing; +using Paramore.Brighter.Core.Tests.CommandProcessors.TestDoubles; +using Paramore.Brighter.Extensions; +using Paramore.Brighter.Observability; +using Polly.Registry; +using Xunit; + +namespace Paramore.Brighter.Core.Tests.CommandProcessors.Post +{ + // Pins the bulk-outbox-clear path: reply messages drained via BulkDispatchAsync + // are grouped by (WireTopic, LookupTopic) and dispatched to the producer resolved + // from LookupTopic. Prevents regression to a Header.Topic-only lookup that would + // fail to locate the producer for reply messages during outbox sweeps. + public class CommandProcessorBulkDispatchReplyAsyncTests + { + private const string ProducerTopic = "Reply"; + private readonly CommandProcessor _commandProcessor; + private readonly IAmAnOutboxProducerMediator _mediator; + private readonly MyResponse _replyOne; + private readonly MyResponse _replyTwo; + private readonly InternalBus _internalBus = new(); + private readonly RoutingKey _replyTopic; + + public CommandProcessorBulkDispatchReplyAsyncTests() + { + var timeProvider = new FakeTimeProvider(); + var producerRoutingKey = new RoutingKey(ProducerTopic); + + _replyTopic = new RoutingKey(Uuid.NewAsString()); + var replyAddress = new ReplyAddress(_replyTopic, Uuid.NewAsString()); + _replyOne = new MyResponse(replyAddress) { ReplyValue = "Hello" }; + _replyTwo = new MyResponse(replyAddress) { ReplyValue = "World" }; + + InMemoryMessageProducer messageProducer = new(_internalBus, + new Publication + { + Topic = producerRoutingKey, + RequestType = typeof(MyResponse) + }); + + var messageMapperRegistry = new MessageMapperRegistry( + null, + new SimpleMessageMapperFactoryAsync(_ => new MyResponseMessageMapperAsync())); + messageMapperRegistry.RegisterAsync(); + + var resiliencePipelineRegistry = new ResiliencePipelineRegistry() + .AddBrighterDefault(); + + var producerRegistry = new ProducerRegistry( + new Dictionary + { + { producerRoutingKey, messageProducer } + }); + + var tracer = new BrighterTracer(timeProvider); + var outbox = new InMemoryOutbox(timeProvider) { Tracer = tracer }; + + _mediator = new OutboxProducerMediator( + producerRegistry, + resiliencePipelineRegistry, + messageMapperRegistry, + new EmptyMessageTransformerFactory(), + new EmptyMessageTransformerFactoryAsync(), + tracer, + new FindPublicationByPublicationTopicOrRequestType(), + outbox, + maxOutStandingMessages: -1 + ); + + _commandProcessor = new CommandProcessor( + new InMemoryRequestContextFactory(), + new DefaultPolicy(), + resiliencePipelineRegistry, + _mediator, + new InMemorySchedulerFactory() + ); + } + + [Fact] + public async Task When_Bulk_Dispatching_Reply_Messages_Async() + { + //arrange - deposit two replies whose mapper sets Header.Topic to the reply + //address, not the registered producer topic + var context = new RequestContext(); + await _commandProcessor.DepositPostAsync([_replyOne, _replyTwo], context); + + //act - drain via the bulk path (exercised by background outbox sweeps). + //ClearOutstandingFromOutboxAsync awaits BackgroundDispatchUsingAsync which + //awaits BulkDispatchAsync, so dispatch is complete when this returns. + await _mediator.ClearOutstandingFromOutboxAsync( + amountToClear: 10, + minimumAge: TimeSpan.Zero, + useBulk: true, + requestContext: context); + + //assert - messages landed on the reply topic. If producer lookup had used + //Header.Topic (reply address) instead of the bag's ProducerTopic, LookupBy + //would have thrown and nothing would arrive on the bus. + var messages = _internalBus.Stream(_replyTopic).ToArray(); + Assert.True(messages.Length == 2, + $"expected 2 reply messages on the bus after bulk dispatch, got {messages.Length} — bulk dispatch or producer lookup failed"); + + //assert - the ProducerTopic bag entry survives bulk dispatch so an InMemoryOutbox + //(which holds the message by reference) keeps the producer hint for retries. + //Wire-format stripping is the transport's responsibility (see MessageHeader.IsLocalHeader). + Assert.All(messages, m => + Assert.True(m.Header.Bag.ContainsKey(Message.ProducerTopicHeaderName))); + } + } +} diff --git a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Posting_A_Reply_Message_To_The_Command_Processor.cs b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Posting_A_Reply_Message_To_The_Command_Processor.cs new file mode 100644 index 0000000000..6cc7ee7c70 --- /dev/null +++ b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Posting_A_Reply_Message_To_The_Command_Processor.cs @@ -0,0 +1,99 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Transactions; +using Microsoft.Extensions.Time.Testing; +using Paramore.Brighter.Core.Tests.CommandProcessors.TestDoubles; +using Paramore.Brighter.Extensions; +using Paramore.Brighter.Observability; +using Polly.Registry; +using Xunit; + +namespace Paramore.Brighter.Core.Tests.CommandProcessors.Post +{ + public class CommandProcessorPostReplyTests + { + private const string ProducerTopic = "Reply"; + private readonly CommandProcessor _commandProcessor; + private readonly MyResponse _myResponse; + private readonly InMemoryOutbox _outbox; + private readonly InternalBus _internalBus = new(); + + public CommandProcessorPostReplyTests() + { + var timeProvider = new FakeTimeProvider(); + var producerRoutingKey = new RoutingKey(ProducerTopic); + + var replyTopic = new RoutingKey(Uuid.NewAsString()); + var replyAddress = new ReplyAddress(replyTopic, Uuid.NewAsString()); + _myResponse = new MyResponse(replyAddress) { ReplyValue = "Hello World" }; + + InMemoryMessageProducer messageProducer = new(_internalBus, + new Publication + { + Topic = producerRoutingKey, + RequestType = typeof(MyResponse) + }); + + var messageMapperRegistry = new MessageMapperRegistry( + new SimpleMessageMapperFactory(_ => new MyResponseMessageMapper()), + null); + messageMapperRegistry.Register(); + + var resiliencePipelineRegistry = new ResiliencePipelineRegistry() + .AddBrighterDefault(); + + var producerRegistry = new ProducerRegistry( + new Dictionary + { + { producerRoutingKey, messageProducer } + }); + + var tracer = new BrighterTracer(timeProvider); + _outbox = new InMemoryOutbox(timeProvider) { Tracer = tracer }; + + IAmAnOutboxProducerMediator bus = new OutboxProducerMediator( + producerRegistry, + resiliencePipelineRegistry, + messageMapperRegistry, + new EmptyMessageTransformerFactory(), + new EmptyMessageTransformerFactoryAsync(), + tracer, + new FindPublicationByPublicationTopicOrRequestType(), + _outbox + ); + + _commandProcessor = new CommandProcessor( + new InMemoryRequestContextFactory(), + new DefaultPolicy(), + resiliencePipelineRegistry, + bus, + new InMemorySchedulerFactory() + ); + } + + [Fact] + public void When_Posting_A_Reply_Message_To_The_Command_Processor() + { + //act - post a Reply whose mapper sets a dynamic topic (reply address) + //different from the producer's registered topic + _commandProcessor.Post(_myResponse); + + //assert - message was dispatched to the reply topic, not the producer topic + var messages = _internalBus.Stream(_myResponse.SendersAddress.Topic).ToArray(); + Assert.Single(messages); + + //assert - message was stored in the outbox + var outboxMessage = _outbox.Get(_myResponse.Id, new RequestContext()); + Assert.NotNull(outboxMessage); + + //assert - message topic is the reply address + Assert.Equal(_myResponse.SendersAddress.Topic, outboxMessage.Header.Topic); + + //assert - the ProducerTopic bag entry survives dispatch so an InMemoryOutbox + //(which holds the message by reference) keeps the producer hint for retries. + //Wire-format stripping is the transport's responsibility (see MessageHeader.IsLocalHeader). + Assert.True(messages[0].Header.Bag.ContainsKey(Message.ProducerTopicHeaderName)); + } + } +} diff --git a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Posting_A_Reply_Message_To_The_Command_Processor_Async.cs b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Posting_A_Reply_Message_To_The_Command_Processor_Async.cs new file mode 100644 index 0000000000..14ca40c60d --- /dev/null +++ b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Posting_A_Reply_Message_To_The_Command_Processor_Async.cs @@ -0,0 +1,100 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using System.Transactions; +using Microsoft.Extensions.Time.Testing; +using Paramore.Brighter.Core.Tests.CommandProcessors.TestDoubles; +using Paramore.Brighter.Extensions; +using Paramore.Brighter.Observability; +using Polly.Registry; +using Xunit; + +namespace Paramore.Brighter.Core.Tests.CommandProcessors.Post +{ + public class CommandProcessorPostReplyAsyncTests + { + private const string ProducerTopic = "Reply"; + private readonly CommandProcessor _commandProcessor; + private readonly MyResponse _myResponse; + private readonly InMemoryOutbox _outbox; + private readonly InternalBus _internalBus = new(); + + public CommandProcessorPostReplyAsyncTests() + { + var timeProvider = new FakeTimeProvider(); + var producerRoutingKey = new RoutingKey(ProducerTopic); + + var replyTopic = new RoutingKey(Uuid.NewAsString()); + var replyAddress = new ReplyAddress(replyTopic, Uuid.NewAsString()); + _myResponse = new MyResponse(replyAddress) { ReplyValue = "Hello World" }; + + InMemoryMessageProducer messageProducer = new(_internalBus, + new Publication + { + Topic = producerRoutingKey, + RequestType = typeof(MyResponse) + }); + + var messageMapperRegistry = new MessageMapperRegistry( + null, + new SimpleMessageMapperFactoryAsync(_ => new MyResponseMessageMapperAsync())); + messageMapperRegistry.RegisterAsync(); + + var resiliencePipelineRegistry = new ResiliencePipelineRegistry() + .AddBrighterDefault(); + + var producerRegistry = new ProducerRegistry( + new Dictionary + { + { producerRoutingKey, messageProducer } + }); + + var tracer = new BrighterTracer(timeProvider); + _outbox = new InMemoryOutbox(timeProvider) { Tracer = tracer }; + + IAmAnOutboxProducerMediator bus = new OutboxProducerMediator( + producerRegistry, + resiliencePipelineRegistry, + messageMapperRegistry, + new EmptyMessageTransformerFactory(), + new EmptyMessageTransformerFactoryAsync(), + tracer, + new FindPublicationByPublicationTopicOrRequestType(), + _outbox + ); + + _commandProcessor = new CommandProcessor( + new InMemoryRequestContextFactory(), + new DefaultPolicy(), + resiliencePipelineRegistry, + bus, + new InMemorySchedulerFactory() + ); + } + + [Fact] + public async Task When_Posting_A_Reply_Message_To_The_Command_Processor_Async() + { + //act - post a Reply whose mapper sets a dynamic topic (reply address) + //different from the producer's registered topic + await _commandProcessor.PostAsync(_myResponse); + + //assert - message was dispatched to the reply topic, not the producer topic + var messages = _internalBus.Stream(_myResponse.SendersAddress.Topic).ToArray(); + Assert.Single(messages); + + //assert - message was stored in the outbox + var outboxMessage = await _outbox.GetAsync(_myResponse.Id, new RequestContext()); + Assert.NotNull(outboxMessage); + + //assert - message topic is the reply address + Assert.Equal(_myResponse.SendersAddress.Topic, outboxMessage.Header.Topic); + + //assert - the ProducerTopic bag entry survives dispatch so an InMemoryOutbox + //(which holds the message by reference) keeps the producer hint for retries. + //Wire-format stripping is the transport's responsibility (see MessageHeader.IsLocalHeader). + Assert.True(messages[0].Header.Bag.ContainsKey(Message.ProducerTopicHeaderName)); + } + } +} diff --git a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/TestDoubles/MyResponseMessageMapperAsync.cs b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/TestDoubles/MyResponseMessageMapperAsync.cs new file mode 100644 index 0000000000..8ba58ce7c1 --- /dev/null +++ b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/TestDoubles/MyResponseMessageMapperAsync.cs @@ -0,0 +1,36 @@ +using System.Text.Json; +using System.Threading; +using System.Threading.Tasks; +using Paramore.Brighter.Extensions; +using Paramore.Brighter.JsonConverters; + +namespace Paramore.Brighter.Core.Tests.CommandProcessors.TestDoubles +{ + public class MyResponseMessageMapperAsync : IAmAMessageMapperAsync + { + public IRequestContext Context { get; set; } + + public Task MapToMessageAsync(MyResponse request, Publication publication, CancellationToken cancellationToken = default) + { + var header = new MessageHeader( + messageId: request.Id, + topic: request.SendersAddress.Topic, + messageType: request.RequestToMessageType(), + correlationId: request.SendersAddress.CorrelationId); + + var body = new MessageBody(JsonSerializer.Serialize(new MyResponseObject(request.Id.ToString(), request.ReplyValue), JsonSerialisationOptions.Options)); + var message = new Message(header, body); + return Task.FromResult(message); + } + + public Task MapToRequestAsync(Message message, CancellationToken cancellationToken = default) + { + var replyAddress = new ReplyAddress(topic: message.Header.ReplyTo, correlationId: message.Header.CorrelationId); + var command = new MyResponse(replyAddress); + var messageBody = JsonSerializer.Deserialize(message.Body.Value, JsonSerialisationOptions.Options); + command.Id = messageBody.Id; + command.ReplyValue = messageBody.ReplyValue; + return Task.FromResult(command); + } + } +} diff --git a/tests/Paramore.Brighter.Core.Tests/MessageSerialisation/When_Bag_String_Values_Round_Trip_Through_Brighter_Json_Options.cs b/tests/Paramore.Brighter.Core.Tests/MessageSerialisation/When_Bag_String_Values_Round_Trip_Through_Brighter_Json_Options.cs new file mode 100644 index 0000000000..f764888933 --- /dev/null +++ b/tests/Paramore.Brighter.Core.Tests/MessageSerialisation/When_Bag_String_Values_Round_Trip_Through_Brighter_Json_Options.cs @@ -0,0 +1,34 @@ +using System.Collections.Generic; +using System.Text.Json; +using Paramore.Brighter.JsonConverters; +using Xunit; + +namespace Paramore.Brighter.Core.Tests.MessageSerialisation; + +// Regression pin for the contract that OutboxProducerMediator.GetProducerLookupTopic +// relies on: when Header.Bag is round-tripped through Brighter's JsonSerialisationOptions +// the values come back as their runtime types (string here), NOT as JsonElement. +// If a future change drops DictionaryStringObjectJsonConverter or +// ObjectToInferredTypesConverter from the options, the `producerTopic is string` cast in +// GetProducerLookupTopic would silently fail for persistent outboxes (SQL, Mongo, +// DynamoDB) and reply-message dispatch would regress to the bug this PR fixes. +public class BagStringValueRoundTripTests +{ + [Fact] + public void When_Round_Tripping_A_Bag_String_Value_It_Survives_As_String_Not_JsonElement() + { + var bag = new Dictionary + { + [Message.ProducerTopicHeaderName] = "the.publication.topic", + ["customer.header"] = "customer.value" + }; + + var json = JsonSerializer.Serialize(bag, JsonSerialisationOptions.Options); + var roundTripped = JsonSerializer.Deserialize>(json, JsonSerialisationOptions.Options); + + Assert.NotNull(roundTripped); + Assert.IsType(roundTripped![Message.ProducerTopicHeaderName]); + Assert.Equal("the.publication.topic", roundTripped[Message.ProducerTopicHeaderName]); + Assert.IsType(roundTripped["customer.header"]); + } +} diff --git a/tests/Paramore.Brighter.Core.Tests/MessageSerialisation/When_Stripping_Local_Headers_From_A_Message_Header.cs b/tests/Paramore.Brighter.Core.Tests/MessageSerialisation/When_Stripping_Local_Headers_From_A_Message_Header.cs new file mode 100644 index 0000000000..a6bc94339e --- /dev/null +++ b/tests/Paramore.Brighter.Core.Tests/MessageSerialisation/When_Stripping_Local_Headers_From_A_Message_Header.cs @@ -0,0 +1,68 @@ +using Xunit; + +namespace Paramore.Brighter.Core.Tests.MessageSerialisation; + +public class MessageHeaderLocalHeadersTests +{ + [Fact] + public void When_BagWithoutLocalHeaders_Removes_Local_Entries_And_Other_Entries_Survive() + { + var header = new MessageHeader( + messageId: "id-1", + topic: new RoutingKey("a.topic"), + messageType: MessageType.MT_COMMAND); + header.Bag[Message.ProducerTopicHeaderName] = "lookup.topic"; + header.Bag["user.key"] = "user.value"; + + var wireBag = header.BagWithoutLocalHeaders(); + + Assert.False(wireBag.ContainsKey(Message.ProducerTopicHeaderName)); + Assert.True(wireBag.ContainsKey("user.key")); + // original is untouched — InMemoryOutbox-by-reference keeps the local entry for retries + Assert.True(header.Bag.ContainsKey(Message.ProducerTopicHeaderName)); + } + + [Fact] + public void When_BagWithoutLocalHeaders_With_No_Local_Entries_Returns_Equivalent_Copy() + { + var header = new MessageHeader( + messageId: "id-2", + topic: new RoutingKey("a.topic"), + messageType: MessageType.MT_EVENT); + header.Bag["user.key"] = "user.value"; + + var wireBag = header.BagWithoutLocalHeaders(); + + Assert.Single(wireBag); + Assert.Equal("user.value", wireBag["user.key"]); + } + + [Fact] + public void When_The_Producer_Topic_Header_Name_Is_A_Local_Header() + { + Assert.True(MessageHeader.IsLocalHeader(Message.ProducerTopicHeaderName)); + } + + [Fact] + public void When_RegisterLocalHeader_Adds_A_Custom_Key_It_Is_Recognised_And_Is_Idempotent() + { + const string customKey = "custom.local.header." + nameof(MessageHeaderLocalHeadersTests); + + MessageHeader.RegisterLocalHeader(customKey); + MessageHeader.RegisterLocalHeader(customKey); // idempotent + + Assert.True(MessageHeader.IsLocalHeader(customKey)); + + var header = new MessageHeader( + messageId: "id-3", + topic: new RoutingKey("a.topic"), + messageType: MessageType.MT_COMMAND); + header.Bag[customKey] = "value"; + header.Bag["user.key"] = "user.value"; + + var wireBag = header.BagWithoutLocalHeaders(); + + Assert.False(wireBag.ContainsKey(customKey)); + Assert.True(wireBag.ContainsKey("user.key")); + } +} diff --git a/tests/Paramore.Brighter.Core.Tests/MessageSerialisation/When_Wrapping_A_Message_Whose_Topic_Matches_The_Publication.cs b/tests/Paramore.Brighter.Core.Tests/MessageSerialisation/When_Wrapping_A_Message_Whose_Topic_Matches_The_Publication.cs new file mode 100644 index 0000000000..6ce0c9c5ce --- /dev/null +++ b/tests/Paramore.Brighter.Core.Tests/MessageSerialisation/When_Wrapping_A_Message_Whose_Topic_Matches_The_Publication.cs @@ -0,0 +1,45 @@ +using Paramore.Brighter.Core.Tests.CommandProcessors.TestDoubles; +using Xunit; + +namespace Paramore.Brighter.Core.Tests.MessageSerialisation; + +// Fallback path for OutboxProducerMediator.GetProducerLookupTopic: when a mapper does +// NOT override Header.Topic (the normal-publication case), no ProducerTopic bag entry +// is written. Producer lookup then falls back to Header.Topic. Without this pin, a +// future change that always wrote the bag entry would leave persistent outbox rows +// across a rolling upgrade with stale producer hints — exactly the failure mode the +// pre-fix bug had, just inverted. +public class WrapMatchingPublicationTopicTests +{ + [Fact] + public void When_The_Mapper_Topic_Matches_The_Publication_No_Producer_Topic_Bag_Entry_Is_Written() + { + TransformPipelineBuilder.ClearPipelineCache(); + + var mapperRegistry = new MessageMapperRegistry( + new SimpleMessageMapperFactory(_ => new MyCommandMessageMapper()), + null); + mapperRegistry.Register(); + + var publicationTopic = new RoutingKey("normal.publication.topic"); + var publication = new Publication + { + Topic = publicationTopic, + RequestType = typeof(MyCommand) + }; + + var pipelineBuilder = new TransformPipelineBuilder( + mapperRegistry, + new SimpleMessageTransformerFactory(_ => null)); + + var message = pipelineBuilder + .BuildWrapPipeline() + .Wrap(new MyCommand(), new RequestContext(), publication); + + // mapper produced a topic identical to publication.Topic — fallback path + Assert.Equal(publicationTopic, message.Header.Topic); + + // no bag entry → producer lookup falls back to Header.Topic + Assert.False(message.Header.Bag.ContainsKey(Message.ProducerTopicHeaderName)); + } +} diff --git a/tests/Paramore.Brighter.Core.Tests/MessageSerialisation/When_Wrapping_A_Reply_Message_Mapper.cs b/tests/Paramore.Brighter.Core.Tests/MessageSerialisation/When_Wrapping_A_Reply_Message_Mapper.cs new file mode 100644 index 0000000000..f207b25a91 --- /dev/null +++ b/tests/Paramore.Brighter.Core.Tests/MessageSerialisation/When_Wrapping_A_Reply_Message_Mapper.cs @@ -0,0 +1,52 @@ +using Paramore.Brighter.Core.Tests.CommandProcessors.TestDoubles; +using Xunit; + +namespace Paramore.Brighter.Core.Tests.MessageSerialisation; + +public class ReplyMessageWrapRequestTests +{ + private readonly TransformPipelineBuilder _pipelineBuilder; + private readonly MyResponse _myResponse; + private readonly Publication _publication; + + public ReplyMessageWrapRequestTests() + { + //arrange + TransformPipelineBuilder.ClearPipelineCache(); + + var mapperRegistry = new MessageMapperRegistry( + new SimpleMessageMapperFactory(_ => new MyResponseMessageMapper()), + null); + mapperRegistry.Register(); + + var replyTopic = new RoutingKey(Uuid.NewAsString()); + var replyAddress = new ReplyAddress(replyTopic, Uuid.NewAsString()); + _myResponse = new MyResponse(replyAddress) { ReplyValue = "Hello World" }; + + var messageTransformerFactory = new SimpleMessageTransformerFactory(_ => null); + + _publication = new Publication + { + Topic = new RoutingKey("Reply"), + RequestType = typeof(MyResponse) + }; + + _pipelineBuilder = new TransformPipelineBuilder(mapperRegistry, messageTransformerFactory); + } + + [Fact] + public void When_Wrapping_A_Reply_Message_Mapper() + { + //act + var transformPipeline = _pipelineBuilder.BuildWrapPipeline(); + var message = transformPipeline.Wrap(_myResponse, new RequestContext(), _publication); + + //assert - message topic is the reply address, not the publication topic + Assert.Equal(_myResponse.SendersAddress.Topic, message.Header.Topic); + Assert.NotEqual(_publication.Topic, message.Header.Topic); + + //assert - publication topic stored in bag for producer lookup + Assert.True(message.Header.Bag.ContainsKey(Message.ProducerTopicHeaderName)); + Assert.Equal(_publication.Topic!.Value, message.Header.Bag[Message.ProducerTopicHeaderName]); + } +} diff --git a/tests/Paramore.Brighter.Core.Tests/MessageSerialisation/When_Wrapping_A_Reply_Message_MapperAsync.cs b/tests/Paramore.Brighter.Core.Tests/MessageSerialisation/When_Wrapping_A_Reply_Message_MapperAsync.cs new file mode 100644 index 0000000000..8ea68fae62 --- /dev/null +++ b/tests/Paramore.Brighter.Core.Tests/MessageSerialisation/When_Wrapping_A_Reply_Message_MapperAsync.cs @@ -0,0 +1,54 @@ +using System.Threading.Tasks; +using Paramore.Brighter.Core.Tests.CommandProcessors.TestDoubles; +using Paramore.Brighter.Observability; +using Xunit; + +namespace Paramore.Brighter.Core.Tests.MessageSerialisation; + +public class AsyncReplyMessageWrapRequestTests +{ + private readonly TransformPipelineBuilderAsync _pipelineBuilder; + private readonly MyResponse _myResponse; + private readonly Publication _publication; + + public AsyncReplyMessageWrapRequestTests() + { + //arrange + TransformPipelineBuilder.ClearPipelineCache(); + + var mapperRegistry = new MessageMapperRegistry( + null, + new SimpleMessageMapperFactoryAsync(_ => new MyResponseMessageMapperAsync())); + mapperRegistry.RegisterAsync(); + + var replyTopic = new RoutingKey(Uuid.NewAsString()); + var replyAddress = new ReplyAddress(replyTopic, Uuid.NewAsString()); + _myResponse = new MyResponse(replyAddress) { ReplyValue = "Hello World" }; + + var messageTransformerFactory = new SimpleMessageTransformerFactoryAsync(_ => null); + + _publication = new Publication + { + Topic = new RoutingKey("Reply"), + RequestType = typeof(MyResponse) + }; + + _pipelineBuilder = new TransformPipelineBuilderAsync(mapperRegistry, messageTransformerFactory, InstrumentationOptions.All); + } + + [Fact] + public async Task When_Wrapping_A_Reply_Message_Mapper_Async() + { + //act + var transformPipeline = _pipelineBuilder.BuildWrapPipeline(); + var message = await transformPipeline.WrapAsync(_myResponse, new RequestContext(), _publication); + + //assert - message topic is the reply address, not the publication topic + Assert.Equal(_myResponse.SendersAddress.Topic, message.Header.Topic); + Assert.NotEqual(_publication.Topic, message.Header.Topic); + + //assert - publication topic stored in bag for producer lookup + Assert.True(message.Header.Bag.ContainsKey(Message.ProducerTopicHeaderName)); + Assert.Equal(_publication.Topic!.Value, message.Header.Bag[Message.ProducerTopicHeaderName]); + } +} diff --git a/tests/Paramore.Brighter.Core.Tests/MessageSerialisation/When_Wrapping_With_Null_Publication_Topic.cs b/tests/Paramore.Brighter.Core.Tests/MessageSerialisation/When_Wrapping_With_Null_Publication_Topic.cs new file mode 100644 index 0000000000..4f13cd354c --- /dev/null +++ b/tests/Paramore.Brighter.Core.Tests/MessageSerialisation/When_Wrapping_With_Null_Publication_Topic.cs @@ -0,0 +1,55 @@ +using Paramore.Brighter.Core.Tests.CommandProcessors.TestDoubles; +using Xunit; + +namespace Paramore.Brighter.Core.Tests.MessageSerialisation; + +// Gap: WrapPipeline.Wrap now writes publication.Topic into Header.Bag when the +// mapper-set topic differs from the publication topic. The fix guards against a +// null publication.Topic before writing. Without this test, a future refactor that +// drops the null-check would NRE at runtime for publications that legitimately +// have no topic (dynamic routing scenarios). +public class WrapNullPublicationTopicTests +{ + private readonly TransformPipelineBuilder _pipelineBuilder; + private readonly MyResponse _myResponse; + private readonly Publication _publication; + + public WrapNullPublicationTopicTests() + { + TransformPipelineBuilder.ClearPipelineCache(); + + var mapperRegistry = new MessageMapperRegistry( + new SimpleMessageMapperFactory(_ => new MyResponseMessageMapper()), + null); + mapperRegistry.Register(); + + var replyTopic = new RoutingKey(Uuid.NewAsString()); + var replyAddress = new ReplyAddress(replyTopic, Uuid.NewAsString()); + _myResponse = new MyResponse(replyAddress) { ReplyValue = "Hello World" }; + + var messageTransformerFactory = new SimpleMessageTransformerFactory(_ => null); + + //publication with no topic — mapper still sets its own topic from the reply address + _publication = new Publication + { + Topic = null, + RequestType = typeof(MyResponse) + }; + + _pipelineBuilder = new TransformPipelineBuilder(mapperRegistry, messageTransformerFactory); + } + + [Fact] + public void When_Wrapping_With_Null_Publication_Topic() + { + //act + var transformPipeline = _pipelineBuilder.BuildWrapPipeline(); + var message = transformPipeline.Wrap(_myResponse, new RequestContext(), _publication); + + //assert - topic came from the mapper + Assert.Equal(_myResponse.SendersAddress.Topic, message.Header.Topic); + + //assert - no ProducerTopic entry was written to the bag (the guard held) + Assert.False(message.Header.Bag.ContainsKey(Message.ProducerTopicHeaderName)); + } +} diff --git a/tests/Paramore.Brighter.Core.Tests/MessageSerialisation/When_Wrapping_With_Null_Publication_Topic_Async.cs b/tests/Paramore.Brighter.Core.Tests/MessageSerialisation/When_Wrapping_With_Null_Publication_Topic_Async.cs new file mode 100644 index 0000000000..4d15fff484 --- /dev/null +++ b/tests/Paramore.Brighter.Core.Tests/MessageSerialisation/When_Wrapping_With_Null_Publication_Topic_Async.cs @@ -0,0 +1,53 @@ +using System.Threading.Tasks; +using Paramore.Brighter.Core.Tests.CommandProcessors.TestDoubles; +using Paramore.Brighter.Observability; +using Xunit; + +namespace Paramore.Brighter.Core.Tests.MessageSerialisation; + +// Async twin of WrapNullPublicationTopicTests — pins the null-publication-topic +// guard in WrapPipelineAsync so refactors cannot silently NRE. +public class AsyncWrapNullPublicationTopicTests +{ + private readonly TransformPipelineBuilderAsync _pipelineBuilder; + private readonly MyResponse _myResponse; + private readonly Publication _publication; + + public AsyncWrapNullPublicationTopicTests() + { + TransformPipelineBuilder.ClearPipelineCache(); + + var mapperRegistry = new MessageMapperRegistry( + null, + new SimpleMessageMapperFactoryAsync(_ => new MyResponseMessageMapperAsync())); + mapperRegistry.RegisterAsync(); + + var replyTopic = new RoutingKey(Uuid.NewAsString()); + var replyAddress = new ReplyAddress(replyTopic, Uuid.NewAsString()); + _myResponse = new MyResponse(replyAddress) { ReplyValue = "Hello World" }; + + var messageTransformerFactory = new SimpleMessageTransformerFactoryAsync(_ => null); + + _publication = new Publication + { + Topic = null, + RequestType = typeof(MyResponse) + }; + + _pipelineBuilder = new TransformPipelineBuilderAsync(mapperRegistry, messageTransformerFactory, InstrumentationOptions.All); + } + + [Fact] + public async Task When_Wrapping_With_Null_Publication_Topic_Async() + { + //act + var transformPipeline = _pipelineBuilder.BuildWrapPipeline(); + var message = await transformPipeline.WrapAsync(_myResponse, new RequestContext(), _publication); + + //assert - topic came from the mapper + Assert.Equal(_myResponse.SendersAddress.Topic, message.Header.Topic); + + //assert - no ProducerTopic entry was written to the bag (the guard held) + Assert.False(message.Header.Bag.ContainsKey(Message.ProducerTopicHeaderName)); + } +}