77import com .danubemessaging .client .model .StreamMessage ;
88import org .junit .jupiter .api .Test ;
99
10- import java .time .Duration ;
1110import java .util .HashSet ;
12- import java .util .List ;
1311import java .util .Map ;
1412import java .util .Set ;
13+ import java .util .concurrent .TimeUnit ;
1514
1615import static com .danubemessaging .client .it .TestHelpers .*;
1716import static org .junit .jupiter .api .Assertions .*;
2221 */
2322class PartitionedBasicIT {
2423
25- private void runPartitionedBasic (String topicPrefix , SubType subType ) {
24+ private void runPartitionedBasic (String topicPrefix , SubType subType ) throws Exception {
2625 DanubeClient client = newClient ();
2726 String topic = uniqueTopic (topicPrefix );
2827 int partitions = 3 ;
@@ -43,21 +42,25 @@ private void runPartitionedBasic(String topicPrefix, SubType subType) {
4342 consumer .subscribe ();
4443
4544 try {
46- var publisher = consumer .receive ();
45+ String [] expected = { "Hello Danube 1" , "Hello Danube 2" , "Hello Danube 3" };
46+
47+ // Attach collector BEFORE sending
48+ var collector = new TestHelpers .MessageCollector (expected .length );
49+ consumer .receive ().subscribe (collector );
4750
4851 Thread .sleep (300 );
4952
50- String [] expected = {"Hello Danube 1" , "Hello Danube 2" , "Hello Danube 3" };
5153 for (String body : expected ) {
5254 producer .send (body .getBytes (), Map .of ());
5355 }
5456
55- List <StreamMessage > messages = receiveMessages (publisher , expected .length , Duration .ofSeconds (10 ));
57+ assertTrue (collector .latch .await (10 , TimeUnit .SECONDS ),
58+ "Timeout: received " + collector .messages .size () + "/" + expected .length );
5659
5760 // Verify all payloads received
5861 Set <String > received = new HashSet <>();
5962 Set <String > partsSeen = new HashSet <>();
60- for (StreamMessage msg : messages ) {
63+ for (StreamMessage msg : collector . messages ) {
6164 received .add (new String (msg .payload ()));
6265 partsSeen .add (msg .messageId ().topicName ());
6366 consumer .ack (msg );
@@ -72,22 +75,19 @@ private void runPartitionedBasic(String topicPrefix, SubType subType) {
7275 String partName = topic + "-part-" + i ;
7376 assertTrue (partsSeen .contains (partName ), "missing partition: " + partName );
7477 }
75- } catch (InterruptedException e ) {
76- Thread .currentThread ().interrupt ();
77- fail ("Interrupted" );
7878 } finally {
7979 consumer .close ();
8080 client .close ();
8181 }
8282 }
8383
8484 @ Test
85- void partitionedBasicExclusive () {
85+ void partitionedBasicExclusive () throws Exception {
8686 runPartitionedBasic ("/default/part_basic_excl" , SubType .EXCLUSIVE );
8787 }
8888
8989 @ Test
90- void partitionedBasicShared () {
90+ void partitionedBasicShared () throws Exception {
9191 runPartitionedBasic ("/default/part_basic_shared" , SubType .SHARED );
9292 }
9393}
0 commit comments