Skip to content

Commit 1ff6f7a

Browse files
committed
refactor(matching-engine): snapshot preload
1 parent e67391e commit 1ff6f7a

3 files changed

Lines changed: 51 additions & 9 deletions

File tree

src/main/java/com/gitbitex/Bootstrap.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import com.gitbitex.marketdata.manager.TradeManager;
88
import com.gitbitex.marketdata.orderbook.OrderBookSnapshotManager;
99
import com.gitbitex.marketdata.repository.CandleRepository;
10+
import com.gitbitex.matchingengine.MatchingEngineLoader;
1011
import com.gitbitex.matchingengine.MatchingEngineThread;
1112
import com.gitbitex.matchingengine.MessageSender;
1213
import com.gitbitex.matchingengine.command.Command;
@@ -40,6 +41,7 @@ public class Bootstrap {
4041
private final AppProperties appProperties;
4142
private final KafkaProperties kafkaProperties;
4243
private final EngineSnapshotManager engineSnapshotManager;
44+
private final MatchingEngineLoader matchingEngineLoader;
4345
private final MessageSender messageSender;
4446
private final OrderBookSnapshotManager orderBookSnapshotManager;
4547
private final RedissonClient redissonClient;
@@ -61,7 +63,7 @@ private void startMatchingEngine(int nThreads) {
6163
for (int i = 0; i < nThreads; i++) {
6264
String groupId = "MatchingEngine";
6365
var consumer = getEngineCommandKafkaConsumer(groupId);
64-
var thread = new MatchingEngineThread(consumer, engineSnapshotManager, messageSender, appProperties);
66+
var thread = new MatchingEngineThread(consumer, matchingEngineLoader, appProperties);
6567
thread.setName(groupId + "-" + thread.getId());
6668
thread.setUncaughtExceptionHandler(getUncaughtExceptionHandler(() -> startMatchingEngine(1)));
6769
thread.start();
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package com.gitbitex.matchingengine;
2+
3+
import com.gitbitex.matchingengine.snapshot.EngineSnapshotManager;
4+
import lombok.Getter;
5+
import lombok.extern.slf4j.Slf4j;
6+
import org.springframework.stereotype.Component;
7+
8+
import javax.annotation.Nullable;
9+
import java.util.concurrent.Executors;
10+
import java.util.concurrent.TimeUnit;
11+
12+
@Slf4j
13+
@Component
14+
public class MatchingEngineLoader {
15+
private final EngineSnapshotManager engineSnapshotManager;
16+
private final MessageSender messageSender;
17+
@Getter
18+
@Nullable
19+
private volatile MatchingEngine preperedMatchingEngine;
20+
21+
public MatchingEngineLoader(EngineSnapshotManager engineSnapshotManager, MessageSender messageSender) {
22+
this.engineSnapshotManager = engineSnapshotManager;
23+
this.messageSender = messageSender;
24+
startRefreshPreparingMatchingEnginePeriodically();
25+
}
26+
27+
private void startRefreshPreparingMatchingEnginePeriodically() {
28+
Executors.newScheduledThreadPool(1).scheduleWithFixedDelay(() -> {
29+
try {
30+
logger.info("reloading latest snapshot");
31+
preperedMatchingEngine = new MatchingEngine(engineSnapshotManager, messageSender);
32+
logger.info("done");
33+
} catch (Exception e) {
34+
logger.error("matching engine create error: {}", e.getMessage(), e);
35+
}
36+
}, 0, 1, TimeUnit.MINUTES);
37+
}
38+
39+
}

src/main/java/com/gitbitex/matchingengine/MatchingEngineThread.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
package com.gitbitex.matchingengine;
22

33
import com.gitbitex.AppProperties;
4-
import com.gitbitex.matchingengine.command.*;
5-
import com.gitbitex.matchingengine.snapshot.EngineSnapshotManager;
4+
import com.gitbitex.matchingengine.command.Command;
65
import com.gitbitex.middleware.kafka.KafkaConsumerThread;
76
import lombok.extern.slf4j.Slf4j;
87
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
@@ -17,16 +16,15 @@
1716
public class MatchingEngineThread extends KafkaConsumerThread<String, Command>
1817
implements ConsumerRebalanceListener {
1918
private final AppProperties appProperties;
20-
private final EngineSnapshotManager engineSnapshotManager;
21-
private final MessageSender messageSender;
19+
private final MatchingEngineLoader matchingEngineLoader;
2220
private MatchingEngine matchingEngine;
2321

24-
public MatchingEngineThread(KafkaConsumer<String, Command> consumer, EngineSnapshotManager engineSnapshotManager, MessageSender messageSender,
22+
public MatchingEngineThread(KafkaConsumer<String, Command> consumer, MatchingEngineLoader matchingEngineLoader,
2523
AppProperties appProperties) {
2624
super(consumer, logger);
2725
this.appProperties = appProperties;
28-
this.engineSnapshotManager = engineSnapshotManager;
29-
this.messageSender = messageSender;
26+
this.matchingEngineLoader = matchingEngineLoader;
27+
3028
}
3129

3230
@Override
@@ -40,7 +38,10 @@ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
4038
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
4139
for (TopicPartition partition : partitions) {
4240
logger.info("partition assigned: {}", partition.toString());
43-
matchingEngine = new MatchingEngine(engineSnapshotManager, messageSender);
41+
matchingEngine = matchingEngineLoader.getPreperedMatchingEngine();
42+
if (matchingEngine == null) {
43+
throw new RuntimeException("no prepared matching engine");
44+
}
4445
if (matchingEngine.getStartupCommandOffset() != null) {
4546
logger.info("seek to offset: {}", matchingEngine.getStartupCommandOffset() + 1);
4647
consumer.seek(partition, matchingEngine.getStartupCommandOffset() + 1);

0 commit comments

Comments
 (0)