Skip to content

Commit afa2f9a

Browse files
committed
[Pipe] Optimize memory usage (#17770)
1 parent afcc36f commit afa2f9a

22 files changed

Lines changed: 699 additions & 133 deletions

File tree

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRow.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,10 @@ public Type getDataType(final int columnIndex) {
152152

153153
@Override
154154
public boolean isNull(final int columnIndex) {
155-
return bitMaps[columnIndex].isMarked(rowIndex);
155+
return bitMaps != null
156+
&& columnIndex < bitMaps.length
157+
&& bitMaps[columnIndex] != null
158+
&& bitMaps[columnIndex].isMarked(rowIndex);
156159
}
157160

158161
@Override

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
2424
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletEventConverter;
2525
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
26+
import org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils;
2627
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
2728
import org.apache.iotdb.pipe.api.access.Row;
2829
import org.apache.iotdb.pipe.api.collector.RowCollector;
@@ -66,27 +67,26 @@ public void collectRow(Row row) {
6667
Pair<Integer, Integer> rowCountAndMemorySize =
6768
PipeMemoryWeightUtil.calculateTabletRowCountAndMemory(pipeRow);
6869
tablet = new Tablet(deviceId, measurementSchemaList, rowCountAndMemorySize.getLeft());
69-
tablet.initBitMaps();
7070
isAligned = pipeRow.isAligned();
7171
}
7272

7373
final int rowIndex = tablet.rowSize;
74-
tablet.addTimestamp(rowIndex, row.getTime());
74+
PipeTabletUtils.putTimestamp(tablet, rowIndex, row.getTime());
7575
for (int i = 0; i < row.size(); i++) {
7676
final Object value = row.getObject(i);
77-
if (value instanceof org.apache.iotdb.pipe.api.type.Binary) {
78-
tablet.addValue(
79-
measurementSchemaArray[i].getMeasurementId(),
80-
rowIndex,
81-
PipeBinaryTransformer.transformToBinary((org.apache.iotdb.pipe.api.type.Binary) value));
82-
} else {
83-
tablet.addValue(measurementSchemaArray[i].getMeasurementId(), rowIndex, value);
84-
}
77+
PipeTabletUtils.putValue(
78+
tablet,
79+
rowIndex,
80+
i,
81+
measurementSchemaArray[i].getType(),
82+
value instanceof org.apache.iotdb.pipe.api.type.Binary
83+
? PipeBinaryTransformer.transformToBinary(
84+
(org.apache.iotdb.pipe.api.type.Binary) value)
85+
: value);
8586
if (row.isNull(i)) {
86-
tablet.bitMaps[i].mark(rowIndex);
87+
PipeTabletUtils.markNullValue(tablet, rowIndex, i);
8788
}
8889
}
89-
tablet.rowSize++;
9090

9191
if (tablet.rowSize == tablet.getMaxRowNumber()) {
9292
collectTabletInsertionEvent();
@@ -95,6 +95,7 @@ public void collectRow(Row row) {
9595

9696
private void collectTabletInsertionEvent() {
9797
if (tablet != null) {
98+
PipeTabletUtils.compactBitMaps(tablet);
9899
tabletInsertionEventList.add(
99100
new PipeRawTabletInsertionEvent(
100101
tablet,

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -323,6 +323,7 @@ public boolean isAligned() {
323323

324324
public Tablet convertToTablet() {
325325
if (!shouldParseTimeOrPattern()) {
326+
PipeTabletUtils.compactBitMaps(tablet);
326327
return tablet;
327328
}
328329

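iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletUtils.java

Lines changed: 245 additions & 0 deletions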
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,245 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.db.pipe.event.common.tablet;
21+
22+
import org.apache.iotdb.db.utils.BitMapUtils;
23+
24+
import org.apache.tsfile.common.conf.TSFileConfig;
25+
import org.apache.tsfile.enums.TSDataType;
26+
import org.apache.tsfile.utils.Binary;
27+
import org.apache.tsfile.utils.BitMap;
28+
import org.apache.tsfile.write.UnSupportedDataTypeException;
29+
import org.apache.tsfile.write.record.Tablet;
30+
import org.apache.tsfile.write.schema.MeasurementSchema;
31+
32+
import java.time.LocalDate;
33+
import java.util.Arrays;
34+
import java.util.HashMap;
35+
import java.util.List;
36+
import java.util.Map;
37+
import java.util.Objects;
38+
39+
public final class PipeTabletUtils {
40+
41+
private PipeTabletUtils() {}
42+
43+
public static final class TabletStringInternPool {
44+
45+
private final Map<String, String> internedStrings = new HashMap<>();
46+
47+
public String intern(final String value) {
48+
if (Objects.isNull(value)) {
49+
return null;
50+
}
51+
52+
final String internedValue = internedStrings.get(value);
53+
if (Objects.nonNull(internedValue)) {
54+
return internedValue;
55+
}
56+
57+
internedStrings.put(value, value);
58+
return value;
59+
}
60+
61+
public void intern(final String[] values) {
62+
if (Objects.isNull(values)) {
63+
return;
64+
}
65+
66+
for (int i = 0; i < values.length; ++i) {
67+
values[i] = intern(values[i]);
68+
}
69+
}
70+
71+
public void intern(final List<String> values) {
72+
if (Objects.isNull(values)) {
73+
return;
74+
}
75+
76+
for (int i = 0; i < values.size(); ++i) {
77+
values.set(i, intern(values.get(i)));
78+
}
79+
}
80+
81+
public Tablet intern(final Tablet tablet) {
82+
if (Objects.isNull(tablet)) {
83+
return null;
84+
}
85+
86+
tablet.setDeviceId(intern(tablet.deviceId));
87+
internMeasurementSchemas(tablet.getSchemas());
88+
return tablet;
89+
}
90+
91+
public void internMeasurementSchemas(final List<MeasurementSchema> schemas) {
92+
if (Objects.isNull(schemas)) {
93+
return;
94+
}
95+
96+
for (final MeasurementSchema schema : schemas) {
97+
intern(schema);
98+
}
99+
}
100+
101+
public MeasurementSchema intern(final MeasurementSchema schema) {
102+
if (Objects.isNull(schema)) {
103+
return null;
104+
}
105+
106+
schema.setMeasurementId(intern(schema.getMeasurementId()));
107+
schema.setProps(intern(schema.getProps()));
108+
return schema;
109+
}
110+
111+
private Map<String, String> intern(final Map<String, String> props) {
112+
if (Objects.isNull(props) || props.isEmpty()) {
113+
return props;
114+
}
115+
116+
final Map<String, String> internedProps = new HashMap<>(props.size());
117+
for (final Map.Entry<String, String> entry : props.entrySet()) {
118+
internedProps.put(intern(entry.getKey()), intern(entry.getValue()));
119+
}
120+
return internedProps;
121+
}
122+
}
123+
124+
public static Tablet internTablet(
125+
final Tablet tablet, final TabletStringInternPool tabletStringInternPool) {
126+
return Objects.nonNull(tabletStringInternPool) ? tabletStringInternPool.intern(tablet) : tablet;
127+
}
128+
129+
public static void compactBitMaps(final Tablet tablet) {
130+
if (Objects.isNull(tablet)) {
131+
return;
132+
}
133+
tablet.bitMaps = compactBitMaps(tablet.bitMaps, tablet.rowSize);
134+
}
135+
136+
public static BitMap[] compactBitMaps(final BitMap[] bitMaps, final int rowCount) {
137+
return BitMapUtils.compactBitMaps(bitMaps, rowCount);
138+
}
139+
140+
public static BitMap[] copyBitMapsOrCreateEmpty(final Tablet tablet) {
141+
final BitMap[] bitMaps = tablet.bitMaps;
142+
return Objects.nonNull(bitMaps)
143+
? Arrays.copyOf(bitMaps, bitMaps.length)
144+
: new BitMap[getColumnCount(tablet)];
145+
}
146+
147+
public static void markNullValue(final Tablet tablet, final int rowIndex, final int columnIndex) {
148+
final BitMap[] bitMaps = ensureBitMaps(tablet, columnIndex + 1);
149+
if (Objects.isNull(bitMaps[columnIndex])) {
150+
bitMaps[columnIndex] = new BitMap(tablet.getMaxRowNumber());
151+
}
152+
bitMaps[columnIndex].mark(rowIndex);
153+
}
154+
155+
public static void putTimestamp(final Tablet tablet, final int rowIndex, final long timestamp) {
156+
tablet.timestamps[rowIndex] = timestamp;
157+
tablet.rowSize = Math.max(tablet.rowSize, rowIndex + 1);
158+
}
159+
160+
public static void putValue(
161+
final Tablet tablet,
162+
final int rowIndex,
163+
final int columnIndex,
164+
final TSDataType dataType,
165+
final Object value) {
166+
switch (dataType) {
167+
case BOOLEAN:
168+
((boolean[]) tablet.values[columnIndex])[rowIndex] = (Boolean) value;
169+
break;
170+
case INT32:
171+
((int[]) tablet.values[columnIndex])[rowIndex] = (Integer) value;
172+
break;
173+
case DATE:
174+
((LocalDate[]) tablet.values[columnIndex])[rowIndex] = (LocalDate) value;
175+
break;
176+
case INT64:
177+
case TIMESTAMP:
178+
((long[]) tablet.values[columnIndex])[rowIndex] = (Long) value;
179+
break;
180+
case FLOAT:
181+
((float[]) tablet.values[columnIndex])[rowIndex] = (Float) value;
182+
break;
183+
case DOUBLE:
184+
((double[]) tablet.values[columnIndex])[rowIndex] = (Double) value;
185+
break;
186+
case TEXT:
187+
case BLOB:
188+
case STRING:
189+
((Binary[]) tablet.values[columnIndex])[rowIndex] = toBinary(value);
190+
break;
191+
default:
192+
throw new UnSupportedDataTypeException("Unsupported data type: " + dataType);
193+
}
194+
unmarkNullValue(tablet, rowIndex, columnIndex);
195+
}
196+
197+
private static void unmarkNullValue(
198+
final Tablet tablet, final int rowIndex, final int columnIndex) {
199+
final BitMap[] bitMaps = tablet.bitMaps;
200+
if (Objects.nonNull(bitMaps)
201+
&& columnIndex < bitMaps.length
202+
&& Objects.nonNull(bitMaps[columnIndex])) {
203+
bitMaps[columnIndex].unmark(rowIndex);
204+
}
205+
}
206+
207+
private static BitMap[] ensureBitMaps(final Tablet tablet, final int minColumnCount) {
208+
final int columnCount = Math.max(getColumnCount(tablet), minColumnCount);
209+
BitMap[] bitMaps = tablet.bitMaps;
210+
if (Objects.isNull(bitMaps)) {
211+
bitMaps = new BitMap[columnCount];
212+
tablet.bitMaps = bitMaps;
213+
} else if (bitMaps.length < columnCount) {
214+
final BitMap[] expandedBitMaps = new BitMap[columnCount];
215+
System.arraycopy(bitMaps, 0, expandedBitMaps, 0, bitMaps.length);
216+
bitMaps = expandedBitMaps;
217+
tablet.bitMaps = bitMaps;
218+
}
219+
return bitMaps;
220+
}
221+
222+
private static int getColumnCount(final Tablet tablet) {
223+
if (Objects.nonNull(tablet.getSchemas())) {
224+
return tablet.getSchemas().size();
225+
}
226+
return Objects.nonNull(tablet.values) ? tablet.values.length : 0;
227+
}
228+
229+
private static Binary toBinary(final Object value) {
230+
if (Objects.isNull(value)) {
231+
return Binary.EMPTY_VALUE;
232+
}
233+
if (value instanceof Binary) {
234+
return (Binary) value;
235+
}
236+
if (value instanceof byte[]) {
237+
return new Binary((byte[]) value);
238+
}
239+
if (value instanceof String) {
240+
return new Binary(((String) value).getBytes(TSFileConfig.STRING_CHARSET));
241+
}
242+
throw new IllegalArgumentException(
243+
String.format("Expected Binary, byte[] or String, but was %s.", value.getClass()));
244+
}
245+
}

0 commit comments

Comments
 (0)