Skip to content

Commit 444ebdd

Browse files
author
birchkwok
committed
refactor(storage): optimize concurrent metadata sync and remove lock contention
Simplify binary metadata synchronization by: (1) removing the retry mechanism in sync_to_disk for cleaner error handling; (2) minimizing lock scope by cloning the store before I/O operations; (3) adding a Clone derive to BinaryMetadataStore for easier snapshotting; and (4) removing the implicit sync in the constructor to avoid I/O contention between multiple instances. Also bump the version to 0.5.2 and update the documentation with an expanded API reference, concurrent access examples, and performance tips.
1 parent 4d96341 commit 444ebdd

6 files changed

Lines changed: 154 additions & 81 deletions

File tree

Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "numpack"
3-
version = "0.5.1"
3+
version = "0.5.2"
44
edition = "2021"
55
description = "A high-performance array storage and manipulation library"
66
authors = ["NumPack Contributors"]
@@ -32,7 +32,7 @@ tempfile = "3.8.1"
3232
serde = { version = "1.0", features = ["derive"] }
3333
serde_json = "1.0"
3434
rayon = { version = "1.8", optional = true }
35-
memmap2 = "0.5.10"
35+
memmap2 = "0.5.20"
3636
half = { version = "2.6.0", features = ["bytemuck"] }
3737
bitvec = "1.0.1"
3838
bincode = "1.3"

README.md

Lines changed: 139 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,12 @@ Add to your `Cargo.toml`:
3636
```toml
3737
[dependencies]
3838
numpack = "0.5.2"
39+
ndarray = "0.16"
3940
```
4041

4142
**Features:**
4243
- `rayon` (default) - Parallel processing support
43-
- `avx512` - AVX-512 SIMD optimizations
44+
- `avx512` - AVX-512 SIMD optimizations
4445
- `io-uring-support` - io_uring on Linux
4546

4647
**Requirements:** Rust ≥ 1.70.0
@@ -57,69 +58,161 @@ maturin develop # or: maturin build --release
5758
```
5859
</details>
5960

60-
## Quick Start
61+
#### Basic Usage
6162

62-
### Python
63-
64-
```python
65-
import numpy as np
66-
from numpack import NumPack
63+
```rust
64+
use numpack::prelude::*;
65+
use ndarray::{ArrayD, Array2, array};
66+
use std::path::PathBuf;
6767

68-
with NumPack("data.npk") as npk:
69-
# Save
70-
npk.save({'embeddings': np.random.rand(10000, 128).astype(np.float32)})
68+
fn main() -> NpkResult<()> {
69+
// Create or open a NumPack storage
70+
let io = ParallelIO::new(PathBuf::from("data.npk"))?;
7171

72-
# Load (normal or lazy)
73-
data = npk.load("embeddings")
74-
lazy = npk.load("embeddings", lazy=True)
72+
// Save arrays with explicit dtype
73+
let data: Array2<f32> = Array2::from_shape_fn((1000, 128), |_| rand::random());
74+
io.save_arrays(&[("embeddings".to_string(), data.into_dyn(), DataType::Float32)])?;
7575

76-
# Modify
77-
npk.replace({'embeddings': new_rows}, indices=[0, 1, 2])
78-
npk.append({'embeddings': more_rows})
79-
npk.drop('embeddings', [0, 1, 2]) # drop rows
76+
// Metadata is written on drop, or call sync_metadata() explicitly
77+
io.sync_metadata()?;
8078

81-
# Random access
82-
subset = npk.getitem('embeddings', [100, 200, 300])
79+
Ok(())
80+
}
8381
```
8482

85-
### Rust
83+
#### API Reference
84+
85+
**Storage Operations:**
86+
87+
| Method | Description | Example |
88+
|--------|-------------|---------|
89+
| `ParallelIO::new(path)` | Create/open storage | `ParallelIO::new(PathBuf::from("data"))?` |
90+
| `save_arrays(&[(name, array, dtype)])` | Save multiple arrays | See below |
91+
| `sync_metadata()` | Persist metadata to disk | `io.sync_metadata()?` |
92+
| `reset()` | Delete all arrays | `io.reset()?` |
93+
94+
**Array Operations:**
8695

8796
```rust
88-
use numpack::{ParallelIO, DataType};
89-
use ndarray::{ArrayD, Array2};
90-
use std::path::Path;
97+
// Check if array exists
98+
if io.has_array("embeddings") {
99+
println!("Array exists!");
100+
}
91101

92-
fn main() -> Result<(), Box<dyn std::error::Error>> {
93-
// Create or open a NumPack storage
94-
let npk = ParallelIO::new(Path::new("data.npk"));
95-
96-
// Save arrays
97-
let data: Array2<f32> = Array2::from_shape_fn((1000, 128), |_| rand::random());
98-
npk.save_arrays(&[("embeddings", data.view().into_dyn())])?;
99-
100-
// Load arrays
101-
let loaded: ArrayD<f32> = npk.load_array("embeddings")?;
102+
// List all arrays
103+
let names = io.list_arrays();
104+
println!("Arrays: {:?}", names);
105+
106+
// Get array metadata
107+
let meta = io.get_array_metadata("embeddings")?;
108+
println!("Shape: {:?}, dtype: {:?}", meta.shape, meta.get_dtype());
109+
110+
// Read specific rows (returns raw bytes)
111+
let row_data = io.read_rows("embeddings", &[0, 10, 20])?;
112+
113+
// Replace rows in-place (fastest for existing arrays)
114+
let new_rows = Array2::<f32>::zeros((3, 128)).into_dyn();
115+
io.replace_rows("embeddings", &new_rows, &[0, 1, 2])?;
116+
117+
// Logical delete (mark rows as deleted)
118+
io.drop_arrays("embeddings", Some(&[5, 6, 7]))?;
119+
120+
// Physical compact (remove deleted rows, reclaim space)
121+
io.compact_array("embeddings")?;
122+
123+
// Delete entire array
124+
io.drop_arrays("embeddings", None)?;
125+
```
126+
127+
**Data Type Mapping:**
128+
129+
| NumPack Type | Rust Type | Size |
130+
|--------------|-----------|------|
131+
| `DataType::Bool` | `bool` | 1 byte |
132+
| `DataType::Int8` | `i8` | 1 byte |
133+
| `DataType::Int16` | `i16` | 2 bytes |
134+
| `DataType::Int32` | `i32` | 4 bytes |
135+
| `DataType::Int64` | `i64` | 8 bytes |
136+
| `DataType::Uint8` | `u8` | 1 byte |
137+
| `DataType::Uint16` | `u16` | 2 bytes |
138+
| `DataType::Uint32` | `u32` | 4 bytes |
139+
| `DataType::Uint64` | `u64` | 8 bytes |
140+
| `DataType::Float16` | `half::f16` | 2 bytes |
141+
| `DataType::Float32` | `f32` | 4 bytes |
142+
| `DataType::Float64` | `f64` | 8 bytes |
143+
| `DataType::Complex64` | `num_complex::Complex32` | 8 bytes |
144+
| `DataType::Complex128` | `num_complex::Complex64` | 16 bytes |
145+
146+
#### Concurrent Access
147+
148+
Multiple threads can safely write to the same storage concurrently (since v0.5.1):
149+
150+
```rust
151+
use std::thread;
152+
153+
fn concurrent_write() -> NpkResult<()> {
154+
let dir = "/tmp/numpack_data";
155+
std::fs::create_dir_all(dir)?;
102156

103-
// Random access - get specific rows
104-
let rows = npk.get_rows("embeddings", &[0, 10, 20])?;
157+
let handles: Vec<_> = (0..10)
158+
.map(|i| {
159+
let dir = dir.to_string();
160+
thread::spawn(move || {
161+
let io = ParallelIO::new(PathBuf::from(dir))?;
162+
let data = Array2::<f32>::ones((100, 128)).into_dyn();
163+
io.save_arrays(&[(format!("chunk_{}", i), data, DataType::Float32)])?;
164+
io.sync_metadata()?; // Ensure metadata is written
165+
println!("Thread {} done", i);
166+
Ok::<_, NpkError>(())
167+
})
168+
})
169+
.collect();
105170

106-
// Get metadata
107-
let meta = npk.get_metadata("embeddings")?;
108-
println!("Shape: {:?}, dtype: {:?}", meta.shape, meta.dtype);
171+
for h in handles {
172+
h.join().unwrap()?;
173+
}
109174

110175
Ok(())
111176
}
112177
```
113178

114-
**Core Types:**
115-
- `ParallelIO` - Main interface for storage operations
116-
- `DataType` - Supported data types (Bool, Int8-64, Uint8-64, Float16/32/64, Complex64/128)
117-
- `ArrayMetadata` - Array shape, dtype, and file info
179+
**Best Practices for Concurrent Access:**
180+
- Each thread creates its own `ParallelIO` instance
181+
- Call `sync_metadata()` before dropping the instance
182+
- For read-heavy workloads, use separate read instances
118183

119-
**Features:**
120-
- Zero-copy memory mapping via `memmap2`
121-
- Parallel IO with `rayon`
122-
- SIMD vector operations
184+
#### Performance Tips
185+
186+
```rust
187+
// 1. Batch saves for multiple arrays
188+
let arrays: Vec<(String, ArrayD<f32>, DataType)> = vec![
189+
("a".to_string(), data_a, DataType::Float32),
190+
("b".to_string(), data_b, DataType::Float32),
191+
];
192+
io.save_arrays(&arrays)?; // Parallel processing for large data
193+
194+
// 2. Use replace_rows for updating existing arrays (fastest)
195+
// Avoids file recreation when shape and dtype match
196+
197+
// 3. Call sync_metadata() once after all operations
198+
// Not needed after every save_arrays()
199+
200+
// 4. Use compact_array() periodically after many deletions
201+
io.drop_arrays("data", Some(&[0, 1, 2]))?; // Logical delete
202+
io.compact_array("data")?; // Physical cleanup when convenient
203+
```
204+
205+
#### Error Handling
206+
207+
```rust
208+
use numpack::core::error::{NpkError, NpkResult};
209+
210+
match io.get_array_metadata("nonexistent") {
211+
Ok(meta) => println!("Found: {:?}", meta.shape),
212+
Err(NpkError::ArrayNotFound(name)) => println!("Array {} not found", name),
213+
Err(e) => eprintln!("Error: {:?}", e),
214+
}
215+
```
123216

124217
### Batch Modes
125218

docs/api_reference/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,4 +83,4 @@ from numpack import get_backend_info
8383

8484
## Version
8585

86-
This documentation is for NumPack version **0.5.1**.
86+
This documentation is for NumPack version **0.5.2**.

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ build-backend = "maturin"
44

55
[project]
66
name = "numpack"
7-
version = "0.5.1"
7+
version = "0.5.2"
88
description = "A high-performance array storage and manipulation library"
99
authors = [{ name = "NumPack Contributors" }]
1010
requires-python = ">=3.9"

python/numpack/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ def _numpack_issubdtype(arg1, arg2):
3939

4040
np.issubdtype = _numpack_issubdtype # type: ignore[assignment]
4141

42-
__version__ = "0.5.1"
42+
__version__ = "0.5.2"
4343

4444
# Platform detection
4545
def _is_windows():

src/storage/binary_metadata.rs

Lines changed: 10 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -396,7 +396,7 @@ impl BinaryArrayMetadata {
396396
}
397397

398398
/// 二进制格式的元数据存储
399-
#[derive(Debug)]
399+
#[derive(Debug, Clone)]
400400
pub struct BinaryMetadataStore {
401401
pub version: u32,
402402
pub arrays: HashMap<String, BinaryArrayMetadata>,
@@ -574,8 +574,8 @@ impl BinaryCachedStore {
574574
sync_interval: std::time::Duration::from_secs(1),
575575
};
576576

577-
// 保存初始存储
578-
cached_store.sync_to_disk()?;
577+
// Do not call sync_to_disk in the constructor, to avoid I/O contention when multiple instances are created concurrently
578+
// Metadata is written to disk when force_sync() is called explicitly
579579
Ok(cached_store)
580580
}
581581

@@ -593,34 +593,14 @@ impl BinaryCachedStore {
593593
}
594594

595595
fn sync_to_disk(&self) -> NpkResult<()> {
596-
// 多线程环境下可能出现临时的文件访问冲突,添加重试机制
597-
const MAX_RETRIES: usize = 3;
598-
const RETRY_DELAY_MS: u64 = 10;
599-
600-
let mut last_error = None;
601-
for attempt in 0..MAX_RETRIES {
602-
let store = self.store.read().unwrap();
603-
match store.save(&self.path) {
604-
Ok(_) => {
605-
drop(store);
606-
let mut last_sync = self.last_sync.lock().unwrap();
607-
*last_sync = SystemTime::now();
608-
return Ok(());
609-
}
610-
Err(e) => {
611-
last_error = Some(e);
612-
drop(store);
613-
614-
// 最后一次尝试不需要等待
615-
if attempt < MAX_RETRIES - 1 {
616-
std::thread::sleep(std::time::Duration::from_millis(RETRY_DELAY_MS));
617-
}
618-
}
619-
}
620-
}
596+
// Minimize lock scope: hold the read lock only while cloning; no lock is held during the I/O operation
597+
let snapshot = self.store.read().unwrap().clone();
598+
// The read lock has already been released automatically at the end of the previous line
621599

622-
// 所有重试都失败,返回最后一个错误
623-
Err(last_error.unwrap())
600+
snapshot.save(&self.path)?;
601+
602+
*self.last_sync.lock().unwrap() = SystemTime::now();
603+
Ok(())
624604
}
625605

626606
pub fn add_array(&self, meta: BinaryArrayMetadata) -> NpkResult<()> {

0 commit comments

Comments
 (0)