Skip to content

Commit ab887eb

Browse files
authored
Fix relational delete node serialization (#17867)
* Fix relational delete node serialization * Fix deletion entry serialized sizes * Fix remaining serialized size calculations * Fix ranged WAL serialized size calculations * Fix relational delete resource identity
1 parent 6532d29 commit ab887eb

22 files changed

Lines changed: 254 additions & 69 deletions

File tree

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/DeleteDataNode.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,7 @@ public PlanNode clone() {
226226
public int serializedSize() {
227227
int size = FIXED_SERIALIZED_SIZE;
228228
for (PartialPath path : pathList) {
229-
size += ReadWriteIOUtils.sizeToWrite(path.getFullPath());
229+
size += WALWriteUtils.sizeToWrite(path.getFullPath());
230230
}
231231
return size;
232232
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -340,6 +340,16 @@ public int getFailedMeasurementNumber() {
340340
return failedMeasurementNumber;
341341
}
342342

343+
protected int getValidMeasurementNumber() {
344+
int validMeasurementNumber = 0;
345+
for (String measurement : measurements) {
346+
if (measurement != null) {
347+
validMeasurementNumber++;
348+
}
349+
}
350+
return validMeasurementNumber;
351+
}
352+
343353
public boolean isMeasurementFailed(int index) {
344354
return measurements[index] == null;
345355
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -556,7 +556,7 @@ public int serializedSize() {
556556
protected int subSerializeSize() {
557557
int size = 0;
558558
size += Long.BYTES;
559-
size += ReadWriteIOUtils.sizeToWrite(targetPath.getFullPath());
559+
size += WALWriteUtils.sizeToWrite(targetPath.getFullPath());
560560
return size + serializeMeasurementsAndValuesSize();
561561
}
562562

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -816,16 +816,30 @@ public int serializedSize(int start, int end) {
816816
return Short.BYTES + subSerializeSize(start, end);
817817
}
818818

819+
/** Serialized size for wal */
820+
public int serializedSize(List<int[]> rangeList) {
821+
return Short.BYTES + subSerializeSize(rangeList);
822+
}
823+
819824
int subSerializeSize(int start, int end) {
825+
return subSerializeSizeByRange(Collections.singletonList(new int[] {start, end}));
826+
}
827+
828+
int subSerializeSize(List<int[]> rangeList) {
829+
return subSerializeSizeByRange(rangeList);
830+
}
831+
832+
private int subSerializeSizeByRange(List<int[]> rangeList) {
820833
int size = 0;
821834
size += Long.BYTES;
822-
size += ReadWriteIOUtils.sizeToWrite(targetPath.getFullPath());
835+
size += WALWriteUtils.sizeToWrite(targetPath.getFullPath());
836+
int rowNumInRange = getRowNumInRange(rangeList);
823837
// measurements size
824838
size += Integer.BYTES;
825839
size += serializeMeasurementSchemasSize();
826840
// times size
827841
size += Integer.BYTES;
828-
size += Long.BYTES * (end - start);
842+
size += Long.BYTES * rowNumInRange;
829843
// bitmaps size
830844
size += Byte.BYTES;
831845
if (bitMaps != null) {
@@ -837,17 +851,23 @@ int subSerializeSize(int start, int end) {
837851

838852
size += Byte.BYTES;
839853
if (bitMaps[i] != null) {
840-
int len = end - start;
841-
BitMap partBitMap = new BitMap(len);
842-
BitMap.copyOfRange(bitMaps[i], start, partBitMap, 0, len);
854+
BitMap partBitMap = new BitMap(rowNumInRange);
855+
int copiedLength = 0;
856+
for (int[] range : rangeList) {
857+
int len = range[1] - range[0];
858+
BitMap.copyOfRange(bitMaps[i], range[0], partBitMap, copiedLength, len);
859+
copiedLength += len;
860+
}
843861
size += partBitMap.getByteArray().length;
844862
}
845863
}
846864
}
847865
// values size
848866
for (int i = 0; i < dataTypes.length; i++) {
849867
if (columns[i] != null) {
850-
size += getColumnSize(dataTypes[i], columns[i], start, end);
868+
for (int[] range : rangeList) {
869+
size += getColumnSize(dataTypes[i], columns[i], range[0], range[1]);
870+
}
851871
}
852872
}
853873
// isAlign
@@ -857,6 +877,14 @@ int subSerializeSize(int start, int end) {
857877
return size;
858878
}
859879

880+
private int getRowNumInRange(List<int[]> rangeList) {
881+
int rowNumInRange = 0;
882+
for (int[] range : rangeList) {
883+
rowNumInRange += range[1] - range[0];
884+
}
885+
return rowNumInRange;
886+
}
887+
860888
private int getColumnSize(TSDataType dataType, Object column, int start, int end) {
861889
int size = 0;
862890
switch (dataType) {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalDeleteDataNode.java

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.apache.iotdb.db.storageengine.dataregion.modification.TableDeletionEntry;
3737
import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
3838

39+
import org.apache.tsfile.common.conf.TSFileConfig;
3940
import org.apache.tsfile.utils.PublicBAOS;
4041
import org.apache.tsfile.utils.ReadWriteForEncodingUtils;
4142
import org.apache.tsfile.utils.ReadWriteIOUtils;
@@ -57,8 +58,8 @@
5758
public class RelationalDeleteDataNode extends AbstractDeleteDataNode {
5859
private static final Logger LOGGER = LoggerFactory.getLogger(RelationalDeleteDataNode.class);
5960

60-
/** byte: type */
61-
private static final int FIXED_SERIALIZED_SIZE = Short.BYTES;
61+
/** short: type, long: searchIndex */
62+
private static final int FIXED_SERIALIZED_SIZE = Short.BYTES + Long.BYTES;
6263

6364
private final List<TableDeletionEntry> modEntries;
6465

@@ -220,9 +221,18 @@ public int serializedSize() {
220221
for (TableDeletionEntry modEntry : modEntries) {
221222
size += modEntry.serializedSize();
222223
}
224+
size += sizeToWriteVarString(databaseName);
223225
return size;
224226
}
225227

228+
private static int sizeToWriteVarString(final String value) {
229+
if (value == null) {
230+
return ReadWriteForEncodingUtils.varIntSize(-1);
231+
}
232+
final int byteLength = value.getBytes(TSFileConfig.STRING_CHARSET).length;
233+
return ReadWriteForEncodingUtils.varIntSize(byteLength) + byteLength;
234+
}
235+
226236
@Override
227237
public void serializeToWAL(IWALByteBufferView buffer) {
228238
serializeToWAL(buffer, getEncodedSearchIndex());
@@ -236,6 +246,7 @@ public void serializeToWAL(IWALByteBufferView buffer, long encodedSearchIndex) {
236246
for (TableDeletionEntry modEntry : modEntries) {
237247
modEntry.serialize(buffer);
238248
}
249+
ReadWriteIOUtils.writeVar(databaseName, buffer);
239250
} catch (IOException e) {
240251
LOGGER.error(DataNodeQueryMessages.FAILED_TO_SERIALIZE_MODENTRY_TO_WAL, e);
241252
}
@@ -283,12 +294,14 @@ public boolean equals(final Object obj) {
283294
}
284295
final RelationalDeleteDataNode that = (RelationalDeleteDataNode) obj;
285296
return this.getPlanNodeId().equals(that.getPlanNodeId())
286-
&& Objects.equals(this.modEntries, that.modEntries);
297+
&& Objects.equals(this.modEntries, that.modEntries)
298+
&& Objects.equals(this.databaseName, that.databaseName)
299+
&& Objects.equals(this.progressIndex, that.progressIndex);
287300
}
288301

289302
@Override
290303
public int hashCode() {
291-
return Objects.hash(getPlanNodeId(), modEntries, progressIndex);
304+
return Objects.hash(getPlanNodeId(), modEntries, databaseName, progressIndex);
292305
}
293306

294307
public String toString() {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowNode.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -219,7 +219,7 @@ public void subDeserialize(ByteBuffer buffer) {
219219

220220
@Override
221221
protected int subSerializeSize() {
222-
return super.subSerializeSize() + columnCategories.length * Byte.BYTES;
222+
return super.subSerializeSize() + getValidMeasurementNumber() * Byte.BYTES;
223223
}
224224

225225
@Override

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -329,7 +329,12 @@ public static RelationalInsertTabletNode deserializeFromWAL(ByteBuffer buffer) {
329329

330330
@Override
331331
int subSerializeSize(int start, int end) {
332-
return super.subSerializeSize(start, end) + columnCategories.length * Byte.BYTES;
332+
return super.subSerializeSize(start, end) + getValidMeasurementNumber() * Byte.BYTES;
333+
}
334+
335+
@Override
336+
int subSerializeSize(List<int[]> rangeList) {
337+
return super.subSerializeSize(rangeList) + getValidMeasurementNumber() * Byte.BYTES;
333338
}
334339

335340
@Override

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/DeletionPredicate.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -147,16 +147,13 @@ public void deserialize(ByteBuffer buffer) {
147147
}
148148

149149
public int serializedSize() {
150-
// table name + id predicate +
150+
// table name + id predicate + measurement names
151151
int size =
152-
ReadWriteForEncodingUtils.varIntSize(tableName.length())
153-
+ tableName.length() * Character.BYTES
152+
ModEntry.sizeToWriteVarString(tableName)
154153
+ idPredicate.serializedSize()
155154
+ ReadWriteForEncodingUtils.varIntSize(measurementNames.size());
156155
for (String measurementName : measurementNames) {
157-
size +=
158-
ReadWriteForEncodingUtils.varIntSize(
159-
measurementName.length() * measurementName.length() * Character.BYTES);
156+
size += ModEntry.sizeToWriteVarString(measurementName);
160157
}
161158
return size;
162159
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/IDPredicate.java

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import org.apache.iotdb.db.utils.io.BufferSerializable;
2323
import org.apache.iotdb.db.utils.io.StreamSerializable;
2424

25-
import org.apache.tsfile.common.conf.TSFileConfig;
2625
import org.apache.tsfile.file.metadata.IDeviceID;
2726
import org.apache.tsfile.file.metadata.IDeviceID.Deserializer;
2827
import org.apache.tsfile.utils.Accountable;
@@ -265,15 +264,9 @@ public SegmentExactMatch() {
265264

266265
@Override
267266
public int serializedSize() {
268-
if (pattern != null) {
269-
byte[] bytes = pattern.getBytes(TSFileConfig.STRING_CHARSET);
270-
return super.serializedSize()
271-
+ ReadWriteForEncodingUtils.varIntSize(bytes.length)
272-
+ bytes.length * Character.BYTES
273-
+ ReadWriteForEncodingUtils.varIntSize(segmentIndex);
274-
} else {
275-
return ReadWriteForEncodingUtils.varIntSize(-1);
276-
}
267+
return super.serializedSize()
268+
+ ModEntry.sizeToWriteVarString(pattern)
269+
+ ReadWriteForEncodingUtils.varIntSize(segmentIndex);
277270
}
278271

279272
@Override

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModEntry.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,11 @@
2424
import org.apache.iotdb.db.utils.io.StreamSerializable;
2525

2626
import org.apache.tsfile.annotations.TreeModel;
27+
import org.apache.tsfile.common.conf.TSFileConfig;
2728
import org.apache.tsfile.file.metadata.IDeviceID;
2829
import org.apache.tsfile.read.common.TimeRange;
2930
import org.apache.tsfile.utils.Accountable;
31+
import org.apache.tsfile.utils.ReadWriteForEncodingUtils;
3032
import org.apache.tsfile.utils.ReadWriteIOUtils;
3133

3234
import java.io.EOFException;
@@ -47,7 +49,15 @@ protected ModEntry(ModType modType) {
4749

4850
public int serializedSize() {
4951
// modType + time range
50-
return Byte.BYTES + Long.BYTES * 2 + Byte.BYTES * 2;
52+
return Byte.BYTES + Long.BYTES * 2;
53+
}
54+
55+
static int sizeToWriteVarString(final String value) {
56+
if (value == null) {
57+
return ReadWriteForEncodingUtils.varIntSize(-1);
58+
}
59+
final int byteLength = value.getBytes(TSFileConfig.STRING_CHARSET).length;
60+
return ReadWriteForEncodingUtils.varIntSize(byteLength) + byteLength;
5161
}
5262

5363
@Override

0 commit comments

Comments
 (0)