From 646a37d6ce7c31a93b04c438f31dff94183e1d5d Mon Sep 17 00:00:00 2001 From: JackShi148 Date: Tue, 13 May 2025 15:43:48 +0800 Subject: [PATCH 1/2] use odp interfaces and seperate from ocp mode to refresh meta --- .../alipay/oceanbase/rpc/ObTableClient.java | 58 ++++++++++---- .../rpc/location/model/TableRoute.java | 76 +++++++++++-------- .../rpc/table/ObTableClientQueryImpl.java | 6 +- 3 files changed, 91 insertions(+), 49 deletions(-) diff --git a/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java b/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java index c7e58f58..7cfe3d6b 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java +++ b/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java @@ -457,7 +457,7 @@ private T execute(String tableName, TableExecuteCallback callback, ObServ int tryTimes = 0; boolean needRefreshPartitionLocation = false; long startExecute = System.currentTimeMillis(); - Row rowKey = transformToRow(tableName, callback.getRowKey()); + Row rowKey = odpMode ? null : transformToRow(tableName, callback.getRowKey()); while (true) { checkStatus(); long currentExecute = System.currentTimeMillis(); @@ -966,11 +966,17 @@ public TableEntry getOrRefreshTableEntry(final String tableName, boolean forceRe * @param tableName table name * */ private TableEntry refreshMeta(String tableName) throws Exception { - if (odpMode) { - return tableRoute.refreshODPMeta(tableName, true); - } else { - return tableRoute.refreshMeta(tableName); - } + return tableRoute.refreshMeta(tableName); + } + + /** + * refresh table meta information except location + * work for both OcpMode and OdpMode + * only support by ODP version after 4.3.2 + * @param tableName table name + * */ + public TableEntry refreshOdpMeta(String tableName) throws Exception { + return tableRoute.refreshODPMeta(tableName, true); } /** @@ -1076,6 +1082,20 @@ public ObTableParam getTableParamWithPartId(String tableName, long partId, ObSer return tableRoute.getTableWithPartId(tableName, partId, route); } + /** + * get addr by pardId in ODP mode + * only support by ODP version after 4.3.2 + * @param tableName table want to get + * @param partId logic of table + * @param route ObServer route + * @return ObPair of partId and table + * @throws Exception exception + */ + public ObTableParam getOdpTableParamWithPartId(String tableName, long partId, ObServerRoute route) + throws Exception { + return tableRoute.getOdpTableWithPartId(tableName, partId, route); + } + /** * * @param moveResponse reRoute response @@ -1910,14 +1930,17 @@ private Partition getSinglePartitionInternal(String tableName, Row rowKey, boole addRowKeyElement(tableName, rowKey.getColumns()); } ObTableParam tableParam = null; - if (refresh) { - if (odpMode) { + if (odpMode) { + if (refresh) { tableRoute.refreshODPMeta(tableName, true); - } else { + } + tableParam = tableRoute.getOdpTableParam(tableName, rowKey); + } else { + if (refresh) { tableRoute.refreshMeta(tableName); } + tableParam = tableRoute.getTableParam(tableName, rowKey); } - tableParam = tableRoute.getTableParam(tableName, rowKey); return new Partition(tableParam.getPartitionId(), tableParam.getPartId(), tableParam.getTableId(), tableParam.getObTable().getIp(), tableParam.getObTable() .getPort(), tableParam.getLsId()); @@ -1941,15 +1964,20 @@ public List getPartition(String tableName, boolean refresh) throws Ex */ private List getAllPartitionInternal(String tableName, boolean refresh) throws Exception { List partitions = new ArrayList<>(); - if (refresh) { - if (odpMode) { + List allTables; + if (odpMode) { + if (refresh) { tableRoute.refreshODPMeta(tableName, true); - } else { + } + allTables = tableRoute.getOdpTableParams(tableName, new ObTableQuery(), new Object[]{ ObObj.getMin() }, true, + new Object[]{ ObObj.getMax() }, true); + } else { + if (refresh) { tableRoute.refreshMeta(tableName); } + allTables = tableRoute.getTableParams(tableName, new ObTableQuery(), new Object[]{ ObObj.getMin() }, true, + new Object[]{ ObObj.getMax() }, true); } - List allTables = tableRoute.getTableParams(tableName, new ObTableQuery(), new Object[]{ ObObj.getMin() }, true, - new Object[]{ ObObj.getMax() }, true); for (ObTableParam tableParam : allTables) { Partition partition = new Partition(tableParam.getPartitionId(), tableParam.getPartId(), tableParam.getTableId(), tableParam.getObTable().getIp(), tableParam.getObTable().getPort(), tableParam.getLsId()); diff --git a/src/main/java/com/alipay/oceanbase/rpc/location/model/TableRoute.java b/src/main/java/com/alipay/oceanbase/rpc/location/model/TableRoute.java index c40ed5bd..f32330c1 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/location/model/TableRoute.java +++ b/src/main/java/com/alipay/oceanbase/rpc/location/model/TableRoute.java @@ -94,16 +94,23 @@ public void setConfigServerInfo(ConfigServerInfo configServerInfo) { * */ public TableEntry getTableEntry(String tableName) throws Exception { TableEntry tableEntry; - if (tableClient.isOdpMode()) { - tableEntry = odpTableLocations.getTableEntry(tableName); - if (tableEntry == null) { - tableEntry = refreshODPMeta(tableName, false); - } - } else { - tableEntry = tableLocations.getTableEntry(tableName); - if (tableEntry == null) { - tableEntry = refreshMeta(tableName); - } + tableEntry = tableLocations.getTableEntry(tableName); + if (tableEntry == null) { + tableEntry = refreshMeta(tableName); + } + return tableEntry; + } + + /** + * get ODP tableEntry by tableName, + * this methods will guarantee the tableEntry is not null + * only support by ODP version after 4.3.2 + * */ + public TableEntry getOdpTableEntry(String tableName) throws Exception { + TableEntry tableEntry; + tableEntry = odpTableLocations.getTableEntry(tableName); + if (tableEntry == null) { + tableEntry = refreshODPMeta(tableName, false); } return tableEntry; } @@ -631,6 +638,16 @@ public ObTableParam getTableParam(String tableName, Row rowkey) throws Exception return getTableParamWithRoute(tableName, rowkey, route); } + public ObTableParam getOdpTableParam(String tableName, Row rowkey) throws Exception { + TableEntry odpTableEntry = getOdpTableEntry(tableName); + if (odpTableEntry == null) { + logger.error("tableEntry is null, tableName: {}", tableName); + throw new ObTableEntryRefreshException("tableEntry is null, tableName: " + tableName); + } + long partId = getPartId(odpTableEntry, rowkey); + return getODPTableInternal(odpTableEntry, partId); + } + public ObTableParam getTableParamWithRoute(String tableName, Row rowkey, ObServerRoute route) throws Exception { TableEntry tableEntry = getTableEntry(tableName); @@ -639,11 +656,7 @@ public ObTableParam getTableParamWithRoute(String tableName, Row rowkey, ObServe throw new ObTableEntryRefreshException("tableEntry is null, tableName: " + tableName); } long partId = getPartId(tableEntry, rowkey); - if (tableClient.isOdpMode()) { - return getODPTableInternal(tableEntry, partId); - } else { - return getTableInternal(tableName, tableEntry, partId, route); - } + return getTableInternal(tableName, tableEntry, partId, route); } /** @@ -665,11 +678,7 @@ public List getTableParams(String tableName, List rowkeys) th for (Row rowkey : rowkeys) { long partId = getPartId(tableEntry, rowkey); ObTableParam param = null; - if (tableClient.isOdpMode()) { - param = getODPTableInternal(tableEntry, partId); - } else { - param = getTableInternal(tableName, tableEntry, partId, route); - } + param = getTableInternal(tableName, tableEntry, partId, route); params.add(param); } return params; @@ -728,11 +737,13 @@ private ObPartitionLocationInfo getOrRefreshPartitionInfo(TableEntry tableEntry, public ObTableParam getTableWithPartId(String tableName, long partId, ObServerRoute route) throws Exception { TableEntry tableEntry = getTableEntry(tableName); - if (tableClient.isOdpMode()) { - return getODPTableInternal(tableEntry, partId); - } else { - return getTableInternal(tableName, tableEntry, partId, route); - } + return getTableInternal(tableName, tableEntry, partId, route); + } + + public ObTableParam getOdpTableWithPartId(String tableName, long partId, ObServerRoute route) + throws Exception { + TableEntry tableEntry = getOdpTableEntry(tableName); + return getODPTableInternal(tableEntry, partId); } /** @@ -977,15 +988,18 @@ public ObTableParam getTableParam(String tableName, List scanRangeColumn public List getTableParams(String tableName, ObTableQuery query, Object[] start, boolean startInclusive, Object[] end, boolean endInclusive) throws Exception { - - if (tableClient.isOdpMode()) { - return getODPTablesInternal(tableName, query.getScanRangeColumns(), start, - startInclusive, end, endInclusive); - } return getTablesInternal(tableName, query.getScanRangeColumns(), start, startInclusive, end, endInclusive, tableClient.getRoute(false)); } + public List getOdpTableParams(String tableName, ObTableQuery query, Object[] start, + boolean startInclusive, Object[] end, + boolean endInclusive) throws Exception { + + return getODPTablesInternal(tableName, query.getScanRangeColumns(), start, + startInclusive, end, endInclusive); + } + private List getTablesInternal(String tableName, List scanRangeColumns, Object[] start, boolean startInclusive, Object[] end, boolean endInclusive, @@ -1083,7 +1097,7 @@ public List getODPTablesInternal(String tableName, List sc throw new IllegalArgumentException("length of start key and end key is not equal"); } List obTableParams = new ArrayList(); - TableEntry odpTableEntry = getTableEntry(tableName); + TableEntry odpTableEntry = getOdpTableEntry(tableName); if (scanRangeColumns == null || scanRangeColumns.isEmpty()) { Map tableEntryRowKeyElement = tableClient.getRowKeyElement(tableName); diff --git a/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientQueryImpl.java b/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientQueryImpl.java index 5cf7b178..1d220e52 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientQueryImpl.java +++ b/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientQueryImpl.java @@ -193,7 +193,7 @@ private AbstractQueryStreamResult commonExecute(InitQueryResultCallback(odpTable.getPartId(), odpTable)); } catch (Exception e) { @@ -204,8 +204,8 @@ private AbstractQueryStreamResult commonExecute(InitQueryResultCallback(odpTable.getPartId(), odpTable)); } else { From 85c9f6b11cc530b1bea6c79c949f0e8c629ad8b6 Mon Sep 17 00:00:00 2001 From: JackShi148 Date: Tue, 13 May 2025 17:21:16 +0800 Subject: [PATCH 2/2] format code and optimize by review --- .../alipay/oceanbase/rpc/ObTableClient.java | 14 +- .../rpc/location/model/TableLocations.java | 3 +- .../rpc/location/model/TableRoute.java | 278 ++++++++++-------- .../rpc/table/ObTableClientQueryImpl.java | 7 +- 4 files changed, 162 insertions(+), 140 deletions(-) diff --git a/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java b/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java index 7cfe3d6b..57e9cf98 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java +++ b/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java @@ -946,7 +946,6 @@ public void setRpcExecuteTimeout(int rpcExecuteTimeout) { /** * Get or refresh table entry meta information. - * work for both OcpMode and OdpMode * @param tableName table name * @return TableEntry * @throws Exception if fail @@ -962,7 +961,6 @@ public TableEntry getOrRefreshTableEntry(final String tableName, boolean forceRe /** * refresh table meta information except location - * work for both OcpMode and OdpMode * @param tableName table name * */ private TableEntry refreshMeta(String tableName) throws Exception { @@ -971,12 +969,11 @@ private TableEntry refreshMeta(String tableName) throws Exception { /** * refresh table meta information except location - * work for both OcpMode and OdpMode * only support by ODP version after 4.3.2 * @param tableName table name * */ public TableEntry refreshOdpMeta(String tableName) throws Exception { - return tableRoute.refreshODPMeta(tableName, true); + return tableRoute.refreshOdpMeta(tableName, true); } /** @@ -1087,13 +1084,12 @@ public ObTableParam getTableParamWithPartId(String tableName, long partId, ObSer * only support by ODP version after 4.3.2 * @param tableName table want to get * @param partId logic of table - * @param route ObServer route * @return ObPair of partId and table * @throws Exception exception */ - public ObTableParam getOdpTableParamWithPartId(String tableName, long partId, ObServerRoute route) + public ObTableParam getOdpTableParamWithPartId(String tableName, long partId) throws Exception { - return tableRoute.getOdpTableWithPartId(tableName, partId, route); + return tableRoute.getOdpTableWithPartId(tableName, partId); } /** @@ -1932,7 +1928,7 @@ private Partition getSinglePartitionInternal(String tableName, Row rowKey, boole ObTableParam tableParam = null; if (odpMode) { if (refresh) { - tableRoute.refreshODPMeta(tableName, true); + tableRoute.refreshOdpMeta(tableName, true); } tableParam = tableRoute.getOdpTableParam(tableName, rowKey); } else { @@ -1967,7 +1963,7 @@ private List getAllPartitionInternal(String tableName, boolean refres List allTables; if (odpMode) { if (refresh) { - tableRoute.refreshODPMeta(tableName, true); + tableRoute.refreshOdpMeta(tableName, true); } allTables = tableRoute.getOdpTableParams(tableName, new ObTableQuery(), new Object[]{ ObObj.getMin() }, true, new Object[]{ ObObj.getMax() }, true); diff --git a/src/main/java/com/alipay/oceanbase/rpc/location/model/TableLocations.java b/src/main/java/com/alipay/oceanbase/rpc/location/model/TableLocations.java index 1990a4f2..99bc37c3 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/location/model/TableLocations.java +++ b/src/main/java/com/alipay/oceanbase/rpc/location/model/TableLocations.java @@ -533,13 +533,14 @@ private long getTableLevelRefreshInterval(ServerRoster serverRoster) { /** * fetch ODP partition meta information + * only support by ODP version after 4.3.2 * @param tableName table name to query * @param forceRefresh flag to force ODP to fetch the latest partition meta information * @param odpTable odp table to execute refreshing * @return TableEntry ODPTableEntry * @throws Exception Exception */ - public TableEntry refreshODPMeta(String tableName, boolean forceRefresh, ObTable odpTable) + public TableEntry refreshOdpMeta(String tableName, boolean forceRefresh, ObTable odpTable) throws Exception { if (tableName == null || tableName.isEmpty()) { throw new IllegalArgumentException("table name is null"); diff --git a/src/main/java/com/alipay/oceanbase/rpc/location/model/TableRoute.java b/src/main/java/com/alipay/oceanbase/rpc/location/model/TableRoute.java index f32330c1..79873d32 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/location/model/TableRoute.java +++ b/src/main/java/com/alipay/oceanbase/rpc/location/model/TableRoute.java @@ -101,20 +101,6 @@ public TableEntry getTableEntry(String tableName) throws Exception { return tableEntry; } - /** - * get ODP tableEntry by tableName, - * this methods will guarantee the tableEntry is not null - * only support by ODP version after 4.3.2 - * */ - public TableEntry getOdpTableEntry(String tableName) throws Exception { - TableEntry tableEntry; - tableEntry = odpTableLocations.getTableEntry(tableName); - if (tableEntry == null) { - tableEntry = refreshODPMeta(tableName, false); - } - return tableEntry; - } - /** * erase the tableEntry cached in tableLocations * */ @@ -594,13 +580,6 @@ public TableEntry refreshTabletLocationBatch(String tableName) throws Exception } } - /** - * get or refresh table meta information in odp mode - * */ - public TableEntry refreshODPMeta(String tableName, boolean forceRefresh) throws Exception { - return odpTableLocations.refreshODPMeta(tableName, forceRefresh, odpInfo.getObTable()); - } - private Long[] getTabletsFromTableEntry(TableEntry tableEntry) { Long[] tablets = null; if (tableEntry.isPartitionTable()) { @@ -628,7 +607,6 @@ private void validCachedObTableStatus(String tableName, TableEntry tableEntry, l } /** * get TableParam by tableName and rowkey - * work for both OcpMode and OdpMode * @param tableName tableName * @param rowkey row key or partition key names and values * @return ObTableParam tableParam @@ -638,16 +616,6 @@ public ObTableParam getTableParam(String tableName, Row rowkey) throws Exception return getTableParamWithRoute(tableName, rowkey, route); } - public ObTableParam getOdpTableParam(String tableName, Row rowkey) throws Exception { - TableEntry odpTableEntry = getOdpTableEntry(tableName); - if (odpTableEntry == null) { - logger.error("tableEntry is null, tableName: {}", tableName); - throw new ObTableEntryRefreshException("tableEntry is null, tableName: " + tableName); - } - long partId = getPartId(odpTableEntry, rowkey); - return getODPTableInternal(odpTableEntry, partId); - } - public ObTableParam getTableParamWithRoute(String tableName, Row rowkey, ObServerRoute route) throws Exception { TableEntry tableEntry = getTableEntry(tableName); @@ -661,7 +629,6 @@ public ObTableParam getTableParamWithRoute(String tableName, Row rowkey, ObServe /** * get TableParam by tableName and rowkeys in batch - * work for both OcpMode and OdpMode * @param tableName tableName * @param rowkeys list of row key or partition key names and values * @return ObTableParam tableParam @@ -727,7 +694,7 @@ private ObPartitionLocationInfo getOrRefreshPartitionInfo(TableEntry tableEntry, } /** - * get addr by pardId + * get addr by partId * @param tableName table want to get * @param partId tabletId of table (real tablet id in 4.x) * @param route ObServer route @@ -740,12 +707,6 @@ public ObTableParam getTableWithPartId(String tableName, long partId, ObServerRo return getTableInternal(tableName, tableEntry, partId, route); } - public ObTableParam getOdpTableWithPartId(String tableName, long partId, ObServerRoute route) - throws Exception { - TableEntry tableEntry = getOdpTableEntry(tableName); - return getODPTableInternal(tableEntry, partId); - } - /** * get addr from table entry by partId * @param tableName table want to get @@ -821,24 +782,6 @@ private ObTableParam getTableInternal(String tableName, TableEntry tableEntry, l return param; } - /** - * get odp table entry by partId, just get meta information - * @param odpTableEntry odp tableEntry - * @param partId logicId of tablet - * @return ObTableParam table information for execution - */ - private ObTableParam getODPTableInternal(TableEntry odpTableEntry, long partId) { - ObTable obTable = odpInfo.getObTable(); - ObTableParam param = new ObTableParam(obTable); - param.setPartId(partId); - long tabletId = getTabletIdByPartId(odpTableEntry, partId); - param.setLsId(odpTableEntry.getPartitionEntry().getLsId(tabletId)); - param.setTableId(odpTableEntry.getTableId()); - // real partition(tablet) id - param.setPartitionId(tabletId); - return param; - } - private ReplicaLocation getPartitionLocation(ObPartitionLocationInfo obPartitionLocationInfo, ObServerRoute route) { return obPartitionLocationInfo.getPartitionLocation().getReplica(route); @@ -975,7 +918,6 @@ public ObTableParam getTableParam(String tableName, List scanRangeColumn /** * get TableParams by start-end range in this table - * work for both OcpMode and OdpMode * @param tableName table want to get * @param query query * @param start start key @@ -992,14 +934,6 @@ public List getTableParams(String tableName, ObTableQuery query, O end, endInclusive, tableClient.getRoute(false)); } - public List getOdpTableParams(String tableName, ObTableQuery query, Object[] start, - boolean startInclusive, Object[] end, - boolean endInclusive) throws Exception { - - return getODPTablesInternal(tableName, query.getScanRangeColumns(), start, - startInclusive, end, endInclusive); - } - private List getTablesInternal(String tableName, List scanRangeColumns, Object[] start, boolean startInclusive, Object[] end, boolean endInclusive, @@ -1089,64 +1023,6 @@ private List getTablesInternal(String tableName, List scan return params; } - public List getODPTablesInternal(String tableName, List scanRangeColumns, - Object[] start, boolean startInclusive, - Object[] end, boolean endInclusive) - throws Exception { - if (start.length != end.length) { - throw new IllegalArgumentException("length of start key and end key is not equal"); - } - List obTableParams = new ArrayList(); - TableEntry odpTableEntry = getOdpTableEntry(tableName); - - if (scanRangeColumns == null || scanRangeColumns.isEmpty()) { - Map tableEntryRowKeyElement = tableClient.getRowKeyElement(tableName); - if (tableEntryRowKeyElement != null) { - scanRangeColumns = new ArrayList(tableEntryRowKeyElement.keySet()); - } - } - // 2. get replica location - // partIdWithReplicaList -> List> - Row startRow = new Row(); - Row endRow = new Row(); - // ensure the format of column names and values if the current table is a table with partition - if (odpTableEntry.isPartitionTable() - && odpTableEntry.getPartitionInfo().getLevel() != ObPartitionLevel.LEVEL_ZERO) { - if ((scanRangeColumns == null || scanRangeColumns.isEmpty()) && start.length == 1 - && start[0] instanceof ObObj && ((ObObj) start[0]).isMinObj() && end.length == 1 - && end[0] instanceof ObObj && ((ObObj) end[0]).isMaxObj()) { - // for getPartition to query all partitions - scanRangeColumns = new ArrayList(Collections.nCopies(start.length, - "partition")); - } - // scanRangeColumn may be longer than start/end in prefix scanning situation - if (scanRangeColumns == null || scanRangeColumns.size() < start.length) { - throw new IllegalArgumentException( - "length of key and scan range columns do not match, please use addRowKeyElement or set scan range columns"); - } - for (int i = 0; i < start.length; i++) { - startRow.add(scanRangeColumns.get(i), start[i]); - endRow.add(scanRangeColumns.get(i), end[i]); - } - } - - List partIds = getPartIds(odpTableEntry, startRow, startInclusive, endRow, - endInclusive); - for (Long partId : partIds) { - ObTable obTable = odpInfo.getObTable(); - ObTableParam param = new ObTableParam(obTable); - param.setPartId(partId); - long tabletId = getTabletIdByPartId(odpTableEntry, partId); - param.setLsId(odpTableEntry.getPartitionEntry().getLsId(tabletId)); - param.setTableId(odpTableEntry.getTableId()); - // real partition(tablet) id - param.setPartitionId(tabletId); - obTableParams.add(param); - } - - return obTableParams; - } - /** * 根据 start-end 获取 partition id 和 addr * @param tableEntry @@ -1246,6 +1122,158 @@ private List getPartIdsForLevelTwo(TableEntry tableEntry, Row startRow, return partIds; } + /*------------------------------------------------------------------------ODP routing------------------------------------------------------------------------*/ + + /** + * get ODP tableEntry by tableName, + * this methods will guarantee the tableEntry is not null + * only support by ODP version after 4.3.2 + * */ + public TableEntry getOdpTableEntry(String tableName) throws Exception { + TableEntry tableEntry; + tableEntry = odpTableLocations.getTableEntry(tableName); + if (tableEntry == null) { + tableEntry = refreshOdpMeta(tableName, false); + } + return tableEntry; + } + + /** + * get or refresh table meta information in odp mode + * only support by ODP version after 4.3.2 + * */ + public TableEntry refreshOdpMeta(String tableName, boolean forceRefresh) throws Exception { + return odpTableLocations.refreshOdpMeta(tableName, forceRefresh, odpInfo.getObTable()); + } + + /** + * get odp TableParam by tableName and rowkey + * only support by ODP version after 4.3.2 + * @param tableName tableName + * @param rowkey row key or partition key names and values + * @return ObTableParam tableParam + * */ + public ObTableParam getOdpTableParam(String tableName, Row rowkey) throws Exception { + TableEntry odpTableEntry = getOdpTableEntry(tableName); + if (odpTableEntry == null) { + logger.error("tableEntry is null, tableName: {}", tableName); + throw new ObTableEntryRefreshException("tableEntry is null, tableName: " + tableName); + } + long partId = getPartId(odpTableEntry, rowkey); + return getOdpTableInternal(odpTableEntry, partId); + } + + /** + * get odp table addr by partId + * only support by ODP version after 4.3.2 + * @param tableName table want to get + * @param partId tabletId of table (real tablet id in 4.x) + * @return ObTableParam table information for execution + * @throws Exception exception + */ + public ObTableParam getOdpTableWithPartId(String tableName, long partId) + throws Exception { + TableEntry tableEntry = getOdpTableEntry(tableName); + return getOdpTableInternal(tableEntry, partId); + } + + /** + * get odp table entry by partId, just get meta information + * only support by ODP version after 4.3.2 + * @param odpTableEntry odp tableEntry + * @param partId logicId of tablet + * @return ObTableParam table information for execution + */ + private ObTableParam getOdpTableInternal(TableEntry odpTableEntry, long partId) { + ObTable obTable = odpInfo.getObTable(); + ObTableParam param = new ObTableParam(obTable); + param.setPartId(partId); + long tabletId = getTabletIdByPartId(odpTableEntry, partId); + param.setLsId(odpTableEntry.getPartitionEntry().getLsId(tabletId)); + param.setTableId(odpTableEntry.getTableId()); + // real partition(tablet) id + param.setPartitionId(tabletId); + return param; + } + + /** + * get odp TableParams by start-end range in this table + * only support by ODP version after 4.3.2 + * @param tableName table want to get + * @param query query + * @param start start key + * @param startInclusive whether include start key + * @param end end key + * @param endInclusive whether include end key + * @return list of table obTableParams + * @throws Exception exception + */ + public List getOdpTableParams(String tableName, ObTableQuery query, Object[] start, + boolean startInclusive, Object[] end, + boolean endInclusive) throws Exception { + + return getOdpTablesInternal(tableName, query.getScanRangeColumns(), start, + startInclusive, end, endInclusive); + } + + private List getOdpTablesInternal(String tableName, List scanRangeColumns, + Object[] start, boolean startInclusive, + Object[] end, boolean endInclusive) + throws Exception { + if (start.length != end.length) { + throw new IllegalArgumentException("length of start key and end key is not equal"); + } + List obTableParams = new ArrayList(); + TableEntry odpTableEntry = getOdpTableEntry(tableName); + + if (scanRangeColumns == null || scanRangeColumns.isEmpty()) { + Map tableEntryRowKeyElement = tableClient.getRowKeyElement(tableName); + if (tableEntryRowKeyElement != null) { + scanRangeColumns = new ArrayList(tableEntryRowKeyElement.keySet()); + } + } + // 2. get replica location + // partIdWithReplicaList -> List> + Row startRow = new Row(); + Row endRow = new Row(); + // ensure the format of column names and values if the current table is a table with partition + if (odpTableEntry.isPartitionTable() + && odpTableEntry.getPartitionInfo().getLevel() != ObPartitionLevel.LEVEL_ZERO) { + if ((scanRangeColumns == null || scanRangeColumns.isEmpty()) && start.length == 1 + && start[0] instanceof ObObj && ((ObObj) start[0]).isMinObj() && end.length == 1 + && end[0] instanceof ObObj && ((ObObj) end[0]).isMaxObj()) { + // for getPartition to query all partitions + scanRangeColumns = new ArrayList(Collections.nCopies(start.length, + "partition")); + } + // scanRangeColumn may be longer than start/end in prefix scanning situation + if (scanRangeColumns == null || scanRangeColumns.size() < start.length) { + throw new IllegalArgumentException( + "length of key and scan range columns do not match, please use addRowKeyElement or set scan range columns"); + } + for (int i = 0; i < start.length; i++) { + startRow.add(scanRangeColumns.get(i), start[i]); + endRow.add(scanRangeColumns.get(i), end[i]); + } + } + + List partIds = getPartIds(odpTableEntry, startRow, startInclusive, endRow, + endInclusive); + for (Long partId : partIds) { + ObTable obTable = odpInfo.getObTable(); + ObTableParam param = new ObTableParam(obTable); + param.setPartId(partId); + long tabletId = getTabletIdByPartId(odpTableEntry, partId); + param.setLsId(odpTableEntry.getPartitionEntry().getLsId(tabletId)); + param.setTableId(odpTableEntry.getTableId()); + // real partition(tablet) id + param.setPartitionId(tabletId); + obTableParams.add(param); + } + + return obTableParams; + } + /*------------------------------------------------------------------------Table Group------------------------------------------------------------------------*/ /** diff --git a/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientQueryImpl.java b/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientQueryImpl.java index 1d220e52..9a3691d9 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientQueryImpl.java +++ b/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientQueryImpl.java @@ -191,10 +191,8 @@ private AbstractQueryStreamResult commonExecute(InitQueryResultCallback(odpTable.getPartId(), odpTable)); } catch (Exception e) { if (e instanceof ObTableException) { @@ -205,8 +203,7 @@ private AbstractQueryStreamResult commonExecute(InitQueryResultCallback(odpTable.getPartId(), odpTable)); } else { throw e;