Skip to content

Commit f164321

Browse files
committed
feat(retina)!: introduce recovery checkpoint subsystem
1 parent e8f05a9 commit f164321

22 files changed

Lines changed: 1805 additions & 1361 deletions

File tree

pixels-common/src/main/java/io/pixelsdb/pixels/common/transaction/TransService.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
package io.pixelsdb.pixels.common.transaction;
2121

2222
import com.google.common.collect.ImmutableList;
23-
import com.google.protobuf.Empty;
2423
import io.grpc.ManagedChannel;
2524
import io.grpc.ManagedChannelBuilder;
2625
import io.pixelsdb.pixels.common.error.ErrorCode;
@@ -498,12 +497,22 @@ public boolean bindExternalTraceId(long transId, String externalTraceId) throws
498497
return true;
499498
}
500499

501-
public long getSafeGcTimestamp() throws TransException
500+
/**
501+
* Get the safe upper bound (inclusive) for folding DELETE timestamps into
502+
* the visibility base bitmap.
503+
*
504+
* @param includeRunningQueries whether the returned timestamp must remain safe for live running queries
505+
*/
506+
public long getSafeVisibilityFoldingTimestamp(boolean includeRunningQueries) throws TransException
502507
{
503-
TransProto.GetSafeGcTimestampResponse response = this.stub.getSafeGcTimestamp(Empty.getDefaultInstance());
508+
TransProto.GetSafeVisibilityFoldingTimestampRequest request =
509+
TransProto.GetSafeVisibilityFoldingTimestampRequest.newBuilder()
510+
.setIncludeRunningQueries(includeRunningQueries).build();
511+
TransProto.GetSafeVisibilityFoldingTimestampResponse response =
512+
this.stub.getSafeVisibilityFoldingTimestamp(request);
504513
if (response.getErrorCode() != ErrorCode.SUCCESS)
505514
{
506-
throw new TransException("failed to get safe garbage collection timestamp"
515+
throw new TransException("failed to get safe visibility folding timestamp, error code="
507516
+ response.getErrorCode());
508517
}
509518
return response.getTimestamp();

pixels-common/src/main/java/io/pixelsdb/pixels/common/utils/EtcdUtil.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,11 @@
2424
import io.etcd.jetcd.KeyValue;
2525
import io.etcd.jetcd.Watch;
2626
import io.etcd.jetcd.kv.PutResponse;
27+
import io.etcd.jetcd.kv.TxnResponse;
2728
import io.etcd.jetcd.lease.LeaseGrantResponse;
29+
import io.etcd.jetcd.op.Cmp;
30+
import io.etcd.jetcd.op.CmpTarget;
31+
import io.etcd.jetcd.op.Op;
2832
import io.etcd.jetcd.options.DeleteOption;
2933
import io.etcd.jetcd.options.GetOption;
3034
import io.etcd.jetcd.options.PutOption;
@@ -260,6 +264,33 @@ public long putKeyValueWithLeaseId(String key, String value, long leaseId)
260264
return 0L;
261265
}
262266

267+
/**
268+
* Atomic compare-and-swap put.
269+
*
270+
* @param key the etcd key whose current state is compared and, on a match, overwritten
271+
* @param expectedValue the value the key must currently hold; {@code null} means the key must not exist yet (version 0)
272+
* @param newValue the value written to the key when the comparison succeeds
273+
* @return true if the comparison held and the put was applied; false if the comparison failed and nothing was written
274+
*/
275+
public boolean compareAndPut(String key, String expectedValue, String newValue)
276+
throws ExecutionException, InterruptedException
277+
{
278+
ByteSequence keyBs = ByteSequence.from(key, StandardCharsets.UTF_8);
279+
Cmp cmp = (expectedValue == null)
280+
? new Cmp(keyBs, Cmp.Op.EQUAL, CmpTarget.version(0L))
281+
: new Cmp(keyBs, Cmp.Op.EQUAL, CmpTarget.value(
282+
ByteSequence.from(expectedValue, StandardCharsets.UTF_8)));
283+
Op putOp = Op.put(keyBs,
284+
ByteSequence.from(newValue, StandardCharsets.UTF_8),
285+
PutOption.DEFAULT);
286+
TxnResponse resp = this.client.getKVClient().txn()
287+
.If(cmp)
288+
.Then(putOp)
289+
.commit()
290+
.get();
291+
return resp.isSucceeded();
292+
}
293+
263294
/**
264295
* delete key-value by key.
265296
*

pixels-common/src/main/java/io/pixelsdb/pixels/common/utils/RetinaUtils.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@
2929

3030
public class RetinaUtils
3131
{
32-
public static final String CHECKPOINT_PREFIX_GC = "vis_gc_";
3332
public static final String CHECKPOINT_PREFIX_OFFLOAD = "vis_offload_";
33+
public static final String CHECKPOINT_PREFIX_RECOVERY = "recovery_";
3434
public static final String CHECKPOINT_SUFFIX = ".bin";
3535

3636
private static volatile RetinaUtils instance;
@@ -132,12 +132,12 @@ public static String getCheckpointPrefix(String typePrefix, String hostname)
132132
}
133133

134134
/**
135-
* Builds the checkpoint file path from a directory, prefix, hostname and timestamp.
135+
* Builds the checkpoint file path from a directory, type prefix, hostname and identifier timestamp.
136136
*
137137
* @param checkpointDir directory where checkpoint files reside (may or may not end with '/')
138-
* @param prefix {@link #CHECKPOINT_PREFIX_GC} or {@link #CHECKPOINT_PREFIX_OFFLOAD}
138+
* @param prefix {@link #CHECKPOINT_PREFIX_OFFLOAD} or {@link #CHECKPOINT_PREFIX_RECOVERY}
139139
* @param hostname the retina host name
140-
* @param timestamp the GC or offload timestamp
140+
* @param timestamp the checkpoint identifier timestamp (offload ts for offload, applied ts for recovery)
141141
*/
142142
public static String buildCheckpointPath(String checkpointDir, String prefix, String hostname, long timestamp)
143143
{

pixels-common/src/main/resources/pixels.properties

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -291,8 +291,6 @@ retina.buffer.flush.count=20
291291
retina.buffer.flush.interval=30
292292
# interval in seconds for retina visibility garbage collection
293293
retina.gc.interval=300
294-
# number of threads for retina checkpoint
295-
retina.checkpoint.threads=4
296294
# retina buffer reader prefetch threads num
297295
retina.reader.prefetch.threads=8
298296
# retina service init threads num
@@ -305,8 +303,12 @@ retina.upsert-mode.enabled=false
305303
pixels.transaction.offload.threshold=1800
306304
# lease duration for retina offload cache in seconds, default 600s
307305
retina.offload.cache.lease.duration=600
308-
# snapshot storage directory
309-
retina.checkpoint.dir=file:///tmp/pixels-checkpoints
306+
# number of threads for offload checkpoint writers
307+
retina.offload.checkpoint.threads=4
308+
# storage URI for long-running query offload visibility snapshots; cleared on Retina startup
309+
retina.offload.checkpoint.dir=file:///tmp/pixels-offload-checkpoints
310+
# storage URI for recovery checkpoint body objects (one body per node per round)
311+
retina.recovery.checkpoint.dir=file:///tmp/pixels-recovery-checkpoints
310312
# set to true to enable storage GC (rewrites high-deletion-ratio files to reclaim space)
311313
retina.storage.gc.enabled=false
312314
# invalidRatio must be strictly greater than this value for a file to be a GC candidate

pixels-core/src/test/java/io/pixelsdb/pixels/core/reader/TestVisibilityCheckpointCache.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public class TestVisibilityCheckpointCache
4646
@Before
4747
public void setUp() throws IOException
4848
{
49-
testCheckpointDir = ConfigFactory.Instance().getProperty("retina.checkpoint.dir");
49+
testCheckpointDir = ConfigFactory.Instance().getProperty("retina.offload.checkpoint.dir");
5050
storage = StorageFactory.Instance().getStorage(testCheckpointDir);
5151

5252
if (!storage.exists(testCheckpointDir))
@@ -86,7 +86,7 @@ private void createDummyCheckpoint(String path, int numFiles, int rgsPerFile, lo
8686
public void testCacheLoading() throws Exception
8787
{
8888
long timestamp = 1000L;
89-
String checkpointPath = resolve(testCheckpointDir, "vis_gc_tencent_100.bin");
89+
String checkpointPath = resolve(testCheckpointDir, "vis_offload_tencent_100.bin");
9090
long[] dummyBitmap = new long[]{0x1L, 0x2L};
9191
createDummyCheckpoint(checkpointPath, 1, 1, dummyBitmap);
9292

pixels-daemon/src/main/java/io/pixelsdb/pixels/daemon/retina/RetinaServerImpl.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ public RetinaServerImpl()
114114
private void initializeRetinaResources() throws Exception
115115
{
116116
logger.info("Pre-loading checkpoints...");
117-
this.retinaResourceManager.recoverCheckpoints();
117+
this.retinaResourceManager.recoverOffloadCheckpoints();
118118

119119
List<Schema> schemas = this.metadataService.getSchemas();
120120
for (Schema schema : schemas)
@@ -825,7 +825,7 @@ public void queryVisibility(RetinaProto.QueryVisibilityRequest request,
825825
.newBuilder()
826826
.setHeader(headerBuilder.build());
827827

828-
String checkpointPath = this.retinaResourceManager.getCheckpointPath(timestamp);
828+
String checkpointPath = this.retinaResourceManager.getOffloadCheckpointPath(timestamp);
829829
if (checkpointPath != null)
830830
{
831831
responseBuilder.setCheckpointPath(checkpointPath);

pixels-daemon/src/main/java/io/pixelsdb/pixels/daemon/transaction/TransServiceImpl.java

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -629,14 +629,18 @@ public void dumpTrans(TransProto.DumpTransRequest request,
629629
}
630630

631631
@Override
632-
public void getSafeGcTimestamp(com.google.protobuf.Empty request,
633-
StreamObserver<TransProto.GetSafeGcTimestampResponse> responseObserver)
632+
public void getSafeVisibilityFoldingTimestamp(TransProto.GetSafeVisibilityFoldingTimestampRequest request,
633+
StreamObserver<TransProto.GetSafeVisibilityFoldingTimestampResponse> responseObserver)
634634
{
635-
long safeTs = Math.max(0, lowWatermark.get() - 1);
636-
TransProto.GetSafeGcTimestampResponse response = TransProto.GetSafeGcTimestampResponse.newBuilder()
637-
.setErrorCode(ErrorCode.SUCCESS)
638-
.setTimestamp(safeTs)
639-
.build();
635+
long writerSafeTs = Math.max(0, highWatermark.get() - 1);
636+
long safeTs = request.getIncludeRunningQueries()
637+
? Math.min(lowWatermark.get(), writerSafeTs)
638+
: writerSafeTs;
639+
TransProto.GetSafeVisibilityFoldingTimestampResponse response =
640+
TransProto.GetSafeVisibilityFoldingTimestampResponse.newBuilder()
641+
.setErrorCode(ErrorCode.SUCCESS)
642+
.setTimestamp(safeTs)
643+
.build();
640644
responseObserver.onNext(response);
641645
responseObserver.onCompleted();
642646
}

pixels-daemon/src/test/java/io/pixelsdb/pixels/daemon/retina/TestRetinaServer.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ public void testRetinaServerImplInitializationFailureIsFailClosed() throws Excep
9898
assertTrue(e.getMessage().contains("Failed to initialize RetinaServerImpl"));
9999
}
100100

101-
verify(resourceManager).recoverCheckpoints();
101+
verify(resourceManager).recoverOffloadCheckpoints();
102102
verify(resourceManager, never()).startBackgroundGc();
103103
}
104104

@@ -137,7 +137,7 @@ public void testRetinaServerImplStartsBackgroundGcAfterSuccessfulInitialization(
137137
doAnswer(invocation -> {
138138
lifecycleEvents.add("recover");
139139
return null;
140-
}).when(resourceManager).recoverCheckpoints();
140+
}).when(resourceManager).recoverOffloadCheckpoints();
141141
doAnswer(invocation -> {
142142
lifecycleEvents.add("visibility:" + invocation.getArgument(0));
143143
return null;
@@ -188,7 +188,7 @@ public void testRetinaServerImplBackgroundGcStartFailureIsFailClosed() throws Ex
188188
}
189189

190190
InOrder inOrder = inOrder(resourceManager);
191-
inOrder.verify(resourceManager).recoverCheckpoints();
191+
inOrder.verify(resourceManager).recoverOffloadCheckpoints();
192192
inOrder.verify(resourceManager).startBackgroundGc();
193193
}
194194

pixels-retina/pom.xml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,6 @@
8888
<dependency>
8989
<groupId>io.etcd</groupId>
9090
<artifactId>jetcd-core</artifactId>
91-
<scope>test</scope>
9291
</dependency>
9392

9493
<dependency>

0 commit comments

Comments
 (0)