Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,7 @@
**/.flattened-pom.xml

# Release files
release.properties
release.properties

# Local toolchain / IDE files
.java-version
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,14 @@
import com.solace.spring.cloud.stream.binder.util.JCSMPSessionProducerManager.CloudStreamEventHandler;
import com.solace.spring.cloud.stream.binder.util.StaticMessageHeaderMapAccessor;
import com.solace.spring.cloud.stream.binder.util.XMLMessageMapper;
import com.solacesystems.jcsmp.ClosedFacilityException;
import com.solacesystems.jcsmp.Destination;
import com.solacesystems.jcsmp.JCSMPException;
import com.solacesystems.jcsmp.JCSMPFactory;
import com.solacesystems.jcsmp.JCSMPSession;
import com.solacesystems.jcsmp.JCSMPStreamingPublishCorrelatingEventHandler;
import com.solacesystems.jcsmp.JCSMPTransportException;
import com.solacesystems.jcsmp.StaleSessionException;
import com.solacesystems.jcsmp.Topic;
import com.solacesystems.jcsmp.XMLMessage;
import com.solacesystems.jcsmp.XMLMessageProducer;
Expand Down Expand Up @@ -64,6 +67,10 @@ public final class JCSMPOutboundMessageHandler implements MessageHandler, Lifecy
private boolean isRunning = false;
private ErrorMessageStrategy errorMessageStrategy;

// DATAGO-134580: recreate JCSMP producer on unsolicited termination from Solace broker.
private volatile boolean recreateProducer = false;
private final Object lifecycleLock = new Object();

private static final Logger LOGGER = LoggerFactory.getLogger(JCSMPOutboundMessageHandler.class);

public JCSMPOutboundMessageHandler(ProducerDestination destination,
Expand Down Expand Up @@ -95,6 +102,8 @@ public void handleMessage(@NonNull Message<?> message) throws MessagingException
throw handleMessagingException(correlationKey, msg0, new ClosedChannelBindingException(msg1));
}

recreateProducerIfNeeded(correlationKey);

try {
CorrelationData correlationData = message.getHeaders()
.get(SolaceBinderHeaders.CONFIRM_CORRELATION, CorrelationData.class);
Expand Down Expand Up @@ -163,6 +172,18 @@ public void handleMessage(@NonNull Message<?> message) throws MessagingException
producerEventHandler.responseReceivedEx(correlationKey);
}
} catch (JCSMPException e) {
if (e instanceof StaleSessionException
|| e instanceof JCSMPTransportException
|| e instanceof ClosedFacilityException
|| producer.isClosed()) {
if (!recreateProducer) {
LOGGER.debug("Detected stale JCSMP producer for binding {} (cause: {}); will " +
"recreate on next message <message handler ID: {}>",
properties.getBindingName(), e.getClass().getSimpleName(), id);
}
recreateProducer = true;
Comment thread
Nephery marked this conversation as resolved.
}

if (transactedSession != null) {
try {
if (!(e instanceof RollbackException)) {
Expand Down Expand Up @@ -227,62 +248,103 @@ private Destination getDynamicDestination(Map<String, Object> headers, ErrorChan
@Override
public void start() {
LOGGER.info("Creating producer to {} {} <message handler ID: {}>", configDestinationType, configDestination.getName(), id);
if (isRunning()) {
LOGGER.warn("Nothing to do, message handler {} is already running", id);
return;
synchronized (lifecycleLock) {
if (isRunning()) {
LOGGER.warn("Nothing to do, message handler {} is already running", id);
return;
}
recreateProducer = false;

try {
Map<String, String> headerNameMapping = properties.getExtension().getHeaderNameMapping();
if (headerNameMapping != null && !headerNameMapping.isEmpty()) {
Set<String> uniqueTargetHeaderNames = new HashSet<>(headerNameMapping.values());
if (uniqueTargetHeaderNames.size() < headerNameMapping.size()) {
IllegalArgumentException exception = new IllegalArgumentException(String.format(
"Two or more headers map to the same header name in headerNameMapping %s <outbound adapter %s>",
properties.getExtension().getHeaderNameMapping(), id));
LOGGER.warn(exception.getMessage());
throw exception;
}
}
} catch (Exception e) {
String msg = String.format("Unable to get a message producer for session %s", jcsmpSession.getSessionName());
LOGGER.warn(msg, e);
throw new RuntimeException(msg, e);
}
Comment thread
Nephery marked this conversation as resolved.

createProducerInternal();
isRunning = true;
}
}

try {
Map<String, String> headerNameMapping = properties.getExtension().getHeaderNameMapping();
if (headerNameMapping != null && !headerNameMapping.isEmpty()) {
Set<String> uniqueTargetHeaderNames = new HashSet<>(headerNameMapping.values());
if (uniqueTargetHeaderNames.size() < headerNameMapping.size()) {
IllegalArgumentException exception = new IllegalArgumentException(String.format(
"Two or more headers map to the same header name in headerNameMapping %s <outbound adapter %s>",
properties.getExtension().getHeaderNameMapping(), id));
LOGGER.warn(exception.getMessage());
throw exception;
private void createProducerInternal() {
synchronized (lifecycleLock) {
try {
producerManager.get(id);
if (properties.getExtension().isTransacted()) {
LOGGER.info("Creating transacted session <message handler ID: {}>", id);
transactedSession = jcsmpSession.createTransactedSession();
producer = transactedSession.createProducer(SolaceProvisioningUtil.getProducerFlowProperties(jcsmpSession),
producerEventHandler);
} else {
producer = jcsmpSession.createProducer(SolaceProvisioningUtil.getProducerFlowProperties(jcsmpSession),
producerEventHandler);
}
} catch (Exception e) {
String msg = String.format("Unable to get a message producer for session %s", jcsmpSession.getSessionName());
LOGGER.warn(msg, e);
closeResources();
throw new RuntimeException(msg, e);
}
}
}

producerManager.get(id);
if (properties.getExtension().isTransacted()) {
LOGGER.info("Creating transacted session <message handler ID: {}>", id);
transactedSession = jcsmpSession.createTransactedSession();
producer = transactedSession.createProducer(SolaceProvisioningUtil.getProducerFlowProperties(jcsmpSession),
producerEventHandler);
} else {
producer = jcsmpSession.createProducer(SolaceProvisioningUtil.getProducerFlowProperties(jcsmpSession),
producerEventHandler);
private void recreateProducerIfNeeded(ErrorChannelSendingCorrelationKey correlationKey) throws MessagingException {
if (!recreateProducer && !producer.isClosed()) {
return;
}
synchronized (lifecycleLock) {
if (!recreateProducer && !producer.isClosed()) {
return;
}
} catch (Exception e) {
String msg = String.format("Unable to get a message producer for session %s", jcsmpSession.getSessionName());
LOGGER.warn(msg, e);
LOGGER.debug("Recreating JCSMP producer for binding {} after stale-flow detection <message handler ID: {}>",
properties.getBindingName(), id);
closeResources();
throw new RuntimeException(msg, e);
try {
createProducerInternal();
recreateProducer = false;
} catch (Exception createError) {
recreateProducer = true;
throw handleMessagingException(correlationKey,
"Failed to recreate JCSMP producer after stale-flow detection", createError);
}
}

isRunning = true;
}
Comment thread
sunil-solace marked this conversation as resolved.

@Override
public void stop() {
if (!isRunning()) return;
closeResources();
isRunning = false;
synchronized (lifecycleLock) {
if (!isRunning()) return;
closeResources();
isRunning = false;
}
}

private void closeResources() {
Comment thread
Nephery marked this conversation as resolved.
LOGGER.info("Stopping producer to {} {} <message handler ID: {}>", configDestinationType, configDestination.getName(), id);
if (producer != null) {
LOGGER.info("Closing producer <message handler ID: {}>", id);
producer.close();
}
if (transactedSession != null) {
LOGGER.info("Closing transacted session <message handler ID: {}>", id);
transactedSession.close();
synchronized (lifecycleLock) {
LOGGER.info("Stopping producer to {} {} <message handler ID: {}>", configDestinationType, configDestination.getName(), id);
recreateProducer = false;
if (producer != null) {
LOGGER.info("Closing producer <message handler ID: {}>", id);
producer.close();
}
if (transactedSession != null) {
LOGGER.info("Closing transacted session <message handler ID: {}>", id);
transactedSession.close();
}
producerManager.release(id);
}
producerManager.release(id);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package com.solace.spring.cloud.stream.binder.util;

import com.solace.spring.cloud.stream.binder.properties.SolaceConsumerProperties;
import com.solacesystems.jcsmp.ClosedFacilityException;
import com.solacesystems.jcsmp.JCSMPException;
import com.solacesystems.jcsmp.JCSMPFactory;
import com.solacesystems.jcsmp.JCSMPTransportException;
import com.solacesystems.jcsmp.Queue;
import com.solacesystems.jcsmp.StaleSessionException;
import com.solacesystems.jcsmp.XMLMessage;
import com.solacesystems.jcsmp.XMLMessageProducer;
import org.slf4j.Logger;
Expand Down Expand Up @@ -34,6 +37,11 @@ public void send(MessageContainer messageContainer, ErrorQueueRepublishCorrelati
XMLMessageProducer producer;
try {
producer = producerManager.get(producerKey);
if (producer.isClosed()) {
LOGGER.warn("Detected closed shared JCSMP producer before sending to error queue {}; recreating",
errorQueueName);
producer = producerManager.forceRecreate(producer);
}
} catch (Exception e) {
MessagingException wrappedException = new MessagingException(
String.format("Failed to get producer to send message %s to queue %s", xmlMessage.getMessageId(),
Expand All @@ -42,7 +50,25 @@ public void send(MessageContainer messageContainer, ErrorQueueRepublishCorrelati
throw wrappedException;
}

producer.send(xmlMessage, queue);
try {
producer.send(xmlMessage, queue);
} catch (JCSMPException e) {
if (e instanceof StaleSessionException
|| e instanceof JCSMPTransportException
|| e instanceof ClosedFacilityException
|| producer.isClosed()) {
LOGGER.debug("Detected stale shared JCSMP producer while sending to error queue {} (cause: {}); " +
"recreating for next attempt",
errorQueueName, e.getClass().getSimpleName());
try {
producerManager.forceRecreate(producer);
Comment thread
Nephery marked this conversation as resolved.
} catch (Exception recreateError) {
LOGGER.warn("Failed to recreate shared JCSMP producer after stale-flow detection", recreateError);
e.addSuppressed(recreateError);
}
}
throw e;
}
}

public ErrorQueueRepublishCorrelationKey createCorrelationKey(MessageContainer messageContainer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,32 @@ public T get(String key) throws Exception {
return sharedResource;
}

/**
* Compare-and-swap the shared resource. If the manager still holds {@code expected},
* close it and {@link #create()} a fresh one; otherwise return the currently-installed
* resource without re-creating.
*
* @param expected the reference the caller observed and considers no longer usable
* @return the resource currently installed in the manager
* @throws Exception whatever {@link #create()} may throw
*/
public T forceRecreate(T expected) throws Exception {
synchronized (lock) {
if (sharedResource != expected) {
return sharedResource;
}
if (sharedResource != null) {
try {
close();
} catch (Exception e) {
LOGGER.debug("Failed to close current {} during forceRecreate", type, e);
}
}
sharedResource = create();
return sharedResource;
}
}

/**
* De-register {@code key} from the shared resource.
* <p>If this is the last {@code key} associated to the shared resource, {@link #close()} the resource.
Expand Down
Loading
Loading