Skip to content

Commit 0106ea8

Browse files
Nepherysunil-solaceclaude
authored
DATAGO-134580: Recreate JCSMP producer on unsolicited CloseFlow (#484)
* DATAGO-134580: Recreate JCSMP producer on unsolicited CloseFlow When the broker fans out an unsolicited CloseFlow on a publisher flow (message-spool maintenance, DR failover, "503: Service Unavailable" on GD), JCSMP terminally closes the per-binding XMLMessageProducer. The JCSMP session stays connected, but every subsequent producer.send(...) throws StaleSessionException until the application is restarted - customer-reported in DATAGO-132760. JCSMPOutboundMessageHandler now detects StaleSessionException and ClosedFacilityException in the send-path catch block, arms a volatile producerNeedsRecreation flag, and at the top of the next handleMessage rebuilds the producer (and the TransactedSession when configured) under a double-checked-lock before publishing. Recreation failures are routed through the existing error channel and leave the flag armed so the next inbound message retries. The flag is reset on start() and closeResources() to keep stop/start lifecycles clean. Tests: - Unit (JCSMPOutboundMessageHandlerTest): parameterized stale-recovery across plain and transacted sessions, ClosedFacilityException variant, recreation-failure-then-retry, and lifecycle-driven flag reset. - Broker IT (JCSMPProducerCloseFlowRecoveryIT, new): five scenarios against PubSubPlusExtension - three controls that document broker disruptions which do NOT reproduce the bug (spool-quota toggle on persistent topic, direct topic, queue ingress/egress toggle), the customer-reported reproducer driven via the broker's CLI (hardware message-spool shutdown over docker exec with TTY for confirmation prompts), and a repeated-cycles variant that proves the recreate path resets cleanly across consecutive stale-flow events on the same binding. * DATAGO-134580: Address PR review feedback Five round-2 fixes from PR #141 review (Copilot): 1. Broaden the recreate trigger in JCSMPOutboundMessageHandler to also arm the producerNeedsRecreation flag on JCSMPTransportException. The broker IT explicitly documents that an unsolicited CloseFlow can surface on send(...) as either StaleSessionException (the typical form) or the raw JCSMPTransportException (when the CloseFlow event reaches send() before JCSMP's stale-marker has propagated); both are now treated as recreation triggers. New unit test test_producerRecreatedAfterJCSMPTransportException covers the new arm. 2. Fix BrokerConfiguratorBuilder.disableMsgSpoolForVpn Javadoc which incorrectly claimed zeroing maxMsgSpoolUsage tears down already-bound publisher / consumer flows. The IT documents the opposite: the broker keeps existing flows alive and only NACKs new GD publishes. Javadoc now matches the empirical behaviour and points at IT test 1. 3. JCSMPProducerCloseFlowRecoveryIT comment-vs-code mismatch: the "TWO sequential confirmation prompts" comment was inaccurate - the `shutdown` command commits after a single `y` (the second prompt visible in the broker's TTY output is decorative; the broker accepts our `y` on the first answerable prompt). Comment realigned to a single confirmation. 4. runSolaceCliCommands now checks the boolean from awaitCompletion(...) and throws an IllegalStateException with the full script and captured stdout/stderr if the docker exec does not complete in time, instead of silently letting downstream assertions fail with misleading timeouts. The Frame callback was extended to buffer the CLI output for inclusion in the diagnostic. 5. CLI script terminator extended from `end\nexit\n` to `end\nexit\nexit\n`. The single `exit` only popped from privileged-exec `#` to user-exec `>` and then sat there waiting for input; cli -A never terminated and awaitCompletion returned false. The second `exit` closes the user-exec session so the cli -A process actually exits and awaitCompletion can observe it. Without this, the new defensive throw in (4) would fire on every CLI invocation and turn the IT into a hard failure. All 411 binder-core unit tests remain green; all 5 broker IT scenarios turn green again with these changes. * DATAGO-134580: Apply review feedback on outbound handler Five changes from mayur-solace's PR #141 review, all in JCSMPOutboundMessageHandler: - M1 / producer.isClosed() defensive check: stale-detection in the send-path catch now also arms the recreation flag when producer.isClosed() returns true. Guards against any future JCSMP exception subclass we don't enumerate explicitly - if JCSMP has marked the producer closed, recreation is always the right move. - M2 / single-line if braces: all single-line if guards in recreateProducerIfNeeded() are now braced for consistency with house style. - M3 / field rename: producerNeedsRecreation -> recreateProducer. Test Javadocs and assertion messages updated to match. - M4 / catch-block ordering: stale-detection moved to the top of the catch in handleMessage(), before the transactional rollback handling. Behaviour is unchanged (the flag is consumed by the next handleMessage, not by anything in this catch) but the structure now reads as "establish facts about the producer first, then handle transactional bookkeeping". - M5 / comment cleanup: stripped the verbose DATAGO-134580 comments and the Javadocs I added on createProducerInternal and recreateProducerIfNeeded. Method names are self-documenting; the ticket reference now lives on a single line above the field declaration, matching the reviewer's suggestion. 411 binder-core unit tests remain green (no behavioural change from M1-M5; only one new code path is M1's isClosed() OR-arm). * DATAGO-134580: Proactively recreate producer on isClosed() pre-check Extend the recreate-on-stale guard with a proactive producer.isClosed() pre-check at the top of handleMessage(...). The reactive (catch-block) detection stays as-is and continues to cover the race window where the broker tears down the producer between our pre-check and the actual send(...) call, plus any future JCSMP exception subclass we don't enumerate. Customer-visible effect: the first publish after the broker fans out an unsolicited CloseFlow now succeeds rather than being surfaced to the error channel. Previously the reactive-only path always lost that first message - the catch armed the flag and the next message recovered, but the in-flight message that hit the dead producer always failed. Unit test test_producerRecreatedProactivelyWhenIsClosedDetectedBeforeSend mocks a producer that reports isClosed() == true on the first handleMessage, with no exception thrown by send, and verifies: - recreation happens (createProducer called twice: once at start, once proactively) - the fresh producer services the publish - the closed original is never sent through (Mockito.never()) - the closed original is closed during the recreate Integration test updates: the bug-witness assertions in tests 4 and 5 of JCSMPProducerCloseFlowRecoveryIT previously expected the first post-shutdown publish to throw a JCSMP-rooted MessagingException - that was the master-branch failure mode they documented. With the proactive check, the handler intercepts the dead producer before send(...) is ever called, so the first publish now succeeds transparently. The bug- witness phase is replaced with a doesNotThrowAnyException assertion on the first publish, plus a steady-state assertion on a follow-up publish. The new shape is strictly stricter than the old: a regression that removed the proactive check would fail because reactive-only recovery surfaces an exception on that first attempt; a regression that removed both halves would fail outright; a regression on cycle 2+ in test 5 would fail on the second cycle. Test-class Javadoc, per-test headers, and the test list have all been updated to drop the "reproducer"/"bug witness" framing - the bug doesn't manifest as a publish failure anymore, the IT now characterises the recovery contract instead. 411 binder-core unit tests still green (75 in JCSMPOutboundMessageHandlerTest, +1 from previous commit for the new proactive test). * DATAGO-134580: Recover error-queue producer from unsolicited CloseFlow The reactive + proactive recreate-on-stale logic added in PR #141 (commits 931f09c..134e7ef) protects each binding's per-binding XMLMessageProducer in JCSMPOutboundMessageHandler. The error-queue republish path in ErrorQueueInfrastructure has the same exposure but on a different producer: it borrows the session-default producer from JCSMPSessionProducerManager via producerManager.get(producerKey) and historically had no recovery logic when the broker tore that producer down via unsolicited CloseFlow. Failure mode without this fix: when the broker fans out CloseFlow (message-spool maintenance, DR failover, "503: Service Unavailable"), the shared session-default producer is marked closed by JCSMP. Every subsequent error-queue republish in ErrorQueueInfrastructure.send() throws StaleSessionException / JCSMPTransportException / ClosedFacilityException; ErrorQueueRepublishCorrelationKey.handleError() catches at the message-retry level and re-attempts up to maxDeliveryAttempts - all attempts re-using the same dead producer reference, all doomed to fail. After max attempts the message is re-queued onto the original consumer queue, the consumer redelivers it, fails again, hits the error-queue path again, fails again. Net effect: failed-consumer messages disappear from the system after a DR failover or spool maintenance event. The fix mirrors the outbound-handler approach: - Proactive: at the top of send(), after producerManager.get(...), check producer.isClosed(). If true, call the new producerManager.forceRecreate() to rebuild the shared producer before send is attempted. - Reactive: wrap producer.send(...) in a try-catch. On StaleSessionException, JCSMPTransportException, ClosedFacilityException, or post-failure producer.isClosed(), call forceRecreate() so the next ErrorQueueRepublishCorrelationKey retry-loop iteration picks up a fresh producer. The original exception still propagates so the retry caller can do its errorQueueDeliveryAttempt++ bookkeeping. The shared producer is reference-counted across the entire session (JCSMPOutboundMessageHandler also registers itself for ref-count purposes even though it uses its own per-binding producer for sends). release() + get() does NOT work as a recovery primitive in production because it only closes the resource when registeredIds.size() <= 1 - in any deployment with at least one outbound binding, the ref-count stays > 1 and release() leaves the dead resource in place. The new forceRecreate() in SharedResourceManager sidesteps the ref-count: it unconditionally closes the current resource and create()s a new one under the existing lock, leaving registrations intact so every already-registered caller picks up the fresh resource on their next get(). Added as a generic method on SharedResourceManager since the recovery contract is independent of the JCSMP specifics. Tests (ErrorQueueInfrastructureTest, new): - test_errorQueueProducerRecreatedProactivelyOnIsClosed: closed producer detected before send -> forceRecreate -> fresh producer services the publish; stale producer never sent through (Mockito.never()). - test_errorQueueProducerRecreatedReactivelyOnStaleSendException: @CartesianTest over Stale / JCSMPTransport / ClosedFacility - verifies all three exception types trigger forceRecreate AND propagate to the retry caller (so handleError can drive its loop). - test_errorQueueProducerNotRecreatedOnUnrelatedJCSMPException: negative control - a non-stale JCSMPException (e.g. malformed message) propagates normally and does NOT churn the shared producer, guarding against an over-broad reactive arm. 417 binder-core unit tests green (was 411 + 6 new from this commit). This branch is layered on DATAGO-134580 (PR #141) so the new SharedResourceManager.forceRecreate() and the ErrorQueueInfrastructure changes can be reviewed alongside the related outbound-handler work. * DATAGO-134580: Apply CAS semantics to forceRecreate + generic docs Three PR #142 review items: C1 (Copilot) + C3 (mayur-solace) - race in forceRecreate(). The original unconditional implementation could have two callers both observe the same stale shared resource, both enter forceRecreate(), and have the second caller close a healthy replacement that the first caller just installed. Fix: compare-and-swap. forceRecreate now takes an `expected` argument - the reference the caller observed. Under the lock, the manager recreates only if `sharedResource == expected`; otherwise it returns whatever a concurrent caller already installed without closing or re-creating anything. The caller-visible contract is now: pass what you observed, use what's returned. C2 (mayur-solace) - Javadoc on SharedResourceManager.forceRecreate referenced the broker / CloseFlow concern specifically. Since SharedResourceManager is generic and could host non-broker resources in the future, the docs are rewritten to describe the CAS contract generically without naming the JCSMP/broker context. ErrorQueueInfrastructure.send() updated at both call sites to pass the observed producer reference and use the value returned by forceRecreate (which may be the fresh one we requested, or the already-installed replacement another caller put in place). New unit test testErrorQueueProducerUsesManagerReturnedReferenceAfterForceRecreate simulates the exact race C1 flagged: stale producer observed, manager's CAS returns an already-installed replacement, send must use the replacement rather than the locally-observed stale one. Existing tests updated to pass the observed reference and verify CAS arguments. Also aligned the test method names to drop the test_ underscore form, matching the no-underscore convention used elsewhere in the binder-core test suite (e.g. SolaceErrorMessageHandlerTest). 418 binder-core unit tests green (was 417 + 1 new CAS-race test). * DATAGO-134580: Trim forceRecreate Javadoc to essentials Per PR #142 follow-up: the previous Javadoc (24 lines, two paragraphs of explanation) was verbose for an IDE hover. Reduced to a single sentence describing the CAS contract plus the standard param/return/throws. * DATAGO-134580: Trim verbose test docs and comments Collapse multi-paragraph Javadoc and inline narrative on the DATAGO-134580 test additions to one-line summaries; drop the two unused imports that the trimmed Javadoc had been carrying. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * DATAGO-134580: Apply IT review feedback (PR #141) - Rename test methods to repo's testCamelCase convention. - Hoist BrokerConfigurator, vpnName, original spool quota, and cleanup of producer binding / queue deprovision / spool restore into @beforeeach + @AfterEach so each test focuses on the scenario under test. - Extract createPersistentProducerChannel helper for the shared binder.bindProducer setup. - Drop the two SEMP-polling helpers (awaitVpnMaxMsgSpoolUsage, awaitQueueIngressEgress); SEMPv2 calls are synchronous so the toggles are committed by the time the call returns. Keep awaitBrokerSempResponsive for the CLI shutdown path where the broker management surface itself stutters. - Collapse BrokerConfiguratorBuilder's disable/restore spool pair into setMsgVpnSpool, and add getMsgVpnSpool accessor. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * DATAGO-134580: Rename new unit tests to testCamelCase (PR #141) Align the six DATAGO-134580 tests in JCSMPOutboundMessageHandlerTest with the repo's dominant testCamelCase convention. Pre-existing test_xxx_yyy methods in the same file are not part of this PR and are left alone. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * DATAGO-134580: Apply PR review feedback and harden CLI-shutdown ITs Outbound handler / error-queue (PR #475 review): - Drop orphan section comment above ErrorQueueInfrastructure.send; the recreate logic in the catch arm is self-documenting. - Rename recreateLock -> lifecycleLock and take it inside start(), stop(), and recreateProducerIfNeeded() so start/stop no longer race the recreate path. - Collapse recreateProducerIfNeeded body to closeResources() + producerManager.get(id) + createProducerInternal(). Keep the recreateProducer flag re-armed on failure so the next-message retry contract still holds. - Demote the hot-path "Detected stale ..." / "Recreating ..." logs from warn to debug in both the outbound handler and the error queue path to avoid flooding user logs on the message path. Integration test robustness: - findSolaceContainerId now matches the broker container by the SMF host port the JCSMP session is connected to, parsed from JCSMPProperties.HOST. Filtering by image name alone was unsafe with Ryuk disabled or with leftover containers across runs, which silently sent CLI commands to the wrong broker and made the recovery test pass vacuously. - After 'no shutdown', wait on session.isCapable(PUB_GUARANTEED) via Awaitility before the recovery publish. SEMP being responsive is not a proxy for JCSMP's cached capability set, which the per-binding createProducer call checks; without this wait the publish races the capability refresh and intermittently fails with "Router does not support guaranteed publisher flows". Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * DATAGO-134580: Fold get/cleanup into createProducerInternal (PR #475) Apply latest Nephery review feedback on the outbound handler: - Move producerManager.get(id) into createProducerInternal so callers no longer have to remember the prelude. - Move the catch (with closeResources + RuntimeException wrap) into createProducerInternal so cleanup occurs whenever creation fails, regardless of which caller invoked it. - Add defensive synchronized(lifecycleLock) inside createProducerInternal and closeResources to keep them safe if future callers don't already hold the lock (intrinsic locks are reentrant, so current call sites are unaffected). - Drop now-unreachable null checks on the producer field in the handleMessage catch arm and recreateProducerIfNeeded pre-checks; once the isRunning gate has passed, producer is guaranteed non-null and is never re-nulled. The null check in closeResources stays because it can be called from start()'s catch where producer is still unset. - Knock-on: start() outer try/catch now only wraps the header check; recreateProducerIfNeeded no longer needs the inline get(id). - One unit test updated: the underlying JCSMPException is now the ROOT cause (wrapped once by createProducerInternal's RuntimeException), so testProducerRecreationFailurePropagatesAndRetriesNext switches hasCauseInstanceOf -> hasRootCauseInstanceOf. All 75 outbound-handler unit tests pass. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * DATAGO-134580: Collapse stale-flow tests + add transacted Cartesians (PR #475) Apply Nephery review feedback and round out test coverage: - Merge testProducerRecreatedAfterClosedFacilityException and testProducerRecreatedAfterJCSMPTransportException into testProducerRecreatedAfterUnsolicitedCloseFlow as a Cartesian parameter (transacted x exceptionType). 4 tests -> 6 instances, same coverage, less duplication. - Drop the class-level Javadoc on ErrorQueueInfrastructureTest; test names are self-documenting. - Add `transacted` as a Cartesian dimension to the three remaining DATAGO-134580 outbound tests that previously only exercised the non-transacted path: * testProducerRecreatedProactivelyWhenIsClosedDetectedBeforeSend * testProducerRecreationFailurePropagatesAndRetriesNext * testRecreationFlagResetAcrossStopStartCycle This catches transacted-specific regressions in the proactive pre-check, the recreate-failure-then-retry path, and the stop/start flag-reset semantic. All 80 JCSMPOutboundMessageHandlerTest cases and 6 ErrorQueueInfrastructureTest cases pass. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * DATAGO-134580: Expose JCSMP cause on producer-recovery failure Carries over the equivalent fixes that landed on the 4.11.1 backport branch (PR #483 squashed into 383af5c on stage-4.11.1, including review-cleanup commits 9f340b1 and 09059d1): 1. Move recreateProducerIfNeeded(correlationKey) from immediately after the isRunning() guard to just before the send-iteration try block. Same structural placement as the backport, so future cross-branch work stays mechanical. The rawMessages re-anchoring rationale that prompted the move on 4.11.1 doesn't apply here (master removed SourceData via #459), but keeping the shape identical avoids drift. 2. In recreateProducerIfNeeded()'s catch, narrow to RuntimeException and unwrap createProducerInternal()'s wrapper inline so the underlying JCSMPException (typically ClosedFacilityException) reaches the error channel as the direct .cause(). start()'s Lifecycle wrapping is unchanged. 3. Simplify testProducerErrorChannel's transacted/non-transacted suppressed assertion to a direct .hasNoSuppressedExceptions() on the cause chain. The new pre-send recovery path closes the transacted session cleanly via closeResources() instead of attempting a rollback on a known-dead session, so there is no longer a rollback failure to capture as a suppressed exception. The mid-flight catch-block rollback path is unaffected. Test results: testProducerErrorChannel 0/8 -> 8/8 passing; JCSMPProducerCloseFlowRecoveryIT remains 5/5. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> --------- Co-authored-by: sunil-solace <sunil.singh@solace.com> Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com> Co-authored-by: Sunil Singh <102377028+sunil-solace@users.noreply.github.com>
1 parent 1d43e63 commit 0106ea8

9 files changed

Lines changed: 979 additions & 52 deletions

File tree

.gitignore

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,7 @@
77
**/.flattened-pom.xml
88

99
# Release files
10-
release.properties
10+
release.properties
11+
12+
# Local toolchain / IDE files
13+
.java-version

solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/outbound/JCSMPOutboundMessageHandler.java

Lines changed: 105 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,14 @@
1414
import com.solace.spring.cloud.stream.binder.util.JCSMPSessionProducerManager.CloudStreamEventHandler;
1515
import com.solace.spring.cloud.stream.binder.util.StaticMessageHeaderMapAccessor;
1616
import com.solace.spring.cloud.stream.binder.util.XMLMessageMapper;
17+
import com.solacesystems.jcsmp.ClosedFacilityException;
1718
import com.solacesystems.jcsmp.Destination;
1819
import com.solacesystems.jcsmp.JCSMPException;
1920
import com.solacesystems.jcsmp.JCSMPFactory;
2021
import com.solacesystems.jcsmp.JCSMPSession;
2122
import com.solacesystems.jcsmp.JCSMPStreamingPublishCorrelatingEventHandler;
23+
import com.solacesystems.jcsmp.JCSMPTransportException;
24+
import com.solacesystems.jcsmp.StaleSessionException;
2225
import com.solacesystems.jcsmp.Topic;
2326
import com.solacesystems.jcsmp.XMLMessage;
2427
import com.solacesystems.jcsmp.XMLMessageProducer;
@@ -64,6 +67,10 @@ public final class JCSMPOutboundMessageHandler implements MessageHandler, Lifecy
6467
private boolean isRunning = false;
6568
private ErrorMessageStrategy errorMessageStrategy;
6669

70+
// DATAGO-134580: recreate JCSMP producer on unsolicited termination from Solace broker.
71+
private volatile boolean recreateProducer = false;
72+
private final Object lifecycleLock = new Object();
73+
6774
private static final Logger LOGGER = LoggerFactory.getLogger(JCSMPOutboundMessageHandler.class);
6875

6976
public JCSMPOutboundMessageHandler(ProducerDestination destination,
@@ -141,6 +148,8 @@ public void handleMessage(@NonNull Message<?> message) throws MessagingException
141148
dynamicDestinations = Collections.singletonList(getDynamicDestination(message.getHeaders(), correlationKey));
142149
}
143150

151+
recreateProducerIfNeeded(correlationKey);
152+
144153
try {
145154
for (int i = 0; i < smfMessages.size(); i++) {
146155
XMLMessage smfMessage = smfMessages.get(i);
@@ -163,6 +172,18 @@ public void handleMessage(@NonNull Message<?> message) throws MessagingException
163172
producerEventHandler.responseReceivedEx(correlationKey);
164173
}
165174
} catch (JCSMPException e) {
175+
if (e instanceof StaleSessionException
176+
|| e instanceof JCSMPTransportException
177+
|| e instanceof ClosedFacilityException
178+
|| producer.isClosed()) {
179+
if (!recreateProducer) {
180+
LOGGER.debug("Detected stale JCSMP producer for binding {} (cause: {}); will " +
181+
"recreate on next message <message handler ID: {}>",
182+
properties.getBindingName(), e.getClass().getSimpleName(), id);
183+
}
184+
recreateProducer = true;
185+
}
186+
166187
if (transactedSession != null) {
167188
try {
168189
if (!(e instanceof RollbackException)) {
@@ -227,62 +248,106 @@ private Destination getDynamicDestination(Map<String, Object> headers, ErrorChan
227248
@Override
228249
public void start() {
229250
LOGGER.info("Creating producer to {} {} <message handler ID: {}>", configDestinationType, configDestination.getName(), id);
230-
if (isRunning()) {
231-
LOGGER.warn("Nothing to do, message handler {} is already running", id);
232-
return;
251+
synchronized (lifecycleLock) {
252+
if (isRunning()) {
253+
LOGGER.warn("Nothing to do, message handler {} is already running", id);
254+
return;
255+
}
256+
recreateProducer = false;
257+
258+
try {
259+
Map<String, String> headerNameMapping = properties.getExtension().getHeaderNameMapping();
260+
if (headerNameMapping != null && !headerNameMapping.isEmpty()) {
261+
Set<String> uniqueTargetHeaderNames = new HashSet<>(headerNameMapping.values());
262+
if (uniqueTargetHeaderNames.size() < headerNameMapping.size()) {
263+
IllegalArgumentException exception = new IllegalArgumentException(String.format(
264+
"Two or more headers map to the same header name in headerNameMapping %s <outbound adapter %s>",
265+
properties.getExtension().getHeaderNameMapping(), id));
266+
LOGGER.warn(exception.getMessage());
267+
throw exception;
268+
}
269+
}
270+
} catch (Exception e) {
271+
String msg = String.format("Unable to get a message producer for session %s", jcsmpSession.getSessionName());
272+
LOGGER.warn(msg, e);
273+
throw new RuntimeException(msg, e);
274+
}
275+
276+
createProducerInternal();
277+
isRunning = true;
233278
}
279+
}
234280

235-
try {
236-
Map<String, String> headerNameMapping = properties.getExtension().getHeaderNameMapping();
237-
if (headerNameMapping != null && !headerNameMapping.isEmpty()) {
238-
Set<String> uniqueTargetHeaderNames = new HashSet<>(headerNameMapping.values());
239-
if (uniqueTargetHeaderNames.size() < headerNameMapping.size()) {
240-
IllegalArgumentException exception = new IllegalArgumentException(String.format(
241-
"Two or more headers map to the same header name in headerNameMapping %s <outbound adapter %s>",
242-
properties.getExtension().getHeaderNameMapping(), id));
243-
LOGGER.warn(exception.getMessage());
244-
throw exception;
281+
private void createProducerInternal() {
282+
synchronized (lifecycleLock) {
283+
try {
284+
producerManager.get(id);
285+
if (properties.getExtension().isTransacted()) {
286+
LOGGER.info("Creating transacted session <message handler ID: {}>", id);
287+
transactedSession = jcsmpSession.createTransactedSession();
288+
producer = transactedSession.createProducer(SolaceProvisioningUtil.getProducerFlowProperties(jcsmpSession),
289+
producerEventHandler);
290+
} else {
291+
producer = jcsmpSession.createProducer(SolaceProvisioningUtil.getProducerFlowProperties(jcsmpSession),
292+
producerEventHandler);
245293
}
294+
} catch (Exception e) {
295+
String msg = String.format("Unable to get a message producer for session %s", jcsmpSession.getSessionName());
296+
LOGGER.warn(msg, e);
297+
closeResources();
298+
throw new RuntimeException(msg, e);
246299
}
300+
}
301+
}
247302

248-
producerManager.get(id);
249-
if (properties.getExtension().isTransacted()) {
250-
LOGGER.info("Creating transacted session <message handler ID: {}>", id);
251-
transactedSession = jcsmpSession.createTransactedSession();
252-
producer = transactedSession.createProducer(SolaceProvisioningUtil.getProducerFlowProperties(jcsmpSession),
253-
producerEventHandler);
254-
} else {
255-
producer = jcsmpSession.createProducer(SolaceProvisioningUtil.getProducerFlowProperties(jcsmpSession),
256-
producerEventHandler);
303+
private void recreateProducerIfNeeded(ErrorChannelSendingCorrelationKey correlationKey) throws MessagingException {
304+
if (!recreateProducer && !producer.isClosed()) {
305+
return;
306+
}
307+
synchronized (lifecycleLock) {
308+
if (!recreateProducer && !producer.isClosed()) {
309+
return;
257310
}
258-
} catch (Exception e) {
259-
String msg = String.format("Unable to get a message producer for session %s", jcsmpSession.getSessionName());
260-
LOGGER.warn(msg, e);
311+
LOGGER.debug("Recreating JCSMP producer for binding {} after stale-flow detection <message handler ID: {}>",
312+
properties.getBindingName(), id);
261313
closeResources();
262-
throw new RuntimeException(msg, e);
314+
try {
315+
createProducerInternal();
316+
recreateProducer = false;
317+
} catch (RuntimeException createError) {
318+
recreateProducer = true;
319+
// unwrap createProducerInternal()'s wrapper exception if necessary
320+
Exception toReport = (createError.getCause() instanceof Exception unwrapped)
321+
? unwrapped : createError;
322+
throw handleMessagingException(correlationKey,
323+
"Failed to recreate JCSMP producer after stale-flow detection", toReport);
324+
}
263325
}
264-
265-
isRunning = true;
266326
}
267327

268328
@Override
269329
public void stop() {
270-
if (!isRunning()) return;
271-
closeResources();
272-
isRunning = false;
330+
synchronized (lifecycleLock) {
331+
if (!isRunning()) return;
332+
closeResources();
333+
isRunning = false;
334+
}
273335
}
274336

275337
private void closeResources() {
276-
LOGGER.info("Stopping producer to {} {} <message handler ID: {}>", configDestinationType, configDestination.getName(), id);
277-
if (producer != null) {
278-
LOGGER.info("Closing producer <message handler ID: {}>", id);
279-
producer.close();
280-
}
281-
if (transactedSession != null) {
282-
LOGGER.info("Closing transacted session <message handler ID: {}>", id);
283-
transactedSession.close();
338+
synchronized (lifecycleLock) {
339+
LOGGER.info("Stopping producer to {} {} <message handler ID: {}>", configDestinationType, configDestination.getName(), id);
340+
recreateProducer = false;
341+
if (producer != null) {
342+
LOGGER.info("Closing producer <message handler ID: {}>", id);
343+
producer.close();
344+
}
345+
if (transactedSession != null) {
346+
LOGGER.info("Closing transacted session <message handler ID: {}>", id);
347+
transactedSession.close();
348+
}
349+
producerManager.release(id);
284350
}
285-
producerManager.release(id);
286351
}
287352

288353
@Override

solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/util/ErrorQueueInfrastructure.java

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
package com.solace.spring.cloud.stream.binder.util;
22

33
import com.solace.spring.cloud.stream.binder.properties.SolaceConsumerProperties;
4+
import com.solacesystems.jcsmp.ClosedFacilityException;
45
import com.solacesystems.jcsmp.JCSMPException;
56
import com.solacesystems.jcsmp.JCSMPFactory;
7+
import com.solacesystems.jcsmp.JCSMPTransportException;
68
import com.solacesystems.jcsmp.Queue;
9+
import com.solacesystems.jcsmp.StaleSessionException;
710
import com.solacesystems.jcsmp.XMLMessage;
811
import com.solacesystems.jcsmp.XMLMessageProducer;
912
import org.slf4j.Logger;
@@ -34,6 +37,11 @@ public void send(MessageContainer messageContainer, ErrorQueueRepublishCorrelati
3437
XMLMessageProducer producer;
3538
try {
3639
producer = producerManager.get(producerKey);
40+
if (producer.isClosed()) {
41+
LOGGER.warn("Detected closed shared JCSMP producer before sending to error queue {}; recreating",
42+
errorQueueName);
43+
producer = producerManager.forceRecreate(producer);
44+
}
3745
} catch (Exception e) {
3846
MessagingException wrappedException = new MessagingException(
3947
String.format("Failed to get producer to send message %s to queue %s", xmlMessage.getMessageId(),
@@ -42,7 +50,25 @@ public void send(MessageContainer messageContainer, ErrorQueueRepublishCorrelati
4250
throw wrappedException;
4351
}
4452

45-
producer.send(xmlMessage, queue);
53+
try {
54+
producer.send(xmlMessage, queue);
55+
} catch (JCSMPException e) {
56+
if (e instanceof StaleSessionException
57+
|| e instanceof JCSMPTransportException
58+
|| e instanceof ClosedFacilityException
59+
|| producer.isClosed()) {
60+
LOGGER.debug("Detected stale shared JCSMP producer while sending to error queue {} (cause: {}); " +
61+
"recreating for next attempt",
62+
errorQueueName, e.getClass().getSimpleName());
63+
try {
64+
producerManager.forceRecreate(producer);
65+
} catch (Exception recreateError) {
66+
LOGGER.warn("Failed to recreate shared JCSMP producer after stale-flow detection", recreateError);
67+
e.addSuppressed(recreateError);
68+
}
69+
}
70+
throw e;
71+
}
4672
}
4773

4874
public ErrorQueueRepublishCorrelationKey createCorrelationKey(MessageContainer messageContainer,

solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/util/SharedResourceManager.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,32 @@ public T get(String key) throws Exception {
4343
return sharedResource;
4444
}
4545

46+
/**
47+
* Compare-and-swap the shared resource. If the manager still holds {@code expected},
48+
* close it and {@link #create()} a fresh one; otherwise return the currently-installed
49+
* resource without re-creating.
50+
*
51+
* @param expected the reference the caller observed and considers no longer usable
52+
* @return the resource currently installed in the manager
53+
* @throws Exception whatever {@link #create()} may throw
54+
*/
55+
public T forceRecreate(T expected) throws Exception {
56+
synchronized (lock) {
57+
if (sharedResource != expected) {
58+
return sharedResource;
59+
}
60+
if (sharedResource != null) {
61+
try {
62+
close();
63+
} catch (Exception e) {
64+
LOGGER.debug("Failed to close current {} during forceRecreate", type, e);
65+
}
66+
}
67+
sharedResource = create();
68+
return sharedResource;
69+
}
70+
}
71+
4672
/**
4773
* De-register {@code key} from the shared resource.
4874
* <p>If this is the last {@code key} associated to the shared resource, {@link #close()} the resource.

0 commit comments

Comments
 (0)