Skip to content

Commit 7682f66

Browse files
author
chengyitian
committed
JAVAOS-1340: support 'enableSeqNo' for MTW、ExclusiveDBConnectionPool; add 'setDBConnection(DBConnection conn, boolean enableSeqNo)' for DBTask;
1 parent f141d90 commit 7682f66

4 files changed

Lines changed: 68 additions & 30 deletions

File tree

src/com/xxdb/BasicDBTask.java

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
public class BasicDBTask implements DBTask {
1111
private String script;
12+
private boolean enableSeqNo = true;
1213
private List<Entity> args;
1314
private DBConnection conn;
1415
private Entity result = null;
@@ -17,6 +18,9 @@ public class BasicDBTask implements DBTask {
1718
private CountDownLatch latch;
1819
private int timeOut = -1;
1920

21+
private static final int DEFAULT_PRIORITY = 4;
22+
private static final int DEFAULT_PARALLELISM = 64;
23+
2024
public BasicDBTask(String script, List<Entity> args) {
2125
this.script = script;
2226
this.args = args;
@@ -30,10 +34,12 @@ public BasicDBTask(String script) {
3034
@Override
3135
public Entity call() {
3236
try {
33-
if (args != null)
34-
result = conn.run(script, args);
35-
else
36-
result = conn.run(script);
37+
if (args != null) {
38+
result = conn.run(script, args, DEFAULT_PRIORITY, DEFAULT_PARALLELISM, 0, enableSeqNo);
39+
} else {
40+
result = conn.run(script, null, DEFAULT_PRIORITY, DEFAULT_PARALLELISM, 0, false, "", enableSeqNo);
41+
}
42+
3743
errMsg = null;
3844
synchronized (this) {
3945
status = TaskStatus.SUCCESS;
@@ -55,6 +61,12 @@ public void setDBConnection(DBConnection conn) {
5561
this.conn = conn;
5662
}
5763

64+
@Override
65+
public void setDBConnection(DBConnection conn, boolean enableSeqNo) {
66+
this.conn = conn;
67+
this.enableSeqNo = enableSeqNo;
68+
}
69+
5870
@Override
5971
public Entity getResult() {
6072
if (status != TaskStatus.SUCCESS) {

src/com/xxdb/DBTask.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
public interface DBTask extends Callable<Entity>{
77
void setDBConnection(DBConnection conn);
8+
void setDBConnection(DBConnection conn, boolean enableSeqNo);
89
Entity getResult();
910
String getErrorMsg();
1011
String getScript();

src/com/xxdb/ExclusiveDBConnectionPool.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,12 @@ public class ExclusiveDBConnectionPool implements DBConnectionPool {
2020

2121
private class AsyncWorker implements Runnable {
2222
private DBConnection conn_;
23+
boolean enableSeqNo = true;
2324
private final Thread workThread_;
2425

25-
public AsyncWorker(DBConnection conn) {
26+
public AsyncWorker(DBConnection conn, boolean enableSeqNo) {
2627
this.conn_ = conn;
28+
this.enableSeqNo = enableSeqNo;
2729
workThread_ = new Thread(this);
2830
workThread_.start();
2931
}
@@ -49,7 +51,7 @@ public void run() {
4951
break;
5052
}
5153
try {
52-
task.setDBConnection(conn_);
54+
task.setDBConnection(conn_, enableSeqNo);
5355
task.call();
5456
} catch (InterruptedException e) {
5557
break;
@@ -71,10 +73,14 @@ public void run() {
7173
}
7274

7375
public ExclusiveDBConnectionPool(String host, int port, String uid, String pwd, int count, boolean loadBalance, boolean enableHighAvailability) throws IOException {
74-
this(host, port, uid, pwd, count, loadBalance, enableHighAvailability, null, "",false, false, false);
76+
this(host, port, uid, pwd, count, loadBalance, enableHighAvailability, null, "",false, false, false, true);
7577
}
7678

7779
public ExclusiveDBConnectionPool(String host, int port, String uid, String pwd, int count, boolean loadBalance, boolean enableHighAvailability, String[] highAvailabilitySites, String initialScript,boolean compress, boolean useSSL, boolean usePython) throws IOException {
80+
this(host, port, uid, pwd, count, loadBalance, enableHighAvailability, highAvailabilitySites, initialScript, compress, useSSL, usePython, true);
81+
}
82+
83+
public ExclusiveDBConnectionPool(String host, int port, String uid, String pwd, int count, boolean loadBalance, boolean enableHighAvailability, String[] highAvailabilitySites, String initialScript,boolean compress, boolean useSSL, boolean usePython, boolean enableSeqNo) throws IOException {
7884
if (count <= 0)
7985
throw new RuntimeException("The thread count can not be less than 0");
8086
if (!loadBalance) {
@@ -90,7 +96,7 @@ public ExclusiveDBConnectionPool(String host, int port, String uid, String pwd,
9096
throw new RuntimeException("Can't connect to the specified host: ", e);
9197
}
9298

93-
workers_.add(new AsyncWorker(conn));
99+
workers_.add(new AsyncWorker(conn, enableSeqNo));
94100
}
95101
} else {
96102
BasicStringVector nodes = null;
@@ -117,7 +123,7 @@ public ExclusiveDBConnectionPool(String host, int port, String uid, String pwd,
117123
DBConnection conn = new DBConnection(false, useSSL, compress, usePython);
118124
if(!conn.connect(hosts[i % nodeCount], ports[i % nodeCount], uid, pwd, initialScript, enableHighAvailability, highAvailabilitySites,false,false))
119125
throw new RuntimeException("Can't connect to the host " + nodes.getString(i));
120-
workers_.add(new AsyncWorker(conn));
126+
workers_.add(new AsyncWorker(conn, enableSeqNo));
121127
}
122128
}
123129
}

0 commit comments

Comments
 (0)