forked from apache/iceberg-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_delete_orphans.py
More file actions
117 lines (95 loc) · 3.96 KB
/
test_delete_orphans.py
File metadata and controls
117 lines (95 loc) · 3.96 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import os
from datetime import datetime, timedelta
from pathlib import Path, PosixPath
from unittest.mock import PropertyMock, patch
import pyarrow as pa
import pytest
from pyiceberg.catalog import Catalog
from pyiceberg.schema import Schema
from pyiceberg.types import IntegerType, NestedField, StringType
from tests.catalog.test_base import InMemoryCatalog
@pytest.fixture
def catalog(tmp_path: PosixPath) -> InMemoryCatalog:
    """Provide an ``InMemoryCatalog`` that already contains a ``default`` namespace.

    The catalog's ``warehouse`` is the absolute POSIX-style form of pytest's
    per-test ``tmp_path``, so every test gets its own warehouse directory.
    """
    warehouse_location = tmp_path.absolute().as_posix()
    in_memory_catalog = InMemoryCatalog("test.in_memory.catalog", warehouse=warehouse_location)
    in_memory_catalog.create_namespace("default")
    return in_memory_catalog
def test_delete_orphaned_files(catalog: Catalog) -> None:
    """Exercise ``Table.delete_orphaned_files`` against a stray file in the table location.

    Steps:

    1. Create a table, append two rows, and drop an unreferenced ``orphan.txt``
       into the table location.
    2. A dry run and a real run must both leave the freshly created file alone.
    3. After the file's timestamps are moved five days into the past, a dry run
       must still keep it, while a real run must remove it.

    NOTE(review): step 3 assumes the default age threshold of
    ``delete_orphaned_files`` is below five days (the original comment in this
    test mentions three days) -- confirm against the implementation.
    """
    identifier = "default.test_delete_orphaned_files"

    schema = Schema(
        NestedField(1, "city", StringType(), required=True),
        NestedField(2, "inhabitants", IntegerType(), required=True),
        # Mark City as the identifier field, also known as the primary-key
        identifier_field_ids=[1],
    )

    tbl = catalog.create_table(identifier, schema=schema)

    arrow_schema = pa.schema(
        [
            pa.field("city", pa.string(), nullable=False),
            pa.field("inhabitants", pa.int32(), nullable=False),
        ]
    )

    df = pa.Table.from_pylist(
        [
            {"city": "Drachten", "inhabitants": 45019},
            {"city": "Drachten", "inhabitants": 45019},
        ],
        schema=arrow_schema,
    )
    tbl.append(df)

    # A file inside the table location that the table itself never wrote.
    orphaned_file = Path(tbl.location()) / "orphan.txt"
    orphaned_file.touch()
    assert orphaned_file.exists()

    # assert no files deleted if dry run...
    tbl.delete_orphaned_files(dry_run=True)
    assert orphaned_file.exists()

    # should not delete because it was just created...
    tbl.delete_orphaned_files()
    assert orphaned_file.exists()

    # Backdate the file: os.utime sets the access and modification times
    # (not a creation time), here to five days in the past.
    five_days_ago = (datetime.now() - timedelta(days=5)).timestamp()
    os.utime(orphaned_file, (five_days_ago, five_days_ago))

    # The earlier dry-run check ran on a file that was too new to be deleted
    # anyway; repeat it now that the file is old enough to be a candidate.
    tbl.delete_orphaned_files(dry_run=True)
    assert orphaned_file.exists()

    # With the file aged past the threshold, a real run must remove it.
    tbl.delete_orphaned_files()
    assert not orphaned_file.exists()
def test_delete_orphaned_files_with_invalid_file_doesnt_error(catalog: Catalog) -> None:
    """Check that ``delete_orphaned_files`` copes with an orphan path that is not on disk.

    The table's ``inspect`` property is replaced by a mock whose
    ``orphaned_files`` always reports the single path ``foo/bar.baz``, which
    this test never creates. The test passes when ``delete_orphaned_files()``
    returns without raising and ``tbl.io.delete`` was called with that path.
    """
    city = NestedField(1, "city", StringType(), required=True)
    inhabitants = NestedField(2, "inhabitants", IntegerType(), required=True)
    # "city" is the identifier field, i.e. the primary key.
    table_schema = Schema(city, inhabitants, identifier_field_ids=[1])
    tbl = catalog.create_table("default.test_delete_orphaned_files", schema=table_schema)

    column_layout = pa.schema(
        [
            pa.field("city", pa.string(), nullable=False),
            pa.field("inhabitants", pa.int32(), nullable=False),
        ]
    )
    rows = [{"city": "Drachten", "inhabitants": 45019} for _ in range(2)]
    tbl.append(pa.Table.from_pylist(rows, schema=column_layout))

    missing_path = "foo/bar.baz"

    # ``inspect`` is patched on the class with a PropertyMock, so reading
    # ``tbl.inspect`` yields the mock's return value.
    with patch.object(type(tbl), "inspect", new_callable=PropertyMock) as inspect_property:
        inspect_property.return_value.orphaned_files = lambda location, older_than: {missing_path}

        # ``wraps`` keeps the real delete running while recording the call.
        with patch.object(tbl.io, "delete", wraps=tbl.io.delete) as delete_spy:
            tbl.delete_orphaned_files()
            delete_spy.assert_called_with(missing_path)