Skip to content

Commit b33725a

Browse files
committed
Support database-specific time partitions
Defer tree insert time slot calculation until database resolution. Refresh database time partition config before data partition fetch and compute query slots per database. Expose database time partition metadata consistently in information schema.
1 parent 3c6fb00 commit b33725a

15 files changed

Lines changed: 260 additions & 81 deletions

File tree

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/schema/ClusterSchemaInfo.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,8 @@ public TSStatus createDatabase(final DatabaseSchemaPlan plan) {
206206
.getDatabaseNodeByDatabasePath(partialPathName)
207207
.getAsMNode()
208208
.setDatabaseSchema(databaseSchema);
209-
TimePartitionUtils.updateDatabaseTimePartitionConfig(databaseSchema.getName(), databaseSchema);
209+
TimePartitionUtils.updateDatabaseTimePartitionConfig(
210+
databaseSchema.getName(), databaseSchema);
210211

211212
result.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode());
212213
} catch (final MetadataException e) {

iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/schema/ClusterSchemaInfoTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@
2222
import org.apache.iotdb.commons.exception.IllegalPathException;
2323
import org.apache.iotdb.commons.path.PartialPath;
2424
import org.apache.iotdb.commons.schema.template.Template;
25-
import org.apache.iotdb.commons.utils.TimePartitionUtils;
2625
import org.apache.iotdb.commons.utils.PathUtils;
26+
import org.apache.iotdb.commons.utils.TimePartitionUtils;
2727
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
2828
import org.apache.iotdb.confignode.consensus.request.read.database.GetDatabasePlan;
2929
import org.apache.iotdb.confignode.consensus.request.read.template.GetPathsSetTemplatePlan;

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -338,16 +338,16 @@ protected void constructLine() {
338338
} else {
339339
columnBuilders[1].writeBinary(
340340
new Binary(String.valueOf(currentDatabase.getTTL()), TSFileConfig.STRING_CHARSET));
341-
}
342-
columnBuilders[2].writeInt(currentDatabase.getSchemaReplicationFactor());
343-
columnBuilders[3].writeInt(currentDatabase.getDataReplicationFactor());
344-
columnBuilders[4].writeLong(currentDatabase.getTimePartitionOrigin());
345-
columnBuilders[5].writeLong(currentDatabase.getTimePartitionInterval());
346-
columnBuilders[6].writeInt(currentDatabase.getSchemaRegionNum());
347-
columnBuilders[7].writeInt(currentDatabase.getDataRegionNum());
348-
resultBuilder.declarePosition();
349-
currentDatabase = null;
350341
}
342+
columnBuilders[2].writeInt(currentDatabase.getSchemaReplicationFactor());
343+
columnBuilders[3].writeInt(currentDatabase.getDataReplicationFactor());
344+
columnBuilders[4].writeLong(currentDatabase.getTimePartitionOrigin());
345+
columnBuilders[5].writeLong(currentDatabase.getTimePartitionInterval());
346+
columnBuilders[6].writeInt(currentDatabase.getSchemaRegionNum());
347+
columnBuilders[7].writeInt(currentDatabase.getDataRegionNum());
348+
resultBuilder.declarePosition();
349+
currentDatabase = null;
350+
}
351351

352352
@Override
353353
public boolean hasNext() {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeUtils.java

Lines changed: 96 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement;
4949
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement;
5050
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement;
51+
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsOfOneDeviceStatement;
5152
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
5253
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
5354
import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache;
@@ -174,50 +175,120 @@ public static List<DataPartitionQueryParam> computeTableDataPartitionParams(
174175

175176
public static List<DataPartitionQueryParam> computeTreeDataPartitionParams(
176177
InsertBaseStatement statement, MPPQueryContext context) {
178+
final String database = getDatabaseName(statement, context);
177179
if (statement instanceof InsertTabletStatement) {
178180
DataPartitionQueryParam dataPartitionQueryParam =
179-
getTreeDataPartitionQueryParam((InsertTabletStatement) statement, context);
181+
getTreeDataPartitionQueryParam((InsertTabletStatement) statement, database);
180182
return Collections.singletonList(dataPartitionQueryParam);
183+
} else if (statement instanceof InsertRowStatement) {
184+
InsertRowStatement insertRowStatement = (InsertRowStatement) statement;
185+
if (database == null) {
186+
DataPartitionQueryParam dataPartitionQueryParam = new DataPartitionQueryParam();
187+
dataPartitionQueryParam.setDeviceID(
188+
insertRowStatement.getDevicePath().getIDeviceIDAsFullDevice());
189+
dataPartitionQueryParam.setTimeList(
190+
Collections.singletonList(insertRowStatement.getTime()));
191+
return Collections.singletonList(dataPartitionQueryParam);
192+
}
193+
return computeDataPartitionParams(
194+
Collections.singletonMap(
195+
insertRowStatement.getDevicePath().getIDeviceIDAsFullDevice(),
196+
Collections.singleton(insertRowStatement.getTimePartitionSlot(database))),
197+
database);
181198
} else if (statement instanceof InsertMultiTabletsStatement) {
182199
InsertMultiTabletsStatement insertMultiTabletsStatement =
183200
(InsertMultiTabletsStatement) statement;
201+
if (database == null) {
202+
Map<IDeviceID, List<Long>> dataPartitionQueryParamMap = new HashMap<>();
203+
for (InsertTabletStatement insertTabletStatement :
204+
insertMultiTabletsStatement.getInsertTabletStatementList()) {
205+
List<Long> timeList =
206+
dataPartitionQueryParamMap.computeIfAbsent(
207+
insertTabletStatement.getDevicePath().getIDeviceIDAsFullDevice(),
208+
k -> new ArrayList<>());
209+
for (int i = 0; i < insertTabletStatement.getRowCount(); i++) {
210+
timeList.add(insertTabletStatement.getTimes()[i]);
211+
}
212+
}
213+
return computeDataPartitionParamsByTime(dataPartitionQueryParamMap);
214+
}
184215
Map<IDeviceID, Set<TTimePartitionSlot>> dataPartitionQueryParamMap = new HashMap<>();
185216
for (InsertTabletStatement insertTabletStatement :
186217
insertMultiTabletsStatement.getInsertTabletStatementList()) {
187218
Set<TTimePartitionSlot> timePartitionSlotSet =
188219
dataPartitionQueryParamMap.computeIfAbsent(
189220
insertTabletStatement.getDevicePath().getIDeviceIDAsFullDevice(),
190221
k -> new HashSet<>());
191-
timePartitionSlotSet.addAll(
192-
insertTabletStatement.getTimePartitionSlots(getDatabaseName(statement, context)));
222+
timePartitionSlotSet.addAll(insertTabletStatement.getTimePartitionSlots(database));
193223
}
194-
return computeDataPartitionParams(
195-
dataPartitionQueryParamMap, getDatabaseName(statement, context));
224+
return computeDataPartitionParams(dataPartitionQueryParamMap, database);
225+
} else if (statement instanceof InsertRowsOfOneDeviceStatement) {
226+
final InsertRowsOfOneDeviceStatement insertRowsOfOneDeviceStatement =
227+
(InsertRowsOfOneDeviceStatement) statement;
228+
if (database == null) {
229+
Map<IDeviceID, List<Long>> dataPartitionQueryParamMap = new HashMap<>();
230+
List<Long> timeList =
231+
dataPartitionQueryParamMap.computeIfAbsent(
232+
insertRowsOfOneDeviceStatement.getDevicePath().getIDeviceIDAsFullDevice(),
233+
k -> new ArrayList<>());
234+
for (InsertRowStatement insertRowStatement :
235+
insertRowsOfOneDeviceStatement.getInsertRowStatementList()) {
236+
timeList.add(insertRowStatement.getTime());
237+
}
238+
return computeDataPartitionParamsByTime(dataPartitionQueryParamMap);
239+
}
240+
Map<IDeviceID, Set<TTimePartitionSlot>> dataPartitionQueryParamMap = new HashMap<>();
241+
Set<TTimePartitionSlot> timePartitionSlotSet =
242+
dataPartitionQueryParamMap.computeIfAbsent(
243+
insertRowsOfOneDeviceStatement.getDevicePath().getIDeviceIDAsFullDevice(),
244+
k -> new HashSet<>());
245+
for (InsertRowStatement insertRowStatement :
246+
insertRowsOfOneDeviceStatement.getInsertRowStatementList()) {
247+
timePartitionSlotSet.add(insertRowStatement.getTimePartitionSlot(database));
248+
}
249+
return computeDataPartitionParams(dataPartitionQueryParamMap, database);
196250
} else if (statement instanceof InsertRowsStatement) {
197251
final InsertRowsStatement insertRowsStatement = (InsertRowsStatement) statement;
252+
if (database == null) {
253+
Map<IDeviceID, List<Long>> dataPartitionQueryParamMap = new HashMap<>();
254+
for (InsertRowStatement insertRowStatement :
255+
insertRowsStatement.getInsertRowStatementList()) {
256+
dataPartitionQueryParamMap
257+
.computeIfAbsent(
258+
insertRowStatement.getDevicePath().getIDeviceIDAsFullDevice(),
259+
k -> new ArrayList<>())
260+
.add(insertRowStatement.getTime());
261+
}
262+
return computeDataPartitionParamsByTime(dataPartitionQueryParamMap);
263+
}
198264
Map<IDeviceID, Set<TTimePartitionSlot>> dataPartitionQueryParamMap = new HashMap<>();
199265
for (InsertRowStatement insertRowStatement :
200266
insertRowsStatement.getInsertRowStatementList()) {
201267
Set<TTimePartitionSlot> timePartitionSlotSet =
202268
dataPartitionQueryParamMap.computeIfAbsent(
203269
insertRowStatement.getDevicePath().getIDeviceIDAsFullDevice(),
204270
k -> new HashSet<>());
205-
timePartitionSlotSet.add(
206-
insertRowStatement.getTimePartitionSlot(getDatabaseName(statement, context)));
271+
timePartitionSlotSet.add(insertRowStatement.getTimePartitionSlot(database));
207272
}
208-
return computeDataPartitionParams(
209-
dataPartitionQueryParamMap, getDatabaseName(statement, context));
273+
return computeDataPartitionParams(dataPartitionQueryParamMap, database);
210274
}
211275
throw new UnsupportedOperationException("computeDataPartitionParams for " + statement);
212276
}
213277

214278
private static DataPartitionQueryParam getTreeDataPartitionQueryParam(
215-
InsertTabletStatement statement, MPPQueryContext context) {
279+
InsertTabletStatement statement, String database) {
216280
DataPartitionQueryParam dataPartitionQueryParam = new DataPartitionQueryParam();
217281
dataPartitionQueryParam.setDeviceID(statement.getDevicePath().getIDeviceIDAsFullDevice());
218-
dataPartitionQueryParam.setTimePartitionSlotList(
219-
statement.getTimePartitionSlots(getDatabaseName(statement, context)));
220-
dataPartitionQueryParam.setDatabaseName(getDatabaseName(statement, context));
282+
if (database == null) {
283+
List<Long> timeList = new ArrayList<>();
284+
for (int i = 0; i < statement.getRowCount(); i++) {
285+
timeList.add(statement.getTimes()[i]);
286+
}
287+
dataPartitionQueryParam.setTimeList(timeList);
288+
} else {
289+
dataPartitionQueryParam.setTimePartitionSlotList(statement.getTimePartitionSlots(database));
290+
dataPartitionQueryParam.setDatabaseName(database);
291+
}
221292
return dataPartitionQueryParam;
222293
}
223294

@@ -240,6 +311,18 @@ public static List<DataPartitionQueryParam> computeDataPartitionParams(
240311
return dataPartitionQueryParams;
241312
}
242313

314+
private static List<DataPartitionQueryParam> computeDataPartitionParamsByTime(
315+
Map<IDeviceID, List<Long>> dataPartitionQueryParamMap) {
316+
List<DataPartitionQueryParam> dataPartitionQueryParams = new ArrayList<>();
317+
for (Map.Entry<IDeviceID, List<Long>> entry : dataPartitionQueryParamMap.entrySet()) {
318+
DataPartitionQueryParam dataPartitionQueryParam = new DataPartitionQueryParam();
319+
dataPartitionQueryParam.setDeviceID(entry.getKey());
320+
dataPartitionQueryParam.setTimeList(entry.getValue());
321+
dataPartitionQueryParams.add(dataPartitionQueryParam);
322+
}
323+
return dataPartitionQueryParams;
324+
}
325+
243326
public static void validateSchema(
244327
IAnalysis analysis, InsertBaseStatement insertStatement, Runnable schemaValidation) {
245328
final long startTime = System.nanoTime();

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java

Lines changed: 29 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -2129,26 +2129,37 @@ private DataPartition fetchDataPartitionByDevices(
21292129
Set<IDeviceID> deviceSet, ISchemaTree schemaTree, MPPQueryContext context) {
21302130
long startTime = System.nanoTime();
21312131
try {
2132-
Pair<List<TTimePartitionSlot>, Pair<Boolean, Boolean>> res =
2133-
getTimePartitionSlotList(
2134-
context.getGlobalTimeFilter(), context, context.getDatabaseName().orElse(null));
2135-
// there is no satisfied time range
2136-
if (res.left.isEmpty() && Boolean.FALSE.equals(res.right.left)) {
2137-
return new DataPartition(
2138-
Collections.emptyMap(),
2139-
CONFIG.getSeriesPartitionExecutorClass(),
2140-
CONFIG.getSeriesPartitionSlotNum());
2141-
}
2142-
Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap = new HashMap<>();
2132+
Map<String, List<IDeviceID>> databaseToDevices = new HashMap<>();
21432133
for (IDeviceID deviceID : deviceSet) {
2144-
DataPartitionQueryParam queryParam =
2145-
new DataPartitionQueryParam(deviceID, res.left, res.right.left, res.right.right);
2146-
sgNameToQueryParamsMap
2134+
databaseToDevices
21472135
.computeIfAbsent(schemaTree.getBelongedDatabase(deviceID), key -> new ArrayList<>())
2148-
.add(queryParam);
2136+
.add(deviceID);
21492137
}
2138+
partitionFetcher.ensureDatabaseTimePartitionConfig(databaseToDevices.keySet());
21502139

2151-
if (res.right.left || res.right.right) {
2140+
boolean hasUnclosedTimeRange = false;
2141+
Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap = new HashMap<>();
2142+
for (Map.Entry<String, List<IDeviceID>> entry : databaseToDevices.entrySet()) {
2143+
String database = entry.getKey();
2144+
Pair<List<TTimePartitionSlot>, Pair<Boolean, Boolean>> res =
2145+
getTimePartitionSlotList(context.getGlobalTimeFilter(), context, database);
2146+
// there is no satisfied time range
2147+
if (res.left.isEmpty() && Boolean.FALSE.equals(res.right.left)) {
2148+
return new DataPartition(
2149+
Collections.emptyMap(),
2150+
CONFIG.getSeriesPartitionExecutorClass(),
2151+
CONFIG.getSeriesPartitionSlotNum());
2152+
}
2153+
hasUnclosedTimeRange |= res.right.left || res.right.right;
2154+
for (IDeviceID deviceID : entry.getValue()) {
2155+
sgNameToQueryParamsMap
2156+
.computeIfAbsent(database, key -> new ArrayList<>())
2157+
.add(
2158+
new DataPartitionQueryParam(deviceID, res.left, res.right.left, res.right.right));
2159+
}
2160+
}
2161+
2162+
if (hasUnclosedTimeRange) {
21522163
return partitionFetcher.getDataPartitionWithUnclosedTimeRange(sgNameToQueryParamsMap);
21532164
} else {
21542165
return partitionFetcher.getDataPartition(sgNameToQueryParamsMap);
@@ -2774,17 +2785,9 @@ public Analysis visitInsertRow(InsertRowStatement insertRowStatement, MPPQueryCo
27742785
analysis.setRealStatement(realInsertStatement);
27752786

27762787
if (realInsertStatement instanceof InsertRowStatement) {
2777-
InsertRowStatement realInsertRowStatement = (InsertRowStatement) realInsertStatement;
2778-
DataPartitionQueryParam dataPartitionQueryParam = new DataPartitionQueryParam();
2779-
dataPartitionQueryParam.setDeviceID(
2780-
realInsertRowStatement.getDevicePath().getIDeviceIDAsFullDevice());
2781-
dataPartitionQueryParam.setTimePartitionSlotList(
2782-
Collections.singletonList(
2783-
realInsertRowStatement.getTimePartitionSlot(context.getDatabaseName().orElse(null))));
2784-
27852788
AnalyzeUtils.analyzeDataPartition(
27862789
analysis,
2787-
Collections.singletonList(dataPartitionQueryParam),
2790+
AnalyzeUtils.computeTreeDataPartitionParams(realInsertStatement, context),
27882791
context.getSession().getUserName(),
27892792
partitionFetcher::getOrCreateDataPartition);
27902793
} else {
@@ -2865,15 +2868,9 @@ public Analysis visitInsertRowsOfOneDevice(
28652868
analysis.setRealStatement(realInsertStatement);
28662869

28672870
if (realInsertStatement instanceof InsertRowsOfOneDeviceStatement) {
2868-
InsertRowsOfOneDeviceStatement realStatement =
2869-
(InsertRowsOfOneDeviceStatement) realInsertStatement;
2870-
DataPartitionQueryParam dataPartitionQueryParam = new DataPartitionQueryParam();
2871-
dataPartitionQueryParam.setDeviceID(realStatement.getDevicePath().getIDeviceIDAsFullDevice());
2872-
dataPartitionQueryParam.setTimePartitionSlotList(realStatement.getTimePartitionSlots());
2873-
28742871
AnalyzeUtils.analyzeDataPartition(
28752872
analysis,
2876-
Collections.singletonList(dataPartitionQueryParam),
2873+
AnalyzeUtils.computeTreeDataPartitionParams(realInsertStatement, context),
28772874
context.getSession().getUserName(),
28782875
partitionFetcher::getOrCreateDataPartition);
28792876
} else {

0 commit comments

Comments
 (0)