From e5262cbb291e1b86aae48a2c384a2785512e7485 Mon Sep 17 00:00:00 2001 From: Alexey Stukalov Date: Mon, 27 Aug 2018 01:47:52 +0200 Subject: [PATCH 01/12] ntuple fitness: get/setfitness required for putting fitness to/from a (shared) array --- src/ntuple_fitness.jl | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/src/ntuple_fitness.jl b/src/ntuple_fitness.jl index c31c6570..458a2c09 100644 --- a/src/ntuple_fitness.jl +++ b/src/ntuple_fitness.jl @@ -22,6 +22,35 @@ aggregate(f::NTuple{N,F}, fs::TupleFitnessScheme{N,F}) where {N,F} = fs.aggregat @inline is_better(f1::NTuple{N,F}, f2::NTuple{N,F}, fs::TupleFitnessScheme{N,F,NTuple{N,F}}) where {N,F} = hat_compare(f1, f2, fs, -1) == -1 @inline is_worse(f1::NTuple{N,F}, f2::NTuple{N,F}, fs::TupleFitnessScheme{N,F,NTuple{N,F}}) where {N,F} = hat_compare(f1, f2, fs, 1) == 1 +""" + getfitness(::Type{FitnessType}, src::AbstractMatrix, col_ix) + +Get fitness stored in a column of a matrix. +Used by `ParallelEvaluator` to get the fitness from the `SharedArray`. +""" +getfitness(::Type{F}, src::AbstractVector{F}) where F<:Number = src[1] + +@generated function getfitness(::Type{NTuple{N,F}}, src::AbstractVector{F}) where {N, F<:Number} + quote + return Base.Cartesian.@ntuple $N i -> src[i] + end +end + +""" + setfitness!(src::AbstractMatrix, fitness, col_ix) + +Put fitness into a column of a matrix. +Used by `ParallelEvaluator` to store the fitness in the `SharedArray`. +""" +setfitness!(dest::AbstractVector{F}, fitness::F) where F<:Number = + dest[1] = fitness + +@generated function setfitness!(dest::AbstractVector{F}, fitness::NTuple{N,F}) where {N, F<:Number} + quote + Base.Cartesian.@nexprs $N i -> dest[i] = fitness[i] + end +end + """ Pareto dominance for `N`-tuple (`N`≧1) fitnesses. From 07af50579356c364f860f34b73ad442cc0cf8080 Mon Sep 17 00:00:00 2001 From: Alexey Stukalov Date: Mon, 27 Aug 2018 14:52:15 +0200 Subject: [PATCH 02/12] asynchronous ParallelEvaluator Use N workers to asynchronously calculate fitnesses. Requests for fitness calculation and completion notifications as well as input parameters and output fitness are passed via SharedVector/SharedMatrix to minimize serialization overhead. --- src/parallel_evaluator.jl | 589 +++++++++++++++++++++++++++++--------- 1 file changed, 458 insertions(+), 131 deletions(-) diff --git a/src/parallel_evaluator.jl b/src/parallel_evaluator.jl index 1fd48f72..f4ba3e5e 100644 --- a/src/parallel_evaluator.jl +++ b/src/parallel_evaluator.jl @@ -1,180 +1,507 @@ +using Distributed, SharedArrays + +#= master <-> worker params_status/fitness_status codes =# + +const PEStatus_Msg = 1 +const PEStatus_OK = 0 +const PEStatus_Shutdown = -1 +const PEStatus_Error = -2 + """ Internal data for the worker process of the parallel evaluator. """ -struct ParallelEvaluatorWorker{P<:OptimizationProblem} +mutable struct ParallelEvaluatorWorker{T, P<:OptimizationProblem} + id::Int problem::P + param_status::SharedVector{Int} # master notifies worker about new requests + shared_param::SharedVector{T} # master puts candidates parameters + fitness_status::SharedVector{Int} # worker notifies master about completed evaluation + shared_fitness::SharedVector{T} # worker puts calculated fitness - ParallelEvaluatorWorker(problem::P) where {P<:OptimizationProblem} = - new{P}(problem) + ParallelEvaluatorWorker( + id::Int, problem::P, + param_status::SharedVector{Int}, shared_param::SharedVector{T}, + fitness_status::SharedVector{Int}, shared_fitness::SharedVector{T} + ) where {T, P<:OptimizationProblem} = + new{T,P}(id, problem, param_status, shared_param, fitness_status, shared_fitness) end -fitness(params::Individual, worker::ParallelEvaluatorWorker) = - fitness(params, worker.problem) +param_status(worker::ParallelEvaluatorWorker) = @inbounds first(worker.param_status) +fitness_status(worker::ParallelEvaluatorWorker) = @inbounds first(worker.fitness_status) -const ChannelRef{T} = RemoteChannel{Channel{T}} -const ParallelEvaluatorWorkerRef{P} = ChannelRef{ParallelEvaluatorWorker{P}} +# run the wrapper (called in the "main" task) +function run!(worker::ParallelEvaluatorWorker) + while true + #@info "Checking in worker #$(worker.id)" + # continuously poll the worker for the delivery notification for + # the last job or for the new job notification + i = 0 + while param_status(worker) == PEStatus_OK || fitness_status(worker) == PEStatus_Msg + if (i+=1) > 1000 # allow executing the other tasks once in a while + yield() + i = 0 + end + end + # process the new worker status + p_status = param_status(worker) + if p_status == PEStatus_Shutdown # master stopping + worker.fitness_status[1] = PEStatus_Shutdown # notify worker stopped + break # shutdown!() called + elseif p_status == PEStatus_Error # master error (currently not set?) + worker.fitness_status[1] = PEStatus_Shutdown # stopped after receiving an error + break + elseif p_status == PEStatus_Msg # new job + worker.param_status[1] = PEStatus_OK # received, reset the statuts + #@info "PE worker #$(worker.id): got job" + @inbounds setfitness!(worker.shared_fitness, fitness(worker.shared_param, worker.problem)) + worker.fitness_status[1] = PEStatus_Msg # fitness ready + end + end +end -fitness(params::Individual, worker_ref::ParallelEvaluatorWorkerRef{P}) where P = - fitness(params, fetch(fetch(worker_ref))::ParallelEvaluatorWorker{P}) +""" +Create and run the evaluator worker. +The function that the master process spawns at each worker process. +""" +function run_parallel_evaluator_worker(id::Int, + worker_ready::RemoteChannel{Channel{Int}}, + problem::OptimizationProblem, + param_status::SharedVector{Int}, + shared_param::SharedVector{Float64}, + fitness_status::SharedVector{Int}, + shared_fitness::SharedVector{Float64}) + @info "Initializing ParallelEvaluator worker #$id at task=$(myid())" + worker = nothing + try + worker = ParallelEvaluatorWorker(id, deepcopy(problem), + param_status, shared_param, + fitness_status, shared_fitness) + catch e + # send -id to notify about an error and to release + # the master from waiting for worker readiness + @warn "Exception at ParallelEvaluatorWorker initialization: $e" + put!(worker_ready, -id) + rethrow(e) + end + # create immigrants receiving tasks=# + put!(worker_ready, id) + @info "Running worker #$id..." + try + run!(worker) + catch e + # send error candidate to notify about an error and to release + # the master from waiting for worker messages + @warn "Exception while running ParallelEvaluatorWorker: $e" + worker.fitness_status[1] = PEStatus_Error + rethrow(e) + end + @info "Worker #$id stopped" + nothing +end + +const PECandidateDict{FA} = Dict{Int, Candidate{FA}} """ -Current state of fitness function evaluation for the vector of candidates. +Fitness evaluator that asynchronously distributes calculation +among several worker processes. + +Currently the overhead of coordinating parallel processes is relatively high, +so it's recommended to use ParallelEvaluator only when fitness calculation takes +considerable time. """ -mutable struct ParallelEvaluationState{F, FS} - fitness_scheme::FS - candidates::Vector{Candidate{F}} # candidates to calculate fitness for +mutable struct ParallelEvaluator{F, FA, T, FS, P<:OptimizationProblem, A<:Archive} <: Evaluator{P} + problem::P # optimization problem + archive::A # archive where good candidates are automatically stored + num_evals::Int # fitness evaluations counter + last_fitness::F # last fitness + arch_nafitness::FA # NA fitness + + params_status::Vector{SharedVector{Int}} # master notifies workers about new requests + shared_params::Vector{SharedVector{T}} # master puts candidates parameters + fitnesses_status::Vector{SharedVector{Int}} # workers notify master about completed evaluation + shared_fitnesses::Vector{SharedVector{T}} # workers put calculated fitness + + fitness_slots::Base.Semaphore # gets acquired when a worker needs to be assigned to a task; + # used to organize waiting when all workers are busy + + waiting_candidates::PECandidateDict{FA} # candidates waiting their fitness calculation to be completed + unclaimed_candidates::PECandidateDict{FA} # candidates with calculated fitness that were not yet checked for completion (by process_completed()) + + worker2job::Vector{Int} # job IDs assigned to workers + job_assignment::ReentrantLock # lock to provide exclusive access to worker2job + + max_seq_done_job::Int # all jobs from 1 to max_seq_done_job are done + max_done_job::Int # max Id of done job + done_jobs::BitSet # done job Ids beyond max_seq_done_job + + next_job_id::Int # ID to assign for the next job + + is_stopping::Bool # whether the evaluator is in shutdown sequence - worker_busy::Vector{Bool} # if the worker is busy calculating - retry_queue::Vector{Int} # queue to candidate indices to retry + worker_refs::Vector{Future} # references to worker processes + workers_handler::Task # task in the main process that runs workers_handler() - worker_finished::Condition # gets notified each time worker is done - # fitness calculation - next_index::Int # index of the next candidate to calculate fitness + function ParallelEvaluator( + problem::P, archive::A; + pids::AbstractVector{Int} = workers() + ) where {P<:OptimizationProblem, A<:Archive} + fs = fitness_scheme(problem) + F = fitness_type(fs) + T = fitness_eltype(fs) + FA = fitness_type(archive) - function ParallelEvaluationState(fitness_scheme::FS, - nworkers::Int) where {FS<:FitnessScheme} - F = fitness_type(fitness_scheme) - new{F,FS}(fitness_scheme, Vector{Candidate{F}}(), - fill(false, nworkers), Vector{Int}(), Condition(), 0) + etor = new{F, FA, T, typeof(fs), P, A}( + problem, archive, + 0, nafitness(fs), nafitness(FA), + [fill!(SharedArray{Int}((2,), pids=vcat(pid,[myid()])), 0) for pid in pids], + [SharedArray{T}((numdims(problem),), pids=vcat(pid,[myid()])) for pid in pids], + [fill!(SharedArray{Int}((2,), pids=vcat(pid,[myid()])), 0) for pid in pids], + [SharedArray{T}((numobjectives(fs),), pids=vcat(pid,[myid()])) for pid in pids], + Base.Semaphore(length(pids)), + PECandidateDict{FA}(), PECandidateDict{FA}(), + zeros(length(pids)), ReentrantLock(), 0, 0, BitSet(), + 1, false + ) + etor.worker_refs = _create_workers(etor, pids) + etor.workers_handler = @async workers_handler!(etor) + + #finalizer(etor, _shutdown!) + return etor end end -is_stopped(estate::ParallelEvaluationState) = estate.next_index == 0 +ParallelEvaluator( + problem::OptimizationProblem; + pids::AbstractVector{Int} = workers(), + archiveCapacity::Integer = 10) = + ParallelEvaluator(problem, TopListArchive(fitness_scheme(problem), numdims(problem), archiveCapacity), + pids=pids) -abort!(estate::ParallelEvaluationState) = (estate.next_index = 0) +nworkers(etor::ParallelEvaluator) = length(etor.worker_refs) +queue_capacity(etor::ParallelEvaluator) = nworkers(etor) """ -Reset the current `ParallelEvaluationState` and the vector -of candidates that need fitness evaluation. +Count the candidates submitted (including the completed ones), +but not yet claimed. """ -function reset!(estate::ParallelEvaluationState{F}, candidates::Vector{Candidate{F}}) where F - estate.candidates = candidates - fill!(estate.worker_busy, false) - empty!(estate.retry_queue) - estate.next_index = 1 - return estate +queue_length(etor::ParallelEvaluator) = length(etor.waiting_candidates) + length(etor.unclaimed_candidates) + +num_evals(etor::ParallelEvaluator) = etor.num_evals + +is_stopping(etor::ParallelEvaluator) = etor.is_stopping + +# check that worker is stil running. +# If running, its RemoteChannels should not be ready, +# but if there was exception in the worker, +# it would be thrown into the main thread +function check_worker_running(worker::Future) + if isready(worker) + worker_res = fetch(worker) # fetch the worker, this should trigger an exception + # no exception, but the worker should not be ready + error("Worker at pid=$(worker.where) has finished before the master shutdown: $worker_res") + end + return true +end + +function _create_workers(etor::ParallelEvaluator, pids::AbstractVector{Int}) + @info "Initializing parallel workers..." + workers_ready = RemoteChannel(() -> Channel{Int}(length(pids))) # FIXME do we need to wait for the worker? + + # spawn workers + problem = etor.problem + params_status = etor.params_status + shared_params = etor.shared_params + fitnesses_status = etor.fitnesses_status + shared_fitnesses = etor.shared_fitnesses + + worker_refs = Future[ + @spawnat(pid, run_parallel_evaluator_worker(i, workers_ready, problem, + params_status[i], shared_params[i], + fitnesses_status[i], shared_fitnesses[i])) + for (i, pid) in enumerate(pids)] + #@assert !isready(ppopt.is_started) + # wait until all the workers are started + @info "Waiting for the workers to be ready..." + # FIXME is it required? + nready = 0 + while nready < length(pids) + check_worker_running.(worker_refs) + worker_id = take!(workers_ready) + if worker_id < 0 + # worker failed to initialize, reading its task would throw an exception + check_worker_running(worker_refs[-worker_id]) + error("Exception in the worker #$(-worker_id), but all workers still running") + end + @info " Worker #$worker_id is ready" + nready += 1 + end + @info "All workers ready" + return worker_refs +end + +function shutdown!(etor::ParallelEvaluator) + @info "shutdown!(ParallelEvaluator)" + etor.is_stopping && error("Cannot shutdown!(ParallelEvaluator) twice") + etor.is_stopping = true + # notify the workers that they should shutdown (each worker should pick exactly one message) + _shutdown!(etor) + # resume workers handler if it is waiting for the new jobs + lock(etor.job_assignment) + unlock(etor.job_assignment) + # wait for all the workers + for i in 1:nworkers(etor) + Base.acquire(etor.fitness_slots) + end + @assert all(isequal(0), etor.worker2job) "Some workers have not finished" + # release any waiting + for i in 1:nworkers(etor) + Base.release(etor.fitness_slots) + end +end + +function _shutdown!(etor::ParallelEvaluator) + #@info "_shutdown!(ParallelEvaluator)" + if !etor.is_stopping + etor.is_stopping = true + #close(etor.in_fitnesses) + #close(etor.out_individuals) + end + for i in 1:nworkers(etor) + etor.params_status[i][1] = PEStatus_Shutdown + end + etor +end + +function update_done_jobs!(etor::ParallelEvaluator, job_id) + if job_id > etor.max_done_job + etor.max_done_job = job_id + end + if job_id == etor.max_seq_done_job+1 + # the next sequential job + etor.max_seq_done_job = job_id + # see if max_seq_done_job could be further advanced using done jobs + while etor.max_done_job > etor.max_seq_done_job && first(etor.done_jobs) == etor.max_seq_done_job+1 + etor.max_seq_done_job += 1 + shift!(etor.done_jobs) + end + else + push!(etor.done_jobs, job_id) + end +end + +function process_fitness(etor::ParallelEvaluator, worker_ix::Int) + update_archive!(etor, job_id, new_fitness) +end + +function update_archive!(etor::ParallelEvaluator{F}, job_id::Int, fness::F) where F + # update the list of done jobs + #@info "update_archive()" + candi = pop!(etor.waiting_candidates, job_id) + etor.unclaimed_candidates[job_id] = candi + @assert length(etor.unclaimed_candidates) <= 1000_000 # sanity check + update_done_jobs!(etor, job_id) + etor.last_fitness = fness + candi.fitness = archived_fitness(fness, etor.archive) + etor.num_evals += 1 + #@info "update_archive(): add_candidate()" + add_candidate!(etor.archive, candi.fitness, candi.params, candi.tag, etor.num_evals) + return end """ -Get the index of the next candidate for evaluation -based on the Base.pmap() code. +Process all incoming "fitness ready" messages until the evaluator is stopped. """ -function next_candidate!(estate::ParallelEvaluationState, worker_ix::Int) - @assert !estate.worker_busy[worker_ix] +function workers_handler!(etor::ParallelEvaluator{F}) where F + @info "workers_handler!() started" + while !is_stopping(etor) || !isempty(etor.waiting_candidates) + # master critical section + @inbounds for worker_ix in 1:nworkers(etor) + #@info "workers_handler!(): checking worker #$worker_ix..." + #@assert check_worker_running(etor.worker_refs[worker_ix]) + if (job_id = etor.worker2job[worker_ix]) > 0 && + (fitness_status = etor.fitnesses_status[worker_ix][1]) != PEStatus_OK + @assert (fitness_status == PEStatus_Msg || is_stopping(etor)) "Worker #$worker_ix bad status: $(fitness_status)" + #@info "worker_handler!(): fitness_evaluated" + lock(etor.job_assignment) + param_status = etor.params_status[worker_ix][1] + @inbounds new_fitness = getfitness(F, etor.shared_fitnesses[worker_ix]) + @assert job_id > 0 - task_ix = 0 - if !is_stopped(estate) && estate.next_index <= length(estate.candidates) - # find the next candidate with unevaluated fitness - while !is_stopped(estate) && estate.next_index <= length(estate.candidates) - if isnafitness(estate.candidates[estate.next_index].fitness, estate.fitness_scheme) - task_ix = estate.next_index - estate.next_index += 1 - break; - else - estate.next_index += 1 + #@info "worker_handler!($worker_ix): got fitness for job #$job_id" + etor.worker2job[worker_ix] = 0 # clear job state + + etor.fitnesses_status[worker_ix][1] = PEStatus_OK # received + unlock(etor.job_assignment) + Base.release(etor.fitness_slots) + + if param_status == PEStatus_OK # communication in normal state, update the archive + update_archive!(etor, job_id, new_fitness) + elseif param_status < 0 # error/stopping on the master side + # remove the candidate + delete!(etor.waiting_candidates, job_id) + end + #@info "workers_handler!(): yield to other tasks after archive update" + #yield() # free slots available, switch to the main task end end - end - if task_ix == 0 - if !isempty(estate.retry_queue) - task_ix = popfirst!(estate.retry_queue) - else - # Handles the condition where we have finished processing the requested - # lsts as well as any retryqueue entries, but there are still some jobs - # active that may result in an error and have to be retried. - while any(estate.worker_busy) - wait(estate.worker_finished) - if !isempty(estate.retry_queue) - task_ix = popfirst!(estate.retry_queue) - break + if length(etor.waiting_candidates) < nworkers(etor) + if !is_stopping(etor) && isempty(etor.waiting_candidates) + wait(etor.job_assignment.cond_wait) + else + #@info "workers_handler!(): yield to other tasks" + if !isempty(fitness_done(etor).waitq) + # somebody still waiting, notify + notify(fitness_done(etor)) end + yield() # free slots available, switch to the main task end end end - estate.worker_busy[worker_ix] = task_ix != 0 - return task_ix + @info "workers_handler!() stopped" end """ -Notify that the worker process is finished and reset its busy flag. +Asynchronously calculate the fitness of a candidate. +If `force`, existing fitness would be re-evaluated. + +Returns -2 if evaluator does not accept jobs because it's shutting down + -1 if no fitness evaluation was scheduled (`wait=false` and all workers occupied), + 0 if fitness is already evaluated, + id of fitness evaluation job (check status using `isready()`) """ -function worker_finished!(estate::ParallelEvaluationState, worker_ix::Int) - @assert estate.worker_busy[worker_ix] - estate.worker_busy[worker_ix] = false - notify(estate.worker_finished; all=true) - return estate +function async_update_fitness( + etor::ParallelEvaluator{F,FA}, candi::Candidate{FA}; + force::Bool=false, wait::Bool=false) where {F, FA} + #@info "async_update_fitness(): starting to assign job #$(etor.next_job_id)" + if etor.is_stopping + return -2 # doesn't accept jobs + elseif !force && !isnafitness(fitness(candi), fitness_scheme(etor.archive)) + return 0 # the candidate has fitness, skip recalculation + end + if length(etor.waiting_candidates) >= queue_capacity(etor) && !wait + #@info "async_update_fitness(): queue is full, skip" + return -1 # queue full, job not submitted + end + #@info "async_update_fitness(): waiting to assign job #$(etor.next_job_id)" + Base.acquire(etor.fitness_slots) + #@info "async_update_fitness(): initial slot_state: $(etor.worker2job), $(etor.fitness_slots.curr_cnt)" + lock(etor.job_assignment) + worker_ix = findfirst(isequal(0), etor.worker2job) + @assert worker_ix !== nothing "Cannot find a worker #$(worker_ix) to put a job to" + etor.worker2job[worker_ix] = job_id = etor.next_job_id + etor.next_job_id += 1 + copyto!(etor.shared_params[worker_ix], candi.params) # share candidate with the workers + #@info "async_update_fitness(): assigning job #$job_id to worker #$worker_ix" + etor.waiting_candidates[job_id] = candi + #@info "async_update_fitness(): assert fitness status" + #@assert etor.fitnesses_status[worker_ix][1] == PEStatus_OK + #@assert etor.params_status[worker_ix][1] == PEStatus_OK + #@info "async_update_fitness(): flip param status" + etor.params_status[worker_ix][1] = PEStatus_Msg # announce a message + #@info "async_update_fitness(): assigned job #$job_id to worker #$worker_ix" + #@info "async_update_fitness(): unlock job assignment" + unlock(etor.job_assignment) + #@info "async_update_fitness(): yield()" + #yield() # dispatch the job ASAP, without this it's not getting queued + return job_id end """ -Fitness evaluator that distributes candidates fitness calculation -among several worker processes. + isready(etor::ParallelEvaluator, fit_job_id::Int) + +Check if given asynchronous fitness job calculation is complete. +`fit_job_id` is assigned by `async_update_fitness()`. """ -mutable struct ParallelEvaluator{F, FS, P<:OptimizationProblem} <: Evaluator{P} - problem::P - archive::Archive - num_evals::Int - last_fitness::F - - worker_refs::Vector{ParallelEvaluatorWorkerRef{P}} - eval_state::ParallelEvaluationState{F, FS} -end - -function ParallelEvaluator( - problem::P, archive::Archive; - pids::Vector{Int} = workers()) where {P<:OptimizationProblem} - fs = fitness_scheme(problem) - ParallelEvaluator{fitness_type(fs), typeof(fs), P}(problem, - archive, - 0, nafitness(fs), - [RemoteChannel(function () - # create fake channel and put problem there - ch = Channel{ParallelEvaluatorWorker{P}}(1) - put!(ch, ParallelEvaluatorWorker(copy(problem))) - ch - end, pid) for pid in pids], - ParallelEvaluationState(fs, length(pids)) - ) -end - -ParallelEvaluator(problem::OptimizationProblem; - pids::Vector{Int} = workers(), - archiveCapacity::Integer = 10) = - ParallelEvaluator(problem, TopListArchive(fitness_scheme(problem), numdims(problem), archiveCapacity), - pids=pids) +function Base.isready(etor::ParallelEvaluator{F,FA}, fit_job_id::Int) where {F, FA} + fit_job_id > 0 || throw(ArgumentError("Incompatible fitness job Id")) + pop!(etor.unclaimed_candidates, fit_job_id, + Candidate{FA}(Individual(), -1, etor.arch_nafitness)) # job was claimed + return fit_job_id <= etor.max_seq_done_job || in(fit_job_id, etor.done_jobs) +end -num_evals(e::ParallelEvaluator) = e.num_evals - -function update_fitness!(e::ParallelEvaluator{F}, candidates::Vector{Candidate{F}}) where F - reset!(e.eval_state, candidates) - - # based on pmap() code - @sync begin - for (widx, wref) in enumerate(e.worker_refs) - @async begin - while (candi_ix = next_candidate!(e.eval_state, widx)) != 0 - try - candi = candidates[candi_ix] - candi.fitness = remotecall_fetch(fitness, wref.where, candi.params, wref) - e.last_fitness = candi.fitness - e.num_evals += 1 - add_candidate!(e.archive, candi.fitness, candi.params, e.num_evals) - catch ex - abort!(e.eval_state) # when one worker fails, the whole process is aborted - rethrow(ex) - finally - worker_finished!(e.eval_state, widx) - end - end - end +""" + process_completed!(f::Function, etor::ParallelEvaluator) + +Processes all completed but not yet claimed candidates. +`f` accepts the completed fitness job Id and corresponding candidate, +returns `true` if the candidate was successfully claimed. +""" +function process_completed!(f::Function, etor::ParallelEvaluator) + for (job_id, candi) in etor.unclaimed_candidates + if f(job_id, candi) + # remove job_id from the waiting list and from the unclaimed list + #@info "process_completed!($job_id)" + delete!(etor.unclaimed_candidates, job_id) end end + return etor +end + +""" + fitness_done(etor::ParallelEvaluator) + +Get the condition that is triggered each time fitness evaluation completes. +""" +@inline fitness_done(etor::ParallelEvaluator) = etor.fitness_slots.cond_wait +""" + update_fitness!(etor, candidates; [force=false]) + +Calculate fitness of given `candidates`. +Waits until all fitnesses have been calculated. +`force` specifies whether to re-evaluate fitnesses already stored in `candidates`. +""" +function update_fitness!(etor::ParallelEvaluator{F,FA}, + candidates::Vector{Candidate{FA}}; + force::Bool=false) where {F,FA} + # submit the jobs + job_ids = sizehint!(BitSet(), length(candidates)) + n_pending = 0 + for candi in candidates + job_id = async_update_fitness(etor, candi, force=force, wait=true) + #@info "update_fitness!(): got job id #$job_id" + if job_id > 0 + n_pending += 1 + push!(job_ids, job_id) + elseif (job_id < 0) || (force && job_id == 0) + @warn "fitness calculation rejected" + end + end + # wait until it's done and the evaluator is active + while n_pending > 0 && !is_stopping(etor) && + !(isempty(etor.waiting_candidates) && isempty(etor.unclaimed_candidates)) + #@info "job_ids: $job_ids" + # pick up the candidates that are for us + process_completed!(etor) do job_id, candi + our_job = pop!(job_ids, job_id, 0)>0 + our_job && (n_pending -= 1) + return our_job + end + # wait until another fitness calculation event + if n_pending > 0 && isempty(etor.unclaimed_candidates) + #@info "update_fitness!(): wait()" + wait(fitness_done(etor)) + end + end + @assert (n_pending == 0) "Fitnesses not evaluated (#$job_ids)" return candidates end -# FIXME it's not efficient to calculate fitness like that with `ParallelEvaluator` -function fitness(params::Individual, e::ParallelEvaluator{F}) where F - candi = Candidate{F}(params) - update_fitness!(e, [candi]) - return candi.fitness +# WARNING it's not efficient to synchronously calculate single fitness using +# asynchronous `ParallelEvaluator` +function fitness(params::Individual, etor::ParallelEvaluator{F,FA}) where {F, FA} + candi = Candidate{FA}(params, -1, etor.arch_nafitness) + job_id = async_update_fitness(etor, candi, wait=true) + @assert job_id > 0 + while !is_stopping(etor) && + !(isempty(etor.waiting_candidates) && isempty(etor.unclaimed_candidates)) + if isready(etor, job_id) + #@info "fitness(): done" + return fitness(candi) + else + #@info "fitness(): wait()" + wait(fitness_done(etor)) + end + end + error("Fitness not evaluated") end From a3cf5b48894c954b5c63540535fb97b52ff4479e Mon Sep 17 00:00:00 2001 From: Alexey Stukalov Date: Mon, 27 Aug 2018 01:48:25 +0200 Subject: [PATCH 03/12] Borg: support ParallelEvaluator in async mode Parallelized versions of - update_population_fitness!() - populate_by_mutants!() - step!() --- src/borg_moea.jl | 107 ++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 101 insertions(+), 6 deletions(-) diff --git a/src/borg_moea.jl b/src/borg_moea.jl index 5c711eed..a0c15531 100644 --- a/src/borg_moea.jl +++ b/src/borg_moea.jl @@ -11,6 +11,8 @@ mutable struct BorgMOEA{FS<:FitnessScheme,V<:Evaluator,P<:Population,M<:GeneticO n_restarts::Int n_steps::Int + n_recombined::Int + n_processed::Int last_restart_check::Int last_restart::Int last_wrecombinate_update::Int @@ -49,7 +51,7 @@ mutable struct BorgMOEA{FS<:FitnessScheme,V<:Evaluator,P<:Population,M<:GeneticO fit_scheme = EpsBoxDominanceFitnessScheme(fit_scheme, params[:ϵ]) archive = EpsBoxArchive(fit_scheme, params) evaluator = make_evaluator(problem, archive, params) - new{typeof(fit_scheme),typeof(evaluator),P,M,E}(evaluator, pop, Vector{Int}(), 0, 0, 0, 0, 0, + new{typeof(fit_scheme),typeof(evaluator),P,M,E}(evaluator, pop, Vector{Int}(), 0, 0, 0, 0, 0, 0, 0, params[:τ], params[:γ], params[:γ_δ], params[:PopulationSize], Categorical(ones(length(recombinate))/length(recombinate)), params[:θ], params[:ζ], params[:OperatorsUpdatePeriod], params[:RestartCheckPeriod], @@ -114,6 +116,12 @@ function step!(alg::BorgMOEA) if alg.n_steps >= alg.last_wrecombinate_update + alg.wrecombinate_update_period update_recombination_weights!(alg) end + recombine_individuals!(alg) + return alg +end + +function recombine_individuals!(alg::BorgMOEA) + prepare_recombination(alg) # Select the operators to apply based on their probabilities recomb_op_ix = rand(alg.recombinate_distr) recombinate!(alg, recomb_op_ix, alg.recombinate[recomb_op_ix]) @@ -138,16 +146,45 @@ function recombinate!(alg::BorgMOEA, recomb_op_ix::Int, recomb_op::CrossoverOper apply!(recomb_op, Individual[child.params for child in children], zeros(Int, length(children)), alg.population, parent_indices) for child in children - child.extra = recomb_op - child.tag = recomb_op_ix - process_candidate!(alg, child, parent_indices[1]) + preprocess_recombined!(alg, child, recomb_op_ix, parent_indices[1]) + alg.n_recombined += 1 + postprocess_recombined!(alg, child) end end -function process_candidate!(alg::BorgMOEA, candi::Candidate, ref_index::Int) +prepare_recombination(alg::BorgMOEA) = nothing # do nothing + +function preprocess_recombined!(alg::BorgMOEA, candi::Candidate, recomb_op_ix::Int, ref_index::Int) apply!(alg.embed, candi.params, alg.population, ref_index) reset_fitness!(candi, alg.population) - ifitness = fitness(update_fitness!(alg.evaluator, candi)) # implicitly updates the archive + candi.extra = alg.recombinate[recomb_op_ix] + candi.tag = recomb_op_ix + candi +end + +function postprocess_recombined!(alg::BorgMOEA, candi::Candidate) + update_fitness!(alg.evaluator, candi) # implicitly updates the archive + process_candidate!(alg, candi) +end + +# ParallelEvaluator version -- process previously submitted candidates with the completed fitness +function prepare_recombination(alg::BorgMOEA{<:FitnessScheme, <:ParallelEvaluator}) + process_completed!(alg.evaluator) do fit_job_id, candi + process_candidate!(alg, candi) + true + end +end + +# ParallelEvaluator version, just submit to fitness calculation, nothing else +# if the queue is full, waits until some jobs are processed -- that established the +# balance between recombining and fitness evaluation +function postprocess_recombined!(alg::BorgMOEA{<:FitnessScheme, <:ParallelEvaluator}, candi::Candidate) + async_update_fitness(alg.evaluator, candi, wait=true) + return candi +end + +function process_candidate!(alg::BorgMOEA, candi::Candidate) + ifitness = fitness(candi) # test the population hat_comp = HatCompare(fitness_scheme(archive(alg))) popsz = popsize(alg.population) @@ -190,6 +227,7 @@ function process_candidate!(alg::BorgMOEA, candi::Candidate, ref_index::Int) else release_candi(alg.population, candi) end + alg.n_processed += 1 alg end @@ -221,6 +259,38 @@ function update_population_fitness!(alg::BorgMOEA) end end +function update_population_fitness!(alg::BorgMOEA{<:FitnessScheme, <:ParallelEvaluator}) + fs = fitness_scheme(alg.evaluator.archive) + popsz = popsize(alg.population) + last_checked = n_processed = 0 + job_ids = BitSet() + while n_processed < popsz + # process the calculated fitnesses + process_completed!(alg.evaluator) do fit_job_id, candi + if pop!(job_ids, fit_job_id, 0) > 0 + alg.population.fitness[candi.index] = candi.fitness + release_candi(alg.population, candi) + n_processed += 1 + return true + else + return false # some unrelated candidate, skip + end + end + if last_checked < popsz + # submit to fitness evaluation + ix = (last_checked += 1) + if isnafitness(fitness(alg.population, ix), fs) + candi = acquire_candi(alg.population, ix) + push!(job_ids, async_update_fitness(alg.evaluator, candi, wait=true)) + else # fitness already calculated + n_processed += 1 + end + else + yield() # allow the other tasks to process the incoming fitnesses + end + end +end + """ Update recombination operator probabilities based on the archive tag counts. """ @@ -254,6 +324,31 @@ function populate_by_mutants(alg::BorgMOEA, last_nonmutant::Int) end end +function populate_by_mutants(alg::BorgMOEA{<:FitnessScheme, <:ParallelEvaluator}, last_nonmutant::Int) + popsz = popsize(alg.population) + last_submitted = last_accepted = last_nonmutant + mutant_job_ids = BitSet() + while last_accepted < popsz + # process the calculated fitnesses + process_completed!(alg.evaluator) do fit_job_id, candi + if pop!(mutant_job_ids, fit_job_id, 0) > 0 # it's our mutant! + candi.index = (last_accepted += 1) + accept_candi!(alg.population, candi) + return true + else + return false # some unrelated candidate, skip + end + end + if last_submitted < popsz + # generate the new mutant and submit to fitness evaluation + mutant = acquire_mutant(alg, last_submitted+=1, last_nonmutant) + push!(mutant_job_ids, async_update_fitness(alg.evaluator, mutant, wait=true)) + else + yield() # allow the other tasks to process the incoming fitnesses + end + end +end + """ Restart Borg MOEA. From 59879a6ce7b41c28c9e018fb0eef84e6ae270e06 Mon Sep 17 00:00:00 2001 From: Alexey Stukalov Date: Tue, 30 Aug 2016 16:53:02 +0200 Subject: [PATCH 04/12] Borg: test Borg with ParallelEvaluator --- test/test_borg_moea.jl | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/test/test_borg_moea.jl b/test/test_borg_moea.jl index 4c5ebd54..eb71e5f4 100644 --- a/test/test_borg_moea.jl +++ b/test/test_borg_moea.jl @@ -6,6 +6,17 @@ MaxSteps=5000, TraceMode=:silent) @test BlackBoxOptim.IGD(BlackBoxOptim.Schaffer1Family.opt_value, pareto_frontier(res), fitness_scheme(res), Val{length(best_candidate(res))}) < 0.05 + + @testset "using ParallelEvaluator" begin + opt = bbsetup(BlackBoxOptim.Schaffer1Family; Method=:borg_moea, + FitnessScheme=ParetoFitnessScheme{2}(is_minimizing=true), + SearchRange=(-10.0, 10.0), NumDimensions=2, ϵ=0.01, + MaxSteps=5000, TraceMode=:verbose, TraceInterval=1.0, Workers=workers()) + res = bboptimize(opt) + @test isa(BlackBoxOptim.evaluator(lastrun(opt)), BlackBoxOptim.ParallelEvaluator) + @test BlackBoxOptim.IGD(BlackBoxOptim.Schaffer1Family.opt_value, pareto_frontier(res), + fitness_scheme(res), Val{length(best_candidate(res))}) < 0.05 + end end @testset "CEC09_UP8" begin res = bboptimize(BlackBoxOptim.CEC09_Unconstrained_Set[8]; Method=:borg_moea, From 7e09cb51e1fefeadaa8e46cf149a8fb76627dda5 Mon Sep 17 00:00:00 2001 From: Alexey Stukalov Date: Thu, 22 Sep 2016 13:53:09 +0200 Subject: [PATCH 05/12] ParallelEvaluator: multi-objective tests --- test/test_evaluator.jl | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/test/test_evaluator.jl b/test/test_evaluator.jl index a8733212..c3245519 100644 --- a/test/test_evaluator.jl +++ b/test/test_evaluator.jl @@ -79,6 +79,24 @@ end using Distributed evaluator_tests(() -> BlackBoxOptim.ParallelEvaluator(p, pids=workers())) + + @testset "multi-objective problem" begin + schaffer1(x) = (sum(abs2, x), sum(xx -> abs2(xx - 2.0), x)) + p = BlackBoxOptim.FunctionBasedProblem(schaffer1, "Schaffer1", ParetoFitnessScheme{2}(is_minimizing=true), + symmetric_search_space(5, (-10.0, 10.0))) + a = EpsBoxArchive(EpsBoxDominanceFitnessScheme(fitness_scheme(p)), max_size=100) + + e = BlackBoxOptim.ParallelEvaluator(p, a, pids=workers()) + fit1 = fitness([0.0, 1.0, 2.0, 3.0, 4.0], e) + @test BlackBoxOptim.num_evals(e) == 1 + @test BlackBoxOptim.last_fitness(e) == fit1.orig + + fit2 = fitness([0.0, -1.0, -2.0, -3.0, -4.0], e) + @test BlackBoxOptim.num_evals(e) == 2 + @test BlackBoxOptim.last_fitness(e) == fit2.orig + + BlackBoxOptim.shutdown!(e) + end end end From 860d017e989ba1d520b935c5b0b1114da5ae37de Mon Sep 17 00:00:00 2001 From: Alexey Stukalov Date: Mon, 12 Nov 2018 12:10:44 +0100 Subject: [PATCH 06/12] using: add SharedArrays looks like using SharedArrays in parallel_evaluator.jl is not enough --- src/BlackBoxOptim.jl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/BlackBoxOptim.jl b/src/BlackBoxOptim.jl index 32aab948..fba59591 100644 --- a/src/BlackBoxOptim.jl +++ b/src/BlackBoxOptim.jl @@ -1,6 +1,6 @@ module BlackBoxOptim -using Distributions, StatsBase, Random, LinearAlgebra, Printf, Distributed, Compat +using Distributions, StatsBase, Random, LinearAlgebra, Printf, Distributed, SharedArrays, Compat using Printf: @printf, @sprintf using Compat: String, view From 90ec83f8797e7c0b3e722b22b7d92b5946a0ae00 Mon Sep 17 00:00:00 2001 From: Alexey Stukalov Date: Mon, 12 Nov 2018 12:14:29 +0100 Subject: [PATCH 07/12] parall_eval: workers_handler => workers_listener --- src/parallel_evaluator.jl | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/src/parallel_evaluator.jl b/src/parallel_evaluator.jl index f4ba3e5e..6049cfe1 100644 --- a/src/parallel_evaluator.jl +++ b/src/parallel_evaluator.jl @@ -139,7 +139,7 @@ mutable struct ParallelEvaluator{F, FA, T, FS, P<:OptimizationProblem, A<:Archiv is_stopping::Bool # whether the evaluator is in shutdown sequence worker_refs::Vector{Future} # references to worker processes - workers_handler::Task # task in the main process that runs workers_handler() + workers_listener::Task # task in the main process that runs workers_listener!() function ParallelEvaluator( problem::P, archive::A; @@ -163,7 +163,7 @@ mutable struct ParallelEvaluator{F, FA, T, FS, P<:OptimizationProblem, A<:Archiv 1, false ) etor.worker_refs = _create_workers(etor, pids) - etor.workers_handler = @async workers_handler!(etor) + etor.workers_listener = @async workers_listener!(etor) #finalizer(etor, _shutdown!) return etor @@ -245,7 +245,7 @@ function shutdown!(etor::ParallelEvaluator) etor.is_stopping = true # notify the workers that they should shutdown (each worker should pick exactly one message) _shutdown!(etor) - # resume workers handler if it is waiting for the new jobs + # resume workers listener if it is waiting for the new jobs lock(etor.job_assignment) unlock(etor.job_assignment) # wait for all the workers @@ -311,23 +311,23 @@ end """ Process all incoming "fitness ready" messages until the evaluator is stopped. """ -function workers_handler!(etor::ParallelEvaluator{F}) where F - @info "workers_handler!() started" +function workers_listener!(etor::ParallelEvaluator{F}) where F + @info "workers_listener!() started" while !is_stopping(etor) || !isempty(etor.waiting_candidates) # master critical section @inbounds for worker_ix in 1:nworkers(etor) - #@info "workers_handler!(): checking worker #$worker_ix..." + #@info "workers_listener!(): checking worker #$worker_ix..." #@assert check_worker_running(etor.worker_refs[worker_ix]) if (job_id = etor.worker2job[worker_ix]) > 0 && (fitness_status = etor.fitnesses_status[worker_ix][1]) != PEStatus_OK @assert (fitness_status == PEStatus_Msg || is_stopping(etor)) "Worker #$worker_ix bad status: $(fitness_status)" - #@info "worker_handler!(): fitness_evaluated" + #@info "worker_listener!(): fitness_evaluated" lock(etor.job_assignment) param_status = etor.params_status[worker_ix][1] @inbounds new_fitness = getfitness(F, etor.shared_fitnesses[worker_ix]) @assert job_id > 0 - #@info "worker_handler!($worker_ix): got fitness for job #$job_id" + #@info "worker_listener!($worker_ix): got fitness for job #$job_id" etor.worker2job[worker_ix] = 0 # clear job state etor.fitnesses_status[worker_ix][1] = PEStatus_OK # received @@ -340,7 +340,7 @@ function workers_handler!(etor::ParallelEvaluator{F}) where F # remove the candidate delete!(etor.waiting_candidates, job_id) end - #@info "workers_handler!(): yield to other tasks after archive update" + #@info "workers_listener!(): yield to other tasks after archive update" #yield() # free slots available, switch to the main task end end @@ -348,7 +348,7 @@ function workers_handler!(etor::ParallelEvaluator{F}) where F if !is_stopping(etor) && isempty(etor.waiting_candidates) wait(etor.job_assignment.cond_wait) else - #@info "workers_handler!(): yield to other tasks" + #@info "workers_listener!(): yield to other tasks" if !isempty(fitness_done(etor).waitq) # somebody still waiting, notify notify(fitness_done(etor)) @@ -357,7 +357,7 @@ function workers_handler!(etor::ParallelEvaluator{F}) where F end end end - @info "workers_handler!() stopped" + @info "workers_listener!() stopped" end """ From 0d92e80f440b8f97636dde63ee6c943073cc8239 Mon Sep 17 00:00:00 2001 From: Alexey Stukalov Date: Mon, 12 Nov 2018 13:55:07 +0100 Subject: [PATCH 08/12] master/worker exchange job ids as statuses worker2job is replaced with busy_workers --- src/parallel_evaluator.jl | 41 ++++++++++++++++++--------------------- 1 file changed, 19 insertions(+), 22 deletions(-) diff --git a/src/parallel_evaluator.jl b/src/parallel_evaluator.jl index 6049cfe1..573de5bf 100644 --- a/src/parallel_evaluator.jl +++ b/src/parallel_evaluator.jl @@ -2,8 +2,7 @@ using Distributed, SharedArrays #= master <-> worker params_status/fitness_status codes =# -const PEStatus_Msg = 1 -const PEStatus_OK = 0 +const PEStatus_OK = 0 # message received; positive statuses = job_id const PEStatus_Shutdown = -1 const PEStatus_Error = -2 @@ -11,7 +10,7 @@ const PEStatus_Error = -2 Internal data for the worker process of the parallel evaluator. """ mutable struct ParallelEvaluatorWorker{T, P<:OptimizationProblem} - id::Int + id::Int # worker ID problem::P param_status::SharedVector{Int} # master notifies worker about new requests shared_param::SharedVector{T} # master puts candidates parameters @@ -36,7 +35,7 @@ function run!(worker::ParallelEvaluatorWorker) # continuously poll the worker for the delivery notification for # the last job or for the new job notification i = 0 - while param_status(worker) == PEStatus_OK || fitness_status(worker) == PEStatus_Msg + while param_status(worker) == PEStatus_OK || fitness_status(worker) > 0 if (i+=1) > 1000 # allow executing the other tasks once in a while yield() i = 0 @@ -50,11 +49,11 @@ function run!(worker::ParallelEvaluatorWorker) elseif p_status == PEStatus_Error # master error (currently not set?) worker.fitness_status[1] = PEStatus_Shutdown # stopped after receiving an error break - elseif p_status == PEStatus_Msg # new job + elseif p_status > 0 # new job (status is job_id) worker.param_status[1] = PEStatus_OK # received, reset the statuts #@info "PE worker #$(worker.id): got job" @inbounds setfitness!(worker.shared_fitness, fitness(worker.shared_param, worker.problem)) - worker.fitness_status[1] = PEStatus_Msg # fitness ready + worker.fitness_status[1] = p_status # fitness ready end end end @@ -127,8 +126,8 @@ mutable struct ParallelEvaluator{F, FA, T, FS, P<:OptimizationProblem, A<:Archiv waiting_candidates::PECandidateDict{FA} # candidates waiting their fitness calculation to be completed unclaimed_candidates::PECandidateDict{FA} # candidates with calculated fitness that were not yet checked for completion (by process_completed()) - worker2job::Vector{Int} # job IDs assigned to workers - job_assignment::ReentrantLock # lock to provide exclusive access to worker2job + busy_workers::BitVector # workers available for accepting jobs + job_assignment::ReentrantLock # lock to provide exclusive access to busy_workers max_seq_done_job::Int # all jobs from 1 to max_seq_done_job are done max_done_job::Int # max Id of done job @@ -159,7 +158,7 @@ mutable struct ParallelEvaluator{F, FA, T, FS, P<:OptimizationProblem, A<:Archiv [SharedArray{T}((numobjectives(fs),), pids=vcat(pid,[myid()])) for pid in pids], Base.Semaphore(length(pids)), PECandidateDict{FA}(), PECandidateDict{FA}(), - zeros(length(pids)), ReentrantLock(), 0, 0, BitSet(), + falses(length(pids)), ReentrantLock(), 0, 0, BitSet(), 1, false ) etor.worker_refs = _create_workers(etor, pids) @@ -252,7 +251,7 @@ function shutdown!(etor::ParallelEvaluator) for i in 1:nworkers(etor) Base.acquire(etor.fitness_slots) end - @assert all(isequal(0), etor.worker2job) "Some workers have not finished" + @assert !any(etor.busy_workers) "Some workers have not finished" # release any waiting for i in 1:nworkers(etor) Base.release(etor.fitness_slots) @@ -318,22 +317,19 @@ function workers_listener!(etor::ParallelEvaluator{F}) where F @inbounds for worker_ix in 1:nworkers(etor) #@info "workers_listener!(): checking worker #$worker_ix..." #@assert check_worker_running(etor.worker_refs[worker_ix]) - if (job_id = etor.worker2job[worker_ix]) > 0 && - (fitness_status = etor.fitnesses_status[worker_ix][1]) != PEStatus_OK - @assert (fitness_status == PEStatus_Msg || is_stopping(etor)) "Worker #$worker_ix bad status: $(fitness_status)" + if etor.busy_workers[worker_ix] && (job_id = etor.fitnesses_status[worker_ix][1]) != PEStatus_OK + @assert (job_id > 0 || is_stopping(etor)) "Worker #$worker_ix bad status: $job_id" #@info "worker_listener!(): fitness_evaluated" lock(etor.job_assignment) - param_status = etor.params_status[worker_ix][1] @inbounds new_fitness = getfitness(F, etor.shared_fitnesses[worker_ix]) - @assert job_id > 0 - #@info "worker_listener!($worker_ix): got fitness for job #$job_id" - etor.worker2job[worker_ix] = 0 # clear job state - etor.fitnesses_status[worker_ix][1] = PEStatus_OK # received + etor.busy_workers[worker_ix] = false # available again + unlock(etor.job_assignment) Base.release(etor.fitness_slots) + param_status = etor.params_status[worker_ix][1] if param_status == PEStatus_OK # communication in normal state, update the archive update_archive!(etor, job_id, new_fitness) elseif param_status < 0 # error/stopping on the master side @@ -373,6 +369,7 @@ function async_update_fitness( etor::ParallelEvaluator{F,FA}, candi::Candidate{FA}; force::Bool=false, wait::Bool=false) where {F, FA} #@info "async_update_fitness(): starting to assign job #$(etor.next_job_id)" + job_id = etor.next_job_id # tentative job id, but not assigned yet if etor.is_stopping return -2 # doesn't accept jobs elseif !force && !isnafitness(fitness(candi), fitness_scheme(etor.archive)) @@ -384,11 +381,11 @@ function async_update_fitness( end #@info "async_update_fitness(): waiting to assign job #$(etor.next_job_id)" Base.acquire(etor.fitness_slots) - #@info "async_update_fitness(): initial slot_state: $(etor.worker2job), $(etor.fitness_slots.curr_cnt)" + #@info "async_update_fitness(): initial slot_state: $(etor.busy_workers), $(etor.fitness_slots.curr_cnt)" lock(etor.job_assignment) - worker_ix = findfirst(isequal(0), etor.worker2job) + worker_ix = Base.findfirstnot(etor.busy_workers) @assert worker_ix !== nothing "Cannot find a worker #$(worker_ix) to put a job to" - etor.worker2job[worker_ix] = job_id = etor.next_job_id + etor.busy_workers[worker_ix] = true # busy etor.next_job_id += 1 copyto!(etor.shared_params[worker_ix], candi.params) # share candidate with the workers #@info "async_update_fitness(): assigning job #$job_id to worker #$worker_ix" @@ -397,7 +394,7 @@ function async_update_fitness( #@assert etor.fitnesses_status[worker_ix][1] == PEStatus_OK #@assert etor.params_status[worker_ix][1] == PEStatus_OK #@info "async_update_fitness(): flip param status" - etor.params_status[worker_ix][1] = PEStatus_Msg # announce a message + etor.params_status[worker_ix][1] = job_id # announce a message (status = job_id) #@info "async_update_fitness(): assigned job #$job_id to worker #$worker_ix" #@info "async_update_fitness(): unlock job assignment" unlock(etor.job_assignment) From 9a8954d2e8bf09bbaf736abd7c53bbe83cf2070d Mon Sep 17 00:00:00 2001 From: Alexey Stukalov Date: Mon, 12 Nov 2018 14:12:46 +0100 Subject: [PATCH 09/12] par_eval: refactor listener/main task comm - don't use fitness_slots for communication, add dedicated job_done, job_submitted - add fitness to the archive outside of job_assignments critical section --- src/parallel_evaluator.jl | 58 ++++++++++++++++----------------------- 1 file changed, 23 insertions(+), 35 deletions(-) diff --git a/src/parallel_evaluator.jl b/src/parallel_evaluator.jl index 573de5bf..285b0018 100644 --- a/src/parallel_evaluator.jl +++ b/src/parallel_evaluator.jl @@ -35,7 +35,7 @@ function run!(worker::ParallelEvaluatorWorker) # continuously poll the worker for the delivery notification for # the last job or for the new job notification i = 0 - while param_status(worker) == PEStatus_OK || fitness_status(worker) > 0 + while param_status(worker) == PEStatus_OK if (i+=1) > 1000 # allow executing the other tasks once in a while yield() i = 0 @@ -120,6 +120,8 @@ mutable struct ParallelEvaluator{F, FA, T, FS, P<:OptimizationProblem, A<:Archiv fitnesses_status::Vector{SharedVector{Int}} # workers notify master about completed evaluation shared_fitnesses::Vector{SharedVector{T}} # workers put calculated fitness + job_submitted::Condition + job_done::Condition fitness_slots::Base.Semaphore # gets acquired when a worker needs to be assigned to a task; # used to organize waiting when all workers are busy @@ -156,7 +158,7 @@ mutable struct ParallelEvaluator{F, FA, T, FS, P<:OptimizationProblem, A<:Archiv [SharedArray{T}((numdims(problem),), pids=vcat(pid,[myid()])) for pid in pids], [fill!(SharedArray{Int}((2,), pids=vcat(pid,[myid()])), 0) for pid in pids], [SharedArray{T}((numobjectives(fs),), pids=vcat(pid,[myid()])) for pid in pids], - Base.Semaphore(length(pids)), + Condition(), Condition(), Base.Semaphore(length(pids)), PECandidateDict{FA}(), PECandidateDict{FA}(), falses(length(pids)), ReentrantLock(), 0, 0, BitSet(), 1, false @@ -245,8 +247,8 @@ function shutdown!(etor::ParallelEvaluator) # notify the workers that they should shutdown (each worker should pick exactly one message) _shutdown!(etor) # resume workers listener if it is waiting for the new jobs - lock(etor.job_assignment) - unlock(etor.job_assignment) + notify(etor.job_done) + notify(etor.job_submitted) # wait for all the workers for i in 1:nworkers(etor) Base.acquire(etor.fitness_slots) @@ -288,13 +290,9 @@ function update_done_jobs!(etor::ParallelEvaluator, job_id) end end -function process_fitness(etor::ParallelEvaluator, worker_ix::Int) - update_archive!(etor, job_id, new_fitness) -end - -function update_archive!(etor::ParallelEvaluator{F}, job_id::Int, fness::F) where F +function get_updated_candidate!(etor::ParallelEvaluator{F}, job_id::Int, fness::F) where F # update the list of done jobs - #@info "update_archive()" + #@info "get_updated_candidate(job_id=#$job_id)" candi = pop!(etor.waiting_candidates, job_id) etor.unclaimed_candidates[job_id] = candi @assert length(etor.unclaimed_candidates) <= 1000_000 # sanity check @@ -302,9 +300,7 @@ function update_archive!(etor::ParallelEvaluator{F}, job_id::Int, fness::F) wher etor.last_fitness = fness candi.fitness = archived_fitness(fness, etor.archive) etor.num_evals += 1 - #@info "update_archive(): add_candidate()" - add_candidate!(etor.archive, candi.fitness, candi.params, candi.tag, etor.num_evals) - return + return candi end """ @@ -314,8 +310,9 @@ function workers_listener!(etor::ParallelEvaluator{F}) where F @info "workers_listener!() started" while !is_stopping(etor) || !isempty(etor.waiting_candidates) # master critical section - @inbounds for worker_ix in 1:nworkers(etor) - #@info "workers_listener!(): checking worker #$worker_ix..." + worker_ix = findfirst(etor.busy_workers) + while worker_ix !== nothing + #@info "workers_listener!(): checking busy worker #$worker_ix..." #@assert check_worker_running(etor.worker_refs[worker_ix]) if etor.busy_workers[worker_ix] && (job_id = etor.fitnesses_status[worker_ix][1]) != PEStatus_OK @assert (job_id > 0 || is_stopping(etor)) "Worker #$worker_ix bad status: $job_id" @@ -323,32 +320,28 @@ function workers_listener!(etor::ParallelEvaluator{F}) where F lock(etor.job_assignment) @inbounds new_fitness = getfitness(F, etor.shared_fitnesses[worker_ix]) #@info "worker_listener!($worker_ix): got fitness for job #$job_id" + candi = get_updated_candidate!(etor, job_id, new_fitness) etor.fitnesses_status[worker_ix][1] = PEStatus_OK # received etor.busy_workers[worker_ix] = false # available again unlock(etor.job_assignment) Base.release(etor.fitness_slots) + notify(etor.job_done) param_status = etor.params_status[worker_ix][1] if param_status == PEStatus_OK # communication in normal state, update the archive - update_archive!(etor, job_id, new_fitness) - elseif param_status < 0 # error/stopping on the master side - # remove the candidate - delete!(etor.waiting_candidates, job_id) + add_candidate!(etor.archive, candi.fitness, candi.params, candi.tag, etor.num_evals) end + worker_ix = findnext(etor.busy_workers, worker_ix+1) #@info "workers_listener!(): yield to other tasks after archive update" #yield() # free slots available, switch to the main task end end - if length(etor.waiting_candidates) < nworkers(etor) - if !is_stopping(etor) && isempty(etor.waiting_candidates) - wait(etor.job_assignment.cond_wait) + if !is_stopping(etor) + if isempty(etor.waiting_candidates) + wait(etor.job_submitted) else #@info "workers_listener!(): yield to other tasks" - if !isempty(fitness_done(etor).waitq) - # somebody still waiting, notify - notify(fitness_done(etor)) - end yield() # free slots available, switch to the main task end end @@ -398,6 +391,8 @@ function async_update_fitness( #@info "async_update_fitness(): assigned job #$job_id to worker #$worker_ix" #@info "async_update_fitness(): unlock job assignment" unlock(etor.job_assignment) + notify(etor.job_submitted) + #@info "async_update_fitness(): yield()" #yield() # dispatch the job ASAP, without this it's not getting queued return job_id @@ -434,13 +429,6 @@ function process_completed!(f::Function, etor::ParallelEvaluator) return etor end -""" - fitness_done(etor::ParallelEvaluator) - -Get the condition that is triggered each time fitness evaluation completes. -""" -@inline fitness_done(etor::ParallelEvaluator) = etor.fitness_slots.cond_wait - """ update_fitness!(etor, candidates; [force=false]) @@ -477,7 +465,7 @@ function update_fitness!(etor::ParallelEvaluator{F,FA}, # wait until another fitness calculation event if n_pending > 0 && isempty(etor.unclaimed_candidates) #@info "update_fitness!(): wait()" - wait(fitness_done(etor)) + wait(etor.job_done) end end @assert (n_pending == 0) "Fitnesses not evaluated (#$job_ids)" @@ -497,7 +485,7 @@ function fitness(params::Individual, etor::ParallelEvaluator{F,FA}) where {F, FA return fitness(candi) else #@info "fitness(): wait()" - wait(fitness_done(etor)) + wait(etor.job_done) end end error("Fitness not evaluated") From 748f545097ce0940d888164997b7ced27cf99b29 Mon Sep 17 00:00:00 2001 From: Alexey Stukalov Date: Mon, 12 Nov 2018 14:21:25 +0100 Subject: [PATCH 10/12] par_eval: update logging - convert `@info` into `@debug` - improve debug message verbosity --- src/parallel_evaluator.jl | 95 ++++++++++++++++++++++----------------- 1 file changed, 55 insertions(+), 40 deletions(-) diff --git a/src/parallel_evaluator.jl b/src/parallel_evaluator.jl index 285b0018..f619c544 100644 --- a/src/parallel_evaluator.jl +++ b/src/parallel_evaluator.jl @@ -31,7 +31,7 @@ fitness_status(worker::ParallelEvaluatorWorker) = @inbounds first(worker.fitness # run the wrapper (called in the "main" task) function run!(worker::ParallelEvaluatorWorker) while true - #@info "Checking in worker #$(worker.id)" + ##@debug "Checking state of worker #$(worker.id)" # continuously poll the worker for the delivery notification for # the last job or for the new job notification i = 0 @@ -51,8 +51,9 @@ function run!(worker::ParallelEvaluatorWorker) break elseif p_status > 0 # new job (status is job_id) worker.param_status[1] = PEStatus_OK # received, reset the statuts - #@info "PE worker #$(worker.id): got job" + #@debug "PE worker #$(worker.id): got job #$p_status" @inbounds setfitness!(worker.shared_fitness, fitness(worker.shared_param, worker.problem)) + #@debug "PE worker #$(worker.id): job #$p_status done" worker.fitness_status[1] = p_status # fitness ready end end @@ -75,24 +76,23 @@ function run_parallel_evaluator_worker(id::Int, worker = ParallelEvaluatorWorker(id, deepcopy(problem), param_status, shared_param, fitness_status, shared_fitness) - catch e + catch ex # send -id to notify about an error and to release # the master from waiting for worker readiness - @warn "Exception at ParallelEvaluatorWorker initialization: $e" + @warn "Exception at ParallelEvaluatorWorker initialization" exception=ex put!(worker_ready, -id) - rethrow(e) + rethrow(ex) end - # create immigrants receiving tasks=# - put!(worker_ready, id) + put!(worker_ready, id) # notify @info "Running worker #$id..." try run!(worker) - catch e + catch ex # send error candidate to notify about an error and to release # the master from waiting for worker messages - @warn "Exception while running ParallelEvaluatorWorker: $e" + @warn "Exception while running ParallelEvaluatorWorker" exception=ex worker.fitness_status[1] = PEStatus_Error - rethrow(e) + rethrow(ex) end @info "Worker #$id stopped" nothing @@ -261,7 +261,7 @@ function shutdown!(etor::ParallelEvaluator) end function _shutdown!(etor::ParallelEvaluator) - #@info "_shutdown!(ParallelEvaluator)" + #@debug "_shutdown!(ParallelEvaluator)" if !etor.is_stopping etor.is_stopping = true #close(etor.in_fitnesses) @@ -292,7 +292,7 @@ end function get_updated_candidate!(etor::ParallelEvaluator{F}, job_id::Int, fness::F) where F # update the list of done jobs - #@info "get_updated_candidate(job_id=#$job_id)" + #@debug "get_updated_candidate(job_id=#$job_id)" candi = pop!(etor.waiting_candidates, job_id) etor.unclaimed_candidates[job_id] = candi @assert length(etor.unclaimed_candidates) <= 1000_000 # sanity check @@ -312,36 +312,45 @@ function workers_listener!(etor::ParallelEvaluator{F}) where F # master critical section worker_ix = findfirst(etor.busy_workers) while worker_ix !== nothing - #@info "workers_listener!(): checking busy worker #$worker_ix..." + #@debug "workers_listener!(): checking busy worker #$worker_ix..." #@assert check_worker_running(etor.worker_refs[worker_ix]) - if etor.busy_workers[worker_ix] && (job_id = etor.fitnesses_status[worker_ix][1]) != PEStatus_OK + if (job_id = etor.fitnesses_status[worker_ix][1]) != PEStatus_OK @assert (job_id > 0 || is_stopping(etor)) "Worker #$worker_ix bad status: $job_id" - #@info "worker_listener!(): fitness_evaluated" + #@debug "worker_listener!(worker=#$worker_ix): job #$job_id done" + + #@debug "worker_listener!(worker=#$worker_ix, job=#$job_id): locking job_assignment to get fitness" lock(etor.job_assignment) @inbounds new_fitness = getfitness(F, etor.shared_fitnesses[worker_ix]) - #@info "worker_listener!($worker_ix): got fitness for job #$job_id" candi = get_updated_candidate!(etor, job_id, new_fitness) etor.fitnesses_status[worker_ix][1] = PEStatus_OK # received etor.busy_workers[worker_ix] = false # available again - + #@debug "worker_listener!(worker=#$worker_ix, job=#$job_id): unlocking job_assignment after getting fitness" unlock(etor.job_assignment) + + #@debug "workers_listener!(worker=#$worker_ix, job=#$job_id): releasing fitness_slots (sem_size=$(etor.fitness_slots.sem_size) cur_count=$(etor.fitness_slots.curr_cnt))" Base.release(etor.fitness_slots) + #@debug "workers_listener!(worker=#$worker_ix, job=#$job_id): released fitness_slots (sem_size=$(etor.fitness_slots.sem_size) cur_count=$(etor.fitness_slots.curr_cnt))" + #@debug "workers_listener!(): notify fitness calculation done" notify(etor.job_done) + ##@debug "worker_listener!(worker=#$worker_ix, job=#$job_id): checking param status" param_status = etor.params_status[worker_ix][1] if param_status == PEStatus_OK # communication in normal state, update the archive + #@debug "workers_listener(job_id=#$job_id): add_candidate(archive)" add_candidate!(etor.archive, candi.fitness, candi.params, candi.tag, etor.num_evals) + #@debug "workers_listener(job_id=#$job_id): done" end worker_ix = findnext(etor.busy_workers, worker_ix+1) - #@info "workers_listener!(): yield to other tasks after archive update" - #yield() # free slots available, switch to the main task + ##@debug "workers_listener!(): yield to other tasks after archive update (worker_ix=#$worker_ix, job_id=#$job_id)" + ##yield() # free slots available, switch to the main task end end if !is_stopping(etor) if isempty(etor.waiting_candidates) + #@debug "workers_listener!(): all jobs done, waiting for the new assignment..." wait(etor.job_submitted) else - #@info "workers_listener!(): yield to other tasks" + #@debug "workers_listener!(): yield to other tasks" yield() # free slots available, switch to the main task end end @@ -361,40 +370,43 @@ Returns -2 if evaluator does not accept jobs because it's shutting down function async_update_fitness( etor::ParallelEvaluator{F,FA}, candi::Candidate{FA}; force::Bool=false, wait::Bool=false) where {F, FA} - #@info "async_update_fitness(): starting to assign job #$(etor.next_job_id)" job_id = etor.next_job_id # tentative job id, but not assigned yet + #@debug "async_update_fitness(): starting to assign job #$job_id" if etor.is_stopping return -2 # doesn't accept jobs elseif !force && !isnafitness(fitness(candi), fitness_scheme(etor.archive)) + #@debug "async_update_fitness(): don't recalculate job #$job_id" return 0 # the candidate has fitness, skip recalculation end if length(etor.waiting_candidates) >= queue_capacity(etor) && !wait - #@info "async_update_fitness(): queue is full, skip" + #@debug "async_update_fitness(): queue is full, skip job #$job_id" return -1 # queue full, job not submitted end - #@info "async_update_fitness(): waiting to assign job #$(etor.next_job_id)" + #@debug "async_update_fitness(job_id=#$job_id): sem_size=$(etor.fitness_slots.sem_size) cur_count=$(etor.fitness_slots.curr_cnt)" + #@debug "async_update_fitness(): waiting to assign job #$job_id" Base.acquire(etor.fitness_slots) - #@info "async_update_fitness(): initial slot_state: $(etor.busy_workers), $(etor.fitness_slots.curr_cnt)" + #@debug "async_update_fitness(job_id=#$job_id): sem_size=$(etor.fitness_slots.sem_size) cur_count=$(etor.fitness_slots.curr_cnt)" + #@debug "async_update_fitness(job_id=#$job_id): busy_workers=$(etor.busy_workers), $(etor.fitness_slots.curr_cnt)" + lock(etor.job_assignment) worker_ix = Base.findfirstnot(etor.busy_workers) - @assert worker_ix !== nothing "Cannot find a worker #$(worker_ix) to put a job to" + @assert worker_ix !== nothing "Cannot find a worker to put a job to" + ##@debug "async_update_fitness(job_id=#$job_id, worker=#$worker_ix): assert fitness status" + ##@assert etor.fitnesses_status[worker_ix][1] == PEStatus_OK + ##@assert etor.params_status[worker_ix][1] == PEStatus_OK + #@debug "async_update_fitness(): assigning job #$job_id to worker #$worker_ix" etor.busy_workers[worker_ix] = true # busy etor.next_job_id += 1 copyto!(etor.shared_params[worker_ix], candi.params) # share candidate with the workers - #@info "async_update_fitness(): assigning job #$job_id to worker #$worker_ix" etor.waiting_candidates[job_id] = candi - #@info "async_update_fitness(): assert fitness status" - #@assert etor.fitnesses_status[worker_ix][1] == PEStatus_OK - #@assert etor.params_status[worker_ix][1] == PEStatus_OK - #@info "async_update_fitness(): flip param status" etor.params_status[worker_ix][1] = job_id # announce a message (status = job_id) - #@info "async_update_fitness(): assigned job #$job_id to worker #$worker_ix" - #@info "async_update_fitness(): unlock job assignment" + #@debug "async_update_fitness(job_id=#$job_id, worker=#$worker_ix): unlock job assignment" unlock(etor.job_assignment) + #@debug "async_update_fitness(job_id=#$job_id, worker=#$worker_ix): assigned job #$job_id to worker #$worker_ix" notify(etor.job_submitted) - #@info "async_update_fitness(): yield()" - #yield() # dispatch the job ASAP, without this it's not getting queued + ##@debug "async_update_fitness(job_id=#$job_id, worker=#$worker_ix): yield()" + ##yield() # dispatch the job ASAP, without this it's not getting queued return job_id end @@ -422,7 +434,7 @@ function process_completed!(f::Function, etor::ParallelEvaluator) for (job_id, candi) in etor.unclaimed_candidates if f(job_id, candi) # remove job_id from the waiting list and from the unclaimed list - #@info "process_completed!($job_id)" + #@debug "process_completed!(job_id=#$job_id)" delete!(etor.unclaimed_candidates, job_id) end end @@ -444,7 +456,7 @@ function update_fitness!(etor::ParallelEvaluator{F,FA}, n_pending = 0 for candi in candidates job_id = async_update_fitness(etor, candi, force=force, wait=true) - #@info "update_fitness!(): got job id #$job_id" + #@debug "update_fitness!(): got job_id=#$job_id" if job_id > 0 n_pending += 1 push!(job_ids, job_id) @@ -455,7 +467,7 @@ function update_fitness!(etor::ParallelEvaluator{F,FA}, # wait until it's done and the evaluator is active while n_pending > 0 && !is_stopping(etor) && !(isempty(etor.waiting_candidates) && isempty(etor.unclaimed_candidates)) - #@info "job_ids: $job_ids" + #@debug "update_fitness!(): job_ids=$job_ids" # pick up the candidates that are for us process_completed!(etor) do job_id, candi our_job = pop!(job_ids, job_id, 0)>0 @@ -464,7 +476,7 @@ function update_fitness!(etor::ParallelEvaluator{F,FA}, end # wait until another fitness calculation event if n_pending > 0 && isempty(etor.unclaimed_candidates) - #@info "update_fitness!(): wait()" + #@debug "update_fitness!(): wait for $n_pending job(s)..." wait(etor.job_done) end end @@ -478,15 +490,18 @@ function fitness(params::Individual, etor::ParallelEvaluator{F,FA}) where {F, FA candi = Candidate{FA}(params, -1, etor.arch_nafitness) job_id = async_update_fitness(etor, candi, wait=true) @assert job_id > 0 + #@debug "fitness(): is_stopping=$(is_stopping(etor)) waiting_candidates=$(isempty(etor.waiting_candidates)) unclaimed_candidates=$(isempty(etor.unclaimed_candidates))" while !is_stopping(etor) && !(isempty(etor.waiting_candidates) && isempty(etor.unclaimed_candidates)) if isready(etor, job_id) - #@info "fitness(): done" + #@debug "fitness(): job #$job_id done" return fitness(candi) else - #@info "fitness(): wait()" + #@debug "fitness(): job #$job_id wait()" wait(etor.job_done) + #@debug "fitness(): job #$job_id wait() done" end + #@debug "fitness(): is_stopping=$(is_stopping(etor)) waiting_candidates=$(isempty(etor.waiting_candidates)) unclaimed_candidates=$(isempty(etor.unclaimed_candidates))" end error("Fitness not evaluated") end From 529ace37f110a0461e2192fffa3b5d75a70d23ea Mon Sep 17 00:00:00 2001 From: Alexey Stukalov Date: Tue, 11 Dec 2018 15:18:18 +0100 Subject: [PATCH 11/12] shorter job assignment locks also avoid race when reading worker param status --- src/parallel_evaluator.jl | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/parallel_evaluator.jl b/src/parallel_evaluator.jl index f619c544..174acdc7 100644 --- a/src/parallel_evaluator.jl +++ b/src/parallel_evaluator.jl @@ -318,10 +318,12 @@ function workers_listener!(etor::ParallelEvaluator{F}) where F @assert (job_id > 0 || is_stopping(etor)) "Worker #$worker_ix bad status: $job_id" #@debug "worker_listener!(worker=#$worker_ix): job #$job_id done" - #@debug "worker_listener!(worker=#$worker_ix, job=#$job_id): locking job_assignment to get fitness" - lock(etor.job_assignment) @inbounds new_fitness = getfitness(F, etor.shared_fitnesses[worker_ix]) candi = get_updated_candidate!(etor, job_id, new_fitness) + ##@debug "worker_listener!(worker=#$worker_ix, job=#$job_id): getting param status" + param_status = etor.params_status[worker_ix][1] + @debug "worker_listener!(worker=#$worker_ix, job=#$job_id): locking job_assignment to update status" + lock(etor.job_assignment) etor.fitnesses_status[worker_ix][1] = PEStatus_OK # received etor.busy_workers[worker_ix] = false # available again #@debug "worker_listener!(worker=#$worker_ix, job=#$job_id): unlocking job_assignment after getting fitness" @@ -333,8 +335,6 @@ function workers_listener!(etor::ParallelEvaluator{F}) where F #@debug "workers_listener!(): notify fitness calculation done" notify(etor.job_done) - ##@debug "worker_listener!(worker=#$worker_ix, job=#$job_id): checking param status" - param_status = etor.params_status[worker_ix][1] if param_status == PEStatus_OK # communication in normal state, update the archive #@debug "workers_listener(job_id=#$job_id): add_candidate(archive)" add_candidate!(etor.archive, candi.fitness, candi.params, candi.tag, etor.num_evals) @@ -399,6 +399,8 @@ function async_update_fitness( etor.next_job_id += 1 copyto!(etor.shared_params[worker_ix], candi.params) # share candidate with the workers etor.waiting_candidates[job_id] = candi + @assert etor.params_status[worker_ix][1] == PEStatus_OK "async_update_fitness(): worker #$worker_ix input in error state ($(etor.params_status[worker_ix][1]))" + @assert etor.fitnesses_status[worker_ix][1] == PEStatus_OK "async_update_fitness(): worker #$worker_ix output in error state ($(etor.params_status[worker_ix][1]))" etor.params_status[worker_ix][1] = job_id # announce a message (status = job_id) #@debug "async_update_fitness(job_id=#$job_id, worker=#$worker_ix): unlock job assignment" unlock(etor.job_assignment) From 8225be470ff57e5b4bee87633f3e9dc17c3958ab Mon Sep 17 00:00:00 2001 From: Alexey Stukalov Date: Mon, 17 Dec 2018 16:55:11 +0100 Subject: [PATCH 12/12] par_eval: count nevals per worker and output when the worker shuts down --- src/parallel_evaluator.jl | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/parallel_evaluator.jl b/src/parallel_evaluator.jl index 174acdc7..6fa63804 100644 --- a/src/parallel_evaluator.jl +++ b/src/parallel_evaluator.jl @@ -12,6 +12,7 @@ Internal data for the worker process of the parallel evaluator. mutable struct ParallelEvaluatorWorker{T, P<:OptimizationProblem} id::Int # worker ID problem::P + nevals::Integer # number of fitness evaluations param_status::SharedVector{Int} # master notifies worker about new requests shared_param::SharedVector{T} # master puts candidates parameters fitness_status::SharedVector{Int} # worker notifies master about completed evaluation @@ -22,7 +23,7 @@ mutable struct ParallelEvaluatorWorker{T, P<:OptimizationProblem} param_status::SharedVector{Int}, shared_param::SharedVector{T}, fitness_status::SharedVector{Int}, shared_fitness::SharedVector{T} ) where {T, P<:OptimizationProblem} = - new{T,P}(id, problem, param_status, shared_param, fitness_status, shared_fitness) + new{T,P}(id, problem, 0, param_status, shared_param, fitness_status, shared_fitness) end param_status(worker::ParallelEvaluatorWorker) = @inbounds first(worker.param_status) @@ -53,6 +54,7 @@ function run!(worker::ParallelEvaluatorWorker) worker.param_status[1] = PEStatus_OK # received, reset the statuts #@debug "PE worker #$(worker.id): got job #$p_status" @inbounds setfitness!(worker.shared_fitness, fitness(worker.shared_param, worker.problem)) + worker.nevals += 1 #@debug "PE worker #$(worker.id): job #$p_status done" worker.fitness_status[1] = p_status # fitness ready end @@ -94,7 +96,7 @@ function run_parallel_evaluator_worker(id::Int, worker.fitness_status[1] = PEStatus_Error rethrow(ex) end - @info "Worker #$id stopped" + @info "Worker #$id stopped ($(worker.nevals) fitness evaluation(s))" nothing end