Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -270,4 +270,8 @@ public TableQuery setSearchText(String searchText) {
tableClientQuery.setSearchText(searchText);
return this;
}

public void setAllowDistributeScan(boolean allowDistributeScan) {
tableClientQuery.setAllowDistributeScan(allowDistributeScan);
}
}
4 changes: 3 additions & 1 deletion src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -3777,7 +3777,9 @@ public ObPayload execute(final ObTableAbstractOperationRequest request) throws E
ObTableClientQueryImpl tableQuery = new ObTableClientQueryImpl(tableName,
((ObTableQueryAsyncRequest) request).getObTableQueryRequest().getTableQuery(), this);
tableQuery.setEntityType(request.getEntityType());
return new ObClusterTableQuery(tableQuery).asyncExecuteInternal();
ObClusterTableQuery clusterTableQuery = new ObClusterTableQuery(tableQuery);
clusterTableQuery.setAllowDistributeScan(((ObTableQueryAsyncRequest) request).isAllowDistributeScan());
return clusterTableQuery.asyncExecuteInternal();
} else if (request instanceof ObTableBatchOperationRequest) {
ObTableClientBatchOpsImpl batchOps = new ObTableClientBatchOpsImpl(
request.getTableName(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ public class ObTableQueryAsyncRequest extends ObTableAbstractOperationRequest {
private long querySessionId;
private ObQueryOperationType queryType = ObQueryOperationType.QUERY_START;

private boolean allowDistributeScan;

/**
* Get pcode.
*/
Expand Down Expand Up @@ -124,4 +126,12 @@ public void setObTableQueryRequest(ObTableQueryRequest obTableQueryRequest) {
this.obTableQueryRequest = obTableQueryRequest;
}

public void setAllowDistributeScan(boolean allowDistributeScan) {
this.allowDistributeScan = allowDistributeScan;
}

public boolean isAllowDistributeScan() {
return allowDistributeScan;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,13 @@
import static com.alipay.oceanbase.rpc.util.TableClientLoggerFactory.RUNTIME;

public class ObTableClientQueryAsyncStreamResult extends AbstractQueryStreamResult {
private static final Logger logger = LoggerFactory
.getLogger(ObTableClientQueryStreamResult.class);
private boolean isEnd = true;
private long sessionId = Constants.OB_INVALID_ID;
private ObTableQueryAsyncRequest asyncRequest = new ObTableQueryAsyncRequest();
private ObTableConnection prevConnection = null;
private static final Logger logger = LoggerFactory
.getLogger(ObTableClientQueryStreamResult.class);
private boolean isEnd = true;
private long sessionId = Constants.OB_INVALID_ID;
private ObTableQueryAsyncRequest asyncRequest = new ObTableQueryAsyncRequest();
private ObTableConnection prevConnection = null;
private boolean allowDistributeScan = true; // false when partition scan

@Override
public void init() throws Exception {
Expand Down Expand Up @@ -113,8 +114,7 @@ protected ObTableQueryAsyncResult referToNewPartition(ObPair<Long, ObTableParam>
throws Exception {
ObTableParam obTableParam = partIdWithObTable.getRight();
ObTableQueryRequest queryRequest = asyncRequest.getObTableQueryRequest();
long partitionId = client.getServerCapacity().isSupportDistributedExecute() ? INVALID_TABLET_ID
: obTableParam.getPartitionId();
long partitionId = isDistributeScan() ? INVALID_TABLET_ID : obTableParam.getPartitionId();
// refresh request info
queryRequest.setPartitionId(partitionId);
queryRequest.setTableId(obTableParam.getTableId());
Expand All @@ -141,8 +141,7 @@ protected ObTableQueryAsyncResult referToLastStreamResult(ObPair<Long, ObTablePa
ObTableQueryRequest queryRequest = asyncRequest.getObTableQueryRequest();

// refresh request info
long partitionId = client.getServerCapacity().isSupportDistributedExecute() ? INVALID_TABLET_ID
: obTableParam.getPartitionId();
long partitionId = isDistributeScan() ? INVALID_TABLET_ID : obTableParam.getPartitionId();
queryRequest.setPartitionId(partitionId);
queryRequest.setTableId(obTableParam.getTableId());

Expand All @@ -162,8 +161,7 @@ protected void closeLastStreamResult(ObPair<Long, ObTableParam> partIdWithObTabl
ObTableQueryRequest queryRequest = asyncRequest.getObTableQueryRequest();

// refresh request info
long partitionId = client.getServerCapacity().isSupportDistributedExecute() ? INVALID_TABLET_ID
: obTableParam.getPartitionId();
long partitionId = isDistributeScan() ? INVALID_TABLET_ID : obTableParam.getPartitionId();
queryRequest.setPartitionId(partitionId);
queryRequest.setTableId(obTableParam.getTableId());

Expand Down Expand Up @@ -200,8 +198,8 @@ public void renewLease() throws Exception {
ObTableQueryRequest queryRequest = asyncRequest.getObTableQueryRequest();

// refresh request info
long partitionId = client.getServerCapacity().isSupportDistributedExecute() ? INVALID_TABLET_ID
: obTableParam.getPartitionId();
long partitionId = isDistributeScan() ? INVALID_TABLET_ID : obTableParam
.getPartitionId();
queryRequest.setPartitionId(partitionId);
queryRequest.setTableId(obTableParam.getTableId());

Expand Down Expand Up @@ -404,4 +402,12 @@ public boolean isEnd() {
public void setEnd(boolean end) {
isEnd = end;
}

private boolean isDistributeScan() {
return allowDistributeScan && client.getServerCapacity().isSupportDistributedExecute();
}

public void setAllowDistributeScan(boolean allowDistributeScan) {
this.allowDistributeScan = allowDistributeScan;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ public class ObTableClientQueryImpl extends AbstractTableQueryImpl {
private final ObTableClient obTableClient;
private Map<Long, ObPair<Long, ObTableParam>> partitionObTables;

private Row rowKey; // only used by BatchOperation
private Row rowKey; // only used by BatchOperation

private boolean allowDistributeScan;

/*
* Add aggregation.
Expand Down Expand Up @@ -286,6 +288,7 @@ public ObTableClientQueryAsyncStreamResult asyncExecuteInternal() throws Excepti
@Override
ObTableClientQueryAsyncStreamResult execute() throws Exception {
ObTableClientQueryAsyncStreamResult obTableClientQueryAsyncStreamResult = new ObTableClientQueryAsyncStreamResult();
obTableClientQueryAsyncStreamResult.setAllowDistributeScan(allowDistributeScan);
setCommonParams2Result(obTableClientQueryAsyncStreamResult);
obTableClientQueryAsyncStreamResult.setClient(obTableClient);
obTableClientQueryAsyncStreamResult.init();
Expand Down Expand Up @@ -427,4 +430,8 @@ public void setPartId(Long partId) {
public Long getPartId() {
return getObTableQuery().getPartId();
}

public void setAllowDistributeScan(boolean allowDistributeScan) {
this.allowDistributeScan = allowDistributeScan;
}
}
63 changes: 35 additions & 28 deletions src/test/java/com/alipay/oceanbase/rpc/ObTableDatetimeTest.java
Original file line number Diff line number Diff line change
@@ -1,3 +1,20 @@
/*-
* #%L
* com.oceanbase:obkv-table-client
* %%
* Copyright (C) 2021 - 2025 OceanBase
* %%
* OBKV Table Client Framework is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* #L%
*/

package com.alipay.oceanbase.rpc;

import com.alipay.oceanbase.rpc.exception.ObTableException;
Expand Down Expand Up @@ -58,13 +75,9 @@ public void testSingle() throws Exception {
Date date = sdf.parse(dateString);
try {
Row rk = row(colVal("c1", 1L), colVal("c2", date), colVal("c3", 1L));
client.insertOrUpdate(tableName).setRowKey(rk)
.addMutateColVal(colVal("c4", "c4_val"))
.execute();
Map<String, Object> res = client.get(tableName)
.setRowKey(rk)
.select("c4")
.execute();
client.insertOrUpdate(tableName).setRowKey(rk).addMutateColVal(colVal("c4", "c4_val"))
.execute();
Map<String, Object> res = client.get(tableName).setRowKey(rk).select("c4").execute();
Assert.assertEquals("c4_val", res.get("c4"));

client.delete(tableName).setRowKey(rk).execute();
Expand All @@ -86,9 +99,9 @@ public void testBatch() throws Exception {
Row rk2 = row(colVal("c1", 1L), colVal("c2", date2), colVal("c3", 1L));
BatchOperation batch = client.batchOperation(tableName);
InsertOrUpdate insUp1 = client.insertOrUpdate(tableName).setRowKey(rk1)
.addMutateColVal(colVal("c4", "c4_val"));
.addMutateColVal(colVal("c4", "c4_val"));
InsertOrUpdate insUp2 = client.insertOrUpdate(tableName).setRowKey(rk2)
.addMutateColVal(colVal("c4", "c4_val"));
.addMutateColVal(colVal("c4", "c4_val"));
batch.addOperation(insUp1, insUp2);
BatchOperationResult res = batch.execute();
Assert.assertNotNull(res);
Expand All @@ -110,15 +123,13 @@ public void testPkQuery() throws Exception {
Date date = sdf.parse(dateString);
try {
Row rk = row(colVal("c1", 1L), colVal("c2", date), colVal("c3", 1L));
client.insertOrUpdate(tableName).setRowKey(rk)
.addMutateColVal(colVal("c4", "c4_val"))
.execute();

QueryResultSet res = client.query(tableName)
.select("c4")
.setScanRangeColumns("c1", "c2", "c3")
.addScanRange(new Object[]{1L, date, 1L}, new Object[]{1L, date, 1L})
.execute();
client.insertOrUpdate(tableName).setRowKey(rk).addMutateColVal(colVal("c4", "c4_val"))
.execute();

QueryResultSet res = client.query(tableName).select("c4")
.setScanRangeColumns("c1", "c2", "c3")
.addScanRange(new Object[] { 1L, date, 1L }, new Object[] { 1L, date, 1L })
.execute();
Assert.assertTrue(res.next());
Assert.assertEquals("c4_val", res.getRow().get("c4"));

Expand All @@ -136,16 +147,12 @@ public void testIndexQuery() throws Exception {
Date date = sdf.parse(dateString);
try {
Row rk = row(colVal("c1", 1L), colVal("c2", date), colVal("c3", 1L));
client.insertOrUpdate(tableName).setRowKey(rk)
.addMutateColVal(colVal("c4", "c4_val"))
.execute();

QueryResultSet res = client.query(tableName)
.indexName("idx_c2")
.select("c4")
.setScanRangeColumns("c1", "c2")
.addScanRange(new Object[]{1L, date}, new Object[]{1L, date})
.execute();
client.insertOrUpdate(tableName).setRowKey(rk).addMutateColVal(colVal("c4", "c4_val"))
.execute();

QueryResultSet res = client.query(tableName).indexName("idx_c2").select("c4")
.setScanRangeColumns("c1", "c2")
.addScanRange(new Object[] { 1L, date }, new Object[] { 1L, date }).execute();
Assert.assertTrue(res.next());
Assert.assertEquals("c4_val", res.getRow().get("c4"));

Expand Down