|
23 | 23 |
|
24 | 24 | from pyiceberg.catalog import Catalog |
25 | 25 | from pyiceberg.table import Table |
26 | | -from pyiceberg.table.refs import SnapshotRef |
| 26 | +from pyiceberg.table.refs import SnapshotRef, SnapshotRefType |
27 | 27 |
|
28 | 28 |
|
29 | 29 | @pytest.fixture |
@@ -107,6 +107,55 @@ def test_remove_branch(catalog: Catalog) -> None: |
107 | 107 | assert tbl.metadata.refs.get(branch_name, None) is None |
108 | 108 |
|
109 | 109 |
|
| 110 | +@pytest.mark.integration |
| 111 | +@pytest.mark.parametrize("catalog", [lf("session_catalog_hive"), lf("session_catalog")]) |
| 112 | +def test_replace_tag(catalog: Catalog) -> None: |
| 113 | + identifier = "default.test_table_snapshot_operations" |
| 114 | + tbl = catalog.load_table(identifier) |
| 115 | + assert len(tbl.history()) > 2 |
| 116 | + |
| 117 | + current_snapshot_id = tbl.history()[-1].snapshot_id |
| 118 | + older_snapshot_id = tbl.history()[-2].snapshot_id |
| 119 | + |
| 120 | + tag_name = "my-tag" |
| 121 | + tbl.manage_snapshots().create_tag(older_snapshot_id, tag_name, 1).commit() |
| 122 | + tag = tbl.metadata.refs.get(tag_name) |
| 123 | + assert tag is not None |
| 124 | + assert tag.snapshot_id == older_snapshot_id |
| 125 | + assert tag.snapshot_ref_type == SnapshotRefType.TAG |
| 126 | + assert tag.max_ref_age_ms == 1 |
| 127 | + |
| 128 | + tbl.manage_snapshots().replace_tag(tag_name=tag_name, snapshot_id=current_snapshot_id).commit() |
| 129 | + |
| 130 | + tag = tbl.metadata.refs.get(tag_name) |
| 131 | + assert tag is not None |
| 132 | + assert tag.snapshot_id == current_snapshot_id |
| 133 | + assert tag.snapshot_ref_type == SnapshotRefType.TAG |
| 134 | + assert tag.max_ref_age_ms == 1 |
| 135 | + |
| 136 | + |
| 137 | +@pytest.mark.integration |
| 138 | +@pytest.mark.parametrize("catalog", [lf("session_catalog_hive"), lf("session_catalog")]) |
| 139 | +def test_replace_missing_tag(catalog: Catalog) -> None: |
| 140 | + identifier = "default.test_table_snapshot_operations" |
| 141 | + tbl = catalog.load_table(identifier) |
| 142 | + snapshot_id = tbl.history()[-1].snapshot_id |
| 143 | + |
| 144 | + with pytest.raises(ValueError, match="Tag does not exist: test"): |
| 145 | + tbl.manage_snapshots().replace_tag(tag_name="test", snapshot_id=snapshot_id).commit() |
| 146 | + |
| 147 | + |
| 148 | +@pytest.mark.integration |
| 149 | +@pytest.mark.parametrize("catalog", [lf("session_catalog_hive"), lf("session_catalog")]) |
| 150 | +def test_replace_tag_with_branch(catalog: Catalog) -> None: |
| 151 | + identifier = "default.test_table_snapshot_operations" |
| 152 | + tbl = catalog.load_table(identifier) |
| 153 | + snapshot_id = tbl.history()[-1].snapshot_id |
| 154 | + |
| 155 | + with pytest.raises(ValueError, match="Ref main is not a tag"): |
| 156 | + tbl.manage_snapshots().replace_tag(tag_name="main", snapshot_id=snapshot_id).commit() |
| 157 | + |
| 158 | + |
110 | 159 | @pytest.mark.integration |
111 | 160 | @pytest.mark.parametrize("catalog", [lf("session_catalog_hive"), lf("session_catalog")]) |
112 | 161 | def test_set_current_snapshot(catalog: Catalog) -> None: |
|
0 commit comments