Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 96 additions & 19 deletions rust/lance-namespace-impls/examples/manifest_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@
//!
//! # Seed 500K rows directly into __manifest table
//! manifest_bench seed-large --root s3://bucket/bench/scale \
//! --count 500000 --inline-optimization true
//! --count 500000 --inline-optimization true --rich-metadata true
//!
//! # Run scale benchmark at 500K initial entries
//! manifest_bench run --root s3://bucket/bench/scale \
//! --concurrency 1,10,100 --operations 200

use std::collections::HashMap;
use std::collections::{BTreeMap, HashMap};
use std::fs;
use std::io::{BufRead, BufReader};
use std::path::{Path, PathBuf};
Expand All @@ -41,7 +41,10 @@ use lance_namespace::models::{
CreateNamespaceRequest, CreateTableRequest, DeclareTableRequest, DescribeTableRequest,
ListNamespacesRequest, ListTablesRequest,
};
use lance_namespace_impls::DirectoryNamespaceBuilder;
use lance_namespace_impls::{
DESCRIPTION_METADATA_KEY, DirectoryNamespace, DirectoryNamespaceBuilder, MetadataUpdate,
UpdateTableMetadataRequest,
};
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Clone)]
Expand Down Expand Up @@ -162,7 +165,7 @@ async fn build_namespace(
inline_optimization: bool,
manifest_shard_count: usize,
storage_options: &HashMap<String, String>,
) -> Box<dyn LanceNamespace> {
) -> DirectoryNamespace {
let mut properties = HashMap::new();
properties.insert("root".to_string(), root.to_string());
properties.insert("dir_listing_enabled".to_string(), "false".to_string());
Expand All @@ -181,7 +184,7 @@ async fn build_namespace(
}
let builder = DirectoryNamespaceBuilder::from_properties(properties, None)
.expect("Failed to create namespace builder from properties");
Box::new(builder.build().await.expect("Failed to build namespace"))
builder.build().await.expect("Failed to build namespace")
}

// ──────────────────── seed mode ────────────────────
Expand All @@ -192,6 +195,7 @@ async fn seed(
inline_optimization: bool,
manifest_shard_count: usize,
storage_options: &HashMap<String, String>,
rich_metadata: bool,
) {
eprintln!("Seeding {} entries at {}", count, root);
let ns = build_namespace(
Expand All @@ -209,6 +213,10 @@ async fn seed(
for i in 0..ns_count {
let mut req = CreateNamespaceRequest::new();
req.id = Some(vec![format!("ns_{}", i)]);
if rich_metadata {
req.properties =
Some(serde_json::from_str(&rich_metadata_json(i, "namespace")).unwrap());
}
if let Err(e) = ns.create_namespace(req).await {
eprintln!("seed ns_{}: {}", i, e);
}
Expand All @@ -219,6 +227,9 @@ async fn seed(
for i in 0..table_count {
let mut req = CreateTableRequest::new();
req.id = Some(vec![format!("table_{}", i)]);
if rich_metadata {
req.properties = Some(serde_json::from_str(&rich_metadata_json(i, "table")).unwrap());
}
if let Err(e) = ns.create_table(req, ipc_data.clone()).await {
eprintln!("seed table_{}: {}", i, e);
}
Expand Down Expand Up @@ -253,6 +264,38 @@ fn stable_hash(value: &str) -> u64 {
hash
}

/// Produces the JSON text of a deterministic "rich" metadata object for one catalog row.
///
/// The object contains a long description stored under `DESCRIPTION_METADATA_KEY`
/// (parameterized by `row_index` and `object_type`) and ten entries keyed `tag_00`
/// through `tag_09`. Each tag value has the form
/// `{object_type}_{row_index % 4096}_{tag index}`.
///
/// # Panics
///
/// Panics if `serde_json` fails to serialize the map.
fn rich_metadata_json(row_index: usize, object_type: &str) -> String {
    // Tag values only depend on the row index modulo 4096.
    let tag_bucket = row_index % 4096;

    let long_text = format!(
        "Catalog row {} for {} stores a carefully maintained analytical dataset used by product, research, and operations teams. The description is intentionally long enough to model user supplied catalog text while remaining deterministic across benchmark runs. It includes ownership context, lifecycle notes, discovery hints, and operational expectations for steady state metadata update measurements.",
        row_index, object_type
    );

    // A BTreeMap keeps the keys ordered, so the serialized text is stable.
    let mut entries = BTreeMap::new();
    entries.insert(DESCRIPTION_METADATA_KEY.to_string(), long_text);
    entries.extend((0..10).map(|n| {
        (
            format!("tag_{:02}", n),
            format!("{}_{}_{}", object_type, tag_bucket, n),
        )
    }));

    serde_json::to_string(&entries).expect("failed to serialize rich metadata")
}

fn update_metadata(worker_id: usize, op_idx: usize, table_idx: usize) -> MetadataUpdate {
let description = format!(
"Updated catalog description for table {} from worker {} operation {}. This write changes the same JSON metadata object shape used during bootstrap and models a steady state user edit to long descriptive text plus several key value tags in the directory catalog. The payload remains intentionally verbose to reflect realistic catalog annotations that explain ownership, business purpose, freshness expectations, and operational review notes.",
table_idx, worker_id, op_idx
);
let mut set = HashMap::new();
set.insert(DESCRIPTION_METADATA_KEY.to_string(), description);
for tag_idx in 0..10 {
set.insert(
format!("tag_{:02}", tag_idx),
format!("updated_{}_{}_{}", worker_id, op_idx, tag_idx),
);
}
MetadataUpdate { set, unset: vec![] }
}

fn manifest_shard_marker_row(shard_index: usize) -> ManifestSeedRow {
ManifestSeedRow {
object_id: format!("__manifest_shard_marker_{:06}", shard_index),
Expand All @@ -262,22 +305,26 @@ fn manifest_shard_marker_row(shard_index: usize) -> ManifestSeedRow {
}
}

/// Builds the `ManifestSeedRow` for seed index `i` out of `total_count` rows.
///
/// Indices below `total_count / 3` become `namespace` rows with id `ns_{i}` and no
/// location. All other indices become `table` rows whose id and location are both
/// `table_{i - ns_count}`.
// NOTE(review): this span is a rendered diff, so the pre-change and post-change versions of
// the signature and of the `metadata` initializers appear next to each other. In the
// post-change lines, `rich_metadata` switches the metadata to `rich_metadata_json(i, ...)`;
// otherwise namespaces get `None` and tables get the fixed `{"bench":"true"}` object.
fn manifest_seed_row(i: usize, total_count: usize) -> ManifestSeedRow {
fn manifest_seed_row(i: usize, total_count: usize, rich_metadata: bool) -> ManifestSeedRow {
// The first third of the index range (integer division) is reserved for namespaces.
let ns_count = total_count / 3;
if i < ns_count {
ManifestSeedRow {
object_id: format!("ns_{}", i),
object_type: "namespace".to_string(),
location: None,
metadata: None,
metadata: rich_metadata.then(|| rich_metadata_json(i, "namespace")),
}
} else {
// Table numbering restarts at zero after the namespace rows.
let table_idx = i - ns_count;
ManifestSeedRow {
object_id: format!("table_{}", table_idx),
object_type: "table".to_string(),
location: Some(format!("table_{}", table_idx)),
metadata: Some(r#"{"bench":"true"}"#.to_string()),
// The rich payload is built from the overall row index `i`, not `table_idx`.
metadata: Some(if rich_metadata {
rich_metadata_json(i, "table")
} else {
r#"{"bench":"true"}"#.to_string()
}),
}
}
}
Expand Down Expand Up @@ -380,32 +427,37 @@ fn generate_manifest_batch(
start_idx: usize,
batch_size: usize,
total_count: usize,
rich_metadata: bool,
) -> RecordBatch {
let ns_count = total_count / 3;
let actual_size = batch_size.min(total_count - start_idx);

let mut object_ids = Vec::with_capacity(actual_size);
let mut object_types = Vec::with_capacity(actual_size);
let mut locations: Vec<Option<String>> = Vec::with_capacity(actual_size);
let mut metadatas: Vec<Option<&str>> = Vec::with_capacity(actual_size);
let mut metadatas: Vec<Option<String>> = Vec::with_capacity(actual_size);

for i in start_idx..start_idx + actual_size {
if i < ns_count {
object_ids.push(format!("ns_{}", i));
object_types.push("namespace".to_string());
locations.push(None);
metadatas.push(None);
metadatas.push(rich_metadata.then(|| rich_metadata_json(i, "namespace")));
} else {
let table_idx = i - ns_count;
object_ids.push(format!("table_{}", table_idx));
object_types.push("table".to_string());
locations.push(Some(format!("table_{}", table_idx)));
metadatas.push(Some(r#"{"bench":"true"}"#));
metadatas.push(Some(if rich_metadata {
rich_metadata_json(i, "table")
} else {
r#"{"bench":"true"}"#.to_string()
}));
}
}

let metadata_array = Arc::new(
JsonArray::try_from_iter(metadatas.into_iter())
JsonArray::try_from_iter(metadatas.iter().map(|metadata| metadata.as_deref()))
.expect("Failed to encode metadata as JSON")
.into_inner(),
);
Expand All @@ -428,6 +480,7 @@ async fn seed_large(
inline_optimization: bool,
manifest_shard_count: usize,
storage_options: &HashMap<String, String>,
rich_metadata: bool,
) {
if manifest_shard_count > 0 {
seed_large_sharded(
Expand All @@ -436,6 +489,7 @@ async fn seed_large(
inline_optimization,
manifest_shard_count,
storage_options,
rich_metadata,
)
.await;
return;
Expand All @@ -454,7 +508,13 @@ async fn seed_large(
let mut offset = 0;
while offset < count {
let batch_size = SEED_LARGE_BATCH_SIZE.min(count - offset);
batches.push(generate_manifest_batch(&schema, offset, batch_size, count));
batches.push(generate_manifest_batch(
&schema,
offset,
batch_size,
count,
rich_metadata,
));
offset += batch_size;
}
eprintln!(" generated {} batches", batches.len());
Expand Down Expand Up @@ -500,6 +560,7 @@ async fn seed_large_sharded(
inline_optimization: bool,
manifest_shard_count: usize,
storage_options: &HashMap<String, String>,
rich_metadata: bool,
) {
eprintln!(
"Seed-large: writing {} rows directly across {} manifest fragments",
Expand All @@ -511,7 +572,7 @@ async fn seed_large_sharded(
(0..manifest_shard_count).map(|_| Vec::new()).collect();

for i in 0..count {
let row = manifest_seed_row(i, count);
let row = manifest_seed_row(i, count, rich_metadata);
let shard_index = stable_hash(&row.object_id) as usize % manifest_shard_count;
rows_by_shard[shard_index].push(row);
}
Expand Down Expand Up @@ -579,16 +640,15 @@ async fn worker(
// Warmup (only for warm-read operations)
if operation.starts_with("warm-read") {
for _ in 0..warmup {
let _ =
run_operation(ns.as_ref(), operation, worker_id, 0, table_count, &ipc_data).await;
let _ = run_operation(&ns, operation, worker_id, 0, table_count, &ipc_data).await;
}
}

wait_for_coordinator(ready_path, start_path);

for i in 0..operations {
let start = Instant::now();
let err = run_operation(ns.as_ref(), operation, worker_id, i, table_count, &ipc_data)
let err = run_operation(&ns, operation, worker_id, i, table_count, &ipc_data)
.await
.is_err();
let latency_ms = start.elapsed().as_secs_f64() * 1000.0;
Expand All @@ -602,7 +662,7 @@ async fn worker(
}

async fn run_operation(
ns: &dyn LanceNamespace,
ns: &DirectoryNamespace,
operation: &str,
worker_id: usize,
op_idx: usize,
Expand Down Expand Up @@ -645,6 +705,14 @@ async fn run_operation(
};
ns.declare_table(req).await?;
}
"write-update-table-metadata" => {
let table_idx = (worker_id * 1000 + op_idx) % table_count.max(1);
ns.update_table_metadata(UpdateTableMetadataRequest {
id: vec![format!("table_{}", table_idx)],
update: update_metadata(worker_id, op_idx, table_idx),
})
.await?;
}
_ => {
return Err(format!("unknown operation: {}", operation).into());
}
Expand Down Expand Up @@ -681,7 +749,7 @@ async fn cold_read_worker(
storage_options,
)
.await;
let err = run_operation(ns.as_ref(), operation, worker_id, i, table_count, &ipc_data)
let err = run_operation(&ns, operation, worker_id, i, table_count, &ipc_data)
.await
.is_err();
let latency_ms = start.elapsed().as_secs_f64() * 1000.0;
Expand Down Expand Up @@ -902,6 +970,7 @@ async fn main() {
let mut ready_path: Option<String> = None;
let mut start_path: Option<String> = None;
let mut storage_options: HashMap<String, String> = HashMap::new();
let mut rich_metadata = false;

let mut i = 2;
while i < args.len() {
Expand Down Expand Up @@ -950,6 +1019,10 @@ async fn main() {
manifest_shard_count = args[i + 1].parse().unwrap();
i += 2;
}
"--rich-metadata" => {
rich_metadata = args[i + 1].parse().unwrap();
i += 2;
}
"--variant" => {
variant = args[i + 1].clone();
i += 2;
Expand Down Expand Up @@ -992,6 +1065,7 @@ async fn main() {
inline_optimization,
manifest_shard_count,
&storage_options,
rich_metadata,
)
.await;
}
Expand All @@ -1002,6 +1076,7 @@ async fn main() {
inline_optimization,
manifest_shard_count,
&storage_options,
rich_metadata,
)
.await;
}
Expand Down Expand Up @@ -1053,6 +1128,7 @@ async fn main() {
"write-create-namespace",
"write-declare-table",
"write-create-table",
"write-update-table-metadata",
];

// If --operation is set, only run that one
Expand All @@ -1067,6 +1143,7 @@ async fn main() {
eprintln!("root: {}", root);
eprintln!("inline_optimization: {}", inline_optimization);
eprintln!("manifest_shard_count: {}", manifest_shard_count);
eprintln!("rich_metadata: {}", rich_metadata);
eprintln!("initial_entries: {}", initial_entries);
eprintln!("concurrency: {:?}", concurrency_list);
eprintln!("operations per level: {}", operations);
Expand Down
Loading
Loading