1111from pathlib import Path
1212from time import perf_counter
1313from typing import Any , Optional
14+ from urllib .parse import urlparse
1415
1516import xarray as xr
1617
@@ -175,6 +176,44 @@ def _summarize_dask_metrics(
175176 return base
176177
177178
179+ def _derive_item_id_from_input_path (path : str ) -> str :
180+ """Best-effort item id from local path or URL."""
181+ try :
182+ if path .startswith (("http://" , "https://" , "s3://" , "gs://" )):
183+ parsed = urlparse (path )
184+ name = Path (parsed .path .rstrip ("/" )).name
185+ else :
186+ name = Path (str (path ).rstrip ("/" )).name
187+ if name .endswith (".zarr" ):
188+ name = name [: - len (".zarr" )]
189+ return name or "dataset"
190+ except Exception :
191+ return "dataset"
192+
193+
194+ def _resolve_output_prefix (prefix : str , item_id : str ) -> str :
195+ target_name = f"{ item_id } _geozarr.zarr"
196+ if prefix .startswith (("http://" , "https://" , "s3://" , "gs://" )):
197+ if not prefix .endswith ("/" ):
198+ prefix = prefix + "/"
199+ return prefix + target_name
200+ return str (Path (prefix ) / target_name )
201+
202+
203+ def _has_group (tree : xr .DataTree , path : str ) -> bool :
204+ try :
205+ parts = [p for p in str (path ).strip ("/" ).split ("/" ) if p ]
206+ node = tree
207+ for seg in parts :
208+ children = getattr (node , "children" , {}) or {}
209+ if seg not in children :
210+ return False
211+ node = children [seg ]
212+ return True
213+ except Exception :
214+ return False
215+
216+
178217def convert_command (args : argparse .Namespace ) -> None :
179218 """Convert EOPF dataset to GeoZarr compliant format.
180219
@@ -215,6 +254,11 @@ def convert_command(args: argparse.Namespace) -> None:
215254
216255 # Handle output path validation
217256 output_path_str = args .output_path
257+ # Expand trailing-slash prefix to a concrete store
258+ if output_path_str .endswith ("/" ):
259+ item_id = _derive_item_id_from_input_path (str (input_path ))
260+ output_path_str = _resolve_output_prefix (output_path_str , item_id )
261+ print (f"Resolved output store: { output_path_str } " )
218262 if is_s3_path (output_path_str ):
219263 # S3 path - validate S3 access
220264 print ("🔍 Validating S3 access..." )
@@ -268,14 +312,48 @@ def convert_command(args: argparse.Namespace) -> None:
268312 # Load the EOPF DataTree with appropriate storage options
269313 print ("Loading EOPF dataset..." )
270314 storage_options = get_storage_options (input_path )
271- # Metrics setup
315+ # Metrics setup (environment first; set_input after group validation)
272316 if getattr (args , "metrics" , True ):
273317 metrics = MetricsRecorder ()
274318 metrics .set_environment ()
319+
320+ with metrics .time_step ("open_input" ) if metrics else nullcontext ():
321+ dt = xr .open_datatree (
322+ str (input_path ),
323+ engine = "zarr" ,
324+ chunks = "auto" ,
325+ storage_options = storage_options ,
326+ )
327+
328+ # Validate/prune groups if requested
329+ groups_effective = list (getattr (args , "groups" , []) or [])
330+ validate_mode = getattr (args , "validate_groups" , None )
331+ missing : list [str ] = []
332+ if validate_mode in {"warn" , "error" } and groups_effective :
333+ existing : list [str ] = []
334+ for g in groups_effective :
335+ if _has_group (dt , g ):
336+ existing .append (g )
337+ else :
338+ missing .append (g )
339+ if missing :
340+ msg = f"Groups not found: { ', ' .join (missing )} "
341+ if validate_mode == "error" :
342+ print (f"❌ { msg } " )
343+ sys .exit (3 )
344+ else :
345+ print (f"⚠️ { msg } ; proceeding with remaining groups" )
346+ groups_effective = existing
347+ if not groups_effective :
348+ print ("❌ No valid groups to convert after validation" )
349+ sys .exit (3 )
350+
351+ # Now that groups are finalized, set input metadata for metrics
352+ if metrics is not None :
275353 metrics .set_input (
276354 source_uri = str (input_path ),
277355 profile = None ,
278- groups = args . groups ,
356+ groups = groups_effective ,
279357 dask = {
280358 "enabled" : bool (dask_client is not None ),
281359 "mode" : getattr (args , "dask_mode" , None ),
@@ -286,19 +364,13 @@ def convert_command(args: argparse.Namespace) -> None:
286364 },
287365 )
288366
289- with metrics .time_step ("open_input" ) if metrics else nullcontext ():
290- dt = xr .open_datatree (
291- str (input_path ),
292- engine = "zarr" ,
293- chunks = "auto" ,
294- storage_options = storage_options ,
295- )
296-
297367 if args .verbose :
298368 print (f"Loaded DataTree with { len (dt .children )} groups" )
299369 print ("Available groups:" )
300370 for group_name in dt .children :
301371 print (f" - { group_name } " )
372+ if missing :
373+ print (f"After validation, converting groups: { groups_effective } " )
302374
303375 # Convert to GeoZarr compliant format
304376 print ("Converting to GeoZarr compliant format..." )
@@ -330,7 +402,7 @@ def convert_command(args: argparse.Namespace) -> None:
330402 with metrics .time_step ("convert" ) if metrics else nullcontext ():
331403 dt_geozarr = create_geozarr_dataset (
332404 dt_input = dt ,
333- groups = args . groups ,
405+ groups = groups_effective ,
334406 output_path = output_path ,
335407 spatial_chunk = args .spatial_chunk ,
336408 min_dimension = args .min_dimension ,
@@ -344,7 +416,7 @@ def convert_command(args: argparse.Namespace) -> None:
344416 with metrics .time_step ("convert" ) if metrics else nullcontext ():
345417 dt_geozarr = create_geozarr_dataset (
346418 dt_input = dt ,
347- groups = args . groups ,
419+ groups = groups_effective ,
348420 output_path = output_path ,
349421 spatial_chunk = args .spatial_chunk ,
350422 min_dimension = args .min_dimension ,
@@ -381,7 +453,7 @@ def convert_command(args: argparse.Namespace) -> None:
381453 ),
382454 "perf_report" : getattr (args , "dask_perf_html" , None ),
383455 "wall_clock_s" : wall_clock if dask_client is not None else None ,
384- "groups" : args . groups ,
456+ "groups" : groups_effective ,
385457 "spatial_chunk" : args .spatial_chunk ,
386458 "min_dimension" : args .min_dimension ,
387459 "tile_width" : args .tile_width ,
@@ -399,6 +471,17 @@ def convert_command(args: argparse.Namespace) -> None:
399471 MetricsRecorder .write_json (run_summary , payload )
400472 if args .verbose :
401473 print (f"🧾 Wrote metrics: { run_summary } " )
474+ # Optional external run-metadata path
475+ if getattr (args , "run_metadata" , None ):
476+ try :
477+ outp = Path (args .run_metadata )
478+ outp .parent .mkdir (parents = True , exist_ok = True )
479+ MetricsRecorder .write_json (outp , payload )
480+ if args .verbose :
481+ print (f"🧾 Wrote run metadata: { outp } " )
482+ except Exception as _e :
483+ if args .verbose :
484+ print (f"(debug) could not write run-metadata: { _e } " )
402485 except Exception as _exc :
403486 if args .verbose :
404487 print (f"(debug) could not write run summary: { _exc } " )
@@ -424,10 +507,22 @@ def convert_command(args: argparse.Namespace) -> None:
424507 try :
425508 if metrics and debug_dir is not None and getattr (args , "metrics" , True ):
426509 payload = metrics .finalize (status = "error" , exception = error_msg )
427- run_summary = Path (args .output_path ) / "debug" / "run_summary.json"
428- MetricsRecorder .write_json (run_summary , payload )
429- if args .verbose :
430- print (f"🧾 Wrote failure metrics: { run_summary } " )
510+ run_summary = Path (output_path ) / "debug" / "run_summary.json"
511+ try :
512+ MetricsRecorder .write_json (run_summary , payload )
513+ if args .verbose :
514+ print (f"🧾 Wrote failure metrics: { run_summary } " )
515+ except Exception :
516+ pass
517+ if getattr (args , "run_metadata" , None ):
518+ try :
519+ outp = Path (args .run_metadata )
520+ outp .parent .mkdir (parents = True , exist_ok = True )
521+ MetricsRecorder .write_json (outp , payload )
522+ if args .verbose :
523+ print (f"🧾 Wrote run metadata (error): { outp } " )
524+ except Exception :
525+ pass
431526 except Exception :
432527 pass
433528 sys .exit (1 )
@@ -1414,6 +1509,17 @@ def create_parser() -> argparse.ArgumentParser:
14141509 action = "store_false" ,
14151510 help = "Disable metrics emission." ,
14161511 )
1512+ convert_parser .add_argument (
1513+ "--validate-groups" ,
1514+ type = str ,
1515+ choices = ["warn" , "error" ],
1516+ help = "Validate requested groups against input; warn to prune, error to abort." ,
1517+ )
1518+ convert_parser .add_argument (
1519+ "--run-metadata" ,
1520+ type = str ,
1521+ help = "Also write finalized metrics payload to this JSON path." ,
1522+ )
14171523 convert_parser .set_defaults (func = convert_command )
14181524
14191525 # Info command
0 commit comments