Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.iotdb.it.utils;

import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry;
Expand All @@ -30,6 +31,7 @@
import org.apache.tsfile.write.TsFileWriter;
import org.apache.tsfile.write.record.Tablet;
import org.apache.tsfile.write.schema.IMeasurementSchema;
import org.apache.tsfile.write.schema.MeasurementSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -225,6 +227,32 @@ private void generateTEXT(final Tablet tablet, final int column, final int row)
tablet.addValue(row, column, String.format("test point %d", random.nextInt()));
}

public void generateDeletion(final String device) throws IOException, IllegalPathException {
try (final ModificationFile modificationFile =
new ModificationFile(ModificationFile.getExclusiveMods(tsFile), false)) {
modificationFile.write(
new TreeDeletionEntry(
new MeasurementPath(device, IoTDBConstant.ONE_LEVEL_PATH_WILDCARD),
Long.MIN_VALUE,
Long.MAX_VALUE));
device2TimeSet.remove(device);
device2MeasurementSchema.remove(device);
}
}

public void generateDeletion(final String device, final MeasurementSchema measurement)
throws IOException, IllegalPathException {
try (final ModificationFile modificationFile =
new ModificationFile(ModificationFile.getExclusiveMods(tsFile), false)) {
modificationFile.write(
new TreeDeletionEntry(
new MeasurementPath(device, measurement.getMeasurementName()),
Long.MIN_VALUE,
Long.MAX_VALUE));
device2MeasurementSchema.get(device).remove(measurement);
}
}

public void generateDeletion(final String device, final int number)
throws IOException, IllegalPathException {
try (final ModificationFile modificationFile =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,15 @@

package org.apache.iotdb.it.utils;

import org.apache.iotdb.db.storageengine.dataregion.modification.DeletionPredicate;
import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
import org.apache.iotdb.db.storageengine.dataregion.modification.TableDeletionEntry;

import org.apache.tsfile.enums.ColumnCategory;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.exception.write.WriteProcessException;
import org.apache.tsfile.file.metadata.TableSchema;
import org.apache.tsfile.read.common.TimeRange;
import org.apache.tsfile.write.TsFileWriter;
import org.apache.tsfile.write.record.Tablet;
import org.apache.tsfile.write.schema.IMeasurementSchema;
Expand Down Expand Up @@ -173,11 +178,13 @@ private void generateTEXT(final Tablet tablet, final int column, final int row)
tablet.addValue(row, column, String.format("test point %d", random.nextInt()));
}

public long getTotalNumber() {
return table2TimeSet.entrySet().stream()
.mapToInt(
entry -> entry.getValue().size() * table2MeasurementSchema.get(entry.getKey()).size())
.sum();
public void generateDeletion(final String table) throws IOException {
try (final ModificationFile modificationFile =
new ModificationFile(ModificationFile.getExclusiveMods(tsFile), false)) {
modificationFile.write(
new TableDeletionEntry(
new DeletionPredicate(table), new TimeRange(Long.MIN_VALUE, Long.MAX_VALUE)));
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,14 @@

import org.apache.iotdb.commons.auth.entity.PrivilegeType;
import org.apache.iotdb.commons.schema.column.ColumnHeaderConstant;
import org.apache.iotdb.db.it.utils.TestUtils;
import org.apache.iotdb.it.env.EnvFactory;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.it.utils.TsFileGenerator;
import org.apache.iotdb.it.utils.TsFileTableGenerator;
import org.apache.iotdb.itbase.category.ClusterIT;
import org.apache.iotdb.itbase.category.LocalStandaloneIT;
import org.apache.iotdb.itbase.env.BaseEnv;
import org.apache.iotdb.jdbc.IoTDBSQLException;

import org.apache.tsfile.enums.ColumnCategory;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.enums.TSEncoding;
import org.apache.tsfile.read.common.Path;
Expand Down Expand Up @@ -107,8 +105,8 @@ private void registerSchema() throws SQLException {
try (final Connection connection = EnvFactory.getEnv().getConnection();
final Statement statement = connection.createStatement()) {

statement.execute("CREATE DATABASE " + SchemaConfig.STORAGE_GROUP_0);
statement.execute("CREATE DATABASE " + SchemaConfig.STORAGE_GROUP_1);
statement.execute("CREATE DATABASE " + SchemaConfig.DATABASE_0);
statement.execute("CREATE DATABASE " + SchemaConfig.DATABASE_1);

statement.execute(convert2SQL(SchemaConfig.DEVICE_0, SchemaConfig.MEASUREMENT_00));
statement.execute(convert2SQL(SchemaConfig.DEVICE_0, SchemaConfig.MEASUREMENT_01));
Expand Down Expand Up @@ -163,8 +161,8 @@ private void deleteSG() throws SQLException {
try (final Connection connection = EnvFactory.getEnv().getConnection();
final Statement statement = connection.createStatement()) {

statement.execute(String.format("delete database %s", SchemaConfig.STORAGE_GROUP_0));
statement.execute(String.format("delete database %s", SchemaConfig.STORAGE_GROUP_1));
statement.execute(String.format("delete database %s", SchemaConfig.DATABASE_0));
statement.execute(String.format("delete database %s", SchemaConfig.DATABASE_1));
} catch (final IoTDBSQLException ignored) {
}
}
Expand Down Expand Up @@ -702,16 +700,19 @@ public void testLoadWithMods() throws Exception {
generator.registerTimeseries(
SchemaConfig.DEVICE_3, Collections.singletonList(SchemaConfig.MEASUREMENT_30));
generator.registerAlignedTimeseries(
SchemaConfig.DEVICE_4, Collections.singletonList(SchemaConfig.MEASUREMENT_40));
SchemaConfig.DEVICE_4,
new ArrayList<>(Arrays.asList(SchemaConfig.MEASUREMENT_30, SchemaConfig.MEASUREMENT_40)));
generator.generateData(SchemaConfig.DEVICE_2, 100, PARTITION_INTERVAL / 10_000, false);
generator.generateData(SchemaConfig.DEVICE_3, 100, PARTITION_INTERVAL / 10_000, false);
generator.generateDeletion(SchemaConfig.DEVICE_3);
generator.generateData(SchemaConfig.DEVICE_4, 100, PARTITION_INTERVAL / 10_000, true);
generator.generateDeletion(SchemaConfig.DEVICE_2, 2);
generator.generateDeletion(SchemaConfig.DEVICE_4, 2);
generator.generateData(SchemaConfig.DEVICE_2, 100, PARTITION_INTERVAL / 10_000, false);
generator.generateData(SchemaConfig.DEVICE_4, 100, PARTITION_INTERVAL / 10_000, true);
generator.generateDeletion(SchemaConfig.DEVICE_2, 2);
generator.generateDeletion(SchemaConfig.DEVICE_4, 2);
generator.generateDeletion(SchemaConfig.DEVICE_4, SchemaConfig.MEASUREMENT_30);
writtenPoint2 = generator.getTotalNumber();
}

Expand All @@ -731,6 +732,10 @@ public void testLoadWithMods() throws Exception {
Assert.fail("This ResultSet is empty.");
}
}

TestUtils.assertSingleResultSetEqual(
TestUtils.executeQueryWithRetry(statement, "count timeSeries root.sg.**"),
Collections.singletonMap("count(timeseries)", "18"));
}
}

Expand Down Expand Up @@ -891,139 +896,9 @@ public void testLoadWithConvertOnTypeMismatchForTreeModel() throws Exception {
return pairs;
}

@Test
public void testLoadWithEmptyDatabaseForTableModel() throws Exception {
final int lineCount = 10000;

final List<Pair<MeasurementSchema, MeasurementSchema>> measurementSchemas =
generateMeasurementSchemasForDataTypeConvertion();
final List<ColumnCategory> columnCategories =
generateTabletColumnCategory(0, measurementSchemas.size());

final File file = new File(tmpDir, "1-0-0-0.tsfile");

final List<IMeasurementSchema> schemaList =
measurementSchemas.stream().map(pair -> pair.right).collect(Collectors.toList());

try (final TsFileTableGenerator generator = new TsFileTableGenerator(file)) {
generator.registerTable(SchemaConfig.TABLE_0, schemaList, columnCategories);

generator.generateData(SchemaConfig.TABLE_0, lineCount, PARTITION_INTERVAL / 10_000);
}

// Prepare normal user
try (final Connection adminCon = EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT);
final Statement adminStmt = adminCon.createStatement()) {
adminStmt.execute("create user test 'password123456'");
adminStmt.execute(
String.format(
"grant create, insert on %s.%s to user test",
SchemaConfig.DATABASE_0, SchemaConfig.TABLE_0));

// auto-create table
adminStmt.execute(String.format("create database if not exists %s", SchemaConfig.DATABASE_0));
}

try (final Connection connection =
EnvFactory.getEnv().getConnection("test", "password123456", BaseEnv.TABLE_SQL_DIALECT);
final Statement statement = connection.createStatement()) {
statement.execute(String.format("use %s", SchemaConfig.DATABASE_0));
statement.execute(String.format("load '%s'", file.getAbsolutePath()));
}

try (final Connection adminCon = EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT);
final Statement adminStmt = adminCon.createStatement()) {
adminStmt.execute(String.format("use %s", SchemaConfig.DATABASE_0));
try (final ResultSet resultSet =
adminStmt.executeQuery(String.format("select count(*) from %s", SchemaConfig.TABLE_0))) {
if (resultSet.next()) {
Assert.assertEquals(lineCount, resultSet.getLong(1));
} else {
Assert.fail("This ResultSet is empty.");
}
}
}
}

@Test
@Ignore("Load with conversion is currently banned")
public void testLoadWithConvertOnTypeMismatchForTableModel() throws Exception {
final int lineCount = 10000;

List<Pair<MeasurementSchema, MeasurementSchema>> measurementSchemas =
generateMeasurementSchemasForDataTypeConvertion();
List<ColumnCategory> columnCategories =
generateTabletColumnCategory(0, measurementSchemas.size());

final File file = new File(tmpDir, "1-0-0-0.tsfile");

List<MeasurementSchema> schemaList1 =
measurementSchemas.stream().map(pair -> pair.left).collect(Collectors.toList());
List<IMeasurementSchema> schemaList2 =
measurementSchemas.stream().map(pair -> pair.right).collect(Collectors.toList());

try (final TsFileTableGenerator generator = new TsFileTableGenerator(file)) {
generator.registerTable(SchemaConfig.TABLE_0, schemaList2, columnCategories);

generator.generateData(SchemaConfig.TABLE_0, lineCount, PARTITION_INTERVAL / 10_000);
}

try (final Connection connection =
EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT);
final Statement statement = connection.createStatement()) {
statement.execute(String.format("create database if not exists %s", SchemaConfig.DATABASE_0));
statement.execute(String.format("use %s", SchemaConfig.DATABASE_0));
statement.execute(convert2TableSQL(SchemaConfig.TABLE_0, schemaList1, columnCategories));
statement.execute(
String.format(
"load '%s' with ('database'='%s')", file.getAbsolutePath(), SchemaConfig.DATABASE_0));
try (final ResultSet resultSet =
statement.executeQuery(String.format("select count(*) from %s", SchemaConfig.TABLE_0))) {
if (resultSet.next()) {
Assert.assertEquals(lineCount, resultSet.getLong(1));
} else {
Assert.fail("This ResultSet is empty.");
}
}
}
}

private List<ColumnCategory> generateTabletColumnCategory(int tagNum, int filedNum) {
List<ColumnCategory> columnTypes = new ArrayList<>(tagNum + filedNum);
for (int i = 0; i < tagNum; i++) {
columnTypes.add(ColumnCategory.TAG);
}
for (int i = 0; i < filedNum; i++) {
columnTypes.add(ColumnCategory.FIELD);
}
return columnTypes;
}

private String convert2TableSQL(
final String tableName,
final List<MeasurementSchema> schemaList,
final List<ColumnCategory> columnCategoryList) {
List<String> columns = new ArrayList<>();
for (int i = 0; i < schemaList.size(); i++) {
final MeasurementSchema measurement = schemaList.get(i);
columns.add(
String.format(
"%s %s %s",
measurement.getMeasurementName(),
measurement.getType(),
columnCategoryList.get(i).name()));
}
String tableCreation =
String.format("create table %s(%s)", tableName, String.join(", ", columns));
LOGGER.info("schema execute: {}", tableCreation);
return tableCreation;
}

private static class SchemaConfig {
private static final String DATABASE_0 = "root";
private static final String TABLE_0 = "test";
private static final String STORAGE_GROUP_0 = "root.sg.test_0";
private static final String STORAGE_GROUP_1 = "root.sg.test_1";
private static final String DATABASE_0 = "root.sg.test_0";
private static final String DATABASE_1 = "root.sg.test_1";

// device 0, nonaligned, sg 0
private static final String DEVICE_0 = "root.sg.test_0.d_0";
Expand Down
Loading
Loading