Skip to content

Commit 57d2aad

Browse files
zhangyue19921010zhangyue19921010
andauthored
feat: support distributed bitmap index build (#6598)
closes: #6599 --------- Co-authored-by: zhangyue19921010 <zhangyue.1010@bytedance.com>
1 parent 9d97361 commit 57d2aad

6 files changed

Lines changed: 831 additions & 28 deletions

File tree

java/src/main/java/org/lance/index/scalar/BitmapIndexParams.java

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,11 @@
1313
*/
1414
package org.lance.index.scalar;
1515

16+
import org.lance.util.JsonUtils;
17+
18+
import java.util.HashMap;
19+
import java.util.Map;
20+
1621
/** Builder-style configuration for Bitmap scalar index parameters. */
1722
public final class BitmapIndexParams {
1823
private static final String INDEX_TYPE = "bitmap";
@@ -25,9 +30,36 @@ public static Builder builder() {
2530
}
2631

2732
public static final class Builder {
33+
private Integer shardId;
34+
35+
/**
36+
* Configure an explicit shard ID for distributed bitmap builds spanning multiple fragments.
37+
*
38+
* @param shardId non-negative shard identifier
39+
* @return this builder
40+
* @throws IllegalArgumentException
41+
*/
42+
public Builder shardId(int shardId) {
43+
if (shardId < 0) {
44+
throw new IllegalArgumentException("shardId must be non-negative");
45+
}
46+
this.shardId = shardId;
47+
return this;
48+
}
49+
2850
/** Build a {@link ScalarIndexParams} instance for a Bitmap index. */
2951
public ScalarIndexParams build() {
30-
return ScalarIndexParams.create(INDEX_TYPE);
52+
Map<String, Object> params = new HashMap<>();
53+
if (shardId != null) {
54+
params.put("shard_id", shardId);
55+
}
56+
57+
if (params.isEmpty()) {
58+
return ScalarIndexParams.create(INDEX_TYPE);
59+
}
60+
61+
String json = JsonUtils.toJson(params);
62+
return ScalarIndexParams.create(INDEX_TYPE, json);
3163
}
3264
}
3365
}

python/python/lance/indices/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ class IndexFileVersion(str, Enum):
3232
class SupportedDistributedIndices(str, Enum):
3333
# Scalar index types
3434
BTREE = "BTREE"
35+
BITMAP = "BITMAP"
3536
INVERTED = "INVERTED"
3637

3738
# Precise vector index types supported by distributed merge

python/python/tests/test_scalar_index.py

Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3100,6 +3100,45 @@ def generate_coherent_text():
31003100
return ds
31013101

31023102

3103+
def generate_multi_fragment_bitmap_dataset(
3104+
tmp_path, num_fragments=4, rows_per_fragment=40
3105+
):
3106+
"""
3107+
Generate a multi-fragment dataset with a low-cardinality integer column
3108+
suitable for distributed bitmap index tests.
3109+
"""
3110+
3111+
def make_mock_bitmap_table(start_id: int) -> pa.Table:
3112+
ids = list(range(start_id, start_id + rows_per_fragment))
3113+
return pa.table(
3114+
{
3115+
"id": pa.array(ids, type=pa.int32()),
3116+
"category": pa.array([row_id % 5 for row_id in ids], type=pa.int32()),
3117+
}
3118+
)
3119+
3120+
ds = lance.write_dataset(
3121+
make_mock_bitmap_table(0),
3122+
tmp_path,
3123+
max_rows_per_file=rows_per_fragment,
3124+
)
3125+
3126+
for fragment_idx in range(1, num_fragments):
3127+
ds = lance.write_dataset(
3128+
make_mock_bitmap_table(fragment_idx * rows_per_fragment),
3129+
tmp_path,
3130+
mode="append",
3131+
max_rows_per_file=rows_per_fragment,
3132+
)
3133+
3134+
fragments = ds.get_fragments()
3135+
assert len(fragments) == num_fragments, (
3136+
f"Expected {num_fragments} fragments, got {len(fragments)}"
3137+
)
3138+
3139+
return ds
3140+
3141+
31033142
# ============================================================================
31043143
# Distributed FTS Index Unit Tests
31053144
# ============================================================================
@@ -3777,6 +3816,122 @@ def test_distribute_btree_index_build(tmp_path):
37773816
)
37783817

37793818

3819+
def _assert_committed_distributed_bitmap_index(ds, index_id, index_name, fragment_ids):
3820+
ds.merge_index_metadata(index_id, index_type="BITMAP")
3821+
3822+
from lance.dataset import Index
3823+
3824+
field_id = ds.schema.get_field_index("category")
3825+
index = Index(
3826+
uuid=index_id,
3827+
name=index_name,
3828+
fields=[field_id],
3829+
dataset_version=ds.version,
3830+
fragment_ids=set(fragment_ids),
3831+
index_version=0,
3832+
)
3833+
create_index_op = lance.LanceOperation.CreateIndex(
3834+
new_indices=[index],
3835+
removed_indices=[],
3836+
)
3837+
lance.LanceDataset.commit(
3838+
ds.uri,
3839+
create_index_op,
3840+
read_version=ds.version,
3841+
)
3842+
reopened_ds = lance.dataset(ds.uri)
3843+
3844+
stats = reopened_ds.stats.index_stats(index_name)
3845+
assert stats["index_type"] == "Bitmap"
3846+
3847+
filter_expr = "category = 3"
3848+
without_index = reopened_ds.scanner(
3849+
filter=filter_expr,
3850+
columns=["id", "category"],
3851+
use_scalar_index=False,
3852+
).to_table()
3853+
with_index = reopened_ds.scanner(
3854+
filter=filter_expr,
3855+
columns=["id", "category"],
3856+
use_scalar_index=True,
3857+
).to_table()
3858+
3859+
assert with_index.num_rows == without_index.num_rows
3860+
assert with_index["id"].to_pylist() == without_index["id"].to_pylist()
3861+
assert set(with_index["category"].to_pylist()) == {3}
3862+
3863+
explain = reopened_ds.scanner(
3864+
filter=filter_expr,
3865+
use_scalar_index=True,
3866+
).explain_plan()
3867+
assert "ScalarIndexQuery" in explain
3868+
3869+
empty_without_index = reopened_ds.scanner(
3870+
filter="category = 99",
3871+
use_scalar_index=False,
3872+
).to_table()
3873+
empty_with_index = reopened_ds.scanner(
3874+
filter="category = 99",
3875+
use_scalar_index=True,
3876+
).to_table()
3877+
assert empty_with_index.num_rows == empty_without_index.num_rows == 0
3878+
3879+
3880+
def test_distributed_bitmap_index_build(tmp_path):
3881+
ds = generate_multi_fragment_bitmap_dataset(
3882+
tmp_path / "bitmap_dist.lance", num_fragments=4, rows_per_fragment=40
3883+
)
3884+
3885+
index_id = str(uuid.uuid4())
3886+
index_name = "bitmap_multiple_fragment_idx"
3887+
fragments = ds.get_fragments()
3888+
fragment_ids = [fragment.fragment_id for fragment in fragments]
3889+
fragment_groups = [
3890+
fragment_ids[idx : idx + 2] for idx in range(0, len(fragment_ids), 2)
3891+
]
3892+
assert len(fragment_groups) >= 2
3893+
3894+
for shard_id, fragment_group in enumerate(fragment_groups):
3895+
ds.create_scalar_index(
3896+
column="category",
3897+
index_type=IndexConfig(
3898+
index_type="bitmap",
3899+
parameters={"shard_id": shard_id},
3900+
),
3901+
name=index_name,
3902+
replace=False,
3903+
index_uuid=index_id,
3904+
fragment_ids=fragment_group,
3905+
)
3906+
3907+
_assert_committed_distributed_bitmap_index(ds, index_id, index_name, fragment_ids)
3908+
3909+
3910+
def test_distributed_bitmap_index_build_single_fragment_shards(tmp_path):
3911+
ds = generate_multi_fragment_bitmap_dataset(
3912+
tmp_path / "bitmap_single_fragment_dist.lance",
3913+
num_fragments=4,
3914+
rows_per_fragment=40,
3915+
)
3916+
3917+
index_id = str(uuid.uuid4())
3918+
index_name = "bitmap_single_fragment_idx"
3919+
fragment_ids = [fragment.fragment_id for fragment in ds.get_fragments()]
3920+
assert len(fragment_ids) >= 2
3921+
3922+
for fragment_id in fragment_ids:
3923+
ds.create_scalar_index(
3924+
column="category",
3925+
index_type="BITMAP",
3926+
name=index_name,
3927+
replace=False,
3928+
index_uuid=index_id,
3929+
fragment_ids=[fragment_id],
3930+
)
3931+
3932+
_assert_committed_distributed_bitmap_index(ds, index_id, index_name, fragment_ids)
3933+
3934+
37803935
def test_btree_fragment_ids_parameter_validation(tmp_path):
37813936
"""
37823937
Test validation of fragment_ids parameter for B-tree indices.

0 commit comments

Comments
 (0)