Skip to content

Commit b4078c7

Browse files
committed
Fix database-level time partition precision for writes
Convert database time partition config from milliseconds to the current timestamp precision before caching it, and avoid double conversion during partition table cleanup. Also resolve tree-model write splitting with null session database by deriving the database from the fetched data partition, so slot calculation and region lookup stay consistent.
1 parent 05ea17d commit b4078c7

9 files changed

Lines changed: 63 additions & 29 deletions

File tree

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.apache.iotdb.db.exception.DataTypeInconsistentException;
3333
import org.apache.iotdb.db.i18n.DataNodeQueryMessages;
3434
import org.apache.iotdb.db.pipe.resource.memory.InsertNodeMemoryEstimator;
35+
import org.apache.iotdb.db.queryengine.plan.analyze.IAnalysis;
3536
import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeDevicePathCache;
3637
import org.apache.iotdb.db.storageengine.dataregion.memtable.AbstractMemTable;
3738
import org.apache.iotdb.db.storageengine.dataregion.memtable.DeviceIDFactory;
@@ -228,6 +229,13 @@ public IDeviceID getDeviceID() {
228229
return deviceID;
229230
}
230231

232+
protected String getDatabaseName(final IAnalysis analysis, final IDeviceID deviceID) {
233+
final String databaseName = analysis.getDatabaseName();
234+
return databaseName != null
235+
? databaseName
236+
: analysis.getDataPartitionInfo().getDatabaseNameByDevice(deviceID);
237+
}
238+
231239
public void setDeviceID(IDeviceID deviceID) {
232240
this.deviceID = deviceID;
233241
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343

4444
import org.apache.tsfile.enums.TSDataType;
4545
import org.apache.tsfile.exception.NotImplementedException;
46+
import org.apache.tsfile.file.metadata.IDeviceID;
4647
import org.apache.tsfile.read.TimeValuePair;
4748
import org.apache.tsfile.utils.Binary;
4849
import org.apache.tsfile.utils.ReadWriteIOUtils;
@@ -127,13 +128,14 @@ public InsertRowNode(
127128

128129
@Override
129130
public List<WritePlanNode> splitByPartition(IAnalysis analysis) {
131+
final IDeviceID deviceID = getDeviceID();
132+
final String databaseName = getDatabaseName(analysis, deviceID);
130133
TTimePartitionSlot timePartitionSlot =
131-
TimePartitionUtils.getTimePartitionSlot(time, analysis.getDatabaseName());
134+
TimePartitionUtils.getTimePartitionSlot(time, databaseName);
132135
this.dataRegionReplicaSet =
133136
analysis
134137
.getDataPartitionInfo()
135-
.getDataRegionReplicaSetForWriting(
136-
getDeviceID(), timePartitionSlot, analysis.getDatabaseName());
138+
.getDataRegionReplicaSetForWriting(deviceID, timePartitionSlot, databaseName);
137139
// collect redirectInfo
138140
analysis.setRedirectNodeList(
139141
Collections.singletonList(

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryValue;
4040

4141
import org.apache.tsfile.exception.NotImplementedException;
42+
import org.apache.tsfile.file.metadata.IDeviceID;
4243
import org.apache.tsfile.utils.ReadWriteIOUtils;
4344

4445
import java.io.DataInputStream;
@@ -273,15 +274,16 @@ public List<WritePlanNode> splitByPartition(IAnalysis analysis) {
273274
for (int i = 0; i < insertRowNodeList.size(); i++) {
274275
InsertRowNode insertRowNode = insertRowNodeList.get(i);
275276
// Data region for insert row node
276-
// each row may belong to different database, pass null for auto-detection
277+
// Each row may belong to different database.
278+
final IDeviceID deviceID = insertRowNode.targetPath.getIDeviceIDAsFullDevice();
279+
final String databaseName = getDatabaseName(analysis, deviceID);
277280
TRegionReplicaSet dataRegionReplicaSet =
278281
analysis
279282
.getDataPartitionInfo()
280283
.getDataRegionReplicaSetForWriting(
281-
insertRowNode.targetPath.getIDeviceIDAsFullDevice(),
282-
TimePartitionUtils.getTimePartitionSlot(
283-
insertRowNode.getTime(), analysis.getDatabaseName()),
284-
null);
284+
deviceID,
285+
TimePartitionUtils.getTimePartitionSlot(insertRowNode.getTime(), databaseName),
286+
databaseName);
285287
// Collect redirectInfo
286288
redirectInfo.add(dataRegionReplicaSet.getDataNodeLocations().get(0).getClientRpcEndPoint());
287289
InsertRowsNode tmpNode = splitMap.get(dataRegionReplicaSet);

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040

4141
import org.apache.tsfile.enums.TSDataType;
4242
import org.apache.tsfile.exception.NotImplementedException;
43+
import org.apache.tsfile.file.metadata.IDeviceID;
4344
import org.apache.tsfile.utils.ReadWriteIOUtils;
4445

4546
import java.io.DataOutputStream;
@@ -167,19 +168,17 @@ public List<WritePlanNode> splitByPartition(IAnalysis analysis) {
167168
Map<TRegionReplicaSet, Map<TTimePartitionSlot, List<InsertRowNode>>> splitMap = new HashMap<>();
168169
Map<TRegionReplicaSet, Map<TTimePartitionSlot, List<Integer>>> splitMapForIndex =
169170
new HashMap<>();
171+
final IDeviceID deviceID = targetPath.getIDeviceIDAsFullDevice();
172+
final String databaseName = getDatabaseName(analysis, deviceID);
170173

171174
for (int i = 0; i < insertRowNodeList.size(); i++) {
172175
InsertRowNode insertRowNode = insertRowNodeList.get(i);
173176
TTimePartitionSlot timePartitionSlot =
174-
TimePartitionUtils.getTimePartitionSlot(
175-
insertRowNode.getTime(), analysis.getDatabaseName());
177+
TimePartitionUtils.getTimePartitionSlot(insertRowNode.getTime(), databaseName);
176178
TRegionReplicaSet dataRegionReplicaSet =
177179
analysis
178180
.getDataPartitionInfo()
179-
.getDataRegionReplicaSetForWriting(
180-
targetPath.getIDeviceIDAsFullDevice(),
181-
timePartitionSlot,
182-
analysis.getDatabaseName());
181+
.getDataRegionReplicaSetForWriting(deviceID, timePartitionSlot, databaseName);
183182
Map<TTimePartitionSlot, List<InsertRowNode>> tmpMap =
184183
splitMap.computeIfAbsent(dataRegionReplicaSet, k -> new HashMap<>());
185184
Map<TTimePartitionSlot, List<Integer>> tmpIndexMap =

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -237,10 +237,11 @@ public List<WritePlanNode> splitByPartition(IAnalysis analysis) {
237237
return Collections.emptyList();
238238
}
239239

240+
final String databaseName = getDatabaseName(analysis, getDeviceID(0));
240241
final Map<IDeviceID, PartitionSplitInfo> deviceIDSplitInfoMap =
241-
collectSplitRanges(analysis.getDatabaseName());
242+
collectSplitRanges(databaseName);
242243
final Map<TRegionReplicaSet, List<Integer>> splitMap =
243-
splitByReplicaSet(deviceIDSplitInfoMap, analysis);
244+
splitByReplicaSet(deviceIDSplitInfoMap, analysis, databaseName);
244245
return doSplit(splitMap);
245246
}
246247

@@ -284,7 +285,9 @@ private Map<IDeviceID, PartitionSplitInfo> collectSplitRanges(String database) {
284285
}
285286

286287
protected Map<TRegionReplicaSet, List<Integer>> splitByReplicaSet(
287-
Map<IDeviceID, PartitionSplitInfo> deviceIDSplitInfoMap, IAnalysis analysis) {
288+
Map<IDeviceID, PartitionSplitInfo> deviceIDSplitInfoMap,
289+
IAnalysis analysis,
290+
String databaseName) {
288291
Map<TRegionReplicaSet, List<Integer>> splitMap = new HashMap<>();
289292

290293
for (Entry<IDeviceID, PartitionSplitInfo> entry : deviceIDSplitInfoMap.entrySet()) {
@@ -294,7 +297,7 @@ protected Map<TRegionReplicaSet, List<Integer>> splitByReplicaSet(
294297
analysis
295298
.getDataPartitionInfo()
296299
.getDataRegionReplicaSetForWriting(
297-
deviceID, splitInfo.timePartitionSlots, analysis.getDatabaseName());
300+
deviceID, splitInfo.timePartitionSlots, databaseName);
298301
splitInfo.replicaSets = replicaSets;
299302
// collect redirectInfo
300303
analysis.addEndPointToRedirectNodeList(

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,9 @@ protected InsertTabletNode getEmptySplit(int count) {
201201
}
202202

203203
protected Map<TRegionReplicaSet, List<Integer>> splitByReplicaSet(
204-
final Map<IDeviceID, PartitionSplitInfo> deviceIDSplitInfoMap, final IAnalysis analysis) {
204+
final Map<IDeviceID, PartitionSplitInfo> deviceIDSplitInfoMap,
205+
final IAnalysis analysis,
206+
final String databaseName) {
205207
final Map<TRegionReplicaSet, List<Integer>> splitMap = new HashMap<>();
206208
final Map<IDeviceID, TEndPoint> endPointMap = new HashMap<>();
207209

@@ -212,7 +214,7 @@ protected Map<TRegionReplicaSet, List<Integer>> splitByReplicaSet(
212214
analysis
213215
.getDataPartitionInfo()
214216
.getDataRegionReplicaSetForWriting(
215-
deviceID, splitInfo.timePartitionSlots, analysis.getDatabaseName());
217+
deviceID, splitInfo.timePartitionSlots, databaseName);
216218
splitInfo.replicaSets = replicaSets;
217219
// collect redirectInfo
218220
endPointMap.put(

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,6 @@
2222
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
2323
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
2424
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
25-
import org.apache.iotdb.commons.conf.CommonDescriptor;
26-
import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
2725
import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
2826
import org.apache.iotdb.commons.utils.TimePartitionUtils;
2927
import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
@@ -255,10 +253,7 @@ public TConsensusGroupId getLastConsensusGroupId() {
255253
*/
256254
public List<TTimePartitionSlot> autoCleanPartitionTable(
257255
long TTL, TTimePartitionSlot currentTimeSlot, String database) {
258-
final long timePartitionInterval =
259-
CommonDateTimeUtils.convertMilliTimeWithPrecision(
260-
TimePartitionUtils.getTimePartitionInterval(database),
261-
CommonDescriptor.getInstance().getConfig().getTimestampPrecision());
256+
final long timePartitionInterval = TimePartitionUtils.getTimePartitionInterval(database);
262257
List<TTimePartitionSlot> removedTimePartitions = new ArrayList<>();
263258
Iterator<Map.Entry<TTimePartitionSlot, List<TConsensusGroupId>>> iterator =
264259
seriesPartitionMap.entrySet().iterator();

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/TimePartitionUtils.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -161,12 +161,17 @@ public BigInteger getBigTimePartitionInterval() {
161161

162162
// Update or add database-specific time partition configuration
163163
public static void updateDatabaseTimePartitionConfig(String database, TDatabaseSchema schema) {
164+
String timestampPrecision = CommonDescriptor.getInstance().getConfig().getTimestampPrecision();
164165
long interval =
165166
schema.isSetTimePartitionInterval()
166-
? schema.getTimePartitionInterval()
167-
: timePartitionInterval;
167+
? CommonDateTimeUtils.convertMilliTimeWithPrecision(
168+
schema.getTimePartitionInterval(), timestampPrecision)
169+
: CommonDescriptor.getInstance().getConfig().getTimePartitionInterval();
168170
long origin =
169-
schema.isSetTimePartitionOrigin() ? schema.getTimePartitionOrigin() : timePartitionOrigin;
171+
schema.isSetTimePartitionOrigin()
172+
? CommonDateTimeUtils.convertMilliTimeWithPrecision(
173+
schema.getTimePartitionOrigin(), timestampPrecision)
174+
: CommonDescriptor.getInstance().getConfig().getTimePartitionOrigin();
170175
databaseConfigCache.put(database, new DatabaseTimePartitionConfig(origin, interval));
171176
}
172177

iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/utils/TimePartitionUtilsTest.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ public class TimePartitionUtilsTest {
3636

3737
@Before
3838
public void setUp() {
39+
CommonDescriptor.getInstance().getConfig().setTimestampPrecision("ms");
3940
CommonDescriptor.getInstance().getConfig().setTimePartitionOrigin(TEST_TIME_PARTITION_ORIGIN);
4041
CommonDescriptor.getInstance()
4142
.getConfig()
@@ -139,6 +140,23 @@ public void testDatabaseLevelTimePartition() {
139140
assertEquals(expectedSlot.getStartTime(), actualSlot.getStartTime());
140141
}
141142

143+
@Test
144+
public void testDatabaseLevelTimePartitionUsesTimestampPrecision() {
145+
CommonDescriptor.getInstance().getConfig().setTimestampPrecision("ns");
146+
TDatabaseSchema schema = new TDatabaseSchema();
147+
schema.setName("test.db");
148+
schema.setTimePartitionInterval(7200000L);
149+
schema.setTimePartitionOrigin(2000L);
150+
151+
TimePartitionUtils.updateDatabaseTimePartitionConfig("test.db", schema);
152+
153+
assertEquals(7200000_000_000L, TimePartitionUtils.getTimePartitionInterval("test.db"));
154+
assertEquals(2000_000_000L, TimePartitionUtils.getTimePartitionOrigin("test.db"));
155+
assertEquals(
156+
2000_000_000L,
157+
TimePartitionUtils.getTimePartitionSlot(2000_000_000L + 1, "test.db").getStartTime());
158+
}
159+
142160
@Test
143161
public void testDatabaseLevelTimePartitionFallbackToGlobal() {
144162
// Test with database that doesn't have custom settings

0 commit comments

Comments
 (0)