@@ -11,6 +11,84 @@ namespace StackExchange.Redis.Tests.MultiGroupTests;
1111[ RunPerProtocol ]
1212public class BasicMultiGroupTests ( ITestOutputHelper log )
1313{
14+ private sealed class Capture ( ITestOutputHelper log )
15+ {
16+ private readonly List < string > _seen = [ ] ;
17+
18+ public void Seen ( string source , RedisChannel channel , RedisValue value )
19+ {
20+ string message = $ "[{ source } ] { channel } : { value } ";
21+ lock ( _seen )
22+ {
23+ _seen . Add ( message ) ;
24+ }
25+ }
26+
27+ public void Reset ( )
28+ {
29+ lock ( _seen )
30+ {
31+ _seen . Clear ( ) ;
32+ }
33+ }
34+
35+ public void AssertTakeAny ( string value )
36+ {
37+ lock ( _seen )
38+ {
39+ Assert . True ( _seen . Remove ( value ) , $ "Expected to find '{ value } ', but did not") ;
40+ }
41+ }
42+
43+ public void AssertTakeFirst ( string value )
44+ {
45+ lock ( _seen )
46+ {
47+ Assert . NotEmpty ( _seen ) ;
48+ Assert . Equal ( value , _seen [ 0 ] ) ;
49+ _seen . RemoveAt ( 0 ) ; // not concerned by perf here
50+ }
51+ }
52+
53+ public async ValueTask AwaitAsync ( ISubscriber sub , int expected )
54+ {
55+ for ( int i = 0 ; i < 10 ; i ++ )
56+ {
57+ lock ( _seen )
58+ {
59+ if ( _seen . Count >= expected )
60+ {
61+ Assert . Equal ( expected , _seen . Count ) ;
62+ log . WriteLine ( "Messages:" ) ;
63+ foreach ( var item in _seen )
64+ {
65+ log . WriteLine ( item ) ;
66+ }
67+ return ;
68+ }
69+ }
70+ log . WriteLine ( $ "Waiting for { expected } messages, got { i } , pausing...") ;
71+ await Task . Delay ( 10 , TestContext . Current . CancellationToken ) ;
72+ await sub . PingAsync ( ) ;
73+ }
74+
75+ int actual ;
76+ lock ( _seen )
77+ {
78+ actual = _seen . Count ;
79+ }
80+ throw new TimeoutException ( $ "Timed out waiting for { expected } messages, got { actual } ") ;
81+ }
82+
83+ public Task WriteSeen ( string source , ChannelMessageQueue queue ) =>
84+ Task . Run ( async ( ) =>
85+ {
86+ await foreach ( var msg in queue )
87+ {
88+ Seen ( source , msg . Channel , msg . Message ) ;
89+ }
90+ } ) ;
91+ }
1492 protected TextWriter Log { get ; } = new TextWriterOutputHelper ( log ) ;
1593
1694 [ Fact ]
@@ -20,9 +98,9 @@ public async Task SelectByWeight()
2098 EndPoint canada = new DnsEndPoint ( "canada" , 6379 ) ;
2199 EndPoint tokyo = new DnsEndPoint ( "tokyo" , 6379 ) ;
22100
23- using var server0 = new InProcessTestServer ( endpoint : germany ) ;
24- using var server1 = new InProcessTestServer ( endpoint : canada ) ;
25- using var server2 = new InProcessTestServer ( endpoint : tokyo ) ;
101+ using var server0 = new InProcessTestServer ( log , endpoint : germany ) ;
102+ using var server1 = new InProcessTestServer ( log , endpoint : canada ) ;
103+ using var server2 = new InProcessTestServer ( log , endpoint : tokyo ) ;
26104
27105 ConnectionGroupMember [ ] members = [
28106 new ( server0 . GetClientConfig ( ) ) { Weight = 2 } ,
@@ -64,9 +142,9 @@ public async Task SelectByLatency()
64142 EndPoint canada = new DnsEndPoint ( "canada" , 6379 ) ;
65143 EndPoint tokyo = new DnsEndPoint ( "tokyo" , 6379 ) ;
66144
67- using var server0 = new InProcessTestServer ( endpoint : germany ) ;
68- using var server1 = new InProcessTestServer ( endpoint : canada ) ;
69- using var server2 = new InProcessTestServer ( endpoint : tokyo ) ;
145+ using var server0 = new InProcessTestServer ( log , endpoint : germany ) ;
146+ using var server1 = new InProcessTestServer ( log , endpoint : canada ) ;
147+ using var server2 = new InProcessTestServer ( log , endpoint : tokyo ) ;
70148
71149 ConnectionGroupMember [ ] members = [
72150 new ( server0 . GetClientConfig ( ) ) ,
@@ -110,36 +188,20 @@ public async Task PubSubRouted()
110188 EndPoint canada = new DnsEndPoint ( "canada" , 6379 ) ;
111189 EndPoint tokyo = new DnsEndPoint ( "tokyo" , 6379 ) ;
112190
113- using var server0 = new InProcessTestServer ( endpoint : germany ) ;
191+ using var server0 = new InProcessTestServer ( log , endpoint : germany ) ;
114192 Assert . SkipUnless ( server0 . GetClientConfig ( ) . CommandMap . IsAvailable ( RedisCommand . PUBLISH ) , "PUBLISH is not available" ) ;
115- using var server1 = new InProcessTestServer ( endpoint : canada ) ;
116- using var server2 = new InProcessTestServer ( endpoint : tokyo ) ;
193+ using var server1 = new InProcessTestServer ( log , endpoint : canada ) ;
194+ using var server2 = new InProcessTestServer ( log , endpoint : tokyo ) ;
117195
118- HashSet < string > seen = [ ] ;
196+ Capture capture = new ( log ) ;
119197
120- void Seen ( string source , RedisChannel channel , RedisValue value )
121- {
122- string message = $ "[{ source } ] { channel } : { value } ";
123- lock ( seen )
124- {
125- seen . Add ( message ) ;
126- }
127- }
128-
129- void Reset ( )
130- {
131- lock ( seen )
132- {
133- seen . Clear ( ) ;
134- }
135- }
136198 RedisChannel channel = RedisChannel . Literal ( "chan" ) ;
137199 var pub0 = ( await server0 . ConnectAsync ( ) ) . GetSubscriber ( ) ;
138- await pub0 . SubscribeAsync ( channel , ( x , y ) => Seen ( nameof ( pub0 ) , x , y ) ) ;
200+ await pub0 . SubscribeAsync ( channel , ( x , y ) => capture . Seen ( nameof ( pub0 ) , x , y ) ) ;
139201 var pub1 = ( await server1 . ConnectAsync ( ) ) . GetSubscriber ( ) ;
140- await pub1 . SubscribeAsync ( channel , ( x , y ) => Seen ( nameof ( pub1 ) , x , y ) ) ;
202+ await pub1 . SubscribeAsync ( channel , ( x , y ) => capture . Seen ( nameof ( pub1 ) , x , y ) ) ;
141203 var pub2 = ( await server2 . ConnectAsync ( ) ) . GetSubscriber ( ) ;
142- await pub2 . SubscribeAsync ( channel , ( x , y ) => Seen ( nameof ( pub2 ) , x , y ) ) ;
204+ await pub2 . SubscribeAsync ( channel , ( x , y ) => capture . Seen ( nameof ( pub2 ) , x , y ) ) ;
143205
144206 ConnectionGroupMember [ ] members = [
145207 new ( server0 . GetClientConfig ( ) ) { Weight = 2 } ,
@@ -150,51 +212,50 @@ void Reset()
150212 Assert . True ( conn . IsConnected ) ;
151213 var typed = Assert . IsType < MultiGroupMultiplexer > ( conn ) ;
152214 var multi = conn . GetSubscriber ( ) ;
153- await multi . SubscribeAsync ( channel , ( x , y ) => Seen ( nameof ( conn ) , x , y ) ) ;
215+ await multi . SubscribeAsync ( channel , ( x , y ) => capture . Seen ( nameof ( conn ) , x , y ) ) ;
154216
155217 // (R.4.1) If multiple member databases are configured, then I want to failover to the one with the highest weight.
156218 var db = conn . GetDatabase ( ) ;
157219 var ep = await db . IdentifyEndpointAsync ( ) ;
158220 Assert . Equal ( canada , ep ) ;
159221
160222 // now publish via all 4 options, see what happens
161- Reset ( ) ;
223+ capture . Reset ( ) ;
162224 await pub0 . PublishAsync ( channel , "abc" ) ;
163225 await pub1 . PublishAsync ( channel , "def" ) ;
164226 await pub2 . PublishAsync ( channel , "ghi" ) ;
165227 await multi . PublishAsync ( channel , "jkl" ) ;
166- await multi . PingAsync ( ) ;
167228
168229 // we're expecting just canada, so:
169- Assert . Equal ( 6 , seen . Count ) ;
170- Assert . Contains ( "[pub0] chan: abc" , seen ) ;
171- Assert . Contains ( "[pub1] chan: def" , seen ) ;
172- Assert . Contains ( "[pub2] chan: ghi" , seen ) ;
173- Assert . Contains ( "[conn] chan: def" , seen ) ; // receives the message from pub1
174- Assert . Contains ( "[conn] chan: jkl" , seen ) ; // receives the message from itself
175- Assert . Contains ( "[pub1] chan: jkl" , seen ) ; // received the message from the multi-group
230+ await capture . AwaitAsync ( multi , 6 ) ;
231+ capture . AssertTakeAny ( "[pub0] chan: abc" ) ;
232+ capture . AssertTakeAny ( "[pub1] chan: def" ) ;
233+ capture . AssertTakeAny ( "[pub2] chan: ghi" ) ;
234+ capture . AssertTakeAny ( "[conn] chan: def" ) ; // receives the message from pub1
235+ capture . AssertTakeAny ( "[conn] chan: jkl" ) ; // receives the message from itself
236+ capture . AssertTakeAny ( "[pub1] chan: jkl" ) ; // received the message from the multi-group
176237
177238 // change weight and update
178239 members [ 1 ] . Weight = 1 ;
179240 typed . SelectPreferredGroup ( ) ;
180241 ep = await db . IdentifyEndpointAsync ( ) ;
181242 Assert . Equal ( tokyo , ep ) ;
182243
183- Reset ( ) ;
244+ capture . Reset ( ) ;
245+ log . WriteLine ( "Publishing..." ) ;
184246 await pub0 . PublishAsync ( channel , "abc" ) ;
185247 await pub1 . PublishAsync ( channel , "def" ) ;
186248 await pub2 . PublishAsync ( channel , "ghi" ) ;
187249 await multi . PublishAsync ( channel , "jkl" ) ;
188- await multi . PingAsync ( ) ;
189250
190251 // now we're expecting just tokyo, so:
191- Assert . Equal ( 6 , seen . Count ) ;
192- Assert . Contains ( "[pub0] chan: abc" , seen ) ;
193- Assert . Contains ( "[pub1] chan: def" , seen ) ;
194- Assert . Contains ( "[pub2] chan: ghi" , seen ) ;
195- Assert . Contains ( "[conn] chan: jkl" , seen ) ; // receives the message from pub2
196- Assert . Contains ( "[conn] chan: jkl" , seen ) ; // receives the message from itself
197- Assert . Contains ( "[pub2] chan: jkl" , seen ) ; // received the message from the multi-group
252+ await capture . AwaitAsync ( multi , 6 ) ;
253+ capture . AssertTakeAny ( "[pub0] chan: abc" ) ;
254+ capture . AssertTakeAny ( "[pub1] chan: def" ) ;
255+ capture . AssertTakeAny ( "[pub2] chan: ghi" ) ;
256+ capture . AssertTakeAny ( "[conn] chan: ghi" ) ; // receives the message from pub2
257+ capture . AssertTakeAny ( "[conn] chan: jkl" ) ; // receives the message from itself
258+ capture . AssertTakeAny ( "[pub2] chan: jkl" ) ; // received the message from the multi-group
198259 }
199260
200261 [ Fact ]
@@ -204,50 +265,25 @@ public async Task PubSubOrderedRouted()
204265 EndPoint canada = new DnsEndPoint ( "canada" , 6379 ) ;
205266 EndPoint tokyo = new DnsEndPoint ( "tokyo" , 6379 ) ;
206267
207- using var server0 = new InProcessTestServer ( endpoint : germany ) ;
208- Assert . SkipUnless ( server0 . GetClientConfig ( ) . CommandMap . IsAvailable ( RedisCommand . PUBLISH ) , "PUBLISH is not available" ) ;
209- using var server1 = new InProcessTestServer ( endpoint : canada ) ;
210- using var server2 = new InProcessTestServer ( endpoint : tokyo ) ;
211-
212- HashSet < string > seen = [ ] ;
213-
214- void Seen ( string source , RedisChannel channel , RedisValue value )
215- {
216- string message = $ "[{ source } ] { channel } : { value } ";
217- lock ( seen )
218- {
219- seen . Add ( message ) ;
220- }
221- }
268+ using var server0 = new InProcessTestServer ( log , endpoint : germany ) ;
269+ Assert . SkipUnless (
270+ server0 . GetClientConfig ( ) . CommandMap . IsAvailable ( RedisCommand . PUBLISH ) ,
271+ "PUBLISH is not available" ) ;
272+ using var server1 = new InProcessTestServer ( log , endpoint : canada ) ;
273+ using var server2 = new InProcessTestServer ( log , endpoint : tokyo ) ;
222274
223- void Reset ( )
224- {
225- lock ( seen )
226- {
227- seen . Clear ( ) ;
228- }
229- }
230-
231- void WriteSeen ( string source , ChannelMessageQueue queue )
232- {
233- _ = Task . Run ( async ( ) =>
234- {
235- await foreach ( var msg in queue )
236- {
237- Seen ( source , msg . Channel , msg . Message ) ;
238- }
239- } ) ;
240- }
275+ Capture capture = new ( log ) ;
241276
242277 RedisChannel channel = RedisChannel . Literal ( "chan" ) ;
243278 var pub0 = ( await server0 . ConnectAsync ( ) ) . GetSubscriber ( ) ;
244- WriteSeen ( nameof ( pub0 ) , await pub0 . SubscribeAsync ( channel ) ) ;
279+ _ = capture . WriteSeen ( nameof ( pub0 ) , await pub0 . SubscribeAsync ( channel ) ) ;
245280 var pub1 = ( await server1 . ConnectAsync ( ) ) . GetSubscriber ( ) ;
246- WriteSeen ( nameof ( pub1 ) , await pub1 . SubscribeAsync ( channel ) ) ;
281+ _ = capture . WriteSeen ( nameof ( pub1 ) , await pub1 . SubscribeAsync ( channel ) ) ;
247282 var pub2 = ( await server2 . ConnectAsync ( ) ) . GetSubscriber ( ) ;
248- WriteSeen ( nameof ( pub2 ) , await pub2 . SubscribeAsync ( channel ) ) ;
283+ _ = capture . WriteSeen ( nameof ( pub2 ) , await pub2 . SubscribeAsync ( channel ) ) ;
249284
250- ConnectionGroupMember [ ] members = [
285+ ConnectionGroupMember [ ] members =
286+ [
251287 new ( server0 . GetClientConfig ( ) ) { Weight = 2 } ,
252288 new ( server1 . GetClientConfig ( ) ) { Weight = 9 } ,
253289 new ( server2 . GetClientConfig ( ) ) { Weight = 3 } ,
@@ -256,50 +292,69 @@ void WriteSeen(string source, ChannelMessageQueue queue)
256292 Assert . True ( conn . IsConnected ) ;
257293 var typed = Assert . IsType < MultiGroupMultiplexer > ( conn ) ;
258294 var multi = conn . GetSubscriber ( ) ;
259- WriteSeen ( nameof ( conn ) , await multi . SubscribeAsync ( channel ) ) ;
295+ _ = capture . WriteSeen ( nameof ( conn ) , await multi . SubscribeAsync ( channel ) ) ;
260296
261297 // (R.4.1) If multiple member databases are configured, then I want to failover to the one with the highest weight.
262298 var db = conn . GetDatabase ( ) ;
263299 var ep = await db . IdentifyEndpointAsync ( ) ;
264300 Assert . Equal ( canada , ep ) ;
265301
266302 // now publish via all 4 options, see what happens
267- Reset ( ) ;
303+ capture . Reset ( ) ;
268304 await pub0 . PublishAsync ( channel , "abc" ) ;
269305 await pub1 . PublishAsync ( channel , "def" ) ;
270306 await pub2 . PublishAsync ( channel , "ghi" ) ;
271- await multi . PublishAsync ( channel , "jkl" ) ;
272- await multi . PingAsync ( ) ;
307+ for ( int i = 0 ; i < 5 ; i ++ )
308+ {
309+ await multi . PublishAsync ( channel , $ "jkl{ i } ") ;
310+ }
273311
274312 // we're expecting just canada, so:
275- Assert . Equal ( 6 , seen . Count ) ;
276- Assert . Contains ( "[pub0] chan: abc" , seen ) ;
277- Assert . Contains ( "[pub1] chan: def" , seen ) ;
278- Assert . Contains ( "[pub2] chan: ghi" , seen ) ;
279- Assert . Contains ( "[conn] chan: def" , seen ) ; // receives the message from pub1
280- Assert . Contains ( "[conn] chan: jkl" , seen ) ; // receives the message from itself
281- Assert . Contains ( "[pub1] chan: jkl" , seen ) ; // received the message from the multi-group
313+ await capture . AwaitAsync ( multi , 14 ) ;
314+ capture . AssertTakeAny ( "[pub0] chan: abc" ) ;
315+ capture . AssertTakeAny ( "[pub1] chan: def" ) ;
316+ capture . AssertTakeAny ( "[pub2] chan: ghi" ) ;
317+ for ( int i = 0 ; i < 5 ; i ++ )
318+ {
319+ capture . AssertTakeAny ( $ "[pub1] chan: jkl{ i } ") ; // received the message from the multi-group
320+ }
321+ // these should be ordered
322+ capture . AssertTakeFirst ( "[conn] chan: def" ) ; // receives the message from pub1
323+ for ( int i = 0 ; i < 5 ; i ++ )
324+ {
325+ capture . AssertTakeFirst ( $ "[conn] chan: jkl{ i } ") ; // receives the message from itself
326+ }
282327
283328 // change weight and update
284329 members [ 1 ] . Weight = 1 ;
285330 typed . SelectPreferredGroup ( ) ;
286331 ep = await db . IdentifyEndpointAsync ( ) ;
287332 Assert . Equal ( tokyo , ep ) ;
288333
289- Reset ( ) ;
334+ capture . Reset ( ) ;
290335 await pub0 . PublishAsync ( channel , "abc" ) ;
291336 await pub1 . PublishAsync ( channel , "def" ) ;
292337 await pub2 . PublishAsync ( channel , "ghi" ) ;
293- await multi . PublishAsync ( channel , "jkl" ) ;
294- await multi . PingAsync ( ) ;
338+ for ( int i = 0 ; i < 5 ; i ++ )
339+ {
340+ await multi . PublishAsync ( channel , $ "jkl{ i } ") ;
341+ }
295342
296343 // now we're expecting just tokyo, so:
297- Assert . Equal ( 6 , seen . Count ) ;
298- Assert . Contains ( "[pub0] chan: abc" , seen ) ;
299- Assert . Contains ( "[pub1] chan: def" , seen ) ;
300- Assert . Contains ( "[pub2] chan: ghi" , seen ) ;
301- Assert . Contains ( "[conn] chan: jkl" , seen ) ; // receives the message from pub2
302- Assert . Contains ( "[conn] chan: jkl" , seen ) ; // receives the message from itself
303- Assert . Contains ( "[pub2] chan: jkl" , seen ) ; // received the message from the multi-group
344+ await capture . AwaitAsync ( multi , 14 ) ;
345+ capture . AssertTakeAny ( "[pub0] chan: abc" ) ;
346+ capture . AssertTakeAny ( "[pub1] chan: def" ) ;
347+ capture . AssertTakeAny ( "[pub2] chan: ghi" ) ;
348+ for ( int i = 0 ; i < 5 ; i ++ )
349+ {
350+ capture . AssertTakeAny ( $ "[pub2] chan: jkl{ i } ") ; // received the message from the multi-group
351+ }
352+
353+ // these should be ordered
354+ capture . AssertTakeFirst ( "[conn] chan: ghi" ) ; // receives the message from pub2
355+ for ( int i = 0 ; i < 5 ; i ++ )
356+ {
357+ capture . AssertTakeFirst ( $ "[conn] chan: jkl{ i } ") ; // receives the message from itself
358+ }
304359 }
305360}
0 commit comments