diff --git a/src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObTableConnection.java b/src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObTableConnection.java index 21e3bd2d..218dea5c 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObTableConnection.java +++ b/src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObTableConnection.java @@ -52,6 +52,7 @@ public class ObTableConnection { private AtomicBoolean isExpired = new AtomicBoolean(false); private LocalDateTime lastConnectionTime; private boolean loginWithConfigs = false; + private boolean isOdpMode = false; public static long ipToLong(String strIp) { String[] ip = strIp.split("\\."); @@ -79,8 +80,9 @@ public void enableLoginWithConfigs() { /* * Ob table connection. */ - public ObTableConnection(ObTable obTable) { + public ObTableConnection(ObTable obTable, boolean isOdpMode) { this.obTable = obTable; + this.isOdpMode = isOdpMode; } /* @@ -157,6 +159,7 @@ private void login() throws Exception { request.setTenantName(obTable.getTenantName()); request.setUserName(obTable.getUserName()); request.setDatabaseName(obTable.getDatabase()); + request.setAllowDistributeCapability(isAllowDistributeCapability()); // When the caller doesn't provide any parameters, configsMap is empty. // In this case, we don't generate any JSON to avoid creating an empty object. if (loginWithConfigs && !obTable.getConfigs().isEmpty()) { @@ -402,4 +405,11 @@ private String logMessage(String traceId, String methodName, String endpoint, lo return stringBuilder.toString(); } + private boolean isAllowDistributeCapability() { + if (isOdpMode) { + return ObGlobal.OB_PROXY_VERSION >= ObGlobal.OB_PROXY_VERSION_4_3_6_0; + } else { + return true; + } + } } diff --git a/src/main/java/com/alipay/oceanbase/rpc/location/LocationUtil.java b/src/main/java/com/alipay/oceanbase/rpc/location/LocationUtil.java index 011aec98..4f485e15 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/location/LocationUtil.java +++ b/src/main/java/com/alipay/oceanbase/rpc/location/LocationUtil.java @@ -1583,11 +1583,10 @@ private static ObPartitionEntry getPartitionLocationFromResultSet(TableEntry tab long partitionId; partitionId = rs.getLong("tablet_id"); long lsId = withLsId ? rs.getLong("ls_id") : INVALID_LS_ID; - if (!rs.wasNull()) { - tabletLsIdMap.put(partitionId, lsId); - } else { - tabletLsIdMap.put(partitionId, INVALID_LS_ID); // non-partitioned table + if (withLsId && rs.wasNull()) { + lsId = INVALID_LS_ID; } + tabletLsIdMap.put(partitionId, lsId); ObPartitionLocationInfo partitionLocationInfo = partitionEntry .getPartitionInfo(partitionId); ObPartitionLocation location = partitionLocationInfo.getPartitionLocation(); diff --git a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/login/ObTableLoginRequest.java b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/login/ObTableLoginRequest.java index 0bdb4b85..b597f5a8 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/login/ObTableLoginRequest.java +++ b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/login/ObTableLoginRequest.java @@ -54,6 +54,7 @@ public int getPcode() { private String databaseName; private long ttlUs; private String configsStr; + private boolean allowDistributeCapability; /* * Ob table login request. @@ -116,6 +117,9 @@ public byte[] encode() { idx += len; strbytes = Serialization.encodeVString(configsStr); System.arraycopy(strbytes, 0, bytes, idx, strbytes.length); + idx += strbytes.length; + System.arraycopy(Serialization.encodeI8(allowDistributeCapability ? (byte) 1 : (byte) 0), 0, + bytes, idx, 1); return bytes; } @@ -168,7 +172,7 @@ public long getPayloadContentSize() { + Serialization.getNeedBytes(userName) + Serialization.getNeedBytes(passSecret) + Serialization.getNeedBytes(passScramble) + Serialization.getNeedBytes(databaseName) + Serialization.getNeedBytes(ttlUs) - + Serialization.getNeedBytes(configsStr); + + Serialization.getNeedBytes(configsStr) + 1 /* allowDistributeCapability */; } /* @@ -318,6 +322,7 @@ public ObBytesString getPassSecret() { return passSecret; } + /* * Set pass secret. */ @@ -382,4 +387,12 @@ public void setConfigsStr(String configsStr) { this.configsStr = configsStr; } + public void setAllowDistributeCapability(boolean allowDistributeCapability) { + this.allowDistributeCapability = allowDistributeCapability; + } + + public boolean getAllowDistributeCapability() { + return this.allowDistributeCapability; + } + } diff --git a/src/main/java/com/alipay/oceanbase/rpc/table/ObTable.java b/src/main/java/com/alipay/oceanbase/rpc/table/ObTable.java index f94319bd..efde7f2c 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/table/ObTable.java +++ b/src/main/java/com/alipay/oceanbase/rpc/table/ObTable.java @@ -863,7 +863,7 @@ public void init() throws Exception { // expand other connections (if needed) in the background connectionPool = new AtomicReference(); ObTableConnection[] curConnectionPool = new ObTableConnection[1]; - curConnectionPool[0] = new ObTableConnection(obTable); + curConnectionPool[0] = new ObTableConnection(obTable, obTable.isOdpMode()); curConnectionPool[0].enableLoginWithConfigs(); curConnectionPool[0].init(); @@ -912,7 +912,7 @@ private void checkAndExpandPool() { List tmpConnections = new ArrayList<>(); for (int i = 0; i < expandSize; ++i) { try { - ObTableConnection tmpConnection = new ObTableConnection(obTable); + ObTableConnection tmpConnection = new ObTableConnection(obTable, obTable.isOdpMode()); tmpConnection.init(); tmpConnections.add(tmpConnection); } catch (Exception e) {