Skip to content

Commit dab45fb

Browse files
committed
[Pipe] Optimize memory usage
1 parent 0c25e53 commit dab45fb

24 files changed

Lines changed: 859 additions & 194 deletions

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRow.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,9 @@ public Type getDataType(final int columnIndex) {
152152

153153
@Override
154154
public boolean isNull(final int columnIndex) {
155-
return bitMaps[columnIndex].isMarked(rowIndex);
155+
return bitMaps != null
156+
&& bitMaps[columnIndex] != null
157+
&& bitMaps[columnIndex].isMarked(rowIndex);
156158
}
157159

158160
@Override

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -455,6 +455,7 @@ public boolean isAligned() {
455455

456456
public Tablet convertToTablet() {
457457
if (!shouldParseTimeOrPattern()) {
458+
PipeTabletUtils.compactBitMaps(tablet);
458459
return tablet;
459460
}
460461
return initEventParser().convertToTablet();
Lines changed: 248 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,248 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.db.pipe.event.common.tablet;
21+
22+
import org.apache.iotdb.db.i18n.DataNodePipeMessages;
23+
24+
import org.apache.tsfile.common.conf.TSFileConfig;
25+
import org.apache.tsfile.enums.TSDataType;
26+
import org.apache.tsfile.utils.Binary;
27+
import org.apache.tsfile.utils.BitMap;
28+
import org.apache.tsfile.write.UnSupportedDataTypeException;
29+
import org.apache.tsfile.write.record.Tablet;
30+
import org.apache.tsfile.write.schema.IMeasurementSchema;
31+
import org.apache.tsfile.write.schema.MeasurementSchema;
32+
33+
import java.time.LocalDate;
34+
import java.util.HashMap;
35+
import java.util.List;
36+
import java.util.Map;
37+
import java.util.Objects;
38+
39+
public final class PipeTabletUtils {
40+
41+
private PipeTabletUtils() {}
42+
43+
public static final class TabletStringInternPool {
44+
45+
private final Map<String, String> internedStrings = new HashMap<>();
46+
47+
public String intern(final String value) {
48+
if (Objects.isNull(value)) {
49+
return null;
50+
}
51+
52+
final String internedValue = internedStrings.get(value);
53+
if (Objects.nonNull(internedValue)) {
54+
return internedValue;
55+
}
56+
57+
internedStrings.put(value, value);
58+
return value;
59+
}
60+
61+
public void intern(final String[] values) {
62+
if (Objects.isNull(values)) {
63+
return;
64+
}
65+
66+
for (int i = 0; i < values.length; ++i) {
67+
values[i] = intern(values[i]);
68+
}
69+
}
70+
71+
public void intern(final List<String> values) {
72+
if (Objects.isNull(values)) {
73+
return;
74+
}
75+
76+
for (int i = 0; i < values.size(); ++i) {
77+
values.set(i, intern(values.get(i)));
78+
}
79+
}
80+
81+
public Tablet intern(final Tablet tablet) {
82+
if (Objects.isNull(tablet)) {
83+
return null;
84+
}
85+
86+
tablet.setInsertTargetName(intern(tablet.getDeviceId()));
87+
internMeasurementSchemas(tablet.getSchemas());
88+
return tablet;
89+
}
90+
91+
public void internMeasurementSchemas(final List<IMeasurementSchema> schemas) {
92+
if (Objects.isNull(schemas)) {
93+
return;
94+
}
95+
96+
for (final IMeasurementSchema schema : schemas) {
97+
if (schema instanceof MeasurementSchema) {
98+
intern((MeasurementSchema) schema);
99+
}
100+
}
101+
}
102+
103+
public MeasurementSchema intern(final MeasurementSchema schema) {
104+
if (Objects.isNull(schema)) {
105+
return null;
106+
}
107+
108+
schema.setMeasurementName(intern(schema.getMeasurementName()));
109+
schema.setProps(intern(schema.getProps()));
110+
return schema;
111+
}
112+
113+
private Map<String, String> intern(final Map<String, String> props) {
114+
if (Objects.isNull(props) || props.isEmpty()) {
115+
return props;
116+
}
117+
118+
final Map<String, String> internedProps = new HashMap<>(props.size());
119+
for (final Map.Entry<String, String> entry : props.entrySet()) {
120+
internedProps.put(intern(entry.getKey()), intern(entry.getValue()));
121+
}
122+
return internedProps;
123+
}
124+
}
125+
126+
public static Tablet internTablet(
127+
final Tablet tablet, final TabletStringInternPool tabletStringInternPool) {
128+
return Objects.nonNull(tabletStringInternPool) ? tabletStringInternPool.intern(tablet) : tablet;
129+
}
130+
131+
public static void initEmptyBitMaps(final Tablet tablet) {
132+
tablet.setBitMaps(new BitMap[getColumnCount(tablet)]);
133+
}
134+
135+
public static void compactBitMaps(final Tablet tablet) {
136+
if (Objects.isNull(tablet)) {
137+
return;
138+
}
139+
tablet.setBitMaps(compactBitMaps(tablet.getBitMaps(), tablet.getRowSize()));
140+
}
141+
142+
public static BitMap[] compactBitMaps(final BitMap[] bitMaps, final int rowCount) {
143+
if (Objects.isNull(bitMaps)) {
144+
return null;
145+
}
146+
147+
boolean hasMarkedBitMap = false;
148+
for (int i = 0; i < bitMaps.length; ++i) {
149+
if (Objects.nonNull(bitMaps[i])
150+
&& bitMaps[i].isAllUnmarked(Math.min(rowCount, bitMaps[i].getSize()))) {
151+
bitMaps[i] = null;
152+
}
153+
if (Objects.nonNull(bitMaps[i])) {
154+
hasMarkedBitMap = true;
155+
}
156+
}
157+
158+
return hasMarkedBitMap ? bitMaps : null;
159+
}
160+
161+
public static void markNullValue(final Tablet tablet, final int rowIndex, final int columnIndex) {
162+
final BitMap[] bitMaps = ensureBitMaps(tablet, columnIndex + 1);
163+
if (Objects.isNull(bitMaps[columnIndex])) {
164+
bitMaps[columnIndex] = new BitMap(tablet.getMaxRowNumber());
165+
}
166+
bitMaps[columnIndex].mark(rowIndex);
167+
}
168+
169+
public static void putTimestamp(final Tablet tablet, final int rowIndex, final long timestamp) {
170+
tablet.getTimestamps()[rowIndex] = timestamp;
171+
tablet.setRowSize(Math.max(tablet.getRowSize(), rowIndex + 1));
172+
}
173+
174+
public static void putValue(
175+
final Tablet tablet,
176+
final int rowIndex,
177+
final int columnIndex,
178+
final TSDataType dataType,
179+
final Object value) {
180+
switch (dataType) {
181+
case BOOLEAN:
182+
((boolean[]) tablet.getValues()[columnIndex])[rowIndex] = (Boolean) value;
183+
break;
184+
case INT32:
185+
((int[]) tablet.getValues()[columnIndex])[rowIndex] = (Integer) value;
186+
break;
187+
case DATE:
188+
((LocalDate[]) tablet.getValues()[columnIndex])[rowIndex] = (LocalDate) value;
189+
break;
190+
case INT64:
191+
case TIMESTAMP:
192+
((long[]) tablet.getValues()[columnIndex])[rowIndex] = (Long) value;
193+
break;
194+
case FLOAT:
195+
((float[]) tablet.getValues()[columnIndex])[rowIndex] = (Float) value;
196+
break;
197+
case DOUBLE:
198+
((double[]) tablet.getValues()[columnIndex])[rowIndex] = (Double) value;
199+
break;
200+
case TEXT:
201+
case BLOB:
202+
case STRING:
203+
((Binary[]) tablet.getValues()[columnIndex])[rowIndex] = toBinary(value);
204+
break;
205+
default:
206+
throw new UnSupportedDataTypeException(DataNodePipeMessages.UNSUPPORTED + dataType);
207+
}
208+
}
209+
210+
private static BitMap[] ensureBitMaps(final Tablet tablet, final int minColumnCount) {
211+
final int columnCount = Math.max(getColumnCount(tablet), minColumnCount);
212+
BitMap[] bitMaps = tablet.getBitMaps();
213+
if (Objects.isNull(bitMaps)) {
214+
bitMaps = new BitMap[columnCount];
215+
tablet.setBitMaps(bitMaps);
216+
} else if (bitMaps.length < columnCount) {
217+
final BitMap[] expandedBitMaps = new BitMap[columnCount];
218+
System.arraycopy(bitMaps, 0, expandedBitMaps, 0, bitMaps.length);
219+
bitMaps = expandedBitMaps;
220+
tablet.setBitMaps(bitMaps);
221+
}
222+
return bitMaps;
223+
}
224+
225+
private static int getColumnCount(final Tablet tablet) {
226+
if (Objects.nonNull(tablet.getSchemas())) {
227+
return tablet.getSchemas().size();
228+
}
229+
return Objects.nonNull(tablet.getValues()) ? tablet.getValues().length : 0;
230+
}
231+
232+
private static Binary toBinary(final Object value) {
233+
if (Objects.isNull(value)) {
234+
return Binary.EMPTY_VALUE;
235+
}
236+
if (value instanceof Binary) {
237+
return (Binary) value;
238+
}
239+
if (value instanceof byte[]) {
240+
return new Binary((byte[]) value);
241+
}
242+
if (value instanceof String) {
243+
return new Binary(((String) value).getBytes(TSFileConfig.STRING_CHARSET));
244+
}
245+
throw new IllegalArgumentException(
246+
String.format("Expected Binary, byte[] or String, but was %s.", value.getClass()));
247+
}
248+
}

0 commit comments

Comments
 (0)