1+
2+ export ConsensusActor, NoConsensusActor, AveragingConsensusAlgorithm, AveragingConsensusMessage, create_averaging_consensus_participant, gradient_term
3+
4+ abstract type ConsensusActor end
5+
6+ function gradient_term (actor:: ConsensusActor , λ:: Vector{<:Real} , data:: Any )
7+ return 0
8+ end
9+
10+ mutable struct NoConsensusActor <: ConsensusActor
11+ end
12+
13+ struct AveragingConsensusMessage
14+ λ:: Vector{Float64}
15+ k:: Int
16+ data:: Any
17+ end
18+
19+ struct ConsensusFinishedMessage
20+ λ:: Vector{Float64}
21+ k:: Int
22+ actor:: ConsensusActor
23+ end
24+
25+ @kwdef mutable struct AveragingConsensusAlgorithm <: DistributedAlgorithm
26+ message_queue:: Dict{Int,Vector{AveragingConsensusMessage}} = Dict {Int,Vector{AveragingConsensusMessage}} ()
27+ first_message = true
28+ k:: Int = 0
29+ max_iter:: Int = 50
30+ λ:: Vector{Float64} = Vector {Real} ()
31+
32+ initial_λ:: Real
33+ α:: Real
34+ actor:: ConsensusActor
35+
36+ finish_callback:: Function
37+ end
38+
39+ function on_exchange_message (algorithm_data:: AveragingConsensusAlgorithm , carrier:: Carrier , message:: AveragingConsensusMessage , meta:: Any )
40+ if message. k >= algorithm_data. max_iter
41+ # abort if iteration count is reached
42+ algorithm_data. finish_callback (algorithm_data, carrier)
43+ return
44+ end
45+
46+ if algorithm_data. first_message
47+ algorithm_data. first_message = false
48+ algorithm_data. λ = ones (length (message. λ)) .* algorithm_data. initial_λ
49+
50+ for addr in others (carrier, " " )
51+ send_to_other (carrier, AveragingConsensusMessage (algorithm_data. λ, 0 , message. data), addr)
52+ end
53+ end
54+ queue = get! (algorithm_data. message_queue, message. k, [])
55+
56+ push! (queue, message)
57+
58+ if length (queue) == length (others (carrier, " " ))
59+ avgλ = sum (m. λ for m in queue) ./ length (queue)
60+ algorithm_data. λ .+ = algorithm_data. α .* (avgλ .- algorithm_data. λ) .+ gradient_term (algorithm_data. actor, algorithm_data. λ, message. data)
61+
62+ algorithm_data. k += message. k + 1
63+ delete! (algorithm_data. message_queue, message. k)
64+
65+ for addr in others (carrier, " " )
66+ send_to_other (carrier, AveragingConsensusMessage (algorithm_data. λ, algorithm_data. k, message. data), addr)
67+ end
68+ end
69+ end
70+
71+ function create_averaging_consensus_participant (finish_callback:: Function , consensus_actor:: ConsensusActor ; initial_λ:: Real = 10 , α:: Real = 0.3 , max_iter:: Int = 50 )
72+ appl_consensus_actor = isnothing (consensus_actor) ? NoConsensusActor () : consensus_actor
73+
74+ return AveragingConsensusAlgorithm (finish_callback= finish_callback, initial_λ= initial_λ, α= α, actor= appl_consensus_actor, max_iter= max_iter)
75+ end
0 commit comments