Skip to content

Commit bdfb8fc

Browse files
authored
feat(java): brings transaction api to Java module and support project (lance-format#4219)
Close lance-format#4201
1 parent f86f196 commit bdfb8fc

8 files changed

Lines changed: 458 additions & 3 deletions

File tree

java/core/lance-jni/src/blocking_dataset.rs

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,10 @@ use jni::{objects::JObject, JNIEnv};
3636
use lance::dataset::builder::DatasetBuilder;
3737
use lance::dataset::refs::TagContents;
3838
use lance::dataset::statistics::{DataStatistics, DatasetStatisticsExt};
39-
use lance::dataset::transaction::Operation;
39+
use lance::dataset::transaction::{Operation, Transaction};
4040
use lance::dataset::{
41-
ColumnAlteration, Dataset, NewColumnTransform, ProjectionRequest, ReadParams, Version,
42-
WriteParams,
41+
ColumnAlteration, CommitBuilder, Dataset, NewColumnTransform, ProjectionRequest, ReadParams,
42+
Version, WriteParams,
4343
};
4444
use lance::io::{ObjectStore, ObjectStoreParams};
4545
use lance::table::format::Fragment;
@@ -239,6 +239,22 @@ impl BlockingDataset {
239239
Ok(())
240240
}
241241

242+
pub fn commit_transaction(
243+
&mut self,
244+
transaction: Transaction,
245+
write_params: HashMap<String, String>,
246+
) -> Result<Self> {
247+
let new_dataset = RT.block_on(
248+
CommitBuilder::new(Arc::new(self.clone().inner))
249+
.with_store_params(ObjectStoreParams {
250+
storage_options: Some(write_params),
251+
..Default::default()
252+
})
253+
.execute(transaction),
254+
)?;
255+
Ok(BlockingDataset { inner: new_dataset })
256+
}
257+
242258
pub fn replace_schema_metadata(&mut self, metadata: HashMap<String, String>) -> Result<()> {
243259
RT.block_on(self.inner.replace_schema_metadata(metadata))?;
244260
Ok(())

java/core/lance-jni/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ mod file_writer;
5959
mod fragment;
6060
mod schema;
6161
pub mod traits;
62+
mod transaction;
6263
pub mod utils;
6364

6465
pub use error::Error;
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
use crate::blocking_dataset::{BlockingDataset, NATIVE_DATASET};
2+
use crate::error::Result;
3+
use crate::traits::IntoJava;
4+
use crate::utils::to_rust_map;
5+
use arrow::datatypes::Schema;
6+
use arrow_schema::ffi::FFI_ArrowSchema;
7+
use jni::objects::{JMap, JObject, JString};
8+
use jni::JNIEnv;
9+
use lance::dataset::transaction::{Operation, Transaction};
10+
use lance_core::datatypes::Schema as LanceSchema;
11+
12+
#[no_mangle]
13+
pub extern "system" fn Java_com_lancedb_lance_Transaction_commitNative<'local>(
14+
mut env: JNIEnv<'local>,
15+
jtransaction: JObject,
16+
) -> JObject<'local> {
17+
ok_or_throw!(env, inner_commit_transaction(&mut env, jtransaction))
18+
}
19+
20+
fn inner_commit_transaction<'local>(
21+
env: &mut JNIEnv<'local>,
22+
java_tx: JObject,
23+
) -> Result<JObject<'local>> {
24+
let java_dataset: JObject = env
25+
.call_method(&java_tx, "dataset", "()Lcom/lancedb/lance/Dataset;", &[])?
26+
.l()?;
27+
let write_param_jobj = env
28+
.call_method(&java_tx, "writeParams", "()Ljava/util/Map;", &[])?
29+
.l()?;
30+
let write_param_jmap = JMap::from_env(env, &write_param_jobj)?;
31+
let write_param = to_rust_map(env, &write_param_jmap)?;
32+
let transaction = convert_to_rust_transaction(env, java_tx)?;
33+
let new_blocking_ds = {
34+
let mut dataset_guard =
35+
unsafe { env.get_rust_field::<_, _, BlockingDataset>(java_dataset, NATIVE_DATASET) }?;
36+
dataset_guard.commit_transaction(transaction, write_param)?
37+
};
38+
new_blocking_ds.into_java(env)
39+
}
40+
41+
fn convert_to_rust_transaction(env: &mut JNIEnv, java_tx: JObject) -> Result<Transaction> {
42+
let read_ver = env.call_method(&java_tx, "readVersion", "()J", &[])?.j()?;
43+
let uuid = env
44+
.call_method(&java_tx, "uuid", "()Ljava/lang/String;", &[])?
45+
.l()?;
46+
let uuid = JString::from(uuid);
47+
let uuid = env.get_string(&uuid)?.into();
48+
let op = env
49+
.call_method(
50+
&java_tx,
51+
"operation",
52+
"()Lcom/lancedb/lance/operation/Operation;",
53+
&[],
54+
)?
55+
.l()?;
56+
let op = convert_to_rust_operation(env, op)?;
57+
58+
let blobs_op = env
59+
.call_method(
60+
&java_tx,
61+
"blobsOperation",
62+
"()Lcom/lancedb/lance/operation/Operation;",
63+
&[],
64+
)?
65+
.l()?;
66+
let blobs_op = if blobs_op.is_null() {
67+
None
68+
} else {
69+
Some(convert_to_rust_operation(env, blobs_op)?)
70+
};
71+
72+
Ok(Transaction {
73+
read_version: read_ver as u64,
74+
uuid,
75+
operation: op,
76+
blobs_op,
77+
tag: None,
78+
})
79+
}
80+
81+
fn convert_to_rust_operation(env: &mut JNIEnv, java_operation: JObject) -> Result<Operation> {
82+
let name = env
83+
.call_method(&java_operation, "name", "()Ljava/lang/String;", &[])?
84+
.l()?;
85+
let name = JString::from(name);
86+
let name: String = env.get_string(&name)?.into();
87+
let op = match name.as_str() {
88+
"Project" => {
89+
let schema_ptr = env
90+
.call_method(&java_operation, "exportSchema", "()J", &[])?
91+
.j()?;
92+
log::info!("Schema pointer: {:?}", schema_ptr);
93+
let c_schema_ptr = schema_ptr as *mut FFI_ArrowSchema;
94+
let c_schema = unsafe { FFI_ArrowSchema::from_raw(c_schema_ptr) };
95+
let schema = Schema::try_from(&c_schema)?;
96+
97+
Operation::Project {
98+
schema: LanceSchema::try_from(&schema)
99+
.expect("Failed to convert from arrow schema to lance schema"),
100+
}
101+
}
102+
_ => unimplemented!(),
103+
};
104+
Ok(op)
105+
}

java/core/src/main/java/com/lancedb/lance/Dataset.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,16 @@ public static native Dataset commitOverwrite(
266266
List<FragmentMetadata> fragmentsMetadata,
267267
Map<String, String> storageOptions);
268268

269+
/**
270+
* Create a new transaction builder at current version for the dataset. The dataset itself will
271+
* not refresh after the transaction committed.
272+
*
273+
* @return A new instance of {@link Transaction.Builder} linked to the opened dataset.
274+
*/
275+
public Transaction.Builder newTransactionBuilder() {
276+
return new Transaction.Builder(this).readVersion(version());
277+
}
278+
269279
/**
270280
* Drop a Dataset.
271281
*
Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package com.lancedb.lance;
15+
16+
import com.lancedb.lance.operation.Operation;
17+
import com.lancedb.lance.operation.Project;
18+
19+
import org.apache.arrow.util.Preconditions;
20+
import org.apache.arrow.vector.types.pojo.Schema;
21+
22+
import java.util.HashMap;
23+
import java.util.Map;
24+
import java.util.UUID;
25+
26+
/**
27+
* Align with the Transaction struct in rust. The transaction won't commit the status to original
28+
* dataset. It will return a new dataset after committed.
29+
*/
30+
public class Transaction {
31+
32+
private final long readVersion;
33+
private final String uuid;
34+
private final Map<String, String> writeParams;
35+
// Mainly for JNI usage
36+
private final Dataset dataset;
37+
private final Operation operation;
38+
private final Operation blobOp;
39+
40+
private Transaction(
41+
Dataset dataset,
42+
long readVersion,
43+
String uuid,
44+
Operation operation,
45+
Operation blobOp,
46+
Map<String, String> writeParams) {
47+
this.dataset = dataset;
48+
this.readVersion = readVersion;
49+
this.uuid = uuid;
50+
this.operation = operation;
51+
this.blobOp = blobOp;
52+
this.writeParams = writeParams != null ? writeParams : new HashMap<>();
53+
}
54+
55+
public Dataset dataset() {
56+
return dataset;
57+
}
58+
59+
public long readVersion() {
60+
return readVersion;
61+
}
62+
63+
public String uuid() {
64+
return uuid;
65+
}
66+
67+
public Operation operation() {
68+
return operation;
69+
}
70+
71+
public Operation blobsOperation() {
72+
return blobOp;
73+
}
74+
75+
public Map<String, String> writeParams() {
76+
return writeParams;
77+
}
78+
79+
public Dataset commit() {
80+
try {
81+
Dataset committed = commitNative();
82+
committed.allocator = dataset.allocator;
83+
return committed;
84+
} finally {
85+
operation.release();
86+
if (blobOp != null) {
87+
blobOp.release();
88+
}
89+
}
90+
}
91+
92+
private native Dataset commitNative();
93+
94+
public static class Builder {
95+
private final String uuid;
96+
private final Dataset dataset;
97+
private long readVersion;
98+
private Operation operation;
99+
private Operation blobOp;
100+
private Map<String, String> writeParams;
101+
102+
public Builder(Dataset dataset) {
103+
this.dataset = dataset;
104+
this.uuid = UUID.randomUUID().toString();
105+
}
106+
107+
public Builder readVersion(long readVersion) {
108+
this.readVersion = readVersion;
109+
return this;
110+
}
111+
112+
public Builder writeParams(Map<String, String> writeParams) {
113+
this.writeParams = writeParams;
114+
return this;
115+
}
116+
117+
public Builder project(Schema newSchema) {
118+
validateState();
119+
this.operation = new Project.Builder().schema(newSchema).allocator(dataset.allocator).build();
120+
return this;
121+
}
122+
123+
private void validateState() {
124+
Preconditions.checkState(operation == null, "Operation " + operation + " already set");
125+
}
126+
127+
public Transaction build() {
128+
Preconditions.checkState(operation != null, "TransactionBuilder has no operations");
129+
return new Transaction(dataset, readVersion, uuid, operation, blobOp, writeParams);
130+
}
131+
}
132+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package com.lancedb.lance.operation;
15+
16+
/** An operation that can be carried by a transaction. */
public interface Operation {

  /**
   * Returns the name of this operation. The name is used to line the operation up with the
   * corresponding variant of the Rust operation enum behind the JNI layer.
   *
   * @return the name of the operation
   */
  String name();

  /** Releases the underlying JNI resources held by this operation, including the Arrow C schema. */
  void release();
}

0 commit comments

Comments
 (0)