Skip to content

Commit d4fffcb

Browse files
committed
Thread optional backup retry queue through TierStore
Replace TierStoreInner's raw backup DynStore with BackupStore, which holds the backup store plus an optional BackupRetryQueue. Update write/remove backup result handling to accept PendingBackupOp and enqueue failed backup operations when a retry queue is present; otherwise only log the backup failure. This makes the configured backup semantics explicit: best-effort mode logs backup failures, while semisync mode requires durably recording failed backup intents for later retry. Adjust set_backup_store, builder wiring, and docs for the new backup configuration shape.
1 parent 49bc062 commit d4fffcb

2 files changed

Lines changed: 68 additions & 24 deletions

File tree

src/builder.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -854,7 +854,7 @@ impl NodeBuilder {
854854
let mut tier_store = TierStore::new(primary_store, Arc::clone(&logger));
855855
if let Some(config) = ts_config {
856856
config.ephemeral.as_ref().map(|s| tier_store.set_ephemeral_store(Arc::clone(s)));
857-
config.backup.as_ref().map(|s| tier_store.set_backup_store(Arc::clone(s)));
857+
config.backup.as_ref().map(|s| tier_store.set_backup_store(Arc::clone(s), None));
858858
}
859859

860860
let seed_bytes = node_entropy.to_seed_bytes();

src/io/tier_store.rs

Lines changed: 67 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -59,22 +59,30 @@ impl TierStore {
5959
/// Configures a backup store for primary-backed data.
6060
///
6161
/// Once set, writes and removals targeting the primary tier are issued to the
62-
/// primary and backup stores concurrently. Success semantics depend on the
63-
/// configured [`BackupMode`].
62+
/// primary and backup stores concurrently.
63+
///
64+
/// If `retry_queue` is `None`, backup failures are logged and otherwise ignored
65+
/// as long as the primary store succeeds.
66+
///
67+
/// If `retry_queue` is `Some`, backup failures are treated with durable
68+
/// semi-synchronous semantics: the operation succeeds only if the failed backup
69+
/// intent can be persisted locally and enqueued for asynchronous retry.
6470
///
6571
/// Note: dual-store writes/removals are not atomic. An error may be returned
6672
/// after the primary store has already been updated if the requested backup
6773
/// guarantee could not be achieved.
6874
///
6975
/// The backup store is not consulted for normal reads or lists.
70-
pub fn set_backup_store(&mut self, backup: Arc<DynStore>) {
76+
pub fn set_backup_store(
77+
&mut self, backup: Arc<DynStore>, retry_queue: Option<Arc<BackupRetryQueue<Arc<Logger>>>>,
78+
) {
7179
debug_assert_eq!(Arc::strong_count(&self.inner), 1);
7280

7381
let inner = Arc::get_mut(&mut self.inner).expect(
7482
"TierStore should not be shared during configuration. No other references should exist",
7583
);
7684

77-
inner.backup_store = Some(backup);
85+
inner.backup_store = Some(BackupStore { store: backup, retry_queue });
7886
}
7987

8088
/// Configures the ephemeral store for non-critical, rebuildable data.
@@ -180,13 +188,20 @@ impl KVStoreSync for TierStore {
180188
}
181189
}
182190

191+
struct BackupStore {
192+
/// Store may be remote.
193+
store: Arc<DynStore>,
194+
/// Present only when backup failures should be durably queued for retry.
195+
retry_queue: Option<Arc<BackupRetryQueue<Arc<Logger>>>>,
196+
}
197+
183198
struct TierStoreInner {
184199
/// The authoritative store for durable data.
185200
primary_store: Arc<DynStore>,
186201
/// The store used for non-critical, rebuildable cached data.
187202
ephemeral_store: Option<Arc<DynStore>>,
188-
/// An optional second durable store for primary-backed data.
189-
backup_store: Option<Arc<DynStore>>,
203+
/// Optional backup configuration for primary-backed data.
204+
backup_store: Option<BackupStore>,
190205
logger: Arc<Logger>,
191206
}
192207

@@ -300,11 +315,11 @@ impl TierStoreInner {
300315

301316
if let Some(backup_store) = self.backup_store.as_ref() {
302317
let backup_fut = KVStore::write(
303-
backup_store.as_ref(),
318+
backup_store.store.as_ref(),
304319
primary_namespace,
305320
secondary_namespace,
306321
key,
307-
buf,
322+
buf.clone(),
308323
);
309324

310325
let (primary_res, backup_res) = tokio::join!(primary_fut, backup_fut);
@@ -314,6 +329,7 @@ impl TierStoreInner {
314329
primary_namespace,
315330
secondary_namespace,
316331
key,
332+
PendingBackupOp::Write { buf },
317333
primary_res,
318334
backup_res,
319335
)
@@ -334,18 +350,19 @@ impl TierStoreInner {
334350
buf.clone(),
335351
);
336352
let backup_res = KVStoreSync::write(
337-
backup_store.as_ref(),
353+
backup_store.store.as_ref(),
338354
primary_namespace,
339355
secondary_namespace,
340356
key,
341-
buf,
357+
buf.clone(),
342358
);
343359

344360
self.handle_primary_backup_results(
345361
"write",
346362
primary_namespace,
347363
secondary_namespace,
348364
key,
365+
PendingBackupOp::Write { buf },
349366
primary_res,
350367
backup_res,
351368
)
@@ -373,7 +390,7 @@ impl TierStoreInner {
373390

374391
if let Some(backup_store) = self.backup_store.as_ref() {
375392
let backup_fut = KVStore::remove(
376-
backup_store.as_ref(),
393+
backup_store.store.as_ref(),
377394
primary_namespace,
378395
secondary_namespace,
379396
key,
@@ -387,6 +404,7 @@ impl TierStoreInner {
387404
primary_namespace,
388405
secondary_namespace,
389406
key,
407+
PendingBackupOp::Remove { lazy },
390408
primary_res,
391409
backup_res,
392410
)
@@ -407,7 +425,7 @@ impl TierStoreInner {
407425
lazy,
408426
);
409427
let backup_res = KVStoreSync::remove(
410-
backup_store.as_ref(),
428+
backup_store.store.as_ref(),
411429
primary_namespace,
412430
secondary_namespace,
413431
key,
@@ -419,6 +437,7 @@ impl TierStoreInner {
419437
primary_namespace,
420438
secondary_namespace,
421439
key,
440+
PendingBackupOp::Remove { lazy },
422441
primary_res,
423442
backup_res,
424443
)
@@ -665,29 +684,54 @@ impl TierStoreInner {
665684
}
666685

667686
fn handle_primary_backup_results(
668-
&self, op: &str, primary_namespace: &str, secondary_namespace: &str, key: &str,
669-
primary_res: io::Result<()>, backup_res: io::Result<()>,
687+
&self, op_name: &str, primary_namespace: &str, secondary_namespace: &str, key: &str,
688+
pending_backup_op: PendingBackupOp, primary_res: io::Result<()>,
689+
backup_res: io::Result<()>,
670690
) -> io::Result<()> {
671-
match (primary_res, backup_res) {
672-
(Ok(()), Ok(())) => Ok(()),
673-
(Err(primary_err), Ok(())) => Err(primary_err),
674-
(Ok(()), Err(backup_err)) => {
691+
match (
692+
primary_res,
693+
backup_res,
694+
self.backup_store.as_ref().and_then(|b| b.retry_queue.as_ref()),
695+
) {
696+
(Ok(()), Ok(()), _) => Ok(()),
697+
(Err(primary_err), Ok(()), _) => Err(primary_err),
698+
(Ok(()), Err(backup_err), None) => {
675699
log_error!(
676700
self.logger,
677701
"Backup {} failed for key {}/{}/{}: {}",
678-
op,
702+
op_name,
703+
primary_namespace,
704+
secondary_namespace,
705+
key,
706+
backup_err
707+
);
708+
Ok(())
709+
},
710+
(Ok(()), Err(backup_err), Some(retry_queue)) => {
711+
retry_queue.enqueue_sync(
712+
(
713+
primary_namespace.to_string(),
714+
secondary_namespace.to_string(),
715+
key.to_string(),
716+
),
717+
pending_backup_op,
718+
)?;
719+
log_error!(
720+
self.logger,
721+
"Backup {} failed for key {}/{}/{}: {}. Operation was durably queued for retry.",
722+
op_name,
679723
primary_namespace,
680724
secondary_namespace,
681725
key,
682726
backup_err
683727
);
684728
Ok(())
685729
},
686-
(Err(primary_err), Err(backup_err)) => {
730+
(Err(primary_err), Err(backup_err), _) => {
687731
log_error!(
688732
self.logger,
689733
"Primary and backup {}s both failed for key {}/{}/{}: primary={}, backup={}",
690-
op,
734+
op_name,
691735
primary_namespace,
692736
secondary_namespace,
693737
key,
@@ -1260,7 +1304,7 @@ mod tests {
12601304

12611305
let backup_store: Arc<DynStore> =
12621306
Arc::new(DynStoreWrapper(FilesystemStore::new(base_dir.join("backup"))));
1263-
tier.set_backup_store(Arc::clone(&backup_store));
1307+
tier.set_backup_store(Arc::clone(&backup_store), None);
12641308

12651309
let data = vec![42u8; 32];
12661310

@@ -1304,7 +1348,7 @@ mod tests {
13041348

13051349
let backup_store: Arc<DynStore> =
13061350
Arc::new(DynStoreWrapper(FilesystemStore::new(base_dir.join("backup"))));
1307-
tier.set_backup_store(Arc::clone(&backup_store));
1351+
tier.set_backup_store(Arc::clone(&backup_store), None);
13081352

13091353
let data = vec![42u8; 32];
13101354
let key = CHANNEL_MANAGER_PERSISTENCE_KEY;

0 commit comments

Comments
 (0)