|
| 1 | +/******************************************************************************* |
| 2 | + * Copyright (c) quickfixengine.org All rights reserved. |
| 3 | + * |
| 4 | + * This file is part of the QuickFIX FIX Engine |
| 5 | + * |
| 6 | + * This file may be distributed under the terms of the quickfixengine.org |
| 7 | + * license as defined by quickfixengine.org and appearing in the file |
| 8 | + * LICENSE included in the packaging of this file. |
| 9 | + * |
| 10 | + * This file is provided AS IS with NO WARRANTY OF ANY KIND, INCLUDING |
| 11 | + * THE WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A |
| 12 | + * PARTICULAR PURPOSE. |
| 13 | + * |
| 14 | + * See http://www.quickfixengine.org/LICENSE for licensing information. |
| 15 | + * |
| 16 | + * Contact ask@quickfixengine.org if any conditions of this licensing |
| 17 | + * are not clear to you. |
| 18 | + ******************************************************************************/ |
| 19 | + |
| 20 | +package quickfix; |
| 21 | + |
| 22 | +import org.junit.After; |
| 23 | +import org.junit.Before; |
| 24 | +import org.junit.Test; |
| 25 | +import quickfix.field.BeginString; |
| 26 | +import quickfix.field.EncryptMethod; |
| 27 | +import quickfix.field.HeartBtInt; |
| 28 | +import quickfix.field.MsgSeqNum; |
| 29 | +import quickfix.field.ResetSeqNumFlag; |
| 30 | +import quickfix.field.SenderCompID; |
| 31 | +import quickfix.field.TargetCompID; |
| 32 | + |
| 33 | +import java.io.IOException; |
| 34 | +import java.lang.reflect.Field; |
| 35 | +import java.util.List; |
| 36 | +import java.util.concurrent.CopyOnWriteArrayList; |
| 37 | +import java.util.concurrent.atomic.AtomicBoolean; |
| 38 | +import java.util.concurrent.atomic.AtomicReference; |
| 39 | + |
| 40 | +import static org.junit.Assert.assertEquals; |
| 41 | +import static org.junit.Assert.assertTrue; |
| 42 | + |
| 43 | +/** |
| 44 | + * Regression test for GitHub issue #902. |
| 45 | + * |
| 46 | + * <p><b>Bug description:</b> A QuickFIX/J acceptor responds to a client Logon |
| 47 | + * with {@code ResetSeqNumFlag=Y} and {@code MsgSeqNum(34)=1} with a Logon |
| 48 | + * response carrying {@code MsgSeqNum=2} instead of 1. |
| 49 | + * |
| 50 | + * <p><b>Root cause:</b> Inside {@code Session.sendRaw()}, every outbound |
| 51 | + * message goes through {@code persist()}, which increments the |
| 52 | + * next-sender-sequence-number <em>before</em> the {@code send()} call. The |
| 53 | + * condition that gates the actual {@code send()} call is: |
| 54 | + * <pre> |
| 55 | + * if (MsgType.LOGON.equals(msgType) || MsgType.LOGOUT.equals(msgType) |
| 56 | + * || MsgType.RESEND_REQUEST.equals(msgType) |
| 57 | + * || MsgType.SEQUENCE_RESET.equals(msgType) || isLoggedOn()) { |
| 58 | + * result = send(messageString); |
| 59 | + * } |
| 60 | + * </pre> |
| 61 | + * A Heartbeat message does NOT match any of those types, and at the moment |
| 62 | + * it is triggered the session is not yet fully logged on ({@code isLogonSent()} |
| 63 | + * is still {@code false}). Therefore the heartbeat's call to {@code persist()} |
| 64 | + * bumps the sequence counter from 1 to 2, but the heartbeat is silently |
| 65 | + * dropped. When the acceptor's own Logon response is subsequently serialised |
| 66 | + * and sent, it picks up sequence number 2. |
| 67 | + * |
| 68 | + * <p><b>How the race arises in production:</b> In {@code nextLogon()}, |
| 69 | + * {@code state.setLogonReceived(true)} is called <em>before</em> |
| 70 | + * {@code generateLogon()} is called. Between those two operations, a |
| 71 | + * timer/heartbeat thread calling {@code next()} sees |
| 72 | + * {@code state.isLogonReceived()==true} and {@code isHeartBeatNeeded()==true} |
| 73 | + * (because {@code lastSentTime} is stale from the previous session), so it |
| 74 | + * invokes {@code generateHeartbeat()}, consuming seq=1 without sending the |
| 75 | + * heartbeat. The subsequent {@code generateLogon()} then uses seq=2. |
| 76 | + * |
| 77 | + * <p><b>Test strategy:</b> We inject the race deterministically using a |
| 78 | + * custom {@link Log} implementation that intercepts the {@code "Received logon"} |
| 79 | + * event (which fires between {@code setLogonReceived(true)} and |
| 80 | + * {@code generateLogon()} inside {@code nextLogon()}) and calls |
| 81 | + * {@link Session#next()} from within that event handler. We use |
| 82 | + * {@link MockSystemTimeSource} to ensure {@code isHeartBeatNeeded()} returns |
| 83 | + * {@code true} at exactly that moment. |
| 84 | + */ |
| 85 | +public class SessionLogonSeqNumIssue902Test { |
| 86 | + |
| 87 | + private static final String RECEIVED_LOGON_EVENT = "Received logon"; |
| 88 | + |
| 89 | + private MockSystemTimeSource timeSource; |
| 90 | + private Session acceptorSession; |
| 91 | + private RecordingResponder responder; |
| 92 | + |
| 93 | + /** Holds the session so the log hook can reach it after construction. */ |
| 94 | + private final AtomicReference<Session> sessionRef = new AtomicReference<>(); |
| 95 | + |
| 96 | + /** |
| 97 | + * Guard that prevents re-entrant injection: the hook must fire at most |
| 98 | + * once per test run so that the simulated timer-thread interleaving is |
| 99 | + * applied exactly once. |
| 100 | + */ |
| 101 | + private final AtomicBoolean heartbeatInjected = new AtomicBoolean(false); |
| 102 | + |
| 103 | + @Before |
| 104 | + public void setUp() { |
| 105 | + timeSource = new MockSystemTimeSource(1_000_000L); |
| 106 | + SystemTime.setTimeSource(timeSource); |
| 107 | + } |
| 108 | + |
| 109 | + @After |
| 110 | + public void tearDown() throws Exception { |
| 111 | + SystemTime.setTimeSource(null); |
| 112 | + if (acceptorSession != null) { |
| 113 | + acceptorSession.close(); |
| 114 | + } |
| 115 | + } |
| 116 | + |
| 117 | + /** |
| 118 | + * Verifies that the acceptor's Logon response carries {@code MsgSeqNum=1} |
| 119 | + * when it receives a Logon with {@code ResetSeqNumFlag=Y} and |
| 120 | + * {@code MsgSeqNum=1}. |
| 121 | + * |
| 122 | + * <p>The test deliberately injects a simulated timer-thread interleaving |
| 123 | + * (via the session log hook described in the class Javadoc) so that |
| 124 | + * {@code generateHeartbeat()} runs between {@code setLogonReceived(true)} |
| 125 | + * and {@code generateLogon()} inside {@code nextLogon()}. This reproduces |
| 126 | + * the race condition described in issue #902. |
| 127 | + * |
| 128 | + * <p>With the current (buggy) code the heartbeat's {@code persist()} call |
| 129 | + * bumps the sequence counter from 1 to 2 without sending the heartbeat, |
| 130 | + * so the Logon response ends up with {@code MsgSeqNum=2}. The assertion |
| 131 | + * at the end therefore <b>fails</b>, proving that the bug exists. |
| 132 | + */ |
| 133 | + @Test |
| 134 | + public void testAcceptorLogonResponseSeqNumIsOneWhenResetSeqNumFlagReceived() |
| 135 | + throws Exception { |
| 136 | + |
| 137 | + // ----------------------------------------------------------------- |
| 138 | + // 1. Build the acceptor session. |
| 139 | + // |
| 140 | + // The custom LogFactory creates a Log whose onEvent() method |
| 141 | + // fires session.next() when "Received logon" is logged. That |
| 142 | + // log event is emitted by nextLogon() AFTER setLogonReceived(true) |
| 143 | + // but BEFORE generateLogon(), which is exactly the window where |
| 144 | + // the production race condition occurs. |
| 145 | + // ----------------------------------------------------------------- |
| 146 | + SessionID sessionID = new SessionID( |
| 147 | + FixVersions.BEGINSTRING_FIX44, "ACCEPTOR", "CLIENT"); |
| 148 | + |
| 149 | + LogFactory injectingLogFactory = id -> new Log() { |
| 150 | + @Override public void clear() {} |
| 151 | + @Override public void onIncoming(String message) {} |
| 152 | + @Override public void onOutgoing(String message) {} |
| 153 | + @Override public void onErrorEvent(String text) {} |
| 154 | + |
| 155 | + @Override |
| 156 | + public void onEvent(String text) { |
| 157 | + /* |
| 158 | + * "Received logon" is the log event emitted at line ~2262 |
| 159 | + * of Session.java, between: |
| 160 | + * state.setLogonReceived(true) [line ~2229] |
| 161 | + * generateLogon(...) [line ~2275] |
| 162 | + * |
| 163 | + * Calling session.next() here models a timer thread that |
| 164 | + * woke up at exactly this critical moment. |
| 165 | + * |
| 166 | + * The AtomicBoolean guard ensures we inject at most once, |
| 167 | + * even if re-entrant log calls occur. |
| 168 | + */ |
| 169 | + if (RECEIVED_LOGON_EVENT.equals(text) |
| 170 | + && heartbeatInjected.compareAndSet(false, true)) { |
| 171 | + Session s = sessionRef.get(); |
| 172 | + if (s != null) { |
| 173 | + try { |
| 174 | + // This triggers generateHeartbeat() which calls |
| 175 | + // persist() and bumps the seq counter 1 → 2 |
| 176 | + // without sending the heartbeat (isLogonSent() |
| 177 | + // is still false at this point). |
| 178 | + s.next(); |
| 179 | + } catch (IOException e) { |
| 180 | + throw new RuntimeException( |
| 181 | + "Unexpected IOException in injected next() call", e); |
| 182 | + } |
| 183 | + } |
| 184 | + } |
| 185 | + } |
| 186 | + }; |
| 187 | + |
| 188 | + /* |
| 189 | + * Build the acceptor session with heartbeatInterval=0 (the default for |
| 190 | + * acceptors). This ensures that SessionState.isInitiator() == false, |
| 191 | + * which is required for nextLogon() to call generateLogon() and thus |
| 192 | + * produce the Logon response. |
| 193 | + * |
| 194 | + * NOTE: heartbeatInterval=0 causes the Session constructor to set |
| 195 | + * state.initiator = (0 != 0) = false. If we passed 30 here, |
| 196 | + * state.initiator would be true and generateLogon() would be |
| 197 | + * skipped entirely, making the test scenario impossible to set up. |
| 198 | + */ |
| 199 | + acceptorSession = new SessionFactoryTestSupport.Builder() |
| 200 | + .setSessionId(sessionID) |
| 201 | + .setApplication(new UnitTestApplication()) |
| 202 | + .setIsInitiator(false) |
| 203 | + .setCheckLatency(false) |
| 204 | + .setCheckCompID(true) |
| 205 | + .setPersistMessages(true) |
| 206 | + .setLogFactory(injectingLogFactory) |
| 207 | + .build(); |
| 208 | + |
| 209 | + /* |
| 210 | + * Now update the heartbeat interval to 30 s WITHOUT changing |
| 211 | + * state.isInitiator() (which is final and was set to false above). |
| 212 | + * This makes next() reach the isHeartBeatNeeded() check instead of |
| 213 | + * returning early at the "getHeartBeatInterval() == 0" guard, which |
| 214 | + * is the prerequisite for the bug to manifest. |
| 215 | + */ |
| 216 | + acceptorSession.setHeartBeatInterval(30); |
| 217 | + |
| 218 | + sessionRef.set(acceptorSession); |
| 219 | + |
| 220 | + responder = new RecordingResponder(); |
| 221 | + acceptorSession.setResponder(responder); |
| 222 | + |
| 223 | + // ----------------------------------------------------------------- |
| 224 | + // 2. Simulate a previous session by setting seq numbers to 5 |
| 225 | + // and recording lastSentTime at the INITIAL clock value (t=1 000 000). |
| 226 | + // This makes lastSentTime "stale" once we advance the clock. |
| 227 | + // ----------------------------------------------------------------- |
| 228 | + SessionState state = getSessionState(acceptorSession); |
| 229 | + state.getMessageStore().setNextSenderMsgSeqNum(5); |
| 230 | + state.getMessageStore().setNextTargetMsgSeqNum(5); |
| 231 | + state.setLastSentTime(SystemTime.currentTimeMillis()); // = 1_000_000 |
| 232 | + |
| 233 | + // ----------------------------------------------------------------- |
| 234 | + // 3. Advance the mock clock by 31 s (> the 30 s heartbeat interval) |
| 235 | + // so that isHeartBeatNeeded() returns true when next() is called |
| 236 | + // from inside the log hook. |
| 237 | + // ----------------------------------------------------------------- |
| 238 | + timeSource.increment(31_000L); // clock = 1_031_000 |
| 239 | + |
| 240 | + // ----------------------------------------------------------------- |
| 241 | + // 4. Deliver the incoming Logon with ResetSeqNumFlag=Y, MsgSeqNum=1. |
| 242 | + // |
| 243 | + // Execution path (with the injected interleaving): |
| 244 | + // nextLogon() |
| 245 | + // → resetState() : seq counter reset to 1 |
| 246 | + // → setLogonReceived(true) : session now appears "logon received" |
| 247 | + // → log("Received logon") : our hook fires session.next() |
| 248 | + // → isHeartBeatNeeded() = true (31 s > 30 s, stale time) |
| 249 | + // → generateHeartbeat() |
| 250 | + // → sendRaw(heartbeat): |
| 251 | + // persist() bumps seq 1 → 2 |
| 252 | + // send() NOT called (isLogonSent() still false) |
| 253 | + // → next() returns |
| 254 | + // → generateLogon() : initializeHeader sets MsgSeqNum=2 ← BUG |
| 255 | + // → sendRaw(logon): sends the Logon response with "34=2" |
| 256 | + // ----------------------------------------------------------------- |
| 257 | + acceptorSession.next(buildIncomingLogon(sessionID)); |
| 258 | + |
| 259 | + // ----------------------------------------------------------------- |
| 260 | + // 5. Sanity-check: the log hook must have fired to confirm the test |
| 261 | + // scenario was actually triggered. |
| 262 | + // ----------------------------------------------------------------- |
| 263 | + assertTrue("Log hook for 'Received logon' must have fired during test", |
| 264 | + heartbeatInjected.get()); |
| 265 | + |
| 266 | + // ----------------------------------------------------------------- |
| 267 | + // 6. Assert: the Logon response MUST carry MsgSeqNum=1. |
| 268 | + // |
| 269 | + // With the current buggy code this assertion FAILS because the |
| 270 | + // heartbeat consumed seq=1 and the Logon response was sent with |
| 271 | + // MsgSeqNum=2. |
| 272 | + // ----------------------------------------------------------------- |
| 273 | + String logonResponse = findLogonResponse(responder.sentMessages); |
| 274 | + int seqNum = extractMsgSeqNum(logonResponse); |
| 275 | + |
| 276 | + assertEquals( |
| 277 | + "Acceptor Logon response must carry MsgSeqNum=1 when responding to " |
| 278 | + + "a Logon with ResetSeqNumFlag=Y and MsgSeqNum=1, but got MsgSeqNum=" |
| 279 | + + seqNum, |
| 280 | + 1, |
| 281 | + seqNum); |
| 282 | + } |
| 283 | + |
| 284 | + // ------------------------------------------------------------------------- |
| 285 | + // Helpers |
| 286 | + // ------------------------------------------------------------------------- |
| 287 | + |
| 288 | + /** |
| 289 | + * Builds an incoming FIX 4.4 Logon message addressed to {@code sessionID}'s |
| 290 | + * acceptor side, carrying {@code ResetSeqNumFlag=Y} and {@code MsgSeqNum=1}. |
| 291 | + */ |
| 292 | + private static quickfix.fix44.Logon buildIncomingLogon(SessionID sessionID) |
| 293 | + throws FieldNotFound { |
| 294 | + quickfix.fix44.Logon logon = new quickfix.fix44.Logon(); |
| 295 | + logon.getHeader().setString(BeginString.FIELD, |
| 296 | + sessionID.getBeginString()); |
| 297 | + logon.getHeader().setString(SenderCompID.FIELD, |
| 298 | + sessionID.getTargetCompID()); // from the remote peer's perspective |
| 299 | + logon.getHeader().setString(TargetCompID.FIELD, |
| 300 | + sessionID.getSenderCompID()); |
| 301 | + logon.getHeader().setInt(MsgSeqNum.FIELD, 1); |
| 302 | + logon.setInt(EncryptMethod.FIELD, EncryptMethod.NONE_OTHER); |
| 303 | + logon.setInt(HeartBtInt.FIELD, 30); |
| 304 | + logon.setBoolean(ResetSeqNumFlag.FIELD, true); |
| 305 | + return logon; |
| 306 | + } |
| 307 | + |
| 308 | + /** |
| 309 | + * Returns the last outbound Logon message (35=A) recorded by the responder. |
| 310 | + * |
| 311 | + * @throws AssertionError if no Logon response was sent |
| 312 | + */ |
| 313 | + private static String findLogonResponse(List<String> sent) { |
| 314 | + for (int i = sent.size() - 1; i >= 0; i--) { |
| 315 | + if (sent.get(i).contains("\00135=A\001")) { |
| 316 | + return sent.get(i); |
| 317 | + } |
| 318 | + } |
| 319 | + throw new AssertionError( |
| 320 | + "No Logon response (35=A) found in sent messages: " + sent); |
| 321 | + } |
| 322 | + |
| 323 | + /** |
| 324 | + * Parses a raw FIX message string and returns the integer value of tag 34 |
| 325 | + * (MsgSeqNum). |
| 326 | + */ |
| 327 | + private static int extractMsgSeqNum(String fixMessage) { |
| 328 | + for (String field : fixMessage.split("\001")) { |
| 329 | + if (field.startsWith("34=")) { |
| 330 | + return Integer.parseInt(field.substring(3)); |
| 331 | + } |
| 332 | + } |
| 333 | + throw new AssertionError("Tag 34 (MsgSeqNum) not found in message: " + fixMessage); |
| 334 | + } |
| 335 | + |
| 336 | + /** |
| 337 | + * Obtains the private {@code SessionState} from a {@link Session} via |
| 338 | + * reflection. |
| 339 | + */ |
| 340 | + private static SessionState getSessionState(Session session) throws Exception { |
| 341 | + Field stateField = Session.class.getDeclaredField("state"); |
| 342 | + stateField.setAccessible(true); |
| 343 | + return (SessionState) stateField.get(session); |
| 344 | + } |
| 345 | + |
| 346 | + // ------------------------------------------------------------------------- |
| 347 | + // Inner types |
| 348 | + // ------------------------------------------------------------------------- |
| 349 | + |
| 350 | + /** |
| 351 | + * A minimal {@link Responder} that records every raw FIX string passed to |
| 352 | + * {@link #send(String)}. |
| 353 | + */ |
| 354 | + private static final class RecordingResponder implements Responder { |
| 355 | + |
| 356 | + final List<String> sentMessages = new CopyOnWriteArrayList<>(); |
| 357 | + |
| 358 | + @Override |
| 359 | + public boolean send(String data) { |
| 360 | + sentMessages.add(data); |
| 361 | + return true; |
| 362 | + } |
| 363 | + |
| 364 | + @Override |
| 365 | + public void disconnect() { |
| 366 | + } |
| 367 | + |
| 368 | + @Override |
| 369 | + public String getRemoteAddress() { |
| 370 | + return "127.0.0.1:54321"; |
| 371 | + } |
| 372 | + } |
| 373 | +} |
0 commit comments