Skip to content

Commit 5157717

Browse files
authored
Merge pull request #142 from SolaceDev/DATAGO-134580-error-queue
DATAGO-134580: Recover error-queue producer from unsolicited CloseFlow
2 parents f10cb85 + cedd2f2 commit 5157717

3 files changed

Lines changed: 227 additions & 1 deletion

File tree

solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/util/ErrorQueueInfrastructure.java

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
package com.solace.spring.cloud.stream.binder.util;
22

33
import com.solace.spring.cloud.stream.binder.properties.SolaceConsumerProperties;
4+
import com.solacesystems.jcsmp.ClosedFacilityException;
45
import com.solacesystems.jcsmp.JCSMPException;
56
import com.solacesystems.jcsmp.JCSMPFactory;
7+
import com.solacesystems.jcsmp.JCSMPTransportException;
68
import com.solacesystems.jcsmp.Queue;
9+
import com.solacesystems.jcsmp.StaleSessionException;
710
import com.solacesystems.jcsmp.XMLMessage;
811
import com.solacesystems.jcsmp.XMLMessageProducer;
912
import org.slf4j.Logger;
@@ -27,13 +30,20 @@ public ErrorQueueInfrastructure(JCSMPSessionProducerManager producerManager, Str
2730
this.consumerProperties = consumerProperties;
2831
}
2932

33+
// DATAGO-134580: recreate shared JCSMP producer on unsolicited termination from Solace broker.
34+
3035
public void send(MessageContainer messageContainer, ErrorQueueRepublishCorrelationKey key) throws JCSMPException {
3136
XMLMessage xmlMessage = xmlMessageMapper.mapError(messageContainer.getMessage(), consumerProperties);
3237
xmlMessage.setCorrelationKey(key);
3338
Queue queue = JCSMPFactory.onlyInstance().createQueue(errorQueueName);
3439
XMLMessageProducer producer;
3540
try {
3641
producer = producerManager.get(producerKey);
42+
if (producer.isClosed()) {
43+
LOGGER.warn("Detected closed shared JCSMP producer before sending to error queue {}; recreating",
44+
errorQueueName);
45+
producer = producerManager.forceRecreate(producer);
46+
}
3747
} catch (Exception e) {
3848
MessagingException wrappedException = new MessagingException(
3949
String.format("Failed to get producer to send message %s to queue %s", xmlMessage.getMessageId(),
@@ -42,7 +52,25 @@ public void send(MessageContainer messageContainer, ErrorQueueRepublishCorrelati
4252
throw wrappedException;
4353
}
4454

45-
producer.send(xmlMessage, queue);
55+
try {
56+
producer.send(xmlMessage, queue);
57+
} catch (JCSMPException e) {
58+
if (e instanceof StaleSessionException
59+
|| e instanceof JCSMPTransportException
60+
|| e instanceof ClosedFacilityException
61+
|| producer.isClosed()) {
62+
LOGGER.warn("Detected stale shared JCSMP producer while sending to error queue {} (cause: {}); " +
63+
"recreating for next attempt",
64+
errorQueueName, e.getClass().getSimpleName());
65+
try {
66+
producerManager.forceRecreate(producer);
67+
} catch (Exception recreateError) {
68+
LOGGER.warn("Failed to recreate shared JCSMP producer after stale-flow detection", recreateError);
69+
e.addSuppressed(recreateError);
70+
}
71+
}
72+
throw e;
73+
}
4674
}
4775

4876
public ErrorQueueRepublishCorrelationKey createCorrelationKey(MessageContainer messageContainer,

solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/util/SharedResourceManager.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,32 @@ public T get(String key) throws Exception {
4343
return sharedResource;
4444
}
4545

46+
/**
47+
* Compare-and-swap the shared resource. If the manager still holds {@code expected},
48+
* close it and {@link #create()} a fresh one; otherwise return the currently-installed
49+
* resource without re-creating.
50+
*
51+
* @param expected the reference the caller observed and considers no longer usable
52+
* @return the resource currently installed in the manager
53+
* @throws Exception whatever {@link #create()} may throw
54+
*/
55+
public T forceRecreate(T expected) throws Exception {
56+
synchronized (lock) {
57+
if (sharedResource != expected) {
58+
return sharedResource;
59+
}
60+
if (sharedResource != null) {
61+
try {
62+
close();
63+
} catch (Exception e) {
64+
LOGGER.debug("Failed to close current {} during forceRecreate", type, e);
65+
}
66+
}
67+
sharedResource = create();
68+
return sharedResource;
69+
}
70+
}
71+
4672
/**
4773
* De-register {@code key} from the shared resource.
4874
* <p>If this is the last {@code key} associated to the shared resource, {@link #close()} the resource.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
package com.solace.spring.cloud.stream.binder.util;
2+
3+
import com.solace.spring.cloud.stream.binder.properties.SolaceConsumerProperties;
4+
import com.solacesystems.jcsmp.BytesXMLMessage;
5+
import com.solacesystems.jcsmp.ClosedFacilityException;
6+
import com.solacesystems.jcsmp.Destination;
7+
import com.solacesystems.jcsmp.JCSMPException;
8+
import com.solacesystems.jcsmp.JCSMPFactory;
9+
import com.solacesystems.jcsmp.JCSMPTransportException;
10+
import com.solacesystems.jcsmp.StaleSessionException;
11+
import com.solacesystems.jcsmp.XMLMessage;
12+
import com.solacesystems.jcsmp.XMLMessageProducer;
13+
import org.junit.jupiter.api.BeforeEach;
14+
import org.junit.jupiter.api.Test;
15+
import org.junit.jupiter.api.extension.ExtendWith;
16+
import org.junitpioneer.jupiter.cartesian.CartesianTest;
17+
import org.junitpioneer.jupiter.cartesian.CartesianTest.Values;
18+
import org.mockito.Mock;
19+
import org.mockito.Mockito;
20+
import org.mockito.junit.jupiter.MockitoExtension;
21+
22+
import static org.assertj.core.api.Assertions.assertThatCode;
23+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
24+
import static org.mockito.ArgumentMatchers.any;
25+
26+
/**
27+
* Unit tests for the DATAGO-134580 stale-flow recovery added to {@link ErrorQueueInfrastructure}.
28+
* The error-queue republish path borrows the session-default producer from
29+
* {@link JCSMPSessionProducerManager} and historically had no recovery logic when the broker
30+
* fanned out an unsolicited CloseFlow on that producer (reactive recreation in
31+
* {@code JCSMPOutboundMessageHandler} only protects per-binding producers, not the shared
32+
* session-default one).
33+
*/
34+
@ExtendWith(MockitoExtension.class)
35+
class ErrorQueueInfrastructureTest {
36+
private static final String PRODUCER_KEY = "test-producer-key";
37+
private static final String ERROR_QUEUE_NAME = "test-error-queue";
38+
39+
@Mock JCSMPSessionProducerManager producerManager;
40+
@Mock MessageContainer messageContainer;
41+
@Mock ErrorQueueRepublishCorrelationKey correlationKey;
42+
43+
BytesXMLMessage inputMessage;
44+
SolaceConsumerProperties consumerProperties;
45+
ErrorQueueInfrastructure errorQueueInfrastructure;
46+
47+
@BeforeEach
48+
void setup() {
49+
inputMessage = JCSMPFactory.onlyInstance().createMessage(BytesXMLMessage.class);
50+
consumerProperties = new SolaceConsumerProperties();
51+
errorQueueInfrastructure = new ErrorQueueInfrastructure(
52+
producerManager, PRODUCER_KEY, ERROR_QUEUE_NAME, consumerProperties);
53+
Mockito.when(messageContainer.getMessage()).thenReturn(inputMessage);
54+
}
55+
56+
/**
57+
* DATAGO-134580: proactive {@code producer.isClosed()} pre-check on the error-queue
58+
* republish path. If the broker has already torn down the shared session-default
59+
* producer before this {@code send(...)} runs, the very first error-queue publish
60+
* should still succeed - the manager is asked to recreate the producer before send is
61+
* attempted, and the fresh producer services the publish.
62+
*/
63+
@Test
64+
void testErrorQueueProducerRecreatedProactivelyOnIsClosed(
65+
@Mock XMLMessageProducer staleProducer,
66+
@Mock XMLMessageProducer freshProducer) throws Exception {
67+
Mockito.when(producerManager.get(PRODUCER_KEY)).thenReturn(staleProducer);
68+
Mockito.when(staleProducer.isClosed()).thenReturn(true);
69+
Mockito.when(producerManager.forceRecreate(staleProducer)).thenReturn(freshProducer);
70+
71+
assertThatCode(() -> errorQueueInfrastructure.send(messageContainer, correlationKey))
72+
.as("Proactive recreate must allow the publish to succeed on the fresh producer")
73+
.doesNotThrowAnyException();
74+
75+
// CAS contract: caller passes the observed (stale) reference so the manager only
76+
// recreates if it still holds that exact instance.
77+
Mockito.verify(producerManager).forceRecreate(staleProducer);
78+
Mockito.verify(freshProducer).send(any(XMLMessage.class), any(Destination.class));
79+
Mockito.verify(staleProducer, Mockito.never()).send(any(XMLMessage.class), any(Destination.class));
80+
}
81+
82+
/**
83+
* DATAGO-134580: reactive recreation when {@code send(...)} itself throws a
84+
* stale-flow exception. The race window between our proactive {@code isClosed()}
85+
* check and the actual send means the broker can tear the producer down mid-flight;
86+
* in that case the exception must propagate so {@code ErrorQueueRepublishCorrelationKey}
87+
* can retry, and the manager must be force-recreated so the next retry attempt picks up
88+
* a fresh producer rather than re-using the dead one.
89+
*
90+
* <p>Parameterized over the three concrete JCSMP exception types we treat as
91+
* stale-flow signals - the recovery contract must apply to all of them.
92+
*/
93+
@CartesianTest(name = "[{index}] exception={0}")
94+
void testErrorQueueProducerRecreatedReactivelyOnStaleSendException(
95+
@Values(strings = {"stale", "transport", "closed-facility"}) String exceptionType,
96+
@Mock XMLMessageProducer staleProducer) throws Exception {
97+
Mockito.when(producerManager.get(PRODUCER_KEY)).thenReturn(staleProducer);
98+
Mockito.when(staleProducer.isClosed()).thenReturn(false);
99+
100+
JCSMPException sendError = switch (exceptionType) {
101+
case "stale" -> new StaleSessionException(
102+
"Tried to perform operation on a closed XML message producer",
103+
new JCSMPException("Received unsolicited CloseFlow for producer (503:Service Unavailable)."));
104+
case "transport" -> new JCSMPTransportException(
105+
"Received unsolicited CloseFlow for producer (503:Service Unavailable).");
106+
case "closed-facility" -> new ClosedFacilityException("Producer is closed");
107+
default -> throw new IllegalArgumentException("unknown exception type: " + exceptionType);
108+
};
109+
Mockito.doThrow(sendError).when(staleProducer).send(any(XMLMessage.class), any(Destination.class));
110+
111+
assertThatThrownBy(() -> errorQueueInfrastructure.send(messageContainer, correlationKey))
112+
.as("Stale-flow send failure must propagate so the retry caller can re-attempt")
113+
.isInstanceOf(sendError.getClass());
114+
115+
// The manager must have been asked to forceRecreate (with the observed stale
116+
// producer for CAS semantics) so the next retry by ErrorQueueRepublishCorrelationKey
117+
// gets a fresh producer instead of the dead one.
118+
Mockito.verify(producerManager).forceRecreate(staleProducer);
119+
}
120+
121+
/**
122+
* DATAGO-134580: a non-stale {@code JCSMPException} from {@code send(...)} must
123+
* propagate normally and must <em>not</em> trigger a producer recreate. Guards
124+
* against an over-broad reactive arm that would churn the shared producer on
125+
* every transient publish error (e.g. a malformed message).
126+
*/
127+
@Test
128+
void testErrorQueueProducerNotRecreatedOnUnrelatedJCSMPException(
129+
@Mock XMLMessageProducer producer) throws Exception {
130+
Mockito.when(producerManager.get(PRODUCER_KEY)).thenReturn(producer);
131+
Mockito.when(producer.isClosed()).thenReturn(false);
132+
133+
JCSMPException unrelated = new JCSMPException("Some unrelated publishing error");
134+
Mockito.doThrow(unrelated).when(producer).send(any(XMLMessage.class), any(Destination.class));
135+
136+
assertThatThrownBy(() -> errorQueueInfrastructure.send(messageContainer, correlationKey))
137+
.isInstanceOf(JCSMPException.class)
138+
.hasMessage("Some unrelated publishing error");
139+
140+
Mockito.verify(producerManager, Mockito.never()).forceRecreate(any());
141+
}
142+
143+
/**
144+
* DATAGO-134580: CAS contract verification. When two callers both observe the
145+
* same stale producer and both call {@code forceRecreate(stale)}, the manager
146+
* recreates exactly once - the second call returns the already-recreated
147+
* resource without closing it. {@code ErrorQueueInfrastructure.send(...)} must
148+
* use the value returned by {@code forceRecreate} (rather than its own observed
149+
* reference) so it ends up using whatever the manager currently holds, not a
150+
* resource that another caller has since closed and replaced.
151+
*/
152+
@Test
153+
void testErrorQueueProducerUsesManagerReturnedReferenceAfterForceRecreate(
154+
@Mock XMLMessageProducer staleProducer,
155+
@Mock XMLMessageProducer alreadyRecreatedByAnotherCaller) throws Exception {
156+
Mockito.when(producerManager.get(PRODUCER_KEY)).thenReturn(staleProducer);
157+
Mockito.when(staleProducer.isClosed()).thenReturn(true);
158+
// Simulate the CAS no-op outcome: another caller already replaced the stale
159+
// producer, so the manager's CAS does not recreate again - it returns the
160+
// already-installed replacement instead.
161+
Mockito.when(producerManager.forceRecreate(staleProducer))
162+
.thenReturn(alreadyRecreatedByAnotherCaller);
163+
164+
assertThatCode(() -> errorQueueInfrastructure.send(messageContainer, correlationKey))
165+
.as("send must use the manager-returned reference (the already-installed replacement) " +
166+
"and not the locally-observed stale reference")
167+
.doesNotThrowAnyException();
168+
169+
Mockito.verify(alreadyRecreatedByAnotherCaller).send(any(XMLMessage.class), any(Destination.class));
170+
Mockito.verify(staleProducer, Mockito.never()).send(any(XMLMessage.class), any(Destination.class));
171+
}
172+
}

0 commit comments

Comments
 (0)