Skip to content

Commit 5771d6f

Browse files
jja725 and claude authored
feat: add lance_write_fragments for local fragment creation (#5)
## Summary

- Adds `lance_write_fragments(uri, stream, storage_opts)` — a C/C++ API that writes an `ArrowArrayStream` to Lance fragment files at a given URI **without committing** a dataset manifest
- Returns a JSON array of fragment metadata strings (freed with the existing `lance_free_string()`), which a separate Rust finalizer can deserialize and commit via `CommitBuilder`
- Adds `lance::write_fragments()` C++ RAII wrapper in `lance.hpp`
- Two new integration tests: round-trip write + JSON parse, and null-URI error path

> Reviewer note: this description appears to predate the final revision. In the diff shown below, the function is declared as `lance_write_fragments(uri, schema, stream, storage_opts)`, returns an `int32_t` status (0 on success, -1 on error) rather than a JSON string, and the finalizer is documented as reconstructing fragment metadata from the file footers.

## Motivation

Enables efficient data ingestion from embedded/robotics C++ codebases (e.g. sensor pipelines) with minimal changes. The C++ process writes fragments locally; a separate Rust process finalizes them into a remote data lake:

```cpp
// C++ robot/sensor process
ArrowArrayStream stream = ...;
const char* json = lance::write_fragments("file:///staging/robot.lance", &stream);
save_to_disk("fragments.json", json);
lance_free_string(json);
```

```rust
// Rust finalizer
let frags: Vec<Fragment> = serde_json::from_str(&json)?;
let txn = Transaction::new(0, Operation::Append { fragments: frags }, None);
CommitBuilder::new("s3://datalake/robot.lance").execute(txn).await?;
```

## Test plan

- [ ] `cargo test` — all 42 integration tests pass
- [ ] `cargo clippy --all-targets -- -D warnings` — no warnings
- [ ] `test_write_fragments_returns_json` — verifies JSON round-trips to `Vec<Fragment>` with correct row counts
- [ ] `test_write_fragments_null_uri_returns_null` — verifies null safety and error reporting

## Test Plan

## Issues

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 62b6bac commit 5771d6f

8 files changed

Lines changed: 483 additions & 1 deletion

File tree

AGENTS.md

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,4 +52,3 @@ test C/C++ compilation: `cargo test --test compile_and_run_test -- --ignored`
5252
2. Add declaration to `include/lance.h`.
5353
3. Add C++ wrapper to `include/lance.hpp`.
5454
4. Add test in `tests/c_api_test.rs`.
55-

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ snafu = "0.9"
3333
[dev-dependencies]
3434
lance = "3.0.1"
3535
lance-datagen = "3.0.1"
36+
lance-file = "3.0.1"
37+
lance-table = "3.0.1"
3638
tokio = { version = "1", features = ["rt-multi-thread", "macros"] }
3739
arrow-array = "57.0.0"
3840
arrow-schema = "57.0.0"

include/lance.h

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -281,6 +281,35 @@ int32_t lance_batch_to_arrow(
281281
/** Free a batch handle. */
282282
void lance_batch_free(LanceBatch* batch);
283283

284+
/* ─── Fragment writer ─── */
285+
286+
/**
287+
* Write an Arrow record batch stream to fragment files at `uri`.
288+
*
289+
* Designed for embedded / robotics C++ pipelines: write Lance fragment files
290+
* locally with minimal overhead. A separate Rust finalizer process later
291+
* reconstructs Fragment metadata from the file footers and commits them
292+
* into a dataset on a remote data lake via CommitBuilder.
293+
*
294+
* The data is written but NOT committed — no dataset manifest is created or
295+
* updated. The written .lance files under <uri>/data/ contain full metadata
296+
* in their footers (schema with field IDs, row counts, format version).
297+
*
298+
* @param uri Directory URI for fragment files (file://, s3://, etc.)
299+
* @param schema Required Arrow schema; borrowed for the call only (the caller keeps ownership and must release it). The stream schema must match
300+
* or the call fails with LANCE_ERR_INVALID_ARGUMENT.
301+
* @param stream Arrow C Data Interface stream; consumed by this call —
302+
* do not use the stream after returning.
303+
* @param storage_opts NULL-terminated key-value pairs ["k","v",NULL], or NULL.
304+
* @return 0 on success, -1 on error
305+
*/
306+
int32_t lance_write_fragments(
307+
const char* uri,
308+
const struct ArrowSchema* schema,
309+
struct ArrowArrayStream* stream,
310+
const char* const* storage_opts
311+
);
312+
284313
#ifdef __cplusplus
285314
} /* extern "C" */
286315
#endif

include/lance.hpp

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -274,4 +274,42 @@ class Batch {
274274

275275
} // namespace lance
276276

277+
// ─── Fragment writer (free functions) ────────────────────────────────────────
278+
279+
namespace lance {
280+
281+
/**
282+
* Write an Arrow record batch stream to fragment files at `uri`.
283+
*
284+
* Data files are written under `<uri>/data/`. A Rust finalizer reconstructs
285+
* Fragment metadata from the file footers and commits via CommitBuilder.
286+
* No dynamic memory is returned to the caller.
287+
*
288+
* @param uri Directory URI (file://, s3://, etc.)
289+
* @param schema Required Arrow schema — stream schema must match.
290+
* @param stream ArrowArrayStream to consume. Must not be used after this call.
291+
* @param storage_opts Key-value storage options, or empty for defaults.
292+
* @throws lance::Error on failure.
293+
*/
294+
inline void write_fragments(
295+
const std::string& uri,
296+
const ArrowSchema* schema,
297+
ArrowArrayStream* stream,
298+
const std::vector<std::pair<std::string, std::string>>& storage_opts = {})
299+
{
300+
std::vector<const char*> kv;
301+
for (auto& [k, v] : storage_opts) {
302+
kv.push_back(k.c_str());
303+
kv.push_back(v.c_str());
304+
}
305+
kv.push_back(nullptr);
306+
307+
const char* const* opts_ptr = storage_opts.empty() ? nullptr : kv.data();
308+
if (lance_write_fragments(uri.c_str(), schema, stream, opts_ptr) != 0) {
309+
check_error();
310+
}
311+
}
312+
313+
} // namespace lance
314+
277315
#endif /* LANCE_HPP */

src/fragment_writer.rs

Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// SPDX-FileCopyrightText: Copyright The Lance Authors
3+
4+
//! Fragment writer C API: write Arrow data to local fragment files without committing.
5+
//!
6+
//! Designed for embedded / robotics C++ pipelines where sensor data is ingested
7+
//! at high frequency on edge devices. The C++ process writes Lance fragment files
8+
//! locally with minimal overhead (no manifest, no coordination). A separate Rust
9+
//! finalizer process later reads the file footers, reconstructs fragment metadata,
10+
//! and commits them into a dataset on a remote data lake (S3, GCS, etc.).
11+
//!
12+
//! # Two-process workflow
13+
//!
14+
//! **1. Writer process (C/C++ on edge device):**
15+
//! ```c
16+
//! // Stream sensor batches into local fragment files.
17+
//! int32_t rc = lance_write_fragments(
18+
//! "file:///data/staging/robot.lance", &schema, &stream, NULL);
19+
//! ```
20+
//!
21+
//! **2. Finalizer process (Rust, runs periodically or on sync):**
22+
//! ```text
23+
//! // Scan data/*.lance files, reconstruct Fragment metadata from file footers,
24+
//! // then commit via CommitBuilder to publish to the data lake.
25+
//! ```
26+
27+
use std::ffi::c_char;
28+
use std::sync::Arc;
29+
30+
use arrow::ffi::FFI_ArrowSchema;
31+
use arrow::ffi_stream::{ArrowArrayStreamReader, FFI_ArrowArrayStream};
32+
use arrow::record_batch::RecordBatchReader;
33+
use arrow_schema::Schema as ArrowSchema;
34+
use lance::dataset::{InsertBuilder, WriteParams};
35+
use lance_core::Result;
36+
use lance_io::object_store::{ObjectStoreParams, StorageOptionsAccessor};
37+
38+
use crate::error::ffi_try;
39+
use crate::helpers;
40+
use crate::runtime::block_on;
41+
42+
/// Write an Arrow record batch stream to fragment files at `uri`.
43+
///
44+
/// The data is written but **not committed** — no dataset manifest is created
45+
/// or updated. The written `.lance` files under `<uri>/data/` contain full
46+
/// metadata in their footers (schema with field IDs, row counts, format version).
47+
/// A Rust finalizer can reconstruct `Fragment` metadata by reading these footers
48+
/// and commit via `CommitBuilder`.
49+
///
50+
/// - `uri`: Directory URI where fragment files are written (`file://`, `s3://`, etc.)
51+
/// - `schema`: Required Arrow schema. The stream's schema must match; the call
52+
/// fails fast with `LANCE_ERR_INVALID_ARGUMENT` on mismatch.
53+
/// - `stream`: Arrow C Data Interface stream consumed by this call. The caller
54+
/// must not use the stream after this function returns.
55+
/// - `storage_opts`: NULL-terminated key-value pairs `["key","val",NULL]`, or NULL.
56+
///
57+
/// Returns 0 on success, -1 on error.
58+
#[unsafe(no_mangle)]
59+
pub unsafe extern "C" fn lance_write_fragments(
60+
uri: *const c_char,
61+
schema: *const FFI_ArrowSchema,
62+
stream: *mut FFI_ArrowArrayStream,
63+
storage_opts: *const *const c_char,
64+
) -> i32 {
65+
ffi_try!(
66+
unsafe { write_fragments_inner(uri, schema, stream, storage_opts) },
67+
neg
68+
)
69+
}
70+
71+
unsafe fn write_fragments_inner(
72+
uri: *const c_char,
73+
schema: *const FFI_ArrowSchema,
74+
stream: *mut FFI_ArrowArrayStream,
75+
storage_opts: *const *const c_char,
76+
) -> Result<i32> {
77+
if uri.is_null() || schema.is_null() || stream.is_null() {
78+
return Err(lance_core::Error::InvalidInput {
79+
source: "uri, schema, and stream must not be NULL".into(),
80+
location: snafu::location!(),
81+
});
82+
}
83+
84+
let uri_str = unsafe { helpers::parse_c_string(uri)? }.ok_or_else(|| {
85+
lance_core::Error::InvalidInput {
86+
source: "uri must not be empty".into(),
87+
location: snafu::location!(),
88+
}
89+
})?;
90+
91+
// Import the caller-provided schema from the Arrow C Data Interface.
92+
let expected_schema = ArrowSchema::try_from(unsafe { &*schema }).map_err(|e| {
93+
lance_core::Error::InvalidInput {
94+
source: format!("invalid schema: {e}").into(),
95+
location: snafu::location!(),
96+
}
97+
})?;
98+
99+
let opts = unsafe { helpers::parse_storage_options(storage_opts)? };
100+
101+
// Consume the C stream into an Arrow RecordBatch reader.
102+
let reader = unsafe { ArrowArrayStreamReader::from_raw(stream) }.map_err(|e| {
103+
lance_core::Error::InvalidInput {
104+
source: e.to_string().into(),
105+
location: snafu::location!(),
106+
}
107+
})?;
108+
109+
// Fail fast: compare the stream schema against the caller-provided schema.
110+
let stream_schema = reader.schema();
111+
if stream_schema.fields() != expected_schema.fields() {
112+
return Err(lance_core::Error::InvalidInput {
113+
source: format!(
114+
"stream schema does not match the provided schema.\n expected: {expected_schema}\n got: {stream_schema}"
115+
)
116+
.into(),
117+
location: snafu::location!(),
118+
});
119+
}
120+
121+
let mut params = WriteParams::default();
122+
if !opts.is_empty() {
123+
params.store_params = Some(ObjectStoreParams {
124+
storage_options_accessor: Some(Arc::new(StorageOptionsAccessor::with_static_options(
125+
opts,
126+
))),
127+
..ObjectStoreParams::default()
128+
});
129+
}
130+
131+
// Write fragment data files. The Transaction result is discarded —
132+
// the finalizer reconstructs Fragment metadata from the file footers.
133+
let _transaction = block_on(
134+
InsertBuilder::new(uri_str)
135+
.with_params(&params)
136+
.execute_uncommitted_stream(reader),
137+
)?;
138+
139+
Ok(0)
140+
}

src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ mod async_dispatcher;
1919
mod batch;
2020
mod dataset;
2121
mod error;
22+
mod fragment_writer;
2223
mod helpers;
2324
pub mod runtime;
2425
mod scanner;
@@ -29,4 +30,5 @@ pub use dataset::*;
2930
pub use error::{
3031
LanceErrorCode, lance_free_string, lance_last_error_code, lance_last_error_message,
3132
};
33+
pub use fragment_writer::*;
3234
pub use scanner::*;

0 commit comments

Comments
 (0)