|
| 1 | +import numpy as np |
| 2 | +from gwcs import WCS as GWCS |
| 3 | +from gwcs import coordinate_frames as cf |
| 4 | +from ndcube import NDCube |
| 5 | + |
| 6 | +import astropy.units as u |
| 7 | +from astropy.coordinates import SpectralCoord |
| 8 | +from astropy.modeling.tabular import Tabular1D |
| 9 | +from astropy.utils import lazyproperty |
| 10 | + |
| 11 | +__all__ = ["gwcs_from_array", "SpectralAxis", "Spectrum"] |
| 12 | + |
| 13 | + |
| 14 | +def gwcs_from_array(array): |
| 15 | + """ |
| 16 | + Create a new WCS from provided tabular data. This defaults to being |
| 17 | + a GWCS object. |
| 18 | + """ |
| 19 | + orig_array = u.Quantity(array) |
| 20 | + |
| 21 | + coord_frame = cf.CoordinateFrame(naxes=1, axes_type=("SPECTRAL",), axes_order=(0,)) |
| 22 | + spec_frame = cf.SpectralFrame(unit=array.unit, axes_order=(0,)) |
| 23 | + |
| 24 | + # In order for the world_to_pixel transformation to automatically convert |
| 25 | + # input units, the equivalencies in the lookup table have to be extended |
| 26 | + # with spectral unit information. |
| 27 | + SpectralTabular1D = type("SpectralTabular1D", (Tabular1D,), {"input_units_equivalencies": {"x0": u.spectral()}}) |
| 28 | + |
| 29 | + forward_transform = SpectralTabular1D(np.arange(len(array)), lookup_table=array) |
| 30 | + # If our spectral axis is in descending order, we have to flip the lookup |
| 31 | + # table to be ascending in order for world_to_pixel to work. |
| 32 | + if len(array) == 0 or array[-1] > array[0]: |
| 33 | + forward_transform.inverse = SpectralTabular1D(array, lookup_table=np.arange(len(array))) |
| 34 | + else: |
| 35 | + forward_transform.inverse = SpectralTabular1D(array[::-1], lookup_table=np.arange(len(array))[::-1]) |
| 36 | + |
| 37 | + class SpectralGWCS(GWCS): |
| 38 | + def pixel_to_world(self, *args, **kwargs): |
| 39 | + if orig_array.unit == "": |
| 40 | + return u.Quantity(super().pixel_to_world_values(*args, **kwargs)) |
| 41 | + return super().pixel_to_world(*args, **kwargs).to(orig_array.unit, equivalencies=u.spectral()) |
| 42 | + |
| 43 | + tabular_gwcs = SpectralGWCS(forward_transform=forward_transform, input_frame=coord_frame, output_frame=spec_frame) |
| 44 | + |
| 45 | + # Store the intended unit from the origin input array |
| 46 | + # tabular_gwcs._input_unit = orig_array.unit |
| 47 | + |
| 48 | + return tabular_gwcs |
| 49 | + |
| 50 | + |
| 51 | +class SpectralAxis(SpectralCoord): |
| 52 | + """ |
| 53 | + Coordinate object representing spectral values corresponding to a specific |
| 54 | + spectrum. Overloads SpectralCoord with additional information (currently |
| 55 | + only bin edges). |
| 56 | +
|
| 57 | + Parameters |
| 58 | + ---------- |
| 59 | + bin_specification: str, optional |
| 60 | + Must be "edges" or "centers". Determines whether specified axis values |
| 61 | + are interpreted as bin edges or bin centers. Defaults to "centers". |
| 62 | + """ |
| 63 | + |
| 64 | + _equivalent_unit = SpectralCoord._equivalent_unit + (u.pixel,) |
| 65 | + |
| 66 | + def __new__(cls, value, *args, bin_specification="centers", **kwargs): |
| 67 | + # Enforce pixel axes are ascending |
| 68 | + if ( |
| 69 | + (type(value) is u.quantity.Quantity) |
| 70 | + and (value.size > 1) |
| 71 | + and (value.unit is u.pix) |
| 72 | + and (value[-1] <= value[0]) |
| 73 | + ): |
| 74 | + raise ValueError("u.pix spectral axes should always be ascending") |
| 75 | + |
| 76 | + # Convert to bin centers if bin edges were given, since SpectralCoord |
| 77 | + # only accepts centers |
| 78 | + if bin_specification == "edges": |
| 79 | + bin_edges = value |
| 80 | + value = SpectralAxis._centers_from_edges(value) |
| 81 | + |
| 82 | + obj = super().__new__(cls, value, *args, **kwargs) |
| 83 | + |
| 84 | + if bin_specification == "edges": |
| 85 | + obj._bin_edges = bin_edges |
| 86 | + |
| 87 | + return obj |
| 88 | + |
| 89 | + @staticmethod |
| 90 | + def _edges_from_centers(centers, unit): |
| 91 | + """ |
| 92 | + Calculates interior bin edges based on the average of each pair of |
| 93 | + centers, with the two outer edges based on extrapolated centers added |
| 94 | + to the beginning and end of the spectral axis. |
| 95 | + """ |
| 96 | + a = np.insert(centers, 0, 2 * centers[0] - centers[1]) |
| 97 | + b = np.append(centers, 2 * centers[-1] - centers[-2]) |
| 98 | + edges = (a + b) / 2 |
| 99 | + return edges * unit |
| 100 | + |
| 101 | + @staticmethod |
| 102 | + def _centers_from_edges(edges): |
| 103 | + """ |
| 104 | + Calculates the bin centers as the average of each pair of edges |
| 105 | + """ |
| 106 | + return (edges[1:] + edges[:-1]) / 2 |
| 107 | + |
| 108 | + @lazyproperty |
| 109 | + def bin_edges(self): |
| 110 | + """ |
| 111 | + Calculates bin edges if the spectral axis was created with centers |
| 112 | + specified. |
| 113 | + """ |
| 114 | + if hasattr(self, "_bin_edges"): |
| 115 | + return self._bin_edges |
| 116 | + else: |
| 117 | + return self._edges_from_centers(self.value, self.unit) |
| 118 | + |
| 119 | + |
| 120 | +class Spectrum(NDCube): |
| 121 | + r""" |
| 122 | + Spectrum container for data with one spectral axis. |
| 123 | +
|
| 124 | + Note that "1D" in this case refers to the fact that there is only one |
| 125 | + spectral axis. `Spectrum` can contain "vector 1D spectra" by having the |
| 126 | + ``flux`` have a shape with dimension greater than 1. |
| 127 | +
|
| 128 | + Notes |
| 129 | + ----- |
| 130 | + A stripped down version of `Spectrum1D` from `specutils`. |
| 131 | +
|
| 132 | + Parameters |
| 133 | + ---------- |
| 134 | + data : `~astropy.units.Quantity` |
| 135 | + The data for this spectrum. This can be a simple `~astropy.units.Quantity`, |
| 136 | + or an existing `~Spectrum1D` or `~ndcube.NDCube` object. |
| 137 | + uncertainty : `~astropy.nddata.NDUncertainty` |
| 138 | + Contains uncertainty information along with propagation rules for |
| 139 | + spectrum arithmetic. Can take a unit, but if none is given, will use |
| 140 | + the unit defined in the flux. |
| 141 | + spectral_axis : `~astropy.units.Quantity` or `~specutils.SpectralAxis` |
| 142 | + Dispersion information with the same shape as the dimension specified by spectral_dimension |
| 143 | + of shape plus one if specifying bin edges. |
| 144 | + spectral_dimension : `int` default 0 |
| 145 | + The dimension of the data which represents the spectral information default to first dimension index 0. |
| 146 | + mask : `~numpy.ndarray`-like |
| 147 | + Array where values in the flux to be masked are those that |
| 148 | + ``astype(bool)`` converts to True. (For example, integer arrays are not |
| 149 | + masked where they are 0, and masked for any other value.) |
| 150 | + meta : dict |
| 151 | + Arbitrary container for any user-specific information to be carried |
| 152 | + around with the spectrum container object. |
| 153 | +
|
| 154 | + Examples |
| 155 | + -------- |
| 156 | + >>> import numpy as np |
| 157 | + >>> import astropy.units as u |
| 158 | + >>> from sunkit_spex.spectrum import Spectrum |
| 159 | + >>> spec = Spectrum(np.arange(1, 11)*u.watt, spectral_axis=np.arange(1, 12)*u.keV) |
| 160 | + >>> spec |
| 161 | + <sunkit_spex.spectrum.spectrum.Spectrum object at ... |
| 162 | + NDCube |
| 163 | + ------ |
| 164 | + Dimensions: [10.] pix |
| 165 | + Physical Types of Axes: [('em.energy',)] |
| 166 | + Unit: W |
| 167 | + Data Type: float64 |
| 168 | + """ |
| 169 | + |
| 170 | + def __init__( |
| 171 | + self, data, *, uncertainty=None, spectral_axis=None, spectral_dimension=0, mask=None, meta=None, **kwargs |
| 172 | + ): |
| 173 | + # If the flux (data) argument is already a Spectrum (as it would |
| 174 | + # be for internal arithmetic operations), avoid setup entirely. |
| 175 | + if isinstance(data, Spectrum): |
| 176 | + super().__init__(data) |
| 177 | + return |
| 178 | + |
| 179 | + # Ensure that the flux argument is an astropy quantity |
| 180 | + if data is not None: |
| 181 | + if not isinstance(data, u.Quantity): |
| 182 | + raise ValueError("Flux must be a `Quantity` object.") |
| 183 | + elif data.isscalar: |
| 184 | + data = u.Quantity([data]) |
| 185 | + |
| 186 | + # Ensure that the unit information codified in the quantity object is |
| 187 | + # the One True Unit. |
| 188 | + kwargs.setdefault("unit", data.unit if isinstance(data, u.Quantity) else kwargs.get("unit")) |
| 189 | + |
| 190 | + # If flux and spectral axis are both specified, check that their lengths |
| 191 | + # match or are off by one (implying the spectral axis stores bin edges) |
| 192 | + if data is not None and spectral_axis is not None: |
| 193 | + if spectral_axis.shape[0] == data.shape[spectral_dimension]: |
| 194 | + bin_specification = "centers" |
| 195 | + elif spectral_axis.shape[0] == data.shape[spectral_dimension] + 1: |
| 196 | + bin_specification = "edges" |
| 197 | + else: |
| 198 | + raise ValueError( |
| 199 | + f"Spectral axis length ({spectral_axis.shape[0]}) must be the same size or one " |
| 200 | + "greater (if specifying bin edges) than that of the spextral" |
| 201 | + f"axis ({data.shape[spectral_dimension]})" |
| 202 | + ) |
| 203 | + |
| 204 | + # Attempt to parse the spectral axis. If none is given, try instead to |
| 205 | + # parse a given wcs. This is put into a GWCS object to |
| 206 | + # then be used behind-the-scenes for all operations. |
| 207 | + if spectral_axis is not None: |
| 208 | + # Ensure that the spectral axis is an astropy Quantity |
| 209 | + if not isinstance(spectral_axis, u.Quantity): |
| 210 | + raise ValueError("Spectral axis must be a `Quantity` or " "`SpectralAxis` object.") |
| 211 | + |
| 212 | + # If a spectral axis is provided as an astropy Quantity, convert it |
| 213 | + # to a SpectralAxis object. |
| 214 | + if not isinstance(spectral_axis, SpectralAxis): |
| 215 | + if spectral_axis.shape[0] == data.shape[spectral_dimension] + 1: |
| 216 | + bin_specification = "edges" |
| 217 | + else: |
| 218 | + bin_specification = "centers" |
| 219 | + self._spectral_axis = SpectralAxis(spectral_axis, bin_specification=bin_specification) |
| 220 | + |
| 221 | + wcs = gwcs_from_array(self._spectral_axis) |
| 222 | + |
| 223 | + super().__init__( |
| 224 | + data=data.value if isinstance(data, u.Quantity) else data, |
| 225 | + wcs=wcs, |
| 226 | + mask=mask, |
| 227 | + meta=meta, |
| 228 | + uncertainty=uncertainty, |
| 229 | + **kwargs, |
| 230 | + ) |
0 commit comments