@@ -14,15 +14,22 @@ import Pkg
1414using Distributed: launch, manage, kill, init_worker, connect
1515# ==================================================================
1616
17+ export ElasticManager, elastic_worker
18+
1719
# The master process listens on a well-known port.
# Launched workers connect to the master and redirect their stdout to it.
# Workers can join and leave the cluster on demand.
2123
22- export ElasticManager, elastic_worker
23-
2424const HDR_COOKIE_LEN = Distributed. HDR_COOKIE_LEN
2525
# `my_errormonitor(t)` attaches error reporting to the task `t` and returns `t`.
# `Base.errormonitor` is only available in Julia 1.7+, so the definition is chosen
# at load time from the running Julia version.
@static if Base.VERSION >= v"1.7-"
    # Delegate to Base, which logs an error if `t` fails.
    my_errormonitor(t) = Base.errormonitor(t)
else
    # Fallback for older Julia: wait on `t` from a helper task and log the failure,
    # so background errors are not silently lost. Returns `t`, like the 1.7+ branch.
    function my_errormonitor(t)
        @async try
            wait(t)
        catch err
            @error "Unhandled error in background task" exception = (err, catch_backtrace())
        end
        return t
    end
end
32+
2633struct ElasticManager <: Distributed.ClusterManager
2734 active:: Dict{Int, Distributed.WorkerConfig} # active workers
2835 pending:: Channel{Sockets.TCPSocket} # to be added workers
@@ -47,20 +54,23 @@ struct ElasticManager <: Distributed.ClusterManager
4754 error (" Failed to automatically get host's IP address. Please specify `addr=` explicitly." )
4855 end
4956 end
50-
57+
5158 l_sock = Distributed. listen (addr, port)
5259
5360 lman = new (Dict {Int, Distributed.WorkerConfig} (), Channel {Sockets.TCPSocket} (typemax (Int)), Set {Int} (), topology, Sockets. getsockname (l_sock), manage_callback, printing_kwargs)
5461
55- @async begin
62+ t1 = @async begin
5663 while true
5764 let s = Sockets. accept (l_sock)
58- @async process_worker_conn (lman, s)
65+ t2 = @async process_worker_conn (lman, s)
66+ my_errormonitor (t2)
5967 end
6068 end
6169 end
70+ my_errormonitor (t1)
6271
63- @async process_pending_connections (lman)
72+ t3 = @async process_pending_connections (lman)
73+ my_errormonitor (t3)
6474
6575 lman
6676 end
@@ -153,7 +163,7 @@ function Base.show(io::IO, mgr::ElasticManager)
153163
154164 println (iob, " Worker connect command : " )
155165 print (iob, " " , get_connect_cmd (mgr; mgr. printing_kwargs... ))
156-
166+
157167 print (io, String (take! (iob)))
158168end
159169
@@ -176,5 +186,21 @@ function elastic_worker(
176186 Distributed. start_worker (c, cookie)
177187end
178188
"""
    get_connect_cmd(em::ElasticManager; absolute_exename=true, same_project=true,
                    exeflags::Tuple=())

Return, as a single string, a command line that starts a Julia process which calls
`ElasticClusterManager.elastic_worker` with the current cluster cookie and the
address and port stored in `em.sockname`.

# Keywords
- `absolute_exename`: if `true`, use the running Julia binary's path
  (`Sys.BINDIR` joined with `Base.julia_exename()`); otherwise plain `"julia"`.
- `same_project`: if `true`, add `--project=<active project file>`. The flag is
  omitted when there is no active project.
- `exeflags`: extra flags inserted right after the executable name.
"""
function get_connect_cmd(em::ElasticManager; absolute_exename=true, same_project=true, exeflags::Tuple=())
    ip = string(em.sockname[1])
    port = convert(Int, em.sockname[2])
    cookie = Distributed.cluster_cookie()
    exename = absolute_exename ? joinpath(Sys.BINDIR, Base.julia_exename()) : "julia"

    # Use the public `Base.active_project()` rather than Pkg internals; this avoids
    # building a Pkg context every time the command is generated.
    project_file = same_project ? Base.active_project() : nothing
    project = project_file === nothing ? () : ("--project=$project_file",)

    # NOTE(review): the pieces are joined with plain spaces and are not shell-quoted,
    # so an executable or project path containing spaces would need manual quoting.
    return join([
        exename,
        exeflags...,
        project...,
        "-e 'import ElasticClusterManager; ElasticClusterManager.elastic_worker(\"$cookie\",\"$ip\",$port)'"
    ], " ")
end
204+
179205
180206end # module CustomClusterManagers
0 commit comments