Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
2b49ea6
fix: guard against tail/tailUsed race in TileVisibility GC and query …
gengdy1545 Mar 17, 2026
ce9b15b
fix: TOCTOU race condition in addVisibility
gengdy1545 Mar 21, 2026
e331b26
fix: use-after-free in deleteTileRecord empty-chain path
gengdy1545 Mar 21, 2026
8327c5c
fix: written files on Retina are uniformly formatted in ordered
gengdy1545 Mar 21, 2026
9d5289f
feat: produce gcSnapshotBitmap during Memory GC for zero-traversal ch…
gengdy1545 Mar 28, 2026
9b910d4
feat: implement StorageGC framework with file scanning and grouping
gengdy1545 Mar 29, 2026
1146b38
feat: unify constructors
gengdy1545 Mar 29, 2026
221c30b
feat: feat: expose hidden commit-timestamp column in PixelsRecordRead…
gengdy1545 Mar 30, 2026
1dd22d6
feat: unify RGVisibility constructor and addVisibility entry points
gengdy1545 Mar 30, 2026
c9ec0a0
feat: implement Storage GC data rewrite with row-filtering compaction
gengdy1545 Mar 31, 2026
3d5fc5e
test: add edge-case and error-handling tests for StorageGarbageCollector
gengdy1545 Mar 31, 2026
4831ef0
feat: implement dual-write redirection for Storage GC rewrite transit…
gengdy1545 Apr 1, 2026
673a7a4
feat: implement visibility sync with chain export/import and coordina…
gengdy1545 Apr 1, 2026
9f00ab4
fix: harden Storage GC pipeline with consistent snapshots and partial…
gengdy1545 Apr 1, 2026
cc8bfee
feat: complete Storage GC pipeline with index sync, atomic commit, an…
gengdy1545 Apr 2, 2026
3903f18
fix: storage gc test
gengdy1545 Apr 6, 2026
acbf8a0
fix: storage gc test
gengdy1545 Apr 6, 2026
413eb3d
feat: opt storage gc test
gengdy1545 Apr 6, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions cpp/pixels-retina/include/RGVisibility.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,23 @@
#define RG_VISIBILITY_H
#include "RetinaBase.h"
#include "TileVisibility.h"
#include <utility>
#include <vector>

template<size_t CAPACITY>
class RGVisibility : public pixels::RetinaBase<RGVisibility<CAPACITY>> {
public:
explicit RGVisibility(uint64_t rgRecordNum);
explicit RGVisibility(uint64_t rgRecordNum, uint64_t timestamp, const std::vector<uint64_t>& initialBitmap);
explicit RGVisibility(uint64_t rgRecordNum, uint64_t timestamp = 0,
const std::vector<uint64_t>* initialBitmap = nullptr);
~RGVisibility() override;

void deleteRGRecord(uint32_t rowId, uint64_t timestamp);
uint64_t* getRGVisibilityBitmap(uint64_t timestamp);

void collectRGGarbage(uint64_t timestamp);
std::vector<uint64_t> collectRGGarbage(uint64_t timestamp);

std::vector<uint64_t> exportChainItemsAfter(uint64_t safeGcTs) const;
void importDeletionChain(const uint64_t* items, size_t pairCount);

uint64_t getBitmapSize() const;

Expand Down
30 changes: 19 additions & 11 deletions cpp/pixels-retina/include/RGVisibilityJni.h

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

39 changes: 29 additions & 10 deletions cpp/pixels-retina/include/TileVisibility.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include <cstring>
#include <cstddef>
#include <cstdint>
#include <utility>
#include <vector>

// rowId supports up to 65535, timestamp uses 48 bits
Expand Down Expand Up @@ -65,13 +66,13 @@ struct VersionedData : public pixels::RetinaBase<VersionedData<CAPACITY>> {
uint64_t baseTimestamp;
DeleteIndexBlock* head; // Delete chain head, part of the version

VersionedData() : baseTimestamp(0), head(nullptr) {
std::memset(baseBitmap, 0, sizeof(baseBitmap));
}

VersionedData(uint64_t ts, const uint64_t* bitmap, DeleteIndexBlock* h)
// timestamp defaults to 0; bitmap defaults to all-zeros.
explicit VersionedData(uint64_t ts = 0, const uint64_t* bitmap = nullptr, DeleteIndexBlock* h = nullptr)
: baseTimestamp(ts), head(h) {
std::memcpy(baseBitmap, bitmap, NUM_WORDS * sizeof(uint64_t));
if (bitmap)
std::memcpy(baseBitmap, bitmap, NUM_WORDS * sizeof(uint64_t));
else
std::memset(baseBitmap, 0, sizeof(baseBitmap));
}
};

Expand All @@ -92,12 +93,15 @@ template<size_t CAPACITY>
class TileVisibility : public pixels::RetinaBase<TileVisibility<CAPACITY>> {
static constexpr size_t NUM_WORDS = BITMAP_WORDS(CAPACITY);
public:
TileVisibility();
TileVisibility(uint64_t ts, const uint64_t* bitmap);
// timestamp defaults to 0; bitmap defaults to all-zeros.
explicit TileVisibility(uint64_t timestamp = 0, const uint64_t* bitmap = nullptr);
~TileVisibility() override;
void deleteTileRecord(uint16_t rowId, uint64_t ts);
void getTileVisibilityBitmap(uint64_t ts, uint64_t* outBitmap) const;
void collectTileGarbage(uint64_t ts);
void collectTileGarbage(uint64_t ts, uint64_t* gcSnapshotBitmap);
void exportChainItemsAfter(uint32_t tileId, uint64_t safeGcTs,
std::vector<std::pair<uint32_t, uint64_t>>& gcChainItems) const;
void importDeletionItems(std::vector<uint64_t>& bucket);

private:
TileVisibility(const TileVisibility &) = delete;
Expand All @@ -108,7 +112,22 @@ class TileVisibility : public pixels::RetinaBase<TileVisibility<CAPACITY>> {
std::atomic<VersionedData<CAPACITY>*> currentVersion;
std::atomic<DeleteIndexBlock *> tail;
std::atomic<size_t> tailUsed;
std::vector<RetiredVersion<CAPACITY>> retired; // Protected by GC (single writer)

// Retired versions awaiting epoch-based reclamation. Only the GC thread
// (collectTileGarbage / reclaimRetiredVersions) reads and writes this vector,
// so no locking is needed.
std::vector<RetiredVersion<CAPACITY>> retired;

// Lock-free staging slot between deleteTileRecord (CDC threads) and GC.
// deleteTileRecord's empty-chain path replaces currentVersion but cannot
// write `retired` directly — that would race with the GC thread. Instead
// it atomically stores oldVer here. The GC thread drains this slot at the
// start of collectTileGarbage, moving it into `retired` with a proper epoch.
// Flow: deleteTileRecord → pendingRetire.store → collectTileGarbage →
// pendingRetire.exchange(nullptr) → retired.emplace_back → reclaimRetiredVersions
// At most one version is pending per GC cycle (the empty-chain path fires
// at most once between consecutive GC compactions).
std::atomic<VersionedData<CAPACITY>*> pendingRetire{nullptr};
};

#endif // PIXELS_RETINA_TILE_VISIBILITY_H
78 changes: 48 additions & 30 deletions cpp/pixels-retina/lib/RGVisibility.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,35 +22,23 @@
#include <cstring>
#include <thread>

// Validates before allocation: any throw leaves tileVisibilities as nullptr,
// so the incomplete constructor does not invoke the destructor (no memory leak).
template<size_t CAPACITY>
RGVisibility<CAPACITY>::RGVisibility(uint64_t rgRecordNum)
: tileCount((rgRecordNum + VISIBILITY_RECORD_CAPACITY - 1) / VISIBILITY_RECORD_CAPACITY) {
size_t allocSize = tileCount * sizeof(TileVisibility<CAPACITY>);
void* rawMemory = operator new[](allocSize);
tileVisibilities = static_cast<TileVisibility<CAPACITY>*>(rawMemory);
for (uint64_t i = 0; i < tileCount; ++i) {
new (&tileVisibilities[i]) TileVisibility<CAPACITY>();
}
}
RGVisibility<CAPACITY>::RGVisibility(uint64_t rgRecordNum, uint64_t timestamp,
const std::vector<uint64_t>* initialBitmap)
: tileCount((rgRecordNum + VISIBILITY_RECORD_CAPACITY - 1) / VISIBILITY_RECORD_CAPACITY),
tileVisibilities(nullptr) {
if (initialBitmap && initialBitmap->size() < tileCount * BITMAP_SIZE_PER_TILE_VISIBILITY)
throw std::invalid_argument("Initial bitmap size is too small for the given record number.");

template<size_t CAPACITY>
RGVisibility<CAPACITY>::RGVisibility(uint64_t rgRecordNum, uint64_t timestamp, const std::vector<uint64_t>& initialBitmap)
: tileCount((rgRecordNum + VISIBILITY_RECORD_CAPACITY - 1) / VISIBILITY_RECORD_CAPACITY) {
size_t allocSize = tileCount * sizeof(TileVisibility<CAPACITY>);
void* rawMemory = operator new[](allocSize);
tileVisibilities = static_cast<TileVisibility<CAPACITY>*>(
operator new[](tileCount * sizeof(TileVisibility<CAPACITY>)));

if (initialBitmap.size() < tileCount * BITMAP_SIZE_PER_TILE_VISIBILITY) {
operator delete[](rawMemory);
throw std::runtime_error("Initial bitmap size is too small for the given record number.");
}

tileVisibilities = static_cast<TileVisibility<CAPACITY>*>(rawMemory);
for (uint64_t i = 0; i < tileCount; ++i) {
// Each tile takes 4 uint64_t
const uint64_t* tileBitmap = &initialBitmap[i * BITMAP_SIZE_PER_TILE_VISIBILITY];
// We use timestamp 0 for restored checkpoints to serve as the base state
new (&tileVisibilities[i]) TileVisibility<CAPACITY>(timestamp, tileBitmap);
}
for (uint64_t i = 0; i < tileCount; ++i)
new (&tileVisibilities[i]) TileVisibility<CAPACITY>(
timestamp,
initialBitmap ? initialBitmap->data() + i * BITMAP_SIZE_PER_TILE_VISIBILITY : nullptr);
}

template<size_t CAPACITY>
Expand All @@ -62,11 +50,14 @@ RGVisibility<CAPACITY>::~RGVisibility() {
}

template<size_t CAPACITY>
void RGVisibility<CAPACITY>::collectRGGarbage(uint64_t timestamp) {
// TileVisibility::collectTileGarbage uses COW + Epoch, so it's safe to call concurrently
for (uint64_t i = 0; i < tileCount; i++) {
tileVisibilities[i].collectTileGarbage(timestamp);
std::vector<uint64_t> RGVisibility<CAPACITY>::collectRGGarbage(uint64_t timestamp) {
    // Zero-filled buffer holding BITMAP_SIZE_PER_TILE_VISIBILITY words per tile.
    std::vector<uint64_t> rgSnapshot(tileCount * BITMAP_SIZE_PER_TILE_VISIBILITY, 0);

    // Walk the tiles in order; each tile's collectTileGarbage receives a pointer
    // to its own slice of the buffer as its gcSnapshotBitmap argument.
    uint64_t* tileSlice = rgSnapshot.data();
    for (uint32_t tileIdx = 0; tileIdx < tileCount; ++tileIdx) {
        tileVisibilities[tileIdx].collectTileGarbage(timestamp, tileSlice);
        tileSlice += BITMAP_SIZE_PER_TILE_VISIBILITY;
    }
    return rgSnapshot;
}

template<size_t CAPACITY>
Expand Down Expand Up @@ -104,5 +95,32 @@ uint64_t RGVisibility<CAPACITY>::getBitmapSize() const {
return tileCount * BITMAP_SIZE_PER_TILE_VISIBILITY;
}

// Gathers the (offset, timestamp) pairs that every tile's exportChainItemsAfter
// appends for `safeGcTs`, then flattens them into a single vector laid out as
// [offset0, ts0, offset1, ts1, ...] (two words per pair, tiles visited in order).
template<size_t CAPACITY>
std::vector<uint64_t> RGVisibility<CAPACITY>::exportChainItemsAfter(uint64_t safeGcTs) const {
    std::vector<std::pair<uint32_t, uint64_t>> collected;
    for (uint32_t tileIdx = 0; tileIdx < tileCount; ++tileIdx) {
        tileVisibilities[tileIdx].exportChainItemsAfter(tileIdx, safeGcTs, collected);
    }

    std::vector<uint64_t> flattened;
    flattened.reserve(collected.size() * 2);
    for (const auto& entry : collected) {
        flattened.push_back(entry.first);   // offset, widened from uint32_t
        flattened.push_back(entry.second);  // timestamp
    }
    return flattened;
}

// Applies a flat list of deletion records to the tiles of this row group.
//
// `items` holds `pairCount` (rgRowOffset, timestamp) pairs, i.e. 2 * pairCount
// words, in the same two-words-per-pair layout that exportChainItemsAfter
// emits. Each pair is routed to tile (rgRowOffset / CAPACITY), encoded with
// makeDeleteIndex(rgRowOffset % CAPACITY, timestamp), and the per-tile buckets
// are then passed to TileVisibility::importDeletionItems. Tiles that receive
// no items are not touched.
//
// Throws std::invalid_argument when `items` is null while pairCount > 0, or
// when a row offset maps to a tile outside [0, tileCount). All validation
// happens during bucketing, so no tile has been modified when it throws.
// NOTE(review): callers that hold resources across this call (e.g. a pinned
// JNI array) should make sure they are released on the exception path.
template<size_t CAPACITY>
void RGVisibility<CAPACITY>::importDeletionChain(const uint64_t* items, size_t pairCount) {
    if (pairCount == 0) return;
    if (items == nullptr)
        throw std::invalid_argument("Deletion chain items must not be null when pairCount > 0.");

    std::vector<std::vector<uint64_t>> tileBuckets(tileCount);
    for (size_t i = 0; i < pairCount; i++) {
        // Keep the full 64-bit offset for the range check: truncating to 32 bits
        // first would let a corrupt value alias a valid row.
        const uint64_t rgRowOffset = items[2 * i];
        const uint64_t ts = items[2 * i + 1];
        const uint64_t tileId = rgRowOffset / CAPACITY;
        if (tileId >= tileCount)
            throw std::invalid_argument("Deletion chain row offset is out of range for this row group.");
        const uint16_t localRowId = static_cast<uint16_t>(rgRowOffset % CAPACITY);
        tileBuckets[tileId].push_back(makeDeleteIndex(localRowId, ts));
    }
    for (uint32_t t = 0; t < tileCount; t++) {
        if (tileBuckets[t].empty()) continue;
        tileVisibilities[t].importDeletionItems(tileBuckets[t]);
    }
}

// Explicit instantiation for JNI use. The member functions of RGVisibility are
// defined in this .cpp file rather than in the header, so the RETINA_CAPACITY
// specialisation has to be instantiated here for other translation units to
// link against it (presumably the JNI layer's RGVisibilityInstance — confirm).
template class RGVisibility<RETINA_CAPACITY>;
93 changes: 61 additions & 32 deletions cpp/pixels-retina/lib/RGVisibilityJni.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,40 +26,28 @@
/*
* Class: io_pixelsdb_pixels_retina_RGVisibility
* Method: createNativeObject
* Signature: (J)J
*/
JNIEXPORT jlong JNICALL Java_io_pixelsdb_pixels_retina_RGVisibility_createNativeObject
(JNIEnv* env, jobject, jlong rgRecordNum) {
try {
auto* rgVisibility = new RGVisibilityInstance(rgRecordNum);
return reinterpret_cast<jlong>(rgVisibility);
} catch (const std::exception& e) {
env->ThrowNew(env->FindClass("java/lang/RuntimeException"), e.what());
return 0;
}
}

/*
* Class: io_pixelsdb_pixels_retina_RGVisibility
* Method: createNativeObjectInitialized
* Signature: (JJ[J)J
*
* Converts the Java bitmap array to a native vector when present, then
* forwards to a single RGVisibility constructor call.
*/
JNIEXPORT jlong JNICALL Java_io_pixelsdb_pixels_retina_RGVisibility_createNativeObjectInitialized
JNIEXPORT jlong JNICALL Java_io_pixelsdb_pixels_retina_RGVisibility_createNativeObject
(JNIEnv* env, jobject, jlong rgRecordNum, jlong timestamp, jlongArray bitmap) {
try {
jsize len = env->GetArrayLength(bitmap);
jlong *body = env->GetLongArrayElements(bitmap, nullptr);

std::vector<uint64_t> bitmapData;
bitmapData.reserve(len);
for (int i = 0; i < len; i++) {
bitmapData.push_back((uint64_t)body[i]);
const std::vector<uint64_t>* bitmapPtr = nullptr;
if (bitmap != nullptr) {
jsize len = env->GetArrayLength(bitmap);
jlong* body = env->GetLongArrayElements(bitmap, nullptr);
bitmapData.assign(reinterpret_cast<uint64_t*>(body),
reinterpret_cast<uint64_t*>(body) + len);
env->ReleaseLongArrayElements(bitmap, body, JNI_ABORT);
bitmapPtr = &bitmapData;
}

env->ReleaseLongArrayElements(bitmap, body, JNI_ABORT);

RGVisibilityInstance *rgVisibility = new RGVisibilityInstance(rgRecordNum, timestamp, bitmapData);
return reinterpret_cast<jlong>(rgVisibility);
return reinterpret_cast<jlong>(new RGVisibilityInstance(
static_cast<uint64_t>(rgRecordNum),
static_cast<uint64_t>(timestamp),
bitmapPtr));
} catch (const std::exception& e) {
env->ThrowNew(env->FindClass("java/lang/RuntimeException"), e.what());
return 0;
Expand Down Expand Up @@ -129,13 +117,55 @@ JNIEXPORT jlongArray JNICALL Java_io_pixelsdb_pixels_retina_RGVisibility_getVisi
/*
* Class: io_pixelsdb_pixels_retina_RGVisibility
* Method: garbageCollect
* Signature: (JJ)V
* Signature: (JJ)[J
*/
JNIEXPORT void JNICALL Java_io_pixelsdb_pixels_retina_RGVisibility_garbageCollect
JNIEXPORT jlongArray JNICALL Java_io_pixelsdb_pixels_retina_RGVisibility_garbageCollect
(JNIEnv* env, jobject, jlong timestamp, jlong handle) {
try {
auto* rgVisibility = reinterpret_cast<RGVisibilityInstance*>(handle);
rgVisibility->collectRGGarbage(timestamp);
std::vector<uint64_t> snapshot = rgVisibility->collectRGGarbage(timestamp);
jlongArray result = env->NewLongArray(snapshot.size());
env->SetLongArrayRegion(result, 0, snapshot.size(),
reinterpret_cast<const jlong*>(snapshot.data()));
return result;
} catch (const std::exception& e) {
env->ThrowNew(env->FindClass("java/lang/RuntimeException"), e.what());
return nullptr;
}
}

/*
 * Class:     io_pixelsdb_pixels_retina_RGVisibility
 * Method:    exportChainItemsAfter
 * Signature: (JJ)[J
 *
 * Calls exportChainItemsAfter(safeGcTs) on the native RGVisibility referenced
 * by `handle` and copies the resulting words into a newly allocated Java
 * long[].
 *
 * Returns nullptr with a Java exception pending when:
 *   - the native call throws a std::exception (rethrown as RuntimeException), or
 *   - the JVM cannot allocate the result array (NewLongArray returned null).
 */
JNIEXPORT jlongArray JNICALL Java_io_pixelsdb_pixels_retina_RGVisibility_exportChainItemsAfter
  (JNIEnv* env, jobject, jlong safeGcTs, jlong handle) {
    try {
        auto* rgVis = reinterpret_cast<RGVisibilityInstance*>(handle);
        const std::vector<uint64_t> items = rgVis->exportChainItemsAfter(static_cast<uint64_t>(safeGcTs));
        const jsize length = static_cast<jsize>(items.size());

        jlongArray result = env->NewLongArray(length);
        if (result == nullptr) {
            // Allocation failed and the JVM has already raised an exception;
            // passing a null array to SetLongArrayRegion would be invalid.
            return nullptr;
        }
        env->SetLongArrayRegion(result, 0, length, reinterpret_cast<const jlong*>(items.data()));
        return result;
    } catch (const std::exception& e) {
        // FindClass leaves its own exception pending when it fails, so only
        // throw when the class was actually resolved.
        jclass exceptionClass = env->FindClass("java/lang/RuntimeException");
        if (exceptionClass != nullptr) {
            env->ThrowNew(exceptionClass, e.what());
        }
        return nullptr;
    }
}

/*
* Class: io_pixelsdb_pixels_retina_RGVisibility
* Method: importDeletionChain
* Signature: ([JJ)V
*/
JNIEXPORT void JNICALL Java_io_pixelsdb_pixels_retina_RGVisibility_importDeletionChain
(JNIEnv* env, jobject, jlongArray items, jlong handle) {
try {
auto* rgVis = reinterpret_cast<RGVisibilityInstance*>(handle);
jsize len = env->GetArrayLength(items);
jlong* body = env->GetLongArrayElements(items, nullptr);
rgVis->importDeletionChain(reinterpret_cast<const uint64_t*>(body), len / 2);
env->ReleaseLongArrayElements(items, body, JNI_ABORT);
} catch (const std::exception& e) {
env->ThrowNew(env->FindClass("java/lang/RuntimeException"), e.what());
}
Expand Down Expand Up @@ -190,6 +220,5 @@ JNIEXPORT jlong JNICALL Java_io_pixelsdb_pixels_retina_RGVisibility_getRetinaTra
*/
JNIEXPORT jlong JNICALL Java_io_pixelsdb_pixels_retina_RGVisibility_getRetinaObjectCount
  (JNIEnv *env, jclass clazz) {
    // Relaxed atomic load of the counter declared in the pixels namespace;
    // neither `env` nor `clazz` is needed to produce the value.
    const auto liveObjectCount = pixels::g_retina_object_count.load(std::memory_order_relaxed);
    return static_cast<jlong>(liveObjectCount);
}
Loading
Loading