Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 16 additions & 11 deletions quickfixj-core/src/main/java/quickfix/JdbcStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ class JdbcStore implements MessageStore {
private final String defaultSessionIdPropertyValue;
private final boolean persistMessages;

private String SQL_UPDATE_SEQNUMS;
private String SQL_UPDATE_INCOMING_SEQNUM;
private String SQL_UPDATE_OUTGOING_SEQNUM;
private String SQL_INSERT_SESSION;
private String SQL_GET_SEQNUMS;
private String SQL_UPDATE_MESSAGE;
Expand Down Expand Up @@ -96,8 +97,12 @@ public static String getMessageTableName(SessionSettings settings, SessionID ses
}
}

public static String getUpdateSequenceNumsSql(String sessionTableName, String idWhereClause) {
return "UPDATE " + sessionTableName + " SET incoming_seqnum=?, " + "outgoing_seqnum=? WHERE " + idWhereClause;
public static String getUpdateIncomingSequenceNumberSql(String sessionTableName, String idWhereClause) {
return "UPDATE " + sessionTableName + " SET incoming_seqnum=? WHERE " + idWhereClause;
}

public static String getUpdateOutgoingSequenceNumberSql(String sessionTableName, String idWhereClause) {
return "UPDATE " + sessionTableName + " SET outgoing_seqnum=? WHERE " + idWhereClause;
}

public static String getInsertSessionSql(String sessionTableName, String idColumns, String idPlaceholders) {
Expand Down Expand Up @@ -133,7 +138,8 @@ private void setSqlStrings() {
String idColumns = JdbcUtil.getIDColumns(extendedSessionIdSupported);
String idPlaceholders = JdbcUtil.getIDPlaceholders(extendedSessionIdSupported);

SQL_UPDATE_SEQNUMS = getUpdateSequenceNumsSql(sessionTableName, idWhereClause);
SQL_UPDATE_INCOMING_SEQNUM = getUpdateIncomingSequenceNumberSql(sessionTableName, idWhereClause);
SQL_UPDATE_OUTGOING_SEQNUM = getUpdateOutgoingSequenceNumberSql(sessionTableName, idWhereClause);
SQL_INSERT_SESSION = getInsertSessionSql(sessionTableName, idColumns, idPlaceholders);
SQL_GET_SEQNUMS = getSequenceNumsSql(sessionTableName, idWhereClause);
SQL_UPDATE_MESSAGE = getUpdateMessageSql(messageTableName, idWhereClause);
Expand Down Expand Up @@ -293,23 +299,22 @@ public boolean set(int sequence, String message) throws IOException {

public void setNextSenderMsgSeqNum(int next) throws IOException {
cache.setNextSenderMsgSeqNum(next);
storeSequenceNumbers();
storeSequenceNumber(SQL_UPDATE_OUTGOING_SEQNUM, next);
}

public void setNextTargetMsgSeqNum(int next) throws IOException {
cache.setNextTargetMsgSeqNum(next);
storeSequenceNumbers();
storeSequenceNumber(SQL_UPDATE_INCOMING_SEQNUM, next);
}

private void storeSequenceNumbers() throws IOException {
private void storeSequenceNumber(String sequenceUpdateSql, int sequence) throws IOException {
Connection connection = null;
PreparedStatement update = null;
try {
connection = dataSource.getConnection();
update = connection.prepareStatement(SQL_UPDATE_SEQNUMS);
update.setInt(1, cache.getNextTargetMsgSeqNum());
update.setInt(2, cache.getNextSenderMsgSeqNum());
setSessionIdParameters(update, 3);
update = connection.prepareStatement(sequenceUpdateSql);
update.setInt(1, sequence);
setSessionIdParameters(update, 2);
update.execute();
} catch (SQLException e) {
throw new IOException(e.getMessage(), e);
Expand Down
6 changes: 3 additions & 3 deletions quickfixj-core/src/main/java/quickfix/MemoryStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,15 +74,15 @@ public Calendar getCreationTimeCalendar() throws IOException {
return creationTime;
}

/* package */void setCreationTime(Calendar creationTime) {
void setCreationTime(Calendar creationTime) {
this.creationTime = creationTime;
}

public int getNextSenderMsgSeqNum() throws IOException {
public int getNextSenderMsgSeqNum() {
return nextSenderMsgSeqNum;
}

public int getNextTargetMsgSeqNum() throws IOException {
public int getNextTargetMsgSeqNum() {
return nextTargetMsgSeqNum;
}

Expand Down
30 changes: 30 additions & 0 deletions quickfixj-core/src/test/java/quickfix/JdbcStoreTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,36 @@ public void testSequenceNumbersWithCustomSessionsTableName() throws Exception {
assertEquals("wrong value", 1, store.getNextTargetMsgSeqNum());
}

public void testIncrementSequences() throws ConfigError, SQLException, IOException {
initializeTableDefinitions("xsessions", "messages");
JdbcStore store = (JdbcStore) getMessageStoreFactory("xsessions", "messages").create(
getSessionID());
store.reset();

assertEquals("wrong value", 1, store.getNextSenderMsgSeqNum());
assertEquals("wrong value", 1, store.getNextTargetMsgSeqNum());

store.incrNextSenderMsgSeqNum();

assertEquals("wrong value", 2, store.getNextSenderMsgSeqNum());
assertEquals("wrong value", 1, store.getNextTargetMsgSeqNum());

store.incrNextTargetMsgSeqNum();

assertEquals("wrong value", 2, store.getNextSenderMsgSeqNum());
assertEquals("wrong value", 2, store.getNextTargetMsgSeqNum());

store.incrNextTargetMsgSeqNum();

assertEquals("wrong value", 2, store.getNextSenderMsgSeqNum());
assertEquals("wrong value", 3, store.getNextTargetMsgSeqNum());

store.incrNextSenderMsgSeqNum();

assertEquals("wrong value", 3, store.getNextSenderMsgSeqNum());
assertEquals("wrong value", 3, store.getNextTargetMsgSeqNum());
}

public void testMessageStorageMessagesWithCustomMessagesTableName() throws Exception {
initializeTableDefinitions("sessions", "xmessages");
JdbcStore store = (JdbcStore) getMessageStoreFactory("sessions", "xmessages").create(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.Calendar;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.IntConsumer;

import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
Expand Down Expand Up @@ -85,8 +86,15 @@ private static JdbcStoreWrapper createWrapper() throws Exception {
// UPDATE SEQUENCE NUMS
Database database = new Database();

String updateSequenceNumsSql = JdbcStore.getUpdateSequenceNumsSql(sessionTableName, idWhereClause);
doAnswer(invocationOnMock -> new UpdateSequenceStatement(database)).when(connection).prepareStatement(updateSequenceNumsSql);
String updateIncomingSequenceNumberSql = JdbcStore.getUpdateIncomingSequenceNumberSql(sessionTableName, idWhereClause);
doAnswer(invocationOnMock -> new UpdateSequenceStatement(database::updateTargetSequence))
.when(connection)
.prepareStatement(updateIncomingSequenceNumberSql);

String updateOutgoingSequenceNumberSql = JdbcStore.getUpdateOutgoingSequenceNumberSql(sessionTableName, idWhereClause);
doAnswer(invocationOnMock -> new UpdateSequenceStatement(database::updateSenderSequence))
.when(connection)
.prepareStatement(updateOutgoingSequenceNumberSql);

JdbcStore jdbcStore = new JdbcStore(settings, SESSION_ID, dataSource);

Expand Down Expand Up @@ -280,11 +288,20 @@ public Database() {
this.targetSequence = -1;
}

public void update(int senderSequence, int targetSequence) {
public void updateSenderSequence(int senderSequence) {
lock.lock();

try {
this.senderSequence = senderSequence;
} finally {
lock.unlock();
}
}

public void updateTargetSequence(int targetSequence) {
lock.lock();

try {
this.targetSequence = targetSequence;
} finally {
lock.unlock();
Expand All @@ -294,14 +311,12 @@ public void update(int senderSequence, int targetSequence) {

private static final class UpdateSequenceStatement implements PreparedStatement {

private final Database database;
private int senderSequence;
private int targetSequence;
private final IntConsumer dbUpdater;
private int sequence;

public UpdateSequenceStatement(Database database) {
this.database = database;
this.senderSequence = -1;
this.targetSequence = -1;
public UpdateSequenceStatement(IntConsumer dbUpdater) {
this.dbUpdater = dbUpdater;
this.sequence = -1;
}

@Override
Expand Down Expand Up @@ -337,9 +352,7 @@ public void setShort(int parameterIndex, short x) {
@Override
public void setInt(int parameterIndex, int x) {
if (parameterIndex == 1) {
targetSequence = x;
} else if (parameterIndex == 2) {
senderSequence = x;
sequence = x;
}
}

Expand Down Expand Up @@ -419,7 +432,7 @@ public void setObject(int parameterIndex, Object x) {

@Override
public boolean execute() {
database.update(senderSequence, targetSequence);
dbUpdater.accept(sequence);
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,19 +126,11 @@ public void incrementTargetSequence() {
}

public int getSenderSequence() {
try {
return store.getNextSenderMsgSeqNum();
} catch (IOException e) {
throw new RuntimeException(e);
}
return store.getNextSenderMsgSeqNum();
}

public int getTargetSequence() {
try {
return store.getNextTargetMsgSeqNum();
} catch (IOException e) {
throw new RuntimeException(e);
}
return store.getNextTargetMsgSeqNum();
}
}
}