Skip to content

Commit c233196

Browse files
[To dev/1.3] Implemented fast last query for on prefix path (#15810)
* Fast last query on local * Remove debug settings * Remove debug settings 2 * [TIMECHODB]rest service add FastLastQuery method * [TIMECHODB]Fixed multiple device return value format issues with REST service * Fix * [TIMECHODB]Fixed the format issue of return values for the fast and query interfaces of the rest service * Remove binary * wildcard detection * Update RestApiServiceImpl.java * filter * fix * fix * fix-2 * licene --------- Co-authored-by: luke.miao <282583553@qq.com>
1 parent dc870f0 commit c233196

20 files changed

Lines changed: 514 additions & 26 deletions

File tree

iotdb-client/isession/src/main/java/org/apache/iotdb/isession/ISession.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.iotdb.isession.util.SystemStatus;
2626
import org.apache.iotdb.isession.util.Version;
2727
import org.apache.iotdb.rpc.IoTDBConnectionException;
28+
import org.apache.iotdb.rpc.RedirectException;
2829
import org.apache.iotdb.rpc.StatementExecutionException;
2930
import org.apache.iotdb.service.rpc.thrift.TSBackupConfigurationResp;
3031
import org.apache.iotdb.service.rpc.thrift.TSConnectionInfoResp;
@@ -176,6 +177,9 @@ SessionDataSet executeLastDataQuery(List<String> paths, long lastTime, long time
176177
SessionDataSet executeLastDataQuery(List<String> paths)
177178
throws StatementExecutionException, IoTDBConnectionException;
178179

180+
SessionDataSet executeFastLastDataQueryForOnePrefixPath(final List<String> prefixes)
181+
throws IoTDBConnectionException, StatementExecutionException, RedirectException;
182+
179183
SessionDataSet executeLastDataQueryForOneDevice(
180184
String db, String device, List<String> sensors, boolean isLegalPathNodes)
181185
throws StatementExecutionException, IoTDBConnectionException;

iotdb-client/isession/src/main/java/org/apache/iotdb/isession/pool/ISessionPool.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.iotdb.isession.util.SystemStatus;
2525
import org.apache.iotdb.isession.util.Version;
2626
import org.apache.iotdb.rpc.IoTDBConnectionException;
27+
import org.apache.iotdb.rpc.RedirectException;
2728
import org.apache.iotdb.rpc.StatementExecutionException;
2829
import org.apache.iotdb.service.rpc.thrift.TSBackupConfigurationResp;
2930
import org.apache.iotdb.service.rpc.thrift.TSConnectionInfoResp;
@@ -486,6 +487,9 @@ SessionDataSetWrapper executeLastDataQueryForOneDevice(
486487
String db, String device, List<String> sensors, boolean isLegalPathNodes)
487488
throws StatementExecutionException, IoTDBConnectionException;
488489

490+
SessionDataSetWrapper executeFastLastDataQueryForOnePrefixPath(final List<String> prefixes)
491+
throws IoTDBConnectionException, StatementExecutionException, RedirectException;
492+
489493
SessionDataSetWrapper executeAggregationQuery(
490494
List<String> paths, List<TAggregationType> aggregations)
491495
throws StatementExecutionException, IoTDBConnectionException;

iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1046,6 +1046,12 @@ public SessionDataSet executeLastDataQuery(List<String> paths)
10461046
return executeLastDataQuery(paths, time, queryTimeoutInMs);
10471047
}
10481048

1049+
@Override
1050+
public SessionDataSet executeFastLastDataQueryForOnePrefixPath(final List<String> prefixes)
1051+
throws IoTDBConnectionException, StatementExecutionException, RedirectException {
1052+
return defaultSessionConnection.executeLastDataQueryForOnePrefixPath(prefixes);
1053+
}
1054+
10491055
@Override
10501056
public SessionDataSet executeLastDataQueryForOneDevice(
10511057
String db, String device, List<String> sensors, boolean isLegalPathNodes)

iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementReq;
4646
import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
4747
import org.apache.iotdb.service.rpc.thrift.TSFastLastDataQueryForOneDeviceReq;
48+
import org.apache.iotdb.service.rpc.thrift.TSFastLastDataQueryForOnePrefixPathReq;
4849
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq;
4950
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsOfOneDeviceReq;
5051
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReq;
@@ -442,6 +443,43 @@ protected SessionDataSet executeRawDataQuery(
442443
zoneId);
443444
}
444445

446+
protected SessionDataSet executeLastDataQueryForOnePrefixPath(final List<String> prefixes)
447+
throws StatementExecutionException, IoTDBConnectionException, RedirectException {
448+
TSFastLastDataQueryForOnePrefixPathReq req =
449+
new TSFastLastDataQueryForOnePrefixPathReq(sessionId, prefixes, statementId);
450+
req.setFetchSize(session.fetchSize);
451+
req.setEnableRedirectQuery(enableRedirect);
452+
453+
RetryResult<TSExecuteStatementResp> result =
454+
callWithReconnect(
455+
() -> {
456+
req.setSessionId(sessionId);
457+
req.setStatementId(statementId);
458+
return client.executeFastLastDataQueryForOnePrefixPath(req);
459+
});
460+
final TSExecuteStatementResp tsExecuteStatementResp = result.getResult();
461+
462+
if (result.getRetryAttempts() == 0) {
463+
RpcUtils.verifySuccessWithRedirection(tsExecuteStatementResp.getStatus());
464+
} else {
465+
RpcUtils.verifySuccess(tsExecuteStatementResp.getStatus());
466+
}
467+
468+
return new SessionDataSet(
469+
"",
470+
tsExecuteStatementResp.getColumns(),
471+
tsExecuteStatementResp.getDataTypeList(),
472+
tsExecuteStatementResp.columnNameIndexMap,
473+
tsExecuteStatementResp.getQueryId(),
474+
statementId,
475+
client,
476+
sessionId,
477+
tsExecuteStatementResp.queryResult,
478+
tsExecuteStatementResp.isIgnoreTimeStamp(),
479+
tsExecuteStatementResp.moreData,
480+
zoneId);
481+
}
482+
445483
protected Pair<SessionDataSet, TEndPoint> executeLastDataQueryForOneDevice(
446484
String db, String device, List<String> sensors, boolean isLegalPathNodes, long timeOut)
447485
throws StatementExecutionException, IoTDBConnectionException {

iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3167,6 +3167,33 @@ public SessionDataSetWrapper executeLastDataQuery(List<String> paths)
31673167
return null;
31683168
}
31693169

3170+
@Override
3171+
public SessionDataSetWrapper executeFastLastDataQueryForOnePrefixPath(final List<String> prefixes)
3172+
throws IoTDBConnectionException, StatementExecutionException {
3173+
for (int i = 0; i < RETRY; i++) {
3174+
ISession session = getSession();
3175+
try {
3176+
SessionDataSet resp = session.executeFastLastDataQueryForOnePrefixPath(prefixes);
3177+
SessionDataSetWrapper wrapper = new SessionDataSetWrapper(resp, session, this);
3178+
occupy(session);
3179+
return wrapper;
3180+
} catch (IoTDBConnectionException e) {
3181+
// TException means the connection is broken, remove it and get a new one.
3182+
LOGGER.warn("executeLastDataQuery failed", e);
3183+
cleanSessionAndMayThrowConnectionException(session, i, e);
3184+
} catch (StatementExecutionException | RuntimeException e) {
3185+
putBack(session);
3186+
throw e;
3187+
} catch (Throwable e) {
3188+
LOGGER.error(EXECUTE_LASTDATAQUERY_ERROR, e);
3189+
putBack(session);
3190+
throw new RuntimeException(e);
3191+
}
3192+
}
3193+
// never go here
3194+
return null;
3195+
}
3196+
31703197
@Override
31713198
public SessionDataSetWrapper executeLastDataQueryForOneDevice(
31723199
String db, String device, List<String> sensors, boolean isLegalPathNodes)
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.db.protocol.rest.v2.handler;
21+
22+
import org.apache.iotdb.common.rpc.thrift.TSStatus;
23+
import org.apache.iotdb.db.protocol.rest.v2.model.ExecutionStatus;
24+
import org.apache.iotdb.db.protocol.rest.v2.model.PrefixPathList;
25+
import org.apache.iotdb.db.protocol.session.IClientSession;
26+
import org.apache.iotdb.rpc.TSStatusCode;
27+
import org.apache.iotdb.service.rpc.thrift.TSLastDataQueryReq;
28+
29+
import javax.ws.rs.core.Response;
30+
31+
import java.util.ArrayList;
32+
import java.util.Collections;
33+
34+
public class FastLastHandler {
35+
36+
public static TSLastDataQueryReq createTSLastDataQueryReq(
37+
IClientSession clientSession, PrefixPathList prefixPathList) {
38+
TSLastDataQueryReq req = new TSLastDataQueryReq();
39+
req.sessionId = clientSession.getId();
40+
req.paths =
41+
Collections.singletonList(String.join(".", prefixPathList.getPrefixPaths()) + ".**");
42+
req.time = Long.MIN_VALUE;
43+
req.setLegalPathNodes(true);
44+
return req;
45+
}
46+
47+
public static Response buildErrorResponse(TSStatusCode statusCode) {
48+
return Response.ok()
49+
.entity(
50+
new org.apache.iotdb.db.protocol.rest.model.ExecutionStatus()
51+
.code(statusCode.getStatusCode())
52+
.message(statusCode.name()))
53+
.build();
54+
}
55+
56+
public static Response buildExecutionStatusResponse(TSStatus status) {
57+
return Response.ok()
58+
.entity(new ExecutionStatus().code(status.getCode()).message(status.getMessage()))
59+
.build();
60+
}
61+
62+
public static void setupTargetDataSet(
63+
org.apache.iotdb.db.protocol.rest.v2.model.QueryDataSet dataSet) {
64+
dataSet.addExpressionsItem("Timeseries");
65+
dataSet.addExpressionsItem("Value");
66+
dataSet.addExpressionsItem("DataType");
67+
dataSet.addDataTypesItem("TEXT");
68+
dataSet.addDataTypesItem("TEXT");
69+
dataSet.addDataTypesItem("TEXT");
70+
dataSet.setValues(new ArrayList<>());
71+
dataSet.setTimestamps(new ArrayList<>());
72+
}
73+
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/handler/RequestValidationHandler.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.apache.iotdb.db.protocol.rest.v2.model.ExpressionRequest;
2121
import org.apache.iotdb.db.protocol.rest.v2.model.InsertRecordsRequest;
2222
import org.apache.iotdb.db.protocol.rest.v2.model.InsertTabletRequest;
23+
import org.apache.iotdb.db.protocol.rest.v2.model.PrefixPathList;
2324
import org.apache.iotdb.db.protocol.rest.v2.model.SQL;
2425

2526
import org.apache.commons.lang3.Validate;
@@ -40,6 +41,13 @@ public static void validateSQL(SQL sql) {
4041
}
4142
}
4243

44+
public static void validatePrefixPaths(PrefixPathList prefixPathList) {
45+
Objects.requireNonNull(prefixPathList.getPrefixPaths(), "prefix_paths should not be null");
46+
if (prefixPathList.getPrefixPaths().isEmpty()) {
47+
throw new IllegalArgumentException("prefix_paths should not be empty");
48+
}
49+
}
50+
4351
public static void validateInsertTabletRequest(InsertTabletRequest insertTabletRequest) {
4452
Objects.requireNonNull(insertTabletRequest.getTimestamps(), "timestamps should not be null");
4553
Objects.requireNonNull(insertTabletRequest.getIsAligned(), "is_aligned should not be null");

0 commit comments

Comments
 (0)