Skip to content

Commit 713b351

Browse files
IHEIIeemjwushenyunlong
authored
adapt Hbase put ls op (#117) (#144)
* habse put ls * BatchOperationResult add interface to query result info * fix add rowkey obj type change * fix typo * [Adjust] negate hbase timestamp in client side --------- Co-authored-by: eemjwu <34029771+eemjwu@users.noreply.github.com> Co-authored-by: shenyunlong.syl <shenyunlong.syl@oceanbase.com>
1 parent 3ad67bd commit 713b351

11 files changed

Lines changed: 135 additions & 27 deletions

File tree

src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3169,6 +3169,10 @@ public void setRunningMode(RunningMode runningMode) {
31693169
this.runningMode = runningMode;
31703170
}
31713171

3172+
public RunningMode getRunningMode() {
3173+
return this.runningMode;
3174+
}
3175+
31723176
public enum RunningMode {
31733177
NORMAL, HBASE;
31743178
}

src/main/java/com/alipay/oceanbase/rpc/mutation/BatchOperation.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import com.alipay.oceanbase.rpc.checkandmutate.CheckAndInsUp;
2222
import com.alipay.oceanbase.rpc.exception.ObTableException;
2323
import com.alipay.oceanbase.rpc.mutation.result.BatchOperationResult;
24+
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObObj;
25+
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableEntityType;
2426
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableOperationType;
2527
import com.alipay.oceanbase.rpc.table.ObTableClientLSBatchOpsImpl;
2628
import com.alipay.oceanbase.rpc.table.api.Table;
@@ -43,6 +45,7 @@ public class BatchOperation {
4345
boolean hasGet = false;
4446
ObTableOperationType lastType = ObTableOperationType.INVALID;
4547
boolean isSameType = true;
48+
protected ObTableEntityType entityType = ObTableEntityType.DYNAMIC;
4649

4750
/*
4851
* default constructor
@@ -138,6 +141,10 @@ public BatchOperation addOperation(CheckAndInsUp... insUps) {
138141
return this;
139142
}
140143

144+
public void setEntityType(ObTableEntityType entityType) {
145+
this.entityType = entityType;
146+
}
147+
141148
public BatchOperation setIsAtomic(boolean isAtomic) {
142149
this.isAtomic = isAtomic;
143150
return this;
@@ -237,6 +244,7 @@ private BatchOperationResult executeWithNormalBatchOp() throws Exception {
237244
throw new ObTableException("unknown operation " + operation);
238245
}
239246
}
247+
batchOps.setEntityType(entityType);
240248
batchOps.setAtomicOperation(isAtomic);
241249
batchOps.setReturnOneResult(returnOneResult);
242250
return new BatchOperationResult(batchOps.executeWithResult());
@@ -265,6 +273,9 @@ private BatchOperationResult executeWithLSBatchOp() throws Exception {
265273
}
266274
} else if (operation instanceof Mutation) {
267275
Mutation mutation = (Mutation) operation;
276+
if (((ObTableClient) client).getRunningMode() == ObTableClient.RunningMode.HBASE) {
277+
negateHbaseTimestamp(mutation);
278+
}
268279
batchOps.addOperation(mutation);
269280
if (!hasSetRowkeyElement && mutation.getRowKeyNames() != null) {
270281
List<String> rowKeyNames = mutation.getRowKeyNames();
@@ -293,6 +304,17 @@ private BatchOperationResult executeWithLSBatchOp() throws Exception {
293304
batchOps.setReturningAffectedEntity(withResult);
294305
batchOps.setReturnOneResult(returnOneResult);
295306
batchOps.setAtomicOperation(isAtomic);
307+
batchOps.setEntityType(entityType);
296308
return new BatchOperationResult(batchOps.executeWithResult());
297309
}
310+
311+
private void negateHbaseTimestamp(Mutation mutation) {
312+
Object[] rowKey = mutation.getRowKey();
313+
if (rowKey == null || rowKey.length != 3) {
314+
throw new IllegalArgumentException("hbase rowkey length must be 3");
315+
} else {
316+
long ts = ((long) ((ObObj) mutation.getRowKey()[2]).getValue());
317+
((ObObj) mutation.getRowKey()[2]).setValue(-ts);
318+
}
319+
}
298320
}

src/main/java/com/alipay/oceanbase/rpc/mutation/Increment.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,7 @@
2929
import java.util.Map;
3030

3131
public class Increment extends Mutation<Increment> {
32-
private List<String> columns;
33-
private List<Object> values;
34-
boolean withResult;
32+
boolean withResult;
3533

3634
/*
3735
* default constructor

src/main/java/com/alipay/oceanbase/rpc/mutation/Insert.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,6 @@
3131
import java.util.Map;
3232

3333
public class Insert extends Mutation<Insert> {
34-
private List<String> columns = null;
35-
private List<Object> values = null;
36-
3734
/*
3835
* default constructor
3936
*/

src/main/java/com/alipay/oceanbase/rpc/mutation/InsertOrUpdate.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,7 @@
2828
import java.util.Map;
2929

3030
public class InsertOrUpdate extends Mutation<InsertOrUpdate> {
31-
private boolean usePut;
32-
private List<String> columns;
33-
private List<Object> values;
31+
private boolean usePut;
3432

3533
/*
3634
* default constructor

src/main/java/com/alipay/oceanbase/rpc/mutation/Mutation.java

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,12 @@
2020
import com.alipay.oceanbase.rpc.ObTableClient;
2121
import com.alipay.oceanbase.rpc.exception.ObTableException;
2222
import com.alipay.oceanbase.rpc.filter.*;
23+
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObObj;
24+
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObObjMeta;
25+
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObObjType;
26+
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObITableEntity;
27+
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableEntity;
28+
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableOperation;
2329
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableOperationType;
2430
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.ObNewRange;
2531
import com.alipay.oceanbase.rpc.table.api.Table;
@@ -29,13 +35,18 @@
2935
import java.util.List;
3036
import java.util.Map;
3137

38+
import static com.alipay.oceanbase.rpc.mutation.MutationFactory.colVal;
39+
import static com.alipay.oceanbase.rpc.mutation.MutationFactory.row;
40+
3241
public class Mutation<T> {
3342
private String tableName;
3443
private Table client;
3544
protected Object[] rowKey;
3645
private TableQuery query;
3746
private boolean hasSetRowKey = false;
3847
protected List<String> rowKeyNames = null;
48+
protected List<String> columns;
49+
protected List<Object> values;
3950

4051
/*
4152
* default constructor
@@ -47,6 +58,8 @@ public Mutation() {
4758
rowKey = null;
4859
query = null;
4960
rowKeyNames = null;
61+
columns = null;
62+
values = null;
5063
}
5164

5265
/*
@@ -63,6 +76,8 @@ public Mutation(Table client, String tableName) {
6376
this.rowKey = null;
6477
this.query = null;
6578
this.rowKeyNames = null;
79+
this.columns = null;
80+
this.values = null;
6681
}
6782

6883
/*
@@ -440,4 +455,48 @@ static void removeRowkeyFromMutateColval(List<String> columns, List<Object> valu
440455
}
441456
}
442457
}
458+
459+
public void addColVal(String propName, ObObj propValue) {
460+
columns.add(propName);
461+
values.add(propValue);
462+
}
463+
464+
public static Mutation getInstance(ObTableOperationType type, String[] rowKeyNames,
465+
Object[] rowKeys, String[] columns, Object[] properties) {
466+
Mutation mutation = null;
467+
switch (type) {
468+
case INSERT_OR_UPDATE:
469+
mutation = new InsertOrUpdate();
470+
break;
471+
case DEL:
472+
mutation = new Delete();
473+
break;
474+
default:
475+
throw new ObTableException("not support operation type " + type);
476+
}
477+
478+
Row rowKeyRow = new Row();
479+
if (rowKeys != null) {
480+
for (int i = 0; i < rowKeys.length; i++) {
481+
Object rowkey = rowKeys[i];
482+
ObObj obj = ObObj.getInstance(rowkey);
483+
rowKeyRow.add(rowKeyNames[i], obj);
484+
}
485+
}
486+
mutation.setRowKey(rowKeyRow);
487+
488+
if (columns != null) {
489+
for (int i = 0; i < columns.length; i++) {
490+
String name = columns[i];
491+
Object value = null;
492+
if (properties != null) {
493+
value = properties[i];
494+
}
495+
ObObj c = ObObj.getInstance(value);
496+
mutation.addColVal(name, c);
497+
}
498+
}
499+
500+
return mutation;
501+
}
443502
}

src/main/java/com/alipay/oceanbase/rpc/mutation/Put.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,6 @@
3131
import java.util.Map;
3232

3333
public class Put extends Mutation<Put> {
34-
private List<String> columns = null;
35-
private List<Object> values = null;
36-
3734
/*
3835
* default constructor
3936
*/

src/main/java/com/alipay/oceanbase/rpc/mutation/Replace.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,6 @@
2828
import java.util.Map;
2929

3030
public class Replace extends Mutation<Replace> {
31-
private List<String> columns;
32-
private List<Object> values;
33-
3431
/*
3532
* default constructor
3633
*/

src/main/java/com/alipay/oceanbase/rpc/mutation/Update.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,6 @@
2929
import java.util.Map;
3030

3131
public class Update extends Mutation<Update> {
32-
private List<String> columns;
33-
private List<Object> values;
34-
3532
/*
3633
* default constructor
3734
*/

src/main/java/com/alipay/oceanbase/rpc/mutation/result/BatchOperationResult.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package com.alipay.oceanbase.rpc.mutation.result;
1919

2020
import com.alipay.oceanbase.rpc.exception.ObTableException;
21+
import com.alipay.oceanbase.rpc.protocol.payload.ResultCodes;
2122

2223
import java.util.ArrayList;
2324
import java.util.List;
@@ -26,13 +27,56 @@ public class BatchOperationResult {
2627

2728
private List<Object> results;
2829

30+
boolean hasError = false;
31+
2932
/*
3033
* construct with List of Object
3134
*/
3235
public BatchOperationResult(List<Object> results) {
3336
this.results = results;
3437
}
3538

39+
public List<Object> getResults() {
40+
return results;
41+
}
42+
43+
public List<Integer> getErrorCodeList() {
44+
List<Integer> errorCodeList = new ArrayList<Integer>();
45+
for (Object item : results) {
46+
int errorCode = ResultCodes.OB_SUCCESS.errorCode;
47+
if (item instanceof ObTableException) {
48+
errorCode = ((ObTableException) item).getErrorCode();
49+
hasError = true;
50+
}
51+
errorCodeList.add(errorCode);
52+
}
53+
return errorCodeList;
54+
}
55+
56+
public boolean hasError() {
57+
if (!hasError) {
58+
for (Object item : results) {
59+
if (item instanceof ObTableException) {
60+
hasError = true;
61+
break;
62+
}
63+
}
64+
}
65+
return hasError;
66+
}
67+
68+
public ObTableException getFirstException() {
69+
ObTableException exception = null;
70+
for (Object item : results) {
71+
if (item instanceof ObTableException) {
72+
exception = (ObTableException) item;
73+
hasError = true;
74+
break;
75+
}
76+
}
77+
return exception;
78+
}
79+
3680
/*
3781
* get result
3882
*/
@@ -59,6 +103,7 @@ public long getWrongCount() {
59103
for (Object item : results) {
60104
if (item instanceof ObTableException) {
61105
++wrongCount;
106+
hasError = true;
62107
}
63108
}
64109
return wrongCount;
@@ -86,6 +131,7 @@ public int[] getWrongIdx() {
86131
for (Object item : results) {
87132
if (item instanceof ObTableException) {
88133
wrongIdx.add(i);
134+
hasError = true;
89135
}
90136
++i;
91137
}

0 commit comments

Comments
 (0)