Skip to content

Commit 6f0663e

Browse files
authored
feat: add lance_dataset_restore for rolling back to a prior version (#18)
## Summary

- Adds `lance_dataset_restore(dataset, version)` — commits a new manifest whose fragments match `version` and returns a fresh handle at that version
- The caller's original handle is untouched; both must be closed separately
- C++ gets a `lance::Dataset::restore(version)` member returning a new `Dataset`

## Motivation

With `lance_dataset_versions` (#17) C/C++ callers can list the history, but there was no way to actually roll back to an older version. This closes the remaining piece of read-side version management from the Phase 3 roadmap.

## Contract

- A new manifest is **always** written, even when `version` already matches the current latest, so the caller's "make `version` the new latest" intent holds unconditionally under concurrent writers.
- `version == 0` is rejected with `LANCE_ERR_INVALID_ARGUMENT` (it is `lance_dataset_open`'s "latest" sentinel).
- Errors: `LANCE_ERR_INVALID_ARGUMENT` (NULL handle or `version == 0`), `LANCE_ERR_NOT_FOUND` (unknown version), `LANCE_ERR_COMMIT_CONFLICT` (concurrent commit, typically another restore).

## Test plan

- `cargo test` — 5 Rust tests:
  - prior-version happy path (version bump + row count, original handle untouched)
  - restore-to-current-latest writes a new manifest (version bumps by exactly one, visible on reopen)
  - unknown version → `NotFound`
  - `version == 0` rejection
  - NULL handle rejection
- `cargo clippy --all-targets -- -D warnings` and `cargo fmt --check` clean
- `cargo test --test compile_and_run_test -- --ignored` — C and C++ end-to-end tests exercise restore-to-current and assert the version bump

Closes #12.
1 parent 07dbdb4 commit 6f0663e

7 files changed

Lines changed: 249 additions & 0 deletions

File tree

include/lance.h

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,22 @@ int64_t lance_versions_timestamp_ms_at(const LanceVersions* versions, size_t ind
200200
/** Close and free a versions handle. Safe to call with NULL. */
201201
void lance_versions_close(LanceVersions* versions);
202202

203+
/**
204+
* Restore the dataset to an older version by committing a new manifest that
205+
* carries the fragments of `version`. A new manifest is always written, even
206+
* when `version` is already the latest, so the version number always advances.
207+
*
208+
* @param dataset Open dataset (not consumed). Must not be NULL.
209+
* @param version Target version id (>= 1). `0` is rejected since it is the
210+
* "latest" sentinel used by lance_dataset_open.
211+
* @return Fresh LanceDataset* positioned at the target version (caller closes
212+
* with lance_dataset_close), or NULL on error. Possible error codes
213+
* include LANCE_ERR_INVALID_ARGUMENT (NULL handle or version == 0),
214+
* LANCE_ERR_NOT_FOUND (unknown version),
215+
* LANCE_ERR_COMMIT_CONFLICT (concurrent writer).
216+
*/
217+
LanceDataset* lance_dataset_restore(const LanceDataset* dataset, uint64_t version);
218+
203219
/**
204220
* Export the dataset schema via Arrow C Data Interface.
205221
* @param out Pointer to caller-allocated ArrowSchema struct

include/lance.hpp

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,16 @@ class Dataset {
283283
return out;
284284
}
285285

286+
/// Commit a new manifest that aliases `version` as the latest. The
287+
/// returned Dataset points at the target version; this handle is
288+
/// unchanged. A new manifest is always written, even when `version` is
289+
/// already the latest. Throws lance::Error on failure.
290+
Dataset restore(uint64_t version) const {
291+
auto* out = lance_dataset_restore(handle_.get(), version);
292+
if (!out) check_error();
293+
return Dataset(out);
294+
}
295+
286296
/// Export the schema as an Arrow C Data Interface struct.
287297
void schema(ArrowSchema* out) const {
288298
if (lance_dataset_schema(handle_.get(), out) != 0) {

src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ mod error;
2222
mod fragment_writer;
2323
mod helpers;
2424
mod index;
25+
mod restore;
2526
pub mod runtime;
2627
mod scanner;
2728
mod versions;
@@ -35,6 +36,7 @@ pub use error::{
3536
};
3637
pub use fragment_writer::*;
3738
pub use index::*;
39+
pub use restore::*;
3840
pub use scanner::*;
3941
pub use versions::*;
4042
pub use writer::*;

src/restore.rs

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// SPDX-FileCopyrightText: Copyright The Lance Authors
3+
4+
//! Restore C API: move a dataset's latest back to an older version by
5+
//! committing a new manifest whose fragments match the chosen version.
6+
//!
7+
//! Returns a fresh `LanceDataset*` positioned at the target version; the
8+
//! caller's original handle is untouched and remains usable.
9+
10+
use std::sync::{Arc, RwLock};
11+
12+
use lance_core::Result;
13+
14+
use crate::dataset::LanceDataset;
15+
use crate::error::ffi_try;
16+
use crate::runtime::block_on;
17+
18+
/// Restore the dataset to an older version by committing a new manifest that
19+
/// carries the fragments of `version`.
20+
///
21+
/// - `dataset`: Open dataset (not consumed). Must not be NULL.
22+
/// - `version`: Target version id. Must be `>= 1`; `0` is reserved as the
23+
/// "latest" sentinel by `lance_dataset_open` and is rejected here with
24+
/// `LANCE_ERR_INVALID_ARGUMENT`.
25+
///
26+
/// A new manifest is always written, even when `version` already matches the
27+
/// current latest — this guarantees the caller's stated intent ("make `version`
28+
/// the new latest") holds under concurrent writers without a TOCTOU race.
29+
///
30+
/// Returns a fresh `LanceDataset*` positioned at the target version on success
31+
/// (caller closes with `lance_dataset_close`), or NULL on error. Errors include
32+
/// `LANCE_ERR_NOT_FOUND` for an unknown `version` and `LANCE_ERR_COMMIT_CONFLICT`
33+
/// for a concurrent commit race.
34+
#[unsafe(no_mangle)]
35+
pub unsafe extern "C" fn lance_dataset_restore(
36+
dataset: *const LanceDataset,
37+
version: u64,
38+
) -> *mut LanceDataset {
39+
ffi_try!(unsafe { restore_inner(dataset, version) }, null)
40+
}
41+
42+
unsafe fn restore_inner(dataset: *const LanceDataset, version: u64) -> Result<*mut LanceDataset> {
43+
if dataset.is_null() {
44+
return Err(lance_core::Error::InvalidInput {
45+
source: "dataset must not be NULL".into(),
46+
location: snafu::location!(),
47+
});
48+
}
49+
if version == 0 {
50+
return Err(lance_core::Error::InvalidInput {
51+
source: "version must be >= 1; 0 is reserved as the \"latest\" sentinel".into(),
52+
location: snafu::location!(),
53+
});
54+
}
55+
56+
let ds = unsafe { &*dataset };
57+
let snap = ds.snapshot();
58+
59+
// Check out the target version, then always commit a new manifest that
60+
// aliases its fragments as the new latest. Skipping the commit when
61+
// `version == latest` would open a TOCTOU window: a concurrent writer
62+
// could land a newer manifest between the read and the comparison, and
63+
// we'd silently leave their version as latest instead of the caller's.
64+
let restored = block_on(async {
65+
let mut checked_out = snap.checkout_version(version).await?;
66+
checked_out.restore().await?;
67+
Ok::<_, lance_core::Error>(checked_out)
68+
})?;
69+
70+
let handle = LanceDataset {
71+
inner: RwLock::new(Arc::new(restored)),
72+
};
73+
Ok(Box::into_raw(Box::new(handle)))
74+
}

tests/c_api_test.rs

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1785,6 +1785,116 @@ fn test_versions_close_null_is_safe() {
17851785
unsafe { lance_versions_close(ptr::null_mut()) };
17861786
}
17871787

1788+
// ---------------------------------------------------------------------------
1789+
// Restore (lance_dataset_restore)
1790+
// ---------------------------------------------------------------------------
1791+
1792+
/// Helper: set up a dataset with two versions — initial create (rows 1..=5)
1793+
/// plus an append (rows 6..=7), returning `(tempdir, uri)`.
1794+
fn create_two_version_dataset() -> (tempfile::TempDir, String) {
1795+
let (tmp, uri) = create_test_dataset();
1796+
let schema = Arc::new(Schema::new(vec![
1797+
Field::new("id", DataType::Int32, false),
1798+
Field::new("name", DataType::Utf8, true),
1799+
]));
1800+
let batch = RecordBatch::try_new(
1801+
schema.clone(),
1802+
vec![
1803+
Arc::new(Int32Array::from(vec![6, 7])),
1804+
Arc::new(StringArray::from(vec!["frank", "grace"])),
1805+
],
1806+
)
1807+
.unwrap();
1808+
append_batch(&uri, schema, batch);
1809+
(tmp, uri)
1810+
}
1811+
1812+
#[test]
1813+
fn test_dataset_restore_to_prior_version() {
1814+
let (_tmp, uri) = create_two_version_dataset();
1815+
let c_uri = c_str(&uri);
1816+
let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) };
1817+
assert_eq!(unsafe { lance_dataset_version(ds) }, 2);
1818+
assert_eq!(unsafe { lance_dataset_count_rows(ds) }, 7);
1819+
1820+
// Restore to V1 — expect a fresh handle at a new version (3) with V1's
1821+
// row count (5).
1822+
let restored = unsafe { lance_dataset_restore(ds, 1) };
1823+
assert!(!restored.is_null());
1824+
assert_eq!(unsafe { lance_dataset_version(restored) }, 3);
1825+
assert_eq!(unsafe { lance_dataset_count_rows(restored) }, 5);
1826+
1827+
// Original handle is untouched.
1828+
assert_eq!(unsafe { lance_dataset_version(ds) }, 2);
1829+
1830+
unsafe { lance_dataset_close(restored) };
1831+
unsafe { lance_dataset_close(ds) };
1832+
}
1833+
1834+
#[test]
1835+
fn test_dataset_restore_to_current_latest_writes_new_manifest() {
1836+
// Restoring to the current latest still writes a new manifest. The
1837+
// optimization that previously skipped the commit was racy: a concurrent
1838+
// writer could land a newer manifest between the staleness check and the
1839+
// skip, silently leaving their version as latest. We always commit so the
1840+
// caller's "make `version` the new latest" intent holds unconditionally.
1841+
let (_tmp, uri) = create_two_version_dataset();
1842+
let c_uri = c_str(&uri);
1843+
let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) };
1844+
let latest = unsafe { lance_dataset_version(ds) };
1845+
assert_eq!(latest, 2);
1846+
1847+
let restored = unsafe { lance_dataset_restore(ds, latest) };
1848+
assert!(!restored.is_null());
1849+
assert_eq!(
1850+
unsafe { lance_dataset_version(restored) },
1851+
latest + 1,
1852+
"restore to latest must commit a new manifest to defeat TOCTOU races"
1853+
);
1854+
assert_eq!(unsafe { lance_dataset_count_rows(restored) }, 7);
1855+
1856+
// Reopening the dataset reports the bumped latest.
1857+
unsafe { lance_dataset_close(restored) };
1858+
let ds2 = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) };
1859+
assert_eq!(unsafe { lance_dataset_version(ds2) }, latest + 1);
1860+
1861+
unsafe { lance_dataset_close(ds2) };
1862+
unsafe { lance_dataset_close(ds) };
1863+
}
1864+
1865+
#[test]
1866+
fn test_dataset_restore_nonexistent_version() {
1867+
let (_tmp, uri) = create_test_dataset();
1868+
let c_uri = c_str(&uri);
1869+
let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) };
1870+
1871+
let restored = unsafe { lance_dataset_restore(ds, 999) };
1872+
assert!(restored.is_null());
1873+
assert_eq!(lance_last_error_code(), LanceErrorCode::NotFound);
1874+
1875+
unsafe { lance_dataset_close(ds) };
1876+
}
1877+
1878+
#[test]
1879+
fn test_dataset_restore_version_zero_rejected() {
1880+
let (_tmp, uri) = create_test_dataset();
1881+
let c_uri = c_str(&uri);
1882+
let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) };
1883+
1884+
let restored = unsafe { lance_dataset_restore(ds, 0) };
1885+
assert!(restored.is_null());
1886+
assert_eq!(lance_last_error_code(), LanceErrorCode::InvalidArgument);
1887+
1888+
unsafe { lance_dataset_close(ds) };
1889+
}
1890+
1891+
#[test]
1892+
fn test_dataset_restore_null_dataset_rejected() {
1893+
let restored = unsafe { lance_dataset_restore(ptr::null(), 1) };
1894+
assert!(restored.is_null());
1895+
assert_eq!(lance_last_error_code(), LanceErrorCode::InvalidArgument);
1896+
}
1897+
17881898
// ---------------------------------------------------------------------------
17891899
// Index lifecycle tests (Phase 2)
17901900
// ---------------------------------------------------------------------------

tests/cpp/test_c_api.c

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,26 @@ static void test_versions(const char *uri) {
179179
printf("OK\n");
180180
}
181181

182+
/* Restore the dataset to its own current version — always commits a new
183+
* manifest (no skip-if-equal optimization) so the caller's "make `version`
184+
* the new latest" intent holds even under concurrent writers. */
185+
static void test_restore_to_current(const char *uri) {
186+
printf(" test_restore_to_current... ");
187+
188+
LanceDataset *ds = lance_dataset_open(uri, NULL, 0);
189+
ASSERT(ds != NULL, "open failed");
190+
uint64_t current = lance_dataset_version(ds);
191+
192+
LanceDataset *after = lance_dataset_restore(ds, current);
193+
ASSERT(after != NULL, "restore failed");
194+
ASSERT(lance_dataset_version(after) == current + 1,
195+
"restore must bump the version to commit a fresh manifest");
196+
197+
lance_dataset_close(after);
198+
lance_dataset_close(ds);
199+
printf("OK\n");
200+
}
201+
182202
static void test_error_handling(void) {
183203
printf(" test_error_handling... ");
184204

@@ -262,6 +282,7 @@ int main(int argc, char **argv) {
262282
test_scan(uri);
263283
test_scan_with_limit(uri);
264284
test_versions(uri);
285+
test_restore_to_current(uri);
265286
test_error_handling();
266287
test_dataset_write_roundtrip(uri, write_uri);
267288

tests/cpp/test_cpp_api.cpp

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,21 @@ static void test_versions(const std::string& uri) {
156156
PASS();
157157
}
158158

159+
// Restore to the dataset's own current version — always commits a new
160+
// manifest (no skip-if-equal optimization) to defeat TOCTOU races against
161+
// concurrent writers.
162+
static void test_restore_to_current(const std::string& uri) {
163+
TEST(test_restore_to_current);
164+
165+
auto ds = lance::Dataset::open(uri);
166+
uint64_t current = ds.version();
167+
168+
auto after = ds.restore(current);
169+
assert(after.version() == current + 1);
170+
171+
PASS();
172+
}
173+
159174
static void test_error_exception(const std::string& /*uri*/) {
160175
TEST(test_error_exception);
161176

@@ -303,6 +318,7 @@ int main(int argc, char** argv) {
303318
test_dataset_take(uri);
304319
test_raii_cleanup(uri);
305320
test_versions(uri);
321+
test_restore_to_current(uri);
306322
test_error_exception(uri);
307323
test_index_lifecycle(uri);
308324
test_nearest_smoke(uri);

0 commit comments

Comments
 (0)