Skip to content

Commit 94649db

Browse files
authored
Improve the ecmwf function to deal with levlist variables. (#1721)
1 parent a10f86a commit 94649db

2 files changed

Lines changed: 200 additions & 58 deletions

File tree

src/extras/weather.jl

Lines changed: 199 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -192,19 +192,24 @@ function ecmwf(source::Symbol=:reanalysis; filename="", cb::Bool=false, dataset=
192192
bv = BitVector(undef, length(request))
193193
t = collect(request)
194194
for k = 1:numel(request)
195-
bv[k] = (t[k] != ' ' && t[k] != '\n')
195+
bv[k] = (t[k] != ' ' && t[k] != '\n' && t[k] != '\r')
196196
end
197+
request[end] == ',' && (bv[end] = false) # Remove the last comma if present. It is not a valid JSON syntax
197198
t2 = join(t[bv])
198199
if t2[1] == '{' && t2[end] == '}' # We got a JSON request string like the CDSAPI example
200+
(t2[end-1] == ',') && (t2 = replace(t2, ",}" => "}")) # Shit, an extra comma in the end. Illegal JSON
201+
t2 = replace(t2, ",]" => "]") # Python cdsapi examples are full of shits like this that cause JSON errors
199202
return "{\"inputs\":" * t2 * "}", ""
200203
else
201204
dataset = "" # Accept that no dataset name was provided if from clipboard
202205
if ((ind = findfirst("dataset=\"", t2)) !== nothing) # "import cdsapi\n\ndataset = \"reanalysis-era5-land\"\nrequest = {\n
203206
ind2 = findfirst("request={", t2)
204207
dataset = string(t2[ind[end]+1:ind2[1]-2])
205208
ind3 = findlast('}', t2)
209+
t2 = t2[ind2[end]:ind3] * "}"
206210
end
207-
return "{\"inputs\":" * t2[ind2[end]:ind3] * "}", dataset
211+
t2 = replace(replace(t2, ",]" => "]"), ",}" => "}") # Just in case we have those extra commas
212+
return "{\"inputs\":" * t2, dataset
208213
end
209214
end
210215

@@ -236,7 +241,7 @@ function ecmwf(source::Symbol=:reanalysis; filename="", cb::Bool=false, dataset=
236241
if isa(params, Vector)
237242
params = join(params, '\n')
238243
if (levlist != "") # Pressure levels are provided
239-
pr = getdtp(levlist, "1000"); (pr == "e") && error("Unknown type for 'levlist'")
244+
pr = getdtp(levlist, "1000"); (pr === nothing) && error("Unknown type for 'levlist'")
240245
sp = @sprintf("\"pressure_level\": [\"%s\"],\n", pr)
241246
sp = replace(sp, "[\"[" => "["); sp = replace(sp, "]\"]" => "]"); # Remove double [[ & ]]
242247
params *= sp
@@ -326,7 +331,7 @@ function listecmwfvars(source::Symbol=:reanalysis; single::Bool=true, pressure::
326331
ds = sort(d, by=first)
327332
header = (what == "fc") ? ["ID","Name","Units"] : ["ID","Long-Name (nc var name)","Name","Units"]
328333
align = (what == "fc") ? [:c,:l,:c] : [:c,:l,:l,:c]
329-
t = (what == "fc") ? " (Forecast)" : " (ERA5)";
334+
t = (what == "fc") ? " (Forecast)" : " (ERA5)";
330335
test && return nothing
331336
pretty_table(hcat(collect(keys(ds)),stack(values(ds), dims=1)), header=header, alignment=align,
332337
title=title_str * "-level variables" * t, title_alignment=:l, crop=:horizontal)
@@ -370,14 +375,14 @@ function era5vars(varID::Union{Vector{String}, Vector{Symbol}}; single::Bool=tru
370375
@sprintf("\"variable\": [\n\t\"%s\"\n],", join([d[k][1] for k in _vars], "\",\n\t\""))
371376
end
372377

373-
# ------------------------------------------------------------------------------------------------------------------------
378+
# ---------------------------------------------------------------------------------------------------------------
374379
function helper_ecmwf_vars(single::Bool, pressure::Bool, prefix::String)
375380
pressure && (single = false); !single && (pressure = true)
376381
dim_char = (single) ? "2" : "3"; title_str = ((single) ? "\nSingle" : "\nPressure")
377382
return include(joinpath(dirname(pathof(GMT)), "extras/" * prefix * "vars" * dim_char * "d.jl")), title_str
378383
end
379384

380-
# ------------------------------------------------------------------------------------------------------------------------
385+
# ---------------------------------------------------------------------------------------------------------------
381386
"""
382387
era5time(; year="", month="", day="", hour="") -> String
383388
@@ -406,36 +411,95 @@ var = era5time(year="2023")
406411
"""
407412
function era5time(; year="", month="", day="", hour="")
408413
_y, _m, _d, _h = agora()
409-
yr = getdtp(year, _y); (yr == "e") && error("Unknown type for 'year'")
410-
mo = getdtp(month, _m); (mo == "e") && error("Unknown type for 'month'")
411-
dy = getdtp(day, _d); (dy == "e") && error("Unknown type for 'day'")
412-
hr = getdtp(hour, _h); (hr == "e") && error("Unknown type for 'hour'")
414+
yr = getdtp(year, [_y]); (yr === nothing) && error("Unknown type for 'year'")
415+
mo = getdtp(month, [_m]); (mo === nothing) && error("Unknown type for 'month'")
416+
dy = getdtp(day, [_d]); (dy === nothing) && error("Unknown type for 'day'")
417+
hr = getdtp(hour, [_h]); (hr === nothing) && error("Unknown type for 'hour'")
413418
s = @sprintf("\"year\": [\"%s\"],\n\"month\": [\"%s\"],\n\"day\": [\"%s\"],\n\"time\": [\"%s\"],\n", yr, mo, dy, hr)
414419
s = replace(s, "[\"[" => "["); s = replace(s, "]\"]" => "]"); # Remove double [[ & ]]
415420
return s
416421
end
417422

418-
function getdtp(x, def) # used also in ecmwf() to get the pressure levels
419-
(x == "") ? def : (typeof(x) <: OrdinalRange) ? string.(collect(x)) : isa(x, Vector{Int}) ? string.(x) : isa(x, Vector{String}) ? x : isa(x, String) ? [x] : "e"
423+
function getdtp(x, def)::Union{Vector{String}, Nothing} # used also in ecmwf() to get the pressure levels
424+
(x == "") ? def : (typeof(x) <: OrdinalRange) ? string.(collect(x)) : isa(x, Vector{Int}) ? string.(x) : isa(x, Vector{String}) ? x : isa(x, String) ? [x] : nothing
420425
end
421426

422427
function agora() # Must put this in a separate function because I want to use the keywords year, month, etc
423428
t = now()
424429
return string(year(t)), string(month(t)), string(day(t)), string(hour(t))
425430
end
426431

427-
# ------------------------------------------------------------------------------------------------------------------------
432+
# ---------------------------------------------------------------------------------------------------------------
433+
#ecmwf(:forecast, vars=["10u", "2t"])
434+
"""
435+
- `format`: The format in which to save the downloaded data. It can be "grib" or "netcdf". Default is "netcdf".
436+
- `root`: The root URL of the CDS ERA5 dataset. Default is "https://data.ecmwf.int/forecasts".
437+
- `levlist`: The pressure levels to select. It can be a string to select a unique pressure level,
438+
or a vector of strings or Ints to select multiple pressure levels.
439+
- `stream`: The stream to select. It can be one of: "oper", "enfo", "waef", "wave", "scda", "scwv", "mmsf". Default is "oper".
440+
- `type`: The type of forecast to select.
441+
"""
428442
function ecmwf_fc(; filename="", levlist="", kw...)
443+
429444
d = KW(kw)
445+
root = "https://data.ecmwf.int/forecasts"
446+
if ((val = find_in_dict(d, [:root])[1]) !== nothing) #
447+
root = string(val)::String
448+
(!contains(root, "https://") && !contains(root, "ecmwf")) && error("The 'root' option must be a valid URL")
449+
end
450+
451+
# ---------- Stream ------------------------------------------------------
452+
stream::String = "oper" # Default is high-res
453+
if ((val = find_in_dict(d, [:stream])[1]) !== nothing) #
454+
stream = string(val)::String
455+
!(stream in ["oper", "enfo", "waef", "wave", "scda", "scwv", "mmsf"]) &&
456+
error("Unknown 'stream' code. Must be either: \"oper\", \"enfo\" , \"waef\", \"wave\", \"scda\", \"scwv\" or \"mmsf\"")
457+
(stream != "oper") && (model != "ifs") && error("The 'stream' option is not available for the \"ifs\" model")
458+
end
459+
460+
# ---------- Variables ---------------------------------------------------
430461
vars::Union{String, Vector{String}} = ""
431-
((val = find_in_dict(d, [:variable :var :vars :param])[1]) !== nothing) && (vars = getdtp(val, "")) # <== TEST THAT VARS ARE VALID
432-
step::String = ((val = find_in_dict(d, [:step])[1]) === nothing) ? "0" : string(val)::String;
462+
if ((val = find_in_dict(d, [:variable :var :vars :param])[1]) !== nothing)
463+
vars = getdtp(val, "")
464+
srf = keys(helper_ecmwf_vars(true, false, "fc")[1]) # Single (surface) vars
465+
pl = keys(helper_ecmwf_vars(false, true, "fc")[1]) # Pressure vars
466+
check_var = zeros(Int, numel(vars))
467+
for k = 1:numel(vars) # Check if var is in srf or pl
468+
if (vars[k] in srf) check_var[k] = 1 # A surface var
469+
elseif (vars[k] in pl) check_var[k] = 2 # A pressure var
470+
else error("Variable \"$(var[k])\" not found in either surface or pressure level datasets")
471+
end
472+
end
473+
end
474+
475+
# ---------- Levels -----------------------------------------------------
476+
levels::Union{Vector{String}, Nothing} = nothing
477+
if (levlist != "") # Pressure levels are provided
478+
list_levs = ["1000", "925", "850", "700", "500", "300", "250", "200", "50"]
479+
levels = (levlist == "all") ? list_levs : getdtp(levlist, ""); (levels === nothing) && error("Unknown type for 'levlist'")
480+
else
481+
levels = ["1000"]
482+
end
483+
484+
step = ((val = find_in_dict(d, [:step :steps])[1]) === nothing) ? ["0"] : getdtp(val, ["0"])
485+
(step === nothing) && error("Unknown type for 'step'")
433486
model::String = ((val = find_in_dict(d, [:model])[1]) === nothing) ? "ifs" : lowercase(string(val)::String)
434-
model = lowercase(model); (model != "ifs" && model != "aifs") && error("Model must be either \"ifs\" or \"aifs\"")
487+
(model != "ifs" && model != "aifs") && error("Model must be either \"ifs\" or \"aifs\"")
488+
(model == "aifs") && (model = "aifs-single") # This the name of the directory in the ECMWF servers
489+
490+
# ---------- Forecast ---------------------------------------------------
491+
type::String = "fc" # Default is forecast
492+
if ((val = find_in_dict(d, [:type])[1]) !== nothing) #
493+
type = lowercase(string(val)::String)
494+
!(type in ["fc", "ef", "ep", "tf"]) && error("Unknown 'type' code. Must be either: \"fc\", \"ef\", \"ep\" or \"tf\"")
495+
end
496+
format = (type == "tf") ? "bufr" : "grib2" # No choices here
497+
498+
# ---------- Date -------------------------------------------------------
435499
date::String, tim::String, _h::Int = "", "", 0
436500
if ((val = find_in_dict(d, [:date])[1]) !== nothing)
437501
if (isa(val, String))
438-
data::String = val # To f the Any
502+
data::String = val # To f the Any
439503
date = (length(data) == 8) ? data : (length(data) == 10 && contains(data, "-")) ? data[1:4] * data[6:7] * data[9:10] : error("When string, 'date' must be in the form YYYYMMDD")
440504
elseif (isa(val, Date) || isa(val, DateTime))
441505
date = Dates.format(val, dateformat"yyyymmdd")
@@ -445,10 +509,16 @@ function ecmwf_fc(; filename="", levlist="", kw...)
445509
(n > 3) && error(" Forecasts older than 3 days are no longer available")
446510
date = Dates.format(now()-Day(n), dateformat"yyyymmdd");
447511
end
448-
else
512+
autodate = false
513+
else # If no date is given, use the current date
449514
date = Dates.format(now(), dateformat"yyyymmdd"); _h = hour(now())
515+
!check_url(root * "/$date") && (date = Dates.format(now()-Day(1), dateformat"yyyymmdd"); _h = 18) # Too soon? Get yesterday's date
516+
# And the 'ifs' dir exists too?
517+
(_h == 18 && model == "ifs" && !check_url(root * "/$date/$tim" * "z/ifs")) && (tim = @sprintf("%.02d", (_h -= 6)))
518+
autodate = true # To know if we need to check the time too.
450519
end
451-
520+
521+
# ---------- Time -------------------------------------------------------
452522
if ((val = find_in_dict(d, [:time])[1]) !== nothing) # Even if gotten above, a explicit mention to time takes precedence
453523
if (isa(val, String)) _h = parse(Int, val)::Int
454524
elseif (isa(val, Time)) _h = hour(val)::Int
@@ -458,53 +528,125 @@ function ecmwf_fc(; filename="", levlist="", kw...)
458528
_h -= rem(_h, 6) # To get only 0, 6, 12 or 18 hours
459529
tim = @sprintf("%.02d", _h)
460530

461-
stream::String = "oper" # Default is high-res
462-
if ((val = find_in_dict(d, [:stream])[1]) !== nothing) #
463-
stream = string(val)::String
464-
!(stream in ["oper", "enfo", "waef", "wave", "scda", "scwv", "mmsf"]) &&
465-
error("Unknown 'stream' code. Must be either: \"oper\", \"enfo\" , \"waef\", \"wave\", \"scda\", \"scwv\" or \"mmsf\"")
466-
(stream != "oper") && (model != "ifs") && error("The 'stream' option is not available for the \"ifs\" model")
531+
# --------- Now test if this date/time is available in the server --------
532+
if (!check_url(root * "/$date/$(tim)z")) # Check if the URL exists
533+
if (autodate)
534+
found = false
535+
while (_h > 0 && !found) # Check if the date is available in the server. If not, go back 6 hours and check again.
536+
tim = @sprintf("%.02d", (_h -= 6))
537+
found = check_url(root * "/$date/$(tim)z")
538+
end
539+
# OK, but does the 'model' have this date/time? If not, go back another 6 hours and check again.
540+
@label try_again
541+
if (!check_url(root * "/$date/$(tim)z/$model"))
542+
tim = @sprintf("%.02d", (_h -= 6))
543+
(!check_url(root * "/$date/$(tim)z/$(model)/0p25/$stream")) && (tim = @sprintf("%.02d", (_h -= 6)))
544+
(_h < 0) && (@info "Sorry, giving up trying to find the latest dataset for the requested parameters. Must pass a date/time manually."; return nothing)
545+
else # And does the 'stream' exist?
546+
if !check_url(root * "/$date/$(tim)z/$(model)/0p25/$stream") # Ghrrr!, no. Must go back to another time
547+
tim = @sprintf("%.02d", (_h -= 6))
548+
(!check_url(root * "/$date/$(tim)z/$(model)/0p25/$stream")) && (tim = @sprintf("%.02d", (_h -= 6)))
549+
else # OK, but does the .index file exist?
550+
if !check_url(root * "/$date/$(tim)z/$(model)/0p25/$(stream)/$(date)$(tim)0000-$(step[1])h-$(stream)-$(type).index") # F...
551+
tim = @sprintf("%.02d", (_h -= 6))
552+
@goto try_again
553+
end
554+
end
555+
end
556+
else
557+
@info "The requested combination date $date and time $tim is not available in the server. Either too soon or too old. \nManually check the $root to see what is available."
558+
return nothing
559+
end
467560
end
468561

469-
type::String = "fc" # Default is forecast
470-
if ((val = find_in_dict(d, [:type])[1]) !== nothing) #
471-
type = lowercase(string(val)::String)
472-
(!type in ["fc", "ef", "ep", "tf"]) && error("Unknown 'type' code. Must be either: \"fc\", \"ef\", \"ep\" or \"tf\"")
562+
# All in all, does the base directory exist? (Above checks are not complete)
563+
if (!check_url(root * "/$date/$(tim)z/$(model)/0p25/$stream"))
564+
@info "The requested $root/$date/$(tim)z/$(model)/0p25/$stream directory does not (yet?) exists. You must manually check what's available."
565+
return nothing
473566
end
474-
format = (type == "tf") ? "bufr" : "grib2" # No choices here
475567

476-
root = "https://data.ecmwf.int/forecasts"
477-
if ((val = find_in_dict(d, [:root])[1]) !== nothing) #
478-
root = string(val)::String
479-
(!contains(root, "https://") && !contains(root, "ecmwf")) && error("The 'root' option must be a valid URL")
568+
# [ROOT]/[yyyymmdd]/[HH]z/[model]/[resol]/[stream]/[yyyymmdd][HH]0000-[step][U]-[stream]-[type].[format]
569+
if (vars == "")
570+
println(root * "/$date/$(tim)z$(model)/0p25/$stream/$(date)$(tim)0000-$(step[1])h-$(stream)-$(type).$format")
571+
return nothing
480572
end
481573

482-
# OK, now we have all the parameters to build the URL
483-
# [ROOT]/[yyyymmdd]/[HH]z/[model]/[resol]/[stream]/[yyyymmdd][HH]0000-[step][U]-[stream]-[type].[format]
484-
if (vars != "")
485-
tmp = tempname()
486-
Downloads.download(root * "/$date/$tim" * "z" * "/$model/0p25/$stream/$(date)$(tim)0000-$(step)h-$(stream)-$(type).index", tmp)
487-
off_len = zeros(Int, length(vars), 2) # Matrix{String}(undef, length(vars), 2)
488-
fid = open(tmp)
489-
iter = eachline(fid)
574+
grdclip_cmd = parse_R(d, "")[1] * " -Sr1/1 -G"
575+
EXT = (((val = find_in_dict(d, [:format])[1]) !== nothing) && val == "grib") ? ".grib" : ".grd"
576+
577+
tmp = tempname()
578+
for ns = 1:numel(step) # Loop over the steps
579+
Downloads.download(root * "/$date/$(tim)z/$(model)/0p25/$stream/$(date)$(tim)0000-$(step[ns])h-$(stream)-$(type).index", tmp)
580+
581+
off_len = fish_var_off_len_in_index_file(tmp, vars, levels, check_var) # Find the 'offset' and 'length' of each variable
582+
583+
url = root * "/$date/$(tim)z/$(model)/0p25/$stream/$(date)$(tim)0000-$(step[ns])h-$(stream)-$(type).$format"
584+
490585
for k = 1:numel(vars)
491-
_var = "\"$(vars[k])\""
492-
for it in iter
493-
!contains(it, _var) && continue
494-
s = split(it, ',')[11:12] # Should contain the "_offset": XX, "_length": YY
495-
off_len[k, 1], off_len[k, 2] = parse(Int, s[1][12:end]), parse(Int, s[2][12:end-1]) # end-1 becase last char is the '}'
496-
break
586+
@info "Downloading $(vars[k]) from $url and saving to $EXT format"
587+
588+
if (check_var[k] == 2) # A Pressure level variable
589+
for nl = 1:numel(levels)
590+
start, len, stop = off_len[k,1,nl], off_len[k,2,nl], off_len[k,1,nl] + off_len[k,2,nl] - 1 # This var byte range
591+
fname = vars[k] * "_step$(step[ns])_level$(levels[nl])_$(model)_$(stream)_$(type)_$(date)_$(tim)$(EXT)" # This var fname
592+
(EXT == ".grd") ? gmt("grdclip /vsisubfile/$(start)_$(len)" * ",/vsicurl/" * url * grdclip_cmd * fname) :
593+
run(`curl --show-error --range $(start)-$(stop) $url -o $fname`)
594+
end
595+
else
596+
start, len, stop = off_len[k,1,1], off_len[k,2,1], off_len[k,1,1] + off_len[k,2,1] - 1 # This var byte range
597+
fname = vars[k] * "_step$(step[ns])_$(model)_$(stream)_$(type)_$(date)_$(tim)$(EXT)" # This var fname
598+
(EXT == ".grd") ? gmt("grdclip /vsisubfile/$(start)_$(len),/vsicurl/" * url * grdclip_cmd * fname) :
599+
run(`curl --show-error --range $(start)-$(stop) $url -o $fname`)
497600
end
601+
#run(`curl --show-error --range $(start)-$(stop) $url -o $fname`)
602+
#gdaltranslate("/vsisubfile/$(start)_$(len)" * ",/vsicurl/" * url, save=vars[k] * "_step$(step[ns]).nc")
498603
end
499-
close(fid)
500-
rm(tmp)
501-
url = root * "/$date/$tim" * "z" * "/$model/0p25/$stream/$(date)$(tim)0000-$(step)h-$(stream)-$(type).$format"
502-
for k = 1:numel(vars)
503-
fname = vars[k] * ".grib2" # This var file name
504-
start, stop = off_len[k, 1], off_len[k, 1] + off_len[k, 2] - 1 # This var bite range
505-
@info "Downloading $(fname) from $url"
506-
run(`curl --show-error --range $(start)-$(stop) $url -o $fname`)
604+
end
605+
rm(tmp)
606+
end
607+
608+
function check_url(url::AbstractString)::Bool
609+
# Check if the URL exists. Returns true if it does, false otherwise.
610+
ret = true
611+
try # Need a try catch because it screems when it fails
612+
open(`curl -s --head --silent --fail $url`, "r", stdout) do io
613+
split(readlines(io)[1], ',')
614+
end
615+
catch; ret = false
616+
end
617+
return ret
618+
end
619+
620+
function fish_var_off_len_in_index_file(fname, vars, levels, check_var)
621+
# Find the 'offset' and 'length' of each variable in 'vars' in the index file 'fname'
622+
# Return a 3D array of size (length(vars), 2, length(levels)) with the offsets and lengths in col1 and col2
623+
# The 3rd dimension is for the case where a variable spans over multiple levels.
624+
off_len = zeros(Int, length(vars), 2, (levels === nothing) ? 1 : numel(levels))
625+
fid = open(fname)
626+
iter = eachline(fid)
627+
for k = 1:numel(vars) # Scan the index file to get the offset and length of each variable
628+
first_lev = 1
629+
_var = "\"$(vars[k])\""
630+
for it in iter # Loop over the lines in the index file
631+
!contains(it, _var) && continue
632+
if (check_var[k] == 2) # A Pressure level variable
633+
for lev = first_lev:numel(levels)
634+
this_lev = "\"$(levels[lev])\""
635+
!contains(it, "\"levelist\": $this_lev") && @goto next_it # Not found, read next line
636+
s = split(it, ',')[12:13] # Should contain the "_offset": XX, "_length": YY
637+
off_len[k, 1, lev], off_len[k, 2, lev] = parse(Int, s[1][12:end]), parse(Int, s[2][12:end-1])
638+
seekstart(fid)
639+
iter = eachline(fid)
640+
first_lev += 1 # So that next time this loop is executed it will start from the next level
641+
end
642+
else # A sigle level variable
643+
s = split(it, ',')[11:12] # Should contain the "_offset": XX, "_length": YY
644+
off_len[k, 1, 1], off_len[k, 2, 1] = parse(Int, s[1][12:end]), parse(Int, s[2][12:end-1]) # end-1 becase last char is the '}'
645+
end
646+
break # No need to continue, we found what we were looking for
647+
@label next_it
507648
end
508649
end
509-
url = root * "/$date/$tim" * "z" * "/$model/0p25/$stream/$(date)$(tim)0000-$(step)h-$(stream)-$(type).$format"
650+
close(fid)
651+
return off_len
510652
end

test/test_misc.jl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -310,7 +310,7 @@
310310
var = era5vars(["t2m", "skt"]); # "t2m" is the 2m temperature and "skt" is the skin temperature
311311
dt = era5time(hour=10:14);
312312
@test_throws ArgumentError ecmwf(dataset="reanalysis-era5-land", params=[var, dt], pressure=[1000, 900], region=(-10, 0, 30, 45), key="blabla")
313-
ecmwf(:forecast);
313+
ecmwf(:forecast, time=0, type="fc", stream="oper", levels=["1000", "925"]);
314314

315315
# MB-System
316316
println(" MB-System")

0 commit comments

Comments
 (0)