Skip to content

Commit b18d409

Browse files
beinanclaude
andcommitted
feat: add num_segments option for configurable zonemap segment count
Allow users to control the number of index segments created during distributed zonemap builds via the num_segments DDL option. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent be30bfc commit b18d409

3 files changed

Lines changed: 61 additions & 4 deletions

File tree

docs/src/operations/ddl/create-index.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ For the `zonemap` method, the following options are supported:
3939
| Option | Type | Description |
4040
|-----------------|------|----------------------------------------------|
4141
| `rows_per_zone` | Long | The approximate number of rows per zonemap zone. |
42+
| `num_segments` | Integer | Number of index segments to create. Each segment covers a batch of fragments. Defaults to `min(fragment_count, spark.default.parallelism)`. |
4243

4344
### BTree Options
4445

lance-spark-base_2.12/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AddIndexExec.scala

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,12 @@ case class AddIndexExec(
9292
val btreeBuildMode = IndexUtils.btreeBuildMode(indexType, args)
9393
val useLogicalSegmentCommit = IndexUtils.useLogicalSegmentCommit(indexType)
9494

95+
val numSegmentsOpt = args.find(_.name == "num_segments")
96+
if (numSegmentsOpt.isDefined && !useLogicalSegmentCommit) {
97+
throw new IllegalArgumentException(
98+
"num_segments option is only supported for index types that use segmented builds (e.g., zonemap)")
99+
}
100+
95101
// Create distributed index job and run it
96102
val createdSegments = createIndexJob(
97103
lanceDataset,
@@ -317,8 +323,9 @@ class FragmentBasedIndexJob(
317323
val encodedReadOptions = encode(readOptions)
318324
val columns = addIndexExec.columns.toList
319325
val argsJson = IndexUtils.toJson(addIndexExec.args)
326+
val numSegments = addIndexExec.args.find(_.name == "num_segments").map(_.value.asInstanceOf[Number].intValue())
320327
val fragmentBatches = if (groupFragmentsIntoSegments) {
321-
batchFragments(fragmentIds)
328+
batchFragments(fragmentIds, numSegments)
322329
} else {
323330
fragmentIds.map(fid => List(fid))
324331
}
@@ -346,9 +353,11 @@ class FragmentBasedIndexJob(
346353
.toSeq
347354
}
348355

349-
private def batchFragments(fragmentIds: List[Integer]): Seq[List[Integer]] = {
350-
val targetTasks =
351-
math.max(1, math.min(fragmentIds.size, addIndexExec.session.sparkContext.defaultParallelism))
356+
private def batchFragments(fragmentIds: List[Integer], numSegments: Option[Int] = None): Seq[List[Integer]] = {
357+
val targetTasks = numSegments match {
358+
case Some(n) => math.max(1, math.min(fragmentIds.size, n))
359+
case None => math.max(1, math.min(fragmentIds.size, addIndexExec.session.sparkContext.defaultParallelism))
360+
}
352361
val batchSize = math.ceil(fragmentIds.size.toDouble / targetTasks.toDouble).toInt
353362
fragmentIds.grouped(batchSize).map(_.toList).toSeq
354363
}

lance-spark-base_2.12/src/test/java/org/lance/spark/update/BaseAddIndexTest.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -335,6 +335,53 @@ public void testRepeatedCreateZonemapIndexReplacesExistingSegments() {
335335
}
336336
}
337337

338+
@Test
339+
public void testCreateZonemapIndexWithNumSegments() {
340+
prepareDataset();
341+
342+
Dataset<Row> result =
343+
spark.sql(
344+
String.format(
345+
"alter table %s create index test_index_zonemap_segments using zonemap (id) with (num_segments = 3)",
346+
fullTable));
347+
348+
Row row = result.collectAsList().get(0);
349+
long fragmentsIndexed = row.getLong(0);
350+
String indexName = row.getString(1);
351+
352+
Assertions.assertTrue(fragmentsIndexed >= 2, "Expected at least 2 fragments to be indexed");
353+
Assertions.assertEquals("test_index_zonemap_segments", indexName);
354+
355+
// Verify the number of segments matches the requested num_segments
356+
org.lance.Dataset lanceDataset = org.lance.Dataset.open().uri(tableDir).build();
357+
try {
358+
int fragmentCount = lanceDataset.getFragments().size();
359+
int expectedSegmentCount = Math.min(fragmentCount, 3);
360+
List<Index> segments =
361+
lanceDataset.getIndexes().stream()
362+
.filter(index -> "test_index_zonemap_segments".equals(index.name()))
363+
.collect(Collectors.toList());
364+
365+
Assertions.assertEquals(
366+
expectedSegmentCount,
367+
segments.size(),
368+
"Expected num_segments=3 to produce exactly 3 segments (or fewer if fragment count < 3)");
369+
370+
int coveredFragments =
371+
segments.stream()
372+
.map(index -> index.fragments().orElse(Collections.emptyList()).size())
373+
.mapToInt(Integer::intValue)
374+
.sum();
375+
Assertions.assertEquals(
376+
fragmentCount,
377+
coveredFragments,
378+
"Expected committed segments to cover all fragments exactly once");
379+
} finally {
380+
lanceDataset.close();
381+
}
382+
}
383+
384+
338385
@Test
339386
public void testCreateBTreeIndexWithRangeMode() {
340387
prepareDataset();

0 commit comments

Comments
 (0)