Skip to content

Commit 9a4c34f

Browse files
committed
EH: CS-194 add rss limits
1 parent bc184da commit 9a4c34f

2 files changed

Lines changed: 165 additions & 118 deletions

File tree

source/daemons/execd/execd_ck_to_do.cc

Lines changed: 111 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -68,10 +68,12 @@
6868
#include "execd_signal_queue.h"
6969
#include "exec_job.h"
7070
#include "execution_states.h"
71-
#include "msg_execd.h"
7271
#include "sig_handlers.h"
7372
#include "sge.h"
7473

74+
#include "msg_common.h"
75+
#include "msg_execd.h"
76+
7577
#ifdef COMPILE_DC
7678
# include "ptf.h"
7779
# ifdef DEBUG_DC
@@ -179,60 +181,116 @@ static void notify_ptf()
179181
DRETURN_VOID;
180182
}
181183

184+
/**
185+
* @brief
186+
* Sum up the queue limit for a job resource limit.
187+
* It is called multiple times for each gdil element. If a limit is INFINITY in one of the gdil elements,
188+
* the resulting limit will be set to INFINITY (DBL_MAX).
189+
*
190+
* @param queue The queue element to read the limit from (from gdil).
191+
* @param limit_nm The name of the limit attribute in the queue element (e.g., QU_h_cpu)
192+
* @param type The type of the limit (e.g., TYPE_TIM for time limits).
193+
* @param nslots The number of slots allocated for the job in this queue.
194+
* @param limit Reference to the variable where the summed limit will be stored.
195+
*/
196+
static void
197+
force_job_rlimit_sum_up_limit(const lListElem *queue, int limit_nm, u_long32 type, int nslots, double &limit) {
198+
double queue_limit{};
199+
parse_ulong_val(&queue_limit, nullptr, type, lGetString(queue, limit_nm), nullptr, 0);
200+
if (queue_limit == DBL_MAX) {
201+
limit = DBL_MAX;
202+
} else {
203+
limit += queue_limit * nslots;
204+
}
205+
}
206+
207+
/**
208+
* @brief
209+
* Check if the job resource limit is exceeded and apply the limit.
210+
* If the limit is exceeded, the job is signaled with SGE_SIGKILL (hard limits) or SGE_SIGXCPU (soft limits).
211+
*
212+
* @param usage The current usage of the resource.
213+
* @param limit The limit for the resource.
214+
* @param limit_name The name of the resource limit (e.g., "h_cpu").
215+
* @param is_hard_limit True if it is a hard limit, false if it is a soft limit.
216+
* @param queue The queue element where the job is running.
217+
* @param jobid The job ID.
218+
* @param jataskid The job task ID.
219+
*
220+
* @return true if the job was signaled, false otherwise.
221+
*/
222+
static bool
223+
force_job_rlimit_apply_limit(double usage, double limit, const char *limit_name,
224+
bool is_hard_limit, const lListElem *queue,
225+
u_long32 jobid, u_long32 jataskid) {
226+
bool signaled = false;
227+
228+
if (limit < usage) {
229+
if (is_hard_limit) {
230+
WARNING(MSG_JOB_EXCEEDHLIM_USSFF, jobid, limit_name,
231+
lGetString(queue, QU_full_name), usage, limit);
232+
signal_job(jobid, jataskid, SGE_SIGKILL);
233+
} else {
234+
WARNING(MSG_JOB_EXCEEDSLIM_USSFF, jobid, limit_name,
235+
lGetString(queue, QU_full_name), usage, limit);
236+
signal_job(jobid, jataskid, SGE_SIGXCPU);
237+
}
238+
signaled = true;
239+
}
240+
241+
return signaled;
242+
}
243+
244+
182245
/* force job resource limits */
183246
static void force_job_rlimit(const char* qualified_hostname)
184247
{
185-
const lListElem *jep;
186-
187248
DENTER(TOP_LAYER);
188249

250+
const lListElem *jep;
189251
for_each_ep(jep, *ocs::DataStore::get_master_list(SGE_TYPE_JOB)) {
190252
const lListElem *jatep;
191-
192253
for_each_ep(jatep, lGetList(jep, JB_ja_tasks)) {
193-
const lListElem *q=nullptr, *cpu_ep, *vmem_ep, *gdil_ep;
194-
double cpu_val, vmem_val;
195-
double s_cpu, h_cpu;
196-
double s_vmem, h_vmem;
197-
int cpu_exceeded;
198-
lList *usage_list;
199-
u_long32 jobid, jataskid;
254+
u_long32 jobid = lGetUlong(jep, JB_job_number);
255+
u_long32 jataskid = lGetUlong(jatep, JAT_task_number);
200256

201-
jobid = lGetUlong(jep, JB_job_number);
202-
jataskid = lGetUlong(jatep, JAT_task_number);
203-
204-
cpu_val = vmem_val = s_cpu = h_cpu = s_vmem = h_vmem = 0;
257+
double s_cpu{}, h_cpu{};
258+
double s_rss{}, h_rss{};
259+
double s_vmem{}, h_vmem{};
205260

206261
/* retrieve cpu, vmem and rss usage */
207-
usage_list = ptf_get_job_usage(jobid, jataskid, "*");
262+
lList *usage_list = ptf_get_job_usage(jobid, jataskid, "*");
208263
if (usage_list == nullptr) {
209264
continue;
210265
}
211266

212-
if ((cpu_ep = lGetElemStr(usage_list, UA_name, USAGE_ATTR_CPU))) {
213-
cpu_val = lGetDouble(cpu_ep, UA_value);
214-
}
267+
double cpu_val = usage_list_get_double_usage(usage_list, USAGE_ATTR_CPU, 0.0);
268+
double vmem_val = usage_list_get_double_usage(usage_list, USAGE_ATTR_VMEM, 0.0);
269+
double rss_val = usage_list_get_double_usage(usage_list, USAGE_ATTR_RSS, 0.0);
215270

216-
if ((vmem_ep = lGetElemStr(usage_list, UA_name, USAGE_ATTR_VMEM))) {
217-
vmem_val = lGetDouble(vmem_ep, UA_value);
218-
}
219-
220-
DPRINTF("JOB " sge_u32 " %s %10.5f %s %10.5f\n", jobid,
221-
cpu_ep != nullptr ? USAGE_ATTR_CPU : "(" USAGE_ATTR_CPU ")", cpu_val,
222-
vmem_ep != nullptr ? USAGE_ATTR_VMEM : "(" USAGE_ATTR_VMEM ")", vmem_val);
271+
DPRINTF("JOB " sge_u32" %s %10.5f %s %10.5f\n", jobid,
272+
USAGE_ATTR_CPU, cpu_val,
273+
USAGE_ATTR_VMEM, vmem_val);
223274

224-
/* free no longer needed usage_list */
275+
// free no longer necessary usage_list
225276
lFreeList(&usage_list);
226-
cpu_ep = vmem_ep = nullptr;
227277

228-
bool first_gdil_ep = true;
229-
for_each_ep(gdil_ep, lGetList(jatep, JAT_granted_destin_identifier_list)) {
230-
double lim;
231-
char err_str[128];
232-
size_t err_size = sizeof(err_str) - 1;
233-
234-
if (sge_hostcmp(qualified_hostname, lGetHost(gdil_ep, JG_qhostname))
235-
|| !(q = lGetObject(gdil_ep, JG_queue))) {
278+
const lList *gdil = lGetList(jatep, JAT_granted_destin_identifier_list);
279+
const lListElem *q = nullptr, *gdil_ep;
280+
const lListElem *first_gdil_ep = lFirst(gdil);
281+
const void *iterator = nullptr;
282+
const lListElem *next_gdil_ep = lGetElemHostFirst(gdil, JG_qhostname, qualified_hostname, &iterator);
283+
while ((gdil_ep = next_gdil_ep) != nullptr) {
284+
next_gdil_ep = lGetElemHostNext(gdil, JG_qhostname, qualified_hostname, &iterator);
285+
286+
// need the queue object - it must have been delivered by sge_qmaster
287+
q = lGetObject(gdil_ep, JG_queue);
288+
if (q == nullptr) {
289+
// this should never happen, but if it does, we have to skip this gdil_ep
290+
CRITICAL(MSG_SGETEXT_NULLPTRPASSED_S, "gdil_ep->JG_queue");
291+
#if defined (ENABLE_DEBUG_CHECKS)
292+
abort();
293+
#endif
236294
continue;
237295
}
238296

@@ -246,8 +304,8 @@ static void force_job_rlimit(const char* qualified_hostname)
246304
// - the master task + one slave task is running on this host (nslots = 1)
247305
// only in the second case we have to increase nslots,
248306
// but better always increase it and not kill the job erroneously
249-
if (first_gdil_ep) {
250-
first_gdil_ep = false;
307+
// @todo shouldn't we handle ja_task and pe_tasks separately?
308+
if (gdil_ep == first_gdil_ep) {
251309
const lListElem *pe = lGetObject(jatep, JAT_pe_object);
252310
if (pe != nullptr) {
253311
if (!lGetBool(pe, PE_job_is_first_task)) {
@@ -256,50 +314,26 @@ static void force_job_rlimit(const char* qualified_hostname)
256314
}
257315
}
258316

259-
parse_ulong_val(&lim, nullptr, TYPE_TIM, lGetString(q, QU_s_cpu), err_str, err_size);
260-
if (lim == DBL_MAX) {
261-
s_cpu = DBL_MAX;
262-
} else {
263-
s_cpu += lim * nslots;
264-
}
265-
266-
parse_ulong_val(&lim, nullptr, TYPE_TIM, lGetString(q, QU_h_cpu), err_str, err_size);
267-
if (lim == DBL_MAX) {
268-
h_cpu = DBL_MAX;
269-
} else {
270-
h_cpu += lim * nslots;
271-
}
272-
273-
parse_ulong_val(&lim, nullptr, TYPE_MEM, lGetString(q, QU_s_vmem), err_str, err_size);
274-
if (lim == DBL_MAX) {
275-
s_vmem = DBL_MAX;
276-
} else {
277-
s_vmem += lim * nslots;
278-
}
279-
280-
parse_ulong_val(&lim, nullptr, TYPE_MEM, lGetString(q, QU_h_vmem), err_str, err_size);
281-
if (lim == DBL_MAX) {
282-
h_vmem = DBL_MAX;
283-
} else {
284-
h_vmem += lim * nslots;
285-
}
317+
force_job_rlimit_sum_up_limit(q, QU_s_cpu, TYPE_TIM, nslots, s_cpu);
318+
force_job_rlimit_sum_up_limit(q, QU_h_cpu, TYPE_TIM, nslots, h_cpu);
319+
force_job_rlimit_sum_up_limit(q, QU_s_rss, TYPE_MEM, nslots, s_rss);
320+
force_job_rlimit_sum_up_limit(q, QU_h_rss, TYPE_MEM, nslots, h_rss);
321+
force_job_rlimit_sum_up_limit(q, QU_s_vmem, TYPE_MEM, nslots, s_vmem);
322+
force_job_rlimit_sum_up_limit(q, QU_h_vmem, TYPE_MEM, nslots, h_vmem);
286323
} /* foreach gdil_ep */
287324

288-
if (h_cpu < cpu_val || h_vmem < vmem_val) {
289-
cpu_exceeded = (h_cpu < cpu_val);
290-
WARNING(MSG_JOB_EXCEEDHLIM_USSFF, jobid, cpu_exceeded ? "h_cpu" : "h_vmem",
291-
q?lGetString(q, QU_full_name) : "-",
292-
cpu_exceeded ? cpu_val : vmem_val, cpu_exceeded ? h_cpu : h_vmem);
293-
signal_job(jobid, jataskid, SGE_SIGKILL);
325+
// @todo we output the queue name, which might be incorrect, we might have multiple queues on the host
326+
// hard limits
327+
if (force_job_rlimit_apply_limit(cpu_val, h_cpu, "h_cpu", true, q, jobid, jataskid) ||
328+
force_job_rlimit_apply_limit(rss_val, h_rss, "h_rss", true, q, jobid, jataskid) ||
329+
force_job_rlimit_apply_limit(vmem_val, h_vmem, "h_vmem", true, q, jobid, jataskid)) {
294330
continue;
295331
}
296332

297-
if (s_cpu < cpu_val || s_vmem < vmem_val) {
298-
cpu_exceeded = (s_cpu < cpu_val);
299-
WARNING(MSG_JOB_EXCEEDSLIM_USSFF, jobid, cpu_exceeded ? "s_cpu" : "s_vmem",
300-
q?lGetString(q, QU_full_name) : "-",
301-
cpu_exceeded ? cpu_val : vmem_val, cpu_exceeded ? s_cpu : s_vmem);
302-
signal_job(jobid, jataskid, SGE_SIGXCPU);
333+
// soft limits
334+
if (force_job_rlimit_apply_limit(cpu_val, s_cpu, "s_cpu", false, q, jobid, jataskid) ||
335+
force_job_rlimit_apply_limit(rss_val, s_rss, "s_rss", false, q, jobid, jataskid) ||
336+
force_job_rlimit_apply_limit(vmem_val, s_vmem, "s_vmem", false, q, jobid, jataskid)) {
303337
continue;
304338
}
305339
} /* foreach jatep */

source/daemons/execd/job_report_execd.cc

Lines changed: 54 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,11 @@
5454
#include "job_report_execd.h"
5555
#include "reaper_execd.h"
5656
#include "execd_signal_queue.h"
57-
#include "msg_execd.h"
5857
#include "load_avg.h"
5958

59+
#include "msg_common.h"
60+
#include "msg_execd.h"
61+
6062
lList *jr_list = nullptr;
6163
static bool flush_jr = false;
6264
static int check_queue_limits = 0;
@@ -390,55 +392,66 @@ int do_ack(ocs::gdi::ClientServerBase::struct_msg_t *aMsg)
390392
DRETURN(0);
391393
}
392394

395+
/**
396+
* @brief
397+
* This function checks if the queue limits for a job are set and updates the
398+
* global check_queue_limits counter accordingly.
399+
*
400+
* @param queue The queue element to check.
401+
* @param type The type of limit to check (e.g., TYPE_TIM, TYPE_MEM).
402+
* @param limit_nm The name of the limit attribute in the queue (e.g., QU_h_cpu).
403+
* @param increase If true, increments the check_queue_limits counter; otherwise, decrements it.
404+
*
405+
* @return true if a limit was found and processed, false otherwise.
406+
*/
407+
static bool
408+
count_queue_limits(const lListElem *queue, u_long32 type, int limit_nm, bool increase) {
409+
// check_queue_limits is a global variable that is used to determine
410+
// whether we need to check queue limits or not.
411+
// @todo: store this once in the ja_task?
412+
double lim{};
413+
bool found_limit = false;
414+
parse_ulong_val(&lim, nullptr, type, lGetString(queue, limit_nm), nullptr, 0);
415+
if (lim != DBL_MAX) {
416+
if (increase) {
417+
check_queue_limits++;
418+
} else {
419+
check_queue_limits--;
420+
}
421+
found_limit = true;
422+
}
423+
424+
return found_limit;
425+
}
426+
393427
void modify_queue_limits_flag_for_job(const char *qualified_hostname, lListElem *jep, bool increase)
394428
{
395429
const lListElem *jatep;
396430
const lListElem *gdil_ep;
397431

398432
for_each_ep(jatep, lGetList(jep, JB_ja_tasks)) {
399-
for_each_ep(gdil_ep, lGetList(jatep, JAT_granted_destin_identifier_list)) {
400-
double lim;
401-
lListElem *q;
402-
403-
if (sge_hostcmp(qualified_hostname, lGetHost(gdil_ep, JG_qhostname))
404-
|| !(q = lGetObject(gdil_ep, JG_queue))) {
433+
const lList *gdil = lGetList(jatep, JAT_granted_destin_identifier_list);
434+
const void *iterator = nullptr;
435+
const lListElem *next_gdil_ep = lGetElemHostFirst(gdil, JG_qhostname, qualified_hostname, &iterator);
436+
while ((gdil_ep = next_gdil_ep) != nullptr) {
437+
next_gdil_ep = lGetElemHostNext(gdil, JG_qhostname, qualified_hostname, &iterator);
438+
439+
const lListElem *q = lGetObject(gdil_ep, JG_queue);
440+
if (q == nullptr) {
441+
// this should never happen, but if it does, we have to skip this gdil_ep
442+
CRITICAL(MSG_SGETEXT_NULLPTRPASSED_S, "gdil_ep->JG_queue");
443+
#if defined (ENABLE_DEBUG_CHECKS)
444+
abort();
445+
#endif
405446
continue;
406447
}
407448

408-
parse_ulong_val(&lim, nullptr, TYPE_TIM, lGetString(q, QU_s_cpu), nullptr, 0);
409-
if (lim != DBL_MAX) {
410-
if (increase) {
411-
check_queue_limits++;
412-
} else {
413-
check_queue_limits--;
414-
}
415-
break;
416-
}
417-
parse_ulong_val(&lim, nullptr, TYPE_TIM, lGetString(q, QU_h_cpu), nullptr, 0);
418-
if (lim != DBL_MAX) {
419-
if (increase) {
420-
check_queue_limits++;
421-
} else {
422-
check_queue_limits--;
423-
}
424-
break;
425-
}
426-
parse_ulong_val(&lim, nullptr, TYPE_TIM, lGetString(q, QU_s_vmem), nullptr, 0);
427-
if (lim != DBL_MAX) {
428-
if (increase) {
429-
check_queue_limits++;
430-
} else {
431-
check_queue_limits--;
432-
}
433-
break;
434-
}
435-
parse_ulong_val(&lim, nullptr, TYPE_TIM, lGetString(q, QU_h_vmem), nullptr, 0);
436-
if (lim != DBL_MAX) {
437-
if (increase) {
438-
check_queue_limits++;
439-
} else {
440-
check_queue_limits--;
441-
}
449+
if (count_queue_limits(q, TYPE_TIM, QU_s_cpu, increase) ||
450+
count_queue_limits(q, TYPE_TIM, QU_h_cpu, increase) ||
451+
count_queue_limits(q, TYPE_MEM, QU_s_rss, increase) ||
452+
count_queue_limits(q, TYPE_MEM, QU_h_rss, increase) ||
453+
count_queue_limits(q, TYPE_MEM, QU_s_vmem, increase) ||
454+
count_queue_limits(q, TYPE_MEM, QU_h_vmem, increase)) {
442455
break;
443456
}
444457
}

0 commit comments

Comments
 (0)