Skip to content

Commit 56a0fcc

Browse files
authored
[to dev/1.3] Fix tree model WAL serialized sizes (#17886)
* [to dev/1.3] Fix tree model WAL serialized sizes * Fix WAL charset deserialization * Fix insert node serde charset handling * spotless
1 parent d906c6a commit 56a0fcc

16 files changed

Lines changed: 406 additions & 73 deletions

File tree

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/DeleteDataNode.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
3737
import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
3838
import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryValue;
39+
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALReadUtils;
3940
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALWriteUtils;
4041

4142
import org.apache.tsfile.read.filter.factory.TimeFilterApi;
@@ -129,7 +130,7 @@ public List<String> getOutputColumnNames() {
129130
public int serializedSize() {
130131
int size = FIXED_SERIALIZED_SIZE;
131132
for (PartialPath path : pathList) {
132-
size += ReadWriteIOUtils.sizeToWrite(path.getFullPath());
133+
size += WALWriteUtils.sizeToWrite(path.getFullPath());
133134
}
134135
return size;
135136
}
@@ -153,10 +154,9 @@ public static DeleteDataNode deserializeFromWAL(DataInputStream stream) throws I
153154
for (int i = 0; i < size; i++) {
154155
try {
155156
pathList.add(
156-
DataNodeDevicePathCache.getInstance()
157-
.getPartialPath(ReadWriteIOUtils.readString(stream)));
157+
DataNodeDevicePathCache.getInstance().getPartialPath(WALReadUtils.readString(stream)));
158158
} catch (IllegalPathException e) {
159-
throw new IllegalArgumentException("Cannot deserialize InsertRowNode", e);
159+
throw new IllegalArgumentException("Cannot deserialize DeleteDataNode", e);
160160
}
161161
}
162162
long deleteStartTime = stream.readLong();
@@ -174,9 +174,9 @@ public static DeleteDataNode deserializeFromWAL(ByteBuffer buffer) {
174174
List<PartialPath> pathList = new ArrayList<>(size);
175175
for (int i = 0; i < size; i++) {
176176
try {
177-
pathList.add(new PartialPath(ReadWriteIOUtils.readString(buffer)));
177+
pathList.add(new PartialPath(WALReadUtils.readString(buffer)));
178178
} catch (IllegalPathException e) {
179-
throw new IllegalArgumentException("Cannot deserialize InsertRowNode", e);
179+
throw new IllegalArgumentException("Cannot deserialize DeleteDataNode", e);
180180
}
181181
}
182182
long deleteStartTime = buffer.getLong();

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java

Lines changed: 103 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,19 +31,24 @@
3131
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
3232
import org.apache.iotdb.db.storageengine.dataregion.memtable.DeviceIDFactory;
3333
import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
34+
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALReadUtils;
3435
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALWriteUtils;
3536

37+
import org.apache.tsfile.common.conf.TSFileConfig;
3638
import org.apache.tsfile.enums.TSDataType;
3739
import org.apache.tsfile.exception.NotImplementedException;
3840
import org.apache.tsfile.file.metadata.IDeviceID;
41+
import org.apache.tsfile.utils.ReadWriteIOUtils;
3942
import org.apache.tsfile.write.schema.MeasurementSchema;
4043

4144
import java.io.DataInputStream;
4245
import java.io.DataOutputStream;
4346
import java.io.IOException;
4447
import java.nio.ByteBuffer;
4548
import java.util.Arrays;
49+
import java.util.HashMap;
4650
import java.util.List;
51+
import java.util.Map;
4752
import java.util.Objects;
4853
import java.util.stream.Collectors;
4954

@@ -213,6 +218,102 @@ protected void serializeAttributes(DataOutputStream stream) throws IOException {
213218
throw new NotImplementedException("serializeAttributes of InsertNode is not implemented");
214219
}
215220

221+
protected static int serializeString(String value, ByteBuffer buffer) {
222+
if (value == null) {
223+
return ReadWriteIOUtils.write(-1, buffer);
224+
}
225+
byte[] bytes = value.getBytes(TSFileConfig.STRING_CHARSET);
226+
int len = ReadWriteIOUtils.write(bytes.length, buffer);
227+
buffer.put(bytes);
228+
return len + bytes.length;
229+
}
230+
231+
protected static int serializeString(String value, DataOutputStream stream) throws IOException {
232+
if (value == null) {
233+
return ReadWriteIOUtils.write(-1, stream);
234+
}
235+
byte[] bytes = value.getBytes(TSFileConfig.STRING_CHARSET);
236+
int len = ReadWriteIOUtils.write(bytes.length, stream);
237+
stream.write(bytes);
238+
return len + bytes.length;
239+
}
240+
241+
protected static String deserializeString(ByteBuffer buffer) {
242+
int strLength = ReadWriteIOUtils.readInt(buffer);
243+
if (strLength < 0) {
244+
return null;
245+
} else if (strLength == 0) {
246+
return "";
247+
}
248+
byte[] bytes = new byte[strLength];
249+
buffer.get(bytes);
250+
return new String(bytes, TSFileConfig.STRING_CHARSET);
251+
}
252+
253+
protected static void serializeMeasurementSchema(
254+
MeasurementSchema measurementSchema, ByteBuffer buffer) {
255+
serializeString(measurementSchema.getMeasurementId(), buffer);
256+
ReadWriteIOUtils.write(measurementSchema.getTypeInByte(), buffer);
257+
ReadWriteIOUtils.write(measurementSchema.getEncodingType().serialize(), buffer);
258+
ReadWriteIOUtils.write(measurementSchema.getCompressor().serialize(), buffer);
259+
serializeProps(measurementSchema.getProps(), buffer);
260+
}
261+
262+
protected static void serializeMeasurementSchema(
263+
MeasurementSchema measurementSchema, DataOutputStream stream) throws IOException {
264+
serializeString(measurementSchema.getMeasurementId(), stream);
265+
ReadWriteIOUtils.write(measurementSchema.getTypeInByte(), stream);
266+
ReadWriteIOUtils.write(measurementSchema.getEncodingType().serialize(), stream);
267+
ReadWriteIOUtils.write(measurementSchema.getCompressor().serialize(), stream);
268+
serializeProps(measurementSchema.getProps(), stream);
269+
}
270+
271+
protected static MeasurementSchema deserializeMeasurementSchema(ByteBuffer buffer) {
272+
String measurementId = deserializeString(buffer);
273+
byte type = ReadWriteIOUtils.readByte(buffer);
274+
byte encoding = ReadWriteIOUtils.readByte(buffer);
275+
byte compressor = ReadWriteIOUtils.readByte(buffer);
276+
Map<String, String> props = deserializeProps(buffer);
277+
return new MeasurementSchema(measurementId, type, encoding, compressor, props);
278+
}
279+
280+
private static void serializeProps(Map<String, String> props, ByteBuffer buffer) {
281+
if (props == null) {
282+
ReadWriteIOUtils.write(0, buffer);
283+
return;
284+
}
285+
ReadWriteIOUtils.write(props.size(), buffer);
286+
for (Map.Entry<String, String> entry : props.entrySet()) {
287+
serializeString(entry.getKey(), buffer);
288+
serializeString(entry.getValue(), buffer);
289+
}
290+
}
291+
292+
private static void serializeProps(Map<String, String> props, DataOutputStream stream)
293+
throws IOException {
294+
if (props == null) {
295+
ReadWriteIOUtils.write(0, stream);
296+
return;
297+
}
298+
ReadWriteIOUtils.write(props.size(), stream);
299+
for (Map.Entry<String, String> entry : props.entrySet()) {
300+
serializeString(entry.getKey(), stream);
301+
serializeString(entry.getValue(), stream);
302+
}
303+
}
304+
305+
private static Map<String, String> deserializeProps(ByteBuffer buffer) {
306+
int size = ReadWriteIOUtils.readInt(buffer);
307+
if (size <= 0) {
308+
return null;
309+
}
310+
Map<String, String> props = new HashMap<>();
311+
for (int i = 0; i < size; i++) {
312+
props.put(deserializeString(buffer), deserializeString(buffer));
313+
}
314+
return props;
315+
}
316+
216317
// region Serialization methods for WAL
217318
/** Serialized size of measurement schemas, ignoring failed time series */
218319
protected int serializeMeasurementSchemasSize() {
@@ -244,15 +345,15 @@ protected void serializeMeasurementSchemasToWAL(IWALByteBufferView buffer) {
244345
*/
245346
protected void deserializeMeasurementSchemas(DataInputStream stream) throws IOException {
246347
for (int i = 0; i < measurements.length; i++) {
247-
measurementSchemas[i] = MeasurementSchema.deserializeFrom(stream);
348+
measurementSchemas[i] = WALReadUtils.readMeasurementSchema(stream);
248349
measurements[i] = measurementSchemas[i].getMeasurementId();
249350
dataTypes[i] = measurementSchemas[i].getType();
250351
}
251352
}
252353

253354
protected void deserializeMeasurementSchemas(ByteBuffer buffer) {
254355
for (int i = 0; i < measurements.length; i++) {
255-
measurementSchemas[i] = MeasurementSchema.deserializeFrom(buffer);
356+
measurementSchemas[i] = WALReadUtils.readMeasurementSchema(buffer);
256357
measurements[i] = measurementSchemas[i].getMeasurementId();
257358
}
258359
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java

Lines changed: 16 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
3535
import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
3636
import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryValue;
37+
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALReadUtils;
3738
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALWriteUtils;
3839
import org.apache.iotdb.db.utils.TypeInferenceUtils;
3940

@@ -238,13 +239,13 @@ protected void serializeAttributes(DataOutputStream stream) throws IOException {
238239

239240
void subSerialize(ByteBuffer buffer) {
240241
ReadWriteIOUtils.write(time, buffer);
241-
ReadWriteIOUtils.write(devicePath.getFullPath(), buffer);
242+
serializeString(devicePath.getFullPath(), buffer);
242243
serializeMeasurementsAndValues(buffer);
243244
}
244245

245246
void subSerialize(DataOutputStream stream) throws IOException {
246247
ReadWriteIOUtils.write(time, stream);
247-
ReadWriteIOUtils.write(devicePath.getFullPath(), stream);
248+
serializeString(devicePath.getFullPath(), stream);
248249
serializeMeasurementsAndValues(stream);
249250
}
250251

@@ -281,9 +282,9 @@ private void serializeMeasurementsOrSchemas(ByteBuffer buffer) {
281282
}
282283
// serialize measurement schemas when exist
283284
if (measurementSchemas != null) {
284-
measurementSchemas[i].serializeTo(buffer);
285+
serializeMeasurementSchema(measurementSchemas[i], buffer);
285286
} else {
286-
ReadWriteIOUtils.write(measurements[i], buffer);
287+
serializeString(measurements[i], buffer);
287288
}
288289
}
289290
}
@@ -303,9 +304,9 @@ private void serializeMeasurementsOrSchemas(DataOutputStream stream) throws IOEx
303304
}
304305
// serialize measurement schemas when exist
305306
if (measurementSchemas != null) {
306-
measurementSchemas[i].serializeTo(stream);
307+
serializeMeasurementSchema(measurementSchemas[i], stream);
307308
} else {
308-
ReadWriteIOUtils.write(measurements[i], stream);
309+
serializeString(measurements[i], stream);
309310
}
310311
}
311312
}
@@ -331,7 +332,7 @@ private void putDataTypesAndValues(ByteBuffer buffer) {
331332
// and is forwarded to other nodes
332333
if (isNeedInferType) {
333334
ReadWriteIOUtils.write(TYPE_RAW_STRING, buffer);
334-
ReadWriteIOUtils.write(values[i].toString(), buffer);
335+
serializeString(values[i].toString(), buffer);
335336
} else {
336337
ReadWriteIOUtils.write(dataTypes[i], buffer);
337338
switch (dataTypes[i]) {
@@ -386,7 +387,7 @@ private void putDataTypesAndValues(DataOutputStream stream) throws IOException {
386387
// and is forwarded to other nodes
387388
if (isNeedInferType) {
388389
ReadWriteIOUtils.write(TYPE_RAW_STRING, stream);
389-
ReadWriteIOUtils.write(values[i].toString(), stream);
390+
serializeString(values[i].toString(), stream);
390391
} else {
391392
ReadWriteIOUtils.write(dataTypes[i], stream);
392393
switch (dataTypes[i]) {
@@ -431,8 +432,7 @@ void subDeserialize(ByteBuffer byteBuffer) {
431432
time = byteBuffer.getLong();
432433
try {
433434
devicePath =
434-
DataNodeDevicePathCache.getInstance()
435-
.getPartialPath(ReadWriteIOUtils.readString(byteBuffer));
435+
DataNodeDevicePathCache.getInstance().getPartialPath(deserializeString(byteBuffer));
436436
} catch (IllegalPathException e) {
437437
throw new IllegalArgumentException(DESERIALIZE_ERROR, e);
438438
}
@@ -447,12 +447,12 @@ void deserializeMeasurementsAndValues(ByteBuffer buffer) {
447447
if (hasSchema) {
448448
measurementSchemas = new MeasurementSchema[measurementSize];
449449
for (int i = 0; i < measurementSize; i++) {
450-
measurementSchemas[i] = MeasurementSchema.deserializeFrom(buffer);
450+
measurementSchemas[i] = deserializeMeasurementSchema(buffer);
451451
measurements[i] = measurementSchemas[i].getMeasurementId();
452452
}
453453
} else {
454454
for (int i = 0; i < measurementSize; i++) {
455-
measurements[i] = ReadWriteIOUtils.readString(buffer);
455+
measurements[i] = deserializeString(buffer);
456456
}
457457
}
458458

@@ -476,7 +476,7 @@ private void fillDataTypesAndValues(ByteBuffer buffer) {
476476
// and is forwarded to other nodes
477477
byte typeNum = (byte) ReadWriteIOUtils.read(buffer);
478478
if (typeNum == TYPE_RAW_STRING || typeNum == TYPE_NULL) {
479-
values[i] = typeNum == TYPE_RAW_STRING ? ReadWriteIOUtils.readString(buffer) : null;
479+
values[i] = typeNum == TYPE_RAW_STRING ? deserializeString(buffer) : null;
480480
continue;
481481
}
482482
dataTypes[i] = TSDataType.values()[typeNum];
@@ -524,7 +524,7 @@ public int serializedSize() {
524524
protected int subSerializeSize() {
525525
int size = 0;
526526
size += Long.BYTES;
527-
size += ReadWriteIOUtils.sizeToWrite(devicePath.getFullPath());
527+
size += WALWriteUtils.sizeToWrite(devicePath.getFullPath());
528528
return size + serializeMeasurementsAndValuesSize();
529529
}
530530

@@ -671,8 +671,7 @@ protected static InsertRowNode subDeserializeFromWAL(DataInputStream stream) thr
671671
insertNode.setTime(stream.readLong());
672672
try {
673673
insertNode.setDevicePath(
674-
DataNodeDevicePathCache.getInstance()
675-
.getPartialPath(ReadWriteIOUtils.readString(stream)));
674+
DataNodeDevicePathCache.getInstance().getPartialPath(WALReadUtils.readString(stream)));
676675
} catch (IllegalPathException e) {
677676
throw new IllegalArgumentException(DESERIALIZE_ERROR, e);
678677
}
@@ -757,8 +756,7 @@ protected static InsertRowNode subDeserializeFromWAL(ByteBuffer buffer) {
757756
insertNode.setTime(buffer.getLong());
758757
try {
759758
insertNode.setDevicePath(
760-
DataNodeDevicePathCache.getInstance()
761-
.getPartialPath(ReadWriteIOUtils.readString(buffer)));
759+
DataNodeDevicePathCache.getInstance().getPartialPath(WALReadUtils.readString(buffer)));
762760
} catch (IllegalPathException e) {
763761
throw new IllegalArgumentException(DESERIALIZE_ERROR, e);
764762
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -236,8 +236,7 @@ public static InsertRowsOfOneDeviceNode deserialize(ByteBuffer byteBuffer) {
236236

237237
try {
238238
devicePath =
239-
DataNodeDevicePathCache.getInstance()
240-
.getPartialPath((ReadWriteIOUtils.readString(byteBuffer)));
239+
DataNodeDevicePathCache.getInstance().getPartialPath((deserializeString(byteBuffer)));
241240
} catch (IllegalPathException e) {
242241
throw new IllegalArgumentException("Cannot deserialize InsertRowsOfOneDeviceNode", e);
243242
}
@@ -269,7 +268,7 @@ public static InsertRowsOfOneDeviceNode deserialize(ByteBuffer byteBuffer) {
269268
@Override
270269
protected void serializeAttributes(ByteBuffer byteBuffer) {
271270
PlanNodeType.INSERT_ROWS_OF_ONE_DEVICE.serialize(byteBuffer);
272-
ReadWriteIOUtils.write(devicePath.getFullPath(), byteBuffer);
271+
serializeString(devicePath.getFullPath(), byteBuffer);
273272

274273
ReadWriteIOUtils.write(insertRowNodeList.size(), byteBuffer);
275274

@@ -285,7 +284,7 @@ protected void serializeAttributes(ByteBuffer byteBuffer) {
285284
@Override
286285
protected void serializeAttributes(DataOutputStream stream) throws IOException {
287286
PlanNodeType.INSERT_ROWS_OF_ONE_DEVICE.serialize(stream);
288-
ReadWriteIOUtils.write(devicePath.getFullPath(), stream);
287+
serializeString(devicePath.getFullPath(), stream);
289288

290289
ReadWriteIOUtils.write(insertRowNodeList.size(), stream);
291290

0 commit comments

Comments
 (0)