Skip to content

Commit 80e07d0

Browse files
authored
feat(java): support list versions and checkout version in Dataset (#3945)
close #3944 --------- Co-authored-by: majin.nathan <majin.nathan@bytedance.com>
1 parent 1a0d30c commit 80e07d0

4 files changed

Lines changed: 264 additions & 32 deletions

File tree

java/core/lance-jni/src/blocking_dataset.rs

Lines changed: 125 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,8 @@ use lance::dataset::builder::DatasetBuilder;
3434
use lance::dataset::statistics::{DataStatistics, DatasetStatisticsExt};
3535
use lance::dataset::transaction::Operation;
3636
use lance::dataset::{
37-
ColumnAlteration, Dataset, NewColumnTransform, ProjectionRequest, ReadParams, WriteParams,
37+
ColumnAlteration, Dataset, NewColumnTransform, ProjectionRequest, ReadParams, Version,
38+
WriteParams,
3839
};
3940
use lance::io::{ObjectStore, ObjectStoreParams};
4041
use lance::table::format::Fragment;
@@ -152,6 +153,25 @@ impl BlockingDataset {
152153
Ok(version)
153154
}
154155

156+
/// Returns the versions reported by the wrapped dataset's `versions()` call.
///
/// The asynchronous call is driven to completion with `RT.block_on`, so this
/// method blocks the calling thread until the result is available.
///
/// # Errors
/// Propagates any error returned by `versions()`.
pub fn list_versions(&self) -> Result<Vec<Version>> {
    let history: Vec<Version> = RT.block_on(self.inner.versions())?;
    Ok(history)
}
160+
161+
/// Returns the `Version` reported by the wrapped dataset's `version()` call.
///
/// The underlying call is synchronous and does not fail here; the value is
/// wrapped in `Ok` only to match the `Result`-returning style of the
/// neighbouring methods.
pub fn version(&self) -> Result<Version> {
    let current: Version = self.inner.version();
    Ok(current)
}
164+
165+
pub fn checkout_version(&mut self, version: u64) -> Result<Self> {
166+
let inner = RT.block_on(self.inner.checkout_version(version))?;
167+
Ok(Self { inner })
168+
}
169+
170+
/// Runs the wrapped dataset's `checkout_latest` call to completion.
///
/// The asynchronous call is driven with `RT.block_on`; whatever value it
/// yields on success is discarded.
///
/// # Errors
/// Propagates any error returned by the underlying `checkout_latest` call.
pub fn checkout_latest(&mut self) -> Result<()> {
    let pending = self.inner.checkout_latest();
    RT.block_on(pending)?;
    Ok(())
}
174+
155175
pub fn count_rows(&self, filter: Option<String>) -> Result<usize> {
156176
let rows = RT.block_on(self.inner.count_rows(filter))?;
157177
Ok(rows)
@@ -326,6 +346,41 @@ impl IntoJava for BlockingDataset {
326346
}
327347
}
328348

349+
impl IntoJava for Version {
350+
fn into_java<'a>(self, env: &mut JNIEnv<'a>) -> Result<JObject<'a>> {
351+
let timestamp_str = self.timestamp.to_rfc3339();
352+
let jtimestamp = env.new_string(timestamp_str)?;
353+
let zdt = env
354+
.call_static_method(
355+
"java/time/ZonedDateTime",
356+
"parse",
357+
"(Ljava/lang/CharSequence;)Ljava/time/ZonedDateTime;",
358+
&[JValue::Object(&jtimestamp)],
359+
)?
360+
.l()?;
361+
362+
let jmap = env.new_object("java/util/TreeMap", "()V", &[])?;
363+
let map = JMap::from_env(env, &jmap)?;
364+
365+
for (k, v) in self.metadata {
366+
let jkey = env.new_string(k)?;
367+
let jval = env.new_string(v)?;
368+
map.put(env, &jkey, &jval).expect("ERROR: calling jmap.put");
369+
}
370+
371+
let java_version = env.new_object(
372+
"com/lancedb/lance/Version",
373+
"(JLjava/time/ZonedDateTime;Ljava/util/TreeMap;)V",
374+
&[
375+
JValue::Long(self.version as i64),
376+
JValue::Object(&zdt),
377+
JValue::Object(&jmap),
378+
],
379+
)?;
380+
Ok(java_version)
381+
}
382+
}
383+
329384
fn attach_native_dataset<'local>(
330385
env: &mut JNIEnv<'local>,
331386
dataset: BlockingDataset,
@@ -658,31 +713,87 @@ fn inner_uri<'local>(env: &mut JNIEnv<'local>, java_dataset: JObject) -> Result<
658713
}
659714

660715
#[no_mangle]
661-
pub extern "system" fn Java_com_lancedb_lance_Dataset_nativeVersion(
716+
pub extern "system" fn Java_com_lancedb_lance_Dataset_nativeListVersions<'local>(
717+
mut env: JNIEnv<'local>,
718+
java_dataset: JObject,
719+
) -> JObject<'local> {
720+
ok_or_throw!(env, inner_list_versions(&mut env, java_dataset))
721+
}
722+
723+
fn inner_list_versions<'local>(
724+
env: &mut JNIEnv<'local>,
725+
java_dataset: JObject,
726+
) -> Result<JObject<'local>> {
727+
let versions = {
728+
let dataset_guard =
729+
unsafe { env.get_rust_field::<_, _, BlockingDataset>(java_dataset, NATIVE_DATASET) }?;
730+
dataset_guard.list_versions()?
731+
};
732+
let array_list = env.new_object("java/util/ArrayList", "()V", &[])?;
733+
734+
versions
735+
.into_iter()
736+
.map(|inner_ver| inner_ver.into_java(env))
737+
.collect::<Result<Vec<_>>>()?
738+
.into_iter()
739+
.try_for_each(|java_ver| -> Result<()> {
740+
env.call_method(
741+
&array_list,
742+
"add",
743+
"(Ljava/lang/Object;)Z",
744+
&[JValue::Object(&java_ver)],
745+
)?;
746+
Ok(())
747+
})?;
748+
Ok(array_list)
749+
}
750+
751+
#[no_mangle]
752+
pub extern "system" fn Java_com_lancedb_lance_Dataset_nativeGetVersion<'local>(
753+
mut env: JNIEnv<'local>,
754+
java_dataset: JObject,
755+
) -> JObject<'local> {
756+
ok_or_throw!(env, inner_get_version(&mut env, java_dataset))
757+
}
758+
759+
/// Reads the dataset's current `Version` and converts it to its Java
/// counterpart via `IntoJava`.
///
/// # Errors
/// Returns an error if the native dataset cannot be read from
/// `java_dataset` or if the conversion to a Java object fails.
fn inner_get_version<'local>(
    env: &mut JNIEnv<'local>,
    java_dataset: JObject,
) -> Result<JObject<'local>> {
    // The guard lives only inside this block, so it is released before `env`
    // is handed to `into_java`.
    let current = {
        let guard =
            unsafe { env.get_rust_field::<_, _, BlockingDataset>(java_dataset, NATIVE_DATASET) }?;
        guard.version()?
    };
    current.into_java(env)
}
770+
771+
#[no_mangle]
772+
pub extern "system" fn Java_com_lancedb_lance_Dataset_nativeGetLatestVersionId(
662773
mut env: JNIEnv,
663774
java_dataset: JObject,
664775
) -> jlong {
665-
ok_or_throw_with_return!(env, inner_version(&mut env, java_dataset), -1) as jlong
776+
ok_or_throw_with_return!(env, inner_latest_version_id(&mut env, java_dataset), -1) as jlong
666777
}
667778

668-
fn inner_version(env: &mut JNIEnv, java_dataset: JObject) -> Result<u64> {
779+
fn inner_latest_version_id(env: &mut JNIEnv, java_dataset: JObject) -> Result<u64> {
669780
let dataset_guard =
670781
unsafe { env.get_rust_field::<_, _, BlockingDataset>(java_dataset, NATIVE_DATASET) }?;
671-
Ok(dataset_guard.inner.version().version)
782+
dataset_guard.latest_version()
672783
}
673784

674785
#[no_mangle]
675-
pub extern "system" fn Java_com_lancedb_lance_Dataset_nativeLatestVersion(
786+
pub extern "system" fn Java_com_lancedb_lance_Dataset_nativeCheckoutLatest(
676787
mut env: JNIEnv,
677788
java_dataset: JObject,
678-
) -> jlong {
679-
ok_or_throw_with_return!(env, inner_latest_version(&mut env, java_dataset), -1) as jlong
789+
) {
790+
ok_or_throw_without_return!(env, inner_checkout_latest(&mut env, java_dataset));
680791
}
681792

682-
fn inner_latest_version(env: &mut JNIEnv, java_dataset: JObject) -> Result<u64> {
683-
let dataset_guard =
793+
fn inner_checkout_latest(env: &mut JNIEnv, java_dataset: JObject) -> Result<()> {
794+
let mut dataset_guard =
684795
unsafe { env.get_rust_field::<_, _, BlockingDataset>(java_dataset, NATIVE_DATASET) }?;
685-
dataset_guard.latest_version()
796+
dataset_guard.checkout_latest()
686797
}
687798

688799
#[no_mangle]
@@ -700,15 +811,12 @@ fn inner_checkout_version<'local>(
700811
version: jlong,
701812
) -> Result<JObject<'local>> {
702813
let new_dataset = {
703-
let dataset_guard =
814+
let mut dataset_guard =
704815
unsafe { env.get_rust_field::<_, _, BlockingDataset>(java_dataset, NATIVE_DATASET) }?;
705-
706-
let version_u64 = version as u64;
707-
RT.block_on(dataset_guard.inner.checkout_version(version_u64))?
816+
dataset_guard.checkout_version(version as u64)?
708817
};
709818

710-
let blocking_dataset = BlockingDataset { inner: new_dataset };
711-
blocking_dataset.into_java(env)
819+
new_dataset.into_java(env)
712820
}
713821

714822
#[no_mangle]

java/core/src/main/java/com/lancedb/lance/Dataset.java

Lines changed: 40 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -448,29 +448,62 @@ public String uri() {
448448

449449
private native String nativeUri();
450450

451+
/**
452+
* Get the currently checked out version id of the dataset
453+
*
454+
* @return the version id of the dataset
455+
*/
456+
public long version() {
457+
return getVersion().getId();
458+
}
459+
451460
/**
452461
* Gets the currently checked out version of the dataset.
453462
*
454463
* @return the version of the dataset
455464
*/
456-
public long version() {
465+
public Version getVersion() {
457466
try (LockManager.ReadLock readLock = lockManager.acquireReadLock()) {
458467
Preconditions.checkArgument(nativeDatasetHandle != 0, "Dataset is closed");
459-
return nativeVersion();
468+
return nativeGetVersion();
460469
}
461470
}
462471

463-
private native long nativeVersion();
472+
private native Version nativeGetVersion();
473+
474+
/**
475+
* Get the version history of the dataset.
476+
*
477+
* @return the version history of the dataset
478+
*/
479+
public List<Version> listVersions() {
480+
try (LockManager.ReadLock readLock = lockManager.acquireReadLock()) {
481+
Preconditions.checkArgument(nativeDatasetHandle != 0, "Dataset is closed");
482+
return nativeListVersions();
483+
}
484+
}
485+
486+
private native List<Version> nativeListVersions();
464487

465488
/** @return the latest version of the dataset. */
466489
public long latestVersion() {
467-
try (LockManager.ReadLock readLock = lockManager.acquireReadLock()) {
490+
try (LockManager.WriteLock writeLock = lockManager.acquireWriteLock()) {
491+
Preconditions.checkArgument(nativeDatasetHandle != 0, "Dataset is closed");
492+
return nativeGetLatestVersionId();
493+
}
494+
}
495+
496+
private native long nativeGetLatestVersionId();
497+
498+
/** Checkout the dataset to the latest version. */
499+
public void checkoutLatest() {
500+
try (LockManager.WriteLock writeLock = lockManager.acquireWriteLock()) {
468501
Preconditions.checkArgument(nativeDatasetHandle != 0, "Dataset is closed");
469-
return nativeLatestVersion();
502+
nativeCheckoutLatest();
470503
}
471504
}
472505

473-
private native long nativeLatestVersion();
506+
private native void nativeCheckoutLatest();
474507

475508
/**
476509
* Checks out a specific version of the dataset. If the version is already checked out, it returns
@@ -480,17 +513,9 @@ public long latestVersion() {
480513
* @return a new Dataset instance with the specified version checked out
481514
*/
482515
public Dataset checkoutVersion(long version) {
483-
if (version < 1) {
484-
throw new IllegalArgumentException("Version must be greater than 0");
485-
}
486-
516+
Preconditions.checkArgument(version > 0, "version number must be greater than 0");
487517
try (LockManager.ReadLock readLock = lockManager.acquireReadLock()) {
488518
Preconditions.checkArgument(nativeDatasetHandle != 0, "Dataset is closed");
489-
490-
if (this.version() == version) {
491-
return this;
492-
}
493-
494519
return nativeCheckoutVersion(version);
495520
}
496521
}
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package com.lancedb.lance;
15+
16+
import java.time.ZonedDateTime;
17+
import java.util.Objects;
18+
import java.util.SortedMap;
19+
import java.util.TreeMap;
20+
21+
public class Version {
22+
private final long id;
23+
private final ZonedDateTime dataTime;
24+
private final SortedMap<String, String> metadata;
25+
26+
public Version(long id, ZonedDateTime dataTime, TreeMap<String, String> metadata) {
27+
this.id = id;
28+
this.dataTime = dataTime;
29+
this.metadata = metadata;
30+
}
31+
32+
public ZonedDateTime getDataTime() {
33+
return dataTime;
34+
}
35+
36+
public SortedMap<String, String> getMetadata() {
37+
return metadata;
38+
}
39+
40+
public long getId() {
41+
return id;
42+
}
43+
44+
public String toString() {
45+
return "Version: " + id + ", dataTime: " + dataTime + ", metadata: " + metadata;
46+
}
47+
48+
@Override
49+
public boolean equals(Object o) {
50+
if (this == o) {
51+
return true;
52+
}
53+
if (o == null || getClass() != o.getClass()) {
54+
return false;
55+
}
56+
Version version = (Version) o;
57+
return id == version.id
58+
&& Objects.equals(dataTime, version.dataTime)
59+
&& Objects.equals(metadata, version.metadata);
60+
}
61+
62+
@Override
63+
public int hashCode() {
64+
return Objects.hash(id, dataTime, metadata);
65+
}
66+
}

0 commit comments

Comments
 (0)