@@ -468,6 +468,8 @@ private class AmqpConsumer implements SchedulingAwareRunnable, AutoCloseable {
468468
469469 private final ClientReceiver receiver ;
470470
471+ private final Lock receiverLock = new ReentrantLock ();
472+
471473 private final ProtonReceiver protonReceiver ;
472474
473475 private final ProtonLinkCreditState creditState ;
@@ -494,31 +496,37 @@ int queuedDeliveries() {
494496
495497 @ Override
496498 public void run () {
497- while (this .running ) {
498- try {
499- Delivery delivery =
500- this .receiver .receive (AmqpMessageListenerContainer .this .receiveTimeout .toMillis (),
501- TimeUnit .MILLISECONDS );
502- if (delivery != null ) {
503- doInvokeListener (delivery , this ::replenishCredit );
499+ this .receiverLock .lock ();
500+ try {
501+ while (this .running ) {
502+ try {
503+ Delivery delivery =
504+ this .receiver .receive (AmqpMessageListenerContainer .this .receiveTimeout .toMillis (),
505+ TimeUnit .MILLISECONDS );
506+ if (delivery != null ) {
507+ doInvokeListener (delivery , this ::replenishCredit );
508+ }
504509 }
505- }
506- catch (Exception ex ) {
507- if (this .running ) {
508- AmqpException amqpException = ProtonUtils .toAmqpException (ex );
509- ErrorHandler errorHandlerToUse = AmqpMessageListenerContainer .this .errorHandler ;
510- if (errorHandlerToUse != null ) {
511- errorHandlerToUse .handleError (amqpException );
510+ catch (Exception ex ) {
511+ if (this .running ) {
512+ AmqpException amqpException = ProtonUtils .toAmqpException (ex );
513+ ErrorHandler errorHandlerToUse = AmqpMessageListenerContainer .this .errorHandler ;
514+ if (errorHandlerToUse != null ) {
515+ errorHandlerToUse .handleError (amqpException );
516+ }
517+ else {
518+ throw amqpException ;
519+ }
512520 }
513521 else {
514- throw amqpException ;
522+ LOG . debug ( ex , "Consumer stopped" ) ;
515523 }
516524 }
517- else {
518- LOG .debug (ex , "Consumer stopped" );
519- }
520525 }
521526 }
527+ finally {
528+ this .receiverLock .unlock ();
529+ }
522530 }
523531
524532 /**
@@ -534,7 +542,9 @@ private void replenishCredit() {
534542 if (potentialPrefetch <= AmqpMessageListenerContainer .this .initialCredits * 0.7 ) {
535543 int additionalCredit = AmqpMessageListenerContainer .this .initialCredits - potentialPrefetch ;
536544
537- this .receiver .addCredit (additionalCredit );
545+ if (!this .paused && this .running ) {
546+ this .receiver .addCredit (additionalCredit );
547+ }
538548 }
539549 }
540550 }
@@ -549,15 +559,15 @@ private void replenishCredit() {
549559 * so rely on the reflection to imitate behavior with resetting credits to zero.
550560 */
551561 void pause () {
552- if (this .running ) {
562+ if (this .running && ! this . paused ) {
553563 this .paused = true ;
554564 this .creditState .updateCredit (0 );
555565 ReflectionUtils .invokeMethod (WRITE_FLOW_METHOD , this .sessionWindow , this .protonReceiver );
556566 }
557567 }
558568
559569 void resume () {
560- if (this .running ) {
570+ if (this .running && this . paused ) {
561571 this .paused = false ;
562572 try {
563573 this .receiver .addCredit (AmqpMessageListenerContainer .this .initialCredits );
@@ -576,7 +586,13 @@ public boolean isLongLived() {
576586 @ Override
577587 public void close () {
578588 this .running = false ;
579- this .receiver .close ();
589+ this .receiverLock .lock ();
590+ try {
591+ this .receiver .close ();
592+ }
593+ finally {
594+ this .receiverLock .unlock ();
595+ }
580596 }
581597
582598 }
0 commit comments