Skip to content

Commit e3a4ed2

Browse files
committed
feat: optimize Retina visibility replay delete modes
1 parent c0dd7ea commit e3a4ed2

9 files changed

Lines changed: 398 additions & 12 deletions

File tree

cpp/pixels-retina/include/RGVisibility.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,8 @@ class RGVisibility : public pixels::RetinaBase<RGVisibility<CAPACITY>> {
3131
const std::vector<uint64_t>* initialBitmap = nullptr);
3232
~RGVisibility() override;
3333

34-
void deleteRGRecord(uint32_t rowId, uint64_t timestamp);
34+
void deleteRGRecord(uint32_t rowId, uint64_t timestamp,
35+
ReplayMode replayMode = ReplayMode::NORMAL);
3536
uint64_t* getRGVisibilityBitmap(uint64_t timestamp);
3637

3738
std::vector<uint64_t> collectRGGarbage(uint64_t timestamp);

cpp/pixels-retina/include/RGVisibilityJni.h

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

cpp/pixels-retina/include/TileVisibility.h

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,22 @@ inline uint64_t extractTimestamp(uint64_t raw) {
4848
return (raw & 0x0000FFFFFFFFFFFFULL);
4949
}
5050

51+
/**
 * Selects how a DELETE is applied relative to the compacted base bitmap.
 *
 * The enumerators carry explicit numeric values because the JNI bridge maps
 * the integer codes 0, 1 and 2 onto them.
 */
enum class ReplayMode : uint8_t {
    /// Live append path: the caller supplies a current delete timestamp and
    /// the record is appended to the delete chain.
    NORMAL = 0,

    /// Replay that may race with READY readers: a historical delete is
    /// published as a new VersionedData carrying a folded baseBitmap.
    VERSIONED = 1,

    /// Replay performed only while recovery blocks readers and GC: a
    /// historical delete may update baseBitmap in place. Concurrent recovery
    /// writers still need tile-level synchronization.
    EXCLUSIVE = 2
};
66+
5167
struct DeleteIndexBlock : public pixels::RetinaBase<DeleteIndexBlock> {
5268
static constexpr size_t BLOCK_CAPACITY = 8;
5369
uint64_t items[BLOCK_CAPACITY] = {0};
@@ -96,7 +112,7 @@ class TileVisibility : public pixels::RetinaBase<TileVisibility<CAPACITY>> {
96112
// timestamp defaults to 0; bitmap defaults to all-zeros.
97113
explicit TileVisibility(uint64_t timestamp = 0, const uint64_t* bitmap = nullptr);
98114
~TileVisibility() override;
99-
void deleteTileRecord(uint16_t rowId, uint64_t ts);
115+
void deleteTileRecord(uint16_t rowId, uint64_t ts, ReplayMode replayMode = ReplayMode::NORMAL);
100116
void getTileVisibilityBitmap(uint64_t ts, uint64_t* outBitmap) const;
101117
void collectTileGarbage(uint64_t ts, uint64_t* gcSnapshotBitmap);
102118
void exportChainItemsAfter(uint32_t tileId, uint64_t safeGcTs,
@@ -109,6 +125,14 @@ class TileVisibility : public pixels::RetinaBase<TileVisibility<CAPACITY>> {
109125

110126
void reclaimRetiredVersions();
111127

128+
void appendDeleteChain(uint16_t rowId, uint64_t ts);
129+
130+
// VERSIONED: replay with possible readers; historical deletes use COW fold.
131+
void deleteTileRecordVersioned(uint16_t rowId, uint64_t ts);
132+
133+
// EXCLUSIVE: recovery replay without readers; historical deletes fold in place.
134+
void deleteTileRecordExclusive(uint16_t rowId, uint64_t ts);
135+
112136
std::atomic<VersionedData<CAPACITY>*> currentVersion;
113137
std::atomic<DeleteIndexBlock *> tail;
114138
std::atomic<size_t> tailUsed;

cpp/pixels-retina/lib/RGVisibility.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,9 +70,10 @@ TileVisibility<CAPACITY>* RGVisibility<CAPACITY>::getTileVisibility(uint32_t row
7070
}
7171

7272
template<size_t CAPACITY>
73-
void RGVisibility<CAPACITY>::deleteRGRecord(uint32_t rowId, uint64_t timestamp) {
73+
void RGVisibility<CAPACITY>::deleteRGRecord(uint32_t rowId, uint64_t timestamp,
74+
ReplayMode replayMode) {
7475
TileVisibility<CAPACITY>* tileVisibility = getTileVisibility(rowId);
75-
tileVisibility->deleteTileRecord(rowId % VISIBILITY_RECORD_CAPACITY, timestamp);
76+
tileVisibility->deleteTileRecord(rowId % VISIBILITY_RECORD_CAPACITY, timestamp, replayMode);
7677
}
7778

7879
template<size_t CAPACITY>

cpp/pixels-retina/lib/RGVisibilityJni.cpp

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,17 @@
2323
#include "RGVisibility.h"
2424
#include <stdexcept>
2525

26+
namespace {
27+
ReplayMode toReplayMode(jint mode) {
28+
switch (mode) {
29+
case 0: return ReplayMode::NORMAL;
30+
case 1: return ReplayMode::VERSIONED;
31+
case 2: return ReplayMode::EXCLUSIVE;
32+
default: throw std::invalid_argument("unknown ReplayMode");
33+
}
34+
}
35+
}
36+
2637
/*
2738
* Class: io_pixelsdb_pixels_retina_RGVisibility
2839
* Method: createNativeObject
@@ -72,13 +83,13 @@ JNIEXPORT void JNICALL Java_io_pixelsdb_pixels_retina_RGVisibility_destroyNative
7283
/*
7384
* Class: io_pixelsdb_pixels_retina_RGVisibility
7485
* Method: deleteRecord
75-
* Signature: (JJJ)V
86+
* Signature: (IJJI)V
7687
*/
7788
JNIEXPORT void JNICALL Java_io_pixelsdb_pixels_retina_RGVisibility_deleteRecord
78-
(JNIEnv* env, jobject, jint rowId, jlong timestamp, jlong handle) {
89+
(JNIEnv* env, jobject, jint rowId, jlong timestamp, jlong handle, jint replayMode) {
7990
try {
8091
auto* rgVisibility = reinterpret_cast<RGVisibilityInstance*>(handle);
81-
rgVisibility->deleteRGRecord(rowId, timestamp);
92+
rgVisibility->deleteRGRecord(rowId, timestamp, toReplayMode(replayMode));
8293
} catch (const std::exception& e) {
8394
env->ThrowNew(env->FindClass("java/lang/RuntimeException"), e.what());
8495
}

cpp/pixels-retina/lib/TileVisibility.cpp

Lines changed: 65 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,71 @@ TileVisibility<CAPACITY>::~TileVisibility() {
6868
}
6969

7070
template<size_t CAPACITY>
71-
void TileVisibility<CAPACITY>::deleteTileRecord(uint16_t rowId, uint64_t ts) {
71+
void TileVisibility<CAPACITY>::deleteTileRecord(uint16_t rowId, uint64_t ts,
72+
ReplayMode replayMode) {
73+
switch (replayMode) {
74+
case ReplayMode::NORMAL:
75+
appendDeleteChain(rowId, ts);
76+
return;
77+
case ReplayMode::VERSIONED:
78+
deleteTileRecordVersioned(rowId, ts);
79+
return;
80+
case ReplayMode::EXCLUSIVE:
81+
deleteTileRecordExclusive(rowId, ts);
82+
return;
83+
default:
84+
throw std::invalid_argument("unknown ReplayMode");
85+
}
86+
}
87+
88+
template<size_t CAPACITY>
89+
void TileVisibility<CAPACITY>::deleteTileRecordVersioned(uint16_t rowId, uint64_t ts) {
90+
// READY backlog replay can race with getTileVisibilityBitmap readers. Fold
91+
// historical deletes by publishing a new VersionedData instead of mutating
92+
// baseBitmap observed by an existing reader.
93+
// Keep ts=0 out of this path because item=0 is the chain-slot sentinel.
94+
while (ts > 0) {
95+
VersionedData<CAPACITY>* cur = currentVersion.load(std::memory_order_acquire);
96+
if (ts > cur->baseTimestamp) {
97+
break;
98+
}
99+
if ((cur->baseBitmap[rowId / 64] & (1ULL << (rowId % 64))) != 0) {
100+
return;
101+
}
102+
uint64_t newBaseBitmap[NUM_WORDS];
103+
std::memcpy(newBaseBitmap, cur->baseBitmap, NUM_WORDS * sizeof(uint64_t));
104+
SET_BITMAP_BIT(newBaseBitmap, rowId);
105+
VersionedData<CAPACITY>* newVer =
106+
new VersionedData<CAPACITY>(cur->baseTimestamp, newBaseBitmap, cur->head);
107+
if (currentVersion.compare_exchange_strong(cur, newVer, std::memory_order_acq_rel)) {
108+
pendingRetire.store(cur, std::memory_order_release);
109+
return;
110+
}
111+
delete newVer;
112+
}
113+
114+
appendDeleteChain(rowId, ts);
115+
}
116+
117+
template<size_t CAPACITY>
118+
void TileVisibility<CAPACITY>::deleteTileRecordExclusive(uint16_t rowId, uint64_t ts) {
119+
// RECOVERING replay blocks readers and GC, so historical deletes can fold
120+
// into baseBitmap in place. Atomic OR prevents lost updates when concurrent
121+
// recovery writers touch the same bitmap word.
122+
VersionedData<CAPACITY>* cur = currentVersion.load(std::memory_order_acquire);
123+
if (ts > 0 && ts <= cur->baseTimestamp) {
124+
uint64_t mask = 1ULL << (rowId % 64);
125+
__atomic_fetch_or(&cur->baseBitmap[rowId / 64], mask, __ATOMIC_RELAXED);
126+
return;
127+
}
128+
129+
appendDeleteChain(rowId, ts);
130+
}
131+
132+
template<size_t CAPACITY>
133+
void TileVisibility<CAPACITY>::appendDeleteChain(uint16_t rowId, uint64_t ts) {
134+
// Normal live apply assumes a current timestamp and records the delete in
135+
// the append-only chain, leaving baseBitmap untouched for the hot path.
72136
uint64_t item = makeDeleteIndex(rowId, ts);
73137
while (true) {
74138
DeleteIndexBlock *curTail = tail.load(std::memory_order_acquire);

cpp/pixels-retina/test/RGVisibilityTest.cpp

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,50 @@ class RGVisibilityTest : public ::testing::Test {
4949
RGVisibilityInstance* rgVisibility;
5050
};
5151

52+
// Reports whether the bit for rowId is set in a bitmap made of 64-bit words.
static bool rgBitSet(const uint64_t* bitmap, uint32_t rowId) {
    const uint64_t word = bitmap[rowId >> 6];            // rowId / 64
    const uint64_t mask = uint64_t{1} << (rowId & 63U);  // rowId % 64
    return (word & mask) != 0;
}
55+
56+
// Deletes rows [0, rowCount) at timestamp `ts` from `threadCount` threads that
// are all released at the same moment, each thread owning a contiguous slice
// of rowCount / threadCount rows.
//
// visibility   row-group visibility under test
// mode         replay mode forwarded to every deleteRGRecord call
// ts           delete timestamp used for every row
// rowCount     number of rows to delete; must be a non-negative multiple of
//              threadCount
// threadCount  number of worker threads; must be positive
//
// Invalid arguments raise a fatal gtest failure and the helper returns
// without starting any thread.
static void runConcurrentRGDeletes(RGVisibilityInstance* visibility,
                                   ReplayMode mode,
                                   uint64_t ts,
                                   int rowCount = 64,
                                   int threadCount = 8) {
    // Check the divisor before using it: `rowCount % 0` is undefined behaviour.
    ASSERT_GT(threadCount, 0);
    ASSERT_GE(rowCount, 0);
    ASSERT_EQ(rowCount % threadCount, 0);
    const int rowsPerThread = rowCount / threadCount;

    std::atomic<bool> start{false};
    std::vector<std::thread> threads;
    threads.reserve(static_cast<size_t>(threadCount));

    for (int t = 0; t < threadCount; t++) {
        // Explicit captures: only the start flag is shared by reference.
        threads.emplace_back([visibility, mode, ts, rowsPerThread, t, &start]() {
            // Spin until every worker has been created so the deletes overlap.
            while (!start.load(std::memory_order_acquire)) {
                std::this_thread::yield();
            }
            for (int i = 0; i < rowsPerThread; i++) {
                uint32_t rowId = static_cast<uint32_t>(t * rowsPerThread + i);
                visibility->deleteRGRecord(rowId, ts, mode);
            }
        });
    }

    start.store(true, std::memory_order_release);
    for (auto& thread : threads) {
        thread.join();
    }
}
83+
84+
// Fetches the visibility bitmap at queryTs and expects the bit of every row in
// [0, rowCount) to equal expectedSet. The bitmap returned by
// getRGVisibilityBitmap is released with delete[] before returning.
static void expectRGRows(RGVisibilityInstance* visibility,
                         uint64_t queryTs,
                         int rowCount,
                         bool expectedSet) {
    uint64_t* bitmap = visibility->getRGVisibilityBitmap(queryTs);

    int row = 0;
    while (row < rowCount) {
        // Non-fatal check so every mismatching row is reported, not just the first.
        EXPECT_EQ(expectedSet, rgBitSet(bitmap, static_cast<uint32_t>(row)))
            << "row=" << row << " queryTs=" << queryTs;
        ++row;
    }

    delete[] bitmap;
}
95+
5296
TEST_F(RGVisibilityTest, BasicDeleteAndVisibility) {
5397
uint64_t timestamp1 = 100;
5498
uint64_t timestamp2 = 200;
@@ -67,6 +111,34 @@ TEST_F(RGVisibilityTest, BasicDeleteAndVisibility) {
67111
delete[] bitmap2;
68112
}
69113

114+
// NORMAL mode with a delete timestamp newer than the base: the rows must stay
// visible at the base timestamp and appear deleted at the delete timestamp.
TEST_F(RGVisibilityTest, ConcurrentNormalModeAppendsDeleteChain) {
    constexpr uint64_t baseTs = 100;
    constexpr uint64_t deleteTs = baseTs + 1;
    constexpr int deletedRows = 64;  // same as runConcurrentRGDeletes' default rowCount
    RGVisibilityInstance visibility(ROW_COUNT, baseTs, nullptr);

    runConcurrentRGDeletes(&visibility, ReplayMode::NORMAL, deleteTs);

    expectRGRows(&visibility, baseTs, deletedRows, false);
    expectRGRows(&visibility, deleteTs, deletedRows, true);
}
123+
124+
// VERSIONED mode with a delete timestamp older than the base: after concurrent
// replay, every row must read as deleted at the base timestamp.
TEST_F(RGVisibilityTest, ConcurrentVersionedModeFoldsWithCow) {
    constexpr uint64_t baseTs = 100;
    constexpr uint64_t historicalTs = baseTs - 1;
    constexpr int deletedRows = 64;  // same as runConcurrentRGDeletes' default rowCount
    RGVisibilityInstance visibility(ROW_COUNT, baseTs, nullptr);

    runConcurrentRGDeletes(&visibility, ReplayMode::VERSIONED, historicalTs);

    expectRGRows(&visibility, baseTs, deletedRows, true);
}
132+
133+
// EXCLUSIVE mode with a delete timestamp older than the base: after concurrent
// replay, every row must read as deleted at the base timestamp.
TEST_F(RGVisibilityTest, ConcurrentExclusiveModeFoldsWithAtomicOr) {
    constexpr uint64_t baseTs = 100;
    constexpr uint64_t historicalTs = baseTs - 1;
    constexpr int deletedRows = 64;  // same as runConcurrentRGDeletes' default rowCount
    RGVisibilityInstance visibility(ROW_COUNT, baseTs, nullptr);

    runConcurrentRGDeletes(&visibility, ReplayMode::EXCLUSIVE, historicalTs);

    expectRGRows(&visibility, baseTs, deletedRows, true);
}
141+
70142
TEST_F(RGVisibilityTest, MultiThread) {
71143
struct DeleteRecord {
72144
uint64_t timestamp;

0 commit comments

Comments
 (0)