Skip to content

Commit 0afde78

Browse files
committed
feat: Add cgroup2 limit support
1 parent 2a9d7d6 commit 0afde78

5 files changed

Lines changed: 353 additions & 9 deletions

File tree

Lines changed: 326 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,326 @@
1+
#!/usr/bin/env python3
2+
"""cgroup2 support for DIRAC pilot."""
3+
4+
import os
5+
import functools
6+
import subprocess
7+
from DIRAC import S_OK, S_ERROR, gLogger
8+
from DIRAC.Core.Utilities.DIRACSingleton import DIRACSingleton
9+
from DIRAC.Core.Utilities import Subprocess
10+
11+
12+
class CG2Manager(metaclass=DIRACSingleton):
13+
"""A class to manage the cgroup2 hierarchy for a typical pilot job use-case.
14+
15+
This creates a group for all of the pilot processes (anything in the
16+
group at the start). This is a requirement for controlling the
17+
sub-groups (no processes in non-leaf groups).
18+
19+
A group is then created on request for each "slot" under the pilot,
20+
with the requested limits.
21+
"""
22+
23+
# Paths used to lookup cgroup info
24+
FILE_MOUNTS = "/proc/mounts"
25+
FILE_CUR_CGROUP = f"/proc/{os.getpid()}/cgroup"
26+
# Control file names within the cgroup2 hierarchy
27+
CTRL_CONTROLLERS = "cgroup.controllers"
28+
CTRL_PROCS = "cgroup.procs"
29+
CTRL_SUBTREE = "cgroup.subtree_control"
30+
CTRL_MEM_OOM_GROUP = "memory.oom.group"
31+
CTRL_MEM_EVENTS = "memory.events"
32+
CTRL_MEM_MAX = "memory.max"
33+
CTRL_MEM_SWAP_MAX = "memory.swap.max"
34+
CTRL_MEM_PEAK = "memory.peak"
35+
CTRL_CPU_MAX = "cpu.max"
36+
# CPU controller constants
37+
# Weight is the max value for 1 CPU core
38+
CPU_WEIGHT = 100000
39+
# Period is the averaging time in us to apply the limit
40+
# The default is 100k and I see no particular reason this should change
41+
CPU_PERIOD = 100000
42+
# Name of the group for the existing pilot processes
43+
PILOT_GROUP = f"dirac_pilot_{os.getpid()}"
44+
45+
def __init__(self):
    """Initialise the manager in its unconfigured state.

    No cgroup files are touched here; the attributes below are only
    given their starting values.
    """
    # Sub-logger used for all messages emitted by this class
    self.log = gLogger.getSubLogger("CG2Manager")
    # Physical path of the cgroup this process started in (the base of
    # our hierarchy); remains None until _detect_path() fills it in
    self._cgroup_path = None
    # Number of sub-groups created so far, used to build unique group names
    self._subproc_num = 0
    # Flipped to True by _prepare() once the cgroups are configured
    # in the expected way
    self._ready = False
58+
59+
@staticmethod
60+
def _filter_file(path, filterfcn):
61+
"""Opens a file and runs filterfcn for each line.
62+
If filterfcn returns any value, that value will be returned
63+
by this function.
64+
Returns None if no line matches.
65+
"""
66+
with open(path, encoding="ascii") as file_in:
67+
for line in file_in.readlines():
68+
line = line.strip()
69+
if res := filterfcn(line):
70+
return res
71+
return None
72+
73+
def _detect_root(self):
74+
"""Find the cgroup2 filesystem mountpoint on this system.
75+
Returns the mountpoint path or None if it isn't found.
76+
"""
77+
78+
def filt(line):
79+
"""Filter function to find the first cgroup2 mount point
80+
from a standard /proc/mounts layout file.
81+
"""
82+
parts = line.split(" ")
83+
if len(parts) < 3:
84+
return None
85+
if parts[2] == "cgroup2":
86+
return parts[1]
87+
return None
88+
89+
return self._filter_file(self.FILE_MOUNTS, filt)
90+
91+
def _detect_path(self):
92+
"""Finds the full physical path to the current cgroup control dir.
93+
Sets self._cgroup_path on success.
94+
Raises a RuntimeError if the path cannot be determined.
95+
"""
96+
97+
def filt(line):
98+
"""Filter to find the current cgroup2 name for the current
99+
process, without the leading /.
100+
"""
101+
if line.startswith("0::/"):
102+
return line[4:]
103+
return False
104+
105+
if not (root_path := self._detect_root()):
106+
raise RuntimeError("Failed to find cgroup mount point")
107+
if not (cur_group := self._filter_file(self.FILE_CUR_CGROUP, filt)):
108+
raise RuntimeError("Failed to find current cgroup")
109+
self._cgroup_path = os.path.join(root_path, cur_group)
110+
111+
def _create_group(self, group_name, isolate_oom=True):
112+
"""Creates a new group.
113+
If "isolate_oom" is True, the new group will be decoupled
114+
from the parent's OOM group.
115+
Raises a RuntimeError if the group cannot be created.
116+
"""
117+
try:
118+
os.mkdir(os.path.join(self._cgroup_path, group_name))
119+
except PermissionError as err:
120+
raise RuntimeError(f"Permission denied creating sub-cgroup '{group_name}'") from err
121+
if isolate_oom:
122+
self._write_control(group_name, self.CTRL_MEM_OOM_GROUP, "0")
123+
124+
def _remove_group(self, group_name):
125+
"""Removes a group."""
126+
os.rmdir(os.path.join(self._cgroup_path, group_name))
127+
128+
def _move_init_procs(self):
129+
"""Creates the pilot sub-group and moves all of the initial processes
130+
from the top group into the new sub-group.
131+
Will raise a RuntimeError if any cgroup configuration problem
132+
prevents this from completing succesfully.
133+
"""
134+
self._create_group(self.PILOT_GROUP, isolate_oom=False)
135+
cur_pids = self._read_control("", self.CTRL_PROCS)
136+
self._write_control(self.PILOT_GROUP, self.CTRL_PROCS, cur_pids)
137+
138+
def _read_control(self, group_name, ctrl_name):
139+
"""Reads a control value for the given group_name (relative to our base path).
140+
The returned value varies depending on the value content:
141+
- For a single token value, a string containing that token will be returned.
142+
- For a single line value with space-seperated tokens, a list of tokens will be returned.
143+
- For a multi-line value (where each line is a token), a list of tokens will be returned.
144+
All tokens in the return values are strings.
145+
A RuntimeError will be raised if the control cannot be read.
146+
"""
147+
try:
148+
with open(
149+
os.path.join(self._cgroup_path, group_name, ctrl_name),
150+
encoding="ascii",
151+
) as file_in:
152+
values = [line.strip() for line in file_in.readlines()]
153+
if " " in values and len(values) == 1:
154+
values = values[0].split(" ")
155+
if len(values) == 1:
156+
values = values[0]
157+
return values
158+
except PermissionError as err:
159+
raise RuntimeError(f"Access denied reading read control '{group_name}/{ctrl_name}'") from err
160+
161+
def _write_control(self, group_name, ctrl_name, value):
162+
"""Writes a control value for a given group_name (relative to our base path).
163+
The value can be a string or an iterable of strings. The values should not
164+
contain any whitespace characters.
165+
A RuntimeError will be raised if the control cannot be set.
166+
"""
167+
try:
168+
ctrl_path = os.path.join(self._cgroup_path, group_name, ctrl_name)
169+
with open(ctrl_path, "w", encoding="ascii") as file_out:
170+
if isinstance(value, str):
171+
value = [value]
172+
for arg in value:
173+
file_out.write(f"{arg}\n")
174+
# Flush is critical here as setting multiple values at the same time may fail
175+
file_out.flush()
176+
except PermissionError as err:
177+
raise RuntimeError(f"Access denied writing control '{group_name}/{ctrl_name}'") from err
178+
except OSError as err:
179+
# This generally happens if we're trying to set a value that is
180+
# considered invalid, for example delegating a controller that isn't enabled
181+
# in the first place.
182+
raise RuntimeError(f"Error writing control '{group_name}/{ctrl_name}' = {value}") from err
183+
184+
def _get_oom_count(self, slot_name):
185+
"""Extracts the OOM counter as an int for the given slot.
186+
Returns an int on success, can return a None if the memory.events
187+
doesn't contain an oom counter or throws RuntimeError on failure.
188+
"""
189+
190+
def filt(line):
191+
"""Filter to find the oom counter from a memory.events file."""
192+
if line.startswith("oom "):
193+
return int(line[4:])
194+
return False
195+
196+
mem_events = os.path.join(self._cgroup_path, slot_name, self.CTRL_MEM_EVENTS)
197+
return self._filter_file(mem_events, filt)
198+
199+
def _set_limits(self, group_name, cores=None, memory=None, noswap=False):
200+
"""Sets the limits for an existing group.
201+
See create_slot for a description of the other parameters.
202+
This will raise a RuntimeError if appyling any of the limits fail to apply.
203+
"""
204+
if cores:
205+
proc_max = int(cores * self.CPU_WEIGHT)
206+
self._write_control(group_name, self.CTRL_CPU_MAX, f"{proc_max} {self.CPU_PERIOD}")
207+
if memory:
208+
self._write_control(group_name, self.CTRL_MEM_MAX, f"{memory}")
209+
if noswap:
210+
self._write_control(group_name, self.CTRL_MEM_SWAP_MAX, "0")
211+
212+
def _prepare(self):
213+
"""Sets up the cgroup tree for the current process.
214+
Should be called once, before using any of the other functions in this class.
215+
216+
Note that this function (specifcally the _move_init_procs call) assumes that
217+
the list of processes is static. If the process list changes while this is running,
218+
it is likely that this will fail to set things up properly.
219+
"""
220+
self._detect_path()
221+
controllers = self._read_control("", self.CTRL_CONTROLLERS)
222+
if not controllers:
223+
raise RuntimeError("No controllers enabled")
224+
for ctrl in ["cpu", "memory"]:
225+
if not ctrl in controllers:
226+
raise RuntimeError(f"{ctrl} controller not enabled")
227+
self._move_init_procs()
228+
self._write_control("", self.CTRL_SUBTREE, ["+cpu", "+memory"])
229+
self._ready = True
230+
231+
def _create_slot(self, slot_name, cores=None, memory=None, noswap=False):
232+
"""Creates a slot for a job with the given slot_name.
233+
Cores is a float, number of CPU cores this group may use.
234+
Memory is a string or int, either a number of bytes to limit the group RSS,
235+
or a string limit with a unit suffix, e.g. "1G" as supported by the cgroup memory
236+
controller.
237+
If noswap is set to true, the swap memory limit will be set to 0; this is mostly
238+
useful for testing (where the system may swap memory instead of triggering an
239+
OOM, which may allow a process to use more than the memory limit).
240+
This will raise a RuntimeError if setting up the slot fails.
241+
"""
242+
if not self._ready:
243+
return
244+
self._create_group(slot_name)
245+
self._set_limits(slot_name, cores, memory, noswap)
246+
247+
def _remove_slot(self, slot_name):
248+
"""Removes a slot with the given name.
249+
Can raise usual filesystem OSError if the slot doesn't exist.
250+
"""
251+
if not self._ready:
252+
return
253+
self._remove_group(slot_name)
254+
255+
def _setup_subproc(self, slot_name):
256+
"""A subprocess preexec function for setting up cgroups.
257+
This will move te current process into the given cgroup slot.
258+
On failure, no error will be reported.
259+
"""
260+
# Threading danger!
261+
# There are potential threading issues with preexec functions
262+
# They must not hold any locks that the parent process might already
263+
# be holding, including ones in standard library functions.
264+
# This function should be kept as minimal as possible.
265+
try:
266+
self._write_control(slot_name, self.CTRL_PROCS, f"{os.getpid()}")
267+
except Exception as err:
268+
# We can't even really log here as we're in the set-up
269+
# context of the new proces
270+
pass
271+
272+
def setUp(self):
    """Build the base cgroup tree if possible.

    Should be called once per process before using systemCall.
    Returns S_OK() on success, or S_ERROR carrying the failure message.
    """
    try:
        self._prepare()
    except Exception as err:
        # Most cgroup failures surface as RuntimeError, but no unexpected
        # failure of any kind may crash the upstream module: the caller
        # should simply carry on without cgroup support.
        return S_ERROR(str(err))
    else:
        return S_OK()
285+
286+
def systemCall(self, *args, **kwargs):
    """A proxy function for Subprocess.systemCall that runs the command inside
    its own cgroup2 slot when the functionality is available.

    All positional and keyword arguments are forwarded to
    Subprocess.systemCall, except:
     - ceParameters (optional dict): consumed here and searched for the
       cgroup memory options "MemoryLimitMB" (limit in MB, 0/absent for no
       limit) and "MemoryNoSwap" ("yes"/"true" to disable swap).
     - preexec_fn: always overwritten, either with the hook that moves the
       child into the slot or with None.

    Any problem while preparing or cleaning up the slot (including invalid
    ceParameters values) is logged as a warning and never prevents the
    command from running.

    Returns the usual S_OK/S_ERROR from Subprocess.systemCall.
    """
    # ceParameters is for our use only and must not reach Subprocess.systemCall
    ce_params = kwargs.pop("ceParameters", None) or {}
    preexec_fn = None
    slot_name = f"subproc_{os.getpid()}_{self._subproc_num}"
    self._subproc_num += 1
    if self._ready:
        self.log.info(f"Creating slot cgroup {slot_name}")
        try:
            # Parsing happens inside the try: a malformed limit must only
            # disable the cgroup for this call, not crash the submission.
            memory = None
            memoryMB = int(ce_params.get("MemoryLimitMB", 0))
            if memoryMB:
                memory = memoryMB * 1024 * 1024
            # str() tolerates non-string option values such as booleans
            noswap = str(ce_params.get("MemoryNoSwap", "no")).lower() in ("yes", "true")
            self.log.info(f"CGroup Limits, Mem: {memory}, NoSwap: {noswap}")
            self._create_slot(slot_name, memory=memory, noswap=noswap)
            preexec_fn = functools.partial(CG2Manager._setup_subproc, self, slot_name)
        except Exception as err:
            self.log.warn("Failed to create slot cgroup:", str(err))
    kwargs["preexec_fn"] = preexec_fn
    res = Subprocess.systemCall(*args, **kwargs)
    if self._ready:
        self.log.info(f"Removing slot cgroup {slot_name}")
        try:
            if self._get_oom_count(slot_name):
                # Child process triggered an OOM
                # We can't readily report this upstream (child process will probably
                # fail with an error code), so just log it and continue
                self.log.info(f"OOM detected from child process (slot {slot_name})")
        except Exception as err:
            self.log.warn(f"Failed to read OOM count for slot {slot_name} cgroup:", str(err))
        # Removal is attempted separately so that a failure to read the OOM
        # counter does not leave the slot cgroup behind.
        try:
            self._remove_slot(slot_name)
        except Exception as err:
            self.log.warn(f"Failed to delete slot {slot_name} cgroup:", str(err))
    return res

src/DIRAC/Core/Utilities/Subprocess.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -424,7 +424,7 @@ def __readFromSystemCommandOutput(self, fd, bufferIndex):
424424
exitStatus = self.killChild()
425425
return self.__generateSystemCommandError(exitStatus, f"{retDict['Message']} for '{self.cmdSeq}' call")
426426

427-
def systemCall(self, cmdSeq, callbackFunction=None, shell=False, env=None):
427+
def systemCall(self, cmdSeq, callbackFunction=None, shell=False, env=None, preexec_fn=None):
428428
"""system call (no shell) - execute :cmdSeq:"""
429429

430430
if shell:
@@ -444,6 +444,7 @@ def systemCall(self, cmdSeq, callbackFunction=None, shell=False, env=None):
444444
close_fds=closefd,
445445
env=env,
446446
universal_newlines=True,
447+
preexec_fn=preexec_fn,
447448
)
448449
self.childPID = self.child.pid
449450
except OSError as v:
@@ -545,7 +546,7 @@ def __callLineCallback(self, bufferIndex):
545546
return False
546547

547548

548-
def systemCall(timeout, cmdSeq, callbackFunction=None, env=None, bufferLimit=52428800):
549+
def systemCall(timeout, cmdSeq, callbackFunction=None, env=None, bufferLimit=52428800, preexec_fn=None):
549550
"""
550551
Use SubprocessExecutor class to execute cmdSeq (it can be a string or a sequence)
551552
with a timeout wrapper, it is executed directly without calling a shell
@@ -555,13 +556,15 @@ def systemCall(timeout, cmdSeq, callbackFunction=None, env=None, bufferLimit=524
555556
sysCall = Watchdog(
556557
spObject.systemCall,
557558
args=(cmdSeq,),
558-
kwargs={"callbackFunction": callbackFunction, "env": env, "shell": False},
559+
kwargs={"callbackFunction": callbackFunction, "env": env, "shell": False, "preexec_fn": preexec_fn},
559560
)
560561
spObject.log.verbose("Subprocess Watchdog timeout set to %d" % timeout)
561562
result = sysCall(timeout + 1)
562563
else:
563564
spObject = Subprocess(timeout, bufferLimit=bufferLimit)
564-
result = spObject.systemCall(cmdSeq, callbackFunction=callbackFunction, env=env, shell=False)
565+
result = spObject.systemCall(
566+
cmdSeq, callbackFunction=callbackFunction, env=env, shell=False, preexec_fn=preexec_fn
567+
)
565568
return result
566569

567570

src/DIRAC/Resources/Computing/InProcessComputingElement.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@
77
import stat
88

99
from DIRAC import S_OK, S_ERROR
10-
from DIRAC.Core.Utilities.Subprocess import systemCall
1110
from DIRAC.Core.Utilities.ThreadScheduler import gThreadScheduler
11+
from DIRAC.Core.Utilities.CGroups2 import CG2Manager
1212

1313
from DIRAC.Resources.Computing.ComputingElement import ComputingElement
1414

@@ -61,7 +61,9 @@ def submitJob(self, executableFile, proxy=None, inputs=None, **kwargs):
6161
os.chmod(executableFile, stat.S_IRWXU | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH)
6262
cmd = os.path.abspath(executableFile)
6363
self.log.verbose("CE submission command:", cmd)
64-
result = systemCall(0, cmd, callbackFunction=self.sendOutput, env=payloadEnv)
64+
result = CG2Manager().systemCall(
65+
0, cmd, callbackFunction=self.sendOutput, env=payloadEnv, ceParameters=self.ceParameters
66+
)
6567
if payloadProxy:
6668
os.unlink(payloadProxy)
6769

0 commit comments

Comments
 (0)