From 963fc208dc450b43d01030f99c7b588d5eea26f9 Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Fri, 24 Apr 2026 12:09:54 +0800 Subject: [PATCH 1/4] feat: add lance_dataset_write for create/append/overwrite from ArrowArrayStream Writes an ArrowArrayStream into a Lance dataset with a committed manifest. A mode enum (CREATE / APPEND / OVERWRITE) and an optional out_dataset that returns the open dataset at the new version (so callers don't need to reopen). Structure follows fragment_writer.rs: schema fail-fast, storage options pass-through, thread-local errors. C++ gets a scoped WriteMode enum and a lance::Dataset::write() static method that reads the stream's schema automatically. Two FFI details: - mode is received as i32 and validated via LanceWriteMode::from_raw; accepting the enum directly would be UB for out-of-range values. - The stream is consumed via ArrowArrayStreamReader::from_raw right after the NULL check so the "consumed on any return" contract holds. Tests: 11 new Rust unit tests (CREATE/APPEND/OVERWRITE happy paths, OVERWRITE-on-missing, CREATE-on-existing, schema mismatches, empty stream, NULL args, invalid mode, out_dataset propagation). The ignored C and C++ integration tests now do a scan->write round-trip. Closes #14. --- include/lance.h | 45 ++++ include/lance.hpp | 62 +++++ src/lib.rs | 2 + src/writer.rs | 186 +++++++++++++++ tests/c_api_test.rs | 422 ++++++++++++++++++++++++++++++++++ tests/compile_and_run_test.rs | 21 +- tests/cpp/test_c_api.c | 50 +++- tests/cpp/test_cpp_api.cpp | 32 ++- 8 files changed, 810 insertions(+), 10 deletions(-) create mode 100644 src/writer.rs diff --git a/include/lance.h b/include/lance.h index 1d53ee0..efed588 100644 --- a/include/lance.h +++ b/include/lance.h @@ -481,6 +481,51 @@ int32_t lance_scanner_full_text_search( uint32_t max_fuzzy_distance ); +/* ─── Dataset writer ─── */ + +/** + * Write mode for lance_dataset_write. Values are ABI-stable. 
+ * The Rust implementation validates the received integer and rejects any + * out-of-range value with LANCE_ERR_INVALID_ARGUMENT. + */ +typedef enum { + LANCE_WRITE_CREATE = 0, /* Create new dataset; fail if path exists. */ + LANCE_WRITE_APPEND = 1, /* Append; fail if the new schema is incompatible. */ + LANCE_WRITE_OVERWRITE = 2, /* Overwrite existing, or create if missing. */ +} LanceWriteMode; + +/** + * Write an Arrow record batch stream to a Lance dataset at `uri`, committing + * a manifest. + * + * @param uri Dataset URI (file://, s3://, memory://, etc.). Must not + * be NULL or an empty string. + * @param schema Required Arrow schema. The stream schema must match or + * the call fails with LANCE_ERR_INVALID_ARGUMENT. + * @param stream Arrow C Data Interface stream consumed by this call. + * Do not use the stream after returning, regardless of + * the return code. + * @param mode CREATE / APPEND / OVERWRITE (see LanceWriteMode). + * @param storage_opts NULL-terminated key-value pairs ["k","v",NULL], or NULL. + * @param out_dataset If non-NULL, on success receives an open LanceDataset* + * at the newly-committed version (caller must + * lance_dataset_close it). Pass NULL to discard. On error + * *out_dataset is left unchanged — do not read or free it. + * @return 0 on success, -1 on error. Possible error codes include + * LANCE_ERR_DATASET_ALREADY_EXISTS (CREATE on an existing path), + * LANCE_ERR_INVALID_ARGUMENT (NULL/empty args, invalid mode, + * schema mismatch), + * LANCE_ERR_COMMIT_CONFLICT (concurrent writer). 
+ */ +int32_t lance_dataset_write( + const char* uri, + const struct ArrowSchema* schema, + struct ArrowArrayStream* stream, + LanceWriteMode mode, + const char* const* storage_opts, + LanceDataset** out_dataset +); + #ifdef __cplusplus } /* extern "C" */ #endif diff --git a/include/lance.hpp b/include/lance.hpp index eaee7f9..ead421b 100644 --- a/include/lance.hpp +++ b/include/lance.hpp @@ -17,6 +17,7 @@ #include "lance.h" +#include #include #include #include @@ -94,6 +95,14 @@ struct VersionInfo { int64_t timestamp_ms; }; +// ─── Write mode ────────────────────────────────────────────────────────────── + +enum class WriteMode : int32_t { + Create = LANCE_WRITE_CREATE, + Append = LANCE_WRITE_APPEND, + Overwrite = LANCE_WRITE_OVERWRITE, +}; + // ─── Dataset ───────────────────────────────────────────────────────────────── class Dataset { @@ -122,6 +131,59 @@ class Dataset { return Dataset(ds); } + /// Write an Arrow record batch stream to a Lance dataset and return the + /// open dataset at the committed version. + /// + /// The stream must be self-describing; its own schema is used. Treat the + /// stream as consumed once this call returns or throws — do not reuse it. + /// Throws lance::Error on failure (including if `stream` is null). + static Dataset write( + const std::string& uri, + ArrowArrayStream* stream, + WriteMode mode, + const std::vector>& storage_opts = {}) { + + if (stream == nullptr) { + throw Error(LANCE_ERR_INVALID_ARGUMENT, "stream must not be null"); + } + + // Pull the stream's schema so we can pass it to the C API. + ArrowSchema schema = {}; + if (stream->get_schema(stream, &schema) != 0) { + const char* err = stream->get_last_error + ? stream->get_last_error(stream) + : nullptr; + throw Error( + LANCE_ERR_INVALID_ARGUMENT, + std::string("failed to read stream schema: ") + + (err ? 
err : "unknown"));
+    }
+    struct SchemaGuard {
+      ArrowSchema* s;
+      ~SchemaGuard() { if (s && s->release) s->release(s); }
+    } guard{&schema};
+
+    std::vector<const char*> kv;
+    for (auto& [k, v] : storage_opts) {
+      kv.push_back(k.c_str());
+      kv.push_back(v.c_str());
+    }
+    kv.push_back(nullptr);
+    const char* const* opts_ptr =
+        storage_opts.empty() ? nullptr : kv.data();
+
+    LanceDataset* out = nullptr;
+    int32_t rc = lance_dataset_write(
+        uri.c_str(),
+        &schema,
+        stream,
+        static_cast<int32_t>(mode),
+        opts_ptr,
+        &out);
+    if (rc != 0) check_error();
+    return Dataset(out);
+  }
+
   /// Number of rows in the dataset.
   uint64_t count_rows() const {
     uint64_t n = lance_dataset_count_rows(handle_.get());
diff --git a/src/lib.rs b/src/lib.rs
index 53535a1..461d891 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -25,6 +25,7 @@ mod index;
 pub mod runtime;
 mod scanner;
 mod versions;
+mod writer;
 
 // Re-export all extern "C" symbols so they appear in the cdylib.
 pub use batch::*;
@@ -36,3 +37,4 @@ pub use fragment_writer::*;
 pub use index::*;
 pub use scanner::*;
 pub use versions::*;
+pub use writer::*;
diff --git a/src/writer.rs b/src/writer.rs
new file mode 100644
index 0000000..021637b
--- /dev/null
+++ b/src/writer.rs
@@ -0,0 +1,186 @@
+// SPDX-License-Identifier: Apache-2.0
+// SPDX-FileCopyrightText: Copyright The Lance Authors
+
+//! Dataset write C API: create, append, or overwrite a Lance dataset from an
+//! Arrow C Data Interface stream, committing a manifest.
+//!
+//! Mirrors the structure of `src/fragment_writer.rs` but produces a full
+//! dataset with a committed manifest rather than uncommitted fragment files.
+ +use std::ffi::c_char; +use std::sync::{Arc, RwLock}; + +use arrow::ffi::FFI_ArrowSchema; +use arrow::ffi_stream::{ArrowArrayStreamReader, FFI_ArrowArrayStream}; +use arrow::record_batch::RecordBatchReader; +use arrow_schema::Schema as ArrowSchema; +use lance::Dataset; +use lance::dataset::{WriteMode as LanceWriteModeUpstream, WriteParams}; +use lance_core::Result; +use lance_io::object_store::{ObjectStoreParams, StorageOptionsAccessor}; + +use crate::dataset::LanceDataset; +use crate::error::ffi_try; +use crate::helpers; +use crate::runtime::block_on; + +/// Write mode for `lance_dataset_write`. +/// +/// Discriminants are pinned for ABI stability. The FFI accepts this as +/// `int32_t` and validates via [`LanceWriteMode::from_raw`] — storing an +/// out-of-range tag as an enum would be UB. +#[repr(C)] +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum LanceWriteMode { + /// Create a new dataset. Fails with `LANCE_ERR_DATASET_ALREADY_EXISTS` if + /// the path already exists. + Create = 0, + /// Append to an existing dataset. Fails with `LANCE_ERR_INVALID_ARGUMENT` + /// if the stream schema is incompatible with the existing dataset schema. + Append = 1, + /// Overwrite an existing dataset (or create one if the path does not exist). + Overwrite = 2, +} + +impl LanceWriteMode { + /// Validate a raw FFI integer into a `LanceWriteMode`. Out-of-range + /// values become `InvalidInput`. 
+    fn from_raw(raw: i32) -> Result<Self> {
+        match raw {
+            0 => Ok(Self::Create),
+            1 => Ok(Self::Append),
+            2 => Ok(Self::Overwrite),
+            other => Err(lance_core::Error::InvalidInput {
+                source: format!(
+                    "invalid write mode {other}; expected 0 (create), 1 (append), or 2 (overwrite)"
+                )
+                .into(),
+                location: snafu::location!(),
+            }),
+        }
+    }
+}
+
+impl From<LanceWriteMode> for LanceWriteModeUpstream {
+    fn from(mode: LanceWriteMode) -> Self {
+        match mode {
+            LanceWriteMode::Create => LanceWriteModeUpstream::Create,
+            LanceWriteMode::Append => LanceWriteModeUpstream::Append,
+            LanceWriteMode::Overwrite => LanceWriteModeUpstream::Overwrite,
+        }
+    }
+}
+
+/// Write an Arrow record batch stream to a Lance dataset at `uri`, committing a manifest.
+///
+/// - `uri`: Dataset URI (`file://`, `s3://`, `memory://`, ...). Must not be NULL or empty.
+/// - `schema`: Caller-provided Arrow schema. The stream's schema must match;
+///   mismatch returns `LANCE_ERR_INVALID_ARGUMENT`.
+/// - `stream`: Arrow C Data Interface stream. Consumed by this call — the
+///   caller must not use it again on any return path.
+/// - `mode`: `LANCE_WRITE_CREATE` (0), `LANCE_WRITE_APPEND` (1), or
+///   `LANCE_WRITE_OVERWRITE` (2). Any other value → `LANCE_ERR_INVALID_ARGUMENT`.
+/// - `storage_opts`: NULL-terminated key-value pairs `["k","v",NULL]`, or NULL.
+/// - `out_dataset`: If non-NULL, receives an open `LanceDataset*` at the new
+///   version on success (caller closes). Pass NULL to discard. On error
+///   `*out_dataset` is untouched — do not read or free it.
+///
+/// Returns 0 on success, -1 on error.
+#[unsafe(no_mangle)]
+pub unsafe extern "C" fn lance_dataset_write(
+    uri: *const c_char,
+    schema: *const FFI_ArrowSchema,
+    stream: *mut FFI_ArrowArrayStream,
+    mode: i32,
+    storage_opts: *const *const c_char,
+    out_dataset: *mut *mut LanceDataset,
+) -> i32 {
+    ffi_try!(
+        unsafe { write_dataset_inner(uri, schema, stream, mode, storage_opts, out_dataset) },
+        neg
+    )
+}
+
+unsafe fn write_dataset_inner(
+    uri: *const c_char,
+    schema: *const FFI_ArrowSchema,
+    stream: *mut FFI_ArrowArrayStream,
+    mode: i32,
+    storage_opts: *const *const c_char,
+    out_dataset: *mut *mut LanceDataset,
+) -> Result<i32> {
+    if uri.is_null() || schema.is_null() || stream.is_null() {
+        return Err(lance_core::Error::InvalidInput {
+            source: "uri, schema, and stream must not be NULL".into(),
+            location: snafu::location!(),
+        });
+    }
+
+    // Consume the stream before any other fallible validation. `from_raw`
+    // swaps the caller's stream into a Rust-owned reader unconditionally, so
+    // the stream's resources are released on every return path.
+    let reader = unsafe { ArrowArrayStreamReader::from_raw(stream) }.map_err(|e| {
+        lance_core::Error::InvalidInput {
+            source: e.to_string().into(),
+            location: snafu::location!(),
+        }
+    })?;
+
+    // Validate the mode at the boundary — storing an out-of-range tag as a
+    // `LanceWriteMode` would be UB.
+    let mode = LanceWriteMode::from_raw(mode)?;
+
+    let uri_str = unsafe { helpers::parse_c_string(uri)? }.ok_or_else(|| {
+        lance_core::Error::InvalidInput {
+            source: "uri must not be empty".into(),
+            location: snafu::location!(),
+        }
+    })?;
+
+    let expected_schema = ArrowSchema::try_from(unsafe { &*schema }).map_err(|e| {
+        lance_core::Error::InvalidInput {
+            source: format!("invalid schema: {e}").into(),
+            location: snafu::location!(),
+        }
+    })?;
+
+    let opts = unsafe { helpers::parse_storage_options(storage_opts)? };
+
+    // Fail fast: compare the stream schema against the caller-provided schema.
+    let stream_schema = reader.schema();
+    if stream_schema.fields() != expected_schema.fields() {
+        return Err(lance_core::Error::InvalidInput {
+            source: format!(
+                "stream schema does not match the provided schema.\n expected: {expected_schema}\n got: {stream_schema}"
+            )
+            .into(),
+            location: snafu::location!(),
+        });
+    }
+
+    let mut params = WriteParams {
+        mode: mode.into(),
+        ..WriteParams::default()
+    };
+    if !opts.is_empty() {
+        params.store_params = Some(ObjectStoreParams {
+            storage_options_accessor: Some(Arc::new(StorageOptionsAccessor::with_static_options(
+                opts,
+            ))),
+            ..ObjectStoreParams::default()
+        });
+    }
+
+    let dataset = block_on(Dataset::write(reader, uri_str, Some(params)))?;
+
+    if !out_dataset.is_null() {
+        let handle = LanceDataset {
+            inner: RwLock::new(Arc::new(dataset)),
+        };
+        unsafe {
+            *out_dataset = Box::into_raw(Box::new(handle));
+        }
+    }
+
+    Ok(0)
+}
diff --git a/tests/c_api_test.rs b/tests/c_api_test.rs
index eb0b0eb..d344b42 100644
--- a/tests/c_api_test.rs
+++ b/tests/c_api_test.rs
@@ -2706,6 +2706,376 @@ fn test_nearest_after_fts_is_rejected() {
     unsafe { lance_dataset_close(ds) };
 }
 
+// ---------------------------------------------------------------------------
+// Dataset writer (lance_dataset_write)
+// ---------------------------------------------------------------------------
+
+fn write_schema() -> Arc<Schema> {
+    Arc::new(Schema::new(vec![
+        Field::new("id", DataType::Int32, false),
+        Field::new("val", DataType::Float32, true),
+    ]))
+}
+
+fn write_batch(ids: Vec<i32>, vals: Vec<f32>) -> RecordBatch {
+    assert_eq!(ids.len(), vals.len());
+    RecordBatch::try_new(
+        write_schema(),
+        vec![
+            Arc::new(Int32Array::from(ids)),
+            Arc::new(Float32Array::from(vals)),
+        ],
+    )
+    .unwrap()
+}
+
+#[test]
+fn test_dataset_write_create() {
+    let tmp = tempfile::tempdir().unwrap();
+    let uri = tmp.path().join("new_ds").to_str().unwrap().to_string();
+    let c_uri = c_str(&uri);
+
+    let ffi_schema = schema_to_ffi(&write_schema());
+    let mut stream =
batch_to_ffi_stream(write_batch(vec![1, 2, 3], vec![1.0, 2.0, 3.0])); + + let rc = unsafe { + lance_dataset_write( + c_uri.as_ptr(), + &ffi_schema, + &mut stream, + LanceWriteMode::Create as i32, + ptr::null(), + ptr::null_mut(), + ) + }; + assert_eq!(rc, 0, "lance_dataset_write create failed"); + assert_eq!(lance_last_error_code(), LanceErrorCode::Ok); + + let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + assert!(!ds.is_null()); + assert_eq!(unsafe { lance_dataset_count_rows(ds) }, 3); + unsafe { lance_dataset_close(ds) }; +} + +#[test] +fn test_dataset_write_populates_out_dataset() { + let tmp = tempfile::tempdir().unwrap(); + let uri = tmp.path().join("ds").to_str().unwrap().to_string(); + let c_uri = c_str(&uri); + + let ffi_schema = schema_to_ffi(&write_schema()); + let mut stream = batch_to_ffi_stream(write_batch(vec![1, 2, 3], vec![1.0, 2.0, 3.0])); + + let mut out_ds: *mut LanceDataset = ptr::null_mut(); + let rc = unsafe { + lance_dataset_write( + c_uri.as_ptr(), + &ffi_schema, + &mut stream, + LanceWriteMode::Create as i32, + ptr::null(), + &mut out_ds, + ) + }; + assert_eq!(rc, 0); + assert!(!out_ds.is_null(), "out_dataset must be populated"); + assert_eq!(unsafe { lance_dataset_count_rows(out_ds) }, 3); + unsafe { lance_dataset_close(out_ds) }; +} + +#[test] +fn test_dataset_write_append_accumulates_rows() { + let tmp = tempfile::tempdir().unwrap(); + let uri = tmp.path().join("ds").to_str().unwrap().to_string(); + let c_uri = c_str(&uri); + + let ffi_schema1 = schema_to_ffi(&write_schema()); + let mut stream1 = batch_to_ffi_stream(write_batch(vec![1, 2, 3], vec![1.0, 2.0, 3.0])); + let rc = unsafe { + lance_dataset_write( + c_uri.as_ptr(), + &ffi_schema1, + &mut stream1, + LanceWriteMode::Create as i32, + ptr::null(), + ptr::null_mut(), + ) + }; + assert_eq!(rc, 0); + + let ffi_schema2 = schema_to_ffi(&write_schema()); + let mut stream2 = batch_to_ffi_stream(write_batch(vec![4, 5], vec![4.0, 5.0])); + let rc = unsafe { + 
lance_dataset_write(
+            c_uri.as_ptr(),
+            &ffi_schema2,
+            &mut stream2,
+            LanceWriteMode::Append as i32,
+            ptr::null(),
+            ptr::null_mut(),
+        )
+    };
+    assert_eq!(rc, 0);
+
+    let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) };
+    assert_eq!(unsafe { lance_dataset_count_rows(ds) }, 5);
+    unsafe { lance_dataset_close(ds) };
+}
+
+#[test]
+fn test_dataset_write_overwrite_replaces_rows() {
+    let tmp = tempfile::tempdir().unwrap();
+    let uri = tmp.path().join("ds").to_str().unwrap().to_string();
+    let c_uri = c_str(&uri);
+
+    let ffi_schema1 = schema_to_ffi(&write_schema());
+    let mut stream1 = batch_to_ffi_stream(write_batch(vec![1, 2, 3], vec![1.0, 2.0, 3.0]));
+    let rc = unsafe {
+        lance_dataset_write(
+            c_uri.as_ptr(),
+            &ffi_schema1,
+            &mut stream1,
+            LanceWriteMode::Create as i32,
+            ptr::null(),
+            ptr::null_mut(),
+        )
+    };
+    assert_eq!(rc, 0);
+
+    let ffi_schema2 = schema_to_ffi(&write_schema());
+    let mut stream2 = batch_to_ffi_stream(write_batch(vec![100, 200], vec![100.0, 200.0]));
+    let rc = unsafe {
+        lance_dataset_write(
+            c_uri.as_ptr(),
+            &ffi_schema2,
+            &mut stream2,
+            LanceWriteMode::Overwrite as i32,
+            ptr::null(),
+            ptr::null_mut(),
+        )
+    };
+    assert_eq!(rc, 0);
+
+    let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) };
+    assert_eq!(
+        unsafe { lance_dataset_count_rows(ds) },
+        2,
+        "overwrite must replace, not append"
+    );
+    let batches = scan_all_rows(ds);
+    assert!(!batches.is_empty(), "scan must return at least one batch");
+    let mut ids: Vec<i32> = Vec::new();
+    for batch in &batches {
+        let id_col = batch
+            .column_by_name("id")
+            .unwrap()
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .unwrap();
+        ids.extend((0..id_col.len()).map(|i| id_col.value(i)));
+    }
+    ids.sort();
+    assert_eq!(ids, vec![100, 200]);
+    unsafe { lance_dataset_close(ds) };
+}
+
+#[test]
+fn test_dataset_write_overwrite_on_missing_path_creates_dataset() {
+    let tmp = tempfile::tempdir().unwrap();
+    let uri = tmp.path().join("ds").to_str().unwrap().to_string();
+    
let c_uri = c_str(&uri); + + let ffi_schema = schema_to_ffi(&write_schema()); + let mut stream = batch_to_ffi_stream(write_batch(vec![7, 8], vec![7.0, 8.0])); + + let rc = unsafe { + lance_dataset_write( + c_uri.as_ptr(), + &ffi_schema, + &mut stream, + LanceWriteMode::Overwrite as i32, + ptr::null(), + ptr::null_mut(), + ) + }; + assert_eq!(rc, 0, "OVERWRITE on missing path must succeed as create"); + + let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + assert!(!ds.is_null()); + assert_eq!(unsafe { lance_dataset_count_rows(ds) }, 2); + unsafe { lance_dataset_close(ds) }; +} + +#[test] +fn test_dataset_write_invalid_mode_rejected() { + let tmp = tempfile::tempdir().unwrap(); + let uri = tmp.path().join("ds").to_str().unwrap().to_string(); + let c_uri = c_str(&uri); + + let ffi_schema = schema_to_ffi(&write_schema()); + let mut stream = batch_to_ffi_stream(write_batch(vec![1], vec![1.0])); + + let rc = unsafe { + lance_dataset_write( + c_uri.as_ptr(), + &ffi_schema, + &mut stream, + 99, // out of range — must be rejected, not cause UB + ptr::null(), + ptr::null_mut(), + ) + }; + assert_eq!(rc, -1); + assert_eq!(lance_last_error_code(), LanceErrorCode::InvalidArgument); +} + +#[test] +fn test_dataset_write_create_on_existing_fails() { + let tmp = tempfile::tempdir().unwrap(); + let uri = tmp.path().join("ds").to_str().unwrap().to_string(); + let c_uri = c_str(&uri); + + let ffi_schema1 = schema_to_ffi(&write_schema()); + let mut stream1 = batch_to_ffi_stream(write_batch(vec![1], vec![1.0])); + let rc = unsafe { + lance_dataset_write( + c_uri.as_ptr(), + &ffi_schema1, + &mut stream1, + LanceWriteMode::Create as i32, + ptr::null(), + ptr::null_mut(), + ) + }; + assert_eq!(rc, 0); + + let ffi_schema2 = schema_to_ffi(&write_schema()); + let mut stream2 = batch_to_ffi_stream(write_batch(vec![2], vec![2.0])); + let rc = unsafe { + lance_dataset_write( + c_uri.as_ptr(), + &ffi_schema2, + &mut stream2, + LanceWriteMode::Create as i32, + ptr::null(), + 
ptr::null_mut(), + ) + }; + assert_eq!(rc, -1); + assert_eq!( + lance_last_error_code(), + LanceErrorCode::DatasetAlreadyExists + ); +} + +#[test] +fn test_dataset_write_append_schema_mismatch_fails() { + let tmp = tempfile::tempdir().unwrap(); + let uri = tmp.path().join("ds").to_str().unwrap().to_string(); + let c_uri = c_str(&uri); + + // Create with the original schema. + let ffi_schema1 = schema_to_ffi(&write_schema()); + let mut stream1 = batch_to_ffi_stream(write_batch(vec![1, 2], vec![1.0, 2.0])); + let rc = unsafe { + lance_dataset_write( + c_uri.as_ptr(), + &ffi_schema1, + &mut stream1, + LanceWriteMode::Create as i32, + ptr::null(), + ptr::null_mut(), + ) + }; + assert_eq!(rc, 0); + + // Append with an extra column → must fail. + let mismatched_schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("val", DataType::Float32, true), + Field::new("extra", DataType::Utf8, true), + ])); + let batch2 = RecordBatch::try_new( + mismatched_schema.clone(), + vec![ + Arc::new(Int32Array::from(vec![10])), + Arc::new(Float32Array::from(vec![10.0])), + Arc::new(StringArray::from(vec!["x"])), + ], + ) + .unwrap(); + let ffi_schema2 = schema_to_ffi(&mismatched_schema); + let mut stream2 = batch_to_ffi_stream(batch2); + let rc = unsafe { + lance_dataset_write( + c_uri.as_ptr(), + &ffi_schema2, + &mut stream2, + LanceWriteMode::Append as i32, + ptr::null(), + ptr::null_mut(), + ) + }; + assert_eq!(rc, -1); + assert_ne!(lance_last_error_code(), LanceErrorCode::Ok); +} + +#[test] +fn test_dataset_write_declared_schema_mismatch_fails() { + let tmp = tempfile::tempdir().unwrap(); + let uri = tmp.path().join("ds").to_str().unwrap().to_string(); + let c_uri = c_str(&uri); + + // Stream has 2 columns but declared schema has only 1 — fail fast. 
+    let mut stream = batch_to_ffi_stream(write_batch(vec![1], vec![1.0]));
+    let declared_schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
+    let ffi_schema = schema_to_ffi(&declared_schema);
+
+    let rc = unsafe {
+        lance_dataset_write(
+            c_uri.as_ptr(),
+            &ffi_schema,
+            &mut stream,
+            LanceWriteMode::Create as i32,
+            ptr::null(),
+            ptr::null_mut(),
+        )
+    };
+    assert_eq!(rc, -1);
+    assert_eq!(lance_last_error_code(), LanceErrorCode::InvalidArgument);
+}
+
+#[test]
+fn test_dataset_write_empty_stream_creates_empty_dataset() {
+    let tmp = tempfile::tempdir().unwrap();
+    let uri = tmp.path().join("empty_ds").to_str().unwrap().to_string();
+    let c_uri = c_str(&uri);
+
+    let schema = write_schema();
+    let ffi_schema = schema_to_ffi(&schema);
+
+    let empty: Vec<Result<RecordBatch, ArrowError>> = vec![];
+    let reader = arrow::record_batch::RecordBatchIterator::new(empty, schema.clone());
+    let mut stream = FFI_ArrowArrayStream::new(Box::new(reader));
+
+    let rc = unsafe {
+        lance_dataset_write(
+            c_uri.as_ptr(),
+            &ffi_schema,
+            &mut stream,
+            LanceWriteMode::Create as i32,
+            ptr::null(),
+            ptr::null_mut(),
+        )
+    };
+    assert_eq!(rc, 0);
+
+    let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) };
+    assert!(!ds.is_null());
+    assert_eq!(unsafe { lance_dataset_count_rows(ds) }, 0);
+    unsafe { lance_dataset_close(ds) };
+}
+
 #[test]
 fn test_fts_after_nearest_is_rejected() {
     let (_tmp, uri) = create_vector_dataset(64, 8);
@@ -2744,3 +3114,55 @@ fn test_fts_after_nearest_is_rejected() {
     unsafe { lance_scanner_close(scanner) };
     unsafe { lance_dataset_close(ds) };
 }
+
+#[test]
+fn test_dataset_write_null_args_return_error() {
+    let schema = write_schema();
+    let c_uri = c_str("memory://x");
+
+    // NULL uri.
+ let ffi_schema_a = schema_to_ffi(&schema); + let mut stream_a = batch_to_ffi_stream(write_batch(vec![1], vec![1.0])); + let rc = unsafe { + lance_dataset_write( + ptr::null(), + &ffi_schema_a, + &mut stream_a, + LanceWriteMode::Create as i32, + ptr::null(), + ptr::null_mut(), + ) + }; + assert_eq!(rc, -1); + assert_eq!(lance_last_error_code(), LanceErrorCode::InvalidArgument); + + // NULL schema. + let mut stream_b = batch_to_ffi_stream(write_batch(vec![1], vec![1.0])); + let rc = unsafe { + lance_dataset_write( + c_uri.as_ptr(), + ptr::null(), + &mut stream_b, + LanceWriteMode::Create as i32, + ptr::null(), + ptr::null_mut(), + ) + }; + assert_eq!(rc, -1); + assert_eq!(lance_last_error_code(), LanceErrorCode::InvalidArgument); + + // NULL stream. + let ffi_schema_c = schema_to_ffi(&schema); + let rc = unsafe { + lance_dataset_write( + c_uri.as_ptr(), + &ffi_schema_c, + ptr::null_mut(), + LanceWriteMode::Create as i32, + ptr::null(), + ptr::null_mut(), + ) + }; + assert_eq!(rc, -1); + assert_eq!(lance_last_error_code(), LanceErrorCode::InvalidArgument); +} diff --git a/tests/compile_and_run_test.rs b/tests/compile_and_run_test.rs index 33df2bc..d736d61 100644 --- a/tests/compile_and_run_test.rs +++ b/tests/compile_and_run_test.rs @@ -146,10 +146,12 @@ fn compile_cpp_test(source: &Path, output: &Path, include_dir: &Path, lib_path: } } -/// Run a compiled test binary with the dataset URI. -fn run_test_binary(binary: &Path, dataset_uri: &str) { +/// Run a compiled test binary with the source dataset URI and a destination URI +/// for the write test. The destination path must not pre-exist. 
+fn run_test_binary(binary: &Path, dataset_uri: &str, write_uri: &str) { let output = Command::new(binary) .arg(dataset_uri) + .arg(write_uri) .output() .unwrap_or_else(|e| panic!("Failed to run {}: {e}", binary.display())); @@ -175,7 +177,8 @@ fn run_test_binary(binary: &Path, dataset_uri: &str) { #[ignore = "requires C compiler (cc); run with: cargo test -p lance-c -- --ignored test_c_compilation"] fn test_c_compilation_and_execution() { let (lib_path, include_dir) = build_lance_c(); - let (_tmp, dataset_uri) = create_test_dataset_on_disk(); + let (tmp, dataset_uri) = create_test_dataset_on_disk(); + let write_uri = tmp.path().join("c_write_ds").to_str().unwrap().to_string(); let build_dir = tempfile::tempdir().unwrap(); let source = PathBuf::from(env!("CARGO_MANIFEST_DIR")) @@ -189,14 +192,20 @@ fn test_c_compilation_and_execution() { return; } - run_test_binary(&binary, &dataset_uri); + run_test_binary(&binary, &dataset_uri, &write_uri); } #[test] #[ignore = "requires C++ compiler (c++); run with: cargo test -p lance-c -- --ignored test_cpp_compilation"] fn test_cpp_compilation_and_execution() { let (lib_path, include_dir) = build_lance_c(); - let (_tmp, dataset_uri) = create_test_dataset_on_disk(); + let (tmp, dataset_uri) = create_test_dataset_on_disk(); + let write_uri = tmp + .path() + .join("cpp_write_ds") + .to_str() + .unwrap() + .to_string(); let build_dir = tempfile::tempdir().unwrap(); let source = PathBuf::from(env!("CARGO_MANIFEST_DIR")) @@ -210,5 +219,5 @@ fn test_cpp_compilation_and_execution() { return; } - run_test_binary(&binary, &dataset_uri); + run_test_binary(&binary, &dataset_uri, &write_uri); } diff --git a/tests/cpp/test_c_api.c b/tests/cpp/test_c_api.c index c0eeb1d..242e35e 100644 --- a/tests/cpp/test_c_api.c +++ b/tests/cpp/test_c_api.c @@ -201,13 +201,58 @@ static void test_error_handling(void) { printf("OK\n"); } +/* Round-trip: scan src dataset to an ArrowArrayStream, write it into a new + * dataset at dst_uri, and verify row counts 
match. dst_uri must not pre-exist. */ +static void test_dataset_write_roundtrip(const char *src_uri, const char *dst_uri) { + printf(" test_dataset_write_roundtrip... "); + + LanceDataset *src = lance_dataset_open(src_uri, NULL, 0); + ASSERT(src != NULL, "open source failed"); + uint64_t src_rows = lance_dataset_count_rows(src); + CHECK_OK(); + + LanceScanner *scanner = lance_scanner_new(src, NULL, NULL); + ASSERT(scanner != NULL, "scanner creation failed"); + + struct ArrowArrayStream stream; + memset(&stream, 0, sizeof(stream)); + int32_t rc = lance_scanner_to_arrow_stream(scanner, &stream); + ASSERT(rc == 0, "to_arrow_stream failed"); + + struct ArrowSchema schema; + memset(&schema, 0, sizeof(schema)); + rc = stream.get_schema(&stream, &schema); + ASSERT(rc == 0, "get_schema from stream failed"); + + LanceDataset *dst = NULL; + rc = lance_dataset_write( + dst_uri, &schema, &stream, LANCE_WRITE_CREATE, NULL, &dst); + ASSERT(rc == 0, "lance_dataset_write failed"); + ASSERT(dst != NULL, "out_dataset should be populated"); + + /* stream is consumed by lance_dataset_write; schema we own. */ + if (schema.release) schema.release(&schema); + + uint64_t dst_rows = lance_dataset_count_rows(dst); + CHECK_OK(); + ASSERT(dst_rows == src_rows, "row count mismatch after write"); + printf("src=%llu, dst=%llu... 
",
+        (unsigned long long)src_rows, (unsigned long long)dst_rows);
+
+    lance_dataset_close(dst);
+    lance_scanner_close(scanner);
+    lance_dataset_close(src);
+    printf("OK\n");
+}
+
 int main(int argc, char **argv) {
-    if (argc < 2) {
-        fprintf(stderr, "Usage: %s <dataset_uri>\n", argv[0]);
+    if (argc < 3) {
+        fprintf(stderr, "Usage: %s <dataset_uri> <write_uri>\n", argv[0]);
         return 1;
     }
     const char *uri = argv[1];
+    const char *write_uri = argv[2];
 
     printf("Running C API tests with dataset: %s\n", uri);
     test_open_and_metadata(uri);
@@ -215,6 +260,7 @@
     test_scan_with_limit(uri);
     test_versions(uri);
     test_error_handling();
+    test_dataset_write_roundtrip(uri, write_uri);
 
     printf("All C tests passed!\n");
     return 0;
diff --git a/tests/cpp/test_cpp_api.cpp b/tests/cpp/test_cpp_api.cpp
index 3fa5fe5..eb7b7f4 100644
--- a/tests/cpp/test_cpp_api.cpp
+++ b/tests/cpp/test_cpp_api.cpp
@@ -261,13 +261,40 @@ static void test_fts_smoke(const std::string& uri) {
     PASS();
 }
 
+// Round-trip: scan src dataset to an ArrowArrayStream, write it to a new
+// dataset via lance::Dataset::write, and verify row counts match.
+// dst_uri must not pre-exist.
+static void test_dataset_write_roundtrip(const std::string& src_uri,
+                                         const std::string& dst_uri) {
+    TEST(test_dataset_write_roundtrip);
+
+    auto src = lance::Dataset::open(src_uri);
+    uint64_t src_rows = src.count_rows();
+
+    auto scanner = src.scan();
+    ArrowArrayStream stream;
+    memset(&stream, 0, sizeof(stream));
+    scanner.to_arrow_stream(&stream);
+
+    auto dst = lance::Dataset::write(
+        dst_uri, &stream, lance::WriteMode::Create);
+
+    uint64_t dst_rows = dst.count_rows();
+    assert(dst_rows == src_rows);
+    printf("src=%llu, dst=%llu... 
",
+        (unsigned long long)src_rows, (unsigned long long)dst_rows);
+
+    PASS();
+}
+
 int main(int argc, char** argv) {
-    if (argc < 2) {
-        fprintf(stderr, "Usage: %s <dataset_uri>\n", argv[0]);
+    if (argc < 3) {
+        fprintf(stderr, "Usage: %s <dataset_uri> <write_uri>\n", argv[0]);
         return 1;
     }
     std::string uri(argv[1]);
+    std::string write_uri(argv[2]);
 
     printf("Running C++ API tests with dataset: %s\n", uri.c_str());
     test_dataset_open(uri);
@@ -280,6 +307,7 @@
     test_index_lifecycle(uri);
     test_nearest_smoke(uri);
     test_fts_smoke(uri);
+    test_dataset_write_roundtrip(uri, write_uri);
 
     printf("All C++ tests passed!\n");
     return 0;
From 4e4067162646cf050673ff2adfb2b77bdb3079b6 Mon Sep 17 00:00:00 2001
From: yangjie01
Date: Fri, 24 Apr 2026 17:12:25 +0800
Subject: [PATCH 2/4] docs(writer): drop private intra-doc link to silence
 rustdoc warning

The LanceWriteMode doc referenced `LanceWriteMode::from_raw`, which is
private. rustdoc -D warnings (the Rustdoc CI job) flags this as
`private_intra_doc_links`. Rewording to describe the validation behavior
without naming the private function. No API changes.
---
 src/writer.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/writer.rs b/src/writer.rs
index 021637b..1fae74e 100644
--- a/src/writer.rs
+++ b/src/writer.rs
@@ -27,8 +27,8 @@ use crate::runtime::block_on;
 /// Write mode for `lance_dataset_write`.
 ///
 /// Discriminants are pinned for ABI stability. The FFI accepts this as
-/// `int32_t` and validates via [`LanceWriteMode::from_raw`] — storing an
-/// out-of-range tag as an enum would be UB.
+/// `int32_t` and rejects out-of-range values with `LANCE_ERR_INVALID_ARGUMENT`
+/// — storing an out-of-range tag as a `repr(C)` enum would be UB.
#[repr(C)] #[derive(Clone, Copy, Debug, PartialEq, Eq)] pub enum LanceWriteMode { From dbb0acb79ffb0210902e47778619a5359477cb89 Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Mon, 27 Apr 2026 13:25:17 +0800 Subject: [PATCH 3/4] fix(writer): harden FFI contracts and tighten test coverage from review Addresses feedback gathered across multiple persona reviews of the dataset_write feature. All changes are scoped to the writer surface; docs/ and Cargo.toml changes are intentionally excluded. Rust (src/writer.rs) - Reject empty URI explicitly via `.filter(|s| !s.is_empty())`. - Add `// SAFETY:` comments to every `unsafe` block. - Split combined NULL guard so each error message names the actual argument, and clarify that the NULL-stream check is the only guard that runs before `from_raw` consumes the stream. - Build `WriteParams` immutably via `(!opts.is_empty()).then(|| ...)`. - Document upstream Lance error mapping for Append schema mismatch. C header (include/lance.h) - Change `mode` parameter to `int32_t` to dodge `-fshort-enums` ABI risk. - Document that the function does NOT release `schema` (caller owns it). - Document the `out_dataset` overwrite-without-release contract. C++ wrapper (include/lance.hpp) - Add `StreamGuard` RAII (with `disarm()`) so an exception during storage_opts/kv construction releases the stream. - Construct `SchemaGuard` and zero-init `schema` BEFORE invoking `get_schema`, so a non-conforming producer that partially populates the schema before reporting an error still has it released on unwind. - Defensive `get_schema == nullptr` guard. - Both guards: explicit constructor (C++20 aggregate-init compat), `noexcept` destructor, full Rule of Five (delete copy AND move). - Throw explicit `LANCE_ERR_INTERNAL` if `lance_dataset_write` returns rc==0 with a null `out_dataset`. Tests (tests/c_api_test.rs) - New `CountingReader`, `make_counted_stream`, `assert_stream_consumed`. 
- New `test_dataset_write_releases_stream_on_every_error_path` covers six branches: NULL uri, NULL schema, invalid mode, empty URI, declared schema mismatch, Lance-level CREATE-on-existing failure. - New `test_dataset_write_leaves_out_dataset_untouched_on_error` uses `std::ptr::without_provenance_mut` sentinel to verify both pre- and post-`block_on` error paths preserve the out-param contract. - Each case constructs its own `FFI_ArrowSchema` for independence. - Tighten append-schema-mismatch assertion to the observed `LanceErrorCode::Internal` (with comment explaining upstream mapping). - Add missing `assert!(!ds.is_null())` guards in append/overwrite tests. Tests (tests/cpp/test_c_api.c, tests/cpp/test_cpp_api.cpp) - Release `schema` BEFORE the ASSERT(rc == 0) check so a failed write no longer leaks the schema before process exit. - Update file-level `Usage:` comments to reflect the new `` arg. --- include/lance.h | 19 ++- include/lance.hpp | 85 +++++++++-- src/writer.rs | 78 +++++++--- tests/c_api_test.rs | 299 ++++++++++++++++++++++++++++++++++++- tests/cpp/test_c_api.c | 11 +- tests/cpp/test_cpp_api.cpp | 2 +- 6 files changed, 454 insertions(+), 40 deletions(-) diff --git a/include/lance.h b/include/lance.h index efed588..42413f1 100644 --- a/include/lance.h +++ b/include/lance.h @@ -485,8 +485,12 @@ int32_t lance_scanner_full_text_search( /** * Write mode for lance_dataset_write. Values are ABI-stable. - * The Rust implementation validates the received integer and rejects any - * out-of-range value with LANCE_ERR_INVALID_ARGUMENT. + * + * The `mode` parameter on the FFI call is a fixed-width int32_t — not this + * enum type — so callers built with `-fshort-enums` or non-default enum + * sizing cannot mismatch the Rust ABI. The Rust implementation validates the + * received integer and rejects any out-of-range value with + * LANCE_ERR_INVALID_ARGUMENT. */ typedef enum { LANCE_WRITE_CREATE = 0, /* Create new dataset; fail if path exists. 
*/ @@ -501,7 +505,10 @@ typedef enum { * @param uri Dataset URI (file://, s3://, memory://, etc.). Must not * be NULL or an empty string. * @param schema Required Arrow schema. The stream schema must match or - * the call fails with LANCE_ERR_INVALID_ARGUMENT. + * the call fails with LANCE_ERR_INVALID_ARGUMENT. This + * function does NOT call schema->release; the caller + * retains ownership and must release the schema after the + * call returns (success or failure). * @param stream Arrow C Data Interface stream consumed by this call. * Do not use the stream after returning, regardless of * the return code. @@ -511,6 +518,10 @@ typedef enum { * at the newly-committed version (caller must * lance_dataset_close it). Pass NULL to discard. On error * *out_dataset is left unchanged — do not read or free it. + * On entry `*out_dataset` should be NULL or a pointer + * whose previous value is no longer needed; this function + * overwrites the slot on success without releasing any + * prior handle. * @return 0 on success, -1 on error. Possible error codes include * LANCE_ERR_DATASET_ALREADY_EXISTS (CREATE on an existing path), * LANCE_ERR_INVALID_ARGUMENT (NULL/empty args, invalid mode, @@ -521,7 +532,7 @@ int32_t lance_dataset_write( const char* uri, const struct ArrowSchema* schema, struct ArrowArrayStream* stream, - LanceWriteMode mode, + int32_t mode, const char* const* storage_opts, LanceDataset** out_dataset ); diff --git a/include/lance.hpp b/include/lance.hpp index ead421b..1f4db1c 100644 --- a/include/lance.hpp +++ b/include/lance.hpp @@ -147,21 +147,72 @@ class Dataset { throw Error(LANCE_ERR_INVALID_ARGUMENT, "stream must not be null"); } - // Pull the stream's schema so we can pass it to the C API. + // RAII guard that releases the stream unless `disarm()` is called. + // Until `lance_dataset_write` returns we still own the stream, and any + // exception (failed schema read, std::bad_alloc while building the + // storage-options vector, etc.) must not leak it. 
`lance_dataset_write` + // consumes the stream on every return path on the Rust side, so we + // disarm immediately before invoking it. + struct StreamGuard { + ArrowArrayStream* s; + bool armed = true; + // Explicit constructor: `= delete`d copy/move ctors disqualify + // this from being an aggregate under C++20, so brace-init like + // `StreamGuard{stream}` would otherwise fail to compile there. + explicit StreamGuard(ArrowArrayStream* p) noexcept : s(p) {} + ~StreamGuard() noexcept { + if (armed && s && s->release) s->release(s); + } + void disarm() noexcept { armed = false; } + StreamGuard(const StreamGuard&) = delete; + StreamGuard& operator=(const StreamGuard&) = delete; + StreamGuard(StreamGuard&&) = delete; + StreamGuard& operator=(StreamGuard&&) = delete; + } stream_guard{stream}; + + // Defensive: a non-conforming or already-released producer may have a + // null `get_schema`. Without this guard a bad caller would crash with + // a null function-pointer dereference on the next line. + if (stream->get_schema == nullptr) { + throw Error(LANCE_ERR_INVALID_ARGUMENT, + "stream get_schema callback is null"); + } + + // Zero-initialize the schema and arm its RAII guard *before* invoking + // `get_schema`, so a non-conforming producer that partially populates + // the schema and then returns an error code still has its `release` + // callback fired during unwind. (Zero-initialized `schema.release` is + // null, making the destructor a no-op on the success-of-error-without- + // population path.) + struct SchemaGuard { + ArrowSchema* s; + // Explicit constructor for the same C++20 aggregate-init reason + // documented on StreamGuard above. 
+ explicit SchemaGuard(ArrowSchema* p) noexcept : s(p) {} + ~SchemaGuard() noexcept { + if (s && s->release) s->release(s); + } + SchemaGuard(const SchemaGuard&) = delete; + SchemaGuard& operator=(const SchemaGuard&) = delete; + SchemaGuard(SchemaGuard&&) = delete; + SchemaGuard& operator=(SchemaGuard&&) = delete; + }; ArrowSchema schema = {}; + SchemaGuard schema_guard{&schema}; + + // Pull the stream's schema so we can pass it to the C API. If this + // fails the StreamGuard releases the stream during unwind, and the + // SchemaGuard above releases any partial schema state the producer + // may have left behind — preserving the "consumed on return or + // throw" contract for both resources. if (stream->get_schema(stream, &schema) != 0) { const char* err = stream->get_last_error ? stream->get_last_error(stream) : nullptr; - throw Error( - LANCE_ERR_INVALID_ARGUMENT, - std::string("failed to read stream schema: ") + - (err ? err : "unknown")); + std::string msg = std::string("failed to read stream schema: ") + + (err ? err : "unknown"); + throw Error(LANCE_ERR_INVALID_ARGUMENT, msg); } - struct SchemaGuard { - ArrowSchema* s; - ~SchemaGuard() { if (s && s->release) s->release(s); } - } guard{&schema}; std::vector kv; for (auto& [k, v] : storage_opts) { @@ -172,15 +223,29 @@ class Dataset { const char* const* opts_ptr = storage_opts.empty() ? nullptr : kv.data(); + // The C API consumes the stream on every return path, so disarm the + // guard before calling. After this point the stream pointer is logically + // owned by Rust and any C++-side exception must not re-release it. 
+        stream_guard.disarm();
+
         LanceDataset* out = nullptr;
         int32_t rc = lance_dataset_write(
             uri.c_str(),
             &schema,
             stream,
-            static_cast<LanceWriteMode>(mode),
+            static_cast<int32_t>(mode),
             opts_ptr,
             &out);
         if (rc != 0) check_error();
 
+        // Defensive null guard: a conforming Rust impl never returns rc == 0
+        // with `out == nullptr`, but constructing a Dataset around a null
+        // handle would silently crash on the first method call. Throw
+        // explicitly rather than going through `check_error()` because the
+        // thread-local code is `LANCE_OK` on this path (rc == 0).
+        if (!out) {
+            throw Error(LANCE_ERR_INTERNAL,
+                        "lance_dataset_write returned success with null out_dataset");
+        }
         return Dataset(out);
     }
 
diff --git a/src/writer.rs b/src/writer.rs
index 1fae74e..8e2ad6d 100644
--- a/src/writer.rs
+++ b/src/writer.rs
@@ -35,8 +35,11 @@ pub enum LanceWriteMode {
     /// Create a new dataset. Fails with `LANCE_ERR_DATASET_ALREADY_EXISTS` if
     /// the path already exists.
     Create = 0,
-    /// Append to an existing dataset. Fails with `LANCE_ERR_INVALID_ARGUMENT`
-    /// if the stream schema is incompatible with the existing dataset schema.
+    /// Append to an existing dataset. Schema-incompatibility against the
+    /// existing dataset is reported via the upstream Lance error code, which
+    /// currently surfaces as `LANCE_ERR_INTERNAL`; the declared-vs-stream
+    /// schema mismatch handled in this layer surfaces as
+    /// `LANCE_ERR_INVALID_ARGUMENT`.
     Append = 1,
     /// Overwrite an existing dataset (or create one if the path does not exist).
     Overwrite = 2,
@@ -109,16 +112,23 @@ unsafe fn write_dataset_inner(
     storage_opts: *const *const c_char,
     out_dataset: *mut *mut LanceDataset,
 ) -> Result {
-    if uri.is_null() || schema.is_null() || stream.is_null() {
+    // The stream NULL check is the only validation that runs *before* the
+    // stream is consumed; once `from_raw` succeeds, every other return path
+    // drops `reader`, which fires the FFI release callback.
Reordering the + // uri/schema NULL checks ahead of `from_raw` would leak the stream on + // those paths and break the documented "consumed on every return" contract. + if stream.is_null() { return Err(lance_core::Error::InvalidInput { - source: "uri, schema, and stream must not be NULL".into(), + source: "stream must not be NULL".into(), location: snafu::location!(), }); } - // Consume the stream before any other fallible validation. `from_raw` - // swaps the caller's stream into a Rust-owned reader unconditionally, so - // the stream's resources are released on every return path. + // SAFETY: `stream` is non-NULL (checked above) and the caller guarantees + // it points to an initialized, properly-aligned `FFI_ArrowArrayStream` + // owned by them. `from_raw` performs a `ptr::replace` that transfers + // ownership into the returned reader, zeroing the caller's release + // callback so it cannot be released twice. let reader = unsafe { ArrowArrayStreamReader::from_raw(stream) }.map_err(|e| { lance_core::Error::InvalidInput { source: e.to_string().into(), @@ -126,17 +136,41 @@ unsafe fn write_dataset_inner( } })?; + if uri.is_null() { + return Err(lance_core::Error::InvalidInput { + source: "uri must not be NULL".into(), + location: snafu::location!(), + }); + } + if schema.is_null() { + return Err(lance_core::Error::InvalidInput { + source: "schema must not be NULL".into(), + location: snafu::location!(), + }); + } + // Validate the mode at the boundary — storing an out-of-range tag as a // `LanceWriteMode` would be UB. let mode = LanceWriteMode::from_raw(mode)?; - let uri_str = unsafe { helpers::parse_c_string(uri)? }.ok_or_else(|| { - lance_core::Error::InvalidInput { + // SAFETY: `uri` is non-NULL (checked above) and the caller guarantees it + // points to a NUL-terminated C string that lives for the duration of this + // call. 
`parse_c_string`'s lifetime parameter is unconstrained, so we rely + // on the borrow being used only within this synchronous function body — + // which `block_on` enforces by completing before this function returns. + let uri_str = unsafe { helpers::parse_c_string(uri)? } + .filter(|s| !s.is_empty()) + .ok_or_else(|| lance_core::Error::InvalidInput { + // NULL was rejected by the earlier `uri.is_null()` check, so the + // only remaining failure here is the empty string. source: "uri must not be empty".into(), location: snafu::location!(), - } - })?; + })?; + // SAFETY: `schema` is non-NULL (checked above) and the caller guarantees + // it points to a properly-initialized `FFI_ArrowSchema` valid for the + // duration of this call. `try_from(&FFI_ArrowSchema)` reads by shared + // reference and does not move out of or release the schema. let expected_schema = ArrowSchema::try_from(unsafe { &*schema }).map_err(|e| { lance_core::Error::InvalidInput { source: format!("invalid schema: {e}").into(), @@ -144,6 +178,9 @@ unsafe fn write_dataset_inner( } })?; + // SAFETY: `storage_opts` is either NULL or a NULL-terminated array of + // C-string pointers per the FFI contract; `parse_storage_options` returns + // an empty map for NULL. let opts = unsafe { helpers::parse_storage_options(storage_opts)? }; // Fail fast: compare the stream schema against the caller-provided schema. 
@@ -158,18 +195,15 @@ unsafe fn write_dataset_inner( }); } - let mut params = WriteParams { + let store_params = (!opts.is_empty()).then(|| ObjectStoreParams { + storage_options_accessor: Some(Arc::new(StorageOptionsAccessor::with_static_options(opts))), + ..ObjectStoreParams::default() + }); + let params = WriteParams { mode: mode.into(), + store_params, ..WriteParams::default() }; - if !opts.is_empty() { - params.store_params = Some(ObjectStoreParams { - storage_options_accessor: Some(Arc::new(StorageOptionsAccessor::with_static_options( - opts, - ))), - ..ObjectStoreParams::default() - }); - } let dataset = block_on(Dataset::write(reader, uri_str, Some(params)))?; @@ -177,6 +211,10 @@ unsafe fn write_dataset_inner( let handle = LanceDataset { inner: RwLock::new(Arc::new(dataset)), }; + // SAFETY: `out_dataset` is non-NULL (checked above) and the caller + // guarantees it points to caller-owned, writable storage of size + // `sizeof(LanceDataset*)`. We only write on success; on any error + // path the early returns above leave `*out_dataset` untouched. 
unsafe { *out_dataset = Box::into_raw(Box::new(handle)); } diff --git a/tests/c_api_test.rs b/tests/c_api_test.rs index d344b42..24b8c02 100644 --- a/tests/c_api_test.rs +++ b/tests/c_api_test.rs @@ -2818,6 +2818,7 @@ fn test_dataset_write_append_accumulates_rows() { assert_eq!(rc, 0); let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + assert!(!ds.is_null()); assert_eq!(unsafe { lance_dataset_count_rows(ds) }, 5); unsafe { lance_dataset_close(ds) }; } @@ -2857,6 +2858,7 @@ fn test_dataset_write_overwrite_replaces_rows() { assert_eq!(rc, 0); let ds = unsafe { lance_dataset_open(c_uri.as_ptr(), ptr::null(), 0) }; + assert!(!ds.is_null()); assert_eq!( unsafe { lance_dataset_count_rows(ds) }, 2, @@ -3017,7 +3019,10 @@ fn test_dataset_write_append_schema_mismatch_fails() { ) }; assert_eq!(rc, -1); - assert_ne!(lance_last_error_code(), LanceErrorCode::Ok); + // Upstream Lance currently surfaces append-with-mismatched-schema as + // `Internal` rather than `InvalidArgument`. Lock the assertion to the + // observed code so we notice (and can revisit the mapping) if it changes. + assert_eq!(lance_last_error_code(), LanceErrorCode::Internal); } #[test] @@ -3166,3 +3171,295 @@ fn test_dataset_write_null_args_return_error() { assert_eq!(rc, -1); assert_eq!(lance_last_error_code(), LanceErrorCode::InvalidArgument); } + +/// A `RecordBatchReader` that bumps a shared counter when it is dropped. +/// Wrapping this in an `FFI_ArrowArrayStream` lets a test observe whether the +/// stream's `release` callback was invoked: dropping the boxed reader (via +/// `release` on the FFI side) fires `Drop` and increments the counter. 
+struct CountingReader {
+    inner: arrow::record_batch::RecordBatchIterator<
+        std::vec::IntoIter<arrow::error::Result<RecordBatch>>,
+    >,
+    drop_count: Arc<std::sync::atomic::AtomicUsize>,
+}
+
+impl Drop for CountingReader {
+    fn drop(&mut self) {
+        self.drop_count
+            .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
+    }
+}
+
+impl Iterator for CountingReader {
+    type Item = arrow::error::Result<RecordBatch>;
+    fn next(&mut self) -> Option<Self::Item> {
+        self.inner.next()
+    }
+}
+
+impl RecordBatchReader for CountingReader {
+    fn schema(&self) -> Arc<Schema> {
+        self.inner.schema()
+    }
+}
+
+/// Build a `(stream, drop_counter)` pair where the stream wraps a single-batch
+/// reader whose `Drop` increments the counter. After a call that consumes the
+/// stream, the counter goes from 0 → 1.
+fn make_counted_stream(
+    schema: &Arc<Schema>,
+) -> (FFI_ArrowArrayStream, Arc<std::sync::atomic::AtomicUsize>) {
+    let drop_count = Arc::new(std::sync::atomic::AtomicUsize::new(0));
+    let reader = CountingReader {
+        inner: arrow::record_batch::RecordBatchIterator::new(
+            vec![Ok(write_batch(vec![1], vec![1.0]))].into_iter(),
+            schema.clone(),
+        ),
+        drop_count: drop_count.clone(),
+    };
+    (FFI_ArrowArrayStream::new(Box::new(reader)), drop_count)
+}
+
+fn assert_stream_consumed(
+    _stream: &FFI_ArrowArrayStream,
+    drop_count: &Arc<std::sync::atomic::AtomicUsize>,
+) {
+    // The drop count is the real behavioral check — it can only reach 1 if
+    // the FFI release callback fired, which is what frees the boxed reader.
+    // (We do not also assert `stream.release.is_none()` because `from_raw`
+    // unconditionally clears that field via `ptr::replace` before any other
+    // work; the assertion would be vacuously true on every path.)
+    assert_eq!(
+        drop_count.load(std::sync::atomic::Ordering::SeqCst),
+        1,
+        "stream's release callback must fire exactly once during the call"
+    );
+}
+
+/// FFI contract: every error path that received a non-NULL stream must also
+/// release it, so the C caller never has to. We assert this by wrapping the
+/// reader in a `Drop`-counter and checking the counter immediately after each
+/// `lance_dataset_write` call.
The cases below exercise every validation +/// branch in `write_dataset_inner` that runs *after* the stream has been +/// consumed via `from_raw` — including NULL uri/schema, which were previously +/// gated *before* consumption (the bug R1 fixed). +#[test] +fn test_dataset_write_releases_stream_on_every_error_path() { + let schema = write_schema(); + let c_uri = c_str("memory://x"); + + // Each case that passes a non-NULL schema constructs its own + // `FFI_ArrowSchema` via `schema_to_ffi` so the cases stay independent: a + // hypothetical regression where Rust accidentally consumes the schema + // would surface as an immediate failure here instead of silently + // corrupting later cases. Case 2 deliberately passes `ptr::null()` and + // therefore needs no schema construction. + + // Case 1: NULL uri. + let (mut stream, drop_count) = make_counted_stream(&schema); + let ffi_schema = schema_to_ffi(&schema); + let rc = unsafe { + lance_dataset_write( + ptr::null(), + &ffi_schema, + &mut stream, + LanceWriteMode::Create as i32, + ptr::null(), + ptr::null_mut(), + ) + }; + assert_eq!(rc, -1); + assert_eq!(lance_last_error_code(), LanceErrorCode::InvalidArgument); + assert_stream_consumed(&stream, &drop_count); + + // Case 2: NULL schema. + let (mut stream, drop_count) = make_counted_stream(&schema); + let rc = unsafe { + lance_dataset_write( + c_uri.as_ptr(), + ptr::null(), + &mut stream, + LanceWriteMode::Create as i32, + ptr::null(), + ptr::null_mut(), + ) + }; + assert_eq!(rc, -1); + assert_eq!(lance_last_error_code(), LanceErrorCode::InvalidArgument); + assert_stream_consumed(&stream, &drop_count); + + // Case 3: invalid mode. 
+ let (mut stream, drop_count) = make_counted_stream(&schema); + let ffi_schema = schema_to_ffi(&schema); + let rc = unsafe { + lance_dataset_write( + c_uri.as_ptr(), + &ffi_schema, + &mut stream, + 99, + ptr::null(), + ptr::null_mut(), + ) + }; + assert_eq!(rc, -1); + assert_eq!(lance_last_error_code(), LanceErrorCode::InvalidArgument); + assert_stream_consumed(&stream, &drop_count); + + // Case 4: empty URI. + let (mut stream, drop_count) = make_counted_stream(&schema); + let ffi_schema = schema_to_ffi(&schema); + let empty_uri = c_str(""); + let rc = unsafe { + lance_dataset_write( + empty_uri.as_ptr(), + &ffi_schema, + &mut stream, + LanceWriteMode::Create as i32, + ptr::null(), + ptr::null_mut(), + ) + }; + assert_eq!(rc, -1); + assert_eq!(lance_last_error_code(), LanceErrorCode::InvalidArgument); + assert_stream_consumed(&stream, &drop_count); + + // Case 5: declared-schema mismatch. + let (mut stream, drop_count) = make_counted_stream(&schema); + let one_col_schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]); + let ffi_schema = schema_to_ffi(&one_col_schema); + let rc = unsafe { + lance_dataset_write( + c_uri.as_ptr(), + &ffi_schema, + &mut stream, + LanceWriteMode::Create as i32, + ptr::null(), + ptr::null_mut(), + ) + }; + assert_eq!(rc, -1); + assert_eq!(lance_last_error_code(), LanceErrorCode::InvalidArgument); + assert_stream_consumed(&stream, &drop_count); + + // Case 6: Lance-level rejection (CREATE on an existing dataset). This is + // the only error path that fails inside `block_on(Dataset::write)` after + // the stream has been moved into the upstream writer. Verifies the stream + // is still released even when the failure originates upstream. + let tmp = tempfile::tempdir().unwrap(); + let existing_uri = tmp.path().join("ds").to_str().unwrap().to_string(); + let c_existing = c_str(&existing_uri); + // Seed the path with an initial dataset. 
+ let mut seed_stream = batch_to_ffi_stream(write_batch(vec![1], vec![1.0])); + let seed_schema = schema_to_ffi(&schema); + let rc = unsafe { + lance_dataset_write( + c_existing.as_ptr(), + &seed_schema, + &mut seed_stream, + LanceWriteMode::Create as i32, + ptr::null(), + ptr::null_mut(), + ) + }; + assert_eq!(rc, 0); + // Now CREATE again — expected to fail with DatasetAlreadyExists, and the + // stream must still be released by the failure path. + let ffi_schema = schema_to_ffi(&schema); + let (mut stream, drop_count) = make_counted_stream(&schema); + let rc = unsafe { + lance_dataset_write( + c_existing.as_ptr(), + &ffi_schema, + &mut stream, + LanceWriteMode::Create as i32, + ptr::null(), + ptr::null_mut(), + ) + }; + assert_eq!(rc, -1); + assert_eq!( + lance_last_error_code(), + LanceErrorCode::DatasetAlreadyExists + ); + assert_stream_consumed(&stream, &drop_count); +} + +/// On error, `*out_dataset` must be left untouched. A caller that passes +/// `&mut some_existing_handle` (perhaps re-using the slot) must be able to +/// trust that a failed call does not silently overwrite or close their handle. +/// Covers both pre-`block_on` validation errors (NULL uri) and Lance-level +/// errors (CREATE on existing) — the contract holds across the success-prep +/// boundary. +#[test] +fn test_dataset_write_leaves_out_dataset_untouched_on_error() { + let schema = write_schema(); + + // Sentinel that is non-NULL but otherwise invalid. `without_provenance_mut` + // (stable since 1.84) creates the pointer without exposing provenance — + // strict-provenance-clean. We never dereference it; the test only checks + // value equality after the call to confirm `*out_dataset` was not written. + let sentinel: *mut LanceDataset = std::ptr::without_provenance_mut(0xDEAD_BEEF); + + // Case 1: pre-`block_on` validation error (NULL uri). 
+ let mut stream = batch_to_ffi_stream(write_batch(vec![1], vec![1.0])); + let ffi_schema = schema_to_ffi(&schema); + let mut out_ds = sentinel; + let rc = unsafe { + lance_dataset_write( + ptr::null(), + &ffi_schema, + &mut stream, + LanceWriteMode::Create as i32, + ptr::null(), + &mut out_ds, + ) + }; + assert_eq!(rc, -1); + assert_eq!(lance_last_error_code(), LanceErrorCode::InvalidArgument); + assert_eq!( + out_ds, sentinel, + "*out_dataset must be untouched on pre-block_on error" + ); + + // Case 2: Lance-level error (CREATE on an existing dataset). Verifies the + // contract still holds when failure originates inside `block_on(write)`. + let tmp = tempfile::tempdir().unwrap(); + let uri = tmp.path().join("ds").to_str().unwrap().to_string(); + let c_uri = c_str(&uri); + let mut seed_stream = batch_to_ffi_stream(write_batch(vec![1], vec![1.0])); + let seed_schema = schema_to_ffi(&schema); + let rc = unsafe { + lance_dataset_write( + c_uri.as_ptr(), + &seed_schema, + &mut seed_stream, + LanceWriteMode::Create as i32, + ptr::null(), + ptr::null_mut(), + ) + }; + assert_eq!(rc, 0); + + let mut stream = batch_to_ffi_stream(write_batch(vec![2], vec![2.0])); + let ffi_schema = schema_to_ffi(&schema); + let mut out_ds = sentinel; + let rc = unsafe { + lance_dataset_write( + c_uri.as_ptr(), + &ffi_schema, + &mut stream, + LanceWriteMode::Create as i32, + ptr::null(), + &mut out_ds, + ) + }; + assert_eq!(rc, -1); + assert_eq!( + lance_last_error_code(), + LanceErrorCode::DatasetAlreadyExists + ); + assert_eq!( + out_ds, sentinel, + "*out_dataset must be untouched on Lance-level error" + ); +} diff --git a/tests/cpp/test_c_api.c b/tests/cpp/test_c_api.c index 242e35e..a6a1efa 100644 --- a/tests/cpp/test_c_api.c +++ b/tests/cpp/test_c_api.c @@ -8,7 +8,7 @@ * This file is compiled by the Rust integration test to verify that * lance.h is valid C and the API works end-to-end. 
* - * Usage: test_c_api + * Usage: test_c_api */ #include "lance.h" @@ -227,12 +227,15 @@ static void test_dataset_write_roundtrip(const char *src_uri, const char *dst_ur LanceDataset *dst = NULL; rc = lance_dataset_write( dst_uri, &schema, &stream, LANCE_WRITE_CREATE, NULL, &dst); - ASSERT(rc == 0, "lance_dataset_write failed"); - ASSERT(dst != NULL, "out_dataset should be populated"); - /* stream is consumed by lance_dataset_write; schema we own. */ + /* The Rust side reads `schema` by shared reference and never releases it, + * so we must release it ourselves on every return path — including + * failure. Release before the ASSERTs so a failed write doesn't leak. */ if (schema.release) schema.release(&schema); + ASSERT(rc == 0, "lance_dataset_write failed"); + ASSERT(dst != NULL, "out_dataset should be populated"); + uint64_t dst_rows = lance_dataset_count_rows(dst); CHECK_OK(); ASSERT(dst_rows == src_rows, "row count mismatch after write"); diff --git a/tests/cpp/test_cpp_api.cpp b/tests/cpp/test_cpp_api.cpp index eb7b7f4..aa1b834 100644 --- a/tests/cpp/test_cpp_api.cpp +++ b/tests/cpp/test_cpp_api.cpp @@ -7,7 +7,7 @@ * * Tests the RAII wrappers, exception handling, and builder pattern. * - * Usage: test_cpp_api + * Usage: test_cpp_api */ #include "lance.hpp" From 1a9e466bac09b68370a0b3ece7268219503b4b98 Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Mon, 27 Apr 2026 13:27:17 +0800 Subject: [PATCH 4/4] docs(writer): tighten review-added comments Cut redundant phrasing from comments added in dbb0acb while keeping the non-obvious WHY: - writer.rs empty-URI branch: replace 2-line "NULL was rejected by the earlier check" with a single-line variant. - lance.hpp StreamGuard intro: drop the prose describing what the destructor obviously does; keep the "must release on exception, Rust owns it after the call" rationale. - lance.hpp SchemaGuard ordering: collapse the parenthetical that restates the destructor's null-release check into a tail clause. 
- lance.hpp get_schema-failure block: drop the "Pull the stream's schema so we can pass it to the C API" lead, which the next line already says. No code changes; 13/13 writer tests pass. --- include/lance.hpp | 28 +++++++++++----------------- src/writer.rs | 3 +-- 2 files changed, 12 insertions(+), 19 deletions(-) diff --git a/include/lance.hpp b/include/lance.hpp index 1f4db1c..135b8a5 100644 --- a/include/lance.hpp +++ b/include/lance.hpp @@ -147,12 +147,10 @@ class Dataset { throw Error(LANCE_ERR_INVALID_ARGUMENT, "stream must not be null"); } - // RAII guard that releases the stream unless `disarm()` is called. - // Until `lance_dataset_write` returns we still own the stream, and any - // exception (failed schema read, std::bad_alloc while building the - // storage-options vector, etc.) must not leak it. `lance_dataset_write` - // consumes the stream on every return path on the Rust side, so we - // disarm immediately before invoking it. + // RAII guard for the stream. Until `lance_dataset_write` is called, + // any exception (failed `get_schema`, `std::bad_alloc` while building + // `kv`, etc.) must release the stream. After that call Rust owns it, + // so we `disarm()` immediately before invoking the C API. struct StreamGuard { ArrowArrayStream* s; bool armed = true; @@ -178,12 +176,10 @@ class Dataset { "stream get_schema callback is null"); } - // Zero-initialize the schema and arm its RAII guard *before* invoking - // `get_schema`, so a non-conforming producer that partially populates - // the schema and then returns an error code still has its `release` - // callback fired during unwind. (Zero-initialized `schema.release` is - // null, making the destructor a no-op on the success-of-error-without- - // population path.) + // Arm SchemaGuard before calling `get_schema` so a non-conforming + // producer that partially populates the schema before returning an + // error still has its `release` fired on unwind. 
The zero-init keeps + // the destructor a no-op on the clean-error path (release == null). struct SchemaGuard { ArrowSchema* s; // Explicit constructor for the same C++20 aggregate-init reason @@ -200,11 +196,9 @@ class Dataset { ArrowSchema schema = {}; SchemaGuard schema_guard{&schema}; - // Pull the stream's schema so we can pass it to the C API. If this - // fails the StreamGuard releases the stream during unwind, and the - // SchemaGuard above releases any partial schema state the producer - // may have left behind — preserving the "consumed on return or - // throw" contract for both resources. + // On failure, StreamGuard releases the stream and SchemaGuard + // releases any partial schema state — preserving the "consumed on + // return or throw" contract for both resources. if (stream->get_schema(stream, &schema) != 0) { const char* err = stream->get_last_error ? stream->get_last_error(stream) diff --git a/src/writer.rs b/src/writer.rs index 8e2ad6d..f69f45b 100644 --- a/src/writer.rs +++ b/src/writer.rs @@ -161,8 +161,7 @@ unsafe fn write_dataset_inner( let uri_str = unsafe { helpers::parse_c_string(uri)? } .filter(|s| !s.is_empty()) .ok_or_else(|| lance_core::Error::InvalidInput { - // NULL was rejected by the earlier `uri.is_null()` check, so the - // only remaining failure here is the empty string. + // NULL is rejected above; only the empty case reaches here. source: "uri must not be empty".into(), location: snafu::location!(), })?;