Skip to content

Commit 2d1839c

Browse files
committed
Fixing simple carrier to await the full event chain.
1 parent 6d47242 commit 2d1839c

2 files changed

Lines changed: 18 additions & 7 deletions

File tree

src/carrier/simple.jl

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,16 @@ using UUIDs
66
abstract type AbstractSimpleCarrier <: Carrier end
77

88
"""
9-
ActorContainer
9+
ActorContainer
1010
1111
A container to manage multiple `SimpleCarrier`.
1212
"""
13-
struct ActorContainer
13+
mutable struct ActorContainer
1414
actors::Vector{AbstractSimpleCarrier}
15+
active_tasks::Threads.Atomic{Int}
16+
done_event::Base.Event
1517
function ActorContainer()
16-
return new(Vector{AbstractSimpleCarrier}())
18+
return new(Vector{AbstractSimpleCarrier}(), Threads.Atomic{Int}(0), Base.Event())
1719
end
1820
end
1921

@@ -65,8 +67,15 @@ function send_to_other(carrier::SimpleCarrier, content::Any, receiver::Real; met
6567
other_carrier::Carrier = carrier.container.actors[receiver]
6668
main_meta = Dict(:sender => carrier.aid, :message_id => uuid4())
6769
union_meta = merge(main_meta, meta) # important: meta can override main_meta entries
70+
Threads.atomic_add!(carrier.container.active_tasks, 1)
6871
return @spawnlog begin
69-
_dispatch_to(other_carrier, content, union_meta)
72+
try
73+
_dispatch_to(other_carrier, content, union_meta)
74+
finally
75+
if Threads.atomic_sub!(carrier.container.active_tasks, 1) == 1
76+
notify(carrier.container.done_event)
77+
end
78+
end
7079
end
7180
end
7281

@@ -113,7 +122,8 @@ Return a waitable object that can be used to monitor the progress of the optimiz
113122
function start_distributed_optimization(actors::Vector{<:DistributedAlgorithm}, start_message::Any)
114123
actor_container = ActorContainer()
115124
carriers = [SimpleCarrier(actor_container, actor) for actor in actors]
116-
return send_to_other(carriers[1], start_message, cid(carriers[2]))
125+
send_to_other(carriers[1], start_message, cid(carriers[2]))
126+
return Threads.@spawn wait(actor_container.done_event)
117127
end
118128

119129
"""

test/cohda/cohda_sc_tests.jl

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,8 @@ using Test
1010

1111
initial_message = create_cohda_start_message([1.2, 2, 3])
1212

13-
wait(send_to_other(actor_one, initial_message, cid(actor_two)))
13+
send_to_other(actor_one, initial_message, cid(actor_two))
14+
wait(container.done_event)
1415

1516
@test actor_one.actor.memory.solution_candidate.perf < 0
1617
end
@@ -23,6 +24,6 @@ end
2324
initial_message = create_cohda_start_message([1.2, 2, 3])
2425

2526
wait(start_distributed_optimization([actor_one, actor_two], initial_message))
26-
27+
@test !isnothing(actor_one.memory.solution_candidate.perf)
2728
@test actor_one.memory.solution_candidate.perf < 0
2829
end

0 commit comments

Comments
 (0)