Skip to content

Commit 655a63c

Browse files
committed
Optimize a bit again
1 parent 99fbfdd commit 655a63c

5 files changed

Lines changed: 172 additions & 32 deletions

File tree

docs/src/API/API_public.md

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ For mapping-level multi-rate configuration, combine:
2222
- `InputBindings(...)`
2323
- `OutputRouting(...)`
2424
- `ScopeModel(...)`
25-
- `OutputRequest(...)` with `collect_outputs(...)` for resampled exports
25+
- `OutputRequest(...)` in `tracked_outputs` for resampled exports
2626

2727
`TimeStepModel(...)` accepts:
2828
- `Real` step counts
@@ -45,7 +45,17 @@ Scope selection detail:
4545
```julia
4646
req_hold = OutputRequest("Leaf", :A; name=:A_hourly, process=:assim, policy=HoldLast())
4747
req_day = OutputRequest("Leaf", :A; name=:A_daily_sum, process=:assim, policy=Integrate(), clock=ClockSpec(24.0, 1.0))
48-
out = collect_outputs(sim, [req_hold, req_day]; sink=DataFrame)
48+
run!(sim, meteo; multirate=true, tracked_outputs=[req_hold, req_day], executor=SequentialEx())
49+
out = collect_outputs(sim; sink=DataFrame)
50+
51+
# or directly:
52+
out_status, out = run!(
53+
sim,
54+
meteo;
55+
multirate=true,
56+
tracked_outputs=[req_hold, req_day],
57+
return_requested_outputs=true,
58+
)
4959
```
5060

5161
- `process` is optional when the source is canonical and unique.

docs/src/model_execution.md

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,19 @@ req = OutputRequest("Leaf", :carbon_assimilation;
100100
clock=ClockSpec(24.0, 1.0)
101101
)
102102

103-
exported = collect_outputs(sim, [req]; sink=DataFrame)
103+
run!(sim, meteo; multirate=true, tracked_outputs=[req], executor=SequentialEx())
104+
exported = collect_outputs(sim; sink=DataFrame)
104105
```
105106

106-
This is independent from `tracked_outputs` and allows per-variable resampling policies at export time.
107+
`tracked_outputs` accepts `OutputRequest` values for these resampled exports.
108+
You can also return them directly from `run!`:
109+
110+
```julia
111+
out_status, exported = run!(
112+
sim,
113+
meteo;
114+
multirate=true,
115+
tracked_outputs=[req],
116+
return_requested_outputs=true,
117+
)
118+
```

src/run.jl

Lines changed: 46 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,15 @@ multi-threaded way (`executor=ThreadedEx()`, the default), or in a distributed w
2222
- `outputs`: the outputs to get in dynamic for each node type of the MTG.
2323
- `multirate`: experimental feature flag enabling temporal cache-based input resolution for multiscale simulations.
2424
Current implementation supports `HoldLast` resolution only.
25+
- `return_requested_outputs`: when `true` in MTG multi-rate runs, return requested resampled outputs directly
26+
as second return value.
27+
- `requested_outputs_sink`: sink used to materialize requested outputs when `return_requested_outputs=true`.
2528
2629
# Returns
2730
28-
Modifies the status of the object in-place. Users may retrieve the results from the object using
29-
the [`status`](https://virtualplantlab.github.io/PlantSimEngine.jl/stable/API/#PlantSimEngine.status-Tuple{Any})
30-
function (see examples).
31+
Returns status outputs (and optionally requested exports).
32+
For MTG multi-rate runs with `return_requested_outputs=true`, returns
33+
`(status_outputs, requested_outputs)`.
3134
3235
# Details
3336
@@ -124,7 +127,9 @@ function run!(
124127
tracked_outputs=nothing,
125128
check=true,
126129
executor=ThreadedEx(),
127-
multirate=false
130+
multirate=false,
131+
return_requested_outputs=false,
132+
requested_outputs_sink=DataFrames.DataFrame
128133
)
129134
run!(
130135
DataFormat(object),
@@ -135,7 +140,9 @@ function run!(
135140
tracked_outputs,
136141
check,
137142
executor,
138-
multirate
143+
multirate,
144+
return_requested_outputs,
145+
requested_outputs_sink
139146
)
140147
end
141148

@@ -153,11 +160,14 @@ function run!(
153160
tracked_outputs=nothing,
154161
check=true,
155162
executor=ThreadedEx(),
156-
multirate=false
163+
multirate=false,
164+
return_requested_outputs=false,
165+
requested_outputs_sink=DataFrames.DataFrame
157166
) where {T<:Union{AbstractArray,AbstractDict},A}
158167

159168
tracked_outputs isa OutputRequest && error("`OutputRequest` is only supported for MTG multi-rate simulations.")
160169
tracked_outputs isa AbstractVector{<:OutputRequest} && error("`OutputRequest` is only supported for MTG multi-rate simulations.")
170+
return_requested_outputs && error("`return_requested_outputs=true` is only supported for MTG multi-rate simulations.")
161171

162172
if executor != SequentialEx()
163173
@warn string(
@@ -191,11 +201,14 @@ function run!(
191201
tracked_outputs=nothing,
192202
check=true,
193203
executor=ThreadedEx(),
194-
multirate=false
204+
multirate=false,
205+
return_requested_outputs=false,
206+
requested_outputs_sink=DataFrames.DataFrame
195207
) where {T<:ModelList}
196208

197209
tracked_outputs isa OutputRequest && error("`OutputRequest` is only supported for MTG multi-rate simulations.")
198210
tracked_outputs isa AbstractVector{<:OutputRequest} && error("`OutputRequest` is only supported for MTG multi-rate simulations.")
211+
return_requested_outputs && error("`return_requested_outputs=true` is only supported for MTG multi-rate simulations.")
199212

200213
meteo_adjusted = adjust_weather_timesteps_to_given_length(get_status_vector_max_length(object.status), meteo)
201214
nsteps = get_nsteps(meteo_adjusted)
@@ -286,11 +299,14 @@ function run!(
286299
tracked_outputs=nothing,
287300
check=true,
288301
executor=ThreadedEx(),
289-
multirate=false
302+
multirate=false,
303+
return_requested_outputs=false,
304+
requested_outputs_sink=DataFrames.DataFrame
290305
) where {T<:Union{AbstractArray,AbstractDict}}
291306

292307
tracked_outputs isa OutputRequest && error("`OutputRequest` is only supported for MTG multi-rate simulations.")
293308
tracked_outputs isa AbstractVector{<:OutputRequest} && error("`OutputRequest` is only supported for MTG multi-rate simulations.")
309+
return_requested_outputs && error("`return_requested_outputs=true` is only supported for MTG multi-rate simulations.")
294310

295311
dep_graphs = [dep(obj) for obj in collect(values(object))]
296312
#obj_parallelizable = all([object_parallelizable(graph) for graph in dep_graphs])
@@ -376,17 +392,12 @@ end
376392
function _multirate_tracked_outputs(tracked_outputs)
377393
if isnothing(tracked_outputs)
378394
return nothing, OutputRequest[]
379-
elseif tracked_outputs isa Dict
380-
return tracked_outputs, OutputRequest[]
381395
elseif tracked_outputs isa OutputRequest
382396
return nothing, OutputRequest[tracked_outputs]
383397
elseif tracked_outputs isa AbstractVector{<:OutputRequest}
384398
return nothing, collect(tracked_outputs)
385399
end
386-
error(
387-
"For MTG multi-rate runs, `tracked_outputs` must be `nothing`, a Dict of status outputs, ",
388-
"an `OutputRequest`, or a vector of `OutputRequest`."
389-
)
400+
return tracked_outputs, OutputRequest[]
390401
end
391402

392403
function run!(
@@ -399,27 +410,36 @@ function run!(
399410
tracked_outputs=nothing,
400411
check=true,
401412
executor=ThreadedEx(),
402-
multirate=false
413+
multirate=false,
414+
return_requested_outputs=false,
415+
requested_outputs_sink=DataFrames.DataFrame
403416
)
404417
isnothing(nsteps) && (nsteps = get_nsteps(meteo))
405418
meteo_adjusted = adjust_weather_timesteps_to_given_length(nsteps, meteo)
406419
status_outputs, output_requests = _multirate_tracked_outputs(tracked_outputs)
407420
!multirate && !isempty(output_requests) && error("`OutputRequest` requires `multirate=true`.")
421+
return_requested_outputs && !multirate && error("`return_requested_outputs=true` requires `multirate=true`.")
408422

409423
# NOTE : replace_mapping_status_vectors_with_generated_models is assumed to have already run if used
410424
# otherwise there might be vector length conflicts with timesteps
411425
sim = GraphSimulation(object, mapping, nsteps=nsteps, check=check, outputs=status_outputs)
412-
run!(
426+
result = run!(
413427
sim,
414428
meteo_adjusted,
415429
constants,
416430
extra;
417431
check=check,
418432
executor=executor,
419433
multirate=multirate,
420-
tracked_outputs=output_requests
434+
tracked_outputs=output_requests,
435+
return_requested_outputs=return_requested_outputs,
436+
requested_outputs_sink=requested_outputs_sink
421437
)
422438

439+
if return_requested_outputs
440+
return result
441+
end
442+
423443
return outputs(sim)
424444
end
425445

@@ -432,7 +452,9 @@ function run!(
432452
tracked_outputs=nothing,
433453
check=true,
434454
executor=ThreadedEx(),
435-
multirate=false
455+
multirate=false,
456+
return_requested_outputs=false,
457+
requested_outputs_sink=DataFrames.DataFrame
436458
)
437459

438460
dep_graph = object.dependency_graph
@@ -443,6 +465,8 @@ function run!(
443465
validate_canonical_publishers(object)
444466
prepare_output_requests!(object, tracked_outputs, timeline)
445467
configure_temporal_buffers!(object, timeline)
468+
elseif return_requested_outputs
469+
error("`return_requested_outputs=true` requires `multirate=true`.")
446470
end
447471

448472
!isnothing(extra) && error("Extra parameters are not allowed for the simulation of an MTG (already used for statuses).")
@@ -475,6 +499,10 @@ function run!(
475499
resize!(outputs(object)[organ], index - 1)
476500
end
477501

502+
if return_requested_outputs
503+
return outputs(object), collect_outputs(object; sink=requested_outputs_sink)
504+
end
505+
478506
return outputs(object)
479507
end
480508

src/time/runtime/input_resolution.jl

Lines changed: 58 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -36,18 +36,66 @@ function _resolved_windowed_value_for_source(
3636
samples = _resolution_samples(sim, key)
3737
isnothing(samples) && return nothing, false
3838

39-
vals = Any[]
40-
for (ts, v) in samples
41-
if ts >= t_start - 1e-8 && ts <= t_end + 1e-8
42-
push!(vals, v)
43-
end
44-
end
45-
isempty(vals) && return nothing, false
39+
if policy isa Union{Integrate,Aggregate}
40+
reducer = policy.reducer
41+
if reducer isa Symbol
42+
reducer in _WINDOW_REDUCER_SYMBOLS || error(
43+
"Unsupported reducer symbol `$(reducer)`. Supported symbols are $(_WINDOW_REDUCER_SYMBOLS)."
44+
)
45+
46+
found = false
47+
n = 0
48+
s = 0.0
49+
first_v = 0.0
50+
last_v = 0.0
51+
min_v = 0.0
52+
max_v = 0.0
53+
54+
for (ts, v) in samples
55+
ts < t_start - 1e-8 && continue
56+
ts > t_end + 1e-8 && continue
57+
v isa Real || return nothing, false
58+
vf = float(v)
59+
60+
if !found
61+
found = true
62+
first_v = vf
63+
min_v = vf
64+
max_v = vf
65+
end
4666

47-
all(v -> v isa Real, vals) || return nothing, false
48-
vals_real = [float(v) for v in vals]
67+
last_v = vf
68+
s += vf
69+
n += 1
70+
vf < min_v && (min_v = vf)
71+
vf > max_v && (max_v = vf)
72+
end
4973

50-
if policy isa Integrate || policy isa Aggregate
74+
!found && return nothing, false
75+
76+
if reducer == :sum
77+
return s, true
78+
elseif reducer == :mean
79+
return s / n, true
80+
elseif reducer == :max
81+
return max_v, true
82+
elseif reducer == :min
83+
return min_v, true
84+
elseif reducer == :first
85+
return first_v, true
86+
elseif reducer == :last
87+
return last_v, true
88+
end
89+
end
90+
91+
vals_real = Float64[]
92+
for (ts, v) in samples
93+
ts < t_start - 1e-8 && continue
94+
ts > t_end + 1e-8 && continue
95+
v isa Real || return nothing, false
96+
push!(vals_real, float(v))
97+
end
98+
isempty(vals_real) && return nothing, false
5199
return _window_reduce(vals_real, policy), true
52100
end
53101

test/test-multirate-output-export.jl

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,4 +77,46 @@ end
7777
)
7878
exported_auto = collect_outputs(sim_canonical; sink=DataFrame)
7979
@test exported_auto[:x_auto][:, :value] == [1.0, 2.0, 3.0, 4.0]
80+
81+
# Optional direct export return from run! on GraphSimulation.
82+
sim_direct = PlantSimEngine.GraphSimulation(
83+
mtg,
84+
Dict(
85+
"Leaf" => (
86+
ModelSpec(MRExportSourceModel(Ref(0))) |>
87+
TimeStepModel(1.0),
88+
),
89+
),
90+
nsteps=4,
91+
check=true,
92+
outputs=Dict("Leaf" => (:X,)),
93+
)
94+
out_status, out_requested = run!(
95+
sim_direct,
96+
meteo4,
97+
multirate=true,
98+
executor=SequentialEx(),
99+
tracked_outputs=[OutputRequest("Leaf", :X; name=:x_direct, policy=HoldLast())],
100+
return_requested_outputs=true,
101+
)
102+
@test haskey(out_status, "Leaf")
103+
@test out_requested[:x_direct][:, :value] == [1.0, 2.0, 3.0, 4.0]
104+
105+
# Optional direct export return from run! on MTG + mapping entry point.
106+
out_status_mtg, out_requested_mtg = run!(
107+
mtg,
108+
Dict(
109+
"Leaf" => (
110+
ModelSpec(MRExportSourceModel(Ref(0))) |>
111+
TimeStepModel(1.0),
112+
),
113+
),
114+
meteo4;
115+
multirate=true,
116+
executor=SequentialEx(),
117+
tracked_outputs=[OutputRequest("Leaf", :X; name=:x_mtg, policy=HoldLast())],
118+
return_requested_outputs=true,
119+
)
120+
@test haskey(out_status_mtg, "Leaf")
121+
@test out_requested_mtg[:x_mtg][:, :value] == [1.0, 2.0, 3.0, 4.0]
80122
end

0 commit comments

Comments
 (0)