Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ public class ObDirectLoadConnection {
private long heartBeatTimeout = 0;
private long heartBeatInterval = 0;

private long connectTimeout = 0;

private boolean isInited = false;
private boolean isClosed = false;

Expand All @@ -61,7 +63,7 @@ public class ObDirectLoadConnection {

ObDirectLoadConnection(ObDirectLoadConnectionFactory connectionFactory) {
this.connectionFactory = connectionFactory;
this.traceId = ObDirectLoadTraceId.generateTraceId();
this.traceId = ObDirectLoadTraceIdGenerator.generate();
this.logger = ObDirectLoadLogger.getLogger(this.traceId);
}

Expand Down Expand Up @@ -137,6 +139,8 @@ private void fillParams(Builder builder) throws ObDirectLoadException {
heartBeatInterval = builder.heartBeatInterval;

writeConnectionNum = builder.writeConnectionNum;

connectTimeout = builder.connectTimeout;
}

private void initCheck() throws ObDirectLoadException {
Expand Down Expand Up @@ -167,25 +171,30 @@ private void initCheck() throws ObDirectLoadException {
"Param 'heartBeatInterval' must not be greater than or equal to Param 'heartBeatTimeout', heartBeatTimeout:"
+ heartBeatTimeout + ", heartBeatInterval:" + heartBeatInterval);
}
ObDirectLoadUtil.checkPositive(connectTimeout, "connectTimeout", logger);
}

private void initProtocol() throws ObDirectLoadException {
// 构造一个连接, 获取版本号
long obVersion = 0;
ObTable table = null;
synchronized (connectionFactory) { // 防止并发访问ObGlobal.OB_VERSION
ObGlobal.OB_VERSION = 0;
try {
Properties properties = new Properties();
properties.setProperty(Property.SERVER_CONNECTION_POOL_SIZE.getKey(),
String.valueOf(1));
properties.setProperty(Property.RPC_CONNECT_TIMEOUT.getKey(),
String.valueOf(connectTimeout));
table = new ObTable.Builder(ip, port)
.setLoginInfo(tenantName, userName, password, databaseName)
.setProperties(properties).build();
} catch (Exception e) {
throw new ObDirectLoadException(e);
}
obVersion = ObGlobal.OB_VERSION;
}
this.protocol = ObDirectLoadProtocolFactory.getProtocol(traceId, ObGlobal.OB_VERSION);
this.protocol = ObDirectLoadProtocolFactory.getProtocol(traceId, obVersion);
this.protocol.init();
table.close();
}
Expand Down Expand Up @@ -276,6 +285,8 @@ public static final class Builder {

private static final long MAX_HEART_BEAT_TIMEOUT = 1L * 365 * 24 * 3600 * 1000; // 1year

private long connectTimeout = 1000;

Builder(ObDirectLoadConnectionFactory connectionFactory) {
this.connectionFactory = connectionFactory;
}
Expand Down Expand Up @@ -306,12 +317,17 @@ public Builder setHeartBeatInfo(long heartBeatTimeout, long heartBeatInterval) {
return this;
}

public Builder setConnectTimeout(long connectTimeout) {
this.connectTimeout = Math.min(connectTimeout, (long) Integer.MAX_VALUE);
return this;
}

public String toString() {
return String
.format(
"{ip:\"%s\", port:%d, tenantName:\"%s\", userName:\"%s\", databaseName:\"%s\", writeConnectionNum:%d, heartBeatTimeout:%d, heartBeatInterval:%d}",
"{ip:\"%s\", port:%d, tenantName:\"%s\", userName:\"%s\", databaseName:\"%s\", writeConnectionNum:%d, heartBeatTimeout:%d, heartBeatInterval:%d, connectTimeout:%d}",
ip, port, tenantName, userName, databaseName, writeConnectionNum,
heartBeatTimeout, heartBeatInterval);
heartBeatTimeout, heartBeatInterval, connectTimeout);
}

public ObDirectLoadConnection build() throws ObDirectLoadException {
Expand Down Expand Up @@ -370,6 +386,8 @@ private void initTables() throws ObDirectLoadException {
Properties properties = new Properties();
properties
.setProperty(Property.SERVER_CONNECTION_POOL_SIZE.getKey(), String.valueOf(1));
properties.setProperty(Property.RPC_CONNECT_TIMEOUT.getKey(),
String.valueOf(connection.connectTimeout));
properties.setProperty(Property.RPC_EXECUTE_TIMEOUT.getKey(),
String.valueOf(timeoutMillis));
properties.setProperty(Property.RPC_OPERATION_TIMEOUT.getKey(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public class ObDirectLoadStatement {

ObDirectLoadStatement(ObDirectLoadConnection connection) {
this.connection = connection;
this.traceId = ObDirectLoadTraceId.generateTraceId();
this.traceId = ObDirectLoadTraceIdGenerator.generate();
this.logger = ObDirectLoadLogger.getLogger(this.traceId);
}

Expand Down Expand Up @@ -309,7 +309,7 @@ public static final class Builder {
private long maxErrorRowCount = 0;
private String loadMethod = "full";

private static final long MAX_QUERY_TIMEOUT = 1L * 365 * 24 * 3600 * 1000; // 1year
private static final long MAX_QUERY_TIMEOUT = Integer.MAX_VALUE;

Builder(ObDirectLoadConnection connection) {
this.connection = connection;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@

package com.alipay.oceanbase.rpc.direct_load;

import java.net.InetAddress;
import java.util.concurrent.atomic.AtomicLong;

public class ObDirectLoadTraceId {

private final long uniqueId;
Expand All @@ -43,50 +40,9 @@ public long getSequence() {
}

public static final ObDirectLoadTraceId DEFAULT_TRACE_ID;
public static TraceIdGenerator traceIdGenerator;

static {
DEFAULT_TRACE_ID = new ObDirectLoadTraceId(0, 0);
traceIdGenerator = new TraceIdGenerator();
}

public static ObDirectLoadTraceId generateTraceId() {
return traceIdGenerator.generate();
}

public static class TraceIdGenerator {

private final ObDirectLoadLogger logger = ObDirectLoadLogger.getLogger();

private final long uniqueId;
private AtomicLong sequence;

public TraceIdGenerator() {
long ip = 0;
try {
ip = ipToLong(InetAddress.getLocalHost().getHostAddress());
} catch (Exception e) {
logger.warn("get local host address failed", e);
}
long port = (long) (Math.random() % 65536) << 32;
long isUserRequest = (1l << (32 + 16));
long reserved = 0;
uniqueId = ip | port | isUserRequest | reserved;
sequence = new AtomicLong(0);
}

private static long ipToLong(String strIp) {
String[] ip = strIp.split("\\.");
return (Long.parseLong(ip[0]) << 24) + (Long.parseLong(ip[1]) << 16)
+ (Long.parseLong(ip[2]) << 8) + (Long.parseLong(ip[3]));
}

public ObDirectLoadTraceId generate() {
long newSequence = System.currentTimeMillis() * 1000 + sequence.incrementAndGet()
% 1000;
return new ObDirectLoadTraceId(uniqueId, newSequence);
}

};

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*-
* #%L
* com.oceanbase:obkv-table-client
* %%
* Copyright (C) 2021 - 2025 OceanBase
* %%
* OBKV Table Client Framework is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* #L%
*/

package com.alipay.oceanbase.rpc.direct_load;

import java.net.InetAddress;
import java.util.concurrent.atomic.AtomicLong;

public class ObDirectLoadTraceIdGenerator {

private final ObDirectLoadLogger logger = ObDirectLoadLogger.getLogger();
private static ObDirectLoadTraceIdGenerator instance = new ObDirectLoadTraceIdGenerator();

private final long uniqueId;
private AtomicLong sequence;

public ObDirectLoadTraceIdGenerator() {
long ip = 0;
try {
ip = ipToLong(InetAddress.getLocalHost().getHostAddress());
} catch (Exception e) {
logger.warn("get local host address failed", e);
}
long port = (long) (Math.random() % 65536) << 32;
long isUserRequest = (1l << (32 + 16));
long reserved = 0;
uniqueId = ip | port | isUserRequest | reserved;
sequence = new AtomicLong(0);
}

private static long ipToLong(String strIp) {
String[] ip = strIp.split("\\.");
return (Long.parseLong(ip[0]) << 24) + (Long.parseLong(ip[1]) << 16)
+ (Long.parseLong(ip[2]) << 8) + (Long.parseLong(ip[3]));
}

public ObDirectLoadTraceId generateNextId() {
long newSequence = System.currentTimeMillis() * 1000 + sequence.incrementAndGet() % 1000;
return new ObDirectLoadTraceId(uniqueId, newSequence);
}

public static ObDirectLoadTraceId generate() {
return instance.generateNextId();
}

}