1010mutable struct NoConsensusActor <: ConsensusActor
1111end
1212
13- struct AveragingConsensusMessage
13+ struct AveragingConsensusMessage <: OptimizationMessage
1414 λ:: Vector{Float64}
1515 k:: Int
1616 data:: Any
17+ initial:: Bool
18+ AveragingConsensusMessage (λ, k, data, initial= false ) = new (λ, k, data, initial)
1719end
1820
1921struct ConsensusFinishedMessage
2729 first_message = true
2830 k:: Int = 0
2931 max_iter:: Int = 50
30- λ:: Vector{Float64} = Vector {Real} ()
32+ λ:: Vector{Float64} = [ 1 ]
3133
3234 initial_λ:: Real
3335 α:: Real
3739end
3840
3941function on_exchange_message (algorithm_data:: AveragingConsensusAlgorithm , carrier:: Carrier , message:: AveragingConsensusMessage , meta:: Any )
42+ # @info "Doing something" algorithm_data.first_message message.k algorithm_data.k
4043 if message. k >= algorithm_data. max_iter
41- # abort if iteration count is reached
44+ if algorithm_data. first_message
45+ # negotiation is over, only new with k=0 is expected
46+ return
47+ end
48+ # finish if iteration count is reached, reset all state data
4249 algorithm_data. finish_callback (algorithm_data, carrier)
50+ algorithm_data. first_message = true
51+ empty! (algorithm_data. message_queue)
4352 return
4453 end
45-
46- if algorithm_data. first_message
54+
55+ if algorithm_data. first_message || message . initial
4756 algorithm_data. first_message = false
57+ algorithm_data. k = 0
4858 algorithm_data. λ = ones (length (message. λ)) .* algorithm_data. initial_λ
4959
5060 for addr in others (carrier, " " )
@@ -55,11 +65,11 @@ function on_exchange_message(algorithm_data::AveragingConsensusAlgorithm, carrie
5565
5666 push! (queue, message)
5767
58- if length (queue) == length (others (carrier, " " ))
68+ if length (queue) == length (others (carrier, " " )) || algorithm_data . k < message . k
5969 avgλ = sum (m. λ for m in queue) ./ length (queue)
6070 algorithm_data. λ .+ = algorithm_data. α .* (avgλ .- algorithm_data. λ) .+ gradient_term (algorithm_data. actor, algorithm_data. λ, message. data)
71+ algorithm_data. k = message. k + 1
6172
62- algorithm_data. k += message. k + 1
6373 delete! (algorithm_data. message_queue, message. k)
6474
6575 for addr in others (carrier, " " )
0 commit comments