Skip to content

Commit 66aebca

Browse files
Eli
authored and committed
Swapped in what is probably a functional version of download_ncro based on hydstra connection.
1 parent e72ceec commit 66aebca

File tree

3 files changed

+1119
-13
lines changed

3 files changed

+1119
-13
lines changed

dms_datastore/download_ncro2.py

Lines changed: 385 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,385 @@
1+
#!/usr/bin/env python
2+
import argparse
3+
import ssl
4+
import requests
5+
import pandas as pd
6+
import re
7+
import zipfile
8+
import os
9+
import io
10+
import string
11+
import datetime as dt
12+
import numpy as np
13+
import concurrent
14+
import concurrent.futures
15+
import time
16+
import json
17+
from dms_datastore import read_ts
18+
from dms_datastore.write_ts import write_ts_csv
19+
from dms_datastore.process_station_variable import (
20+
process_station_list,
21+
stationfile_or_stations
22+
)
23+
24+
from dms_datastore import dstore_config
25+
from dms_datastore.logging_config import logger
26+
27+
28+
# station_number,station_type,start_time,end_time,parameter,output_interval,download_link

# Translation table from NCRO/hydstra parameter labels to dms_datastore
# variable names.  A value of None marks a parameter that is recognized
# but not ingested.  NOTE: insertion order is preserved deliberately,
# since mapping_df row order derives from it.
mappings = {
    "Water Temperature": "temp",
    "Stage": "elev",
    "Conductivity": "ec",
    "Electrical Conductivity at 25C": "ec",
    "Fluorescent Dissolved Organic Matter": "fdom",
    "Water Temperature ADCP": "temp",
    "Dissolved Oxygen": "do",
    "Chlorophyll": "cla",
    "Dissolved Oxygen (%)": None,
    "Dissolved Oxygen Percentage": None,
    "Velocity": "velocity",
    "pH": "ph",
    "Turbidity": "turbidity",
    "Flow": "flow",
    "Salinity": "salinity",
    "ECat25C": "ec",
    "StreamFlow": "flow",
    "WaterTemp": "temp",
    "WaterTempADCP": "temp",
    "DissolvedOxygenPercentage": None,
    "StreamLevel": "elev",
    "WaterSurfaceElevationNAVD88": "elev",
    "fDOM": "fdom",
}

# Tabular form of the mapping, used as a param_lookup by
# process_station_list (see main() and test()).
mapping_df = pd.DataFrame(
    {"src_var_id": list(mappings), "var_name": list(mappings.values())}
)
mapping_df["src_name"] = "ncro"
60+
def similar_ncro_station_names(site_id):
61+
"""This routine is here to convert a single site_id to a short list of related names.
62+
The reason for the routine is that NCRO surface water stations identifiers
63+
don't correspond well to our abstraction of a station.
64+
There are station ids that are stripped down B1234, or that have added 00 digits B9123400
65+
or that have added Q B91234Q
66+
"""
67+
if site_id.lower().endswith("q"):
68+
base_id = site_id[:-1]
69+
elif site_id.lower().endswith("00") and len(site_id)>6:
70+
base_id = site_id[:-2]
71+
else:
72+
base_id = site_id
73+
return [base_id.upper(), base_id.upper()+"Q", base_id.upper()+"00"]
74+
75+
76+
77+
78+
# Module-level cache state for the NCRO period-of-record inventory.
# ncro_inventory is populated lazily by load_inventory().
ncro_inventory_file = "ncro_por_inventory.txt"
ncro_inventory = None
# The on-disk inventory cache lives alongside this module.
inventory_dir = os.path.split(__file__)[0]
inventoryfile = os.path.join(inventory_dir,"ncro_inventory_full.csv")
83+
def load_inventory():
    """Return the NCRO trace inventory as a DataFrame, building it if needed.

    The inventory is cached two ways: in the module-level ``ncro_inventory``
    global, and on disk at ``inventoryfile`` (reused when less than 6000
    seconds old).  Otherwise it is rebuilt by querying the hydstra web
    service for the traces of every station tagged 'ncro' in the station
    database.

    Returns
    -------
    pandas.DataFrame
        One row per RAW trace, with columns as returned by hydstra
        (including site, trace, start_time, end_time; presumably also
        param -- used downstream by ncro_download).
    """
    global ncro_inventory

    if ncro_inventory is not None:
        return ncro_inventory
    # Reuse the on-disk cache when fresh enough (6000 s ~ 100 minutes).
    if os.path.exists(inventoryfile) and (time.time() - os.stat(inventoryfile).st_mtime) < 6000.:
        ncro_inventory = pd.read_csv(inventoryfile, header=0, sep=",",
                                     parse_dates=["start_time", "end_time"])
        return ncro_inventory

    dbase = dstore_config.station_dbase()
    dbase = dbase.loc[dbase['agency'].str.contains('ncro'), :]

    session = requests.Session()
    dfs = []
    for ndx, row in dbase.iterrows():
        origname = row.agency_id
        names = similar_ncro_station_names(origname)

        url2 = f"https://wdlhyd.water.ca.gov/hydstra/sites/{','.join(names)}/traces"
        response = session.get(url2)
        response.encoding = 'UTF-8'
        data2 = json.loads(response.content.decode('utf-8'))

        # Flatten the JSON: one record per trace, tagged with its site id.
        flattened_data = []
        for site in data2['return']['sites']:
            if site is None or 'site' not in site:
                # Save the malformed response for later diagnosis.
                with open(f'examplebad_{origname}.json', 'w') as badfile:
                    json.dump(data2, badfile)
                logger.info(f"Bad file for {origname}")
                continue
            site_name = site['site']
            for trace in site['traces']:
                trace_data = trace.copy()  # avoid modifying the original JSON
                trace_data['site'] = site_name
                flattened_data.append(trace_data)

        df2 = pd.DataFrame(flattened_data)
        if df2.empty:
            continue

        # Only the RAW (unrevised) traces are of interest.
        df2 = df2.loc[df2['trace'].str.endswith("RAW"), :]
        df2['start_time'] = pd.to_datetime(df2.start_time)
        df2['end_time'] = pd.to_datetime(df2.end_time)
        dfs.append(df2)

    if not dfs:
        # Formerly pd.concat would raise an opaque error here.
        raise ValueError("No NCRO inventory data could be retrieved")

    df_full = pd.concat(dfs, axis=0).reset_index(drop=True)
    df_full.index.name = "index"
    df_full.to_csv(inventoryfile)
    ncro_inventory = df_full
    return df_full
151+
152+
def download_trace(tsrow, row, dest_dir, stime, etime):
    """Download the time series trace for one request and write it to csv.

    Parameters
    ----------
    tsrow : inventory row with attributes .site and .trace naming the
        hydstra site and trace identifiers
    row : station-request row with attributes .station_id and .param
    dest_dir : str
        Destination directory for the output csv file.
    stime, etime : datetime-like
        Bounds of the requested period; formatted as %Y%m%d%H%M%S in the URL.
    """
    paramname = row.param
    site = tsrow.site
    trace = tsrow.trace
    url_trace = (f"https://wdlhyd.water.ca.gov/hydstra/sites/{site}/traces/{trace}/points"
                 f"?start-time={stime.strftime('%Y%m%d%H%M%S')}"
                 f"&end-time={etime.strftime('%Y%m%d%H%M%S')}")
    max_attempt = 8
    station_html = None  # remains None if every attempt fails
    for attempt in range(1, max_attempt + 1):
        time.sleep(0.5)  # small fixed delay to be polite to the server
        try:
            logger.info(f"Submitting request to URL {url_trace} attempt {attempt}")
            response = requests.get(url_trace, stream=False, timeout=200)
            response.raise_for_status()
            station_html = response.text.replace("\r", "")
            break
        except Exception as e:
            logger.info("Exception: " + str(e))
            if attempt < max_attempt:
                time.sleep(3)  # back off a little before retrying
    if station_html is None:
        logger.debug(f"Empty return for site {site} trace {trace}")
    else:
        logger.debug(f"Query for site {site} trace {trace} produced data")
        site, site_details, trace_details, df = parse_json_to_series(station_html)
        fname = f"ncro_{row.station_id}_{site}_{paramname}_{stime.year}_{etime.year}.csv".lower()
        fpath = os.path.join(dest_dir, fname)
        meta = ncro_metadata(row.station_id, site, site_details, trace_details, paramname)
        write_ts_csv(df, fpath, metadata=meta,
                     chunk_years=False, format_version="dwr-ncro-json")
201+
202+
203+
def parse_json_to_series(json_txt):
    """Parse a hydstra points JSON response into metadata plus a DataFrame.

    Parameters
    ----------
    json_txt : str
        JSON text with structure {"return": {"traces": [...]}} containing
        exactly one trace entry.

    Returns
    -------
    tuple (site, site_details, trace_details, df) where df is indexed by
    'datetime' with columns 'value' (float, coerced) and 'qaqc_flag'.

    Raises
    ------
    ValueError
        If the response contains zero traces or more than one trace.
    """
    jsdata = json.loads(json_txt)
    traces = jsdata['return']['traces']
    if len(traces) > 1:
        raise ValueError("Multiple trace json responses not supported")
    if not traces:
        # Formerly fell through to a NameError; fail with a clear message.
        raise ValueError("Empty trace json response")

    trace_entry = traces[0]
    site = trace_entry['site']
    site_details = trace_entry['site_details']
    trace_details = trace_entry['trace_details']

    # Accumulate columns as lists, then construct the frame in one shot
    # so timestamp and numeric parsing are vectorized.
    times, values, qualities = [], [], []
    for record in trace_entry['trace']:
        times.append(record['t'])
        values.append(record['v'])
        qualities.append(record['q'])

    df = pd.DataFrame({
        'datetime': pd.to_datetime(times, format='%Y%m%d%H%M%S'),
        'value': pd.to_numeric(values, errors='coerce'),
        'qaqc_flag': qualities,
    })
    df.set_index('datetime', inplace=True)

    return site, site_details, trace_details, df
236+
def ncro_metadata(station_id, agency_id, site_details, trace_details, paramname):
    """Assemble the metadata dict written into the csv header for one trace.

    site_details must contain 'name'; trace_details must contain 'unit'
    and 'desc', as returned by the hydstra service.
    """
    return {
        "provider": "DWR-NCRO",
        "station_id": station_id,
        "agency_station_id": agency_id,
        "agency_station_name": site_details['name'],
        "agency_unit": trace_details['unit'],
        "agency_param_desc": trace_details['desc'],
        "param": paramname,
    }
247+
248+
249+
def ncro_download(stations, dest_dir, start, end=None, param=None, overwrite=False):
    """Download robot for NCRO.

    Parameters
    ----------
    stations : DataFrame
        Station requests with columns agency_id, station_id, src_var_id
        and param (as produced by process_station_list).
    dest_dir : str
        Destination directory for downloaded files; created if missing.
    start, end : datetime-like
        Period of record to request; end defaults to now.
    param : unused here (the variable comes per-row from *stations*);
        retained for interface compatibility.
    overwrite : bool
        When False, traces whose output file already exists are skipped.
    """
    if end is None:
        end = dt.datetime.now()
    os.makedirs(dest_dir, exist_ok=True)

    inventory = load_inventory()

    # Request window is the same for every row; compute it once.
    stime = pd.to_datetime(start)
    try:
        etime = pd.to_datetime(end)
    except (ValueError, TypeError):
        etime = pd.Timestamp.now()

    futures = {}
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        for ndx, row in stations.iterrows():
            src_var = row.src_var_id  # do not shadow the `param` argument
            paramname = row.param

            # Inventory traces for any id variant of this station that carry
            # the requested variable and overlap the requested window.
            subinventory = inventory.loc[
                (inventory.site.isin(similar_ncro_station_names(row.agency_id)))
                & (inventory.param == src_var)
                & (inventory.start_time <= etime)
                & (inventory.end_time >= stime), :]

            for tsndx, tsrow in subinventory.iterrows():
                site = tsrow.site
                trace = tsrow.trace
                proposed_fname = f"ncro_{row.station_id}_{site}_{paramname}_{stime.year}_{etime.year}.csv".lower()
                if os.path.exists(os.path.join(dest_dir, proposed_fname)) and not overwrite:
                    logger.info(f"skipping file {proposed_fname} because file exists in dest directory")
                    continue

                logger.info(f"submitting {site} {trace}")
                future = executor.submit(
                    download_trace,
                    tsrow,
                    row,
                    dest_dir,
                    stime,
                    etime
                )
                futures[future] = (tsrow, row)

        # Drain results so worker exceptions are surfaced in the log.
        for future in concurrent.futures.as_completed(futures):
            try:
                _ = future.result()
            except Exception as e:
                logger.info(f"Exception occurred during download: {e}")
310+
311+
def create_arg_parser():
    """Build the command-line argument parser for the NCRO download script."""
    parser = argparse.ArgumentParser("Download NCRO data")

    parser.add_argument('--dest', dest="dest_dir", default="ncro_download",
                        help='Destination directory for downloaded files.')
    parser.add_argument('--start', default=None,
                        help='Start time, format 2009-03-31 14:00')
    parser.add_argument('--end', default=None,
                        help='End time, format 2009-03-31 14:00')
    parser.add_argument('--param', default=None,
                        help='Parameter(s) to be downloaded.')
    parser.add_argument('--stations', default=None, nargs="*", required=False,
                        help='Id or name of one or more stations.')
    parser.add_argument('stationfile', nargs="*",
                        help='CSV-format station file.')
    parser.add_argument('--overwrite', action="store_true",
                        help='Overwrite existing files (if False they will be skipped, presumably for speed)')
    return parser
325+
326+
327+
328+
def main():
329+
parser = create_arg_parser()
330+
args = parser.parse_args()
331+
destdir = args.dest_dir
332+
stationfile = args.stationfile
333+
overwrite = args.overwrite
334+
start = args.start
335+
end = args.end
336+
if start is None:
337+
stime = pd.Timestamp(2024,1,1)
338+
else:
339+
stime = dt.datetime(*list(map(int, re.split(r'[^\d]', start))))
340+
if end is None:
341+
etime = dt.datetime.now()
342+
else:
343+
etime = dt.datetime(*list(map(int, re.split(r'[^\d]', end))))
344+
param = args.param
345+
346+
stationfile=stationfile_or_stations(args.stationfile,args.stations)
347+
slookup = dstore_config.config_file("station_dbase")
348+
vlookup = mapping_df
349+
#vlookup = dstore_config.config_file("variable_mappings")
350+
df = process_station_list(stationfile,param=param,station_lookup=slookup,
351+
agency_id_col="agency_id",param_lookup=vlookup,source='ncro')
352+
353+
ncro_download(df,destdir,stime,etime,overwrite=overwrite)
354+
355+
def test():
    """Ad hoc manual test: download a few Old/Middle River stations.

    Earlier runs exercised the longer parameter lists kept in the comments
    below; only the final list is active (the originals were dead
    reassignments).
    """
    destdir = "."
    overwrite = True
    stime = pd.Timestamp(2015, 1, 1)
    etime = dt.datetime.now()
    # params = ["do","elev","flow","velocity","ph","cla","turbidity","temp"]
    # params = ["fdom"]
    params = ["ec"]
    for param in params:
        stations = ["orm", "old", "oh1", "bet"]
        stationfile = stationfile_or_stations(stationfile=None, stations=stations)
        slookup = dstore_config.config_file("station_dbase")
        # Local mapping table; configured variable_mappings file not used yet.
        vlookup = mapping_df
        df = process_station_list(stationfile, param=param, station_lookup=slookup,
                                  agency_id_col="agency_id", param_lookup=vlookup,
                                  source='ncro')
        ncro_download(df, destdir, stime, etime, overwrite=overwrite)
373+
def test_read():
    """Ad hoc manual test: read back a previously downloaded csv and print it.

    The glob pattern covers all year-ranges of the orm chlorophyll series.
    (A dead reassignment of fname to a single temp file was removed.)
    """
    fname = "ncro_orm_b95370_cla_*.csv"
    ts = read_ts.read_ts(fname)
    print(ts)
379+
if __name__ == "__main__":
    # Manual run selector: uncomment the desired entry point.
    # test() is currently active; main() is the production CLI entry.
    #main()
    test()
    #test_read()
    #df = load_inventory()
    #print(df)
    #download_ncro_period_record(df,dbase,dest="",variables=None)

0 commit comments

Comments
 (0)