Skip to content

Commit de6964e

Browse files
committed
Fix(lance): Streaming Zero-Copy Implementation
- Fixed a memory leak in the FFI bridge by correctly taking ownership of Arrow arrays using `FFI_ArrowArray::from_raw`. - Implemented true zero-copy import from C++ buffers to a Rust RecordBatch. - Enabled automatic compression (LZ4) and Byte Stream Split (BSS) for floating-point columns via schema metadata injection. - Optimized the write path to prevent immediate buffer copies.
1 parent 30d8f63 commit de6964e

1 file changed

Lines changed: 48 additions & 18 deletions

File tree

  • third_party/lance-ffi/src

third_party/lance-ffi/src/lib.rs

Lines changed: 48 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use std::panic::{catch_unwind, AssertUnwindSafe};
77
use std::sync::Arc;
88
use std::slice;
99
use std::pin::Pin;
10+
use std::collections::HashMap;
1011

1112
use arrow::ffi::{FFI_ArrowSchema, FFI_ArrowArray};
1213
use arrow::record_batch::RecordBatch;
@@ -177,6 +178,27 @@ fn import_string_array(safe_array: &SafeArrowArray, _field: &Field) -> Result<Ar
177178
}
178179
}
179180

181+
/// Builds a new `Schema` whose fields carry Lance encoding hints in their metadata.
///
/// For every top-level field of `schema` (nested child fields are not visited):
/// - the key `lance-encoding:compression` is set to `lz4`;
/// - if the field's data type is `Float16`, `Float32` or `Float64`, the key
///   `lance-encoding:bss` is additionally set to `auto`.
///
/// Each field's existing metadata is cloned first, so other keys are kept; the two
/// keys above are written with `insert`, regardless of what the field held before.
/// The schema-level metadata is copied over unchanged. The input schema is not modified.
fn apply_compression_metadata(schema: &Schema) -> Schema {
182+
let fields: Vec<Field> = schema.fields().iter().map(|field| {
183+
let mut metadata = field.metadata().clone();
184+
185+
// Request lz4 compression for every field, whatever its data type.
186+
metadata.insert("lance-encoding:compression".to_string(), "lz4".to_string());
187+
188+
// Request Byte Stream Split for floating-point fields only; all other types are left as-is.
189+
match field.data_type() {
190+
DataType::Float16 | DataType::Float32 | DataType::Float64 => {
191+
metadata.insert("lance-encoding:bss".to_string(), "auto".to_string());
192+
},
193+
_ => {}
194+
}
195+
196+
// Clone the field itself and swap in the augmented metadata map.
field.as_ref().clone().with_metadata(metadata)
197+
}).collect();
198+
199+
// Rebuild the schema from the rewritten fields, keeping the original schema-level metadata.
Schema::new(fields).with_metadata(schema.metadata().clone())
200+
}
201+
180202
enum WriterBackend {
181203
Buffered {
182204
batches: Vec<RecordBatch>,
@@ -240,24 +262,21 @@ impl LanceWriterHandle {
240262

241263
fn import_ffi_batch(arrow_array_ptr: *mut FFI_ArrowArray, arrow_schema_ptr: *mut FFI_ArrowSchema) -> Result<RecordBatch, String> {
242264
unsafe {
265+
// TAKING OWNERSHIP: We convert raw pointers to unsafe FFI structs.
266+
// These structs implement Drop and will automatically call the C-side release() callback
267+
// when they go out of scope, decrementing refcounts on the buffers.
268+
let ffi_array = FFI_ArrowArray::from_raw(arrow_array_ptr);
243269
let ffi_schema = FFI_ArrowSchema::from_raw(arrow_schema_ptr);
244-
let schema = Schema::try_from(&ffi_schema).map_err(|e| e.to_string())?;
245-
let safe_array = SafeArrowArray { ffi: arrow_array_ptr as *mut CDataArrowArray };
246-
247-
let mut arrays: Vec<Arc<dyn Array>> = Vec::new();
248-
for (i, field) in schema.fields().iter().enumerate() {
249-
let child_array_ptr = safe_array.child(i).ok_or_else(|| format!("Missing child {} ({})", i, field.name()))?;
250-
let child_safe = SafeArrowArray { ffi: child_array_ptr };
251-
252-
let array = match field.data_type() {
253-
DataType::Int64 | DataType::Float64 | DataType::Int32 | DataType::Float32 |
254-
DataType::Date32 | DataType::Boolean => import_primitive_array(&child_safe, field)?,
255-
DataType::Utf8 => import_string_array(&child_safe, field)?,
256-
dt => return Err(format!("Unsupported type {}: {}", field.name(), dt)),
257-
};
258-
arrays.push(array);
259-
}
260-
RecordBatch::try_new(Arc::new(schema), arrays).map_err(|e| e.to_string())
270+
271+
// Import using Arrow's official Zero-Copy FFI integration
272+
// This verifies that the schema and array are consistent and creates Arrow array data backed directly by the FFI buffers.
273+
let array_data = arrow::ffi::from_ffi(ffi_array, &ffi_schema).map_err(|e| e.to_string())?;
274+
275+
// RecordBatches are exported as a single StructArray from C++
276+
let struct_array = arrow::array::StructArray::from(array_data);
277+
278+
// Convert back to RecordBatch (Zero-Copy)
279+
Ok(RecordBatch::from(&struct_array))
261280
}
262281
}
263282

@@ -374,10 +393,21 @@ pub extern "C" fn lance_writer_write_batch(writer_ptr: *mut LanceWriterHandle, a
374393
let writer = unsafe { &mut *writer_ptr };
375394
if writer.closed { return 2; }
376395

377-
let record_batch = match LanceWriterHandle::import_ffi_batch(arrow_array_ptr as *mut _, arrow_schema_ptr as *mut _) {
396+
let raw_batch = match LanceWriterHandle::import_ffi_batch(arrow_array_ptr as *mut _, arrow_schema_ptr as *mut _) {
378397
Ok(b) => b, Err(e) => { eprintln!("FFI Import Error: {}", e); return 4; }
379398
};
380399

400+
// Apply automatic compression settings (LZ4 + BSS)
401+
// We must re-wrap the batch with the new schema containing metadata
402+
let compressed_schema = Arc::new(apply_compression_metadata(raw_batch.schema().as_ref()));
403+
404+
// This is a zero-copy schema replacement (buffers are shared)
405+
let columns = raw_batch.columns().to_vec();
406+
let record_batch = match RecordBatch::try_new(compressed_schema, columns) {
407+
Ok(b) => b,
408+
Err(e) => { eprintln!("Compression schema update error: {}", e); return 4; }
409+
};
410+
381411
if writer.schema.is_none() { writer.schema = Some(record_batch.schema().as_ref().clone()); }
382412
writer.row_count += record_batch.num_rows();
383413
writer.batch_count += 1;

0 commit comments

Comments
 (0)