Skip to content

Commit 20a2a69

Browse files
committed
add ParallelPopulationOptimizer
1 parent ba049c5 commit 20a2a69

7 files changed

Lines changed: 360 additions & 4 deletions

src/BlackBoxOptim.jl

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ using Distributions, StatsBase, Compat
44

55
export Optimizer, AskTellOptimizer, SteppingOptimizer, PopulationOptimizer,
66
bboptimize, bbsetup, compare_optimizers,
7+
ParallelPopulationOptimizer,
78

89
DiffEvoOpt, de_rand_1_bin, de_rand_1_bin_radiuslimited,
910
adaptive_de_rand_1_bin, adaptive_de_rand_1_bin_radiuslimited,
@@ -177,6 +178,7 @@ include("resampling_memetic_search.jl")
177178
include("simultaneous_perturbation_stochastic_approximation.jl")
178179
include("generating_set_search.jl")
179180
include("direct_search_with_probabilistic_descent.jl")
181+
include("parallel_population_optimizer.jl")
180182

181183
# Fitness
182184
# include("fitness/fitness_types.jl") FIXME merge it with fitness.jl

src/optimization_methods.jl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ const ValidMethods = @compat Dict{Symbol,Union(Any,Function)}(
1414
:simultaneous_perturbation_stochastic_approximation => SimultaneousPerturbationSA2,
1515
:generating_set_search => GeneratingSetSearcher,
1616
:probabilistic_descent => direct_search_probabilistic_descent,
17+
:parallel_population_optimizer => parallel_population_optimizer,
1718
)
1819

1920
const MethodNames = sort!(collect(keys(ValidMethods)))
Lines changed: 334 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,334 @@
1+
const ParallelPopulationOptimizer_DefaultParameters = @compat Dict{Symbol,Any}(
2+
:WorkerMethod => :adaptive_de_rand_1_bin_radiuslimited, # worker population optimization method
3+
:NWorkers => 2, # number of workers
4+
:MigrationSize => 1, # number of "migrant" individual to sent to the master
5+
:MigrationPeriod => 100, # number of worker iterations before sending migrants
6+
:ArchiveCapacity => 10, # ParallelPseudoEvaluator archive capacity
7+
:ToWorkerChannelCapacity => 100, # how many unread messages the master->worker channel can store
8+
:FromWorkersChannelCapacity => 1000 # how many unread messages the workers->master channel can store
9+
)
10+
11+
# metrics from worker optimizer
12+
type WorkerMetrics
13+
num_evals::Int # number of function evals
14+
num_steps::Int # number of steps
15+
num_better::Int # number of steps that improved best fitness
16+
num_migrated::Int # number of migrants worker received
17+
num_better_migrated::Int # number of migrants accepted (because fitness improved)
18+
19+
WorkerMetrics() = new(0, 0, 0, 0, 0)
20+
end
21+
22+
function Base.copy!(a::WorkerMetrics, b::WorkerMetrics)
23+
a.num_evals = b.num_evals
24+
a.num_steps = b.num_steps
25+
a.num_better = b.num_better
26+
a.num_migrated = b.num_migrated
27+
a.num_better_migrated = b.num_better_migrated
28+
return a
29+
end
30+
31+
# fake evaluator for ParallelPopulationOptimizer
32+
# it doesn't evaluate itself, but stores some
33+
# metrics from the workers evaluators
34+
type ParallelPseudoEvaluator{F, P<:OptimizationProblem} <: Evaluator{P}
35+
problem::P
36+
archive::TopListArchive{F}
37+
workers_metrics::Vector{WorkerMetrics} # function evals per worker etc
38+
last_fitness::F
39+
end
40+
41+
num_evals(ppe::ParallelPseudoEvaluator) = mapreduce(x -> x.num_evals, +, 0, ppe.workers_metrics)
42+
num_better(ppe::ParallelPseudoEvaluator) = mapreduce(x -> x.num_better, +, 0, ppe.workers_metrics)
43+
44+
function ParallelPseudoEvaluator{P<:OptimizationProblem}(
45+
problem::P, nworkers::Int;
46+
ArchiveCapacity::Int = 10)
47+
ParallelPseudoEvaluator{fitness_type(problem), P}(
48+
problem,
49+
TopListArchive(fitness_scheme(problem), numdims(problem), ArchiveCapacity),
50+
WorkerMetrics[WorkerMetrics() for i in 1:nworkers],
51+
nafitness(fitness_scheme(problem)))
52+
end
53+
54+
# message with the candidate passed between the workers and the master
55+
immutable CandidateMessage{F}
56+
worker::Int # origin of candidate
57+
metrics::WorkerMetrics # current origin worker metrics
58+
candi::Candidate{F}
59+
end
60+
61+
typealias WorkerChannel{F} Channel{CandidateMessage{F}}
62+
typealias WorkerChannelRef{F} RemoteRef{WorkerChannel{F}}
63+
64+
# Parallel population optimizer
65+
# starts nworkers parallel population optimizers.
66+
# At regular interval, the workers send the master process their random population members
67+
# and the master redirects them to the other workers
68+
type ParallelPopulationOptimizer{F, P<:OptimizationProblem} <: SteppingOptimizer
69+
final_fitnesses::Vector{RemoteRef{Channel{Any}}} # references to the @spawnat ID run_worker()
70+
from_workers::WorkerChannelRef{F}
71+
to_workers::Vector{WorkerChannelRef{F}}
72+
evaluator::ParallelPseudoEvaluator{F, P}
73+
74+
ParallelPopulationOptimizer(final_fitnesses::Vector{RemoteRef{Channel{Any}}},
75+
from_workers::WorkerChannelRef{F},
76+
to_workers::Vector{WorkerChannelRef{F}},
77+
evaluator::ParallelPseudoEvaluator{F, P}) =
78+
new(final_fitnesses, from_workers, to_workers, evaluator)
79+
end
80+
81+
nworkers(ppopt::ParallelPopulationOptimizer) = length(ppopt.to_workers)
82+
83+
# read worker's message, stores the worker metrics and updates best fitness using
84+
function store!{F}(ppe::ParallelPseudoEvaluator{F}, msg::CandidateMessage{F})
85+
copy!(ppe.workers_metrics[msg.worker], msg.metrics)
86+
if !isnafitness(msg.candi.fitness, fitness_scheme(ppe)) # store only the candidates with the known fitness
87+
add_candidate!(ppe.archive, msg.candi.fitness, msg.candi.params, num_evals(ppe))
88+
end
89+
end
90+
91+
# check that worker stil running
92+
# their RemoteRefs should not be ready,
93+
# but if there was exception in the worker,
94+
# it would be thrown into the main thread
95+
function check_workers_running{T}(workers::AbstractVector{RemoteRef{T}})
96+
for i in eachindex(workers)
97+
if isready(workers[i])
98+
fetch(workers[i]) # fetch the worker, this should trigger an exception
99+
# no exception, but the worker should not be ready
100+
error("Worker #i is finished before the master shutdown")
101+
end
102+
end
103+
return false
104+
end
105+
106+
# outer parallel population optimizer constructor that
107+
# also spawns worker tasks
108+
function ParallelPopulationOptimizer{P<:OptimizationProblem}(
109+
problem::P, optimizer_generator::Function, nworkers::Int,
110+
MigrationSize::Int = 1, MigrationPeriod::Int = 100,
111+
ArchiveCapacity::Int = 10,
112+
ToWorkerChannelCapacity::Int = 1000,
113+
FromWorkersChannelCapacity::Int = nworkers * ToWorkerChannelCapacity)
114+
F = fitness_type(problem)
115+
info("Constructing parallel optimizer...")
116+
from_workers = RemoteRef(() -> WorkerChannel{F}(FromWorkersChannelCapacity))
117+
to_workers = WorkerChannelRef{F}[RemoteRef(() -> WorkerChannel{F}(ToWorkerChannelCapacity), i+1) for i in 1:nworkers]
118+
workers_started = RemoteRef(() -> Channel{Int}(nworkers)) # FIXME do we need to wait for the worker?
119+
all_ready = RemoteRef(() -> Channel{Bool}(1))
120+
final_fitnesses = map(function(id)
121+
info("Initializing worker #$id...");
122+
pid = id+1;
123+
@spawnat pid run_worker(id, workers_started, all_ready, problem, optimizer_generator,
124+
from_workers, to_workers[id],
125+
MigrationSize, MigrationPeriod)
126+
end, 1:nworkers)
127+
# wait until all the workers are started
128+
# FIXME is it required?
129+
nstarted = 0
130+
while nstarted < nworkers
131+
check_workers_running(final_fitnesses)
132+
worker_id = take!(workers_started)
133+
info("Worker #$worker_id is ready")
134+
nstarted += 1
135+
end
136+
put!(all_ready, true)
137+
info("All workers ready")
138+
ParallelPopulationOptimizer{F, P}(final_fitnesses, from_workers, to_workers,
139+
ParallelPseudoEvaluator(problem, nworkers;
140+
ArchiveCapacity = ArchiveCapacity))
141+
end
142+
143+
function parallel_population_optimizer(problem::OptimizationProblem, parameters::Parameters)
144+
param_dict = convert(ParamsDict, parameters) # FIXME convert to dict to avoid serialization problems of DictChain
145+
params = chain(ParallelPopulationOptimizer_DefaultParameters, parameters)
146+
worker_method = params[:WorkerMethod]
147+
info( "Using $worker_method as worker method for parallel optimization")
148+
optimizer_func = ValidMethods[worker_method]
149+
150+
ParallelPopulationOptimizer(problem, problem -> optimizer_func(problem, param_dict),
151+
params[:NWorkers], params[:MigrationSize], params[:MigrationPeriod],
152+
params[:ArchiveCapacity],
153+
params[:ToWorkerChannelCapacity], params[:FromWorkersChannelCapacity])
154+
end
155+
156+
# redirects candidate to another worker
157+
function redirect{F}(ppopt::ParallelPopulationOptimizer{F}, msg::CandidateMessage{F})
158+
# redirect to the other parallel task
159+
#println("redirecting from $(msg.worker)")
160+
recv_ix = sample(1:(length(ppopt.to_workers)-1))
161+
if recv_ix >= msg.worker # index is the origin worker
162+
recv_ix += 1
163+
end
164+
msg.candi.op = NO_GEN_OP # reset operation and tag to avoid calling adjust!() out-of-context
165+
msg.candi.tag = 0
166+
put!(ppopt.to_workers[recv_ix], msg)
167+
#println("redirecting done")
168+
end
169+
170+
function step!{F}(ppopt::ParallelPopulationOptimizer{F})
171+
#println("main#: n_evals=$(num_evals(ppopt.evaluator))")
172+
check_workers_running(ppopt.final_fitnesses)
173+
last_better = num_better(ppopt.evaluator)
174+
candidate = take!(ppopt.from_workers)::CandidateMessage{F}
175+
#println("candidate=$candidate")
176+
store!(ppopt.evaluator, candidate)
177+
redirect(ppopt, candidate)
178+
return num_better(ppopt.evaluator) - last_better
179+
end
180+
181+
const FINAL_CANDIDATE = -12345 # special terminating candidate with worker index
182+
183+
# finalize the master: wait for the workers shutdown,
184+
# get their best candidates
185+
function finalize!{F}(ppopt::ParallelPopulationOptimizer{F})
186+
info("Finalizing parallel optimizer...")
187+
# send special terminating candidate
188+
for to_worker in ppopt.to_workers
189+
put!(to_worker, CandidateMessage{F}(FINAL_CANDIDATE, WorkerMetrics(),
190+
Candidate{F}(Individual())))
191+
end
192+
# wait until all threads finish
193+
# the last candidates being sent are the best in the population
194+
info("Waiting for the workers to finish...")
195+
for i in eachindex(ppopt.final_fitnesses)
196+
msg = fetch(ppopt.final_fitnesses[i])::CandidateMessage{F}
197+
@assert msg.worker == i
198+
store!(ppopt.evaluator, msg) # store the best candidate
199+
info("Worker #$(msg.worker) finished")
200+
end
201+
info("Parallel optimizer finished. Metrics per worker: $(ppopt.evaluator.workers_metrics)")
202+
end
203+
204+
# wraps the worker's population optimizer
205+
# and communicates with the master
206+
type PopulationOptimizerWrapper{F,O<:PopulationOptimizer,E<:Evaluator}
207+
id::Int # worker's Id
208+
optimizer::O
209+
evaluator::E
210+
to_master::WorkerChannelRef{F} # outgoing candidates
211+
from_master::WorkerChannelRef{F} # incoming candidates
212+
migrationSize::Int # size of the migrating group
213+
migrationPeriod::Int # number of iterations between the migrations
214+
215+
metrics::WorkerMetrics # current metrics
216+
is_stopping::Bool # if the optimizer is being shut down
217+
can_receive::Condition # condition recv_task waits for
218+
can_run::Condition # condition run!() task waits for
219+
recv_task::Task # task that continuously executes recv_immigrants()
220+
221+
function Base.call{F,O,E}(::Type{PopulationOptimizerWrapper},
222+
id::Int, optimizer::O, evaluator::E,
223+
to_master::WorkerChannelRef{F}, from_master::WorkerChannelRef{F},
224+
migrationSize::Int = 1, migrationPeriod::Int = 100)
225+
res = new{F,O,E}(id, optimizer, evaluator,
226+
to_master, from_master,
227+
migrationSize, migrationPeriod,
228+
WorkerMetrics(), false, Condition(), Condition())
229+
# "background" migrants receiver task
230+
res.recv_task = @schedule while !res.is_stopping
231+
wait(res.can_receive)
232+
recv_immigrants!(res)
233+
notify(res.can_run)
234+
end
235+
return res
236+
end
237+
end
238+
239+
function send_emigrants{F}(wrapper::PopulationOptimizerWrapper{F})
240+
pop = population(wrapper.optimizer)
241+
# prepare the group of emigrants
242+
migrant_ixs = sample(1:popsize(pop), wrapper.migrationSize, replace=false)
243+
for migrant_ix in migrant_ixs
244+
migrant = acquire_candi(pop, migrant_ix)
245+
# send them outward
246+
wrapper.metrics.num_evals = num_evals(wrapper.evaluator)
247+
put!(wrapper.to_master, CandidateMessage{F}(wrapper.id, wrapper.metrics, migrant))
248+
# FIXME check that we the reuse of candidate does not affect
249+
# the migrants while they wait to be sent
250+
release_candi(pop, migrant)
251+
end
252+
end
253+
254+
# receive migrants (called from "background" task)
255+
function recv_immigrants!{F}(wrapper::PopulationOptimizerWrapper{F})
256+
pop = population(wrapper.optimizer)
257+
msg = take!(wrapper.from_master)::CandidateMessage{F}
258+
if msg.worker == FINAL_CANDIDATE # special index sent by master to indicate termination
259+
wrapper.is_stopping = true
260+
return
261+
end
262+
263+
# assign migrants to random population indices
264+
migrant_ix = sample(1:popsize(pop))
265+
candidates = sizehint!(Vector{candidate_type(pop)}(), 2)
266+
push!(candidates, acquire_candi(pop, migrant_ix))
267+
push!(candidates, acquire_candi(pop, msg.candi))
268+
candidates[end].index = migrant_ix # override the incoming index
269+
rank_by_fitness!(wrapper.evaluator, candidates)
270+
wrapper.metrics.num_migrated += 1
271+
wrapper.metrics.num_better_migrated += tell!(wrapper.optimizer, candidates)
272+
end
273+
274+
# run the wrapper (called in the "main" task)
275+
function run!(wrapper::PopulationOptimizerWrapper)
276+
while !wrapper.is_stopping
277+
if istaskdone(wrapper.recv_task)
278+
error("recv_task has completed prematurely")
279+
end
280+
wrapper.metrics.num_steps += 1
281+
#println("$(wrapper.metrics.num_steps)-th iteration")
282+
if wrapper.metrics.num_steps % wrapper.migrationPeriod == 0
283+
send_emigrants(wrapper) # before we started processing
284+
notify(wrapper.can_receive) # switch to migrants receiving task
285+
wait(wrapper.can_run)
286+
end
287+
# normal ask/tell sequence
288+
candidates = ask(wrapper.optimizer)
289+
rank_by_fitness!(wrapper.evaluator, candidates)
290+
wrapper.metrics.num_better += tell!(wrapper.optimizer, candidates)
291+
end
292+
finalize!(wrapper.optimizer, wrapper.evaluator)
293+
end
294+
295+
# returns the candidate message with final metrics and the best candidate
296+
function final_fitness{F}(wrapper::PopulationOptimizerWrapper{F})
297+
@assert wrapper.is_stopping
298+
# send the best candidate
299+
pop = population(wrapper.optimizer)
300+
best_candi = acquire_candi(pop)
301+
copy!(best_candi.params, best_candidate(wrapper.evaluator.archive))
302+
best_candi.fitness = best_fitness(wrapper.evaluator.archive)
303+
best_candi.index = -1 # we don't know it
304+
best_candi.tag = 0
305+
# HACK send negative worker index to acknowledge the worker is finishing
306+
wrapper.metrics.num_evals = num_evals(wrapper.evaluator)
307+
CandidateMessage{F}(wrapper.id, wrapper.metrics, best_candi)
308+
end
309+
310+
# Function that the master process spawns at each worker process.
311+
# Creates and run the worker wrapper
312+
function run_worker{F}(id::Int,
313+
workers_started::RemoteRef{Channel{Int}},
314+
all_ready::RemoteRef{Channel{Bool}},
315+
problem::OptimizationProblem,
316+
optimizer_generator::Function,
317+
to_master::WorkerChannelRef{F},
318+
from_master::WorkerChannelRef{F},
319+
migrationSize, migrationPeriod)
320+
info("Initializing parallel optimization worker #$id at task=$(myid())")
321+
wrapper = PopulationOptimizerWrapper(id,
322+
optimizer_generator(problem),
323+
ProblemEvaluator(problem),
324+
to_master, from_master,
325+
migrationSize, migrationPeriod)
326+
# create immigrants receiving tasks=#
327+
put!(workers_started, id)
328+
info("Waiting for the master start signal...")
329+
fetch(all_ready) # wait until the master and other workers are ready
330+
info("Running worker #$id")
331+
run!(wrapper)
332+
info("Worker #$id stopped")
333+
final_fitness(wrapper) # return the best fitness
334+
end

test/helper.jl

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,8 @@ using FactCheck
33
using Compat
44

55
NumTestRepetitions = 100
6+
7+
if nprocs() < 4
8+
addprocs(4-nprocs()) # add procs for parallel population optimizer
9+
end
10+
@everywhere using BlackBoxOptim

test/runtests.jl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ my_tests = [
1818
"test_differential_evolution.jl",
1919
"test_adaptive_differential_evolution.jl",
2020
"test_natural_evolution_strategies.jl",
21+
"test_parallel_population_optimizer.jl",
2122

2223
"test_toplevel_bboptimize.jl",
2324
"test_smoketest_bboptimize.jl",
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
facts("Parallel population optimizer") do
2+
3+
@everywhere using BlackBoxOptim
4+
5+
rosenbrock2d(x) = (1.0 - x[1])^2 + 100.0 * (x[2] - x[1]^2)^2
6+
7+
res = bboptimize(rosenbrock2d; Method = :parallel_population_optimizer,
8+
SearchSpace = [(-5.0, 5.0), (-2.0, 2.0)], MaxTime = 100.0,
9+
ShowTrace = true, MigrationSize = 2, MigrationPeriod = 100)
10+
@fact size(best_candidate(res)) => (2,)
11+
@fact typeof(best_fitness(res)) => Float64
12+
@fact best_fitness(res) => less_than(100.0)
13+
end

0 commit comments

Comments
 (0)