diff --git a/src/mintpy/cli/prep_nisar.py b/src/mintpy/cli/prep_nisar.py index c7bc22936..8ba04362f 100755 --- a/src/mintpy/cli/prep_nisar.py +++ b/src/mintpy/cli/prep_nisar.py @@ -58,6 +58,15 @@ def create_parser(subparsers=None): help="NISAR frequency to load: auto defaults to A (default: %(default)s).", ) + parser.add_argument( + "-ba", + "--band", + dest="sar_band", + choices=["auto", "LSAR", "SSAR"], + default="auto", + help="NISAR product family to load: auto defaults to LSAR (default: %(default)s).", + ) + parser.add_argument( "-o", "--out-dir", diff --git a/src/mintpy/defaults/smallbaselineApp.cfg b/src/mintpy/defaults/smallbaselineApp.cfg index 59d2756f3..7e88b3297 100644 --- a/src/mintpy/defaults/smallbaselineApp.cfg +++ b/src/mintpy/defaults/smallbaselineApp.cfg @@ -30,6 +30,7 @@ mintpy.load.processor = auto #[isce, aria, hyp3, gmtsar, snap, gamma, roi mintpy.load.autoPath = auto #[yes / no], auto for no, use pre-defined auto path mintpy.load.updateMode = auto #[yes / no], auto for yes, skip re-loading if HDF5 files are complete mintpy.load.compression = auto #[gzip / lzf / none / default], auto for default (none/lzf for stack/geometry). +mintpy.load.band = auto #[auto / LSAR / SSAR], auto for LSAR, NISAR only mintpy.load.frequency = auto #[auto / A / B], auto for A, NISAR only ##---------for ISCE only: mintpy.load.metaFile = auto #[path of common metadata file for the stack], i.e.: ./reference/IW1.xml, ./referenceShelve/data.dat diff --git a/src/mintpy/load_data.py b/src/mintpy/load_data.py index 1ec38a044..5c3223919 100644 --- a/src/mintpy/load_data.py +++ b/src/mintpy/load_data.py @@ -636,6 +636,7 @@ def prepare_metadata(iDict): dem_file = iDict['mintpy.load.demFile'] gunw_files = iDict['mintpy.load.unwFile'] water_mask = iDict['mintpy.load.waterMaskFile'] + band = iDict.get('mintpy.load.band', 'auto') frequency = iDict.get('mintpy.load.frequency', 'auto') if str(dem_file).lower() in ['auto', 'none', 'no', '']: @@ -655,7 +656,7 @@ def prepare_metadata(iDict): ) # run prep_*.py - iargs = ['-i', gunw_files, '-d', dem_file, '--frequency', frequency] + iargs = ['-i', gunw_files, '-d', dem_file, '--band', band, '--frequency', frequency] if str(water_mask).lower() not in ['auto', 'none', 'no', ''] and os.path.exists(water_mask): iargs = iargs + ['--mask', water_mask] diff --git a/src/mintpy/prep_nisar.py b/src/mintpy/prep_nisar.py index 629c1c945..01080039a 100644 --- a/src/mintpy/prep_nisar.py +++ b/src/mintpy/prep_nisar.py @@ -9,7 +9,9 @@ import datetime import glob import os +from dataclasses import dataclass, field from pathlib import Path +from typing import Dict, Optional import h5py import numpy as np @@ -29,26 +31,8 @@ "frequencyA": "frequencyA", "frequencyB": "frequencyB", } -IDENTIFICATION = "/science/LSAR/identification" -RADARGRID_ROOT = "science/LSAR/GUNW/metadata/radarGrid" - -PROCESSINFO = { - "orbit_direction": f"{IDENTIFICATION}/orbitPassDirection", - "platform": f"{IDENTIFICATION}/missionId", - "start_time": f"{IDENTIFICATION}/referenceZeroDopplerStartTime", - "end_time": f"{IDENTIFICATION}/referenceZeroDopplerEndTime", - "rdr_xcoord": f"{RADARGRID_ROOT}/xCoordinates", - "rdr_ycoord": f"{RADARGRID_ROOT}/yCoordinates", - "rdr_slant_range": f"{RADARGRID_ROOT}/referenceSlantRange", - "rdr_height": f"{RADARGRID_ROOT}/heightAboveEllipsoid", - "rdr_incidence": f"{RADARGRID_ROOT}/incidenceAngle", - "rdr_los_x": f"{RADARGRID_ROOT}/losUnitVectorX", - "rdr_los_y": f"{RADARGRID_ROOT}/losUnitVectorY", - "rdr_wet_tropo": f"{RADARGRID_ROOT}/wetTroposphericPhaseScreen", - "rdr_hs_tropo": f"{RADARGRID_ROOT}/hydrostaticTroposphericPhaseScreen", - "rdr_SET": f"{RADARGRID_ROOT}/slantRangeSolidEarthTidesPhase", - "bperp": f"{RADARGRID_ROOT}/perpendicularBaseline", -} +DEFAULT_SAR_BAND = "LSAR" +VALID_SAR_BANDS = {DEFAULT_SAR_BAND, "SSAR"} STACK_TYPES = {"ifgram", "ion", "tropo", "set"} @@ -67,52 +51,188 @@ def _normalize_frequency(frequency) -> str: return normalized -def _dataset_root_unw(frequency: str) -> str: - return f"/science/LSAR/GUNW/grids/{frequency}/unwrappedInterferogram" +def _normalize_sar_band(sar_band) -> str: + """Return the normalized NISAR product family name.""" + if sar_band is None or str(sar_band).lower() == "auto": + return DEFAULT_SAR_BAND + normalized = str(sar_band).upper() + if normalized not in VALID_SAR_BANDS: + raise ValueError( + f"sar_band must be one of: auto, {', '.join(sorted(VALID_SAR_BANDS))}" + ) + return normalized -def _parameters_root(frequency: str) -> str: - return ( - "/science/LSAR/GUNW/metadata/processingInformation/parameters/" - f"unwrappedInterferogram/{frequency}" - ) + +@dataclass(frozen=True) +class NisarProductContext: + """Cache the band-specific HDF5 roots and commonly reused dataset paths.""" + + sar_band: str = DEFAULT_SAR_BAND + science_root: str = field(init=False) + identification_root: str = field(init=False) + radargrid_root: str = field(init=False) + processinfo: Dict[str, str] = field(init=False) + + def __post_init__(self): + """Normalize the band name and precompute shared HDF5 path prefixes.""" + sar_band = _normalize_sar_band(self.sar_band) + science_root = f"/science/{sar_band}" + identification_root = f"{science_root}/identification" + radargrid_root = f"{science_root}/GUNW/metadata/radarGrid" + + object.__setattr__(self, "sar_band", sar_band) + object.__setattr__(self, "science_root", science_root) + object.__setattr__(self, "identification_root", identification_root) + object.__setattr__(self, "radargrid_root", radargrid_root) + object.__setattr__( + self, + "processinfo", + { + "orbit_direction": f"{identification_root}/orbitPassDirection", + "platform": f"{identification_root}/missionId", + "start_time": f"{identification_root}/referenceZeroDopplerStartTime", + "end_time": f"{identification_root}/referenceZeroDopplerEndTime", + "ref_start_time": f"{identification_root}/referenceZeroDopplerStartTime", + "sec_start_time": f"{identification_root}/secondaryZeroDopplerStartTime", + "rdr_xcoord": f"{radargrid_root}/xCoordinates", + "rdr_ycoord": f"{radargrid_root}/yCoordinates", + "rdr_slant_range": f"{radargrid_root}/referenceSlantRange", + "rdr_height": f"{radargrid_root}/heightAboveEllipsoid", + "rdr_incidence": f"{radargrid_root}/incidenceAngle", + "rdr_los_x": f"{radargrid_root}/losUnitVectorX", + "rdr_los_y": f"{radargrid_root}/losUnitVectorY", + "rdr_wet_tropo": f"{radargrid_root}/wetTroposphericPhaseScreen", + "rdr_hs_tropo": f"{radargrid_root}/hydrostaticTroposphericPhaseScreen", + "rdr_SET": f"{radargrid_root}/slantRangeSolidEarthTidesPhase", + "bperp": f"{radargrid_root}/perpendicularBaseline", + }, + ) + + def dataset_root_unw(self, frequency: str) -> str: + return f"{self.science_root}/GUNW/grids/{frequency}/unwrappedInterferogram" + + def parameters_root(self, frequency: str) -> str: + return ( + f"{self.science_root}/GUNW/metadata/processingInformation/parameters/" + f"unwrappedInterferogram/{frequency}" + ) + + def center_frequency_path(self, frequency: str) -> str: + return f"{self.science_root}/GUNW/grids/{frequency}/centerFrequency" + + def datasets_for_pol(self, polarization: str, frequency: str) -> Dict[str, str]: + root = self.dataset_root_unw(frequency) + parameters = self.parameters_root(frequency) + return { + "xcoord": f"{root}/{polarization}/xCoordinates", + "ycoord": f"{root}/{polarization}/yCoordinates", + "unw": f"{root}/{polarization}/unwrappedPhase", + "mask": f"{root}/mask", + "cor": f"{root}/{polarization}/coherenceMagnitude", + "connComp": f"{root}/{polarization}/connectedComponents", + "ion": f"{root}/{polarization}/ionospherePhaseScreen", + "epsg": f"{root}/{polarization}/projection", + "xSpacing": f"{root}/{polarization}/xCoordinateSpacing", + "ySpacing": f"{root}/{polarization}/yCoordinateSpacing", + "polarization": ( + f"{self.science_root}/GUNW/grids/{frequency}/listOfPolarizations" + ), + "range_look": f"{parameters}/numberOfRangeLooks", + "azimuth_look": f"{parameters}/numberOfAzimuthLooks", + } + + +def _coerce_product_context( + product_ctx: Optional[NisarProductContext] = None, + sar_band: str = DEFAULT_SAR_BAND, +) -> NisarProductContext: + return product_ctx if product_ctx is not None else NisarProductContext(sar_band) -def _center_frequency_path(frequency: str) -> str: - return f"/science/LSAR/GUNW/grids/{frequency}/centerFrequency" +def _science_root( + sar_band: str = DEFAULT_SAR_BAND, + product_ctx: Optional[NisarProductContext] = None, +) -> str: + return _coerce_product_context(product_ctx, sar_band).science_root -def _datasets_for_pol(polarization: str, frequency: str) -> dict: +def _identification_root( + sar_band: str = DEFAULT_SAR_BAND, + product_ctx: Optional[NisarProductContext] = None, +) -> str: + return _coerce_product_context(product_ctx, sar_band).identification_root + + +def _radargrid_root( + sar_band: str = DEFAULT_SAR_BAND, + product_ctx: Optional[NisarProductContext] = None, +) -> str: + return _coerce_product_context(product_ctx, sar_band).radargrid_root + + +def _processinfo( + sar_band: str = DEFAULT_SAR_BAND, + product_ctx: Optional[NisarProductContext] = None, +) -> Dict[str, str]: + return _coerce_product_context(product_ctx, sar_band).processinfo + + +def _dataset_root_unw( + frequency: str, + sar_band: str = DEFAULT_SAR_BAND, + product_ctx: Optional[NisarProductContext] = None, +) -> str: + return _coerce_product_context(product_ctx, sar_band).dataset_root_unw(frequency) + + +def _parameters_root( + frequency: str, + sar_band: str = DEFAULT_SAR_BAND, + product_ctx: Optional[NisarProductContext] = None, +) -> str: + return _coerce_product_context(product_ctx, sar_band).parameters_root(frequency) + + +def _center_frequency_path( + frequency: str, + sar_band: str = DEFAULT_SAR_BAND, + product_ctx: Optional[NisarProductContext] = None, +) -> str: + return _coerce_product_context(product_ctx, sar_band).center_frequency_path( + frequency + ) + + +def _datasets_for_pol( + polarization: str, + frequency: str, + sar_band: str = DEFAULT_SAR_BAND, + product_ctx: Optional[NisarProductContext] = None, +) -> Dict[str, str]: """Return per-call dataset paths for the selected frequency/polarization.""" - root = _dataset_root_unw(frequency) - parameters = _parameters_root(frequency) - return { - "xcoord": f"{root}/{polarization}/xCoordinates", - "ycoord": f"{root}/{polarization}/yCoordinates", - "unw": f"{root}/{polarization}/unwrappedPhase", - "mask": f"{root}/mask", - "cor": f"{root}/{polarization}/coherenceMagnitude", - "connComp": f"{root}/{polarization}/connectedComponents", - "ion": f"{root}/{polarization}/ionospherePhaseScreen", - "epsg": f"{root}/{polarization}/projection", - "xSpacing": f"{root}/{polarization}/xCoordinateSpacing", - "ySpacing": f"{root}/{polarization}/yCoordinateSpacing", - "polarization": f"/science/LSAR/GUNW/grids/{frequency}/listOfPolarizations", - "range_look": f"{parameters}/numberOfRangeLooks", - "azimuth_look": f"{parameters}/numberOfAzimuthLooks", - } + return _coerce_product_context(product_ctx, sar_band).datasets_for_pol( + polarization, frequency + ) -def _resolve_frequency(gunw_file: str, frequency, polarization: str) -> str: +def _resolve_frequency( + gunw_file: str, + frequency, + polarization: str, + sar_band: str = DEFAULT_SAR_BAND, + product_ctx: Optional[NisarProductContext] = None, +) -> str: """Resolve and validate the requested NISAR frequency.""" + product_ctx = _coerce_product_context(product_ctx, sar_band) resolved = _normalize_frequency(frequency) - datasets = _datasets_for_pol(polarization, resolved) + datasets = _datasets_for_pol(polarization, resolved, product_ctx=product_ctx) with h5py.File(gunw_file, "r") as ds: required_paths = [ - _dataset_root_unw(resolved), + _dataset_root_unw(resolved, product_ctx=product_ctx), datasets["unw"], - _center_frequency_path(resolved), + _center_frequency_path(resolved, product_ctx=product_ctx), ] missing = [path for path in required_paths if path not in ds] @@ -130,8 +250,9 @@ def _resolve_frequency(gunw_file: str, frequency, polarization: str) -> str: "Check that the input file contains frequencyB for this polarization." ) raise ValueError( - f"NISAR {requested} data for polarization {polarization!r} was not found " - f"in {gunw_file}. Missing path: {missing[0]}. {hint}" + f"NISAR {product_ctx.sar_band} {requested} data for " + f"polarization {polarization!r} was not found in {gunw_file}. " + f"Missing path: {missing[0]}. {hint}" ) return resolved @@ -256,9 +377,17 @@ def _coerce_subset_metadata_types(meta): return meta -def _read_unwrapped_phase_valid_mask(gunw_file: str, xybbox, pol: str, frequency: str): +def _read_unwrapped_phase_valid_mask( + gunw_file: str, + xybbox, + pol: str, + frequency: str, + sar_band: str = DEFAULT_SAR_BAND, + product_ctx: Optional[NisarProductContext] = None, +): """Fallback validity mask based on finite unwrappedPhase (+ _FillValue check).""" - datasets = _datasets_for_pol(pol, frequency) + product_ctx = _coerce_product_context(product_ctx, sar_band) + datasets = _datasets_for_pol(pol, frequency, product_ctx=product_ctx) path = datasets["unw"] with h5py.File(gunw_file, "r") as ds: dset = ds[path] @@ -271,14 +400,28 @@ def _read_unwrapped_phase_valid_mask(gunw_file: str, xybbox, pol: str, frequency return valid -def _read_is_land_and_valid_mask(gunw_file: str, xybbox, pol: str, frequency: str): +def _read_is_land_and_valid_mask( + gunw_file: str, + xybbox, + pol: str, + frequency: str, + sar_band: str = DEFAULT_SAR_BAND, + product_ctx: Optional[NisarProductContext] = None, +): """Decode the native GUNW mask into MintPy's keep-mask convention.""" - datasets = _datasets_for_pol(pol, frequency) + product_ctx = _coerce_product_context(product_ctx, sar_band) + datasets = _datasets_for_pol(pol, frequency, product_ctx=product_ctx) path = datasets["mask"] with h5py.File(gunw_file, "r") as ds: if path not in ds: - return _read_unwrapped_phase_valid_mask(gunw_file, xybbox, pol, frequency) + return _read_unwrapped_phase_valid_mask( + gunw_file, + xybbox, + pol, + frequency, + product_ctx=product_ctx, + ) dset = ds[path] mask_bits = dset[xybbox[1] : xybbox[3], xybbox[0] : xybbox[2]] @@ -297,15 +440,32 @@ def _read_is_land_and_valid_mask(gunw_file: str, xybbox, pol: str, frequency: st return is_valid & ~water_mask -def _read_common_is_land_and_valid_mask(input_files, bbox, pol: str, frequency: str): +def _read_common_is_land_and_valid_mask( + input_files, + bbox, + pol: str, + frequency: str, + sar_band: str = DEFAULT_SAR_BAND, + product_ctx: Optional[NisarProductContext] = None, +): """Return the common keep-mask across all input GUNW products.""" + product_ctx = _coerce_product_context(product_ctx, sar_band) common_mask = None for gunw_file in input_files: geo_ds = read_subset( - gunw_file, bbox, polarization=pol, frequency=frequency, geometry=True + gunw_file, + bbox, + polarization=pol, + frequency=frequency, + product_ctx=product_ctx, + geometry=True, ) mask = _read_is_land_and_valid_mask( - gunw_file, geo_ds["xybbox"], pol, frequency + gunw_file, + geo_ds["xybbox"], + pol, + frequency, + product_ctx=product_ctx, ).astype(bool, copy=False) if common_mask is None: @@ -336,13 +496,20 @@ def _apply_external_mask( external_mask_file, polarization: str, frequency: str, + sar_band: str = DEFAULT_SAR_BAND, + product_ctx: Optional[NisarProductContext] = None, ): """Refine a keep-mask with an optional external raster mask.""" if not _external_mask_is_set(external_mask_file): return keep_mask + product_ctx = _coerce_product_context(product_ctx, sar_band) dst_epsg, xcoord, ycoord = _read_target_grid( - gunw_file, xybbox, polarization, frequency + gunw_file, + xybbox, + polarization, + frequency, + product_ctx=product_ctx, ) mask_src_epsg = _read_raster_epsg(external_mask_file) external_mask = _warp_to_grid_mem( @@ -356,10 +523,16 @@ def _apply_external_mask( return keep_mask & external_mask -def _read_perpendicular_baseline(gunw_file: str) -> np.float32: +def _read_perpendicular_baseline( + gunw_file: str, + sar_band: str = DEFAULT_SAR_BAND, + product_ctx: Optional[NisarProductContext] = None, +) -> np.float32: """Read the NISAR perpendicular baseline as one finite mean value.""" + product_ctx = _coerce_product_context(product_ctx, sar_band) + processinfo = _processinfo(product_ctx=product_ctx) with h5py.File(gunw_file, "r") as ds: - dset = ds[PROCESSINFO["bperp"]] + dset = ds[processinfo["bperp"]] bperp = np.asarray(dset[()], dtype=np.float64).reshape(-1) fill = dset.attrs.get("_FillValue", None) @@ -376,9 +549,17 @@ def _read_perpendicular_baseline(gunw_file: str) -> np.float32: return np.float32(pbase) -def _read_target_grid(gunw_file: str, xybbox, polarization: str, frequency: str): +def _read_target_grid( + gunw_file: str, + xybbox, + polarization: str, + frequency: str, + sar_band: str = DEFAULT_SAR_BAND, + product_ctx: Optional[NisarProductContext] = None, +): """Read the destination EPSG and subset grid axes from a GUNW file.""" - datasets = _datasets_for_pol(polarization, frequency) + product_ctx = _coerce_product_context(product_ctx, sar_band) + datasets = _datasets_for_pol(polarization, frequency, product_ctx=product_ctx) with h5py.File(gunw_file, "r") as ds: return ( int(ds[datasets["epsg"]][()]), @@ -387,27 +568,47 @@ def _read_target_grid(gunw_file: str, xybbox, polarization: str, frequency: str) ) -def _read_radar_grid_fields(gunw_file: str, field_map: dict): +def _read_radar_grid_fields( + gunw_file: str, + field_map: dict, + sar_band: str = DEFAULT_SAR_BAND, + product_ctx: Optional[NisarProductContext] = None, +): """Read radar-grid interpolation axes plus the requested data fields.""" + product_ctx = _coerce_product_context(product_ctx, sar_band) + processinfo = _processinfo(product_ctx=product_ctx) rdr_coords = {} with h5py.File(gunw_file, "r") as ds: - rdr_coords["xcoord_radar_grid"] = ds[PROCESSINFO["rdr_xcoord"]][()] - rdr_coords["ycoord_radar_grid"] = ds[PROCESSINFO["rdr_ycoord"]][()] - rdr_coords["height_radar_grid"] = ds[PROCESSINFO["rdr_height"]][()] + rdr_coords["xcoord_radar_grid"] = ds[processinfo["rdr_xcoord"]][()] + rdr_coords["ycoord_radar_grid"] = ds[processinfo["rdr_ycoord"]][()] + rdr_coords["height_radar_grid"] = ds[processinfo["rdr_height"]][()] for out_key, process_key in field_map.items(): - rdr_coords[out_key] = ds[PROCESSINFO[process_key]][()] + rdr_coords[out_key] = ds[processinfo[process_key]][()] return rdr_coords def _prepare_radar_grid_interpolation( - gunw_file, dem_file, xybbox, polarization, frequency, field_map, valid_mask=None + gunw_file, + dem_file, + xybbox, + polarization, + frequency, + field_map, + valid_mask=None, + sar_band: str = DEFAULT_SAR_BAND, + product_ctx: Optional[NisarProductContext] = None, ): """Build the common DEM/grid/valid-mask context for radar-grid interpolation.""" + product_ctx = _coerce_product_context(product_ctx, sar_band) dem_src_epsg = _read_raster_epsg(dem_file) dst_epsg, xcoord, ycoord = _read_target_grid( - gunw_file, xybbox, polarization, frequency + gunw_file, + xybbox, + polarization, + frequency, + product_ctx=product_ctx, ) - rdr_coords = _read_radar_grid_fields(gunw_file, field_map) + rdr_coords = _read_radar_grid_fields(gunw_file, field_map, product_ctx=product_ctx) dem_subset_array = _warp_to_grid_mem( src_path=dem_file, @@ -421,7 +622,11 @@ def _prepare_radar_grid_interpolation( y_2d, x_2d = np.meshgrid(ycoord, xcoord, indexing="ij") if valid_mask is None: valid_mask = _read_is_land_and_valid_mask( - gunw_file, xybbox, polarization, frequency + gunw_file, + xybbox, + polarization, + frequency, + product_ctx=product_ctx, ) else: valid_mask = np.asarray(valid_mask, dtype=np.bool_) @@ -501,26 +706,44 @@ def _resolve_stack_type(stack_type, outfile): ) -def _required_paths_for_stack_type(stack_type, polarization, frequency): +def _required_paths_for_stack_type( + stack_type, + polarization, + frequency, + sar_band: str = DEFAULT_SAR_BAND, + product_ctx: Optional[NisarProductContext] = None, +): """Return HDF5 source datasets needed to build the requested stack.""" - datasets = _datasets_for_pol(polarization, frequency) + product_ctx = _coerce_product_context(product_ctx, sar_band) + datasets = _datasets_for_pol(polarization, frequency, product_ctx=product_ctx) + processinfo = _processinfo(product_ctx=product_ctx) if stack_type == "ifgram": return [datasets["unw"], datasets["cor"], datasets["connComp"]] if stack_type == "ion": return [datasets["ion"], datasets["cor"], datasets["connComp"]] if stack_type == "tropo": - return [PROCESSINFO["rdr_wet_tropo"], PROCESSINFO["rdr_hs_tropo"]] + return [processinfo["rdr_wet_tropo"], processinfo["rdr_hs_tropo"]] if stack_type == "set": - return [PROCESSINFO["rdr_SET"]] + return [processinfo["rdr_SET"]] raise ValueError( f"Unsupported stack_type {stack_type!r}; expected one of {sorted(STACK_TYPES)}" ) -def _missing_required_paths(inp_files, stack_type, polarization, frequency): +def _missing_required_paths( + inp_files, + stack_type, + polarization, + frequency, + sar_band: str = DEFAULT_SAR_BAND, + product_ctx: Optional[NisarProductContext] = None, +): """Return missing required HDF5 source paths as (file, path) pairs.""" - required_paths = _required_paths_for_stack_type(stack_type, polarization, frequency) + product_ctx = _coerce_product_context(product_ctx, sar_band) + required_paths = _required_paths_for_stack_type( + stack_type, polarization, frequency, product_ctx=product_ctx + ) missing = [] for file in inp_files: @@ -530,13 +753,27 @@ def _missing_required_paths(inp_files, stack_type, polarization, frequency): return missing -def _read_stack_observation(file, stack_type, bbox, dem_file, polarization, frequency): +def _read_stack_observation( + file, + stack_type, + bbox, + dem_file, + polarization, + frequency, + sar_band: str = DEFAULT_SAR_BAND, + product_ctx: Optional[NisarProductContext] = None, +): """Read one observation for the requested stack type.""" - pbase = _read_perpendicular_baseline(file) + product_ctx = _coerce_product_context(product_ctx, sar_band) + pbase = _read_perpendicular_baseline(file, product_ctx=product_ctx) if stack_type in {"ifgram", "ion"}: dataset = read_subset( - file, bbox, polarization=polarization, frequency=frequency + file, + bbox, + polarization=polarization, + frequency=frequency, + product_ctx=product_ctx, ) unwrap_key = "unw_data" if stack_type == "ifgram" else "ion_data" return { @@ -547,7 +784,12 @@ def _read_stack_observation(file, stack_type, bbox, dem_file, polarization, freq } geo_ds = read_subset( - file, bbox, polarization=polarization, frequency=frequency, geometry=True + file, + bbox, + polarization=polarization, + frequency=frequency, + product_ctx=product_ctx, + geometry=True, ) if stack_type == "tropo": unwrap_phase = read_and_interpolate_troposphere( @@ -556,6 +798,7 @@ def _read_stack_observation(file, stack_type, bbox, dem_file, polarization, freq geo_ds["xybbox"], polarization=polarization, frequency=frequency, + product_ctx=product_ctx, ) else: unwrap_phase = read_and_interpolate_SET( @@ -564,6 +807,7 @@ def _read_stack_observation(file, stack_type, bbox, dem_file, polarization, freq geo_ds["xybbox"], polarization=polarization, frequency=frequency, + product_ctx=product_ctx, ) return {"unwrap_phase": unwrap_phase, "pbase": pbase} @@ -632,18 +876,37 @@ def load_nisar(inps): # extract metadata pol = getattr(inps, "polarization", "HH") + # Normalize once here so downstream helpers can assume a concrete product family. + product_ctx = NisarProductContext(getattr(inps, "sar_band", DEFAULT_SAR_BAND)) + inps.sar_band = product_ctx.sar_band frequency = _resolve_frequency( - input_files[0], getattr(inps, "frequency", "auto"), pol + input_files[0], + getattr(inps, "frequency", "auto"), + pol, + product_ctx=product_ctx, ) - print(f"Using NISAR {frequency}") + print(f"Using NISAR {product_ctx.sar_band} {frequency}") metadata, bounds = extract_metadata( - input_files, bbox=bbox, polarization=pol, frequency=frequency + input_files, + bbox=bbox, + polarization=pol, + frequency=frequency, + product_ctx=product_ctx, ) common_mask = _read_common_is_land_and_valid_mask( - input_files, bounds, pol=pol, frequency=frequency + input_files, + bounds, + pol=pol, + frequency=frequency, + product_ctx=product_ctx, ) first_geo_ds = read_subset( - input_files[0], bounds, polarization=pol, frequency=frequency, geometry=True + input_files[0], + bounds, + polarization=pol, + frequency=frequency, + product_ctx=product_ctx, + geometry=True, ) common_mask = _apply_external_mask( common_mask, @@ -652,6 +915,7 @@ def load_nisar(inps): inps.mask_file, pol, frequency, + product_ctx=product_ctx, ) print( "Common valid land pixels from all NISAR masks: " @@ -666,7 +930,7 @@ def load_nisar(inps): set_stack_file = os.path.join(inps.out_dir, "inputs/setStack.h5") # date pairs - date12_list = _get_date_pairs(input_files) + date12_list = _get_date_pairs(input_files, product_ctx=product_ctx) # geometry metadata = prepare_geometry( @@ -679,6 +943,7 @@ def load_nisar(inps): commonMask=common_mask, polarization=pol, frequency=frequency, + product_ctx=product_ctx, ) # standalone water mask (MintPy format) @@ -692,6 +957,7 @@ def load_nisar(inps): commonMask=common_mask, polarization=pol, frequency=frequency, + product_ctx=product_ctx, ) # ifgram stack @@ -704,6 +970,7 @@ def load_nisar(inps): date12_list=date12_list, polarization=pol, frequency=frequency, + product_ctx=product_ctx, stack_type="ifgram", commonMask=common_mask, ) @@ -718,6 +985,7 @@ def load_nisar(inps): date12_list=date12_list, polarization=pol, frequency=frequency, + product_ctx=product_ctx, stack_type="ion", commonMask=common_mask, ) @@ -732,6 +1000,7 @@ def load_nisar(inps): date12_list=date12_list, polarization=pol, frequency=frequency, + product_ctx=product_ctx, stack_type="tropo", commonMask=common_mask, ) @@ -746,6 +1015,7 @@ def load_nisar(inps): date12_list=date12_list, polarization=pol, frequency=frequency, + product_ctx=product_ctx, stack_type="set", commonMask=common_mask, ) @@ -756,12 +1026,21 @@ def load_nisar(inps): # --------------------------------------------------------------------- # Metadata / subset utilities # --------------------------------------------------------------------- -def extract_metadata(input_files, bbox=None, polarization="HH", frequency="frequencyA"): +def extract_metadata( + input_files, + bbox=None, + polarization="HH", + frequency="frequencyA", + sar_band=DEFAULT_SAR_BAND, + product_ctx: Optional[NisarProductContext] = None, +): """Extract NISAR metadata for MintPy.""" meta_file = input_files[0] meta = {} - datasets = _datasets_for_pol(polarization, frequency) + product_ctx = _coerce_product_context(product_ctx, sar_band) + datasets = _datasets_for_pol(polarization, frequency, product_ctx=product_ctx) + processinfo = _processinfo(product_ctx=product_ctx) with h5py.File(meta_file, "r") as ds: pixel_height = ds[datasets["ySpacing"]][()] @@ -771,22 +1050,25 @@ def extract_metadata(input_files, bbox=None, polarization="HH", frequency="frequ xcoord = ds[datasets["xcoord"]][()] ycoord = ds[datasets["ycoord"]][()] meta["EPSG"] = int(ds[datasets["epsg"]][()]) - meta["WAVELENGTH"] = SPEED_OF_LIGHT / ds[_center_frequency_path(frequency)][()] - meta["ORBIT_DIRECTION"] = ds[PROCESSINFO["orbit_direction"]][()].decode("utf-8") + meta["WAVELENGTH"] = ( + SPEED_OF_LIGHT + / ds[_center_frequency_path(frequency, product_ctx=product_ctx)][()] + ) + meta["ORBIT_DIRECTION"] = ds[processinfo["orbit_direction"]][()].decode("utf-8") meta["POLARIZATION"] = polarization meta["ALOOKS"] = ds[datasets["azimuth_look"]][()] meta["RLOOKS"] = ds[datasets["range_look"]][()] - meta["PLATFORM"] = ds[PROCESSINFO["platform"]][()].decode("utf-8") + meta["PLATFORM"] = ds[processinfo["platform"]][()].decode("utf-8") meta["STARTING_RANGE"] = float( - np.min(ds[PROCESSINFO["rdr_slant_range"]][()].flatten()) + np.min(ds[processinfo["rdr_slant_range"]][()].flatten()) ) start_time = datetime.datetime.strptime( - ds[PROCESSINFO["start_time"]][()].decode("utf-8")[0:26], + ds[processinfo["start_time"]][()].decode("utf-8")[0:26], "%Y-%m-%dT%H:%M:%S.%f", ) end_time = datetime.datetime.strptime( - ds[PROCESSINFO["end_time"]][()].decode("utf-8")[0:26], + ds[processinfo["end_time"]][()].decode("utf-8")[0:26], "%Y-%m-%dT%H:%M:%S.%f", ) @@ -830,7 +1112,11 @@ def extract_metadata(input_files, bbox=None, polarization="HH", frequency="frequ utm_bbox = None bounds = common_raster_bound( - input_files, utm_bbox, polarization=polarization, frequency=frequency + input_files, + utm_bbox, + polarization=polarization, + frequency=frequency, + product_ctx=product_ctx, ) meta["bbox"] = ",".join([str(b) for b in bounds]) @@ -893,30 +1179,47 @@ def get_rows_cols(xcoord, ycoord, bounds): return col1, row1, col2, row2 -def get_raster_corners(input_file, polarization="HH", frequency="frequencyA"): +def get_raster_corners( + input_file, + polarization="HH", + frequency="frequencyA", + sar_band=DEFAULT_SAR_BAND, + product_ctx: Optional[NisarProductContext] = None, +): """Get the (west, south, east, north) bounds of the image.""" - datasets = _datasets_for_pol(polarization, frequency) + product_ctx = _coerce_product_context(product_ctx, sar_band) + datasets = _datasets_for_pol(polarization, frequency, product_ctx=product_ctx) + processinfo = _processinfo(product_ctx=product_ctx) with h5py.File(input_file, "r") as ds: xcoord = ds[datasets["xcoord"]][:] ycoord = ds[datasets["ycoord"]][:] - west = max(np.min(ds[PROCESSINFO["rdr_xcoord"]][:]), np.min(xcoord)) - east = min(np.max(ds[PROCESSINFO["rdr_xcoord"]][:]), np.max(xcoord)) - north = min(np.max(ds[PROCESSINFO["rdr_ycoord"]][:]), np.max(ycoord)) - south = max(np.min(ds[PROCESSINFO["rdr_ycoord"]][:]), np.min(ycoord)) + west = max(np.min(ds[processinfo["rdr_xcoord"]][:]), np.min(xcoord)) + east = min(np.max(ds[processinfo["rdr_xcoord"]][:]), np.max(xcoord)) + north = min(np.max(ds[processinfo["rdr_ycoord"]][:]), np.max(ycoord)) + south = max(np.min(ds[processinfo["rdr_ycoord"]][:]), np.min(ycoord)) return float(west), float(south), float(east), float(north) def common_raster_bound( - input_files, utm_bbox=None, polarization="HH", frequency="frequencyA" + input_files, + utm_bbox=None, + polarization="HH", + frequency="frequencyA", + sar_band=DEFAULT_SAR_BAND, + product_ctx: Optional[NisarProductContext] = None, ): """Get common bounds among all data in (xmin, ymin, xmax, ymax).""" + product_ctx = _coerce_product_context(product_ctx, sar_band) wests = [] souths = [] easts = [] norths = [] for file in input_files: west, south, east, north = get_raster_corners( - file, polarization=polarization, frequency=frequency + file, + polarization=polarization, + frequency=frequency, + product_ctx=product_ctx, ) wests.append(west) souths.append(south) @@ -963,10 +1266,17 @@ def bbox_to_utm(bbox, dst_epsg, src_epsg=4326): def read_subset( - gunw_file, bbox, polarization="HH", frequency="frequencyA", geometry=False + gunw_file, + bbox, + polarization="HH", + frequency="frequencyA", + sar_band=DEFAULT_SAR_BAND, + geometry=False, + product_ctx: Optional[NisarProductContext] = None, ): """Read subset arrays or only geometry bounds for unwrapped products.""" - datasets = _datasets_for_pol(polarization, frequency) + product_ctx = _coerce_product_context(product_ctx, sar_band) + datasets = _datasets_for_pol(polarization, frequency, product_ctx=product_ctx) with h5py.File(gunw_file, "r") as ds: xcoord = ds[datasets["xcoord"]][()] ycoord = ds[datasets["ycoord"]][()] @@ -1024,10 +1334,13 @@ def read_and_interpolate_geometry( xybbox, polarization="HH", frequency="frequencyA", + sar_band=DEFAULT_SAR_BAND, external_mask_file=None, valid_mask=None, + product_ctx: Optional[NisarProductContext] = None, ): """Warp DEM to the interferogram grid and interpolate geometry layers.""" + product_ctx = _coerce_product_context(product_ctx, sar_band) interp_ctx = _prepare_radar_grid_interpolation( gunw_file, dem_file, @@ -1041,6 +1354,7 @@ def read_and_interpolate_geometry( "los_y": "rdr_los_y", }, valid_mask=valid_mask, + product_ctx=product_ctx, ) slant_range, incidence_angle, azimuth_angle = interpolate_geometry( interp_ctx["x_2d"], @@ -1060,6 +1374,7 @@ def read_and_interpolate_geometry( external_mask_file, polarization, frequency, + product_ctx=product_ctx, ) return ( @@ -1096,9 +1411,16 @@ def interpolate_geometry(X_2d, Y_2d, dem, rdr_coords, valid_mask): def read_and_interpolate_troposphere( - gunw_file, dem_file, xybbox, polarization="HH", frequency="frequencyA" + gunw_file, + dem_file, + xybbox, + polarization="HH", + frequency="frequencyA", + sar_band=DEFAULT_SAR_BAND, + product_ctx: Optional[NisarProductContext] = None, ): """Warp DEM to aligned grid and interpolate combined tropo at valid pixels only.""" + product_ctx = _coerce_product_context(product_ctx, sar_band) interp_ctx = _prepare_radar_grid_interpolation( gunw_file, dem_file, @@ -1109,6 +1431,7 @@ def read_and_interpolate_troposphere( "wet_tropo": "rdr_wet_tropo", "hydrostatic_tropo": "rdr_hs_tropo", }, + product_ctx=product_ctx, ) total_tropo = interpolate_troposphere( interp_ctx["x_2d"], @@ -1138,9 +1461,16 @@ def interpolate_troposphere(X_2d, Y_2d, dem, rdr_coords, valid_mask): def read_and_interpolate_SET( - gunw_file, dem_file, xybbox, polarization="HH", frequency="frequencyA" + gunw_file, + dem_file, + xybbox, + polarization="HH", + frequency="frequencyA", + sar_band=DEFAULT_SAR_BAND, + product_ctx: Optional[NisarProductContext] = None, ): """Warp DEM to aligned grid and interpolate SET phase at valid pixels only.""" + product_ctx = _coerce_product_context(product_ctx, sar_band) interp_ctx = _prepare_radar_grid_interpolation( gunw_file, dem_file, @@ -1148,6 +1478,7 @@ def read_and_interpolate_SET( polarization, frequency, {"rdr_SET": "rdr_SET"}, + product_ctx=product_ctx, ) set_phase = interpolate_set( interp_ctx["x_2d"], @@ -1175,17 +1506,23 @@ def interpolate_set(X_2d, Y_2d, dem, rdr_coords, valid_mask): # --------------------------------------------------------------------- # MintPy file builders # --------------------------------------------------------------------- -def _get_date_pairs(filenames): +def _get_date_pairs( + filenames, + sar_band=DEFAULT_SAR_BAND, + product_ctx: Optional[NisarProductContext] = None, +): """Return reference_secondary date pairs in YYYYMMDD_YYYYMMDD format.""" date12_list = [] + product_ctx = _coerce_product_context(product_ctx, sar_band) + processinfo = _processinfo(product_ctx=product_ctx) for filename in filenames: with h5py.File(filename, "r") as ds: if ( - f"{IDENTIFICATION}/referenceZeroDopplerStartTime" in ds - and f"{IDENTIFICATION}/secondaryZeroDopplerStartTime" in ds + processinfo["ref_start_time"] in ds + and processinfo["sec_start_time"] in ds ): - ref_time = ds[f"{IDENTIFICATION}/referenceZeroDopplerStartTime"][()] - sec_time = ds[f"{IDENTIFICATION}/secondaryZeroDopplerStartTime"][()] + ref_time = ds[processinfo["ref_start_time"]][()] + sec_time = ds[processinfo["sec_start_time"]][()] ref_date = ref_time.decode("utf-8").split("T")[0].replace("-", "") sec_date = sec_time.decode("utf-8").split("T")[0].replace("-", "") date12_list.append(f"{ref_date}_{sec_date}") @@ -1214,16 +1551,24 @@ def prepare_geometry( externalMaskFile, polarization="HH", frequency="frequencyA", + sar_band=DEFAULT_SAR_BAND, commonMask=None, + product_ctx: Optional[NisarProductContext] = None, ): """Prepare the geometry file.""" print("-" * 50) print(f"preparing geometry file: {outfile}") meta = {key: value for key, value in metadata.items()} + product_ctx = _coerce_product_context(product_ctx, sar_band) geo_ds = read_subset( - metaFile, bbox, polarization=polarization, frequency=frequency, geometry=True + metaFile, + bbox, + polarization=polarization, + frequency=frequency, + product_ctx=product_ctx, + geometry=True, ) dem_subset_array, slant_range, incidence_angle, azimuth_angle, mask = ( read_and_interpolate_geometry( @@ -1232,6 +1577,7 @@ def prepare_geometry( geo_ds["xybbox"], polarization=polarization, frequency=frequency, + product_ctx=product_ctx, external_mask_file=externalMaskFile, valid_mask=commonMask, ) @@ -1264,23 +1610,35 @@ def prepare_water_mask( externalMaskFile, polarization="HH", frequency="frequencyA", + sar_band=DEFAULT_SAR_BAND, commonMask=None, + product_ctx: Optional[NisarProductContext] = None, ): """Prepare a standalone MintPy waterMask.h5 from the GUNW mask.""" print("-" * 50) print(f"preparing water mask file: {outfile}") meta = {key: value for key, value in metadata.items()} + product_ctx = _coerce_product_context(product_ctx, sar_band) # get subset indices geo_ds = read_subset( - metaFile, bbox, polarization=polarization, frequency=frequency, geometry=True + metaFile, + bbox, + polarization=polarization, + frequency=frequency, + product_ctx=product_ctx, + geometry=True, ) xybbox = geo_ds["xybbox"] if commonMask is None: water_mask_bool = _read_is_land_and_valid_mask( - metaFile, xybbox, polarization, frequency + metaFile, + xybbox, + polarization, + frequency, + product_ctx=product_ctx, ) else: water_mask_bool = np.asarray(commonMask, dtype=np.bool_) @@ -1299,6 +1657,7 @@ def prepare_water_mask( externalMaskFile, polarization, frequency, + product_ctx=product_ctx, ) length, width = water_mask_bool.shape @@ -1321,8 +1680,10 @@ def prepare_stack( date12_list, polarization="HH", frequency="frequencyA", + sar_band=DEFAULT_SAR_BAND, stack_type=None, commonMask=None, + product_ctx: Optional[NisarProductContext] = None, ): """Prepare the input stacks.""" effective_stack_type = _resolve_stack_type(stack_type, outfile) @@ -1330,11 +1691,16 @@ def prepare_stack( print(f"preparing {effective_stack_type} stack file: {outfile}") meta = {key: value for key, value in metadata.items()} + product_ctx = _coerce_product_context(product_ctx, sar_band) num_pair = len(inp_files) print(f"number of inputs/unwrapped interferograms: {num_pair}") missing = _missing_required_paths( - inp_files, effective_stack_type, polarization, frequency + inp_files, + effective_stack_type, + polarization, + frequency, + product_ctx=product_ctx, ) if missing: first_file, first_path = missing[0] @@ -1381,7 +1747,13 @@ def prepare_stack( prog_bar = ptime.progressBar(maxValue=num_pair) for i, file in enumerate(inp_files): obs = _read_stack_observation( - file, effective_stack_type, bbox, demFile, polarization, frequency + file, + effective_stack_type, + bbox, + demFile, + polarization, + frequency, + product_ctx=product_ctx, ) obs = _apply_common_mask_to_observation(obs, commonMask) f["unwrapPhase"][i] = obs["unwrap_phase"]