Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -2271,13 +2271,15 @@ public ObPayload execute(final ObTableAbstractOperationRequest request) throws E
ObTableClientQueryImpl tableQuery = new ObTableClientQueryImpl(tableName,
((ObTableQueryRequest) request).getTableQuery(), this);
tableQuery.setEntityType(request.getEntityType());
tableQuery.setHbaseOpType(request.getHbaseOpType());
return new ObClusterTableQuery(tableQuery).executeInternal();
} else if (request instanceof ObTableQueryAsyncRequest) {
// TableGroup -> TableName
String tableName = request.getTableName();
ObTableClientQueryImpl tableQuery = new ObTableClientQueryImpl(tableName,
((ObTableQueryAsyncRequest) request).getObTableQueryRequest().getTableQuery(), this);
tableQuery.setEntityType(request.getEntityType());
tableQuery.setHbaseOpType(request.getHbaseOpType());
ObClusterTableQuery clusterTableQuery = new ObClusterTableQuery(tableQuery);
clusterTableQuery.setAllowDistributeScan(((ObTableQueryAsyncRequest) request).isAllowDistributeScan());
return clusterTableQuery.asyncExecuteInternal();
Expand Down Expand Up @@ -2387,6 +2389,15 @@ public ObPayload execute(final ObTableAbstractOperationRequest request) throws E
} else {
if (ex instanceof ObTableException &&
(((ObTableException) ex).isNeedRefreshTableEntry() || ((ObTableException) ex).isNeedRetryError())) {
if (ex instanceof ObTableNotExistException) {
String logMessage = String.format(
"exhaust retry while meet TableNotExist Exception, table name: %s, errorCode: %d",
request.getTableName(),
((ObTableException) ex).getErrorCode()
);
logger.warn(logMessage, ex);
throw ex;
}
logger.warn(
"tablename:{} partition id:{} batch ops refresh table while meet ObTableMasterChangeException, errorCode: {}",
request.getTableName(), routeTabletId, ((ObTableException) ex).getErrorCode(), ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.alipay.oceanbase.rpc.get.Get;
import com.alipay.oceanbase.rpc.mutation.result.BatchOperationResult;
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObObj;
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.OHOperationType;
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableEntityType;
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableOperationType;
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.mutate.ObTableQueryAndMutate;
Expand Down Expand Up @@ -53,6 +54,7 @@ public class BatchOperation {
ObTableOperationType lastType = ObTableOperationType.INVALID;
boolean isSameType = true;
protected ObTableEntityType entityType = ObTableEntityType.KV;
protected OHOperationType hbaseOpType = OHOperationType.INVALID;

/*
* default constructor
Expand Down Expand Up @@ -90,6 +92,10 @@ public BatchOperation setTable(String tableName) {
return this;
}

public void setHbaseOpType(OHOperationType hbaseOpType) {
this.hbaseOpType = hbaseOpType;
}

/*
* add queries
*/
Expand Down Expand Up @@ -325,6 +331,7 @@ private BatchOperationResult executeWithLSBatchOp() throws Exception {
batchOps.setEntityType(entityType);
batchOps.setServerCanRetry(serverCanRetry);
batchOps.setNeedTabletId(needTabletId);
batchOps.setHbaseOpType(hbaseOpType);
for (Object operation : operations) {
if (operation instanceof CheckAndInsUp) {
checkAndInsUpCnt++;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*-
* #%L
* com.oceanbase:obkv-table-client
* %%
* Copyright (C) 2021 - 2025 OceanBase
* %%
* OBKV Table Client Framework is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* #L%
*/

package com.alipay.oceanbase.rpc.protocol.payload.impl.execute;

import java.util.*;

public enum OHOperationType {
INVALID(0),
PUT(1),
PUT_LIST(2),
DELETE(3),
DELETE_LIST(4),
GET(5),
GET_LIST(6),
EXISTS(7),
EXISTS_LIST(8),
BATCH(9),
BATCH_CALLBACK(10),
SCAN(11),
CHECK_AND_PUT(12),
CHECK_AND_DELETE(13),
CHECK_AND_MUTATE(14),
APPEND(15),
INCREMENT(16),
INCREMENT_COLUMN_VALUE(17),
MUTATE_ROW(18);

private final int value;
private static final Map<Integer, OHOperationType> map = new HashMap<Integer, OHOperationType>();

static {
for (OHOperationType type : OHOperationType.values()) {
map.put(type.value, type);
}
}

OHOperationType(int value) {
this.value = value;
}

public static OHOperationType valueOf(int value) {
return map.get(value);
}

public int getValue() {
return value;
}

public byte getByteValue() {
return (byte) value;
}

/*
* CHECK_AND_PUT -> checkAndPut
* PUT -> put
*/
public String toCamelCase() {
String name = this.name();
if (name == null || name.isEmpty()) {
return name;
}

String[] parts = name.split("_");
StringBuilder sb = new StringBuilder();

for (int i = 0; i < parts.length; i++) {
String part = parts[i];
if (part == null || part.isEmpty()) {
continue;
}

if (i == 0) {
sb.append(part.toLowerCase());
} else {
if (!part.isEmpty()) {
sb.append(Character.toUpperCase(part.charAt(0)));
if (part.length() > 1) {
sb.append(part.substring(1).toLowerCase());
}
}
}
}
return sb.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@
option_flag_,
op_type_,
keys_,
cf_rows_);
cf_rows_,
hbase_op_type_);
*/
/*
[k1][k2][k3]...
Expand All @@ -53,6 +54,7 @@ public class ObHbaseRequest extends AbstractPayload implements Credentialable {
protected ObTableOperationType opType;
protected List<ObObj> keys = new ArrayList<>();
protected List<ObHbaseCfRows> cfRows;
protected OHOperationType hbaseOpType = OHOperationType.INVALID;

public ObHbaseRequest() {
this.credential = new ObBytesString();
Expand Down Expand Up @@ -112,6 +114,9 @@ public byte[] encode() {
ObHbaseCfRows sameCfRows = cfRows.get(i);
sameCfRows.encode(buf);
}

// 7. encode hbase op type, to differentiate put and put list
Serialization.encodeI8(buf, hbaseOpType.getByteValue());

if (buf.pos != buf.bytes.length) {
throw new IllegalArgumentException("error in encode ObHbaseRequest (" +
Expand Down Expand Up @@ -151,6 +156,7 @@ public long getPayloadContentSize() {
for (ObHbaseCfRows cfRows : cfRows) {
payLoadContentSize += cfRows.getPayloadSize();
}
payLoadContentSize += 1; // hbase_op_type_
}
return payLoadContentSize;
}
Expand Down Expand Up @@ -184,6 +190,10 @@ public void setServerCanRetry(boolean canRetry) {
optionFlag.setFlagServerCanRetry(canRetry);
}

public void setHbaseOpType(OHOperationType hbaseOpType) {
this.hbaseOpType = hbaseOpType;
}

public boolean getServerCanRetry() {
return optionFlag.getFlagServerCanRetry();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public abstract class ObTableAbstractOperationRequest extends AbstractPayload im
protected ObTableOptionFlag option_flag = ObTableOptionFlag.DEFAULT;
protected boolean returningAffectedEntity = false;
protected boolean returningAffectedRows = false;
protected OHOperationType hbaseOpType = OHOperationType.INVALID; // for table operations, this will be INVALID(0)

/*
* Get payload content size.
Expand Down Expand Up @@ -220,6 +221,14 @@ public void setNeedTabletId(boolean needTabletId) {
option_flag.setNeedTabletId(needTabletId);
}

public void setHbaseOpType(OHOperationType hbaseOpType) {
this.hbaseOpType = hbaseOpType;
}

public OHOperationType getHbaseOpType() {
return hbaseOpType;
}

public boolean getNeedTabletId() {
return option_flag.isNeedTabletId();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,15 @@
credential_,
entity_type_,
consistency_level_,
ls_op_);
ls_op_,
hbase_op_type_);
*/
public class ObTableLSOpRequest extends AbstractPayload implements Credentialable {
protected ObBytesString credential;
protected ObTableEntityType entityType = ObTableEntityType.KV;
protected ObTableConsistencyLevel consistencyLevel = ObTableConsistencyLevel.STRONG;
private ObTableLSOperation lsOperation = null;
protected OHOperationType hbaseOpType = OHOperationType.INVALID;

/*
* Get pcode.
Expand Down Expand Up @@ -70,6 +72,9 @@ public byte[] encode() {

// 4. encode lsOperation
lsOperation.encode(buf);

// 5. encode hbase op type, for table operations, this will be INVALID(0)
Serialization.encodeI8(buf, hbaseOpType.getByteValue());
if (buf.pos != buf.bytes.length) {
throw new IllegalArgumentException("error in encode lsOperationRequest (" +
"pos:" + buf.pos + ", buf.capacity:" + buf.bytes.length + ")");
Expand Down Expand Up @@ -99,7 +104,7 @@ public Object decode(ByteBuf buf) {
public long getPayloadContentSize() {
if (payLoadContentSize == INVALID_PAYLOAD_CONTENT_SIZE) {
payLoadContentSize = lsOperation.getPayloadSize() + Serialization.getNeedBytes(credential) + 1 // entityType
+ 1; // consistencyLevel
+ 1 /* consistencyLevel */ + 1 /* hbaseOpType */;
}
return payLoadContentSize;
}
Expand Down Expand Up @@ -161,6 +166,10 @@ public void setTableId(long tableId) {
this.lsOperation.setTableId(tableId);
}

public void setHbaseOpType(OHOperationType hbaseOpType) {
this.hbaseOpType = hbaseOpType;
}

/**
* Reset the cached payload content size and propagate to child objects
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@
entity_type_,
query_and_mutate_,
binlog_row_image_type_,
option_flag_);
option_flag_,
hbase_op_type_);
*
*/
public class ObTableQueryAndMutateRequest extends ObTableAbstractOperationRequest {
Expand Down Expand Up @@ -78,6 +79,9 @@ public byte[] encode() {
idx += len;
System.arraycopy(Serialization.encodeI8(option_flag.getByteValue()), 0, bytes, idx, 1);

idx += 1;
System.arraycopy(Serialization.encodeI8(hbaseOpType.getByteValue()), 0, bytes, idx, 1);

return bytes;
}

Expand Down Expand Up @@ -111,7 +115,7 @@ public long getPayloadContentSize() {
if (ObGlobal.obVsnMajor() >= 4)
return Serialization.getNeedBytes(credential) + Serialization.getNeedBytes(tableName)
+ Serialization.getNeedBytes(tableId) + 8 + 1
+ tableQueryAndMutate.getPayloadSize() + Serialization.getNeedBytes(type.getValue()) + 1;
+ tableQueryAndMutate.getPayloadSize() + Serialization.getNeedBytes(type.getValue()) + 1 + 1;
else
return Serialization.getNeedBytes(credential) + Serialization.getNeedBytes(tableName)
+ Serialization.getNeedBytes(tableId) + Serialization.getNeedBytes(partitionId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,8 @@
import com.alipay.oceanbase.rpc.protocol.payload.Pcodes;
import com.alipay.oceanbase.rpc.protocol.payload.ResultCodes;
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObObj;
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableApiMove;
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.*;
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObRowKey;
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableEntityType;
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableStreamRequest;
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.QueryStreamResult;
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.syncquery.ObTableQueryAsyncRequest;
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.syncquery.ObTableQueryAsyncResult;
import com.alipay.oceanbase.rpc.table.ObTable;
Expand Down Expand Up @@ -65,6 +62,7 @@ public abstract class AbstractQueryStreamResult extends AbstractPayload implemen
// global index: key is index table name (be like: __idx_<data_table_id>_<index_name>)
protected String indexTableName;
protected ObTableEntityType entityType;
protected OHOperationType hbaseOpType = OHOperationType.INVALID;
protected Map<Long, ObPair<Long, ObTableParam>> expectant;
protected List<String> cacheProperties = new LinkedList<String>();
protected LinkedList<List<ObObj>> cacheRows = new LinkedList<List<ObObj>>();
Expand Down Expand Up @@ -832,4 +830,12 @@ public ObTableClient getClient() {
public void setClient(ObTableClient client) {
this.client = client;
}

public OHOperationType getHbaseOpType() {
return hbaseOpType;
}

public void setHbaseOpType(OHOperationType hbaseOpType) {
this.hbaseOpType = hbaseOpType;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@
partition_id_,
entity_type_,
consistency_level_,
query_
query_,
option_flag_,
hbase_op_type_
);
*
*/
Expand Down Expand Up @@ -76,6 +78,9 @@ public byte[] encode() {
idx += len;
System.arraycopy(Serialization.encodeVi64(option_flag.getValue()), 0, bytes, idx, 1);

idx += 1;
System.arraycopy(Serialization.encodeI8(hbaseOpType.getByteValue()), 0, bytes, idx, 1);

return bytes;
}

Expand Down Expand Up @@ -109,7 +114,7 @@ public Object decode(ByteBuf buf) {
public long getPayloadContentSize() {
if (ObGlobal.obVsnMajor() >= 4)
return Serialization.getNeedBytes(credential) + Serialization.getNeedBytes(tableName)
+ Serialization.getNeedBytes(tableId) + 8 + 2 + tableQuery.getPayloadSize() + 1;
+ Serialization.getNeedBytes(tableId) + 8 + 2 + tableQuery.getPayloadSize() + 1 + 1;
else
return Serialization.getNeedBytes(credential) + Serialization.getNeedBytes(tableName)
+ Serialization.getNeedBytes(tableId) + Serialization.getNeedBytes(partitionId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public void init() throws Exception {
request.setTableQuery(tableQuery);
request.setEntityType(entityType);
request.setConsistencyLevel(getReadConsistency().toObTableConsistencyLevel());
request.setHbaseOpType(hbaseOpType);

// construct async query request
asyncRequest.setObTableQueryRequest(request);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ protected ObTableQueryResult referToNewPartition(ObPair<Long, ObTableParam> part
request.setPartitionId(partitionId);
request.setTableId(partIdWithObTable.getRight().getTableId());
request.setEntityType(entityType);
request.setHbaseOpType(hbaseOpType);
if (operationTimeout > 0) {
request.setTimeout(operationTimeout);
} else {
Expand Down
Loading