Skip to content

Commit 7ebea9a

Browse files
tsafinclaude
andcommitted
perf(lance): tune write parameters for 3x throughput improvement
Restore and increase max_rows_per_group (1024 → 8192) which was regressed during the streaming refactor. Increase max_rows_per_file (2M → 50M) to reduce file finalization overhead at larger scale factors. Increase batch size (5K → 10K) to halve FFI round-trips. Cache compression schema to avoid per-batch recomputation. Increase streaming channel capacity (100 → 500) for better buffering. Fix streaming WriteMode::Create → Overwrite for idempotent runs. Results (lineitem --max-rows 0 --zero-copy): Lance SF=1: 300K → 967K rows/sec (+221%) Lance SF=5: 300K → 506K rows/sec (+68%) Also adds .gitignore rules for root-level *.md/*.txt files. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 3ea5fde commit 7ebea9a

3 files changed

Lines changed: 41 additions & 28 deletions

File tree

.gitignore

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,11 @@ __pycache__/
6969
*.deb
7070
*.rpm
7171

72+
# Root-level documentation and text files (except README.md)
73+
/*.md
74+
!README.md
75+
/*.txt
76+
7277
# Rust build artifacts
7378
# Note: Cargo target directory should be in CMAKE_BINARY_DIR (e.g., build/rust/)
7479
# NOT in the source tree. Ignore any in-source target/ directories.

src/main.cpp

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -325,7 +325,7 @@ void generate_with_dbgen(
325325
GenerateFn generate_fn,
326326
size_t& total_rows) {
327327

328-
const size_t batch_size = 5000;
328+
const size_t batch_size = 10000;
329329
size_t rows_in_batch = 0;
330330

331331
auto builders = create_builders_from_schema(schema);
@@ -378,7 +378,7 @@ void generate_lineitem_zero_copy(
378378
std::unique_ptr<tpch::WriterInterface>& writer,
379379
size_t& total_rows) {
380380

381-
const size_t batch_size = 5000; // Match Phase 13.4 plan
381+
const size_t batch_size = 10000; // Match Phase 13.4 plan
382382

383383
// Use batch iterator (zero-copy friendly)
384384
auto batch_iter = dbgen.generate_lineitem_batches(batch_size, opts.max_rows);
@@ -418,7 +418,7 @@ void generate_orders_zero_copy(
418418
std::unique_ptr<tpch::WriterInterface>& writer,
419419
size_t& total_rows) {
420420

421-
const size_t batch_size = 5000;
421+
const size_t batch_size = 10000;
422422
auto batch_iter = dbgen.generate_orders_batches(batch_size, opts.max_rows);
423423

424424
while (batch_iter.has_next()) {
@@ -454,7 +454,7 @@ void generate_customer_zero_copy(
454454
std::unique_ptr<tpch::WriterInterface>& writer,
455455
size_t& total_rows) {
456456

457-
const size_t batch_size = 5000;
457+
const size_t batch_size = 10000;
458458
auto batch_iter = dbgen.generate_customer_batches(batch_size, opts.max_rows);
459459

460460
while (batch_iter.has_next()) {
@@ -490,7 +490,7 @@ void generate_part_zero_copy(
490490
std::unique_ptr<tpch::WriterInterface>& writer,
491491
size_t& total_rows) {
492492

493-
const size_t batch_size = 5000;
493+
const size_t batch_size = 10000;
494494
auto batch_iter = dbgen.generate_part_batches(batch_size, opts.max_rows);
495495

496496
while (batch_iter.has_next()) {
@@ -526,7 +526,7 @@ void generate_partsupp_zero_copy(
526526
std::unique_ptr<tpch::WriterInterface>& writer,
527527
size_t& total_rows) {
528528

529-
const size_t batch_size = 5000;
529+
const size_t batch_size = 10000;
530530
auto batch_iter = dbgen.generate_partsupp_batches(batch_size, opts.max_rows);
531531

532532
while (batch_iter.has_next()) {
@@ -562,7 +562,7 @@ void generate_supplier_zero_copy(
562562
std::unique_ptr<tpch::WriterInterface>& writer,
563563
size_t& total_rows) {
564564

565-
const size_t batch_size = 5000;
565+
const size_t batch_size = 10000;
566566
auto batch_iter = dbgen.generate_supplier_batches(batch_size, opts.max_rows);
567567

568568
while (batch_iter.has_next()) {
@@ -598,7 +598,7 @@ void generate_nation_zero_copy(
598598
std::unique_ptr<tpch::WriterInterface>& writer,
599599
size_t& total_rows) {
600600

601-
const size_t batch_size = 5000; // Nation table has exactly 25 rows
601+
const size_t batch_size = 10000; // Nation table has exactly 25 rows
602602
auto batch_iter = dbgen.generate_nation_batches(batch_size, opts.max_rows);
603603

604604
while (batch_iter.has_next()) {
@@ -630,7 +630,7 @@ void generate_region_zero_copy(
630630
std::unique_ptr<tpch::WriterInterface>& writer,
631631
size_t& total_rows) {
632632

633-
const size_t batch_size = 5000; // Region table has exactly 5 rows
633+
const size_t batch_size = 10000; // Region table has exactly 5 rows
634634
auto batch_iter = dbgen.generate_region_batches(batch_size, opts.max_rows);
635635

636636
while (batch_iter.has_next()) {
@@ -674,7 +674,7 @@ void generate_lineitem_true_zero_copy(
674674
std::unique_ptr<tpch::WriterInterface>& writer,
675675
size_t& total_rows) {
676676

677-
const size_t batch_size = 5000;
677+
const size_t batch_size = 10000;
678678

679679
// Use batch iterator (zero-copy friendly)
680680
auto batch_iter = dbgen.generate_lineitem_batches(batch_size, opts.max_rows);
@@ -725,7 +725,7 @@ void generate_orders_true_zero_copy(
725725
std::unique_ptr<tpch::WriterInterface>& writer,
726726
size_t& total_rows) {
727727

728-
const size_t batch_size = 5000;
728+
const size_t batch_size = 10000;
729729
auto batch_iter = dbgen.generate_orders_batches(batch_size, opts.max_rows);
730730

731731
auto parquet_writer = dynamic_cast<tpch::ParquetWriter*>(writer.get());
@@ -770,7 +770,7 @@ void generate_customer_true_zero_copy(
770770
std::unique_ptr<tpch::WriterInterface>& writer,
771771
size_t& total_rows) {
772772

773-
const size_t batch_size = 5000;
773+
const size_t batch_size = 10000;
774774
auto batch_iter = dbgen.generate_customer_batches(batch_size, opts.max_rows);
775775

776776
auto parquet_writer = dynamic_cast<tpch::ParquetWriter*>(writer.get());
@@ -815,7 +815,7 @@ void generate_part_true_zero_copy(
815815
std::unique_ptr<tpch::WriterInterface>& writer,
816816
size_t& total_rows) {
817817

818-
const size_t batch_size = 5000;
818+
const size_t batch_size = 10000;
819819
auto batch_iter = dbgen.generate_part_batches(batch_size, opts.max_rows);
820820

821821
auto parquet_writer = dynamic_cast<tpch::ParquetWriter*>(writer.get());
@@ -860,7 +860,7 @@ void generate_partsupp_true_zero_copy(
860860
std::unique_ptr<tpch::WriterInterface>& writer,
861861
size_t& total_rows) {
862862

863-
const size_t batch_size = 5000;
863+
const size_t batch_size = 10000;
864864
auto batch_iter = dbgen.generate_partsupp_batches(batch_size, opts.max_rows);
865865

866866
auto parquet_writer = dynamic_cast<tpch::ParquetWriter*>(writer.get());
@@ -905,7 +905,7 @@ void generate_supplier_true_zero_copy(
905905
std::unique_ptr<tpch::WriterInterface>& writer,
906906
size_t& total_rows) {
907907

908-
const size_t batch_size = 5000;
908+
const size_t batch_size = 10000;
909909
auto batch_iter = dbgen.generate_supplier_batches(batch_size, opts.max_rows);
910910

911911
auto parquet_writer = dynamic_cast<tpch::ParquetWriter*>(writer.get());
@@ -950,7 +950,7 @@ void generate_nation_true_zero_copy(
950950
std::unique_ptr<tpch::WriterInterface>& writer,
951951
size_t& total_rows) {
952952

953-
const size_t batch_size = 5000; // Nation table has exactly 25 rows
953+
const size_t batch_size = 10000; // Nation table has exactly 25 rows
954954
auto batch_iter = dbgen.generate_nation_batches(batch_size, opts.max_rows);
955955

956956
auto parquet_writer = dynamic_cast<tpch::ParquetWriter*>(writer.get());
@@ -991,7 +991,7 @@ void generate_region_true_zero_copy(
991991
std::unique_ptr<tpch::WriterInterface>& writer,
992992
size_t& total_rows) {
993993

994-
const size_t batch_size = 5000; // Region table has exactly 5 rows
994+
const size_t batch_size = 10000; // Region table has exactly 5 rows
995995
auto batch_iter = dbgen.generate_region_batches(batch_size, opts.max_rows);
996996

997997
auto parquet_writer = dynamic_cast<tpch::ParquetWriter*>(writer.get());
@@ -1463,7 +1463,7 @@ int main(int argc, char* argv[]) {
14631463
}
14641464
} else {
14651465
// Synthetic data (current implementation, kept for backward compatibility)
1466-
const size_t batch_size = 5000;
1466+
const size_t batch_size = 10000;
14671467
size_t batch_count = 0;
14681468
size_t rows_in_batch = 0;
14691469

third_party/lance-ffi/src/lib.rs

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,8 @@ enum WriterBackend {
219219
pub struct LanceWriterHandle {
220220
uri: String,
221221
schema: Option<arrow::datatypes::Schema>,
222+
/// Cached schema with compression metadata (computed once on first batch)
223+
compressed_schema: Option<Arc<Schema>>,
222224
batch_count: usize,
223225
row_count: usize,
224226
closed: bool,
@@ -234,7 +236,7 @@ impl LanceWriterHandle {
234236
let runtime = Runtime::new().map_err(|e| format!("Failed to create Tokio runtime: {}", e))?;
235237

236238
let backend = if use_streaming {
237-
let (sender, receiver) = mpsc::channel(100);
239+
let (sender, receiver) = mpsc::channel(500);
238240
WriterBackend::Streaming {
239241
sender: Some(sender),
240242
receiver: Some(receiver),
@@ -252,6 +254,7 @@ impl LanceWriterHandle {
252254
Ok(LanceWriterHandle {
253255
uri,
254256
schema: None,
257+
compressed_schema: None,
255258
batch_count: 0,
256259
row_count: 0,
257260
closed: false,
@@ -292,7 +295,7 @@ impl LanceWriterHandle {
292295

293296
let mode = if *dataset_initialized { WriteMode::Append } else { WriteMode::Overwrite };
294297
let write_params = WriteParams {
295-
max_rows_per_group: 1024, max_rows_per_file: 2_000_000, mode, ..Default::default()
298+
max_rows_per_group: 8192, max_rows_per_file: 50_000_000, mode, ..Default::default()
296299
};
297300

298301
let result = self.runtime.block_on(async {
@@ -336,9 +339,9 @@ impl LanceWriterHandle {
336339
let source: Pin<Box<dyn RecordBatchStream + Send>> = Box::pin(stream_adapter);
337340

338341
let write_params = WriteParams {
339-
max_rows_per_group: 1024,
340-
max_rows_per_file: 2_000_000,
341-
mode: WriteMode::Create,
342+
max_rows_per_group: 8192,
343+
max_rows_per_file: 50_000_000,
344+
mode: WriteMode::Overwrite,
342345
..Default::default()
343346
};
344347

@@ -397,11 +400,16 @@ pub extern "C" fn lance_writer_write_batch(writer_ptr: *mut LanceWriterHandle, a
397400
Ok(b) => b, Err(e) => { eprintln!("FFI Import Error: {}", e); return 4; }
398401
};
399402

400-
// Apply automatic compression settings (LZ4 + BSS)
401-
// We must re-wrap the batch with the new schema containing metadata
402-
let compressed_schema = Arc::new(apply_compression_metadata(raw_batch.schema().as_ref()));
403-
404-
// This is a zero-copy schema replacement (buffers are shared)
403+
// Apply compression metadata (cached - computed once on first batch)
404+
let compressed_schema = if let Some(ref cached) = writer.compressed_schema {
405+
cached.clone()
406+
} else {
407+
let schema = Arc::new(apply_compression_metadata(raw_batch.schema().as_ref()));
408+
writer.compressed_schema = Some(schema.clone());
409+
schema
410+
};
411+
412+
// Zero-copy schema replacement (buffers are shared)
405413
let columns = raw_batch.columns().to_vec();
406414
let record_batch = match RecordBatch::try_new(compressed_schema, columns) {
407415
Ok(b) => b,

0 commit comments

Comments
 (0)