Skip to content

Commit 1334f07

Browse files
committed
ParallelEvaluator: fix shutdown sequence
1 parent d9cfe18 commit 1334f07

1 file changed

Lines changed: 28 additions & 14 deletions

File tree

src/parallel_evaluator.jl

Lines changed: 28 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -229,9 +229,18 @@ function shutdown!(etor::ParallelEvaluator)
229229
etor.is_stopping = true
230230
# notify the workers that they should shut down (each worker should pick exactly one message)
231231
_shutdown!(etor)
232-
# release any waiting tasks
233-
notify(etor.fitness_slots.cond_wait)
234-
notify(etor.job_assignment.cond_wait)
232+
# notify the workers handler if it's waiting for jobs
233+
lock(etor.job_assignment)
234+
unlock(etor.job_assignment)
235+
# wait for all the workers
236+
for i in 1:nworkers(etor)
237+
Base.acquire(etor.fitness_slots)
238+
end
239+
@assert !any(isposdef, etor.worker2job) "Some workers not finished"
240+
# release any waiting tasks
241+
for i in 1:nworkers(etor)
242+
Base.release(etor.fitness_slots)
243+
end
235244
end
236245

237246
function _shutdown!(etor::ParallelEvaluator)
@@ -243,7 +252,6 @@ function _shutdown!(etor::ParallelEvaluator)
243252
end
244253
for i in 1:nworkers(etor)
245254
etor.params_status[i][1] = -1
246-
etor.fitnesses_status[i][1] = -1
247255
end
248256
etor
249257
end
@@ -290,19 +298,18 @@ end
290298
"""
291299
function workers_handler!{F}(etor::ParallelEvaluator{F})
292300
info("workers_handler!() started")
293-
while !is_stopping(etor)
301+
while !is_stopping(etor) || !isempty(etor.waiting_candidates)
294302
# master critical section
295303
@inbounds for worker_ix in 1:nworkers(etor)
296304
#info("workers_handler!(): checking worker #$worker_ix...")
297305
#@assert check_worker_running(etor.worker_refs[worker_ix])
298-
if etor.worker2job[worker_ix] > 0 && etor.fitnesses_status[worker_ix][1] != 0 && etor.params_status[worker_ix][1] == 0
299-
if etor.fitnesses_status[worker_ix][1] < 0
300-
error("Worker $worker_ix bad status: $(etor.fitnesses_status[worker_ix][1])")
306+
if (job_id = etor.worker2job[worker_ix]) > 0 && (fitness_status = etor.fitnesses_status[worker_ix][1]) != 0
307+
if fitness_status < 0 && !is_stopping(etor)
308+
error("Worker $worker_ix bad status: $(fitness_status)")
301309
end
302310
#info("worker_handler!(): fitness_evaluated")
303-
304311
lock(etor.job_assignment)
305-
job_id = etor.worker2job[worker_ix]
312+
param_status = etor.params_status[worker_ix][1]
306313
new_fitness = get_fitness(F, etor.shared_fitnesses[worker_ix])
307314
@assert job_id > 0
308315

@@ -312,14 +319,21 @@ function workers_handler!{F}(etor::ParallelEvaluator{F})
312319
etor.fitnesses_status[worker_ix][1] = 0 # received
313320
unlock(etor.job_assignment)
314321

315-
update_archive!(etor, job_id, new_fitness)
316-
Base.release(etor.fitness_slots)
322+
if param_status == 0 # communication in normal state, update the archive
323+
update_archive!(etor, job_id, new_fitness)
324+
elseif param_status < 0
325+
# remove the candidate
326+
delete!(etor.waiting_candidates, job_id)
327+
end
328+
if fitness_status > 0
329+
Base.release(etor.fitness_slots)
330+
end
317331
#info("workers_handler!(): yield to other tasks after archive update")
318332
#yield() # free slots available, switch to the main task
319333
end
320334
end
321335
if length(etor.waiting_candidates) < nworkers(etor)
322-
if isempty(etor.waiting_candidates)
336+
if !is_stopping(etor) && isempty(etor.waiting_candidates)
323337
wait(etor.job_assignment.cond_wait)
324338
else
325339
#info("workers_handler!(): yield to other tasks")
@@ -344,7 +358,7 @@ end
344358
"""
345359
function async_update_fitness{F,FA}(etor::ParallelEvaluator{F,FA}, candi::Candidate{FA}; force::Bool=false, wait::Bool=false)
346360
#info("async_update_fitness(): starting to assign job #$(etor.next_job_id)")
347-
if force || isnafitness(fitness(candi), fitness_scheme(etor.archive))
361+
if !etor.is_stopping && (force || isnafitness(fitness(candi), fitness_scheme(etor.archive)))
348362
if length(etor.waiting_candidates) >= queue_capacity(etor) && !wait
349363
#info("async_update_fitness(): queue is full, skip")
350364
return 0

0 commit comments

Comments
 (0)