Skip to content

Commit 9f80901

Browse files
authored
Merge branch 'master' into obkv_442_features
2 parents 53e8b54 + 4b2a54f commit 9f80901

21 files changed

Lines changed: 207 additions & 262 deletions

src/main/java/com/alipay/oceanbase/rpc/ObGlobal.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,12 +132,26 @@ public static boolean isDistributeNeedTabletIdSupport() {
132132

133133
/*-------------------------------------------- OB_VERSION --------------------------------------------*/
134134

135+
public static final long OB_VERSION_4_2_1_0 = calcVersion(4, (short) 2, (byte) 1, (byte) 0);
136+
137+
public static final long OB_VERSION_4_2_1_7 = calcVersion(4, (short) 2, (byte) 1, (byte) 7);
138+
139+
public static final long OB_VERSION_4_2_2_0 = calcVersion(4, (short) 2, (byte) 2, (byte) 0);
140+
135141
public static final long OB_VERSION_4_2_3_0 = calcVersion(4, (short) 2, (byte) 3, (byte) 0);
136142

143+
public static final long OB_VERSION_4_2_4_0 = calcVersion(4, (short) 2, (byte) 4, (byte) 0);
144+
137145
public static final long OB_VERSION_4_2_5_2 = calcVersion(4, (short) 2, (byte) 5, (byte) 2);
138146

147+
public static final long OB_VERSION_4_2_5_3 = calcVersion(4, (short) 2, (byte) 5, (byte) 3);
148+
139149
public static final long OB_VERSION_4_3_0_0 = calcVersion(4, (short) 3, (byte) 0, (byte) 0);
140150

151+
public static final long OB_VERSION_4_3_0_1 = calcVersion(4, (short) 3, (byte) 0, (byte) 1);
152+
153+
public static final long OB_VERSION_4_3_2_0 = calcVersion(4, (short) 3, (byte) 2, (byte) 0);
154+
141155
public static final long OB_VERSION_4_3_4_0 = calcVersion(4, (short) 3, (byte) 4, (byte) 0);
142156

143157
public static final long OB_VERSION_4_3_5_0 = calcVersion(4, (short) 3, (byte) 5, (byte) 0);
@@ -154,6 +168,10 @@ public static boolean isDistributeNeedTabletIdSupport() {
154168

155169
public static final long OB_VERSION_4_4_1_0 = calcVersion(4, (short) 4, (byte) 1, (byte) 0);
156170

171+
public static final long OB_VERSION_4_4_2_0 = calcVersion(4, (short) 4, (byte) 2, (byte) 0);
172+
173+
public static final long OB_VERSION_4_5_0_0 = calcVersion(4, (short) 5, (byte) 0, (byte) 0);
174+
157175
public static long OB_VERSION = calcVersion(0, (short) 0, (byte) 0, (byte) 0);
158176

159177
/*-------------------------------------------- OB_PROXY_VERSION --------------------------------------------*/

src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java

Lines changed: 15 additions & 91 deletions
Large diffs are not rendered by default.

src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObTableRemoting.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -78,14 +78,12 @@ public ObPayload invokeSync(final ObTableConnection conn, final ObPayload reques
7878

7979
if (response == null) {
8080
String errMessage = TraceUtil.formatTraceMessage(conn, request, "get null response");
81-
logger.warn(errMessage);
8281
ExceptionUtil.throwObTableTransportException(errMessage,
8382
TransportCodes.BOLT_RESPONSE_NULL);
8483
return null;
8584
} else if (!response.isSuccess()) {
8685
String errMessage = TraceUtil.formatTraceMessage(conn, request,
8786
"get an error response: " + response.getMessage() + ", transportCode: " + response.getTransportCode());
88-
logger.warn(errMessage);
8987
response.releaseByteBuf();
9088
ExceptionUtil.throwObTableTransportException(errMessage, response.getTransportCode());
9189
return null;
@@ -101,7 +99,6 @@ public ObPayload invokeSync(final ObTableConnection conn, final ObPayload reques
10199
request,
102100
"Rpc Result is compressed. Java Client is not supported. msg:"
103101
+ response.getMessage());
104-
logger.warn(errMessage);
105102
throw new FeatureNotSupportedException(errMessage);
106103
}
107104
ByteBuf buf = response.getPacketContentBuf();
@@ -112,7 +109,6 @@ public ObPayload invokeSync(final ObTableConnection conn, final ObPayload reques
112109
if (ObPureCrc32C.calculate(content) != expected_checksum) {
113110
String errMessage = TraceUtil.formatTraceMessage(conn, request,
114111
"get response with checksum error: " + response.getMessage());
115-
logger.warn(errMessage);
116112
ExceptionUtil.throwObTableTransportException(errMessage,
117113
TransportCodes.BOLT_CHECKSUM_ERR);
118114
return null;

src/main/java/com/alipay/oceanbase/rpc/direct_load/ObDirectLoadConnection.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ public class ObDirectLoadConnection {
6464

6565
ObDirectLoadConnection(ObDirectLoadConnectionFactory connectionFactory) {
6666
this.connectionFactory = connectionFactory;
67-
this.traceId = ObDirectLoadTraceId.generateTraceId();
67+
this.traceId = ObDirectLoadTraceIdGenerator.generate();
6868
this.logger = ObDirectLoadLogger.getLogger(this.traceId);
6969
}
7070

@@ -259,7 +259,7 @@ ObDirectLoadStatement buildStatement(ObDirectLoadStatement.Builder builder)
259259
ObDirectLoadStatement stmt = null;
260260
try {
261261
final ObDirectLoadTraceId traceId = builder.getTraceId() != null ? builder.getTraceId()
262-
: ObDirectLoadTraceId.generateTraceId();
262+
: ObDirectLoadTraceIdGenerator.generate();
263263
stmt = createStatement(traceId);
264264
stmt.init(builder);
265265
} catch (Exception e) {

src/main/java/com/alipay/oceanbase/rpc/direct_load/ObDirectLoadTraceId.java

Lines changed: 0 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,6 @@
1717

1818
package com.alipay.oceanbase.rpc.direct_load;
1919

20-
import java.net.InetAddress;
21-
import java.util.concurrent.atomic.AtomicLong;
22-
2320
import com.alipay.oceanbase.rpc.util.ObByteBuf;
2421
import com.alipay.oceanbase.rpc.util.Serialization;
2522

@@ -79,50 +76,9 @@ public int getEncodedSize() {
7976
}
8077

8178
public static final ObDirectLoadTraceId DEFAULT_TRACE_ID;
82-
public static TraceIdGenerator traceIdGenerator;
8379

8480
static {
8581
DEFAULT_TRACE_ID = new ObDirectLoadTraceId(0, 0);
86-
traceIdGenerator = new TraceIdGenerator();
87-
}
88-
89-
public static ObDirectLoadTraceId generateTraceId() {
90-
return traceIdGenerator.generate();
9182
}
9283

93-
public static class TraceIdGenerator {
94-
95-
private final ObDirectLoadLogger logger = ObDirectLoadLogger.getLogger();
96-
97-
private final long uniqueId;
98-
private AtomicLong sequence;
99-
100-
public TraceIdGenerator() {
101-
long ip = 0;
102-
try {
103-
ip = ipToLong(InetAddress.getLocalHost().getHostAddress());
104-
} catch (Exception e) {
105-
logger.warn("get local host address failed", e);
106-
}
107-
long port = (long) (Math.random() % 65536) << 32;
108-
long isUserRequest = (1l << (32 + 16));
109-
long reserved = 0;
110-
uniqueId = ip | port | isUserRequest | reserved;
111-
sequence = new AtomicLong(0);
112-
}
113-
114-
private static long ipToLong(String strIp) {
115-
String[] ip = strIp.split("\\.");
116-
return (Long.parseLong(ip[0]) << 24) + (Long.parseLong(ip[1]) << 16)
117-
+ (Long.parseLong(ip[2]) << 8) + (Long.parseLong(ip[3]));
118-
}
119-
120-
public ObDirectLoadTraceId generate() {
121-
long newSequence = System.currentTimeMillis() * 1000 + sequence.incrementAndGet()
122-
% 1000;
123-
return new ObDirectLoadTraceId(uniqueId, newSequence);
124-
}
125-
126-
};
127-
12884
}
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*-
2+
* #%L
3+
* com.oceanbase:obkv-table-client
4+
* %%
5+
* Copyright (C) 2021 - 2025 OceanBase
6+
* %%
7+
* OBKV Table Client Framework is licensed under Mulan PSL v2.
8+
* You can use this software according to the terms and conditions of the Mulan PSL v2.
9+
* You may obtain a copy of Mulan PSL v2 at:
10+
* http://license.coscl.org.cn/MulanPSL2
11+
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
12+
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
13+
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
14+
* See the Mulan PSL v2 for more details.
15+
* #L%
16+
*/
17+
18+
package com.alipay.oceanbase.rpc.direct_load;
19+
20+
import java.net.InetAddress;
21+
import java.util.concurrent.atomic.AtomicLong;
22+
23+
public class ObDirectLoadTraceIdGenerator {
24+
25+
private final ObDirectLoadLogger logger = ObDirectLoadLogger.getLogger();
26+
private static ObDirectLoadTraceIdGenerator instance = new ObDirectLoadTraceIdGenerator();
27+
28+
private final long uniqueId;
29+
private AtomicLong sequence;
30+
31+
public ObDirectLoadTraceIdGenerator() {
32+
long ip = 0;
33+
try {
34+
ip = ipToLong(InetAddress.getLocalHost().getHostAddress());
35+
} catch (Exception e) {
36+
logger.warn("get local host address failed", e);
37+
}
38+
long port = (long) (Math.random() % 65536) << 32;
39+
long isUserRequest = (1l << (32 + 16));
40+
long reserved = 0;
41+
uniqueId = ip | port | isUserRequest | reserved;
42+
sequence = new AtomicLong(0);
43+
}
44+
45+
private static long ipToLong(String strIp) {
46+
String[] ip = strIp.split("\\.");
47+
return (Long.parseLong(ip[0]) << 24) + (Long.parseLong(ip[1]) << 16)
48+
+ (Long.parseLong(ip[2]) << 8) + (Long.parseLong(ip[3]));
49+
}
50+
51+
public ObDirectLoadTraceId generateNextId() {
52+
long newSequence = System.currentTimeMillis() * 1000 + sequence.incrementAndGet() % 1000;
53+
return new ObDirectLoadTraceId(uniqueId, newSequence);
54+
}
55+
56+
public static ObDirectLoadTraceId generate() {
57+
return instance.generateNextId();
58+
}
59+
60+
}

src/main/java/com/alipay/oceanbase/rpc/direct_load/protocol/ObDirectLoadProtocolFactory.java

Lines changed: 8 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -25,40 +25,18 @@
2525

2626
public class ObDirectLoadProtocolFactory {
2727

28-
private static final ObDirectLoadLogger logger = ObDirectLoadLogger.getLogger();
29-
30-
// 起始版本
31-
// 4_2_1_release
32-
public static final long OB_VERSION_4_2_1_0 = ObGlobal.calcVersion(4, (short) 2,
33-
(byte) 1, (byte) 0);
34-
// 4_2_x_release
35-
public static final long OB_VERSION_4_2_2_0 = ObGlobal.calcVersion(4, (short) 2,
36-
(byte) 2, (byte) 0);
37-
// master
38-
public static final long OB_VERSION_4_3_0_0 = ObGlobal.calcVersion(4, (short) 3,
39-
(byte) 0, (byte) 0);
40-
41-
// 最低支持版本
42-
// 4_2_1_release
43-
public static final long OB_VERSION_4_2_1_7 = ObGlobal.calcVersion(4, (short) 2,
44-
(byte) 1, (byte) 7);
45-
// 4_2_x_release
46-
public static final long OB_VERSION_4_2_4_0 = ObGlobal.calcVersion(4, (short) 2,
47-
(byte) 4, (byte) 0);
48-
// master
49-
public static final long OB_VERSION_4_3_0_1 = ObGlobal.calcVersion(4, (short) 3,
50-
(byte) 0, (byte) 1);
28+
private static final ObDirectLoadLogger logger = ObDirectLoadLogger.getLogger();
5129

5230
public static long getSupportedMinimumObVersion(long obVersion) {
5331
long minimumObVersion = 0;
54-
if (obVersion < OB_VERSION_4_2_1_0) { // < 421
55-
minimumObVersion = OB_VERSION_4_2_1_7;
56-
} else if (obVersion < OB_VERSION_4_2_2_0) { // 421
57-
minimumObVersion = OB_VERSION_4_2_1_7;
58-
} else if (obVersion < OB_VERSION_4_3_0_0) { // 42x
59-
minimumObVersion = OB_VERSION_4_2_4_0;
32+
if (obVersion < ObGlobal.OB_VERSION_4_2_1_0) { // < 421
33+
minimumObVersion = ObGlobal.OB_VERSION_4_2_1_7;
34+
} else if (obVersion < ObGlobal.OB_VERSION_4_2_2_0) { // 421
35+
minimumObVersion = ObGlobal.OB_VERSION_4_2_1_7;
36+
} else if (obVersion < ObGlobal.OB_VERSION_4_3_0_0) { // 42x
37+
minimumObVersion = ObGlobal.OB_VERSION_4_2_4_0;
6038
} else { // master
61-
minimumObVersion = OB_VERSION_4_3_0_1;
39+
minimumObVersion = ObGlobal.OB_VERSION_4_3_0_1;
6240
}
6341
return minimumObVersion;
6442
}

src/main/java/com/alipay/oceanbase/rpc/direct_load/protocol/v0/ObDirectLoadProtocolV0.java

Lines changed: 22 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -28,17 +28,7 @@
2828

2929
public class ObDirectLoadProtocolV0 implements ObDirectLoadProtocol {
3030

31-
public static final long OB_VERSION_4_2_5_3 = ObGlobal.calcVersion(4, (short) 2,
32-
(byte) 5, (byte) 3);
33-
public static final long OB_VERSION_4_3_0_0 = ObGlobal.calcVersion(4, (short) 3,
34-
(byte) 0, (byte) 0);
35-
36-
public static final long OB_VERSION_4_3_2_0 = ObGlobal.calcVersion(4, (short) 3,
37-
(byte) 2, (byte) 0);
38-
public static final long OB_VERSION_4_3_5_0 = ObGlobal.calcVersion(4, (short) 3,
39-
(byte) 5, (byte) 0);
40-
41-
private static final int PROTOCOL_VERSION = 0;
31+
private static final int PROTOCOL_VERSION = 0;
4232
private final ObDirectLoadLogger logger;
4333
private final long obVersion;
4434

@@ -58,26 +48,27 @@ public int getProtocolVersion() {
5848

5949
@Override
6050
public void checkIsSupported(ObDirectLoadStatement statement) throws ObDirectLoadException {
61-
if (obVersion < OB_VERSION_4_3_2_0) {
51+
if (obVersion < ObGlobal.OB_VERSION_4_3_2_0) {
6252
// 432以下不支持inc|inc_replace
6353
String loadMethod = statement.getLoadMethod();
6454
if (!loadMethod.isEmpty() && !loadMethod.equalsIgnoreCase("full")) {
6555
logger.warn("load method in ob version " + ObGlobal.getObVsnString(obVersion)
6656
+ "is not supported, minimum version required is "
67-
+ ObGlobal.getObVsnString(OB_VERSION_4_3_2_0));
57+
+ ObGlobal.getObVsnString(ObGlobal.OB_VERSION_4_3_2_0));
6858
throw new ObDirectLoadNotSupportedException(
6959
"load method in ob version " + ObGlobal.getObVsnString(obVersion)
7060
+ " is not supported, minimum version required is "
71-
+ ObGlobal.getObVsnString(OB_VERSION_4_3_2_0));
61+
+ ObGlobal.getObVsnString(ObGlobal.OB_VERSION_4_3_2_0));
7262
}
73-
} else if (obVersion < OB_VERSION_4_3_5_0 && statement.getPartitionNames().length > 0) {
63+
} else if (obVersion < ObGlobal.OB_VERSION_4_3_5_0
64+
&& statement.getPartitionNames().length > 0) {
7465
logger.warn("partition names in ob version " + ObGlobal.getObVsnString(obVersion)
7566
+ "is not supported, minimum version required is "
76-
+ ObGlobal.getObVsnString(OB_VERSION_4_3_5_0));
67+
+ ObGlobal.getObVsnString(ObGlobal.OB_VERSION_4_3_5_0));
7768
throw new ObDirectLoadNotSupportedException(
7869
"partition names in ob version " + ObGlobal.getObVsnString(obVersion)
7970
+ " is not supported, minimum version required is "
80-
+ ObGlobal.getObVsnString(OB_VERSION_4_3_5_0));
71+
+ ObGlobal.getObVsnString(ObGlobal.OB_VERSION_4_3_5_0));
8172
}
8273
}
8374

@@ -114,15 +105,25 @@ public ObDirectLoadHeartBeatRpc getHeartBeatRpc(ObDirectLoadTraceId traceId) {
114105
@Override
115106
public ObDirectLoadDetachRpc getDetachRpc(ObDirectLoadTraceId traceId)
116107
throws ObDirectLoadException {
117-
if (obVersion < OB_VERSION_4_3_0_0) {
118-
if (obVersion < OB_VERSION_4_2_5_3) {
108+
if (obVersion < ObGlobal.OB_VERSION_4_3_0_0) {
109+
if (obVersion < ObGlobal.OB_VERSION_4_2_5_3) {
110+
logger.warn("detach in ob version " + ObGlobal.getObVsnString(obVersion)
111+
+ "is not supported, minimum version required is "
112+
+ ObGlobal.getObVsnString(ObGlobal.OB_VERSION_4_2_5_3));
113+
throw new ObDirectLoadNotSupportedException(
114+
"detach in ob version " + ObGlobal.getObVsnString(obVersion)
115+
+ " is not supported, minimum version required is "
116+
+ ObGlobal.getObVsnString(ObGlobal.OB_VERSION_4_2_5_3));
117+
}
118+
} else if (obVersion < ObGlobal.OB_VERSION_4_5_0_0) {
119+
if (obVersion < ObGlobal.OB_VERSION_4_4_2_0) {
119120
logger.warn("detach in ob version " + ObGlobal.getObVsnString(obVersion)
120121
+ "is not supported, minimum version required is "
121-
+ ObGlobal.getObVsnString(OB_VERSION_4_2_5_3));
122+
+ ObGlobal.getObVsnString(ObGlobal.OB_VERSION_4_4_2_0));
122123
throw new ObDirectLoadNotSupportedException(
123124
"detach in ob version " + ObGlobal.getObVsnString(obVersion)
124125
+ " is not supported, minimum version required is "
125-
+ ObGlobal.getObVsnString(OB_VERSION_4_2_5_3));
126+
+ ObGlobal.getObVsnString(ObGlobal.OB_VERSION_4_4_2_0));
126127
}
127128
} else {
128129
logger.warn("detach in ob version " + ObGlobal.getObVsnString(obVersion)

src/main/java/com/alipay/oceanbase/rpc/location/LocationUtil.java

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@
4343
import java.sql.*;
4444
import java.text.MessageFormat;
4545
import java.util.*;
46+
import java.util.regex.Matcher;
47+
import java.util.regex.Pattern;
4648

4749
import static com.alipay.oceanbase.rpc.location.model.partition.ObPartitionKey.MAX_PARTITION_ELEMENT;
4850
import static com.alipay.oceanbase.rpc.location.model.partition.ObPartitionKey.MIN_PARTITION_ELEMENT;
@@ -1235,6 +1237,18 @@ public static Long getTableIdFromRemote(ObServerAddr obServerAddr, ObUserAuth sy
12351237
return tableId;
12361238
}
12371239

1240+
private static String getActualIndexName(String indexTableName) {
1241+
String actualIndexName = null;
1242+
if (indexTableName != null && indexTableName.startsWith("__idx_")) {
1243+
Pattern pattern = Pattern.compile("^__idx_\\d+_(.+)$");
1244+
Matcher matcher = pattern.matcher(indexTableName);
1245+
if (matcher.matches()) {
1246+
actualIndexName = matcher.group(1);
1247+
}
1248+
}
1249+
return actualIndexName == null ? indexTableName == null ? "" : indexTableName : actualIndexName;
1250+
}
1251+
12381252
public static ObIndexInfo getIndexInfoFromRemote(ObServerAddr obServerAddr, ObUserAuth sysUA,
12391253
long connectTimeout, long socketTimeout,
12401254
String indexTableName)
@@ -1255,22 +1269,22 @@ public static ObIndexInfo getIndexInfoFromRemote(ObServerAddr obServerAddr, ObUs
12551269
indexInfo.setIndexTableId(rs.getLong("table_id"));
12561270
indexInfo.setIndexType(ObIndexType.valueOf(rs.getInt("index_type")));
12571271
} else {
1258-
throw new ObTableEntryRefreshException("index is not exist");
1272+
throw new ObTableEntryRefreshException("[-5224][OB_WRONG_NAME_FOR_INDEX] index name is wrong");
12591273
}
12601274
} catch (SQLException e) {
12611275
// cannot execute sql, maybe some of the observers have been killed
1262-
RUNTIME.error(LCD.convert("01-00010"), indexTableName, e.getMessage());
1276+
RUNTIME.error(LCD.convert("01-00010"), getActualIndexName(indexTableName), e.getMessage());
12631277
throw new ObTableEntryRefreshException("fail to get index info from remote", e, true);
12641278
} catch (Exception e) {
12651279
if (e instanceof ObTableEntryRefreshException) {
12661280
throw new ObTableEntryRefreshException(format(
1267-
"fail to get index info from remote, indexTableName: %s, error message: %s",
1268-
indexTableName, e.getMessage()), e,
1281+
"fail to get index info from remote, indexName: %s, error message: %s",
1282+
getActualIndexName(indexTableName), e.getMessage()), e,
12691283
((ObTableEntryRefreshException) e).isConnectInactive());
12701284
} else {
12711285
throw new ObTableEntryRefreshException(format(
1272-
"fail to get index info from remote, indexTableName: %s, error message: %s",
1273-
indexTableName, e.getMessage()), e);
1286+
"fail to get index info from remote, indexName: %s, error message: %s",
1287+
getActualIndexName(indexTableName), e.getMessage()), e);
12741288
}
12751289
} finally {
12761290
try {

0 commit comments

Comments
 (0)