11package org .reactivecommons .async .rabbit .listeners ;
22
33import com .rabbitmq .client .AMQP ;
4+ import com .rabbitmq .client .Channel ;
5+ import com .rabbitmq .client .ShutdownSignalException ;
46import lombok .extern .java .Log ;
57import org .reactivecommons .async .commons .DiscardNotifier ;
68import org .reactivecommons .async .commons .FallbackStrategy ;
1012import org .reactivecommons .async .rabbit .RabbitMessage ;
1113import org .reactivecommons .async .rabbit .communications .ReactiveMessageListener ;
1214import org .reactivecommons .async .rabbit .communications .TopologyCreator ;
15+ import reactor .core .Disposable ;
1316import reactor .core .publisher .Flux ;
1417import reactor .core .publisher .Mono ;
1518import reactor .core .scheduler .Scheduler ;
2528import java .util .List ;
2629import java .util .Optional ;
2730import java .util .concurrent .ConcurrentHashMap ;
31+ import java .util .concurrent .atomic .AtomicReference ;
2832import java .util .function .Function ;
2933import java .util .logging .Level ;
3034
@@ -49,7 +53,8 @@ public abstract class GenericMessageListener {
4953 private final DiscardNotifier discardNotifier ;
5054 private final String objectType ;
5155 private final CustomReporter customReporter ;
52- private volatile Flux <AcknowledgableDelivery > messageFlux ;
56+ private final AtomicReference <Channel > channelRef = new AtomicReference <>();
57+ private Disposable listenerSubscription ;
5358
5459 public GenericMessageListener (String queueName , ReactiveMessageListener listener , boolean useDLQRetries ,
5560 boolean createTopology , long maxRetries , long retryDelay , DiscardNotifier discardNotifier ,
@@ -78,34 +83,45 @@ protected Mono<Void> setUpBindings(TopologyCreator creator) {
7883 return Mono .empty ();
7984 }
8085
81- public void startListener () {
82- log .log (Level .INFO , "Using max concurrency {0}, in queue: {1}" , new Object []{messageListener .getMaxConcurrency (), queueName });
83- if (useDLQRetries ) {
84- log .log (Level .INFO , "ATTENTION! Using DLQ Strategy for retries with {0} + 1 Max Retries configured!" , new Object []{maxRetries });
85- } else {
86- log .log (Level .INFO , "ATTENTION! Using infinite fast retries as Retry Strategy" );
86+ public synchronized void startListener () {
87+ Channel current = channelRef .get ();
88+ if (current != null && current .isOpen ()) {
89+ log .warning ("Channel is already open, no need to restart listener" );
90+ return ;
8791 }
92+ stopListener ();
93+ var baseSubscriber = new LoggerSubscriber <>(getClass ().getName ());
94+ listenerSubscription = baseSubscriber ;
95+ Flux .defer (this ::buildConsumeFlux )
96+ .subscribe (baseSubscriber );
97+ }
8898
89- ConsumeOptions consumeOptions = new ConsumeOptions ();
90- consumeOptions .qos (messageListener .getPrefetchCount ());
91-
92- if (createTopology ) {
93- this .messageFlux = setUpBindings (messageListener .getTopologyCreator ())
94- .thenMany (receiver .consumeManualAck (queueName , consumeOptions )
95- .transform (this ::consumeFaultTolerant ));
96- } else {
97- this .messageFlux = receiver .consumeManualAck (queueName , consumeOptions )
98- .doOnError (err -> log .log (Level .SEVERE , "Error listening queue" , err ))
99- .transform (this ::consumeFaultTolerant );
100- }
10199
100+ private Flux <AcknowledgableDelivery > buildConsumeFlux () {
101+ ConsumeOptions options = new ConsumeOptions ()
102+ .qos (messageListener .getPrefetchCount ())
103+ .channelCallback (channel -> {
104+ channelRef .set (channel );
105+ channel .addShutdownListener (this ::onChannelShutdown );
106+ });
107+
108+ Flux <AcknowledgableDelivery > source = createTopology
109+ ? setUpBindings (messageListener .getTopologyCreator ())
110+ .thenMany (receiver .consumeManualAck (queueName , options ))
111+ : receiver .consumeManualAck (queueName , options );
102112
103- onTerminate ( );
113+ return source . transform ( this :: consumeFaultTolerant );
104114 }
105115
106- private void onTerminate () {
107- messageFlux .doOnTerminate (this ::onTerminate )
108- .subscribe (new LoggerSubscriber <>(getClass ().getName ()));
116+ private void onChannelShutdown (ShutdownSignalException cause ) {
117+ log .log (Level .SEVERE , "Channel shutdown detected in listener" , cause );
118+ startListener ();
119+ }
120+
121+ public synchronized void stopListener () {
122+ if (listenerSubscription != null && !listenerSubscription .isDisposed ()) {
123+ listenerSubscription .dispose ();
124+ }
109125 }
110126
111127 protected Mono <AcknowledgableDelivery > handle (AcknowledgableDelivery msj , Instant initTime ) {
0 commit comments