From 70cdb34c6a453a489eff118216c1e9b6d5de5bdd Mon Sep 17 00:00:00 2001 From: Priit Pender Date: Wed, 3 Sep 2025 11:23:56 +0300 Subject: [PATCH 1/4] Fix issue with improper geolocation of IMP products. Fix antenna gain with IMS mode. --- asar_xarray/asar.py | 53 +++++++++++++++----- asar_xarray/envisat_direct.py | 91 ++++++++++++++++++++--------------- 2 files changed, 91 insertions(+), 53 deletions(-) diff --git a/asar_xarray/asar.py b/asar_xarray/asar.py index 92c3a9f..8f5828d 100644 --- a/asar_xarray/asar.py +++ b/asar_xarray/asar.py @@ -89,10 +89,16 @@ def create_dataset(metadata: dict[str, Any], filepath: str) -> xr.Dataset: range_sampling_rate = metadata["records"]["main_processing_params"]["range_samp_rate"] image_slant_range_time = metadata["direct_parse"]["slant_time_first"] * 1e-9 - if metadata["sph_descriptor"] == "Image Mode SLC Image": + range_pixel_spacing = metadata["records"]["main_processing_params"]["range_spacing"] + + product_str = metadata["sph_descriptor"] + + if product_str == "Image Mode SLC Image": product_type = "SLC" + elif product_str == "Image Mode Precision Image": + product_type = "GRD" else: - raise RuntimeError("Only Image mode SLC files(IMS) supported for now") + raise RuntimeError("Only Image mode files supported(IMS & IMP) supported for now") attrs = { "family_name": "Envisat", @@ -106,8 +112,8 @@ def create_dataset(metadata: dict[str, Any], filepath: str) -> xr.Dataset: "product_type": product_type, "start_time": product_first_line_utc_time, "stop_time": product_last_line_utc_time, - "range_pixel_spacing" : metadata["range_spacing"], - "azimuth_pixel_spacing" : metadata["azimuth_spacing"], + "range_pixel_spacing": metadata["range_spacing"], + "azimuth_pixel_spacing": metadata["azimuth_spacing"], "radar_frequency": metadata["records"]["main_processing_params"]["radar_freq"] / 1e9, "ascending_node_time": "", "azimuth_pixel_spacing": metadata["records"]["main_processing_params"]["azimuth_spacing"], @@ -116,7 +122,7 @@ def create_dataset(metadata: dict[str, Any], filepath: str) -> xr.Dataset: "azimuth_time_interval": azimuth_time_interval, "image_slant_range_time": image_slant_range_time, "range_sampling_rate": range_sampling_rate, - "incidence_angle_mid_swath": metadata["direct_parse"]["incidence_angle_center"] * 2 * math.pi / 360 , + "incidence_angle_mid_swath": metadata["direct_parse"]["incidence_angle_center"] * 2 * math.pi / 360, "metadata": metadata } @@ -124,7 +130,6 @@ def create_dataset(metadata: dict[str, Any], filepath: str) -> xr.Dataset: product_first_line_utc_time, product_last_line_utc_time, number_of_lines ) - swap_dims = {"line": "azimuth_time", "pixel": "slant_range_time"} coords: dict[str, Any] = { @@ -140,12 +145,34 @@ def create_dataset(metadata: dict[str, Any], filepath: str) -> xr.Dataset: ), } - slant_range_time = np.linspace( - image_slant_range_time, - image_slant_range_time + (number_of_samples - 1) / range_sampling_rate, - number_of_samples, - ) - coords["slant_range_time"] = ("pixel", slant_range_time) + if product_type == "SLC": + slant_range_time = np.linspace( + image_slant_range_time, + image_slant_range_time + (number_of_samples - 1) / range_sampling_rate, + number_of_samples, + ) + coords["slant_range_time"] = ("pixel", slant_range_time) + else: + + # generate slant range times from ground ranges for ground range products + # this is easier due to the Envisat only having GRSR metadata unlike Sentinel1 + # however xarray/Sarsen handle GRD conversion differently for S1 + ground_range = np.linspace( + 0, + range_pixel_spacing * (number_of_samples - 1), + number_of_samples, + ) + + # numpy polyval expects the polynomial top be highest ranked first + coeffs = list(reversed(metadata["direct_parse"]["grsr_coeffs"])) + + slant_ranges = np.polyval(coeffs, ground_range) + slant_ranges *= 2 + + c = 299792458 + slant_range_times = slant_ranges / c + + coords["slant_range_time"] = ("pixel", slant_range_times) data = xr.open_dataarray(filepath, engine='rasterio') data.encoding.clear() @@ -180,7 +207,7 @@ def get_chirp_parameters(dataset: gdal.Dataset) -> dict[str, Any]: new_key = key.replace('CHIRP_PARAMS_ADS_CHIRP_', '').lower() params['chirp'][new_key] = float(value) - #params['elev_corr_factor'] = float(metadata.get('CHIRP_PARAMS_ADS_ELEV_CORR_FACTOR')) + # params['elev_corr_factor'] = float(metadata.get('CHIRP_PARAMS_ADS_ELEV_CORR_FACTOR')) params['cal_pulse_info'] = get_chirp_cal_pulse_info(metadata) return params diff --git a/asar_xarray/envisat_direct.py b/asar_xarray/envisat_direct.py index 70894d3..cf72758 100644 --- a/asar_xarray/envisat_direct.py +++ b/asar_xarray/envisat_direct.py @@ -175,9 +175,10 @@ def parse_direct(path: str, gdal_metadata) -> dict[str, Any]: r = struct.unpack(">ff5f", srgr_buf[13:41]) - + # Envisat file specification says it is SR/GR Conversion ADSR, + # however the polynomial is for GR to SR... srgr_coeffs = list(r[2:]) - metadata["srgr_coeffs"] = srgr_coeffs + metadata["grsr_coeffs"] = srgr_coeffs @@ -187,59 +188,69 @@ def parse_direct(path: str, gdal_metadata) -> dict[str, Any]: # antenna gain c = 299792458 n_samp = gdal_metadata["line_length"] - R_first = c * slant_time_first * 1e-9 / 2 - range_spacing = c / (2 * gdal_metadata["records"]["main_processing_params"]["range_samp_rate"]) - range_ref = gdal_metadata["records"]["main_processing_params"]["range_ref"] - R_first = c * slant_time_first * 1e-9 / 2 - - osv = gdal_metadata["records"]["main_processing_params"]["orbit_state_vectors"] - sat_x = osv[0]["x_pos_1"] * 1e-2 - sat_y = osv[0]["y_pos_1"] * 1e-2 - sat_z = osv[0]["z_pos_1"] * 1e-2 gain_arr = [] - for n in range(n_samp): - # https://github.com/senbox-org/microwave-toolbox/blob/master/sar-op-calibration/src/main/java/eu/esa/sar/calibration/gpf/calibrators/ASARCalibrator.java - R = R_first + n * range_spacing - n_geogrid = len(lats) - geo_interp_idx = (n / n_samp) * (len(lats) - 1) - idx = int(geo_interp_idx) - fract = geo_interp_idx % 1 - #interpolate geogrid to match first line geogrid to first OSV - lat = lats[idx] + (lats[idx+1] - lats[idx]) * fract - lon = lons[idx] + (lons[idx+1] - lons[idx]) * fract + if gdal_metadata["sample_type"] == "DETECTED": + gain_arr = np.ones(n_samp) + spreading_loss = np.ones(n_samp) + else: + # antenna gain + c = 299792458 + R_first = c * slant_time_first * 1e-9 / 2 + range_spacing = c / (2 * gdal_metadata["records"]["main_processing_params"]["range_samp_rate"]) + range_ref = gdal_metadata["records"]["main_processing_params"]["range_ref"] + R_first = c * slant_time_first * 1e-9 / 2 + + osv = gdal_metadata["records"]["main_processing_params"]["orbit_state_vectors"] + + sat_x = osv[0]["x_pos_1"] * 1e-2 + sat_y = osv[0]["y_pos_1"] * 1e-2 + sat_z = osv[0]["z_pos_1"] * 1e-2 + + for n in range(n_samp): + # https://github.com/senbox-org/microwave-toolbox/blob/master/sar-op-calibration/src/main/java/eu/esa/sar/calibration/gpf/calibrators/ASARCalibrator.java + R = R_first + n * range_spacing + n_geogrid = len(lats) + geo_interp_idx = (n / n_samp) * (len(lats) - 1) + idx = int(geo_interp_idx) + fract = geo_interp_idx % 1 + #interpolate geogrid to match first line geogrid to first OSV + lat = lats[idx] + (lats[idx+1] - lats[idx]) * fract + lon = lons[idx] + (lons[idx+1] - lons[idx]) * fract + + sar_dis = math.sqrt(sat_x**2 + sat_y**2 + sat_z**2) + + distance = calc_distance(lat, lon) - sar_dis = math.sqrt(sat_x**2 + sat_y**2 + sat_z**2) + # elevation angle, cosine law from three sides, slant range, earth center to satellite, earth center to target + angle_cos = (R * R + sar_dis * sar_dis - distance * distance) / (2 * R * sar_dis) + elev_angle = np.rad2deg(math.acos(angle_cos)) - distance = calc_distance(lat, lon) + # find the gain from LUT, with 201 gains against reference elevation angle with 0.05 degree steps + elev_idx = int((elev_angle - metadata["antenna_ref_elev_angle"]) / 0.05) + elev_idx += 100 + gain = 1.0 + if elev_idx >= 0 and elev_idx <= len(antenna_gains): + # dB -> linear + gain = math.pow(10, antenna_gains[elev_idx] / 10) - # elevation angle, cosine law from three sides, slant range, earth center to satellite, earth center to target - angle_cos = (R * R + sar_dis * sar_dis - distance * distance) / (2 * R * sar_dis) - elev_angle = np.rad2deg(math.acos(angle_cos)) + gain_arr.append(1/gain) - # find the gain from LUT, with 201 gains against reference elevation angle with 0.05 degree steps - elev_idx = int((elev_angle - metadata["antenna_ref_elev_angle"]) / 0.05) - elev_idx += 100 - gain = 1.0 - if elev_idx >= 0 and elev_idx <= len(antenna_gains): - # dB -> linear - gain = math.pow(10, antenna_gains[elev_idx] / 10) - gain_arr.append(gain) + # calculate spreading loss compensation + spreading_loss = [] + for n in range(n_samp): + R = R_first + n * range_spacing + factor = math.sqrt((range_ref / R) ** 3) + spreading_loss.append(1/factor) - # calculate spreading loss compensation - spreading_loss = [] - for n in range(n_samp): - R = R_first + n * range_spacing - factor = math.sqrt((range_ref / R) ** 3) - spreading_loss.append(1/factor) cal_factor = gdal_metadata["records"]["main_processing_params"]["calibration_factors"][0]["ext_cal_fact"] From 59b8b2708e3be8bb49d45309b7edc728e31697b1 Mon Sep 17 00:00:00 2001 From: Achaad Date: Thu, 4 Sep 2025 10:32:27 +0300 Subject: [PATCH 2/4] style: extract code into auxiliary functions --- asar_xarray/asar.py | 15 +- asar_xarray/envisat_direct.py | 321 ++++++++++++++++++++-------------- 2 files changed, 192 insertions(+), 144 deletions(-) diff --git a/asar_xarray/asar.py b/asar_xarray/asar.py index 8f5828d..26d5eeb 100644 --- a/asar_xarray/asar.py +++ b/asar_xarray/asar.py @@ -25,7 +25,7 @@ def get_metadata(gdal_dataset: gdal.Dataset) -> Dict[str, Any]: :param gdal_dataset: Dataset with metadata :return: Dictionary with metadata attributes """ - attributes: dict[str, Any] = dict() + attributes: dict[str, Any] = {} process_general_metadata(gdal_dataset, attributes) process_records_metadata(gdal_dataset, attributes) process_derived_subdatasets_metadata(gdal_dataset, attributes) @@ -113,7 +113,6 @@ def create_dataset(metadata: dict[str, Any], filepath: str) -> xr.Dataset: "start_time": product_first_line_utc_time, "stop_time": product_last_line_utc_time, "range_pixel_spacing": metadata["range_spacing"], - "azimuth_pixel_spacing": metadata["azimuth_spacing"], "radar_frequency": metadata["records"]["main_processing_params"]["radar_freq"] / 1e9, "ascending_node_time": "", "azimuth_pixel_spacing": metadata["records"]["main_processing_params"]["azimuth_spacing"], @@ -195,19 +194,17 @@ def get_chirp_parameters(dataset: gdal.Dataset) -> dict[str, Any]: :return: dictionary with chirp parameters. """ metadata = dataset.GetMetadata(domain='RECORDS') - params: dict[str, Any] = dict() + params: dict[str, Any] = { + 'zero_doppler_time': utils.get_envisat_time(metadata.get('CHIRP_PARAMS_ADS_ZERO_DOPPLER_TIME')), + 'attach_flag': bool(int(metadata.get('CHIRP_PARAMS_ADS_ATTACH_FLAG', '0'))), + 'beam_id': metadata.get('CHIRP_PARAMS_ADS_BEAM_ID'), 'polarisation': metadata.get('CHIRP_PARAMS_ADS_POLAR'), + 'chirp': {}} # Non-float values - params['zero_doppler_time'] = utils.get_envisat_time(metadata.get('CHIRP_PARAMS_ADS_ZERO_DOPPLER_TIME')) - params['attach_flag'] = bool(int(metadata.get('CHIRP_PARAMS_ADS_ATTACH_FLAG', '0'))) - params['beam_id'] = metadata.get('CHIRP_PARAMS_ADS_BEAM_ID') - params['polarisation'] = metadata.get('CHIRP_PARAMS_ADS_POLAR') - params['chirp'] = dict() for key, value in metadata.items(): if 'CHIRP_PARAMS_ADS_CHIRP' in key: new_key = key.replace('CHIRP_PARAMS_ADS_CHIRP_', '').lower() params['chirp'][new_key] = float(value) - # params['elev_corr_factor'] = float(metadata.get('CHIRP_PARAMS_ADS_ELEV_CORR_FACTOR')) params['cal_pulse_info'] = get_chirp_cal_pulse_info(metadata) return params diff --git a/asar_xarray/envisat_direct.py b/asar_xarray/envisat_direct.py index cf72758..a1f65c7 100644 --- a/asar_xarray/envisat_direct.py +++ b/asar_xarray/envisat_direct.py @@ -8,40 +8,59 @@ def parse_int(s: str) -> int: - """Parse an integer value from a string representation of a field.""" + """ + Parse an integer value from a string representation of a field. + + Parameters + ---------- + s String representation of the field, e.g., "FIELD_NAME==value". + + Returns + ------- + Integer value of the field. + """ s = s.replace("", "") s = s[s.index("=") + 1:] # print(s) return int(s) - def calc_distance(latitude, longitude, altitude=0.0): - """Lat/Lon to distance from earth center""" - DTOR = math.pi / 180.0 - RTOD = 180 / math.pi - A = 6378137.0 - B = 6356752.3142451794975639665996337 - FLAT_EARTH_COEF = 1.0 / ((A - B) / A) - E2 = 2.0 / FLAT_EARTH_COEF - 1.0 / (FLAT_EARTH_COEF * FLAT_EARTH_COEF) - EP2 = E2 / (1 - E2) - - lat = latitude * DTOR - lon = longitude * DTOR + """ + Calculate the distance from the Earth's center to a point specified by latitude, longitude, and altitude. + + Parameters + ---------- + latitude Latitude of the point. + longitude Longitude of the point. + altitude Altitude of the point above sea level (default is 0.0). + + Returns + ------- + Distance from the Earth's center to the specified point in meters. + """ + dtor = math.pi / 180.0 + a = 6378137.0 + b = 6356752.3142451794975639665996337 + flat_earth_coef = 1.0 / ((a - b) / a) + e2 = 2.0 / flat_earth_coef - 1.0 / (flat_earth_coef * flat_earth_coef) + + lat = latitude * dtor + lon = longitude * dtor sin_lat = math.sin(lat) cos_lat = math.cos(lat) - N = (A / math.sqrt(1.0 - E2 * sin_lat * sin_lat)) - NcosLat = (N + altitude) * cos_lat + n = (a / math.sqrt(1.0 - e2 * sin_lat * sin_lat)) + ncos_lat = (n + altitude) * cos_lat sin_lon = math.sin(lon) cos_lon = math.cos(lon) - x_pos = NcosLat * cos_lon - y_pos = NcosLat * sin_lon - z_pos = (N + altitude - E2 * N) * sin_lat - return math.sqrt(x_pos**2 + y_pos**2 + z_pos**2) + x_pos = ncos_lat * cos_lon + y_pos = ncos_lat * sin_lon + z_pos = (n + altitude - e2 * n) * sin_lat + return math.sqrt(x_pos ** 2 + y_pos ** 2 + z_pos ** 2) class EnvisatADS: @@ -88,108 +107,20 @@ def parse_direct(path: str, gdal_metadata) -> dict[str, Any]: dsd_num = 18 dsd_buf = sph_buf[sph_size - dsd_size * dsd_num:] + antenna_gains = () + slant_time_first = [] + for i in range(dsd_num): ads = EnvisatADS(dsd_buf[i * dsd_size:(i + 1) * dsd_size]) - #print(ads.name) - if ads.name == "GEOLOCATION GRID ADS": - rec_size = 521 - assert ((ads.size // ads.num) == rec_size) - geoloc_buf = file_buffer[ads.offset:ads.offset + ads.size] - middle_idx = ads.num // 2 - geoloc_record = geoloc_buf[middle_idx * rec_size: (middle_idx + 1) * rec_size] - # Geolocation Grid ADSRs header - header_size = 12 + 1 + 4 + 4 + 4 - - lats_buffer = geoloc_buf[header_size + 11 * 4 * 3 : header_size + 11 * 4 * 4] - lons_buffer = geoloc_buf[header_size + 11 * 4 * 4: header_size + 11 * 4 * 5] - - - lats = list(struct.unpack(">11i", lats_buffer)) - lons = list(struct.unpack(">11i", lons_buffer)) - lats = [e * 1e-6 for e in lats] - lons = [e * 1e-6 for e in lons] - - - # tiepoints, 11 of big endian floats for each of the following: - # samp numbers, slant range times, angles, lats, longs - block_size = 11 * 4 - slant_time_offset = header_size + 1 * block_size - incidence_angle_offset = header_size + 2 * block_size - # adjust to middle of 11 - incidence_angle_offset += 5 * 4 - - slant_time_first = struct.unpack(">f", geoloc_record[slant_time_offset:slant_time_offset + 4])[0] - incidence_angle_middle = \ - struct.unpack(">f", geoloc_record[incidence_angle_offset:incidence_angle_offset + 4])[0] - - - - metadata["slant_time_first"] = slant_time_first - metadata["incidence_angle_center"] = incidence_angle_middle - - - - ext_cal_buf = None - if ads.name == "EXTERNAL CALIBRATION": - - auxfolder = pathlib.Path(os.path.abspath(__file__)).parent - prod_name = gdal_metadata["product"] - auxfolder /= "auxiliary" - if ".N1" in prod_name: - auxfolder /= "ASAR_Auxiliary_Files" - auxfolder /= "ASA_XCA_AX" - elif ".E1" in prod_name: - auxfolder /= "ERS1" - elif ".E2" in prod_name: - auxfolder /= "ERS2" - - for p in os.scandir(auxfolder): - if ads.filename == p.name: - with open(p.path, "rb") as fp: - ext_cal_buf = fp.read() - pol = gdal_metadata["mds1_tx_rx_polar"] - swath = gdal_metadata["swath"] - swath_offset = ord(swath[2]) - ord("1") - pol_offset = {"H/H" : 0, "V/V" :1, "H/V" : 2, "V/H":3}[pol] - - # Envisat_Product_Spec_Vol8.pdf - # 8.6.2 External Calibration Data - offset = 1247 + 378 - - offset += 12 - offset += 4 - offset += 26 * 28 + 4 * 4 - - mid_angles = struct.unpack(">7f", ext_cal_buf[offset:offset+4*7]) - - offset += 8 * 4 - - offset += 804 * 4 * swath_offset - offset += 201 * 4 * pol_offset - - antenna_gains = struct.unpack(">201f", ext_cal_buf[offset:offset + 4 * 201]) - metadata["antenna_ref_elev_angle"] = mid_angles[swath_offset] - - if ads.name == "SR GR ADS" and ads.size > 0: - srgr_buf = file_buffer[ads.offset:ads.offset + ads.size] - - r = struct.unpack(">ff5f", srgr_buf[13:41]) - - # Envisat file specification says it is SR/GR Conversion ADSR, - # however the polynomial is for GR to SR... - srgr_coeffs = list(r[2:]) - metadata["grsr_coeffs"] = srgr_coeffs + lats, lons, slant_time_first = process_geolocation_grid_ads(ads, file_buffer, metadata) + antenna_gains = process_cal_ads(ads, gdal_metadata, metadata) - - - + process_sr_gr_ads(ads, file_buffer, metadata) # antenna gain - c = 299792458 n_samp = gdal_metadata["line_length"] - gain_arr = [] if gdal_metadata["sample_type"] == "DETECTED": gain_arr = np.ones(n_samp) @@ -197,10 +128,9 @@ def parse_direct(path: str, gdal_metadata) -> dict[str, Any]: else: # antenna gain c = 299792458 - R_first = c * slant_time_first * 1e-9 / 2 range_spacing = c / (2 * gdal_metadata["records"]["main_processing_params"]["range_samp_rate"]) range_ref = gdal_metadata["records"]["main_processing_params"]["range_ref"] - R_first = c * slant_time_first * 1e-9 / 2 + r_first = c * slant_time_first * 1e-9 / 2 osv = gdal_metadata["records"]["main_processing_params"]["orbit_state_vectors"] @@ -210,52 +140,173 @@ def parse_direct(path: str, gdal_metadata) -> dict[str, Any]: for n in range(n_samp): # https://github.com/senbox-org/microwave-toolbox/blob/master/sar-op-calibration/src/main/java/eu/esa/sar/calibration/gpf/calibrators/ASARCalibrator.java - R = R_first + n * range_spacing - n_geogrid = len(lats) + r = r_first + n * range_spacing geo_interp_idx = (n / n_samp) * (len(lats) - 1) idx = int(geo_interp_idx) fract = geo_interp_idx % 1 - #interpolate geogrid to match first line geogrid to first OSV - lat = lats[idx] + (lats[idx+1] - lats[idx]) * fract - lon = lons[idx] + (lons[idx+1] - lons[idx]) * fract + # interpolate geogrid to match first line geogrid to first OSV + lat = lats[idx] + (lats[idx + 1] - lats[idx]) * fract + lon = lons[idx] + (lons[idx + 1] - lons[idx]) * fract - sar_dis = math.sqrt(sat_x**2 + sat_y**2 + sat_z**2) + sar_dis = math.sqrt(sat_x ** 2 + sat_y ** 2 + sat_z ** 2) distance = calc_distance(lat, lon) - # elevation angle, cosine law from three sides, slant range, earth center to satellite, earth center to target - angle_cos = (R * R + sar_dis * sar_dis - distance * distance) / (2 * R * sar_dis) + # elevation angle, cosine law from three sides, slant range, + # earth center to satellite, earth center to target + angle_cos = (r * r + sar_dis * sar_dis - distance * distance) / (2 * r * sar_dis) elev_angle = np.rad2deg(math.acos(angle_cos)) # find the gain from LUT, with 201 gains against reference elevation angle with 0.05 degree steps elev_idx = int((elev_angle - metadata["antenna_ref_elev_angle"]) / 0.05) elev_idx += 100 gain = 1.0 - if elev_idx >= 0 and elev_idx <= len(antenna_gains): + if 0 <= elev_idx <= len(antenna_gains): # dB -> linear gain = math.pow(10, antenna_gains[elev_idx] / 10) - gain_arr.append(1/gain) + gain_arr.append(1 / gain) + # calculate spreading loss compensation + spreading_loss = [] + for n in range(n_samp): + r = r_first + n * range_spacing + factor = math.sqrt((range_ref / r) ** 3) + spreading_loss.append(1 / factor) + cal_factor = gdal_metadata["records"]["main_processing_params"]["calibration_factors"][0]["ext_cal_fact"] + metadata["cal_factor"] = cal_factor + metadata["cal_vector"] = np.array(spreading_loss) * np.array(gain_arr) + return metadata - # calculate spreading loss compensation +def process_geolocation_grid_ads(ads: EnvisatADS, file_buffer: bytes, metadata: dict[Any, Any]) -> tuple[ + Any, list[float | Any], list[float | Any]]: + """ + Process the Geolocation Grid ADS to extract latitude, longitude, slant time, and incidence angle information. - spreading_loss = [] - for n in range(n_samp): - R = R_first + n * range_spacing - factor = math.sqrt((range_ref / R) ** 3) - spreading_loss.append(1/factor) + Parameters + ---------- + ads EnvisatADS + file_buffer File buffer + metadata Metadata dictionary to populate + Returns + ------- + Returns a tuple containing lists of latitudes, longitudes, and the first slant time. + """ + if ads.name == "GEOLOCATION GRID ADS": + rec_size = 521 + assert ((ads.size // ads.num) == rec_size) + geoloc_buf = file_buffer[ads.offset:ads.offset + ads.size] + middle_idx = ads.num // 2 + geoloc_record = geoloc_buf[middle_idx * rec_size: (middle_idx + 1) * rec_size] + # Geolocation Grid ADSRs header + header_size = 12 + 1 + 4 + 4 + 4 + + lats_buffer = geoloc_buf[header_size + 11 * 4 * 3: header_size + 11 * 4 * 4] + lons_buffer = geoloc_buf[header_size + 11 * 4 * 4: header_size + 11 * 4 * 5] + + lats = list(struct.unpack(">11i", lats_buffer)) + lons = list(struct.unpack(">11i", lons_buffer)) + lats = [e * 1e-6 for e in lats] + lons = [e * 1e-6 for e in lons] + + # tiepoints, 11 of big endian floats for each of the following: + # samp numbers, slant range times, angles, lats, longs + block_size = 11 * 4 + slant_time_offset = header_size + 1 * block_size + incidence_angle_offset = header_size + 2 * block_size + # adjust to middle of 11 + incidence_angle_offset += 5 * 4 + + slant_time_first = struct.unpack(">f", geoloc_record[slant_time_offset:slant_time_offset + 4])[0] + incidence_angle_middle = \ + struct.unpack(">f", geoloc_record[incidence_angle_offset:incidence_angle_offset + 4])[0] + + metadata["slant_time_first"] = slant_time_first + metadata["incidence_angle_center"] = incidence_angle_middle + return lats, lons, slant_time_first + + +def process_cal_ads(ads: EnvisatADS, gdal_metadata, metadata: dict[Any, Any]) -> tuple[Any, ...]: + """ + Process the EXTERNAL CALIBRATION ADS to extract antenna gain information. + Parameters + ---------- + ads EnvisatADS + gdal_metadata Metadata extracted using GDAL + metadata Metadata dictionary to populate - cal_factor = gdal_metadata["records"]["main_processing_params"]["calibration_factors"][0]["ext_cal_fact"] + Returns + ------- + Tuple containing antenna gains. + """ + if ads.name == "EXTERNAL CALIBRATION": - metadata["cal_factor"] = cal_factor - metadata["cal_vector"] = np.array(spreading_loss) * np.array(gain_arr) + aux_folder = pathlib.Path(os.path.abspath(__file__)).parent + prod_name = gdal_metadata["product"] + aux_folder /= "auxiliary" + if ".N1" in prod_name: + aux_folder /= "ASAR_Auxiliary_Files" + aux_folder /= "ASA_XCA_AX" + elif ".E1" in prod_name: + aux_folder /= "ERS1" + elif ".E2" in prod_name: + aux_folder /= "ERS2" + for p in os.scandir(aux_folder): + if ads.filename == p.name: + with open(p.path, "rb") as fp: + ext_cal_buf = fp.read() + pol = gdal_metadata["mds1_tx_rx_polar"] + swath = gdal_metadata["swath"] + swath_offset = ord(swath[2]) - ord("1") + pol_offset = {"H/H": 0, "V/V": 1, "H/V": 2, "V/H": 3}[pol] - return metadata + # Envisat_Product_Spec_Vol8.pdf + # 8.6.2 External Calibration Data + offset = 1247 + 378 + + offset += 12 + offset += 4 + offset += 26 * 28 + 4 * 4 + + mid_angles = struct.unpack(">7f", ext_cal_buf[offset:offset + 4 * 7]) + + offset += 8 * 4 + + offset += 804 * 4 * swath_offset + offset += 201 * 4 * pol_offset + + antenna_gains = struct.unpack(">201f", ext_cal_buf[offset:offset + 4 * 201]) + metadata["antenna_ref_elev_angle"] = mid_angles[swath_offset] + return antenna_gains + + +def process_sr_gr_ads(ads: EnvisatADS, file_buffer: bytes, metadata: dict[Any, Any]): + """ + Extract SR/GR conversion coefficients from the SR GR ADS. + + Parameters + ---------- + ads EnvisatADS + file_buffer File buffer + metadata Metadata dictionary to populate + + Returns + ------- + None + """ + if ads.name == "SR GR ADS" and ads.size > 0: + srgr_buf = file_buffer[ads.offset:ads.offset + ads.size] + + r = struct.unpack(">ff5f", srgr_buf[13:41]) + + # Envisat file specification says it is SR/GR Conversion ADSR, + # however the polynomial is for GR to SR... + srgr_coeffs = list(r[2:]) + metadata["grsr_coeffs"] = srgr_coeffs From 6820ba39df907025dbefa05b3532f740552bb32f Mon Sep 17 00:00:00 2001 From: Achaad Date: Thu, 4 Sep 2025 11:18:32 +0300 Subject: [PATCH 3/4] style: extract code into auxiliary functions --- asar_xarray/envisat_direct.py | 62 ++++++++++++++++++++++++++--------- 1 file changed, 46 insertions(+), 16 deletions(-) diff --git a/asar_xarray/envisat_direct.py b/asar_xarray/envisat_direct.py index a1f65c7..27b6bc5 100644 --- a/asar_xarray/envisat_direct.py +++ b/asar_xarray/envisat_direct.py @@ -25,7 +25,7 @@ def parse_int(s: str) -> int: return int(s) -def calc_distance(latitude, longitude, altitude=0.0): +def __calc_distance(latitude, longitude, altitude=0.0): """ Calculate the distance from the Earth's center to a point specified by latitude, longitude, and altitude. @@ -107,16 +107,8 @@ def parse_direct(path: str, gdal_metadata) -> dict[str, Any]: dsd_num = 18 dsd_buf = sph_buf[sph_size - dsd_size * dsd_num:] - antenna_gains = () - slant_time_first = [] - - for i in range(dsd_num): - ads = EnvisatADS(dsd_buf[i * dsd_size:(i + 1) * dsd_size]) - lats, lons, slant_time_first = process_geolocation_grid_ads(ads, file_buffer, metadata) - - antenna_gains = process_cal_ads(ads, gdal_metadata, metadata) - - process_sr_gr_ads(ads, file_buffer, metadata) + antenna_gains, lats, lons, slant_time_first = __process_ads(dsd_buf, dsd_num, dsd_size, file_buffer, + gdal_metadata, metadata) # antenna gain n_samp = gdal_metadata["line_length"] @@ -150,7 +142,7 @@ def parse_direct(path: str, gdal_metadata) -> dict[str, Any]: sar_dis = math.sqrt(sat_x ** 2 + sat_y ** 2 + sat_z ** 2) - distance = calc_distance(lat, lon) + distance = __calc_distance(lat, lon) # elevation angle, cosine law from three sides, slant range, # earth center to satellite, earth center to target @@ -182,8 +174,44 @@ def parse_direct(path: str, gdal_metadata) -> dict[str, Any]: return metadata -def process_geolocation_grid_ads(ads: EnvisatADS, file_buffer: bytes, metadata: dict[Any, Any]) -> tuple[ - Any, list[float | Any], list[float | Any]]: +def __process_ads(dsd_buf: bytes, dsd_num: int, dsd_size: int, file_buffer: bytes, + gdal_metadata, metadata: dict[Any, Any]) -> tuple[float, Any, list[float | Any], tuple[Any, ...]]: + """ + Process the Annotation Data Sets (ADS) in the Envisat product file. + + Parameters + ---------- + dsd_buf Buffer containing the ADS descriptors + dsd_num Number of ADS descriptors + dsd_size Size of each ADS descriptor + file_buffer File buffer + gdal_metadata Metadata extracted using GDAL + metadata Metadata dictionary to populate + + Returns + ------- + Tuple containing antenna gains, latitudes, longitudes, and the first slant time. + """ + lats = [] + lons = [] + slant_time_first = 0.0 + antenna_gains = () + for i in range(dsd_num): + ads = EnvisatADS(dsd_buf[i * dsd_size:(i + 1) * dsd_size]) + lats_new, lons_new, slant_time_first_new = __process_geolocation_grid_ads(ads, file_buffer, metadata) + lats = lats_new if lats_new else lats + lons = lons_new if lons_new else lons + slant_time_first = slant_time_first_new if slant_time_first_new else slant_time_first + + antenna_gains_new = __process_cal_ads(ads, gdal_metadata, metadata) + antenna_gains = antenna_gains_new if antenna_gains_new else antenna_gains + + process_sr_gr_ads(ads, file_buffer, metadata) + return antenna_gains, lats, lons, slant_time_first + + +def __process_geolocation_grid_ads(ads: EnvisatADS, file_buffer: bytes, metadata: dict[Any, Any]) -> tuple[ + Any, list[float | Any], float]: """ Process the Geolocation Grid ADS to extract latitude, longitude, slant time, and incidence angle information. @@ -228,10 +256,11 @@ def process_geolocation_grid_ads(ads: EnvisatADS, file_buffer: bytes, metadata: metadata["slant_time_first"] = slant_time_first metadata["incidence_angle_center"] = incidence_angle_middle - return lats, lons, slant_time_first + return lats, lons, slant_time_first + return [], [], 0.0 -def process_cal_ads(ads: EnvisatADS, gdal_metadata, metadata: dict[Any, Any]) -> tuple[Any, ...]: +def __process_cal_ads(ads: EnvisatADS, gdal_metadata, metadata: dict[Any, Any]) -> tuple[Any, ...]: """ Process the EXTERNAL CALIBRATION ADS to extract antenna gain information. @@ -245,6 +274,7 @@ def process_cal_ads(ads: EnvisatADS, gdal_metadata, metadata: dict[Any, Any]) -> ------- Tuple containing antenna gains. """ + antenna_gains = () if ads.name == "EXTERNAL CALIBRATION": aux_folder = pathlib.Path(os.path.abspath(__file__)).parent From 7af7c3aefde089f5fa92320419e6d51590928532 Mon Sep 17 00:00:00 2001 From: Achaad Date: Thu, 4 Sep 2025 11:59:24 +0300 Subject: [PATCH 4/4] style: added missing type hints --- asar_xarray/asar.py | 17 +++++++++-------- asar_xarray/envisat_direct.py | 30 +++++++++++++++--------------- 2 files changed, 24 insertions(+), 23 deletions(-) diff --git a/asar_xarray/asar.py b/asar_xarray/asar.py index 26d5eeb..c48b39f 100644 --- a/asar_xarray/asar.py +++ b/asar_xarray/asar.py @@ -35,35 +35,36 @@ def get_metadata(gdal_dataset: gdal.Dataset) -> Dict[str, Any]: return attributes -def open_asar_dataset(filepath: str | os.PathLike[Any] | ReadBuffer[Any] | AbstractDataStore) -> xr.Dataset: +def open_asar_dataset(filename_or_obj: str | os.PathLike[Any] | ReadBuffer[ + Any] | bytes | memoryview | AbstractDataStore) -> xr.Dataset: """ Open an ASAR dataset and converts it into an xarray Dataset. This function reads the metadata and pixel data from the given ASAR dataset file, processes the metadata into attributes, and constructs an xarray Dataset. - :param filepath: The path to the ASAR dataset file. It can be a string, + :param filename_or_obj: The path to the ASAR dataset file. It can be a string, a PathLike object, a ReadBuffer, or an AbstractDataStore. :return: An xarray Dataset containing the pixel data and metadata attributes. :raises NotImplementedError: If the filepath is not a string. """ - if not isinstance(filepath, str): - raise NotImplementedError(f'Filepath type {type(filepath)} is not supported') + if not isinstance(filename_or_obj, str): + raise NotImplementedError(f'Filepath type {type(filename_or_obj)} is not supported') - logger.debug('Opening ASAR dataset {}', filepath) + logger.debug('Opening ASAR dataset {}', filename_or_obj) # Open the dataset using a custom reader - gdal_dataset: gdal.Dataset = reader.get_gdal_dataset(filepath) + gdal_dataset: gdal.Dataset = reader.get_gdal_dataset(filename_or_obj) # Extract metadata attributes metadata = get_metadata(gdal_dataset) # Duplicate, read directly from file, as gdal does not parse some necessary metadata - metadata["direct_parse"] = envisat_direct.parse_direct(filepath, metadata) + metadata["direct_parse"] = envisat_direct.parse_direct(filename_or_obj, metadata) # Create an xarray Dataset with pixel data and metadata attributes - dataset: xr.Dataset = create_dataset(metadata, filepath) + dataset: xr.Dataset = create_dataset(metadata, filename_or_obj) return dataset diff --git a/asar_xarray/envisat_direct.py b/asar_xarray/envisat_direct.py index 27b6bc5..35cd24f 100644 --- a/asar_xarray/envisat_direct.py +++ b/asar_xarray/envisat_direct.py @@ -25,7 +25,7 @@ def parse_int(s: str) -> int: return int(s) -def __calc_distance(latitude, longitude, altitude=0.0): +def __calc_distance(latitude: float, longitude: float, altitude: float = 0.0) -> float: """ Calculate the distance from the Earth's center to a point specified by latitude, longitude, and altitude. @@ -81,7 +81,7 @@ def __str__(self) -> str: return "Envisat ADS: \"{}\" {} {} {}".format(self.name, self.offset, self.size, self.num) -def parse_direct(path: str, gdal_metadata) -> dict[str, Any]: +def parse_direct(path: str, gdal_metadata: dict[str, Any]) -> dict[str, Any]: """ Parse an Envisat product file and extract relevant metadata fields. @@ -91,7 +91,7 @@ def parse_direct(path: str, gdal_metadata) -> dict[str, Any]: returns: Dictionary containing extracted metadata fields. """ - metadata = {} + metadata: dict[str, Any] = {} file_buffer = None with open(path, "rb") as fp: file_buffer = fp.read() @@ -112,8 +112,8 @@ def parse_direct(path: str, gdal_metadata) -> dict[str, Any]: # antenna gain n_samp = gdal_metadata["line_length"] - - gain_arr = [] + spreading_loss: np.ndarray[Any] = np.array([]) + gain_arr: np.ndarray[Any] = np.array([]) if gdal_metadata["sample_type"] == "DETECTED": gain_arr = np.ones(n_samp) spreading_loss = np.ones(n_samp) @@ -157,14 +157,13 @@ def parse_direct(path: str, gdal_metadata) -> dict[str, Any]: # dB -> linear gain = math.pow(10, antenna_gains[elev_idx] / 10) - gain_arr.append(1 / gain) + np.append(gain_arr, 1 / gain) # calculate spreading loss compensation - spreading_loss = [] for n in range(n_samp): r = r_first + n * range_spacing factor = math.sqrt((range_ref / r) ** 3) - spreading_loss.append(1 / factor) + np.append(spreading_loss, 1 / factor) cal_factor = gdal_metadata["records"]["main_processing_params"]["calibration_factors"][0]["ext_cal_fact"] @@ -175,7 +174,8 @@ def parse_direct(path: str, gdal_metadata) -> dict[str, Any]: def __process_ads(dsd_buf: bytes, dsd_num: int, dsd_size: int, file_buffer: bytes, - gdal_metadata, metadata: dict[Any, Any]) -> tuple[float, Any, list[float | Any], tuple[Any, ...]]: + gdal_metadata: dict[str, Any], metadata: dict[Any, Any]) -> tuple[ + tuple[float, ...], Any, list[float | Any], float]: """ Process the Annotation Data Sets (ADS) in the Envisat product file. @@ -192,10 +192,10 @@ def __process_ads(dsd_buf: bytes, dsd_num: int, dsd_size: int, file_buffer: byte ------- Tuple containing antenna gains, latitudes, longitudes, and the first slant time. """ - lats = [] - lons = [] + lats: list[float] = [] + lons: list[float] = [] slant_time_first = 0.0 - antenna_gains = () + antenna_gains: tuple[float, ...] = (0.0,) for i in range(dsd_num): ads = EnvisatADS(dsd_buf[i * dsd_size:(i + 1) * dsd_size]) lats_new, lons_new, slant_time_first_new = __process_geolocation_grid_ads(ads, file_buffer, metadata) @@ -260,7 +260,7 @@ def __process_geolocation_grid_ads(ads: EnvisatADS, file_buffer: bytes, metadata return [], [], 0.0 -def __process_cal_ads(ads: EnvisatADS, gdal_metadata, metadata: dict[Any, Any]) -> tuple[Any, ...]: +def __process_cal_ads(ads: EnvisatADS, gdal_metadata: dict[str, Any], metadata: dict[Any, Any]) -> tuple[float, ...]: """ Process the EXTERNAL CALIBRATION ADS to extract antenna gain information. @@ -317,7 +317,7 @@ def __process_cal_ads(ads: EnvisatADS, gdal_metadata, metadata: dict[Any, Any]) return antenna_gains -def process_sr_gr_ads(ads: EnvisatADS, file_buffer: bytes, metadata: dict[Any, Any]): +def process_sr_gr_ads(ads: EnvisatADS, file_buffer: bytes, metadata: dict[Any, Any]) -> None: """ Extract SR/GR conversion coefficients from the SR GR ADS. @@ -325,7 +325,7 @@ def process_sr_gr_ads(ads: EnvisatADS, file_buffer: bytes, metadata: dict[Any, A ---------- ads EnvisatADS file_buffer File buffer - metadata Metadata dictionary to populate + metadata Metadata dictionary to populatex Returns -------