diff --git a/docs/src/format/table/branch_tag.md b/docs/src/format/table/branch_tag.md index e3bd328d5d2..d4784a288e9 100644 --- a/docs/src/format/table/branch_tag.md +++ b/docs/src/format/table/branch_tag.md @@ -38,10 +38,11 @@ Each branch metadata file is a JSON file with the following fields: | JSON Key | Type | Optional | Description | |------------------|--------|----------|--------------------------------------------------------------------------------| -| `parent_branch` | string | Yes | Name of the branch this was created from. `null` indicates branched from main. | -| `parent_version` | number | | Version number of the parent branch at the time this branch was created. | -| `create_at` | number | | Unix timestamp (seconds since epoch) when the branch was created. | -| `manifest_size` | number | | Size of the initial manifest file in bytes. | +| `parentBranch` | string | Yes | Name of the branch this was created from. `null` indicates branched from main. | +| `parentVersion` | number | | Version number of the parent branch at the time this branch was created. | +| `createAt` | number | | Unix timestamp (seconds since epoch) when the branch was created. | +| `manifestSize` | number | | Size of the initial manifest file in bytes. | +| `metadata` | object | Yes | String key/value metadata map. If absent, it is treated as an empty object. | ### Branch Dataset Layout @@ -118,4 +119,7 @@ Each tag file is a JSON file with the following fields: |-----------------|--------|----------|--------------------------------------------------------------------------| | `branch` | string | Yes | Branch name being tagged. `null` or absent indicates main branch. | | `version` | number | | Version number being tagged within that branch. | -| `manifest_size` | number | | Size of the manifest file in bytes. Used for efficient manifest loading. | +| `createdAt` | string | Yes | RFC 3339 timestamp for when the tag was first created. 
| +| `updatedAt` | string | Yes | RFC 3339 timestamp for the latest tag reference update. | +| `manifestSize` | number | | Size of the manifest file in bytes. Used for efficient manifest loading. | +| `metadata` | object | Yes | String key/value metadata map. If absent, it is treated as an empty object. | diff --git a/docs/src/guide/tags_and_branches.md b/docs/src/guide/tags_and_branches.md index 02701f29e84..8af2302bfb0 100644 --- a/docs/src/guide/tags_and_branches.md +++ b/docs/src/guide/tags_and_branches.md @@ -36,10 +36,10 @@ print(ds.tags.list()) # {} ds.tags.create("v1-prod", (None, 1)) print(ds.tags.list()) -# {'v1-prod': {'version': 1, 'manifest_size': ...}} +# {'v1-prod': {'version': 1, 'created_at': ..., 'updated_at': ..., 'manifest_size': ...}} ds.tags.update("v1-prod", (None, 2)) print(ds.tags.list()) -# {'v1-prod': {'version': 2, 'manifest_size': ...}} +# {'v1-prod': {'version': 2, 'created_at': ..., 'updated_at': ..., 'manifest_size': ...}} ds.tags.delete("v1-prod") print(ds.tags.list()) # {} @@ -47,10 +47,10 @@ print(ds.tags.list_ordered()) # [] ds.tags.create("v1-prod", (None, 1)) print(ds.tags.list_ordered()) -# [('v1-prod', {'version': 1, 'manifest_size': ...})] +# [('v1-prod', {'version': 1, 'created_at': ..., 'updated_at': ..., 'manifest_size': ...})] ds.tags.update("v1-prod", (None, 2)) print(ds.tags.list_ordered()) -# [('v1-prod', {'version': 2, 'manifest_size': ...})] +# [('v1-prod', {'version': 2, 'created_at': ..., 'updated_at': ..., 'manifest_size': ...})] ds.tags.delete("v1-prod") print(ds.tags.list_ordered()) # [] @@ -122,4 +122,4 @@ print(ds.branches.list_ordered(order="desc")) Branches hold references to data files. Lance ensures that cleanup does not delete files still referenced by any branch. - Delete unused branches to allow their referenced files to be cleaned up by `cleanup_old_versions()`. \ No newline at end of file + Delete unused branches to allow their referenced files to be cleaned up by `cleanup_old_versions()`. 
diff --git a/java/lance-jni/src/blocking_dataset.rs b/java/lance-jni/src/blocking_dataset.rs index 700cbdf308a..a91bd27fccb 100644 --- a/java/lance-jni/src/blocking_dataset.rs +++ b/java/lance-jni/src/blocking_dataset.rs @@ -11,7 +11,7 @@ use crate::storage_options::JavaStorageOptionsProvider; use crate::traits::{FromJObjectWithEnv, FromJString, export_vec, import_vec, import_vec_to_rust}; use crate::utils::{ build_compaction_options, extract_storage_options, extract_write_params, - get_scalar_index_params, get_vector_index_params, to_rust_map, + get_scalar_index_params, get_vector_index_params, to_java_map, to_rust_map, }; use crate::{RT, traits::IntoJava}; use arrow::array::RecordBatchReader; @@ -314,6 +314,24 @@ impl BlockingDataset { Ok(()) } + pub fn replace_tag_metadata( + &mut self, + tag: &str, + metadata: HashMap, + ) -> Result<()> { + RT.block_on(self.inner.tags().replace_metadata(tag, metadata))?; + Ok(()) + } + + pub fn replace_branch_metadata( + &mut self, + branch: &str, + metadata: HashMap, + ) -> Result<()> { + RT.block_on(self.inner.branches().replace_metadata(branch, metadata))?; + Ok(()) + } + pub fn get_version(&self, tag: &str) -> Result { let version = RT.block_on(self.inner.tags().get_version(tag))?; Ok(version) @@ -2261,6 +2279,26 @@ fn inner_add_columns_by_schema( ////////////////////////////// // Tag operation Methods // ////////////////////////////// +fn optional_datetime_to_java_instant<'local>( + env: &mut JNIEnv<'local>, + timestamp: Option<&DateTime>, +) -> Result> { + if let Some(timestamp) = timestamp { + let seconds = timestamp.timestamp(); + let nanos = timestamp.timestamp_subsec_nanos() as i64; + Ok(env + .call_static_method( + "java/time/Instant", + "ofEpochSecond", + "(JJ)Ljava/time/Instant;", + &[JValue::Long(seconds), JValue::Long(nanos)], + )? + .l()?) 
+ } else { + Ok(JObject::null()) + } +} + #[unsafe(no_mangle)] pub extern "system" fn Java_org_lance_Dataset_nativeListTags<'local>( mut env: JNIEnv<'local>, @@ -2273,27 +2311,34 @@ fn inner_list_tags<'local>( env: &mut JNIEnv<'local>, java_dataset: JObject, ) -> Result> { - let tag_map = { + let mut tags: Vec<_> = { let dataset_guard = unsafe { env.get_rust_field::<_, _, BlockingDataset>(java_dataset, NATIVE_DATASET) }?; - dataset_guard.list_tags()? + dataset_guard.list_tags()?.into_iter().collect() }; + tags.sort_unstable_by(|(left_name, _), (right_name, _)| left_name.cmp(right_name)); let array_list = env.new_object("java/util/ArrayList", "()V", &[])?; - for (tag_name, tag_contents) in tag_map { + for (tag_name, tag_contents) in tags { let branch_name: JObject = if let Some(branch_name) = tag_contents.branch.as_ref() { env.new_string(branch_name)?.into() } else { JObject::null() }; + let created_at = optional_datetime_to_java_instant(env, tag_contents.created_at.as_ref())?; + let updated_at = optional_datetime_to_java_instant(env, tag_contents.updated_at.as_ref())?; + let java_metadata = to_java_map(env, &tag_contents.metadata)?; let java_tag = env.new_object( "org/lance/Tag", - "(Ljava/lang/String;Ljava/lang/String;JI)V", + "(Ljava/lang/String;Ljava/lang/String;JILjava/time/Instant;Ljava/time/Instant;Ljava/util/Map;)V", &[ JValue::Object(&env.new_string(tag_name)?.into()), JValue::Object(&branch_name), JValue::Long(tag_contents.version as i64), JValue::Int(tag_contents.manifest_size as i32), + JValue::Object(&created_at), + JValue::Object(&updated_at), + JValue::Object(&java_metadata), ], )?; env.call_method( @@ -2375,6 +2420,32 @@ fn inner_update_tag( dataset_guard.update_tag(tag.as_str(), reference) } +#[unsafe(no_mangle)] +pub extern "system" fn Java_org_lance_Dataset_nativeReplaceTagMetadata( + mut env: JNIEnv, + java_dataset: JObject, + jtag_name: JString, + jmetadata: JObject, +) { + ok_or_throw_without_return!( + env, + inner_replace_tag_metadata(&mut 
env, java_dataset, jtag_name, jmetadata) + ) +} + +fn inner_replace_tag_metadata( + env: &mut JNIEnv, + java_dataset: JObject, + jtag_name: JString, + jmetadata: JObject, +) -> Result<()> { + let tag = jtag_name.extract(env)?; + let metadata = extract_metadata_map(env, &jmetadata)?; + let mut dataset_guard = + { unsafe { env.get_rust_field::<_, _, BlockingDataset>(java_dataset, NATIVE_DATASET) }? }; + dataset_guard.replace_tag_metadata(tag.as_str(), metadata) +} + #[unsafe(no_mangle)] pub extern "system" fn Java_org_lance_Dataset_nativeGetVersionByTag( mut env: JNIEnv, @@ -2414,11 +2485,12 @@ fn inner_list_branches<'local>( env: &mut JNIEnv<'local>, java_dataset: JObject, ) -> Result> { - let branches = { + let mut branches: Vec<_> = { let dataset_guard = unsafe { env.get_rust_field::<_, _, BlockingDataset>(java_dataset, NATIVE_DATASET) }?; - dataset_guard.list_branches()? + dataset_guard.list_branches()?.into_iter().collect() }; + branches.sort_unstable_by(|(left_name, _), (right_name, _)| left_name.cmp(right_name)); let array_list = env.new_object("java/util/ArrayList", "()V", &[])?; for (name, contents) in branches { @@ -2447,9 +2519,10 @@ fn inner_list_branches<'local>( &[JValue::Object(&jmapping)], )?; } + let java_metadata = to_java_map(env, &contents.metadata)?; let jbranch = env.new_object( "org/lance/Branch", - "(Ljava/lang/String;Ljava/lang/String;Ljava/util/List;JJI)V", + "(Ljava/lang/String;Ljava/lang/String;Ljava/util/List;JJILjava/util/Map;)V", &[ JValue::Object(&jname), JValue::Object(&jparent), @@ -2457,6 +2530,7 @@ fn inner_list_branches<'local>( JValue::Long(contents.parent_version as i64), JValue::Long(contents.create_at as i64), JValue::Int(contents.manifest_size as i32), + JValue::Object(&java_metadata), ], )?; env.call_method( @@ -2507,6 +2581,37 @@ fn inner_create_branch<'local>( new_blocking_dataset.into_java(env) } +#[unsafe(no_mangle)] +pub extern "system" fn Java_org_lance_Dataset_nativeReplaceBranchMetadata( + mut env: JNIEnv, + 
java_dataset: JObject, + jbranch: JString, + jmetadata: JObject, +) { + ok_or_throw_without_return!( + env, + inner_replace_branch_metadata(&mut env, java_dataset, jbranch, jmetadata) + ) +} + +fn inner_replace_branch_metadata( + env: &mut JNIEnv, + java_dataset: JObject, + jbranch: JString, + jmetadata: JObject, +) -> Result<()> { + let branch: String = jbranch.extract(env)?; + let metadata = extract_metadata_map(env, &jmetadata)?; + let mut dataset_guard = + unsafe { env.get_rust_field::<_, _, BlockingDataset>(java_dataset, NATIVE_DATASET) }?; + dataset_guard.replace_branch_metadata(&branch, metadata) +} + +fn extract_metadata_map(env: &mut JNIEnv, jmetadata: &JObject) -> Result> { + let jmap = JMap::from_env(env, jmetadata)?; + to_rust_map(env, &jmap) +} + fn transform_jref_to_ref(jref: JObject, env: &mut JNIEnv) -> Result { let source_tag_name = env.get_optional_string_from_method(&jref, "getTagName")?; let source_version_number = env.get_optional_u64_from_method(&jref, "getVersionNumber")?; diff --git a/java/lance-jni/src/traits.rs b/java/lance-jni/src/traits.rs index 09b90639c48..8e3fad8eedc 100644 --- a/java/lance-jni/src/traits.rs +++ b/java/lance-jni/src/traits.rs @@ -262,6 +262,15 @@ impl IntoJava for JLance> { } } +impl IntoJava for JLance> { + fn into_java<'a>(self, env: &mut JNIEnv<'a>) -> Result> { + Ok(match self.0 { + Some(value) => env.new_string(value)?.into(), + None => JObject::null(), + }) + } +} + impl FromJObjectWithEnv> for JObject<'_> { fn extract_object(&self, env: &mut JNIEnv<'_>) -> Result> { let ret = if self.is_null() { diff --git a/java/src/main/java/org/lance/Branch.java b/java/src/main/java/org/lance/Branch.java index 07f79a08f6a..526a65c33da 100755 --- a/java/src/main/java/org/lance/Branch.java +++ b/java/src/main/java/org/lance/Branch.java @@ -16,7 +16,10 @@ import com.google.common.base.MoreObjects; import com.google.common.collect.ImmutableList; +import java.util.Collections; +import java.util.HashMap; import java.util.List; 
+import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -70,10 +73,18 @@ public int hashCode() { private final long parentVersion; private final long createAt; private final int manifestSize; + private final Map metadata; public Branch( String name, String parentBranch, long parentVersion, long createAt, int manifestSize) { - this(name, parentBranch, ImmutableList.of(), parentVersion, createAt, manifestSize); + this( + name, + parentBranch, + ImmutableList.of(), + parentVersion, + createAt, + manifestSize, + Collections.emptyMap()); } public Branch( @@ -82,13 +93,15 @@ public Branch( List branchIdentifier, long parentVersion, long createAt, - int manifestSize) { + int manifestSize, + Map metadata) { this.name = name; this.parentBranch = Optional.ofNullable(parentBranch); this.branchIdentifier = ImmutableList.copyOf(Objects.requireNonNull(branchIdentifier)); this.parentVersion = parentVersion; this.createAt = createAt; this.manifestSize = manifestSize; + this.metadata = Collections.unmodifiableMap(new HashMap<>(metadata)); } public String getName() { @@ -115,6 +128,10 @@ public int getManifestSize() { return manifestSize; } + public Map getMetadata() { + return metadata; + } + @Override public String toString() { return MoreObjects.toStringHelper(this) @@ -124,6 +141,7 @@ public String toString() { .add("parentVersion", parentVersion) .add("createAt", createAt) .add("manifestSize", manifestSize) + .add("metadata", metadata) .toString(); } @@ -137,12 +155,13 @@ public boolean equals(Object o) { && manifestSize == branch.manifestSize && Objects.equals(name, branch.name) && Objects.equals(parentBranch, branch.parentBranch) - && Objects.equals(branchIdentifier, branch.branchIdentifier); + && Objects.equals(branchIdentifier, branch.branchIdentifier) + && Objects.equals(metadata, branch.metadata); } @Override public int hashCode() { return Objects.hash( - name, parentBranch, branchIdentifier, parentVersion, createAt, manifestSize); + name, 
parentBranch, branchIdentifier, parentVersion, createAt, manifestSize, metadata); } } diff --git a/java/src/main/java/org/lance/Dataset.java b/java/src/main/java/org/lance/Dataset.java index 5cba7696a76..4f8c4049e19 100644 --- a/java/src/main/java/org/lance/Dataset.java +++ b/java/src/main/java/org/lance/Dataset.java @@ -1695,6 +1695,15 @@ public void update(String tag, Ref ref) { } } + public void replaceMetadata(String tag, Map metadata) { + Preconditions.checkArgument(tag != null, "tag cannot be null"); + Preconditions.checkArgument(metadata != null, "metadata cannot be null"); + try (LockManager.WriteLock writeLock = lockManager.acquireWriteLock()) { + Preconditions.checkArgument(nativeDatasetHandle != 0, "Dataset is closed"); + nativeReplaceTagMetadata(tag, metadata); + } + } + /** * List all tags of the dataset. * @@ -1747,6 +1756,15 @@ public List list() { return nativeListBranches(); } } + + public void replaceMetadata(String branchName, Map metadata) { + Preconditions.checkArgument(branchName != null, "branchName cannot be null"); + Preconditions.checkArgument(metadata != null, "metadata cannot be null"); + try (LockManager.WriteLock writeLock = lockManager.acquireWriteLock()) { + Preconditions.checkArgument(nativeDatasetHandle != 0, "Dataset is closed"); + nativeReplaceBranchMetadata(branchName, metadata); + } + } } /** @@ -1832,6 +1850,8 @@ private native MergeInsertResult nativeMergeInsert( private native void nativeUpdateTag(String tag, Ref ref); + private native void nativeReplaceTagMetadata(String tag, Map metadata); + private native List nativeListTags(); private native long nativeGetVersionByTag(String tag); @@ -1846,6 +1866,8 @@ private native Dataset nativeCreateBranch( private native List nativeListBranches(); + private native void nativeReplaceBranchMetadata(String branch, Map metadata); + public Dataset shallowClone(String targetPath, Ref ref) { return shallowClone(targetPath, ref, null); } diff --git a/java/src/main/java/org/lance/Tag.java 
b/java/src/main/java/org/lance/Tag.java index f7ce7be83cc..d09185d38c0 100644 --- a/java/src/main/java/org/lance/Tag.java +++ b/java/src/main/java/org/lance/Tag.java @@ -15,6 +15,10 @@ import com.google.common.base.MoreObjects; +import java.time.Instant; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -23,12 +27,34 @@ public class Tag { private final Optional branch; private final long version; private final int manifestSize; + private final Optional createdAt; + private final Optional updatedAt; + private final Map metadata; public Tag(String name, String branch, long version, int manifestSize) { + this(name, branch, version, manifestSize, null, null, Collections.emptyMap()); + } + + public Tag( + String name, String branch, long version, int manifestSize, Map metadata) { + this(name, branch, version, manifestSize, null, null, metadata); + } + + public Tag( + String name, + String branch, + long version, + int manifestSize, + Instant createdAt, + Instant updatedAt, + Map metadata) { this.name = name; this.branch = Optional.ofNullable(branch); this.version = version; this.manifestSize = manifestSize; + this.createdAt = Optional.ofNullable(createdAt); + this.updatedAt = Optional.ofNullable(updatedAt); + this.metadata = Collections.unmodifiableMap(new HashMap<>(metadata)); } public String getName() { @@ -47,6 +73,18 @@ public int getManifestSize() { return manifestSize; } + public Optional getCreatedAt() { + return createdAt; + } + + public Optional getUpdatedAt() { + return updatedAt; + } + + public Map getMetadata() { + return metadata; + } + @Override public String toString() { return MoreObjects.toStringHelper(this) @@ -54,6 +92,9 @@ public String toString() { .add("branch", branch) .add("version", version) .add("manifestSize", manifestSize) + .add("createdAt", createdAt) + .add("updatedAt", updatedAt) + .add("metadata", metadata) .toString(); } @@ -69,11 +110,14 @@ public 
boolean equals(Object o) { return version == tag.version && Objects.equals(branch, tag.branch) && manifestSize == tag.manifestSize + && Objects.equals(createdAt, tag.createdAt) + && Objects.equals(updatedAt, tag.updatedAt) + && Objects.equals(metadata, tag.metadata) && Objects.equals(name, tag.name); } @Override public int hashCode() { - return Objects.hash(name, branch, version, manifestSize); + return Objects.hash(name, branch, version, manifestSize, createdAt, updatedAt, metadata); } } diff --git a/java/src/test/java/org/lance/DatasetTest.java b/java/src/test/java/org/lance/DatasetTest.java index 2b241013427..f8987f64ba4 100644 --- a/java/src/test/java/org/lance/DatasetTest.java +++ b/java/src/test/java/org/lance/DatasetTest.java @@ -61,6 +61,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.time.Clock; +import java.time.Instant; import java.time.ZonedDateTime; import java.util.ArrayList; import java.util.Arrays; @@ -317,8 +318,15 @@ void testTags(@TempDir Path tempDir) { try (Dataset dataset = testDataset.createEmptyDataset()) { assertEquals(1, dataset.version()); dataset.tags().create("tag1", Ref.ofMain()); - assertEquals(1, dataset.tags().list().size()); - assertEquals(1, dataset.tags().list().get(0).getVersion()); + dataset.tags().replaceMetadata("tag1", Map.of("description", "primary tag")); + List tags = dataset.tags().list(); + Tag tag1 = tags.get(0); + assertEquals(1, tags.size()); + assertEquals(1, tag1.getVersion()); + assertEquals(Map.of("description", "primary tag"), tag1.getMetadata()); + assertTrue(tag1.getCreatedAt().isPresent()); + assertTrue(tag1.getUpdatedAt().isPresent()); + assertEquals(tag1.getCreatedAt(), tag1.getUpdatedAt()); assertEquals(1, dataset.tags().getVersion("tag1")); } @@ -332,12 +340,67 @@ void testTags(@TempDir Path tempDir) { assertEquals(2, dataset2.tags().list().size()); assertEquals(1, dataset2.tags().getVersion("tag1")); assertEquals(2, dataset2.tags().getVersion("tag2")); - 
dataset2.tags().update("tag2", Ref.ofMain(1)); + dataset2.tags().replaceMetadata("tag2", Map.of("description", "rollback tag")); + Instant tag2CreatedAt = + dataset2.tags().list().stream() + .filter(tag -> tag.getName().equals("tag2")) + .findFirst() + .orElseThrow() + .getCreatedAt() + .orElseThrow(); + Instant tag2UpdatedAt = + dataset2.tags().list().stream() + .filter(tag -> tag.getName().equals("tag2")) + .findFirst() + .orElseThrow() + .getUpdatedAt() + .orElseThrow(); + assertEquals(tag2CreatedAt, tag2UpdatedAt); assertEquals(2, dataset2.tags().list().size()); assertEquals(1, dataset2.tags().list().get(0).getVersion()); - assertEquals(1, dataset2.tags().list().get(1).getVersion()); + assertEquals(2, dataset2.tags().list().get(1).getVersion()); assertEquals(1, dataset2.tags().getVersion("tag1")); + assertEquals(2, dataset2.tags().getVersion("tag2")); + assertEquals( + Map.of("description", "rollback tag"), + dataset2.tags().list().stream() + .filter(tag -> tag.getName().equals("tag2")) + .findFirst() + .orElseThrow() + .getMetadata()); + dataset2.tags().update("tag2", Ref.ofMain(1)); + Instant updatedTag2CreatedAt = + dataset2.tags().list().stream() + .filter(tag -> tag.getName().equals("tag2")) + .findFirst() + .orElseThrow() + .getCreatedAt() + .orElseThrow(); + Instant updatedTag2UpdatedAt = + dataset2.tags().list().stream() + .filter(tag -> tag.getName().equals("tag2")) + .findFirst() + .orElseThrow() + .getUpdatedAt() + .orElseThrow(); assertEquals(1, dataset2.tags().getVersion("tag2")); + assertEquals(updatedTag2CreatedAt, tag2CreatedAt); + assertFalse(updatedTag2UpdatedAt.isBefore(tag2UpdatedAt)); + assertEquals( + Map.of("description", "rollback tag"), + dataset2.tags().list().stream() + .filter(tag -> tag.getName().equals("tag2")) + .findFirst() + .orElseThrow() + .getMetadata()); + dataset2.tags().replaceMetadata("tag2", Collections.emptyMap()); + assertEquals( + Collections.emptyMap(), + dataset2.tags().list().stream() + .filter(tag -> 
tag.getName().equals("tag2")) + .findFirst() + .orElseThrow() + .getMetadata()); dataset2.tags().delete("tag2"); assertEquals(1, dataset2.tags().list().size()); assertEquals(1, dataset2.tags().list().get(0).getVersion()); @@ -357,6 +420,7 @@ void testTags(@TempDir Path tempDir) { try (Dataset branch = dataset2.createBranch("branch", Ref.ofMain(2))) { branch.tags().create("tag_on_branch", Ref.ofBranch("branch")); + branch.tags().replaceMetadata("tag_on_branch", Map.of("description", "branch tag")); assertEquals(2, dataset2.tags().getVersion("tag_on_branch")); List tags = dataset2.tags().list(); Optional tagOptional = @@ -367,6 +431,7 @@ void testTags(@TempDir Path tempDir) { assertTrue(tagOptional.isPresent()); assertEquals(2, tagOptional.get().getVersion()); assertEquals(Optional.of("branch"), tagOptional.get().getBranch()); + assertEquals(Map.of("description", "branch tag"), tagOptional.get().getMetadata()); dataset2.tags().update("tag1", Ref.ofBranch("branch")); tags = dataset2.tags().list(); @@ -1741,6 +1806,19 @@ void testBranches(@TempDir Path tempDir) { assertFalse(branch1Meta.getBranchIdentifier().get(0).getUuid().isEmpty()); assertTrue(branch1Meta.getCreateAt() > 0); assertTrue(branch1Meta.getManifestSize() > 0); + assertEquals(Collections.emptyMap(), branch1Meta.getMetadata()); + mainV2 + .branches() + .replaceMetadata("branch1", Map.of("description", "long-lived branch")); + branches = branch2V4.branches().list(); + b1 = branches.stream().filter(b -> b.getName().equals("branch1")).findFirst(); + assertTrue(b1.isPresent()); + assertEquals(Map.of("description", "long-lived branch"), b1.get().getMetadata()); + mainV2.branches().replaceMetadata("branch1", Collections.emptyMap()); + branches = branch2V4.branches().list(); + b1 = branches.stream().filter(b -> b.getName().equals("branch1")).findFirst(); + assertTrue(b1.isPresent()); + assertEquals(Collections.emptyMap(), b1.get().getMetadata()); assertEquals("branch2", branch2Meta.getName()); 
assertTrue(branch2Meta.getParentBranch().isPresent()); diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py index fda7ddd5d0d..b0dd7a3ec21 100644 --- a/python/python/lance/dataset.py +++ b/python/python/lance/dataset.py @@ -617,7 +617,6 @@ def create_branch( storage_options: Optional[Dict[str, str]] Storage options for the underlying object store. If not provided, the storage options from the current dataset will be used. - Returns ------- LanceDataset @@ -4367,7 +4366,10 @@ class Transaction: class Tag(TypedDict): branch: Optional[str] version: int + created_at: Optional[datetime] + updated_at: Optional[datetime] manifest_size: int + metadata: Dict[str, str] class Branch(TypedDict): @@ -4376,6 +4378,7 @@ class Branch(TypedDict): parent_version: int create_at: int manifest_size: int + metadata: Dict[str, str] class Version(TypedDict): @@ -5725,7 +5728,9 @@ def list(self) -> dict[str, Tag]: Returns ------- dict[str, Tag] - A dictionary mapping tag names to version numbers. + A dictionary mapping tag names to tag metadata, including the + referenced branch, version, timestamps, manifest size, and any + attached metadata. """ return self._ds.tags() @@ -5745,7 +5750,7 @@ def get_version(self, tag: str) -> Optional[int]: """ return self._ds.get_version(tag) - def list_ordered(self, order: Optional[str] = None) -> list[str, Tag]: + def list_ordered(self, order: Optional[str] = None) -> List[Tuple[str, Tag]]: """ List all dataset tags. @@ -5758,7 +5763,7 @@ def list_ordered(self, order: Optional[str] = None) -> list[str, Tag]: Returns ------- - list[str, Tag] + List[Tuple[str, Tag]] An ordered list of tuples mapping tag names to its `Tag` metadata. """ return self._ds.tags_ordered(order) @@ -5816,6 +5821,17 @@ def update( """ self._ds.update_tag(tag, reference) + def replace_metadata(self, tag: str, metadata: Dict[str, str]) -> None: + """ + Replace metadata for an existing tag. 
+ + This replaces the entire metadata map instead of merging with existing + keys. It does not change the tag reference, and it does not update + `updated_at`. `updated_at` only changes when `update()` moves the tag + to a different reference. + """ + self._ds.replace_tag_metadata(tag, metadata) + class Branches: """ @@ -5848,6 +5864,12 @@ def delete(self, branch: str) -> None: """ self._ds.delete_branch(branch) + def replace_metadata(self, branch: str, metadata: Dict[str, str]) -> None: + """ + Replace metadata for a branch. + """ + self._ds.replace_branch_metadata(branch, metadata) + @dataclass class FieldStatistics: diff --git a/python/python/lance/lance/__init__.pyi b/python/python/lance/lance/__init__.pyi index 6b9e6e2ea0c..9c631083e50 100644 --- a/python/python/lance/lance/__init__.pyi +++ b/python/python/lance/lance/__init__.pyi @@ -335,6 +335,11 @@ class _Dataset: tag: str, reference: Optional[int | str | Tuple[Optional[str], Optional[int]]] = None, ): ... + def replace_tag_metadata( + self, + tag: str, + metadata: Dict[str, str], + ) -> None: ... # Branch operations def branches(self) -> Dict[str, Branch]: ... def branches_ordered(self, order: Optional[str]) -> List[Tuple[str, Branch]]: ... @@ -346,6 +351,11 @@ class _Dataset: **kwargs, ) -> _Dataset: ... def delete_branch(self, branch: str) -> None: ... + def replace_branch_metadata( + self, + branch: str, + metadata: Dict[str, str], + ) -> None: ... def optimize_indices(self, **kwargs): ... 
def create_index( self, diff --git a/python/python/tests/test_dataset.py b/python/python/tests/test_dataset.py index 8d4f70d6efb..a10346b9bf2 100644 --- a/python/python/tests/test_dataset.py +++ b/python/python/tests/test_dataset.py @@ -471,7 +471,15 @@ def test_tag(tmp_path: Path): ds.tags.delete("tag1") ds.tags.create("tag1", 1) + ds.tags.replace_metadata("tag1", {"description": "first tag"}) + tag1_meta = ds.tags.list()["tag1"] + assert tag1_meta["created_at"] is not None + assert isinstance(tag1_meta["created_at"], datetime) + assert tag1_meta["updated_at"] is not None + assert isinstance(tag1_meta["updated_at"], datetime) + assert tag1_meta["created_at"] == tag1_meta["updated_at"] assert len(ds.tags.list()) == 1 + assert ds.tags.list()["tag1"]["metadata"] == {"description": "first tag"} with pytest.raises(ValueError): ds.tags.create("tag1", 1) @@ -505,13 +513,41 @@ def test_tag(tmp_path: Path): ): ds.tags.update("tag3", 1) + tag1_meta = ds.tags.list()["tag1"] + tag1_created_at = tag1_meta["created_at"] + tag1_updated_at = tag1_meta["updated_at"] + assert tag1_created_at is not None + assert tag1_updated_at is not None + ds.tags.replace_metadata("tag1", {"description": "updated tag"}) + ds = lance.dataset(base_dir, "tag1") + assert ds.version == 1 + replaced_tag1_meta = ds.tags.list()["tag1"] + assert replaced_tag1_meta["metadata"] == {"description": "updated tag"} + assert replaced_tag1_meta["updated_at"] == tag1_updated_at + + ds.tags.replace_metadata("tag1", {"owner": "ml-team"}) + replaced_again_tag1_meta = ds.tags.list()["tag1"] + assert replaced_again_tag1_meta["metadata"] == {"owner": "ml-team"} + assert replaced_again_tag1_meta["updated_at"] == tag1_updated_at + ds.tags.update("tag1", 2) + updated_tag1_meta = ds.tags.list()["tag1"] + assert updated_tag1_meta["created_at"] == tag1_created_at + assert updated_tag1_meta["updated_at"] is not None + assert updated_tag1_meta["updated_at"] >= tag1_updated_at + ds = lance.dataset(base_dir, "tag1") + assert 
ds.version == 2 + assert ds.tags.list()["tag1"]["metadata"] == {"owner": "ml-team"} + + ds.tags.replace_metadata("tag1", {}) ds = lance.dataset(base_dir, "tag1") assert ds.version == 2 + assert ds.tags.list()["tag1"]["metadata"] == {} ds.tags.update("tag1", 1) ds = lance.dataset(base_dir, "tag1") assert ds.version == 1 + assert ds.tags.list()["tag1"]["metadata"] == {} version = ds.tags.get_version("tag1") assert version == 1 @@ -558,6 +594,11 @@ def test_tag_order(tmp_path: Path): tags_asc = ds.tags.list_ordered(order="asc") assert len(tags_asc) == 3 + first_tag = tags_asc[0][1] + assert first_tag["created_at"] is not None + assert isinstance(first_tag["created_at"], datetime) + assert first_tag["updated_at"] is not None + assert isinstance(first_tag["updated_at"], datetime) tag_names_asc = [t[0] for t in tags_asc] assert tag_names_asc == sorted(expected_tags.keys()), ( f"Unexpected ascending order: {tag_names_asc}" @@ -5213,6 +5254,7 @@ def test_branches(tmp_path: Path): ds_main = lance.write_dataset(main_table, base_dir) branch1 = ds_main.create_branch("branch1") + ds_main.branches.replace_metadata("branch1", {"description": "branch one"}) assert branch1.version == 1 branch1_append = pa.Table.from_pydict({"a": [7, 8], "b": [9, 10]}) branch1 = lance.write_dataset(branch1_append, branch1, mode="append") @@ -5231,10 +5273,13 @@ def test_branches(tmp_path: Path): branch1.tags.create("main_latest", (None, None)) branch1.tags.create("main_latest2", ("main", None)) branch1.create_branch("branch_from_main", ("main", None)) + ordered_tags = dict(branch1.tags.list_ordered()) branches_with_main = branch1.branches.list() assert branch1.tags.list()["branch1_latest"]["branch"] == "branch1" assert branch1.tags.list()["main_latest"]["branch"] is None assert branch1.tags.list()["main_latest2"]["branch"] is None + assert ordered_tags["branch1_latest"]["branch"] == "branch1" + assert ordered_tags["main_latest"]["branch"] is None assert 
branches_with_main["branch_from_main"]["parent_branch"] is None assert branches_with_main["branch_from_main"]["branch_identifier"][0][0] == 1 assert isinstance( @@ -5270,6 +5315,10 @@ def test_branches(tmp_path: Path): assert isinstance(b1_meta["branch_identifier"][0][1], str) assert len(b1_meta["branch_identifier"][0][1]) > 0 assert "create_at" in b1_meta + assert b1_meta["metadata"] == {"description": "branch one"} + ordered_branches = dict(ds_main.branches.list_ordered()) + assert ordered_branches["branch1"]["metadata"] == {"description": "branch one"} + assert "metadata" in ordered_branches["branch2"] try: ds_main.checkout_version("branch_not_exists") diff --git a/python/src/dataset.rs b/python/src/dataset.rs index e1f219b8f9b..56414241df0 100644 --- a/python/src/dataset.rs +++ b/python/src/dataset.rs @@ -1608,8 +1608,12 @@ impl Dataset { for (tag_name, tag_content) in tags { let dict = PyDict::new(py); + dict.set_item("branch", tag_content.branch.clone())?; dict.set_item("version", tag_content.version)?; + dict.set_item("created_at", tag_content.created_at)?; + dict.set_item("updated_at", tag_content.updated_at)?; dict.set_item("manifest_size", tag_content.manifest_size)?; + dict.set_item("metadata", tag_content.metadata.clone())?; pylist.append((tag_name.as_str(), dict))?; } @@ -1626,7 +1630,10 @@ impl Dataset { let dict = PyDict::new(py); dict.set_item("branch", v.branch.clone())?; dict.set_item("version", v.version)?; + dict.set_item("created_at", v.created_at)?; + dict.set_item("updated_at", v.updated_at)?; dict.set_item("manifest_size", v.manifest_size)?; + dict.set_item("metadata", v.metadata.clone())?; pytags.set_item(k, dict.into_py_any(py)?)?; } pytags.into_py_any(py) @@ -1682,6 +1689,15 @@ impl Dataset { Ok(()) } + fn replace_tag_metadata(&self, tag: String, metadata: HashMap<String, String>) -> PyResult<()> { + rt().block_on( + None, + self.ds.as_ref().tags().replace_metadata(&tag, metadata), + )?
+ .infer_error()?; + Ok(()) + } + /// Check out the latest version of the current branch fn checkout_latest(&mut self) -> PyResult<()> { let mut new_self = self.ds.as_ref().clone(); @@ -1750,11 +1766,28 @@ impl Dataset { dict.set_item("parent_version", meta.parent_version)?; dict.set_item("create_at", meta.create_at)?; dict.set_item("manifest_size", meta.manifest_size)?; + dict.set_item("metadata", meta.metadata.clone())?; pybranches.set_item(name, dict.into_py_any(py)?)?; } Ok(pybranches.into()) } + fn replace_branch_metadata( + &self, + branch: String, + metadata: HashMap<String, String>, + ) -> PyResult<()> { + rt().block_on( + None, + self.ds + .as_ref() + .branches() + .replace_metadata(&branch, metadata), + )? + .infer_error()?; + Ok(()) + } + /// List branches ordered by parent_version fn branches_ordered( &self, @@ -1785,6 +1818,7 @@ impl Dataset { dict.set_item("parent_version", meta.parent_version)?; dict.set_item("create_at", meta.create_at)?; dict.set_item("manifest_size", meta.manifest_size)?; + dict.set_item("metadata", meta.metadata.clone())?; out.push((name, dict.into_py_any(py)?)); } Ok(out) diff --git a/rust/lance/src/dataset/refs.rs b/rust/lance/src/dataset/refs.rs index 15d4e74a50d..1f1b1581c30 100644 --- a/rust/lance/src/dataset/refs.rs +++ b/rust/lance/src/dataset/refs.rs @@ -3,6 +3,7 @@ use std::ops::Range; +use chrono::{DateTime, Utc}; use futures::stream::{StreamExt, TryStreamExt}; use itertools::Itertools; use lance_io::object_store::ObjectStore; @@ -13,6 +14,7 @@ use std::sync::Arc; use crate::dataset::branch_location::BranchLocation; use crate::dataset::refs::Ref::{Tag, Version, VersionNumber}; +use crate::utils::temporal::utc_now; use crate::{Error, Result}; use serde::de::DeserializeOwned; use std::cmp::Ordering; @@ -221,7 +223,10 @@ impl Tags<'_> { message: format!("tag {} already exists", tag), }); } - let tag_contents = self.build_tag_content_by_ref(reference).await?; + let now = utc_now(); + let tag_contents = self +
.build_tag_content_by_ref(reference, Some(now), Some(now)) + .await?; self.object_store() .put( @@ -257,7 +262,15 @@ impl Tags<'_> { message: format!("tag {} does not exist", tag), }); } - let tag_contents = self.build_tag_content_by_ref(reference).await?; + let mut tag_contents = TagContents::from_path(&tag_file, self.object_store()).await?; + let updated_reference = self + .build_tag_content_by_ref(reference, tag_contents.created_at, Some(utc_now())) + .await?; + tag_contents.branch = updated_reference.branch; + tag_contents.version = updated_reference.version; + tag_contents.created_at = updated_reference.created_at; + tag_contents.updated_at = updated_reference.updated_at; + tag_contents.manifest_size = updated_reference.manifest_size; self.object_store() .put( @@ -268,7 +281,39 @@ impl Tags<'_> { .map(|_| ()) } - async fn build_tag_content_by_ref(&self, reference: impl Into<Ref>) -> Result<TagContents> { + pub async fn replace_metadata( + &self, + tag: &str, + metadata: HashMap<String, String>, + ) -> Result<()> { + check_valid_tag(tag)?; + + let root_location = self.refs.root()?; + let tag_file = tag_path(&root_location.path, tag); + if !self.object_store().exists(&tag_file).await?
{ + return Err(Error::RefNotFound { + message: format!("tag {} does not exist", tag), + }); + } + + let mut tag_contents = TagContents::from_path(&tag_file, self.object_store()).await?; + tag_contents.metadata = metadata; + + self.object_store() + .put( + &tag_file, + serde_json::to_string_pretty(&tag_contents)?.as_bytes(), + ) + .await + .map(|_| ()) + } + + async fn build_tag_content_by_ref( + &self, + reference: impl Into<Ref>, + created_at: Option<DateTime<Utc>>, + updated_at: Option<DateTime<Utc>>, + ) -> Result<TagContents> { let reference = reference.into(); let (branch, version_number) = match reference { Version(branch, version_number) => (branch, version_number), @@ -313,7 +358,10 @@ impl Tags<'_> { let tag_contents = TagContents { branch, version: manifest_file.version, + created_at, + updated_at, manifest_size, + metadata: HashMap::new(), }; Ok(tag_contents) } @@ -455,6 +503,7 @@ impl Branches<'_> { } else { self.object_store().size(&manifest_file.path).await? as usize }, + metadata: HashMap::new(), }; self.object_store() @@ -466,6 +515,34 @@ impl Branches<'_> { .map(|_| ()) + pub async fn replace_metadata( + &self, + branch: &str, + metadata: HashMap<String, String>, + ) -> Result<()> { + check_valid_branch(branch)?; + + let root_location = self.refs.root()?; + let branch_file = branch_contents_path(&root_location.path, branch); + if !self.object_store().exists(&branch_file).await? { + return Err(Error::RefNotFound { + message: format!("branch {} does not exist", branch), + }); + } + + let mut branch_contents = + BranchContents::from_path(&branch_file, self.object_store()).await?; + branch_contents.metadata = metadata; + + self.object_store() + .put( + &branch_file, + serde_json::to_string_pretty(&branch_contents)?.as_bytes(), + ) + .await + .map(|_| ()) + } + /// Delete a branch /// /// If the `BranchContents` does not exist, it will return an error directly unless `force` is true.
@@ -654,7 +731,16 @@ impl<'a> BranchRelativePath<'a> { pub struct TagContents { pub branch: Option<String>, pub version: u64, + #[serde(skip_serializing_if = "Option::is_none")] + pub created_at: Option<DateTime<Utc>>, + #[serde(skip_serializing_if = "Option::is_none")] + pub updated_at: Option<DateTime<Utc>>, pub manifest_size: usize, + /// Metadata associated with this tag. + /// + /// Missing metadata is deserialized as an empty map. + #[serde(default)] + pub metadata: HashMap<String, String>, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -666,6 +752,11 @@ pub struct BranchContents { pub parent_version: u64, pub create_at: u64, // unix timestamp pub manifest_size: usize, + /// Metadata associated with this branch. + /// + /// Missing metadata is deserialized as an empty map. + #[serde(default)] + pub metadata: HashMap<String, String>, } #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)] @@ -1073,6 +1164,7 @@ mod tests { parent_version: 42, create_at: 1234567890, manifest_size: 1024, + metadata: HashMap::from([("description".to_string(), "production branch".to_string())]), }; // Test serialization @@ -1081,6 +1173,7 @@ assert!(json.contains("parentVersion")); assert!(json.contains("createAt")); assert!(json.contains("manifestSize")); + assert!(json.contains("metadata")); // Test deserialization let deserialized: BranchContents = serde_json::from_str(&json).unwrap(); @@ -1088,6 +1181,12 @@ assert_eq!(deserialized.parent_version, branch_contents.parent_version); assert_eq!(deserialized.create_at, branch_contents.create_at); assert_eq!(deserialized.manifest_size, branch_contents.manifest_size); + assert_eq!(deserialized.metadata, branch_contents.metadata); + + // Backward compatibility: older serialized content does not include metadata.
+ let legacy_json = r#"{"parentBranch":"main","parentVersion":42,"createAt":1234567890,"manifestSize":1024}"#; + let legacy_deserialized: BranchContents = serde_json::from_str(legacy_json).unwrap(); + assert!(legacy_deserialized.metadata.is_empty()); } #[tokio::test] @@ -1095,20 +1194,59 @@ mod tests { let tag_contents = TagContents { branch: Some("feature".to_string()), version: 10, + created_at: Some(chrono::DateTime::from_timestamp(1_234_567_000, 456_000_000).unwrap()), + updated_at: Some(chrono::DateTime::from_timestamp(1_234_567_890, 123_000_000).unwrap()), manifest_size: 2048, + metadata: HashMap::from([("channel".to_string(), "release".to_string())]), }; // Test serialization let json = serde_json::to_string(&tag_contents).unwrap(); assert!(json.contains("branch")); assert!(json.contains("version")); + assert!(json.contains("createdAt")); + assert!(json.contains("updatedAt")); assert!(json.contains("manifestSize")); + assert!(json.contains("metadata")); // Test deserialization let deserialized: TagContents = serde_json::from_str(&json).unwrap(); assert_eq!(deserialized.branch, tag_contents.branch); assert_eq!(deserialized.version, tag_contents.version); + assert_eq!(deserialized.created_at, tag_contents.created_at); + assert_eq!(deserialized.updated_at, tag_contents.updated_at); assert_eq!(deserialized.manifest_size, tag_contents.manifest_size); + assert_eq!(deserialized.metadata, tag_contents.metadata); + + let tag_contents_without_created_at = TagContents { + branch: Some("feature".to_string()), + version: 10, + created_at: None, + updated_at: Some(chrono::DateTime::from_timestamp(1_234_567_890, 123_000_000).unwrap()), + manifest_size: 2048, + metadata: HashMap::new(), + }; + let json_without_created_at = + serde_json::to_string(&tag_contents_without_created_at).unwrap(); + assert!(!json_without_created_at.contains("createdAt")); + assert!(json_without_created_at.contains("updatedAt")); + + // Backward compatibility: older serialized content does not 
include timestamps or metadata. + let legacy_json = r#"{"branch":"feature","version":10,"manifestSize":2048}"#; + let legacy_deserialized: TagContents = serde_json::from_str(legacy_json).unwrap(); + assert_eq!(legacy_deserialized.created_at, None); + assert_eq!(legacy_deserialized.updated_at, None); + assert!(legacy_deserialized.metadata.is_empty()); + + let legacy_updated_only_json = r#"{"branch":"feature","version":10,"updatedAt":"2009-02-13T23:31:30.123Z","manifestSize":2048}"#; + let legacy_updated_only_deserialized: TagContents = + serde_json::from_str(legacy_updated_only_json).unwrap(); + assert_eq!(legacy_updated_only_deserialized.created_at, None); + assert_eq!( + legacy_updated_only_deserialized.updated_at, + Some(chrono::DateTime::from_timestamp(1_234_567_890, 123_000_000).unwrap()) + ); + assert!(legacy_updated_only_deserialized.metadata.is_empty()); } #[rstest] @@ -1191,6 +1329,7 @@ mod tests { parent_version: parent_ver, create_at: 0, manifest_size: 1, + metadata: HashMap::new(), } } let mut contents = HashMap::new(); diff --git a/rust/lance/src/dataset/tests/dataset_versioning.rs b/rust/lance/src/dataset/tests/dataset_versioning.rs index e9253cc69fe..d4eb711cbcf 100644 --- a/rust/lance/src/dataset/tests/dataset_versioning.rs +++ b/rust/lance/src/dataset/tests/dataset_versioning.rs @@ -19,6 +19,7 @@ use arrow_schema::{DataType, Field as ArrowField, Schema as ArrowSchema}; use lance_core::utils::tempfile::{TempDir, TempStdDir, TempStrDir}; use lance_datagen::{BatchCount, RowCount, array, gen_batch}; use lance_file::version::LanceFileVersion; +use mock_instant::thread_local::MockClock; use crate::dataset::refs::branch_contents_path; use futures::TryStreamExt; @@ -324,7 +325,12 @@ async fn test_tag( "Ref not found error: tag tag1 does not exist" ); + MockClock::set_system_time(std::time::Duration::from_secs(1)); dataset.tags().create("tag1", 1).await.unwrap(); + let mut tag_map = dataset.tags().list().await.unwrap(); + let tag1_meta = 
tag_map.remove("tag1").unwrap(); + assert_eq!(tag1_meta.created_at, tag1_meta.updated_at); + assert!(tag1_meta.created_at.is_some()); assert_eq!(dataset.tags().list().await.unwrap().len(), 1); @@ -405,11 +411,24 @@ async fn test_tag( "Version not found error: version main:3 does not exist" ); + let tag1_before_update = dataset.tags().get("tag1").await.unwrap(); + MockClock::set_system_time(std::time::Duration::from_secs(2)); dataset.tags().update("tag1", 2).await.unwrap(); + let tag1_after_update = dataset.tags().get("tag1").await.unwrap(); + assert_eq!(tag1_after_update.created_at, tag1_before_update.created_at); + assert!(tag1_after_update.updated_at > tag1_before_update.updated_at); dataset = dataset.checkout_version("tag1").await.unwrap(); assert_eq!(dataset.manifest.version, 2); + let tag1_before_second_update = dataset.tags().get("tag1").await.unwrap(); + MockClock::set_system_time(std::time::Duration::from_secs(3)); dataset.tags().update("tag1", 1).await.unwrap(); + let tag1_after_second_update = dataset.tags().get("tag1").await.unwrap(); + assert_eq!( + tag1_after_second_update.created_at, + tag1_before_second_update.created_at + ); + assert!(tag1_after_second_update.updated_at > tag1_before_second_update.updated_at); dataset = dataset.checkout_version("tag1").await.unwrap(); assert_eq!(dataset.manifest.version, 1); }