Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@

import org.apache.tsfile.common.conf.TSFileDescriptor;
import org.apache.tsfile.common.constant.TsFileConstant;
import org.apache.tsfile.encrypt.EncryptParameter;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.enums.CompressionType;
import org.apache.tsfile.file.metadata.enums.TSEncoding;
Expand All @@ -62,9 +63,11 @@
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;

Expand Down Expand Up @@ -1176,6 +1179,10 @@ public class IoTDBConfig {

private boolean includeNullValueInWriteThroughputMetric = false;

private ConcurrentHashMap<String, EncryptParameter> tsFileDBToEncryptMap =
new ConcurrentHashMap<>(
Collections.singletonMap("root.__audit", new EncryptParameter("UNENCRYPTED", null)));

IoTDBConfig() {}

public int getMaxLogEntriesNumPerBatch() {
Expand Down Expand Up @@ -4225,4 +4232,8 @@ public int getPasswordLockTimeMinutes() {
public void setPasswordLockTimeMinutes(int passwordLockTimeMinutes) {
this.passwordLockTimeMinutes = passwordLockTimeMinutes;
}

public ConcurrentHashMap<String, EncryptParameter> getTSFileDBToEncryptMap() {
return tsFileDBToEncryptMap;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@
import org.apache.iotdb.db.tools.settle.TsFileAndModSettleTool;
import org.apache.iotdb.db.utils.CommonUtils;
import org.apache.iotdb.db.utils.DateTimeUtils;
import org.apache.iotdb.db.utils.EncryptDBUtils;
import org.apache.iotdb.db.utils.ModificationUtils;
import org.apache.iotdb.metrics.utils.MetricLevel;
import org.apache.iotdb.rpc.RpcUtils;
Expand Down Expand Up @@ -3087,7 +3088,9 @@ public int executeCompaction() throws InterruptedException {
if (!isCompactionSelecting.compareAndSet(false, true)) {
return 0;
}
CompactionScheduleContext context = new CompactionScheduleContext();
CompactionScheduleContext context =
new CompactionScheduleContext(
EncryptDBUtils.getFirstEncryptParamFromDatabase(databaseName));
try {
List<Long> timePartitions = new ArrayList<>(tsFileManager.getTimePartitions());
// Sort the time partition from largest to smallest
Expand Down Expand Up @@ -3140,7 +3143,9 @@ public int executeTTLCheck() throws InterruptedException {
return 0;
}
logger.info("[TTL] {}-{} Start ttl and modification checking.", databaseName, dataRegionId);
CompactionScheduleContext context = new CompactionScheduleContext();
CompactionScheduleContext context =
new CompactionScheduleContext(
EncryptDBUtils.getFirstEncryptParamFromDatabase(databaseName));
List<Long> timePartitions = new ArrayList<>(tsFileManager.getTimePartitions());
// Sort the time partition from smallest to largest
Collections.sort(timePartitions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@

package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.constant;

import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.IllegalCompactionPerformerException;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.ICrossCompactionPerformer;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.FastCompactionPerformer;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.ReadPointCompactionPerformer;

import org.apache.tsfile.encrypt.EncryptParameter;

public enum CrossCompactionPerformer {
READ_POINT,
FAST;
Expand All @@ -38,6 +41,7 @@ public static CrossCompactionPerformer getCrossCompactionPerformer(String name)
"Illegal compaction performer for cross compaction " + name);
}

@TestOnly
public ICrossCompactionPerformer createInstance() {
switch (this) {
case READ_POINT:
Expand All @@ -49,4 +53,16 @@ public ICrossCompactionPerformer createInstance() {
"Illegal compaction performer for cross compaction " + this);
}
}

public ICrossCompactionPerformer createInstance(EncryptParameter encryptParameter) {
switch (this) {
case READ_POINT:
return new ReadPointCompactionPerformer(encryptParameter);
case FAST:
return new FastCompactionPerformer(true, encryptParameter);
default:
throw new IllegalCompactionPerformerException(
"Illegal compaction performer for cross compaction " + this);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@

package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.constant;

import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.IllegalCompactionPerformerException;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.ISeqCompactionPerformer;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.FastCompactionPerformer;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.ReadChunkCompactionPerformer;

import org.apache.tsfile.encrypt.EncryptParameter;

public enum InnerSeqCompactionPerformer {
READ_CHUNK,
FAST;
Expand All @@ -38,6 +41,7 @@ public static InnerSeqCompactionPerformer getInnerSeqCompactionPerformer(String
"Illegal compaction performer for seq inner compaction " + name);
}

@TestOnly
public ISeqCompactionPerformer createInstance() {
switch (this) {
case READ_CHUNK:
Expand All @@ -49,4 +53,16 @@ public ISeqCompactionPerformer createInstance() {
"Illegal compaction performer for seq inner compaction " + this);
}
}

public ISeqCompactionPerformer createInstance(EncryptParameter encryptParameter) {
switch (this) {
case READ_CHUNK:
return new ReadChunkCompactionPerformer(encryptParameter);
case FAST:
return new FastCompactionPerformer(false, encryptParameter);
default:
throw new IllegalCompactionPerformerException(
"Illegal compaction performer for seq inner compaction " + this);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@

package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.constant;

import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.IllegalCompactionPerformerException;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.IUnseqCompactionPerformer;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.FastCompactionPerformer;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.ReadPointCompactionPerformer;

import org.apache.tsfile.encrypt.EncryptParameter;

public enum InnerUnseqCompactionPerformer {
READ_POINT,
FAST;
Expand All @@ -38,6 +41,7 @@ public static InnerUnseqCompactionPerformer getInnerUnseqCompactionPerformer(Str
"Illegal compaction performer for unseq inner compaction " + name);
}

@TestOnly
public IUnseqCompactionPerformer createInstance() {
switch (this) {
case READ_POINT:
Expand All @@ -49,4 +53,16 @@ public IUnseqCompactionPerformer createInstance() {
"Illegal compaction performer for unseq inner compaction " + this);
}
}

public IUnseqCompactionPerformer createInstance(EncryptParameter encryptParameter) {
switch (this) {
case READ_POINT:
return new ReadPointCompactionPerformer(encryptParameter);
case FAST:
return new FastCompactionPerformer(false, encryptParameter);
default:
throw new IllegalCompactionPerformerException(
"Illegal compaction performer for unseq inner compaction " + this);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PatternTreeMap;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.WriteProcessException;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionLastTimeCheckFailedException;
Expand All @@ -47,6 +48,8 @@
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory;

import org.apache.tsfile.common.conf.TSFileDescriptor;
import org.apache.tsfile.encrypt.EncryptParameter;
import org.apache.tsfile.exception.StopReadTsFileByInterruptException;
import org.apache.tsfile.exception.write.PageException;
import org.apache.tsfile.file.metadata.IDeviceID;
Expand Down Expand Up @@ -96,6 +99,9 @@ public class FastCompactionPerformer

private final boolean isCrossCompaction;

private EncryptParameter encryptParameter;

@TestOnly
public FastCompactionPerformer(
List<TsFileResource> seqFiles,
List<TsFileResource> unseqFiles,
Expand All @@ -109,10 +115,41 @@ public FastCompactionPerformer(
} else {
isCrossCompaction = true;
}
this.encryptParameter =
new EncryptParameter(
TSFileDescriptor.getInstance().getConfig().getEncryptType(),
TSFileDescriptor.getInstance().getConfig().getEncryptKey());
}

public FastCompactionPerformer(
List<TsFileResource> seqFiles,
List<TsFileResource> unseqFiles,
List<TsFileResource> targetFiles,
EncryptParameter encryptParameter) {
this.seqFiles = seqFiles;
this.unseqFiles = unseqFiles;
this.targetFiles = targetFiles;
if (seqFiles.isEmpty() || unseqFiles.isEmpty()) {
// inner space compaction
isCrossCompaction = false;
} else {
isCrossCompaction = true;
}
this.encryptParameter = encryptParameter;
}

@TestOnly
public FastCompactionPerformer(boolean isCrossCompaction) {
this.isCrossCompaction = isCrossCompaction;
this.encryptParameter =
new EncryptParameter(
TSFileDescriptor.getInstance().getConfig().getEncryptType(),
TSFileDescriptor.getInstance().getConfig().getEncryptKey());
}

public FastCompactionPerformer(boolean isCrossCompaction, EncryptParameter encryptParameter) {
this.isCrossCompaction = isCrossCompaction;
this.encryptParameter = encryptParameter;
}

@Override
Expand All @@ -122,8 +159,9 @@ public void perform() throws Exception {
new MultiTsFileDeviceIterator(seqFiles, unseqFiles, readerCacheMap);
AbstractCompactionWriter compactionWriter =
isCrossCompaction
? new FastCrossCompactionWriter(targetFiles, seqFiles, readerCacheMap)
: new FastInnerCompactionWriter(targetFiles)) {
? new FastCrossCompactionWriter(
targetFiles, seqFiles, readerCacheMap, encryptParameter)
: new FastInnerCompactionWriter(targetFiles, encryptParameter)) {
List<Schema> schemas =
CompactionTableSchemaCollector.collectSchema(
seqFiles, unseqFiles, readerCacheMap, deviceIterator.getDeprecatedTableSchemaMap());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.ISeqCompactionPerformer;
Expand All @@ -35,7 +36,10 @@
import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.ReadChunkInnerCompactionEstimator;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo;
import org.apache.iotdb.db.utils.EncryptDBUtils;

import org.apache.tsfile.common.conf.TSFileDescriptor;
import org.apache.tsfile.encrypt.EncryptParameter;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.exception.write.PageException;
import org.apache.tsfile.file.metadata.AbstractAlignedChunkMetadata;
Expand Down Expand Up @@ -66,21 +70,60 @@ public class ReadChunkCompactionPerformer implements ISeqCompactionPerformer {
* IoTDBDescriptor.getInstance().getConfig().getChunkMetadataSizeProportion());
private Schema schema = null;

private EncryptParameter firstEncryptParameter;

@TestOnly
public ReadChunkCompactionPerformer(List<TsFileResource> sourceFiles, TsFileResource targetFile) {
this(sourceFiles, Collections.singletonList(targetFile));
}

public ReadChunkCompactionPerformer(
List<TsFileResource> sourceFiles,
TsFileResource targetFile,
EncryptParameter encryptParameter) {
this(sourceFiles, Collections.singletonList(targetFile), encryptParameter);
}

@TestOnly
public ReadChunkCompactionPerformer(
List<TsFileResource> sourceFiles, List<TsFileResource> targetFiles) {
setSourceFiles(sourceFiles);
setTargetFiles(targetFiles);
this.firstEncryptParameter = EncryptDBUtils.getDefaultFirstEncryptParam();
}

public ReadChunkCompactionPerformer(
List<TsFileResource> sourceFiles,
List<TsFileResource> targetFiles,
EncryptParameter encryptParameter) {
setSourceFiles(sourceFiles);
setTargetFiles(targetFiles);
this.firstEncryptParameter = encryptParameter;
}

@TestOnly
public ReadChunkCompactionPerformer(List<TsFileResource> sourceFiles) {
setSourceFiles(sourceFiles);
this.firstEncryptParameter = EncryptDBUtils.getDefaultFirstEncryptParam();
}

public ReadChunkCompactionPerformer(
List<TsFileResource> sourceFiles, EncryptParameter encryptParameter) {
setSourceFiles(sourceFiles);
this.firstEncryptParameter = encryptParameter;
}

@TestOnly
public ReadChunkCompactionPerformer() {
this.firstEncryptParameter =
new EncryptParameter(
TSFileDescriptor.getInstance().getConfig().getEncryptType(),
TSFileDescriptor.getInstance().getConfig().getEncryptKey());
}

public ReadChunkCompactionPerformer() {}
public ReadChunkCompactionPerformer(EncryptParameter encryptParameter) {
this.firstEncryptParameter = encryptParameter;
}

@Override
public void perform()
Expand Down Expand Up @@ -164,7 +207,8 @@ private void useNewWriter() throws IOException {
new CompactionTsFileWriter(
targetResources.get(currentTargetFileIndex).getTsFile(),
memoryBudgetForFileWriter,
CompactionType.INNER_SEQ_COMPACTION);
CompactionType.INNER_SEQ_COMPACTION,
firstEncryptParameter);
currentWriter.setSchema(CompactionTableSchemaCollector.copySchema(schema));
}

Expand Down
Loading
Loading