Skip to content

Commit 626de2e

Browse files
authored
feat(iceberg): Add snapshot utils to scan ancestors (#2285)
## Which issue does this PR close? - Closes #2241 ## What changes are included in this PR? - Add `Ancestors` to help scan past snapshots - Moved the existing utility to the new utils module ## Are these changes tested? Yes — unit tests covering `ancestors_of` and `ancestors_between` are included.
1 parent dc3a2d5 commit 626de2e

6 files changed

Lines changed: 329 additions & 3 deletions

File tree

crates/iceberg/src/arrow/reader.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ use crate::io::{FileIO, FileMetadata, FileRead};
5858
use crate::metadata_columns::{RESERVED_FIELD_ID_FILE, is_metadata_field};
5959
use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream};
6060
use crate::spec::{Datum, NameMapping, NestedField, PrimitiveType, Schema, Type};
61-
use crate::utils::available_parallelism;
61+
use crate::util::available_parallelism;
6262
use crate::{Error, ErrorKind};
6363

6464
/// Default gap between byte ranges below which they are coalesced into a

crates/iceberg/src/lib.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,9 +95,10 @@ pub mod arrow;
9595
pub(crate) mod delete_file_index;
9696
pub mod encryption;
9797
pub mod test_utils;
98-
mod utils;
9998
pub mod writer;
10099

101100
mod delete_vector;
102101
pub mod metadata_columns;
103102
pub mod puffin;
103+
/// Utility functions and modules.
104+
pub mod util;

crates/iceberg/src/scan/mod.rs

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ use crate::metadata_columns::{get_metadata_field_id, is_metadata_column_name};
4040
use crate::runtime::spawn;
4141
use crate::spec::{DataContentType, SnapshotRef};
4242
use crate::table::Table;
43-
use crate::utils::available_parallelism;
43+
use crate::util::available_parallelism;
4444
use crate::{Error, ErrorKind, Result};
4545

4646
/// A stream of arrow [`RecordBatch`]es.
@@ -683,6 +683,39 @@ pub mod tests {
683683
}
684684
}
685685

686+
/// Creates a fixture with 5 snapshots chained as:
687+
/// S1 (root) -> S2 -> S3 -> S4 -> S5 (current)
688+
/// Useful for testing snapshot history traversal.
689+
pub fn new_with_deep_history() -> Self {
690+
let tmp_dir = TempDir::new().unwrap();
691+
let table_location = tmp_dir.path().join("table1");
692+
let table_metadata1_location = table_location.join("metadata/v1.json");
693+
694+
let file_io = FileIO::new_with_fs();
695+
696+
let table_metadata = {
697+
let json_str = fs::read_to_string(format!(
698+
"{}/testdata/example_table_metadata_v2_deep_history.json",
699+
env!("CARGO_MANIFEST_DIR")
700+
))
701+
.unwrap();
702+
serde_json::from_str::<TableMetadata>(&json_str).unwrap()
703+
};
704+
705+
let table = Table::builder()
706+
.metadata(table_metadata)
707+
.identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
708+
.file_io(file_io.clone())
709+
.metadata_location(table_metadata1_location.as_os_str().to_str().unwrap())
710+
.build()
711+
.unwrap();
712+
713+
Self {
714+
table_location: table_location.to_str().unwrap().to_string(),
715+
table,
716+
}
717+
}
718+
686719
pub fn new_unpartitioned() -> Self {
687720
let tmp_dir = TempDir::new().unwrap();
688721
let table_location = tmp_dir.path().join("table1");
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@
1717

1818
use std::num::NonZeroUsize;
1919

20+
/// Utilities for working with snapshots.
21+
pub mod snapshot;
22+
2023
// Use a default value of 1 as the safest option.
2124
// See https://doc.rust-lang.org/std/thread/fn.available_parallelism.html#limitations
2225
// for more details.
Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use crate::spec::{SnapshotRef, TableMetadataRef};
19+
20+
/// Iterator state for walking a snapshot's ancestry chain, following
/// parent-snapshot ids from a starting snapshot toward the root.
struct Ancestors {
    // Snapshot to yield on the next call to `next()`; `None` once a snapshot
    // without a parent (the root) has been yielded, or if the chain was empty.
    next: Option<SnapshotRef>,
    // Resolves a snapshot id to its snapshot; returns `None` for unknown ids,
    // which terminates the iteration.
    get_snapshot: Box<dyn Fn(i64) -> Option<SnapshotRef> + Send>,
}
24+
25+
impl Iterator for Ancestors {
26+
type Item = SnapshotRef;
27+
28+
fn next(&mut self) -> Option<Self::Item> {
29+
let snapshot = self.next.take()?;
30+
self.next = snapshot
31+
.parent_snapshot_id()
32+
.and_then(|id| (self.get_snapshot)(id));
33+
Some(snapshot)
34+
}
35+
}
36+
37+
/// Iterate starting from `snapshot_id` (inclusive) to the root snapshot.
38+
pub fn ancestors_of(
39+
table_metadata: &TableMetadataRef,
40+
snapshot_id: i64,
41+
) -> impl Iterator<Item = SnapshotRef> + Send {
42+
let initial = table_metadata.snapshot_by_id(snapshot_id).cloned();
43+
let table_metadata = table_metadata.clone();
44+
Ancestors {
45+
next: initial,
46+
get_snapshot: Box::new(move |id| table_metadata.snapshot_by_id(id).cloned()),
47+
}
48+
}
49+
50+
/// Iterate starting from `latest_snapshot_id` (inclusive) to `oldest_snapshot_id` (exclusive).
51+
pub fn ancestors_between(
52+
table_metadata: &TableMetadataRef,
53+
latest_snapshot_id: i64,
54+
oldest_snapshot_id: Option<i64>,
55+
) -> impl Iterator<Item = SnapshotRef> + Send {
56+
ancestors_of(table_metadata, latest_snapshot_id).take_while(move |snapshot| {
57+
oldest_snapshot_id
58+
.map(|id| snapshot.snapshot_id() != id)
59+
.unwrap_or(true)
60+
})
61+
}
62+
63+
#[cfg(test)]
mod tests {
    use super::*;
    use crate::scan::tests::TableTestFixture;

    // Snapshot ids of the fixture chain: S1 (root) -> S2 -> S3 -> S4 -> S5 (current).
    const S1: i64 = 3051729675574597004;
    const S2: i64 = 3055729675574597004;
    const S3: i64 = 3056729675574597004;
    const S4: i64 = 3057729675574597004;
    const S5: i64 = 3059729675574597004;

    /// Loads the deep-history fixture and hands back its table metadata.
    fn metadata() -> TableMetadataRef {
        let fixture = TableTestFixture::new_with_deep_history();
        std::sync::Arc::new(fixture.table.metadata().clone())
    }

    /// Collects the snapshot ids produced by an ancestor iterator.
    fn collect_ids(iter: impl Iterator<Item = SnapshotRef>) -> Vec<i64> {
        iter.map(|snapshot| snapshot.snapshot_id()).collect()
    }

    // --- ancestors_of ---

    #[test]
    fn test_ancestors_of_nonexistent_snapshot_returns_empty() {
        assert!(collect_ids(ancestors_of(&metadata(), 999)).is_empty());
    }

    #[test]
    fn test_ancestors_of_root_returns_only_root() {
        assert_eq!(collect_ids(ancestors_of(&metadata(), S1)), vec![S1]);
    }

    #[test]
    fn test_ancestors_of_leaf_returns_full_chain() {
        assert_eq!(
            collect_ids(ancestors_of(&metadata(), S5)),
            vec![S5, S4, S3, S2, S1]
        );
    }

    #[test]
    fn test_ancestors_of_mid_chain_returns_partial_chain() {
        assert_eq!(collect_ids(ancestors_of(&metadata(), S3)), vec![S3, S2, S1]);
    }

    #[test]
    fn test_ancestors_of_second_snapshot() {
        assert_eq!(collect_ids(ancestors_of(&metadata(), S2)), vec![S2, S1]);
    }

    // --- ancestors_between ---

    #[test]
    fn test_ancestors_between_same_id_returns_empty() {
        assert!(collect_ids(ancestors_between(&metadata(), S3, Some(S3))).is_empty());
    }

    #[test]
    fn test_ancestors_between_no_oldest_returns_all_ancestors() {
        assert_eq!(
            collect_ids(ancestors_between(&metadata(), S5, None)),
            vec![S5, S4, S3, S2, S1]
        );
    }

    #[test]
    fn test_ancestors_between_excludes_oldest_snapshot() {
        // S5 down to (but not including) S2.
        assert_eq!(
            collect_ids(ancestors_between(&metadata(), S5, Some(S2))),
            vec![S5, S4, S3]
        );
    }

    #[test]
    fn test_ancestors_between_adjacent_snapshots() {
        // S3 down to (but not including) S2 — only S3 itself.
        assert_eq!(
            collect_ids(ancestors_between(&metadata(), S3, Some(S2))),
            vec![S3]
        );
    }

    #[test]
    fn test_ancestors_between_leaf_and_root() {
        // S5 down to (but not including) S1.
        assert_eq!(
            collect_ids(ancestors_between(&metadata(), S5, Some(S1))),
            vec![S5, S4, S3, S2]
        );
    }

    #[test]
    fn test_ancestors_between_nonexistent_oldest_returns_full_chain() {
        // oldest_snapshot_id doesn't exist in the chain, so take_while never stops.
        assert_eq!(
            collect_ids(ancestors_between(&metadata(), S5, Some(999))),
            vec![S5, S4, S3, S2, S1]
        );
    }

    #[test]
    fn test_ancestors_between_nonexistent_latest_returns_empty() {
        assert!(collect_ids(ancestors_between(&metadata(), 999, Some(S1))).is_empty());
    }
}
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
{
2+
"format-version": 2,
3+
"table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",
4+
"location": "s3://bucket/test/location",
5+
"last-sequence-number": 34,
6+
"last-updated-ms": 1602638573590,
7+
"last-column-id": 3,
8+
"current-schema-id": 1,
9+
"schemas": [
10+
{
11+
"type": "struct",
12+
"schema-id": 0,
13+
"fields": [
14+
{"id": 1, "name": "x", "required": true, "type": "long"}
15+
]
16+
},
17+
{
18+
"type": "struct",
19+
"schema-id": 1,
20+
"identifier-field-ids": [1, 2],
21+
"fields": [
22+
{"id": 1, "name": "x", "required": true, "type": "long"},
23+
{"id": 2, "name": "y", "required": true, "type": "long", "doc": "comment"},
24+
{"id": 3, "name": "z", "required": true, "type": "long"}
25+
]
26+
}
27+
],
28+
"default-spec-id": 0,
29+
"partition-specs": [
30+
{
31+
"spec-id": 0,
32+
"fields": [
33+
{"name": "x", "transform": "identity", "source-id": 1, "field-id": 1000}
34+
]
35+
}
36+
],
37+
"last-partition-id": 1000,
38+
"default-sort-order-id": 3,
39+
"sort-orders": [
40+
{
41+
"order-id": 3,
42+
"fields": [
43+
{"transform": "identity", "source-id": 2, "direction": "asc", "null-order": "nulls-first"},
44+
{"transform": "bucket[4]", "source-id": 3, "direction": "desc", "null-order": "nulls-last"}
45+
]
46+
}
47+
],
48+
"properties": {},
49+
"current-snapshot-id": 3059729675574597004,
50+
"snapshots": [
51+
{
52+
"snapshot-id": 3051729675574597004,
53+
"timestamp-ms": 1515100955770,
54+
"sequence-number": 0,
55+
"summary": {"operation": "append"},
56+
"manifest-list": "s3://bucket/metadata/snap-3051729675574597004.avro"
57+
},
58+
{
59+
"snapshot-id": 3055729675574597004,
60+
"parent-snapshot-id": 3051729675574597004,
61+
"timestamp-ms": 1555100955770,
62+
"sequence-number": 1,
63+
"summary": {"operation": "append"},
64+
"manifest-list": "s3://bucket/metadata/snap-3055729675574597004.avro",
65+
"schema-id": 1
66+
},
67+
{
68+
"snapshot-id": 3056729675574597004,
69+
"parent-snapshot-id": 3055729675574597004,
70+
"timestamp-ms": 1575100955770,
71+
"sequence-number": 2,
72+
"summary": {"operation": "append"},
73+
"manifest-list": "s3://bucket/metadata/snap-3056729675574597004.avro",
74+
"schema-id": 1
75+
},
76+
{
77+
"snapshot-id": 3057729675574597004,
78+
"parent-snapshot-id": 3056729675574597004,
79+
"timestamp-ms": 1595100955770,
80+
"sequence-number": 3,
81+
"summary": {"operation": "overwrite"},
82+
"manifest-list": "s3://bucket/metadata/snap-3057729675574597004.avro",
83+
"schema-id": 1
84+
},
85+
{
86+
"snapshot-id": 3059729675574597004,
87+
"parent-snapshot-id": 3057729675574597004,
88+
"timestamp-ms": 1602638573590,
89+
"sequence-number": 4,
90+
"summary": {"operation": "append"},
91+
"manifest-list": "s3://bucket/metadata/snap-3059729675574597004.avro",
92+
"schema-id": 1
93+
}
94+
],
95+
"snapshot-log": [
96+
{"snapshot-id": 3051729675574597004, "timestamp-ms": 1515100955770},
97+
{"snapshot-id": 3055729675574597004, "timestamp-ms": 1555100955770},
98+
{"snapshot-id": 3056729675574597004, "timestamp-ms": 1575100955770},
99+
{"snapshot-id": 3057729675574597004, "timestamp-ms": 1595100955770},
100+
{"snapshot-id": 3059729675574597004, "timestamp-ms": 1602638573590}
101+
],
102+
"metadata-log": [],
103+
"refs": {"main": {"snapshot-id": 3059729675574597004, "type": "branch"}}
104+
}

0 commit comments

Comments
 (0)