1818namespace Motor . Extensions . Hosting . PgMq_IntegrationTest ;
1919
2020[ Collection ( "PgMqMessage" ) ]
21- public class PgMqIntegrationTests : IClassFixture < PostgresFixture >
21+ public class PgMqIntegrationTests ( PostgresFixture fixture ) : IClassFixture < PostgresFixture >
2222{
23- private readonly IRandomizerString _randomizerString ;
24- private readonly NpgsqlConnectionStringBuilder _pgConnectionStringBuilder ;
25-
26- public PgMqIntegrationTests ( PostgresFixture fixture )
27- {
28- _randomizerString = RandomizerFactory . GetRandomizer ( new FieldOptionsTextRegex { Pattern = @"^[a-z]{10}" } ) ;
29- _pgConnectionStringBuilder = new NpgsqlConnectionStringBuilder ( fixture . ConnectionString ) ;
30- }
23+ private readonly IRandomizerString _randomizerString = RandomizerFactory . GetRandomizer ( new FieldOptionsTextRegex { Pattern = @"^[a-z]{10}" } ) ;
24+ private readonly NpgsqlConnectionStringBuilder _pgConnectionStringBuilder = new ( fixture . ConnectionString ) ;
3125
3226 [ Fact ( Timeout = 50000 ) ]
3327 public async Task Consume_ProduceAndConsumeProtocolFormat_ConsumedDataEqualsPublished ( )
@@ -47,15 +41,14 @@ public async Task Consume_ProduceAndConsumeProtocolFormat_ConsumedDataEqualsPubl
4741 using var cts = new CancellationTokenSource ( ) ;
4842 var executeTask = consumer . ExecuteAsync ( cts . Token ) ;
4943 await producer . PublishMessageAsync (
50- MotorCloudEvent . CreateTestCloudEvent ( Encoding . UTF8 . GetBytes ( expectedMessage ) )
51- ) ;
44+ MotorCloudEvent . CreateTestCloudEvent ( Encoding . UTF8 . GetBytes ( expectedMessage ) ) , cts . Token ) ;
5245 var received =
5346 await Task . WhenAny ( tcs . Task , executeTask , Task . Delay ( TimeSpan . FromSeconds ( 30 ) ) ) == tcs . Task
5447 ? await tcs . Task
5548 : null ;
5649 await cts . CancelAsync ( ) ;
5750 await executeTask ;
58- await consumer . StopAsync ( ) ;
51+ await consumer . StopAsync ( cts . Token ) ;
5952 Assert . NotNull ( received ) ;
6053 Assert . Equal ( expectedMessage , Encoding . UTF8 . GetString ( received ! ) ) ;
6154 }
@@ -108,11 +101,11 @@ public async Task Consume_CallbackReturnsFailureStatus_StopsApplicationAndLogsEr
108101 consumer . ConsumeCallbackAsync = ( _ , _ ) => Task . FromResult ( ProcessedMessageStatus . CriticalFailure ) ;
109102 using var cts = new CancellationTokenSource ( ) ;
110103 var executeTask = consumer . ExecuteAsync ( cts . Token ) ;
111- await producer . PublishMessageAsync ( MotorCloudEvent . CreateTestCloudEvent ( Encoding . UTF8 . GetBytes ( "fail-me" ) ) ) ;
112- await Task . WhenAny ( executeTask , Task . Delay ( TimeSpan . FromSeconds ( 30 ) ) ) ;
104+ await producer . PublishMessageAsync ( MotorCloudEvent . CreateTestCloudEvent ( Encoding . UTF8 . GetBytes ( "fail-me" ) ) , cts . Token ) ;
105+ await Task . WhenAny ( executeTask , Task . Delay ( TimeSpan . FromSeconds ( 30 ) , cts . Token ) ) ;
113106 await cts . CancelAsync ( ) ;
114107 await executeTask ;
115- await consumer . StopAsync ( ) ;
108+ await consumer . StopAsync ( cts . Token ) ;
116109 lifetimeMock . Verify ( x => x . StopApplication ( ) , Times . AtLeastOnce ) ;
117110 }
118111
@@ -129,11 +122,11 @@ public async Task Consume_UnhandledExceptionInCallback_StopsApplicationAndLogsCr
129122 consumer . ConsumeCallbackAsync = ( _ , _ ) => throw new InvalidOperationException ( "boom" ) ;
130123 using var cts = new CancellationTokenSource ( ) ;
131124 var executeTask = consumer . ExecuteAsync ( cts . Token ) ;
132- await producer . PublishMessageAsync ( MotorCloudEvent . CreateTestCloudEvent ( Encoding . UTF8 . GetBytes ( "boom" ) ) ) ;
133- await Task . WhenAny ( executeTask , Task . Delay ( TimeSpan . FromSeconds ( 30 ) ) ) ;
125+ await producer . PublishMessageAsync ( MotorCloudEvent . CreateTestCloudEvent ( Encoding . UTF8 . GetBytes ( "boom" ) ) , cts . Token ) ;
126+ await Task . WhenAny ( executeTask , Task . Delay ( TimeSpan . FromSeconds ( 30 ) , cts . Token ) ) ;
134127 await cts . CancelAsync ( ) ;
135128 await executeTask ;
136- await consumer . StopAsync ( ) ;
129+ await consumer . StopAsync ( cts . Token ) ;
137130 lifetimeMock . Verify ( x => x . StopApplication ( ) , Times . AtLeastOnce ) ;
138131 loggerMock . Verify (
139132 x =>
@@ -165,7 +158,9 @@ public async Task Consume_MultipleMessages_AllConsumedSequentiallyAndAcknowledge
165158 {
166159 received . Add ( Encoding . UTF8 . GetString ( evt . TypedData ) ) ;
167160 if ( received . Count == messageCount )
161+ {
168162 tcs . TrySetResult ( ) ;
163+ }
169164 }
170165 return Task . FromResult ( ProcessedMessageStatus . Success ) ;
171166 } ;
@@ -177,7 +172,7 @@ await producer.PublishMessageAsync(
177172 }
178173 using var cts = new CancellationTokenSource ( ) ;
179174 var executeTask = consumer . ExecuteAsync ( cts . Token ) ;
180- await Task . WhenAny ( tcs . Task , executeTask , Task . Delay ( TimeSpan . FromSeconds ( 45 ) ) ) ;
175+ await Task . WhenAny ( tcs . Task , executeTask , Task . Delay ( TimeSpan . FromSeconds ( 45 ) , cts . Token ) ) ;
181176 Assert . Equal ( messageCount , received . Count ) ;
182177 await cts . CancelAsync ( ) ;
183178 await executeTask ;
@@ -198,7 +193,7 @@ public async Task Consume_TokenCancelledDuringPoll_ExitsGracefully()
198193 await consumer . StartAsync ( ) ;
199194 using var cts = new CancellationTokenSource ( TimeSpan . FromSeconds ( 2 ) ) ;
200195 await consumer . ExecuteAsync ( cts . Token ) ;
201- await consumer . StopAsync ( ) ;
196+ await consumer . StopAsync ( cts . Token ) ;
202197 lifetimeMock . Verify ( x => x . StopApplication ( ) , Times . Never ) ;
203198 }
204199
@@ -258,16 +253,16 @@ public async Task Consume_SuccessfulMessage_ProtocolFormat_CloudEventAttributesR
258253 null ,
259254 null
260255 ) ;
261- await producer . PublishMessageAsync ( cloudEvent ) ;
256+ await producer . PublishMessageAsync ( cloudEvent , cts . Token ) ;
262257 var receivedEvent =
263- await Task . WhenAny ( tcs . Task , executeTask , Task . Delay ( TimeSpan . FromSeconds ( 30 ) ) ) == tcs . Task
258+ await Task . WhenAny ( tcs . Task , executeTask , Task . Delay ( TimeSpan . FromSeconds ( 30 ) , cts . Token ) ) == tcs . Task
264259 ? await tcs . Task
265260 : null ;
266261 await cts . CancelAsync ( ) ;
267262 await executeTask ;
268- await consumer . StopAsync ( ) ;
263+ await consumer . StopAsync ( cts . Token ) ;
269264 Assert . NotNull ( receivedEvent ) ;
270- Assert . Equal ( expectedId , receivedEvent ! . Id ) ;
265+ Assert . Equal ( expectedId , receivedEvent . Id ) ;
271266 Assert . Equal ( expectedSource , receivedEvent . Source ) ;
272267 }
273268
@@ -323,7 +318,7 @@ options with
323318 Password = _pgConnectionStringBuilder . Password ?? string . Empty ,
324319 } ;
325320
326- private IApplicationNameService GetApplicationNameService ( string source = "motor://test" )
321+ private static IApplicationNameService GetApplicationNameService ( string source = "motor://test" )
327322 {
328323 var mock = new Mock < IApplicationNameService > ( ) ;
329324 mock . Setup ( t => t . GetSource ( ) ) . Returns ( new Uri ( source ) ) ;
0 commit comments