Skip to content

Commit 45b5485

Browse files
authored
Merge pull request #286 from oceanbase/hbase_441_2x
Hbase 441 2x feature
2 parents fa62490 + f7d6927 commit 45b5485

File tree

9 files changed

+490
-78
lines changed

9 files changed

+490
-78
lines changed

src/main/java/com/alipay/oceanbase/hbase/OHTable.java

Lines changed: 155 additions & 33 deletions
Large diffs are not rendered by default.

src/main/java/com/alipay/oceanbase/hbase/result/ClientStreamScanner.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,8 @@ public Result next() throws IOException {
127127
break;
128128
}
129129
}
130+
// sort keyValues
131+
OHBaseFuncUtils.sortHBaseResult(keyValues);
130132
return Result.create(keyValues);
131133
} catch (Exception e) {
132134
logger.error(LCD.convert("01-00000"), streamResult.getTableName(), e);

src/main/java/com/alipay/oceanbase/hbase/util/OHAdmin.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ private void handleObTableException(Exception e, TableName tableName, ExceptionH
8989

9090
@Override
9191
public int getOperationTimeout() {
92-
return connection.getOHConnectionConfiguration().getOperationTimeout();
92+
return connection.getOHConnectionConfiguration().getClientOperationTimeout();
9393
}
9494

9595
@Override

src/main/java/com/alipay/oceanbase/hbase/util/OHBaseFuncUtils.java

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,17 @@
1717

1818
package com.alipay.oceanbase.hbase.util;
1919

20+
import com.alipay.oceanbase.rpc.ObGlobal;
21+
import com.alipay.oceanbase.rpc.ObTableClient;
2022
import org.apache.hadoop.classification.InterfaceAudience;
23+
import org.apache.hadoop.hbase.Cell;
24+
import org.apache.hadoop.hbase.client.Put;
25+
import org.apache.hadoop.hbase.client.Row;
26+
import org.apache.hadoop.hbase.util.Bytes;
2127

2228
import java.util.Arrays;
29+
import java.util.Comparator;
30+
import java.util.List;
2331

2432
@InterfaceAudience.Private
2533
public class OHBaseFuncUtils {
@@ -42,4 +50,65 @@ public static byte[][] extractFamilyFromQualifier(byte[] qualifier) throws Excep
4250
byte[] newQualifier = Arrays.copyOfRange(qualifier, familyLen + 1, qualifier.length);
4351
return new byte[][] { family, newQualifier };
4452
}
53+
54+
public static boolean isHBasePutPefSupport(ObTableClient tableClient) {
55+
if (tableClient.isOdpMode()) {
56+
// server version support and distributed capacity is enabled and odp version support
57+
return ObGlobal.isHBasePutPerfSupport()
58+
&& tableClient.getServerCapacity().isSupportDistributedExecute()
59+
&& ObGlobal.OB_PROXY_VERSION >= ObGlobal.OB_PROXY_VERSION_4_3_6_0;
60+
} else {
61+
// server version support and distributed capacity is enabled
62+
return ObGlobal.isHBasePutPerfSupport()
63+
&& tableClient.getServerCapacity().isSupportDistributedExecute();
64+
}
65+
}
66+
67+
public static boolean isAllPut(List<? extends Row> actions) {
68+
boolean isAllPut = true;
69+
for (Row action : actions) {
70+
if (!(action instanceof Put)) {
71+
isAllPut = false;
72+
break;
73+
}
74+
}
75+
return isAllPut;
76+
}
77+
78+
public static void sortHBaseResult(List<Cell> cells) {
79+
cells.sort(new Comparator<Cell>() {
80+
@Override
81+
public int compare(Cell cell1, Cell cell2) {
82+
// 1. sort family in lexicographical order
83+
int familyComparison = Bytes.compareTo(cell1.getFamilyArray(),
84+
cell1.getFamilyOffset(), cell1.getFamilyLength(), cell2.getFamilyArray(),
85+
cell2.getFamilyOffset(), cell2.getFamilyLength());
86+
if (familyComparison != 0) {
87+
return familyComparison;
88+
}
89+
90+
// 2: sort qualifier in lexicographical order
91+
int qualifierComparison = Bytes.compareTo(cell1.getQualifierArray(),
92+
cell1.getQualifierOffset(), cell1.getQualifierLength(),
93+
cell2.getQualifierArray(), cell2.getQualifierOffset(),
94+
cell2.getQualifierLength());
95+
if (qualifierComparison != 0) {
96+
return qualifierComparison;
97+
}
98+
99+
// 3: sort timestamp in descend order
100+
return Long.compare(cell2.getTimestamp(), cell1.getTimestamp());
101+
}
102+
});
103+
}
104+
105+
public static boolean serverCanRetry(ObTableClient tableClient) {
106+
if (tableClient.isOdpMode()) {
107+
// ODP mode needs to check proxy version
108+
return ObGlobal.OB_PROXY_VERSION >= ObGlobal.OB_PROXY_VERSION_4_3_6_0;
109+
} else {
110+
// OCP mode directly return true, server will do the check
111+
return true;
112+
}
113+
}
45114
}

src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333
import java.util.concurrent.TimeUnit;
3434
import java.util.concurrent.atomic.AtomicInteger;
3535
import java.util.concurrent.atomic.AtomicLong;
36-
import static com.alipay.oceanbase.rpc.util.TableClientLoggerFactory.LCD;
3736

3837
@InterfaceAudience.Private
3938
public class OHBufferedMutatorImpl implements BufferedMutator {
@@ -94,7 +93,7 @@ public OHBufferedMutatorImpl(OHConnectionImpl ohConnection, BufferedMutatorParam
9493
.getRpcTimeout() : connectionConfig.getRpcTimeout());
9594
this.operationTimeout = new AtomicInteger(
9695
params.getOperationTimeout() != OHConnectionImpl.BUFFERED_PARAM_UNSET ? params
97-
.getOperationTimeout() : connectionConfig.getOperationTimeout());
96+
.getOperationTimeout() : connectionConfig.getClientOperationTimeout());
9897

9998
long newPeriodicFlushTimeoutMs = params.getWriteBufferPeriodicFlushTimeoutMs() != OHConnectionImpl.BUFFERED_PARAM_UNSET ? params
10099
.getWriteBufferPeriodicFlushTimeoutMs() : connectionConfig
@@ -301,7 +300,7 @@ public void disableWriteBufferPeriodicFlush() {
301300
}
302301

303302
@Override
304-
public void close() throws IOException {
303+
public synchronized void close() throws IOException {
305304
if (closed) {
306305
return;
307306
}

src/main/java/com/alipay/oceanbase/hbase/util/OHConnectionConfiguration.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,8 @@ public class OHConnectionConfiguration {
4646
private final int odpPort;
4747
private final boolean odpMode;
4848
private final long writeBufferSize;
49-
private final int operationTimeout;
49+
private final int clientOperationTimeout;
50+
private final int serverOperationTimeout;
5051
private final int metaOperationTimeout;
5152
private final int scannerCaching;
5253
private final long scannerMaxResultSize;
@@ -76,8 +77,10 @@ public OHConnectionConfiguration(Configuration conf) {
7677
this.writeBufferSize = conf.getLong(WRITE_BUFFER_SIZE_KEY, WRITE_BUFFER_SIZE_DEFAULT);
7778
this.metaOperationTimeout = conf.getInt(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT,
7879
HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
79-
this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
80+
this.clientOperationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
8081
HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
82+
this.serverOperationTimeout = conf.getInt(HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY,
83+
HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT);
8184
this.rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
8285
HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
8386
this.readRpcTimeout = conf.getInt(HConstants.HBASE_RPC_READ_TIMEOUT_KEY,
@@ -133,8 +136,12 @@ public int getMetaOperationTimeout() {
133136
return this.metaOperationTimeout;
134137
}
135138

136-
public int getOperationTimeout() {
137-
return this.operationTimeout;
139+
public int getClientOperationTimeout() {
140+
return this.clientOperationTimeout;
141+
}
142+
143+
public int getServerOperationTimeout() {
144+
return this.serverOperationTimeout;
138145
}
139146

140147
public int getScannerCaching() {

src/main/java/com/alipay/oceanbase/hbase/util/ObTableBuilderBase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ abstract public class ObTableBuilderBase implements TableBuilder {
3939
}
4040
this.tableName = tableName;
4141
this.operationTimeout = tableName.isSystemTable() ? ohConnConf.getMetaOperationTimeout()
42-
: ohConnConf.getOperationTimeout();
42+
: ohConnConf.getClientOperationTimeout();
4343
this.rpcTimeout = ohConnConf.getRpcTimeout();
4444
this.readRpcTimeout = ohConnConf.getReadRpcTimeout();
4545
this.writeRpcTimeout = ohConnConf.getWriteRpcTimeout();

src/main/java/com/alipay/oceanbase/hbase/util/ObTableClientManager.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232

3333
import static com.alipay.oceanbase.hbase.constants.OHConstants.*;
3434
import static com.alipay.oceanbase.hbase.util.Preconditions.checkArgument;
35+
import static com.alipay.oceanbase.rpc.property.Property.RPC_OPERATION_TIMEOUT;
3536
import static org.apache.commons.lang.StringUtils.isNotBlank;
3637

3738
@InterfaceAudience.Private
@@ -86,11 +87,11 @@ public static ObTableClient getOrCreateObTableClient(OHConnectionConfiguration c
8687
obTableClientKey.getProperties().put(property.getKey(), property.getValue());
8788
}
8889

89-
return getOrCreateObTableClient(obTableClientKey, connectionConfig.getRpcConnectTimeout());
90+
return getOrCreateObTableClient(obTableClientKey, connectionConfig);
9091
}
9192

9293
public static ObTableClient getOrCreateObTableClient(ObTableClientKey obTableClientKey,
93-
int rpcConnectTimeout) throws IOException {
94+
OHConnectionConfiguration connectionConfig) throws IOException {
9495
if (OB_TABLE_CLIENT_INSTANCE.get(obTableClientKey) == null) {
9596
ReentrantLock tmp = new ReentrantLock();
9697
ReentrantLock lock = OB_TABLE_CLIENT_LOCK.putIfAbsent(obTableClientKey, tmp);
@@ -115,7 +116,8 @@ public static ObTableClient getOrCreateObTableClient(ObTableClientKey obTableCli
115116
}
116117
obTableClient.setFullUserName(obTableClientKey.getFullUserName());
117118
obTableClient.setPassword(obTableClientKey.getPassword());
118-
obTableClient.setRpcConnectTimeout(rpcConnectTimeout);
119+
obTableClient.setRpcConnectTimeout(connectionConfig.getRpcConnectTimeout());
120+
obTableClient.addProperty(RPC_OPERATION_TIMEOUT.getKey(), Integer.toString(connectionConfig.getServerOperationTimeout()));
119121
obTableClient.init();
120122
OB_TABLE_CLIENT_INSTANCE.put(obTableClientKey, obTableClient);
121123
}
@@ -140,8 +142,8 @@ public static ObTableClient getOrCreateObTableClientByTableName(TableName tableN
140142
private static void initTimeoutAndRetryTimes(ObTableClient obTableClient, OHConnectionConfiguration ohConnectionConf) {
141143
obTableClient.setRpcExecuteTimeout(ohConnectionConf.getRpcTimeout());
142144
obTableClient.setRuntimeRetryTimes(ohConnectionConf.getNumRetries());
143-
obTableClient.setRuntimeMaxWait(ohConnectionConf.getOperationTimeout());
144-
obTableClient.setRuntimeBatchMaxWait(ohConnectionConf.getOperationTimeout());
145+
obTableClient.setRuntimeMaxWait(ohConnectionConf.getClientOperationTimeout());
146+
obTableClient.setRuntimeBatchMaxWait(ohConnectionConf.getClientOperationTimeout());
145147
}
146148

147149
public static class ObTableClientKey {

0 commit comments

Comments
 (0)