Skip to content

Commit e0f1a36

Browse files
authored
Lazy Solace session initialization (SolaceProducts#420)
* Added lazy Solace session initialization through binder config property *session-initialization-mode* * Add the close() method SolaceSessionManager
1 parent 7f2c80f commit e0f1a36

26 files changed

Lines changed: 890 additions & 178 deletions

File tree

solace-spring-cloud-starters/solace-spring-cloud-stream-starter/README.adoc

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,19 @@ See https://github.com/SolaceProducts/solace-spring-boot/tree/master/solace-spri
180180
The Solace session can be configured to use OAuth2 authentication. See https://github.com/SolaceDev/solace-spring-boot/tree/master/solace-spring-boot-starters/solace-java-spring-boot-starter#using-oauth2-authentication-scheme-with-solace-java-api[JCSMP Spring Boot: Using OAuth2 Authentication Scheme] for more info.
181181
====
182182

183+
==== Solace Binder Properties
184+
185+
The following properties are available for configuring the Solace binder itself and must be prefixed with `spring.cloud.stream.solace.binder.`.
186+
187+
sessionInitializationMode::
188+
Specifies when the Solace session should be initialized for the binder.
189+
+
190+
When set to `lazy`, the session will be created only when the first binding (producer or consumer) is created that requires it.
191+
+
192+
When set to `eager`, the session will be created immediately when the binder is initialized, regardless of whether any bindings exist.
193+
+
194+
Default: `eager`
195+
183196
==== Solace Consumer Properties
184197

185198
The following properties are available for Solace consumers only and must be prefixed with `spring.cloud.stream.solace.bindings.<bindingName>.consumer.` where `bindingName` looks something like `functionName-in-0` as defined in https://docs.spring.io/spring-cloud-stream/reference/{scst-version}/spring-cloud-stream/functional-binding-names.html[Functional Binding Names].

solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/health/handlers/SolaceSessionEventHandler.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,4 +34,12 @@ public void handleEvent(SessionEventArgs eventArgs) {
3434
public void setSessionHealthUp() {
3535
this.sessionHealthIndicator.up();
3636
}
37+
38+
public void setSessionHealthDown() {
39+
this.sessionHealthIndicator.down(null, false);
40+
}
41+
42+
public void setSessionHealthReconnecting() {
43+
this.sessionHealthIndicator.reconnecting(null);
44+
}
3745
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package com.solace.spring.cloud.stream.binder.properties;
2+
3+
4+
import com.solace.spring.cloud.stream.binder.util.SessionInitializationMode;
5+
import org.springframework.boot.context.properties.ConfigurationProperties;
6+
7+
@ConfigurationProperties(prefix = "spring.cloud.stream.solace.binder")
8+
public class SolaceBinderConfigurationProperties {
9+
10+
11+
private SessionInitializationMode sessionInitializationMode = SessionInitializationMode.EAGER;
12+
13+
public SessionInitializationMode getSessionInitializationMode() {
14+
return sessionInitializationMode;
15+
}
16+
17+
public void setSessionInitializationMode(SessionInitializationMode sessionInitializationMode) {
18+
this.sessionInitializationMode = sessionInitializationMode;
19+
}
20+
}

solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/provisioning/SolaceEndpointProvisioner.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.solace.spring.cloud.stream.binder.provisioning;
22

3+
import com.solace.spring.cloud.stream.binder.util.SolaceSessionManager;
34
import com.solace.spring.cloud.stream.binder.properties.SolaceCommonProperties;
45
import com.solace.spring.cloud.stream.binder.properties.SolaceConsumerProperties;
56
import com.solace.spring.cloud.stream.binder.properties.SolaceProducerProperties;
@@ -37,12 +38,12 @@
3738
public class SolaceEndpointProvisioner
3839
implements ProvisioningProvider<ExtendedConsumerProperties<SolaceConsumerProperties>,ExtendedProducerProperties<SolaceProducerProperties>> {
3940

40-
private final JCSMPSession jcsmpSession;
41+
private final SolaceSessionManager solaceSessionManager;
4142

4243
private static final Logger LOGGER = LoggerFactory.getLogger(SolaceEndpointProvisioner.class);
4344

44-
public SolaceEndpointProvisioner(JCSMPSession jcsmpSession) {
45-
this.jcsmpSession = jcsmpSession;
45+
public SolaceEndpointProvisioner(SolaceSessionManager solaceSessionManager) {
46+
this.solaceSessionManager = solaceSessionManager;
4647
}
4748

4849
@Override
@@ -186,14 +187,14 @@ private <T extends Endpoint> T provisionEndpoint(
186187
if (isDurable) {
187188
endpoint = endpointProvider.createInstance(name);
188189
if (doDurableProvisioning) {
189-
jcsmpSession.provision(endpoint, endpointProperties, JCSMPSession.FLAG_IGNORE_ALREADY_EXISTS);
190+
solaceSessionManager.getSession().provision(endpoint, endpointProperties, JCSMPSession.FLAG_IGNORE_ALREADY_EXISTS);
190191
} else {
191192
LOGGER.debug("Provisioning is disabled, {} will not be provisioned nor will its configuration be validated",
192193
name);
193194
}
194195
} else {
195196
// EndpointProperties will be applied during consumer creation
196-
endpoint = endpointProvider.createTemporaryEndpoint(name, jcsmpSession);
197+
endpoint = endpointProvider.createTemporaryEndpoint(name, solaceSessionManager.getSession());
197198
}
198199
} catch (Exception e) {
199200
String action = isDurable ? "provision durable" : "create temporary";
@@ -222,7 +223,7 @@ private void testFlowConnection(Endpoint endpoint,
222223
endpointType, endpoint.getName());
223224
final ConsumerFlowProperties testFlowProperties = consumerFlowProperties.setEndpoint(endpoint)
224225
.setStartState(false);
225-
jcsmpSession.createFlow(null, testFlowProperties, endpointProperties).close();
226+
solaceSessionManager.getSession().createFlow(null, testFlowProperties, endpointProperties).close();
226227
LOGGER.info("Connected test consumer flow to {} {}, closing it",
227228
endpointType, endpoint.getName());
228229
} catch (JCSMPException e) {
@@ -266,7 +267,7 @@ public void addSubscriptionToQueue(Queue queue, String topicName, SolaceCommonPr
266267
try {
267268
Topic topic = JCSMPFactory.onlyInstance().createTopic(topicName);
268269
try {
269-
jcsmpSession.addSubscription(queue, topic, JCSMPSession.WAIT_FOR_CONFIRM);
270+
solaceSessionManager.getSession().addSubscription(queue, topic, JCSMPSession.WAIT_FOR_CONFIRM);
270271
} catch (JCSMPErrorResponseException e) {
271272
if (e.getSubcodeEx() == JCSMPErrorResponseSubcodeEx.SUBSCRIPTION_ALREADY_PRESENT) {
272273
LOGGER.info("Queue {} is already subscribed to topic {}, SUBSCRIPTION_ALREADY_PRESENT error will be ignored...",
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
package com.solace.spring.cloud.stream.binder.util;
2+
3+
import static com.solacesystems.jcsmp.XMLMessage.Outcome.ACCEPTED;
4+
import static com.solacesystems.jcsmp.XMLMessage.Outcome.FAILED;
5+
import static com.solacesystems.jcsmp.XMLMessage.Outcome.REJECTED;
6+
7+
import com.solace.spring.cloud.stream.binder.health.handlers.SolaceSessionEventHandler;
8+
import com.solacesystems.jcsmp.Context;
9+
import com.solacesystems.jcsmp.ContextProperties;
10+
import com.solacesystems.jcsmp.JCSMPException;
11+
import com.solacesystems.jcsmp.JCSMPProperties;
12+
import com.solacesystems.jcsmp.JCSMPSession;
13+
import com.solacesystems.jcsmp.SolaceSessionOAuth2TokenProvider;
14+
import com.solacesystems.jcsmp.SpringJCSMPFactory;
15+
import com.solacesystems.jcsmp.impl.JCSMPBasicSession;
16+
import com.solacesystems.jcsmp.impl.client.ClientInfoProvider;
17+
import java.util.Set;
18+
import org.slf4j.Logger;
19+
import org.slf4j.LoggerFactory;
20+
import org.springframework.lang.Nullable;
21+
22+
public class DefaultSolaceSessionManager implements SolaceSessionManager {
23+
24+
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultSolaceSessionManager.class);
25+
26+
private final JCSMPProperties jcsmpProperties;
27+
private final ClientInfoProvider clientInfoProvider;
28+
private final SolaceSessionEventHandler solaceSessionEventHandler;
29+
private final SolaceSessionOAuth2TokenProvider solaceSessionOAuth2TokenProvider;
30+
31+
private volatile JCSMPSession jcsmpSession;
32+
private volatile Context context;
33+
private final Object sessionLock = new Object();
34+
35+
public DefaultSolaceSessionManager(JCSMPProperties jcsmpProperties,
36+
@Nullable ClientInfoProvider clientInfoProvider,
37+
@Nullable SolaceSessionEventHandler eventHandler,
38+
@Nullable SolaceSessionOAuth2TokenProvider solaceSessionOAuth2TokenProvider) {
39+
this.jcsmpProperties = jcsmpProperties;
40+
this.clientInfoProvider = clientInfoProvider;
41+
this.solaceSessionEventHandler = eventHandler;
42+
this.solaceSessionOAuth2TokenProvider = solaceSessionOAuth2TokenProvider;
43+
}
44+
45+
@Override
46+
public JCSMPSession getSession() throws JCSMPException {
47+
if (jcsmpSession == null) {
48+
createSessionIfNeeded();
49+
}
50+
return jcsmpSession;
51+
}
52+
53+
private void createSessionIfNeeded() throws JCSMPException {
54+
synchronized (sessionLock) {
55+
if (jcsmpSession != null) {
56+
return;
57+
}
58+
59+
JCSMPProperties solaceJcsmpProperties = (JCSMPProperties) this.jcsmpProperties.clone();
60+
if (this.clientInfoProvider != null) {
61+
solaceJcsmpProperties.setProperty(JCSMPProperties.CLIENT_INFO_PROVIDER, clientInfoProvider);
62+
}
63+
64+
try {
65+
SpringJCSMPFactory springJCSMPFactory = new SpringJCSMPFactory(solaceJcsmpProperties,
66+
solaceSessionOAuth2TokenProvider);
67+
68+
if (solaceSessionEventHandler != null) {
69+
LOGGER.debug("Registering Solace Session Event handler on session");
70+
context = springJCSMPFactory.createContext(new ContextProperties());
71+
jcsmpSession = springJCSMPFactory.createSession(context, solaceSessionEventHandler);
72+
} else {
73+
jcsmpSession = springJCSMPFactory.createSession();
74+
}
75+
76+
if (solaceSessionEventHandler != null) {
77+
solaceSessionEventHandler.setSessionHealthDown();
78+
}
79+
80+
LOGGER.info("Connecting JCSMP session {}", jcsmpSession.getSessionName());
81+
jcsmpSession.connect();
82+
83+
if (solaceSessionEventHandler != null) {
84+
solaceSessionEventHandler.setSessionHealthUp();
85+
}
86+
87+
// Check broker compatibility
88+
if (jcsmpSession instanceof JCSMPBasicSession session
89+
&& !session.isRequiredSettlementCapable(Set.of(ACCEPTED, FAILED, REJECTED))) {
90+
LOGGER.warn("The connected Solace PubSub+ Broker is not compatible. It doesn't support message NACK capability. Consumer bindings will fail to start.");
91+
}
92+
93+
LOGGER.info("Successfully created and connected Solace JCSMP session");
94+
} catch (JCSMPException e) {
95+
LOGGER.error("Failed to initialize Solace JCSMP session", e);
96+
throw e;
97+
}
98+
}
99+
}
100+
101+
@Override
102+
public void close() {
103+
synchronized (sessionLock) {
104+
if (jcsmpSession != null) {
105+
try {
106+
LOGGER.info("Closing JCSMP session {}", jcsmpSession.getSessionName());
107+
jcsmpSession.closeSession();
108+
} catch (Exception e) {
109+
LOGGER.warn("Error closing JCSMP session", e);
110+
} finally {
111+
jcsmpSession = null;
112+
}
113+
}
114+
115+
if (context != null) {
116+
try {
117+
context.destroy();
118+
} catch (Exception e) {
119+
LOGGER.warn("Error destroying JCSMP context", e);
120+
} finally {
121+
context = null;
122+
}
123+
}
124+
}
125+
}
126+
}

solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/util/JCSMPSessionProducerManager.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package com.solace.spring.cloud.stream.binder.util;
22

33
import com.solacesystems.jcsmp.JCSMPException;
4-
import com.solacesystems.jcsmp.JCSMPSession;
54
import com.solacesystems.jcsmp.JCSMPStreamingPublishCorrelatingEventHandler;
65
import com.solacesystems.jcsmp.XMLMessageProducer;
76
import org.slf4j.Logger;
@@ -15,19 +14,19 @@
1514
import java.util.UUID;
1615

1716
public class JCSMPSessionProducerManager extends SharedResourceManager<XMLMessageProducer> {
18-
private final JCSMPSession session;
17+
private final SolaceSessionManager solaceSessionManager;
1918
private final CloudStreamEventHandler publisherEventHandler = new CloudStreamEventHandler();
2019

2120
private static final Logger LOGGER = LoggerFactory.getLogger(JCSMPSessionProducerManager.class);
2221

23-
public JCSMPSessionProducerManager(JCSMPSession session) {
22+
public JCSMPSessionProducerManager(SolaceSessionManager solaceSessionManager) {
2423
super("producer");
25-
this.session = session;
24+
this.solaceSessionManager = solaceSessionManager;
2625
}
2726

2827
@Override
2928
XMLMessageProducer create() throws JCSMPException {
30-
return session.getMessageProducer(publisherEventHandler);
29+
return solaceSessionManager.getSession().getMessageProducer(publisherEventHandler);
3130
}
3231

3332
@Override
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package com.solace.spring.cloud.stream.binder.util;
2+
3+
public enum SessionInitializationMode {
4+
5+
EAGER,
6+
LAZY;
7+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package com.solace.spring.cloud.stream.binder.util;
2+
3+
import com.solacesystems.jcsmp.JCSMPException;
4+
import com.solacesystems.jcsmp.JCSMPSession;
5+
6+
/**
7+
* Interface for managing Solace JCSMP sessions within the Spring Cloud Stream binder.
8+
*/
9+
public interface SolaceSessionManager {
10+
11+
/**
12+
* Create a single shared JCSMP session for Solace messaging operations.
13+
*
14+
* @return the active JCSMP session
15+
* @throws JCSMPException if the session cannot be retrieved or created
16+
*/
17+
JCSMPSession getSession() throws JCSMPException;
18+
19+
/**
20+
* Closes the session manager and releases all associated resources.
21+
*/
22+
void close();
23+
}

solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/test/java/com/solace/spring/cloud/stream/binder/inbound/acknowledge/JCSMPAcknowledgementCallbackFactoryIT.java

Lines changed: 26 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.solace.spring.cloud.stream.binder.inbound.acknowledge;
22

33
import com.solace.spring.boot.autoconfigure.SolaceJavaAutoConfiguration;
4+
import com.solace.spring.cloud.stream.binder.util.DefaultSolaceSessionManager;
45
import com.solace.spring.cloud.stream.binder.properties.SolaceConsumerProperties;
56
import com.solace.spring.cloud.stream.binder.util.ErrorQueueInfrastructure;
67
import com.solace.spring.cloud.stream.binder.util.FlowReceiverContainer;
@@ -828,29 +829,36 @@ private ErrorQueueInfrastructure initializeErrorQueueInfrastructure(JCSMPSession
828829
throw new IllegalStateException("Should only have one error queue infrastructure");
829830
}
830831

831-
String producerManagerKey = UUID.randomUUID().toString();
832-
JCSMPSessionProducerManager jcsmpSessionProducerManager = new JCSMPSessionProducerManager(jcsmpSession);
833-
ErrorQueueInfrastructure errorQueueInfrastructure = new ErrorQueueInfrastructure(jcsmpSessionProducerManager,
834-
producerManagerKey, RandomStringUtils.randomAlphanumeric(20), new SolaceConsumerProperties());
835-
Queue errorQueue = JCSMPFactory.onlyInstance().createQueue(errorQueueInfrastructure.getErrorQueueName());
836-
ackCallbackFactory.setErrorQueueInfrastructure(errorQueueInfrastructure);
837-
closeErrorQueueInfrastructureCallback = () -> {
838-
jcsmpSessionProducerManager.release(producerManagerKey);
839-
840-
try {
841-
jcsmpSession.deprovision(errorQueue, JCSMPSession.WAIT_FOR_CONFIRM);
842-
} catch (JCSMPException e) {
843-
throw new RuntimeException(e);
844-
}
845-
};
846-
847832
try {
833+
DefaultSolaceSessionManager defaultSolaceSessionManager = Mockito.mock(
834+
DefaultSolaceSessionManager.class);
835+
when(defaultSolaceSessionManager.getSession()).thenReturn(jcsmpSession);
836+
837+
String producerManagerKey = UUID.randomUUID().toString();
838+
JCSMPSessionProducerManager jcsmpSessionProducerManager = new JCSMPSessionProducerManager(
839+
defaultSolaceSessionManager);
840+
ErrorQueueInfrastructure errorQueueInfrastructure = new ErrorQueueInfrastructure(
841+
jcsmpSessionProducerManager,
842+
producerManagerKey, RandomStringUtils.randomAlphanumeric(20),
843+
new SolaceConsumerProperties());
844+
Queue errorQueue = JCSMPFactory.onlyInstance()
845+
.createQueue(errorQueueInfrastructure.getErrorQueueName());
846+
ackCallbackFactory.setErrorQueueInfrastructure(errorQueueInfrastructure);
847+
closeErrorQueueInfrastructureCallback = () -> {
848+
jcsmpSessionProducerManager.release(producerManagerKey);
849+
850+
try {
851+
jcsmpSession.deprovision(errorQueue, JCSMPSession.WAIT_FOR_CONFIRM);
852+
} catch (JCSMPException e) {
853+
throw new RuntimeException(e);
854+
}
855+
};
856+
848857
jcsmpSession.provision(errorQueue, new EndpointProperties(), JCSMPSession.WAIT_FOR_CONFIRM);
858+
return errorQueueInfrastructure;
849859
} catch (JCSMPException e) {
850860
throw new RuntimeException(e);
851861
}
852-
853-
return errorQueueInfrastructure;
854862
}
855863

856864
private void validateNumEnqueuedMessages(SempV2Api sempV2Api, String queueName, int expectedCount)

0 commit comments

Comments
 (0)