Skip to content

Commit 99fbfdd

Browse files
committed
Optimize a bit OutputRequest
1 parent 7871668 commit 99fbfdd

2 files changed

Lines changed: 63 additions & 15 deletions

File tree

src/time/multirate.jl

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,29 @@ mutable struct AggregateCache{T<:Real} <: OutputCache
151151
window_start::Float64
152152
end
153153

154+
"""
155+
ExportBuffer()
156+
157+
Compact in-memory storage for requested output rows during runtime.
158+
"""
159+
mutable struct ExportBuffer{
160+
S<:AbstractString,
161+
P<:Symbol,
162+
V<:Symbol,
163+
TI<:AbstractVector{Int},
164+
NI<:AbstractVector{Int},
165+
VV<:AbstractVector{Any},
166+
}
167+
scale::S
168+
process::P
169+
var::V
170+
timestep::TI
171+
node::NI
172+
value::VV
173+
end
174+
175+
ExportBuffer(scale::AbstractString, process::Symbol, var::Symbol) = ExportBuffer(scale, process, var, Int[], Int[], Any[])
176+
154177
"""
155178
TemporalState(caches, last_run, streams, producer_horizons, export_plans, export_rows)
156179
TemporalState()
@@ -171,7 +194,7 @@ mutable struct TemporalState{
171194
S<:AbstractDict{OutputKey,Vector{Tuple{Float64,Any}}},
172195
H<:AbstractDict{Tuple{String,Symbol,Symbol},Float64},
173196
P<:AbstractVector,
174-
R<:AbstractDict{Symbol,Vector{NamedTuple}}
197+
R<:AbstractDict{Symbol,ExportBuffer}
175198
}
176199
caches::C
177200
last_run::L
@@ -187,5 +210,5 @@ TemporalState() = TemporalState(
187210
Dict{OutputKey,Vector{Tuple{Float64,Any}}}(),
188211
Dict{Tuple{String,Symbol,Symbol},Float64}(),
189212
Any[],
190-
Dict{Symbol,Vector{NamedTuple}}()
213+
Dict{Symbol,ExportBuffer}()
191214
)

src/time/runtime/output_export.jl

Lines changed: 38 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ function prepare_output_requests!(sim::GraphSimulation, requests, timeline::Time
8080
reqs = _normalize_output_requests(requests)
8181

8282
plans = Any[]
83-
rows = Dict{Symbol,Vector{NamedTuple}}()
83+
rows = Dict{Symbol,ExportBuffer}()
8484

8585
for req in reqs
8686
scale = String(req.scale)
@@ -104,7 +104,7 @@ function prepare_output_requests!(sim::GraphSimulation, requests, timeline::Time
104104
model_spec=model_spec,
105105
source_dt=float(source_clock.dt),
106106
))
107-
rows[req.name] = NamedTuple[]
107+
rows[req.name] = ExportBuffer(scale, process, req.var)
108108
end
109109

110110
sim.temporal_state.export_plans = plans
@@ -191,11 +191,13 @@ Materialize configured output requests online at runtime time `t`.
191191
"""
192192
function update_requested_outputs!(sim::GraphSimulation, t::Float64)
193193
isempty(sim.temporal_state.export_plans) && return nothing
194+
timestep = Int(round(t))
194195

195196
for plan in sim.temporal_state.export_plans
196197
_should_run_at_time(plan.clock, t) || continue
197198
source_statuses = get(status(sim), plan.scale, nothing)
198199
isnothing(source_statuses) && continue
200+
buf = sim.temporal_state.export_rows[plan.name]
199201

200202
t_start = _window_start_for_clock(plan.clock, t)
201203
for st in source_statuses
@@ -213,23 +215,46 @@ function update_requested_outputs!(sim::GraphSimulation, t::Float64)
213215
t_start,
214216
)
215217

216-
push!(sim.temporal_state.export_rows[plan.name], (
217-
timestep=Int(round(t)),
218-
scale=plan.scale,
219-
process=plan.process,
220-
var=plan.var,
221-
node=nodeid,
222-
value=v,
223-
))
218+
push!(buf.timestep, timestep)
219+
push!(buf.node, nodeid)
220+
push!(buf.value, v)
224221
end
225222
end
226223

227224
return nothing
228225
end
229226

230-
function _materialize_output_rows(rows, sink)
231-
isnothing(sink) && return rows
232-
return sink(rows)
227+
function _materialize_output_rows(rows::ExportBuffer, sink)
228+
n = length(rows.timestep)
229+
scale_col = fill(rows.scale, n)
230+
process_col = fill(rows.process, n)
231+
var_col = fill(rows.var, n)
232+
233+
if sink === DataFrames.DataFrame
234+
return DataFrames.DataFrame(
235+
timestep=rows.timestep,
236+
scale=scale_col,
237+
process=process_col,
238+
var=var_col,
239+
node=rows.node,
240+
value=rows.value,
241+
)
242+
end
243+
244+
table = Vector{NamedTuple{(:timestep, :scale, :process, :var, :node, :value),Tuple{Int,String,Symbol,Symbol,Int,Any}}}(undef, n)
245+
@inbounds for i in 1:n
246+
table[i] = (
247+
timestep=rows.timestep[i],
248+
scale=scale_col[i],
249+
process=process_col[i],
250+
var=var_col[i],
251+
node=rows.node[i],
252+
value=rows.value[i],
253+
)
254+
end
255+
256+
isnothing(sink) && return table
257+
return sink(table)
233258
end
234259

235260
"""

0 commit comments

Comments
 (0)