Skip to content

Commit a3cf5b4

Browse files
committed
Borg: support ParallelEvaluator in async mode
Parallelized versions of - update_population_fitness!() - populate_by_mutants!() - step!()
1 parent 07af505 commit a3cf5b4

1 file changed

Lines changed: 101 additions & 6 deletions

File tree

src/borg_moea.jl

Lines changed: 101 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ mutable struct BorgMOEA{FS<:FitnessScheme,V<:Evaluator,P<:Population,M<:GeneticO
1111

1212
n_restarts::Int
1313
n_steps::Int
14+
n_recombined::Int
15+
n_processed::Int
1416
last_restart_check::Int
1517
last_restart::Int
1618
last_wrecombinate_update::Int
@@ -49,7 +51,7 @@ mutable struct BorgMOEA{FS<:FitnessScheme,V<:Evaluator,P<:Population,M<:GeneticO
4951
fit_scheme = EpsBoxDominanceFitnessScheme(fit_scheme, params[])
5052
archive = EpsBoxArchive(fit_scheme, params)
5153
evaluator = make_evaluator(problem, archive, params)
52-
new{typeof(fit_scheme),typeof(evaluator),P,M,E}(evaluator, pop, Vector{Int}(), 0, 0, 0, 0, 0,
54+
new{typeof(fit_scheme),typeof(evaluator),P,M,E}(evaluator, pop, Vector{Int}(), 0, 0, 0, 0, 0, 0, 0,
5355
params[], params[], params[:γ_δ], params[:PopulationSize],
5456
Categorical(ones(length(recombinate))/length(recombinate)),
5557
params[], params[], params[:OperatorsUpdatePeriod], params[:RestartCheckPeriod],
@@ -114,6 +116,12 @@ function step!(alg::BorgMOEA)
114116
if alg.n_steps >= alg.last_wrecombinate_update + alg.wrecombinate_update_period
115117
update_recombination_weights!(alg)
116118
end
119+
recombine_individuals!(alg)
120+
return alg
121+
end
122+
123+
function recombine_individuals!(alg::BorgMOEA)
124+
prepare_recombination(alg)
117125
# Select the operators to apply based on their probabilities
118126
recomb_op_ix = rand(alg.recombinate_distr)
119127
recombinate!(alg, recomb_op_ix, alg.recombinate[recomb_op_ix])
@@ -138,16 +146,45 @@ function recombinate!(alg::BorgMOEA, recomb_op_ix::Int, recomb_op::CrossoverOper
138146
apply!(recomb_op, Individual[child.params for child in children],
139147
zeros(Int, length(children)), alg.population, parent_indices)
140148
for child in children
141-
child.extra = recomb_op
142-
child.tag = recomb_op_ix
143-
process_candidate!(alg, child, parent_indices[1])
149+
preprocess_recombined!(alg, child, recomb_op_ix, parent_indices[1])
150+
alg.n_recombined += 1
151+
postprocess_recombined!(alg, child)
144152
end
145153
end
146154

147-
function process_candidate!(alg::BorgMOEA, candi::Candidate, ref_index::Int)
155+
prepare_recombination(alg::BorgMOEA) = nothing # do nothing
156+
157+
function preprocess_recombined!(alg::BorgMOEA, candi::Candidate, recomb_op_ix::Int, ref_index::Int)
148158
apply!(alg.embed, candi.params, alg.population, ref_index)
149159
reset_fitness!(candi, alg.population)
150-
ifitness = fitness(update_fitness!(alg.evaluator, candi)) # implicitly updates the archive
160+
candi.extra = alg.recombinate[recomb_op_ix]
161+
candi.tag = recomb_op_ix
162+
candi
163+
end
164+
165+
function postprocess_recombined!(alg::BorgMOEA, candi::Candidate)
166+
update_fitness!(alg.evaluator, candi) # implicitly updates the archive
167+
process_candidate!(alg, candi)
168+
end
169+
170+
# ParallelEvaluator version -- process previously submitted candidates with the completed fitness
171+
function prepare_recombination(alg::BorgMOEA{<:FitnessScheme, <:ParallelEvaluator})
172+
process_completed!(alg.evaluator) do fit_job_id, candi
173+
process_candidate!(alg, candi)
174+
true
175+
end
176+
end
177+
178+
# ParallelEvaluator version, just submit to fitness calculation, nothing else
179+
# if the queue is full, waits until some jobs are processed -- that established the
180+
# balance between recombining and fitness evaluation
181+
function postprocess_recombined!(alg::BorgMOEA{<:FitnessScheme, <:ParallelEvaluator}, candi::Candidate)
182+
async_update_fitness(alg.evaluator, candi, wait=true)
183+
return candi
184+
end
185+
186+
function process_candidate!(alg::BorgMOEA, candi::Candidate)
187+
ifitness = fitness(candi)
151188
# test the population
152189
hat_comp = HatCompare(fitness_scheme(archive(alg)))
153190
popsz = popsize(alg.population)
@@ -190,6 +227,7 @@ function process_candidate!(alg::BorgMOEA, candi::Candidate, ref_index::Int)
190227
else
191228
release_candi(alg.population, candi)
192229
end
230+
alg.n_processed += 1
193231
alg
194232
end
195233

@@ -221,6 +259,38 @@ function update_population_fitness!(alg::BorgMOEA)
221259
end
222260
end
223261

262+
function update_population_fitness!(alg::BorgMOEA{<:FitnessScheme, <:ParallelEvaluator})
263+
fs = fitness_scheme(alg.evaluator.archive)
264+
popsz = popsize(alg.population)
265+
last_checked = n_processed = 0
266+
job_ids = BitSet()
267+
while n_processed < popsz
268+
# process the calculated fitnesses
269+
process_completed!(alg.evaluator) do fit_job_id, candi
270+
if pop!(job_ids, fit_job_id, 0) > 0
271+
alg.population.fitness[candi.index] = candi.fitness
272+
release_candi(alg.population, candi)
273+
n_processed += 1
274+
return true
275+
else
276+
return false # some unrelated candidate, skip
277+
end
278+
end
279+
if last_checked < popsz
280+
# submit to fitness evaluation
281+
ix = (last_checked += 1)
282+
if isnafitness(fitness(alg.population, ix), fs)
283+
candi = acquire_candi(alg.population, ix)
284+
push!(job_ids, async_update_fitness(alg.evaluator, candi, wait=true))
285+
else # fitness already calculated
286+
n_processed += 1
287+
end
288+
else
289+
yield() # allow the other tasks to process the incoming fitnesses
290+
end
291+
end
292+
end
293+
224294
"""
225295
Update recombination operator probabilities based on the archive tag counts.
226296
"""
@@ -254,6 +324,31 @@ function populate_by_mutants(alg::BorgMOEA, last_nonmutant::Int)
254324
end
255325
end
256326

327+
function populate_by_mutants(alg::BorgMOEA{<:FitnessScheme, <:ParallelEvaluator}, last_nonmutant::Int)
328+
popsz = popsize(alg.population)
329+
last_submitted = last_accepted = last_nonmutant
330+
mutant_job_ids = BitSet()
331+
while last_accepted < popsz
332+
# process the calculated fitnesses
333+
process_completed!(alg.evaluator) do fit_job_id, candi
334+
if pop!(mutant_job_ids, fit_job_id, 0) > 0 # it's our mutant!
335+
candi.index = (last_accepted += 1)
336+
accept_candi!(alg.population, candi)
337+
return true
338+
else
339+
return false # some unrelated candidate, skip
340+
end
341+
end
342+
if last_submitted < popsz
343+
# generate the new mutant and submit to fitness evaluation
344+
mutant = acquire_mutant(alg, last_submitted+=1, last_nonmutant)
345+
push!(mutant_job_ids, async_update_fitness(alg.evaluator, mutant, wait=true))
346+
else
347+
yield() # allow the other tasks to process the incoming fitnesses
348+
end
349+
end
350+
end
351+
257352
"""
258353
Restart Borg MOEA.
259354

0 commit comments

Comments
 (0)