Skip to content

Commit 9b910d4

Browse files
committed
feat: implement StorageGC framework with file scanning and grouping
1 parent 9d5289f commit 9b910d4

6 files changed

Lines changed: 1190 additions & 17 deletions

File tree

pixels-common/src/main/resources/pixels.properties

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -265,7 +265,7 @@ cpuspl = [10000,60000,300000,600000]
265265
# split mem (G)
266266
memspl = [1,8,16,32,64]
267267

268-
### pixels-retina - write buffer flush configuration ###
268+
### pixels-retina ###
269269

270270
# set to true to enable pixels-retina
271271
retina.enable=false
@@ -304,7 +304,15 @@ pixels.transaction.offload.threshold=1800
304304
# lease duration for retina offload cache in seconds, default 600s
305305
retina.offload.cache.lease.duration=600
306306
# storage directory for retina checkpoint (visibility snapshot) files
307-
pixels.retina.checkpoint.dir=file:///tmp/pixels-checkpoints
307+
retina.checkpoint.dir=file:///tmp/pixels-checkpoints
308+
# set to true to enable storage GC (rewrites high-deletion-ratio files to reclaim space)
309+
retina.storage.gc.enabled=false
310+
# invalidRatio must be strictly greater than this value for a file to be a GC candidate
311+
retina.storage.gc.threshold=0.5
312+
# target size in bytes for rewritten files produced by Storage GC, default 128MB
313+
retina.storage.gc.target.file.size=134217728
314+
# maximum number of (tableId, virtualNodeId) file groups processed per GC cycle
315+
retina.storage.gc.max.file.groups.per.run=10
308316

309317
### pixels-sink ###
310318
sink.server.enabled=false

pixels-core/src/test/java/io/pixelsdb/pixels/core/reader/TestVisibilityCheckpointCache.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public class TestVisibilityCheckpointCache
4646
@Before
4747
public void setUp() throws IOException
4848
{
49-
testCheckpointDir = ConfigFactory.Instance().getProperty("pixels.retina.checkpoint.dir");
49+
testCheckpointDir = ConfigFactory.Instance().getProperty("retina.checkpoint.dir");
5050
storage = StorageFactory.Instance().getStorage(testCheckpointDir);
5151

5252
if (!storage.exists(testCheckpointDir))

pixels-retina/src/main/java/io/pixelsdb/pixels/retina/RetinaResourceManager.java

Lines changed: 60 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@ public class RetinaResourceManager
6363

6464
// GC related fields
6565
private final ScheduledExecutorService gcExecutor;
66+
private final boolean storageGcEnabled;
67+
private final StorageGarbageCollector storageGarbageCollector;
6668

6769
// Checkpoint related fields
6870
private final ExecutorService checkpointExecutor;
@@ -91,7 +93,7 @@ private RetinaResourceManager()
9193
ConfigFactory config = ConfigFactory.Instance();
9294

9395
this.checkpointRefCounts = new ConcurrentHashMap<>();
94-
this.checkpointDir = config.getProperty("pixels.retina.checkpoint.dir");
96+
this.checkpointDir = config.getProperty("retina.checkpoint.dir");
9597

9698
int cpThreads = Integer.parseInt(config.getProperty("retina.checkpoint.threads"));
9799
this.checkpointExecutor = Executors.newFixedThreadPool(cpThreads, r -> {
@@ -124,6 +126,31 @@ private RetinaResourceManager()
124126
this.gcExecutor = executor;
125127
totalVirtualNodeNum = Integer.parseInt(ConfigFactory.Instance().getProperty("node.virtual.num"));
126128
this.retinaHostName = NetUtils.getLocalHostName();
129+
130+
boolean gcEnabled = false;
131+
StorageGarbageCollector gc = null;
132+
try
133+
{
134+
gcEnabled = Boolean.parseBoolean(config.getProperty("retina.storage.gc.enabled"));
135+
if (gcEnabled)
136+
{
137+
double threshold = Double.parseDouble(config.getProperty("retina.storage.gc.threshold"));
138+
long targetFileSize = Long.parseLong(config.getProperty("retina.storage.gc.target.file.size"));
139+
int maxGroups = Integer.parseInt(config.getProperty("retina.storage.gc.max.file.groups.per.run"));
140+
gc = new StorageGarbageCollector(this, this.metadataService,
141+
threshold, targetFileSize, maxGroups);
142+
logger.info("Storage GC enabled (threshold={}, targetFileSize={}, maxGroups={})",
143+
threshold, targetFileSize, maxGroups);
144+
}
145+
}
146+
catch (Exception e)
147+
{
148+
logger.error("Failed to initialise StorageGarbageCollector, Storage GC will be disabled", e);
149+
gcEnabled = false;
150+
gc = null;
151+
}
152+
this.storageGcEnabled = gcEnabled;
153+
this.storageGarbageCollector = gc;
127154
}
128155

129156
private static final class InstanceHolder
@@ -595,7 +622,7 @@ private PixelsWriteBuffer checkPixelsWriteBuffer(String schema, String table, in
595622
}
596623

597624
/**
598-
* Run a full GC cycle: Memory GC → checkpoint → Storage GC (future).
625+
* Run a full GC cycle: Memory GC → checkpoint → Storage GC (S1 scan + future S2-S6).
599626
*
600627
* <p>Ordering rationale:
601628
* <ol>
@@ -605,16 +632,18 @@ private PixelsWriteBuffer checkPixelsWriteBuffer(String schema, String table, in
605632
* {@code getVisibilityBitmap(lwm)} call traverses at most one partial block
606633
* (≤ {@code BLOCK_CAPACITY} items) instead of the entire pre-GC chain. This makes
607634
* checkpoint bitmap serialisation significantly cheaper.</li>
608-
* <li><b>Checkpoint second, blocking</b>: we call {@code .join()} so that
609-
* {@code runGC()} does not return until the checkpoint file is fully written to disk.
610-
* {@code gcExecutor} is a single-threaded scheduler; because {@code runGC()} must
611-
* complete before the next invocation begins, this blocking join is the simplest way
612-
* to guarantee that no two GC cycles ever overlap — no additional lock is required.</li>
635+
* <li><b>Checkpoint second, unconditional and blocking</b>: written regardless of whether
636+
* Storage GC finds any candidate files. The {@code .join()} ensures the checkpoint
637+
* file is fully on disk before Storage GC begins rewriting any files, so crash
638+
* recovery can always restore the post-Memory-GC visibility state independently of
639+
* any in-progress Storage GC rewrite. {@code gcExecutor} is single-threaded, so the
640+
* blocking join is also the simplest way to guarantee no two GC cycles overlap.</li>
613641
* <li><b>Storage GC third</b>: requires an up-to-date {@code baseBitmap} (hence after
614642
* Memory GC) and its own WAL for crash recovery. Placing it after the checkpoint
615643
* keeps the two recovery paths independent: on restart, the GC checkpoint restores
616644
* the post-Memory-GC visibility state, and the GcWal resumes any in-progress Storage
617-
* GC task separately.</li>
645+
* GC task separately. Once scan completes, bitmaps for non-candidate files are
646+
* immediately released from memory (they are no longer needed by S2-S6).</li>
618647
* <li><b>Advance {@code latestGcTimestamp} last</b>: updated only after Memory GC and the
619648
* checkpoint have both succeeded. If either of them throws, the timestamp is not advanced
620649
* and the next scheduled invocation will retry the full cycle. A Storage GC failure is caught and logged inside {@code runGC()}, so it does not prevent the timestamp from advancing.</li>
@@ -640,20 +669,38 @@ private void runGC()
640669
try
641670
{
642671
// Step 1: Memory GC — compact Deletion Chain into baseBitmap.
643-
// Collect gcSnapshotBitmaps produced as a side-effect of compaction.
672+
// In the same pass, pre-compute per-RG stats (recordNum, invalidCount) that
673+
// Storage GC needs for S1 candidate selection, avoiding a second traversal.
644674
Map<String, long[]> gcSnapshotBitmaps = new HashMap<>();
675+
Map<String, long[]> rgStats = new HashMap<>(); // rgKey → {recordNum, invalidCount}
645676
for (Map.Entry<String, RGVisibility> entry : this.rgVisibilityMap.entrySet())
646677
{
647678
long[] bitmap = entry.getValue().garbageCollect(timestamp);
648679
gcSnapshotBitmaps.put(entry.getKey(), bitmap);
680+
681+
long recordNum = entry.getValue().getRecordNum();
682+
long invalidCount = 0;
683+
for (long word : bitmap) invalidCount += Long.bitCount(word);
684+
rgStats.put(entry.getKey(), new long[]{recordNum, invalidCount});
649685
}
650686

651-
// Step 2: Persist the post-Memory-GC visibility state using precomputed
652-
// bitmaps (zero chain traversal). Block until fully written.
687+
// Step 2: Checkpoint — always written unconditionally with the full visibility
688+
// snapshot before any Storage GC rewriting begins.
653689
createCheckpoint(timestamp, CheckpointType.GC, gcSnapshotBitmaps).join();
654690

655-
// Step 3: Storage GC — rewrite high-deletion-ratio files (TODO).
656-
// if (storageGcEnabled) storageGarbageCollector.runStorageGC();
691+
// Step 3: Storage GC — scan for high-deletion-ratio files; on finding candidates,
692+
// trim non-candidate bitmaps from memory and log (S2-S6 added in Tasks 3-6).
693+
if (storageGcEnabled && storageGarbageCollector != null)
694+
{
695+
try
696+
{
697+
storageGarbageCollector.runStorageGC(timestamp, rgStats, gcSnapshotBitmaps);
698+
}
699+
catch (Exception e)
700+
{
701+
logger.error("Storage GC failed", e);
702+
}
703+
}
657704

658705
// Step 4: Advance the timestamp only after Memory GC and the checkpoint succeed (Storage GC errors are caught and logged above, so they do not block this step).
659706
// latestGcTimestamp is no longer updated inside createCheckpoint's async

0 commit comments

Comments
 (0)