Skip to content

Commit ce9686f

Browse files
alexwibowochrjohn
authored andcommitted
Bugfix for QFJ-963 (#217)
QFJ-963: Fixing thread safety issue within SessionConnector, where the sessions are kept in non thread safe map. This is especially dangerous in a wildcarded threadsocketacceptor configuration, where sessions are added dynamically. * use ConcurrentHashMap in SessionConnector (cherry picked from commit fc356a1)
1 parent a6fa7f4 commit ce9686f

2 files changed

Lines changed: 131 additions & 3 deletions

File tree

quickfixj-core/src/main/java/quickfix/mina/SessionConnector.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import java.util.concurrent.ScheduledFuture;
5151
import java.util.concurrent.ThreadFactory;
5252
import java.util.concurrent.TimeUnit;
53+
import java.util.concurrent.ConcurrentHashMap;
5354
import org.apache.mina.core.future.CloseFuture;
5455
import org.apache.mina.core.service.IoService;
5556

@@ -67,7 +68,7 @@ public abstract class SessionConnector implements Connector {
6768

6869
protected final PropertyChangeSupport propertyChangeSupport = new PropertyChangeSupport(this);
6970

70-
private Map<SessionID, Session> sessions = Collections.emptyMap();
71+
private final Map<SessionID, Session> sessions = new ConcurrentHashMap<>();
7172
private final SessionSettings settings;
7273
private final SessionFactory sessionFactory;
7374
private final static ScheduledExecutorService scheduledExecutorService = Executors
@@ -117,7 +118,8 @@ public void removePropertyChangeListener(PropertyChangeListener listener) {
117118
}
118119

119120
protected void setSessions(Map<SessionID, Session> sessions) {
120-
this.sessions = sessions;
121+
clearConnectorSessions();
122+
this.sessions.putAll(sessions);
121123
propertyChangeSupport.firePropertyChange(SESSIONS_PROPERTY, null, sessions);
122124
}
123125

quickfixj-core/src/test/java/quickfix/mina/SessionConnectorTest.java

Lines changed: 127 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import quickfix.Acceptor;
2323
import quickfix.ConfigError;
2424
import quickfix.DefaultSessionFactory;
25+
import quickfix.Dictionary;
2526
import quickfix.FixVersions;
2627
import quickfix.Initiator;
2728
import quickfix.MemoryStoreFactory;
@@ -47,12 +48,19 @@
4748
import java.util.List;
4849
import java.util.Map;
4950
import java.util.Set;
51+
import java.util.Random;
52+
import java.util.Iterator;
53+
import java.util.concurrent.ConcurrentHashMap;
54+
import java.util.concurrent.CountDownLatch;
5055

5156
import static org.junit.Assert.assertEquals;
5257
import static org.junit.Assert.assertFalse;
5358
import static org.junit.Assert.assertNotNull;
5459
import static org.junit.Assert.assertTrue;
60+
import static org.junit.Assert.assertThat;
61+
import static org.junit.Assert.fail;
5562
import org.junit.Test;
63+
import org.hamcrest.Matchers;
5664

5765
public class SessionConnectorTest {
5866
private final List<PropertyChangeEvent> propertyChangeEvents = new ArrayList<>();
@@ -144,7 +152,7 @@ public void testOneSessionLoggedOnOneSessionNotLoggedOne() throws Exception {
144152
SessionFactory.ACCEPTOR_CONNECTION_TYPE);
145153
try (Session session2 = connector.createSession(sessionID2)) {
146154
assertNotNull(session2);
147-
sessions.put(session2.getSessionID(), session2);
155+
connector.addDynamicSession(session2);
148156
assertFalse(connector.isLoggedOn());
149157
assertTrue(connector.anyLoggedOn());
150158
}
@@ -254,6 +262,124 @@ public void testDynamicInitiatorSession() throws Exception {
254262
connector.stop();
255263
}
256264

265+
@Test
266+
public void testConcurrentAccess() throws ConfigError, InterruptedException {
267+
final SessionSettings sessionSettings = new SessionSettings();
268+
sessionSettings.set(new Dictionary(null, createDefaultSettings()));
269+
sessionSettings.set(new SessionID("FIX.4.2:FOOBAR_PRICING->*"), new Dictionary("sessions", createPricingSection()));
270+
sessionSettings.set(new SessionID("FIX.4.2:FOOBAR_TRADING->*"), new Dictionary("sessions", createTradingSection()));
271+
272+
final DefaultSessionFactory sessionFactory = new DefaultSessionFactory(new UnitTestApplication(),
273+
new MemoryStoreFactory(), new SLF4JLogFactory(sessionSettings));
274+
final SessionConnector connector = new SessionConnectorUnderTest(sessionSettings, sessionFactory);
275+
276+
// connector is initialised with the wildcard sessions.
277+
final Map<SessionID, Session> sessions = new HashMap<>();
278+
for (final Iterator<SessionID> sessionIterator = sessionSettings.sectionIterator(); sessionIterator.hasNext();) {
279+
final SessionID sessionID = sessionIterator.next();
280+
sessions.put(sessionID, sessionFactory.create(sessionID, sessionSettings));
281+
}
282+
connector.setSessions(sessions);
283+
284+
// register a listener on the connector, e.g. to simulate MBean registration
285+
final Set<SessionID> exportedSessionIDs = Collections.newSetFromMap(new ConcurrentHashMap<>());
286+
connector.addPropertyChangeListener(evt -> {
287+
exportedSessionIDs.addAll(connector.getSessions());
288+
});
289+
290+
final int numClients = 500;
291+
final CountDownLatch startLatch = new CountDownLatch(1);
292+
final CountDownLatch countDownLatch = new CountDownLatch(numClients);
293+
294+
final Random random = new Random();
295+
for (int clientIndex = 0; clientIndex < numClients; clientIndex++) {
296+
final String clientPricingSessionIDString = "FIX.4.2:FOOBAR_PRICING->CLIENT" + clientIndex;
297+
final String clientTradingSessionIDString = "FIX.4.2:FOOBAR_TRADING->CLIENT" + clientIndex;
298+
299+
int randomSleep = random.nextInt(20);
300+
final Thread clientThread = new Thread(() -> {
301+
try {
302+
// wait for everyone to be ready
303+
startLatch.await();
304+
305+
// individual thread to sleep at random interval, to simulate spread connection attempt
306+
Thread.sleep(randomSleep);
307+
308+
connector.addDynamicSession(connector.createSession(new SessionID(clientPricingSessionIDString)));
309+
connector.addDynamicSession(connector.createSession(new SessionID(clientTradingSessionIDString)));
310+
311+
// sleep at the end, before we verify the outcome
312+
Thread.sleep(randomSleep);
313+
} catch (final Throwable throwable) {
314+
throwable.printStackTrace();
315+
fail("Well.. this operation shouldnt fail");
316+
} finally {
317+
countDownLatch.countDown();
318+
}
319+
},"Client_"+clientIndex);
320+
clientThread.setDaemon(true);
321+
clientThread.start();
322+
}
323+
324+
// go go go , everyone!
325+
startLatch.countDown();
326+
327+
// ok.. wait for everyone to finish
328+
countDownLatch.await();
329+
330+
assertThat("We should have all sessions exported. Failure here means initialisation has failed somewhere", exportedSessionIDs.size(), Matchers.equalTo(1002));
331+
assertTrue(exportedSessionIDs.contains(new SessionID("FIX.4.2:FOOBAR_PRICING->*")));
332+
assertTrue(exportedSessionIDs.contains(new SessionID("FIX.4.2:FOOBAR_TRADING->*")));
333+
for (int clientIndex = 0; clientIndex < numClients; clientIndex++) {
334+
assertTrue(exportedSessionIDs.contains(new SessionID("FIX.4.2:FOOBAR_PRICING->CLIENT" + clientIndex)));
335+
assertTrue(exportedSessionIDs.contains(new SessionID("FIX.4.2:FOOBAR_TRADING->CLIENT" + clientIndex)));
336+
}
337+
}
338+
339+
private Map<Object, Object> createTradingSection() {
340+
final Map<Object, Object> tradingSection = new HashMap<>();
341+
tradingSection.put("PersistMessages","Y");
342+
tradingSection.put("SocketAcceptPort","7566");
343+
tradingSection.put("DataDictionary","FIX44_Custom_Test.xml");
344+
tradingSection.put("ResetOnLogon","N");
345+
tradingSection.put("MaxLatency","1");
346+
return tradingSection;
347+
}
348+
349+
private Map<Object, Object> createPricingSection() {
350+
final Map<Object, Object> pricingSection = new HashMap<>();
351+
pricingSection.put("PersistMessages","N");
352+
pricingSection.put("SocketAcceptPort","7565");
353+
pricingSection.put("DataDictionary","FIX44_Custom_Test.xml");
354+
pricingSection.put("ResetOnLogon","Y");
355+
pricingSection.put("MaxLatency","120");
356+
return pricingSection;
357+
}
358+
359+
private Map<Object, Object> createDefaultSettings() {
360+
final Map<Object, Object> defaultSettings = new HashMap<>();
361+
defaultSettings.put("TimeZone", "UTC");
362+
defaultSettings.put("StartDay", "Sunday");
363+
defaultSettings.put("StartTime", "07:00:00");
364+
defaultSettings.put("EndDay", "Friday");
365+
defaultSettings.put("EndTime", "17:00:00");
366+
defaultSettings.put("NonStopSession", "N");
367+
defaultSettings.put("ConnectionType", "acceptor");
368+
defaultSettings.put("HeartBtInt", "30");
369+
defaultSettings.put("UseDataDictionary", "Y");
370+
defaultSettings.put("ThreadModel", "ThreadPerSession");
371+
defaultSettings.put("UseJmx", "Y");
372+
defaultSettings.put("FileStorePath", "/home/wibowoa/var/lib/myApp");
373+
defaultSettings.put("FileLogPath", "logs/fixlog");
374+
defaultSettings.put("FileIncludeTimeStampForMessages", "Y");
375+
defaultSettings.put("FileIncludeMilliseconds", "Y");
376+
defaultSettings.put("CheckLatency", "Y");
377+
defaultSettings.put("BeginString", "FIX.4.2");
378+
defaultSettings.put("AcceptorTemplate", "Y");
379+
defaultSettings.put("TargetCompID", "*");
380+
return defaultSettings;
381+
}
382+
257383
private SessionSettings setUpSessionSettings(SessionID sessionID) {
258384
SessionSettings settings = new SessionSettings();
259385
settings.setString(Session.SETTING_USE_DATA_DICTIONARY, "N");

0 commit comments

Comments
 (0)