Skip to content

Commit b80af97

Browse files
authored
[feat] obkv support full text search (#286)
* add fulltext index dml case * adapt fts query request * adapt fulltext query * add fts query limit/offset case * fix case fail * fix fts query bugs * opt case * fix compile error * add ob verison check when do fts query * change version check for fts query * adapt char type * fix key and range subpartition not support * allow scan range columns is empty when fts query * modify error message when use wrong index name * modify fts version check * fix compile error * fix for reviews
1 parent c9a3413 commit b80af97

File tree

15 files changed

+983
-26
lines changed

15 files changed

+983
-26
lines changed

src/main/java/com/alipay/oceanbase/rpc/ObClusterTableQuery.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -264,4 +264,10 @@ public void setEntityType(ObTableEntityType entityType) {
264264
super.setEntityType(entityType);
265265
tableClientQuery.setEntityType(entityType);
266266
}
267+
268+
@Override
269+
public TableQuery setSearchText(String searchText) {
270+
tableClientQuery.setSearchText(searchText);
271+
return this;
272+
}
267273
}

src/main/java/com/alipay/oceanbase/rpc/ObGlobal.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,10 @@ public static boolean isLsOpSupport() {
8686
|| OB_VERSION >= OB_VERSION_4_3_4_0;
8787
}
8888

89+
public static boolean isFtsQuerySupport() {
90+
return OB_VERSION >= OB_VERSION_4_3_5_1;
91+
}
92+
8993
public static boolean isReturnOneResultSupport() {
9094
return OB_VERSION >= OB_VERSION_4_2_3_0 && OB_VERSION < OB_VERSION_4_3_0_0
9195
|| OB_VERSION >= OB_VERSION_4_3_4_0;

src/main/java/com/alipay/oceanbase/rpc/location/LocationUtil.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1016,12 +1016,11 @@ public static ObIndexInfo getIndexInfoFromRemote(ObServerAddr obServerAddr, ObUs
10161016
indexInfo.setIndexTableId(rs.getLong("table_id"));
10171017
indexInfo.setIndexType(ObIndexType.valueOf(rs.getInt("index_type")));
10181018
} else {
1019-
throw new ObTableEntryRefreshException(
1020-
"fail to get index info from remote, result set is empty");
1019+
throw new ObTableEntryRefreshException("index is not exist");
10211020
}
10221021
} catch (Exception e) {
10231022
throw new ObTableEntryRefreshException(format(
1024-
"fail to get index info from remote, indexTableName: %s", indexTableName), e);
1023+
"fail to get index info from remote, indexTableName: %s, error message: %s", indexTableName, e.getMessage()), e);
10251024
} finally {
10261025
try {
10271026
if (null != rs) {
@@ -1771,8 +1770,14 @@ private static Map<Long, Long> parseKeyHashPart(ResultSet rs, TableEntry tableEn
17711770
long subPartNum = rs.getLong("sub_part_num");
17721771
subHashPartDesc.setPartNum((int) subPartNum);
17731772
}
1773+
} else if (subPartDesc instanceof ObRangePartDesc) {
1774+
ObRangePartDesc subRangePartDesc = (ObRangePartDesc) subPartDesc;
1775+
if (!isSubPart && subRangePartDesc.getPartNum() == 0) {
1776+
long subPartNum = rs.getLong("sub_part_num");
1777+
subRangePartDesc.setPartNum((int) subPartNum);
1778+
}
17741779
} else {
1775-
throw new IllegalArgumentException("sub part desc is not key or hash part desc");
1780+
throw new IllegalArgumentException("sub part desc is not key,hash and range part desc");
17761781
}
17771782
}
17781783
Long tabletId = rs.getLong("tablet_id");

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/ObTableObjType.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,10 @@ public void decode(ByteBuf buf, ObObj obj) {
177177
ObTableLongBlobType(24) {
178178
},
179179

180-
ObTableInvalidType(25) {
180+
ObTableCharType(25) {
181+
},
182+
183+
ObTableInvalidType(26) {
181184
};
182185

183186
private int value;
@@ -230,6 +233,8 @@ public static ObTableObjType getTableObjType(ObObj obj) {
230233
} else if (obj.isMaxObj()) {
231234
return ObTableObjType.ObTableMaxType;
232235
}
236+
} else if (objType == ObObjType.ObCharType) {
237+
return ObTableObjType.ObTableCharType;
233238
}
234239

235240
throw new IllegalArgumentException("cannot get ObTableObjType, invalid ob obj type: "
@@ -258,6 +263,7 @@ public static ObTableObjType getTableObjType(ObObj obj) {
258263
tableObjTypeMap.put(ObTableLongBlobType, ObObjType.ObLongTextType);
259264
tableObjTypeMap.put(ObTableMinType, ObObjType.ObExtendType);
260265
tableObjTypeMap.put(ObTableMaxType, ObObjType.ObExtendType);
266+
tableObjTypeMap.put(ObTableCharType, ObObjType.ObCharType);
261267
}
262268

263269
public static ObObjType getObjType(ObTableObjType tableObjType) {

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObIndexType.java

Lines changed: 32 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,16 +21,38 @@
2121
import java.util.Map;
2222

2323
public enum ObIndexType {
24-
IndexTypeIsNot(0), IndexTypeNormalLocal(1), IndexTypeUniqueLocal(2), IndexTypeNormalGlobal(3), IndexTypeUniqueGlobal(
25-
4), IndexTypePrimary(
26-
5), IndexTypeDomainCtxcat(
27-
6), IndexTypeNormalGlobalLocalStorage(
28-
7), IndexTypeUniqueGlobalLocalStorage(
29-
8), IndexTypeSpatialLocal(
30-
10), IndexTypeSpatialGlobal(
31-
11), IndexTypeSpatialGlobalLocalStorage(
32-
12), IndexTypeMax(
33-
13);
24+
IndexTypeIsNot(0),
25+
IndexTypeNormalLocal(1),
26+
IndexTypeUniqueLocal(2),
27+
IndexTypeNormalGlobal(3),
28+
IndexTypeUniqueGlobal(4),
29+
IndexTypePrimary(5),
30+
IndexTypeDomainCtxcat(6),
31+
IndexTypeNormalGlobalLocalStorage(7),
32+
IndexTypeUniqueGlobalLocalStorage(8),
33+
IndexTypeSpatialLocal(10),
34+
IndexTypeSpatialGlobal(11),
35+
IndexTypeSpatialGlobalLocalStorage(12),
36+
IndexTypeRowkeyDocIdLocal(13),
37+
IndexTypeDocIdRowkeyLocal(14),
38+
IndexTypeFtsIndexLocal(15),
39+
IndexTypeFtsDocWordLocal(16),
40+
/*
41+
IndexTypeDocIdRowkeyGlobal(17),
42+
IndexTypeFtsIndexGlobal(18),
43+
IndexTypeFtsDocWordGlobal(19),
44+
IndexTypeDocIdRowkeyGlobalLocalStorage(20),
45+
IndexTypeFtsIndexGlobalLocalStorage(21),
46+
IndexTypeFtsDocWordGlobalLocalStorage(22),
47+
IndexTypeNormalMultivalueLocal(23),
48+
IndexTypeUniqueMultivalueLocal(24),
49+
IndexTypeVecRowkeyVidLocal(25),
50+
IndexTypeVecVidRowkeyLocal(26),
51+
IndexTypeVecDeltaBufferLocal(27),
52+
IndexTypeVecIndexIdLocal(28),
53+
IndexTypeVecIndexSnapshotDataLocal(29),
54+
*/
55+
IndexTypeMax(30);
3456

3557
private int value;
3658
private static Map<Integer, ObIndexType> map = new HashMap<Integer, ObIndexType>();

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/ObTableQuery.java

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query;
1919

2020
import com.alipay.oceanbase.rpc.exception.FeatureNotSupportedException;
21+
import com.alipay.oceanbase.rpc.table.ObFTSParams;
2122
import com.alipay.oceanbase.rpc.table.ObHBaseParams;
2223
import com.alipay.oceanbase.rpc.table.ObKVParams;
2324
import com.alipay.oceanbase.rpc.protocol.payload.AbstractPayload;
@@ -26,6 +27,7 @@
2627
import com.alipay.oceanbase.rpc.protocol.payload.Constants;
2728
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.aggregation.ObTableAggregationSingle;
2829
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.aggregation.ObTableAggregationType;
30+
import com.alipay.oceanbase.rpc.table.ObKVParamsBase;
2931
import com.alipay.oceanbase.rpc.util.Serialization;
3032
import io.netty.buffer.ByteBuf;
3133

@@ -68,13 +70,14 @@ public class ObTableQuery extends AbstractPayload {
6870

6971
protected static final byte[] HTABLE_DUMMY_BYTES = new byte[] { 0x01, 0x00 };
7072
protected boolean isHbaseQuery = false;
73+
protected boolean isFTSQuery = false;
7174
protected List<String> scanRangeColumns = new LinkedList<String>();
7275

7376
protected List<ObTableAggregationSingle> aggregations = new LinkedList<>();
7477

7578
private Long partId = null;
7679

77-
protected ObKVParams obKVParams;
80+
protected ObKVParams obKVParams = null;
7881

7982
public void adjustStartKey(List<ObObj> key) throws IllegalArgumentException {
8083
List<ObNewRange> keyRanges = getKeyRanges();
@@ -226,7 +229,7 @@ public byte[] encode() {
226229
idx += len;
227230
}
228231

229-
if (isHbaseQuery && obKVParams != null) {
232+
if (obKVParams != null) { // hbaseQuery or FTSQuery will use obKVParams
230233
len = (int) obKVParams.getPayloadSize();
231234
System.arraycopy(obKVParams.encode(), 0, bytes, idx, len);
232235
idx += len;
@@ -293,7 +296,11 @@ public Object decode(ByteBuf buf) {
293296
String agg_column = Serialization.decodeVString(buf);
294297
this.aggregations.add(new ObTableAggregationSingle(ObTableAggregationType.fromByte(agg_type), agg_column));
295298
}
296-
if (isHbaseQuery) {
299+
300+
buf.markReaderIndex();
301+
if (buf.readByte() > 0) {
302+
// read pType if is exists
303+
buf.resetReaderIndex();
297304
obKVParams = new ObKVParams();
298305
this.obKVParams.decode(buf);
299306
}
@@ -328,7 +335,7 @@ public long getPayloadContentSize() {
328335
} else {
329336
contentSize += HTABLE_DUMMY_BYTES.length;
330337
}
331-
if (isHbaseQuery && obKVParams != null) {
338+
if (obKVParams != null) {
332339
contentSize += obKVParams.getPayloadSize();
333340
} else {
334341
contentSize += HTABLE_DUMMY_BYTES.length;
@@ -554,7 +561,22 @@ public void setObKVParams(ObKVParams obKVParams) {
554561
this.obKVParams = obKVParams;
555562
}
556563

564+
public void setSearchText(String searchText) {
565+
if (this.isHbaseQuery) {
566+
throw new FeatureNotSupportedException("Hbase query not support full text search currently");
567+
}
568+
if (this.obKVParams == null) {
569+
obKVParams = new ObKVParams();
570+
}
571+
ObFTSParams ftsParams = (ObFTSParams)obKVParams.getObParams(ObKVParamsBase.paramType.FTS);
572+
ftsParams.setSearchText(searchText);
573+
this.obKVParams.setObParamsBase(ftsParams);
574+
this.isFTSQuery = true;
575+
}
576+
557577
public ObKVParams getObKVParams() {
558578
return obKVParams;
559579
}
580+
581+
public boolean isFTSQuery() { return isFTSQuery; }
560582
}

src/main/java/com/alipay/oceanbase/rpc/stream/ObTableClientQueryAsyncStreamResult.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,9 +117,9 @@ protected ObTableQueryAsyncResult referToNewPartition(ObPair<Long, ObTableParam>
117117
queryRequest.setPartitionId(obTableParam.getPartitionId());
118118
queryRequest.setTableId(obTableParam.getTableId());
119119
if (operationTimeout > 0) {
120-
queryRequest.setTimeout(operationTimeout);
120+
asyncRequest.setTimeout(operationTimeout);
121121
} else {
122-
queryRequest.setTimeout(obTableParam.getObTable().getObTableOperationTimeout());
122+
asyncRequest.setTimeout(obTableParam.getObTable().getObTableOperationTimeout());
123123
}
124124

125125
// refresh async query request

src/main/java/com/alipay/oceanbase/rpc/table/AbstractTableQueryImpl.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.ObNewRange;
2424
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.ObScanOrder;
2525
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.ObTableQuery;
26+
import com.alipay.oceanbase.rpc.table.api.Table;
2627
import com.alipay.oceanbase.rpc.table.api.TableQuery;
2728

2829
import java.util.Arrays;
@@ -184,6 +185,12 @@ public TableQuery setMaxResultSize(long maxResultSize) {
184185
return this;
185186
}
186187

188+
@Override
189+
public TableQuery setSearchText(String searchText) {
190+
this.tableQuery.setSearchText(searchText);
191+
return this;
192+
}
193+
187194
public String getIndexTableName() {
188195
return indexTableName;
189196
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*-
2+
* #%L
3+
* com.oceanbase:obkv-table-client
4+
* %%
5+
* Copyright (C) 2021 - 2024 OceanBase
6+
* %%
7+
* OBKV Table Client Framework is licensed under Mulan PSL v2.
8+
* You can use this software according to the terms and conditions of the Mulan PSL v2.
9+
* You may obtain a copy of Mulan PSL v2 at:
10+
* http://license.coscl.org.cn/MulanPSL2
11+
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
12+
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
13+
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
14+
* See the Mulan PSL v2 for more details.
15+
* #L%
16+
*/
17+
18+
package com.alipay.oceanbase.rpc.table;
19+
20+
import com.alipay.oceanbase.rpc.util.Serialization;
21+
import io.netty.buffer.ByteBuf;
22+
23+
public class ObFTSParams extends ObKVParamsBase {
24+
String searchText = null;
25+
public ObFTSParams() {
26+
pType = paramType.FTS;
27+
}
28+
29+
public paramType getType() {
30+
return pType;
31+
}
32+
33+
public void setSearchText(String searchText) {
34+
this.searchText = searchText;
35+
}
36+
37+
public String getSearchText() { return this.searchText; }
38+
39+
public byte[] encode() {
40+
byte[] bytes = new byte[(int) getPayloadContentSize()];
41+
int idx = 0;
42+
byte[] b = new byte[] { (byte)pType.ordinal() };
43+
System.arraycopy(b, 0, bytes, idx, 1);
44+
idx += 1;
45+
int len = Serialization.getNeedBytes(searchText);
46+
System.arraycopy(Serialization.encodeVString(searchText), 0, bytes, idx, len);
47+
return bytes;
48+
}
49+
50+
public Object decode(ByteBuf buf) {
51+
// pType is read by ObKVParams
52+
this.searchText = Serialization.decodeVString(buf);
53+
return this;
54+
}
55+
56+
public long getPayloadContentSize() {
57+
return 1 /* pType*/ + Serialization.getNeedBytes(searchText);
58+
}
59+
60+
public String toString() {
61+
return "ObFtsParams: {\n pType = " + pType + ", \n searchText = " + searchText
62+
+ "\n}\n";
63+
}
64+
}

src/main/java/com/alipay/oceanbase/rpc/table/ObKVParams.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@ public ObKVParamsBase getObParams(ObKVParamsBase.paramType pType) {
3232
case HBase:
3333
return new ObHBaseParams();
3434
case Redis:
35+
throw new RuntimeException("Currently does not support redis type");
36+
case FTS:
37+
return new ObFTSParams();
3538
default:
3639
throw new RuntimeException("Currently does not support other types except HBase");
3740
}

0 commit comments

Comments
 (0)