Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
256 changes: 199 additions & 57 deletions src/extras/weather.jl
Original file line number Diff line number Diff line change
Expand Up @@ -192,19 +192,24 @@ function ecmwf(source::Symbol=:reanalysis; filename="", cb::Bool=false, dataset=
bv = BitVector(undef, length(request))
t = collect(request)
for k = 1:numel(request)
bv[k] = (t[k] != ' ' && t[k] != '\n')
bv[k] = (t[k] != ' ' && t[k] != '\n' && t[k] != '\r')
end
request[end] == ',' && (bv[end] = false) # Remove the last comma if present. It is not a valid JSON syntax
t2 = join(t[bv])
if t2[1] == '{' && t2[end] == '}' # We got a JSON request string like the CDSAPI example
(t2[end-1] == ',') && (t2 = replace(t2, ",}" => "}")) # Shit, an extra comma in the end. Illegal JSON
t2 = replace(t2, ",]" => "]") # Python cdsapi examples are full of shits like this that cause JSON errors
return "{\"inputs\":" * t2 * "}", ""
else
dataset = "" # Accept that no dataset name was provided if from clipboard
if ((ind = findfirst("dataset=\"", t2)) !== nothing) # "import cdsapi\n\ndataset = \"reanalysis-era5-land\"\nrequest = {\n
ind2 = findfirst("request={", t2)
dataset = string(t2[ind[end]+1:ind2[1]-2])
ind3 = findlast('}', t2)
t2 = t2[ind2[end]:ind3] * "}"
end
return "{\"inputs\":" * t2[ind2[end]:ind3] * "}", dataset
t2 = replace(replace(t2, ",]" => "]"), ",}" => "}") # Just in case we have those extra commas
return "{\"inputs\":" * t2, dataset
end
end

Expand Down Expand Up @@ -236,7 +241,7 @@ function ecmwf(source::Symbol=:reanalysis; filename="", cb::Bool=false, dataset=
if isa(params, Vector)
params = join(params, '\n')
if (levlist != "") # Pressure levels are provided
pr = getdtp(levlist, "1000"); (pr == "e") && error("Unknown type for 'levlist'")
pr = getdtp(levlist, "1000"); (pr === nothing) && error("Unknown type for 'levlist'")
sp = @sprintf("\"pressure_level\": [\"%s\"],\n", pr)
sp = replace(sp, "[\"[" => "["); sp = replace(sp, "]\"]" => "]"); # Remove double [[ & ]]
params *= sp
Expand Down Expand Up @@ -326,7 +331,7 @@ function listecmwfvars(source::Symbol=:reanalysis; single::Bool=true, pressure::
ds = sort(d, by=first)
header = (what == "fc") ? ["ID","Name","Units"] : ["ID","Long-Name (nc var name)","Name","Units"]
align = (what == "fc") ? [:c,:l,:c] : [:c,:l,:l,:c]
t = (what == "fc") ? " (Forecast)" : " (ERA5)";
t = (what == "fc") ? " (Forecast)" : " (ERA5)";
test && return nothing
pretty_table(hcat(collect(keys(ds)),stack(values(ds), dims=1)), header=header, alignment=align,
title=title_str * "-level variables" * t, title_alignment=:l, crop=:horizontal)
Expand Down Expand Up @@ -370,14 +375,14 @@ function era5vars(varID::Union{Vector{String}, Vector{Symbol}}; single::Bool=tru
@sprintf("\"variable\": [\n\t\"%s\"\n],", join([d[k][1] for k in _vars], "\",\n\t\""))
end

# ------------------------------------------------------------------------------------------------------------------------
# ---------------------------------------------------------------------------------------------------------------
function helper_ecmwf_vars(single::Bool, pressure::Bool, prefix::String)
pressure && (single = false); !single && (pressure = true)
dim_char = (single) ? "2" : "3"; title_str = ((single) ? "\nSingle" : "\nPressure")
return include(joinpath(dirname(pathof(GMT)), "extras/" * prefix * "vars" * dim_char * "d.jl")), title_str
end

# ------------------------------------------------------------------------------------------------------------------------
# ---------------------------------------------------------------------------------------------------------------
"""
era5time(; year="", month="", day="", hour="") -> String

Expand Down Expand Up @@ -406,36 +411,95 @@ var = era5time(year="2023")
"""
function era5time(; year="", month="", day="", hour="")
_y, _m, _d, _h = agora()
yr = getdtp(year, _y); (yr == "e") && error("Unknown type for 'year'")
mo = getdtp(month, _m); (mo == "e") && error("Unknown type for 'month'")
dy = getdtp(day, _d); (dy == "e") && error("Unknown type for 'day'")
hr = getdtp(hour, _h); (hr == "e") && error("Unknown type for 'hour'")
yr = getdtp(year, [_y]); (yr === nothing) && error("Unknown type for 'year'")
mo = getdtp(month, [_m]); (mo === nothing) && error("Unknown type for 'month'")
dy = getdtp(day, [_d]); (dy === nothing) && error("Unknown type for 'day'")
hr = getdtp(hour, [_h]); (hr === nothing) && error("Unknown type for 'hour'")
s = @sprintf("\"year\": [\"%s\"],\n\"month\": [\"%s\"],\n\"day\": [\"%s\"],\n\"time\": [\"%s\"],\n", yr, mo, dy, hr)
s = replace(s, "[\"[" => "["); s = replace(s, "]\"]" => "]"); # Remove double [[ & ]]
return s
end

function getdtp(x, def) # used also in ecmwf() to get the pressure levels
(x == "") ? def : (typeof(x) <: OrdinalRange) ? string.(collect(x)) : isa(x, Vector{Int}) ? string.(x) : isa(x, Vector{String}) ? x : isa(x, String) ? [x] : "e"
function getdtp(x, def)::Union{Vector{String}, Nothing} # used also in ecmwf() to get the pressure levels
(x == "") ? def : (typeof(x) <: OrdinalRange) ? string.(collect(x)) : isa(x, Vector{Int}) ? string.(x) : isa(x, Vector{String}) ? x : isa(x, String) ? [x] : nothing
end

function agora() # Must put this in a separate function because I want to use the keywords year, month, etc
t = now()
return string(year(t)), string(month(t)), string(day(t)), string(hour(t))
end

# ------------------------------------------------------------------------------------------------------------------------
# ---------------------------------------------------------------------------------------------------------------
#ecmwf(:forecast, vars=["10u", "2t"])
"""
- `format`: The format in which to save the downloaded data. It can be "grib" or "netcdf". Default is "netcdf".
- `root`: The root URL of the CDS ERA5 dataset. Default is "https://data.ecmwf.int/forecasts".
- `levlist`: The pressure levels to select. It can be a string to select a unique pressure level,
or a vector of strings or Ints to select multiple pressure levels.
- `stream`: The stream to select. It can be one of: "oper", "enfo", "waef", "wave", "scda", "scwv", "mmsf". Default is "oper".
- `type`: The type of forecast to select.
"""
function ecmwf_fc(; filename="", levlist="", kw...)

d = KW(kw)
root = "https://data.ecmwf.int/forecasts"
if ((val = find_in_dict(d, [:root])[1]) !== nothing) #
root = string(val)::String
(!contains(root, "https://") && !contains(root, "ecmwf")) && error("The 'root' option must be a valid URL")
end

# ---------- Stream ------------------------------------------------------
stream::String = "oper" # Default is high-res
if ((val = find_in_dict(d, [:stream])[1]) !== nothing) #
stream = string(val)::String
!(stream in ["oper", "enfo", "waef", "wave", "scda", "scwv", "mmsf"]) &&
error("Unknown 'stream' code. Must be either: \"oper\", \"enfo\" , \"waef\", \"wave\", \"scda\", \"scwv\" or \"mmsf\"")
(stream != "oper") && (model != "ifs") && error("The 'stream' option is not available for the \"ifs\" model")
end

# ---------- Variables ---------------------------------------------------
vars::Union{String, Vector{String}} = ""
((val = find_in_dict(d, [:variable :var :vars :param])[1]) !== nothing) && (vars = getdtp(val, "")) # <== TEST THAT VARS ARE VALID
step::String = ((val = find_in_dict(d, [:step])[1]) === nothing) ? "0" : string(val)::String;
if ((val = find_in_dict(d, [:variable :var :vars :param])[1]) !== nothing)
vars = getdtp(val, "")
srf = keys(helper_ecmwf_vars(true, false, "fc")[1]) # Single (surface) vars
pl = keys(helper_ecmwf_vars(false, true, "fc")[1]) # Pressure vars
check_var = zeros(Int, numel(vars))
for k = 1:numel(vars) # Check if var is in srf or pl
if (vars[k] in srf) check_var[k] = 1 # A surface var
elseif (vars[k] in pl) check_var[k] = 2 # A pressure var
else error("Variable \"$(var[k])\" not found in either surface or pressure level datasets")
end
end
end

# ---------- Levels -----------------------------------------------------
levels::Union{Vector{String}, Nothing} = nothing
if (levlist != "") # Pressure levels are provided
list_levs = ["1000", "925", "850", "700", "500", "300", "250", "200", "50"]
levels = (levlist == "all") ? list_levs : getdtp(levlist, ""); (levels === nothing) && error("Unknown type for 'levlist'")
else
levels = ["1000"]
end

step = ((val = find_in_dict(d, [:step :steps])[1]) === nothing) ? ["0"] : getdtp(val, ["0"])
(step === nothing) && error("Unknown type for 'step'")
model::String = ((val = find_in_dict(d, [:model])[1]) === nothing) ? "ifs" : lowercase(string(val)::String)
model = lowercase(model); (model != "ifs" && model != "aifs") && error("Model must be either \"ifs\" or \"aifs\"")
(model != "ifs" && model != "aifs") && error("Model must be either \"ifs\" or \"aifs\"")
(model == "aifs") && (model = "aifs-single") # This the name of the directory in the ECMWF servers

# ---------- Forecast ---------------------------------------------------
type::String = "fc" # Default is forecast
if ((val = find_in_dict(d, [:type])[1]) !== nothing) #
type = lowercase(string(val)::String)
!(type in ["fc", "ef", "ep", "tf"]) && error("Unknown 'type' code. Must be either: \"fc\", \"ef\", \"ep\" or \"tf\"")
end
format = (type == "tf") ? "bufr" : "grib2" # No choices here

# ---------- Date -------------------------------------------------------
date::String, tim::String, _h::Int = "", "", 0
if ((val = find_in_dict(d, [:date])[1]) !== nothing)
if (isa(val, String))
data::String = val # To f the Any
data::String = val # To f the Any
date = (length(data) == 8) ? data : (length(data) == 10 && contains(data, "-")) ? data[1:4] * data[6:7] * data[9:10] : error("When string, 'date' must be in the form YYYYMMDD")
elseif (isa(val, Date) || isa(val, DateTime))
date = Dates.format(val, dateformat"yyyymmdd")
Expand All @@ -445,10 +509,16 @@ function ecmwf_fc(; filename="", levlist="", kw...)
(n > 3) && error(" Forecasts older than 3 days are no longer available")
date = Dates.format(now()-Day(n), dateformat"yyyymmdd");
end
else
autodate = false
else # If no date is given, use the current date
date = Dates.format(now(), dateformat"yyyymmdd"); _h = hour(now())
!check_url(root * "/$date") && (date = Dates.format(now()-Day(1), dateformat"yyyymmdd"); _h = 18) # Too soon? Get yesterday's date
# And the 'ifs' dir exists too?
(_h == 18 && model == "ifs" && !check_url(root * "/$date/$tim" * "z/ifs")) && (tim = @sprintf("%.02d", (_h -= 6)))
autodate = true # To know if we need to check the time too.
end


# ---------- Time -------------------------------------------------------
if ((val = find_in_dict(d, [:time])[1]) !== nothing) # Even if gotten above, a explicit mention to time takes precedence
if (isa(val, String)) _h = parse(Int, val)::Int
elseif (isa(val, Time)) _h = hour(val)::Int
Expand All @@ -458,53 +528,125 @@ function ecmwf_fc(; filename="", levlist="", kw...)
_h -= rem(_h, 6) # To get only 0, 6, 12 or 18 hours
tim = @sprintf("%.02d", _h)

stream::String = "oper" # Default is high-res
if ((val = find_in_dict(d, [:stream])[1]) !== nothing) #
stream = string(val)::String
!(stream in ["oper", "enfo", "waef", "wave", "scda", "scwv", "mmsf"]) &&
error("Unknown 'stream' code. Must be either: \"oper\", \"enfo\" , \"waef\", \"wave\", \"scda\", \"scwv\" or \"mmsf\"")
(stream != "oper") && (model != "ifs") && error("The 'stream' option is not available for the \"ifs\" model")
# --------- Now test if this date/time is available in the server --------
if (!check_url(root * "/$date/$(tim)z")) # Check if the URL exists
if (autodate)
found = false
while (_h > 0 && !found) # Check if the date is available in the server. If not, go back 6 hours and check again.
tim = @sprintf("%.02d", (_h -= 6))
found = check_url(root * "/$date/$(tim)z")
end
# OK, but does the 'model' have this date/time? If not, go back another 6 hours and check again.
@label try_again
if (!check_url(root * "/$date/$(tim)z/$model"))
tim = @sprintf("%.02d", (_h -= 6))
(!check_url(root * "/$date/$(tim)z/$(model)/0p25/$stream")) && (tim = @sprintf("%.02d", (_h -= 6)))
(_h < 0) && (@info "Sorry, giving up trying to find the latest dataset for the requested parameters. Must pass a date/time manually."; return nothing)
else # And does the 'stream' exist?
if !check_url(root * "/$date/$(tim)z/$(model)/0p25/$stream") # Ghrrr!, no. Must go back to another time
tim = @sprintf("%.02d", (_h -= 6))
(!check_url(root * "/$date/$(tim)z/$(model)/0p25/$stream")) && (tim = @sprintf("%.02d", (_h -= 6)))
else # OK, but does the .index file exist?
if !check_url(root * "/$date/$(tim)z/$(model)/0p25/$(stream)/$(date)$(tim)0000-$(step[1])h-$(stream)-$(type).index") # F...
tim = @sprintf("%.02d", (_h -= 6))
@goto try_again
end
end
end
else
@info "The requested combination date $date and time $tim is not available in the server. Either too soon or too old. \nManually check the $root to see what is available."
return nothing
end
end

type::String = "fc" # Default is forecast
if ((val = find_in_dict(d, [:type])[1]) !== nothing) #
type = lowercase(string(val)::String)
(!type in ["fc", "ef", "ep", "tf"]) && error("Unknown 'type' code. Must be either: \"fc\", \"ef\", \"ep\" or \"tf\"")
# All in all, does the base directory exist? (Above checks are not complete)
if (!check_url(root * "/$date/$(tim)z/$(model)/0p25/$stream"))
@info "The requested $root/$date/$(tim)z/$(model)/0p25/$stream directory does not (yet?) exists. You must manually check what's available."
return nothing
end
format = (type == "tf") ? "bufr" : "grib2" # No choices here

root = "https://data.ecmwf.int/forecasts"
if ((val = find_in_dict(d, [:root])[1]) !== nothing) #
root = string(val)::String
(!contains(root, "https://") && !contains(root, "ecmwf")) && error("The 'root' option must be a valid URL")
# [ROOT]/[yyyymmdd]/[HH]z/[model]/[resol]/[stream]/[yyyymmdd][HH]0000-[step][U]-[stream]-[type].[format]
if (vars == "")
println(root * "/$date/$(tim)z$(model)/0p25/$stream/$(date)$(tim)0000-$(step[1])h-$(stream)-$(type).$format")
return nothing
end

# OK, now we have all the parameters to build the URL
# [ROOT]/[yyyymmdd]/[HH]z/[model]/[resol]/[stream]/[yyyymmdd][HH]0000-[step][U]-[stream]-[type].[format]
if (vars != "")
tmp = tempname()
Downloads.download(root * "/$date/$tim" * "z" * "/$model/0p25/$stream/$(date)$(tim)0000-$(step)h-$(stream)-$(type).index", tmp)
off_len = zeros(Int, length(vars), 2) # Matrix{String}(undef, length(vars), 2)
fid = open(tmp)
iter = eachline(fid)
grdclip_cmd = parse_R(d, "")[1] * " -Sr1/1 -G"
EXT = (((val = find_in_dict(d, [:format])[1]) !== nothing) && val == "grib") ? ".grib" : ".grd"

tmp = tempname()
for ns = 1:numel(step) # Loop over the steps
Downloads.download(root * "/$date/$(tim)z/$(model)/0p25/$stream/$(date)$(tim)0000-$(step[ns])h-$(stream)-$(type).index", tmp)

off_len = fish_var_off_len_in_index_file(tmp, vars, levels, check_var) # Find the 'offset' and 'length' of each variable

url = root * "/$date/$(tim)z/$(model)/0p25/$stream/$(date)$(tim)0000-$(step[ns])h-$(stream)-$(type).$format"

for k = 1:numel(vars)
_var = "\"$(vars[k])\""
for it in iter
!contains(it, _var) && continue
s = split(it, ',')[11:12] # Should contain the "_offset": XX, "_length": YY
off_len[k, 1], off_len[k, 2] = parse(Int, s[1][12:end]), parse(Int, s[2][12:end-1]) # end-1 becase last char is the '}'
break
@info "Downloading $(vars[k]) from $url and saving to $EXT format"

if (check_var[k] == 2) # A Pressure level variable
for nl = 1:numel(levels)
start, len, stop = off_len[k,1,nl], off_len[k,2,nl], off_len[k,1,nl] + off_len[k,2,nl] - 1 # This var byte range
fname = vars[k] * "_step$(step[ns])_level$(levels[nl])_$(model)_$(stream)_$(type)_$(date)_$(tim)$(EXT)" # This var fname
(EXT == ".grd") ? gmt("grdclip /vsisubfile/$(start)_$(len)" * ",/vsicurl/" * url * grdclip_cmd * fname) :
run(`curl --show-error --range $(start)-$(stop) $url -o $fname`)
end
else
start, len, stop = off_len[k,1,1], off_len[k,2,1], off_len[k,1,1] + off_len[k,2,1] - 1 # This var byte range
fname = vars[k] * "_step$(step[ns])_$(model)_$(stream)_$(type)_$(date)_$(tim)$(EXT)" # This var fname
(EXT == ".grd") ? gmt("grdclip /vsisubfile/$(start)_$(len),/vsicurl/" * url * grdclip_cmd * fname) :
run(`curl --show-error --range $(start)-$(stop) $url -o $fname`)
end
#run(`curl --show-error --range $(start)-$(stop) $url -o $fname`)
#gdaltranslate("/vsisubfile/$(start)_$(len)" * ",/vsicurl/" * url, save=vars[k] * "_step$(step[ns]).nc")
end
close(fid)
rm(tmp)
url = root * "/$date/$tim" * "z" * "/$model/0p25/$stream/$(date)$(tim)0000-$(step)h-$(stream)-$(type).$format"
for k = 1:numel(vars)
fname = vars[k] * ".grib2" # This var file name
start, stop = off_len[k, 1], off_len[k, 1] + off_len[k, 2] - 1 # This var bite range
@info "Downloading $(fname) from $url"
run(`curl --show-error --range $(start)-$(stop) $url -o $fname`)
end
rm(tmp)
end

function check_url(url::AbstractString)::Bool
# Check if the URL exists. Returns true if it does, false otherwise.
ret = true
try # Need a try catch because it screems when it fails
open(`curl -s --head --silent --fail $url`, "r", stdout) do io
split(readlines(io)[1], ',')
end
catch; ret = false
end
return ret
end

function fish_var_off_len_in_index_file(fname, vars, levels, check_var)
# Find the 'offset' and 'length' of each variable in 'vars' in the index file 'fname'
# Return a 3D array of size (length(vars), 2, length(levels)) with the offsets and lengths in col1 and col2
# The 3rd dimension is for the case where a variable spans over multiple levels.
off_len = zeros(Int, length(vars), 2, (levels === nothing) ? 1 : numel(levels))
fid = open(fname)
iter = eachline(fid)
for k = 1:numel(vars) # Scan the index file to get the offset and length of each variable
first_lev = 1
_var = "\"$(vars[k])\""
for it in iter # Loop over the lines in the index file
!contains(it, _var) && continue
if (check_var[k] == 2) # A Pressure level variable
for lev = first_lev:numel(levels)
this_lev = "\"$(levels[lev])\""
!contains(it, "\"levelist\": $this_lev") && @goto next_it # Not found, read next line
s = split(it, ',')[12:13] # Should contain the "_offset": XX, "_length": YY
off_len[k, 1, lev], off_len[k, 2, lev] = parse(Int, s[1][12:end]), parse(Int, s[2][12:end-1])
seekstart(fid)
iter = eachline(fid)
first_lev += 1 # So that next time this loop is executed it will start from the next level
end
else # A sigle level variable
s = split(it, ',')[11:12] # Should contain the "_offset": XX, "_length": YY
off_len[k, 1, 1], off_len[k, 2, 1] = parse(Int, s[1][12:end]), parse(Int, s[2][12:end-1]) # end-1 becase last char is the '}'
end
break # No need to continue, we found what we were looking for
@label next_it
end
end
url = root * "/$date/$tim" * "z" * "/$model/0p25/$stream/$(date)$(tim)0000-$(step)h-$(stream)-$(type).$format"
close(fid)
return off_len
end
2 changes: 1 addition & 1 deletion test/test_misc.jl
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@
var = era5vars(["t2m", "skt"]); # "t2m" is the 2m temperature and "skt" is the skin temperature
dt = era5time(hour=10:14);
@test_throws ArgumentError ecmwf(dataset="reanalysis-era5-land", params=[var, dt], pressure=[1000, 900], region=(-10, 0, 30, 45), key="blabla")
ecmwf(:forecast);
ecmwf(:forecast, time=0, type="fc", stream="oper", levels=["1000", "925"]);

# MB-System
println(" MB-System")
Expand Down
Loading