|
1 | 1 | import json |
2 | 2 | import os.path |
3 | 3 | from markupsafe import Markup |
| 4 | +import sqlalchemy as sa |
| 5 | +from datetime import datetime |
4 | 6 |
|
5 | 7 | from typing import Dict, Any, Optional, List, Union |
6 | 8 |
|
|
9 | 11 | import ckanapi |
10 | 12 | from ckan.lib.helpers import lang |
11 | 13 |
|
| 14 | +from ckanext.datastore.backend import DatastoreBackend |
| 15 | +from ckanext.datastore.backend.postgres import ( |
| 16 | + DatastorePostgresqlBackend, |
| 17 | + identifier, |
| 18 | + literal_string |
| 19 | +) |
| 20 | + |
12 | 21 | from ckanext.recombinant.tables import ( |
13 | 22 | get_chromo, get_geno, get_dataset_types, |
14 | 23 | get_published_resource_resource_name |
@@ -129,9 +138,24 @@ def recombinant_example(resource_name: str, |
129 | 138 | return left[2:] + ('\n' + left[2:]).join(out.split('\n')[1:-1]) |
130 | 139 |
|
131 | 140 |
|
132 | | -def recombinant_choice_fields(resource_name: str, |
133 | | - all_languages: bool = False, |
134 | | - prefer_lang: Optional[str] = None) -> Dict[str, Any]: |
| 141 | +def get_choices_fiscal_year(min_year: int = 2005, |
| 142 | + max_year: Optional[int] = None, |
| 143 | + month_start: int = 4) -> List[str]: |
| 144 | + """ |
| 145 | + Dynamically generate choices for fiscal years. |
| 146 | + """ |
| 147 | + if not max_year: |
| 148 | + max_year = datetime.now().year if \ |
| 149 | + datetime.now().month >= month_start else datetime.now().year - 1 |
| 150 | + return [f'{i}-{i + 1}' |
| 151 | + for i in range(min_year, max_year + 1)] |
| 152 | + |
| 153 | + |
| 154 | +def recombinant_choice_fields( |
| 155 | + resource_name: str, |
| 156 | + all_languages: bool = False, |
| 157 | + prefer_lang: Optional[str] = None, |
| 158 | + org_name: Optional[str] = None) -> Dict[str, Any]: |
135 | 159 | """ |
136 | 160 | Return a datastore_id: choices dict from the resource definition |
137 | 161 | that contain lists of choices, with labels pre-translated |
@@ -168,11 +192,42 @@ def key_fn(v: str) -> Any: |
168 | 192 | if v not in exclude_choices |
169 | 193 | ] |
170 | 194 |
|
| 195 | + def build_choices_psql(f: Dict[str, Any]): |
| 196 | + # type_ignore_reason: incomplete typing |
| 197 | + backend: DatastorePostgresqlBackend = DatastoreBackend.\ |
| 198 | + get_active_backend() # type: ignore |
| 199 | + with backend._get_read_engine().begin() as connection: |
| 200 | + filter_clause = f.get('choices_filter_query', '') |
| 201 | + if filter_clause and "{org}" in filter_clause and not org_name: |
| 202 | + filter_clause = '' # if no org_name passed, cannot query it |
| 203 | + filter_clause = filter_clause.format( |
| 204 | + org=literal_string(org_name) if org_name else '') |
| 205 | + results = connection.execute(sa.text(""" |
| 206 | + SELECT * FROM {ref_table} {filter_clause} |
| 207 | + ORDER BY {ds_id} ASC; |
| 208 | + """.format( |
| 209 | + ref_table=identifier(f['choices_reference_table']), |
| 210 | + filter_clause=filter_clause, |
| 211 | + ds_id=identifier(f['datastore_id']) |
| 212 | + ).replace(':', r'\:'))).mappings().fetchall() # avoid bind params |
| 213 | + out[f['datastore_id']] = [ |
| 214 | + (r[f['datastore_id']], |
| 215 | + dict(en=r['label_en'], |
| 216 | + fr=r['label_fr'], |
| 217 | + valid_orgs=r.get('org_years') or {}) |
| 218 | + if all_languages else |
| 219 | + r['label_%s' % (prefer_lang or lang())]) for r in results] |
| 220 | + |
171 | 221 | for f in chromo['fields']: |
172 | 222 | if 'choices' in f: |
173 | 223 | build_choices(f, f['choices']) |
174 | 224 | elif 'choices_file' in f and '_path' in chromo: |
175 | 225 | build_choices(f, _read_choices_file(chromo, f)) |
| 226 | + elif 'choices_reference_table' in f: |
| 227 | + build_choices_psql(f) |
| 228 | + elif 'choices_fiscal_year' in f: |
| 229 | + out[f['datastore_id']] = [ |
| 230 | + (v, v) for v in get_choices_fiscal_year(**f['choices_fiscal_year'])] |
176 | 231 |
|
177 | 232 | return out |
178 | 233 |
|
|
0 commit comments