Skip to content

Commit 7948166

Browse files
authored
add config for enable/disable netty isWritable check && opt for connection pool (#434)
* add config for enable/disable netty isWritable check * fix exception info print * opt for getConnection * opt for connection pool
1 parent 6f3cda7 commit 7948166

File tree

4 files changed

+62
-19
lines changed

4 files changed

+62
-19
lines changed

src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObTableConnection.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -244,15 +244,15 @@ public void checkStatus() throws Exception {
244244
if (connection.getChannel() == null || !connection.getChannel().isActive()) {
245245
reconnect("Check connection failed for address: " + connection.getUrl());
246246
}
247-
if (!connection.getChannel().isWritable()) {
247+
if (obTable.isNettyCheckWritableEnabled() && !connection.getChannel().isWritable()) {
248248
LOGGER.warn("The connection might be write overflow : " + connection.getUrl());
249249
// Wait some interval for the case when a big package is blocking the buffer but server is ok.
250250
// Don't bother to call flush() here as we invoke writeAndFlush() when send request.
251251
Thread.sleep(obTable.getNettyBlockingWaitInterval());
252252
if (!connection.getChannel().isWritable()) {
253253
throw new ObTableConnectionUnWritableException(
254254
"Check connection failed for address: " + connection.getUrl()
255-
+ ", maybe write overflow!");
255+
+ ", maybe write overflow! Overflow bytes:" + connection.getChannel().bytesBeforeWritable());
256256
}
257257
}
258258
}

src/main/java/com/alipay/oceanbase/rpc/property/Property.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,8 @@ public enum Property {
136136
// when a big package is blocking the buffer but the server is OK.
137137
NETTY_BLOCKING_WAIT_INTERVAL("bolt.netty.blocking.wait.interval", 1, "netty写缓存满后等待时间"),
138138

139+
NETTY_CHECK_WRITABLE_ENABLED("bolt.netty.check.writable.enabled", false, "是否启用netty写缓存可写性检查"),
140+
139141
// [ObTable][OTHERS]
140142
SERVER_ENABLE_REROUTING("server.enable.rerouting", false, "开启server端的重定向回复功能"),
141143

src/main/java/com/alipay/oceanbase/rpc/table/AbstractObTable.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ public abstract class AbstractObTable extends AbstractTable {
4141

4242
protected int nettyBlockingWaitInterval = NETTY_BLOCKING_WAIT_INTERVAL.getDefaultInt();
4343

44+
protected boolean nettyCheckWritableEnabled = NETTY_CHECK_WRITABLE_ENABLED.getDefaultBoolean();
45+
4446
protected long maxConnExpiredTime = MAX_CONN_EXPIRED_TIME.getDefaultLong();
4547

4648
/*
@@ -162,6 +164,13 @@ public int getNettyBlockingWaitInterval() {
162164
return nettyBlockingWaitInterval;
163165
}
164166

167+
/*
168+
* Get netty check writable enabled.
169+
*/
170+
public boolean isNettyCheckWritableEnabled() {
171+
return nettyCheckWritableEnabled;
172+
}
173+
165174
/*
166175
* Get connection max expired time
167176
*/

src/main/java/com/alipay/oceanbase/rpc/table/ObTable.java

Lines changed: 49 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,8 @@ private void initProperties() {
178178
nettyBufferHighWatermark);
179179
nettyBlockingWaitInterval = parseToInt(NETTY_BLOCKING_WAIT_INTERVAL.getKey(),
180180
nettyBlockingWaitInterval);
181+
nettyCheckWritableEnabled = parseToBoolean(NETTY_CHECK_WRITABLE_ENABLED.getKey(),
182+
nettyCheckWritableEnabled);
181183
enableRerouting = parseToBoolean(SERVER_ENABLE_REROUTING.getKey(), enableRerouting);
182184
maxConnExpiredTime = parseToLong(MAX_CONN_EXPIRED_TIME.getKey(), maxConnExpiredTime);
183185

@@ -842,8 +844,10 @@ private static class ObTableConnectionPool {
842844
private final int obTableConnectionPoolSize;
843845
private ObTable obTable;
844846
private volatile AtomicReference<ObTableConnection[]> connectionPool;
845-
// round-robin scheduling
846-
private AtomicLong turn = new AtomicLong(0);
847+
// counter for checkAndReconnect: records the start position of each scan
848+
private AtomicLong reconnectTurn = new AtomicLong(0);
849+
// counter for getConnection: increments on each call to ensure random distribution
850+
private AtomicLong getConnectionTurn = new AtomicLong(0);
847851
private boolean shouldStopExpand = false;
848852
private ScheduledFuture<?> expandTaskFuture = null;
849853
private ScheduledFuture<?> checkAndReconnectFuture = null;
@@ -862,11 +866,14 @@ public ObTableConnectionPool(ObTable obTable, int connectionPoolSize) {
862866
public void init() throws Exception {
863867
// only create at most 2 connections for use
864868
// expand other connections (if needed) in the background
869+
int initConnectionNum = this.obTableConnectionPoolSize > 10 ? 10 : this.obTableConnectionPoolSize;
865870
connectionPool = new AtomicReference<ObTableConnection[]>();
866-
ObTableConnection[] curConnectionPool = new ObTableConnection[1];
867-
curConnectionPool[0] = new ObTableConnection(obTable, obTable.isOdpMode());
868-
curConnectionPool[0].enableLoginWithConfigs();
869-
curConnectionPool[0].init();
871+
ObTableConnection[] curConnectionPool = new ObTableConnection[initConnectionNum];
872+
for (int i = 0; i < initConnectionNum; i++) {
873+
curConnectionPool[i] = new ObTableConnection(obTable, obTable.isOdpMode());
874+
curConnectionPool[i].enableLoginWithConfigs();
875+
curConnectionPool[i].init();
876+
}
870877

871878
connectionPool.set(curConnectionPool);
872879
// check connection pool size and expand every 3 seconds
@@ -877,16 +884,25 @@ public void init() throws Exception {
877884

878885
/*
879886
* Get connection.
887+
* Use counter (getConnectionTurn) that increments on each call to ensure random distribution.
888+
* Guarantees to return a valid connection if one exists, and avoids multiple threads
889+
* getting the same connection even when there are consecutive expired connections.
880890
*/
881891
public ObTableConnection getConnection() {
882892
ObTableConnection[] connections = connectionPool.get();
883-
long nextTurn = turn.getAndIncrement();
884-
if (nextTurn == Long.MAX_VALUE) {
885-
turn.set(0);
893+
894+
895+
// Get starting position from counter (increments on each call for randomness)
896+
long startTurn = getConnectionTurn.getAndIncrement();
897+
if (startTurn == Long.MAX_VALUE) {
898+
getConnectionTurn.set(0);
886899
}
887-
int round = (int) (nextTurn % connections.length);
900+
int startIdx = (int) (startTurn % connections.length);
901+
902+
// Traverse all connections starting from startIdx to guarantee finding a valid one
903+
// This ensures we check all connections if needed, avoiding null return
888904
for (int i = 0; i < connections.length; i++) {
889-
int idx = (round + i) % connections.length;
905+
int idx = (startIdx + i) % connections.length;
890906
if (!connections[idx].isExpired()) {
891907
return connections[idx];
892908
}
@@ -900,12 +916,13 @@ public ObTableConnection getConnection() {
900916
* This method will not impact the connections in use and create a reasonable number of connections.
901917
* */
902918
private void checkAndExpandPool() {
903-
if (obTableConnectionPoolSize == 1 || shouldStopExpand) {
919+
ObTableConnection[] curConnections = connectionPool.get();
920+
if (curConnections.length == obTableConnectionPoolSize) {
904921
// stop the background task if pools reach the setting
905922
this.expandTaskFuture.cancel(false);
906923
return;
907924
}
908-
ObTableConnection[] curConnections = connectionPool.get();
925+
909926
if (curConnections.length < obTableConnectionPoolSize) {
910927
int diffSize = obTableConnectionPoolSize - curConnections.length;
911928
// limit expand size not too big to ensure the instant availability of connections
@@ -947,6 +964,9 @@ private void checkAndExpandPool() {
947964
* 2. Mark a third of the expired connections for reconnection.
948965
* 3. Pause for a predefined timeout period.
949966
* 4. Attempt to reconnect the marked connections.
967+
*
968+
* The scan starts from the end position of the previous scan to ensure
969+
* all connections are checked over time.
950970
**/
951971
private void checkAndReconnect() {
952972
if (obTableConnectionPoolSize == 1) {
@@ -958,24 +978,36 @@ private void checkAndReconnect() {
958978
// Iterate over the connection pool to identify connections that have expired
959979
List<Integer> expiredConnIds = new ArrayList<>();
960980
ObTableConnection[] connections = connectionPool.get();
961-
long num = turn.get();
962-
for (int i = 1; i <= connections.length; ++i) {
963-
int idx = (int) ((i + num) % connections.length);
981+
// Start from the end position of previous scan
982+
long startPos = reconnectTurn.get();
983+
for (int i = 0; i < connections.length; ++i) {
984+
int idx = (int) ((i + startPos) % connections.length);
964985
if (connections[idx].checkExpired()) {
965986
expiredConnIds.add(idx);
966987
}
967988
}
968989

990+
// Shuffle the expired connection indices to avoid consecutive connections
991+
Collections.shuffle(expiredConnIds);
992+
969993
// Mark a third of the expired connections for reconnection
970994
int needReconnectCount = (int) Math.ceil(expiredConnIds.size() / 3.0);
971995
for (int i = 0; i < needReconnectCount; i++) {
972996
int idx = expiredConnIds.get(i);
973997
connections[idx].setExpired(true);
974998
}
975999

1000+
// Update counter to the end position of this scan for next time
1001+
// Use modulo to keep it within reasonable range
1002+
long nextStartPos = startPos + needReconnectCount;
1003+
if (nextStartPos >= Long.MAX_VALUE) {
1004+
nextStartPos = nextStartPos % connections.length;
1005+
}
1006+
reconnectTurn.set(nextStartPos);
1007+
9761008
// Sleep for a predefined timeout period before attempting reconnection
9771009
try {
978-
Thread.sleep(RPC_EXECUTE_TIMEOUT.getDefaultInt());
1010+
Thread.sleep((long) (obTable.getObTableExecuteTimeout() * 1.5));
9791011
} catch (InterruptedException e) {
9801012
Thread.currentThread().interrupt();
9811013
}

0 commit comments

Comments
 (0)