---
layout: default
title: "Chapter 6: Performance"
parent: LanceDB Tutorial
nav_order: 6
---
Welcome to Chapter 6: Performance. In this part of LanceDB Tutorial: Serverless Vector Database for AI, you will build an intuitive mental model first, then move into concrete implementation details and practical production tradeoffs.

Performance optimization is crucial for production deployments. This chapter covers the indexing strategies, query optimization, memory management, and benchmarking techniques you need to tune LanceDB for speed, memory efficiency, and throughput.
```python
import lancedb
import numpy as np
db = lancedb.connect("./perf_lancedb")
# Create table with 1M vectors
data = [
    {"id": i, "vector": np.random.rand(384).tolist()}
    for i in range(1_000_000)
]
table = db.create_table("large_table", data)
# Index for balanced performance
table.create_index(
    metric="cosine",
    num_partitions=256,  # sqrt(n) is a good starting point
    num_sub_vectors=96,  # dimension / 4 is reasonable
)

# Index for high recall (accuracy)
table.create_index(
    metric="cosine",
    num_partitions=512,   # more partitions
    num_sub_vectors=128,  # more sub-vectors
    replace=True,
)

# Index for high throughput (speed)
table.create_index(
    metric="cosine",
    num_partitions=128,  # fewer partitions
    num_sub_vectors=48,  # fewer sub-vectors
    replace=True,
)
```

```python
import numpy as np

def recommend_index_params(num_vectors: int, dimension: int, priority: str) -> dict:
"""Recommend index parameters based on dataset and priority."""
# Base calculations
sqrt_n = int(np.sqrt(num_vectors))
if priority == "accuracy":
return {
"num_partitions": min(sqrt_n * 2, 4096),
"num_sub_vectors": min(dimension // 2, 256),
"nprobes_recommendation": sqrt_n // 2
}
elif priority == "speed":
return {
"num_partitions": max(sqrt_n // 2, 64),
"num_sub_vectors": max(dimension // 8, 16),
"nprobes_recommendation": 10
}
else: # balanced
return {
"num_partitions": sqrt_n,
"num_sub_vectors": dimension // 4,
"nprobes_recommendation": sqrt_n // 4
}
# Usage
params = recommend_index_params(
    num_vectors=1_000_000,
    dimension=384,
    priority="balanced",
)
print(params)
```

```python
import time
import numpy as np
def benchmark_index_type(table, query_vectors, index_config):
    """Benchmark an index configuration."""
    # Build the index and time it
    start = time.time()
    table.create_index(**index_config, replace=True)
    index_time = time.time() - start

    # Benchmark queries
    latencies = []
    for query in query_vectors[:100]:
        start = time.time()
        table.search(query).limit(10).to_list()
        latencies.append(time.time() - start)

    return {
        "index_time": index_time,
        "avg_latency": np.mean(latencies),
        "p99_latency": np.percentile(latencies, 99),
    }
# Compare configurations
configs = [
    {"metric": "cosine", "num_partitions": 128, "num_sub_vectors": 48},
    {"metric": "cosine", "num_partitions": 256, "num_sub_vectors": 96},
    {"metric": "cosine", "num_partitions": 512, "num_sub_vectors": 128},
]

query_vectors = [np.random.rand(384).tolist() for _ in range(100)]

for config in configs:
    results = benchmark_index_type(table, query_vectors, config)
    print(f"Config: {config}")
    print(f"  Index time: {results['index_time']:.2f}s")
    print(f"  Avg latency: {results['avg_latency']*1000:.2f}ms")
    print(f"  P99 latency: {results['p99_latency']*1000:.2f}ms")
```

`nprobes` controls the accuracy vs. speed tradeoff: more probes mean higher accuracy but slower search.

```python
# Fast search (lower accuracy)
results = table.search(query) \
    .nprobes(5) \
    .limit(10) \
    .to_list()

# Balanced
results = table.search(query) \
    .nprobes(20) \
    .limit(10) \
    .to_list()

# High accuracy (slower)
results = table.search(query) \
    .nprobes(100) \
    .limit(10) \
    .to_list()
# Benchmark different nprobes values
def find_optimal_nprobes(table, query, ground_truth, target_recall=0.95):
    """Find the minimum nprobes that reaches the target recall."""
    for nprobes in [5, 10, 20, 50, 100, 200]:
        results = table.search(query).nprobes(nprobes).limit(10).to_list()
        result_ids = set(r["id"] for r in results)
        recall = len(result_ids & ground_truth) / len(ground_truth)
        if recall >= target_recall:
            return nprobes
    return 200  # fall back to the maximum tested value
```
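`find_optimal_nprobes` needs a ground-truth ID set to compare against. Here is a minimal usage sketch, assuming ground truth can be approximated by a near-exhaustive search with a very high `nprobes` value; for rigorous tuning, compute exact neighbors offline:

```python
# Sketch: approximate ground truth with a near-exhaustive search,
# then find the cheapest nprobes that still hits the recall target.
query = np.random.rand(384).tolist()
exact = table.search(query).nprobes(256).limit(10).to_list()
ground_truth = set(r["id"] for r in exact)

best = find_optimal_nprobes(table, query, ground_truth, target_recall=0.95)
print(f"Minimum nprobes for 95% recall: {best}")
```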
`refine_factor` fetches more candidates than requested and reranks them, improving accuracy at the cost of latency.

```python
# No refinement (fastest)
results = table.search(query).limit(10).to_list()

# With refinement (more accurate): fetch 5x candidates, then rerank
results = table.search(query) \
    .refine_factor(5) \
    .limit(10) \
    .to_list()

# Higher refinement (most accurate)
results = table.search(query) \
    .refine_factor(20) \
    .limit(10) \
    .to_list()
```
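A quick way to choose a `refine_factor` is to time the same query across several settings. A minimal sketch (single-run timings for brevity; average over many runs in practice):

```python
import time

# Sketch: sweep refine_factor and report latency for each setting
for rf in [None, 2, 5, 10, 20]:
    q = table.search(query).limit(10)
    if rf is not None:
        q = q.refine_factor(rf)
    start = time.perf_counter()
    q.to_list()
    print(f"refine_factor={rf}: {(time.perf_counter() - start) * 1000:.2f}ms")
```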
```python
import time

# Slow: fetching all columns, including vectors
start = time.time()
results = table.search(query).limit(100).to_pandas()
print(f"All columns: {time.time() - start:.3f}s")

# Fast: selecting only needed columns
start = time.time()
results = table.search(query) \
    .select(["id", "title"]) \
    .limit(100) \
    .to_pandas()
print(f"Selected columns: {time.time() - start:.3f}s")
```

Avoid fetching the vector column unless you need it.

```python
# Pre-filter (applied before vector search) - use for selective filters
results = table.search(query) \
    .where("category = 'technology'") \
    .limit(10) \
    .to_list()  # the filter removes ~90% of rows before the vector search

# Post-filter (applied after vector search) - use for less selective filters
results = table.search(query) \
    .where("is_active = true", prefilter=False) \
    .limit(10) \
    .to_list()  # most rows match, so filtering afterwards is cheaper

# Indexed filters (faster):
# create a scalar index for frequently filtered columns
table.create_scalar_index("category")

results = table.search(query) \
    .where("category = 'technology'") \
    .limit(10) \
    .to_list()
```
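To confirm a scalar index actually helps your workload, time the same filtered query before and after creating it. A minimal sketch (the `time_filtered_search` helper is illustrative, not part of the LanceDB API; absolute numbers depend on your data):

```python
import time

def time_filtered_search(table, query, predicate, runs=20):
    """Average latency of a filtered vector search over several runs."""
    latencies = []
    for _ in range(runs):
        start = time.perf_counter()
        table.search(query).where(predicate).limit(10).to_list()
        latencies.append(time.perf_counter() - start)
    return sum(latencies) / len(latencies)

before = time_filtered_search(table, query, "category = 'technology'")
table.create_scalar_index("category")
after = time_filtered_search(table, query, "category = 'technology'")
print(f"Before index: {before * 1000:.2f}ms, after: {after * 1000:.2f}ms")
```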
LanceDB uses memory-mapped files by default, which lets you work with datasets larger than RAM.

```python
import lancedb

db = lancedb.connect("./large_dataset")
table = db.open_table("big_table")

# Only the data a query touches is loaded into memory
results = table.search(query).limit(10).to_list()
```

```python
def process_large_dataset(table, queries, batch_size=100):
"""Process queries in batches to manage memory."""
all_results = []
for i in range(0, len(queries), batch_size):
batch = queries[i:i + batch_size]
batch_results = []
for query in batch:
results = table.search(query).limit(10).to_list()
batch_results.append(results)
all_results.extend(batch_results)
# Optional: Force garbage collection
import gc
gc.collect()
return all_results# For very large result sets, use iteration
def stream_results(table, query, total_limit=10000, batch_size=1000):
"""Stream results in batches."""
offset = 0
while offset < total_limit:
results = table.search(query) \
.limit(batch_size) \
.offset(offset) \
.to_list()
if not results:
break
for result in results:
yield result
offset += batch_size
# Usage
for result in stream_results(table, query):
process_result(result)import lancedb
from concurrent.futures import ThreadPoolExecutor
import threading
# LanceDB connections are thread-safe
db = lancedb.connect("./concurrent_lancedb")

def search_worker(query, table_name="my_table"):
    """Worker function for concurrent search."""
    table = db.open_table(table_name)
    return table.search(query).limit(10).to_list()

# Concurrent searches
with ThreadPoolExecutor(max_workers=8) as executor:
    queries = [np.random.rand(384).tolist() for _ in range(100)]
    futures = [executor.submit(search_worker, q) for q in queries]
    results = [f.result() for f in futures]
```

```python
class LanceDBPool:
"""Simple connection pool for LanceDB."""
def __init__(self, uri: str, pool_size: int = 5):
self.uri = uri
self.pool_size = pool_size
self._connections = []
self._lock = threading.Lock()
# Pre-create connections
for _ in range(pool_size):
self._connections.append(lancedb.connect(uri))
def get_connection(self):
"""Get a connection from the pool."""
with self._lock:
if self._connections:
return self._connections.pop()
# Create new if pool exhausted
return lancedb.connect(self.uri)
def return_connection(self, conn):
"""Return connection to pool."""
with self._lock:
if len(self._connections) < self.pool_size:
self._connections.append(conn)
# Usage
pool = LanceDBPool("./my_lancedb", pool_size=10)
def search_with_pool(query):
conn = pool.get_connection()
try:
table = conn.open_table("my_table")
return table.search(query).limit(10).to_list()
finally:
pool.return_connection(conn)import time
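Returning connections by hand is easy to forget on error paths. A context manager makes the borrow/return pattern safe; this is a sketch built on the `LanceDBPool` above (the `pooled_connection` helper is illustrative, not part of LanceDB):

```python
from contextlib import contextmanager

@contextmanager
def pooled_connection(pool):
    # Borrow a connection and guarantee it is returned, even on error
    conn = pool.get_connection()
    try:
        yield conn
    finally:
        pool.return_connection(conn)

# Usage
with pooled_connection(pool) as conn:
    table = conn.open_table("my_table")
    results = table.search(query).limit(10).to_list()
```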
```python
import time
import numpy as np
from statistics import mean, stdev

def benchmark_search(table, num_queries=100, limit=10):
    """Benchmark search performance."""
    # Generate random queries
    queries = [np.random.rand(384).tolist() for _ in range(num_queries)]

    # Warm up
    for query in queries[:10]:
        table.search(query).limit(limit).to_list()

    # Benchmark
    latencies = []
    for query in queries:
        start = time.perf_counter()
        table.search(query).limit(limit).to_list()
        latencies.append(time.perf_counter() - start)

    return {
        "num_queries": num_queries,
        "avg_latency_ms": mean(latencies) * 1000,
        "std_latency_ms": stdev(latencies) * 1000,
        "p50_latency_ms": np.percentile(latencies, 50) * 1000,
        "p95_latency_ms": np.percentile(latencies, 95) * 1000,
        "p99_latency_ms": np.percentile(latencies, 99) * 1000,
        "qps": num_queries / sum(latencies),
    }
results = benchmark_search(table)
print(f"Average latency: {results['avg_latency_ms']:.2f}ms")
print(f"P99 latency: {results['p99_latency_ms']:.2f}ms")
print(f"QPS: {results['qps']:.1f}")
```

```python
from statistics import mean

def benchmark_recall(table, queries, ground_truth, k=10):
"""Benchmark search recall against ground truth."""
recalls = []
for query, truth in zip(queries, ground_truth):
results = table.search(query).limit(k).to_list()
result_ids = set(r['id'] for r in results)
truth_ids = set(truth[:k])
recall = len(result_ids & truth_ids) / len(truth_ids)
recalls.append(recall)
return {
"avg_recall": mean(recalls),
"min_recall": min(recalls),
"max_recall": max(recalls)
}import asyncio
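`benchmark_recall` expects one ground-truth neighbor list per query. A minimal usage sketch, assuming ground truth is approximated with a high-`nprobes` search; compute exact neighbors offline for rigorous results:

```python
import numpy as np

# Sketch: build approximate ground truth, then measure recall
queries = [np.random.rand(384).tolist() for _ in range(20)]
ground_truth = []
for q in queries:
    exact = table.search(q).nprobes(256).limit(10).to_list()
    ground_truth.append([r["id"] for r in exact])

print(benchmark_recall(table, queries, ground_truth, k=10))
```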
```python
import time
import numpy as np
from concurrent.futures import ThreadPoolExecutor

def benchmark_throughput(table, num_queries=1000, num_workers=8):
    """Benchmark concurrent throughput."""
    queries = [np.random.rand(384).tolist() for _ in range(num_queries)]

    def search_one(query):
        return table.search(query).limit(10).to_list()

    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=num_workers) as executor:
        list(executor.map(search_one, queries))
    duration = time.perf_counter() - start

    return {
        "total_queries": num_queries,
        "duration_s": duration,
        "qps": num_queries / duration,
        "num_workers": num_workers,
    }

results = benchmark_throughput(table, num_queries=1000, num_workers=8)
print(f"Throughput: {results['qps']:.1f} QPS with {results['num_workers']} workers")
```
```python
import logging
import time
from functools import wraps

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("lancedb.performance")

def log_query_performance(func):
    """Decorator to log query performance."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        result = func(*args, **kwargs)
        duration = time.perf_counter() - start
        logger.info(
            f"Query completed in {duration*1000:.2f}ms, "
            f"results: {len(result) if hasattr(result, '__len__') else 'N/A'}"
        )
        if duration > 0.1:  # log slow queries (>100ms)
            logger.warning(f"Slow query detected: {duration*1000:.2f}ms")
        return result
    return wrapper

@log_query_performance
def search(table, query, limit=10):
    return table.search(query).limit(limit).to_list()
```

```python
from dataclasses import dataclass, field
from typing import List
import time
import numpy as np

@dataclass
class QueryMetrics:
    """Collect query metrics."""
    latencies: List[float] = field(default_factory=list)
    errors: int = 0
    total_results: int = 0

    def record(self, latency: float, num_results: int):
        self.latencies.append(latency)
        self.total_results += num_results

    def record_error(self):
        self.errors += 1

    def summary(self) -> dict:
        if not self.latencies:
            return {}
        return {
            "total_queries": len(self.latencies),
            "avg_latency_ms": np.mean(self.latencies) * 1000,
            "p99_latency_ms": np.percentile(self.latencies, 99) * 1000,
            "errors": self.errors,
            "error_rate": self.errors / (len(self.latencies) + self.errors),
            "avg_results": self.total_results / len(self.latencies),
        }
# Usage
metrics = QueryMetrics()
for query in queries:
    start = time.perf_counter()
    try:
        results = table.search(query).limit(10).to_list()
        metrics.record(time.perf_counter() - start, len(results))
    except Exception:
        metrics.record_error()

print(metrics.summary())
```

Index best practices:

```python
# 1. Create the index after the bulk data load
table.add(initial_data)
table.create_index(metric="cosine", num_partitions=256)

# 2. Rebuild the index periodically after many updates
# (last_indexed_count is tracked by your application)
if table.count_rows() > last_indexed_count * 1.5:
    table.create_index(metric="cosine", num_partitions=256, replace=True)

# 3. Match the index to your query patterns:
#    - high accuracy: more partitions, more sub-vectors
#    - high throughput: fewer partitions, fewer sub-vectors
```

Query best practices:

```python
# 1. Always set reasonable limits
results = table.search(query).limit(10).to_list()  # good
# results = table.search(query).to_list()          # bad: fetches everything

# 2. Select only the columns you need
results = table.search(query).select(["id", "title"]).limit(10).to_list()

# 3. Use an appropriate nprobes value
results = table.search(query).nprobes(20).limit(10).to_list()

# 4. Index frequently filtered columns
table.create_scalar_index("category")
```

In this chapter, you've learned:
- Index Optimization: choosing and tuning index parameters
- Query Optimization: nprobes, refine factor, and column selection
- Memory Management: efficient handling of large datasets
- Concurrent Access: thread safety and connection pooling
- Benchmarking: measuring latency, throughput, and recall
- Monitoring: query logging and metrics collection

Key takeaways:

- Indexing matters: proper indexing is crucial for performance
- Tune nprobes: balance accuracy against speed
- Select columns: don't fetch what you don't need
- Monitor performance: track latency and throughput
- Benchmark: measure before and after every optimization
Now that you can optimize performance, continue to Chapter 7: Production Deployment for cloud storage and scaling.
Most teams struggle here because the hard part is not writing more code, but drawing clear boundaries between the table, the query, and the results so behavior stays predictable as complexity grows.

In practical terms, this chapter helps you avoid three common failures:
- coupling core logic too tightly to one implementation path
- missing the handoff boundaries between setup, execution, and validation
- shipping changes without a clear rollback or observability strategy

After working through this chapter, you should be able to reason about performance as an operating subsystem inside your LanceDB deployment, with explicit contracts for inputs, state transitions, and outputs. Use the implementation notes around `search`, `limit`, and the index parameters as a checklist when adapting these patterns to your own repository.
Under the hood, a performance pass usually follows a repeatable control path:
- Context bootstrap: initialize runtime config and prerequisites for the table.
- Input normalization: shape incoming data so the query receives stable contracts.
- Core execution: run the main logic branch and propagate intermediate state through the results.
- Policy and safety checks: enforce limits, auth scopes, and failure boundaries.
- Output composition: return canonical result payloads for downstream consumers.
- Operational telemetry: emit the logs and metrics needed for debugging and performance tuning.

When debugging, walk this sequence in order and confirm each stage has explicit success and failure conditions.
Use the following upstream sources to verify implementation details while reading this chapter:
- Awesome Code Docs (github.com)

Suggested trace strategy:
- search the upstream code for `table` and `query` to map concrete implementation paths
- compare documentation claims against the actual runtime and config code before reusing patterns in production