Skip to content

Commit 1b67ec5

Browse files
authored
[to dev/1.3] Cherry pick 'estimating inner compaction task memory during selection (#15257)' & Fix estimating memory cost for string type in compaction (#16048)
* estimating inner compaction task memory during selection (#15257) * estimating inner compaction task during selection * use tsfile id * rename * spotless * modify CompactionEstimateUtils * fix bug * fix compile * fix ut * fix bug * fix ut * fix review * modify CompactionEstimateUtils * modify ut * fix ut * add log * add log * fix estimating memory cost for string type * modify SystemInfo * remove Collections.synchronizedMap
1 parent d9ab51d commit 1b67ec5

27 files changed

Lines changed: 583 additions & 136 deletions

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -587,9 +587,6 @@ public class IoTDBConfig {
587587
*/
588588
private volatile double innerCompactionTaskSelectionDiskRedundancy = 0.05;
589589

590-
/** The size of global compaction estimation file info cahce. */
591-
private int globalCompactionFileInfoCacheSize = 1000;
592-
593590
/** whether to cache meta data(ChunkMetaData and TsFileMetaData) or not. */
594591
private boolean metaDataCacheEnable = true;
595592

@@ -3949,14 +3946,6 @@ public void setCandidateCompactionTaskQueueSize(int candidateCompactionTaskQueue
39493946
this.candidateCompactionTaskQueueSize = candidateCompactionTaskQueueSize;
39503947
}
39513948

3952-
public int getGlobalCompactionFileInfoCacheSize() {
3953-
return globalCompactionFileInfoCacheSize;
3954-
}
3955-
3956-
public void setGlobalCompactionFileInfoCacheSize(int globalCompactionFileInfoCacheSize) {
3957-
this.globalCompactionFileInfoCacheSize = globalCompactionFileInfoCacheSize;
3958-
}
3959-
39603949
public boolean isEnableAuditLog() {
39613950
return enableAuditLog;
39623951
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/ICrossCompactionPerformer.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,17 @@
1919

2020
package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer;
2121

22+
import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.AbstractCrossSpaceEstimator;
2223
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
2324

2425
import java.util.List;
26+
import java.util.Optional;
2527

2628
public interface ICrossCompactionPerformer extends ICompactionPerformer {
2729
@Override
2830
void setSourceFiles(List<TsFileResource> seqFiles, List<TsFileResource> unseqFiles);
31+
32+
default Optional<AbstractCrossSpaceEstimator> getCrossSpaceEstimator() {
33+
return Optional.empty();
34+
}
2935
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer;
21+
22+
import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.AbstractInnerSpaceEstimator;
23+
24+
import java.util.Optional;
25+
26+
public interface IInnerCompactionPerformer extends ICompactionPerformer {
27+
default Optional<AbstractInnerSpaceEstimator> getInnerSpaceEstimator() {
28+
return Optional.empty();
29+
}
30+
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/ISeqCompactionPerformer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323

2424
import java.util.List;
2525

26-
public interface ISeqCompactionPerformer extends ICompactionPerformer {
26+
public interface ISeqCompactionPerformer extends IInnerCompactionPerformer {
2727
@Override
2828
void setSourceFiles(List<TsFileResource> seqFiles);
2929
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/IUnseqCompactionPerformer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323

2424
import java.util.List;
2525

26-
public interface IUnseqCompactionPerformer extends ICompactionPerformer {
26+
public interface IUnseqCompactionPerformer extends IInnerCompactionPerformer {
2727
@Override
2828
void setSourceFiles(List<TsFileResource> unseqFiles);
2929
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/FastCompactionPerformer.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,10 @@
3939
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.writer.FastCrossCompactionWriter;
4040
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.writer.FastInnerCompactionWriter;
4141
import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager;
42+
import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.AbstractCrossSpaceEstimator;
43+
import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.AbstractInnerSpaceEstimator;
44+
import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.FastCompactionInnerCompactionEstimator;
45+
import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.FastCrossSpaceCompactionEstimator;
4246
import org.apache.iotdb.db.storageengine.dataregion.modification.Deletion;
4347
import org.apache.iotdb.db.storageengine.dataregion.modification.Modification;
4448
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
@@ -61,6 +65,7 @@
6165
import java.util.LinkedHashMap;
6266
import java.util.List;
6367
import java.util.Map;
68+
import java.util.Optional;
6469
import java.util.concurrent.ConcurrentHashMap;
6570
import java.util.concurrent.ExecutionException;
6671
import java.util.concurrent.Future;
@@ -364,4 +369,20 @@ private void readModification(List<TsFileResource> resources) {
364369
modificationCache.put(resource.getTsFile().getName(), modifications);
365370
}
366371
}
372+
373+
public String getDatabaseName() {
374+
return !seqFiles.isEmpty()
375+
? seqFiles.get(0).getDatabaseName()
376+
: unseqFiles.get(0).getDatabaseName();
377+
}
378+
379+
@Override
380+
public Optional<AbstractInnerSpaceEstimator> getInnerSpaceEstimator() {
381+
return Optional.of(new FastCompactionInnerCompactionEstimator());
382+
}
383+
384+
@Override
385+
public Optional<AbstractCrossSpaceEstimator> getCrossSpaceEstimator() {
386+
return Optional.of(new FastCrossSpaceCompactionEstimator());
387+
}
367388
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.readchunk.SingleSeriesCompactionExecutor;
3131
import org.apache.iotdb.db.storageengine.dataregion.compaction.io.CompactionTsFileWriter;
3232
import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant.CompactionType;
33+
import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.AbstractInnerSpaceEstimator;
34+
import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.ReadChunkInnerCompactionEstimator;
3335
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
3436
import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo;
3537

@@ -45,6 +47,7 @@
4547
import java.util.Collections;
4648
import java.util.LinkedList;
4749
import java.util.List;
50+
import java.util.Optional;
4851

4952
public class ReadChunkCompactionPerformer implements ISeqCompactionPerformer {
5053
private List<TsFileResource> seqFiles;
@@ -286,4 +289,9 @@ private void compactNotAlignedSeries(
286289
public void setSourceFiles(List<TsFileResource> seqFiles) {
287290
this.seqFiles = seqFiles;
288291
}
292+
293+
@Override
294+
public Optional<AbstractInnerSpaceEstimator> getInnerSpaceEstimator() {
295+
return Optional.of(new ReadChunkInnerCompactionEstimator());
296+
}
289297
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadPointCompactionPerformer.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@
3838
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.writer.ReadPointCrossCompactionWriter;
3939
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.writer.ReadPointInnerCompactionWriter;
4040
import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager;
41+
import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.AbstractInnerSpaceEstimator;
42+
import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.RepairUnsortedFileCompactionEstimator;
4143
import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource;
4244
import org.apache.iotdb.db.storageengine.dataregion.read.control.QueryResourceManager;
4345
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
@@ -61,6 +63,7 @@
6163
import java.util.LinkedList;
6264
import java.util.List;
6365
import java.util.Map;
66+
import java.util.Optional;
6467
import java.util.concurrent.ExecutionException;
6568
import java.util.concurrent.Future;
6669
import java.util.stream.Collectors;
@@ -323,4 +326,9 @@ public void setSourceFiles(List<TsFileResource> seqFiles, List<TsFileResource> u
323326
public void setSourceFiles(List<TsFileResource> unseqFiles) {
324327
this.unseqFiles = unseqFiles;
325328
}
329+
330+
@Override
331+
public Optional<AbstractInnerSpaceEstimator> getInnerSpaceEstimator() {
332+
return Optional.of(new RepairUnsortedFileCompactionEstimator());
333+
}
326334
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ public abstract class AbstractCompactionTask {
7474
protected CompactionTaskSummary summary;
7575
protected long serialId;
7676
protected CompactionTaskStage taskStage;
77+
protected long roughMemoryCost = -1L;
7778
protected long memoryCost = 0L;
7879

7980
protected boolean recoverMemoryStatus;
@@ -256,6 +257,15 @@ public void transitSourceFilesToMerging() throws FileCannotTransitToCompactingEx
256257
}
257258
}
258259

260+
@TestOnly
261+
public long getRoughMemoryCost() {
262+
return roughMemoryCost;
263+
}
264+
265+
public void setRoughMemoryCost(long memoryCost) {
266+
this.roughMemoryCost = memoryCost;
267+
}
268+
259269
public abstract long getEstimatedMemoryCost();
260270

261271
public abstract int getProcessedFileNum();

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,8 @@
2828
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionRecoverException;
2929
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionSourceFileDeletedException;
3030
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.ICompactionPerformer;
31+
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.IInnerCompactionPerformer;
3132
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.FastCompactionPerformer;
32-
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.ReadChunkCompactionPerformer;
3333
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.subtask.FastCompactionTaskSummary;
3434
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils;
3535
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogAnalyzer;
@@ -39,8 +39,6 @@
3939
import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager;
4040
import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.AbstractInnerSpaceEstimator;
4141
import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.CompactionEstimateUtils;
42-
import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.FastCompactionInnerCompactionEstimator;
43-
import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.ReadChunkInnerCompactionEstimator;
4442
import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
4543
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
4644
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
@@ -674,20 +672,14 @@ protected void releaseAllLocks() {
674672
@Override
675673
public long getEstimatedMemoryCost() {
676674
if (innerSpaceEstimator == null) {
677-
if (this.performer instanceof ReadChunkCompactionPerformer) {
678-
innerSpaceEstimator = new ReadChunkInnerCompactionEstimator();
679-
} else if (this.performer instanceof FastCompactionPerformer) {
680-
innerSpaceEstimator = new FastCompactionInnerCompactionEstimator();
681-
}
675+
innerSpaceEstimator =
676+
((IInnerCompactionPerformer) this.performer).getInnerSpaceEstimator().orElse(null);
682677
}
683678
if (innerSpaceEstimator != null && memoryCost == 0L) {
684679
try {
685-
long roughEstimatedMemoryCost =
686-
innerSpaceEstimator.roughEstimateInnerCompactionMemory(
687-
filesView.sourceFilesInCompactionPerformer);
688680
memoryCost =
689-
CompactionEstimateUtils.shouldUseRoughEstimatedResult(roughEstimatedMemoryCost)
690-
? roughEstimatedMemoryCost
681+
CompactionEstimateUtils.shouldUseRoughEstimatedResult(roughMemoryCost)
682+
? roughMemoryCost
691683
: innerSpaceEstimator.estimateInnerCompactionMemory(
692684
filesView.sourceFilesInCompactionPerformer);
693685
} catch (CompactionSourceFileDeletedException e) {

0 commit comments

Comments
 (0)