From 228634df3097b7b403e739d8945816743510ad09 Mon Sep 17 00:00:00 2001 From: JackShi148 Date: Tue, 9 Sep 2025 16:25:17 +0800 Subject: [PATCH] refresh tablet_id if the size of partIdMap is more than 1, do not directly return --- .../alipay/oceanbase/rpc/ObTableClient.java | 27 ++++++++++--------- .../rpc/location/model/TableRoute.java | 5 ++-- 2 files changed, 18 insertions(+), 14 deletions(-) diff --git a/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java b/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java index 53c5abf9..4a1978bf 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java +++ b/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java @@ -29,13 +29,11 @@ import com.alipay.oceanbase.rpc.protocol.payload.ObPayload; import com.alipay.oceanbase.rpc.protocol.payload.Pcodes; import com.alipay.oceanbase.rpc.protocol.payload.impl.ObObj; -import com.alipay.oceanbase.rpc.protocol.payload.impl.ObRowKey; import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.*; import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.aggregation.ObTableAggregation; import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.mutate.ObTableQueryAndMutate; import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.mutate.ObTableQueryAndMutateRequest; import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.mutate.ObTableQueryAndMutateResult; -import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.ObBorderFlag; import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.ObNewRange; import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.ObTableQuery; import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.ObTableQueryRequest; @@ -48,7 +46,6 @@ import com.alipay.remoting.util.StringUtils; import org.slf4j.Logger; -import java.lang.reflect.Array; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; @@ -710,15 +707,18 @@ private T execute(String tableName, OperationExecuteCallback callback, if (refreshedTableMeta) { refreshedTableMeta = false; boolean isHKV = callback.getQuery().getEntityType() == ObTableEntityType.HKV; - tableRoute.refreshTabletLocationForAtomicQuery(tableName, callback.getQuery().getObTableQuery(), isHKV); + Map partIdMapObTable = tableRoute.refreshTabletLocationAndGetPartIdMap(tableName, callback.getQuery().getObTableQuery(), isHKV); + tableParam = partIdMapObTable.entrySet().iterator().next().getValue(); } else { tableRoute.refreshPartitionLocation(tableName, routeQueryTabletId, null); } } - ObTableQuery tableQuery = callback.getQuery().getObTableQuery(); - // using scan range - tableParam = tableRoute.getTableParam(tableName, tableQuery.getScanRangeColumns(), - tableQuery.getKeyRanges()); + if (tableParam == null) { + ObTableQuery tableQuery = callback.getQuery().getObTableQuery(); + // using scan range + tableParam = tableRoute.getTableParam(tableName, tableQuery.getScanRangeColumns(), + tableQuery.getKeyRanges()); + } routeQueryTabletId = tableParam.getPartitionId(); } else { throw new ObTableException("RowKey or scan range is null"); @@ -2317,6 +2317,7 @@ public ObPayload execute(final ObTableAbstractOperationRequest request) throws E request.setTimeout(getOdpTable().getObTableOperationTimeout()); return getOdpTable().execute(request); } else { + Map partIdMapObTable = null; // Recalculate partIdMapObTable if (needRefreshTabletLocation) { needRefreshTabletLocation = false; @@ -2324,14 +2325,16 @@ public ObPayload execute(final ObTableAbstractOperationRequest request) throws E refreshedTableMeta = false; // need to recalculate routing tablet_id and refresh location boolean isHKV = request.getEntityType() == ObTableEntityType.HKV; - tableRoute.refreshTabletLocationForAtomicQuery(request.getTableName(), tableQuery, isHKV); + partIdMapObTable = tableRoute.refreshTabletLocationAndGetPartIdMap(request.getTableName(), tableQuery, isHKV); } else { tableRoute.refreshPartitionLocation(request.getTableName(), routeTabletId, null); } } - Map partIdMapObTable = tableRoute.getPartIdParamMapForQuery( - request.getTableName(), tableQuery.getScanRangeColumns(), - tableQuery.getKeyRanges()); + if (partIdMapObTable == null) { + partIdMapObTable = tableRoute.getPartIdParamMapForQuery( + request.getTableName(), tableQuery.getScanRangeColumns(), + tableQuery.getKeyRanges()); + } // Check if partIdMapObTable size is greater than 1 boolean isDistributedExecuteSupported = getServerCapacity().isSupportDistributedExecute(); diff --git a/src/main/java/com/alipay/oceanbase/rpc/location/model/TableRoute.java b/src/main/java/com/alipay/oceanbase/rpc/location/model/TableRoute.java index 62e63901..d39eddfa 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/location/model/TableRoute.java +++ b/src/main/java/com/alipay/oceanbase/rpc/location/model/TableRoute.java @@ -579,7 +579,7 @@ public TableEntry refreshTabletLocationBatch(String tableName) throws Exception } } - public void refreshTabletLocationForAtomicQuery(String tableName, ObTableQuery query, boolean isHKV) throws Exception { + public Map refreshTabletLocationAndGetPartIdMap(String tableName, ObTableQuery query, boolean isHKV) throws Exception { Map partIdParamMap = getPartIdParamMapForQuery(tableName, query.getScanRangeColumns(), query.getKeyRanges()); if (isHKV) { // for HBase process, if distributed function is enabled, no need to do routing refresh @@ -588,7 +588,7 @@ public void refreshTabletLocationForAtomicQuery(String tableName, ObTableQuery q throw new ObTablePartitionConsistentException( "query and mutate must be a atomic operation"); } else if (isDistributedSupported) { - return; + // do nothing } } else { // for table process, distributed function is not supported yet, need to refresh routing @@ -604,6 +604,7 @@ public void refreshTabletLocationForAtomicQuery(String tableName, ObTableQuery q TableEntry tableEntry = getTableEntry(tableName); long tabletId = entry.getValue().getTabletId(); refreshPartitionLocation(tableName, tabletId, tableEntry); + return partIdParamMap; } private Long[] getTabletsFromTableEntry(TableEntry tableEntry) {