Skip to content

Commit a2f623a

Browse files
authored
EH: CS-1239 Do usage and decay handling in worker threads when usage for jobs arrive (#82)
1 parent 0884e06 commit a2f623a

33 files changed

Lines changed: 1660 additions & 798 deletions

doc/markdown/man/man5/sge_conf.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -809,6 +809,14 @@ Sets the time interval for spooling the sharetree usage. The default is set to 0
809809
colon-separated string or seconds. There is no setting to turn the sharetree spooling off. (e.g.
810810
STREE_SPOOL_INTERVAL=00:02:00)
811811

812+
***STREE_TICK_INTERVAL***
813+
814+
Sets the cadence at which share-tree usage is decayed and the resulting share-tree state is republished to
815+
clients such as the scheduler and *sge_share_mon*. The value is given in seconds or as a colon-separated time
816+
string. The valid range is 1 to 300 seconds; values above the upper bound are clamped on use, and values
817+
\<= 0 are rejected with a warning and reset to the default. The default value is 5 seconds. Changes take
818+
effect within seconds. (e.g. STREE_TICK_INTERVAL=00:00:30)
819+
812820
***MAX_JOB_DELETION_TIME***
813821

814822
Sets the value of how long the qmaster will spend deleting jobs. After this time, the qmaster will continue with

source/daemons/qmaster/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ set(QMASTER_SOURCES
2727
ocs_BaseAccountingFileWriter.cc
2828
ocs_BaseReportingFileWriter.cc
2929
ocs_CategoryQmaster.cc
30+
ocs_FinishedJob.cc
31+
ocs_SharetreeUsage.cc
3032
ocs_JsonAccountingFileWriter.cc
3133
ocs_JsonReportingFileWriter.cc
3234
ocs_MirrorReaderDataStore.cc

source/daemons/qmaster/configuration_qmaster.cc

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@
7272
#include "configuration_qmaster.h"
7373
#include "ocs_ReportingFileWriter.h"
7474
#include "sge.h"
75+
#include "ocs_SharetreeUsage.h"
7576
#include "sge_persistence_qmaster.h"
7677
#include "sge_userprj_qmaster.h"
7778
#include "reschedule.h"
@@ -395,6 +396,20 @@ sge_mod_configuration(lListElem *aConf, lList **anAnswer, const char *aUser, con
395396

396397
// The maximum number of events clients might have changed.
397398
sge_set_max_dynamic_event_clients(mconf_get_max_dynamic_event_clients());
399+
400+
// CS-1239: STREE_TICK_INTERVAL may have changed. The pending one-time
401+
// tick event was scheduled with the *previous* interval, so without
402+
// an explicit re-schedule a tighter new interval would not take effect
403+
// until the already-queued event fires - which can be up to 300 s away.
404+
// Drop the queued event and re-queue at +5 s so the new value is in
405+
// effect within seconds.
406+
sge_reschedule_sharetree_tick();
407+
408+
// CS-1239: same story for STREE_SPOOL_INTERVAL and the dirty-objects
409+
// flush event - the already-queued event keeps its original 240 s
410+
// schedule unless we explicitly re-arm it. issue_1385 pins
411+
// STREE_SPOOL_INTERVAL low then reads the spool file within seconds.
412+
sge_reschedule_sharetree_spool();
398413
}
399414

400415
/* invalidate configuration cache */

source/daemons/qmaster/job_exit.cc

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -160,11 +160,19 @@ sge_job_exit(lListElem *jr, lListElem *jep, lListElem *jatep, monitoring_t *moni
160160
ocs::ReportingFileWriter::create_job_logs(nullptr, timestamp, JL_DELETED, MSG_EXECD, hostname, jr, jep, jatep, nullptr,
161161
MSG_LOG_JREMOVED);
162162

163+
/* CS-1239: snapshot what AR cleanup needs because the commit below now also
164+
* buries the ja_task (and possibly the job), invalidating jep/jatep. The AR
165+
* removal check must run AFTER the commit because qinstance_slots_used() on
166+
* the AR's reserved queues only drops once sge_clear_granted_resources()
167+
* inside sge_commit_job() has released this job's bookings. */
168+
uint32_t job_ar_id = lGetUlong(jep, JB_ar);
169+
bool job_was_deleted = (lGetUlong(jatep, JAT_state) & JDELETED) == JDELETED;
170+
163171
sge_commit_job(jep, jatep, jr, COMMIT_ST_FINISHED_FAILED_EE, COMMIT_DEFAULT | COMMIT_NEVER_RAN, monitor, gdi_session);
164172

165-
if (lGetUlong(jep, JB_ar) != 0 && (lGetUlong(jatep, JAT_state) & JDELETED) == JDELETED) {
173+
if (job_ar_id != 0 && job_was_deleted) {
166174
/* get AR and remove it if no other jobs are debited */
167-
lListElem *ar = ar_list_locate(master_ar_list, lGetUlong(jep, JB_ar));
175+
lListElem *ar = ar_list_locate(master_ar_list, job_ar_id);
168176

169177
if (ar != nullptr && lGetUlong(ar, AR_state) == AR_DELETED) {
170178
const lListElem *ar_queue;
Lines changed: 203 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,203 @@
1+
/*___INFO__MARK_BEGIN_NEW__*/
2+
/***************************************************************************
3+
*
4+
* Copyright 2026 HPC-Gridware GmbH
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License");
7+
* you may not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*
18+
***************************************************************************/
19+
/*___INFO__MARK_END_NEW__*/
20+
21+
#include "uti/sge_time.h"
22+
23+
#include "sgeobj/ocs_DataStore.h"
24+
#include "sgeobj/ocs_Usage.h"
25+
#include "sgeobj/sge_job.h"
26+
#include "sgeobj/sge_ja_task.h"
27+
#include "sgeobj/sge_schedd_conf.h"
28+
#include "sgeobj/sge_usage.h"
29+
#include "sgeobj/sge_userprj.h"
30+
31+
#include "ocs_SharetreeUsage.h"
32+
#include "ocs_FinishedJob.h"
33+
#include "sge_persistence_qmaster.h"
34+
35+
namespace {
36+
/* Remove the debited-job-usage entry for the given job number from the
37+
* passed list, if present. Mirrors the "UPU_old_usage_list == nullptr"
38+
* signal in the ORT_update_user_usage / _project_usage handlers, which is
39+
* how the order-driven path tells qmaster to drop the entry: with the
40+
* worker-thread path the qmaster IS the one doing the modification, so
41+
* the drop is direct. */
42+
void drop_debited_job(lListElem *userprj, int upu_field, uint32_t job_number) {
43+
lList *upu_list = lGetListRW(userprj, upu_field);
44+
if (upu_list == nullptr) {
45+
return;
46+
}
47+
lListElem *upu = lGetElemUlongRW(upu_list, UPU_job_number, job_number);
48+
if (upu != nullptr) {
49+
lRemoveElem(upu_list, &upu);
50+
}
51+
}
52+
53+
/* Bump UU_/PR_version so consumers that key off it see a change.
54+
*
55+
* Do NOT bump UU_/PR_usage_seqno: that field is the periodic decay
56+
* handler's idempotency stamp - decay_userprj_usage skips when
57+
* seqno == stored_seqno. The handler's tet_decay_run counter also
58+
* starts at 0 and ++s by 1, so a booking bump would silently match a
59+
* tick and the decay would be a no-op. (That collision produced
60+
* u0 == u1 bit-exact in the sharetree_decay_periodic test.) */
61+
void bump_change_markers(lListElem *userprj, int version_field) {
62+
lAddUlong(userprj, version_field, 1);
63+
}
64+
}
65+
66+
int
67+
sge_book_finished_job_usage(lListElem *jep, lListElem *jatep, monitoring_t * /* monitor */, uint64_t gdi_session) {
68+
if (jep == nullptr || jatep == nullptr) {
69+
return -1;
70+
}
71+
72+
/* Idempotency guard: skip if this ja_task has already been booked.
73+
* The "finished_jobs" entry in JAT_scaled_usage_list is set below as
74+
* part of the booking; if it is already there, the booking has run
75+
* before and we must not double-count. Mirrors the pre-CS-1239 check
76+
* in sge_calc_tickets ("we already handled this finished job").
77+
* Reschedule is safe: sge_give_jobs.cc clears JAT_scaled_usage_list on
78+
* COMMIT_ST_RESCHEDULED / COMMIT_ST_USER_RESCHEDULED so the next exit
79+
* re-seeds. The guard is defensive coverage for future paths that may
80+
* re-invoke this helper (e.g. CS-1908 finished-job retention). */
81+
if (lGetSubStr(jatep, UA_name, "finished_jobs", JAT_scaled_usage_list) != nullptr) {
82+
return 0;
83+
}
84+
85+
/* CS-1239: skip the entire booking when no master share tree is configured.
86+
* The summed UU_/PR_/UPP_usage is consumed only by the share-tree allocator
87+
* and by sge_share_mon, both of which are dormant in this case. This
88+
* restores the pre-CS-1239 implicit gate ("no UU/PR objects -> no work";
89+
* see calc_usage.md) by checking the direct indicator. Trade-off: a
90+
* qconf -Astree after a quiet period starts the share-tree allocator from
91+
* zero history for one tick. Matching gate is in sge_sharetree_tick_handler.
92+
* Caller holds LOCK_GLOBAL write (always via sge_commit_job), so reading
93+
* the SHARETREE master list without an extra lock is safe. */
94+
if (lFirst(*ocs::DataStore::get_master_list(SGE_TYPE_SHARETREE)) == nullptr) {
95+
return 0;
96+
}
97+
98+
const lList *master_user_list = *ocs::DataStore::get_master_list(SGE_TYPE_USER);
99+
const lList *master_project_list = *ocs::DataStore::get_master_list(SGE_TYPE_PROJECT);
100+
101+
lListElem *user = user_list_locate(master_user_list, lGetString(jep, JB_owner));
102+
lListElem *project = nullptr;
103+
const char *project_name = lGetString(jep, JB_project);
104+
if (project_name != nullptr && *project_name != '\0') {
105+
project = prj_list_locate(master_project_list, project_name);
106+
}
107+
108+
/* 1. catch up decay since last tick, then sum the finished job's usage
109+
* into UU_/PR_/UPP_. Worker thread's sc_state_t.decay_constant is
110+
* thread-local and uninitialized by default (== 0), so without this
111+
* refresh decay_usage would multiply by 0 and zero everything. Same
112+
* gap previously hit on the scheduler thread (fixed in
113+
* sge_sharetree_tick_handler). */
114+
ocs::Usage::calculate_default_decay_constant(sconf_get_halftime());
115+
116+
const uint64_t curr_time = sge_get_gmt64();
117+
lList *decay_list = ocs::Usage::get_decay_list();
118+
lList *usage_weight_list = sconf_get_usage_weight_list();
119+
120+
/* Seed "finished_jobs"=1 onto the job's scaled_usage_list before we
121+
* sum it into UU_/PR_/UPP_. The pending-priority calculation in
122+
* sge_sort_pending_job_nodes (sgeee.cc) reads the leaf's
123+
* STN_usage_list "finished_jobs" entry and adds it to job_count when
124+
* computing per-pending-job tickets. calc_node_usage propagates the
125+
* project's "finished_jobs" PR_usage entry up to the leaf's
126+
* STN_usage_list. Without this seed the counter is never written
127+
* anywhere and the formula treats every leaf as "0 finished" - the
128+
* leaf with the fewest pending jobs wins by per-job dilution alone,
129+
* starving leaves with many pending (project3 in the sharetree
130+
* test). */
131+
lList *scaled = lGetListRW(jatep, JAT_scaled_usage_list);
132+
if (scaled == nullptr) {
133+
lSetList(jatep, JAT_scaled_usage_list, lCreateList("scaled_usage", UA_Type));
134+
scaled = lGetListRW(jatep, JAT_scaled_usage_list);
135+
}
136+
usage_list_set_double_usage(scaled, "finished_jobs", 1.0);
137+
138+
/* CS-1239 (test convergence fix): the decay between the previous TET
139+
* tick and this finish time must apply to the historical UU_/PR_usage
140+
* BEFORE we sum the new job's scaled_usage onto it. Otherwise the
141+
* stamp jumps forward to curr_time but the historical part is "stuck
142+
* at the previous-tick decay state" - the next tick decays by
143+
* (next_tick - curr_time) which loses the (curr_time - prev_tick)
144+
* interval. For projects that finish often (project2 in the sharetree
145+
* test) this loss accumulates: combined_usage stays artificially high,
146+
* shr stays high relative to truth, allocator keeps picking project2,
147+
* test fails. seqno = (u_long)-1 tells decay_userprj_usage to update
148+
* the stamp but NOT update the stored seqno - the TET tick owns the
149+
* seqno namespace, we just want a catch-up decay tick here. */
150+
ocs::Usage::decay_and_sum_usage(jep, jatep, nullptr, user, project,
151+
decay_list, usage_weight_list,
152+
static_cast<u_long>(-1), curr_time);
153+
lFreeList(&usage_weight_list);
154+
155+
const uint32_t job_number = lGetUlong(jep, JB_job_number);
156+
157+
/* 2. drop the per-job debited entry the sum step just appended (the job
158+
* is gone; no further deltas to compute), and bump change markers so
159+
* consumers see "something happened to this object". */
160+
if (user != nullptr) {
161+
drop_debited_job(user, UU_debited_job_usage, job_number);
162+
bump_change_markers(user, UU_version);
163+
}
164+
if (project != nullptr) {
165+
drop_debited_job(project, PR_debited_job_usage, job_number);
166+
bump_change_markers(project, PR_version);
167+
}
168+
169+
/* 3. mark user / project dirty for both event emission and spool. The
170+
* sgeE_USER_MOD / sgeE_PROJECT_MOD events are deferred to the share-
171+
* tree tick handler (sge_sharetree_tick_handler), which batches them
172+
* with sgeE_NEW_SHARETREE inside one event-master transaction so the
173+
* scheduler mirror sees user/project usage and tree state as one
174+
* atomic snapshot (pre-CS-1239 ORT_share_tree invariant restored).
175+
* Spool is deferred to the share-tree spool handler at its own
176+
* cadence. */
177+
if (user != nullptr) {
178+
const char *uname = lGetString(user, UU_name);
179+
ocs::SharetreeUsage::mark_user_event_dirty(uname);
180+
ocs::SharetreeUsage::mark_user_spool_dirty(uname);
181+
}
182+
if (project != nullptr) {
183+
const char *pname = lGetString(project, PR_name);
184+
ocs::SharetreeUsage::mark_project_event_dirty(pname);
185+
ocs::SharetreeUsage::mark_project_spool_dirty(pname);
186+
}
187+
188+
/* CS-1239 step 5: this finish moved usage in UU_/PR_/UPP_, which means
189+
* the master share tree (read by sge_share_mon and propagated to mirror
190+
* clients via sgeE_NEW_SHARETREE) is now stale. Flag it so the TET
191+
* share-tree tick handler republishes on its next tick. Cheap (one
192+
* bool write under the LOCK_GLOBAL we already hold). */
193+
if (user != nullptr || project != nullptr) {
194+
ocs::SharetreeUsage::mark_share_tree_dirty();
195+
}
196+
197+
/* Bury (sge_bury_job) is performed by the caller (sge_commit_job's
198+
* COMMIT_ST_FINISHED_FAILED_EE case) right after this helper returns.
199+
* Keeping this function a pure "book the usage" routine keeps its
200+
* pre-/post-conditions reviewable in isolation. */
201+
202+
return 0;
203+
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
#pragma once
2+
/*___INFO__MARK_BEGIN_NEW__*/
3+
/***************************************************************************
4+
*
5+
* Copyright 2026 HPC-Gridware GmbH
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*
19+
***************************************************************************/
20+
/*___INFO__MARK_END_NEW__*/
21+
22+
#include <cstdint>
23+
24+
#include "cull/cull.h"
25+
#include "uti/sge_monitor.h"
26+
27+
/** Apply the per-finish bookkeeping that, in the pre-CS-1239 scheme, was
28+
* generated by the scheduler thread as the three orders ORT_update_user_usage,
29+
* ORT_update_project_usage, and ORT_remove_job, and applied in worker threads
30+
* via sge_follow_order.
31+
*
32+
* Called from inside sge_commit_job's COMMIT_ST_FINISHED_FAILED_EE case, after
33+
* the JAT_status = JFINISHED transition and before sge_bury_job. External
34+
* callers should never invoke this helper directly - sge_commit_job is the
35+
* contract.
36+
*
37+
* Preconditions: caller holds LOCK_GLOBAL (write); JAT_status is JFINISHED.
38+
*
39+
* Steps:
40+
* - resolve user / project refs;
41+
* - sum the job's final scaled usage into UU_usage (or
42+
* user.UU_project[project].UPP_usage when project is set), UU_long_term_usage
43+
* / UPP_long_term_usage, PR_usage, and PR_long_term_usage. Catches up the
44+
* decay since the last TET tick first, so the historical part stays in sync;
45+
* - drop the job's entry from UU_debited_job_usage / PR_debited_job_usage
46+
* (the job is gone, the tracking record must follow);
47+
* - bump UU_version / PR_version (the change marker). UU_/PR_usage_seqno
48+
* is deliberately NOT touched: it is the periodic decay handler's
49+
* idempotency stamp; sharing it with the change marker would silently
50+
* collide with the handler's run counter and skip decay ticks;
51+
* - mark the (user, project) dirty for event emission (the TET share-tree tick
52+
* handler batches sgeE_USER_MOD / sgeE_PROJECT_MOD with sgeE_NEW_SHARETREE in one
53+
* event-master transaction) and for the share-tree spool handler's deferred write;
54+
* - mark the master share tree dirty so the next TET tick republishes it.
55+
*
56+
* Returns 0 on success, a negative value on a hard error (missing jep/jatep).
57+
*/
58+
int sge_book_finished_job_usage(lListElem *jep, lListElem *jatep, monitoring_t *monitor, uint64_t gdi_session);

0 commit comments

Comments
 (0)