Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 56 additions & 0 deletions include/lance.h
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,62 @@ int32_t lance_scanner_full_text_search(
uint32_t max_fuzzy_distance
);

/* ─── Dataset writer ─── */

/**
* Write mode for lance_dataset_write. Values are ABI-stable.
*
* The `mode` parameter on the FFI call is a fixed-width int32_t — not this
* enum type — so callers built with `-fshort-enums` or non-default enum
* sizing cannot mismatch the Rust ABI. The Rust implementation validates the
* received integer and rejects any out-of-range value with
* LANCE_ERR_INVALID_ARGUMENT.
*/
typedef enum {
LANCE_WRITE_CREATE = 0, /* Create new dataset; fail if path exists. */
LANCE_WRITE_APPEND = 1, /* Append; fail if the new schema is incompatible. */
LANCE_WRITE_OVERWRITE = 2, /* Overwrite existing, or create if missing. */
} LanceWriteMode;

/**
* Write an Arrow record batch stream to a Lance dataset at `uri`, committing
* a manifest.
*
* @param uri Dataset URI (file://, s3://, memory://, etc.). Must not
* be NULL or an empty string.
* @param schema Required Arrow schema. The stream schema must match or
* the call fails with LANCE_ERR_INVALID_ARGUMENT. This
* function does NOT call schema->release; the caller
* retains ownership and must release the schema after the
* call returns (success or failure).
* @param stream Arrow C Data Interface stream consumed by this call.
* Do not use the stream after returning, regardless of
* the return code.
* @param mode CREATE / APPEND / OVERWRITE (see LanceWriteMode).
* @param storage_opts NULL-terminated key-value pairs ["k","v",NULL], or NULL.
* @param out_dataset If non-NULL, on success receives an open LanceDataset*
* at the newly-committed version (caller must
* lance_dataset_close it). Pass NULL to discard. On error
* *out_dataset is left unchanged — do not read or free it.
* On entry `*out_dataset` should be NULL or a pointer
* whose previous value is no longer needed; this function
* overwrites the slot on success without releasing any
* prior handle.
* @return 0 on success, -1 on error. Possible error codes include
* LANCE_ERR_DATASET_ALREADY_EXISTS (CREATE on an existing path),
* LANCE_ERR_INVALID_ARGUMENT (NULL/empty args, invalid mode,
* schema mismatch),
* LANCE_ERR_COMMIT_CONFLICT (concurrent writer).
*/
int32_t lance_dataset_write(
const char* uri,
const struct ArrowSchema* schema,
struct ArrowArrayStream* stream,
int32_t mode,
const char* const* storage_opts,
LanceDataset** out_dataset
);

#ifdef __cplusplus
} /* extern "C" */
#endif
Expand Down
121 changes: 121 additions & 0 deletions include/lance.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include "lance.h"

#include <cstdint>
#include <memory>
#include <stdexcept>
#include <string>
Expand Down Expand Up @@ -94,6 +95,14 @@ struct VersionInfo {
int64_t timestamp_ms;
};

// ─── Write mode ──────────────────────────────────────────────────────────────

enum class WriteMode : int32_t {
Create = LANCE_WRITE_CREATE,
Append = LANCE_WRITE_APPEND,
Overwrite = LANCE_WRITE_OVERWRITE,
};

// ─── Dataset ─────────────────────────────────────────────────────────────────

class Dataset {
Expand Down Expand Up @@ -122,6 +131,118 @@ class Dataset {
return Dataset(ds);
}

/// Write an Arrow record batch stream to a Lance dataset and return the
/// open dataset at the committed version.
///
/// The stream must be self-describing; its own schema is used. Treat the
/// stream as consumed once this call returns or throws — do not reuse it.
/// Throws lance::Error on failure (including if `stream` is null).
static Dataset write(
const std::string& uri,
ArrowArrayStream* stream,
WriteMode mode,
const std::vector<std::pair<std::string, std::string>>& storage_opts = {}) {

if (stream == nullptr) {
throw Error(LANCE_ERR_INVALID_ARGUMENT, "stream must not be null");
}

// RAII guard for the stream. Until `lance_dataset_write` is called,
// any exception (failed `get_schema`, `std::bad_alloc` while building
// `kv`, etc.) must release the stream. After that call Rust owns it,
// so we `disarm()` immediately before invoking the C API.
struct StreamGuard {
ArrowArrayStream* s;
bool armed = true;
// Explicit constructor: `= delete`d copy/move ctors disqualify
// this from being an aggregate under C++20, so brace-init like
// `StreamGuard{stream}` would otherwise fail to compile there.
explicit StreamGuard(ArrowArrayStream* p) noexcept : s(p) {}
~StreamGuard() noexcept {
if (armed && s && s->release) s->release(s);
}
void disarm() noexcept { armed = false; }
StreamGuard(const StreamGuard&) = delete;
StreamGuard& operator=(const StreamGuard&) = delete;
StreamGuard(StreamGuard&&) = delete;
StreamGuard& operator=(StreamGuard&&) = delete;
} stream_guard{stream};

// Defensive: a non-conforming or already-released producer may have a
// null `get_schema`. Without this guard a bad caller would crash with
// a null function-pointer dereference on the next line.
if (stream->get_schema == nullptr) {
throw Error(LANCE_ERR_INVALID_ARGUMENT,
"stream get_schema callback is null");
}

// Arm SchemaGuard before calling `get_schema` so a non-conforming
// producer that partially populates the schema before returning an
// error still has its `release` fired on unwind. The zero-init keeps
// the destructor a no-op on the clean-error path (release == null).
struct SchemaGuard {
ArrowSchema* s;
// Explicit constructor for the same C++20 aggregate-init reason
// documented on StreamGuard above.
explicit SchemaGuard(ArrowSchema* p) noexcept : s(p) {}
~SchemaGuard() noexcept {
if (s && s->release) s->release(s);
}
SchemaGuard(const SchemaGuard&) = delete;
SchemaGuard& operator=(const SchemaGuard&) = delete;
SchemaGuard(SchemaGuard&&) = delete;
SchemaGuard& operator=(SchemaGuard&&) = delete;
};
ArrowSchema schema = {};
SchemaGuard schema_guard{&schema};

// On failure, StreamGuard releases the stream and SchemaGuard
// releases any partial schema state — preserving the "consumed on
// return or throw" contract for both resources.
if (stream->get_schema(stream, &schema) != 0) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same FFI-contract concern as src/writer.rs:112. The wrapper's doc says "Treat the stream as consumed once this call returns or throws — do not reuse it." But if get_schema fails here, we throw before ever calling lance_dataset_write, so the stream is never consumed by the Rust side. A caller following the doc won't release it either → leak.

Either release the stream in this branch:

if (stream->release) stream->release(stream);
throw Error(...);

…or relax the doc to say the caller retains ownership when get_schema itself fails.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 4e40671. Dataset::write now invokes stream->release(stream) before throwing on get_schema failure, preserving the documented "consumed on return or throw" contract.

const char* err = stream->get_last_error
? stream->get_last_error(stream)
: nullptr;
std::string msg = std::string("failed to read stream schema: ") +
(err ? err : "unknown");
throw Error(LANCE_ERR_INVALID_ARGUMENT, msg);
}

std::vector<const char*> kv;
for (auto& [k, v] : storage_opts) {
kv.push_back(k.c_str());
kv.push_back(v.c_str());
}
kv.push_back(nullptr);
const char* const* opts_ptr =
storage_opts.empty() ? nullptr : kv.data();

// The C API consumes the stream on every return path, so disarm the
// guard before calling. After this point the stream pointer is logically
// owned by Rust and any C++-side exception must not re-release it.
stream_guard.disarm();

LanceDataset* out = nullptr;
int32_t rc = lance_dataset_write(
uri.c_str(),
&schema,
stream,
static_cast<int32_t>(mode),
opts_ptr,
&out);
if (rc != 0) check_error();
// Defensive null guard: a conforming Rust impl never returns rc == 0
// with `out == nullptr`, but constructing a Dataset around a null
// handle would silently crash on the first method call. Throw
// explicitly rather than going through `check_error()` because the
// thread-local code is `LANCE_OK` on this path (rc == 0).
if (!out) {
throw Error(LANCE_ERR_INTERNAL,
"lance_dataset_write returned success with null out_dataset");
}
return Dataset(out);
}

/// Number of rows in the dataset.
uint64_t count_rows() const {
uint64_t n = lance_dataset_count_rows(handle_.get());
Expand Down
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ mod index;
pub mod runtime;
mod scanner;
mod versions;
mod writer;

// Re-export all extern "C" symbols so they appear in the cdylib.
pub use batch::*;
Expand All @@ -36,3 +37,4 @@ pub use fragment_writer::*;
pub use index::*;
pub use scanner::*;
pub use versions::*;
pub use writer::*;
Loading
Loading