Skip to content

Commit eb01bb5

Browse files
authored
[Issue #1288] implement file rewriting to enable garbage collection at the storage layer and fix some visibility bugs (#1310)
1 parent 7904a18 commit eb01bb5

File tree

33 files changed

+8532
-330
lines changed

33 files changed

+8532
-330
lines changed

cpp/pixels-retina/include/RGVisibility.h

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,19 +21,23 @@
2121
#define RG_VISIBILITY_H
2222
#include "RetinaBase.h"
2323
#include "TileVisibility.h"
24+
#include <utility>
2425
#include <vector>
2526

2627
template<size_t CAPACITY>
2728
class RGVisibility : public pixels::RetinaBase<RGVisibility<CAPACITY>> {
2829
public:
29-
explicit RGVisibility(uint64_t rgRecordNum);
30-
explicit RGVisibility(uint64_t rgRecordNum, uint64_t timestamp, const std::vector<uint64_t>& initialBitmap);
30+
explicit RGVisibility(uint64_t rgRecordNum, uint64_t timestamp = 0,
31+
const std::vector<uint64_t>* initialBitmap = nullptr);
3132
~RGVisibility() override;
3233

3334
void deleteRGRecord(uint32_t rowId, uint64_t timestamp);
3435
uint64_t* getRGVisibilityBitmap(uint64_t timestamp);
3536

36-
void collectRGGarbage(uint64_t timestamp);
37+
std::vector<uint64_t> collectRGGarbage(uint64_t timestamp);
38+
39+
std::vector<uint64_t> exportChainItemsAfter(uint64_t safeGcTs) const;
40+
void importDeletionChain(const uint64_t* items, size_t pairCount);
3741

3842
uint64_t getBitmapSize() const;
3943

cpp/pixels-retina/include/RGVisibilityJni.h

Lines changed: 19 additions & 11 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

cpp/pixels-retina/include/TileVisibility.h

Lines changed: 29 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
#include <cstring>
3333
#include <cstddef>
3434
#include <cstdint>
35+
#include <utility>
3536
#include <vector>
3637

3738
// rowId supports up to 65535, timestamp uses 48 bits
@@ -65,13 +66,13 @@ struct VersionedData : public pixels::RetinaBase<VersionedData<CAPACITY>> {
6566
uint64_t baseTimestamp;
6667
DeleteIndexBlock* head; // Delete chain head, part of the version
6768

68-
VersionedData() : baseTimestamp(0), head(nullptr) {
69-
std::memset(baseBitmap, 0, sizeof(baseBitmap));
70-
}
71-
72-
VersionedData(uint64_t ts, const uint64_t* bitmap, DeleteIndexBlock* h)
69+
// timestamp defaults to 0; bitmap defaults to all-zeros.
70+
explicit VersionedData(uint64_t ts = 0, const uint64_t* bitmap = nullptr, DeleteIndexBlock* h = nullptr)
7371
: baseTimestamp(ts), head(h) {
74-
std::memcpy(baseBitmap, bitmap, NUM_WORDS * sizeof(uint64_t));
72+
if (bitmap)
73+
std::memcpy(baseBitmap, bitmap, NUM_WORDS * sizeof(uint64_t));
74+
else
75+
std::memset(baseBitmap, 0, sizeof(baseBitmap));
7576
}
7677
};
7778

@@ -92,12 +93,15 @@ template<size_t CAPACITY>
9293
class TileVisibility : public pixels::RetinaBase<TileVisibility<CAPACITY>> {
9394
static constexpr size_t NUM_WORDS = BITMAP_WORDS(CAPACITY);
9495
public:
95-
TileVisibility();
96-
TileVisibility(uint64_t ts, const uint64_t* bitmap);
96+
// timestamp defaults to 0; bitmap defaults to all-zeros.
97+
explicit TileVisibility(uint64_t timestamp = 0, const uint64_t* bitmap = nullptr);
9798
~TileVisibility() override;
9899
void deleteTileRecord(uint16_t rowId, uint64_t ts);
99100
void getTileVisibilityBitmap(uint64_t ts, uint64_t* outBitmap) const;
100-
void collectTileGarbage(uint64_t ts);
101+
void collectTileGarbage(uint64_t ts, uint64_t* gcSnapshotBitmap);
102+
void exportChainItemsAfter(uint32_t tileId, uint64_t safeGcTs,
103+
std::vector<std::pair<uint32_t, uint64_t>>& gcChainItems) const;
104+
void importDeletionItems(std::vector<uint64_t>& bucket);
101105

102106
private:
103107
TileVisibility(const TileVisibility &) = delete;
@@ -108,7 +112,22 @@ class TileVisibility : public pixels::RetinaBase<TileVisibility<CAPACITY>> {
108112
std::atomic<VersionedData<CAPACITY>*> currentVersion;
109113
std::atomic<DeleteIndexBlock *> tail;
110114
std::atomic<size_t> tailUsed;
111-
std::vector<RetiredVersion<CAPACITY>> retired; // Protected by GC (single writer)
115+
116+
// Retired versions awaiting epoch-based reclamation. Only the GC thread
117+
// (collectTileGarbage / reclaimRetiredVersions) reads and writes this vector,
118+
// so no locking is needed.
119+
std::vector<RetiredVersion<CAPACITY>> retired;
120+
121+
// Lock-free staging slot between deleteTileRecord (CDC threads) and GC.
122+
// deleteTileRecord's empty-chain path replaces currentVersion but cannot
123+
// write `retired` directly — that would race with the GC thread. Instead
124+
// it atomically stores oldVer here. The GC thread drains this slot at the
125+
// start of collectTileGarbage, moving it into `retired` with a proper epoch.
126+
// Flow: deleteTileRecord → pendingRetire.store → collectTileGarbage →
127+
// pendingRetire.exchange(nullptr) → retired.emplace_back → reclaimRetiredVersions
128+
// At most one version is pending per GC cycle (the empty-chain path fires
129+
// at most once between consecutive GC compactions).
130+
std::atomic<VersionedData<CAPACITY>*> pendingRetire{nullptr};
112131
};
113132

114133
#endif // PIXELS_RETINA_TILE_VISIBILITY_H

cpp/pixels-retina/lib/RGVisibility.cpp

Lines changed: 48 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -22,35 +22,23 @@
2222
#include <cstring>
2323
#include <thread>
2424

25+
// Validates before allocation: any throw leaves tileVisibilities as nullptr,
26+
// so the incomplete constructor does not invoke the destructor (no memory leak).
2527
template<size_t CAPACITY>
26-
RGVisibility<CAPACITY>::RGVisibility(uint64_t rgRecordNum)
27-
: tileCount((rgRecordNum + VISIBILITY_RECORD_CAPACITY - 1) / VISIBILITY_RECORD_CAPACITY) {
28-
size_t allocSize = tileCount * sizeof(TileVisibility<CAPACITY>);
29-
void* rawMemory = operator new[](allocSize);
30-
tileVisibilities = static_cast<TileVisibility<CAPACITY>*>(rawMemory);
31-
for (uint64_t i = 0; i < tileCount; ++i) {
32-
new (&tileVisibilities[i]) TileVisibility<CAPACITY>();
33-
}
34-
}
28+
RGVisibility<CAPACITY>::RGVisibility(uint64_t rgRecordNum, uint64_t timestamp,
29+
const std::vector<uint64_t>* initialBitmap)
30+
: tileCount((rgRecordNum + VISIBILITY_RECORD_CAPACITY - 1) / VISIBILITY_RECORD_CAPACITY),
31+
tileVisibilities(nullptr) {
32+
if (initialBitmap && initialBitmap->size() < tileCount * BITMAP_SIZE_PER_TILE_VISIBILITY)
33+
throw std::invalid_argument("Initial bitmap size is too small for the given record number.");
3534

36-
template<size_t CAPACITY>
37-
RGVisibility<CAPACITY>::RGVisibility(uint64_t rgRecordNum, uint64_t timestamp, const std::vector<uint64_t>& initialBitmap)
38-
: tileCount((rgRecordNum + VISIBILITY_RECORD_CAPACITY - 1) / VISIBILITY_RECORD_CAPACITY) {
39-
size_t allocSize = tileCount * sizeof(TileVisibility<CAPACITY>);
40-
void* rawMemory = operator new[](allocSize);
35+
tileVisibilities = static_cast<TileVisibility<CAPACITY>*>(
36+
operator new[](tileCount * sizeof(TileVisibility<CAPACITY>)));
4137

42-
if (initialBitmap.size() < tileCount * BITMAP_SIZE_PER_TILE_VISIBILITY) {
43-
operator delete[](rawMemory);
44-
throw std::runtime_error("Initial bitmap size is too small for the given record number.");
45-
}
46-
47-
tileVisibilities = static_cast<TileVisibility<CAPACITY>*>(rawMemory);
48-
for (uint64_t i = 0; i < tileCount; ++i) {
49-
// Each tile takes 4 uint64_t
50-
const uint64_t* tileBitmap = &initialBitmap[i * BITMAP_SIZE_PER_TILE_VISIBILITY];
51-
// We use timestamp 0 for restored checkpoints to serve as the base state
52-
new (&tileVisibilities[i]) TileVisibility<CAPACITY>(timestamp, tileBitmap);
53-
}
38+
for (uint64_t i = 0; i < tileCount; ++i)
39+
new (&tileVisibilities[i]) TileVisibility<CAPACITY>(
40+
timestamp,
41+
initialBitmap ? initialBitmap->data() + i * BITMAP_SIZE_PER_TILE_VISIBILITY : nullptr);
5442
}
5543

5644
template<size_t CAPACITY>
@@ -62,11 +50,14 @@ RGVisibility<CAPACITY>::~RGVisibility() {
6250
}
6351

6452
template<size_t CAPACITY>
65-
void RGVisibility<CAPACITY>::collectRGGarbage(uint64_t timestamp) {
66-
// TileVisibility::collectTileGarbage uses COW + Epoch, so it's safe to call concurrently
67-
for (uint64_t i = 0; i < tileCount; i++) {
68-
tileVisibilities[i].collectTileGarbage(timestamp);
53+
std::vector<uint64_t> RGVisibility<CAPACITY>::collectRGGarbage(uint64_t timestamp) {
54+
size_t totalWords = tileCount * BITMAP_SIZE_PER_TILE_VISIBILITY;
55+
std::vector<uint64_t> rgSnapshot(totalWords, 0);
56+
for (uint32_t t = 0; t < tileCount; t++) {
57+
tileVisibilities[t].collectTileGarbage(timestamp,
58+
rgSnapshot.data() + t * BITMAP_SIZE_PER_TILE_VISIBILITY);
6959
}
60+
return rgSnapshot;
7061
}
7162

7263
template<size_t CAPACITY>
@@ -104,5 +95,32 @@ uint64_t RGVisibility<CAPACITY>::getBitmapSize() const {
10495
return tileCount * BITMAP_SIZE_PER_TILE_VISIBILITY;
10596
}
10697

98+
template<size_t CAPACITY>
99+
std::vector<uint64_t> RGVisibility<CAPACITY>::exportChainItemsAfter(uint64_t safeGcTs) const {
100+
std::vector<std::pair<uint32_t, uint64_t>> items;
101+
for (uint32_t t = 0; t < tileCount; t++)
102+
tileVisibilities[t].exportChainItemsAfter(t, safeGcTs, items);
103+
std::vector<uint64_t> result;
104+
result.reserve(items.size() * 2);
105+
for (auto& [off, ts] : items) { result.push_back(off); result.push_back(ts); }
106+
return result;
107+
}
108+
109+
template<size_t CAPACITY>
110+
void RGVisibility<CAPACITY>::importDeletionChain(const uint64_t* items, size_t pairCount) {
111+
std::vector<std::vector<uint64_t>> tileBuckets(tileCount);
112+
for (size_t i = 0; i < pairCount; i++) {
113+
uint32_t rgRowOffset = static_cast<uint32_t>(items[2 * i]);
114+
uint64_t ts = items[2 * i + 1];
115+
uint32_t tileId = rgRowOffset / CAPACITY;
116+
uint16_t localRowId = static_cast<uint16_t>(rgRowOffset % CAPACITY);
117+
tileBuckets[tileId].push_back(makeDeleteIndex(localRowId, ts));
118+
}
119+
for (uint32_t t = 0; t < tileCount; t++) {
120+
if (tileBuckets[t].empty()) continue;
121+
tileVisibilities[t].importDeletionItems(tileBuckets[t]);
122+
}
123+
}
124+
107125
// Explicit Instantiations for JNI use
108126
template class RGVisibility<RETINA_CAPACITY>;

cpp/pixels-retina/lib/RGVisibilityJni.cpp

Lines changed: 61 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -26,40 +26,28 @@
2626
/*
2727
* Class: io_pixelsdb_pixels_retina_RGVisibility
2828
* Method: createNativeObject
29-
* Signature: (J)J
30-
*/
31-
JNIEXPORT jlong JNICALL Java_io_pixelsdb_pixels_retina_RGVisibility_createNativeObject
32-
(JNIEnv* env, jobject, jlong rgRecordNum) {
33-
try {
34-
auto* rgVisibility = new RGVisibilityInstance(rgRecordNum);
35-
return reinterpret_cast<jlong>(rgVisibility);
36-
} catch (const std::exception& e) {
37-
env->ThrowNew(env->FindClass("java/lang/RuntimeException"), e.what());
38-
return 0;
39-
}
40-
}
41-
42-
/*
43-
* Class: io_pixelsdb_pixels_retina_RGVisibility
44-
* Method: createNativeObjectInitialized
4529
* Signature: (JJ[J)J
30+
*
31+
* Converts the Java bitmap array to a native vector when present, then
32+
* forwards to a single RGVisibility constructor call.
4633
*/
47-
JNIEXPORT jlong JNICALL Java_io_pixelsdb_pixels_retina_RGVisibility_createNativeObjectInitialized
34+
JNIEXPORT jlong JNICALL Java_io_pixelsdb_pixels_retina_RGVisibility_createNativeObject
4835
(JNIEnv* env, jobject, jlong rgRecordNum, jlong timestamp, jlongArray bitmap) {
4936
try {
50-
jsize len = env->GetArrayLength(bitmap);
51-
jlong *body = env->GetLongArrayElements(bitmap, nullptr);
52-
5337
std::vector<uint64_t> bitmapData;
54-
bitmapData.reserve(len);
55-
for (int i = 0; i < len; i++) {
56-
bitmapData.push_back((uint64_t)body[i]);
38+
const std::vector<uint64_t>* bitmapPtr = nullptr;
39+
if (bitmap != nullptr) {
40+
jsize len = env->GetArrayLength(bitmap);
41+
jlong* body = env->GetLongArrayElements(bitmap, nullptr);
42+
bitmapData.assign(reinterpret_cast<uint64_t*>(body),
43+
reinterpret_cast<uint64_t*>(body) + len);
44+
env->ReleaseLongArrayElements(bitmap, body, JNI_ABORT);
45+
bitmapPtr = &bitmapData;
5746
}
58-
59-
env->ReleaseLongArrayElements(bitmap, body, JNI_ABORT);
60-
61-
RGVisibilityInstance *rgVisibility = new RGVisibilityInstance(rgRecordNum, timestamp, bitmapData);
62-
return reinterpret_cast<jlong>(rgVisibility);
47+
return reinterpret_cast<jlong>(new RGVisibilityInstance(
48+
static_cast<uint64_t>(rgRecordNum),
49+
static_cast<uint64_t>(timestamp),
50+
bitmapPtr));
6351
} catch (const std::exception& e) {
6452
env->ThrowNew(env->FindClass("java/lang/RuntimeException"), e.what());
6553
return 0;
@@ -129,13 +117,55 @@ JNIEXPORT jlongArray JNICALL Java_io_pixelsdb_pixels_retina_RGVisibility_getVisi
129117
/*
130118
* Class: io_pixelsdb_pixels_retina_RGVisibility
131119
* Method: garbageCollect
132-
* Signature: (JJ)V
120+
* Signature: (JJ)[J
133121
*/
134-
JNIEXPORT void JNICALL Java_io_pixelsdb_pixels_retina_RGVisibility_garbageCollect
122+
JNIEXPORT jlongArray JNICALL Java_io_pixelsdb_pixels_retina_RGVisibility_garbageCollect
135123
(JNIEnv* env, jobject, jlong timestamp, jlong handle) {
136124
try {
137125
auto* rgVisibility = reinterpret_cast<RGVisibilityInstance*>(handle);
138-
rgVisibility->collectRGGarbage(timestamp);
126+
std::vector<uint64_t> snapshot = rgVisibility->collectRGGarbage(timestamp);
127+
jlongArray result = env->NewLongArray(snapshot.size());
128+
env->SetLongArrayRegion(result, 0, snapshot.size(),
129+
reinterpret_cast<const jlong*>(snapshot.data()));
130+
return result;
131+
} catch (const std::exception& e) {
132+
env->ThrowNew(env->FindClass("java/lang/RuntimeException"), e.what());
133+
return nullptr;
134+
}
135+
}
136+
137+
/*
138+
* Class: io_pixelsdb_pixels_retina_RGVisibility
139+
* Method: exportChainItemsAfter
140+
* Signature: (JJ)[J
141+
*/
142+
JNIEXPORT jlongArray JNICALL Java_io_pixelsdb_pixels_retina_RGVisibility_exportChainItemsAfter
143+
(JNIEnv* env, jobject, jlong safeGcTs, jlong handle) {
144+
try {
145+
auto* rgVis = reinterpret_cast<RGVisibilityInstance*>(handle);
146+
std::vector<uint64_t> items = rgVis->exportChainItemsAfter(static_cast<uint64_t>(safeGcTs));
147+
jlongArray result = env->NewLongArray(items.size());
148+
env->SetLongArrayRegion(result, 0, items.size(), reinterpret_cast<const jlong*>(items.data()));
149+
return result;
150+
} catch (const std::exception& e) {
151+
env->ThrowNew(env->FindClass("java/lang/RuntimeException"), e.what());
152+
return nullptr;
153+
}
154+
}
155+
156+
/*
157+
* Class: io_pixelsdb_pixels_retina_RGVisibility
158+
* Method: importDeletionChain
159+
* Signature: ([JJ)V
160+
*/
161+
JNIEXPORT void JNICALL Java_io_pixelsdb_pixels_retina_RGVisibility_importDeletionChain
162+
(JNIEnv* env, jobject, jlongArray items, jlong handle) {
163+
try {
164+
auto* rgVis = reinterpret_cast<RGVisibilityInstance*>(handle);
165+
jsize len = env->GetArrayLength(items);
166+
jlong* body = env->GetLongArrayElements(items, nullptr);
167+
rgVis->importDeletionChain(reinterpret_cast<const uint64_t*>(body), len / 2);
168+
env->ReleaseLongArrayElements(items, body, JNI_ABORT);
139169
} catch (const std::exception& e) {
140170
env->ThrowNew(env->FindClass("java/lang/RuntimeException"), e.what());
141171
}
@@ -190,6 +220,5 @@ JNIEXPORT jlong JNICALL Java_io_pixelsdb_pixels_retina_RGVisibility_getRetinaTra
190220
*/
191221
JNIEXPORT jlong JNICALL Java_io_pixelsdb_pixels_retina_RGVisibility_getRetinaObjectCount
192222
(JNIEnv *env, jclass clazz) {
193-
// Read the atomic object counter from RetinaBase namespace
194223
return static_cast<jlong>(pixels::g_retina_object_count.load(std::memory_order_relaxed));
195224
}

0 commit comments

Comments
 (0)