|
| 1 | +import logging |
| 2 | +from typing import Any, Dict, List |
| 3 | + |
| 4 | +from django import forms |
| 5 | + |
| 6 | +from tom_dataservices.dataservices import DataService |
| 7 | +from tom_dataservices.forms import BaseQueryForm as QueryForm |
| 8 | +from tom_targets.models import Target, TargetName |
| 9 | + |
| 10 | +from astroquery.simbad import Simbad |
| 11 | +from astropy.table import Table |
| 12 | + |
| 13 | +logger = logging.getLogger(__name__) |
| 14 | +# logger.setLevel(logging.DEBUG) |
| 15 | + |
| 16 | + |
| 17 | +class SimbadDataService(DataService): |
| 18 | + """ |
| 19 | + The ``SimbadDataService`` is the interface to the SIMBAD catalog. At present, it is only queryable by identifier. |
| 20 | + For information regarding identifier format, please see http://simbad.u-strasbg.fr/simbad/sim-fid or |
| 21 | + https://astroquery.readthedocs.io/en/latest/simbad/simbad.html. |
| 22 | + """ |
| 23 | + name = 'Simbad' |
| 24 | + |
| 25 | + def __init__(self, *args, **kwargs): |
| 26 | + self.simbad = Simbad() |
| 27 | + self.simbad.add_votable_fields('pmra', 'pmdec', 'ra', 'dec', 'main_id', 'parallax', 'distance') |
| 28 | + |
| 29 | + @classmethod |
| 30 | + def get_form_class(cls): |
| 31 | + return SimbadForm |
| 32 | + |
| 33 | + def build_query_parameters(self, parameters, **kwargs): |
| 34 | + """ |
| 35 | + Use this function to convert the form results into the query parameters understood |
| 36 | + by the Data Service. |
| 37 | + """ |
| 38 | + logger.debug(f'SIMBAD.build_query_parameters: parameters {parameters}') |
| 39 | + self.query_parameters = parameters |
| 40 | + return self.query_parameters |
| 41 | + |
| 42 | + def query_service(self, query_parameters, **kwargs): |
| 43 | + """ |
| 44 | + This is where you actually make the call to the Data Service, |
| 45 | + in this case Simbad. |
| 46 | +
|
| 47 | + Return the results. |
| 48 | + """ |
| 49 | + logger.debug(f'SIMBAD.query_service.query_parameters {query_parameters}') |
| 50 | + # Get search term from query parameters |
| 51 | + search_term: str = query_parameters.get('search_term', '') # TODO: what default??? |
| 52 | + |
| 53 | + # Simbad returns an astropy.table.Table |
| 54 | + catalog_data: Table = self.simbad.query_object(search_term) # type: ignore |
| 55 | + logger.debug(f'SIMBAD.query_service.catalog_data{catalog_data}') |
| 56 | + |
| 57 | + # astroquery <0.4.10, > 0.4.7 has issues joining the distance field, failing to find any results. |
| 58 | + # This workaround checks if the query result is an empty table and then tries the query a 2nd time without the |
| 59 | + # distance field. |
| 60 | + if not catalog_data: |
| 61 | + self.simbad.reset_votable_fields() |
| 62 | + self.simbad.add_votable_fields('pmra', 'pmdec', 'ra', 'dec', 'main_id', 'parallax') |
| 63 | + catalog_data = self.simbad.query_object(search_term) # type: ignore |
| 64 | + logger.debug(f'SIMBAD.query_service (reset) catalog_data{catalog_data}') |
| 65 | + |
| 66 | + self.query_results = catalog_data |
| 67 | + return self.query_results |
| 68 | + |
| 69 | + def query_targets(self, query_parameters, **kwargs) -> List[Dict[str, Any]]: |
| 70 | + """ |
| 71 | + Query SIMBAD and convert results to target data dictionaries. |
| 72 | +
|
| 73 | + Queries the SIMBAD catalog using the provided parameters, then transforms |
| 74 | + the returned astropy Table into a list of dictionaries containing target |
| 75 | + information suitable for target creation. |
| 76 | +
|
| 77 | + :param query_parameters: Dictionary of query parameters collected from SimbadForm |
| 78 | + :type query_parameters: dict |
| 79 | + :param kwargs: Additional keyword arguments passed to query_service |
| 80 | + :type kwargs: dict |
| 81 | +
|
| 82 | + :return: List of dictionaries; one dict for each row, Table.colnames are keys |
| 83 | + :rtype: List[Dict[str, Any]] |
| 84 | + """ |
| 85 | + # do the actual SIMBAD via query_service |
| 86 | + target_table: Table = self.query_service(query_parameters, **kwargs) |
| 87 | + |
| 88 | + # these are the fields (==table columns==dict keys) that we keep |
| 89 | + votable_fields = ['ra', 'dec', 'pmra', 'pmdec', 'main_id', 'mesdistance.dist', 'mesdistance.unit'] |
| 90 | + # convert astroquery.table.Table to list of target dict (keeping only votable_fields) |
| 91 | + targets: List[Dict[str, Any]] = [ |
| 92 | + {key: row[key] for key in votable_fields if key in target_table.colnames} |
| 93 | + for row in target_table] |
| 94 | + |
| 95 | + # add name and alias items to each target |
| 96 | + for target in targets: |
| 97 | + target['name'] = target['main_id'] |
| 98 | + # if the returned target name (main_id) was not the search term, make the main_id the alias |
| 99 | + if query_parameters['search_term'] != target['main_id']: |
| 100 | + target['name'] = query_parameters['search_term'] # and make the search term the name |
| 101 | + target['aliases'] = [str(target['main_id']).replace(' ', '')] # remove whitespace |
| 102 | + |
| 103 | + return targets |
| 104 | + |
| 105 | + def create_target_from_query(self, target_result: Dict[str, Any], **kwargs) -> Target: |
| 106 | + """Create a new target from the query results. This method will be called from |
| 107 | + `SimbadDataService.to_target()` via `DataService.CreateTargetFromQueryView.post()`. |
| 108 | +
|
| 109 | + :param target_result: Dictionary containing target data. For example: |
| 110 | + ```python |
| 111 | + { |
| 112 | + 'ra': np.float64(350.8584), |
| 113 | + 'dec': np.float64(58.8113), |
| 114 | + 'pmra': masked, |
| 115 | + 'pmdec': masked, |
| 116 | + 'main_id': 'NAME Cas A', |
| 117 | + 'mesdistance.dist': np.float64(3.4), |
| 118 | + 'mesdistance.unit': 'kpc ', |
| 119 | + 'name': 'Cas A', |
| 120 | + 'alias': 'alias for CasA', |
| 121 | + 'id': 0 |
| 122 | + } |
| 123 | + ``` |
| 124 | + :type target_result: Dict[str, Any] |
| 125 | + :param kwargs: Additional keyword arguments |
| 126 | + :type kwargs: dict |
| 127 | +
|
| 128 | + :return: Unsaved Target instance populated with SIMBAD data |
| 129 | + :rtype: Target |
| 130 | + """ |
| 131 | + target = Target( |
| 132 | + name=target_result['name'], |
| 133 | + type='SIDEREAL', |
| 134 | + ra=target_result['ra'], |
| 135 | + dec=target_result['dec'], |
| 136 | + pm_ra=target_result['pmra'], |
| 137 | + pm_dec=target_result['pmdec'], |
| 138 | + ) |
| 139 | + |
| 140 | + # Convert all distances to pc |
| 141 | + target_distance = target_result.get('mesdistance.dist', None) |
| 142 | + if target_distance and 'kpc' in target_result.get('mesdistance.unit', ''): |
| 143 | + target.distance = target_distance * 1000 # kilo |
| 144 | + elif target_distance and 'mpc' in target_result.get('mesdistance.unit', ''): |
| 145 | + target.distance = target_distance * 1000000 # mega |
| 146 | + |
| 147 | + return target # not saved yet |
| 148 | + |
| 149 | + def create_aliases_from_query(self, alias_results, **kwargs) -> List[TargetName]: |
| 150 | + """ |
| 151 | + The query_result is a target dictionary created by query_targets() |
| 152 | + It has name and alias fields. Use the name to get the Target and give it |
| 153 | + the alias. |
| 154 | + """ |
| 155 | + aliases = [] |
| 156 | + for alias in alias_results: |
| 157 | + aliases.append(TargetName(name=alias)) |
| 158 | + |
| 159 | + return aliases # not saved yet |
| 160 | + |
| 161 | + |
| 162 | +class SimbadForm(QueryForm): |
| 163 | + search_term = forms.CharField( |
| 164 | + required=False, |
| 165 | + widget=forms.TextInput( |
| 166 | + attrs={'placeholder': 'Target name, e.g. Arcturus'} |
| 167 | + ), |
| 168 | + ) |
0 commit comments