Skip to content

Commit 53988e3

Browse files
committed
Add schema rollback to RollbackToSnapshotAction
1 parent 8d65084 commit 53988e3

1 file changed

Lines changed: 22 additions & 5 deletions

File tree

crates/iceberg/src/transaction/rollback_to_snapshot.rs

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ use crate::{Error, ErrorKind, TableRequirement, TableUpdate};
2929
#[derive(Default)]
3030
pub struct RollbackToSnapshotAction {
3131
snapshot_id: Option<i64>,
32+
rollback_schema: bool,
3233
}
3334

3435
impl RollbackToSnapshotAction {
@@ -42,6 +43,12 @@ impl RollbackToSnapshotAction {
4243
self.snapshot_id = Some(snapshot_id);
4344
self
4445
}
46+
47+
/// Updates the table's current schema to match the schema of the target snapshot.
48+
pub fn with_schema_rollback(mut self) -> Self {
49+
self.rollback_schema = true;
50+
self
51+
}
4552
}
4653

4754
#[async_trait]
@@ -51,10 +58,9 @@ impl TransactionAction for RollbackToSnapshotAction {
5158
return Err(Error::new(ErrorKind::DataInvalid, "snapshot id is not set"));
5259
};
5360

54-
table
61+
let snapshot = table
5562
.metadata()
56-
.snapshots()
57-
.find(|s| s.snapshot_id() == snapshot_id)
63+
.snapshot_by_id(snapshot_id)
5864
.ok_or_else(|| {
5965
Error::new(
6066
ErrorKind::DataInvalid,
@@ -68,12 +74,12 @@ impl TransactionAction for RollbackToSnapshotAction {
6874
let reference =
6975
SnapshotReference::new(snapshot_id, SnapshotRetention::branch(None, None, None));
7076

71-
let updates = vec![TableUpdate::SetSnapshotRef {
77+
let mut updates = vec![TableUpdate::SetSnapshotRef {
7278
ref_name: MAIN_BRANCH.to_string(),
7379
reference,
7480
}];
7581

76-
let requirements = vec![
82+
let mut requirements = vec![
7783
TableRequirement::UuidMatch {
7884
uuid: table.metadata().uuid(),
7985
},
@@ -83,6 +89,17 @@ impl TransactionAction for RollbackToSnapshotAction {
8389
},
8490
];
8591

92+
let current_schema_id = table.metadata().current_schema_id();
93+
if self.rollback_schema
94+
&& let Some(snapshot_schema_id) = snapshot.schema_id()
95+
&& current_schema_id != snapshot_schema_id
96+
{
97+
updates.push(TableUpdate::SetCurrentSchema {
98+
schema_id: snapshot_schema_id,
99+
});
100+
requirements.push(TableRequirement::CurrentSchemaIdMatch { current_schema_id });
101+
}
102+
86103
Ok(ActionCommit::new(updates, requirements))
87104
}
88105
}

0 commit comments

Comments
 (0)