-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathXYZTransactionHandler.java
More file actions
100 lines (84 loc) · 4.24 KB
/
XYZTransactionHandler.java
File metadata and controls
100 lines (84 loc) · 4.24 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
package com.here.xyz.pub;
import com.here.xyz.pub.handlers.PubJobHandler;
import com.here.xyz.pub.handlers.SeqJobHandler;
import com.here.xyz.pub.models.JdbcConnectionParams;
import com.here.xyz.pub.models.PubConfig;
import io.vertx.core.json.JsonObject;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
 * Singleton responsible for periodically running a couple of background jobs.
 * <ol>
 *   <li><b>Sequencer job</b> ({@link SeqJobHandler}):
 *       adds an incrementing sequence number to newer entries entering the transactions table.
 *       It spawns multiple threads, subject to thread pool capacity, with one thread per connector
 *       (i.e. per SpaceDB).</li>
 *   <li><b>Publisher job</b> ({@link PubJobHandler}):
 *       publishes transactions onto the respective destinations subscribed as per the
 *       xyz_subscriptions table. It spawns multiple threads, subject to thread pool capacity,
 *       with one thread per subscription.</li>
 * </ol>
 * Each job is driven by its own single-threaded scheduler, created in {@link #start()} and
 * released in {@link #stop()}.
 */
public class XYZTransactionHandler {
    private static final Logger logger = LogManager.getLogger();

    // holds singleton instance
    private static XYZTransactionHandler handler;
    private static JsonObject rawConfig;
    private static PubConfig pubCfg;
    private static JdbcConnectionParams adminDBConnParams;

    // Schedulers are retained (instead of being created anonymously) so that they can be
    // shut down again and so that a repeated start() does not schedule duplicate jobs.
    private ScheduledThreadPoolExecutor seqExecutor;
    private ScheduledThreadPoolExecutor pubExecutor;

    /**
     * Private constructor to prevent external instantiation of the singleton.
     * Stores the raw configuration and derives the publisher specific settings from it.
     *
     * @param config raw service configuration; must not be null
     * @throws NullPointerException if {@code config} is null
     */
    private XYZTransactionHandler(final JsonObject config) {
        if (config == null) {
            // Fail fast with a clear message instead of an anonymous NPE inside readConfig().
            throw new NullPointerException("config must not be null");
        }
        rawConfig = config;
        readConfig();
    }

    /**
     * Creates (on first call) and returns the singleton instance.
     * The configuration is only read when the instance is created; on later calls the
     * {@code config} argument is ignored and the existing instance is returned.
     *
     * @param config raw service configuration used to initialise the singleton
     * @return the singleton handler
     */
    public static synchronized XYZTransactionHandler getInstance(final JsonObject config) {
        if (handler == null) {
            handler = new XYZTransactionHandler(config);
        }
        return handler;
    }

    /**
     * Reads the publisher specific config, prepares the AdminDB connection parameters and
     * exposes the configured AWS credentials as JVM system properties.
     */
    private static void readConfig() {
        pubCfg = rawConfig.mapTo(PubConfig.class);

        // Read AdminDB connection params
        adminDBConnParams = new JdbcConnectionParams();
        adminDBConnParams.setSpaceId("XYZ_ADMIN_DB");
        adminDBConnParams.setDbUrl(pubCfg.STORAGE_DB_URL);
        adminDBConnParams.setUser(pubCfg.STORAGE_DB_USER);
        adminDBConnParams.setPswd(pubCfg.STORAGE_DB_PASSWORD);
        adminDBConnParams.setMaxPoolSize(25); // keeping higher value to allow parallel seq/publisher jobs

        // Set AWS account access details. A value missing from the config must not overwrite
        // credentials that were already supplied to the JVM (e.g. via -Daws.accessKeyId=...).
        setSystemPropertyIfPresent("aws.accessKeyId", pubCfg.AWS_ACCESS_KEY_ID);
        setSystemPropertyIfPresent("aws.secretAccessKey", pubCfg.AWS_SECRET_ACCESS_KEY);
    }

    // Sets the system property only when a value is actually configured.
    private static void setSystemPropertyIfPresent(final String key, final String value) {
        if (value != null) {
            System.setProperty(key, value);
        }
    }

    /**
     * Starts the periodic sequencer and publisher jobs (each only if enabled in config).
     * Calling this method again while a job is already scheduled does not schedule it twice.
     */
    public synchronized void start() {
        // Start sequencer job (if enabled)
        if (pubCfg.ENABLE_TXN_SEQUENCER) {
            if (seqExecutor == null) {
                // Schedule Sequencer job (as per configured frequency e.g. 2 secs)
                seqExecutor = scheduleJob("Transaction Sequencer",
                        new SeqJobHandler(pubCfg, adminDBConnParams),
                        pubCfg.TXN_SEQ_JOB_INITIAL_DELAY_MS, pubCfg.TXN_SEQ_JOB_SUBSEQUENT_DELAY_MS);
                logger.info("Transaction Sequencer job is set to start after {}ms with subsequent delay of {}ms.",
                        pubCfg.TXN_SEQ_JOB_INITIAL_DELAY_MS, pubCfg.TXN_SEQ_JOB_SUBSEQUENT_DELAY_MS);
            }
            else {
                logger.warn("Transaction Sequencer job is already scheduled, ignoring repeated start request.");
            }
        }
        else {
            logger.warn("As per config, Transaction Sequencer is not enabled.");
        }

        // Start publisher job (if enabled)
        if (pubCfg.ENABLE_TXN_PUBLISHER) {
            if (pubExecutor == null) {
                // Schedule Publisher job (as per configured frequency e.g. 2 secs)
                pubExecutor = scheduleJob("Transaction Publisher",
                        new PubJobHandler(pubCfg, adminDBConnParams),
                        pubCfg.TXN_PUB_JOB_INITIAL_DELAY_MS, pubCfg.TXN_PUB_JOB_SUBSEQUENT_DELAY_MS);
                logger.info("Transaction Publisher job is set to start after {}ms with subsequent delay of {}ms.",
                        pubCfg.TXN_PUB_JOB_INITIAL_DELAY_MS, pubCfg.TXN_PUB_JOB_SUBSEQUENT_DELAY_MS);
            }
            else {
                logger.warn("Transaction Publisher job is already scheduled, ignoring repeated start request.");
            }
        }
        else {
            logger.warn("As per config, Transaction Publisher is not enabled.");
        }
    }

    /**
     * Stops the jobs scheduled by {@link #start()}, if any.
     * The schedulers are shut down in an orderly way: no further runs are started.
     * After this call {@link #start()} may be used to schedule the jobs again.
     */
    public synchronized void stop() {
        if (seqExecutor != null) {
            seqExecutor.shutdown();
            seqExecutor = null;
        }
        if (pubExecutor != null) {
            pubExecutor.shutdown();
            pubExecutor = null;
        }
    }

    // Schedules the job with fixed delay on a dedicated single-threaded scheduler and returns that scheduler.
    private static ScheduledThreadPoolExecutor scheduleJob(final String jobName, final Runnable job,
            final long initialDelayMs, final long subsequentDelayMs) {
        final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        executor.scheduleWithFixedDelay(() -> {
            try {
                job.run();
            }
            catch (Exception e) {
                // scheduleWithFixedDelay suppresses all subsequent executions once a run throws;
                // log and swallow here so that one failed run does not silently end the job.
                logger.error("{} job run failed, it will be attempted again on next scheduled run.", jobName, e);
            }
        }, initialDelayMs, subsequentDelayMs, TimeUnit.MILLISECONDS);
        return executor;
    }
}