Skip to content

Commit 81108c8

Browse files
committed
Drop the requirement for cluster managers to subtype ClusterManager
This should make it easier for existing Distributed cluster managers to add support for DistributedNext.
1 parent 9eec67f commit 81108c8

7 files changed

Lines changed: 81 additions & 23 deletions

File tree

docs/src/_changelog.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,14 @@ CurrentModule = DistributedNext
77
This documents notable changes in DistributedNext.jl. The format is based on
88
[Keep a Changelog](https://keepachangelog.com).
99

10+
## Unreleased
11+
12+
### Changed
13+
- DistributedNext no longer requires cluster manager implementations to subtype
14+
the [`ClusterManager`](@ref) type; instead, cluster manager support can be
15+
implemented using trait methods ([#67]). This was done to allow existing
16+
cluster managers to keep supporting both Distributed and DistributedNext.
17+
1018
## [v1.3.0] - 2026-04-06
1119

1220
### Changed

docs/src/index.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,11 +79,12 @@ and transport messages between processes. It is possible for Cluster Managers to
7979

8080
```@docs
8181
DistributedNext.ClusterManager
82+
DistributedNext.is_cluster_manager
8283
DistributedNext.WorkerConfig
8384
DistributedNext.launch
8485
DistributedNext.manage
85-
DistributedNext.kill(::ClusterManager, ::Int, ::WorkerConfig)
86-
DistributedNext.connect(::ClusterManager, ::Int, ::WorkerConfig)
86+
DistributedNext.kill(::Any, ::Int, ::WorkerConfig)
87+
DistributedNext.connect(::Any, ::Int, ::WorkerConfig)
8788
DistributedNext.init_worker
8889
DistributedNext.process_messages
8990
DistributedNext.default_addprocs_params

src/DistributedNext.jl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ include("managers.jl") # LocalManager and SSHManager
171171
worker_exited_callbacks::Dict{Any, Base.Callable} = Dict{Any, Base.Callable}()
172172

173173
# Cluster manager
174-
cluster_manager::Ref{ClusterManager} = Ref{ClusterManager}()
174+
cluster_manager::Ref{Any} = Ref{Any}()
175175

176176
# Synchronization
177177
worker_lock::ReentrantLock = ReentrantLock()

src/cluster.jl

Lines changed: 49 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,46 @@
66
Supertype for cluster managers, which control worker processes as a cluster.
77
Cluster managers implement how workers can be added, removed and communicated with.
88
`SSHManager` and `LocalManager` are subtypes of this.
9+
10+
!!! note
11+
Subtyping `ClusterManager` is no longer required. DistributedNext now
12+
uses the [`is_cluster_manager`](@ref) trait to recognise cluster managers,
13+
so any type can opt in by defining `DistributedNext.is_cluster_manager(::MyMgr) = true`.
14+
`ClusterManager` is kept for backward compatibility; a subtype is automatically
15+
recognised as a cluster manager via a trait fallback.
916
"""
1017
abstract type ClusterManager end
1118

19+
"""
20+
is_cluster_manager(x) -> Bool
21+
22+
Trait identifying `x` as a cluster manager. Defaults to `false`. Cluster
23+
managers opt in by defining a method returning `true`:
24+
25+
```julia
26+
DistributedNext.is_cluster_manager(::MyManager) = true
27+
```
28+
29+
Any subtype of [`ClusterManager`](@ref) is automatically recognised via a
30+
fallback method. Defining this trait does *not* require subtyping
31+
`ClusterManager`, which lets external types (for example, types already
32+
subtyping `Distributed.ClusterManager`) act as DistributedNext cluster
33+
managers without multiple inheritance.
34+
"""
35+
is_cluster_manager(::Any) = false
36+
is_cluster_manager(::ClusterManager) = true
37+
38+
# Throw an ArgumentError unless `manager` has opted into the cluster-manager
39+
# trait. Used by entry points accepting user-supplied managers so we fail early
40+
# with a clear message.
41+
function check_cluster_manager(manager)
42+
if !is_cluster_manager(manager)
43+
throw(ArgumentError("$(typeof(manager)) is not recognised as a cluster manager. " *
44+
"Define `DistributedNext.is_cluster_manager(::$(typeof(manager))) = true` " *
45+
"to opt in."))
46+
end
47+
end
48+
1249
function throw_if_cluster_manager_unassigned()
1350
isassigned(CTX[].cluster_manager) || error("cluster_manager is unassigned")
1451
return nothing
@@ -121,12 +158,12 @@ mutable struct Worker
121158
w_stream::IO
122159
w_serializer::ClusterSerializer # writes can happen from any task hence store the
123160
# serializer as part of the Worker object
124-
manager::ClusterManager
161+
manager::Any
125162
config::WorkerConfig
126163
version::Union{VersionNumber, Nothing} # Julia version of the remote process
127164
initialized::Event
128165

129-
function Worker(id::Int, r_stream::IO, w_stream::IO, manager::ClusterManager;
166+
function Worker(id::Int, r_stream::IO, w_stream::IO, manager;
130167
version::Union{VersionNumber, Nothing}=nothing,
131168
config::WorkerConfig=WorkerConfig())
132169
w = Worker(id)
@@ -404,14 +441,14 @@ function parse_connection_info(str)
404441
end
405442

406443
"""
407-
init_worker(cookie::AbstractString, manager::ClusterManager=DefaultClusterManager())
444+
init_worker(cookie::AbstractString, manager=DefaultClusterManager())
408445
409446
Called by cluster managers implementing custom transports. It initializes a newly launched
410447
process as a worker. Command line argument `--worker[=<cookie>]` has the effect of initializing a
411448
process as a worker using TCP/IP sockets for transport.
412449
`cookie` is a [`cluster_cookie`](@ref).
413450
"""
414-
function init_worker(cookie::AbstractString, manager::ClusterManager=DefaultClusterManager())
451+
function init_worker(cookie::AbstractString, manager=DefaultClusterManager())
415452
myrole!(:worker)
416453

417454
# On workers, the default cluster manager connects via TCP sockets. Custom
@@ -440,7 +477,7 @@ end
440477
# Only one addprocs can be in progress at any time
441478
#
442479
"""
443-
addprocs(manager::ClusterManager; kwargs...) -> List of process identifiers
480+
addprocs(manager; kwargs...) -> List of process identifiers
444481
445482
Launches worker processes via the specified cluster manager.
446483
@@ -479,7 +516,8 @@ if istaskdone(t) # Check if `addprocs` has completed to ensure `fetch` doesn't
479516
end
480517
```
481518
"""
482-
function addprocs(manager::ClusterManager; kwargs...)
519+
function addprocs(manager; kwargs...)
520+
check_cluster_manager(manager)
483521
params = merge(default_addprocs_params(manager), Dict{Symbol, Any}(kwargs))
484522

485523
init_multi()
@@ -492,7 +530,7 @@ function addprocs(manager::ClusterManager; kwargs...)
492530
warning_interval, [(manager, params)])
493531

494532
# Add new workers
495-
new_workers = @lock CTX[].worker_lock addprocs_locked(manager::ClusterManager, params)
533+
new_workers = @lock CTX[].worker_lock addprocs_locked(manager, params)
496534

497535
# Call worker-started callbacks
498536
_run_callbacks_concurrently("worker-started", CTX[].worker_started_callbacks,
@@ -501,7 +539,7 @@ function addprocs(manager::ClusterManager; kwargs...)
501539
return new_workers
502540
end
503541

504-
function addprocs_locked(manager::ClusterManager, params)
542+
function addprocs_locked(manager, params)
505543
topology(Symbol(params[:topology]))
506544

507545
if CTX[].pgrp.topology !== :all_to_all
@@ -574,13 +612,13 @@ function set_valid_processes(plist::Array{Int})
574612
end
575613

576614
"""
577-
default_addprocs_params(mgr::ClusterManager) -> Dict{Symbol, Any}
615+
default_addprocs_params(mgr) -> Dict{Symbol, Any}
578616
579617
Implemented by cluster managers. The default keyword parameters passed when calling
580618
`addprocs(mgr)`. The minimal set of options is available by calling
581619
`default_addprocs_params()`.
582620
"""
583-
default_addprocs_params(::ClusterManager) = default_addprocs_params()
621+
default_addprocs_params(_) = default_addprocs_params()
584622
default_addprocs_params() = Dict{Symbol,Any}(
585623
:topology => :all_to_all,
586624
:dir => pwd(),
@@ -639,7 +677,7 @@ function launch_n_additional_processes(manager, frompid, fromconfig, cnt, launch
639677
end
640678
end
641679

642-
function create_worker(manager::ClusterManager, wconfig::WorkerConfig)
680+
function create_worker(manager, wconfig::WorkerConfig)
643681
# only node 1 can add new nodes, since nobody else has the full list of address:port
644682
@assert CTX[].lproc.id == 1
645683
timeout = worker_timeout()

src/managers.jl

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -567,7 +567,7 @@ function manage(manager::LocalManager, id::Integer, config::WorkerConfig, op::Sy
567567
end
568568

569569
"""
570-
launch(manager::ClusterManager, params::Dict, launched::Array, launch_ntfy::Condition)
570+
launch(manager, params::Dict, launched::Array, launch_ntfy::Condition)
571571
572572
Implemented by cluster managers. For every Julia worker launched by this function, it should
573573
append a `WorkerConfig` entry to `launched` and notify `launch_ntfy`. The function MUST exit
@@ -577,7 +577,7 @@ keyword arguments [`addprocs`](@ref) was called with.
577577
launch
578578

579579
"""
580-
manage(manager::ClusterManager, id::Integer, config::WorkerConfig. op::Symbol)
580+
manage(manager, id::Integer, config::WorkerConfig, op::Symbol)
581581
582582
Implemented by cluster managers. It is called on the master process, during a worker's
583583
lifetime, with appropriate `op` values:
@@ -596,17 +596,17 @@ end
596596

597597

598598
"""
599-
connect(manager::ClusterManager, pid::Int, config::WorkerConfig) -> (instrm::IO, outstrm::IO)
599+
connect(manager, pid::Int, config::WorkerConfig) -> (instrm::IO, outstrm::IO)
600600
601601
Implemented by cluster managers using custom transports. It should establish a logical
602602
connection to worker with id `pid`, specified by `config` and return a pair of `IO`
603603
objects. Messages from `pid` to current process will be read off `instrm`, while messages to
604604
be sent to `pid` will be written to `outstrm`. The custom transport implementation must
605605
ensure that messages are delivered and received completely and in order.
606-
`connect(manager::ClusterManager.....)` sets up TCP/IP socket connections in-between
606+
`connect(manager, ...)` sets up TCP/IP socket connections in-between
607607
workers.
608608
"""
609-
function connect(manager::ClusterManager, pid::Int, config::WorkerConfig)
609+
function connect(manager, pid::Int, config::WorkerConfig)
610610
if config.connect_at !== nothing
611611
# this is a worker-to-worker setup call.
612612
return connect_w2w(pid, config)
@@ -755,15 +755,16 @@ end
755755

756756

757757
"""
758-
kill(manager::ClusterManager, pid::Int, config::WorkerConfig)
758+
kill(manager, pid::Int, config::WorkerConfig)
759759
760760
Implemented by cluster managers.
761761
It is called on the master process, by [`rmprocs`](@ref).
762762
It should cause the remote worker specified by `pid` to exit.
763-
`kill(manager::ClusterManager.....)` executes a remote `exit()`
763+
`kill(manager, ...)` executes a remote `exit()`
764764
on `pid`.
765765
"""
766-
function kill(manager::ClusterManager, pid::Int, config::WorkerConfig)
766+
function kill(manager, pid::Int, config::WorkerConfig)
767+
check_cluster_manager(manager)
767768
remote_do(exit, pid)
768769
nothing
769770
end

src/process_messages.jl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -367,7 +367,7 @@ function handle_msg(msg::JoinPGRPMsg, header, r_stream, w_stream, version)
367367
send_msg_now(controller, MsgHeader(RRID(0,0), header.notify_oid), JoinCompleteMsg(Sys.CPU_THREADS, getpid()))
368368
end
369369

370-
function connect_to_peer(manager::ClusterManager, rpid::Int, wconfig::WorkerConfig)
370+
function connect_to_peer(manager, rpid::Int, wconfig::WorkerConfig)
371371
try
372372
(r_s, w_s) = connect(manager, rpid, wconfig)
373373
w = Worker(rpid, r_s, w_s, manager; config=wconfig)::Worker

test/managers.jl

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,3 +26,13 @@ using DistributedNext: parse_machine, SSHManager, LocalManager
2626
sprint((t,x) -> show(t, "text/plain", x), SSHManager("127.0.0.1")))
2727
@test sprint((t,x) -> show(t, "text/plain", x), LocalManager(1, true)) == "LocalManager()"
2828
end
29+
30+
@testset "is_cluster_manager trait" begin
31+
# Subtypes of ClusterManager opt in automatically
32+
@test DistributedNext.is_cluster_manager(LocalManager(1, true))
33+
34+
# Arbitrary types do not opt in, and passing one to `addprocs` throws an ArgumentError
35+
struct NotAManager end
36+
@test !DistributedNext.is_cluster_manager(NotAManager())
37+
@test_throws ArgumentError DistributedNext.addprocs(NotAManager())
38+
end

0 commit comments

Comments
 (0)