Skip to content

Commit e90e232

Browse files
authored
Fix that compression by type is not properly applied (#16211)
1 parent c396fd8 commit e90e232

10 files changed

Lines changed: 227 additions & 15 deletions

File tree

integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionSimpleIT.java

Lines changed: 179 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2222,6 +2222,7 @@ public void testInsertWrongTypeRecord() throws IoTDBConnectionException {
22222222
@Test
22232223
public void testAlterDefaultCompression()
22242224
throws IoTDBConnectionException, StatementExecutionException {
2225+
// auto-create
22252226
try (ISession session = EnvFactory.getEnv().getSessionConnection()) {
22262227
List<TSDataType> types =
22272228
Arrays.asList(
@@ -2288,7 +2289,7 @@ public void testAlterDefaultCompression()
22882289
String.format("SET CONFIGURATION '%s_compressor'='GZIP'", configName));
22892290
}
22902291

2291-
String device2 = "root.test.d1";
2292+
String device2 = "root.test.d2";
22922293
session.insertRecord(device2, 0, measurements, types, values);
22932294

22942295
try (SessionDataSet dataSet =
@@ -2301,5 +2302,182 @@ public void testAlterDefaultCompression()
23012302
}
23022303
}
23032304
}
2305+
2306+
// manual create
2307+
try (ISession session = EnvFactory.getEnv().getSessionConnection()) {
2308+
List<TSDataType> types =
2309+
Arrays.asList(
2310+
TSDataType.BOOLEAN,
2311+
TSDataType.INT32,
2312+
TSDataType.DATE,
2313+
TSDataType.INT64,
2314+
TSDataType.TIMESTAMP,
2315+
TSDataType.FLOAT,
2316+
TSDataType.DOUBLE,
2317+
TSDataType.TEXT,
2318+
TSDataType.STRING,
2319+
TSDataType.BLOB);
2320+
List<String> measurements =
2321+
types.stream().map(dataType -> "__" + dataType.toString()).collect(Collectors.toList());
2322+
2323+
String device3 = "root.test.d3";
2324+
for (int i = 0; i < types.size(); i++) {
2325+
session.executeNonQueryStatement(
2326+
String.format(
2327+
"CREATE TIMESERIES %s.%s WITH DATATYPE=%s",
2328+
device3, measurements.get(i), types.get(i)));
2329+
}
2330+
2331+
try (SessionDataSet dataSet =
2332+
session.executeQueryStatement("SHOW TIMESERIES root.test.d3.**")) {
2333+
int compressionIndex = dataSet.getColumnNames().indexOf("Compression");
2334+
while (dataSet.hasNext()) {
2335+
RowRecord rec = dataSet.next();
2336+
Field compressionField = rec.getFields().get(compressionIndex);
2337+
assertEquals("GZIP", compressionField.getStringValue());
2338+
}
2339+
}
2340+
2341+
for (TSDataType type : types) {
2342+
String configName = null;
2343+
switch (type) {
2344+
case INT32:
2345+
case INT64:
2346+
case FLOAT:
2347+
case DOUBLE:
2348+
case TEXT:
2349+
case BOOLEAN:
2350+
configName = type.name().toLowerCase();
2351+
break;
2352+
case STRING:
2353+
case BLOB:
2354+
configName = "text";
2355+
break;
2356+
case DATE:
2357+
configName = "int32";
2358+
break;
2359+
case TIMESTAMP:
2360+
configName = "int64";
2361+
break;
2362+
}
2363+
session.executeNonQueryStatement(
2364+
String.format("SET CONFIGURATION '%s_compressor'='LZ4'", configName));
2365+
}
2366+
2367+
String device4 = "root.test.d4";
2368+
for (int i = 0; i < types.size(); i++) {
2369+
session.executeNonQueryStatement(
2370+
String.format(
2371+
"CREATE TIMESERIES %s.%s WITH DATATYPE=%s",
2372+
device4, measurements.get(i), types.get(i)));
2373+
}
2374+
2375+
try (SessionDataSet dataSet =
2376+
session.executeQueryStatement("SHOW TIMESERIES root.test.d4.**")) {
2377+
int compressionIndex = dataSet.getColumnNames().indexOf("Compression");
2378+
while (dataSet.hasNext()) {
2379+
RowRecord rec = dataSet.next();
2380+
Field compressionField = rec.getFields().get(compressionIndex);
2381+
assertEquals("LZ4", compressionField.getStringValue());
2382+
}
2383+
}
2384+
}
2385+
2386+
// template
2387+
try (ISession session = EnvFactory.getEnv().getSessionConnection()) {
2388+
List<TSDataType> types =
2389+
Arrays.asList(
2390+
TSDataType.BOOLEAN,
2391+
TSDataType.INT32,
2392+
TSDataType.DATE,
2393+
TSDataType.INT64,
2394+
TSDataType.TIMESTAMP,
2395+
TSDataType.FLOAT,
2396+
TSDataType.DOUBLE,
2397+
TSDataType.TEXT,
2398+
TSDataType.STRING,
2399+
TSDataType.BLOB);
2400+
List<String> measurements =
2401+
types.stream().map(dataType -> "__" + dataType.toString()).collect(Collectors.toList());
2402+
List<Object> values =
2403+
Arrays.asList(
2404+
false,
2405+
1,
2406+
LocalDate.of(1000, 1, 1),
2407+
1L,
2408+
1L,
2409+
1.0f,
2410+
1.0,
2411+
new Binary("1".getBytes(StandardCharsets.UTF_8)),
2412+
new Binary("1".getBytes(StandardCharsets.UTF_8)),
2413+
new Binary("1".getBytes(StandardCharsets.UTF_8)));
2414+
2415+
String createTemplateSql = "CREATE DEVICE TEMPLATE t1 (";
2416+
for (int i = 0; i < types.size(); i++) {
2417+
createTemplateSql += measurements.get(i) + " " + types.get(i).name();
2418+
if (i != types.size() - 1) {
2419+
createTemplateSql += ",";
2420+
}
2421+
}
2422+
createTemplateSql += ")";
2423+
session.executeNonQueryStatement(createTemplateSql);
2424+
2425+
session.executeNonQueryStatement("SET DEVICE TEMPLATE t1 TO root.test.d5");
2426+
String device5 = "root.test.d5";
2427+
session.insertRecord(device5, 0, measurements, types, values);
2428+
2429+
try (SessionDataSet dataSet =
2430+
session.executeQueryStatement("SHOW TIMESERIES root.test.d5.**")) {
2431+
int compressionIndex = dataSet.getColumnNames().indexOf("Compression");
2432+
while (dataSet.hasNext()) {
2433+
RowRecord rec = dataSet.next();
2434+
Field compressionField = rec.getFields().get(compressionIndex);
2435+
assertEquals("LZ4", compressionField.getStringValue());
2436+
}
2437+
}
2438+
2439+
for (TSDataType type : types) {
2440+
String configName = null;
2441+
switch (type) {
2442+
case INT32:
2443+
case INT64:
2444+
case FLOAT:
2445+
case DOUBLE:
2446+
case TEXT:
2447+
case BOOLEAN:
2448+
configName = type.name().toLowerCase();
2449+
break;
2450+
case STRING:
2451+
case BLOB:
2452+
configName = "text";
2453+
break;
2454+
case DATE:
2455+
configName = "int32";
2456+
break;
2457+
case TIMESTAMP:
2458+
configName = "int64";
2459+
break;
2460+
}
2461+
session.executeNonQueryStatement(
2462+
String.format("SET CONFIGURATION '%s_compressor'='GZIP'", configName));
2463+
}
2464+
2465+
createTemplateSql = createTemplateSql.replace("t1", "t2");
2466+
session.executeNonQueryStatement(createTemplateSql);
2467+
session.executeNonQueryStatement("SET DEVICE TEMPLATE t2 TO root.test.d6");
2468+
2469+
String device6 = "root.test.d6";
2470+
session.insertRecord(device6, 0, measurements, types, values);
2471+
2472+
try (SessionDataSet dataSet =
2473+
session.executeQueryStatement("SHOW TIMESERIES root.test.d6.**")) {
2474+
int compressionIndex = dataSet.getColumnNames().indexOf("Compression");
2475+
while (dataSet.hasNext()) {
2476+
RowRecord rec = dataSet.next();
2477+
Field compressionField = rec.getFields().get(compressionIndex);
2478+
assertEquals("GZIP", compressionField.getStringValue());
2479+
}
2480+
}
2481+
}
23042482
}
23052483
}

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/schema/TemplateTable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ public void extendTemplate(TemplateExtendInfo templateExtendInfo) throws Metadat
156156
dataTypeList.get(i),
157157
encodingList == null ? getDefaultEncoding(dataTypeList.get(i)) : encodingList.get(i),
158158
compressionTypeList == null
159-
? TSFileDescriptor.getInstance().getConfig().getCompressor()
159+
? TSFileDescriptor.getInstance().getConfig().getCompressor(dataTypeList.get(i))
160160
: compressionTypeList.get(i));
161161
} else {
162162
if (!measurementSchema.getType().equals(dataTypeList.get(i))

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1821,6 +1821,31 @@ private void loadTsFileProps(TrimProperties properties) throws IOException {
18211821
TSFileDescriptor.getInstance()
18221822
.getConfig()
18231823
.setEncryptType(properties.getProperty("encrypt_type", "UNENCRYPTED"));
1824+
1825+
String booleanCompressor = properties.getProperty("boolean_compressor");
1826+
if (booleanCompressor != null) {
1827+
TSFileDescriptor.getInstance().getConfig().setBooleanCompression(booleanCompressor);
1828+
}
1829+
String int32Compressor = properties.getProperty("int32_compressor");
1830+
if (int32Compressor != null) {
1831+
TSFileDescriptor.getInstance().getConfig().setInt32Compression(int32Compressor);
1832+
}
1833+
String int64Compressor = properties.getProperty("int64_compressor");
1834+
if (int64Compressor != null) {
1835+
TSFileDescriptor.getInstance().getConfig().setInt64Compression(int64Compressor);
1836+
}
1837+
String floatCompressor = properties.getProperty("float_compressor");
1838+
if (floatCompressor != null) {
1839+
TSFileDescriptor.getInstance().getConfig().setFloatCompression(floatCompressor);
1840+
}
1841+
String doubleCompressor = properties.getProperty("double_compressor");
1842+
if (doubleCompressor != null) {
1843+
TSFileDescriptor.getInstance().getConfig().setDoubleCompression(doubleCompressor);
1844+
}
1845+
String textCompressor = properties.getProperty("text_compressor");
1846+
if (textCompressor != null) {
1847+
TSFileDescriptor.getInstance().getConfig().setTextCompression(textCompressor);
1848+
}
18241849
}
18251850

18261851
// Mqtt related

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/AutoCreateSchemaExecutor.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ void autoCreateTimeSeries(
123123
dataTypesOfMissingMeasurement.add(tsDataType);
124124
encodingsOfMissingMeasurement.add(getDefaultEncoding(tsDataType));
125125
compressionTypesOfMissingMeasurement.add(
126-
TSFileDescriptor.getInstance().getConfig().getCompressor());
126+
TSFileDescriptor.getInstance().getConfig().getCompressor(tsDataType));
127127
}
128128
});
129129

@@ -180,7 +180,9 @@ void autoCreateTimeSeries(
180180
measurements[measurementIndex],
181181
tsDataTypes[measurementIndex],
182182
getDefaultEncoding(tsDataTypes[measurementIndex]),
183-
TSFileDescriptor.getInstance().getConfig().getCompressor());
183+
TSFileDescriptor.getInstance()
184+
.getConfig()
185+
.getCompressor(tsDataTypes[measurementIndex]));
184186
}
185187
return v;
186188
});
@@ -345,7 +347,9 @@ void autoCreateMissingMeasurements(
345347
? getDefaultEncoding(tsDataTypes[measurementIndex])
346348
: encodings[measurementIndex],
347349
compressionTypes == null
348-
? TSFileDescriptor.getInstance().getConfig().getCompressor()
350+
? TSFileDescriptor.getInstance()
351+
.getConfig()
352+
.getCompressor(tsDataTypes[measurementIndex])
349353
: compressionTypes[measurementIndex]);
350354
}
351355
return v;
@@ -389,7 +393,8 @@ void autoCreateMissingMeasurements(
389393
&& compressionTypesList.get(finalDeviceIndex1) != null) {
390394
compressionType = compressionTypesList.get(finalDeviceIndex1)[index];
391395
} else {
392-
compressionType = TSFileDescriptor.getInstance().getConfig().getCompressor();
396+
compressionType =
397+
TSFileDescriptor.getInstance().getConfig().getCompressor(dataType);
393398
}
394399
templateExtendInfo.addMeasurement(
395400
measurement, dataType, encoding, compressionType);

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/TemplateSchemaFetcher.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ void processTemplateTimeSeries(
155155
measurements[j],
156156
dataType,
157157
getDefaultEncoding(dataType),
158-
TSFileDescriptor.getInstance().getConfig().getCompressor());
158+
TSFileDescriptor.getInstance().getConfig().getCompressor(dataType));
159159
}
160160
}
161161
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -462,7 +462,9 @@ private void checkPropsInCreateTimeSeries(CreateTimeSeriesStatement createTimeSe
462462
}
463463

464464
createTimeSeriesStatement.setCompressor(
465-
TSFileDescriptor.getInstance().getConfig().getCompressor());
465+
TSFileDescriptor.getInstance()
466+
.getConfig()
467+
.getCompressor(createTimeSeriesStatement.getDataType()));
466468
if (props != null
467469
&& props.containsKey(IoTDBConstant.COLUMN_TIMESERIES_COMPRESSION.toLowerCase())) {
468470
String compressionString =
@@ -524,7 +526,7 @@ public void parseAttributeClausesForCreateAlignedTimeSeries(
524526
createAlignedTimeSeriesStatement.addEncoding(encoding);
525527
}
526528

527-
CompressionType compressor = TSFileDescriptor.getInstance().getConfig().getCompressor();
529+
CompressionType compressor = TSFileDescriptor.getInstance().getConfig().getCompressor(dataType);
528530
if (props.containsKey(IoTDBConstant.COLUMN_TIMESERIES_COMPRESSOR.toLowerCase())) {
529531
String compressorString =
530532
props.get(IoTDBConstant.COLUMN_TIMESERIES_COMPRESSOR.toLowerCase()).toUpperCase();
@@ -3746,7 +3748,7 @@ void parseAttributeClauseForSchemaTemplate(
37463748
encodings.add(encoding);
37473749
}
37483750

3749-
CompressionType compressor = TSFileDescriptor.getInstance().getConfig().getCompressor();
3751+
CompressionType compressor = TSFileDescriptor.getInstance().getConfig().getCompressor(dataType);
37503752
if (props.containsKey(IoTDBConstant.COLUMN_TIMESERIES_COMPRESSOR.toLowerCase())) {
37513753
String compressorString =
37523754
props.get(IoTDBConstant.COLUMN_TIMESERIES_COMPRESSOR.toLowerCase()).toUpperCase();

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableHeaderSchemaValidator.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -338,7 +338,7 @@ public static TsTableColumnSchema generateColumnSchema(
338338
columnName,
339339
dataType,
340340
getDefaultEncoding(dataType),
341-
TSFileDescriptor.getInstance().getConfig().getCompressor())
341+
TSFileDescriptor.getInstance().getConfig().getCompressor(dataType))
342342
// Unknown appears only for tree view field when the type needs auto-detection
343343
// Skip encoding & compressors because view query does not need these
344344
: new FieldColumnSchema(columnName, dataType);
@@ -415,7 +415,7 @@ private List<TsTableColumnSchema> parseInputColumnSchema(
415415
inputColumn.getName(),
416416
dataType,
417417
getDefaultEncoding(dataType),
418-
TSFileDescriptor.getInstance().getConfig().getCompressor()));
418+
TSFileDescriptor.getInstance().getConfig().getCompressor(dataType)));
419419
break;
420420
case TIME:
421421
throw new SemanticException(

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedInsertStatement.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,7 @@ public static void validateTableSchema(
236236
real.getName(),
237237
tsDataType,
238238
getDefaultEncoding(tsDataType),
239-
TSFileDescriptor.getInstance().getConfig().getCompressor());
239+
TSFileDescriptor.getInstance().getConfig().getCompressor(tsDataType));
240240
innerTreeStatement.setMeasurementSchema(measurementSchema, i);
241241
try {
242242
innerTreeStatement.selfCheckDataTypes(i);

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/IoTDBInternalLocalReporter.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,8 @@ private void createTimeSeries(Map<String, Object> valueMap, String prefix)
220220
TSDataType type = inferType(entry.getValue());
221221
types.add(type.ordinal());
222222
encodings.add((int) getDefaultEncoding(type).serialize());
223-
compressors.add((int) TSFileDescriptor.getInstance().getConfig().getCompressor().serialize());
223+
compressors.add(
224+
(int) TSFileDescriptor.getInstance().getConfig().getCompressor(type).serialize());
224225
}
225226
request.setPaths(paths);
226227
request.setDataTypes(types);

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/batch/utils/FirstBatchCompactionAlignedChunkWriter.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,8 @@ public FirstBatchCompactionAlignedChunkWriter(List<IMeasurementSchema> schemaLis
101101
TSEncoding timeEncoding =
102102
TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder());
103103
TSDataType timeType = TSFileDescriptor.getInstance().getConfig().getTimeSeriesDataType();
104-
CompressionType timeCompression = TSFileDescriptor.getInstance().getConfig().getCompressor();
104+
CompressionType timeCompression =
105+
TSFileDescriptor.getInstance().getConfig().getCompressor(timeType);
105106
timeChunkWriter =
106107
new FirstBatchCompactionTimeChunkWriter(
107108
"",

0 commit comments

Comments
 (0)