Skip to content

Commit 4b2946e

Browse files
authored
Fix that partial insert with nulls may result in negative inserted point count (#17640)
* Fix that partial insert with nulls may result in negative inserted point count * fix test
1 parent 7cc9918 commit 4b2946e

4 files changed

Lines changed: 482 additions & 1 deletion

File tree

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -337,6 +337,10 @@ public int getFailedMeasurementNumber() {
337337
return failedMeasurementNumber;
338338
}
339339

340+
public boolean isMeasurementFailed(int index) {
341+
return measurements[index] == null;
342+
}
343+
340344
public boolean allMeasurementFailed() {
341345
if (measurements != null) {
342346
return failedMeasurementNumber

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -242,7 +242,8 @@ public int insertAlignedRow(InsertRowNode insertRowNode) {
242242
|| values[i] == null
243243
|| insertRowNode.getColumnCategories() != null
244244
&& insertRowNode.getColumnCategories()[i] != TsTableColumnCategory.FIELD) {
245-
if (values[i] == null) {
245+
if (measurements[i] != null && values[i] == null) {
246+
// do not include failed measurement to avoid a negative pointsInserted
246247
nullPointsNumber++;
247248
}
248249
schemaList.add(null);
@@ -321,6 +322,11 @@ private static int computeTabletNullPointsNumber(
321322
int nullPointsNumber = 0;
322323
if (values != null) {
323324
for (int i = 0; i < insertTabletNode.getMeasurements().length; i++) {
325+
if (insertTabletNode.isMeasurementFailed(i)) {
326+
// do not include failed measurement to avoid a negative pointsInserted
327+
continue;
328+
}
329+
324330
BitMap bitMap = (BitMap) values[i];
325331
if (bitMap != null && !bitMap.isAllUnmarked()) {
326332
for (int j = start; j < end; j++) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.db.queryengine.plan.planner.plan.node.write;
21+
22+
import org.apache.iotdb.commons.exception.IllegalPathException;
23+
import org.apache.iotdb.commons.path.PartialPath;
24+
import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId;
25+
26+
import org.apache.tsfile.enums.TSDataType;
27+
import org.apache.tsfile.write.schema.MeasurementSchema;
28+
import org.junit.Test;
29+
30+
import static org.junit.Assert.assertFalse;
31+
import static org.junit.Assert.assertTrue;
32+
33+
/**
34+
* Tests for {@link InsertNode#isMeasurementFailed(int)}.
35+
*
36+
* <p>The method was added to fix a bug where failed (partial-insert) measurements were incorrectly
37+
* counted when computing {@code pointsInserted}, which could produce a negative value.
38+
*/
39+
public class InsertNodeIsMeasurementFailedTest {
40+
41+
// -----------------------------------------------------------------------
42+
// InsertRowNode
43+
// -----------------------------------------------------------------------
44+
45+
@Test
46+
public void testInsertRowNode_noFailure_allReturnFalse() throws IllegalPathException {
47+
InsertRowNode node = buildInsertRowNode(new String[] {"s0", "s1", "s2"});
48+
49+
assertFalse("s0 should not be failed", node.isMeasurementFailed(0));
50+
assertFalse("s1 should not be failed", node.isMeasurementFailed(1));
51+
assertFalse("s2 should not be failed", node.isMeasurementFailed(2));
52+
}
53+
54+
@Test
55+
public void testInsertRowNode_markFirstFailed_firstReturnsTrue() throws IllegalPathException {
56+
InsertRowNode node = buildInsertRowNode(new String[] {"s0", "s1", "s2"});
57+
node.markFailedMeasurement(0);
58+
59+
assertTrue("s0 should be failed after markFailedMeasurement", node.isMeasurementFailed(0));
60+
assertFalse("s1 should not be failed", node.isMeasurementFailed(1));
61+
assertFalse("s2 should not be failed", node.isMeasurementFailed(2));
62+
}
63+
64+
@Test
65+
public void testInsertRowNode_markAllFailed_allReturnTrue() throws IllegalPathException {
66+
InsertRowNode node = buildInsertRowNode(new String[] {"s0", "s1"});
67+
node.markFailedMeasurement(0);
68+
node.markFailedMeasurement(1);
69+
70+
assertTrue(node.isMeasurementFailed(0));
71+
assertTrue(node.isMeasurementFailed(1));
72+
}
73+
74+
@Test
75+
public void testInsertRowNode_markSameTwice_idempotent() throws IllegalPathException {
76+
// markFailedMeasurement is a no-op when already null; isMeasurementFailed must stay true
77+
InsertRowNode node = buildInsertRowNode(new String[] {"s0", "s1"});
78+
node.markFailedMeasurement(0);
79+
node.markFailedMeasurement(0); // second call should be a no-op
80+
81+
assertTrue(node.isMeasurementFailed(0));
82+
assertFalse(node.isMeasurementFailed(1));
83+
}
84+
85+
// -----------------------------------------------------------------------
86+
// InsertTabletNode
87+
// -----------------------------------------------------------------------
88+
89+
@Test
90+
public void testInsertTabletNode_noFailure_allReturnFalse() throws IllegalPathException {
91+
InsertTabletNode node = buildInsertTabletNode(new String[] {"s0", "s1", "s2"});
92+
93+
assertFalse(node.isMeasurementFailed(0));
94+
assertFalse(node.isMeasurementFailed(1));
95+
assertFalse(node.isMeasurementFailed(2));
96+
}
97+
98+
@Test
99+
public void testInsertTabletNode_markMiddleFailed_onlyMiddleReturnsTrue()
100+
throws IllegalPathException {
101+
InsertTabletNode node = buildInsertTabletNode(new String[] {"s0", "s1", "s2"});
102+
node.markFailedMeasurement(1);
103+
104+
assertFalse(node.isMeasurementFailed(0));
105+
assertTrue("s1 should be failed", node.isMeasurementFailed(1));
106+
assertFalse(node.isMeasurementFailed(2));
107+
}
108+
109+
@Test
110+
public void testInsertTabletNode_markLastFailed_lastReturnsTrue() throws IllegalPathException {
111+
InsertTabletNode node = buildInsertTabletNode(new String[] {"s0", "s1"});
112+
node.markFailedMeasurement(1);
113+
114+
assertFalse(node.isMeasurementFailed(0));
115+
assertTrue(node.isMeasurementFailed(1));
116+
}
117+
118+
// -----------------------------------------------------------------------
119+
// Helpers
120+
// -----------------------------------------------------------------------
121+
122+
private static InsertRowNode buildInsertRowNode(String[] measurementNames)
123+
throws IllegalPathException {
124+
int n = measurementNames.length;
125+
TSDataType[] dataTypes = new TSDataType[n];
126+
Object[] values = new Object[n];
127+
MeasurementSchema[] schemas = new MeasurementSchema[n];
128+
for (int i = 0; i < n; i++) {
129+
dataTypes[i] = TSDataType.INT32;
130+
values[i] = i;
131+
schemas[i] = new MeasurementSchema(measurementNames[i], TSDataType.INT32);
132+
}
133+
InsertRowNode node =
134+
new InsertRowNode(
135+
new PlanNodeId("test"),
136+
new PartialPath("root.sg.d1"),
137+
false,
138+
measurementNames,
139+
dataTypes,
140+
schemas,
141+
1L,
142+
values,
143+
false);
144+
return node;
145+
}
146+
147+
private static InsertTabletNode buildInsertTabletNode(String[] measurementNames)
148+
throws IllegalPathException {
149+
int n = measurementNames.length;
150+
int rowCount = 3;
151+
TSDataType[] dataTypes = new TSDataType[n];
152+
Object[] columns = new Object[n];
153+
MeasurementSchema[] schemas = new MeasurementSchema[n];
154+
for (int i = 0; i < n; i++) {
155+
dataTypes[i] = TSDataType.INT32;
156+
columns[i] = new int[rowCount];
157+
schemas[i] = new MeasurementSchema(measurementNames[i], TSDataType.INT32);
158+
}
159+
long[] times = {1L, 2L, 3L};
160+
InsertTabletNode node =
161+
new InsertTabletNode(
162+
new PlanNodeId("test"),
163+
new PartialPath("root.sg.d1"),
164+
false,
165+
measurementNames,
166+
dataTypes,
167+
schemas,
168+
times,
169+
null,
170+
columns,
171+
rowCount);
172+
return node;
173+
}
174+
}

0 commit comments

Comments
 (0)