diff --git a/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java b/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java index c7e58f58..57e9cf98 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java +++ b/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java @@ -457,7 +457,7 @@ private T execute(String tableName, TableExecuteCallback callback, ObServ int tryTimes = 0; boolean needRefreshPartitionLocation = false; long startExecute = System.currentTimeMillis(); - Row rowKey = transformToRow(tableName, callback.getRowKey()); + Row rowKey = odpMode ? null : transformToRow(tableName, callback.getRowKey()); while (true) { checkStatus(); long currentExecute = System.currentTimeMillis(); @@ -946,7 +946,6 @@ public void setRpcExecuteTimeout(int rpcExecuteTimeout) { /** * Get or refresh table entry meta information. - * work for both OcpMode and OdpMode * @param tableName table name * @return TableEntry * @throws Exception if fail @@ -962,15 +961,19 @@ public TableEntry getOrRefreshTableEntry(final String tableName, boolean forceRe /** * refresh table meta information except location - * work for both OcpMode and OdpMode * @param tableName table name * */ private TableEntry refreshMeta(String tableName) throws Exception { - if (odpMode) { - return tableRoute.refreshODPMeta(tableName, true); - } else { - return tableRoute.refreshMeta(tableName); - } + return tableRoute.refreshMeta(tableName); + } + + /** + * refresh table meta information except location + * only support by ODP version after 4.3.2 + * @param tableName table name + * */ + public TableEntry refreshOdpMeta(String tableName) throws Exception { + return tableRoute.refreshOdpMeta(tableName, true); } /** @@ -1076,6 +1079,19 @@ public ObTableParam getTableParamWithPartId(String tableName, long partId, ObSer return tableRoute.getTableWithPartId(tableName, partId, route); } + /** + * get addr by pardId in ODP mode + * only support by ODP version after 4.3.2 + * @param tableName table want to get + * @param partId logic of table + * @return ObPair of partId and table + * @throws Exception exception + */ + public ObTableParam getOdpTableParamWithPartId(String tableName, long partId) + throws Exception { + return tableRoute.getOdpTableWithPartId(tableName, partId); + } + /** * * @param moveResponse reRoute response @@ -1910,14 +1926,17 @@ private Partition getSinglePartitionInternal(String tableName, Row rowKey, boole addRowKeyElement(tableName, rowKey.getColumns()); } ObTableParam tableParam = null; - if (refresh) { - if (odpMode) { - tableRoute.refreshODPMeta(tableName, true); - } else { + if (odpMode) { + if (refresh) { + tableRoute.refreshOdpMeta(tableName, true); + } + tableParam = tableRoute.getOdpTableParam(tableName, rowKey); + } else { + if (refresh) { tableRoute.refreshMeta(tableName); } + tableParam = tableRoute.getTableParam(tableName, rowKey); } - tableParam = tableRoute.getTableParam(tableName, rowKey); return new Partition(tableParam.getPartitionId(), tableParam.getPartId(), tableParam.getTableId(), tableParam.getObTable().getIp(), tableParam.getObTable() .getPort(), tableParam.getLsId()); @@ -1941,15 +1960,20 @@ public List getPartition(String tableName, boolean refresh) throws Ex */ private List getAllPartitionInternal(String tableName, boolean refresh) throws Exception { List partitions = new ArrayList<>(); - if (refresh) { - if (odpMode) { - tableRoute.refreshODPMeta(tableName, true); - } else { + List allTables; + if (odpMode) { + if (refresh) { + tableRoute.refreshOdpMeta(tableName, true); + } + allTables = tableRoute.getOdpTableParams(tableName, new ObTableQuery(), new Object[]{ ObObj.getMin() }, true, + new Object[]{ ObObj.getMax() }, true); + } else { + if (refresh) { tableRoute.refreshMeta(tableName); } + allTables = tableRoute.getTableParams(tableName, new ObTableQuery(), new Object[]{ ObObj.getMin() }, true, + new Object[]{ ObObj.getMax() }, true); } - List allTables = tableRoute.getTableParams(tableName, new ObTableQuery(), new Object[]{ ObObj.getMin() }, true, - new Object[]{ ObObj.getMax() }, true); for (ObTableParam tableParam : allTables) { Partition partition = new Partition(tableParam.getPartitionId(), tableParam.getPartId(), tableParam.getTableId(), tableParam.getObTable().getIp(), tableParam.getObTable().getPort(), tableParam.getLsId()); diff --git a/src/main/java/com/alipay/oceanbase/rpc/location/model/TableLocations.java b/src/main/java/com/alipay/oceanbase/rpc/location/model/TableLocations.java index 1990a4f2..99bc37c3 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/location/model/TableLocations.java +++ b/src/main/java/com/alipay/oceanbase/rpc/location/model/TableLocations.java @@ -533,13 +533,14 @@ private long getTableLevelRefreshInterval(ServerRoster serverRoster) { /** * fetch ODP partition meta information + * only support by ODP version after 4.3.2 * @param tableName table name to query * @param forceRefresh flag to force ODP to fetch the latest partition meta information * @param odpTable odp table to execute refreshing * @return TableEntry ODPTableEntry * @throws Exception Exception */ - public TableEntry refreshODPMeta(String tableName, boolean forceRefresh, ObTable odpTable) + public TableEntry refreshOdpMeta(String tableName, boolean forceRefresh, ObTable odpTable) throws Exception { if (tableName == null || tableName.isEmpty()) { throw new IllegalArgumentException("table name is null"); diff --git a/src/main/java/com/alipay/oceanbase/rpc/location/model/TableRoute.java b/src/main/java/com/alipay/oceanbase/rpc/location/model/TableRoute.java index c40ed5bd..79873d32 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/location/model/TableRoute.java +++ b/src/main/java/com/alipay/oceanbase/rpc/location/model/TableRoute.java @@ -94,16 +94,9 @@ public void setConfigServerInfo(ConfigServerInfo configServerInfo) { * */ public TableEntry getTableEntry(String tableName) throws Exception { TableEntry tableEntry; - if (tableClient.isOdpMode()) { - tableEntry = odpTableLocations.getTableEntry(tableName); - if (tableEntry == null) { - tableEntry = refreshODPMeta(tableName, false); - } - } else { - tableEntry = tableLocations.getTableEntry(tableName); - if (tableEntry == null) { - tableEntry = refreshMeta(tableName); - } + tableEntry = tableLocations.getTableEntry(tableName); + if (tableEntry == null) { + tableEntry = refreshMeta(tableName); } return tableEntry; } @@ -587,13 +580,6 @@ public TableEntry refreshTabletLocationBatch(String tableName) throws Exception } } - /** - * get or refresh table meta information in odp mode - * */ - public TableEntry refreshODPMeta(String tableName, boolean forceRefresh) throws Exception { - return odpTableLocations.refreshODPMeta(tableName, forceRefresh, odpInfo.getObTable()); - } - private Long[] getTabletsFromTableEntry(TableEntry tableEntry) { Long[] tablets = null; if (tableEntry.isPartitionTable()) { @@ -621,7 +607,6 @@ private void validCachedObTableStatus(String tableName, TableEntry tableEntry, l } /** * get TableParam by tableName and rowkey - * work for both OcpMode and OdpMode * @param tableName tableName * @param rowkey row key or partition key names and values * @return ObTableParam tableParam @@ -639,16 +624,11 @@ public ObTableParam getTableParamWithRoute(String tableName, Row rowkey, ObServe throw new ObTableEntryRefreshException("tableEntry is null, tableName: " + tableName); } long partId = getPartId(tableEntry, rowkey); - if (tableClient.isOdpMode()) { - return getODPTableInternal(tableEntry, partId); - } else { - return getTableInternal(tableName, tableEntry, partId, route); - } + return getTableInternal(tableName, tableEntry, partId, route); } /** * get TableParam by tableName and rowkeys in batch - * work for both OcpMode and OdpMode * @param tableName tableName * @param rowkeys list of row key or partition key names and values * @return ObTableParam tableParam @@ -665,11 +645,7 @@ public List getTableParams(String tableName, List rowkeys) th for (Row rowkey : rowkeys) { long partId = getPartId(tableEntry, rowkey); ObTableParam param = null; - if (tableClient.isOdpMode()) { - param = getODPTableInternal(tableEntry, partId); - } else { - param = getTableInternal(tableName, tableEntry, partId, route); - } + param = getTableInternal(tableName, tableEntry, partId, route); params.add(param); } return params; @@ -718,7 +694,7 @@ private ObPartitionLocationInfo getOrRefreshPartitionInfo(TableEntry tableEntry, } /** - * get addr by pardId + * get addr by partId * @param tableName table want to get * @param partId tabletId of table (real tablet id in 4.x) * @param route ObServer route @@ -728,11 +704,7 @@ private ObPartitionLocationInfo getOrRefreshPartitionInfo(TableEntry tableEntry, public ObTableParam getTableWithPartId(String tableName, long partId, ObServerRoute route) throws Exception { TableEntry tableEntry = getTableEntry(tableName); - if (tableClient.isOdpMode()) { - return getODPTableInternal(tableEntry, partId); - } else { - return getTableInternal(tableName, tableEntry, partId, route); - } + return getTableInternal(tableName, tableEntry, partId, route); } /** @@ -810,24 +782,6 @@ private ObTableParam getTableInternal(String tableName, TableEntry tableEntry, l return param; } - /** - * get odp table entry by partId, just get meta information - * @param odpTableEntry odp tableEntry - * @param partId logicId of tablet - * @return ObTableParam table information for execution - */ - private ObTableParam getODPTableInternal(TableEntry odpTableEntry, long partId) { - ObTable obTable = odpInfo.getObTable(); - ObTableParam param = new ObTableParam(obTable); - param.setPartId(partId); - long tabletId = getTabletIdByPartId(odpTableEntry, partId); - param.setLsId(odpTableEntry.getPartitionEntry().getLsId(tabletId)); - param.setTableId(odpTableEntry.getTableId()); - // real partition(tablet) id - param.setPartitionId(tabletId); - return param; - } - private ReplicaLocation getPartitionLocation(ObPartitionLocationInfo obPartitionLocationInfo, ObServerRoute route) { return obPartitionLocationInfo.getPartitionLocation().getReplica(route); @@ -964,7 +918,6 @@ public ObTableParam getTableParam(String tableName, List scanRangeColumn /** * get TableParams by start-end range in this table - * work for both OcpMode and OdpMode * @param tableName table want to get * @param query query * @param start start key @@ -977,11 +930,6 @@ public ObTableParam getTableParam(String tableName, List scanRangeColumn public List getTableParams(String tableName, ObTableQuery query, Object[] start, boolean startInclusive, Object[] end, boolean endInclusive) throws Exception { - - if (tableClient.isOdpMode()) { - return getODPTablesInternal(tableName, query.getScanRangeColumns(), start, - startInclusive, end, endInclusive); - } return getTablesInternal(tableName, query.getScanRangeColumns(), start, startInclusive, end, endInclusive, tableClient.getRoute(false)); } @@ -1075,64 +1023,6 @@ private List getTablesInternal(String tableName, List scan return params; } - public List getODPTablesInternal(String tableName, List scanRangeColumns, - Object[] start, boolean startInclusive, - Object[] end, boolean endInclusive) - throws Exception { - if (start.length != end.length) { - throw new IllegalArgumentException("length of start key and end key is not equal"); - } - List obTableParams = new ArrayList(); - TableEntry odpTableEntry = getTableEntry(tableName); - - if (scanRangeColumns == null || scanRangeColumns.isEmpty()) { - Map tableEntryRowKeyElement = tableClient.getRowKeyElement(tableName); - if (tableEntryRowKeyElement != null) { - scanRangeColumns = new ArrayList(tableEntryRowKeyElement.keySet()); - } - } - // 2. get replica location - // partIdWithReplicaList -> List> - Row startRow = new Row(); - Row endRow = new Row(); - // ensure the format of column names and values if the current table is a table with partition - if (odpTableEntry.isPartitionTable() - && odpTableEntry.getPartitionInfo().getLevel() != ObPartitionLevel.LEVEL_ZERO) { - if ((scanRangeColumns == null || scanRangeColumns.isEmpty()) && start.length == 1 - && start[0] instanceof ObObj && ((ObObj) start[0]).isMinObj() && end.length == 1 - && end[0] instanceof ObObj && ((ObObj) end[0]).isMaxObj()) { - // for getPartition to query all partitions - scanRangeColumns = new ArrayList(Collections.nCopies(start.length, - "partition")); - } - // scanRangeColumn may be longer than start/end in prefix scanning situation - if (scanRangeColumns == null || scanRangeColumns.size() < start.length) { - throw new IllegalArgumentException( - "length of key and scan range columns do not match, please use addRowKeyElement or set scan range columns"); - } - for (int i = 0; i < start.length; i++) { - startRow.add(scanRangeColumns.get(i), start[i]); - endRow.add(scanRangeColumns.get(i), end[i]); - } - } - - List partIds = getPartIds(odpTableEntry, startRow, startInclusive, endRow, - endInclusive); - for (Long partId : partIds) { - ObTable obTable = odpInfo.getObTable(); - ObTableParam param = new ObTableParam(obTable); - param.setPartId(partId); - long tabletId = getTabletIdByPartId(odpTableEntry, partId); - param.setLsId(odpTableEntry.getPartitionEntry().getLsId(tabletId)); - param.setTableId(odpTableEntry.getTableId()); - // real partition(tablet) id - param.setPartitionId(tabletId); - obTableParams.add(param); - } - - return obTableParams; - } - /** * 根据 start-end 获取 partition id 和 addr * @param tableEntry @@ -1232,6 +1122,158 @@ private List getPartIdsForLevelTwo(TableEntry tableEntry, Row startRow, return partIds; } + /*------------------------------------------------------------------------ODP routing------------------------------------------------------------------------*/ + + /** + * get ODP tableEntry by tableName, + * this methods will guarantee the tableEntry is not null + * only support by ODP version after 4.3.2 + * */ + public TableEntry getOdpTableEntry(String tableName) throws Exception { + TableEntry tableEntry; + tableEntry = odpTableLocations.getTableEntry(tableName); + if (tableEntry == null) { + tableEntry = refreshOdpMeta(tableName, false); + } + return tableEntry; + } + + /** + * get or refresh table meta information in odp mode + * only support by ODP version after 4.3.2 + * */ + public TableEntry refreshOdpMeta(String tableName, boolean forceRefresh) throws Exception { + return odpTableLocations.refreshOdpMeta(tableName, forceRefresh, odpInfo.getObTable()); + } + + /** + * get odp TableParam by tableName and rowkey + * only support by ODP version after 4.3.2 + * @param tableName tableName + * @param rowkey row key or partition key names and values + * @return ObTableParam tableParam + * */ + public ObTableParam getOdpTableParam(String tableName, Row rowkey) throws Exception { + TableEntry odpTableEntry = getOdpTableEntry(tableName); + if (odpTableEntry == null) { + logger.error("tableEntry is null, tableName: {}", tableName); + throw new ObTableEntryRefreshException("tableEntry is null, tableName: " + tableName); + } + long partId = getPartId(odpTableEntry, rowkey); + return getOdpTableInternal(odpTableEntry, partId); + } + + /** + * get odp table addr by partId + * only support by ODP version after 4.3.2 + * @param tableName table want to get + * @param partId tabletId of table (real tablet id in 4.x) + * @return ObTableParam table information for execution + * @throws Exception exception + */ + public ObTableParam getOdpTableWithPartId(String tableName, long partId) + throws Exception { + TableEntry tableEntry = getOdpTableEntry(tableName); + return getOdpTableInternal(tableEntry, partId); + } + + /** + * get odp table entry by partId, just get meta information + * only support by ODP version after 4.3.2 + * @param odpTableEntry odp tableEntry + * @param partId logicId of tablet + * @return ObTableParam table information for execution + */ + private ObTableParam getOdpTableInternal(TableEntry odpTableEntry, long partId) { + ObTable obTable = odpInfo.getObTable(); + ObTableParam param = new ObTableParam(obTable); + param.setPartId(partId); + long tabletId = getTabletIdByPartId(odpTableEntry, partId); + param.setLsId(odpTableEntry.getPartitionEntry().getLsId(tabletId)); + param.setTableId(odpTableEntry.getTableId()); + // real partition(tablet) id + param.setPartitionId(tabletId); + return param; + } + + /** + * get odp TableParams by start-end range in this table + * only support by ODP version after 4.3.2 + * @param tableName table want to get + * @param query query + * @param start start key + * @param startInclusive whether include start key + * @param end end key + * @param endInclusive whether include end key + * @return list of table obTableParams + * @throws Exception exception + */ + public List getOdpTableParams(String tableName, ObTableQuery query, Object[] start, + boolean startInclusive, Object[] end, + boolean endInclusive) throws Exception { + + return getOdpTablesInternal(tableName, query.getScanRangeColumns(), start, + startInclusive, end, endInclusive); + } + + private List getOdpTablesInternal(String tableName, List scanRangeColumns, + Object[] start, boolean startInclusive, + Object[] end, boolean endInclusive) + throws Exception { + if (start.length != end.length) { + throw new IllegalArgumentException("length of start key and end key is not equal"); + } + List obTableParams = new ArrayList(); + TableEntry odpTableEntry = getOdpTableEntry(tableName); + + if (scanRangeColumns == null || scanRangeColumns.isEmpty()) { + Map tableEntryRowKeyElement = tableClient.getRowKeyElement(tableName); + if (tableEntryRowKeyElement != null) { + scanRangeColumns = new ArrayList(tableEntryRowKeyElement.keySet()); + } + } + // 2. get replica location + // partIdWithReplicaList -> List> + Row startRow = new Row(); + Row endRow = new Row(); + // ensure the format of column names and values if the current table is a table with partition + if (odpTableEntry.isPartitionTable() + && odpTableEntry.getPartitionInfo().getLevel() != ObPartitionLevel.LEVEL_ZERO) { + if ((scanRangeColumns == null || scanRangeColumns.isEmpty()) && start.length == 1 + && start[0] instanceof ObObj && ((ObObj) start[0]).isMinObj() && end.length == 1 + && end[0] instanceof ObObj && ((ObObj) end[0]).isMaxObj()) { + // for getPartition to query all partitions + scanRangeColumns = new ArrayList(Collections.nCopies(start.length, + "partition")); + } + // scanRangeColumn may be longer than start/end in prefix scanning situation + if (scanRangeColumns == null || scanRangeColumns.size() < start.length) { + throw new IllegalArgumentException( + "length of key and scan range columns do not match, please use addRowKeyElement or set scan range columns"); + } + for (int i = 0; i < start.length; i++) { + startRow.add(scanRangeColumns.get(i), start[i]); + endRow.add(scanRangeColumns.get(i), end[i]); + } + } + + List partIds = getPartIds(odpTableEntry, startRow, startInclusive, endRow, + endInclusive); + for (Long partId : partIds) { + ObTable obTable = odpInfo.getObTable(); + ObTableParam param = new ObTableParam(obTable); + param.setPartId(partId); + long tabletId = getTabletIdByPartId(odpTableEntry, partId); + param.setLsId(odpTableEntry.getPartitionEntry().getLsId(tabletId)); + param.setTableId(odpTableEntry.getTableId()); + // real partition(tablet) id + param.setPartitionId(tabletId); + obTableParams.add(param); + } + + return obTableParams; + } + /*------------------------------------------------------------------------Table Group------------------------------------------------------------------------*/ /** diff --git a/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientQueryImpl.java b/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientQueryImpl.java index 5cf7b178..9a3691d9 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientQueryImpl.java +++ b/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientQueryImpl.java @@ -191,10 +191,8 @@ private AbstractQueryStreamResult commonExecute(InitQueryResultCallback(odpTable.getPartId(), odpTable)); } catch (Exception e) { if (e instanceof ObTableException) { @@ -204,9 +202,8 @@ private AbstractQueryStreamResult commonExecute(InitQueryResultCallback(odpTable.getPartId(), odpTable)); } else { throw e;