Skip to content

Commit b6aea71

Browse files
committed
Improve parallel scenario runs / result reads
1 parent 1cf4308 commit b6aea71

7 files changed

Lines changed: 462 additions & 77 deletions

DESCRIPTION

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,21 +18,23 @@ Imports:
1818
dplyr,
1919
forcats,
2020
fs,
21+
future,
22+
future.apply,
2123
ggplot2,
2224
hdf5r,
2325
kwb.event,
2426
kwb.utils,
2527
lubridate,
2628
magrittr,
29+
parallel,
2730
plotly,
31+
progressr,
2832
purrr,
2933
stringr,
3034
tibble
3135
Suggests:
3236
covr,
3337
DT,
34-
future,
35-
future.apply,
3638
knitr,
3739
rmarkdown
3840
VignetteBuilder:

NAMESPACE

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ export(add_overflow_events_and_waterbalance)
55
export(find_single_param_variations)
66
export(get_simulation_results_all)
77
export(get_simulation_results_optim)
8+
export(get_simulation_results_optim_parallel)
89
export(h5_ensure_dataset)
910
export(h5_ensure_datasets_from_values)
1011
export(h5_read_values)
@@ -20,6 +21,7 @@ export(read_hdf5_scalars)
2021
export(read_hdf5_timeseries)
2122
export(read_raindrop_errors)
2223
export(run_model)
24+
export(run_scenarios)
2325
importFrom(dplyr,"%>%")
2426
importFrom(dplyr,across)
2527
importFrom(dplyr,all_of)
@@ -45,6 +47,10 @@ importFrom(fs,dir_create)
4547
importFrom(fs,file_copy)
4648
importFrom(fs,file_exists)
4749
importFrom(fs,path_abs)
50+
importFrom(future,multisession)
51+
importFrom(future,plan)
52+
importFrom(future,sequential)
53+
importFrom(future.apply,future_lapply)
4854
importFrom(ggplot2,aes)
4955
importFrom(ggplot2,coord_cartesian)
5056
importFrom(ggplot2,element_text)
@@ -72,7 +78,14 @@ importFrom(kwb.utils,catAndRun)
7278
importFrom(kwb.utils,resolve)
7379
importFrom(lubridate,as_datetime)
7480
importFrom(magrittr,"%>%")
81+
importFrom(parallel,detectCores)
7582
importFrom(plotly,ggplotly)
83+
importFrom(progressr,handler_cli)
84+
importFrom(progressr,handler_rstudio)
85+
importFrom(progressr,handler_txtprogressbar)
86+
importFrom(progressr,handlers)
87+
importFrom(progressr,progressor)
88+
importFrom(progressr,with_progress)
7689
importFrom(purrr,map_chr)
7790
importFrom(purrr,map_dfr)
7891
importFrom(rlang,.data)
Lines changed: 186 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
#' Read Raindrop optimisation simulation results from HDF5
#' (parallel via future.apply + progress)
#'
#' Parallel variant of \code{get_simulation_results_optim()} using
#' \code{future.apply::future_lapply()} including progress reporting
#' via \code{progressr}.
#'
#' For every entry of \code{simulation_names} the paths in \code{path_list}
#' are resolved with \code{dir_target} set to that name. If either of the two
#' resolved result files (\code{path_results_hdf5_element},
#' \code{path_results_hdf5_flaeche}) does not exist, the entry is \code{NULL}.
#' Otherwise both files are opened read-only and the groups \code{Metainfo},
#' \code{Raten}, \code{Wasserbilanz} and \code{Zustandsvariablen} are read.
#'
#' Progress is reported through \code{progressr::with_progress()} using the
#' progress handlers currently configured by the user. The function does not
#' modify the (global) progressr handler configuration.
#'
#' @inheritParams get_simulation_results_optim
#' @param workers Optional number of parallel workers. If not NULL,
#'   a temporary \code{future::multisession} plan is set and the previous
#'   plan is restored when the function exits. If NULL, the currently active
#'   future plan is used.
#' @param show_progress Logical (default TRUE). If TRUE, progress is
#'   signalled once per simulation via \code{progressr}.
#' @param future_seed Passed to \code{future.apply::future_lapply()} as
#'   \code{future.seed}.
#'
#' @return Named list (see \code{\link{get_simulation_results_optim}}) with
#'   one element per entry of \code{simulation_names}. Each element is either
#'   \code{NULL} (result files missing) or a list with the elements
#'   \code{element} and \code{connected_area}, each containing \code{meta},
#'   \code{rates}, \code{water_balance} and \code{states}.
#'
#' @export
#' @importFrom stats setNames
#' @importFrom hdf5r H5File
#' @importFrom future plan multisession
#' @importFrom future.apply future_lapply
#' @importFrom progressr with_progress progressor handlers
#' @importFrom kwb.utils resolve
get_simulation_results_optim_parallel <- function(paths,
                                                  path_list,
                                                  simulation_names,
                                                  debug = TRUE,
                                                  workers = NULL,
                                                  show_progress = TRUE,
                                                  future_seed = TRUE) {

  if (!requireNamespace("future.apply", quietly = TRUE)) {
    stop("Package 'future.apply' is required.")
  }
  if (!requireNamespace("future", quietly = TRUE)) {
    stop("Package 'future' is required.")
  }
  if (show_progress && !requireNamespace("progressr", quietly = TRUE)) {
    stop("Package 'progressr' required for progress reporting.")
  }

  n <- length(simulation_names)

  message(sprintf(
    "Reading results files in parallel ('%s') for %d model runs%s",
    paste0(c(paths$file_results_hdf5_element,
             paths$file_results_hdf5_flaeche), collapse = "|"),
    n,
    if (!is.null(workers)) sprintf(" (workers=%d)", workers) else ""
  ))

  # Optional temporary future plan; the previous plan is restored on exit
  if (!is.null(workers)) {
    old_plan <- future::plan()
    on.exit(future::plan(old_plan), add = TRUE)
    future::plan(future::multisession, workers = workers)
  }

  # Read the four result groups from one open HDF5 file
  read_groups <- function(h5) {
    list(
      meta = read_hdf5_scalars(h5[["Metainfo"]], numeric_only = FALSE),
      rates = read_hdf5_timeseries(h5[["Raten"]]),
      water_balance = read_hdf5_scalars(h5[["Wasserbilanz"]]),
      states = read_hdf5_timeseries(h5[["Zustandsvariablen"]])
    )
  }

  # Read the results of the i-th simulation. 'progress' is either NULL or a
  # progressr progressor that is signalled once per simulation.
  read_one <- function(i, progress = NULL) {

    s_name <- simulation_names[[i]]

    if (!is.null(progress)) {
      progress(sprintf("Reading %s (%d/%d)", s_name, i, n))
    }

    run_paths <- kwb.utils::resolve(path_list, dir_target = s_name)

    hdf5_paths <- c(
      run_paths$path_results_hdf5_element,
      run_paths$path_results_hdf5_flaeche
    )

    if (!all(file.exists(hdf5_paths))) {
      if (isTRUE(debug)) {
        message(sprintf("Missing files for %s -> returning NULL", s_name))
      }
      return(NULL)
    }

    if (isTRUE(debug)) {
      message(sprintf("Reading HDF5 for %s (%s)",
                      s_name, run_paths$dir_target_output))
    }

    # Register the cleanup directly after each open so that the first file
    # is closed even if opening the second one fails.
    res_hdf5_element <- hdf5r::H5File$new(
      run_paths$path_results_hdf5_element, mode = "r"
    )
    on.exit(try(res_hdf5_element$close_all(), silent = TRUE), add = TRUE)

    res_hdf5_flaeche <- hdf5r::H5File$new(
      run_paths$path_results_hdf5_flaeche, mode = "r"
    )
    on.exit(try(res_hdf5_flaeche$close_all(), silent = TRUE), add = TRUE)

    list(
      element = read_groups(res_hdf5_element),
      connected_area = read_groups(res_hdf5_flaeche)
    )
  }

  res_list <- if (show_progress) {
    # with_progress() uses the handlers configured by the user; no global
    # handler is registered here (that is reserved for the end user).
    progressr::with_progress({
      p <- progressr::progressor(steps = n)
      future.apply::future_lapply(
        X = seq_along(simulation_names),
        FUN = read_one,
        progress = p,
        future.seed = future_seed
      )
    })
  } else {
    future.apply::future_lapply(
      X = seq_along(simulation_names),
      FUN = read_one,
      future.seed = future_seed
    )
  }

  stats::setNames(res_list, simulation_names)
}

R/run_scenarios.R

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
#' Run scenarios (parallel or sequential) with a user-supplied worker function
#'
#' Executes scenarios by applying \code{run_one_scenario} to each element of
#' \code{indices}. Supports parallel execution via \pkg{future.apply} and
#' sequential execution for debugging. Optionally shows progress via
#' \pkg{progressr}.
#'
#' The function temporarily sets a future plan
#' (\code{future::multisession} if \code{parallel = TRUE},
#' \code{future::sequential} otherwise) and restores the previously active
#' plan when it exits, so the caller's future configuration is not changed.
#'
#' Progress bars (via \pkg{progressr}) are only used if both
#' \code{show_progress} and \code{parallel} are \code{TRUE}. In sequential
#' mode with \code{show_progress = TRUE} a message is emitted per scenario
#' instead.
#'
#' @param indices Vector. Scenario identifiers to iterate over (often integer
#'   row indices).
#' @param run_one_scenario Function. Worker function with signature
#'   \code{function(x, timestep_hours, debug, ...)}. Must accept the arguments
#'   \code{timestep_hours} and \code{debug}.
#' @param timestep_hours Numeric. Time step (hours) forwarded to
#'   \code{run_one_scenario}.
#' @param debug Logical. Debug flag forwarded to \code{run_one_scenario}.
#' @param ... Additional arguments forwarded to \code{run_one_scenario}.
#' @param parallel Logical. If \code{TRUE}, use \code{future.apply::future_lapply}.
#'   If \code{FALSE}, use base \code{lapply}.
#' @param workers Integer. Number of workers when \code{parallel = TRUE}.
#'   Defaults to \code{parallel::detectCores()}. If this is a numeric
#'   \code{NA} (number of cores could not be determined), one worker is used.
#' @param show_progress Logical. If \code{TRUE}, show progress.
#' @param progress_handler Character. Progress handler key. One of
#'   \code{"txtprogressbar"}, \code{"rstudio"}, \code{"cli"}. An unknown key
#'   raises an error, even if no progress is shown.
#'
#' @return A list with one element per \code{indices} entry containing the
#'   return values of \code{run_one_scenario}.
#'
#' @importFrom future plan multisession sequential
#' @importFrom future.apply future_lapply
#' @importFrom parallel detectCores
#' @importFrom progressr with_progress progressor handler_txtprogressbar handler_rstudio handler_cli
#' @export
run_scenarios <- function(indices,
                          run_one_scenario,
                          timestep_hours,
                          debug = FALSE,
                          ...,
                          parallel = TRUE,
                          workers = parallel::detectCores(),
                          show_progress = TRUE,
                          progress_handler = "txtprogressbar") {

  stopifnot(is.function(run_one_scenario))
  stopifnot(
    is.numeric(timestep_hours),
    length(timestep_hours) == 1L,
    is.finite(timestep_hours)
  )
  stopifnot(is.logical(debug), length(debug) == 1L)

  # Set the plan for this call only: plan() returns the previously active
  # plan, which is restored on exit.
  old_plan <- if (parallel) {
    # detectCores() may return NA; multisession needs a valid worker count
    if (is.numeric(workers) && length(workers) == 1L && is.na(workers)) {
      workers <- 1L
    }
    future::plan(future::multisession, workers = workers)
  } else {
    future::plan(future::sequential)
  }
  on.exit(future::plan(old_plan), add = TRUE)

  apply_fun <- if (parallel) future.apply::future_lapply else lapply
  n <- length(indices)

  # map string -> handler function (LOCAL, no global handlers!)
  handler_fun <- switch(
    progress_handler,
    "txtprogressbar" = progressr::handler_txtprogressbar,
    "rstudio" = progressr::handler_rstudio,
    "cli" = progressr::handler_cli,
    stop(sprintf("Unknown progress_handler: '%s' (use 'txtprogressbar', 'rstudio', or 'cli')",
                 progress_handler))
  )

  if (show_progress && parallel) {

    progressr::with_progress(
      expr = {
        p <- progressr::progressor(steps = n)

        apply_fun(seq_along(indices), function(k) {
          x <- indices[[k]]
          p(sprintf("Scenario %d/%d", k, n))
          run_one_scenario(
            x, timestep_hours = timestep_hours, debug = debug, ...
          )
        })
      },
      # handler is passed locally so the user's handler settings stay untouched
      handlers = list(handler_fun())
    )

  } else {

    apply_fun(seq_along(indices), function(k) {
      x <- indices[[k]]
      if (show_progress) message(sprintf("Running %d/%d", k, n))
      run_one_scenario(
        x, timestep_hours = timestep_hours, debug = debug, ...
      )
    })
  }
}

man/get_simulation_results_optim_parallel.Rd

Lines changed: 47 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)