Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 53 additions & 28 deletions asar_xarray/asar.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def get_metadata(gdal_dataset: gdal.Dataset) -> Dict[str, Any]:
:param gdal_dataset: Dataset with metadata
:return: Dictionary with metadata attributes
"""
attributes: dict[str, Any] = dict()
attributes: dict[str, Any] = {}
process_general_metadata(gdal_dataset, attributes)
process_records_metadata(gdal_dataset, attributes)
process_derived_subdatasets_metadata(gdal_dataset, attributes)
Expand All @@ -35,35 +35,36 @@ def get_metadata(gdal_dataset: gdal.Dataset) -> Dict[str, Any]:
return attributes


def open_asar_dataset(filepath: str | os.PathLike[Any] | ReadBuffer[Any] | AbstractDataStore) -> xr.Dataset:
def open_asar_dataset(filename_or_obj: str | os.PathLike[Any] | ReadBuffer[
Any] | bytes | memoryview | AbstractDataStore) -> xr.Dataset:
"""
Open an ASAR dataset and converts it into an xarray Dataset.

This function reads the metadata and pixel data from the given ASAR dataset file,
processes the metadata into attributes, and constructs an xarray Dataset.

:param filepath: The path to the ASAR dataset file. It can be a string,
:param filename_or_obj: The path to the ASAR dataset file. It can be a string,
a PathLike object, a ReadBuffer, or an AbstractDataStore.
:return: An xarray Dataset containing the pixel data and metadata attributes.
:raises NotImplementedError: If the filepath is not a string.
"""
if not isinstance(filepath, str):
raise NotImplementedError(f'Filepath type {type(filepath)} is not supported')
if not isinstance(filename_or_obj, str):
raise NotImplementedError(f'Filepath type {type(filename_or_obj)} is not supported')

logger.debug('Opening ASAR dataset {}', filepath)
logger.debug('Opening ASAR dataset {}', filename_or_obj)

# Open the dataset using a custom reader
gdal_dataset: gdal.Dataset = reader.get_gdal_dataset(filepath)
gdal_dataset: gdal.Dataset = reader.get_gdal_dataset(filename_or_obj)

# Extract metadata attributes
metadata = get_metadata(gdal_dataset)

# Duplicate, read directly from file, as gdal does not parse some necessary metadata

metadata["direct_parse"] = envisat_direct.parse_direct(filepath, metadata)
metadata["direct_parse"] = envisat_direct.parse_direct(filename_or_obj, metadata)

# Create an xarray Dataset with pixel data and metadata attributes
dataset: xr.Dataset = create_dataset(metadata, filepath)
dataset: xr.Dataset = create_dataset(metadata, filename_or_obj)

return dataset

Expand All @@ -89,10 +90,16 @@ def create_dataset(metadata: dict[str, Any], filepath: str) -> xr.Dataset:
range_sampling_rate = metadata["records"]["main_processing_params"]["range_samp_rate"]
image_slant_range_time = metadata["direct_parse"]["slant_time_first"] * 1e-9

if metadata["sph_descriptor"] == "Image Mode SLC Image":
range_pixel_spacing = metadata["records"]["main_processing_params"]["range_spacing"]

product_str = metadata["sph_descriptor"]

if product_str == "Image Mode SLC Image":
product_type = "SLC"
elif product_str == "Image Mode Precision Image":
product_type = "GRD"
else:
raise RuntimeError("Only Image mode SLC files(IMS) supported for now")
raise RuntimeError("Only Image mode files supported(IMS & IMP) supported for now")

attrs = {
"family_name": "Envisat",
Expand All @@ -106,8 +113,7 @@ def create_dataset(metadata: dict[str, Any], filepath: str) -> xr.Dataset:
"product_type": product_type,
"start_time": product_first_line_utc_time,
"stop_time": product_last_line_utc_time,
"range_pixel_spacing" : metadata["range_spacing"],
"azimuth_pixel_spacing" : metadata["azimuth_spacing"],
"range_pixel_spacing": metadata["range_spacing"],
"radar_frequency": metadata["records"]["main_processing_params"]["radar_freq"] / 1e9,
"ascending_node_time": "",
"azimuth_pixel_spacing": metadata["records"]["main_processing_params"]["azimuth_spacing"],
Expand All @@ -116,15 +122,14 @@ def create_dataset(metadata: dict[str, Any], filepath: str) -> xr.Dataset:
"azimuth_time_interval": azimuth_time_interval,
"image_slant_range_time": image_slant_range_time,
"range_sampling_rate": range_sampling_rate,
"incidence_angle_mid_swath": metadata["direct_parse"]["incidence_angle_center"] * 2 * math.pi / 360 ,
"incidence_angle_mid_swath": metadata["direct_parse"]["incidence_angle_center"] * 2 * math.pi / 360,
"metadata": metadata
}

azimuth_time = compute_azimuth_time(
product_first_line_utc_time, product_last_line_utc_time, number_of_lines
)


swap_dims = {"line": "azimuth_time", "pixel": "slant_range_time"}

coords: dict[str, Any] = {
Expand All @@ -140,12 +145,34 @@ def create_dataset(metadata: dict[str, Any], filepath: str) -> xr.Dataset:
),
}

slant_range_time = np.linspace(
image_slant_range_time,
image_slant_range_time + (number_of_samples - 1) / range_sampling_rate,
number_of_samples,
)
coords["slant_range_time"] = ("pixel", slant_range_time)
if product_type == "SLC":
slant_range_time = np.linspace(
image_slant_range_time,
image_slant_range_time + (number_of_samples - 1) / range_sampling_rate,
number_of_samples,
)
coords["slant_range_time"] = ("pixel", slant_range_time)
else:

# generate slant range times from ground ranges for ground range products
# this is easier due to the Envisat only having GRSR metadata unlike Sentinel1
# however xarray/Sarsen handle GRD conversion differently for S1
ground_range = np.linspace(
0,
range_pixel_spacing * (number_of_samples - 1),
number_of_samples,
)

# numpy polyval expects the polynomial top be highest ranked first
coeffs = list(reversed(metadata["direct_parse"]["grsr_coeffs"]))

slant_ranges = np.polyval(coeffs, ground_range)
slant_ranges *= 2

c = 299792458
slant_range_times = slant_ranges / c

coords["slant_range_time"] = ("pixel", slant_range_times)

data = xr.open_dataarray(filepath, engine='rasterio')
data.encoding.clear()
Expand All @@ -168,19 +195,17 @@ def get_chirp_parameters(dataset: gdal.Dataset) -> dict[str, Any]:
:return: dictionary with chirp parameters.
"""
metadata = dataset.GetMetadata(domain='RECORDS')
params: dict[str, Any] = dict()
params: dict[str, Any] = {
'zero_doppler_time': utils.get_envisat_time(metadata.get('CHIRP_PARAMS_ADS_ZERO_DOPPLER_TIME')),
'attach_flag': bool(int(metadata.get('CHIRP_PARAMS_ADS_ATTACH_FLAG', '0'))),
'beam_id': metadata.get('CHIRP_PARAMS_ADS_BEAM_ID'), 'polarisation': metadata.get('CHIRP_PARAMS_ADS_POLAR'),
'chirp': {}}
# Non-float values
params['zero_doppler_time'] = utils.get_envisat_time(metadata.get('CHIRP_PARAMS_ADS_ZERO_DOPPLER_TIME'))
params['attach_flag'] = bool(int(metadata.get('CHIRP_PARAMS_ADS_ATTACH_FLAG', '0')))
params['beam_id'] = metadata.get('CHIRP_PARAMS_ADS_BEAM_ID')
params['polarisation'] = metadata.get('CHIRP_PARAMS_ADS_POLAR')
params['chirp'] = dict()
for key, value in metadata.items():
if 'CHIRP_PARAMS_ADS_CHIRP' in key:
new_key = key.replace('CHIRP_PARAMS_ADS_CHIRP_', '').lower()
params['chirp'][new_key] = float(value)

#params['elev_corr_factor'] = float(metadata.get('CHIRP_PARAMS_ADS_ELEV_CORR_FACTOR'))
params['cal_pulse_info'] = get_chirp_cal_pulse_info(metadata)

return params
Expand Down
Loading
Loading