@@ -92,14 +92,16 @@ public void testSimple() throws Exception {
9292 ClientMessage m = consumer .receive (1000 );
9393 assertNotNull (m );
9494 m .acknowledge ();
95+ clientSession .commit ();
96+
9597 assertEquals ("m2" , m .getBodyBuffer ().readString ());
9698 }
9799
98100 @ Test
99101 public void testSimpleExclusive () throws Exception {
100102 ServerLocator locator = createNettyNonHALocator ().setConsumerWindowSize (0 );
101103 ClientSessionFactory sf = createSessionFactory (locator );
102- ClientSession clientSession = addClientSession (sf .createSession (false , true , true ));
104+ ClientSession clientSession = addClientSession (sf .createSession (false , true , false ));
103105 final String EXCLUSIVE_QUEUE = "exclusiveQueue" ;
104106
105107 clientSession .createQueue (QueueConfiguration .of (EXCLUSIVE_QUEUE ).setExclusive (true ).setLastValue (true ));
@@ -116,6 +118,7 @@ public void testSimpleExclusive() throws Exception {
116118 ClientMessage m = consumer .receive (1000 );
117119 assertNotNull (m );
118120 m .acknowledge ();
121+ clientSession .commit ();
119122 assertEquals ("m2" , m .getBodyBuffer ().readString ());
120123 }
121124
@@ -138,7 +141,7 @@ public void testSimpleRestart() throws Exception {
138141 assertEquals (1 , server .locateQueue (qName1 ).getMessageCount ());
139142 ServerLocator locator = createNettyNonHALocator ().setBlockOnAcknowledge (true ).setAckBatchSize (0 );
140143 ClientSessionFactory sf = createSessionFactory (locator );
141- clientSession = addClientSession (sf .createSession (false , true , true ));
144+ clientSession = addClientSession (sf .createSession (false , true , false ));
142145 producer = clientSession .createProducer (address );
143146 ClientMessage m3 = createTextMessage (clientSession , "m3" );
144147 m3 .putStringProperty (Message .HDR_LAST_VALUE_NAME , rh );
@@ -151,6 +154,7 @@ public void testSimpleRestart() throws Exception {
151154 ClientMessage m = consumer .receive (1000 );
152155 assertNotNull (m );
153156 m .acknowledge ();
157+ clientSession .commit ();
154158 assertEquals ("m3" , m .getBodyBuffer ().readString ());
155159 }
156160
@@ -180,10 +184,12 @@ public void testMultipleMessages() throws Exception {
180184 ClientMessage m = consumer .receive (1000 );
181185 assertNotNull (m );
182186 m .acknowledge ();
187+ clientSession .commit ();
183188 assertEquals ("m3" , m .getBodyBuffer ().readString ());
184189 m = consumer .receive (1000 );
185190 assertNotNull (m );
186191 m .acknowledge ();
192+ clientSession .commit ();
187193 assertEquals ("m4" , m .getBodyBuffer ().readString ());
188194 assertNull (consumer .receiveImmediate ());
189195 }
@@ -203,10 +209,12 @@ public void testMultipleMessagesWithoutLastValue() throws Exception {
203209 ClientMessage m = consumer .receive (1000 );
204210 assertNotNull (m );
205211 m .acknowledge ();
212+ clientSession .commit ();
206213 assertEquals ("message1" , m .getBodyBuffer ().readString ());
207214 m = consumer .receive (1000 );
208215 assertNotNull (m );
209216 m .acknowledge ();
217+ clientSession .commit ();
210218 assertEquals ("message2" , m .getBodyBuffer ().readString ());
211219 }
212220
@@ -237,6 +245,7 @@ public void testMultipleRollback() throws Exception {
237245 ClientMessage m = consumer .receive (1000 );
238246 assertNotNull (m );
239247 m .acknowledge ();
248+ clientSession .commit ();
240249 assertEquals ("m1" , m .getBodyBuffer ().readString ());
241250 assertNull (consumer .receiveImmediate ());
242251 clientSessionTxReceives .commit ();
@@ -257,10 +266,12 @@ public void testFirstMessageReceivedButAckedAfter() throws Exception {
257266 assertNotNull (m );
258267 producer .send (m2 );
259268 m .acknowledge ();
269+ clientSession .commit ();
260270 assertEquals ("m1" , m .getBodyBuffer ().readString ());
261271 m = consumer .receive (1000 );
262272 assertNotNull (m );
263273 m .acknowledge ();
274+ clientSession .commit ();
264275 assertEquals ("m2" , m .getBodyBuffer ().readString ());
265276 }
266277
@@ -285,6 +296,7 @@ public void testFirstMessageReceivedAndCancelled() throws Exception {
285296 assertNotNull (m );
286297 assertEquals ("m2" , m .getBodyBuffer ().readString ());
287298 m .acknowledge ();
299+ clientSession .commit ();
288300 m = consumer .receiveImmediate ();
289301 assertNull (m );
290302 }
@@ -337,6 +349,7 @@ public void testManyMessagesReceivedAndCancelled() throws Exception {
337349 m = consumer .receive (1000 );
338350 assertNotNull (m );
339351 m .acknowledge ();
352+ clientSession .commit ();
340353 assertEquals ("m6" , m .getBodyBuffer ().readString ());
341354 m = consumer .receiveImmediate ();
342355 assertNull (m );
@@ -358,6 +371,7 @@ public void testSimpleInTx() throws Exception {
358371 ClientMessage m = consumer .receive (1000 );
359372 assertNotNull (m );
360373 m .acknowledge ();
374+ clientSession .commit ();
361375 assertEquals ("m2" , m .getBodyBuffer ().readString ());
362376 }
363377
@@ -383,10 +397,12 @@ public void testMultipleMessagesInTx() throws Exception {
383397 ClientMessage m = consumer .receive (1000 );
384398 assertNotNull (m );
385399 m .acknowledge ();
400+ clientSession .commit ();
386401 assertEquals ("m3" , m .getBodyBuffer ().readString ());
387402 m = consumer .receive (1000 );
388403 assertNotNull (m );
389404 m .acknowledge ();
405+ clientSession .commit ();
390406 assertEquals ("m4" , m .getBodyBuffer ().readString ());
391407 clientSessionTxReceives .commit ();
392408 m = consumer .receiveImmediate ();
@@ -413,29 +429,35 @@ public void testMultipleMessagesInTxRollback() throws Exception {
413429 ClientMessage m = consumer .receive (1000 );
414430 assertNotNull (m );
415431 m .acknowledge ();
432+ clientSession .commit ();
416433 assertEquals ("m1" , m .getBodyBuffer ().readString ());
417434 m = consumer .receive (1000 );
418435 assertNotNull (m );
419436 m .acknowledge ();
437+ clientSession .commit ();
420438 assertEquals ("m2" , m .getBodyBuffer ().readString ());
421439 producer .send (m3 );
422440 producer .send (m4 );
423441 m = consumer .receive (1000 );
424442 assertNotNull (m );
425443 m .acknowledge ();
444+ clientSession .commit ();
426445 assertEquals ("m3" , m .getBodyBuffer ().readString ());
427446 m = consumer .receive (1000 );
428447 assertNotNull (m );
429448 m .acknowledge ();
449+ clientSession .commit ();
430450 assertEquals ("m4" , m .getBodyBuffer ().readString ());
431451 clientSessionTxReceives .rollback ();
432452 m = consumer .receive (1000 );
433453 assertNotNull (m );
434454 m .acknowledge ();
455+ clientSession .commit ();
435456 assertEquals ("m3" , m .getBodyBuffer ().readString ());
436457 m = consumer .receive (1000 );
437458 assertNotNull (m );
438459 m .acknowledge ();
460+ clientSession .commit ();
439461 assertEquals ("m4" , m .getBodyBuffer ().readString ());
440462 }
441463
@@ -455,6 +477,7 @@ public void testSingleTXRollback() throws Exception {
455477 m = consumer .receive (1000 );
456478 assertNotNull (m );
457479 m .acknowledge ();
480+ clientSessionTxReceives .commit ();
458481 assertEquals ("m1" , m .getBodyBuffer ().readString ());
459482 assertNull (consumer .receiveImmediate ());
460483 }
@@ -488,13 +511,15 @@ public void testMultipleMessagesInTxSend() throws Exception {
488511 ClientMessage m = consumer .receive (1000 );
489512 assertNotNull (m );
490513 m .acknowledge ();
514+ clientSessionTxSends .commit ();
491515 assertEquals ("m" + i , m .getBodyBuffer ().readString ());
492516 }
493517 consumer .close ();
494518 consumer = clientSessionTxSends .createConsumer (qName1 );
495519 ClientMessage m = consumer .receive (1000 );
496520 assertNotNull (m );
497521 m .acknowledge ();
522+ clientSessionTxSends .commit ();
498523 assertEquals ("m6" , m .getBodyBuffer ().readString ());
499524 }
500525
@@ -531,9 +556,11 @@ public void testMultipleMessagesPersistedCorrectly() throws Exception {
531556 ClientMessage m = consumer .receive (1000 );
532557 assertNotNull (m );
533558 m .acknowledge ();
559+ clientSession .commit ();
534560 assertEquals ("m6" , m .getBodyBuffer ().readString ());
535561 m = consumer .receiveImmediate ();
536562 assertNull (m );
563+ clientSession .commit ();
537564 }
538565
539566 @ Test
@@ -559,10 +586,15 @@ public void testMultipleMessagesPersistedCorrectlyInTx() throws Exception {
559586 m6 .putStringProperty (Message .HDR_LAST_VALUE_NAME , rh );
560587 m6 .setDurable (true );
561588 producer .send (m1 );
589+ clientSessionTxSends .commit ();
562590 producer .send (m2 );
591+ clientSessionTxSends .commit ();
563592 producer .send (m3 );
593+ clientSessionTxSends .commit ();
564594 producer .send (m4 );
595+ clientSessionTxSends .commit ();
565596 producer .send (m5 );
597+ clientSessionTxSends .commit ();
566598 producer .send (m6 );
567599 clientSessionTxSends .commit ();
568600 clientSessionTxSends .start ();
@@ -605,31 +637,37 @@ public void testMultipleAcksPersistedCorrectly() throws Exception {
605637 ClientMessage m = consumer .receive (1000 );
606638 assertNotNull (m );
607639 m .acknowledge ();
640+ clientSession .commit ();
608641 assertEquals ("m1" , m .getBodyBuffer ().readString ());
609642 producer .send (m2 );
610643 m = consumer .receive (1000 );
611644 assertNotNull (m );
612645 m .acknowledge ();
646+ clientSession .commit ();
613647 assertEquals ("m2" , m .getBodyBuffer ().readString ());
614648 producer .send (m3 );
615649 m = consumer .receive (1000 );
616650 assertNotNull (m );
617651 m .acknowledge ();
652+ clientSession .commit ();
618653 assertEquals ("m3" , m .getBodyBuffer ().readString ());
619654 producer .send (m4 );
620655 m = consumer .receive (1000 );
621656 assertNotNull (m );
622657 m .acknowledge ();
658+ clientSession .commit ();
623659 assertEquals ("m4" , m .getBodyBuffer ().readString ());
624660 producer .send (m5 );
625661 m = consumer .receive (1000 );
626662 assertNotNull (m );
627663 m .acknowledge ();
664+ clientSession .commit ();
628665 assertEquals ("m5" , m .getBodyBuffer ().readString ());
629666 producer .send (m6 );
630667 m = consumer .receive (1000 );
631668 assertNotNull (m );
632669 m .acknowledge ();
670+ clientSession .commit ();
633671 assertEquals ("m6" , m .getBodyBuffer ().readString ());
634672
635673 assertEquals (0 , queue .getDeliveringCount ());
@@ -646,6 +684,7 @@ public void testRemoveMessageThroughManagement() throws Exception {
646684 m1 .putStringProperty (Message .HDR_LAST_VALUE_NAME , rh );
647685 m1 .setDurable (true );
648686 producer .send (m1 );
687+ clientSession .commit ();
649688
650689 queue .deleteAllReferences ();
651690
@@ -655,6 +694,7 @@ public void testRemoveMessageThroughManagement() throws Exception {
655694 ClientMessage m = consumer .receive (1000 );
656695 assertNotNull (m );
657696 m .acknowledge ();
697+ clientSession .commit ();
658698 assertEquals ("m1" , m .getBodyBuffer ().readString ());
659699
660700 assertEquals (0 , queue .getDeliveringCount ());
@@ -713,11 +753,13 @@ public void testMultipleAcksPersistedCorrectly2() throws Exception {
713753 ClientMessage m = consumer .receive (1000 );
714754 assertNotNull (m );
715755 m .acknowledge ();
756+ clientSession .commit ();
716757 assertEquals ("m1" , m .getBodyBuffer ().readString ());
717758 producer .send (m2 );
718759 m = consumer .receive (1000 );
719760 assertNotNull (m );
720761 m .acknowledge ();
762+ clientSession .commit ();
721763 assertEquals ("m2" , m .getBodyBuffer ().readString ());
722764
723765 assertEquals (0 , queue .getDeliveringCount ());
@@ -956,9 +998,9 @@ public void setUp() throws Exception {
956998 ServerLocator locator = createNettyNonHALocator ().setBlockOnAcknowledge (true ).setAckBatchSize (0 );
957999
9581000 ClientSessionFactory sf = createSessionFactory (locator );
959- clientSession = addClientSession (sf .createSession (false , true , true ));
1001+ clientSession = addClientSession (sf .createSession (false , true , false ));
9601002 clientSessionTxReceives = addClientSession (sf .createSession (false , true , false ));
961- clientSessionTxSends = addClientSession (sf .createSession (false , false , true ));
1003+ clientSessionTxSends = addClientSession (sf .createSession (false , false , false ));
9621004 clientSession .createQueue (QueueConfiguration .of (qName1 ).setAddress (address ));
9631005 }
9641006}
0 commit comments