Skip to content

Commit 9392d87

Browse files
author
birchkwok
committed
Enhance file saving mechanism in BinaryMetadataStore
- Implemented unique temporary file naming to prevent conflicts in multi-threaded environments, using process ID, thread ID, and timestamp. - Added atomic file renaming with error handling to ensure temporary files are cleaned up on failure. - Introduced a retry mechanism in sync_to_disk to handle potential file access conflicts, improving reliability during concurrent operations.
1 parent f6f0f0b commit 9392d87

1 file changed

Lines changed: 59 additions & 8 deletions

File tree

src/storage/binary_metadata.rs

Lines changed: 59 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -459,8 +459,28 @@ impl BinaryMetadataStore {
459459
}
460460

461461
pub fn save(&self, path: &Path) -> NpkResult<()> {
462-
let temp_path = path.with_extension("tmp");
462+
// 生成唯一的临时文件名,避免多线程冲突
463+
// 格式:metadata.npkm.tmp.{pid}_{tid}_{timestamp}
464+
use std::time::{SystemTime, UNIX_EPOCH};
463465

466+
let pid = std::process::id();
467+
let tid = std::thread::current().id();
468+
let tid_str = format!("{:?}", tid).replace("ThreadId(", "").replace(")", "");
469+
let timestamp = SystemTime::now()
470+
.duration_since(UNIX_EPOCH)
471+
.unwrap()
472+
.as_nanos();
473+
474+
let temp_filename = format!(
475+
"{}.tmp.{}_{}_{}",
476+
path.file_name().unwrap().to_string_lossy(),
477+
pid,
478+
tid_str,
479+
timestamp
480+
);
481+
let temp_path = path.with_file_name(temp_filename);
482+
483+
// 写入临时文件
464484
{
465485
let file = File::create(&temp_path)?;
466486
let mut writer = BufWriter::new(file);
@@ -483,10 +503,19 @@ impl BinaryMetadataStore {
483503
}
484504

485505
writer.flush()?;
506+
// 确保文件写入完成
507+
drop(writer);
486508
}
487509

488-
std::fs::rename(temp_path, path)?;
489-
Ok(())
510+
// 原子性重命名(如果失败,清理临时文件)
511+
match std::fs::rename(&temp_path, path) {
512+
Ok(_) => Ok(()),
513+
Err(e) => {
514+
// 清理临时文件
515+
let _ = std::fs::remove_file(&temp_path);
516+
Err(NpkError::IoError(e))
517+
}
518+
}
490519
}
491520

492521
pub fn add_array(&mut self, meta: BinaryArrayMetadata) {
@@ -553,12 +582,34 @@ impl BinaryCachedStore {
553582
}
554583

555584
fn sync_to_disk(&self) -> NpkResult<()> {
556-
let store = self.store.read().unwrap();
557-
store.save(&self.path)?;
585+
// 多线程环境下可能出现临时的文件访问冲突,添加重试机制
586+
const MAX_RETRIES: usize = 3;
587+
const RETRY_DELAY_MS: u64 = 10;
558588

559-
let mut last_sync = self.last_sync.lock().unwrap();
560-
*last_sync = SystemTime::now();
561-
Ok(())
589+
let mut last_error = None;
590+
for attempt in 0..MAX_RETRIES {
591+
let store = self.store.read().unwrap();
592+
match store.save(&self.path) {
593+
Ok(_) => {
594+
drop(store);
595+
let mut last_sync = self.last_sync.lock().unwrap();
596+
*last_sync = SystemTime::now();
597+
return Ok(());
598+
}
599+
Err(e) => {
600+
last_error = Some(e);
601+
drop(store);
602+
603+
// 最后一次尝试不需要等待
604+
if attempt < MAX_RETRIES - 1 {
605+
std::thread::sleep(std::time::Duration::from_millis(RETRY_DELAY_MS));
606+
}
607+
}
608+
}
609+
}
610+
611+
// 所有重试都失败,返回最后一个错误
612+
Err(last_error.unwrap())
562613
}
563614

564615
pub fn add_array(&self, meta: BinaryArrayMetadata) -> NpkResult<()> {

0 commit comments

Comments
 (0)