Skip to content

Commit f5b9dec

Browse files
authored
[index] support lumina index in python (#7579)
Add Lumina vector index read/search support to paimon-python.

Usage example:

```python
from pypaimon import CatalogFactory

catalog = CatalogFactory.create({'warehouse': '/path/to/warehouse'})
table = catalog.get_table('default.my_table')

# Step 1: Vector search — find top-5 nearest neighbors
builder = table.new_vector_search_builder()
builder.with_vector_column('embedding')
builder.with_query_vector([0.1, 0.2, 0.3, 0.4])
builder.with_limit(5)
result = builder.execute_local()

# Step 2: Read the matching rows
read_builder = table.new_read_builder()
scan = read_builder.new_scan().with_global_index_result(result)
plan = scan.plan()
arrow_table = read_builder.new_read().to_arrow(plan.splits())
print(arrow_table.to_pandas())
```
1 parent 99bb804 commit f5b9dec

22 files changed

Lines changed: 1216 additions & 5 deletions

.github/workflows/paimon-python-checks.yml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ on:
3333
env:
3434
JDK_VERSION: 8
3535
MAVEN_OPTS: -Dmaven.wagon.httpconnectionManager.ttlSeconds=30 -Dmaven.wagon.http.retryHandler.requestSentEnabled=true
36+
LUMINA_DATA_VERSION: 0.1.0
3637

3738

3839
concurrency:
@@ -127,10 +128,12 @@ jobs:
127128
python -m pip install --upgrade pip==21.3.1
128129
python --version
129130
python -m pip install --no-cache-dir pyroaring readerwriterlock==1.0.9 'fsspec==2021.10.1' 'cachetools==4.2.4' 'ossfs==2021.8.0' pyarrow==6.0.1 pandas==1.1.5 'polars==0.9.12' 'fastavro==1.4.7' zstandard==0.19.0 dataclasses==0.8.0 flake8 pytest py4j==0.10.9.9 requests parameterized==0.8.1 2>&1 >/dev/null
131+
python -m pip install 'lumina-data>=${{ env.LUMINA_DATA_VERSION }}' -i https://pypi.org/simple/
130132
else
131133
python -m pip install --upgrade pip
132134
pip install torch --index-url https://download.pytorch.org/whl/cpu
133135
python -m pip install pyroaring readerwriterlock==1.0.9 fsspec==2024.3.1 cachetools==5.3.3 ossfs==2023.12.0 ray==2.48.0 fastavro==1.11.1 pyarrow==16.0.0 zstandard==0.24.0 polars==1.32.0 duckdb==1.3.2 numpy==1.24.3 pandas==2.0.3 pylance==0.39.0 cramjam flake8==4.0.1 pytest~=7.0 py4j==0.10.9.9 requests parameterized==0.9.0
136+
python -m pip install 'lumina-data>=${{ env.LUMINA_DATA_VERSION }}' -i https://pypi.org/simple/
134137
if python -c "import sys; sys.exit(0 if sys.version_info >= (3, 11) else 1)"; then
135138
python -m pip install vortex-data
136139
fi
@@ -190,6 +193,7 @@ jobs:
190193
python -m pip install --upgrade pip
191194
pip install torch --index-url https://download.pytorch.org/whl/cpu
192195
python -m pip install pyroaring readerwriterlock==1.0.9 fsspec==2024.3.1 cachetools==5.3.3 ossfs==2023.12.0 ray==2.48.0 fastavro==1.11.1 pyarrow==16.0.0 zstandard==0.24.0 polars==1.32.0 duckdb==1.3.2 numpy==1.24.3 pandas==2.0.3 pylance==0.39.0 flake8==4.0.1 pytest~=7.0 py4j==0.10.9.9 requests parameterized==0.9.0
196+
python -m pip install 'lumina-data>=${{ env.LUMINA_DATA_VERSION }}' -i https://pypi.org/simple/
193197
- name: Run lint-python.sh
194198
shell: bash
195199
run: |
@@ -288,6 +292,7 @@ jobs:
288292
fastavro==1.11.1 pyarrow==16.0.0 zstandard==0.24.0 polars==1.32.0 duckdb==1.3.2 \
289293
numpy==1.24.3 pandas==2.0.3 cramjam pytest~=7.0 py4j==0.10.9.9 requests \
290294
parameterized==0.9.0 packaging
295+
python -m pip install 'lumina-data>=${{ env.LUMINA_DATA_VERSION }}' -i https://pypi.org/simple/
291296
- name: Test Ray version compatibility
292297
run: |
293298
cd paimon-python

paimon-lumina/pom.xml

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,22 @@ under the License.
7777
<scope>test</scope>
7878
</dependency>
7979

80+
<dependency>
81+
<groupId>org.apache.paimon</groupId>
82+
<artifactId>paimon-common</artifactId>
83+
<version>${project.version}</version>
84+
<type>test-jar</type>
85+
<scope>test</scope>
86+
</dependency>
87+
88+
<dependency>
89+
<groupId>org.apache.paimon</groupId>
90+
<artifactId>paimon-core</artifactId>
91+
<version>${project.version}</version>
92+
<type>test-jar</type>
93+
<scope>test</scope>
94+
</dependency>
95+
8096
<dependency>
8197
<groupId>org.apache.paimon</groupId>
8298
<artifactId>paimon-format</artifactId>
Lines changed: 215 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,215 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.lumina.index;
20+
21+
import org.apache.paimon.data.BinaryRow;
22+
import org.apache.paimon.data.GenericArray;
23+
import org.apache.paimon.data.GenericRow;
24+
import org.apache.paimon.fs.FileIOFinder;
25+
import org.apache.paimon.fs.Path;
26+
import org.apache.paimon.fs.local.LocalFileIO;
27+
import org.apache.paimon.globalindex.GlobalIndexBuilderUtils;
28+
import org.apache.paimon.globalindex.GlobalIndexSingletonWriter;
29+
import org.apache.paimon.globalindex.ResultEntry;
30+
import org.apache.paimon.index.IndexFileMeta;
31+
import org.apache.paimon.io.CompactIncrement;
32+
import org.apache.paimon.io.DataIncrement;
33+
import org.apache.paimon.options.Options;
34+
import org.apache.paimon.schema.Schema;
35+
import org.apache.paimon.schema.SchemaManager;
36+
import org.apache.paimon.schema.SchemaUtils;
37+
import org.apache.paimon.schema.TableSchema;
38+
import org.apache.paimon.table.AppendOnlyFileStoreTable;
39+
import org.apache.paimon.table.CatalogEnvironment;
40+
import org.apache.paimon.table.sink.BatchTableCommit;
41+
import org.apache.paimon.table.sink.BatchTableWrite;
42+
import org.apache.paimon.table.sink.BatchWriteBuilder;
43+
import org.apache.paimon.table.sink.CommitMessage;
44+
import org.apache.paimon.table.sink.CommitMessageImpl;
45+
import org.apache.paimon.types.ArrayType;
46+
import org.apache.paimon.types.DataField;
47+
import org.apache.paimon.types.DataType;
48+
import org.apache.paimon.types.DataTypes;
49+
import org.apache.paimon.types.FloatType;
50+
import org.apache.paimon.types.RowType;
51+
import org.apache.paimon.utils.Range;
52+
53+
import org.aliyun.lumina.Lumina;
54+
import org.aliyun.lumina.LuminaException;
55+
import org.junit.jupiter.api.BeforeAll;
56+
import org.junit.jupiter.api.BeforeEach;
57+
import org.junit.jupiter.api.Test;
58+
import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
59+
60+
import java.nio.file.Files;
61+
import java.nio.file.Paths;
62+
import java.util.Collections;
63+
import java.util.List;
64+
65+
import static org.apache.paimon.CoreOptions.DATA_EVOLUTION_ENABLED;
66+
import static org.apache.paimon.CoreOptions.GLOBAL_INDEX_ENABLED;
67+
import static org.apache.paimon.CoreOptions.PATH;
68+
import static org.apache.paimon.CoreOptions.ROW_TRACKING_ENABLED;
69+
import static org.assertj.core.api.Assertions.assertThat;
70+
import static org.junit.jupiter.api.Assumptions.assumeTrue;
71+
72+
/**
73+
* Mixed language E2E test for Java Lumina vector index building and Python reading.
74+
*
75+
* <p>Java writes data and builds a Lumina vector index, then Python reads and searches it.
76+
*/
77+
public class JavaPyLuminaE2ETest {
78+
79+
@BeforeAll
80+
public static void checkNativeLibrary() {
81+
if (!Lumina.isLibraryLoaded()) {
82+
try {
83+
Lumina.loadLibrary();
84+
} catch (LuminaException e) {
85+
assumeTrue(false, "Lumina native library not available: " + e.getMessage());
86+
}
87+
}
88+
}
89+
90+
java.nio.file.Path tempDir = Paths.get("../paimon-python/pypaimon/tests/e2e").toAbsolutePath();
91+
92+
protected Path warehouse;
93+
94+
@BeforeEach
95+
public void before() throws Exception {
96+
if (!Files.exists(tempDir.resolve("warehouse"))) {
97+
Files.createDirectories(tempDir.resolve("warehouse"));
98+
}
99+
warehouse = new Path("file://" + tempDir.resolve("warehouse"));
100+
}
101+
102+
@Test
103+
@EnabledIfSystemProperty(named = "run.e2e.tests", matches = "true")
104+
public void testLuminaVectorIndexWrite() throws Exception {
105+
String tableName = "test_lumina_vector";
106+
Path tablePath = new Path(warehouse.toString() + "/default.db/" + tableName);
107+
108+
int dimension = 4;
109+
110+
RowType rowType =
111+
RowType.of(
112+
new DataType[] {DataTypes.INT(), new ArrayType(new FloatType())},
113+
new String[] {"id", "embedding"});
114+
115+
Options options = new Options();
116+
options.set(PATH, tablePath.toString());
117+
options.set(ROW_TRACKING_ENABLED, true);
118+
options.set(DATA_EVOLUTION_ENABLED, true);
119+
options.set(GLOBAL_INDEX_ENABLED, true);
120+
options.setString(LuminaVectorIndexOptions.DIMENSION.key(), String.valueOf(dimension));
121+
options.setString(LuminaVectorIndexOptions.DISTANCE_METRIC.key(), "l2");
122+
options.setString(LuminaVectorIndexOptions.ENCODING_TYPE.key(), "rawf32");
123+
124+
TableSchema tableSchema =
125+
SchemaUtils.forceCommit(
126+
new SchemaManager(LocalFileIO.create(), tablePath),
127+
new Schema(
128+
rowType.getFields(),
129+
Collections.emptyList(),
130+
Collections.emptyList(),
131+
options.toMap(),
132+
""));
133+
134+
AppendOnlyFileStoreTable table =
135+
new AppendOnlyFileStoreTable(
136+
FileIOFinder.find(tablePath),
137+
tablePath,
138+
tableSchema,
139+
CatalogEnvironment.empty());
140+
141+
// Test vectors: 6 vectors of dimension 4
142+
float[][] vectors =
143+
new float[][] {
144+
new float[] {1.0f, 0.0f, 0.0f, 0.0f},
145+
new float[] {0.9f, 0.1f, 0.0f, 0.0f},
146+
new float[] {0.0f, 1.0f, 0.0f, 0.0f},
147+
new float[] {0.0f, 0.0f, 1.0f, 0.0f},
148+
new float[] {0.0f, 0.0f, 0.0f, 1.0f},
149+
new float[] {0.95f, 0.05f, 0.0f, 0.0f}
150+
};
151+
152+
// Write data rows
153+
BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
154+
try (BatchTableWrite write = writeBuilder.newWrite();
155+
BatchTableCommit commit = writeBuilder.newCommit()) {
156+
for (int i = 0; i < vectors.length; i++) {
157+
write.write(GenericRow.of(i, new GenericArray(vectors[i])));
158+
}
159+
commit.commit(write.prepareCommit());
160+
}
161+
162+
// Build Lumina vector index on "embedding" column
163+
DataField embeddingField = table.rowType().getField("embedding");
164+
Options indexOptions = table.coreOptions().toConfiguration();
165+
LuminaVectorIndexOptions luminaOptions = new LuminaVectorIndexOptions(indexOptions);
166+
167+
GlobalIndexSingletonWriter writer =
168+
(GlobalIndexSingletonWriter)
169+
GlobalIndexBuilderUtils.createIndexWriter(
170+
table,
171+
LuminaVectorGlobalIndexerFactory.IDENTIFIER,
172+
embeddingField,
173+
indexOptions);
174+
175+
// Write vectors to index
176+
for (float[] vec : vectors) {
177+
writer.write(vec);
178+
}
179+
180+
List<ResultEntry> entries = writer.finish();
181+
assertThat(entries).hasSize(1);
182+
assertThat(entries.get(0).rowCount()).isEqualTo(vectors.length);
183+
184+
Range rowRange = new Range(0, vectors.length - 1);
185+
List<IndexFileMeta> indexFiles =
186+
GlobalIndexBuilderUtils.toIndexFileMetas(
187+
table.fileIO(),
188+
table.store().pathFactory().globalIndexFileFactory(),
189+
table.coreOptions(),
190+
rowRange,
191+
embeddingField.id(),
192+
LuminaVectorGlobalIndexerFactory.IDENTIFIER,
193+
entries);
194+
195+
// Commit the index
196+
DataIncrement dataIncrement = DataIncrement.indexIncrement(indexFiles);
197+
CommitMessage message =
198+
new CommitMessageImpl(
199+
BinaryRow.EMPTY_ROW,
200+
0,
201+
null,
202+
dataIncrement,
203+
CompactIncrement.emptyIncrement());
204+
try (BatchTableCommit commit = writeBuilder.newCommit()) {
205+
commit.commit(Collections.singletonList(message));
206+
}
207+
208+
// Verify index was committed
209+
List<org.apache.paimon.manifest.IndexManifestEntry> indexEntries =
210+
table.indexManifestFileReader().read(table.latestSnapshot().get().indexManifest());
211+
assertThat(indexEntries).hasSize(1);
212+
assertThat(indexEntries.get(0).indexFile().indexType())
213+
.isEqualTo(LuminaVectorGlobalIndexerFactory.IDENTIFIER);
214+
}
215+
}

paimon-python/dev/requirements-dev.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,3 +25,5 @@ pytest~=7.0
2525
ray>=2.10.0
2626
requests
2727
parameterized
28+
# Lumina vector search (optional, for lumina index tests)
29+
lumina-data>=0.1.0

paimon-python/dev/run_mixed_tests.sh

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,30 @@ run_tantivy_fulltext_test() {
266266
fi
267267
}
268268

269+
# Function to run Lumina vector index test (Java write index, Python read and search).
#
# Runs the Java test that writes the table and builds the index, then the Python
# test that reads and searches it.
# Reads:   PROJECT_ROOT, PAIMON_PYTHON_DIR, and the YELLOW/GREEN/RED/NC color variables.
# Returns: 0 when both tests pass, 1 when either test fails or a directory
#          cannot be entered.
run_lumina_vector_test() {
    echo -e "${YELLOW}=== Step 9: Running Lumina Vector Index Test (Java Write, Python Read) ===${NC}"

    # The caller invokes this function inside an `if`, where errexit is not in
    # effect, so a failed cd must be checked explicitly; otherwise Maven would
    # run from whatever directory happened to be current.
    if ! cd "$PROJECT_ROOT"; then
        echo -e "${RED}✗ Cannot change directory to: $PROJECT_ROOT${NC}"
        return 1
    fi

    echo "Running Maven test for JavaPyLuminaE2ETest.testLuminaVectorIndexWrite..."
    if mvn test -Dtest=org.apache.paimon.lumina.index.JavaPyLuminaE2ETest#testLuminaVectorIndexWrite -pl paimon-lumina -q -Drun.e2e.tests=true; then
        echo -e "${GREEN}✓ Java test completed successfully${NC}"
    else
        echo -e "${RED}✗ Java test failed${NC}"
        return 1
    fi

    # Same reasoning as above: never run pytest from the wrong directory.
    if ! cd "$PAIMON_PYTHON_DIR"; then
        echo -e "${RED}✗ Cannot change directory to: $PAIMON_PYTHON_DIR${NC}"
        return 1
    fi

    echo "Running Python test for JavaPyReadWriteTest.test_read_lumina_vector_index..."
    if python -m pytest java_py_read_write_test.py::JavaPyReadWriteTest::test_read_lumina_vector_index -v; then
        echo -e "${GREEN}✓ Python test completed successfully${NC}"
        return 0
    else
        echo -e "${RED}✗ Python test failed${NC}"
        return 1
    fi
}
292+
269293
# Main execution
270294
main() {
271295
local java_write_result=0
@@ -276,6 +300,7 @@ main() {
276300
local btree_index_result=0
277301
local compressed_text_result=0
278302
local tantivy_fulltext_result=0
303+
local lumina_vector_result=0
279304

280305
# Detect Python version
281306
PYTHON_VERSION=$(python -c "import sys; print(f'{sys.version_info.major}.{sys.version_info.minor}')" 2>/dev/null || echo "unknown")
@@ -351,6 +376,13 @@ main() {
351376

352377
echo ""
353378

379+
# Run Lumina vector index test (Java write, Python read)
380+
if ! run_lumina_vector_test; then
381+
lumina_vector_result=1
382+
fi
383+
384+
echo ""
385+
354386
echo -e "${YELLOW}=== Test Results Summary ===${NC}"
355387

356388
if [[ $java_write_result -eq 0 ]]; then
@@ -401,12 +433,18 @@ main() {
401433
echo -e "${RED}✗ Tantivy Full-Text Index Test (Java Write, Python Read): FAILED${NC}"
402434
fi
403435

436+
if [[ $lumina_vector_result -eq 0 ]]; then
437+
echo -e "${GREEN}✓ Lumina Vector Index Test (Java Write, Python Read): PASSED${NC}"
438+
else
439+
echo -e "${RED}✗ Lumina Vector Index Test (Java Write, Python Read): FAILED${NC}"
440+
fi
441+
404442
echo ""
405443

406444
# Clean up warehouse directory after all tests
407445
cleanup_warehouse
408446

409-
if [[ $java_write_result -eq 0 && $python_read_result -eq 0 && $python_write_result -eq 0 && $java_read_result -eq 0 && $pk_dv_result -eq 0 && $btree_index_result -eq 0 && $compressed_text_result -eq 0 && $tantivy_fulltext_result -eq 0 ]]; then
447+
if [[ $java_write_result -eq 0 && $python_read_result -eq 0 && $python_write_result -eq 0 && $java_read_result -eq 0 && $pk_dv_result -eq 0 && $btree_index_result -eq 0 && $compressed_text_result -eq 0 && $tantivy_fulltext_result -eq 0 && $lumina_vector_result -eq 0 ]]; then
410448
echo -e "${GREEN}🎉 All tests passed! Java-Python interoperability verified.${NC}"
411449
return 0
412450
else

paimon-python/pypaimon/globalindex/global_index_scanner.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,13 +35,11 @@ class GlobalIndexScanner:
3535

3636
def __init__(
3737
self,
38-
options: dict,
3938
fields: list,
4039
file_io,
4140
index_path: str,
4241
index_files: Collection['IndexFileMeta']
4342
):
44-
self._options = options
4543
self._evaluator = self._create_evaluator(fields, file_io, index_path, index_files)
4644

4745
def _create_evaluator(self, fields, file_io, index_path, index_files):
@@ -89,7 +87,6 @@ def create(table, index_files=None, partition_filter=None, predicate=None) -> Op
8987
if len(index_files) == 0:
9088
return None
9189
return GlobalIndexScanner(
92-
options=table.table_schema.options,
9390
fields=table.fields,
9491
file_io=table.file_io,
9592
index_path=table.path_factory().global_index_path_factory().index_path(),
@@ -122,7 +119,6 @@ def index_file_filter(entry):
122119
if len(scanned_index_files) == 0:
123120
return None
124121
return GlobalIndexScanner(
125-
options=table.table_schema.options,
126122
fields=table.fields,
127123
file_io=table.file_io,
128124
index_path=table.path_factory().global_index_path_factory().index_path(),

0 commit comments

Comments
 (0)