diff --git a/src/main/java/com/alipay/oceanbase/rpc/ObClusterTableQuery.java b/src/main/java/com/alipay/oceanbase/rpc/ObClusterTableQuery.java index 7ec3ab5b..854e7581 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/ObClusterTableQuery.java +++ b/src/main/java/com/alipay/oceanbase/rpc/ObClusterTableQuery.java @@ -270,4 +270,8 @@ public TableQuery setSearchText(String searchText) { tableClientQuery.setSearchText(searchText); return this; } + + public void setAllowDistributeScan(boolean allowDistributeScan) { + tableClientQuery.setAllowDistributeScan(allowDistributeScan); + } } diff --git a/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java b/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java index 3a574b29..c7100333 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java +++ b/src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java @@ -3777,7 +3777,9 @@ public ObPayload execute(final ObTableAbstractOperationRequest request) throws E ObTableClientQueryImpl tableQuery = new ObTableClientQueryImpl(tableName, ((ObTableQueryAsyncRequest) request).getObTableQueryRequest().getTableQuery(), this); tableQuery.setEntityType(request.getEntityType()); - return new ObClusterTableQuery(tableQuery).asyncExecuteInternal(); + ObClusterTableQuery clusterTableQuery = new ObClusterTableQuery(tableQuery); + clusterTableQuery.setAllowDistributeScan(((ObTableQueryAsyncRequest) request).isAllowDistributeScan()); + return clusterTableQuery.asyncExecuteInternal(); } else if (request instanceof ObTableBatchOperationRequest) { ObTableClientBatchOpsImpl batchOps = new ObTableClientBatchOpsImpl( request.getTableName(), diff --git a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/syncquery/ObTableQueryAsyncRequest.java b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/syncquery/ObTableQueryAsyncRequest.java index a8db0e13..f326e88d 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/syncquery/ObTableQueryAsyncRequest.java +++ b/src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/syncquery/ObTableQueryAsyncRequest.java @@ -40,6 +40,8 @@ public class ObTableQueryAsyncRequest extends ObTableAbstractOperationRequest { private long querySessionId; private ObQueryOperationType queryType = ObQueryOperationType.QUERY_START; + private boolean allowDistributeScan; + /** * Get pcode. */ @@ -124,4 +126,12 @@ public void setObTableQueryRequest(ObTableQueryRequest obTableQueryRequest) { this.obTableQueryRequest = obTableQueryRequest; } + public void setAllowDistributeScan(boolean allowDistributeScan) { + this.allowDistributeScan = allowDistributeScan; + } + + public boolean isAllowDistributeScan() { + return allowDistributeScan; + } + } diff --git a/src/main/java/com/alipay/oceanbase/rpc/stream/ObTableClientQueryAsyncStreamResult.java b/src/main/java/com/alipay/oceanbase/rpc/stream/ObTableClientQueryAsyncStreamResult.java index 21b4e624..7019384a 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/stream/ObTableClientQueryAsyncStreamResult.java +++ b/src/main/java/com/alipay/oceanbase/rpc/stream/ObTableClientQueryAsyncStreamResult.java @@ -40,12 +40,13 @@ import static com.alipay.oceanbase.rpc.util.TableClientLoggerFactory.RUNTIME; public class ObTableClientQueryAsyncStreamResult extends AbstractQueryStreamResult { - private static final Logger logger = LoggerFactory - .getLogger(ObTableClientQueryStreamResult.class); - private boolean isEnd = true; - private long sessionId = Constants.OB_INVALID_ID; - private ObTableQueryAsyncRequest asyncRequest = new ObTableQueryAsyncRequest(); - private ObTableConnection prevConnection = null; + private static final Logger logger = LoggerFactory + .getLogger(ObTableClientQueryStreamResult.class); + private boolean isEnd = true; + private long sessionId = Constants.OB_INVALID_ID; + private ObTableQueryAsyncRequest asyncRequest = new ObTableQueryAsyncRequest(); + private ObTableConnection prevConnection = null; + private boolean allowDistributeScan = true; // false when partition scan @Override public void init() throws Exception { @@ -113,8 +114,7 @@ protected ObTableQueryAsyncResult referToNewPartition(ObPair throws Exception { ObTableParam obTableParam = partIdWithObTable.getRight(); ObTableQueryRequest queryRequest = asyncRequest.getObTableQueryRequest(); - long partitionId = client.getServerCapacity().isSupportDistributedExecute() ? INVALID_TABLET_ID - : obTableParam.getPartitionId(); + long partitionId = isDistributeScan() ? INVALID_TABLET_ID : obTableParam.getPartitionId(); // refresh request info queryRequest.setPartitionId(partitionId); queryRequest.setTableId(obTableParam.getTableId()); @@ -141,8 +141,7 @@ protected ObTableQueryAsyncResult referToLastStreamResult(ObPair partIdWithObTabl ObTableQueryRequest queryRequest = asyncRequest.getObTableQueryRequest(); // refresh request info - long partitionId = client.getServerCapacity().isSupportDistributedExecute() ? INVALID_TABLET_ID - : obTableParam.getPartitionId(); + long partitionId = isDistributeScan() ? INVALID_TABLET_ID : obTableParam.getPartitionId(); queryRequest.setPartitionId(partitionId); queryRequest.setTableId(obTableParam.getTableId()); @@ -200,8 +198,8 @@ public void renewLease() throws Exception { ObTableQueryRequest queryRequest = asyncRequest.getObTableQueryRequest(); // refresh request info - long partitionId = client.getServerCapacity().isSupportDistributedExecute() ? INVALID_TABLET_ID - : obTableParam.getPartitionId(); + long partitionId = isDistributeScan() ? INVALID_TABLET_ID : obTableParam + .getPartitionId(); queryRequest.setPartitionId(partitionId); queryRequest.setTableId(obTableParam.getTableId()); @@ -404,4 +402,12 @@ public boolean isEnd() { public void setEnd(boolean end) { isEnd = end; } + + private boolean isDistributeScan() { + return allowDistributeScan && client.getServerCapacity().isSupportDistributedExecute(); + } + + public void setAllowDistributeScan(boolean allowDistributeScan) { + this.allowDistributeScan = allowDistributeScan; + } } diff --git a/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientQueryImpl.java b/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientQueryImpl.java index 447331bb..f071b577 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientQueryImpl.java +++ b/src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientQueryImpl.java @@ -48,7 +48,9 @@ public class ObTableClientQueryImpl extends AbstractTableQueryImpl { private final ObTableClient obTableClient; private Map> partitionObTables; - private Row rowKey; // only used by BatchOperation + private Row rowKey; // only used by BatchOperation + + private boolean allowDistributeScan; /* * Add aggregation. @@ -286,6 +288,7 @@ public ObTableClientQueryAsyncStreamResult asyncExecuteInternal() throws Excepti @Override ObTableClientQueryAsyncStreamResult execute() throws Exception { ObTableClientQueryAsyncStreamResult obTableClientQueryAsyncStreamResult = new ObTableClientQueryAsyncStreamResult(); + obTableClientQueryAsyncStreamResult.setAllowDistributeScan(allowDistributeScan); setCommonParams2Result(obTableClientQueryAsyncStreamResult); obTableClientQueryAsyncStreamResult.setClient(obTableClient); obTableClientQueryAsyncStreamResult.init(); @@ -427,4 +430,8 @@ public void setPartId(Long partId) { public Long getPartId() { return getObTableQuery().getPartId(); } + + public void setAllowDistributeScan(boolean allowDistributeScan) { + this.allowDistributeScan = allowDistributeScan; + } } diff --git a/src/test/java/com/alipay/oceanbase/rpc/ObTableDatetimeTest.java b/src/test/java/com/alipay/oceanbase/rpc/ObTableDatetimeTest.java index be09dcc3..84beeffd 100644 --- a/src/test/java/com/alipay/oceanbase/rpc/ObTableDatetimeTest.java +++ b/src/test/java/com/alipay/oceanbase/rpc/ObTableDatetimeTest.java @@ -1,3 +1,20 @@ +/*- + * #%L + * com.oceanbase:obkv-table-client + * %% + * Copyright (C) 2021 - 2025 OceanBase + * %% + * OBKV Table Client Framework is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * http://license.coscl.org.cn/MulanPSL2 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + * #L% + */ + package com.alipay.oceanbase.rpc; import com.alipay.oceanbase.rpc.exception.ObTableException; @@ -58,13 +75,9 @@ public void testSingle() throws Exception { Date date = sdf.parse(dateString); try { Row rk = row(colVal("c1", 1L), colVal("c2", date), colVal("c3", 1L)); - client.insertOrUpdate(tableName).setRowKey(rk) - .addMutateColVal(colVal("c4", "c4_val")) - .execute(); - Map res = client.get(tableName) - .setRowKey(rk) - .select("c4") - .execute(); + client.insertOrUpdate(tableName).setRowKey(rk).addMutateColVal(colVal("c4", "c4_val")) + .execute(); + Map res = client.get(tableName).setRowKey(rk).select("c4").execute(); Assert.assertEquals("c4_val", res.get("c4")); client.delete(tableName).setRowKey(rk).execute(); @@ -86,9 +99,9 @@ public void testBatch() throws Exception { Row rk2 = row(colVal("c1", 1L), colVal("c2", date2), colVal("c3", 1L)); BatchOperation batch = client.batchOperation(tableName); InsertOrUpdate insUp1 = client.insertOrUpdate(tableName).setRowKey(rk1) - .addMutateColVal(colVal("c4", "c4_val")); + .addMutateColVal(colVal("c4", "c4_val")); InsertOrUpdate insUp2 = client.insertOrUpdate(tableName).setRowKey(rk2) - .addMutateColVal(colVal("c4", "c4_val")); + .addMutateColVal(colVal("c4", "c4_val")); batch.addOperation(insUp1, insUp2); BatchOperationResult res = batch.execute(); Assert.assertNotNull(res); @@ -110,15 +123,13 @@ public void testPkQuery() throws Exception { Date date = sdf.parse(dateString); try { Row rk = row(colVal("c1", 1L), colVal("c2", date), colVal("c3", 1L)); - client.insertOrUpdate(tableName).setRowKey(rk) - .addMutateColVal(colVal("c4", "c4_val")) - .execute(); - - QueryResultSet res = client.query(tableName) - .select("c4") - .setScanRangeColumns("c1", "c2", "c3") - .addScanRange(new Object[]{1L, date, 1L}, new Object[]{1L, date, 1L}) - .execute(); + client.insertOrUpdate(tableName).setRowKey(rk).addMutateColVal(colVal("c4", "c4_val")) + .execute(); + + QueryResultSet res = client.query(tableName).select("c4") + .setScanRangeColumns("c1", "c2", "c3") + .addScanRange(new Object[] { 1L, date, 1L }, new Object[] { 1L, date, 1L }) + .execute(); Assert.assertTrue(res.next()); Assert.assertEquals("c4_val", res.getRow().get("c4")); @@ -136,16 +147,12 @@ public void testIndexQuery() throws Exception { Date date = sdf.parse(dateString); try { Row rk = row(colVal("c1", 1L), colVal("c2", date), colVal("c3", 1L)); - client.insertOrUpdate(tableName).setRowKey(rk) - .addMutateColVal(colVal("c4", "c4_val")) - .execute(); - - QueryResultSet res = client.query(tableName) - .indexName("idx_c2") - .select("c4") - .setScanRangeColumns("c1", "c2") - .addScanRange(new Object[]{1L, date}, new Object[]{1L, date}) - .execute(); + client.insertOrUpdate(tableName).setRowKey(rk).addMutateColVal(colVal("c4", "c4_val")) + .execute(); + + QueryResultSet res = client.query(tableName).indexName("idx_c2").select("c4") + .setScanRangeColumns("c1", "c2") + .addScanRange(new Object[] { 1L, date }, new Object[] { 1L, date }).execute(); Assert.assertTrue(res.next()); Assert.assertEquals("c4_val", res.getRow().get("c4"));