Skip to content

Commit 9e3a8de

Browse files
authored
fix some bugs and add time trace (#428)
* fix some bugs and add time trace * fix review
1 parent 8fcfd9c commit 9e3a8de

File tree

11 files changed

+576
-35
lines changed

11 files changed

+576
-35
lines changed

src/main/java/com/alipay/oceanbase/rpc/bolt/protocol/ObTablePacket.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package com.alipay.oceanbase.rpc.bolt.protocol;
1919

20+
import com.alipay.oceanbase.rpc.bolt.transport.ObTableTimeTrace;
2021
import com.alipay.oceanbase.rpc.protocol.packet.ObRpcPacketHeader;
2122
import com.alipay.remoting.CommandCode;
2223
import com.alipay.remoting.InvokeContext;
@@ -43,6 +44,9 @@ public class ObTablePacket implements RemotingCommand {
4344
private int transportCode;
4445
private String message;
4546
private Throwable cause;
47+
48+
// 时间追踪
49+
private ObTableTimeTrace timeTrace;
4650

4751
/**
4852
* Decode packet header.
@@ -205,6 +209,20 @@ public void setCause(Throwable cause) {
205209
this.cause = cause;
206210
}
207211

212+
/*
213+
* Get time trace.
214+
*/
215+
public ObTableTimeTrace getTimeTrace() {
216+
return timeTrace;
217+
}
218+
219+
/*
220+
* Set time trace.
221+
*/
222+
public void setTimeTrace(ObTableTimeTrace timeTrace) {
223+
this.timeTrace = timeTrace;
224+
}
225+
208226
// TODO useless for now
209227

210228
/*

src/main/java/com/alipay/oceanbase/rpc/bolt/protocol/ObTablePacketEncoder.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package com.alipay.oceanbase.rpc.bolt.protocol;
1919

20+
import com.alipay.oceanbase.rpc.bolt.transport.ObTableTimeTrace;
2021
import com.alipay.oceanbase.rpc.util.Serialization;
2122
import com.alipay.oceanbase.rpc.util.TableClientLoggerFactory;
2223
import com.alipay.remoting.CommandEncoder;
@@ -48,6 +49,12 @@ public void encode(ChannelHandlerContext ctx, Serializable msg, ByteBuf out) thr
4849
* -----------------------------------
4950
*/
5051
ObTablePacket cmd = (ObTablePacket) msg;
52+
53+
// Record when EventLoop starts encoding (reflects queue wait time)
54+
ObTableTimeTrace timeTrace = cmd.getTimeTrace();
55+
if (timeTrace != null) {
56+
timeTrace.markNettyEncodeStart();
57+
}
5158

5259
// 1. header
5360
out.writeBytes(ObTableProtocol.MAGIC_HEADER_FLAG);
@@ -57,6 +64,11 @@ public void encode(ChannelHandlerContext ctx, Serializable msg, ByteBuf out) thr
5764

5865
// 2. payload
5966
out.writeBytes(cmd.getPacketContent());
67+
68+
// Record when encoding is complete
69+
if (timeTrace != null) {
70+
timeTrace.markNettyEncodeEnd();
71+
}
6072

6173
} else {
6274
String warnMsg = "msg type [" + msg.getClass() + "] is not subclass of ObCommand";

src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObClientFuture.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,13 +43,27 @@ public class ObClientFuture implements InvokeFuture {
4343
private static int BY_BACKGROUND = 2;
4444

4545
private AtomicInteger releaseFlag = new AtomicInteger(INIT);
46+
47+
// 响应接收时间
48+
private long responseReceivedTime;
49+
50+
// 时间追踪
51+
private ObTableTimeTrace timeTrace;
4652

4753
/*
4854
* Ob client future.
4955
*/
5056
public ObClientFuture(int channelId) {
5157
this.channelId = channelId;
5258
}
59+
60+
/*
61+
* Ob client future with time trace.
62+
*/
63+
public ObClientFuture(int channelId, ObTableTimeTrace timeTrace) {
64+
this.channelId = channelId;
65+
this.timeTrace = timeTrace;
66+
}
5367

5468
/*
5569
* Wait response.
@@ -87,6 +101,12 @@ public RemotingCommand waitResponse() throws InterruptedException {
87101
*/
88102
@Override
89103
public void putResponse(RemotingCommand response) {
104+
// 记录响应接收时间
105+
this.responseReceivedTime = System.currentTimeMillis();
106+
// 更新时间追踪
107+
if (timeTrace != null) {
108+
timeTrace.markResponseReceived();
109+
}
90110
this.response = response;
91111
waiter.countDown();
92112
if (!releaseFlag.compareAndSet(INIT, BY_WORKER)) {
@@ -96,6 +116,27 @@ public void putResponse(RemotingCommand response) {
96116
}
97117
}
98118

119+
/*
120+
* Get response received time.
121+
*/
122+
public long getResponseReceivedTime() {
123+
return responseReceivedTime;
124+
}
125+
126+
/*
127+
* Get time trace.
128+
*/
129+
public ObTableTimeTrace getTimeTrace() {
130+
return timeTrace;
131+
}
132+
133+
/*
134+
* Set time trace.
135+
*/
136+
public void setTimeTrace(ObTableTimeTrace timeTrace) {
137+
this.timeTrace = timeTrace;
138+
}
139+
99140
/*
100141
* Invoke id.
101142
*/

src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObTableConnection.java

Lines changed: 32 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,10 @@ public void init() throws Exception {
9696
}
9797

9898
private boolean connect() throws Exception {
99+
return connect(obTable.getObTableConnectTimeout());
100+
}
101+
102+
private boolean connect(int connectTimeoutMs) throws Exception {
99103
if (checkAvailable()) { // double check status available
100104
return false;
101105
}
@@ -106,7 +110,7 @@ private boolean connect() throws Exception {
106110
for (; tries < maxTryTimes; tries++) {
107111
try {
108112
connection = obTable.getConnectionFactory().createConnection(obTable.getIp(),
109-
obTable.getPort(), obTable.getObTableConnectTimeout());
113+
obTable.getPort(), connectTimeoutMs);
110114
break;
111115
} catch (Exception e) {
112116
cause = e;
@@ -243,29 +247,29 @@ public void checkStatus() throws Exception {
243247
if (connection.getChannel() == null || !connection.getChannel().isActive()) {
244248
reconnect("Check connection failed for address: " + connection.getUrl());
245249
}
246-
if (!connection.getChannel().isWritable()) {
247-
LOGGER.warn("The connection might be write overflow : " + connection.getUrl());
248-
// Wait some interval for the case when a big package is blocking the buffer but server is ok.
249-
// Don't bother to call flush() here as we invoke writeAndFlush() when send request.
250-
Thread.sleep(obTable.getNettyBlockingWaitInterval());
251-
if (!connection.getChannel().isWritable()) {
252-
throw new ObTableConnectionUnWritableException(
253-
"Check connection failed for address: " + connection.getUrl()
254-
+ ", maybe write overflow!");
255-
}
256-
}
257250
}
258251

259252
public void reConnectAndLogin(String msg) throws ObTableException {
253+
reConnectAndLogin(msg, obTable.getObTableConnectTimeout());
254+
}
255+
256+
/**
257+
* Reconnect and login with a specified connect timeout.
258+
* This is useful for background tasks that need a longer timeout to ensure connection success.
259+
*
260+
* @param msg the reconnect reason
261+
* @param connectTimeoutMs the connection timeout in milliseconds
262+
*/
263+
public void reConnectAndLogin(String msg, int connectTimeoutMs) throws ObTableException {
260264
try {
261265
// 1. check the connection is available, force to close it
262266
if (checkAvailable()) {
263267
LOGGER.info("The connection would be closed and reconnected is: "
264268
+ connection.getUrl());
265269
close();
266270
}
267-
// 2. reconnect
268-
reconnect(msg);
271+
// 2. reconnect with specified timeout
272+
reconnect(msg, connectTimeoutMs);
269273
} catch (ConnectException ex) {
270274
// cannot connect to ob server, need refresh table location
271275
throw new ObTableServerConnectException(ex);
@@ -286,9 +290,22 @@ public void reConnectAndLogin(String msg) throws ObTableException {
286290
*
287291
*/
288292
private void reconnect(String msg) throws Exception {
293+
reconnect(msg, obTable.getObTableConnectTimeout());
294+
}
295+
296+
/**
297+
* Reconnect current connection and login with specified timeout
298+
*
299+
* @param msg the reconnect reason
300+
* @param connectTimeoutMs the connection timeout in milliseconds
301+
* @exception Exception if connect successfully or connection already reconnected by others
302+
* throw exception if connect failed
303+
*
304+
*/
305+
private void reconnect(String msg, int connectTimeoutMs) throws Exception {
289306
if (isReConnecting.compareAndSet(false, true)) {
290307
try {
291-
if (connect()) {
308+
if (connect(connectTimeoutMs)) {
292309
LOGGER.info("reconnect success. reconnect reason: [{}]", msg);
293310
} else {
294311
LOGGER.info(

0 commit comments

Comments
 (0)