Skip to content

Commit 9f09ea9

Browse files
committed
add ParallelPopulationOptimizer
1 parent 9a5b761 commit 9f09ea9

7 files changed

Lines changed: 401 additions & 4 deletions

src/BlackBoxOptim.jl

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ using Distributions, StatsBase, Compat
44

55
export Optimizer, AskTellOptimizer, SteppingOptimizer, PopulationOptimizer,
66
bboptimize, bbsetup, compare_optimizers,
7+
ParallelPopulationOptimizer,
78

89
DiffEvoOpt, de_rand_1_bin, de_rand_1_bin_radiuslimited,
910
adaptive_de_rand_1_bin, adaptive_de_rand_1_bin_radiuslimited,
@@ -177,6 +178,7 @@ include("resampling_memetic_search.jl")
177178
include("simultaneous_perturbation_stochastic_approximation.jl")
178179
include("generating_set_search.jl")
179180
include("direct_search_with_probabilistic_descent.jl")
181+
include("parallel_population_optimizer.jl")
180182

181183
# Fitness
182184
# include("fitness/fitness_types.jl") FIXME merge it with fitness.jl

src/optimization_methods.jl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ const ValidMethods = @compat Dict{Symbol,Union(Any,Function)}(
1414
:simultaneous_perturbation_stochastic_approximation => SimultaneousPerturbationSA2,
1515
:generating_set_search => GeneratingSetSearcher,
1616
:probabilistic_descent => direct_search_probabilistic_descent,
17+
:parallel_population_optimizer => parallel_population_optimizer,
1718
)
1819

1920
const MethodNames = sort!(collect(keys(ValidMethods)))
Lines changed: 362 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,362 @@
1+
const ParallelPopulationOptimizer_DefaultParameters = @compat Dict{Symbol,Any}(
2+
:WorkerMethod => :adaptive_de_rand_1_bin_radiuslimited, # worker population optimization method
3+
:NWorkers => 2, # number of workers
4+
:Workers => Vector{Int}(), # IDs of worker processes, takes precedence over NWorkers
5+
:MigrationSize => 1, # number of "migrant" individual to sent to the master
6+
:MigrationPeriod => 100, # number of worker iterations before sending migrants
7+
:ArchiveCapacity => 10, # ParallelPseudoEvaluator archive capacity
8+
:ToWorkerChannelCapacity => 100, # how many unread messages the master->worker channel can store
9+
:FromWorkersChannelCapacity => 1000 # how many unread messages the workers->master channel can store
10+
)
11+
12+
# metrics from worker optimizer
13+
type WorkerMetrics
14+
num_evals::Int # number of function evals
15+
num_steps::Int # number of steps
16+
num_better::Int # number of steps that improved best fitness
17+
num_migrated::Int # number of migrants worker received
18+
num_better_migrated::Int # number of migrants accepted (because fitness improved)
19+
20+
WorkerMetrics() = new(0, 0, 0, 0, 0)
21+
end
22+
23+
function Base.copy!(a::WorkerMetrics, b::WorkerMetrics)
24+
a.num_evals = b.num_evals
25+
a.num_steps = b.num_steps
26+
a.num_better = b.num_better
27+
a.num_migrated = b.num_migrated
28+
a.num_better_migrated = b.num_better_migrated
29+
return a
30+
end
31+
32+
# fake evaluator for ParallelPopulationOptimizer
33+
# it doesn't evaluate itself, but stores some
34+
# metrics from the workers evaluators
35+
type ParallelPseudoEvaluator{F, P<:OptimizationProblem} <: Evaluator{P}
36+
problem::P
37+
archive::TopListArchive{F}
38+
workers_metrics::Vector{WorkerMetrics} # function evals per worker etc
39+
last_fitness::F
40+
end
41+
42+
num_evals(ppe::ParallelPseudoEvaluator) = mapreduce(x -> x.num_evals, +, 0, ppe.workers_metrics)
43+
num_better(ppe::ParallelPseudoEvaluator) = mapreduce(x -> x.num_better, +, 0, ppe.workers_metrics)
44+
45+
function ParallelPseudoEvaluator{P<:OptimizationProblem}(
46+
problem::P, nworkers::Int;
47+
ArchiveCapacity::Int = 10)
48+
ParallelPseudoEvaluator{fitness_type(problem), P}(
49+
problem,
50+
TopListArchive(fitness_scheme(problem), numdims(problem), ArchiveCapacity),
51+
WorkerMetrics[WorkerMetrics() for i in 1:nworkers],
52+
nafitness(fitness_scheme(problem)))
53+
end
54+
55+
const FINAL_CANDIDATE = -12345 # special terminating candidate with worker index sent by master
56+
const ERROR_CANDIDATE = -67890 # special error candidate with worker index send by worker
57+
58+
# message with the candidate passed between the workers and the master
59+
immutable CandidateMessage{F}
60+
worker::Int # origin of candidate
61+
metrics::WorkerMetrics # current origin worker metrics
62+
candi::Candidate{F}
63+
end
64+
65+
typealias WorkerChannel{F} Channel{CandidateMessage{F}}
66+
typealias WorkerChannelRef{F} RemoteRef{WorkerChannel{F}}
67+
68+
# Parallel population optimizer
69+
# starts nworkers parallel population optimizers.
70+
# At regular interval, the workers send the master process their random population members
71+
# and the master redirects them to the other workers
72+
type ParallelPopulationOptimizer{F, P<:OptimizationProblem} <: SteppingOptimizer
73+
final_fitnesses::Vector{RemoteRef{Channel{Any}}} # references to the @spawnat ID run_worker()
74+
from_workers::WorkerChannelRef{F}
75+
to_workers::Vector{WorkerChannelRef{F}}
76+
is_started::RemoteRef{Channel{Bool}}
77+
evaluator::ParallelPseudoEvaluator{F, P}
78+
end
79+
80+
nworkers(ppopt::ParallelPopulationOptimizer) = length(ppopt.to_workers)
81+
82+
# read worker's message, stores the worker metrics and updates best fitness using
83+
function store!{F}(ppe::ParallelPseudoEvaluator{F}, msg::CandidateMessage{F})
84+
copy!(ppe.workers_metrics[msg.worker], msg.metrics)
85+
if !isnafitness(msg.candi.fitness, fitness_scheme(ppe)) # store only the candidates with the known fitness
86+
add_candidate!(ppe.archive, msg.candi.fitness, msg.candi.params, num_evals(ppe))
87+
end
88+
end
89+
90+
# check that worker stil running
91+
# their RemoteRefs should not be ready,
92+
# but if there was exception in the worker,
93+
# it would be thrown into the main thread
94+
function check_workers_running{T}(workers::AbstractVector{RemoteRef{T}})
95+
for i in eachindex(workers)
96+
if isready(workers[i])
97+
fetch(workers[i]) # fetch the worker, this should trigger an exception
98+
# no exception, but the worker should not be ready
99+
error("Worker #i has finished before the master shutdown")
100+
end
101+
end
102+
return true
103+
end
104+
105+
# outer parallel population optimizer constructor that
106+
# also spawns worker tasks
107+
function ParallelPopulationOptimizer{P<:OptimizationProblem}(
108+
problem::P, optimizer_generator::Function; NWorkers::Int = 1, Workers::Vector{Int} = Vector{Int}(),
109+
MigrationSize::Int = 1, MigrationPeriod::Int = 100,
110+
ArchiveCapacity::Int = 10,
111+
ToWorkerChannelCapacity::Int = 1000,
112+
FromWorkersChannelCapacity::Int = (isempty(Workers) ? NWorkers : length(Workers)) * ToWorkerChannelCapacity)
113+
F = fitness_type(problem)
114+
info("Constructing parallel optimizer...")
115+
if isempty(Workers)
116+
# take the first NWorkers workers
117+
Workers = workers()[1:NWorkers]
118+
end
119+
if length(Workers) <= 1
120+
throw(ArgumentError("ParallelPopulationOptimizer requires at least 2 worker processes"))
121+
end
122+
from_workers = RemoteRef(() -> WorkerChannel{F}(FromWorkersChannelCapacity))
123+
to_workers = WorkerChannelRef{F}[RemoteRef(() -> WorkerChannel{F}(ToWorkerChannelCapacity), id) for id in Workers]
124+
workers_ready = RemoteRef(() -> Channel{Int}(length(to_workers))) # FIXME do we need to wait for the worker?
125+
is_started = RemoteRef(() -> Channel{Bool}(1))
126+
final_fitnesses = map(function(i)
127+
procid = Workers[i]
128+
info(" Spawning worker #$i at process #$procid...");
129+
@spawnat procid run_worker(i, workers_ready, is_started, problem, optimizer_generator,
130+
from_workers, to_workers[i],
131+
MigrationSize, MigrationPeriod)
132+
end, 1:length(to_workers))
133+
# wait until all the workers are started
134+
info("Waiting for the workers to be ready...")
135+
# FIXME is it required?
136+
nready = 0
137+
while nready < length(to_workers)
138+
check_workers_running(final_fitnesses)
139+
worker_id = take!(workers_ready)
140+
info(" Worker #$worker_id is ready")
141+
nready += 1
142+
end
143+
info("All workers ready")
144+
ParallelPopulationOptimizer{F, P}(final_fitnesses, from_workers, to_workers, is_started,
145+
ParallelPseudoEvaluator(problem, length(to_workers);
146+
ArchiveCapacity = ArchiveCapacity))
147+
end
148+
149+
function parallel_population_optimizer(problem::OptimizationProblem, parameters::Parameters)
150+
param_dict = convert(ParamsDict, parameters) # FIXME convert to dict to avoid serialization problems of DictChain
151+
params = chain(ParallelPopulationOptimizer_DefaultParameters, parameters)
152+
worker_method = params[:WorkerMethod]
153+
info("Using $worker_method as worker method for parallel optimization")
154+
optimizer_func = ValidMethods[worker_method]
155+
156+
ParallelPopulationOptimizer(problem, problem -> optimizer_func(problem, param_dict),
157+
NWorkers = params[:NWorkers], Workers = params[:Workers],
158+
MigrationSize = params[:MigrationSize],
159+
MigrationPeriod = params[:MigrationPeriod],
160+
ArchiveCapacity = params[:ArchiveCapacity],
161+
ToWorkerChannelCapacity = params[:ToWorkerChannelCapacity],
162+
FromWorkersChannelCapacity = params[:FromWorkersChannelCapacity])
163+
end
164+
165+
# redirects candidate to another worker
166+
function redirect{F}(ppopt::ParallelPopulationOptimizer{F}, msg::CandidateMessage{F})
167+
# redirect to the other parallel task
168+
#println("redirecting from $(msg.worker)")
169+
recv_ix = sample(1:(length(ppopt.to_workers)-1))
170+
if recv_ix >= msg.worker # index is the origin worker
171+
recv_ix += 1
172+
end
173+
@assert (recv_ix != msg.worker) && (1 <= recv_ix <= nworkers(ppopt))
174+
msg.candi.op = NO_GEN_OP # reset operation and tag to avoid calling adjust!() out-of-context
175+
msg.candi.tag = 0
176+
put!(ppopt.to_workers[recv_ix], msg)
177+
#println("redirecting done")
178+
end
179+
180+
function step!{F}(ppopt::ParallelPopulationOptimizer{F})
181+
#println("main#: n_evals=$(num_evals(ppopt.evaluator))")
182+
if !isready(ppopt.is_started) put!(ppopt.is_started, true) end # if it's the first iteration
183+
last_better = num_better(ppopt.evaluator)
184+
candidate = take!(ppopt.from_workers)::CandidateMessage{F}
185+
if candidate.worker == ERROR_CANDIDATE
186+
check_workers_running(ppopt.final_fitnesses)
187+
error("Exception in the worker, but all workers still running")
188+
end
189+
#println("candidate=$candidate")
190+
store!(ppopt.evaluator, candidate)
191+
redirect(ppopt, candidate)
192+
return num_better(ppopt.evaluator) - last_better
193+
end
194+
195+
# finalize the master: wait for the workers shutdown,
196+
# get their best candidates
197+
function finalize!{F}(ppopt::ParallelPopulationOptimizer{F})
198+
info("Finalizing parallel optimizer...")
199+
# send special terminating candidate
200+
for to_worker in ppopt.to_workers
201+
put!(to_worker, CandidateMessage{F}(FINAL_CANDIDATE, WorkerMetrics(),
202+
Candidate{F}(Individual())))
203+
end
204+
# wait until all threads finish
205+
# the last candidates being sent are the best in the population
206+
info("Waiting for the workers to finish...")
207+
for i in eachindex(ppopt.final_fitnesses)
208+
msg = fetch(ppopt.final_fitnesses[i])::CandidateMessage{F}
209+
@assert msg.worker == i
210+
store!(ppopt.evaluator, msg) # store the best candidate
211+
info("Worker #$(msg.worker) finished")
212+
end
213+
info("Parallel optimizer finished. Metrics per worker: $(ppopt.evaluator.workers_metrics)")
214+
end
215+
216+
# wraps the worker's population optimizer
217+
# and communicates with the master
218+
type PopulationOptimizerWrapper{F,O<:PopulationOptimizer,E<:Evaluator}
219+
id::Int # worker's Id
220+
optimizer::O
221+
evaluator::E
222+
to_master::WorkerChannelRef{F} # outgoing candidates
223+
from_master::WorkerChannelRef{F} # incoming candidates
224+
migrationSize::Int # size of the migrating group
225+
migrationPeriod::Int # number of iterations between the migrations
226+
227+
metrics::WorkerMetrics # current metrics
228+
is_stopping::Bool # if the optimizer is being shut down
229+
can_receive::Condition # condition recv_task waits for
230+
can_run::Condition # condition run!() task waits for
231+
recv_task::Task # task that continuously executes recv_immigrants()
232+
233+
function Base.call{F,O,E}(::Type{PopulationOptimizerWrapper},
234+
id::Int, optimizer::O, evaluator::E,
235+
to_master::WorkerChannelRef{F}, from_master::WorkerChannelRef{F},
236+
migrationSize::Int = 1, migrationPeriod::Int = 100)
237+
res = new{F,O,E}(id, optimizer, evaluator,
238+
to_master, from_master,
239+
migrationSize, migrationPeriod,
240+
WorkerMetrics(), false, Condition(), Condition())
241+
# "background" migrants receiver task
242+
res.recv_task = @schedule while !res.is_stopping
243+
wait(res.can_receive)
244+
recv_immigrants!(res)
245+
notify(res.can_run)
246+
end
247+
return res
248+
end
249+
end
250+
251+
function send_emigrants{F}(wrapper::PopulationOptimizerWrapper{F})
252+
pop = population(wrapper.optimizer)
253+
# prepare the group of emigrants
254+
migrant_ixs = sample(1:popsize(pop), wrapper.migrationSize, replace=false)
255+
for migrant_ix in migrant_ixs
256+
migrant = acquire_candi(pop, migrant_ix)
257+
# send them outward
258+
wrapper.metrics.num_evals = num_evals(wrapper.evaluator)
259+
put!(wrapper.to_master, CandidateMessage{F}(wrapper.id, wrapper.metrics, migrant))
260+
# FIXME check that we the reuse of candidate does not affect
261+
# the migrants while they wait to be sent
262+
release_candi(pop, migrant)
263+
end
264+
end
265+
266+
function take_master_message!{F}(wrapper::PopulationOptimizerWrapper{F})
267+
take!(wrapper.from_master)::CandidateMessage{F}, isready(wrapper.from_master)
268+
end
269+
270+
# receive migrants (called from "background" task)
271+
function recv_immigrants!{F}(wrapper::PopulationOptimizerWrapper{F})
272+
pop = population(wrapper.optimizer)
273+
# receive all the immigrants in the channel
274+
continue_recv = true
275+
while continue_recv
276+
msg, continue_recv = take_master_message!(wrapper) # here it can hibernate waiting for the immigrant to appear
277+
if msg.worker == FINAL_CANDIDATE # special index sent by master to indicate termination
278+
wrapper.is_stopping = true
279+
break
280+
end
281+
# assign migrants to random population indices
282+
migrant_ix = sample(1:popsize(pop))
283+
candidates = sizehint!(Vector{candidate_type(pop)}(), 2)
284+
push!(candidates, acquire_candi(pop, migrant_ix))
285+
push!(candidates, acquire_candi(pop, msg.candi))
286+
candidates[end].index = migrant_ix # override the incoming index
287+
rank_by_fitness!(wrapper.evaluator, candidates)
288+
wrapper.metrics.num_migrated += 1
289+
wrapper.metrics.num_better_migrated += tell!(wrapper.optimizer, candidates)
290+
end
291+
end
292+
293+
# run the wrapper (called in the "main" task)
294+
function run!(wrapper::PopulationOptimizerWrapper)
295+
while !wrapper.is_stopping
296+
if istaskdone(wrapper.recv_task)
297+
error("recv_task has completed prematurely")
298+
end
299+
wrapper.metrics.num_steps += 1
300+
#println("$(wrapper.metrics.num_steps)-th iteration")
301+
if wrapper.metrics.num_steps % wrapper.migrationPeriod == 0
302+
send_emigrants(wrapper) # before we started processing
303+
notify(wrapper.can_receive) # switch to migrants receiving task
304+
wait(wrapper.can_run)
305+
end
306+
# normal ask/tell sequence
307+
candidates = ask(wrapper.optimizer)
308+
rank_by_fitness!(wrapper.evaluator, candidates)
309+
wrapper.metrics.num_better += tell!(wrapper.optimizer, candidates)
310+
end
311+
finalize!(wrapper.optimizer, wrapper.evaluator)
312+
end
313+
314+
# returns the candidate message with final metrics and the best candidate
315+
function final_fitness{F}(wrapper::PopulationOptimizerWrapper{F})
316+
@assert wrapper.is_stopping
317+
# send the best candidate
318+
pop = population(wrapper.optimizer)
319+
best_candi = acquire_candi(pop)
320+
copy!(best_candi.params, best_candidate(wrapper.evaluator.archive))
321+
best_candi.fitness = best_fitness(wrapper.evaluator.archive)
322+
best_candi.index = -1 # we don't know it
323+
best_candi.tag = 0
324+
# HACK send negative worker index to acknowledge the worker is finishing
325+
wrapper.metrics.num_evals = num_evals(wrapper.evaluator)
326+
CandidateMessage{F}(wrapper.id, wrapper.metrics, best_candi)
327+
end
328+
329+
# Function that the master process spawns at each worker process.
330+
# Creates and run the worker wrapper
331+
function run_worker{F}(id::Int,
332+
worker_ready::RemoteRef{Channel{Int}},
333+
is_ready::RemoteRef{Channel{Bool}},
334+
problem::OptimizationProblem,
335+
optimizer_generator::Function,
336+
to_master::WorkerChannelRef{F},
337+
from_master::WorkerChannelRef{F},
338+
migrationSize, migrationPeriod)
339+
info("Initializing parallel optimization worker #$id at task=$(myid())")
340+
wrapper = PopulationOptimizerWrapper(id,
341+
optimizer_generator(problem),
342+
ProblemEvaluator(problem),
343+
to_master, from_master,
344+
migrationSize, migrationPeriod)
345+
# create immigrants receiving tasks=#
346+
put!(worker_ready, id)
347+
info("Waiting for the master start signal...")
348+
fetch(is_ready) # wait until the master and other workers are ready
349+
info("Running worker #$id")
350+
try
351+
run!(wrapper)
352+
catch e
353+
# send error candidate to notify about an error and to release
354+
# the master from waiting for worker messages
355+
put!(wrapper.to_master,
356+
CandidateMessage{F}(ERROR_CANDIDATE, WorkerMetrics(),
357+
Candidate{F}(Individual())))
358+
rethrow(e)
359+
end
360+
info("Worker #$id stopped")
361+
final_fitness(wrapper) # return the best fitness
362+
end

test/helper.jl

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,8 @@ using FactCheck
33
using Compat
44

55
NumTestRepetitions = 100
6+
7+
if nprocs() < 4
8+
addprocs(4-nprocs()) # add procs for parallel population optimizer
9+
end
10+
@everywhere using BlackBoxOptim

0 commit comments

Comments
 (0)