Skip to content

Commit 94e6260

Browse files
test: cover bulk-dispatch + null-publication-topic for reply producer lookup
- bulk dispatch reply messages via outbox sweep (firstMessage bag lookup) - wrap with null publication.Topic skips bag write (guard pin, sync + async) Co-Authored-By: Claude (claude-opus-4-7) <noreply@anthropic.com>
1 parent 166bf8f commit 94e6260

3 files changed

Lines changed: 218 additions & 0 deletions

File tree

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Linq;
4+
using System.Threading.Tasks;
5+
using System.Transactions;
6+
using Microsoft.Extensions.Time.Testing;
7+
using Paramore.Brighter.Core.Tests.CommandProcessors.TestDoubles;
8+
using Paramore.Brighter.Extensions;
9+
using Paramore.Brighter.Observability;
10+
using Polly.Registry;
11+
using Xunit;
12+
13+
namespace Paramore.Brighter.Core.Tests.CommandProcessors.Post
14+
{
15+
// Gap: BulkDispatchAsync groups by Header.Topic (the mapper-set reply topic) but
16+
// looks up the producer using the first message's bag. The single-message Dispatch
17+
// paths are covered; this pins the bulk-outbox-clear path so a future refactor of
18+
// GetProducerLookupTopic or the firstMessage assumption cannot silently regress and
19+
// fail to locate the producer when draining the outbox via the bulk sweep.
20+
public class CommandProcessorBulkDispatchReplyAsyncTests
21+
{
22+
private const string ProducerTopic = "Reply";
23+
private readonly CommandProcessor _commandProcessor;
24+
private readonly IAmAnOutboxProducerMediator _mediator;
25+
private readonly MyResponse _replyOne;
26+
private readonly MyResponse _replyTwo;
27+
private readonly InternalBus _internalBus = new();
28+
private readonly RoutingKey _replyTopic;
29+
30+
public CommandProcessorBulkDispatchReplyAsyncTests()
31+
{
32+
var timeProvider = new FakeTimeProvider();
33+
var producerRoutingKey = new RoutingKey(ProducerTopic);
34+
35+
_replyTopic = new RoutingKey(Uuid.NewAsString());
36+
var replyAddress = new ReplyAddress(_replyTopic, Uuid.NewAsString());
37+
_replyOne = new MyResponse(replyAddress) { ReplyValue = "Hello" };
38+
_replyTwo = new MyResponse(replyAddress) { ReplyValue = "World" };
39+
40+
InMemoryMessageProducer messageProducer = new(_internalBus,
41+
new Publication
42+
{
43+
Topic = producerRoutingKey,
44+
RequestType = typeof(MyResponse)
45+
});
46+
47+
var messageMapperRegistry = new MessageMapperRegistry(
48+
null,
49+
new SimpleMessageMapperFactoryAsync(_ => new MyResponseMessageMapperAsync()));
50+
messageMapperRegistry.RegisterAsync<MyResponse, MyResponseMessageMapperAsync>();
51+
52+
var resiliencePipelineRegistry = new ResiliencePipelineRegistry<string>()
53+
.AddBrighterDefault();
54+
55+
var producerRegistry = new ProducerRegistry(
56+
new Dictionary<RoutingKey, IAmAMessageProducer>
57+
{
58+
{ producerRoutingKey, messageProducer }
59+
});
60+
61+
var tracer = new BrighterTracer(timeProvider);
62+
var outbox = new InMemoryOutbox(timeProvider) { Tracer = tracer };
63+
64+
_mediator = new OutboxProducerMediator<Message, CommittableTransaction>(
65+
producerRegistry,
66+
resiliencePipelineRegistry,
67+
messageMapperRegistry,
68+
new EmptyMessageTransformerFactory(),
69+
new EmptyMessageTransformerFactoryAsync(),
70+
tracer,
71+
new FindPublicationByPublicationTopicOrRequestType(),
72+
outbox,
73+
maxOutStandingMessages: -1
74+
);
75+
76+
_commandProcessor = new CommandProcessor(
77+
new InMemoryRequestContextFactory(),
78+
new DefaultPolicy(),
79+
resiliencePipelineRegistry,
80+
_mediator,
81+
new InMemorySchedulerFactory()
82+
);
83+
}
84+
85+
[Fact]
86+
public async Task When_Bulk_Dispatching_Reply_Messages_Async()
87+
{
88+
//arrange - deposit two replies whose mapper sets Header.Topic to the reply
89+
//address, not the registered producer topic
90+
var context = new RequestContext();
91+
await _commandProcessor.DepositPostAsync<MyResponse>([_replyOne, _replyTwo], context);
92+
93+
//act - drain via the bulk path (exercised by background outbox sweeps)
94+
await _mediator.ClearOutstandingFromOutboxAsync(
95+
amountToClear: 10,
96+
minimumAge: TimeSpan.Zero,
97+
useBulk: true,
98+
requestContext: context);
99+
100+
//allow background clear to run
101+
await Task.Delay(500);
102+
103+
//assert - messages landed on the reply topic. If producer lookup had used
104+
//Header.Topic (reply address) instead of the bag's ProducerTopic, LookupBy
105+
//would have thrown and nothing would arrive on the bus.
106+
var messages = _internalBus.Stream(_replyTopic).ToArray();
107+
Assert.Equal(2, messages.Length);
108+
}
109+
}
110+
}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
using Paramore.Brighter.Core.Tests.CommandProcessors.TestDoubles;
2+
using Xunit;
3+
4+
namespace Paramore.Brighter.Core.Tests.MessageSerialisation;
5+
6+
// Gap: WrapPipeline.Wrap now writes publication.Topic into Header.Bag when the
7+
// mapper-set topic differs from the publication topic. The fix guards against a
8+
// null publication.Topic before writing. Without this test, a future refactor that
9+
// drops the null-check would NRE at runtime for publications that legitimately
10+
// have no topic (dynamic routing scenarios).
11+
public class WrapNullPublicationTopicTests
12+
{
13+
private readonly TransformPipelineBuilder _pipelineBuilder;
14+
private readonly MyResponse _myResponse;
15+
private readonly Publication _publication;
16+
17+
public WrapNullPublicationTopicTests()
18+
{
19+
TransformPipelineBuilder.ClearPipelineCache();
20+
21+
var mapperRegistry = new MessageMapperRegistry(
22+
new SimpleMessageMapperFactory(_ => new MyResponseMessageMapper()),
23+
null);
24+
mapperRegistry.Register<MyResponse, MyResponseMessageMapper>();
25+
26+
var replyTopic = new RoutingKey(Uuid.NewAsString());
27+
var replyAddress = new ReplyAddress(replyTopic, Uuid.NewAsString());
28+
_myResponse = new MyResponse(replyAddress) { ReplyValue = "Hello World" };
29+
30+
var messageTransformerFactory = new SimpleMessageTransformerFactory(_ => null);
31+
32+
//publication with no topic — mapper still sets its own topic from the reply address
33+
_publication = new Publication
34+
{
35+
Topic = null,
36+
RequestType = typeof(MyResponse)
37+
};
38+
39+
_pipelineBuilder = new TransformPipelineBuilder(mapperRegistry, messageTransformerFactory);
40+
}
41+
42+
[Fact]
43+
public void When_Wrapping_With_Null_Publication_Topic()
44+
{
45+
//act
46+
var transformPipeline = _pipelineBuilder.BuildWrapPipeline<MyResponse>();
47+
var message = transformPipeline.Wrap(_myResponse, new RequestContext(), _publication);
48+
49+
//assert - topic came from the mapper
50+
Assert.Equal(_myResponse.SendersAddress.Topic, message.Header.Topic);
51+
52+
//assert - no ProducerTopic entry was written to the bag (the guard held)
53+
Assert.False(message.Header.Bag.ContainsKey(Message.ProducerTopicHeaderName));
54+
}
55+
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
using System.Threading.Tasks;
2+
using Paramore.Brighter.Core.Tests.CommandProcessors.TestDoubles;
3+
using Paramore.Brighter.Observability;
4+
using Xunit;
5+
6+
namespace Paramore.Brighter.Core.Tests.MessageSerialisation;
7+
8+
// Async twin of WrapNullPublicationTopicTests — pins the null-publication-topic
9+
// guard in WrapPipelineAsync so refactors cannot silently NRE.
10+
public class AsyncWrapNullPublicationTopicTests
11+
{
12+
private readonly TransformPipelineBuilderAsync _pipelineBuilder;
13+
private readonly MyResponse _myResponse;
14+
private readonly Publication _publication;
15+
16+
public AsyncWrapNullPublicationTopicTests()
17+
{
18+
TransformPipelineBuilder.ClearPipelineCache();
19+
20+
var mapperRegistry = new MessageMapperRegistry(
21+
null,
22+
new SimpleMessageMapperFactoryAsync(_ => new MyResponseMessageMapperAsync()));
23+
mapperRegistry.RegisterAsync<MyResponse, MyResponseMessageMapperAsync>();
24+
25+
var replyTopic = new RoutingKey(Uuid.NewAsString());
26+
var replyAddress = new ReplyAddress(replyTopic, Uuid.NewAsString());
27+
_myResponse = new MyResponse(replyAddress) { ReplyValue = "Hello World" };
28+
29+
var messageTransformerFactory = new SimpleMessageTransformerFactoryAsync(_ => null);
30+
31+
_publication = new Publication
32+
{
33+
Topic = null,
34+
RequestType = typeof(MyResponse)
35+
};
36+
37+
_pipelineBuilder = new TransformPipelineBuilderAsync(mapperRegistry, messageTransformerFactory, InstrumentationOptions.All);
38+
}
39+
40+
[Fact]
41+
public async Task When_Wrapping_With_Null_Publication_Topic_Async()
42+
{
43+
//act
44+
var transformPipeline = _pipelineBuilder.BuildWrapPipeline<MyResponse>();
45+
var message = await transformPipeline.WrapAsync(_myResponse, new RequestContext(), _publication);
46+
47+
//assert - topic came from the mapper
48+
Assert.Equal(_myResponse.SendersAddress.Topic, message.Header.Topic);
49+
50+
//assert - no ProducerTopic entry was written to the bag (the guard held)
51+
Assert.False(message.Header.Bag.ContainsKey(Message.ProducerTopicHeaderName));
52+
}
53+
}

0 commit comments

Comments
 (0)