Skip to content

Commit f007ce7

Browse files
committed
Fix database time partition config cache
1 parent b4078c7 commit f007ce7

10 files changed

Lines changed: 173 additions & 124 deletions

File tree

iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/CNPhysicalPlanGeneratorTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -335,7 +335,7 @@ public void databaseWithoutTemplateGeneratorTest() throws Exception {
335335
tDatabaseSchema.setTTL(i + 1);
336336
tDatabaseSchema.setDataReplicationFactor(i);
337337
tDatabaseSchema.setSchemaReplicationFactor(i);
338-
tDatabaseSchema.setTimePartitionInterval(i);
338+
tDatabaseSchema.setTimePartitionInterval(i + 1);
339339
DatabaseSchemaPlan databaseSchemaPlan =
340340
new DatabaseSchemaPlan(ConfigPhysicalPlanType.CreateDatabase, tDatabaseSchema);
341341
clusterSchemaInfo.createDatabase(databaseSchemaPlan);
@@ -453,7 +453,7 @@ public void templateAndDatabaseCompletedTest() throws Exception {
453453
tDatabaseSchema.setTTL(i + 1);
454454
tDatabaseSchema.setDataReplicationFactor(i);
455455
tDatabaseSchema.setSchemaReplicationFactor(i);
456-
tDatabaseSchema.setTimePartitionInterval(i);
456+
tDatabaseSchema.setTimePartitionInterval(i + 1);
457457
final DatabaseSchemaPlan databaseSchemaPlan =
458458
new DatabaseSchemaPlan(ConfigPhysicalPlanType.CreateDatabase, tDatabaseSchema);
459459
clusterSchemaInfo.createDatabase(databaseSchemaPlan);

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeUtils.java

Lines changed: 51 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -133,43 +133,43 @@ public static List<DataPartitionQueryParam> computeTableDataPartitionParams(
133133
String database = getDatabaseName(statement, context);
134134
if (statement instanceof InsertTabletStatement) {
135135
final InsertTabletStatement insertTabletStatement = (InsertTabletStatement) statement;
136-
final Map<IDeviceID, Set<TTimePartitionSlot>> timePartitionSlotMap = new HashMap<>();
136+
final Map<IDeviceID, List<Long>> dataPartitionQueryParamMap = new HashMap<>();
137137
for (int i = 0; i < insertTabletStatement.getRowCount(); i++) {
138-
timePartitionSlotMap
139-
.computeIfAbsent(insertTabletStatement.getTableDeviceID(i), id -> new HashSet<>())
140-
.add(insertTabletStatement.getTimePartitionSlot(i, database));
138+
dataPartitionQueryParamMap
139+
.computeIfAbsent(insertTabletStatement.getTableDeviceID(i), id -> new ArrayList<>())
140+
.add(insertTabletStatement.getTimes()[i]);
141141
}
142-
return computeDataPartitionParams(timePartitionSlotMap, database);
142+
return computeDataPartitionParamsByTime(dataPartitionQueryParamMap, database);
143143
} else if (statement instanceof InsertMultiTabletsStatement) {
144144
final InsertMultiTabletsStatement insertMultiTabletsStatement =
145145
(InsertMultiTabletsStatement) statement;
146-
final Map<IDeviceID, Set<TTimePartitionSlot>> timePartitionSlotMap = new HashMap<>();
146+
final Map<IDeviceID, List<Long>> dataPartitionQueryParamMap = new HashMap<>();
147147
for (final InsertTabletStatement insertTabletStatement :
148148
insertMultiTabletsStatement.getInsertTabletStatementList()) {
149149
for (int i = 0; i < insertTabletStatement.getRowCount(); i++) {
150-
timePartitionSlotMap
151-
.computeIfAbsent(insertTabletStatement.getTableDeviceID(i), id -> new HashSet<>())
152-
.add(insertTabletStatement.getTimePartitionSlot(i, database));
150+
dataPartitionQueryParamMap
151+
.computeIfAbsent(insertTabletStatement.getTableDeviceID(i), id -> new ArrayList<>())
152+
.add(insertTabletStatement.getTimes()[i]);
153153
}
154154
}
155-
return computeDataPartitionParams(timePartitionSlotMap, database);
155+
return computeDataPartitionParamsByTime(dataPartitionQueryParamMap, database);
156156
} else if (statement instanceof InsertRowStatement) {
157157
final InsertRowStatement insertRowStatement = (InsertRowStatement) statement;
158-
return computeDataPartitionParams(
158+
return computeDataPartitionParamsByTime(
159159
Collections.singletonMap(
160160
insertRowStatement.getTableDeviceID(),
161-
Collections.singleton(insertRowStatement.getTimePartitionSlot(database))),
161+
Collections.singletonList(insertRowStatement.getTime())),
162162
database);
163163
} else if (statement instanceof InsertRowsStatement) {
164164
final InsertRowsStatement insertRowsStatement = (InsertRowsStatement) statement;
165-
final Map<IDeviceID, Set<TTimePartitionSlot>> timePartitionSlotMap = new HashMap<>();
165+
final Map<IDeviceID, List<Long>> dataPartitionQueryParamMap = new HashMap<>();
166166
for (final InsertRowStatement insertRowStatement :
167167
insertRowsStatement.getInsertRowStatementList()) {
168-
timePartitionSlotMap
169-
.computeIfAbsent(insertRowStatement.getTableDeviceID(), id -> new HashSet<>())
170-
.add(insertRowStatement.getTimePartitionSlot(database));
168+
dataPartitionQueryParamMap
169+
.computeIfAbsent(insertRowStatement.getTableDeviceID(), id -> new ArrayList<>())
170+
.add(insertRowStatement.getTime());
171171
}
172-
return computeDataPartitionParams(timePartitionSlotMap, database);
172+
return computeDataPartitionParamsByTime(dataPartitionQueryParamMap, database);
173173
}
174174
throw new UnsupportedOperationException(
175175
DataNodeQueryMessages.COMPUTEDATAPARTITIONPARAMS_FOR + statement);
@@ -184,95 +184,51 @@ public static List<DataPartitionQueryParam> computeTreeDataPartitionParams(
184184
return Collections.singletonList(dataPartitionQueryParam);
185185
} else if (statement instanceof InsertRowStatement) {
186186
InsertRowStatement insertRowStatement = (InsertRowStatement) statement;
187-
if (database == null) {
188-
DataPartitionQueryParam dataPartitionQueryParam = new DataPartitionQueryParam();
189-
dataPartitionQueryParam.setDeviceID(
190-
insertRowStatement.getDevicePath().getIDeviceIDAsFullDevice());
191-
dataPartitionQueryParam.setTimeList(
192-
Collections.singletonList(insertRowStatement.getTime()));
193-
return Collections.singletonList(dataPartitionQueryParam);
194-
}
195-
return computeDataPartitionParams(
187+
return computeDataPartitionParamsByTime(
196188
Collections.singletonMap(
197189
insertRowStatement.getDevicePath().getIDeviceIDAsFullDevice(),
198-
Collections.singleton(insertRowStatement.getTimePartitionSlot(database))),
190+
Collections.singletonList(insertRowStatement.getTime())),
199191
database);
200192
} else if (statement instanceof InsertMultiTabletsStatement) {
201193
InsertMultiTabletsStatement insertMultiTabletsStatement =
202194
(InsertMultiTabletsStatement) statement;
203-
if (database == null) {
204-
Map<IDeviceID, List<Long>> dataPartitionQueryParamMap = new HashMap<>();
205-
for (InsertTabletStatement insertTabletStatement :
206-
insertMultiTabletsStatement.getInsertTabletStatementList()) {
207-
List<Long> timeList =
208-
dataPartitionQueryParamMap.computeIfAbsent(
209-
insertTabletStatement.getDevicePath().getIDeviceIDAsFullDevice(),
210-
k -> new ArrayList<>());
211-
for (int i = 0; i < insertTabletStatement.getRowCount(); i++) {
212-
timeList.add(insertTabletStatement.getTimes()[i]);
213-
}
214-
}
215-
return computeDataPartitionParamsByTime(dataPartitionQueryParamMap);
216-
}
217-
Map<IDeviceID, Set<TTimePartitionSlot>> dataPartitionQueryParamMap = new HashMap<>();
195+
Map<IDeviceID, List<Long>> dataPartitionQueryParamMap = new HashMap<>();
218196
for (InsertTabletStatement insertTabletStatement :
219197
insertMultiTabletsStatement.getInsertTabletStatementList()) {
220-
Set<TTimePartitionSlot> timePartitionSlotSet =
198+
List<Long> timeList =
221199
dataPartitionQueryParamMap.computeIfAbsent(
222200
insertTabletStatement.getDevicePath().getIDeviceIDAsFullDevice(),
223-
k -> new HashSet<>());
224-
timePartitionSlotSet.addAll(insertTabletStatement.getTimePartitionSlots(database));
201+
k -> new ArrayList<>());
202+
for (int i = 0; i < insertTabletStatement.getRowCount(); i++) {
203+
timeList.add(insertTabletStatement.getTimes()[i]);
204+
}
225205
}
226-
return computeDataPartitionParams(dataPartitionQueryParamMap, database);
206+
return computeDataPartitionParamsByTime(dataPartitionQueryParamMap, database);
227207
} else if (statement instanceof InsertRowsOfOneDeviceStatement) {
228208
final InsertRowsOfOneDeviceStatement insertRowsOfOneDeviceStatement =
229209
(InsertRowsOfOneDeviceStatement) statement;
230-
if (database == null) {
231-
Map<IDeviceID, List<Long>> dataPartitionQueryParamMap = new HashMap<>();
232-
List<Long> timeList =
233-
dataPartitionQueryParamMap.computeIfAbsent(
234-
insertRowsOfOneDeviceStatement.getDevicePath().getIDeviceIDAsFullDevice(),
235-
k -> new ArrayList<>());
236-
for (InsertRowStatement insertRowStatement :
237-
insertRowsOfOneDeviceStatement.getInsertRowStatementList()) {
238-
timeList.add(insertRowStatement.getTime());
239-
}
240-
return computeDataPartitionParamsByTime(dataPartitionQueryParamMap);
241-
}
242-
Map<IDeviceID, Set<TTimePartitionSlot>> dataPartitionQueryParamMap = new HashMap<>();
243-
Set<TTimePartitionSlot> timePartitionSlotSet =
210+
Map<IDeviceID, List<Long>> dataPartitionQueryParamMap = new HashMap<>();
211+
List<Long> timeList =
244212
dataPartitionQueryParamMap.computeIfAbsent(
245213
insertRowsOfOneDeviceStatement.getDevicePath().getIDeviceIDAsFullDevice(),
246-
k -> new HashSet<>());
214+
k -> new ArrayList<>());
247215
for (InsertRowStatement insertRowStatement :
248216
insertRowsOfOneDeviceStatement.getInsertRowStatementList()) {
249-
timePartitionSlotSet.add(insertRowStatement.getTimePartitionSlot(database));
217+
timeList.add(insertRowStatement.getTime());
250218
}
251-
return computeDataPartitionParams(dataPartitionQueryParamMap, database);
219+
return computeDataPartitionParamsByTime(dataPartitionQueryParamMap, database);
252220
} else if (statement instanceof InsertRowsStatement) {
253221
final InsertRowsStatement insertRowsStatement = (InsertRowsStatement) statement;
254-
if (database == null) {
255-
Map<IDeviceID, List<Long>> dataPartitionQueryParamMap = new HashMap<>();
256-
for (InsertRowStatement insertRowStatement :
257-
insertRowsStatement.getInsertRowStatementList()) {
258-
dataPartitionQueryParamMap
259-
.computeIfAbsent(
260-
insertRowStatement.getDevicePath().getIDeviceIDAsFullDevice(),
261-
k -> new ArrayList<>())
262-
.add(insertRowStatement.getTime());
263-
}
264-
return computeDataPartitionParamsByTime(dataPartitionQueryParamMap);
265-
}
266-
Map<IDeviceID, Set<TTimePartitionSlot>> dataPartitionQueryParamMap = new HashMap<>();
222+
Map<IDeviceID, List<Long>> dataPartitionQueryParamMap = new HashMap<>();
267223
for (InsertRowStatement insertRowStatement :
268224
insertRowsStatement.getInsertRowStatementList()) {
269-
Set<TTimePartitionSlot> timePartitionSlotSet =
270-
dataPartitionQueryParamMap.computeIfAbsent(
225+
dataPartitionQueryParamMap
226+
.computeIfAbsent(
271227
insertRowStatement.getDevicePath().getIDeviceIDAsFullDevice(),
272-
k -> new HashSet<>());
273-
timePartitionSlotSet.add(insertRowStatement.getTimePartitionSlot(database));
228+
k -> new ArrayList<>())
229+
.add(insertRowStatement.getTime());
274230
}
275-
return computeDataPartitionParams(dataPartitionQueryParamMap, database);
231+
return computeDataPartitionParamsByTime(dataPartitionQueryParamMap, database);
276232
}
277233
throw new UnsupportedOperationException(
278234
DataNodeQueryMessages.COMPUTEDATAPARTITIONPARAMS_FOR + statement);
@@ -282,14 +238,12 @@ private static DataPartitionQueryParam getTreeDataPartitionQueryParam(
282238
InsertTabletStatement statement, String database) {
283239
DataPartitionQueryParam dataPartitionQueryParam = new DataPartitionQueryParam();
284240
dataPartitionQueryParam.setDeviceID(statement.getDevicePath().getIDeviceIDAsFullDevice());
285-
if (database == null) {
286-
List<Long> timeList = new ArrayList<>();
287-
for (int i = 0; i < statement.getRowCount(); i++) {
288-
timeList.add(statement.getTimes()[i]);
289-
}
290-
dataPartitionQueryParam.setTimeList(timeList);
291-
} else {
292-
dataPartitionQueryParam.setTimePartitionSlotList(statement.getTimePartitionSlots(database));
241+
List<Long> timeList = new ArrayList<>();
242+
for (int i = 0; i < statement.getRowCount(); i++) {
243+
timeList.add(statement.getTimes()[i]);
244+
}
245+
dataPartitionQueryParam.setTimeList(timeList);
246+
if (database != null) {
293247
dataPartitionQueryParam.setDatabaseName(database);
294248
}
295249
return dataPartitionQueryParam;
@@ -316,11 +270,19 @@ public static List<DataPartitionQueryParam> computeDataPartitionParams(
316270

317271
private static List<DataPartitionQueryParam> computeDataPartitionParamsByTime(
318272
Map<IDeviceID, List<Long>> dataPartitionQueryParamMap) {
273+
return computeDataPartitionParamsByTime(dataPartitionQueryParamMap, null);
274+
}
275+
276+
private static List<DataPartitionQueryParam> computeDataPartitionParamsByTime(
277+
Map<IDeviceID, List<Long>> dataPartitionQueryParamMap, String databaseName) {
319278
List<DataPartitionQueryParam> dataPartitionQueryParams = new ArrayList<>();
320279
for (Map.Entry<IDeviceID, List<Long>> entry : dataPartitionQueryParamMap.entrySet()) {
321280
DataPartitionQueryParam dataPartitionQueryParam = new DataPartitionQueryParam();
322281
dataPartitionQueryParam.setDeviceID(entry.getKey());
323282
dataPartitionQueryParam.setTimeList(entry.getValue());
283+
if (databaseName != null) {
284+
dataPartitionQueryParam.setDatabaseName(databaseName);
285+
}
324286
dataPartitionQueryParams.add(dataPartitionQueryParam);
325287
}
326288
return dataPartitionQueryParams;

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor;
3535
import org.apache.iotdb.commons.path.PathPatternTree;
3636
import org.apache.iotdb.commons.schema.SchemaConstant;
37+
import org.apache.iotdb.commons.utils.PathUtils;
3738
import org.apache.iotdb.commons.utils.TimePartitionUtils;
3839
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
3940
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
@@ -335,23 +336,36 @@ public void ensureDatabaseTimePartitionConfig(Set<String> databases) {
335336
if (missingDatabases.isEmpty()) {
336337
return;
337338
}
339+
final boolean needFetchTableModelDatabase =
340+
missingDatabases.stream().anyMatch(PathUtils::isTableModelDatabase);
341+
final boolean needFetchTreeModelDatabase =
342+
missingDatabases.stream().anyMatch(database -> !PathUtils.isTableModelDatabase(database));
338343
try (final ConfigNodeClient client =
339344
configNodeClientManager.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
340-
final TDatabaseSchemaResp databaseSchemaResp =
341-
client.getMatchedDatabaseSchemas(
342-
new TGetDatabaseReq(
343-
Arrays.asList("root", "**"), SchemaConstant.ALL_MATCH_SCOPE_BINARY)
344-
.setIsTableModel(false)
345-
.setCanSeeAuditDB(true));
346-
if (databaseSchemaResp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
347-
partitionCache.updateDatabaseCache(databaseSchemaResp.getDatabaseSchemaMap());
345+
if (needFetchTreeModelDatabase) {
346+
updateDatabaseCache(client, false);
347+
}
348+
if (needFetchTableModelDatabase) {
349+
updateDatabaseCache(client, true);
348350
}
349351
} catch (final ClientManagerException | TException e) {
350352
throw new StatementAnalyzeException(
351353
"An error occurred when fetching database time partition config:" + e.getMessage());
352354
}
353355
}
354356

357+
private void updateDatabaseCache(final ConfigNodeClient client, final boolean isTableModel)
358+
throws TException {
359+
final TDatabaseSchemaResp databaseSchemaResp =
360+
client.getMatchedDatabaseSchemas(
361+
new TGetDatabaseReq(Arrays.asList("root", "**"), SchemaConstant.ALL_MATCH_SCOPE_BINARY)
362+
.setIsTableModel(isTableModel)
363+
.setCanSeeAuditDB(true));
364+
if (databaseSchemaResp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
365+
partitionCache.updateDatabaseCache(databaseSchemaResp.getDatabaseSchemaMap());
366+
}
367+
}
368+
355369
@Override
356370
public SchemaPartition getOrCreateSchemaPartition(
357371
final String database, final List<IDeviceID> deviceIDs, final String userName) {
@@ -436,6 +450,10 @@ private Map<String, List<DataPartitionQueryParam>> splitDataPartitionQueryParam(
436450
database = dataPartitionQueryParam.getDatabaseName();
437451
}
438452
if (database != null) {
453+
if (PathUtils.isTableModelDatabase(database)) {
454+
partitionCache.checkAndAutoCreateDatabase(database, isAutoCreate, userName);
455+
}
456+
ensureDatabaseTimePartitionConfig(Collections.singleton(database));
439457
final String finalDatabase = database;
440458
if (dataPartitionQueryParam.hasTimeList()) {
441459
dataPartitionQueryParam.setTimePartitionSlotList(

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/PartitionCache.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -366,8 +366,9 @@ private void createDatabaseAndUpdateCache(
366366
throw new IoTDBRuntimeException(tsStatus.message, tsStatus.code);
367367
}
368368
}
369-
// Try to update database cache when all databases have already been created
370-
if (needRefreshCacheFromConfigNode) {
369+
// Fetch the completed schema from ConfigNode after auto-creation. The locally constructed
370+
// schema may miss default time partition settings that ConfigNode fills in.
371+
if (needRefreshCacheFromConfigNode || !successFullyCreatedDatabase.isEmpty()) {
371372
fetchDatabaseAndUpdateCache(client, false);
372373
} else {
373374
updateDatabaseCache(successFullyCreatedDatabaseSchema);
@@ -404,8 +405,9 @@ private void createDatabaseAndUpdateCache(final String database, final String us
404405
databaseSchema.setIsTableModel(true);
405406
final TSStatus tsStatus = client.setDatabase(databaseSchema);
406407
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() == tsStatus.getCode()) {
407-
// Try to update cache by databases successfully created
408-
updateDatabaseCache(Collections.singletonMap(database, databaseSchema));
408+
// Fetch the completed schema from ConfigNode after auto-creation. The locally constructed
409+
// schema may miss default time partition settings that ConfigNode fills in.
410+
fetchDatabaseAndUpdateCache(client, true);
409411
} else if (TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode() == tsStatus.getCode()) {
410412
fetchDatabaseAndUpdateCache(client, true);
411413
} else {

0 commit comments

Comments
 (0)