Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 25 additions & 6 deletions src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -669,7 +669,9 @@ private <T> T execute(String tableName, OperationExecuteCallback<T> callback,
throw new IllegalArgumentException("table name is null");
}
int tryTimes = 0;
long routeQueryTabletId = INVALID_TABLET_ID;
boolean needRefreshPartitionLocation = false;
boolean refreshedTableMeta = false;
long startExecute = System.currentTimeMillis();
while (true) {
checkStatus();
Expand Down Expand Up @@ -705,13 +707,19 @@ private <T> T execute(String tableName, OperationExecuteCallback<T> callback,
} else if (null != callback.getQuery()) {
if (tryTimes > 1 && needRefreshPartitionLocation) {
needRefreshPartitionLocation = false;
boolean isHKV = callback.getQuery().getEntityType() == ObTableEntityType.HKV;
tableRoute.refreshTabletLocationForAtomicQuery(tableName, callback.getQuery().getObTableQuery(), isHKV);
if (refreshedTableMeta) {
refreshedTableMeta = false;
boolean isHKV = callback.getQuery().getEntityType() == ObTableEntityType.HKV;
tableRoute.refreshTabletLocationForAtomicQuery(tableName, callback.getQuery().getObTableQuery(), isHKV);
} else {
tableRoute.refreshPartitionLocation(tableName, routeQueryTabletId, null);
}
}
ObTableQuery tableQuery = callback.getQuery().getObTableQuery();
// using scan range
tableParam = tableRoute.getTableParam(tableName, tableQuery.getScanRangeColumns(),
tableQuery.getKeyRanges());
routeQueryTabletId = tableParam.getPartitionId();
} else {
throw new ObTableException("RowKey or scan range is null");
}
Expand Down Expand Up @@ -770,6 +778,7 @@ private <T> T execute(String tableName, OperationExecuteCallback<T> callback,
((ObTableException) ex).getErrorCode(), ex.getMessage(),
tryTimes);
tableRoute.refreshMeta(tableName);
refreshedTableMeta = true;
// reset failure count while fetch all route info
this.resetExecuteContinuousFailureCount(tableName);
} else if (((ObTableException) ex).isNeedRetryError()) {
Expand Down Expand Up @@ -2286,6 +2295,8 @@ public ObPayload execute(final ObTableAbstractOperationRequest request) throws E
}
int tryTimes = 0;
boolean needRefreshTabletLocation = false;
boolean refreshedTableMeta = false;
long routeTabletId = INVALID_TABLET_ID;
long startExecute = System.currentTimeMillis();
while (true) {
long currentExecute = System.currentTimeMillis();
Expand All @@ -2309,8 +2320,14 @@ public ObPayload execute(final ObTableAbstractOperationRequest request) throws E
// Recalculate partIdMapObTable
if (needRefreshTabletLocation) {
needRefreshTabletLocation = false;
boolean isHKV = request.getEntityType() == ObTableEntityType.HKV;
tableRoute.refreshTabletLocationForAtomicQuery(request.getTableName(), tableQuery, isHKV);
if (refreshedTableMeta) {
refreshedTableMeta = false;
// need to recalculate routing tablet_id and refresh location
boolean isHKV = request.getEntityType() == ObTableEntityType.HKV;
tableRoute.refreshTabletLocationForAtomicQuery(request.getTableName(), tableQuery, isHKV);
} else {
tableRoute.refreshPartitionLocation(request.getTableName(), routeTabletId, null);
}
}
Map<Long, ObTableParam> partIdMapObTable = tableRoute.getPartIdParamMapForQuery(
request.getTableName(), tableQuery.getScanRangeColumns(),
Expand All @@ -2327,6 +2344,7 @@ public ObPayload execute(final ObTableAbstractOperationRequest request) throws E
ObTableParam tableParam = entry.getValue();
request.setTableId(tableParam.getTableId());
long partitionId = isDistributedExecuteSupported ? INVALID_TABLET_ID : tableParam.getPartitionId();
routeTabletId = tableParam.getPartitionId();
request.setPartitionId(partitionId);
request.setTimeout(tableParam.getObTable().getObTableOperationTimeout());
ObTable obTable = tableParam.getObTable();
Expand Down Expand Up @@ -2360,19 +2378,20 @@ public ObPayload execute(final ObTableAbstractOperationRequest request) throws E
(((ObTableException) ex).isNeedRefreshTableEntry() || ((ObTableException) ex).isNeedRetryError())) {
logger.warn(
"tablename:{} partition id:{} batch ops refresh table while meet ObTableMasterChangeException, errorCode: {}",
request.getTableName(), request.getPartitionId(), ((ObTableException) ex).getErrorCode(), ex);
request.getTableName(), routeTabletId, ((ObTableException) ex).getErrorCode(), ex);

if (isRetryOnChangeMasterTimes()) {
logger.warn(
"tablename:{} partition id:{} batch ops retry while meet ObTableMasterChangeException, errorCode: {} , retry times {}",
request.getTableName(), request.getPartitionId(), ((ObTableException) ex).getErrorCode(),
request.getTableName(), routeTabletId, ((ObTableException) ex).getErrorCode(),
tryTimes, ex);

if (((ObTableException) ex).isNeedRefreshTableEntry()) {
needRefreshTabletLocation = true;
if (ex instanceof ObTableNeedFetchMetaException) {
// Refresh table info
tableRoute.refreshMeta(request.getTableName());
refreshedTableMeta = true;
}
}
} else {
Expand Down