forked from EOPF-Explorer/data-model
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathutils.py
More file actions
177 lines (147 loc) · 5.19 KB
/
utils.py
File metadata and controls
177 lines (147 loc) · 5.19 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
"""Utility functions for GeoZarr conversion."""
import numpy as np
import rasterio # noqa: F401 # Import to enable .rio accessor
import xarray as xr
def downsample_2d_array(
    source_data: np.ndarray, target_height: int, target_width: int
) -> np.ndarray:
    """
    Downsample a 2D array to the requested shape.

    Block averaging is used whenever the source is at least as large as the
    target along both axes and strictly larger along at least one of them.
    This includes anisotropic reductions (e.g. halving the rows while keeping
    the columns), where the block size along the unchanged axis is simply 1.
    In every other case (same size, or target larger than the source along
    some axis) the array is sampled at evenly spaced integer indices instead.

    Parameters
    ----------
    source_data : numpy.ndarray
        Source 2D array
    target_height : int
        Target height
    target_width : int
        Target width

    Returns
    -------
    numpy.ndarray
        Array of shape ``(target_height, target_width)``. The block-averaging
        path returns the result of ``mean`` over each block; the sampling
        path returns elements of ``source_data`` unchanged.

    Raises
    ------
    ValueError
        If ``source_data`` is not two-dimensional (shape unpacking fails).
    ZeroDivisionError
        If ``target_height`` or ``target_width`` is 0.
    """
    source_height, source_width = source_data.shape

    # Integer reduction factor per axis; 0 means the target is larger than
    # the source along that axis, which block averaging cannot handle.
    block_size_y = source_height // target_height
    block_size_x = source_width // target_width

    can_average = block_size_y >= 1 and block_size_x >= 1
    reduces_size = block_size_y > 1 or block_size_x > 1

    if can_average and reduces_size:
        # Crop to a whole number of blocks so the reshape below is exact;
        # trailing rows/columns that do not fill a block are dropped.
        cropped = source_data[
            : target_height * block_size_y, : target_width * block_size_x
        ]
        blocks = cropped.reshape(
            target_height, block_size_y, target_width, block_size_x
        )
        # Axes 1 and 3 index positions inside each block.
        downsampled = blocks.mean(axis=(1, 3))
    else:
        # Index sampling: pick evenly spaced rows/columns (indices are
        # truncated to int, so they may repeat when upsampling).
        y_indices = np.linspace(0, source_height - 1, target_height, dtype=int)
        x_indices = np.linspace(0, source_width - 1, target_width, dtype=int)
        downsampled = source_data[np.ix_(y_indices, x_indices)]

    return downsampled
def is_grid_mapping_variable(ds: xr.Dataset, var_name: str) -> bool:
    """
    Tell whether another data variable points at ``var_name`` as its grid mapping.

    Every data variable of ``ds`` other than ``var_name`` itself is inspected;
    the answer is positive as soon as one of them carries a ``grid_mapping``
    attribute whose value equals ``var_name``.

    Parameters
    ----------
    ds : xarray.Dataset
        Dataset whose data variables are scanned
    var_name : str
        Name of the candidate grid_mapping variable

    Returns
    -------
    bool
        True if at least one other data variable references ``var_name``
        through its ``grid_mapping`` attribute, False otherwise
    """
    return any(
        candidate != var_name
        and "grid_mapping" in ds[candidate].attrs
        and ds[candidate].attrs["grid_mapping"] == var_name
        for candidate in ds.data_vars
    )
def calculate_aligned_chunk_size(dimension_size: int, target_chunk_size: int) -> int:
    """
    Pick the largest chunk size not exceeding the target that tiles the dimension exactly.

    Choosing a divisor of the dimension size keeps Zarr chunks aligned with
    the data extent, which avoids chunk overlap problems when writing with Dask.

    Parameters
    ----------
    dimension_size : int
        Length of the dimension being chunked
    target_chunk_size : int
        Preferred (maximum) chunk size

    Returns
    -------
    int
        ``dimension_size`` itself when the target is at least that large;
        otherwise the largest divisor of ``dimension_size`` that is
        ``<= target_chunk_size``, or 1 when no candidate exists
        (e.g. a non-positive target)
    """
    # A single chunk covers the whole dimension.
    if dimension_size <= target_chunk_size:
        return dimension_size

    # Walk candidates from the target downwards and stop at the first divisor.
    divisors = (
        candidate
        for candidate in reversed(range(1, target_chunk_size + 1))
        if dimension_size % candidate == 0
    )
    # The default of 1 only applies when the candidate range is empty.
    return next(divisors, 1)
def validate_existing_band_data(
    existing_group: xr.Dataset, var_name: str, reference_ds: xr.Dataset
) -> bool:
    """
    Decide whether a variable already stored in a dataset can be considered complete.

    The checks, in order: the variable is present (as data variable or
    coordinate); its shape equals the reference shape when the reference has
    it as a data variable; the required attributes are present for
    non-grid-mapping data variables; both datasets report the same rio CRS;
    and, for non-grid-mapping data variables, the array is non-empty and its
    first element can be read and is not NaN.

    Parameters
    ----------
    existing_group : xarray.Dataset
        Dataset that was previously written and is being checked
    var_name : str
        Variable to check
    reference_ds : xarray.Dataset
        Dataset describing the expected structure

    Returns
    -------
    bool
        True when every check passes; False when a check fails or any
        exception is raised while checking
    """
    try:
        # Presence: accept either a data variable or a coordinate.
        is_present = (
            var_name in existing_group.data_vars or var_name in existing_group.coords
        )
        if not is_present:
            return False

        in_reference = var_name in reference_ds.data_vars

        # Shape must agree with the reference when the reference knows the variable.
        if in_reference:
            if reference_ds[var_name].shape != existing_group[var_name].shape:
                return False

        # Required attributes only apply to real data variables, not grid mappings.
        if in_reference and not is_grid_mapping_variable(reference_ds, var_name):
            if any(
                attr not in existing_group[var_name].attrs
                for attr in ("_ARRAY_DIMENSIONS", "standard_name")
            ):
                return False

        # Both datasets must report the same CRS through the rio accessor.
        if existing_group.rio.crs != reference_ds.rio.crs:
            return False

        # Lightweight integrity probe for real data variables.
        needs_probe = var_name in existing_group.data_vars and not (
            is_grid_mapping_variable(existing_group, var_name)
        )
        if needs_probe:
            try:
                band = existing_group[var_name]
                if band.size == 0:
                    return False

                # Read only the element at index 0 along every dimension.
                origin = dict.fromkeys(band.dims, 0)
                sample = band.isel(origin).values.mean()
                if np.isnan(sample):
                    return False
            except Exception as e:
                print(f"Error validating variable {var_name}: {e}")
                return False

        return True
    except Exception:
        # Any unexpected failure is reported as "not valid".
        return False