Skip to content

Commit ea92d84

Browse files
committed
Refine logging
1 parent 2e3432b commit ea92d84

1 file changed

Lines changed: 17 additions & 79 deletions

File tree

src/mdio/transpose_writers/chunking.py

Lines changed: 17 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -51,55 +51,35 @@ def _validate_inputs(
5151
compressor: "ZFP" | "Blosc" | list["ZFP" | "Blosc"] | None,
5252
) -> None:
5353
"""Validate basic shapes/types (no broadcasting here)."""
54-
logger.debug("Validating inputs: new_variable=%r, chunk_grid type=%s, compressor type=%s",
55-
new_variable, type(chunk_grid), type(compressor))
5654

5755
# new_variable must be str or non-empty list[str]
5856
if isinstance(new_variable, str):
59-
logger.debug("new_variable is a single string: %r", new_variable)
6057
pass
6158
elif isinstance(new_variable, list):
6259
if not new_variable:
6360
raise ValueError("new_variable list must not be empty")
6461
if not all(isinstance(v, str) for v in new_variable):
6562
raise TypeError("All entries in new_variable must be strings")
66-
logger.debug("new_variable is a list with %d items: %r", len(new_variable), new_variable)
6763
else:
6864
raise TypeError("new_variable must be a string or a list of strings")
6965

7066
# chunk_grid can be a single grid or non-empty list of grids
71-
if isinstance(chunk_grid, list):
72-
if not chunk_grid:
73-
raise ValueError("chunk_grid list must not be empty")
74-
logger.debug("chunk_grid is a list with %d items, types: %s",
75-
len(chunk_grid), [type(g) for g in chunk_grid])
76-
else:
77-
logger.debug("chunk_grid is a single item of type: %s", type(chunk_grid))
67+
if isinstance(chunk_grid, list) and not chunk_grid:
68+
raise ValueError("chunk_grid list must not be empty")
7869

7970
# compressor can be None, a single compressor, or non-empty list
80-
if isinstance(compressor, list):
81-
if not compressor:
82-
raise ValueError("compressor list must not be empty")
83-
logger.debug("compressor is a list with %d items, types: %s",
84-
len(compressor), [type(c) for c in compressor])
85-
elif compressor is None:
86-
logger.debug("compressor is None")
87-
else:
88-
logger.debug("compressor is a single item of type: %s", type(compressor))
71+
if isinstance(compressor, list) and not compressor:
72+
raise ValueError("compressor list must not be empty")
8973

9074

9175
def _normalize_new_variable(
9276
new_variable: str | list[str],
9377
) -> list[str]:
9478
"""Normalize new_variable to a list of names."""
9579
if isinstance(new_variable, str):
96-
result = [new_variable]
97-
logger.debug("Normalized new_variable from string %r to list: %r", new_variable, result)
98-
return result
80+
return [new_variable]
9981
# At this point _validate_inputs already ensured this is non-empty list[str]
100-
result = list(new_variable)
101-
logger.debug("Normalized new_variable from list %r to list: %r", new_variable, result)
102-
return result
82+
return list(new_variable)
10383

10484

10585
def _normalize_chunk_grid(
@@ -109,57 +89,37 @@ def _normalize_chunk_grid(
10989
num_variables: int,
11090
) -> list["RegularChunkGrid" | "RectilinearChunkGrid"]:
11191
"""Broadcast chunk_grid to match num_variables."""
112-
logger.debug("Normalizing chunk_grid for %d variables", num_variables)
113-
11492
if isinstance(chunk_grid, list):
115-
logger.debug("Input chunk_grid is a list with %d items", len(chunk_grid))
11693
if len(chunk_grid) == 1 and num_variables > 1:
117-
result = chunk_grid * num_variables
118-
logger.debug("Broadcasting single chunk_grid to %d variables: %r", num_variables, result)
119-
return result
94+
return chunk_grid * num_variables
12095
if len(chunk_grid) == num_variables:
121-
result = list(chunk_grid)
122-
logger.debug("Using chunk_grid list as-is for %d variables: %r", num_variables, result)
123-
return result
96+
return list(chunk_grid)
12497
raise ValueError(
12598
"chunk_grid list length must be 1 or equal to the number of new variables"
12699
)
127100
# single grid reused for all variables
128-
result = [chunk_grid] * num_variables
129-
logger.debug("Replicating single chunk_grid for %d variables: %r", num_variables, result)
130-
return result
101+
return [chunk_grid] * num_variables
131102

132103

133104
def _normalize_compressor(
134105
compressor: "ZFP" | "Blosc" | list["ZFP" | "Blosc"] | None,
135106
num_variables: int,
136107
) -> list["ZFP" | "Blosc" | None]:
137108
"""Broadcast compressor to match num_variables."""
138-
logger.debug("Normalizing compressor for %d variables", num_variables)
139-
140109
if compressor is None:
141-
result = [None] * num_variables
142-
logger.debug("Setting compressor to None for all %d variables: %r", num_variables, result)
143-
return result
110+
return [None] * num_variables
144111

145112
if isinstance(compressor, list):
146-
logger.debug("Input compressor is a list with %d items", len(compressor))
147113
if len(compressor) == 1 and num_variables > 1:
148-
result = compressor * num_variables
149-
logger.debug("Broadcasting single compressor to %d variables: %r", num_variables, result)
150-
return result
114+
return compressor * num_variables
151115
if len(compressor) == num_variables:
152-
result = list(compressor)
153-
logger.debug("Using compressor list as-is for %d variables: %r", num_variables, result)
154-
return result
116+
return list(compressor)
155117
raise ValueError(
156118
"compressor list length must be 1 or equal to the number of new variables"
157119
)
158120

159121
# single compressor reused for all variables
160-
result = [compressor] * num_variables
161-
logger.debug("Replicating single compressor for %d variables: %r", num_variables, result)
162-
return result
122+
return [compressor] * num_variables
163123

164124

165125
def from_variable(
@@ -193,20 +153,15 @@ def from_variable(
193153
copy_metadata: Whether to copy attrs/encoding from the source Variable.
194154
"""
195155
# 1) Basic validation (types, emptiness)
196-
logger.debug("Starting from_variable operation: dataset_path=%r, source_variable=%r",
197-
dataset_path, source_variable)
198156
_validate_inputs(new_variable, chunk_grid, compressor)
199157

200158
# 2) Normalize/broadcast each argument
201-
logger.debug("Normalizing inputs for processing")
202159
new_variables = _normalize_new_variable(new_variable)
203160
num_vars = len(new_variables)
204161
chunk_grids = _normalize_chunk_grid(chunk_grid, num_vars)
205162
compressors = _normalize_compressor(compressor, num_vars)
206-
logger.debug("After normalization: %d variables to create", num_vars)
207163

208164
normed_path = _normalize_path(dataset_path)
209-
logger.debug("Opening dataset at: %r", normed_path)
210165
ds = open_mdio(normed_path)
211166

212167
source_var = ds[source_variable]
@@ -222,22 +177,15 @@ def from_variable(
222177

223178
dask_config: dict[str, Any] = {"scheduler": "processes", "num_workers": num_workers}
224179

225-
logger.debug("Using Dask config: %s", dask_config)
226-
227180
# 3) One Dask config context, write each new variable sequentially
228181
with dask.config.set(**dask_config):
229-
logger.debug("Starting variable processing loop with Dask config")
230182
for name, grid, comp in tqdm(
231183
zip(new_variables, chunk_grids, compressors, strict=True),
232184
total=len(new_variables),
233185
desc="Generating newly chunked Variables",
234186
unit="variable"
235187
):
236-
logger.debug("Processing variable %r with grid type %s and compressor type %s",
237-
name, type(grid), type(comp))
238-
239188
new_chunks = tuple(grid.configuration.chunk_shape)
240-
logger.debug("New chunk shape for variable %r: %r", name, new_chunks)
241189

242190
if len(dims) != len(new_chunks):
243191
logger.warning(
@@ -251,32 +199,28 @@ def from_variable(
251199

252200
# Build Dask chunk mapping for target chunks
253201
dest_mapping = dict(zip(dims, new_chunks, strict=True))
254-
logger.debug("Target chunk mapping: %r", dest_mapping)
255202

256203
# Rechunk directly to target chunks - skip intermediate work chunks to avoid task explosion
257204
if store_chunks is not None and tuple(store_chunks) == new_chunks:
258-
logger.debug("Variable %r already has target chunks %r, using as-is", name, new_chunks)
259205
rechunked = source_var
260206
else:
261-
logger.debug("Rechunking variable %r directly to target chunks: %r", name, dest_mapping)
262207
rechunked = source_var.chunk(dest_mapping)
263-
logger.debug("Final rechunked array chunks: %r", rechunked.chunks)
264208

265-
logger.debug("Dask task graph for variable %r has %d tasks", name, len(rechunked.__dask_graph__()))
209+
logger.debug("Variable %r: nominal_chunks=%r, task graph has %d tasks",
210+
name,
211+
tuple(dim_chunks[0] for dim_chunks in rechunked.chunks),
212+
len(rechunked.__dask_graph__()))
266213

267214
# Build DataArray for the new variable
268215
attrs = source_var.attrs.copy() if copy_metadata else {}
269-
logger.debug("Creating DataArray for variable %r with copy_metadata=%s", name, copy_metadata)
270216
new_da = DataArray(
271217
data=rechunked.data,
272218
dims=dims,
273219
coords=source_var.coords,
274220
attrs=attrs,
275221
name=name,
276222
)
277-
logger.debug("DataArray created with shape=%r, chunks=%r", new_da.shape, new_da.chunks)
278223
new_ds = new_da.to_dataset(name=name)
279-
logger.debug("Converted to dataset with %d variables", len(new_ds.data_vars))
280224

281225
# Per-variable encoding
282226
encoding: dict[str, Any] = (
@@ -286,21 +230,15 @@ def from_variable(
286230
if comp is not None:
287231
compressor_encoding = _compressor_to_encoding(comp)
288232
encoding.update(compressor_encoding)
289-
logger.debug("Applied compressor encoding for variable %r: %r", name, compressor_encoding)
290233
new_ds[name].encoding = encoding
291-
logger.debug("Final encoding for variable %r: %r", name, encoding)
292234

293235
# Clean up attrs that can conflict with consolidated metadata
294-
logger.debug("Cleaning up _FillValue attributes for variable %r", name)
295236
_remove_fillvalue_attrs(new_ds)
296237

297238
# Drop non-dimensional coordinates to avoid chunk conflicts
298239
coords_to_drop = [coord for coord in new_ds.coords if coord not in new_ds.dims]
299240
if coords_to_drop:
300-
logger.debug("Dropping non-dimensional coordinates for variable %r: %r", name, coords_to_drop)
301241
new_ds = new_ds.drop_vars(coords_to_drop)
302-
else:
303-
logger.debug("No non-dimensional coordinates to drop for variable %r", name)
304242

305243
logger.debug("Starting write operation for variable %r with compute=True", name)
306244
with TqdmCallback(desc=f"Writing variable '{name}'", unit="chunk"):

0 commit comments

Comments
 (0)