Skip to content

Commit 35c1b0a

Browse files
committed
opt for getConnection
1 parent db95e6e commit 35c1b0a

2 files changed

Lines changed: 41 additions & 19 deletions

File tree

src/main/java/com/alipay/oceanbase/rpc/property/Property.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ public enum Property {
136136
// when a big package is blocking the buffer but the server is OK.
137137
NETTY_BLOCKING_WAIT_INTERVAL("bolt.netty.blocking.wait.interval", 1, "netty写缓存满后等待时间"),
138138

139-
NETTY_CHECK_WRITABLE_ENABLED("bolt.netty.check.writable.enabled", true, "是否启用netty写缓存可写性检查"),
139+
NETTY_CHECK_WRITABLE_ENABLED("bolt.netty.check.writable.enabled", false, "是否启用netty写缓存可写性检查"),
140140

141141
// [ObTable][OTHERS]
142142
SERVER_ENABLE_REROUTING("server.enable.rerouting", false, "开启server端的重定向回复功能"),

src/main/java/com/alipay/oceanbase/rpc/table/ObTable.java

Lines changed: 40 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -844,8 +844,10 @@ private static class ObTableConnectionPool {
844844
private final int obTableConnectionPoolSize;
845845
private ObTable obTable;
846846
private volatile AtomicReference<ObTableConnection[]> connectionPool;
847-
// round-robin scheduling
848-
private AtomicLong turn = new AtomicLong(0);
847+
// counter for checkAndReconnect: records the start position of each scan
848+
private AtomicLong reconnectTurn = new AtomicLong(0);
849+
// counter for getConnection: increments on each call to ensure random distribution
850+
private AtomicLong getConnectionTurn = new AtomicLong(0);
849851
private boolean shouldStopExpand = false;
850852
private ScheduledFuture<?> expandTaskFuture = null;
851853
private ScheduledFuture<?> checkAndReconnectFuture = null;
@@ -864,11 +866,14 @@ public ObTableConnectionPool(ObTable obTable, int connectionPoolSize) {
864866
public void init() throws Exception {
865867
// only create at most 2 connections for use
866868
// expand other connections (if needed) in the background
869+
int initConnectionNum = this.obTableConnectionPoolSize > 10 ? 10 : this.obTableConnectionPoolSize;
867870
connectionPool = new AtomicReference<ObTableConnection[]>();
868-
ObTableConnection[] curConnectionPool = new ObTableConnection[1];
869-
curConnectionPool[0] = new ObTableConnection(obTable, obTable.isOdpMode());
870-
curConnectionPool[0].enableLoginWithConfigs();
871-
curConnectionPool[0].init();
871+
ObTableConnection[] curConnectionPool = new ObTableConnection[initConnectionNum];
872+
for (int i = 0; i < initConnectionNum; i++) {
873+
curConnectionPool[i] = new ObTableConnection(obTable, obTable.isOdpMode());
874+
curConnectionPool[i].enableLoginWithConfigs();
875+
curConnectionPool[i].init();
876+
}
872877

873878
connectionPool.set(curConnectionPool);
874879
// check connection pool size and expand every 3 seconds
@@ -879,16 +884,20 @@ public void init() throws Exception {
879884

880885
/*
881886
* Get connection.
887+
* Use counter (getConnectionTurn) that increments on each connection access
888+
* to ensure random distribution of connections, regardless of expired connections.
882889
*/
883890
public ObTableConnection getConnection() {
884891
ObTableConnection[] connections = connectionPool.get();
885-
long nextTurn = turn.getAndIncrement();
886-
if (nextTurn == Long.MAX_VALUE) {
887-
turn.set(0);
888-
}
889-
int round = (int) (nextTurn % connections.length);
892+
893+
// Start from current counter position and find first valid connection
894+
// Increment counter for each connection accessed
890895
for (int i = 0; i < connections.length; i++) {
891-
int idx = (round + i) % connections.length;
896+
long nextTurn = getConnectionTurn.getAndIncrement();
897+
if (nextTurn == Long.MAX_VALUE) {
898+
getConnectionTurn.set(0);
899+
}
900+
int idx = (int) (nextTurn % connections.length);
892901
if (!connections[idx].isExpired()) {
893902
return connections[idx];
894903
}
@@ -902,12 +911,13 @@ public ObTableConnection getConnection() {
902911
* This method will not impact the connections in use and create a reasonable number of connections.
903912
* */
904913
private void checkAndExpandPool() {
905-
if (obTableConnectionPoolSize == 1 || shouldStopExpand) {
914+
ObTableConnection[] curConnections = connectionPool.get();
915+
if (curConnections.length == obTableConnectionPoolSize) {
906916
// stop the background task if pools reach the setting
907917
this.expandTaskFuture.cancel(false);
908918
return;
909919
}
910-
ObTableConnection[] curConnections = connectionPool.get();
920+
911921
if (curConnections.length < obTableConnectionPoolSize) {
912922
int diffSize = obTableConnectionPoolSize - curConnections.length;
913923
// limit expand size not too big to ensure the instant availability of connections
@@ -949,6 +959,9 @@ private void checkAndExpandPool() {
949959
* 2. Mark a third of the expired connections for reconnection.
950960
* 3. Pause for a predefined timeout period.
951961
* 4. Attempt to reconnect the marked connections.
962+
*
963+
* The scan starts from the end position of the previous scan to ensure
964+
* all connections are checked over time.
952965
**/
953966
private void checkAndReconnect() {
954967
if (obTableConnectionPoolSize == 1) {
@@ -960,9 +973,10 @@ private void checkAndReconnect() {
960973
// Iterate over the connection pool to identify connections that have expired
961974
List<Integer> expiredConnIds = new ArrayList<>();
962975
ObTableConnection[] connections = connectionPool.get();
963-
long num = turn.get();
964-
for (int i = 1; i <= connections.length; ++i) {
965-
int idx = (int) ((i + num) % connections.length);
976+
// Start from the end position of previous scan
977+
long startPos = reconnectTurn.get();
978+
for (int i = 0; i < connections.length; ++i) {
979+
int idx = (int) ((i + startPos) % connections.length);
966980
if (connections[idx].checkExpired()) {
967981
expiredConnIds.add(idx);
968982
}
@@ -975,9 +989,17 @@ private void checkAndReconnect() {
975989
connections[idx].setExpired(true);
976990
}
977991

992+
// Update counter to the end position of this scan for next time
993+
// Use modulo to keep it within reasonable range
994+
long nextStartPos = startPos + needReconnectCount;
995+
if (nextStartPos >= Long.MAX_VALUE) {
996+
nextStartPos = nextStartPos % connections.length;
997+
}
998+
reconnectTurn.set(nextStartPos);
999+
9781000
// Sleep for a predefined timeout period before attempting reconnection
9791001
try {
980-
Thread.sleep(RPC_EXECUTE_TIMEOUT.getDefaultInt());
1002+
Thread.sleep((long) (obTable.getObTableExecuteTimeout() * 1.5));
9811003
} catch (InterruptedException e) {
9821004
Thread.currentThread().interrupt();
9831005
}

0 commit comments

Comments
 (0)