DATAGO-134580: Recreate JCSMP producer on unsolicited CloseFlow#141
DATAGO-134580: Recreate JCSMP producer on unsolicited CloseFlow#141sunil-solace wants to merge 14 commits into
Conversation
When the broker fans out an unsolicited CloseFlow on a publisher flow (message-spool maintenance, DR failover, "503: Service Unavailable" on GD), JCSMP terminally closes the per-binding XMLMessageProducer. The JCSMP session stays connected, but every subsequent producer.send(...) throws StaleSessionException until the application is restarted - customer-reported in DATAGO-132760. JCSMPOutboundMessageHandler now detects StaleSessionException and ClosedFacilityException in the send-path catch block, arms a volatile producerNeedsRecreation flag, and at the top of the next handleMessage rebuilds the producer (and the TransactedSession when configured) under a double-checked-lock before publishing. Recreation failures are routed through the existing error channel and leave the flag armed so the next inbound message retries. The flag is reset on start() and closeResources() to keep stop/start lifecycles clean. Tests: - Unit (JCSMPOutboundMessageHandlerTest): parameterized stale-recovery across plain and transacted sessions, ClosedFacilityException variant, recreation-failure-then-retry, and lifecycle-driven flag reset. - Broker IT (JCSMPProducerCloseFlowRecoveryIT, new): five scenarios against PubSubPlusExtension - three controls that document broker disruptions which do NOT reproduce the bug (spool-quota toggle on persistent topic, direct topic, queue ingress/egress toggle), the customer-reported reproducer driven via the broker's CLI (hardware message-spool shutdown over docker exec with TTY for confirmation prompts), and a repeated-cycles variant that proves the recreate path resets cleanly across consecutive stale-flow events on the same binding.
There was a problem hiding this comment.
Pull request overview
Addresses DATAGO-134580 by making the outbound binder resilient to broker-driven unsolicited CloseFlow events that leave the per-binding XMLMessageProducer terminally closed (leading to repeated StaleSessionException until restart). The change adds lazy producer (and transacted-session) recreation on the next publish after stale detection, and introduces unit + broker integration tests to characterize and prevent regressions.
Changes:
- Add stale-producer detection in
JCSMPOutboundMessageHandlerand recreate the producer (andTransactedSessionwhen configured) on the nexthandleMessagecall using a double-checked lock. - Add unit tests covering stale/closed recovery, recreation failure + retry behavior, and lifecycle-driven flag reset.
- Add a new broker IT suite that documents negative controls and reproduces the customer scenario via broker CLI-driven message-spool shutdown/restore.
Reviewed changes
Copilot reviewed 4 out of 5 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/outbound/JCSMPOutboundMessageHandler.java | Implements stale detection and lazy producer/session recreation on subsequent messages. |
| solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/test/java/com/solace/spring/cloud/stream/binder/outbound/JCSMPOutboundMessageHandlerTest.java | Adds unit coverage for stale/closed recovery, recreation failure retries, and lifecycle reset. |
| solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/test/java/com/solace/spring/cloud/stream/binder/JCSMPProducerCloseFlowRecoveryIT.java | New broker integration tests (controls + reproducer) using docker-exec CLI to trigger CloseFlow conditions. |
| solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/test/java/com/solace/it/util/semp/config/BrokerConfiguratorBuilder.java | Adds VPN spool quota toggle helpers for ITs. |
| .gitignore | Ignores local .java-version file. |
Comments suppressed due to low confidence (1)
solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/test/java/com/solace/spring/cloud/stream/binder/JCSMPProducerCloseFlowRecoveryIT.java:538
- Same issue as the single-cycle test: the
shutdownCLI sequence is only followed by one"y", but the comment earlier indicates two confirmation prompts for message-spool shutdown. If two prompts are indeed present, this loop will hang or fail to actually shut down the spool on every cycle. Consider sending twoyresponses (or updating the earlier comment if only one prompt is expected).
runSolaceCliCommands(solaceContainerId,
"enable",
"configure",
"hardware",
"message-spool",
"shutdown",
"y");
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| "configure", | ||
| "hardware", | ||
| "message-spool", | ||
| "shutdown", |
Five round-2 fixes from PR #141 review (Copilot): 1. Broaden the recreate trigger in JCSMPOutboundMessageHandler to also arm the producerNeedsRecreation flag on JCSMPTransportException. The broker IT explicitly documents that an unsolicited CloseFlow can surface on send(...) as either StaleSessionException (the typical form) or the raw JCSMPTransportException (when the CloseFlow event reaches send() before JCSMP's stale-marker has propagated); both are now treated as recreation triggers. New unit test test_producerRecreatedAfterJCSMPTransportException covers the new arm. 2. Fix BrokerConfiguratorBuilder.disableMsgSpoolForVpn Javadoc which incorrectly claimed zeroing maxMsgSpoolUsage tears down already-bound publisher / consumer flows. The IT documents the opposite: the broker keeps existing flows alive and only NACKs new GD publishes. Javadoc now matches the empirical behaviour and points at IT test 1. 3. JCSMPProducerCloseFlowRecoveryIT comment-vs-code mismatch: the "TWO sequential confirmation prompts" comment was inaccurate - the `shutdown` command commits after a single `y` (the second prompt visible in the broker's TTY output is decorative; the broker accepts our `y` on the first answerable prompt). Comment realigned to a single confirmation. 4. runSolaceCliCommands now checks the boolean from awaitCompletion(...) and throws an IllegalStateException with the full script and captured stdout/stderr if the docker exec does not complete in time, instead of silently letting downstream assertions fail with misleading timeouts. The Frame callback was extended to buffer the CLI output for inclusion in the diagnostic. 5. CLI script terminator extended from `end\nexit\n` to `end\nexit\nexit\n`. The single `exit` only popped from privileged-exec `#` to user-exec `>` and then sat there waiting for input; cli -A never terminated and awaitCompletion returned false. The second `exit` closes the user-exec session so the cli -A process actually exits and awaitCompletion can observe it. Without this, the new defensive throw in (4) would fire on every CLI invocation and turn the IT into a hard failure. All 411 binder-core unit tests remain green; all 5 broker IT scenarios turn green again with these changes.
| boolean completed = docker.execStartCmd(exec.getId()) | ||
| .withTty(true) | ||
| .withStdIn(stdin) | ||
| .exec(callback) | ||
| .awaitCompletion(30, TimeUnit.SECONDS); | ||
|
|
||
| if (!completed) { | ||
| throw new IllegalStateException(String.format( | ||
| "Solace CLI exec did not complete within 30s in container '%s'. " + | ||
| "Script:%n%s%nCaptured output:%n%s", | ||
| containerId, script, capturedOutput)); |
Five changes from mayur-solace's PR #141 review, all in JCSMPOutboundMessageHandler: - M1 / producer.isClosed() defensive check: stale-detection in the send-path catch now also arms the recreation flag when producer.isClosed() returns true. Guards against any future JCSMP exception subclass we don't enumerate explicitly - if JCSMP has marked the producer closed, recreation is always the right move. - M2 / single-line if braces: all single-line if guards in recreateProducerIfNeeded() are now braced for consistency with house style. - M3 / field rename: producerNeedsRecreation -> recreateProducer. Test Javadocs and assertion messages updated to match. - M4 / catch-block ordering: stale-detection moved to the top of the catch in handleMessage(), before the transactional rollback handling. Behaviour is unchanged (the flag is consumed by the next handleMessage, not by anything in this catch) but the structure now reads as "establish facts about the producer first, then handle transactional bookkeeping". - M5 / comment cleanup: stripped the verbose DATAGO-134580 comments and the Javadocs I added on createProducerInternal and recreateProducerIfNeeded. Method names are self-documenting; the ticket reference now lives on a single line above the field declaration, matching the reviewer's suggestion. 411 binder-core unit tests remain green (no behavioural change from M1-M5; only one new code path is M1's isClosed() OR-arm).
Extend the recreate-on-stale guard with a proactive producer.isClosed() pre-check at the top of handleMessage(...). The reactive (catch-block) detection stays as-is and continues to cover the race window where the broker tears down the producer between our pre-check and the actual send(...) call, plus any future JCSMP exception subclass we don't enumerate. Customer-visible effect: the first publish after the broker fans out an unsolicited CloseFlow now succeeds rather than being surfaced to the error channel. Previously the reactive-only path always lost that first message - the catch armed the flag and the next message recovered, but the in-flight message that hit the dead producer always failed. Unit test test_producerRecreatedProactivelyWhenIsClosedDetectedBeforeSend mocks a producer that reports isClosed() == true on the first handleMessage, with no exception thrown by send, and verifies: - recreation happens (createProducer called twice: once at start, once proactively) - the fresh producer services the publish - the closed original is never sent through (Mockito.never()) - the closed original is closed during the recreate Integration test updates: the bug-witness assertions in tests 4 and 5 of JCSMPProducerCloseFlowRecoveryIT previously expected the first post-shutdown publish to throw a JCSMP-rooted MessagingException - that was the master-branch failure mode they documented. With the proactive check, the handler intercepts the dead producer before send(...) is ever called, so the first publish now succeeds transparently. The bug- witness phase is replaced with a doesNotThrowAnyException assertion on the first publish, plus a steady-state assertion on a follow-up publish. The new shape is strictly stricter than the old: a regression that removed the proactive check would fail because reactive-only recovery surfaces an exception on that first attempt; a regression that removed both halves would fail outright; a regression on cycle 2+ in test 5 would fail on the second cycle. Test-class Javadoc, per-test headers, and the test list have all been updated to drop the "reproducer"/"bug witness" framing - the bug doesn't manifest as a publish failure anymore, the IT now characterises the recovery contract instead. 411 binder-core unit tests still green (75 in JCSMPOutboundMessageHandlerTest, +1 from previous commit for the new proactive test).
The reactive + proactive recreate-on-stale logic added in PR #141 (commits 931f09c..134e7ef) protects each binding's per-binding XMLMessageProducer in JCSMPOutboundMessageHandler. The error-queue republish path in ErrorQueueInfrastructure has the same exposure but on a different producer: it borrows the session-default producer from JCSMPSessionProducerManager via producerManager.get(producerKey) and historically had no recovery logic when the broker tore that producer down via unsolicited CloseFlow. Failure mode without this fix: when the broker fans out CloseFlow (message-spool maintenance, DR failover, "503: Service Unavailable"), the shared session-default producer is marked closed by JCSMP. Every subsequent error-queue republish in ErrorQueueInfrastructure.send() throws StaleSessionException / JCSMPTransportException / ClosedFacilityException; ErrorQueueRepublishCorrelationKey.handleError() catches at the message-retry level and re-attempts up to maxDeliveryAttempts - all attempts re-using the same dead producer reference, all doomed to fail. After max attempts the message is re-queued onto the original consumer queue, the consumer redelivers it, fails again, hits the error-queue path again, fails again. Net effect: failed-consumer messages disappear from the system after a DR failover or spool maintenance event. The fix mirrors the outbound-handler approach: - Proactive: at the top of send(), after producerManager.get(...), check producer.isClosed(). If true, call the new producerManager.forceRecreate() to rebuild the shared producer before send is attempted. - Reactive: wrap producer.send(...) in a try-catch. On StaleSessionException, JCSMPTransportException, ClosedFacilityException, or post-failure producer.isClosed(), call forceRecreate() so the next ErrorQueueRepublishCorrelationKey retry-loop iteration picks up a fresh producer. The original exception still propagates so the retry caller can do its errorQueueDeliveryAttempt++ bookkeeping. The shared producer is reference-counted across the entire session (JCSMPOutboundMessageHandler also registers itself for ref-count purposes even though it uses its own per-binding producer for sends). release() + get() does NOT work as a recovery primitive in production because it only closes the resource when registeredIds.size() <= 1 - in any deployment with at least one outbound binding, the ref-count stays > 1 and release() leaves the dead resource in place. The new forceRecreate() in SharedResourceManager sidesteps the ref-count: it unconditionally closes the current resource and create()s a new one under the existing lock, leaving registrations intact so every already-registered caller picks up the fresh resource on their next get(). Added as a generic method on SharedResourceManager since the recovery contract is independent of the JCSMP specifics. Tests (ErrorQueueInfrastructureTest, new): - test_errorQueueProducerRecreatedProactivelyOnIsClosed: closed producer detected before send -> forceRecreate -> fresh producer services the publish; stale producer never sent through (Mockito.never()). - test_errorQueueProducerRecreatedReactivelyOnStaleSendException: @CartesianTest over Stale / JCSMPTransport / ClosedFacility - verifies all three exception types trigger forceRecreate AND propagate to the retry caller (so handleError can drive its loop). - test_errorQueueProducerNotRecreatedOnUnrelatedJCSMPException: negative control - a non-stale JCSMPException (e.g. malformed message) propagates normally and does NOT churn the shared producer, guarding against an over-broad reactive arm. 417 binder-core unit tests green (was 411 + 6 new from this commit). This branch is layered on DATAGO-134580 (PR #141) so the new SharedResourceManager.forceRecreate() and the ErrorQueueInfrastructure changes can be reviewed alongside the related outbound-handler work.
Three PR #142 review items: C1 (Copilot) + C3 (mayur-solace) - race in forceRecreate(). The original unconditional implementation could have two callers both observe the same stale shared resource, both enter forceRecreate(), and have the second caller close a healthy replacement that the first caller just installed. Fix: compare-and-swap. forceRecreate now takes an `expected` argument - the reference the caller observed. Under the lock, the manager recreates only if `sharedResource == expected`; otherwise it returns whatever a concurrent caller already installed without closing or re-creating anything. The caller-visible contract is now: pass what you observed, use what's returned. C2 (mayur-solace) - Javadoc on SharedResourceManager.forceRecreate referenced the broker / CloseFlow concern specifically. Since SharedResourceManager is generic and could host non-broker resources in the future, the docs are rewritten to describe the CAS contract generically without naming the JCSMP/broker context. ErrorQueueInfrastructure.send() updated at both call sites to pass the observed producer reference and use the value returned by forceRecreate (which may be the fresh one we requested, or the already-installed replacement another caller put in place). New unit test testErrorQueueProducerUsesManagerReturnedReferenceAfterForceRecreate simulates the exact race C1 flagged: stale producer observed, manager's CAS returns an already-installed replacement, send must use the replacement rather than the locally-observed stale one. Existing tests updated to pass the observed reference and verify CAS arguments. Also aligned the test method names to drop the test_ underscore form, matching the no-underscore convention used elsewhere in the binder-core test suite (e.g. SolaceErrorMessageHandlerTest). 418 binder-core unit tests green (was 417 + 1 new CAS-race test).
Per PR #142 follow-up: the previous Javadoc (24 lines, two paragraphs of explanation) was verbose for an IDE hover. Reduced to a single sentence describing the CAS contract plus the standard param/return/throws.
Collapse multi-paragraph Javadoc and inline narrative on the DATAGO-134580 test additions to one-line summaries; drop the two unused imports that the trimmed Javadoc had been carrying. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
- Rename test methods to repo's testCamelCase convention. - Hoist BrokerConfigurator, vpnName, original spool quota, and cleanup of producer binding / queue deprovision / spool restore into @beforeeach + @AfterEach so each test focuses on the scenario under test. - Extract createPersistentProducerChannel helper for the shared binder.bindProducer setup. - Drop the two SEMP-polling helpers (awaitVpnMaxMsgSpoolUsage, awaitQueueIngressEgress); SEMPv2 calls are synchronous so the toggles are committed by the time the call returns. Keep awaitBrokerSempResponsive for the CLI shutdown path where the broker management surface itself stutters. - Collapse BrokerConfiguratorBuilder's disable/restore spool pair into setMsgVpnSpool, and add getMsgVpnSpool accessor. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
DATAGO-134580: Recover error-queue producer from unsolicited CloseFlow
Align the six DATAGO-134580 tests in JCSMPOutboundMessageHandlerTest with the repo's dominant testCamelCase convention. Pre-existing test_xxx_yyy methods in the same file are not part of this PR and are left alone. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
|
@sunil-solace close this PR. and change the upstream to |
@Nephery this branch has 2 PRs one into SolaceDev for our internal review and another one SolaceProducts#475 which is into the SolaceProducts. So, we will eventually merge the 475 PR and this is only to make and address the PR comments. |
Outbound handler / error-queue (PR SolaceProducts#475 review): - Drop orphan section comment above ErrorQueueInfrastructure.send; the recreate logic in the catch arm is self-documenting. - Rename recreateLock -> lifecycleLock and take it inside start(), stop(), and recreateProducerIfNeeded() so start/stop no longer race the recreate path. - Collapse recreateProducerIfNeeded body to closeResources() + producerManager.get(id) + createProducerInternal(). Keep the recreateProducer flag re-armed on failure so the next-message retry contract still holds. - Demote the hot-path "Detected stale ..." / "Recreating ..." logs from warn to debug in both the outbound handler and the error queue path to avoid flooding user logs on the message path. Integration test robustness: - findSolaceContainerId now matches the broker container by the SMF host port the JCSMP session is connected to, parsed from JCSMPProperties.HOST. Filtering by image name alone was unsafe with Ryuk disabled or with leftover containers across runs, which silently sent CLI commands to the wrong broker and made the recovery test pass vacuously. - After 'no shutdown', wait on session.isCapable(PUB_GUARANTEED) via Awaitility before the recovery publish. SEMP being responsive is not a proxy for JCSMP's cached capability set, which the per-binding createProducer call checks; without this wait the publish races the capability refresh and intermittently fails with "Router does not support guaranteed publisher flows". Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…eProducts#475) Apply latest Nephery review feedback on the outbound handler: - Move producerManager.get(id) into createProducerInternal so callers no longer have to remember the prelude. - Move the catch (with closeResources + RuntimeException wrap) into createProducerInternal so cleanup occurs whenever creation fails, regardless of which caller invoked it. - Add defensive synchronized(lifecycleLock) inside createProducerInternal and closeResources to keep them safe if future callers don't already hold the lock (intrinsic locks are reentrant, so current call sites are unaffected). - Drop now-unreachable null checks on the producer field in the handleMessage catch arm and recreateProducerIfNeeded pre-checks; once the isRunning gate has passed, producer is guaranteed non-null and is never re-nulled. The null check in closeResources stays because it can be called from start()'s catch where producer is still unset. - Knock-on: start() outer try/catch now only wraps the header check; recreateProducerIfNeeded no longer needs the inline get(id). - One unit test updated: the underlying JCSMPException is now the ROOT cause (wrapped once by createProducerInternal's RuntimeException), so testProducerRecreationFailurePropagatesAndRetriesNext switches hasCauseInstanceOf -> hasRootCauseInstanceOf. All 75 outbound-handler unit tests pass. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…(PR SolaceProducts#475) Apply Nephery review feedback and round out test coverage: - Merge testProducerRecreatedAfterClosedFacilityException and testProducerRecreatedAfterJCSMPTransportException into testProducerRecreatedAfterUnsolicitedCloseFlow as a Cartesian parameter (transacted x exceptionType). 4 tests -> 6 instances, same coverage, less duplication. - Drop the class-level Javadoc on ErrorQueueInfrastructureTest; test names are self-documenting. - Add `transacted` as a Cartesian dimension to the three remaining DATAGO-134580 outbound tests that previously only exercised the non-transacted path: * testProducerRecreatedProactivelyWhenIsClosedDetectedBeforeSend * testProducerRecreationFailurePropagatesAndRetriesNext * testRecreationFlagResetAcrossStopStartCycle This catches transacted-specific regressions in the proactive pre-check, the recreate-failure-then-retry path, and the stop/start flag-reset semantic. All 80 JCSMPOutboundMessageHandlerTest cases and 6 ErrorQueueInfrastructureTest cases pass. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
When the broker fans out an unsolicited CloseFlow on a publisher flow (message-spool maintenance, DR failover, "503: Service Unavailable" on GD), JCSMP terminally closes the per-binding XMLMessageProducer. The JCSMP session stays connected, but every subsequent producer.send(...) throws StaleSessionException until the application is restarted - customer-reported in DATAGO-132760.
JCSMPOutboundMessageHandler now detects StaleSessionException and ClosedFacilityException in the send-path catch block, arms a volatile producerNeedsRecreation flag, and at the top of the next handleMessage rebuilds the producer (and the TransactedSession when configured) under a double-checked-lock before publishing. Recreation failures are routed through the existing error channel and leave the flag armed so the next inbound message retries. The flag is reset on start() and closeResources() to keep stop/start lifecycles clean.
Tests: