Skip to content

Commit 47f5282

Browse files
fix(s3): use fs_utils for S3-aware path deletion (#49)
* fix(s3): use fs_utils for S3-aware path deletion
  - Replace os.path.exists() with fs.exists() for S3 compatibility
  - Use fs_utils.get_filesystem() for both local and S3 paths
  - Fixes encoding conflict when retrying failed S3 writes
  - Aligns with existing fs_utils patterns in codebase
* fix: centralize zarr cleanup across retries

---------

Co-authored-by: Emmanuel Mathot <emmanuel.mathot@gmail.com>
1 parent 183d1e4 commit 47f5282

1 file changed

Lines changed: 27 additions & 17 deletions

File tree

src/eopf_geozarr/conversion/geozarr.py

Lines changed: 27 additions & 17 deletions
Original file line number | Diff line number | Diff line change
@@ -16,7 +16,6 @@
1616
import dataclasses
1717
import itertools
1818
import os
19-
import shutil
2019
import time
2120
from collections.abc import Hashable, Iterable, Mapping, Sequence
2221
from typing import Any
@@ -1082,6 +1081,23 @@ def write_dataset_band_by_band_with_validation(
10821081

10831082
store_exists = existing_dataset is not None and len(existing_dataset.data_vars) > 0
10841083

1084+
store_storage_options = fs_utils.get_storage_options(output_path)
1085+
fs = fs_utils.get_filesystem(output_path, **(store_storage_options or {}))
1086+
1087+
def cleanup_prefix(prefix: str) -> None:
1088+
key = prefix.lstrip("/")
1089+
base_path = output_path.rstrip("/")
1090+
if fs_utils.is_s3_path(base_path):
1091+
target_path = fs_utils.normalize_path(f"{base_path}/{key}")
1092+
else:
1093+
target_path = os.path.join(base_path, key)
1094+
try:
1095+
fs.rm(target_path, recursive=True)
1096+
except FileNotFoundError:
1097+
pass
1098+
except Exception as cleanup_error:
1099+
print(f" ⚠️ Failed to remove {target_path}: {cleanup_error}")
1100+
10851101
# Write data variables one by one with validation
10861102
for var in data_vars:
10871103
# Check if this variable already exists and is valid
@@ -1093,9 +1109,9 @@ def write_dataset_band_by_band_with_validation(
10931109
skipped_vars.append(var)
10941110
successful_vars.append(var)
10951111
continue
1096-
var_path = os.path.join(output_path, group_name.lstrip("/"), str(var))
1097-
if os.path.exists(var_path):
1098-
shutil.rmtree(var_path)
1112+
# Remove invalid existing variable using filesystem-agnostic method
1113+
print(f" 🧹 Removing invalid existing variable {var}...")
1114+
cleanup_prefix(f"{group_name.lstrip('/')}/{var}")
10991115

11001116
print(f" Writing data variable {var}...")
11011117

@@ -1146,16 +1162,14 @@ def write_dataset_band_by_band_with_validation(
11461162
else:
11471163
single_var_ds[var] = single_var_ds[var].chunk()
11481164

1149-
# Get storage options and write variable
1150-
storage_options = fs_utils.get_storage_options(output_path)
11511165
single_var_ds.to_zarr(
11521166
output_path,
11531167
group=group_name,
11541168
mode="a",
11551169
consolidated=False,
11561170
zarr_format=3,
11571171
encoding=var_encoding,
1158-
storage_options=storage_options,
1172+
storage_options=store_storage_options,
11591173
)
11601174

11611175
print(f" ✅ Successfully wrote {var}")
@@ -1165,28 +1179,24 @@ def write_dataset_band_by_band_with_validation(
11651179
group_path = fs_utils.normalize_path(
11661180
f"{output_path}/{group_name.lstrip('/')}"
11671181
)
1168-
storage_options = fs_utils.get_storage_options(output_path)
11691182
existing_dataset = xr.open_dataset(
11701183
group_path,
11711184
mode="r",
11721185
engine="zarr",
11731186
decode_coords="all",
11741187
chunks="auto",
1175-
storage_options=storage_options,
1188+
storage_options=store_storage_options,
11761189
)
11771190
break
11781191

11791192
except Exception as e:
11801193
# Delete the started data array to avoid conflict on next attempt
11811194
for written_var in var_encoding.keys():
1182-
if os.path.exists(
1183-
os.path.join(output_path, group_name.lstrip("/"), written_var)
1184-
):
1185-
shutil.rmtree(
1186-
os.path.join(
1187-
output_path, group_name.lstrip("/"), written_var
1188-
)
1189-
)
1195+
target_components = [group_name.lstrip("/"), str(written_var)]
1196+
target_prefix = "/".join(
1197+
component for component in target_components if component
1198+
)
1199+
cleanup_prefix(target_prefix)
11901200
if attempt < max_retries - 1:
11911201
print(
11921202
f" ⚠️ Attempt {attempt + 1} failed for {var}: {e}, retrying in 2 seconds..."

0 commit comments

Comments
 (0)