diff --git a/.gitignore b/.gitignore index 0e34d93ce..d3b71df03 100644 --- a/.gitignore +++ b/.gitignore @@ -7,4 +7,7 @@ **/.flattened-pom.xml # Release files -release.properties \ No newline at end of file +release.properties + +# Local toolchain / IDE files +.java-version \ No newline at end of file diff --git a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/outbound/JCSMPOutboundMessageHandler.java b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/outbound/JCSMPOutboundMessageHandler.java index 15dedc33a..14fb20fdb 100644 --- a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/outbound/JCSMPOutboundMessageHandler.java +++ b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/outbound/JCSMPOutboundMessageHandler.java @@ -14,11 +14,14 @@ import com.solace.spring.cloud.stream.binder.util.JCSMPSessionProducerManager.CloudStreamEventHandler; import com.solace.spring.cloud.stream.binder.util.StaticMessageHeaderMapAccessor; import com.solace.spring.cloud.stream.binder.util.XMLMessageMapper; +import com.solacesystems.jcsmp.ClosedFacilityException; import com.solacesystems.jcsmp.Destination; import com.solacesystems.jcsmp.JCSMPException; import com.solacesystems.jcsmp.JCSMPFactory; import com.solacesystems.jcsmp.JCSMPSession; import com.solacesystems.jcsmp.JCSMPStreamingPublishCorrelatingEventHandler; +import com.solacesystems.jcsmp.JCSMPTransportException; +import com.solacesystems.jcsmp.StaleSessionException; import com.solacesystems.jcsmp.Topic; import com.solacesystems.jcsmp.XMLMessage; import com.solacesystems.jcsmp.XMLMessageProducer; @@ -64,6 +67,10 @@ public final class JCSMPOutboundMessageHandler implements MessageHandler, Lifecy private boolean isRunning = false; private ErrorMessageStrategy errorMessageStrategy; + // DATAGO-134580: recreate JCSMP producer on unsolicited termination from Solace broker. + private volatile boolean recreateProducer = false; + private final Object lifecycleLock = new Object(); + private static final Logger LOGGER = LoggerFactory.getLogger(JCSMPOutboundMessageHandler.class); public JCSMPOutboundMessageHandler(ProducerDestination destination, @@ -95,6 +102,8 @@ public void handleMessage(@NonNull Message message) throws MessagingException throw handleMessagingException(correlationKey, msg0, new ClosedChannelBindingException(msg1)); } + recreateProducerIfNeeded(correlationKey); + try { CorrelationData correlationData = message.getHeaders() .get(SolaceBinderHeaders.CONFIRM_CORRELATION, CorrelationData.class); @@ -163,6 +172,18 @@ public void handleMessage(@NonNull Message message) throws MessagingException producerEventHandler.responseReceivedEx(correlationKey); } } catch (JCSMPException e) { + if (e instanceof StaleSessionException + || e instanceof JCSMPTransportException + || e instanceof ClosedFacilityException + || producer.isClosed()) { + if (!recreateProducer) { + LOGGER.debug("Detected stale JCSMP producer for binding {} (cause: {}); will " + + "recreate on next message ", + properties.getBindingName(), e.getClass().getSimpleName(), id); + } + recreateProducer = true; + } + if (transactedSession != null) { try { if (!(e instanceof RollbackException)) { @@ -227,62 +248,103 @@ private Destination getDynamicDestination(Map headers, ErrorChan @Override public void start() { LOGGER.info("Creating producer to {} {} ", configDestinationType, configDestination.getName(), id); - if (isRunning()) { - LOGGER.warn("Nothing to do, message handler {} is already running", id); - return; + synchronized (lifecycleLock) { + if (isRunning()) { + LOGGER.warn("Nothing to do, message handler {} is already running", id); + return; + } + recreateProducer = false; + + try { + Map headerNameMapping = properties.getExtension().getHeaderNameMapping(); + if (headerNameMapping != null && !headerNameMapping.isEmpty()) { + Set uniqueTargetHeaderNames = new HashSet<>(headerNameMapping.values()); + if (uniqueTargetHeaderNames.size() < headerNameMapping.size()) { + IllegalArgumentException exception = new IllegalArgumentException(String.format( + "Two or more headers map to the same header name in headerNameMapping %s ", + properties.getExtension().getHeaderNameMapping(), id)); + LOGGER.warn(exception.getMessage()); + throw exception; + } + } + } catch (Exception e) { + String msg = String.format("Unable to get a message producer for session %s", jcsmpSession.getSessionName()); + LOGGER.warn(msg, e); + throw new RuntimeException(msg, e); + } + + createProducerInternal(); + isRunning = true; } + } - try { - Map headerNameMapping = properties.getExtension().getHeaderNameMapping(); - if (headerNameMapping != null && !headerNameMapping.isEmpty()) { - Set uniqueTargetHeaderNames = new HashSet<>(headerNameMapping.values()); - if (uniqueTargetHeaderNames.size() < headerNameMapping.size()) { - IllegalArgumentException exception = new IllegalArgumentException(String.format( - "Two or more headers map to the same header name in headerNameMapping %s ", - properties.getExtension().getHeaderNameMapping(), id)); - LOGGER.warn(exception.getMessage()); - throw exception; + private void createProducerInternal() { + synchronized (lifecycleLock) { + try { + producerManager.get(id); + if (properties.getExtension().isTransacted()) { + LOGGER.info("Creating transacted session ", id); + transactedSession = jcsmpSession.createTransactedSession(); + producer = transactedSession.createProducer(SolaceProvisioningUtil.getProducerFlowProperties(jcsmpSession), + producerEventHandler); + } else { + producer = jcsmpSession.createProducer(SolaceProvisioningUtil.getProducerFlowProperties(jcsmpSession), + producerEventHandler); } + } catch (Exception e) { + String msg = String.format("Unable to get a message producer for session %s", jcsmpSession.getSessionName()); + LOGGER.warn(msg, e); + closeResources(); + throw new RuntimeException(msg, e); } + } + } - producerManager.get(id); - if (properties.getExtension().isTransacted()) { - LOGGER.info("Creating transacted session ", id); - transactedSession = jcsmpSession.createTransactedSession(); - producer = transactedSession.createProducer(SolaceProvisioningUtil.getProducerFlowProperties(jcsmpSession), - producerEventHandler); - } else { - producer = jcsmpSession.createProducer(SolaceProvisioningUtil.getProducerFlowProperties(jcsmpSession), - producerEventHandler); + private void recreateProducerIfNeeded(ErrorChannelSendingCorrelationKey correlationKey) throws MessagingException { + if (!recreateProducer && !producer.isClosed()) { + return; + } + synchronized (lifecycleLock) { + if (!recreateProducer && !producer.isClosed()) { + return; } - } catch (Exception e) { - String msg = String.format("Unable to get a message producer for session %s", jcsmpSession.getSessionName()); - LOGGER.warn(msg, e); + LOGGER.debug("Recreating JCSMP producer for binding {} after stale-flow detection ", + properties.getBindingName(), id); closeResources(); - throw new RuntimeException(msg, e); + try { + createProducerInternal(); + recreateProducer = false; + } catch (Exception createError) { + recreateProducer = true; + throw handleMessagingException(correlationKey, + "Failed to recreate JCSMP producer after stale-flow detection", createError); + } } - - isRunning = true; } @Override public void stop() { - if (!isRunning()) return; - closeResources(); - isRunning = false; + synchronized (lifecycleLock) { + if (!isRunning()) return; + closeResources(); + isRunning = false; + } } private void closeResources() { - LOGGER.info("Stopping producer to {} {} ", configDestinationType, configDestination.getName(), id); - if (producer != null) { - LOGGER.info("Closing producer ", id); - producer.close(); - } - if (transactedSession != null) { - LOGGER.info("Closing transacted session ", id); - transactedSession.close(); + synchronized (lifecycleLock) { + LOGGER.info("Stopping producer to {} {} ", configDestinationType, configDestination.getName(), id); + recreateProducer = false; + if (producer != null) { + LOGGER.info("Closing producer ", id); + producer.close(); + } + if (transactedSession != null) { + LOGGER.info("Closing transacted session ", id); + transactedSession.close(); + } + producerManager.release(id); } - producerManager.release(id); } @Override diff --git a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/util/ErrorQueueInfrastructure.java b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/util/ErrorQueueInfrastructure.java index 0a23a1bca..13da8599b 100644 --- a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/util/ErrorQueueInfrastructure.java +++ b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/util/ErrorQueueInfrastructure.java @@ -1,9 +1,12 @@ package com.solace.spring.cloud.stream.binder.util; import com.solace.spring.cloud.stream.binder.properties.SolaceConsumerProperties; +import com.solacesystems.jcsmp.ClosedFacilityException; import com.solacesystems.jcsmp.JCSMPException; import com.solacesystems.jcsmp.JCSMPFactory; +import com.solacesystems.jcsmp.JCSMPTransportException; import com.solacesystems.jcsmp.Queue; +import com.solacesystems.jcsmp.StaleSessionException; import com.solacesystems.jcsmp.XMLMessage; import com.solacesystems.jcsmp.XMLMessageProducer; import org.slf4j.Logger; @@ -34,6 +37,11 @@ public void send(MessageContainer messageContainer, ErrorQueueRepublishCorrelati XMLMessageProducer producer; try { producer = producerManager.get(producerKey); + if (producer.isClosed()) { + LOGGER.warn("Detected closed shared JCSMP producer before sending to error queue {}; recreating", + errorQueueName); + producer = producerManager.forceRecreate(producer); + } } catch (Exception e) { MessagingException wrappedException = new MessagingException( String.format("Failed to get producer to send message %s to queue %s", xmlMessage.getMessageId(), @@ -42,7 +50,25 @@ public void send(MessageContainer messageContainer, ErrorQueueRepublishCorrelati throw wrappedException; } - producer.send(xmlMessage, queue); + try { + producer.send(xmlMessage, queue); + } catch (JCSMPException e) { + if (e instanceof StaleSessionException + || e instanceof JCSMPTransportException + || e instanceof ClosedFacilityException + || producer.isClosed()) { + LOGGER.debug("Detected stale shared JCSMP producer while sending to error queue {} (cause: {}); " + + "recreating for next attempt", + errorQueueName, e.getClass().getSimpleName()); + try { + producerManager.forceRecreate(producer); + } catch (Exception recreateError) { + LOGGER.warn("Failed to recreate shared JCSMP producer after stale-flow detection", recreateError); + e.addSuppressed(recreateError); + } + } + throw e; + } } public ErrorQueueRepublishCorrelationKey createCorrelationKey(MessageContainer messageContainer, diff --git a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/util/SharedResourceManager.java b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/util/SharedResourceManager.java index a6279c081..38fdd509b 100644 --- a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/util/SharedResourceManager.java +++ b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/util/SharedResourceManager.java @@ -43,6 +43,32 @@ public T get(String key) throws Exception { return sharedResource; } + /** + * Compare-and-swap the shared resource. If the manager still holds {@code expected}, + * close it and {@link #create()} a fresh one; otherwise return the currently-installed + * resource without re-creating. + * + * @param expected the reference the caller observed and considers no longer usable + * @return the resource currently installed in the manager + * @throws Exception whatever {@link #create()} may throw + */ + public T forceRecreate(T expected) throws Exception { + synchronized (lock) { + if (sharedResource != expected) { + return sharedResource; + } + if (sharedResource != null) { + try { + close(); + } catch (Exception e) { + LOGGER.debug("Failed to close current {} during forceRecreate", type, e); + } + } + sharedResource = create(); + return sharedResource; + } + } + /** * De-register {@code key} from the shared resource. *

If this is the last {@code key} associated to the shared resource, {@link #close()} the resource. diff --git a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/test/java/com/solace/spring/cloud/stream/binder/outbound/JCSMPOutboundMessageHandlerTest.java b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/test/java/com/solace/spring/cloud/stream/binder/outbound/JCSMPOutboundMessageHandlerTest.java index aee461d96..3c94190f1 100644 --- a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/test/java/com/solace/spring/cloud/stream/binder/outbound/JCSMPOutboundMessageHandlerTest.java +++ b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/test/java/com/solace/spring/cloud/stream/binder/outbound/JCSMPOutboundMessageHandlerTest.java @@ -16,13 +16,16 @@ import com.solace.spring.cloud.stream.binder.util.SolaceMessageConversionException; import com.solace.spring.cloud.stream.binder.util.SolaceMessageHeaderErrorMessageStrategy; import com.solacesystems.jcsmp.BytesMessage; +import com.solacesystems.jcsmp.ClosedFacilityException; import com.solacesystems.jcsmp.Destination; import com.solacesystems.jcsmp.JCSMPException; import com.solacesystems.jcsmp.JCSMPProperties; import com.solacesystems.jcsmp.JCSMPSession; import com.solacesystems.jcsmp.JCSMPStreamingPublishCorrelatingEventHandler; +import com.solacesystems.jcsmp.JCSMPTransportException; import com.solacesystems.jcsmp.ProducerFlowProperties; import com.solacesystems.jcsmp.Queue; +import com.solacesystems.jcsmp.StaleSessionException; import com.solacesystems.jcsmp.Topic; import com.solacesystems.jcsmp.XMLMessage; import com.solacesystems.jcsmp.XMLMessageProducer; @@ -684,6 +687,215 @@ void testGetBindingName() { assertThat(messageHandler.getBindingName()).isNotEmpty().isEqualTo(producerProperties.getBindingName()); } + /** DATAGO-134580: stale producer is recreated on the next send for each stale-flow exception type. */ + @CartesianTest(name = "[{index}] transacted={0} exception={1}") + public void testProducerRecreatedAfterUnsolicitedCloseFlow( + @Values(booleans = {false, true}) boolean transacted, + @Values(strings = {"stale", "closed-facility", "transport"}) String exceptionType, + @Mock XMLMessageProducer producerB, + @Mock TransactedSession transactedSessionB) throws Exception { + producerProperties.getExtension().setTransacted(transacted); + + if (transacted) { + Mockito.when(session.createTransactedSession()) + .thenReturn(transactedSession) + .thenReturn(transactedSessionB); + Mockito.when(transactedSessionB.createProducer(any(ProducerFlowProperties.class), + any(JCSMPStreamingPublishCorrelatingEventHandler.class))).thenReturn(producerB); + } else { + Mockito.when(session.createProducer( + producerFlowPropertiesCaptor.capture(), pubEventHandlerCaptor.capture())) + .thenReturn(messageProducer) + .thenReturn(producerB); + } + + JCSMPException sendError = switch (exceptionType) { + case "stale" -> new StaleSessionException( + "Tried to perform operation on a closed XML message producer", + new JCSMPException("Received unsolicited CloseFlow for producer (503:Service Unavailable).")); + case "closed-facility" -> new ClosedFacilityException("Producer is closed"); + case "transport" -> new JCSMPTransportException( + "Received unsolicited CloseFlow for producer (503:Service Unavailable)."); + default -> throw new IllegalArgumentException("unknown exception type: " + exceptionType); + }; + Mockito.doThrow(sendError).when(messageProducer).send(any(XMLMessage.class), any(Destination.class)); + + messageHandler.start(); + + Message firstMessage = MessageBuilder.withPayload("payload-1").build(); + assertThatThrownBy(() -> messageHandler.handleMessage(firstMessage)) + .isInstanceOf(MessagingException.class) + .hasCauseInstanceOf(sendError.getClass()); + assertThat(messageHandler.isRunning()).isTrue(); + + Message secondMessage = MessageBuilder.withPayload("payload-2").build(); + messageHandler.handleMessage(secondMessage); + + if (transacted) { + Mockito.verify(session, Mockito.times(2)).createTransactedSession(); + Mockito.verify(transactedSession, Mockito.atLeastOnce()).close(); + Mockito.verify(transactedSessionB, Mockito.times(1)) + .createProducer(any(ProducerFlowProperties.class), + any(JCSMPStreamingPublishCorrelatingEventHandler.class)); + } else { + Mockito.verify(session, Mockito.times(2)) + .createProducer(any(ProducerFlowProperties.class), any(JCSMPStreamingPublishCorrelatingEventHandler.class)); + } + + Mockito.verify(producerB, Mockito.times(1)).send(any(XMLMessage.class), any(Destination.class)); + Mockito.verify(messageProducer, Mockito.times(1)).send(any(XMLMessage.class), any(Destination.class)); + Mockito.verify(messageProducer, Mockito.atLeastOnce()).close(); + } + + /** DATAGO-134580: proactive {@code isClosed()} pre-check rebuilds before the first send. */ + @CartesianTest(name = "[{index}] transacted={0}") + void testProducerRecreatedProactivelyWhenIsClosedDetectedBeforeSend( + @Values(booleans = {false, true}) boolean transacted, + @Mock XMLMessageProducer producerB, + @Mock TransactedSession transactedSessionB) throws Exception { + producerProperties.getExtension().setTransacted(transacted); + + if (transacted) { + Mockito.when(session.createTransactedSession()) + .thenReturn(transactedSession) + .thenReturn(transactedSessionB); + Mockito.when(transactedSessionB.createProducer(any(ProducerFlowProperties.class), + any(JCSMPStreamingPublishCorrelatingEventHandler.class))).thenReturn(producerB); + } else { + Mockito.when(session.createProducer( + producerFlowPropertiesCaptor.capture(), pubEventHandlerCaptor.capture())) + .thenReturn(messageProducer) + .thenReturn(producerB); + } + + Mockito.when(messageProducer.isClosed()).thenReturn(true); + + messageHandler.start(); + + Message firstMessage = MessageBuilder.withPayload("payload-1").build(); + messageHandler.handleMessage(firstMessage); + + if (transacted) { + Mockito.verify(session, Mockito.times(2)).createTransactedSession(); + Mockito.verify(transactedSession, Mockito.atLeastOnce()).close(); + Mockito.verify(transactedSessionB, Mockito.times(1)) + .createProducer(any(ProducerFlowProperties.class), + any(JCSMPStreamingPublishCorrelatingEventHandler.class)); + } else { + Mockito.verify(session, Mockito.times(2)) + .createProducer(any(ProducerFlowProperties.class), any(JCSMPStreamingPublishCorrelatingEventHandler.class)); + } + Mockito.verify(producerB, Mockito.times(1)).send(any(XMLMessage.class), any(Destination.class)); + Mockito.verify(messageProducer, Mockito.never()).send(any(XMLMessage.class), any(Destination.class)); + Mockito.verify(messageProducer, Mockito.atLeastOnce()).close(); + } + + /** DATAGO-134580: failed recreation surfaces an error and retries on the next message. */ + @CartesianTest(name = "[{index}] transacted={0}") + void testProducerRecreationFailurePropagatesAndRetriesNext( + @Values(booleans = {false, true}) boolean transacted, + @Mock XMLMessageProducer producerB, + @Mock TransactedSession transactedSessionB) throws Exception { + producerProperties.getExtension().setTransacted(transacted); + + if (transacted) { + Mockito.when(session.createTransactedSession()) + .thenReturn(transactedSession) + .thenThrow(new JCSMPException("Broker still reconnecting")) + .thenReturn(transactedSessionB); + Mockito.when(transactedSessionB.createProducer(any(ProducerFlowProperties.class), + any(JCSMPStreamingPublishCorrelatingEventHandler.class))).thenReturn(producerB); + } else { + Mockito.when(session.createProducer( + producerFlowPropertiesCaptor.capture(), pubEventHandlerCaptor.capture())) + .thenReturn(messageProducer) + .thenThrow(new JCSMPException("Broker still reconnecting")) + .thenReturn(producerB); + } + + StaleSessionException stale = new StaleSessionException( + "Tried to perform operation on a closed XML message producer", + new JCSMPException("Received unsolicited CloseFlow for producer (503:Service Unavailable).")); + Mockito.doThrow(stale).when(messageProducer).send(any(XMLMessage.class), any(Destination.class)); + + messageHandler.start(); + + Message firstMessage = MessageBuilder.withPayload("payload-1").build(); + assertThatThrownBy(() -> messageHandler.handleMessage(firstMessage)) + .isInstanceOf(MessagingException.class) + .hasCauseInstanceOf(StaleSessionException.class); + + Message secondMessage = MessageBuilder.withPayload("payload-2").build(); + assertThatThrownBy(() -> messageHandler.handleMessage(secondMessage)) + .isInstanceOf(MessagingException.class) + .hasMessageContaining("Failed to recreate JCSMP producer") + .hasRootCauseInstanceOf(JCSMPException.class); + + Message thirdMessage = MessageBuilder.withPayload("payload-3").build(); + messageHandler.handleMessage(thirdMessage); + + if (transacted) { + Mockito.verify(session, Mockito.times(3)).createTransactedSession(); + Mockito.verify(transactedSessionB, Mockito.times(1)) + .createProducer(any(ProducerFlowProperties.class), + any(JCSMPStreamingPublishCorrelatingEventHandler.class)); + } else { + Mockito.verify(session, Mockito.times(3)) + .createProducer(any(ProducerFlowProperties.class), any(JCSMPStreamingPublishCorrelatingEventHandler.class)); + } + Mockito.verify(producerB, Mockito.times(1)).send(any(XMLMessage.class), any(Destination.class)); + assertThat(messageHandler.isRunning()).isTrue(); + } + + /** DATAGO-134580: stop/start clears the recreate flag so the next send doesn't rebuild. */ + @CartesianTest(name = "[{index}] transacted={0}") + void testRecreationFlagResetAcrossStopStartCycle( + @Values(booleans = {false, true}) boolean transacted, + @Mock XMLMessageProducer producerB, + @Mock TransactedSession transactedSessionB) throws Exception { + producerProperties.getExtension().setTransacted(transacted); + + if (transacted) { + Mockito.when(session.createTransactedSession()) + .thenReturn(transactedSession) + .thenReturn(transactedSessionB); + Mockito.when(transactedSessionB.createProducer(any(ProducerFlowProperties.class), + any(JCSMPStreamingPublishCorrelatingEventHandler.class))).thenReturn(producerB); + } else { + Mockito.when(session.createProducer( + producerFlowPropertiesCaptor.capture(), pubEventHandlerCaptor.capture())) + .thenReturn(messageProducer) + .thenReturn(producerB); + } + + StaleSessionException stale = new StaleSessionException( + "Tried to perform operation on a closed XML message producer", + new JCSMPException("Received unsolicited CloseFlow for producer (503:Service Unavailable).")); + Mockito.doThrow(stale).when(messageProducer).send(any(XMLMessage.class), any(Destination.class)); + + messageHandler.start(); + Message firstMessage = MessageBuilder.withPayload("payload-1").build(); + assertThatThrownBy(() -> messageHandler.handleMessage(firstMessage)) + .isInstanceOf(MessagingException.class) + .hasCauseInstanceOf(StaleSessionException.class); + + messageHandler.stop(); + messageHandler.start(); + + Mockito.clearInvocations(session); + + Message secondMessage = MessageBuilder.withPayload("payload-2").build(); + messageHandler.handleMessage(secondMessage); + + if (transacted) { + Mockito.verify(session, Mockito.never()).createTransactedSession(); + } else { + Mockito.verify(session, Mockito.never()) + .createProducer(any(ProducerFlowProperties.class), any(JCSMPStreamingPublishCorrelatingEventHandler.class)); + } + Mockito.verify(producerB, Mockito.times(1)).send(any(XMLMessage.class), any(Destination.class)); + } + private List getCorrelationKeys() throws JCSMPException { Mockito.verify(messageProducer, Mockito.atLeastOnce()).send(xmlMessageCaptor.capture(), any(Destination.class)); return xmlMessageCaptor.getAllValues() diff --git a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/test/java/com/solace/spring/cloud/stream/binder/util/ErrorQueueInfrastructureTest.java b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/test/java/com/solace/spring/cloud/stream/binder/util/ErrorQueueInfrastructureTest.java new file mode 100644 index 000000000..507d3a8be --- /dev/null +++ b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/test/java/com/solace/spring/cloud/stream/binder/util/ErrorQueueInfrastructureTest.java @@ -0,0 +1,164 @@ +package com.solace.spring.cloud.stream.binder.util; + +import com.solace.spring.cloud.stream.binder.properties.SolaceConsumerProperties; +import com.solacesystems.jcsmp.BytesXMLMessage; +import com.solacesystems.jcsmp.ClosedFacilityException; +import com.solacesystems.jcsmp.Destination; +import com.solacesystems.jcsmp.JCSMPException; +import com.solacesystems.jcsmp.JCSMPFactory; +import com.solacesystems.jcsmp.JCSMPTransportException; +import com.solacesystems.jcsmp.StaleSessionException; +import com.solacesystems.jcsmp.XMLMessage; +import com.solacesystems.jcsmp.XMLMessageProducer; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junitpioneer.jupiter.cartesian.CartesianTest; +import org.junitpioneer.jupiter.cartesian.CartesianTest.Values; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.jupiter.MockitoExtension; + +import static org.assertj.core.api.Assertions.assertThatCode; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; + +@ExtendWith(MockitoExtension.class) +class ErrorQueueInfrastructureTest { + private static final String PRODUCER_KEY = "test-producer-key"; + private static final String ERROR_QUEUE_NAME = "test-error-queue"; + + @Mock JCSMPSessionProducerManager producerManager; + @Mock MessageContainer messageContainer; + @Mock ErrorQueueRepublishCorrelationKey correlationKey; + + BytesXMLMessage inputMessage; + SolaceConsumerProperties consumerProperties; + ErrorQueueInfrastructure errorQueueInfrastructure; + + @BeforeEach + void setup() { + inputMessage = JCSMPFactory.onlyInstance().createMessage(BytesXMLMessage.class); + consumerProperties = new SolaceConsumerProperties(); + errorQueueInfrastructure = new ErrorQueueInfrastructure( + producerManager, PRODUCER_KEY, ERROR_QUEUE_NAME, consumerProperties); + Mockito.when(messageContainer.getMessage()).thenReturn(inputMessage); + } + + /** + * DATAGO-134580: proactive {@code producer.isClosed()} pre-check on the error-queue + * republish path. If the broker has already torn down the shared session-default + * producer before this {@code send(...)} runs, the very first error-queue publish + * should still succeed - the manager is asked to recreate the producer before send is + * attempted, and the fresh producer services the publish. + */ + @Test + void testErrorQueueProducerRecreatedProactivelyOnIsClosed( + @Mock XMLMessageProducer staleProducer, + @Mock XMLMessageProducer freshProducer) throws Exception { + Mockito.when(producerManager.get(PRODUCER_KEY)).thenReturn(staleProducer); + Mockito.when(staleProducer.isClosed()).thenReturn(true); + Mockito.when(producerManager.forceRecreate(staleProducer)).thenReturn(freshProducer); + + assertThatCode(() -> errorQueueInfrastructure.send(messageContainer, correlationKey)) + .as("Proactive recreate must allow the publish to succeed on the fresh producer") + .doesNotThrowAnyException(); + + // CAS contract: caller passes the observed (stale) reference so the manager only + // recreates if it still holds that exact instance. + Mockito.verify(producerManager).forceRecreate(staleProducer); + Mockito.verify(freshProducer).send(any(XMLMessage.class), any(Destination.class)); + Mockito.verify(staleProducer, Mockito.never()).send(any(XMLMessage.class), any(Destination.class)); + } + + /** + * DATAGO-134580: reactive recreation when {@code send(...)} itself throws a + * stale-flow exception. The race window between our proactive {@code isClosed()} + * check and the actual send means the broker can tear the producer down mid-flight; + * in that case the exception must propagate so {@code ErrorQueueRepublishCorrelationKey} + * can retry, and the manager must be force-recreated so the next retry attempt picks up + * a fresh producer rather than re-using the dead one. + * + *

Parameterized over the three concrete JCSMP exception types we treat as + * stale-flow signals - the recovery contract must apply to all of them. + */ + @CartesianTest(name = "[{index}] exception={0}") + void testErrorQueueProducerRecreatedReactivelyOnStaleSendException( + @Values(strings = {"stale", "transport", "closed-facility"}) String exceptionType, + @Mock XMLMessageProducer staleProducer) throws Exception { + Mockito.when(producerManager.get(PRODUCER_KEY)).thenReturn(staleProducer); + Mockito.when(staleProducer.isClosed()).thenReturn(false); + + JCSMPException sendError = switch (exceptionType) { + case "stale" -> new StaleSessionException( + "Tried to perform operation on a closed XML message producer", + new JCSMPException("Received unsolicited CloseFlow for producer (503:Service Unavailable).")); + case "transport" -> new JCSMPTransportException( + "Received unsolicited CloseFlow for producer (503:Service Unavailable)."); + case "closed-facility" -> new ClosedFacilityException("Producer is closed"); + default -> throw new IllegalArgumentException("unknown exception type: " + exceptionType); + }; + Mockito.doThrow(sendError).when(staleProducer).send(any(XMLMessage.class), any(Destination.class)); + + assertThatThrownBy(() -> errorQueueInfrastructure.send(messageContainer, correlationKey)) + .as("Stale-flow send failure must propagate so the retry caller can re-attempt") + .isInstanceOf(sendError.getClass()); + + // The manager must have been asked to forceRecreate (with the observed stale + // producer for CAS semantics) so the next retry by ErrorQueueRepublishCorrelationKey + // gets a fresh producer instead of the dead one. + Mockito.verify(producerManager).forceRecreate(staleProducer); + } + + /** + * DATAGO-134580: a non-stale {@code JCSMPException} from {@code send(...)} must + * propagate normally and must not trigger a producer recreate. Guards + * against an over-broad reactive arm that would churn the shared producer on + * every transient publish error (e.g. a malformed message). + */ + @Test + void testErrorQueueProducerNotRecreatedOnUnrelatedJCSMPException( + @Mock XMLMessageProducer producer) throws Exception { + Mockito.when(producerManager.get(PRODUCER_KEY)).thenReturn(producer); + Mockito.when(producer.isClosed()).thenReturn(false); + + JCSMPException unrelated = new JCSMPException("Some unrelated publishing error"); + Mockito.doThrow(unrelated).when(producer).send(any(XMLMessage.class), any(Destination.class)); + + assertThatThrownBy(() -> errorQueueInfrastructure.send(messageContainer, correlationKey)) + .isInstanceOf(JCSMPException.class) + .hasMessage("Some unrelated publishing error"); + + Mockito.verify(producerManager, Mockito.never()).forceRecreate(any()); + } + + /** + * DATAGO-134580: CAS contract verification. When two callers both observe the + * same stale producer and both call {@code forceRecreate(stale)}, the manager + * recreates exactly once - the second call returns the already-recreated + * resource without closing it. {@code ErrorQueueInfrastructure.send(...)} must + * use the value returned by {@code forceRecreate} (rather than its own observed + * reference) so it ends up using whatever the manager currently holds, not a + * resource that another caller has since closed and replaced. + */ + @Test + void testErrorQueueProducerUsesManagerReturnedReferenceAfterForceRecreate( + @Mock XMLMessageProducer staleProducer, + @Mock XMLMessageProducer alreadyRecreatedByAnotherCaller) throws Exception { + Mockito.when(producerManager.get(PRODUCER_KEY)).thenReturn(staleProducer); + Mockito.when(staleProducer.isClosed()).thenReturn(true); + // Simulate the CAS no-op outcome: another caller already replaced the stale + // producer, so the manager's CAS does not recreate again - it returns the + // already-installed replacement instead. + Mockito.when(producerManager.forceRecreate(staleProducer)) + .thenReturn(alreadyRecreatedByAnotherCaller); + + assertThatCode(() -> errorQueueInfrastructure.send(messageContainer, correlationKey)) + .as("send must use the manager-returned reference (the already-installed replacement) " + + "and not the locally-observed stale reference") + .doesNotThrowAnyException(); + + Mockito.verify(alreadyRecreatedByAnotherCaller).send(any(XMLMessage.class), any(Destination.class)); + Mockito.verify(staleProducer, Mockito.never()).send(any(XMLMessage.class), any(Destination.class)); + } +} \ No newline at end of file diff --git a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/test/java/com/solace/it/util/semp/config/BrokerConfiguratorBuilder.java b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/test/java/com/solace/it/util/semp/config/BrokerConfiguratorBuilder.java index 95b735879..561b41d94 100644 --- a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/test/java/com/solace/it/util/semp/config/BrokerConfiguratorBuilder.java +++ b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/test/java/com/solace/it/util/semp/config/BrokerConfiguratorBuilder.java @@ -279,6 +279,18 @@ public void disableVpn(String msgVpnName) { updateVpn(vpn); } + /** Returns the VPN's GD message-spool quota (MB). */ + public Long getMsgVpnSpool(String msgVpnName) { + return queryVpn(msgVpnName).getMaxMsgSpoolUsage(); + } + + /** Sets the VPN's GD message-spool quota (MB). Pass 0 to disable. */ + public void setMsgVpnSpool(String msgVpnName, Long maxMsgSpoolUsageMb) { + final ConfigMsgVpn vpn = queryVpn(msgVpnName); + vpn.maxMsgSpoolUsage(maxMsgSpoolUsageMb); + updateVpn(vpn); + } + /** * Queries all vpns on a broker * diff --git a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/test/java/com/solace/spring/cloud/stream/binder/JCSMPProducerCloseFlowRecoveryIT.java b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/test/java/com/solace/spring/cloud/stream/binder/JCSMPProducerCloseFlowRecoveryIT.java new file mode 100644 index 000000000..6474b7689 --- /dev/null +++ b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/test/java/com/solace/spring/cloud/stream/binder/JCSMPProducerCloseFlowRecoveryIT.java @@ -0,0 +1,428 @@ +package com.solace.spring.cloud.stream.binder; + +import com.solace.it.util.semp.config.BrokerConfiguratorBuilder; +import com.solace.it.util.semp.config.BrokerConfiguratorBuilder.BrokerConfigurator; +import com.solace.spring.boot.autoconfigure.SolaceJavaAutoConfiguration; +import com.solace.spring.cloud.stream.binder.properties.SolaceProducerProperties; +import com.solace.spring.cloud.stream.binder.test.junit.extension.SpringCloudStreamExtension; +import com.solace.spring.cloud.stream.binder.test.spring.SpringCloudStreamContext; +import com.solace.spring.cloud.stream.binder.test.util.SimpleJCSMPEventHandler; +import com.solace.spring.cloud.stream.binder.test.util.SolaceTestBinder; +import com.solace.spring.cloud.stream.binder.util.DestinationType; +import com.solace.test.integration.junit.jupiter.extension.PubSubPlusExtension; +import com.solace.test.integration.semp.v2.SempV2Api; +import com.solacesystems.jcsmp.CapabilityType; +import com.solacesystems.jcsmp.DeliveryMode; +import com.solacesystems.jcsmp.JCSMPFactory; +import com.solacesystems.jcsmp.JCSMPProperties; +import com.solacesystems.jcsmp.JCSMPSession; +import com.solacesystems.jcsmp.Queue; +import com.solacesystems.jcsmp.TextMessage; +import com.solacesystems.jcsmp.Topic; +import com.solacesystems.jcsmp.XMLMessageProducer; +import com.github.dockerjava.api.DockerClient; +import com.github.dockerjava.api.command.ExecCreateCmdResponse; +import com.github.dockerjava.api.model.Container; +import org.apache.commons.lang3.RandomStringUtils; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.parallel.Execution; +import org.junit.jupiter.api.parallel.ExecutionMode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer; +import org.springframework.cloud.stream.binder.Binding; +import org.springframework.cloud.stream.binder.ExtendedProducerProperties; +import org.springframework.cloud.stream.config.BindingProperties; +import org.springframework.integration.channel.DirectChannel; +import org.springframework.integration.support.MessageBuilder; +import org.springframework.messaging.MessageChannel; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; +import org.testcontainers.DockerClientFactory; + +import java.io.ByteArrayInputStream; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatCode; + +/** + * DATAGO-134580 broker ITs: three control cases (spool toggle, direct publisher, queue + * ingress/egress toggle) plus two recovery cases (single + repeated broker-level + * {@code hardware message-spool shutdown}) proving the binding recovers from unsolicited + * {@code CloseFlow}. Runs SAME_THREAD because each test mutates shared broker state. + */ +@SpringJUnitConfig(classes = SolaceJavaAutoConfiguration.class, + initializers = ConfigDataApplicationContextInitializer.class) +@ExtendWith(PubSubPlusExtension.class) +@ExtendWith(SpringCloudStreamExtension.class) +@Execution(ExecutionMode.SAME_THREAD) +class JCSMPProducerCloseFlowRecoveryIT { + private static final Logger logger = LoggerFactory.getLogger(JCSMPProducerCloseFlowRecoveryIT.class); + + private JCSMPSession jcsmpSession; + private SempV2Api sempV2Api; + private SpringCloudStreamContext context; + private String vpnName; + private BrokerConfigurator brokerConfig; + private Long originalMaxMsgSpoolUsageMb; + + // Per-test artifacts cleaned up in @AfterEach. + private Binding producerBinding; + private XMLMessageProducer rawProducer; + private String provisionedQueueName; + private boolean spoolModified; + private boolean brokerSpoolNeedsRestore; + private String solaceContainerId; + + @BeforeEach + void setUp(JCSMPSession jcsmpSession, SempV2Api sempV2Api, SpringCloudStreamContext context) { + this.jcsmpSession = jcsmpSession; + this.sempV2Api = sempV2Api; + this.context = context; + this.vpnName = (String) jcsmpSession.getProperty(JCSMPProperties.VPN_NAME); + this.brokerConfig = BrokerConfiguratorBuilder.create(sempV2Api).build(); + this.originalMaxMsgSpoolUsageMb = brokerConfig.vpns().getMsgVpnSpool(vpnName); + assertThat(originalMaxMsgSpoolUsageMb) + .as("Captured maxMsgSpoolUsage should be a positive value") + .isNotNull() + .isPositive(); + } + + @AfterEach + void tearDown() { + if (spoolModified) { + try { + brokerConfig.vpns().setMsgVpnSpool(vpnName, originalMaxMsgSpoolUsageMb); + } catch (Exception e) { + logger.warn("Failed to restore maxMsgSpoolUsage for VPN '{}' during cleanup", vpnName, e); + } + } + if (brokerSpoolNeedsRestore) { + try { + runSolaceCliCommands(solaceContainerId, + "enable", + "configure", + "hardware", + "message-spool", + "no shutdown"); + } catch (Exception e) { + logger.warn("Failed to re-enable broker message-spool during cleanup", e); + } + } + if (rawProducer != null) { + rawProducer.close(); + } + if (producerBinding != null) { + producerBinding.unbind(); + } + if (provisionedQueueName != null) { + Queue queue = JCSMPFactory.onlyInstance().createQueue(provisionedQueueName); + try { + jcsmpSession.deprovision(queue, JCSMPSession.FLAG_IGNORE_DOES_NOT_EXIST); + } catch (Exception e) { + logger.warn("Failed to deprovision queue '{}' during cleanup", provisionedQueueName, e); + } + } + } + + /** Control: persistent topic publisher survives a VPN spool quota toggle. */ + @Test + void testPersistentTopicPublisherSurvivesSpoolToggle(TestInfo testInfo) throws Exception { + DirectChannel moduleOutputChannel = createPersistentProducerChannel( + RandomStringUtils.randomAlphanumeric(10), DestinationType.TOPIC, testInfo); + + moduleOutputChannel.send(MessageBuilder.withPayload("before-toggle").build()); + + logger.info("Zeroing maxMsgSpoolUsage for VPN '{}'", vpnName); + spoolModified = true; + brokerConfig.vpns().setMsgVpnSpool(vpnName, 0L); + + logger.info("Restoring maxMsgSpoolUsage={} MB for VPN '{}'", originalMaxMsgSpoolUsageMb, vpnName); + brokerConfig.vpns().setMsgVpnSpool(vpnName, originalMaxMsgSpoolUsageMb); + spoolModified = false; + + assertThatCode(() -> moduleOutputChannel.send(MessageBuilder.withPayload("after-toggle").build())) + .as("Persistent topic publisher should be unaffected by a VPN spool quota toggle") + .doesNotThrowAnyException(); + } + + /** Control: direct topic publisher (raw JCSMP) is unaffected by spool toggle. */ + @Test + void testDirectTopicPublisherSurvivesSpoolToggle() throws Exception { + String topicName = RandomStringUtils.randomAlphanumeric(10); + Topic topic = JCSMPFactory.onlyInstance().createTopic(topicName); + rawProducer = jcsmpSession.getMessageProducer(new SimpleJCSMPEventHandler()); + + TextMessage warmup = JCSMPFactory.onlyInstance().createMessage(TextMessage.class); + warmup.setDeliveryMode(DeliveryMode.DIRECT); + warmup.setText("before-toggle"); + rawProducer.send(warmup, topic); + + logger.info("Zeroing maxMsgSpoolUsage for VPN '{}'", vpnName); + spoolModified = true; + brokerConfig.vpns().setMsgVpnSpool(vpnName, 0L); + + logger.info("Restoring maxMsgSpoolUsage={} MB for VPN '{}'", originalMaxMsgSpoolUsageMb, vpnName); + brokerConfig.vpns().setMsgVpnSpool(vpnName, originalMaxMsgSpoolUsageMb); + spoolModified = false; + + TextMessage after = JCSMPFactory.onlyInstance().createMessage(TextMessage.class); + after.setDeliveryMode(DeliveryMode.DIRECT); + after.setText("after-toggle"); + assertThatCode(() -> rawProducer.send(after, topic)) + .as("Direct topic publisher should be unaffected by a VPN spool quota toggle (no spool involvement)") + .doesNotThrowAnyException(); + } + + /** Control: persistent queue publisher survives a queue ingress/egress toggle. */ + @Test + void testPersistentQueuePublisherSurvivesQueueIngressEgressToggle(TestInfo testInfo) throws Exception { + String queueName = RandomStringUtils.randomAlphanumeric(20); + provisionedQueueName = queueName; + DirectChannel moduleOutputChannel = createPersistentProducerChannel(queueName, DestinationType.QUEUE, testInfo); + + moduleOutputChannel.send(MessageBuilder.withPayload("before-toggle").build()); + + logger.info("Disabling ingress and egress on queue '{}' in VPN '{}'", queueName, vpnName); + brokerConfig.queues().disableIngressOnQueue(vpnName, queueName); + brokerConfig.queues().disableEgressOnQueue(vpnName, queueName); + + logger.info("Re-enabling ingress and egress on queue '{}' in VPN '{}'", queueName, vpnName); + brokerConfig.queues().reenableIngressOnQueue(vpnName, queueName); + brokerConfig.queues().reenableEgressOnQueue(vpnName, queueName); + + assertThatCode(() -> moduleOutputChannel.send(MessageBuilder.withPayload("after-toggle").build())) + .as("Persistent queue publisher should be unaffected by a queue ingress/egress toggle") + .doesNotThrowAnyException(); + } + + /** + * Recovery: broker-level {@code hardware message-spool shutdown} fans unsolicited + * CloseFlow to the bound producer; the handler's proactive {@code isClosed()} pre-check + * must rebuild and the first post-shutdown publish must succeed. + */ + @Test + void testPersistentQueuePublisherRecoversAfterMessageSpoolCliShutdown(TestInfo testInfo) throws Exception { + String queueName = RandomStringUtils.randomAlphanumeric(20); + provisionedQueueName = queueName; + DirectChannel moduleOutputChannel = createPersistentProducerChannel(queueName, DestinationType.QUEUE, testInfo); + solaceContainerId = findSolaceContainerId(); + + moduleOutputChannel.send(MessageBuilder.withPayload("before-shutdown").build()); + + // CLI sub-modes are entered one line at a time; the `shutdown` prompt requires `y`. + logger.info("Shutting down broker message-spool via CLI in container '{}'", solaceContainerId); + brokerSpoolNeedsRestore = true; + runSolaceCliCommands(solaceContainerId, + "enable", + "configure", + "hardware", + "message-spool", + "shutdown", + "y"); + awaitBrokerSempResponsive(sempV2Api, vpnName); + + logger.info("Re-enabling broker message-spool via CLI in container '{}'", solaceContainerId); + runSolaceCliCommands(solaceContainerId, + "enable", + "configure", + "hardware", + "message-spool", + "no shutdown"); + brokerSpoolNeedsRestore = false; + awaitBrokerSempResponsive(sempV2Api, vpnName); + awaitJcsmpGuaranteedPublisherCapability(); + + logger.info("Attempting first post-shutdown publish; expecting proactive recovery"); + assertThatCode(() -> moduleOutputChannel.send(MessageBuilder.withPayload("recovery-1").build())) + .as("First publish after broker CLI message-spool shutdown must recover via proactive isClosed() pre-check") + .doesNotThrowAnyException(); + + assertThatCode(() -> moduleOutputChannel.send(MessageBuilder.withPayload("recovery-2").build())) + .as("Steady-state publish after recovery must continue to work") + .doesNotThrowAnyException(); + } + + /** Recovery: same disruption looped 3x on the same binding to catch state-accumulation regressions. */ + @Test + void testPersistentQueuePublisherRecoversAcrossRepeatedMessageSpoolCliShutdowns(TestInfo testInfo) throws Exception { + String queueName = RandomStringUtils.randomAlphanumeric(20); + provisionedQueueName = queueName; + DirectChannel moduleOutputChannel = createPersistentProducerChannel(queueName, DestinationType.QUEUE, testInfo); + solaceContainerId = findSolaceContainerId(); + final int cycles = 3; + + moduleOutputChannel.send(MessageBuilder.withPayload("initial-healthy").build()); + + for (int cycle = 1; cycle <= cycles; cycle++) { + logger.info("=== Cycle {}/{}: shutdown -> witness-failure -> recover ===", cycle, cycles); + brokerSpoolNeedsRestore = true; + + runSolaceCliCommands(solaceContainerId, + "enable", + "configure", + "hardware", + "message-spool", + "shutdown", + "y"); + awaitBrokerSempResponsive(sempV2Api, vpnName); + + runSolaceCliCommands(solaceContainerId, + "enable", + "configure", + "hardware", + "message-spool", + "no shutdown"); + brokerSpoolNeedsRestore = false; + awaitBrokerSempResponsive(sempV2Api, vpnName); + awaitJcsmpGuaranteedPublisherCapability(); + + final int currentCycle = cycle; + assertThatCode(() -> moduleOutputChannel.send(MessageBuilder.withPayload( + "recovery-c" + currentCycle).build())) + .as("Cycle %d: first publish after broker CLI shutdown must recover via proactive isClosed() pre-check", currentCycle) + .doesNotThrowAnyException(); + + assertThatCode(() -> moduleOutputChannel.send(MessageBuilder.withPayload( + "steady-c" + currentCycle).build())) + .as("Cycle %d: steady-state publish after recovery must continue to work", currentCycle) + .doesNotThrowAnyException(); + } + + assertThatCode(() -> moduleOutputChannel.send(MessageBuilder.withPayload("final-healthy").build())) + .as("After %d shutdown/recover cycles, the binding's producer must continue to publish normally", cycles) + .doesNotThrowAnyException(); + } + + private DirectChannel createPersistentProducerChannel(String destination, DestinationType destinationType, + TestInfo testInfo) throws Exception { + SolaceTestBinder binder = context.getBinder(); + ExtendedProducerProperties producerProperties = context.createProducerProperties(testInfo); + producerProperties.getExtension().setDestinationType(destinationType); + BindingProperties producerBindingProperties = new BindingProperties(); + producerBindingProperties.setProducer(producerProperties); + DirectChannel moduleOutputChannel = context.createBindableChannel("output", producerBindingProperties); + producerBinding = binder.bindProducer(destination, moduleOutputChannel, producerProperties); + return moduleOutputChannel; + } + + /** Polls until JCSMP reports {@link CapabilityType#PUB_GUARANTEED} after broker spool re-enable. */ + private void awaitJcsmpGuaranteedPublisherCapability() { + Awaitility.await("JCSMP session reports PUB_GUARANTEED capability") + .atMost(Duration.ofSeconds(30)) + .pollInterval(Duration.ofMillis(250)) + .untilAsserted(() -> assertThat(jcsmpSession.isCapable(CapabilityType.PUB_GUARANTEED)) + .as("session must re-advertise PUB_GUARANTEED after broker message-spool re-enable") + .isTrue()); + } + + /** Polls until the broker SEMP API answers a {@code MsgVpn} lookup again. */ + private static void awaitBrokerSempResponsive(SempV2Api sempV2Api, String vpnName) { + Awaitility.await("broker SEMP API responsive for VPN '" + vpnName + "'") + .atMost(Duration.ofSeconds(15)) + .pollInterval(Duration.ofMillis(100)) + .ignoreExceptions() + .untilAsserted(() -> assertThat(sempV2Api.monitor() + .getMsgVpn(vpnName, null) + .getData()) + .isNotNull()); + } + + // Docker / CLI helpers (only used by the message-spool CLI shutdown tests). + + /** Finds the broker container this test's JCSMP session is connected to, matched by SMF host port. */ + private String findSolaceContainerId() { + int smfPort = extractSmfPortFromJcsmpSession(); + DockerClient docker = DockerClientFactory.instance().client(); + List containers = docker.listContainersCmd().exec(); + return containers.stream() + .filter(c -> c.getImage() != null && c.getImage().contains("solace-pubsub-standard")) + .filter(c -> { + com.github.dockerjava.api.model.ContainerPort[] ports = c.getPorts(); + if (ports == null) return false; + for (com.github.dockerjava.api.model.ContainerPort port : ports) { + Integer publicPort = port.getPublicPort(); + if (publicPort != null && publicPort == smfPort) return true; + } + return false; + }) + .findFirst() + .map(Container::getId) + .orElseThrow(() -> new IllegalStateException(String.format( + "No running 'solace-pubsub-standard' container exposing SMF host port %d found.", + smfPort))); + } + + private int extractSmfPortFromJcsmpSession() { + Object host = jcsmpSession.getProperty(JCSMPProperties.HOST); + if (host == null) { + throw new IllegalStateException("JCSMP HOST property is not set"); + } + String firstHost = host.toString().split(",")[0].trim(); + int colonIdx = firstHost.lastIndexOf(':'); + if (colonIdx < 0 || colonIdx == firstHost.length() - 1) { + throw new IllegalStateException("Cannot parse SMF port from JCSMP HOST: " + host); + } + String portStr = firstHost.substring(colonIdx + 1).replaceAll("[^0-9].*", ""); + if (portStr.isEmpty()) { + throw new IllegalStateException("Cannot parse SMF port from JCSMP HOST: " + host); + } + return Integer.parseInt(portStr); + } + + /** + * Runs the Solace CLI inside the broker container. Requires a pseudo-TTY so confirmation + * prompts (e.g. destructive {@code shutdown}) are honoured; trailing {@code end} + two + * {@code exit}s ensure {@code cli -A} terminates instead of hanging at the prompt. + */ + private static void runSolaceCliCommands(String containerId, String... cliCommands) throws Exception { + DockerClient docker = DockerClientFactory.instance().client(); + + StringBuilder script = new StringBuilder(); + for (String cmd : cliCommands) { + script.append(cmd).append("\r\n"); + } + script.append("end\r\n").append("exit\r\n").append("exit\r\n"); + + ExecCreateCmdResponse exec = docker.execCreateCmd(containerId) + .withTty(true) + .withAttachStdin(true) + .withAttachStdout(true) + .withAttachStderr(true) + .withCmd("/usr/sw/loads/currentload/bin/cli", "-A") + .exec(); + + ByteArrayInputStream stdin = new ByteArrayInputStream( + script.toString().getBytes(StandardCharsets.UTF_8)); + + final StringBuilder capturedOutput = new StringBuilder(); + com.github.dockerjava.api.async.ResultCallback.Adapter callback = + new com.github.dockerjava.api.async.ResultCallback.Adapter<>() { + @Override + public void onNext(com.github.dockerjava.api.model.Frame frame) { + capturedOutput.append(new String(frame.getPayload(), StandardCharsets.UTF_8)); + } + }; + + boolean completed = docker.execStartCmd(exec.getId()) + .withTty(true) + .withStdIn(stdin) + .exec(callback) + .awaitCompletion(30, TimeUnit.SECONDS); + + if (!completed) { + throw new IllegalStateException(String.format( + "Solace CLI exec did not complete within 30s in container '%s'. " + + "Script:%n%s%nCaptured output:%n%s", + containerId, script, capturedOutput)); + } + } +}