Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 6 additions & 3 deletions java/lance-jni/src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ pub(crate) fn convert_to_java_transaction<'local>(
env: &mut JNIEnv<'local>,
transaction: Transaction,
) -> Result<JObject<'local>> {
let uuid = env.new_string(transaction.uuid)?;
let uuid = transaction.uuid.into_java(env)?;
let tag = match transaction.tag {
Some(tag) => JObject::from(env.new_string(tag)?),
None => JObject::null(),
Expand All @@ -322,7 +322,7 @@ pub(crate) fn convert_to_java_transaction<'local>(

let java_transaction = env.new_object(
"org/lance/Transaction",
"(JLjava/lang/String;Lorg/lance/operation/Operation;Ljava/lang/String;Ljava/util/Map;)V",
"(JLjava/util/UUID;Lorg/lance/operation/Operation;Ljava/lang/String;Ljava/util/Map;)V",
&[
JValue::Long(transaction.read_version as i64),
JValue::Object(&uuid),
Expand Down Expand Up @@ -785,7 +785,10 @@ fn convert_to_rust_transaction(
dataset: Option<&mut BlockingDataset>,
) -> Result<Transaction> {
let read_ver = env.get_u64_from_method(&java_transaction, "readVersion")?;
let uuid = env.get_string_from_method(&java_transaction, "uuid")?;
let uuid = env
.call_method(&java_transaction, "uuid", "()Ljava/util/UUID;", &[])?
.l()?
.extract_object(env)?;
let op = env
.call_method(
&java_transaction,
Expand Down
3 changes: 2 additions & 1 deletion java/src/main/java/org/lance/SourcedTransaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.util.Map;
import java.util.Optional;
import java.util.UUID;

/**
* A convenience wrapper that pairs a {@link Transaction} with a {@link Dataset}, providing a simple
Expand Down Expand Up @@ -59,7 +60,7 @@ public long readVersion() {
}

/** Delegates to {@link Transaction#uuid()}. */
public String uuid() {
public UUID uuid() {
return transaction.uuid();
}

Expand Down
14 changes: 7 additions & 7 deletions java/src/main/java/org/lance/Transaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
public class Transaction implements AutoCloseable {

private final long readVersion;
private final String uuid;
private final UUID uuid;
private final Operation operation;
private final Optional<String> tag;
private final Optional<Map<String, String>> transactionProperties;
Expand All @@ -50,7 +50,7 @@ public class Transaction implements AutoCloseable {
*/
private Transaction(
long readVersion,
String uuid,
UUID uuid,
Operation operation,
String tag,
Map<String, String> transactionProperties) {
Expand All @@ -69,14 +69,14 @@ private Transaction(
* @param operation the operation to perform
*/
public Transaction(long readVersion, Operation operation) {
this(readVersion, UUID.randomUUID().toString(), operation, null, null);
this(readVersion, UUID.randomUUID(), operation, null, null);
}

public long readVersion() {
return readVersion;
}

public String uuid() {
public UUID uuid() {
return uuid;
}

Expand Down Expand Up @@ -133,22 +133,22 @@ public int hashCode() {

/** Builder for constructing {@link Transaction} instances. */
public static class Builder {
private String uuid;
private UUID uuid;
private long readVersion;
private Operation operation;
private String tag;
private Map<String, String> transactionProperties;

public Builder() {
this.uuid = UUID.randomUUID().toString();
this.uuid = UUID.randomUUID();
}

public Builder readVersion(long readVersion) {
this.readVersion = readVersion;
return this;
}

public Builder uuid(String uuid) {
public Builder uuid(UUID uuid) {
this.uuid = uuid;
return this;
}
Expand Down
3 changes: 2 additions & 1 deletion java/src/test/java/org/lance/TransactionTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
Expand Down Expand Up @@ -175,7 +176,7 @@ public void testCustomUuid(@TempDir Path tempDir) {
try (Dataset dataset = testDataset.createEmptyDataset()) {
FragmentMetadata fragmentMeta = testDataset.createNewFragment(10);

String customUuid = "custom-uuid-12345";
UUID customUuid = UUID.fromString("550e8400-e29b-41d4-a716-446655440000");
try (Transaction txn =
new Transaction.Builder()
.readVersion(dataset.version())
Expand Down
2 changes: 1 addition & 1 deletion python/python/lance/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -4361,7 +4361,7 @@ class BulkCommitResult(TypedDict):
class Transaction:
read_version: int
operation: LanceOperation.BaseOperation
uuid: str = dataclasses.field(default_factory=lambda: str(uuid.uuid4()))
uuid: uuid.UUID = dataclasses.field(default_factory=uuid.uuid4)
transaction_properties: Optional[Dict[str, str]] = dataclasses.field(
default_factory=dict
)
Expand Down
10 changes: 5 additions & 5 deletions python/python/tests/test_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -5027,7 +5027,7 @@ def test_update_config_transaction(tmp_path: Path):
)

transaction = lance.Transaction(
read_version=ds.version, operation=update_config_op, uuid=str(uuid.uuid4())
read_version=ds.version, operation=update_config_op, uuid=uuid.uuid4()
)

# Commit transaction
Expand All @@ -5051,7 +5051,7 @@ def test_update_config_transaction(tmp_path: Path):
)

transaction2 = lance.Transaction(
read_version=ds_v2.version, operation=update_config_op2, uuid=str(uuid.uuid4())
read_version=ds_v2.version, operation=update_config_op2, uuid=uuid.uuid4()
)

ds_v3 = lance.LanceDataset.commit(tmp_path, transaction2)
Expand All @@ -5073,7 +5073,7 @@ def test_update_config_transaction(tmp_path: Path):
)

transaction3 = lance.Transaction(
read_version=ds_v3.version, operation=update_config_op3, uuid=str(uuid.uuid4())
read_version=ds_v3.version, operation=update_config_op3, uuid=uuid.uuid4()
)

ds_v4 = lance.LanceDataset.commit(tmp_path, transaction3)
Expand Down Expand Up @@ -5102,7 +5102,7 @@ def test_update_config_transaction(tmp_path: Path):
)

transaction4 = lance.Transaction(
read_version=ds_v4.version, operation=update_config_op4, uuid=str(uuid.uuid4())
read_version=ds_v4.version, operation=update_config_op4, uuid=uuid.uuid4()
)

ds_v5 = lance.LanceDataset.commit(tmp_path, transaction4)
Expand Down Expand Up @@ -5147,7 +5147,7 @@ def test_update_config_transaction(tmp_path: Path):
transaction5 = lance.Transaction(
read_version=ds_v6.version, # Use the actual latest version
operation=update_config_op5,
uuid=str(uuid.uuid4()),
uuid=uuid.uuid4(),
)

ds_v7 = lance.LanceDataset.commit(tmp_path, transaction5)
Expand Down
7 changes: 5 additions & 2 deletions python/src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -585,7 +585,8 @@ impl<'py> IntoPyObject<'py> for PyLance<&Operation> {
impl FromPyObject<'_> for PyLance<Transaction> {
fn extract_bound(ob: &pyo3::Bound<'_, PyAny>) -> PyResult<Self> {
let read_version = ob.getattr("read_version")?.extract()?;
let uuid = ob.getattr("uuid")?.extract()?;
let uuid_str = ob.getattr("uuid")?.to_string();
let uuid = Uuid::parse_str(&uuid_str).map_err(|e| PyValueError::new_err(e.to_string()))?;
let operation = ob.getattr("operation")?.extract::<PyLance<Operation>>()?.0;
let transaction_properties = ob
.getattr("transaction_properties")?
Expand Down Expand Up @@ -613,7 +614,9 @@ impl<'py> IntoPyObject<'py> for PyLance<&Transaction> {
.expect("Failed to import lance module");

let read_version = self.0.read_version;
let uuid = &self.0.uuid;
let uuid_mod = py.import(intern!(py, "uuid"))?;
let uuid_cls = uuid_mod.getattr(intern!(py, "UUID"))?;
let uuid = uuid_cls.call1((self.0.uuid.to_string(),))?;
let operation = PyLance(&self.0.operation).into_pyobject(py)?;

let cls = namespace
Expand Down
1 change: 1 addition & 0 deletions rust/lance-namespace-impls/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ time = { version = "0.3", optional = true }
[dev-dependencies]
tokio = { workspace = true, features = ["full"] }
tempfile.workspace = true
uuid.workspace = true
wiremock.workspace = true
arrow = { workspace = true }
arrow-ipc = { workspace = true }
Expand Down
93 changes: 49 additions & 44 deletions rust/lance-namespace-impls/src/dir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1079,7 +1079,7 @@ impl DirectoryNamespace {
.as_ref()
.map(|properties| (**properties).clone())
.unwrap_or_default();
properties.insert("uuid".to_string(), transaction.uuid.clone());
properties.insert("uuid".to_string(), transaction.uuid.to_string());
properties.insert("version".to_string(), version.to_string());
properties.insert(
"read_version".to_string(),
Expand Down Expand Up @@ -1171,7 +1171,7 @@ impl DirectoryNamespace {
),
})
})?
&& transaction.uuid == id
&& transaction.uuid.to_string() == id
{
return Ok((version.version, transaction));
}
Expand Down Expand Up @@ -2491,7 +2491,7 @@ impl LanceNamespace for DirectoryNamespace {
),
})
})?
.map(|transaction| transaction.uuid);
.map(|transaction| transaction.uuid.to_string());

Ok(CreateTableIndexResponse { transaction_id })
}
Expand Down Expand Up @@ -2721,7 +2721,7 @@ impl LanceNamespace for DirectoryNamespace {
),
})
})?
.map(|transaction| transaction.uuid);
.map(|transaction| transaction.uuid.to_string());

Ok(DropTableIndexResponse { transaction_id })
}
Expand All @@ -2744,6 +2744,7 @@ mod tests {
use lance_namespace::schema::convert_json_arrow_schema;
use std::io::Cursor;
use std::sync::Arc;
use uuid::Uuid;

/// Helper to create a test DirectoryNamespace with a temporary directory
async fn create_test_namespace() -> (DirectoryNamespace, TempStdDir) {
Expand Down Expand Up @@ -3064,12 +3065,12 @@ mod tests {

let transaction_id = create_scalar_index(&namespace, "users", "users_id_idx").await;
let dataset = open_dataset(&namespace, "users").await;
let expected_transaction_id = dataset
.read_transaction()
.await
.unwrap()
.map(|transaction| transaction.uuid);
assert_eq!(transaction_id, expected_transaction_id);
assert_eq!(
transaction_id
.as_deref()
.map(|s| Uuid::parse_str(s).unwrap()),
dataset.read_transaction().await.unwrap().map(|t| t.uuid)
);
let indices = dataset.load_indices().await.unwrap();
assert!(indices.iter().any(|index| index.name == "users_id_idx"));
}
Expand All @@ -3093,12 +3094,12 @@ mod tests {
.transaction_id;

let dataset = open_dataset(&namespace, "vectors").await;
let expected_transaction_id = dataset
.read_transaction()
.await
.unwrap()
.map(|transaction| transaction.uuid);
assert_eq!(transaction_id, expected_transaction_id);
assert_eq!(
transaction_id
.as_deref()
.map(|s| Uuid::parse_str(s).unwrap()),
dataset.read_transaction().await.unwrap().map(|t| t.uuid)
);
let indices = dataset.load_indices().await.unwrap();
assert!(indices.iter().any(|index| index.name == "vector_idx"));
}
Expand Down Expand Up @@ -3135,12 +3136,12 @@ mod tests {
assert_eq!(users_id_idx.status, "SUCCEEDED");

let dataset = open_dataset(&namespace, "users").await;
let expected_transaction_id = dataset
.read_transaction()
.await
.unwrap()
.map(|transaction| transaction.uuid);
assert_eq!(transaction_id, expected_transaction_id);
assert_eq!(
transaction_id
.as_deref()
.map(|s| Uuid::parse_str(s).unwrap()),
dataset.read_transaction().await.unwrap().map(|t| t.uuid)
);
let indices = dataset.load_indices().await.unwrap();
assert_eq!(
indices
Expand Down Expand Up @@ -3201,12 +3202,12 @@ mod tests {
assert_eq!(response.num_unindexed_rows, Some(0));

let dataset = open_dataset(&namespace, "users").await;
let expected_transaction_id = dataset
.read_transaction()
.await
.unwrap()
.map(|transaction| transaction.uuid);
assert_eq!(transaction_id, expected_transaction_id);
assert_eq!(
transaction_id
.as_deref()
.map(|s| Uuid::parse_str(s).unwrap()),
dataset.read_transaction().await.unwrap().map(|t| t.uuid)
);
let stats: serde_json::Value =
serde_json::from_str(&dataset.index_statistics("users_id_idx").await.unwrap()).unwrap();
assert_eq!(stats["index_type"], "BTree");
Expand All @@ -3225,10 +3226,10 @@ mod tests {
let dataset = open_dataset(&namespace, "users").await;
let latest_transaction = dataset.read_transaction().await.unwrap();
assert_eq!(
transaction_id,
latest_transaction
.as_ref()
.map(|transaction| transaction.uuid.clone())
transaction_id
.as_deref()
.map(|s| Uuid::parse_str(s).unwrap()),
latest_transaction.as_ref().map(|t| t.uuid)
);

if let Some(transaction_id) = transaction_id {
Expand Down Expand Up @@ -3282,18 +3283,22 @@ mod tests {
.checkout_version(dataset.version().version - 1)
.await
.unwrap();
let previous_transaction_id = previous_dataset
.read_transaction()
.await
.unwrap()
.map(|transaction| transaction.uuid);
assert_eq!(create_transaction_id, previous_transaction_id);
let expected_drop_transaction_id = dataset
.read_transaction()
.await
.unwrap()
.map(|transaction| transaction.uuid);
assert_eq!(drop_transaction_id, expected_drop_transaction_id);
assert_eq!(
create_transaction_id
.as_deref()
.map(|s| Uuid::parse_str(s).unwrap()),
previous_dataset
.read_transaction()
.await
.unwrap()
.map(|t| t.uuid)
);
assert_eq!(
drop_transaction_id
.as_deref()
.map(|s| Uuid::parse_str(s).unwrap()),
dataset.read_transaction().await.unwrap().map(|t| t.uuid)
);
let indices = dataset.load_indices().await.unwrap();
assert!(!indices.iter().any(|index| index.name == "users_id_idx"));

Expand Down
Loading
Loading