@@ -55,35 +55,27 @@ void queueMode_queue_createsWorkQueueStream() throws Exception {
5555 String streamName = "rqueue-" + "qm-queue-" + System .nanoTime ();
5656 String subject = "rqueue." + "qm-queue-" + System .nanoTime ();
5757 JetStreamManagement jsm = connection .jetStreamManagement ();
58- NatsProvisioner provisioner =
59- new NatsProvisioner (connection , jsm , RqueueNatsConfig .defaults ());
58+ NatsProvisioner provisioner = new NatsProvisioner (connection , jsm , RqueueNatsConfig .defaults ());
6059
6160 provisioner .ensureStream (streamName , List .of (subject ), QueueType .QUEUE );
6261
63- RetentionPolicy actual =
64- jsm .getStreamInfo (streamName ).getConfiguration ().getRetentionPolicy ();
62+ RetentionPolicy actual = jsm .getStreamInfo (streamName ).getConfiguration ().getRetentionPolicy ();
6563 assertEquals (
66- RetentionPolicy .WorkQueue ,
67- actual ,
68- "QUEUE mode must create a WorkQueue-retention stream" );
64+ RetentionPolicy .WorkQueue , actual , "QUEUE mode must create a WorkQueue-retention stream" );
6965 }
7066
7167 @ Test
7268 void queueMode_stream_createsLimitsStream () throws Exception {
7369 String streamName = "rqueue-" + "qm-stream-" + System .nanoTime ();
7470 String subject = "rqueue." + "qm-stream-" + System .nanoTime ();
7571 JetStreamManagement jsm = connection .jetStreamManagement ();
76- NatsProvisioner provisioner =
77- new NatsProvisioner (connection , jsm , RqueueNatsConfig .defaults ());
72+ NatsProvisioner provisioner = new NatsProvisioner (connection , jsm , RqueueNatsConfig .defaults ());
7873
7974 provisioner .ensureStream (streamName , List .of (subject ), QueueType .STREAM );
8075
81- RetentionPolicy actual =
82- jsm .getStreamInfo (streamName ).getConfiguration ().getRetentionPolicy ();
76+ RetentionPolicy actual = jsm .getStreamInfo (streamName ).getConfiguration ().getRetentionPolicy ();
8377 assertEquals (
84- RetentionPolicy .Limits ,
85- actual ,
86- "STREAM mode must create a Limits-retention stream" );
78+ RetentionPolicy .Limits , actual , "STREAM mode must create a Limits-retention stream" );
8779 }
8880
8981 // ---- Contract 2: consumer reuse preserves delivery position -----------
@@ -145,7 +137,8 @@ void queueMode_consumerReuse_preservesDeliveryPosition() throws Exception {
145137 cd .getMaxAckPending ());
146138
147139 // Verify the consumer info still reflects the already-delivered messages.
148- ConsumerInfo info = jsm .getConsumerInfo (RqueueNatsConfig .defaults ().getStreamPrefix () + q .getName (), consumerName );
140+ ConsumerInfo info = jsm .getConsumerInfo (
141+ RqueueNatsConfig .defaults ().getStreamPrefix () + q .getName (), consumerName );
149142 long numAcked = info .getNumAckPending () == 0
150143 ? total - info .getNumPending ()
151144 : total - info .getNumPending () - info .getNumAckPending ();
@@ -154,8 +147,8 @@ void queueMode_consumerReuse_preservesDeliveryPosition() throws Exception {
154147 assertEquals (
155148 total - firstBatch ,
156149 remaining ,
157- "consumer position must be preserved across ensureConsumer calls; "
158- + "remaining=" + remaining + " but expected " + (total - firstBatch ));
150+ "consumer position must be preserved across ensureConsumer calls; " + "remaining="
151+ + remaining + " but expected " + (total - firstBatch ));
159152 }
160153 }
161154
@@ -179,8 +172,7 @@ void queueMode_queue_competingConsumers_eachMessageDeliveredOnce() throws Except
179172 for (int t = 0 ; t < 2 ; t ++) {
180173 pool .submit (() -> {
181174 for (int round = 0 ; round < 100 && done .getCount () > 0 ; round ++) {
182- List <RqueueMessage > msgs =
183- broker .pop (q , sharedConsumer , 5 , Duration .ofMillis (300 ));
175+ List <RqueueMessage > msgs = broker .pop (q , sharedConsumer , 5 , Duration .ofMillis (300 ));
184176 for (RqueueMessage m : msgs ) {
185177 if (seen .add (m .getId ())) {
186178 done .countDown ();
@@ -194,9 +186,7 @@ void queueMode_queue_competingConsumers_eachMessageDeliveredOnce() throws Except
194186 pool .shutdownNow ();
195187
196188 assertEquals (
197- total ,
198- seen .size (),
199- "QUEUE mode: each message must be delivered to exactly one worker" );
189+ total , seen .size (), "QUEUE mode: each message must be delivered to exactly one worker" );
200190 }
201191 }
202192
@@ -219,9 +209,7 @@ void queueMode_stream_fanOut_everyConsumerReceivesAllMessages() throws Exception
219209 Set <String > listenerTwoSeen = drain (broker , q , "listener-svc-2" , total );
220210
221211 assertEquals (
222- total ,
223- listenerOneSeen .size (),
224- "STREAM mode: listener-svc-1 must receive all messages" );
212+ total , listenerOneSeen .size (), "STREAM mode: listener-svc-1 must receive all messages" );
225213 assertEquals (
226214 total ,
227215 listenerTwoSeen .size (),
0 commit comments