diff --git a/asar_xarray/asar.py b/asar_xarray/asar.py index 92c3a9f..c48b39f 100644 --- a/asar_xarray/asar.py +++ b/asar_xarray/asar.py @@ -25,7 +25,7 @@ def get_metadata(gdal_dataset: gdal.Dataset) -> Dict[str, Any]: :param gdal_dataset: Dataset with metadata :return: Dictionary with metadata attributes """ - attributes: dict[str, Any] = dict() + attributes: dict[str, Any] = {} process_general_metadata(gdal_dataset, attributes) process_records_metadata(gdal_dataset, attributes) process_derived_subdatasets_metadata(gdal_dataset, attributes) @@ -35,35 +35,36 @@ def get_metadata(gdal_dataset: gdal.Dataset) -> Dict[str, Any]: return attributes -def open_asar_dataset(filepath: str | os.PathLike[Any] | ReadBuffer[Any] | AbstractDataStore) -> xr.Dataset: +def open_asar_dataset(filename_or_obj: str | os.PathLike[Any] | ReadBuffer[ + Any] | bytes | memoryview | AbstractDataStore) -> xr.Dataset: """ Open an ASAR dataset and converts it into an xarray Dataset. This function reads the metadata and pixel data from the given ASAR dataset file, processes the metadata into attributes, and constructs an xarray Dataset. - :param filepath: The path to the ASAR dataset file. It can be a string, + :param filename_or_obj: The path to the ASAR dataset file. It can be a string, a PathLike object, a ReadBuffer, or an AbstractDataStore. :return: An xarray Dataset containing the pixel data and metadata attributes. :raises NotImplementedError: If the filepath is not a string. """ - if not isinstance(filepath, str): - raise NotImplementedError(f'Filepath type {type(filepath)} is not supported') + if not isinstance(filename_or_obj, str): + raise NotImplementedError(f'Filepath type {type(filename_or_obj)} is not supported') - logger.debug('Opening ASAR dataset {}', filepath) + logger.debug('Opening ASAR dataset {}', filename_or_obj) # Open the dataset using a custom reader - gdal_dataset: gdal.Dataset = reader.get_gdal_dataset(filepath) + gdal_dataset: gdal.Dataset = reader.get_gdal_dataset(filename_or_obj) # Extract metadata attributes metadata = get_metadata(gdal_dataset) # Duplicate, read directly from file, as gdal does not parse some necessary metadata - metadata["direct_parse"] = envisat_direct.parse_direct(filepath, metadata) + metadata["direct_parse"] = envisat_direct.parse_direct(filename_or_obj, metadata) # Create an xarray Dataset with pixel data and metadata attributes - dataset: xr.Dataset = create_dataset(metadata, filepath) + dataset: xr.Dataset = create_dataset(metadata, filename_or_obj) return dataset @@ -89,10 +90,16 @@ def create_dataset(metadata: dict[str, Any], filepath: str) -> xr.Dataset: range_sampling_rate = metadata["records"]["main_processing_params"]["range_samp_rate"] image_slant_range_time = metadata["direct_parse"]["slant_time_first"] * 1e-9 - if metadata["sph_descriptor"] == "Image Mode SLC Image": + range_pixel_spacing = metadata["records"]["main_processing_params"]["range_spacing"] + + product_str = metadata["sph_descriptor"] + + if product_str == "Image Mode SLC Image": product_type = "SLC" + elif product_str == "Image Mode Precision Image": + product_type = "GRD" else: - raise RuntimeError("Only Image mode SLC files(IMS) supported for now") + raise RuntimeError("Only Image mode files supported(IMS & IMP) supported for now") attrs = { "family_name": "Envisat", @@ -106,8 +113,7 @@ def create_dataset(metadata: dict[str, Any], filepath: str) -> xr.Dataset: "product_type": product_type, "start_time": product_first_line_utc_time, "stop_time": product_last_line_utc_time, - "range_pixel_spacing" : metadata["range_spacing"], - "azimuth_pixel_spacing" : metadata["azimuth_spacing"], + "range_pixel_spacing": metadata["range_spacing"], "radar_frequency": metadata["records"]["main_processing_params"]["radar_freq"] / 1e9, "ascending_node_time": "", "azimuth_pixel_spacing": metadata["records"]["main_processing_params"]["azimuth_spacing"], @@ -116,7 +122,7 @@ def create_dataset(metadata: dict[str, Any], filepath: str) -> xr.Dataset: "azimuth_time_interval": azimuth_time_interval, "image_slant_range_time": image_slant_range_time, "range_sampling_rate": range_sampling_rate, - "incidence_angle_mid_swath": metadata["direct_parse"]["incidence_angle_center"] * 2 * math.pi / 360 , + "incidence_angle_mid_swath": metadata["direct_parse"]["incidence_angle_center"] * 2 * math.pi / 360, "metadata": metadata } @@ -124,7 +130,6 @@ def create_dataset(metadata: dict[str, Any], filepath: str) -> xr.Dataset: product_first_line_utc_time, product_last_line_utc_time, number_of_lines ) - swap_dims = {"line": "azimuth_time", "pixel": "slant_range_time"} coords: dict[str, Any] = { @@ -140,12 +145,34 @@ def create_dataset(metadata: dict[str, Any], filepath: str) -> xr.Dataset: ), } - slant_range_time = np.linspace( - image_slant_range_time, - image_slant_range_time + (number_of_samples - 1) / range_sampling_rate, - number_of_samples, - ) - coords["slant_range_time"] = ("pixel", slant_range_time) + if product_type == "SLC": + slant_range_time = np.linspace( + image_slant_range_time, + image_slant_range_time + (number_of_samples - 1) / range_sampling_rate, + number_of_samples, + ) + coords["slant_range_time"] = ("pixel", slant_range_time) + else: + + # generate slant range times from ground ranges for ground range products + # this is easier due to the Envisat only having GRSR metadata unlike Sentinel1 + # however xarray/Sarsen handle GRD conversion differently for S1 + ground_range = np.linspace( + 0, + range_pixel_spacing * (number_of_samples - 1), + number_of_samples, + ) + + # numpy polyval expects the polynomial top be highest ranked first + coeffs = list(reversed(metadata["direct_parse"]["grsr_coeffs"])) + + slant_ranges = np.polyval(coeffs, ground_range) + slant_ranges *= 2 + + c = 299792458 + slant_range_times = slant_ranges / c + + coords["slant_range_time"] = ("pixel", slant_range_times) data = xr.open_dataarray(filepath, engine='rasterio') data.encoding.clear() @@ -168,19 +195,17 @@ def get_chirp_parameters(dataset: gdal.Dataset) -> dict[str, Any]: :return: dictionary with chirp parameters. """ metadata = dataset.GetMetadata(domain='RECORDS') - params: dict[str, Any] = dict() + params: dict[str, Any] = { + 'zero_doppler_time': utils.get_envisat_time(metadata.get('CHIRP_PARAMS_ADS_ZERO_DOPPLER_TIME')), + 'attach_flag': bool(int(metadata.get('CHIRP_PARAMS_ADS_ATTACH_FLAG', '0'))), + 'beam_id': metadata.get('CHIRP_PARAMS_ADS_BEAM_ID'), 'polarisation': metadata.get('CHIRP_PARAMS_ADS_POLAR'), + 'chirp': {}} # Non-float values - params['zero_doppler_time'] = utils.get_envisat_time(metadata.get('CHIRP_PARAMS_ADS_ZERO_DOPPLER_TIME')) - params['attach_flag'] = bool(int(metadata.get('CHIRP_PARAMS_ADS_ATTACH_FLAG', '0'))) - params['beam_id'] = metadata.get('CHIRP_PARAMS_ADS_BEAM_ID') - params['polarisation'] = metadata.get('CHIRP_PARAMS_ADS_POLAR') - params['chirp'] = dict() for key, value in metadata.items(): if 'CHIRP_PARAMS_ADS_CHIRP' in key: new_key = key.replace('CHIRP_PARAMS_ADS_CHIRP_', '').lower() params['chirp'][new_key] = float(value) - #params['elev_corr_factor'] = float(metadata.get('CHIRP_PARAMS_ADS_ELEV_CORR_FACTOR')) params['cal_pulse_info'] = get_chirp_cal_pulse_info(metadata) return params diff --git a/asar_xarray/envisat_direct.py b/asar_xarray/envisat_direct.py index 70894d3..35cd24f 100644 --- a/asar_xarray/envisat_direct.py +++ b/asar_xarray/envisat_direct.py @@ -8,40 +8,59 @@ def parse_int(s: str) -> int: - """Parse an integer value from a string representation of a field.""" + """ + Parse an integer value from a string representation of a field. + + Parameters + ---------- + s String representation of the field, e.g., "FIELD_NAME==value". + + Returns + ------- + Integer value of the field. + """ s = s.replace("", "") s = s[s.index("=") + 1:] # print(s) return int(s) +def __calc_distance(latitude: float, longitude: float, altitude: float = 0.0) -> float: + """ + Calculate the distance from the Earth's center to a point specified by latitude, longitude, and altitude. + + Parameters + ---------- + latitude Latitude of the point. + longitude Longitude of the point. + altitude Altitude of the point above sea level (default is 0.0). -def calc_distance(latitude, longitude, altitude=0.0): - """Lat/Lon to distance from earth center""" - DTOR = math.pi / 180.0 - RTOD = 180 / math.pi - A = 6378137.0 - B = 6356752.3142451794975639665996337 - FLAT_EARTH_COEF = 1.0 / ((A - B) / A) - E2 = 2.0 / FLAT_EARTH_COEF - 1.0 / (FLAT_EARTH_COEF * FLAT_EARTH_COEF) - EP2 = E2 / (1 - E2) + Returns + ------- + Distance from the Earth's center to the specified point in meters. + """ + dtor = math.pi / 180.0 + a = 6378137.0 + b = 6356752.3142451794975639665996337 + flat_earth_coef = 1.0 / ((a - b) / a) + e2 = 2.0 / flat_earth_coef - 1.0 / (flat_earth_coef * flat_earth_coef) - lat = latitude * DTOR - lon = longitude * DTOR + lat = latitude * dtor + lon = longitude * dtor sin_lat = math.sin(lat) cos_lat = math.cos(lat) - N = (A / math.sqrt(1.0 - E2 * sin_lat * sin_lat)) - NcosLat = (N + altitude) * cos_lat + n = (a / math.sqrt(1.0 - e2 * sin_lat * sin_lat)) + ncos_lat = (n + altitude) * cos_lat sin_lon = math.sin(lon) cos_lon = math.cos(lon) - x_pos = NcosLat * cos_lon - y_pos = NcosLat * sin_lon - z_pos = (N + altitude - E2 * N) * sin_lat - return math.sqrt(x_pos**2 + y_pos**2 + z_pos**2) + x_pos = ncos_lat * cos_lon + y_pos = ncos_lat * sin_lon + z_pos = (n + altitude - e2 * n) * sin_lat + return math.sqrt(x_pos ** 2 + y_pos ** 2 + z_pos ** 2) class EnvisatADS: @@ -62,7 +81,7 @@ def __str__(self) -> str: return "Envisat ADS: \"{}\" {} {} {}".format(self.name, self.offset, self.size, self.num) -def parse_direct(path: str, gdal_metadata) -> dict[str, Any]: +def parse_direct(path: str, gdal_metadata: dict[str, Any]) -> dict[str, Any]: """ Parse an Envisat product file and extract relevant metadata fields. @@ -72,7 +91,7 @@ def parse_direct(path: str, gdal_metadata) -> dict[str, Any]: returns: Dictionary containing extracted metadata fields. """ - metadata = {} + metadata: dict[str, Any] = {} file_buffer = None with open(path, "rb") as fp: file_buffer = fp.read() @@ -88,163 +107,236 @@ def parse_direct(path: str, gdal_metadata) -> dict[str, Any]: dsd_num = 18 dsd_buf = sph_buf[sph_size - dsd_size * dsd_num:] - for i in range(dsd_num): - ads = EnvisatADS(dsd_buf[i * dsd_size:(i + 1) * dsd_size]) - #print(ads.name) - if ads.name == "GEOLOCATION GRID ADS": - rec_size = 521 - assert ((ads.size // ads.num) == rec_size) - geoloc_buf = file_buffer[ads.offset:ads.offset + ads.size] - middle_idx = ads.num // 2 - geoloc_record = geoloc_buf[middle_idx * rec_size: (middle_idx + 1) * rec_size] - # Geolocation Grid ADSRs header - header_size = 12 + 1 + 4 + 4 + 4 - - lats_buffer = geoloc_buf[header_size + 11 * 4 * 3 : header_size + 11 * 4 * 4] - lons_buffer = geoloc_buf[header_size + 11 * 4 * 4: header_size + 11 * 4 * 5] - - - lats = list(struct.unpack(">11i", lats_buffer)) - lons = list(struct.unpack(">11i", lons_buffer)) - lats = [e * 1e-6 for e in lats] - lons = [e * 1e-6 for e in lons] - - - # tiepoints, 11 of big endian floats for each of the following: - # samp numbers, slant range times, angles, lats, longs - block_size = 11 * 4 - slant_time_offset = header_size + 1 * block_size - incidence_angle_offset = header_size + 2 * block_size - # adjust to middle of 11 - incidence_angle_offset += 5 * 4 - - slant_time_first = struct.unpack(">f", geoloc_record[slant_time_offset:slant_time_offset + 4])[0] - incidence_angle_middle = \ - struct.unpack(">f", geoloc_record[incidence_angle_offset:incidence_angle_offset + 4])[0] - - - - metadata["slant_time_first"] = slant_time_first - metadata["incidence_angle_center"] = incidence_angle_middle - - - - ext_cal_buf = None - if ads.name == "EXTERNAL CALIBRATION": - - auxfolder = pathlib.Path(os.path.abspath(__file__)).parent - prod_name = gdal_metadata["product"] - auxfolder /= "auxiliary" - if ".N1" in prod_name: - auxfolder /= "ASAR_Auxiliary_Files" - auxfolder /= "ASA_XCA_AX" - elif ".E1" in prod_name: - auxfolder /= "ERS1" - elif ".E2" in prod_name: - auxfolder /= "ERS2" - - for p in os.scandir(auxfolder): - if ads.filename == p.name: - with open(p.path, "rb") as fp: - ext_cal_buf = fp.read() - pol = gdal_metadata["mds1_tx_rx_polar"] - swath = gdal_metadata["swath"] - swath_offset = ord(swath[2]) - ord("1") - pol_offset = {"H/H" : 0, "V/V" :1, "H/V" : 2, "V/H":3}[pol] - - # Envisat_Product_Spec_Vol8.pdf - # 8.6.2 External Calibration Data - offset = 1247 + 378 - - offset += 12 - offset += 4 - offset += 26 * 28 + 4 * 4 - - mid_angles = struct.unpack(">7f", ext_cal_buf[offset:offset+4*7]) - - offset += 8 * 4 - - offset += 804 * 4 * swath_offset - offset += 201 * 4 * pol_offset - - antenna_gains = struct.unpack(">201f", ext_cal_buf[offset:offset + 4 * 201]) - metadata["antenna_ref_elev_angle"] = mid_angles[swath_offset] - - if ads.name == "SR GR ADS" and ads.size > 0: - srgr_buf = file_buffer[ads.offset:ads.offset + ads.size] - - r = struct.unpack(">ff5f", srgr_buf[13:41]) - - - srgr_coeffs = list(r[2:]) - metadata["srgr_coeffs"] = srgr_coeffs - - - - - + antenna_gains, lats, lons, slant_time_first = __process_ads(dsd_buf, dsd_num, dsd_size, file_buffer, + gdal_metadata, metadata) # antenna gain - c = 299792458 n_samp = gdal_metadata["line_length"] - R_first = c * slant_time_first * 1e-9 / 2 - range_spacing = c / (2 * gdal_metadata["records"]["main_processing_params"]["range_samp_rate"]) - range_ref = gdal_metadata["records"]["main_processing_params"]["range_ref"] - R_first = c * slant_time_first * 1e-9 / 2 - - osv = gdal_metadata["records"]["main_processing_params"]["orbit_state_vectors"] + spreading_loss: np.ndarray[Any] = np.array([]) + gain_arr: np.ndarray[Any] = np.array([]) + if gdal_metadata["sample_type"] == "DETECTED": + gain_arr = np.ones(n_samp) + spreading_loss = np.ones(n_samp) + else: + # antenna gain + c = 299792458 + range_spacing = c / (2 * gdal_metadata["records"]["main_processing_params"]["range_samp_rate"]) + range_ref = gdal_metadata["records"]["main_processing_params"]["range_ref"] + r_first = c * slant_time_first * 1e-9 / 2 + + osv = gdal_metadata["records"]["main_processing_params"]["orbit_state_vectors"] + + sat_x = osv[0]["x_pos_1"] * 1e-2 + sat_y = osv[0]["y_pos_1"] * 1e-2 + sat_z = osv[0]["z_pos_1"] * 1e-2 + + for n in range(n_samp): + # https://github.com/senbox-org/microwave-toolbox/blob/master/sar-op-calibration/src/main/java/eu/esa/sar/calibration/gpf/calibrators/ASARCalibrator.java + r = r_first + n * range_spacing + geo_interp_idx = (n / n_samp) * (len(lats) - 1) + idx = int(geo_interp_idx) + fract = geo_interp_idx % 1 + # interpolate geogrid to match first line geogrid to first OSV + lat = lats[idx] + (lats[idx + 1] - lats[idx]) * fract + lon = lons[idx] + (lons[idx + 1] - lons[idx]) * fract + + sar_dis = math.sqrt(sat_x ** 2 + sat_y ** 2 + sat_z ** 2) + + distance = __calc_distance(lat, lon) + + # elevation angle, cosine law from three sides, slant range, + # earth center to satellite, earth center to target + angle_cos = (r * r + sar_dis * sar_dis - distance * distance) / (2 * r * sar_dis) + elev_angle = np.rad2deg(math.acos(angle_cos)) + + # find the gain from LUT, with 201 gains against reference elevation angle with 0.05 degree steps + elev_idx = int((elev_angle - metadata["antenna_ref_elev_angle"]) / 0.05) + elev_idx += 100 + gain = 1.0 + if 0 <= elev_idx <= len(antenna_gains): + # dB -> linear + gain = math.pow(10, antenna_gains[elev_idx] / 10) + + np.append(gain_arr, 1 / gain) + + # calculate spreading loss compensation + for n in range(n_samp): + r = r_first + n * range_spacing + factor = math.sqrt((range_ref / r) ** 3) + np.append(spreading_loss, 1 / factor) - sat_x = osv[0]["x_pos_1"] * 1e-2 - sat_y = osv[0]["y_pos_1"] * 1e-2 - sat_z = osv[0]["z_pos_1"] * 1e-2 - - gain_arr = [] - for n in range(n_samp): - # https://github.com/senbox-org/microwave-toolbox/blob/master/sar-op-calibration/src/main/java/eu/esa/sar/calibration/gpf/calibrators/ASARCalibrator.java - R = R_first + n * range_spacing - n_geogrid = len(lats) - geo_interp_idx = (n / n_samp) * (len(lats) - 1) - idx = int(geo_interp_idx) - fract = geo_interp_idx % 1 - #interpolate geogrid to match first line geogrid to first OSV - lat = lats[idx] + (lats[idx+1] - lats[idx]) * fract - lon = lons[idx] + (lons[idx+1] - lons[idx]) * fract + cal_factor = gdal_metadata["records"]["main_processing_params"]["calibration_factors"][0]["ext_cal_fact"] - sar_dis = math.sqrt(sat_x**2 + sat_y**2 + sat_z**2) + metadata["cal_factor"] = cal_factor + metadata["cal_vector"] = np.array(spreading_loss) * np.array(gain_arr) - distance = calc_distance(lat, lon) + return metadata - # elevation angle, cosine law from three sides, slant range, earth center to satellite, earth center to target - angle_cos = (R * R + sar_dis * sar_dis - distance * distance) / (2 * R * sar_dis) - elev_angle = np.rad2deg(math.acos(angle_cos)) - # find the gain from LUT, with 201 gains against reference elevation angle with 0.05 degree steps - elev_idx = int((elev_angle - metadata["antenna_ref_elev_angle"]) / 0.05) - elev_idx += 100 - gain = 1.0 - if elev_idx >= 0 and elev_idx <= len(antenna_gains): - # dB -> linear - gain = math.pow(10, antenna_gains[elev_idx] / 10) +def __process_ads(dsd_buf: bytes, dsd_num: int, dsd_size: int, file_buffer: bytes, + gdal_metadata: dict[str, Any], metadata: dict[Any, Any]) -> tuple[ + tuple[float, ...], Any, list[float | Any], float]: + """ + Process the Annotation Data Sets (ADS) in the Envisat product file. + + Parameters + ---------- + dsd_buf Buffer containing the ADS descriptors + dsd_num Number of ADS descriptors + dsd_size Size of each ADS descriptor + file_buffer File buffer + gdal_metadata Metadata extracted using GDAL + metadata Metadata dictionary to populate + + Returns + ------- + Tuple containing antenna gains, latitudes, longitudes, and the first slant time. + """ + lats: list[float] = [] + lons: list[float] = [] + slant_time_first = 0.0 + antenna_gains: tuple[float, ...] = (0.0,) + for i in range(dsd_num): + ads = EnvisatADS(dsd_buf[i * dsd_size:(i + 1) * dsd_size]) + lats_new, lons_new, slant_time_first_new = __process_geolocation_grid_ads(ads, file_buffer, metadata) + lats = lats_new if lats_new else lats + lons = lons_new if lons_new else lons + slant_time_first = slant_time_first_new if slant_time_first_new else slant_time_first - gain_arr.append(gain) + antenna_gains_new = __process_cal_ads(ads, gdal_metadata, metadata) + antenna_gains = antenna_gains_new if antenna_gains_new else antenna_gains + process_sr_gr_ads(ads, file_buffer, metadata) + return antenna_gains, lats, lons, slant_time_first +def __process_geolocation_grid_ads(ads: EnvisatADS, file_buffer: bytes, metadata: dict[Any, Any]) -> tuple[ + Any, list[float | Any], float]: + """ + Process the Geolocation Grid ADS to extract latitude, longitude, slant time, and incidence angle information. + Parameters + ---------- + ads EnvisatADS + file_buffer File buffer + metadata Metadata dictionary to populate + Returns + ------- + Returns a tuple containing lists of latitudes, longitudes, and the first slant time. + """ + if ads.name == "GEOLOCATION GRID ADS": + rec_size = 521 + assert ((ads.size // ads.num) == rec_size) + geoloc_buf = file_buffer[ads.offset:ads.offset + ads.size] + middle_idx = ads.num // 2 + geoloc_record = geoloc_buf[middle_idx * rec_size: (middle_idx + 1) * rec_size] + # Geolocation Grid ADSRs header + header_size = 12 + 1 + 4 + 4 + 4 + + lats_buffer = geoloc_buf[header_size + 11 * 4 * 3: header_size + 11 * 4 * 4] + lons_buffer = geoloc_buf[header_size + 11 * 4 * 4: header_size + 11 * 4 * 5] + + lats = list(struct.unpack(">11i", lats_buffer)) + lons = list(struct.unpack(">11i", lons_buffer)) + lats = [e * 1e-6 for e in lats] + lons = [e * 1e-6 for e in lons] + + # tiepoints, 11 of big endian floats for each of the following: + # samp numbers, slant range times, angles, lats, longs + block_size = 11 * 4 + slant_time_offset = header_size + 1 * block_size + incidence_angle_offset = header_size + 2 * block_size + # adjust to middle of 11 + incidence_angle_offset += 5 * 4 + + slant_time_first = struct.unpack(">f", geoloc_record[slant_time_offset:slant_time_offset + 4])[0] + incidence_angle_middle = \ + struct.unpack(">f", geoloc_record[incidence_angle_offset:incidence_angle_offset + 4])[0] + + metadata["slant_time_first"] = slant_time_first + metadata["incidence_angle_center"] = incidence_angle_middle + return lats, lons, slant_time_first + return [], [], 0.0 + + +def __process_cal_ads(ads: EnvisatADS, gdal_metadata: dict[str, Any], metadata: dict[Any, Any]) -> tuple[float, ...]: + """ + Process the EXTERNAL CALIBRATION ADS to extract antenna gain information. - # calculate spreading loss compensation + Parameters + ---------- + ads EnvisatADS + gdal_metadata Metadata extracted using GDAL + metadata Metadata dictionary to populate - spreading_loss = [] - for n in range(n_samp): - R = R_first + n * range_spacing - factor = math.sqrt((range_ref / R) ** 3) - spreading_loss.append(1/factor) + Returns + ------- + Tuple containing antenna gains. + """ + antenna_gains = () + if ads.name == "EXTERNAL CALIBRATION": + + aux_folder = pathlib.Path(os.path.abspath(__file__)).parent + prod_name = gdal_metadata["product"] + aux_folder /= "auxiliary" + if ".N1" in prod_name: + aux_folder /= "ASAR_Auxiliary_Files" + aux_folder /= "ASA_XCA_AX" + elif ".E1" in prod_name: + aux_folder /= "ERS1" + elif ".E2" in prod_name: + aux_folder /= "ERS2" + + for p in os.scandir(aux_folder): + if ads.filename == p.name: + with open(p.path, "rb") as fp: + ext_cal_buf = fp.read() + pol = gdal_metadata["mds1_tx_rx_polar"] + swath = gdal_metadata["swath"] + swath_offset = ord(swath[2]) - ord("1") + pol_offset = {"H/H": 0, "V/V": 1, "H/V": 2, "V/H": 3}[pol] + + # Envisat_Product_Spec_Vol8.pdf + # 8.6.2 External Calibration Data + offset = 1247 + 378 + + offset += 12 + offset += 4 + offset += 26 * 28 + 4 * 4 + + mid_angles = struct.unpack(">7f", ext_cal_buf[offset:offset + 4 * 7]) + + offset += 8 * 4 + + offset += 804 * 4 * swath_offset + offset += 201 * 4 * pol_offset + + antenna_gains = struct.unpack(">201f", ext_cal_buf[offset:offset + 4 * 201]) + metadata["antenna_ref_elev_angle"] = mid_angles[swath_offset] + return antenna_gains + + +def process_sr_gr_ads(ads: EnvisatADS, file_buffer: bytes, metadata: dict[Any, Any]) -> None: + """ + Extract SR/GR conversion coefficients from the SR GR ADS. - cal_factor = gdal_metadata["records"]["main_processing_params"]["calibration_factors"][0]["ext_cal_fact"] + Parameters + ---------- + ads EnvisatADS + file_buffer File buffer + metadata Metadata dictionary to populatex - metadata["cal_factor"] = cal_factor - metadata["cal_vector"] = np.array(spreading_loss) * np.array(gain_arr) + Returns + ------- + None + """ + if ads.name == "SR GR ADS" and ads.size > 0: + srgr_buf = file_buffer[ads.offset:ads.offset + ads.size] + r = struct.unpack(">ff5f", srgr_buf[13:41]) - return metadata + # Envisat file specification says it is SR/GR Conversion ADSR, + # however the polynomial is for GR to SR... + srgr_coeffs = list(r[2:]) + metadata["grsr_coeffs"] = srgr_coeffs