Skip to content

Commit d19993c

Browse files
committed
Merge branch 'master' of https://github.com/apache/iotdb into mem-schema-ctrl
2 parents 8b75a8e + bec0409 commit d19993c

21 files changed

Lines changed: 349 additions & 40 deletions

File tree

example/session/src/main/java/org/apache/iotdb/SessionExample.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.iotdb.isession.template.Template;
2626
import org.apache.iotdb.isession.util.Version;
2727
import org.apache.iotdb.rpc.IoTDBConnectionException;
28+
import org.apache.iotdb.rpc.RedirectException;
2829
import org.apache.iotdb.rpc.StatementExecutionException;
2930
import org.apache.iotdb.rpc.TSStatusCode;
3031
import org.apache.iotdb.session.Session;
@@ -66,7 +67,7 @@ public class SessionExample {
6667
private static Random random = new Random();
6768

6869
public static void main(String[] args)
69-
throws IoTDBConnectionException, StatementExecutionException {
70+
throws IoTDBConnectionException, StatementExecutionException, RedirectException {
7071
session =
7172
new Session.Builder()
7273
.host(LOCAL_HOST)
@@ -119,6 +120,7 @@ public static void main(String[] args)
119120
sessionEnableRedirect.setFetchSize(10000);
120121

121122
fastLastDataQueryForOneDevice();
123+
fastLastDataQueryForOnePrefix();
122124
insertRecord4Redirect();
123125
query4Redirect();
124126
sessionEnableRedirect.close();
@@ -715,6 +717,20 @@ private static void fastLastDataQueryForOneDevice()
715717
}
716718
}
717719

720+
private static void fastLastDataQueryForOnePrefix()
721+
throws IoTDBConnectionException, StatementExecutionException, RedirectException {
722+
System.out.println("-------fastLastQueryForOnePrefix------");
723+
try (SessionDataSet sessionDataSet =
724+
sessionEnableRedirect.executeFastLastDataQueryForOnePrefixPath(
725+
Arrays.asList("root", "sg1"))) {
726+
System.out.println(sessionDataSet.getColumnNames());
727+
sessionDataSet.setFetchSize(1024);
728+
while (sessionDataSet.hasNext()) {
729+
System.out.println(sessionDataSet.next());
730+
}
731+
}
732+
}
733+
718734
private static void aggregationQuery()
719735
throws IoTDBConnectionException, StatementExecutionException {
720736
List<String> paths = new ArrayList<>();

iotdb-client/isession/src/main/java/org/apache/iotdb/isession/ISession.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.iotdb.isession.util.SystemStatus;
2626
import org.apache.iotdb.isession.util.Version;
2727
import org.apache.iotdb.rpc.IoTDBConnectionException;
28+
import org.apache.iotdb.rpc.RedirectException;
2829
import org.apache.iotdb.rpc.StatementExecutionException;
2930
import org.apache.iotdb.service.rpc.thrift.TSBackupConfigurationResp;
3031
import org.apache.iotdb.service.rpc.thrift.TSConnectionInfoResp;
@@ -185,6 +186,9 @@ SessionDataSet executeLastDataQuery(List<String> paths, long lastTime, long time
185186
SessionDataSet executeLastDataQuery(List<String> paths)
186187
throws StatementExecutionException, IoTDBConnectionException;
187188

189+
SessionDataSet executeFastLastDataQueryForOnePrefixPath(final List<String> prefixes)
190+
throws IoTDBConnectionException, StatementExecutionException, RedirectException;
191+
188192
SessionDataSet executeLastDataQueryForOneDevice(
189193
String db, String device, List<String> sensors, boolean isLegalPathNodes)
190194
throws StatementExecutionException, IoTDBConnectionException;

iotdb-client/isession/src/main/java/org/apache/iotdb/isession/pool/ISessionPool.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.iotdb.isession.util.SystemStatus;
2525
import org.apache.iotdb.isession.util.Version;
2626
import org.apache.iotdb.rpc.IoTDBConnectionException;
27+
import org.apache.iotdb.rpc.RedirectException;
2728
import org.apache.iotdb.rpc.StatementExecutionException;
2829
import org.apache.iotdb.service.rpc.thrift.TSBackupConfigurationResp;
2930
import org.apache.iotdb.service.rpc.thrift.TSConnectionInfoResp;
@@ -486,6 +487,9 @@ SessionDataSetWrapper executeLastDataQueryForOneDevice(
486487
String db, String device, List<String> sensors, boolean isLegalPathNodes)
487488
throws StatementExecutionException, IoTDBConnectionException;
488489

490+
SessionDataSetWrapper executeFastLastDataQueryForOnePrefixPath(final List<String> prefixes)
491+
throws IoTDBConnectionException, StatementExecutionException, RedirectException;
492+
489493
SessionDataSetWrapper executeAggregationQuery(
490494
List<String> paths, List<TAggregationType> aggregations)
491495
throws StatementExecutionException, IoTDBConnectionException;

iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1105,6 +1105,12 @@ public SessionDataSet executeLastDataQuery(List<String> paths)
11051105
return executeLastDataQuery(paths, time, queryTimeoutInMs);
11061106
}
11071107

1108+
@Override
1109+
public SessionDataSet executeFastLastDataQueryForOnePrefixPath(final List<String> prefixes)
1110+
throws IoTDBConnectionException, StatementExecutionException, RedirectException {
1111+
return defaultSessionConnection.executeLastDataQueryForOnePrefixPath(prefixes);
1112+
}
1113+
11081114
@Override
11091115
public SessionDataSet executeLastDataQueryForOneDevice(
11101116
String db, String device, List<String> sensors, boolean isLegalPathNodes)

iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementReq;
4747
import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
4848
import org.apache.iotdb.service.rpc.thrift.TSFastLastDataQueryForOneDeviceReq;
49+
import org.apache.iotdb.service.rpc.thrift.TSFastLastDataQueryForOnePrefixPathReq;
4950
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq;
5051
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsOfOneDeviceReq;
5152
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReq;
@@ -495,6 +496,46 @@ protected SessionDataSet executeRawDataQuery(
495496
execResp.getColumnIndex2TsBlockColumnIndexList());
496497
}
497498

499+
protected SessionDataSet executeLastDataQueryForOnePrefixPath(final List<String> prefixes)
500+
throws StatementExecutionException, IoTDBConnectionException, RedirectException {
501+
TSFastLastDataQueryForOnePrefixPathReq req =
502+
new TSFastLastDataQueryForOnePrefixPathReq(sessionId, prefixes, statementId);
503+
req.setFetchSize(session.fetchSize);
504+
req.setEnableRedirectQuery(enableRedirect);
505+
506+
RetryResult<TSExecuteStatementResp> result =
507+
callWithReconnect(
508+
() -> {
509+
req.setSessionId(sessionId);
510+
req.setStatementId(statementId);
511+
return client.executeFastLastDataQueryForOnePrefixPath(req);
512+
});
513+
final TSExecuteStatementResp tsExecuteStatementResp = result.getResult();
514+
515+
if (result.getRetryAttempts() == 0) {
516+
RpcUtils.verifySuccessWithRedirection(tsExecuteStatementResp.getStatus());
517+
} else {
518+
RpcUtils.verifySuccess(tsExecuteStatementResp.getStatus());
519+
}
520+
521+
return new SessionDataSet(
522+
"",
523+
tsExecuteStatementResp.getColumns(),
524+
tsExecuteStatementResp.getDataTypeList(),
525+
tsExecuteStatementResp.columnNameIndexMap,
526+
tsExecuteStatementResp.getQueryId(),
527+
statementId,
528+
client,
529+
sessionId,
530+
tsExecuteStatementResp.queryResult,
531+
tsExecuteStatementResp.isIgnoreTimeStamp(),
532+
tsExecuteStatementResp.moreData,
533+
zoneId,
534+
timeFactor,
535+
false,
536+
tsExecuteStatementResp.getColumnIndex2TsBlockColumnIndexList());
537+
}
538+
498539
protected Pair<SessionDataSet, TEndPoint> executeLastDataQueryForOneDevice(
499540
String db, String device, List<String> sensors, boolean isLegalPathNodes, long timeOut)
500541
throws StatementExecutionException, IoTDBConnectionException {

iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3214,6 +3214,33 @@ public SessionDataSetWrapper executeLastDataQuery(List<String> paths)
32143214
return null;
32153215
}
32163216

3217+
@Override
3218+
public SessionDataSetWrapper executeFastLastDataQueryForOnePrefixPath(final List<String> prefixes)
3219+
throws IoTDBConnectionException, StatementExecutionException {
3220+
for (int i = 0; i < RETRY; i++) {
3221+
ISession session = getSession();
3222+
try {
3223+
SessionDataSet resp = session.executeFastLastDataQueryForOnePrefixPath(prefixes);
3224+
SessionDataSetWrapper wrapper = new SessionDataSetWrapper(resp, session, this);
3225+
occupy(session);
3226+
return wrapper;
3227+
} catch (IoTDBConnectionException e) {
3228+
// TException means the connection is broken, remove it and get a new one.
3229+
LOGGER.warn("executeLastDataQuery failed", e);
3230+
cleanSessionAndMayThrowConnectionException(session, i, e);
3231+
} catch (StatementExecutionException | RuntimeException e) {
3232+
putBack(session);
3233+
throw e;
3234+
} catch (Throwable e) {
3235+
LOGGER.error(EXECUTE_LASTDATAQUERY_ERROR, e);
3236+
putBack(session);
3237+
throw new RuntimeException(e);
3238+
}
3239+
}
3240+
// never go here
3241+
return null;
3242+
}
3243+
32173244
@Override
32183245
public SessionDataSetWrapper executeLastDataQueryForOneDevice(
32193246
String db, String device, List<String> sensors, boolean isLegalPathNodes)

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ public class IoTDBConfig {
151151

152152
private long allocateMemoryForRead = Runtime.getRuntime().maxMemory() * 3 / 10;
153153

154-
private long allocateMemoryPerWalCache = 2 * 1024 * 1024;
154+
private long allocateMemoryPerWalCache = 512 * 1024;
155155

156156
/** Flush proportion for system */
157157
private double flushProportion = 0.4;

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/payload/builder/PipeConsensusTransferBatchReqBuilder.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,8 @@
4949
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_DELAY_MS_KEY;
5050
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_DELAY_SECONDS_KEY;
5151
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_SIZE_KEY;
52-
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_CONSENSUS_BATCH_SIZE_DEFAULT_VALUE;
5352
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PLAIN_BATCH_DELAY_DEFAULT_VALUE;
53+
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PLAIN_BATCH_SIZE_DEFAULT_VALUE;
5454
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_BATCH_DELAY_MS_KEY;
5555
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_BATCH_DELAY_SECONDS_KEY;
5656
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_BATCH_SIZE_KEY;
@@ -95,7 +95,7 @@ protected PipeConsensusTransferBatchReqBuilder(
9595
final long requestMaxBatchSizeInBytes =
9696
parameters.getLongOrDefault(
9797
Arrays.asList(CONNECTOR_IOTDB_BATCH_SIZE_KEY, SINK_IOTDB_BATCH_SIZE_KEY),
98-
CONNECTOR_IOTDB_CONSENSUS_BATCH_SIZE_DEFAULT_VALUE);
98+
CONNECTOR_IOTDB_PLAIN_BATCH_SIZE_DEFAULT_VALUE);
9999

100100
allocatedMemoryBlock =
101101
PipeDataNodeResourceManager.memory()

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,9 @@
8989
import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation;
9090
import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOptions;
9191
import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata;
92+
import org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TableDeviceLastCache;
93+
import org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TableDeviceSchemaCache;
94+
import org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TableId;
9295
import org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TreeDeviceSchemaCacheManager;
9396
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SetSqlDialect;
9497
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Use;
@@ -116,6 +119,8 @@
116119
import org.apache.iotdb.db.queryengine.plan.statement.metadata.template.UnsetSchemaTemplateStatement;
117120
import org.apache.iotdb.db.queryengine.plan.statement.metadata.view.CreateTableViewStatement;
118121
import org.apache.iotdb.db.queryengine.plan.statement.sys.SetSqlDialectStatement;
122+
import org.apache.iotdb.db.schemaengine.SchemaEngine;
123+
import org.apache.iotdb.db.schemaengine.schemaregion.ISchemaRegion;
119124
import org.apache.iotdb.db.schemaengine.template.TemplateQueryType;
120125
import org.apache.iotdb.db.storageengine.StorageEngine;
121126
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
@@ -151,6 +156,7 @@
151156
import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementReq;
152157
import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
153158
import org.apache.iotdb.service.rpc.thrift.TSFastLastDataQueryForOneDeviceReq;
159+
import org.apache.iotdb.service.rpc.thrift.TSFastLastDataQueryForOnePrefixPathReq;
154160
import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataReq;
155161
import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataResp;
156162
import org.apache.iotdb.service.rpc.thrift.TSFetchResultsReq;
@@ -187,6 +193,7 @@
187193
import org.apache.tsfile.block.column.Column;
188194
import org.apache.tsfile.common.conf.TSFileConfig;
189195
import org.apache.tsfile.common.conf.TSFileDescriptor;
196+
import org.apache.tsfile.common.constant.TsFileConstant;
190197
import org.apache.tsfile.enums.TSDataType;
191198
import org.apache.tsfile.file.metadata.IDeviceID;
192199
import org.apache.tsfile.file.metadata.IDeviceID.Factory;
@@ -926,6 +933,93 @@ public TSExecuteStatementResp executeLastDataQueryV2(TSLastDataQueryReq req) {
926933
return executeLastDataQueryInternal(req, SELECT_RESULT);
927934
}
928935

936+
@Override
937+
public TSExecuteStatementResp executeFastLastDataQueryForOnePrefixPath(
938+
final TSFastLastDataQueryForOnePrefixPathReq req) {
939+
final IClientSession clientSession = SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
940+
if (!SESSION_MANAGER.checkLogin(clientSession)) {
941+
return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
942+
}
943+
944+
try {
945+
final long queryId = SESSION_MANAGER.requestQueryId(clientSession, req.statementId);
946+
// 1. Map<Device, String[] measurements> ISchemaFetcher.getAllSensors(prefix) ~= 50ms
947+
948+
final PartialPath prefixPath = new PartialPath(req.getPrefixes().toArray(new String[0]));
949+
final Map<TableId, Map<IDeviceID, Map<String, Pair<TSDataType, TimeValuePair>>>> resultMap =
950+
new HashMap<>();
951+
int sensorNum = 0;
952+
953+
final String prefixString = prefixPath.toString();
954+
for (final ISchemaRegion region : SchemaEngine.getInstance().getAllSchemaRegions()) {
955+
if (!prefixString.startsWith(region.getDatabaseFullPath())
956+
&& !region.getDatabaseFullPath().startsWith(prefixString)) {
957+
continue;
958+
}
959+
sensorNum += region.fillLastQueryMap(prefixPath, resultMap);
960+
}
961+
962+
// 2.DATA_NODE_SCHEMA_CACHE.getLastCache()
963+
if (!TableDeviceSchemaCache.getInstance().getLastCache(resultMap)) {
964+
// 2.1 any sensor miss cache, construct last query sql, then return
965+
return executeLastDataQueryInternal(convert(req), SELECT_RESULT);
966+
}
967+
968+
// 2.2 all sensors hit cache, return response ~= 20ms
969+
final TsBlockBuilder builder = LastQueryUtil.createTsBlockBuilder(sensorNum);
970+
971+
for (final Map.Entry<TableId, Map<IDeviceID, Map<String, Pair<TSDataType, TimeValuePair>>>>
972+
result : resultMap.entrySet()) {
973+
for (final Map.Entry<IDeviceID, Map<String, Pair<TSDataType, TimeValuePair>>>
974+
device2MeasurementLastEntry : result.getValue().entrySet()) {
975+
final String deviceWithSeparator =
976+
device2MeasurementLastEntry.getKey().toString() + TsFileConstant.PATH_SEPARATOR;
977+
for (final Map.Entry<String, Pair<TSDataType, TimeValuePair>> measurementLastEntry :
978+
device2MeasurementLastEntry.getValue().entrySet()) {
979+
final TimeValuePair tvPair = measurementLastEntry.getValue().getRight();
980+
if (tvPair != TableDeviceLastCache.EMPTY_TIME_VALUE_PAIR) {
981+
LastQueryUtil.appendLastValue(
982+
builder,
983+
tvPair.getTimestamp(),
984+
deviceWithSeparator + measurementLastEntry.getKey(),
985+
tvPair.getValue().getStringValue(),
986+
measurementLastEntry.getValue().getLeft().name());
987+
}
988+
}
989+
}
990+
}
991+
992+
final TSExecuteStatementResp resp =
993+
createResponse(DatasetHeaderFactory.getLastQueryHeader(), queryId);
994+
resp.setStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, ""));
995+
if (builder.isEmpty()) {
996+
resp.setQueryResult(Collections.emptyList());
997+
} else {
998+
resp.setQueryResult(Collections.singletonList(serde.serialize(builder.build())));
999+
}
1000+
1001+
resp.setMoreData(false);
1002+
return resp;
1003+
} catch (final Exception e) {
1004+
return RpcUtils.getTSExecuteStatementResp(
1005+
onQueryException(e, "\"" + req + "\". " + OperationType.EXECUTE_LAST_DATA_QUERY));
1006+
}
1007+
}
1008+
1009+
private TSLastDataQueryReq convert(final TSFastLastDataQueryForOnePrefixPathReq req) {
1010+
TSLastDataQueryReq tsLastDataQueryReq =
1011+
new TSLastDataQueryReq(
1012+
req.sessionId,
1013+
Collections.singletonList(String.join(".", req.getPrefixes()) + ".**"),
1014+
Long.MIN_VALUE,
1015+
req.statementId);
1016+
tsLastDataQueryReq.setFetchSize(req.fetchSize);
1017+
tsLastDataQueryReq.setEnableRedirectQuery(req.enableRedirectQuery);
1018+
tsLastDataQueryReq.setLegalPathNodes(true);
1019+
tsLastDataQueryReq.setTimeout(req.timeout);
1020+
return tsLastDataQueryReq;
1021+
}
1022+
9291023
@Override
9301024
public TSExecuteStatementResp executeFastLastDataQueryForOneDeviceV2(
9311025
TSFastLastDataQueryForOneDeviceReq req) {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/IDualKeyCache.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121

2222
import javax.annotation.concurrent.GuardedBy;
2323

24+
import java.util.Map;
25+
import java.util.function.BiFunction;
2426
import java.util.function.Predicate;
2527
import java.util.function.ToIntFunction;
2628

@@ -37,6 +39,9 @@ public interface IDualKeyCache<FK, SK, V> {
3739
/** Get the cache value with given first key and second key. */
3840
V get(final FK firstKey, final SK secondKey);
3941

42+
<R> boolean batchApply(
43+
final Map<FK, Map<SK, R>> inputMap, final BiFunction<V, R, Boolean> mappingFunction);
44+
4045
/**
4146
* Update the existing value. The updater shall return the difference caused by the update,
4247
* because we do not want to call "valueSizeComputer" twice, which may include abundant useless

0 commit comments

Comments
 (0)