Skip to content

Commit 8a9325e

Browse files
committed
feat: add lance_dataset_write for create/append/overwrite from ArrowArrayStream
Writes an ArrowArrayStream into a Lance dataset with a committed manifest. Adds a mode enum (CREATE / APPEND / OVERWRITE) and an optional out_dataset parameter that returns the open dataset at the new version (so callers don't need to reopen). Structure follows fragment_writer.rs: schema fail-fast, storage options pass-through, thread-local errors. C++ gets a scoped WriteMode enum and a lance::Dataset::write() static method that reads the stream's schema automatically. Two FFI details: (1) mode is received as i32 and validated via LanceWriteMode::from_raw, because accepting the enum directly would be UB for out-of-range values; (2) the stream is consumed via ArrowArrayStreamReader::from_raw right after the NULL check so the "consumed on any return" contract holds. Tests: 11 new Rust unit tests (CREATE/APPEND/OVERWRITE happy paths, OVERWRITE-on-missing, CREATE-on-existing, schema mismatches, empty stream, NULL args, invalid mode, out_dataset propagation). The ignored C and C++ integration tests now do a scan->write round-trip. Closes #14.
1 parent 5e201c7 commit 8a9325e

8 files changed

Lines changed: 810 additions & 10 deletions

File tree

include/lance.h

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -340,6 +340,51 @@ int32_t lance_write_fragments(
340340
const char* const* storage_opts
341341
);
342342

343+
/* ─── Dataset writer ─── */
344+
345+
/**
 * Selects how lance_dataset_write treats the target dataset path.
 *
 * The numeric values are ABI-stable. The Rust implementation validates the
 * integer it receives and rejects any out-of-range value with
 * LANCE_ERR_INVALID_ARGUMENT.
 */
typedef enum {
    /* Create new dataset; fail if path exists. */
    LANCE_WRITE_CREATE = 0,
    /* Append; fail if the new schema is incompatible. */
    LANCE_WRITE_APPEND = 1,
    /* Overwrite existing, or create if missing. */
    LANCE_WRITE_OVERWRITE = 2,
} LanceWriteMode;
355+
356+
/**
357+
* Write an Arrow record batch stream to a Lance dataset at `uri`, committing
358+
* a manifest.
359+
*
360+
* @param uri Dataset URI (file://, s3://, memory://, etc.). Must not
361+
* be NULL or an empty string.
362+
* @param schema Required Arrow schema. The stream schema must match or
363+
* the call fails with LANCE_ERR_INVALID_ARGUMENT.
364+
* @param stream Arrow C Data Interface stream consumed by this call.
365+
* Do not use the stream after returning, regardless of
366+
* the return code.
367+
* @param mode CREATE / APPEND / OVERWRITE (see LanceWriteMode).
368+
* @param storage_opts NULL-terminated key-value pairs ["k","v",NULL], or NULL.
369+
* @param out_dataset If non-NULL, on success receives an open LanceDataset*
370+
* at the newly-committed version (caller must
371+
* lance_dataset_close it). Pass NULL to discard. On error
372+
* *out_dataset is left unchanged — do not read or free it.
373+
* @return 0 on success, -1 on error. Possible error codes include
374+
* LANCE_ERR_DATASET_ALREADY_EXISTS (CREATE on an existing path),
375+
* LANCE_ERR_INVALID_ARGUMENT (NULL/empty args, invalid mode,
376+
* schema mismatch),
377+
* LANCE_ERR_COMMIT_CONFLICT (concurrent writer).
378+
*/
379+
int32_t lance_dataset_write(
380+
const char* uri,
381+
const struct ArrowSchema* schema,
382+
struct ArrowArrayStream* stream,
383+
LanceWriteMode mode,
384+
const char* const* storage_opts,
385+
LanceDataset** out_dataset
386+
);
387+
343388
#ifdef __cplusplus
344389
} /* extern "C" */
345390
#endif

include/lance.hpp

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
#include "lance.h"
1919

20+
#include <cstdint>
2021
#include <memory>
2122
#include <stdexcept>
2223
#include <string>
@@ -94,6 +95,14 @@ struct VersionInfo {
9495
int64_t timestamp_ms;
9596
};
9697

98+
// ─── Write mode ──────────────────────────────────────────────────────────────
99+
100+
enum class WriteMode : int32_t {
101+
Create = LANCE_WRITE_CREATE,
102+
Append = LANCE_WRITE_APPEND,
103+
Overwrite = LANCE_WRITE_OVERWRITE,
104+
};
105+
97106
// ─── Dataset ─────────────────────────────────────────────────────────────────
98107

99108
class Dataset {
@@ -122,6 +131,59 @@ class Dataset {
122131
return Dataset(ds);
123132
}
124133

134+
/// Write an Arrow record batch stream to a Lance dataset and return the
135+
/// open dataset at the committed version.
136+
///
137+
/// The stream must be self-describing; its own schema is used. Treat the
138+
/// stream as consumed once this call returns or throws — do not reuse it.
139+
/// Throws lance::Error on failure (including if `stream` is null).
140+
static Dataset write(
141+
const std::string& uri,
142+
ArrowArrayStream* stream,
143+
WriteMode mode,
144+
const std::vector<std::pair<std::string, std::string>>& storage_opts = {}) {
145+
146+
if (stream == nullptr) {
147+
throw Error(LANCE_ERR_INVALID_ARGUMENT, "stream must not be null");
148+
}
149+
150+
// Pull the stream's schema so we can pass it to the C API.
151+
ArrowSchema schema = {};
152+
if (stream->get_schema(stream, &schema) != 0) {
153+
const char* err = stream->get_last_error
154+
? stream->get_last_error(stream)
155+
: nullptr;
156+
throw Error(
157+
LANCE_ERR_INVALID_ARGUMENT,
158+
std::string("failed to read stream schema: ") +
159+
(err ? err : "unknown"));
160+
}
161+
struct SchemaGuard {
162+
ArrowSchema* s;
163+
~SchemaGuard() { if (s && s->release) s->release(s); }
164+
} guard{&schema};
165+
166+
std::vector<const char*> kv;
167+
for (auto& [k, v] : storage_opts) {
168+
kv.push_back(k.c_str());
169+
kv.push_back(v.c_str());
170+
}
171+
kv.push_back(nullptr);
172+
const char* const* opts_ptr =
173+
storage_opts.empty() ? nullptr : kv.data();
174+
175+
LanceDataset* out = nullptr;
176+
int32_t rc = lance_dataset_write(
177+
uri.c_str(),
178+
&schema,
179+
stream,
180+
static_cast<LanceWriteMode>(mode),
181+
opts_ptr,
182+
&out);
183+
if (rc != 0) check_error();
184+
return Dataset(out);
185+
}
186+
125187
/// Number of rows in the dataset.
126188
uint64_t count_rows() const {
127189
uint64_t n = lance_dataset_count_rows(handle_.get());

src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ mod helpers;
2424
pub mod runtime;
2525
mod scanner;
2626
mod versions;
27+
mod writer;
2728

2829
// Re-export all extern "C" symbols so they appear in the cdylib.
2930
pub use batch::*;
@@ -34,3 +35,4 @@ pub use error::{
3435
pub use fragment_writer::*;
3536
pub use scanner::*;
3637
pub use versions::*;
38+
pub use writer::*;

src/writer.rs

Lines changed: 186 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// SPDX-FileCopyrightText: Copyright The Lance Authors
3+
4+
//! Dataset write C API: create, append, or overwrite a Lance dataset from an
5+
//! Arrow C Data Interface stream, committing a manifest.
6+
//!
7+
//! Mirrors the structure of `src/fragment_writer.rs` but produces a full
8+
//! dataset with a committed manifest rather than uncommitted fragment files.
9+
10+
use std::ffi::c_char;
11+
use std::sync::Arc;
12+
13+
use arrow::ffi::FFI_ArrowSchema;
14+
use arrow::ffi_stream::{ArrowArrayStreamReader, FFI_ArrowArrayStream};
15+
use arrow::record_batch::RecordBatchReader;
16+
use arrow_schema::Schema as ArrowSchema;
17+
use lance::Dataset;
18+
use lance::dataset::{WriteMode as LanceWriteModeUpstream, WriteParams};
19+
use lance_core::Result;
20+
use lance_io::object_store::{ObjectStoreParams, StorageOptionsAccessor};
21+
22+
use crate::dataset::LanceDataset;
23+
use crate::error::ffi_try;
24+
use crate::helpers;
25+
use crate::runtime::block_on;
26+
27+
/// How `lance_dataset_write` should treat the target dataset.
///
/// The discriminants are fixed for ABI stability. At the FFI boundary the
/// mode arrives as an `int32_t` and is checked by
/// [`LanceWriteMode::from_raw`], since holding an out-of-range tag in an
/// enum would be UB.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(C)]
pub enum LanceWriteMode {
    /// Create a new dataset. Fails with `LANCE_ERR_DATASET_ALREADY_EXISTS`
    /// when the path already exists.
    Create = 0,
    /// Append to an existing dataset. Fails with
    /// `LANCE_ERR_INVALID_ARGUMENT` when the stream schema is incompatible
    /// with the existing dataset schema.
    Append = 1,
    /// Overwrite an existing dataset, or create one when the path does not
    /// exist.
    Overwrite = 2,
}
44+
45+
impl LanceWriteMode {
46+
/// Validate a raw FFI integer into a `LanceWriteMode`. Out-of-range
47+
/// values become `InvalidInput`.
48+
fn from_raw(raw: i32) -> Result<Self> {
49+
match raw {
50+
0 => Ok(Self::Create),
51+
1 => Ok(Self::Append),
52+
2 => Ok(Self::Overwrite),
53+
other => Err(lance_core::Error::InvalidInput {
54+
source: format!(
55+
"invalid write mode {other}; expected 0 (create), 1 (append), or 2 (overwrite)"
56+
)
57+
.into(),
58+
location: snafu::location!(),
59+
}),
60+
}
61+
}
62+
}
63+
64+
impl From<LanceWriteMode> for LanceWriteModeUpstream {
65+
fn from(mode: LanceWriteMode) -> Self {
66+
match mode {
67+
LanceWriteMode::Create => LanceWriteModeUpstream::Create,
68+
LanceWriteMode::Append => LanceWriteModeUpstream::Append,
69+
LanceWriteMode::Overwrite => LanceWriteModeUpstream::Overwrite,
70+
}
71+
}
72+
}
73+
74+
/// Write an Arrow record batch stream to a Lance dataset at `uri`, committing a manifest.
75+
///
76+
/// - `uri`: Dataset URI (`file://`, `s3://`, `memory://`, ...). Must not be NULL or empty.
77+
/// - `schema`: Caller-provided Arrow schema. The stream's schema must match;
78+
/// mismatch returns `LANCE_ERR_INVALID_ARGUMENT`.
79+
/// - `stream`: Arrow C Data Interface stream. Consumed by this call — the
80+
/// caller must not use it again on any return path.
81+
/// - `mode`: `LANCE_WRITE_CREATE` (0), `LANCE_WRITE_APPEND` (1), or
82+
/// `LANCE_WRITE_OVERWRITE` (2). Any other value → `LANCE_ERR_INVALID_ARGUMENT`.
83+
/// - `storage_opts`: NULL-terminated key-value pairs `["k","v",NULL]`, or NULL.
84+
/// - `out_dataset`: If non-NULL, receives an open `LanceDataset*` at the new
85+
/// version on success (caller closes). Pass NULL to discard. On error
86+
/// `*out_dataset` is untouched — do not read or free it.
87+
///
88+
/// Returns 0 on success, -1 on error.
89+
#[unsafe(no_mangle)]
90+
pub unsafe extern "C" fn lance_dataset_write(
91+
uri: *const c_char,
92+
schema: *const FFI_ArrowSchema,
93+
stream: *mut FFI_ArrowArrayStream,
94+
mode: i32,
95+
storage_opts: *const *const c_char,
96+
out_dataset: *mut *mut LanceDataset,
97+
) -> i32 {
98+
ffi_try!(
99+
unsafe { write_dataset_inner(uri, schema, stream, mode, storage_opts, out_dataset) },
100+
neg
101+
)
102+
}
103+
104+
unsafe fn write_dataset_inner(
105+
uri: *const c_char,
106+
schema: *const FFI_ArrowSchema,
107+
stream: *mut FFI_ArrowArrayStream,
108+
mode: i32,
109+
storage_opts: *const *const c_char,
110+
out_dataset: *mut *mut LanceDataset,
111+
) -> Result<i32> {
112+
if uri.is_null() || schema.is_null() || stream.is_null() {
113+
return Err(lance_core::Error::InvalidInput {
114+
source: "uri, schema, and stream must not be NULL".into(),
115+
location: snafu::location!(),
116+
});
117+
}
118+
119+
// Consume the stream before any other fallible validation. `from_raw`
120+
// swaps the caller's stream into a Rust-owned reader unconditionally, so
121+
// the stream's resources are released on every return path.
122+
let reader = unsafe { ArrowArrayStreamReader::from_raw(stream) }.map_err(|e| {
123+
lance_core::Error::InvalidInput {
124+
source: e.to_string().into(),
125+
location: snafu::location!(),
126+
}
127+
})?;
128+
129+
// Validate the mode at the boundary — storing an out-of-range tag as a
130+
// `LanceWriteMode` would be UB.
131+
let mode = LanceWriteMode::from_raw(mode)?;
132+
133+
let uri_str = unsafe { helpers::parse_c_string(uri)? }.ok_or_else(|| {
134+
lance_core::Error::InvalidInput {
135+
source: "uri must not be empty".into(),
136+
location: snafu::location!(),
137+
}
138+
})?;
139+
140+
let expected_schema = ArrowSchema::try_from(unsafe { &*schema }).map_err(|e| {
141+
lance_core::Error::InvalidInput {
142+
source: format!("invalid schema: {e}").into(),
143+
location: snafu::location!(),
144+
}
145+
})?;
146+
147+
let opts = unsafe { helpers::parse_storage_options(storage_opts)? };
148+
149+
// Fail fast: compare the stream schema against the caller-provided schema.
150+
let stream_schema = reader.schema();
151+
if stream_schema.fields() != expected_schema.fields() {
152+
return Err(lance_core::Error::InvalidInput {
153+
source: format!(
154+
"stream schema does not match the provided schema.\n expected: {expected_schema}\n got: {stream_schema}"
155+
)
156+
.into(),
157+
location: snafu::location!(),
158+
});
159+
}
160+
161+
let mut params = WriteParams {
162+
mode: mode.into(),
163+
..WriteParams::default()
164+
};
165+
if !opts.is_empty() {
166+
params.store_params = Some(ObjectStoreParams {
167+
storage_options_accessor: Some(Arc::new(StorageOptionsAccessor::with_static_options(
168+
opts,
169+
))),
170+
..ObjectStoreParams::default()
171+
});
172+
}
173+
174+
let dataset = block_on(Dataset::write(reader, uri_str, Some(params)))?;
175+
176+
if !out_dataset.is_null() {
177+
let handle = LanceDataset {
178+
inner: Arc::new(dataset),
179+
};
180+
unsafe {
181+
*out_dataset = Box::into_raw(Box::new(handle));
182+
}
183+
}
184+
185+
Ok(0)
186+
}

0 commit comments

Comments
 (0)