Skip to content

Commit 419b886

Browse files
sunil-solaceclaude
andcommitted
DATAGO-134580: Recreate JCSMP producer on unsolicited CloseFlow
Backport of PR SolaceProducts#475 (SolaceProducts) / commits 931f09c..ee61040 on master to stage-4.11.1. When the broker fans out an unsolicited CloseFlow on a publisher flow (message-spool maintenance, DR failover, "503: Service Unavailable" on GD), JCSMP marks the per-binding XMLMessageProducer terminally closed. The JCSMP session stays connected, but every subsequent producer.send throws StaleSessionException until the application is restarted. Outbound handler (JCSMPOutboundMessageHandler): - New volatile recreateProducer flag + lifecycleLock that covers start(), stop(), closeResources(), createProducerInternal(), and recreateProducerIfNeeded(). - Catch arm in handleMessage detects StaleSessionException / JCSMPTransportException / ClosedFacilityException (and a closed producer post-send), arms the flag, and surfaces the original exception via the error channel. - Pre-check at the top of every handleMessage rebuilds the producer proactively when producer.isClosed() returns true. - createProducerInternal is now self-contained: locks, gets the shared session-default producer from JCSMPSessionProducerManager, creates the per-binding producer (+ transacted session when configured), and on failure closes whatever was partially built and wraps in a RuntimeException. - Recreate failure stays armed so the next inbound message retries. Shared producer manager: - JCSMPSessionProducerManager.forceRecreate(expected) added. CAS semantics: only recreates if the manager still holds the supplied reference; otherwise returns the currently-installed one. Error-queue path (ErrorQueueInfrastructure): - Proactive isClosed() check on the shared session-default producer before send. - Reactive forceRecreate(observed) on stale-flow / transport / closed send exceptions. Recovery is single-shot here because ErrorQueueRepublishCorrelationKey.handleError() already loops up to errorQueueMaxDeliveryAttempts. Tests: - Unit (JCSMPOutboundMessageHandlerTest, ErrorQueueInfrastructureTest): Cartesian coverage over transacted x stale-flow exception type for the recovery paths; proactive isClosed pre-check; recreate-failure retry; stop/start flag-reset; CAS noop for forceRecreate. - Broker IT (JCSMPProducerCloseFlowRecoveryIT, new): three control scenarios that document broker disruptions which do NOT reproduce the bug (spool-quota toggle on persistent topic, direct topic, queue ingress/egress toggle), plus the customer-reported reproducer driven via broker CLI (hardware message-spool shutdown over docker exec with TTY for confirmation prompts) and a repeated-cycles variant. Container is selected by SMF host port to avoid targeting leftover containers. After re-enable, the test waits for JCSMP's PUB_GUARANTEED capability to refresh before driving recovery publishes. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 3748ade commit 419b886

8 files changed

Lines changed: 975 additions & 42 deletions

File tree

.gitignore

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,7 @@
77
**/.flattened-pom.xml
88

99
# Release files
10-
release.properties
10+
release.properties
11+
12+
# Local toolchain / IDE files
13+
.java-version

solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/outbound/JCSMPOutboundMessageHandler.java

Lines changed: 102 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,14 @@
1414
import com.solace.spring.cloud.stream.binder.util.JCSMPSessionProducerManager.CloudStreamEventHandler;
1515
import com.solace.spring.cloud.stream.binder.util.StaticMessageHeaderMapAccessor;
1616
import com.solace.spring.cloud.stream.binder.util.XMLMessageMapper;
17+
import com.solacesystems.jcsmp.ClosedFacilityException;
1718
import com.solacesystems.jcsmp.Destination;
1819
import com.solacesystems.jcsmp.JCSMPException;
1920
import com.solacesystems.jcsmp.JCSMPFactory;
2021
import com.solacesystems.jcsmp.JCSMPSession;
2122
import com.solacesystems.jcsmp.JCSMPStreamingPublishCorrelatingEventHandler;
23+
import com.solacesystems.jcsmp.JCSMPTransportException;
24+
import com.solacesystems.jcsmp.StaleSessionException;
2225
import com.solacesystems.jcsmp.Topic;
2326
import com.solacesystems.jcsmp.XMLMessage;
2427
import com.solacesystems.jcsmp.XMLMessageProducer;
@@ -64,6 +67,10 @@ public class JCSMPOutboundMessageHandler implements MessageHandler, Lifecycle {
6467
private boolean isRunning = false;
6568
private ErrorMessageStrategy errorMessageStrategy;
6669

70+
// DATAGO-134580: recreate JCSMP producer on unsolicited termination from Solace broker.
71+
private volatile boolean recreateProducer = false;
72+
private final Object lifecycleLock = new Object();
73+
6774
private static final Logger LOGGER = LoggerFactory.getLogger(JCSMPOutboundMessageHandler.class);
6875

6976
public JCSMPOutboundMessageHandler(ProducerDestination destination,
@@ -95,6 +102,8 @@ public void handleMessage(@NonNull Message<?> message) throws MessagingException
95102
throw handleMessagingException(correlationKey, msg0, new ClosedChannelBindingException(msg1));
96103
}
97104

105+
recreateProducerIfNeeded(correlationKey);
106+
98107
try {
99108
CorrelationData correlationData = message.getHeaders()
100109
.get(SolaceBinderHeaders.CONFIRM_CORRELATION, CorrelationData.class);
@@ -165,6 +174,18 @@ public void handleMessage(@NonNull Message<?> message) throws MessagingException
165174
producerEventHandler.responseReceivedEx(correlationKey);
166175
}
167176
} catch (JCSMPException e) {
177+
if (e instanceof StaleSessionException
178+
|| e instanceof JCSMPTransportException
179+
|| e instanceof ClosedFacilityException
180+
|| producer.isClosed()) {
181+
if (!recreateProducer) {
182+
LOGGER.debug("Detected stale JCSMP producer for binding {} (cause: {}); will " +
183+
"recreate on next message <message handler ID: {}>",
184+
properties.getBindingName(), e.getClass().getSimpleName(), id);
185+
}
186+
recreateProducer = true;
187+
}
188+
168189
if (transactedSession != null) {
169190
try {
170191
if (!(e instanceof RollbackException)) {
@@ -229,62 +250,103 @@ private Destination getDynamicDestination(Map<String, Object> headers, ErrorChan
229250
@Override
230251
public void start() {
231252
LOGGER.info("Creating producer to {} {} <message handler ID: {}>", configDestinationType, configDestination.getName(), id);
232-
if (isRunning()) {
233-
LOGGER.warn("Nothing to do, message handler {} is already running", id);
234-
return;
253+
synchronized (lifecycleLock) {
254+
if (isRunning()) {
255+
LOGGER.warn("Nothing to do, message handler {} is already running", id);
256+
return;
257+
}
258+
recreateProducer = false;
259+
260+
try {
261+
Map<String, String> headerNameMapping = properties.getExtension().getHeaderNameMapping();
262+
if (headerNameMapping != null && !headerNameMapping.isEmpty()) {
263+
Set<String> uniqueTargetHeaderNames = new HashSet<>(headerNameMapping.values());
264+
if (uniqueTargetHeaderNames.size() < headerNameMapping.size()) {
265+
IllegalArgumentException exception = new IllegalArgumentException(String.format(
266+
"Two or more headers map to the same header name in headerNameMapping %s <outbound adapter %s>",
267+
properties.getExtension().getHeaderNameMapping(), id));
268+
LOGGER.warn(exception.getMessage());
269+
throw exception;
270+
}
271+
}
272+
} catch (Exception e) {
273+
String msg = String.format("Unable to get a message producer for session %s", jcsmpSession.getSessionName());
274+
LOGGER.warn(msg, e);
275+
throw new RuntimeException(msg, e);
276+
}
277+
278+
createProducerInternal();
279+
isRunning = true;
235280
}
281+
}
236282

237-
try {
238-
Map<String, String> headerNameMapping = properties.getExtension().getHeaderNameMapping();
239-
if (headerNameMapping != null && !headerNameMapping.isEmpty()) {
240-
Set<String> uniqueTargetHeaderNames = new HashSet<>(headerNameMapping.values());
241-
if (uniqueTargetHeaderNames.size() < headerNameMapping.size()) {
242-
IllegalArgumentException exception = new IllegalArgumentException(String.format(
243-
"Two or more headers map to the same header name in headerNameMapping %s <outbound adapter %s>",
244-
properties.getExtension().getHeaderNameMapping(), id));
245-
LOGGER.warn(exception.getMessage());
246-
throw exception;
283+
private void createProducerInternal() {
284+
synchronized (lifecycleLock) {
285+
try {
286+
producerManager.get(id);
287+
if (properties.getExtension().isTransacted()) {
288+
LOGGER.info("Creating transacted session <message handler ID: {}>", id);
289+
transactedSession = jcsmpSession.createTransactedSession();
290+
producer = transactedSession.createProducer(SolaceProvisioningUtil.getProducerFlowProperties(jcsmpSession),
291+
producerEventHandler);
292+
} else {
293+
producer = jcsmpSession.createProducer(SolaceProvisioningUtil.getProducerFlowProperties(jcsmpSession),
294+
producerEventHandler);
247295
}
296+
} catch (Exception e) {
297+
String msg = String.format("Unable to get a message producer for session %s", jcsmpSession.getSessionName());
298+
LOGGER.warn(msg, e);
299+
closeResources();
300+
throw new RuntimeException(msg, e);
248301
}
302+
}
303+
}
249304

250-
producerManager.get(id);
251-
if (properties.getExtension().isTransacted()) {
252-
LOGGER.info("Creating transacted session <message handler ID: {}>", id);
253-
transactedSession = jcsmpSession.createTransactedSession();
254-
producer = transactedSession.createProducer(SolaceProvisioningUtil.getProducerFlowProperties(jcsmpSession),
255-
producerEventHandler);
256-
} else {
257-
producer = jcsmpSession.createProducer(SolaceProvisioningUtil.getProducerFlowProperties(jcsmpSession),
258-
producerEventHandler);
305+
private void recreateProducerIfNeeded(ErrorChannelSendingCorrelationKey correlationKey) throws MessagingException {
306+
if (!recreateProducer && !producer.isClosed()) {
307+
return;
308+
}
309+
synchronized (lifecycleLock) {
310+
if (!recreateProducer && !producer.isClosed()) {
311+
return;
259312
}
260-
} catch (Exception e) {
261-
String msg = String.format("Unable to get a message producer for session %s", jcsmpSession.getSessionName());
262-
LOGGER.warn(msg, e);
313+
LOGGER.debug("Recreating JCSMP producer for binding {} after stale-flow detection <message handler ID: {}>",
314+
properties.getBindingName(), id);
263315
closeResources();
264-
throw new RuntimeException(msg, e);
316+
try {
317+
createProducerInternal();
318+
recreateProducer = false;
319+
} catch (Exception createError) {
320+
recreateProducer = true;
321+
throw handleMessagingException(correlationKey,
322+
"Failed to recreate JCSMP producer after stale-flow detection", createError);
323+
}
265324
}
266-
267-
isRunning = true;
268325
}
269326

270327
@Override
271328
public void stop() {
272-
if (!isRunning()) return;
273-
closeResources();
274-
isRunning = false;
329+
synchronized (lifecycleLock) {
330+
if (!isRunning()) return;
331+
closeResources();
332+
isRunning = false;
333+
}
275334
}
276335

277336
private void closeResources() {
278-
LOGGER.info("Stopping producer to {} {} <message handler ID: {}>", configDestinationType, configDestination.getName(), id);
279-
if (producer != null) {
280-
LOGGER.info("Closing producer <message handler ID: {}>", id);
281-
producer.close();
282-
}
283-
if (transactedSession != null) {
284-
LOGGER.info("Closing transacted session <message handler ID: {}>", id);
285-
transactedSession.close();
337+
synchronized (lifecycleLock) {
338+
LOGGER.info("Stopping producer to {} {} <message handler ID: {}>", configDestinationType, configDestination.getName(), id);
339+
recreateProducer = false;
340+
if (producer != null) {
341+
LOGGER.info("Closing producer <message handler ID: {}>", id);
342+
producer.close();
343+
}
344+
if (transactedSession != null) {
345+
LOGGER.info("Closing transacted session <message handler ID: {}>", id);
346+
transactedSession.close();
347+
}
348+
producerManager.release(id);
286349
}
287-
producerManager.release(id);
288350
}
289351

290352
@Override

solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/util/ErrorQueueInfrastructure.java

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
package com.solace.spring.cloud.stream.binder.util;
22

33
import com.solace.spring.cloud.stream.binder.properties.SolaceConsumerProperties;
4+
import com.solacesystems.jcsmp.ClosedFacilityException;
45
import com.solacesystems.jcsmp.JCSMPException;
56
import com.solacesystems.jcsmp.JCSMPFactory;
7+
import com.solacesystems.jcsmp.JCSMPTransportException;
68
import com.solacesystems.jcsmp.Queue;
9+
import com.solacesystems.jcsmp.StaleSessionException;
710
import com.solacesystems.jcsmp.XMLMessage;
811
import com.solacesystems.jcsmp.XMLMessageProducer;
912
import org.slf4j.Logger;
@@ -34,6 +37,11 @@ public void send(MessageContainer messageContainer, ErrorQueueRepublishCorrelati
3437
XMLMessageProducer producer;
3538
try {
3639
producer = producerManager.get(producerKey);
40+
if (producer.isClosed()) {
41+
LOGGER.warn("Detected closed shared JCSMP producer before sending to error queue {}; recreating",
42+
errorQueueName);
43+
producer = producerManager.forceRecreate(producer);
44+
}
3745
} catch (Exception e) {
3846
MessagingException wrappedException = new MessagingException(
3947
String.format("Failed to get producer to send message %s to queue %s", xmlMessage.getMessageId(),
@@ -42,7 +50,25 @@ public void send(MessageContainer messageContainer, ErrorQueueRepublishCorrelati
4250
throw wrappedException;
4351
}
4452

45-
producer.send(xmlMessage, queue);
53+
try {
54+
producer.send(xmlMessage, queue);
55+
} catch (JCSMPException e) {
56+
if (e instanceof StaleSessionException
57+
|| e instanceof JCSMPTransportException
58+
|| e instanceof ClosedFacilityException
59+
|| producer.isClosed()) {
60+
LOGGER.debug("Detected stale shared JCSMP producer while sending to error queue {} (cause: {}); " +
61+
"recreating for next attempt",
62+
errorQueueName, e.getClass().getSimpleName());
63+
try {
64+
producerManager.forceRecreate(producer);
65+
} catch (Exception recreateError) {
66+
LOGGER.warn("Failed to recreate shared JCSMP producer after stale-flow detection", recreateError);
67+
e.addSuppressed(recreateError);
68+
}
69+
}
70+
throw e;
71+
}
4672
}
4773

4874
public ErrorQueueRepublishCorrelationKey createCorrelationKey(MessageContainer messageContainer,

solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/util/SharedResourceManager.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,32 @@ public T get(String key) throws Exception {
4343
return sharedResource;
4444
}
4545

46+
/**
47+
* Compare-and-swap the shared resource. If the manager still holds {@code expected},
48+
* close it and {@link #create()} a fresh one; otherwise return the currently-installed
49+
* resource without re-creating.
50+
*
51+
* @param expected the reference the caller observed and considers no longer usable
52+
* @return the resource currently installed in the manager
53+
* @throws Exception whatever {@link #create()} may throw
54+
*/
55+
public T forceRecreate(T expected) throws Exception {
56+
synchronized (lock) {
57+
if (sharedResource != expected) {
58+
return sharedResource;
59+
}
60+
if (sharedResource != null) {
61+
try {
62+
close();
63+
} catch (Exception e) {
64+
LOGGER.debug("Failed to close current {} during forceRecreate", type, e);
65+
}
66+
}
67+
sharedResource = create();
68+
return sharedResource;
69+
}
70+
}
71+
4672
/**
4773
* De-register {@code key} from the shared resource.
4874
* <p>If this is the last {@code key} associated to the shared resource, {@link #close()} the resource.

0 commit comments

Comments
 (0)