Skip to content

Commit da08234

Browse files
committed
fix(hpc): avoid invalid Dask worker interfaces
1 parent e342aa4 commit da08234

2 files changed

Lines changed: 48 additions & 49 deletions

File tree

CodeEntropy/core/dask_clusters.py

Lines changed: 22 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -31,55 +31,40 @@ def check_slurm_env(self) -> None:
3131
3232
Some HPC systems require this variable to be unset for correct CPU binding.
3333
"""
34-
os.environ.pop("SLURM_CPU_BIND", None)
34+
if "SLURM_CPU_BIND" in os.environ:
35+
os.environ.pop("SLURM_CPU_BIND")
3536

3637
def system_network_interface(self) -> str:
3738
"""
38-
Select the most appropriate network interface for HPC communication.
39+
Get best candidate for HPC network interface from commonly known ones.
3940
40-
If args.hpc_interface is provided, that value is used directly. Otherwise,
41-
commonly used HPC interfaces are preferred. Loopback and container interfaces
42-
are avoided because Dask workers on other nodes cannot connect to a scheduler
43-
advertised on 127.0.0.1.
44-
45-
Returns:
46-
str: Name of selected network interface.
47-
48-
Raises:
49-
RuntimeError: If no suitable non-loopback interface can be found.
41+
This deliberately follows the WaterEntropy behaviour and only selects from
42+
known HPC-safe interfaces. It avoids selecting arbitrary interfaces such as
43+
eno1, which may exist on the master node but not on worker nodes.
5044
"""
51-
configured = getattr(self.args, "hpc_interface", None)
52-
if configured:
53-
return configured
54-
55-
preferred_nics = ["bond0", "ib0", "hsn0", "eth0"]
45+
hpc_nics = ["bond0", "ib0", "hsn0", "eth0"]
5646
interfaces = list(psutil.net_if_addrs().keys())
5747

58-
for iface in preferred_nics:
48+
for iface in hpc_nics:
5949
if iface in interfaces:
6050
return iface
6151

62-
for iface in interfaces:
63-
if not iface.startswith(("lo", "docker", "veth")):
64-
return iface
65-
6652
raise RuntimeError(
67-
"Could not find a non-loopback network interface for Dask workers. "
68-
f"Available interfaces: {interfaces}. Set 'hpc_interface' in config.yaml."
53+
"Could not find a known HPC network interface. "
54+
f"Available interfaces: {interfaces}. "
55+
"Expected one of: bond0, ib0, hsn0, eth0."
6956
)
7057

7158
def slurm_directives(self) -> tuple[list[str], list[str]]:
7259
"""
73-
Build SLURM job directives and skip list.
60+
Process extra SLURM directives and directives to be skipped.
7461
7562
Returns:
76-
Tuple[List[str], List[str]]:
77-
- Extra SLURM directives
78-
- Directives to skip
63+
Tuple containing extra directives and skipped directives.
7964
"""
8065
args = self.args
66+
8167
extra: list[str] = []
82-
skip: list[str] = ["--mem"]
8368

8469
if args.hpc_account:
8570
extra.append(f'--account="{args.hpc_account}"')
@@ -88,14 +73,16 @@ def slurm_directives(self) -> tuple[list[str], list[str]]:
8873
if args.hpc_constraint:
8974
extra.append(f'--constraint="{args.hpc_constraint}"')
9075

76+
skip = ["--mem"]
77+
9178
return extra, skip
9279

9380
def slurm_prologues(self) -> list[str]:
9481
"""
95-
Build SLURM job prologue commands for environment setup.
82+
Process environment setup commands for the SLURM worker job script.
9683
9784
Returns:
98-
List[str]: Shell commands executed before job start.
85+
List of shell commands executed before the Dask worker starts.
9986
"""
10087
args = self.args
10188
prologue: list[str] = []
@@ -115,10 +102,10 @@ def slurm_prologues(self) -> list[str]:
115102

116103
def configure_cluster(self) -> Client:
117104
"""
118-
Configure and launch a SLURM-backed Dask cluster.
105+
Configure a SLURM-backed Dask cluster.
119106
120107
Returns:
121-
Client: Dask distributed client connected to cluster.
108+
Dask distributed client connected to the SLURMCluster.
122109
"""
123110
args = self.args
124111

@@ -140,7 +127,6 @@ def configure_cluster(self) -> Client:
140127
shebang="#!/bin/bash --login",
141128
local_directory="$PWD",
142129
interface=iface,
143-
scheduler_options={"interface": iface},
144130
job_script_prologue=prologue,
145131
)
146132

@@ -155,9 +141,9 @@ def configure_cluster(self) -> Client:
155141

156142
def submit_master(self) -> None:
157143
"""
158-
Submit a SLURM job that runs a master Dask orchestration process.
144+
Submit a SLURM job that runs the master CodeEntropy process.
159145
160-
This generates a temporary SLURM script and submits it via `sbatch`.
146+
This generates a temporary SLURM script and submits it via sbatch.
161147
"""
162148
cli = list(sys.argv[1:])
163149
if "--submit" in cli:

tests/unit/CodeEntropy/core/dask_clusters/test_dask_clusters.py

Lines changed: 26 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ def args_helper(args_list):
2424
parser.add_argument("--hpc-nodes", type=int, default=4)
2525
parser.add_argument("--hpc-processes", type=int, default=20)
2626
parser.add_argument("--hpc-walltime", type=str, default="24:00:00")
27-
parser.add_argument("--hpc-interface", type=str, default=None)
2827
parser.add_argument("--hpc-modules", nargs="+", default=None)
2928

3029
parser.add_argument("--conda-env", type=str, default="codeentropy")
@@ -170,7 +169,17 @@ def test_slurm_prologues_includes_hpc_modules():
170169

171170

172171
@mock.patch("psutil.net_if_addrs")
173-
def test_system_network_interface_prefers_ib0(net_if_addrs):
172+
def test_system_network_interface_prefers_bond0(net_if_addrs):
173+
net_if_addrs.return_value = {"bond0": [], "ib0": [], "eth0": []}
174+
175+
args = args_helper([])
176+
manager = HPCDaskManager(args)
177+
178+
assert manager.system_network_interface() == "bond0"
179+
180+
181+
@mock.patch("psutil.net_if_addrs")
182+
def test_system_network_interface_prefers_ib0_when_bond0_missing(net_if_addrs):
174183
net_if_addrs.return_value = {"ib0": [], "eth0": []}
175184

176185
args = args_helper([])
@@ -180,33 +189,37 @@ def test_system_network_interface_prefers_ib0(net_if_addrs):
180189

181190

182191
@mock.patch("psutil.net_if_addrs")
183-
def test_system_network_interface_uses_configured_interface(net_if_addrs):
184-
net_if_addrs.return_value = {"lo": [], "ib0": []}
192+
def test_system_network_interface_prefers_hsn0_when_bond0_and_ib0_missing(
193+
net_if_addrs,
194+
):
195+
net_if_addrs.return_value = {"hsn0": [], "eth0": []}
185196

186-
args = args_helper(["--hpc-interface", "custom0"])
197+
args = args_helper([])
187198
manager = HPCDaskManager(args)
188199

189-
assert manager.system_network_interface() == "custom0"
200+
assert manager.system_network_interface() == "hsn0"
190201

191202

192203
@mock.patch("psutil.net_if_addrs")
193-
def test_system_network_interface_falls_back_to_non_loopback_interface(net_if_addrs):
194-
net_if_addrs.return_value = {"lo": [], "ens5": [], "docker0": []}
204+
def test_system_network_interface_prefers_eth0_when_only_eth0_known_interface(
205+
net_if_addrs,
206+
):
207+
net_if_addrs.return_value = {"eth0": [], "eno1": []}
195208

196209
args = args_helper([])
197210
manager = HPCDaskManager(args)
198211

199-
assert manager.system_network_interface() == "ens5"
212+
assert manager.system_network_interface() == "eth0"
200213

201214

202215
@mock.patch("psutil.net_if_addrs")
203-
def test_system_network_interface_raises_without_non_loopback_interface(net_if_addrs):
204-
net_if_addrs.return_value = {"lo": [], "docker0": [], "veth123": []}
216+
def test_system_network_interface_raises_without_known_hpc_interface(net_if_addrs):
217+
net_if_addrs.return_value = {"lo": [], "docker0": [], "eno1": []}
205218

206219
args = args_helper([])
207220
manager = HPCDaskManager(args)
208221

209-
with pytest.raises(RuntimeError, match="Could not find a non-loopback"):
222+
with pytest.raises(RuntimeError, match="Could not find a known HPC network"):
210223
manager.system_network_interface()
211224

212225

@@ -424,7 +437,7 @@ def test_configure_cluster_writes_job_script(
424437
slurm_cluster.assert_called_once()
425438
_, kwargs = slurm_cluster.call_args
426439
assert kwargs["interface"] == "ib0"
427-
assert kwargs["scheduler_options"] == {"interface": "ib0"}
440+
assert "scheduler_options" not in kwargs
428441

429442
cluster_instance.scale.assert_called_once_with(jobs=4)
430443
client.assert_called_once_with(cluster_instance)

0 commit comments

Comments
 (0)