Skip to content

Commit b3e813c

Browse files
committed
feat: add lance_dataset_write for create/append/overwrite from ArrowArrayStream
Writes an ArrowArrayStream into a Lance dataset with a committed manifest. A mode enum (CREATE / APPEND / OVERWRITE) and an optional out_dataset that returns the open dataset at the new version (so callers don't need to reopen). Structure follows fragment_writer.rs: schema fail-fast, storage options pass-through, thread-local errors. C++ gets a scoped WriteMode enum and a lance::Dataset::write() static method that reads the stream's schema automatically. Two FFI details: - mode is received as i32 and validated via LanceWriteMode::from_raw; accepting the enum directly would be UB for out-of-range values. - The stream is consumed via ArrowArrayStreamReader::from_raw right after the NULL check so the "consumed on any return" contract holds. Tests: 11 new Rust unit tests (CREATE/APPEND/OVERWRITE happy paths, OVERWRITE-on-missing, CREATE-on-existing, schema mismatches, empty stream, NULL args, invalid mode, out_dataset propagation). The ignored C and C++ integration tests now do a scan->write round-trip. Closes #14.
1 parent d721501 commit b3e813c

8 files changed

Lines changed: 810 additions & 10 deletions

File tree

include/lance.h

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -310,6 +310,51 @@ int32_t lance_write_fragments(
310310
const char* const* storage_opts
311311
);
312312

313+
/* ─── Dataset writer ─── */
314+
315+
/**
316+
* Write mode for lance_dataset_write. Values are ABI-stable.
317+
* The Rust implementation validates the received integer and rejects any
318+
* out-of-range value with LANCE_ERR_INVALID_ARGUMENT.
319+
*/
320+
typedef enum {
321+
LANCE_WRITE_CREATE = 0, /* Create new dataset; fail if path exists. */
322+
LANCE_WRITE_APPEND = 1, /* Append; fail if the new schema is incompatible. */
323+
LANCE_WRITE_OVERWRITE = 2, /* Overwrite existing, or create if missing. */
324+
} LanceWriteMode;
325+
326+
/**
327+
* Write an Arrow record batch stream to a Lance dataset at `uri`, committing
328+
* a manifest.
329+
*
330+
* @param uri Dataset URI (file://, s3://, memory://, etc.). Must not
331+
* be NULL or an empty string.
332+
* @param schema Required Arrow schema. The stream schema must match or
333+
* the call fails with LANCE_ERR_INVALID_ARGUMENT.
334+
* @param stream Arrow C Data Interface stream consumed by this call.
335+
* Do not use the stream after returning, regardless of
336+
* the return code.
337+
* @param mode CREATE / APPEND / OVERWRITE (see LanceWriteMode).
338+
* @param storage_opts NULL-terminated key-value pairs ["k","v",NULL], or NULL.
339+
* @param out_dataset If non-NULL, on success receives an open LanceDataset*
340+
* at the newly-committed version (caller must
341+
* lance_dataset_close it). Pass NULL to discard. On error
342+
* *out_dataset is left unchanged — do not read or free it.
343+
* @return 0 on success, -1 on error. Possible error codes include
344+
* LANCE_ERR_DATASET_ALREADY_EXISTS (CREATE on an existing path),
345+
* LANCE_ERR_INVALID_ARGUMENT (NULL/empty args, invalid mode,
346+
* schema mismatch),
347+
* LANCE_ERR_COMMIT_CONFLICT (concurrent writer).
348+
*/
349+
int32_t lance_dataset_write(
350+
const char* uri,
351+
const struct ArrowSchema* schema,
352+
struct ArrowArrayStream* stream,
353+
LanceWriteMode mode,
354+
const char* const* storage_opts,
355+
LanceDataset** out_dataset
356+
);
357+
313358
#ifdef __cplusplus
314359
} /* extern "C" */
315360
#endif

include/lance.hpp

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
#include "lance.h"
1919

20+
#include <cstdint>
2021
#include <memory>
2122
#include <stdexcept>
2223
#include <string>
@@ -84,6 +85,14 @@ class Handle {
8485

8586
class Scanner;
8687

88+
// ─── Write mode ──────────────────────────────────────────────────────────────
89+
90+
enum class WriteMode : int32_t {
91+
Create = LANCE_WRITE_CREATE,
92+
Append = LANCE_WRITE_APPEND,
93+
Overwrite = LANCE_WRITE_OVERWRITE,
94+
};
95+
8796
// ─── Dataset ─────────────────────────────────────────────────────────────────
8897

8998
class Dataset {
@@ -112,6 +121,59 @@ class Dataset {
112121
return Dataset(ds);
113122
}
114123

124+
/// Write an Arrow record batch stream to a Lance dataset and return the
125+
/// open dataset at the committed version.
126+
///
127+
/// The stream must be self-describing; its own schema is used. Treat the
128+
/// stream as consumed once this call returns or throws — do not reuse it.
129+
/// Throws lance::Error on failure (including if `stream` is null).
130+
static Dataset write(
131+
const std::string& uri,
132+
ArrowArrayStream* stream,
133+
WriteMode mode,
134+
const std::vector<std::pair<std::string, std::string>>& storage_opts = {}) {
135+
136+
if (stream == nullptr) {
137+
throw Error(LANCE_ERR_INVALID_ARGUMENT, "stream must not be null");
138+
}
139+
140+
// Pull the stream's schema so we can pass it to the C API.
141+
ArrowSchema schema = {};
142+
if (stream->get_schema(stream, &schema) != 0) {
143+
const char* err = stream->get_last_error
144+
? stream->get_last_error(stream)
145+
: nullptr;
146+
throw Error(
147+
LANCE_ERR_INVALID_ARGUMENT,
148+
std::string("failed to read stream schema: ") +
149+
(err ? err : "unknown"));
150+
}
151+
struct SchemaGuard {
152+
ArrowSchema* s;
153+
~SchemaGuard() { if (s && s->release) s->release(s); }
154+
} guard{&schema};
155+
156+
std::vector<const char*> kv;
157+
for (auto& [k, v] : storage_opts) {
158+
kv.push_back(k.c_str());
159+
kv.push_back(v.c_str());
160+
}
161+
kv.push_back(nullptr);
162+
const char* const* opts_ptr =
163+
storage_opts.empty() ? nullptr : kv.data();
164+
165+
LanceDataset* out = nullptr;
166+
int32_t rc = lance_dataset_write(
167+
uri.c_str(),
168+
&schema,
169+
stream,
170+
static_cast<LanceWriteMode>(mode),
171+
opts_ptr,
172+
&out);
173+
if (rc != 0) check_error();
174+
return Dataset(out);
175+
}
176+
115177
/// Number of rows in the dataset.
116178
uint64_t count_rows() const {
117179
uint64_t n = lance_dataset_count_rows(handle_.get());

src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ mod fragment_writer;
2323
mod helpers;
2424
pub mod runtime;
2525
mod scanner;
26+
mod writer;
2627

2728
// Re-export all extern "C" symbols so they appear in the cdylib.
2829
pub use batch::*;
@@ -32,3 +33,4 @@ pub use error::{
3233
};
3334
pub use fragment_writer::*;
3435
pub use scanner::*;
36+
pub use writer::*;

src/writer.rs

Lines changed: 186 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// SPDX-FileCopyrightText: Copyright The Lance Authors
3+
4+
//! Dataset write C API: create, append, or overwrite a Lance dataset from an
5+
//! Arrow C Data Interface stream, committing a manifest.
6+
//!
7+
//! Mirrors the structure of `src/fragment_writer.rs` but produces a full
8+
//! dataset with a committed manifest rather than uncommitted fragment files.
9+
10+
use std::ffi::c_char;
11+
use std::sync::Arc;
12+
13+
use arrow::ffi::FFI_ArrowSchema;
14+
use arrow::ffi_stream::{ArrowArrayStreamReader, FFI_ArrowArrayStream};
15+
use arrow::record_batch::RecordBatchReader;
16+
use arrow_schema::Schema as ArrowSchema;
17+
use lance::Dataset;
18+
use lance::dataset::{WriteMode as LanceWriteModeUpstream, WriteParams};
19+
use lance_core::Result;
20+
use lance_io::object_store::{ObjectStoreParams, StorageOptionsAccessor};
21+
22+
use crate::dataset::LanceDataset;
23+
use crate::error::ffi_try;
24+
use crate::helpers;
25+
use crate::runtime::block_on;
26+
27+
/// Write mode for `lance_dataset_write`.
28+
///
29+
/// Discriminants are pinned for ABI stability. The FFI accepts this as
30+
/// `int32_t` and validates via [`LanceWriteMode::from_raw`] — storing an
31+
/// out-of-range tag as an enum would be UB.
32+
#[repr(C)]
33+
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
34+
pub enum LanceWriteMode {
35+
/// Create a new dataset. Fails with `LANCE_ERR_DATASET_ALREADY_EXISTS` if
36+
/// the path already exists.
37+
Create = 0,
38+
/// Append to an existing dataset. Fails with `LANCE_ERR_INVALID_ARGUMENT`
39+
/// if the stream schema is incompatible with the existing dataset schema.
40+
Append = 1,
41+
/// Overwrite an existing dataset (or create one if the path does not exist).
42+
Overwrite = 2,
43+
}
44+
45+
impl LanceWriteMode {
46+
/// Validate a raw FFI integer into a `LanceWriteMode`. Out-of-range
47+
/// values become `InvalidInput`.
48+
fn from_raw(raw: i32) -> Result<Self> {
49+
match raw {
50+
0 => Ok(Self::Create),
51+
1 => Ok(Self::Append),
52+
2 => Ok(Self::Overwrite),
53+
other => Err(lance_core::Error::InvalidInput {
54+
source: format!(
55+
"invalid write mode {other}; expected 0 (create), 1 (append), or 2 (overwrite)"
56+
)
57+
.into(),
58+
location: snafu::location!(),
59+
}),
60+
}
61+
}
62+
}
63+
64+
impl From<LanceWriteMode> for LanceWriteModeUpstream {
65+
fn from(mode: LanceWriteMode) -> Self {
66+
match mode {
67+
LanceWriteMode::Create => LanceWriteModeUpstream::Create,
68+
LanceWriteMode::Append => LanceWriteModeUpstream::Append,
69+
LanceWriteMode::Overwrite => LanceWriteModeUpstream::Overwrite,
70+
}
71+
}
72+
}
73+
74+
/// Write an Arrow record batch stream to a Lance dataset at `uri`, committing a manifest.
75+
///
76+
/// - `uri`: Dataset URI (`file://`, `s3://`, `memory://`, ...). Must not be NULL or empty.
77+
/// - `schema`: Caller-provided Arrow schema. The stream's schema must match;
78+
/// mismatch returns `LANCE_ERR_INVALID_ARGUMENT`.
79+
/// - `stream`: Arrow C Data Interface stream. Consumed by this call — the
80+
/// caller must not use it again on any return path.
81+
/// - `mode`: `LANCE_WRITE_CREATE` (0), `LANCE_WRITE_APPEND` (1), or
82+
/// `LANCE_WRITE_OVERWRITE` (2). Any other value → `LANCE_ERR_INVALID_ARGUMENT`.
83+
/// - `storage_opts`: NULL-terminated key-value pairs `["k","v",NULL]`, or NULL.
84+
/// - `out_dataset`: If non-NULL, receives an open `LanceDataset*` at the new
85+
/// version on success (caller closes). Pass NULL to discard. On error
86+
/// `*out_dataset` is untouched — do not read or free it.
87+
///
88+
/// Returns 0 on success, -1 on error.
89+
#[unsafe(no_mangle)]
90+
pub unsafe extern "C" fn lance_dataset_write(
91+
uri: *const c_char,
92+
schema: *const FFI_ArrowSchema,
93+
stream: *mut FFI_ArrowArrayStream,
94+
mode: i32,
95+
storage_opts: *const *const c_char,
96+
out_dataset: *mut *mut LanceDataset,
97+
) -> i32 {
98+
ffi_try!(
99+
unsafe { write_dataset_inner(uri, schema, stream, mode, storage_opts, out_dataset) },
100+
neg
101+
)
102+
}
103+
104+
unsafe fn write_dataset_inner(
105+
uri: *const c_char,
106+
schema: *const FFI_ArrowSchema,
107+
stream: *mut FFI_ArrowArrayStream,
108+
mode: i32,
109+
storage_opts: *const *const c_char,
110+
out_dataset: *mut *mut LanceDataset,
111+
) -> Result<i32> {
112+
if uri.is_null() || schema.is_null() || stream.is_null() {
113+
return Err(lance_core::Error::InvalidInput {
114+
source: "uri, schema, and stream must not be NULL".into(),
115+
location: snafu::location!(),
116+
});
117+
}
118+
119+
// Consume the stream before any other fallible validation. `from_raw`
120+
// swaps the caller's stream into a Rust-owned reader unconditionally, so
121+
// the stream's resources are released on every return path.
122+
let reader = unsafe { ArrowArrayStreamReader::from_raw(stream) }.map_err(|e| {
123+
lance_core::Error::InvalidInput {
124+
source: e.to_string().into(),
125+
location: snafu::location!(),
126+
}
127+
})?;
128+
129+
// Validate the mode at the boundary — storing an out-of-range tag as a
130+
// `LanceWriteMode` would be UB.
131+
let mode = LanceWriteMode::from_raw(mode)?;
132+
133+
let uri_str = unsafe { helpers::parse_c_string(uri)? }.ok_or_else(|| {
134+
lance_core::Error::InvalidInput {
135+
source: "uri must not be empty".into(),
136+
location: snafu::location!(),
137+
}
138+
})?;
139+
140+
let expected_schema = ArrowSchema::try_from(unsafe { &*schema }).map_err(|e| {
141+
lance_core::Error::InvalidInput {
142+
source: format!("invalid schema: {e}").into(),
143+
location: snafu::location!(),
144+
}
145+
})?;
146+
147+
let opts = unsafe { helpers::parse_storage_options(storage_opts)? };
148+
149+
// Fail fast: compare the stream schema against the caller-provided schema.
150+
let stream_schema = reader.schema();
151+
if stream_schema.fields() != expected_schema.fields() {
152+
return Err(lance_core::Error::InvalidInput {
153+
source: format!(
154+
"stream schema does not match the provided schema.\n expected: {expected_schema}\n got: {stream_schema}"
155+
)
156+
.into(),
157+
location: snafu::location!(),
158+
});
159+
}
160+
161+
let mut params = WriteParams {
162+
mode: mode.into(),
163+
..WriteParams::default()
164+
};
165+
if !opts.is_empty() {
166+
params.store_params = Some(ObjectStoreParams {
167+
storage_options_accessor: Some(Arc::new(StorageOptionsAccessor::with_static_options(
168+
opts,
169+
))),
170+
..ObjectStoreParams::default()
171+
});
172+
}
173+
174+
let dataset = block_on(Dataset::write(reader, uri_str, Some(params)))?;
175+
176+
if !out_dataset.is_null() {
177+
let handle = LanceDataset {
178+
inner: Arc::new(dataset),
179+
};
180+
unsafe {
181+
*out_dataset = Box::into_raw(Box::new(handle));
182+
}
183+
}
184+
185+
Ok(0)
186+
}

0 commit comments

Comments
 (0)