diff --git a/src/extras/weather.jl b/src/extras/weather.jl index bd25ce63e..913a9b6d8 100644 --- a/src/extras/weather.jl +++ b/src/extras/weather.jl @@ -192,10 +192,13 @@ function ecmwf(source::Symbol=:reanalysis; filename="", cb::Bool=false, dataset= bv = BitVector(undef, length(request)) t = collect(request) for k = 1:numel(request) - bv[k] = (t[k] != ' ' && t[k] != '\n') + bv[k] = (t[k] != ' ' && t[k] != '\n' && t[k] != '\r') end + request[end] == ',' && (bv[end] = false) # Remove the last comma if present. It is not a valid JSON syntax t2 = join(t[bv]) if t2[1] == '{' && t2[end] == '}' # We got a JSON request string like the CDSAPI example + (t2[end-1] == ',') && (t2 = replace(t2, ",}" => "}")) # Shit, an extra comma in the end. Illegal JSON + t2 = replace(t2, ",]" => "]") # Python cdsapi examples are full of shits like this that cause JSON errors return "{\"inputs\":" * t2 * "}", "" else dataset = "" # Accept that no dataset name was provided if from clipboard @@ -203,8 +206,10 @@ function ecmwf(source::Symbol=:reanalysis; filename="", cb::Bool=false, dataset= ind2 = findfirst("request={", t2) dataset = string(t2[ind[end]+1:ind2[1]-2]) ind3 = findlast('}', t2) + t2 = t2[ind2[end]:ind3] * "}" end - return "{\"inputs\":" * t2[ind2[end]:ind3] * "}", dataset + t2 = replace(replace(t2, ",]" => "]"), ",}" => "}") # Just in case we have those extra commas + return "{\"inputs\":" * t2, dataset end end @@ -236,7 +241,7 @@ function ecmwf(source::Symbol=:reanalysis; filename="", cb::Bool=false, dataset= if isa(params, Vector) params = join(params, '\n') if (levlist != "") # Pressure levels are provided - pr = getdtp(levlist, "1000"); (pr == "e") && error("Unknown type for 'levlist'") + pr = getdtp(levlist, "1000"); (pr === nothing) && error("Unknown type for 'levlist'") sp = @sprintf("\"pressure_level\": [\"%s\"],\n", pr) sp = replace(sp, "[\"[" => "["); sp = replace(sp, "]\"]" => "]"); # Remove double [[ & ]] params *= sp @@ -326,7 +331,7 @@ function listecmwfvars(source::Symbol=:reanalysis; single::Bool=true, pressure:: ds = sort(d, by=first) header = (what == "fc") ? ["ID","Name","Units"] : ["ID","Long-Name (nc var name)","Name","Units"] align = (what == "fc") ? [:c,:l,:c] : [:c,:l,:l,:c] - t = (what == "fc") ? " (Forecast)" : " (ERA5)"; + t = (what == "fc") ? " (Forecast)" : " (ERA5)"; test && return nothing pretty_table(hcat(collect(keys(ds)),stack(values(ds), dims=1)), header=header, alignment=align, title=title_str * "-level variables" * t, title_alignment=:l, crop=:horizontal) @@ -370,14 +375,14 @@ function era5vars(varID::Union{Vector{String}, Vector{Symbol}}; single::Bool=tru @sprintf("\"variable\": [\n\t\"%s\"\n],", join([d[k][1] for k in _vars], "\",\n\t\"")) end -# ------------------------------------------------------------------------------------------------------------------------ +# --------------------------------------------------------------------------------------------------------------- function helper_ecmwf_vars(single::Bool, pressure::Bool, prefix::String) pressure && (single = false); !single && (pressure = true) dim_char = (single) ? "2" : "3"; title_str = ((single) ? "\nSingle" : "\nPressure") return include(joinpath(dirname(pathof(GMT)), "extras/" * prefix * "vars" * dim_char * "d.jl")), title_str end -# ------------------------------------------------------------------------------------------------------------------------ +# --------------------------------------------------------------------------------------------------------------- """ era5time(; year="", month="", day="", hour="") -> String @@ -406,17 +411,17 @@ var = era5time(year="2023") """ function era5time(; year="", month="", day="", hour="") _y, _m, _d, _h = agora() - yr = getdtp(year, _y); (yr == "e") && error("Unknown type for 'year'") - mo = getdtp(month, _m); (mo == "e") && error("Unknown type for 'month'") - dy = getdtp(day, _d); (dy == "e") && error("Unknown type for 'day'") - hr = getdtp(hour, _h); (hr == "e") && error("Unknown type for 'hour'") + yr = getdtp(year, [_y]); (yr === nothing) && error("Unknown type for 'year'") + mo = getdtp(month, [_m]); (mo === nothing) && error("Unknown type for 'month'") + dy = getdtp(day, [_d]); (dy === nothing) && error("Unknown type for 'day'") + hr = getdtp(hour, [_h]); (hr === nothing) && error("Unknown type for 'hour'") s = @sprintf("\"year\": [\"%s\"],\n\"month\": [\"%s\"],\n\"day\": [\"%s\"],\n\"time\": [\"%s\"],\n", yr, mo, dy, hr) s = replace(s, "[\"[" => "["); s = replace(s, "]\"]" => "]"); # Remove double [[ & ]] return s end -function getdtp(x, def) # used also in ecmwf() to get the pressure levels - (x == "") ? def : (typeof(x) <: OrdinalRange) ? string.(collect(x)) : isa(x, Vector{Int}) ? string.(x) : isa(x, Vector{String}) ? x : isa(x, String) ? [x] : "e" +function getdtp(x, def)::Union{Vector{String}, Nothing} # used also in ecmwf() to get the pressure levels + (x == "") ? def : (typeof(x) <: OrdinalRange) ? string.(collect(x)) : isa(x, Vector{Int}) ? string.(x) : isa(x, Vector{String}) ? x : isa(x, String) ? [x] : nothing end function agora() # Must put this in a separate function because I want to use the keywords year, month, etc @@ -424,18 +429,77 @@ function agora() # Must put this in a separate function because I want to use th return string(year(t)), string(month(t)), string(day(t)), string(hour(t)) end -# ------------------------------------------------------------------------------------------------------------------------ +# --------------------------------------------------------------------------------------------------------------- +#ecmwf(:forecast, vars=["10u", "2t"]) +""" +- `format`: The format in which to save the downloaded data. It can be "grib" or "netcdf". Default is "netcdf". +- `root`: The root URL of the CDS ERA5 dataset. Default is "https://data.ecmwf.int/forecasts". +- `levlist`: The pressure levels to select. It can be a string to select a unique pressure level, + or a vector of strings or Ints to select multiple pressure levels. +- `stream`: The stream to select. It can be one of: "oper", "enfo", "waef", "wave", "scda", "scwv", "mmsf". Default is "oper". +- `type`: The type of forecast to select. +""" function ecmwf_fc(; filename="", levlist="", kw...) + d = KW(kw) + root = "https://data.ecmwf.int/forecasts" + if ((val = find_in_dict(d, [:root])[1]) !== nothing) # + root = string(val)::String + (!contains(root, "https://") && !contains(root, "ecmwf")) && error("The 'root' option must be a valid URL") + end + + # ---------- Stream ------------------------------------------------------ + stream::String = "oper" # Default is high-res + if ((val = find_in_dict(d, [:stream])[1]) !== nothing) # + stream = string(val)::String + !(stream in ["oper", "enfo", "waef", "wave", "scda", "scwv", "mmsf"]) && + error("Unknown 'stream' code. Must be either: \"oper\", \"enfo\" , \"waef\", \"wave\", \"scda\", \"scwv\" or \"mmsf\"") + (stream != "oper") && (model != "ifs") && error("The 'stream' option is not available for the \"ifs\" model") + end + + # ---------- Variables --------------------------------------------------- vars::Union{String, Vector{String}} = "" - ((val = find_in_dict(d, [:variable :var :vars :param])[1]) !== nothing) && (vars = getdtp(val, "")) # <== TEST THAT VARS ARE VALID - step::String = ((val = find_in_dict(d, [:step])[1]) === nothing) ? "0" : string(val)::String; + if ((val = find_in_dict(d, [:variable :var :vars :param])[1]) !== nothing) + vars = getdtp(val, "") + srf = keys(helper_ecmwf_vars(true, false, "fc")[1]) # Single (surface) vars + pl = keys(helper_ecmwf_vars(false, true, "fc")[1]) # Pressure vars + check_var = zeros(Int, numel(vars)) + for k = 1:numel(vars) # Check if var is in srf or pl + if (vars[k] in srf) check_var[k] = 1 # A surface var + elseif (vars[k] in pl) check_var[k] = 2 # A pressure var + else error("Variable \"$(var[k])\" not found in either surface or pressure level datasets") + end + end + end + + # ---------- Levels ----------------------------------------------------- + levels::Union{Vector{String}, Nothing} = nothing + if (levlist != "") # Pressure levels are provided + list_levs = ["1000", "925", "850", "700", "500", "300", "250", "200", "50"] + levels = (levlist == "all") ? list_levs : getdtp(levlist, ""); (levels === nothing) && error("Unknown type for 'levlist'") + else + levels = ["1000"] + end + + step = ((val = find_in_dict(d, [:step :steps])[1]) === nothing) ? ["0"] : getdtp(val, ["0"]) + (step === nothing) && error("Unknown type for 'step'") model::String = ((val = find_in_dict(d, [:model])[1]) === nothing) ? "ifs" : lowercase(string(val)::String) - model = lowercase(model); (model != "ifs" && model != "aifs") && error("Model must be either \"ifs\" or \"aifs\"") + (model != "ifs" && model != "aifs") && error("Model must be either \"ifs\" or \"aifs\"") + (model == "aifs") && (model = "aifs-single") # This the name of the directory in the ECMWF servers + + # ---------- Forecast --------------------------------------------------- + type::String = "fc" # Default is forecast + if ((val = find_in_dict(d, [:type])[1]) !== nothing) # + type = lowercase(string(val)::String) + !(type in ["fc", "ef", "ep", "tf"]) && error("Unknown 'type' code. Must be either: \"fc\", \"ef\", \"ep\" or \"tf\"") + end + format = (type == "tf") ? "bufr" : "grib2" # No choices here + + # ---------- Date ------------------------------------------------------- date::String, tim::String, _h::Int = "", "", 0 if ((val = find_in_dict(d, [:date])[1]) !== nothing) if (isa(val, String)) - data::String = val # To f the Any + data::String = val # To f the Any date = (length(data) == 8) ? data : (length(data) == 10 && contains(data, "-")) ? data[1:4] * data[6:7] * data[9:10] : error("When string, 'date' must be in the form YYYYMMDD") elseif (isa(val, Date) || isa(val, DateTime)) date = Dates.format(val, dateformat"yyyymmdd") @@ -445,10 +509,16 @@ function ecmwf_fc(; filename="", levlist="", kw...) (n > 3) && error(" Forecasts older than 3 days are no longer available") date = Dates.format(now()-Day(n), dateformat"yyyymmdd"); end - else + autodate = false + else # If no date is given, use the current date date = Dates.format(now(), dateformat"yyyymmdd"); _h = hour(now()) + !check_url(root * "/$date") && (date = Dates.format(now()-Day(1), dateformat"yyyymmdd"); _h = 18) # Too soon? Get yesterday's date + # And the 'ifs' dir exists too? + (_h == 18 && model == "ifs" && !check_url(root * "/$date/$tim" * "z/ifs")) && (tim = @sprintf("%.02d", (_h -= 6))) + autodate = true # To know if we need to check the time too. end - + + # ---------- Time ------------------------------------------------------- if ((val = find_in_dict(d, [:time])[1]) !== nothing) # Even if gotten above, a explicit mention to time takes precedence if (isa(val, String)) _h = parse(Int, val)::Int elseif (isa(val, Time)) _h = hour(val)::Int @@ -458,53 +528,125 @@ function ecmwf_fc(; filename="", levlist="", kw...) _h -= rem(_h, 6) # To get only 0, 6, 12 or 18 hours tim = @sprintf("%.02d", _h) - stream::String = "oper" # Default is high-res - if ((val = find_in_dict(d, [:stream])[1]) !== nothing) # - stream = string(val)::String - !(stream in ["oper", "enfo", "waef", "wave", "scda", "scwv", "mmsf"]) && - error("Unknown 'stream' code. Must be either: \"oper\", \"enfo\" , \"waef\", \"wave\", \"scda\", \"scwv\" or \"mmsf\"") - (stream != "oper") && (model != "ifs") && error("The 'stream' option is not available for the \"ifs\" model") + # --------- Now test if this date/time is available in the server -------- + if (!check_url(root * "/$date/$(tim)z")) # Check if the URL exists + if (autodate) + found = false + while (_h > 0 && !found) # Check if the date is available in the server. If not, go back 6 hours and check again. + tim = @sprintf("%.02d", (_h -= 6)) + found = check_url(root * "/$date/$(tim)z") + end + # OK, but does the 'model' have this date/time? If not, go back another 6 hours and check again. + @label try_again + if (!check_url(root * "/$date/$(tim)z/$model")) + tim = @sprintf("%.02d", (_h -= 6)) + (!check_url(root * "/$date/$(tim)z/$(model)/0p25/$stream")) && (tim = @sprintf("%.02d", (_h -= 6))) + (_h < 0) && (@info "Sorry, giving up trying to find the latest dataset for the requested parameters. Must pass a date/time manually."; return nothing) + else # And does the 'stream' exist? + if !check_url(root * "/$date/$(tim)z/$(model)/0p25/$stream") # Ghrrr!, no. Must go back to another time + tim = @sprintf("%.02d", (_h -= 6)) + (!check_url(root * "/$date/$(tim)z/$(model)/0p25/$stream")) && (tim = @sprintf("%.02d", (_h -= 6))) + else # OK, but does the .index file exist? + if !check_url(root * "/$date/$(tim)z/$(model)/0p25/$(stream)/$(date)$(tim)0000-$(step[1])h-$(stream)-$(type).index") # F... + tim = @sprintf("%.02d", (_h -= 6)) + @goto try_again + end + end + end + else + @info "The requested combination date $date and time $tim is not available in the server. Either too soon or too old. \nManually check the $root to see what is available." + return nothing + end end - type::String = "fc" # Default is forecast - if ((val = find_in_dict(d, [:type])[1]) !== nothing) # - type = lowercase(string(val)::String) - (!type in ["fc", "ef", "ep", "tf"]) && error("Unknown 'type' code. Must be either: \"fc\", \"ef\", \"ep\" or \"tf\"") + # All in all, does the base directory exist? (Above checks are not complete) + if (!check_url(root * "/$date/$(tim)z/$(model)/0p25/$stream")) + @info "The requested $root/$date/$(tim)z/$(model)/0p25/$stream directory does not (yet?) exists. You must manually check what's available." + return nothing end - format = (type == "tf") ? "bufr" : "grib2" # No choices here - root = "https://data.ecmwf.int/forecasts" - if ((val = find_in_dict(d, [:root])[1]) !== nothing) # - root = string(val)::String - (!contains(root, "https://") && !contains(root, "ecmwf")) && error("The 'root' option must be a valid URL") + # [ROOT]/[yyyymmdd]/[HH]z/[model]/[resol]/[stream]/[yyyymmdd][HH]0000-[step][U]-[stream]-[type].[format] + if (vars == "") + println(root * "/$date/$(tim)z$(model)/0p25/$stream/$(date)$(tim)0000-$(step[1])h-$(stream)-$(type).$format") + return nothing end - # OK, now we have all the parameters to build the URL - # [ROOT]/[yyyymmdd]/[HH]z/[model]/[resol]/[stream]/[yyyymmdd][HH]0000-[step][U]-[stream]-[type].[format] - if (vars != "") - tmp = tempname() - Downloads.download(root * "/$date/$tim" * "z" * "/$model/0p25/$stream/$(date)$(tim)0000-$(step)h-$(stream)-$(type).index", tmp) - off_len = zeros(Int, length(vars), 2) # Matrix{String}(undef, length(vars), 2) - fid = open(tmp) - iter = eachline(fid) + grdclip_cmd = parse_R(d, "")[1] * " -Sr1/1 -G" + EXT = (((val = find_in_dict(d, [:format])[1]) !== nothing) && val == "grib") ? ".grib" : ".grd" + + tmp = tempname() + for ns = 1:numel(step) # Loop over the steps + Downloads.download(root * "/$date/$(tim)z/$(model)/0p25/$stream/$(date)$(tim)0000-$(step[ns])h-$(stream)-$(type).index", tmp) + + off_len = fish_var_off_len_in_index_file(tmp, vars, levels, check_var) # Find the 'offset' and 'length' of each variable + + url = root * "/$date/$(tim)z/$(model)/0p25/$stream/$(date)$(tim)0000-$(step[ns])h-$(stream)-$(type).$format" + for k = 1:numel(vars) - _var = "\"$(vars[k])\"" - for it in iter - !contains(it, _var) && continue - s = split(it, ',')[11:12] # Should contain the "_offset": XX, "_length": YY - off_len[k, 1], off_len[k, 2] = parse(Int, s[1][12:end]), parse(Int, s[2][12:end-1]) # end-1 becase last char is the '}' - break + @info "Downloading $(vars[k]) from $url and saving to $EXT format" + + if (check_var[k] == 2) # A Pressure level variable + for nl = 1:numel(levels) + start, len, stop = off_len[k,1,nl], off_len[k,2,nl], off_len[k,1,nl] + off_len[k,2,nl] - 1 # This var byte range + fname = vars[k] * "_step$(step[ns])_level$(levels[nl])_$(model)_$(stream)_$(type)_$(date)_$(tim)$(EXT)" # This var fname + (EXT == ".grd") ? gmt("grdclip /vsisubfile/$(start)_$(len)" * ",/vsicurl/" * url * grdclip_cmd * fname) : + run(`curl --show-error --range $(start)-$(stop) $url -o $fname`) + end + else + start, len, stop = off_len[k,1,1], off_len[k,2,1], off_len[k,1,1] + off_len[k,2,1] - 1 # This var byte range + fname = vars[k] * "_step$(step[ns])_$(model)_$(stream)_$(type)_$(date)_$(tim)$(EXT)" # This var fname + (EXT == ".grd") ? gmt("grdclip /vsisubfile/$(start)_$(len),/vsicurl/" * url * grdclip_cmd * fname) : + run(`curl --show-error --range $(start)-$(stop) $url -o $fname`) end + #run(`curl --show-error --range $(start)-$(stop) $url -o $fname`) + #gdaltranslate("/vsisubfile/$(start)_$(len)" * ",/vsicurl/" * url, save=vars[k] * "_step$(step[ns]).nc") end - close(fid) - rm(tmp) - url = root * "/$date/$tim" * "z" * "/$model/0p25/$stream/$(date)$(tim)0000-$(step)h-$(stream)-$(type).$format" - for k = 1:numel(vars) - fname = vars[k] * ".grib2" # This var file name - start, stop = off_len[k, 1], off_len[k, 1] + off_len[k, 2] - 1 # This var bite range - @info "Downloading $(fname) from $url" - run(`curl --show-error --range $(start)-$(stop) $url -o $fname`) + end + rm(tmp) +end + +function check_url(url::AbstractString)::Bool + # Check if the URL exists. Returns true if it does, false otherwise. + ret = true + try # Need a try catch because it screems when it fails + open(`curl -s --head --silent --fail $url`, "r", stdout) do io + split(readlines(io)[1], ',') + end + catch; ret = false + end + return ret +end + +function fish_var_off_len_in_index_file(fname, vars, levels, check_var) + # Find the 'offset' and 'length' of each variable in 'vars' in the index file 'fname' + # Return a 3D array of size (length(vars), 2, length(levels)) with the offsets and lengths in col1 and col2 + # The 3rd dimension is for the case where a variable spans over multiple levels. + off_len = zeros(Int, length(vars), 2, (levels === nothing) ? 1 : numel(levels)) + fid = open(fname) + iter = eachline(fid) + for k = 1:numel(vars) # Scan the index file to get the offset and length of each variable + first_lev = 1 + _var = "\"$(vars[k])\"" + for it in iter # Loop over the lines in the index file + !contains(it, _var) && continue + if (check_var[k] == 2) # A Pressure level variable + for lev = first_lev:numel(levels) + this_lev = "\"$(levels[lev])\"" + !contains(it, "\"levelist\": $this_lev") && @goto next_it # Not found, read next line + s = split(it, ',')[12:13] # Should contain the "_offset": XX, "_length": YY + off_len[k, 1, lev], off_len[k, 2, lev] = parse(Int, s[1][12:end]), parse(Int, s[2][12:end-1]) + seekstart(fid) + iter = eachline(fid) + first_lev += 1 # So that next time this loop is executed it will start from the next level + end + else # A sigle level variable + s = split(it, ',')[11:12] # Should contain the "_offset": XX, "_length": YY + off_len[k, 1, 1], off_len[k, 2, 1] = parse(Int, s[1][12:end]), parse(Int, s[2][12:end-1]) # end-1 becase last char is the '}' + end + break # No need to continue, we found what we were looking for + @label next_it end end - url = root * "/$date/$tim" * "z" * "/$model/0p25/$stream/$(date)$(tim)0000-$(step)h-$(stream)-$(type).$format" + close(fid) + return off_len end diff --git a/test/test_misc.jl b/test/test_misc.jl index 81eb04075..b451451b0 100644 --- a/test/test_misc.jl +++ b/test/test_misc.jl @@ -310,7 +310,7 @@ var = era5vars(["t2m", "skt"]); # "t2m" is the 2m temperature and "skt" is the skin temperature dt = era5time(hour=10:14); @test_throws ArgumentError ecmwf(dataset="reanalysis-era5-land", params=[var, dt], pressure=[1000, 900], region=(-10, 0, 30, 45), key="blabla") - ecmwf(:forecast); + ecmwf(:forecast, time=0, type="fc", stream="oper", levels=["1000", "925"]); # MB-System println(" MB-System")