diff --git a/src/main/java/com/alipay/oceanbase/rpc/ObGlobal.java b/src/main/java/com/alipay/oceanbase/rpc/ObGlobal.java index 62052877..5ef8f83d 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/ObGlobal.java +++ b/src/main/java/com/alipay/oceanbase/rpc/ObGlobal.java @@ -125,6 +125,11 @@ public static boolean isHBasePutPerfSupport() { return OB_VERSION >= OB_VERSION_4_4_1_0; } + public static boolean isDistributeNeedTabletIdSupport() { + return OB_VERSION >= OB_VERSION_4_3_5_5 && OB_VERSION < OB_VERSION_4_4_0_0 + || OB_VERSION >= OB_VERSION_4_4_1_0; + } + /*-------------------------------------------- OB_VERSION --------------------------------------------*/ public static final long OB_VERSION_4_2_3_0 = calcVersion(4, (short) 2, (byte) 3, (byte) 0); @@ -143,6 +148,8 @@ public static boolean isHBasePutPerfSupport() { public static final long OB_VERSION_4_3_5_3 = calcVersion(4, (short) 3, (byte) 5, (byte) 3); + public static final long OB_VERSION_4_3_5_5 = calcVersion(4, (short) 3, (byte) 5, (byte) 5); + public static final long OB_VERSION_4_4_0_0 = calcVersion(4, (short) 4, (byte) 0, (byte) 0); public static final long OB_VERSION_4_4_1_0 = calcVersion(4, (short) 4, (byte) 1, (byte) 0); diff --git a/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java b/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java index 4a1978bf..8389d54f 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java +++ b/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java @@ -2107,17 +2107,19 @@ public ObPayload executeWithRetry(ObTable obTable, ObPayload request, String tab throw new ObTableRoutingWrongException(); } } else if (result != null && result.isRoutingWrong() && !isOdpMode()) { - logger.debug("errors happened in server and retried successfully, server ip:port is {}:{}, tableName: {}, need_refresh_meta: {}", - obTable.getIp(), obTable.getPort(), tableName, result.isNeedRefreshMeta()); if (result.isNeedRefreshMeta()) { tableRoute.refreshMeta(tableName); } + long tabletId = INVALID_TABLET_ID; if (request instanceof ObTableAbstractOperationRequest) { - long tabletId = ((ObTableAbstractOperationRequest) request).getPartitionId(); + tabletId = ((ObTableAbstractOperationRequest) request).getPartitionId(); tableRoute.refreshPartitionLocation(tableName, tabletId, null); } else if (request instanceof ObHbaseRequest) { - tableRoute.refreshTabletLocationBatch(tableName); + tabletId = ((ObHbaseRequest) request).getTabletId(); + tableRoute.refreshPartitionLocation(tableName, tabletId, null); } + logger.info("errors happened in server and retried successfully, server ip:port is {}:{}, tableName: {}, need_refresh_meta: {}, tabletId: {}", + obTable.getIp(), obTable.getPort(), tableName, result.isNeedRefreshMeta(), tabletId); } return result; } @@ -2337,16 +2339,22 @@ public ObPayload execute(final ObTableAbstractOperationRequest request) throws E } // Check if partIdMapObTable size is greater than 1 + boolean needTabletId = false; boolean isDistributedExecuteSupported = getServerCapacity().isSupportDistributedExecute(); if (partIdMapObTable.size() > 1 && !isDistributedExecuteSupported) { throw new ObTablePartitionConsistentException( "query and mutate must be a atomic operation"); } + if (isDistributedExecuteSupported) { + needTabletId = request.getNeedTabletId(); + } else { + needTabletId = true; + } // Proceed with the operation Map.Entry entry = partIdMapObTable.entrySet().iterator().next(); ObTableParam tableParam = entry.getValue(); request.setTableId(tableParam.getTableId()); - long partitionId = isDistributedExecuteSupported ? INVALID_TABLET_ID : tableParam.getPartitionId(); + long partitionId = needTabletId ? tableParam.getPartitionId() : INVALID_TABLET_ID; routeTabletId = tableParam.getPartitionId(); request.setPartitionId(partitionId); request.setTimeout(tableParam.getObTable().getObTableOperationTimeout()); @@ -2424,16 +2432,10 @@ public ObPayload execute(final ObHbaseRequest request) throws Exception { } else { Row row = new Row(); // get the first cell from the first cfRows to route - // use the first table in tablegroup to route - String realTableName = null; if (request.getCfRows().isEmpty()) { throw new ObTableUnexpectedException("no cf rows"); } - if (request.getCfRows().size() > 1) { - realTableName = tryGetTableNameFromTableGroupCache(request.getTableName(), false); - } else { - realTableName = request.getCfRows().get(0).getRealTableName(); - } + String realTableName = request.getCfRows().get(0).getRealTableName(); int keyIdx = request.getCfRows().get(0).getKeyIndex(0); row.add("K", request.getKeys().get(keyIdx).getValue()); row.add("Q", request.getCfRows().get(0).getCells().get(0).getQ().getValue()); @@ -2441,6 +2443,7 @@ public ObPayload execute(final ObHbaseRequest request) throws Exception { ObTableParam tableParam = tableRoute.getTableParam(realTableName, row); ObTable obTable = tableParam.getObTable(); request.setTimeout(obTable.getObTableOperationTimeout()); + request.setTabletId(tableParam.getTabletId()); return executeWithRetry(obTable, request, realTableName); } } diff --git a/src/main/java/com/alipay/oceanbase/rpc/mutation/BatchOperation.java b/src/main/java/com/alipay/oceanbase/rpc/mutation/BatchOperation.java index 59aaa435..e1ddc69e 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/mutation/BatchOperation.java +++ b/src/main/java/com/alipay/oceanbase/rpc/mutation/BatchOperation.java @@ -49,6 +49,7 @@ public class BatchOperation { boolean hasCheckAndInsUp = false; boolean hasGet = false; boolean serverCanRetry = false; + boolean needTabletId = false; ObTableOperationType lastType = ObTableOperationType.INVALID; boolean isSameType = true; protected ObTableEntityType entityType = ObTableEntityType.KV; @@ -186,6 +187,10 @@ public void setServerCanRetry(boolean canRetry) { this.serverCanRetry = canRetry; } + public void setNeedTabletId(boolean needTabletId) { + this.needTabletId = needTabletId; + } + public BatchOperation setIsAtomic(boolean isAtomic) { this.isAtomic = isAtomic; return this; @@ -319,6 +324,7 @@ private BatchOperationResult executeWithLSBatchOp() throws Exception { batchOps = new ObTableClientLSBatchOpsImpl(tableName, (ObTableClient) client); batchOps.setEntityType(entityType); batchOps.setServerCanRetry(serverCanRetry); + batchOps.setNeedTabletId(needTabletId); for (Object operation : operations) { if (operation instanceof CheckAndInsUp) { checkAndInsUpCnt++; diff --git a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObHbaseRequest.java b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObHbaseRequest.java index 18d33fbf..6f423694 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObHbaseRequest.java +++ b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObHbaseRequest.java @@ -33,9 +33,10 @@ /* OB_SERIALIZE_MEMBER(ObHbaseRequest, credential_, + table_name_, + tablet_id_, option_flag_, op_type_, - table_name_, keys_, cf_rows_); */ @@ -47,6 +48,7 @@ public class ObHbaseRequest extends AbstractPayload implements Credentialable { protected ObBytesString credential; protected String tableName; // HBase tableName, OceanBase tablegroup_name + protected long tabletId; // do not serialize protected ObTableHbaseReqFlag optionFlag = new ObTableHbaseReqFlag(); protected ObTableOperationType opType; protected List keys = new ArrayList<>(); @@ -186,6 +188,14 @@ public boolean getServerCanRetry() { return optionFlag.getFlagServerCanRetry(); } + public void setTabletId(long tabletId) { + this.tabletId = tabletId; + } + + public long getTabletId() { + return tabletId; + } + public ObBytesString getCredential() { return credential; } diff --git a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableAbstractOperationRequest.java b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableAbstractOperationRequest.java index 27b28b87..d037a1b4 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableAbstractOperationRequest.java +++ b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableAbstractOperationRequest.java @@ -216,6 +216,14 @@ public boolean getServerCanRetry() { return option_flag.isServerCanRetry(); } + public void setNeedTabletId(boolean needTabletId) { + option_flag.setNeedTabletId(needTabletId); + } + + public boolean getNeedTabletId() { + return option_flag.isNeedTabletId(); + } + /* * Is returning affected entity. */ diff --git a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableOptionFlag.java b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableOptionFlag.java index e1ae4f88..e256026d 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableOptionFlag.java +++ b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObTableOptionFlag.java @@ -25,7 +25,8 @@ public enum ObTableOptionFlag { RETURNING_ROWKEY(1 << 0), USE_PUT(1 << 1), RETURN_ONE_RES(1 << 2), - SERVER_CAN_RETRY(1 << 3); + SERVER_CAN_RETRY(1 << 3), + DIS_NEED_TABLET_ID(1 << 4); private int value; private static Map map = new HashMap(); @@ -116,4 +117,16 @@ public void setServerCanRetry(boolean serverCanRetry) { this.value &= ~(SERVER_CAN_RETRY.value); } } + + public boolean isNeedTabletId() { + return (this.value & DIS_NEED_TABLET_ID.value) != 0; + } + + public void setNeedTabletId(boolean needTabletId) { + if (needTabletId) { + this.value |= DIS_NEED_TABLET_ID.value; + } else { + this.value &= ~(DIS_NEED_TABLET_ID.value); + } + } } diff --git a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/AbstractQueryStreamResult.java b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/AbstractQueryStreamResult.java index 0ed29864..213b1f67 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/AbstractQueryStreamResult.java +++ b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/AbstractQueryStreamResult.java @@ -46,6 +46,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantLock; +import static com.alipay.oceanbase.rpc.protocol.payload.Constants.INVALID_TABLET_ID; import static com.alipay.oceanbase.rpc.util.TableClientLoggerFactory.RUNTIME; public abstract class AbstractQueryStreamResult extends AbstractPayload implements @@ -175,13 +176,13 @@ protected ObPayload commonExecute(ObTableClient client, Logger logger, throw new ObTableRoutingWrongException(); } } else if (result != null && result.isRoutingWrong()) { - logger.debug("errors happened in server and retried successfully, server ip:port is {}:{}, tableName: {}, need_refresh_meta: {}", - subObTable.getIp(), subObTable.getPort(), indexTableName, result.isNeedRefreshMeta()); TableEntry tableEntry = result.isNeedRefreshMeta() ? client.getOrRefreshTableEntry(indexTableName, true) : client.getOrRefreshTableEntry(indexTableName, false); long tabletId = client.getTabletIdByPartId(tableEntry, partIdWithIndex.getLeft()); client.refreshTableLocationByTabletId(indexTableName, tabletId); + logger.info("errors happened in server and retried successfully, server ip:port is {}:{}, tableName: {}, need_refresh_meta: {}, tabletId: {}", + subObTable.getIp(), subObTable.getPort(), indexTableName, result.isNeedRefreshMeta(), tabletId); } } client.resetExecuteContinuousFailureCount(indexTableName); @@ -412,7 +413,7 @@ public boolean next() throws Exception { } } - protected Map> buildPartitions(ObTableClient client, ObTableQuery tableQuery, String tableName) throws Exception { + protected Map> buildAllPartitions(ObTableClient client, ObTableQuery tableQuery, String tableName) throws Exception { Map> partitionObTables = new LinkedHashMap<>(); String indexName = tableQuery.getIndexName(); if (!client.isOdpMode()) { @@ -456,6 +457,40 @@ protected Map> buildPartitions(ObTableClient cl return partitionObTables; } + protected Map> buildFirstPartitions(ObTableClient client, ObTableQuery tableQuery, String tableName) throws Exception { + Map> partitionObTables = new LinkedHashMap<>(); + String indexName = tableQuery.getIndexName(); + + if (!this.client.isOdpMode()) { + indexTableName = client.getIndexTableName(tableName, indexName, tableQuery.getScanRangeColumns(), false); + } + + if (tableQuery.getKeyRanges().isEmpty()) { + throw new IllegalArgumentException("query ranges is empty"); + } else { + ObNewRange range = tableQuery.getKeyRanges().get(0); + ObRowKey startKey = range.getStartKey(); + int startKeySize = startKey.getObjs().size(); + Object[] start = new Object[startKeySize]; + + for (int i = 0; i < startKeySize; i++) { + start[i] = startKey.getObj(i).isMinObj() || startKey.getObj(i).isMaxObj() ? + startKey.getObj(i) : startKey.getObj(i).getValue(); + } + + if (this.entityType == ObTableEntityType.HKV && client.isTableGroupName(tableName)) { + indexTableName = client.tryGetTableNameFromTableGroupCache(tableName, false); + } + ObBorderFlag borderFlag = range.getBorderFlag(); + List params = this.client.getTableParams(indexTableName, tableQuery, start, + borderFlag.isInclusiveStart(), start, borderFlag.isInclusiveEnd()); + + partitionObTables.put(INVALID_TABLET_ID, new ObPair<>(params.get(0).getPartId(), params.get(0))); + } + + return partitionObTables; + } + protected void nextRow() { rowIndex = rowIndex + 1; row = cacheRows.poll(); diff --git a/src/main/java/com/alipay/oceanbase/rpc/stream/ObTableClientQueryAsyncStreamResult.java b/src/main/java/com/alipay/oceanbase/rpc/stream/ObTableClientQueryAsyncStreamResult.java index 27efb925..9c2bc4ec 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/stream/ObTableClientQueryAsyncStreamResult.java +++ b/src/main/java/com/alipay/oceanbase/rpc/stream/ObTableClientQueryAsyncStreamResult.java @@ -114,7 +114,7 @@ protected ObTableQueryAsyncResult referToNewPartition(ObPair throws Exception { ObTableParam obTableParam = partIdWithObTable.getRight(); ObTableQueryRequest queryRequest = asyncRequest.getObTableQueryRequest(); - long partitionId = isDistributeScan() ? INVALID_TABLET_ID : obTableParam.getPartitionId(); + long partitionId = needTabletId(queryRequest) ? obTableParam.getPartitionId() : INVALID_TABLET_ID; // refresh request info queryRequest.setPartitionId(partitionId); queryRequest.setTableId(obTableParam.getTableId()); @@ -141,7 +141,7 @@ protected ObTableQueryAsyncResult referToLastStreamResult(ObPair partIdWithObTabl ObTableQueryRequest queryRequest = asyncRequest.getObTableQueryRequest(); // refresh request info - long partitionId = isDistributeScan() ? INVALID_TABLET_ID : obTableParam.getPartitionId(); + long partitionId = needTabletId(queryRequest) ? obTableParam.getPartitionId() : INVALID_TABLET_ID; queryRequest.setPartitionId(partitionId); queryRequest.setTableId(obTableParam.getTableId()); @@ -278,7 +278,11 @@ public boolean queryNewStreamResultInNext() throws Exception { protected Map> refreshPartition(ObTableQuery tableQuery, String tableName) throws Exception { - return buildPartitions(client, tableQuery, tableName); + if (isDistributeScan()) { + return buildFirstPartitions(client, tableQuery, tableName); + } else { + return buildAllPartitions(client, tableQuery, tableName); + } } // This function is designed for HBase-type requests. @@ -440,6 +444,14 @@ private boolean isDistributeScan() { return allowDistributeScan && client.getServerCapacity().isSupportDistributedExecute(); } + private boolean needTabletId(ObTableQueryRequest queryRequest) { + if (isDistributeScan()) { + return queryRequest.getNeedTabletId(); + } else { + return true; + } + } + public void setAllowDistributeScan(boolean allowDistributeScan) { this.allowDistributeScan = allowDistributeScan; } diff --git a/src/main/java/com/alipay/oceanbase/rpc/stream/ObTableClientQueryStreamResult.java b/src/main/java/com/alipay/oceanbase/rpc/stream/ObTableClientQueryStreamResult.java index 15971811..3ae915f3 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/stream/ObTableClientQueryStreamResult.java +++ b/src/main/java/com/alipay/oceanbase/rpc/stream/ObTableClientQueryStreamResult.java @@ -17,7 +17,6 @@ package com.alipay.oceanbase.rpc.stream; -import com.alipay.oceanbase.rpc.ObTableClient; import com.alipay.oceanbase.rpc.bolt.transport.ObTableConnection; import com.alipay.oceanbase.rpc.exception.ObTableEntryRefreshException; import com.alipay.oceanbase.rpc.exception.ObTableRetryExhaustedException; @@ -44,11 +43,10 @@ public class ObTableClientQueryStreamResult extends AbstractQueryStreamResult { protected ObTableQueryResult referToNewPartition(ObPair partIdWithObTable) throws Exception { - long partitionId = client.getServerCapacity().isSupportDistributedExecute() ? INVALID_TABLET_ID - : partIdWithObTable.getRight().getPartitionId(); ObTableQueryRequest request = new ObTableQueryRequest(); request.setTableName(tableName); request.setTableQuery(tableQuery); + long partitionId = needTabletId(request) ? partIdWithObTable.getRight().getPartitionId() : INVALID_TABLET_ID; request.setPartitionId(partitionId); request.setTableId(partIdWithObTable.getRight().getTableId()); request.setEntityType(entityType); @@ -115,6 +113,18 @@ protected ObTableQueryAsyncResult executeAsync(ObPair partId protected Map> refreshPartition(ObTableQuery tableQuery, String tableName) throws Exception { - return buildPartitions(client, tableQuery, tableName); + if (client.getServerCapacity().isSupportDistributedExecute()) { + return buildFirstPartitions(client, tableQuery, tableName); + } else { + return buildAllPartitions(client, tableQuery, tableName); + } + } + + private boolean needTabletId(ObTableQueryRequest request) { + if (client.getServerCapacity().isSupportDistributedExecute()) { + return request.getNeedTabletId(); + } else { + return true; + } } } diff --git a/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientLSBatchOpsImpl.java b/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientLSBatchOpsImpl.java index fa45fcd6..b5840cce 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientLSBatchOpsImpl.java +++ b/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientLSBatchOpsImpl.java @@ -69,6 +69,7 @@ public class ObTableClientLSBatchOpsImpl extends AbstractTableBatchOps { private boolean returningAffectedEntity = false; private boolean needAllProp = false; private boolean serverCanRetry = false; + private boolean needTabletId = false; private List batchOperation; /* @@ -553,7 +554,6 @@ public void partitionExecute(ObTableSingleOpResult[] results, long tableId = 0; long originPartId = 0; long operationTimeout = 0; - List tabletIds = new ArrayList<>(); ObTable subObTable = null; boolean isFirstEntry = true; @@ -561,8 +561,7 @@ public void partitionExecute(ObTableSingleOpResult[] results, List>> lsOperationWithIndexList = new ArrayList<>(); for (final Map.Entry> tabletOperation : tabletOperationsMap.entrySet()) { ObTableParam tableParam = tabletOperation.getValue().getLeft(); - long tabletId = obTableClient.getServerCapacity().isSupportDistributedExecute() ? - INVALID_TABLET_ID : tableParam.getPartitionId(); + long tabletId = needTabletId() ? tableParam.getPartitionId() : INVALID_TABLET_ID; List> tabletOperationWithIndexList = tabletOperation.getValue().getRight(); lsOperationWithIndexList.add(tabletOperationWithIndexList); List singleOps = new ArrayList<>(); @@ -572,7 +571,6 @@ public void partitionExecute(ObTableSingleOpResult[] results, ObTableTabletOp tableTabletOp = new ObTableTabletOp(); tableTabletOp.setSingleOperations(singleOps); tableTabletOp.setTabletId(tabletId); - tabletIds.add(tabletId); tableLsOp.addTabletOperation(tableTabletOp); @@ -649,13 +647,13 @@ public void partitionExecute(ObTableSingleOpResult[] results, } } else if (result != null && result.isRoutingWrong() && !obTableClient.isOdpMode()) { // retry successfully in server and need to refresh client cache - logger.debug("errors happened in server and retried successfully, server ip:port is {}:{}, tableName: {}, need_refresh_meta: {}", - subObTable.getIp(), subObTable.getPort(), realTableName, result.isNeedRefreshMeta()); + long tabletId = tableLsOp.getTabletOperations().get(0).getTabletId(); + logger.info("errors happened in server and retried successfully, server ip:port is {}:{}, tableName: {}, need_refresh_meta: {}, tabletId: {}", + subObTable.getIp(), subObTable.getPort(), realTableName, result.isNeedRefreshMeta(), tabletId); if (result.isNeedRefreshMeta()) { obTableClient.getOrRefreshTableEntry(realTableName, true); } - // TODO: 如果是不需要全部刷新地址的错误,全部刷新地址会降低效率。如何确定出错的 tablet_id 并刷新? - obTableClient.refreshTabletLocationBatch(realTableName); + obTableClient.refreshTableLocationByTabletId(realTableName, tabletId); } subLSOpResult = (ObTableLSOpResult) result; obTableClient.resetExecuteContinuousFailureCount(realTableName); @@ -1011,4 +1009,17 @@ public void setServerCanRetry(boolean canRetry) { public boolean getServerCanRetry() { return this.serverCanRetry; } + + public void setNeedTabletId(boolean needTabletId) { + this.needTabletId = needTabletId; + } + + private boolean needTabletId() { + if (obTableClient.getServerCapacity().isSupportDistributedExecute()) { + return needTabletId; + } else { + // 不开分布式需要tablet_id + return true; + } + } }