diff --git a/quickfixj-core/src/main/java/quickfix/JdbcStore.java b/quickfixj-core/src/main/java/quickfix/JdbcStore.java index 0087e9de8a..f05fb6e16b 100644 --- a/quickfixj-core/src/main/java/quickfix/JdbcStore.java +++ b/quickfixj-core/src/main/java/quickfix/JdbcStore.java @@ -43,7 +43,8 @@ class JdbcStore implements MessageStore { private final String defaultSessionIdPropertyValue; private final boolean persistMessages; - private String SQL_UPDATE_SEQNUMS; + private String SQL_UPDATE_INCOMING_SEQNUM; + private String SQL_UPDATE_OUTGOING_SEQNUM; private String SQL_INSERT_SESSION; private String SQL_GET_SEQNUMS; private String SQL_UPDATE_MESSAGE; @@ -96,8 +97,12 @@ public static String getMessageTableName(SessionSettings settings, SessionID ses } } - public static String getUpdateSequenceNumsSql(String sessionTableName, String idWhereClause) { - return "UPDATE " + sessionTableName + " SET incoming_seqnum=?, " + "outgoing_seqnum=? WHERE " + idWhereClause; + public static String getUpdateIncomingSequenceNumberSql(String sessionTableName, String idWhereClause) { + return "UPDATE " + sessionTableName + " SET incoming_seqnum=? WHERE " + idWhereClause; + } + + public static String getUpdateOutgoingSequenceNumberSql(String sessionTableName, String idWhereClause) { + return "UPDATE " + sessionTableName + " SET outgoing_seqnum=? WHERE " + idWhereClause; } public static String getInsertSessionSql(String sessionTableName, String idColumns, String idPlaceholders) { @@ -133,7 +138,8 @@ private void setSqlStrings() { String idColumns = JdbcUtil.getIDColumns(extendedSessionIdSupported); String idPlaceholders = JdbcUtil.getIDPlaceholders(extendedSessionIdSupported); - SQL_UPDATE_SEQNUMS = getUpdateSequenceNumsSql(sessionTableName, idWhereClause); + SQL_UPDATE_INCOMING_SEQNUM = getUpdateIncomingSequenceNumberSql(sessionTableName, idWhereClause); + SQL_UPDATE_OUTGOING_SEQNUM = getUpdateOutgoingSequenceNumberSql(sessionTableName, idWhereClause); SQL_INSERT_SESSION = getInsertSessionSql(sessionTableName, idColumns, idPlaceholders); SQL_GET_SEQNUMS = getSequenceNumsSql(sessionTableName, idWhereClause); SQL_UPDATE_MESSAGE = getUpdateMessageSql(messageTableName, idWhereClause); @@ -293,23 +299,22 @@ public boolean set(int sequence, String message) throws IOException { public void setNextSenderMsgSeqNum(int next) throws IOException { cache.setNextSenderMsgSeqNum(next); - storeSequenceNumbers(); + storeSequenceNumber(SQL_UPDATE_OUTGOING_SEQNUM, next); } public void setNextTargetMsgSeqNum(int next) throws IOException { cache.setNextTargetMsgSeqNum(next); - storeSequenceNumbers(); + storeSequenceNumber(SQL_UPDATE_INCOMING_SEQNUM, next); } - private void storeSequenceNumbers() throws IOException { + private void storeSequenceNumber(String sequenceUpdateSql, int sequence) throws IOException { Connection connection = null; PreparedStatement update = null; try { connection = dataSource.getConnection(); - update = connection.prepareStatement(SQL_UPDATE_SEQNUMS); - update.setInt(1, cache.getNextTargetMsgSeqNum()); - update.setInt(2, cache.getNextSenderMsgSeqNum()); - setSessionIdParameters(update, 3); + update = connection.prepareStatement(sequenceUpdateSql); + update.setInt(1, sequence); + setSessionIdParameters(update, 2); update.execute(); } catch (SQLException e) { throw new IOException(e.getMessage(), e); diff --git a/quickfixj-core/src/main/java/quickfix/MemoryStore.java b/quickfixj-core/src/main/java/quickfix/MemoryStore.java index f3cb430ad4..9bbf967f83 100644 --- a/quickfixj-core/src/main/java/quickfix/MemoryStore.java +++ b/quickfixj-core/src/main/java/quickfix/MemoryStore.java @@ -74,15 +74,15 @@ public Calendar getCreationTimeCalendar() throws IOException { return creationTime; } - /* package */void setCreationTime(Calendar creationTime) { + void setCreationTime(Calendar creationTime) { this.creationTime = creationTime; } - public int getNextSenderMsgSeqNum() throws IOException { + public int getNextSenderMsgSeqNum() { return nextSenderMsgSeqNum; } - public int getNextTargetMsgSeqNum() throws IOException { + public int getNextTargetMsgSeqNum() { return nextTargetMsgSeqNum; } diff --git a/quickfixj-core/src/test/java/quickfix/JdbcStoreTest.java b/quickfixj-core/src/test/java/quickfix/JdbcStoreTest.java index b3fe5f93cb..e442f9881e 100644 --- a/quickfixj-core/src/test/java/quickfix/JdbcStoreTest.java +++ b/quickfixj-core/src/test/java/quickfix/JdbcStoreTest.java @@ -104,6 +104,36 @@ public void testSequenceNumbersWithCustomSessionsTableName() throws Exception { assertEquals("wrong value", 1, store.getNextTargetMsgSeqNum()); } + public void testIncrementSequences() throws ConfigError, SQLException, IOException { + initializeTableDefinitions("xsessions", "messages"); + JdbcStore store = (JdbcStore) getMessageStoreFactory("xsessions", "messages").create( + getSessionID()); + store.reset(); + + assertEquals("wrong value", 1, store.getNextSenderMsgSeqNum()); + assertEquals("wrong value", 1, store.getNextTargetMsgSeqNum()); + + store.incrNextSenderMsgSeqNum(); + + assertEquals("wrong value", 2, store.getNextSenderMsgSeqNum()); + assertEquals("wrong value", 1, store.getNextTargetMsgSeqNum()); + + store.incrNextTargetMsgSeqNum(); + + assertEquals("wrong value", 2, store.getNextSenderMsgSeqNum()); + assertEquals("wrong value", 2, store.getNextTargetMsgSeqNum()); + + store.incrNextTargetMsgSeqNum(); + + assertEquals("wrong value", 2, store.getNextSenderMsgSeqNum()); + assertEquals("wrong value", 3, store.getNextTargetMsgSeqNum()); + + store.incrNextSenderMsgSeqNum(); + + assertEquals("wrong value", 3, store.getNextSenderMsgSeqNum()); + assertEquals("wrong value", 3, store.getNextTargetMsgSeqNum()); + } + public void testMessageStorageMessagesWithCustomMessagesTableName() throws Exception { initializeTableDefinitions("sessions", "xmessages"); JdbcStore store = (JdbcStore) getMessageStoreFactory("sessions", "xmessages").create( diff --git a/quickfixj-stress-test/src/main/java/quickfix/JdbcStoreStressTest.java b/quickfixj-stress-test/src/main/java/quickfix/JdbcStoreStressTest.java index 97f3fab224..f30fb20cbd 100644 --- a/quickfixj-stress-test/src/main/java/quickfix/JdbcStoreStressTest.java +++ b/quickfixj-stress-test/src/main/java/quickfix/JdbcStoreStressTest.java @@ -34,6 +34,7 @@ import java.util.Calendar; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import java.util.function.IntConsumer; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; @@ -85,8 +86,15 @@ private static JdbcStoreWrapper createWrapper() throws Exception { // UPDATE SEQUENCE NUMS Database database = new Database(); - String updateSequenceNumsSql = JdbcStore.getUpdateSequenceNumsSql(sessionTableName, idWhereClause); - doAnswer(invocationOnMock -> new UpdateSequenceStatement(database)).when(connection).prepareStatement(updateSequenceNumsSql); + String updateIncomingSequenceNumberSql = JdbcStore.getUpdateIncomingSequenceNumberSql(sessionTableName, idWhereClause); + doAnswer(invocationOnMock -> new UpdateSequenceStatement(database::updateTargetSequence)) + .when(connection) + .prepareStatement(updateIncomingSequenceNumberSql); + + String updateOutgoingSequenceNumberSql = JdbcStore.getUpdateOutgoingSequenceNumberSql(sessionTableName, idWhereClause); + doAnswer(invocationOnMock -> new UpdateSequenceStatement(database::updateSenderSequence)) + .when(connection) + .prepareStatement(updateOutgoingSequenceNumberSql); JdbcStore jdbcStore = new JdbcStore(settings, SESSION_ID, dataSource); @@ -280,11 +288,20 @@ public Database() { this.targetSequence = -1; } - public void update(int senderSequence, int targetSequence) { + public void updateSenderSequence(int senderSequence) { lock.lock(); try { this.senderSequence = senderSequence; + } finally { + lock.unlock(); + } + } + + public void updateTargetSequence(int targetSequence) { + lock.lock(); + + try { this.targetSequence = targetSequence; } finally { lock.unlock(); @@ -294,14 +311,12 @@ public void update(int senderSequence, int targetSequence) { private static final class UpdateSequenceStatement implements PreparedStatement { - private final Database database; - private int senderSequence; - private int targetSequence; + private final IntConsumer dbUpdater; + private int sequence; - public UpdateSequenceStatement(Database database) { - this.database = database; - this.senderSequence = -1; - this.targetSequence = -1; + public UpdateSequenceStatement(IntConsumer dbUpdater) { + this.dbUpdater = dbUpdater; + this.sequence = -1; } @Override @@ -337,9 +352,7 @@ public void setShort(int parameterIndex, short x) { @Override public void setInt(int parameterIndex, int x) { if (parameterIndex == 1) { - targetSequence = x; - } else if (parameterIndex == 2) { - senderSequence = x; + sequence = x; } } @@ -419,7 +432,7 @@ public void setObject(int parameterIndex, Object x) { @Override public boolean execute() { - database.update(senderSequence, targetSequence); + dbUpdater.accept(sequence); return true; } diff --git a/quickfixj-stress-test/src/main/java/quickfix/MemoryStoreStressTest.java b/quickfixj-stress-test/src/main/java/quickfix/MemoryStoreStressTest.java index 1ec31a0538..8dd5625557 100644 --- a/quickfixj-stress-test/src/main/java/quickfix/MemoryStoreStressTest.java +++ b/quickfixj-stress-test/src/main/java/quickfix/MemoryStoreStressTest.java @@ -126,19 +126,11 @@ public void incrementTargetSequence() { } public int getSenderSequence() { - try { - return store.getNextSenderMsgSeqNum(); - } catch (IOException e) { - throw new RuntimeException(e); - } + return store.getNextSenderMsgSeqNum(); } public int getTargetSequence() { - try { - return store.getNextTargetMsgSeqNum(); - } catch (IOException e) { - throw new RuntimeException(e); - } + return store.getNextTargetMsgSeqNum(); } } }