Skip to content

Commit eae1990

Browse files
refactor: address PR #4070 review feedback
- deterministic poll-until in bulk dispatch test (drop Task.Delay) - document firstMessage invariant in BulkDispatchAsync - clarify ProducerTopicHeaderName xml doc (bag key, not header) - braces around single-line if in WrapPipeline + WrapPipelineAsync - inline note on Log.DecoupledInvocationOfMessage divergence from lookup Co-Authored-By: Claude (claude-opus-4-7) <noreply@anthropic.com>
1 parent 94e6260 commit eae1990

5 files changed

Lines changed: 23 additions & 4 deletions

File tree

src/Paramore.Brighter/Message.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,10 @@ public class Message : IEquatable<Message>
5151
public const string RejectionReasonHeaderName = "RejectionReason";
5252

5353
/// <summary>
54-
/// Tag name for the producer topic header, used when the message mapper overrides the topic (e.g. for Reply messages)
55-
/// so that the outbox dispatcher can still find the correct producer
54+
/// Bag key (not a typed header property — lives in <see cref="MessageHeader.Bag"/>)
55+
/// carrying the publication topic when the message mapper overrides <see cref="MessageHeader.Topic"/>
56+
/// (e.g. for Reply messages whose topic is the reply address).
57+
/// Read by the outbox dispatcher so it can still locate the registered producer.
5658
/// </summary>
5759
public const string ProducerTopicHeaderName = "ProducerTopic";
5860

src/Paramore.Brighter/OutboxProducerMediator.cs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -794,6 +794,9 @@ private void Dispatch(IEnumerable<Message> posts, RequestContext requestContext,
794794
if (_outBox is null) throw new ArgumentException(NoSyncOutboxError);
795795
foreach (var message in posts)
796796
{
797+
// Log the wire topic (Header.Topic) — where the message is going. Producer
798+
// lookup uses GetProducerLookupTopic, which may differ from Header.Topic when
799+
// a mapper overrode it (e.g. Reply messages routed to a dynamic reply address).
797800
Log.DecoupledInvocationOfMessage(s_logger, message.Header.Topic, message.Id);
798801

799802
var producer = _producerRegistry.LookupBy(GetProducerLookupTopic(message), message.Header.Type, requestContext);
@@ -854,6 +857,9 @@ private async Task BulkDispatchAsync(
854857

855858
foreach (var topicBatch in messagesByTopic)
856859
{
860+
// Messages in this batch share Header.Topic (the group key). They therefore
861+
// share the same ProducerTopic bag entry (set by WrapPipelineAsync when the
862+
// mapper overrode the topic), so firstMessage is representative of the batch.
857863
var firstMessage = topicBatch.First();
858864
var producer = _producerRegistry.LookupBy(GetProducerLookupTopic(firstMessage));
859865
var span = _tracer?.CreateProducerSpan(producer.Publication, null, requestContext.Span,
@@ -927,6 +933,9 @@ private async Task DispatchAsync(
927933
if (_asyncOutbox is null) throw new ArgumentException(NoAsyncOutboxError);
928934
foreach (var message in posts)
929935
{
936+
// Log the wire topic (Header.Topic) — where the message is going. Producer
937+
// lookup uses GetProducerLookupTopic, which may differ from Header.Topic when
938+
// a mapper overrode it (e.g. Reply messages routed to a dynamic reply address).
930939
Log.DecoupledInvocationOfMessage(s_logger, message.Header.Topic, message.Id);
931940

932941
var producer = _producerRegistry.LookupBy(GetProducerLookupTopic(message), message.Header.Type, requestContext);

src/Paramore.Brighter/WrapPipeline.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,9 @@ public Message Wrap(TRequest request, RequestContext requestContext, Publication
9494
{
9595
Log.DifferentPublicationAndMessageTopic(s_logger, publication.Topic?.Value ?? string.Empty, message.Header.Topic);
9696
if (publication.Topic is not null)
97+
{
9798
message.Header.Bag[Message.ProducerTopicHeaderName] = publication.Topic.Value;
99+
}
98100
}
99101

100102
BrighterTracer.WriteMapperEvent(message, publication, requestContext.Span, MessageMapper.GetType().Name, false, _instrumentationOptions, true);

src/Paramore.Brighter/WrapPipelineAsync.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,9 @@ public async Task<Message> WrapAsync(TRequest request, RequestContext requestCon
101101
{
102102
Log.DifferentPublicationAndMessageTopic(s_logger, publication.Topic?.Value ?? string.Empty, message.Header.Topic);
103103
if (publication.Topic is not null)
104+
{
104105
message.Header.Bag[Message.ProducerTopicHeaderName] = publication.Topic.Value;
106+
}
105107
}
106108

107109
BrighterTracer.WriteMapperEvent(message, publication, requestContext.Span, MessageMapper.GetType().Name, true, _instrumentationOptions, true);

tests/Paramore.Brighter.Core.Tests/CommandProcessors/Post/When_Bulk_Dispatching_Reply_Messages_Async.cs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,8 +97,12 @@ await _mediator.ClearOutstandingFromOutboxAsync(
9797
useBulk: true,
9898
requestContext: context);
9999

100-
//allow background clear to run
101-
await Task.Delay(500);
100+
//poll until the background clear drains the outbox (bounded to avoid flake in CI)
101+
var deadline = DateTimeOffset.UtcNow + TimeSpan.FromSeconds(5);
102+
while (_internalBus.Stream(_replyTopic).Count() < 2 && DateTimeOffset.UtcNow < deadline)
103+
{
104+
await Task.Delay(25);
105+
}
102106

103107
//assert - messages landed on the reply topic. If producer lookup had used
104108
//Header.Topic (reply address) instead of the bag's ProducerTopic, LookupBy

0 commit comments

Comments
 (0)