diff --git a/doc/markdown/man/man5/sge_conf.md b/doc/markdown/man/man5/sge_conf.md index 7ebf762bc..db26cd867 100644 --- a/doc/markdown/man/man5/sge_conf.md +++ b/doc/markdown/man/man5/sge_conf.md @@ -809,6 +809,14 @@ Sets the time interval for spooling the sharetree usage. The default is set to 0 colon-separated string or seconds. There is no setting to turn the sharetree spooling off. (e.g. STREE_SPOOL_INTERVAL=00:02:00) +***STREE_TICK_INTERVAL*** + +Sets the cadence at which share-tree usage is decayed and the resulting share-tree state is republished to +clients such as the scheduler and *sge_share_mon*. The value is given in seconds or as a colon-separated time +string. The valid range is 1 to 300 seconds; values above the upper bound are clamped on use, and values +\<= 0 are rejected with a warning and reset to the default. The default value is 5 seconds. Changes take +effect within seconds. (e.g. STREE_TICK_INTERVAL=00:00:30) + ***MAX_JOB_DELETION_TIME*** Sets the value of how long the qmaster will spend deleting jobs. 
After this time, the qmaster will continue with diff --git a/source/daemons/qmaster/CMakeLists.txt b/source/daemons/qmaster/CMakeLists.txt index b99af3fb4..00686d530 100644 --- a/source/daemons/qmaster/CMakeLists.txt +++ b/source/daemons/qmaster/CMakeLists.txt @@ -27,6 +27,8 @@ set(QMASTER_SOURCES ocs_BaseAccountingFileWriter.cc ocs_BaseReportingFileWriter.cc ocs_CategoryQmaster.cc + ocs_FinishedJob.cc + ocs_SharetreeUsage.cc ocs_JsonAccountingFileWriter.cc ocs_JsonReportingFileWriter.cc ocs_MirrorReaderDataStore.cc diff --git a/source/daemons/qmaster/configuration_qmaster.cc b/source/daemons/qmaster/configuration_qmaster.cc index 1492443f6..46aae9831 100644 --- a/source/daemons/qmaster/configuration_qmaster.cc +++ b/source/daemons/qmaster/configuration_qmaster.cc @@ -72,6 +72,7 @@ #include "configuration_qmaster.h" #include "ocs_ReportingFileWriter.h" #include "sge.h" +#include "ocs_SharetreeUsage.h" #include "sge_persistence_qmaster.h" #include "sge_userprj_qmaster.h" #include "reschedule.h" @@ -395,6 +396,20 @@ sge_mod_configuration(lListElem *aConf, lList **anAnswer, const char *aUser, con // The maximum number of events clients might have changed. sge_set_max_dynamic_event_clients(mconf_get_max_dynamic_event_clients()); + + // CS-1239: STREE_TICK_INTERVAL may have changed. The pending one-time + // tick event was scheduled with the *previous* interval, so without + // an explicit re-schedule a tighter new interval would not take effect + // until the already-queued event fires - which can be up to 300 s away. + // Drop the queued event and re-queue at +5 s so the new value is in + // effect within seconds. + sge_reschedule_sharetree_tick(); + + // CS-1239: same story for STREE_SPOOL_INTERVAL and the dirty-objects + // flush event - the already-queued event keeps its original 240 s + // schedule unless we explicitly re-arm it. issue_1385 pins + // STREE_SPOOL_INTERVAL low then reads the spool file within seconds. 
+ sge_reschedule_sharetree_spool(); } /* invalidate configuration cache */ diff --git a/source/daemons/qmaster/job_exit.cc b/source/daemons/qmaster/job_exit.cc index 1672dcab0..f8f5cf637 100644 --- a/source/daemons/qmaster/job_exit.cc +++ b/source/daemons/qmaster/job_exit.cc @@ -160,11 +160,19 @@ sge_job_exit(lListElem *jr, lListElem *jep, lListElem *jatep, monitoring_t *moni ocs::ReportingFileWriter::create_job_logs(nullptr, timestamp, JL_DELETED, MSG_EXECD, hostname, jr, jep, jatep, nullptr, MSG_LOG_JREMOVED); + /* CS-1239: snapshot what AR cleanup needs because the commit below now also + * buries the ja_task (and possibly the job), invalidating jep/jatep. The AR + * removal check must run AFTER the commit because qinstance_slots_used() on + * the AR's reserved queues only drops once sge_clear_granted_resources() + * inside sge_commit_job() has released this job's bookings. */ + uint32_t job_ar_id = lGetUlong(jep, JB_ar); + bool job_was_deleted = (lGetUlong(jatep, JAT_state) & JDELETED) == JDELETED; + sge_commit_job(jep, jatep, jr, COMMIT_ST_FINISHED_FAILED_EE, COMMIT_DEFAULT | COMMIT_NEVER_RAN, monitor, gdi_session); - if (lGetUlong(jep, JB_ar) != 0 && (lGetUlong(jatep, JAT_state) & JDELETED) == JDELETED) { + if (job_ar_id != 0 && job_was_deleted) { /* get AR and remove it if no other jobs are debited */ - lListElem *ar = ar_list_locate(master_ar_list, lGetUlong(jep, JB_ar)); + lListElem *ar = ar_list_locate(master_ar_list, job_ar_id); if (ar != nullptr && lGetUlong(ar, AR_state) == AR_DELETED) { const lListElem *ar_queue; diff --git a/source/daemons/qmaster/ocs_FinishedJob.cc b/source/daemons/qmaster/ocs_FinishedJob.cc new file mode 100644 index 000000000..283595ef7 --- /dev/null +++ b/source/daemons/qmaster/ocs_FinishedJob.cc @@ -0,0 +1,203 @@ +/*___INFO__MARK_BEGIN_NEW__*/ +/*************************************************************************** + * + * Copyright 2026 HPC-Gridware GmbH + * + * Licensed under the Apache License, Version 2.0 (the 
"License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + ***************************************************************************/ +/*___INFO__MARK_END_NEW__*/ + +#include "uti/sge_time.h" + +#include "sgeobj/ocs_DataStore.h" +#include "sgeobj/ocs_Usage.h" +#include "sgeobj/sge_job.h" +#include "sgeobj/sge_ja_task.h" +#include "sgeobj/sge_schedd_conf.h" +#include "sgeobj/sge_usage.h" +#include "sgeobj/sge_userprj.h" + +#include "ocs_SharetreeUsage.h" +#include "ocs_FinishedJob.h" +#include "sge_persistence_qmaster.h" + +namespace { + /* Remove the debited-job-usage entry for the given job number from the + * passed list, if present. Mirrors the "UPU_old_usage_list == nullptr" + * signal in the ORT_update_user_usage / _project_usage handlers, which is + * how the order-driven path tells qmaster to drop the entry: with the + * worker-thread path the qmaster IS the one doing the modification, so + * the drop is direct. */ + void drop_debited_job(lListElem *userprj, int upu_field, uint32_t job_number) { + lList *upu_list = lGetListRW(userprj, upu_field); + if (upu_list == nullptr) { + return; + } + lListElem *upu = lGetElemUlongRW(upu_list, UPU_job_number, job_number); + if (upu != nullptr) { + lRemoveElem(upu_list, &upu); + } + } + + /* Bump UU_/PR_version so consumers that key off it see a change. + * + * Do NOT bump UU_/PR_usage_seqno: that field is the periodic decay + * handler's idempotency stamp - decay_userprj_usage skips when + * seqno == stored_seqno. 
The handler's tet_decay_run counter also + * starts at 0 and ++s by 1, so a booking bump would silently match a + * tick and the decay would be a no-op. (That collision produced + * u0 == u1 bit-exact in the sharetree_decay_periodic test.) */ + void bump_change_markers(lListElem *userprj, int version_field) { + lAddUlong(userprj, version_field, 1); + } +} + +int +sge_book_finished_job_usage(lListElem *jep, lListElem *jatep, monitoring_t * /* monitor */, uint64_t gdi_session) { + if (jep == nullptr || jatep == nullptr) { + return -1; + } + + /* Idempotency guard: skip if this ja_task has already been booked. + * The "finished_jobs" entry in JAT_scaled_usage_list is set below as + * part of the booking; if it is already there, the booking has run + * before and we must not double-count. Mirrors the pre-CS-1239 check + * in sge_calc_tickets ("we already handled this finished job"). + * Reschedule is safe: sge_give_jobs.cc clears JAT_scaled_usage_list on + * COMMIT_ST_RESCHEDULED / COMMIT_ST_USER_RESCHEDULED so the next exit + * re-seeds. The guard is defensive coverage for future paths that may + * re-invoke this helper (e.g. CS-1908 finished-job retention). */ + if (lGetSubStr(jatep, UA_name, "finished_jobs", JAT_scaled_usage_list) != nullptr) { + return 0; + } + + /* CS-1239: skip the entire booking when no master share tree is configured. + * The summed UU_/PR_/UPP_usage is consumed only by the share-tree allocator + * and by sge_share_mon, both of which are dormant in this case. This + * restores the pre-CS-1239 implicit gate ("no UU/PR objects -> no work"; + * see calc_usage.md) by checking the direct indicator. Trade-off: a + * qconf -Astree after a quiet period starts the share-tree allocator from + * zero history for one tick. Matching gate is in sge_sharetree_tick_handler. + * Caller holds LOCK_GLOBAL write (always via sge_commit_job), so reading + * the SHARETREE master list without an extra lock is safe. 
*/ + if (lFirst(*ocs::DataStore::get_master_list(SGE_TYPE_SHARETREE)) == nullptr) { + return 0; + } + + const lList *master_user_list = *ocs::DataStore::get_master_list(SGE_TYPE_USER); + const lList *master_project_list = *ocs::DataStore::get_master_list(SGE_TYPE_PROJECT); + + lListElem *user = user_list_locate(master_user_list, lGetString(jep, JB_owner)); + lListElem *project = nullptr; + const char *project_name = lGetString(jep, JB_project); + if (project_name != nullptr && *project_name != '\0') { + project = prj_list_locate(master_project_list, project_name); + } + + /* 1. catch up decay since last tick, then sum the finished job's usage + * into UU_/PR_/UPP_. Worker thread's sc_state_t.decay_constant is + * thread-local and uninitialized by default (== 0), so without this + * refresh decay_usage would multiply by 0 and zero everything. Same + * gap previously hit on the scheduler thread (fixed in + * sge_sharetree_tick_handler). */ + ocs::Usage::calculate_default_decay_constant(sconf_get_halftime()); + + const uint64_t curr_time = sge_get_gmt64(); + lList *decay_list = ocs::Usage::get_decay_list(); + lList *usage_weight_list = sconf_get_usage_weight_list(); + + /* Seed "finished_jobs"=1 onto the job's scaled_usage_list before we + * sum it into UU_/PR_/UPP_. The pending-priority calculation in + * sge_sort_pending_job_nodes (sgeee.cc) reads the leaf's + * STN_usage_list "finished_jobs" entry and adds it to job_count when + * computing per-pending-job tickets. calc_node_usage propagates the + * project's "finished_jobs" PR_usage entry up to the leaf's + * STN_usage_list. Without this seed the counter is never written + * anywhere and the formula treats every leaf as "0 finished" - the + * leaf with the fewest pending jobs wins by per-job dilution alone, + * starving leaves with many pending (project3 in the sharetree + * test). 
*/ + lList *scaled = lGetListRW(jatep, JAT_scaled_usage_list); + if (scaled == nullptr) { + lSetList(jatep, JAT_scaled_usage_list, lCreateList("scaled_usage", UA_Type)); + scaled = lGetListRW(jatep, JAT_scaled_usage_list); + } + usage_list_set_double_usage(scaled, "finished_jobs", 1.0); + + /* CS-1239 (test convergence fix): the decay between the previous TET + * tick and this finish time must apply to the historical UU_/PR_usage + * BEFORE we sum the new job's scaled_usage onto it. Otherwise the + * stamp jumps forward to curr_time but the historical part is "stuck + * at the previous-tick decay state" - the next tick decays by + * (next_tick - curr_time) which loses the (curr_time - prev_tick) + * interval. For projects that finish often (project2 in the sharetree + * test) this loss accumulates: combined_usage stays artificially high, + * shr stays high relative to truth, allocator keeps picking project2, + * test fails. seqno = (u_long)-1 tells decay_userprj_usage to update + * the stamp but NOT update the stored seqno - the TET tick owns the + * seqno namespace, we just want a catch-up decay tick here. */ + ocs::Usage::decay_and_sum_usage(jep, jatep, nullptr, user, project, + decay_list, usage_weight_list, + static_cast<u_long>(-1), curr_time); + lFreeList(&usage_weight_list); + + const uint32_t job_number = lGetUlong(jep, JB_job_number); + + /* 2. drop the per-job debited entry the sum step just appended (the job + * is gone; no further deltas to compute), and bump change markers so + * consumers see "something happened to this object". */ + if (user != nullptr) { + drop_debited_job(user, UU_debited_job_usage, job_number); + bump_change_markers(user, UU_version); + } + if (project != nullptr) { + drop_debited_job(project, PR_debited_job_usage, job_number); + bump_change_markers(project, PR_version); + } + + /* 3. mark user / project dirty for both event emission and spool. 
The + * sgeE_USER_MOD / sgeE_PROJECT_MOD events are deferred to the share- + * tree tick handler (sge_sharetree_tick_handler), which batches them + * with sgeE_NEW_SHARETREE inside one event-master transaction so the + * scheduler mirror sees user/project usage and tree state as one + * atomic snapshot (pre-CS-1239 ORT_share_tree invariant restored). + * Spool is deferred to the share-tree spool handler at its own + * cadence. */ + if (user != nullptr) { + const char *uname = lGetString(user, UU_name); + ocs::SharetreeUsage::mark_user_event_dirty(uname); + ocs::SharetreeUsage::mark_user_spool_dirty(uname); + } + if (project != nullptr) { + const char *pname = lGetString(project, PR_name); + ocs::SharetreeUsage::mark_project_event_dirty(pname); + ocs::SharetreeUsage::mark_project_spool_dirty(pname); + } + + /* CS-1239 step 5: this finish moved usage in UU_/PR_/UPP_, which means + * the master share tree (read by sge_share_mon and propagated to mirror + * clients via sgeE_NEW_SHARETREE) is now stale. Flag it so the TET + * share-tree tick handler republishes on its next tick. Cheap (one + * bool write under the LOCK_GLOBAL we already hold). */ + if (user != nullptr || project != nullptr) { + ocs::SharetreeUsage::mark_share_tree_dirty(); + } + + /* Bury (sge_bury_job) is performed by the caller (sge_commit_job's + * COMMIT_ST_FINISHED_FAILED_EE case) right after this helper returns. + * Keeping this function a pure "book the usage" routine keeps its + * pre-/post-conditions reviewable in isolation. 
*/ + + return 0; +} diff --git a/source/daemons/qmaster/ocs_FinishedJob.h b/source/daemons/qmaster/ocs_FinishedJob.h new file mode 100644 index 000000000..95c7560cb --- /dev/null +++ b/source/daemons/qmaster/ocs_FinishedJob.h @@ -0,0 +1,58 @@ +#pragma once +/*___INFO__MARK_BEGIN_NEW__*/ +/*************************************************************************** + * + * Copyright 2026 HPC-Gridware GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + ***************************************************************************/ +/*___INFO__MARK_END_NEW__*/ + +#include + +#include "cull/cull.h" +#include "uti/sge_monitor.h" + +/** Apply the per-finish bookkeeping that, in the pre-CS-1239 scheme, was + * generated by the scheduler thread as the three orders ORT_update_user_usage, + * ORT_update_project_usage, and ORT_remove_job, and applied in worker threads + * via sge_follow_order. + * + * Called from inside sge_commit_job's COMMIT_ST_FINISHED_FAILED_EE case, after + * the JAT_status = JFINISHED transition and before sge_bury_job. External + * callers should never invoke this helper directly - sge_commit_job is the + * contract. + * + * Preconditions: caller holds LOCK_GLOBAL (write); JAT_status is JFINISHED. + * + * Steps: + * - resolve user / project refs; + * - sum the job's final scaled usage into UU_usage (or + * user.UU_project[project].UPP_usage when project is set), UU_long_term_usage + * / UPP_long_term_usage, PR_usage, and PR_long_term_usage. 
Catches up the + * decay since the last TET tick first, so the historical part stays in sync; + * - drop the job's entry from UU_debited_job_usage / PR_debited_job_usage + * (the job is gone, the tracking record must follow); + * - bump UU_version / PR_version (the change marker). UU_/PR_usage_seqno + * is deliberately NOT touched: it is the periodic decay handler's + * idempotency stamp; sharing it with the change marker would silently + * collide with the handler's run counter and skip decay ticks; + * - mark the user / project event-dirty (the TET share-tree tick handler + * batches sgeE_USER_MOD / sgeE_PROJECT_MOD with sgeE_NEW_SHARETREE in one + * event-master transaction) and spool-dirty (TET spool handler writes them); + * - mark the master share tree dirty so the next TET tick republishes it. + * + * Returns 0 on success, a negative value on a hard error (missing jep/jatep). + */ +int sge_book_finished_job_usage(lListElem *jep, lListElem *jatep, monitoring_t *monitor, uint64_t gdi_session); diff --git a/source/daemons/qmaster/ocs_SharetreeUsage.cc b/source/daemons/qmaster/ocs_SharetreeUsage.cc new file mode 100644 index 000000000..218d63a8b --- /dev/null +++ b/source/daemons/qmaster/ocs_SharetreeUsage.cc @@ -0,0 +1,238 @@ +/*___INFO__MARK_BEGIN_NEW__*/ +/*************************************************************************** + * + * Copyright 2026 HPC-Gridware GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + ***************************************************************************/ +/*___INFO__MARK_END_NEW__*/ + +#include + +#include "evm/sge_event_master.h" + +#include "sgeobj/ocs_DataStore.h" +#include "sgeobj/sge_answer.h" +#include "sgeobj/sge_userprj.h" + +#include "uti/ocs_UniqueFifo.h" +#include "uti/sge_rmon_macros.h" +#include "uti/sge_time.h" + +#include "ocs_SharetreeUsage.h" +#include "sge_persistence_qmaster.h" +#include "sge_qmaster_timed_event.h" + +namespace { + /* Spool FIFOs - drained by the TET share-tree spool handler, no event + * traffic, just disk writes. */ + ocs::UniqueFifo dirty_users_spool; + ocs::UniqueFifo dirty_projects_spool; + + /* Event FIFOs - drained by the TET share-tree tick handler inside one + * event-master transaction together with sgeE_NEW_SHARETREE. */ + ocs::UniqueFifo dirty_users_event; + ocs::UniqueFifo dirty_projects_event; + + /* CS-1239 step 5: single bool, no FIFO - mirror clients only care that + * "something changed", not which leaves. Guarded by LOCK_GLOBAL like the + * FIFOs above. Starts true so the +5s bootstrap publish in + * sge_timer_start_periodic_tasks always populates a freshly started + * qmaster's master share tree with combined_usage / m_share. */ + bool share_tree_dirty = true; + + /* Spool one user by name. Skips silently if the user has been removed + * between mark_user_spool_dirty and the flush (qconf -duser race). */ + void spool_one_user(const std::string &name, uint64_t gdi_session) { + const lList *master_user_list = *ocs::DataStore::get_master_list(SGE_TYPE_USER); + lListElem *u = user_list_locate(master_user_list, name.c_str()); + if (u == nullptr) { + return; + } + lList *answer_list = nullptr; + sge_event_spool(&answer_list, 0, sgeE_USER_MOD, 0, 0, name.c_str(), + nullptr, nullptr, u, nullptr, nullptr, false, true, gdi_session); + answer_list_output(&answer_list); + } + + /* Spool one project by name. Skips silently if removed in the meantime. 
*/ + void spool_one_project(const std::string &name, uint64_t gdi_session) { + const lList *master_project_list = *ocs::DataStore::get_master_list(SGE_TYPE_PROJECT); + lListElem *p = prj_list_locate(master_project_list, name.c_str()); + if (p == nullptr) { + return; + } + lList *answer_list = nullptr; + sge_event_spool(&answer_list, 0, sgeE_PROJECT_MOD, 0, 0, name.c_str(), + nullptr, nullptr, p, nullptr, nullptr, false, true, gdi_session); + answer_list_output(&answer_list); + } + + /* Emit sgeE_USER_MOD for one user. Caller owns the event-master + * transaction. Skips silently if the user was removed since the mark. */ + void emit_one_user_event(const std::string &name, uint64_t gdi_session) { + const lList *master_user_list = *ocs::DataStore::get_master_list(SGE_TYPE_USER); + lListElem *u = user_list_locate(master_user_list, name.c_str()); + if (u == nullptr) { + return; + } + sge_add_event(0, sgeE_USER_MOD, 0, 0, name.c_str(), nullptr, nullptr, u, gdi_session); + } + + /* Emit sgeE_PROJECT_MOD for one project. Same contract as the user + * helper above. 
*/ + void emit_one_project_event(const std::string &name, uint64_t gdi_session) { + const lList *master_project_list = *ocs::DataStore::get_master_list(SGE_TYPE_PROJECT); + lListElem *p = prj_list_locate(master_project_list, name.c_str()); + if (p == nullptr) { + return; + } + sge_add_event(0, sgeE_PROJECT_MOD, 0, 0, name.c_str(), nullptr, nullptr, p, gdi_session); + } +} + +void +ocs::SharetreeUsage::mark_user_spool_dirty(const char *name) { + if (name == nullptr) { + return; + } + dirty_users_spool.push(name); +} + +void +ocs::SharetreeUsage::mark_project_spool_dirty(const char *name) { + if (name == nullptr) { + return; + } + dirty_projects_spool.push(name); +} + +void +ocs::SharetreeUsage::mark_user_event_dirty(const char *name) { + if (name == nullptr) { + return; + } + dirty_users_event.push(name); +} + +void +ocs::SharetreeUsage::mark_project_event_dirty(const char *name) { + if (name == nullptr) { + return; + } + dirty_projects_event.push(name); +} + +bool +ocs::SharetreeUsage::has_event_dirty() { + return !dirty_users_event.empty() || !dirty_projects_event.empty(); +} + +void +ocs::SharetreeUsage::emit_dirty_user_events(uint64_t gdi_session) { + while (!dirty_users_event.empty()) { + emit_one_user_event(dirty_users_event.pop_front(), gdi_session); + } +} + +void +ocs::SharetreeUsage::emit_dirty_project_events(uint64_t gdi_session) { + while (!dirty_projects_event.empty()) { + emit_one_project_event(dirty_projects_event.pop_front(), gdi_session); + } +} + +void +ocs::SharetreeUsage::emit_events_all(uint64_t gdi_session) { + emit_dirty_user_events(gdi_session); + emit_dirty_project_events(gdi_session); +} + +int +ocs::SharetreeUsage::spool_budget(int budget_ms, uint64_t gdi_session) { + const uint64_t budget_end = sge_get_gmt64() + static_cast<uint64_t>(budget_ms) * 1000; + // @todo We might want to get master_user_list and master_project_list here once and pass it + // to spool_one_user() and spool_one_project(). 
+ + /* Interleave users and projects so neither side starves the other when a + * burst hits one of them. */ + while (sge_get_gmt64() < budget_end) { + bool did_work = false; + if (!dirty_users_spool.empty()) { + spool_one_user(dirty_users_spool.pop_front(), gdi_session); + did_work = true; + } + if (!dirty_projects_spool.empty()) { + spool_one_project(dirty_projects_spool.pop_front(), gdi_session); + did_work = true; + } + if (!did_work) { + break; + } + } + + return static_cast<int>(dirty_users_spool.size() + dirty_projects_spool.size()); +} + +void +ocs::SharetreeUsage::spool_all(uint64_t gdi_session) { + while (!dirty_users_spool.empty()) { + spool_one_user(dirty_users_spool.pop_front(), gdi_session); + } + while (!dirty_projects_spool.empty()) { + spool_one_project(dirty_projects_spool.pop_front(), gdi_session); + } +} + +void +ocs::SharetreeUsage::mark_share_tree_dirty() { + share_tree_dirty = true; +} + +bool +ocs::SharetreeUsage::consume_share_tree_dirty() { + const bool was_dirty = share_tree_dirty; + share_tree_dirty = false; + return was_dirty; +} + +void +sge_reschedule_sharetree_tick() { + DENTER(TOP_LAYER); + + te_delete_one_time_event(TYPE_SHARETREE_TICK_EVENT, 0, 0, "sharetree-tick"); + + te_event_t ev = te_new_event(sge_get_gmt64() + sge_gmt32_to_gmt64(5), + TYPE_SHARETREE_TICK_EVENT, ONE_TIME_EVENT, + 0, 0, "sharetree-tick"); + te_add_event(ev); + te_free_event(&ev); + + DRETURN_VOID; +} + +void +sge_reschedule_sharetree_spool() { + DENTER(TOP_LAYER); + + te_delete_one_time_event(TYPE_SHARETREE_SPOOL_EVENT, 0, 0, "sharetree-spool"); + + te_event_t ev = te_new_event(sge_get_gmt64() + sge_gmt32_to_gmt64(5), + TYPE_SHARETREE_SPOOL_EVENT, ONE_TIME_EVENT, + 0, 0, "sharetree-spool"); + te_add_event(ev); + te_free_event(&ev); + + DRETURN_VOID; +} diff --git a/source/daemons/qmaster/ocs_SharetreeUsage.h b/source/daemons/qmaster/ocs_SharetreeUsage.h new file mode 100644 index 000000000..709f591be --- /dev/null +++ b/source/daemons/qmaster/ocs_SharetreeUsage.h 
@@ -0,0 +1,151 @@ +#pragma once +/*___INFO__MARK_BEGIN_NEW__*/ +/*************************************************************************** + * + * Copyright 2026 HPC-Gridware GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + ***************************************************************************/ +/*___INFO__MARK_END_NEW__*/ + +#include + +namespace ocs { + /** Dirty-set of user and project objects awaiting work (event emission and/or + * spool to the persistent store), introduced by CS-1239. + * + * Two independent dirty axes, both backed by FIFO + dedup: + * + * - SPOOL dirty: worker threads call mark_user_spool_dirty / + * mark_project_spool_dirty after summing a finished job's usage into + * the in-memory user/project objects. The TET share-tree spool + * handler calls spool_budget once per tick to spool a budget-bounded + * slice. Shutdown calls spool_all to drain whatever is left before + * the spool backend is torn down. + * + * - EVENT dirty: worker threads and the TET decay handler call + * mark_user_event_dirty / mark_project_event_dirty for each user / + * project whose UU_/PR_/UPP_ usage just changed. 
The TET share-tree + * tick handler drains both FIFOs inside one event-master transaction + * (sge_set_commit_required / sge_commit) and batches the resulting + * sgeE_USER_MOD / sgeE_PROJECT_MOD events with the sgeE_NEW_SHARETREE + * republish, restoring the pre-CS-1239 invariant that mirror clients + * see the three event types as one atomic snapshot. Shutdown calls + * emit_events_all to emit whatever is left before the event master + * is torn down. + * + * The share-tree-dirty bool is independent of the user/project FIFOs: + * it is set when the master share tree itself is now stale (any finish, + * any decay pass) and consumed when the tick handler republishes. + * + * All methods must be called under LOCK_GLOBAL. The FIFOs and the bool + * are implicitly serialised through that lock; no second-level mutex. + */ + class SharetreeUsage { + public: + /** Mark a user object dirty for spooling. Names are de-duplicated: a + * second call with the same name while the first is still queued is + * a no-op. + */ + static void mark_user_spool_dirty(const char *name); + + /** Mark a project object dirty for spooling. De-duplicated. */ + static void mark_project_spool_dirty(const char *name); + + /** Drain the head of the user and project spool FIFOs to the spool + * backend for up to budget_ms milliseconds. Returns the residual queue + * size (sum of users + projects still spool-dirty). A non-zero return + * means the caller should reschedule sooner; zero means the next idle + * gap is fine. + */ + static int spool_budget(int budget_ms, uint64_t gdi_session); + + /** Drain both spool FIFOs entirely, ignoring any time budget. Called + * from the qmaster shutdown sequence before the spool backend is torn + * down. + */ + static void spool_all(uint64_t gdi_session); + + /** Mark a user object dirty for event emission. De-duplicated. The + * next share-tree tick will emit sgeE_USER_MOD for this user inside + * the publish transaction. 
+ */ + static void mark_user_event_dirty(const char *name); + + /** Mark a project object dirty for event emission. De-duplicated. */ + static void mark_project_event_dirty(const char *name); + + /** Returns true if either event FIFO has at least one entry. The + * share-tree tick handler uses this to decide whether to open an + * event-master transaction at all. + */ + static bool has_event_dirty(); + + /** Drain the user event FIFO, emitting sgeE_USER_MOD for each entry + * via sge_add_event. Must be called inside an open event-master + * transaction (sge_set_commit_required ... sge_commit) so the events + * ship as one package with the trailing sgeE_NEW_SHARETREE. + */ + static void emit_dirty_user_events(uint64_t gdi_session); + + /** Drain the project event FIFO, emitting sgeE_PROJECT_MOD for each + * entry. Same transaction-context contract as emit_dirty_user_events. + */ + static void emit_dirty_project_events(uint64_t gdi_session); + + /** Drain both event FIFOs entirely, emitting the queued MOD events. + * Called from the qmaster shutdown sequence so finish-driven events + * do not get dropped on the last tick interval. Caller is responsible + * for the surrounding transaction (or for accepting the per-event + * package overhead if shutdown speed beats atomicity here). + */ + static void emit_events_all(uint64_t gdi_session); + + /** CS-1239 step 5: mark the master share tree as "needs republish". + * Set by the worker thread after sge_book_finished_job_usage sums a + * finish into UU_/PR_/UPP_ usage, and by the TET decay handler after + * a decay pass. Consumed (and cleared) by the TET share-tree tick + * handler when it next emits sgeE_NEW_SHARETREE. + */ + static void mark_share_tree_dirty(); + + /** Returns whether the flag was set and clears it; the read-and-clear is + * atomic only because callers hold LOCK_GLOBAL. The tick handler uses this so + * the "saw dirty -> clear -> publish" sequence cannot lose a concurrent mark. 
+ */ + static bool consume_share_tree_dirty(); + }; +} + +/** CS-1239: re-arm the share-tree tick event so a changed STREE_TICK_INTERVAL + * is picked up promptly. Drops the pending one-time event and queues a fresh + * one at a fixed +5 s (this function does not read the interval itself), so + * the change takes effect within seconds rather than after the remaining (up + * to 300 s) lifetime of the already-queued event. Called from + * configuration_qmaster.cc::sge_mod_configuration after merge_configuration() + * re-parses qmaster_params. Safe to call when no tick event is queued + * (delete is a no-op in that case); also safe to call from any thread + * (te_add_event / te_delete_one_time_event are MT-safe). */ +void +sge_reschedule_sharetree_tick(); + +/** CS-1239: re-schedule the periodic share-tree spool event. Same + * rationale as sge_reschedule_sharetree_tick() but for STREE_SPOOL_INTERVAL: + * a config change to STREE_SPOOL_INTERVAL would otherwise only take effect + * after the already-queued event fires (up to 240 s away on default + * configs), which made tests like issue_1385 - which pin the interval and + * read the spool file shortly after - flake. Drops the pending event and + * re-queues at +5 s. Same thread-safety / no-op-if-not-queued guarantees. 
*/ +void +sge_reschedule_sharetree_spool(); diff --git a/source/daemons/qmaster/reschedule.cc b/source/daemons/qmaster/reschedule.cc index 9096b6d19..fcf42b5f9 100644 --- a/source/daemons/qmaster/reschedule.cc +++ b/source/daemons/qmaster/reschedule.cc @@ -229,8 +229,17 @@ reschedule_jobs(lListElem *ep, uint32_t force, lList **answer, monitoring_t *mon /* * Find all jobs currently running on the host/queue * append the jobids/taskids into a sublist of the exechost object + * + * Cache the next pointer before processing: with ENABLE_RESCHEDULE_KILL + * reschedule_job() -> sge_commit_job(COMMIT_ST_FINISHED_FAILED_EE) now + * buries the job synchronously (CS-1239 consolidation), which removes + * the current jep from this very list. A plain for_each_rw_lv would + * dereference next through the freed element. */ - for_each_rw_lv(jep, *(ocs::DataStore::get_master_list_rw(SGE_TYPE_JOB))) { + lListElem *jep; + lListElem *next_jep = lFirstRW(*ocs::DataStore::get_master_list_rw(SGE_TYPE_JOB)); + while ((jep = next_jep) != nullptr) { + next_jep = lNextRW(jep); reschedule_job(jep, nullptr, ep, force, answer, monitor, is_manual, gdi_session); } ret = 0; diff --git a/source/daemons/qmaster/sge_c_gdi.cc b/source/daemons/qmaster/sge_c_gdi.cc index 4b2605bef..bf3900d42 100644 --- a/source/daemons/qmaster/sge_c_gdi.cc +++ b/source/daemons/qmaster/sge_c_gdi.cc @@ -338,18 +338,6 @@ sge_c_gdi_process_in_worker(ocs::gdi::Packet *packet, ocs::gdi::Task *task, case ORT_ptickets: sge_dstring_sprintf_append(&target_dstr, " %s", "ORT_ptickets"); break; - case ORT_remove_job: - sge_dstring_sprintf_append(&target_dstr, " %s", "ORT_remove_job"); - break; - case ORT_update_project_usage: - sge_dstring_sprintf_append(&target_dstr, " %s", "ORT_update_project_usage"); - break; - case ORT_update_user_usage: - sge_dstring_sprintf_append(&target_dstr, " %s", "ORT_update_user_usage"); - break; - case ORT_share_tree: - sge_dstring_sprintf_append(&target_dstr, " %s", "ORT_share_tree"); - break; case 
ORT_remove_immediate_job: sge_dstring_sprintf_append(&target_dstr, " %s", "ORT_remove_immediate_job"); break; diff --git a/source/daemons/qmaster/sge_follow.cc b/source/daemons/qmaster/sge_follow.cc index 1d280cdfc..bf2d37f56 100644 --- a/source/daemons/qmaster/sge_follow.cc +++ b/source/daemons/qmaster/sge_follow.cc @@ -204,7 +204,6 @@ sge_follow_order(lListElem *ep, char *ruser, char *rhost, lList **topp, monitori uint32_t state; uint32_t pe_slots = 0, q_slots = 0, q_version; lListElem *pe = nullptr; - bool is_spool = do_stree_spooling(); uint64_t now = sge_get_gmt64(); or_type = lGetUlong(ep, OR_type); @@ -1075,23 +1074,21 @@ sge_follow_order(lListElem *ep, char *ruser, char *rhost, lList **topp, monitori } break; - /* ----------------------------------------------------------------------- - * REMOVE JOBS THAT ARE WAITING FOR SCHEDD'S PERMISSION TO GET DELETED - * - * Using this order schedd can only remove jobs in the - * "dead but not buried" state - * - * ----------------------------------------------------------------------- */ - case ORT_remove_job: /* ----------------------------------------------------------------------- * REMOVE IMMEDIATE JOBS THAT COULD NOT GET SCHEDULED IN THIS PASS * * Using this order schedd can only remove idle immediate jobs * (former ORT_remove_interactive_job) + * + * CS-1239: ORT_remove_job is gone - finished-job removal now happens + * inline in the worker thread (sge_commit_job(COMMIT_ST_FINISHED_FAILED_EE) + * books usage and buries the job in one step), so the scheduler never + * emits ORT_remove_job and this case no longer needs to handle the + * shared "remove finished job" path. 
* ----------------------------------------------------------------------- */ case ORT_remove_immediate_job: { lList *master_job_list = *ocs::DataStore::get_master_list_rw(SGE_TYPE_JOB); - DPRINTF("ORDER: ORT_remove_immediate_job or ORT_remove_job\n"); + DPRINTF("ORDER: ORT_remove_immediate_job\n"); job_number = lGetUlong(ep, OR_job_number); if (job_number == 0) { @@ -1100,22 +1097,15 @@ sge_follow_order(lListElem *ep, char *ruser, char *rhost, lList **topp, monitori } task_number = lGetUlong(ep, OR_ja_task_number); if (task_number == 0) { - ERROR(MSG_JOB_NOORDERTASK_US, job_number, (or_type == ORT_remove_immediate_job) ? "ORT_remove_immediate_job" : "ORT_remove_job"); + ERROR(MSG_JOB_NOORDERTASK_US, job_number, "ORT_remove_immediate_job"); DRETURN(-2); } - DPRINTF("ORDER: remove %sjob " sge_u32 "." sge_u32 "\n", or_type == ORT_remove_immediate_job ? "immediate " : "", job_number, task_number); + DPRINTF("ORDER: remove immediate job " sge_u32 "." sge_u32 "\n", job_number, task_number); jep = lGetElemUlongRW(master_job_list, JB_job_number, job_number); if (jep == nullptr) { - if (or_type == ORT_remove_job) { - ERROR(MSG_JOB_FINDJOB_U, job_number); - /* try to repair schedd data - session is unknown here */ - sge_add_event(now, sgeE_JOB_DEL, job_number, task_number, nullptr, nullptr, nullptr, nullptr, gdi_session); - DRETURN(-1); - } else { - /* in case of an immediate parallel job the job could be missing */ - INFO(MSG_JOB_FINDJOB_U, job_number); - DRETURN(0); - } + /* in case of an immediate parallel job the job could be missing */ + INFO(MSG_JOB_FINDJOB_U, job_number); + DRETURN(0); } jatp = job_search_task(jep, nullptr, task_number); @@ -1131,10 +1121,6 @@ sge_follow_order(lListElem *ep, char *ruser, char *rhost, lList **topp, monitori DRETURN(-1); } - if (or_type == ORT_remove_job) { - ERROR(MSG_JOB_ORDERDELINCOMPLETEJOB_UU, job_number, task_number); - lSetUlong(jatp, JAT_status, JFINISHED); - } sge_event_spool(&answer_list, 0, sgeE_JATASK_ADD, job_number, 
task_number, nullptr, nullptr, lGetString(jep, JB_session), @@ -1142,295 +1128,40 @@ sge_follow_order(lListElem *ep, char *ruser, char *rhost, lList **topp, monitori answer_list_output(&answer_list); } - if (or_type == ORT_remove_job) { - if (lGetUlong(jatp, JAT_status) != JFINISHED) { - ERROR(MSG_JOB_REMOVENOTFINISHED_U, lGetUlong(jep, JB_job_number)); - DRETURN(-1); - } - - /* remove it */ - sge_commit_job(jep, jatp, nullptr, COMMIT_ST_DEBITED_EE, COMMIT_DEFAULT, monitor, gdi_session); - } else { - if (!JOB_TYPE_IS_IMMEDIATE(lGetUlong(jep, JB_type))) { - if (lGetString(jep, JB_script_file)) { - ERROR(MSG_JOB_REMOVENONINTERACT_U, lGetUlong(jep, JB_job_number)); - } else { - ERROR(MSG_JOB_REMOVENONIMMEDIATE_U, lGetUlong(jep, JB_job_number)); - } - DRETURN(-1); - } - if (lGetUlong(jatp, JAT_status) != JIDLE) { - ERROR(MSG_JOB_REMOVENOTIDLEIA_U, lGetUlong(jep, JB_job_number)); - DRETURN(-1); - } - INFO(MSG_JOB_NOFREERESOURCEIA_UU, lGetUlong(jep, JB_job_number), lGetUlong(jatp, JAT_task_number), lGetString(jep, JB_owner)); - - /* remove it */ - sge_commit_job(jep, jatp, nullptr, COMMIT_ST_NO_RESOURCES, COMMIT_DEFAULT | COMMIT_NEVER_RAN, monitor, gdi_session); - } - break; - } - -// @todo CS-272: should not be required as soon as deletion of job specific -#if 1 - /* ----------------------------------------------------------------------- - * REPLACE A PROJECT'S - * - * - PR_usage - * - PR_usage_time_stamp - * - PR_long_term_usage - * - PR_debited_job_usage - * - * Using this order schedd can debit usage on users/projects - * both orders are handled identically except target list - * ----------------------------------------------------------------------- */ - case ORT_update_project_usage: - DPRINTF("ORDER: ORT_update_project_usage\n"); - { - const lList *master_project_list = *ocs::DataStore::get_master_list(SGE_TYPE_PROJECT); - - DPRINTF("ORDER: update %d projects\n", lGetNumberOfElem(lGetList(ep, OR_joker))); - - lListElem *up_order; - for_each_rw (up_order, 
lGetList(ep, OR_joker)) { - int pos = lGetPosViaElem(up_order, PR_name, SGE_NO_ABORT); - if (pos < 0) { - continue; - } - const char *up_name = lGetString(up_order, PR_name); - if (up_name == nullptr) { - continue; - } - lListElem *up = prj_list_locate(master_project_list, up_name); - if (up == nullptr) { - /* order contains reference to unknown user/prj object */ - continue; - } - - DPRINTF("%s %s usage updating with %d jobs\n", MSG_OBJ_PRJ, - up_name, lGetNumberOfElem(lGetList(up_order, PR_debited_job_usage))); - - if (!(up = prj_list_locate(master_project_list, up_name))) { - /* order contains reference to unknown user/prj object */ - continue; - } - - // @todo does the version have a meaning here? - if ((pos = lGetPosViaElem(up_order, PR_version, SGE_NO_ABORT)) >= 0 && - (lGetPosUlong(up_order, pos) != lGetUlong(up, PR_version))) { - /* order contains update for outdated user/project usage */ - WARNING(MSG_ORD_USRPRJVERSION_SUU, up_name, lGetPosUlong(up_order, pos), lGetUlong(up, PR_version)); - /* Note: Should we apply the debited job usage in this case? 
*/ - continue; - } - - lAddUlong(up, PR_version, 1); - - if ((pos = lGetPosViaElem(up_order, PR_project, SGE_NO_ABORT)) >= 0) { - lSwapList(up_order, PR_project, up, PR_project); - } - - if ((pos = lGetPosViaElem(up_order, PR_usage_time_stamp, SGE_NO_ABORT)) >= 0) - lSetUlong64(up, PR_usage_time_stamp, lGetPosUlong64(up_order, pos)); - - if ((pos = lGetPosViaElem(up_order, PR_usage, SGE_NO_ABORT)) >= 0) { - lSwapList(up_order, PR_usage, up, PR_usage); - } - - if ((pos = lGetPosViaElem(up_order, PR_long_term_usage, SGE_NO_ABORT)) >= 0) { - lSwapList(up_order, PR_long_term_usage, up, PR_long_term_usage); - } - - /* update old usage in up for each job appearing in - PR_debited_job_usage of 'up_order' */ - lListElem *ju; - lListElem *next = lFirstRW(lGetList(up_order, PR_debited_job_usage)); - while ((ju = next)) { - next = lNextRW(ju); - - job_number = lGetUlong(ju, UPU_job_number); - - /* seek for existing debited usage of this job */ - lListElem *up_ju; - if ((up_ju = lGetSubUlongRW(up, UPU_job_number, job_number, PR_debited_job_usage))) { - - /* if passed old usage list is nullptr, delete existing usage */ - if (lGetList(ju, UPU_old_usage_list) == nullptr) { - lRemoveElem(lGetListRW(up_order, PR_debited_job_usage), &ju); - lRemoveElem(lGetListRW(up, PR_debited_job_usage), &up_ju); - } else { - /* still exists - replace old usage with new one */ - DPRINTF("updating debited usage for job " sge_u32 "\n", job_number); - lSwapList(ju, UPU_old_usage_list, up_ju, UPU_old_usage_list); - } - - } else { - /* unchain ju element and chain it into our user/prj object */ - DPRINTF("adding debited usage for job " sge_u32 "\n", job_number); - lDechainElem(lGetListRW(up_order, PR_debited_job_usage), ju); - - if (lGetList(ju, UPU_old_usage_list) != nullptr) { - /* unchain ju element and chain it into our user/prj object */ - lList *tlp; - if (!(tlp = lGetListRW(up, PR_debited_job_usage))) { - tlp = lCreateList(up_name, UPU_Type); - lSetList(up, PR_debited_job_usage, tlp); - } - 
lInsertElem(tlp, nullptr, ju); - } else { - /* do not chain in empty empty usage records */ - lFreeElem(&ju); - } - } - } - - /* spool and send event */ - lList *answer_list = nullptr; - sge_event_spool(&answer_list, 0, sgeE_PROJECT_MOD, 0, 0, up_name, - nullptr, nullptr, up, nullptr, nullptr, - true, is_spool, gdi_session); - answer_list_output(&answer_list); - } - } - break; - - /* ----------------------------------------------------------------------- - * REPLACE A USER - * - * - UU_usage - * - UU_usage_time_stamp - * - UU_long_term_usage - * - UU_debited_job_usage - * - * Using this order schedd can debit usage on users/projects - * both orders are handled identically except target list - * ----------------------------------------------------------------------- */ - case ORT_update_user_usage: - DPRINTF("ORDER: ORT_update_user_usage\n"); - { - lListElem *up_order, *up, *ju, *up_ju, *next; - int pos; - const char *up_name; - lList *tlp; - const lList *master_user_list = *ocs::DataStore::get_master_list(SGE_TYPE_USER); - - DPRINTF("ORDER: update %d users\n", lGetNumberOfElem(lGetList(ep, OR_joker))); - - for_each_rw (up_order, lGetList(ep, OR_joker)) { - if ((pos = lGetPosViaElem(up_order, UU_name, SGE_NO_ABORT)) < 0 || - !(up_name = lGetString(up_order, UU_name))) { - continue; - } - - DPRINTF("%s %s usage updating with %d jobs\n", MSG_OBJ_USER, - up_name, lGetNumberOfElem(lGetList(up_order, UU_debited_job_usage))); - - if (!(up = user_list_locate(master_user_list, up_name))) { - /* order contains reference to unknown user/prj object */ - continue; - } - - if ((pos = lGetPosViaElem(up_order, UU_version, SGE_NO_ABORT)) >= 0 && - (lGetPosUlong(up_order, pos) != lGetUlong(up, UU_version))) { - /* order contains update for outdated user/project usage */ - WARNING(MSG_ORD_USRPRJVERSION_SUU, up_name, lGetPosUlong(up_order, pos), lGetUlong(up, UU_version)); - /* Note: Should we apply the debited job usage in this case? 
*/ - continue; - } - - lAddUlong(up, UU_version, 1); - - if ((pos = lGetPosViaElem(up_order, UU_project, SGE_NO_ABORT)) >= 0) { - lSwapList(up_order, UU_project, up, UU_project); - } - - if ((pos = lGetPosViaElem(up_order, UU_usage_time_stamp, SGE_NO_ABORT)) >= 0) - lSetUlong64(up, UU_usage_time_stamp, lGetPosUlong64(up_order, pos)); - - if ((pos = lGetPosViaElem(up_order, UU_usage, SGE_NO_ABORT)) >= 0) { - lSwapList(up_order, UU_usage, up, UU_usage); - } - - if ((pos = lGetPosViaElem(up_order, UU_long_term_usage, SGE_NO_ABORT)) >= 0) { - lSwapList(up_order, UU_long_term_usage, up, UU_long_term_usage); - } - - /* update old usage in up for each job appearing in - UU_debited_job_usage of 'up_order' */ - next = lFirstRW(lGetList(up_order, UU_debited_job_usage)); - while ((ju = next)) { - next = lNextRW(ju); - - job_number = lGetUlong(ju, UPU_job_number); - - /* seek for existing debited usage of this job */ - if ((up_ju = lGetSubUlongRW(up, UPU_job_number, job_number, UU_debited_job_usage))) { - - /* if passed old usage list is nullptr, delete existing usage */ - if (lGetList(ju, UPU_old_usage_list) == nullptr) { - - lRemoveElem(lGetListRW(up_order, UU_debited_job_usage), &ju); - lRemoveElem(lGetListRW(up, UU_debited_job_usage), &up_ju); - - } else { - - /* still exists - replace old usage with new one */ - DPRINTF("updating debited usage for job " sge_u32 "\n", job_number); - lSwapList(ju, UPU_old_usage_list, up_ju, UPU_old_usage_list); - } - - } else { - /* unchain ju element and chain it into our user/prj object */ - DPRINTF("adding debited usage for job " sge_u32 "\n", job_number); - lDechainElem(lGetListRW(up_order, UU_debited_job_usage), ju); - - if (lGetList(ju, UPU_old_usage_list) != nullptr) { - /* unchain ju element and chain it into our user/prj object */ - if (!(tlp = lGetListRW(up, UU_debited_job_usage))) { - tlp = lCreateList(up_name, UPU_Type); - lSetList(up, UU_debited_job_usage, tlp); - } - lInsertElem(tlp, nullptr, ju); - } else { - /* do not chain 
in empty empty usage records */ - lFreeElem(&ju); - } - } - } - - /* spool and send event */ - - { - lList *answer_list = nullptr; - sge_event_spool(&answer_list, 0, sgeE_USER_MOD, 0, 0, up_name, - nullptr, nullptr, up, nullptr, nullptr, - true, is_spool, gdi_session); - answer_list_output(&answer_list); - } + if (!JOB_TYPE_IS_IMMEDIATE(lGetUlong(jep, JB_type))) { + if (lGetString(jep, JB_script_file)) { + ERROR(MSG_JOB_REMOVENONINTERACT_U, lGetUlong(jep, JB_job_number)); + } else { + ERROR(MSG_JOB_REMOVENONIMMEDIATE_U, lGetUlong(jep, JB_job_number)); } + DRETURN(-1); } - break; -#endif - - /* ----------------------------------------------------------------------- - * FILL IN SEVERAL SCHEDULING VALUES INTO QMASTER'S SHARE TREE - * TO BE DISPLAYED BY QMON AND OTHER CLIENTS - * ----------------------------------------------------------------------- */ - case ORT_share_tree: { - lList *master_stree_list = *ocs::DataStore::get_master_list_rw(SGE_TYPE_SHARETREE); - - DPRINTF("ORDER: ORT_share_tree\n"); - ocs::ShareTree::zero_fields(lFirstRW(master_stree_list)); - if (update_sharetree(master_stree_list, lGetListRW(ep, OR_joker)) != 0) { - DPRINTF("ORDER: ORT_share_tree failed\n"); + if (lGetUlong(jatp, JAT_status) != JIDLE) { + ERROR(MSG_JOB_REMOVENOTIDLEIA_U, lGetUlong(jep, JB_job_number)); DRETURN(-1); } + INFO(MSG_JOB_NOFREERESOURCEIA_UU, lGetUlong(jep, JB_job_number), lGetUlong(jatp, JAT_task_number), lGetString(jep, JB_owner)); - // no need to spool but other data stores need to be updated - sge_add_event(now, sgeE_NEW_SHARETREE, 0, 0, nullptr, nullptr, - nullptr, lFirstRW(master_stree_list), gdi_session); - } + /* remove it */ + sge_commit_job(jep, jatp, nullptr, COMMIT_ST_NO_RESOURCES, COMMIT_DEFAULT | COMMIT_NEVER_RAN, monitor, gdi_session); break; + } + + /* CS-1239: ORT_update_project_usage and ORT_update_user_usage handlers + * removed. 
User/project debited-job-usage updates now happen directly + * in the worker thread (sge_book_finished_job_usage), which marks the + * affected user/project event-dirty + spool-dirty via + * ocs::SharetreeUsage. The TET share-tree tick handler + * (sge_sharetree_tick_handler in sge_thread_timer.cc) drains the event + * FIFOs and batches USER_MOD / PROJECT_MOD with the trailing + * NEW_SHARETREE inside one event-master transaction; the TET dirty- + * objects flush handler takes care of throttled spool persistence. */ + + /* CS-1239 step 5: ORT_share_tree handler removed. Recomputation + + * sgeE_NEW_SHARETREE emission now happen in the merged TET share-tree + * tick handler (see above), driven by ocs::SharetreeUsage:: + * mark_share_tree_dirty() set from sge_book_finished_job_usage and + * from the decay step of the tick itself. */ /* ----------------------------------------------------------------------- * UPDATE FIELDS IN SCHEDULING CONFIGURATION diff --git a/source/daemons/qmaster/sge_give_jobs.cc b/source/daemons/qmaster/sge_give_jobs.cc index b185a71e1..5e2093706 100644 --- a/source/daemons/qmaster/sge_give_jobs.cc +++ b/source/daemons/qmaster/sge_give_jobs.cc @@ -78,6 +78,7 @@ #include "spool/sge_spooling.h" #include "ocs_CategoryQmaster.h" +#include "ocs_FinishedJob.h" #include "ocs_ReportingFileWriter.h" #include "sge.h" #include @@ -884,24 +885,35 @@ create_timed_events_for_simulated_jobs() { * sge_commit_mode_t mode, sge_commit_flags_t commit_flags) * * FUNCTION -* sge_commit_job() implements job state transitons. When a dispatch -* order arrives from schedd the job is sent asynchonously to execd and -* sge_commit_job() is called with mode==COMMIT_ST_SENT. When a job report -* arrives from the execd mode is COMMIT_ST_ARRIVED. When the job failed -* or finished mode is COMMIT_ST_FINISHED_FAILED or -* COMMIT_ST_FINISHED_FAILED_EE depending on product mode: +* sge_commit_job() implements job state transitions. 
The mode parameter +* selects which transition: * -* A SGE job can be removed immediately when it is finished -* (mode==COMMIT_ST_FINISHED_FAILED). A SGEEE job may not be deleted -* (mode==COMMIT_ST_FINISHED_FAILED_EE) before the SGEEE scheduler has debited -* the jobs resource consumption in the corresponding objects (project/user/..). -* Only the job script may be deleted at this stage. When an order arrives at -* qmaster telling that debitation was done (mode==COMMIT_ST_DEBITED_EE) the -* job can be deleted. +* COMMIT_ST_SENT Dispatch order from schedd: send the job +* asynchronously to execd and mark JTRANSFERING. +* COMMIT_ST_ARRIVED Job report from execd: mark JRUNNING. +* COMMIT_ST_FINISHED_FAILED Tear down an unenrolled array task that +* never ran (callers pair this with +* COMMIT_UNENROLLED_TASK). Log, fire the +* DRMAA finish event, bury. No usage to book, +* no host/queue debits to release. +* COMMIT_ST_FINISHED_FAILED_EE GEEE finish: clear host/queue debits, +* emit JOB_FINAL_USAGE events, book usage into +* UU_/PR_/UPP_, bury. Pre-CS-1239 the booking +* was done by the scheduler via the ORT_remove_job +* order and a follow-up COMMIT_ST_DEBITED_EE +* call; CS-1239 folded both into this mode. +* COMMIT_ST_NO_RESOURCES Schedd's ORT_remove_immediate_job order: bury +* an interactive job that could not be scheduled. +* COMMIT_ST_RESCHEDULED / +* _USER_RESCHEDULED / +* _FAILED_AND_ERROR Reset the ja_task back to JIDLE/JQUEUED for +* another scheduling pass; preserve previous usage. +* COMMIT_ST_DELIVERY_FAILED Same as RESCHEDULED, but sge_clear_granted_ +* resources() must not increase free slots. * -* sge_commit_job() releases resources when a job is no longer running. -* Also state changes jobs are spooled and finally spooling files are -* deleted. Also jobs start time is set to the actual time when job is sent. +* sge_commit_job() spools the affected ja_task / job and emits the matching +* mirror events. 
For the finish modes it also buries the ja_task / job (and +* potentially the whole job, if this was the last ja_task). * * INPUTS * lListElem *jep - the job @@ -917,13 +929,10 @@ create_timed_events_for_simulated_jobs() { void sge_commit_job(lListElem *jep, lListElem *jatep, lListElem *jr, sge_commit_mode_t mode, int commit_flags, monitoring_t *monitor, uint64_t gdi_session) { - lListElem *tmp_ja_task; lListElem *global_host_ep; lUlong jobid, jataskid; - int no_unlink = 0; int spool_job = !(commit_flags & COMMIT_NO_SPOOLING); int no_events = (commit_flags & COMMIT_NO_EVENTS); - int unenrolled_task = (commit_flags & COMMIT_UNENROLLED_TASK); uint64_t now = sge_get_gmt64(); const char *session; lList *answer_list = nullptr; @@ -1225,18 +1234,28 @@ sge_commit_job(lListElem *jep, lListElem *jatep, lListElem *jr, sge_commit_mode_ break; case COMMIT_ST_FINISHED_FAILED: - ocs::ReportingFileWriter::create_job_logs(nullptr, now, JL_FINISHED, MSG_QMASTER, qualified_hostname, jr, jep, - jatep, nullptr, MSG_LOG_EXITED); + /* "Tear down an unenrolled array task" - in current callers this mode is + * always paired with COMMIT_UNENROLLED_TASK, i.e. the ja_task only existed + * as a pending range entry, was never enrolled, never ran. So there are + * no host/queue debits to release and no UU_/PR_ usage to book - we just + * log, fire the DRMAA finish event, and bury. 
*/ + ocs::ReportingFileWriter::create_job_logs(nullptr, now, JL_FINISHED, MSG_QMASTER, qualified_hostname, + jr, jep, jatep, nullptr, MSG_LOG_EXITED); remove_from_reschedule_unknown_lists(jobid, jataskid, gdi_session); - if (!unenrolled_task) { - sge_clear_granted_resources(jep, jatep, 1, monitor, gdi_session); - } sge_job_finish_event(jep, jatep, jr, commit_flags, nullptr, gdi_session); sge_bury_job(sge_root, jep, jobid, jatep, spool_job, no_events, gdi_session); break; case COMMIT_ST_FINISHED_FAILED_EE: + /* CS-1239 consolidation: the legacy two-step (FINISHED_FAILED_EE then, + * after the scheduler debited usage via ORT_remove_job, DEBITED_EE) is + * collapsed into one step. The order roundtrip is gone, so the booking + * and the bury happen here, right after the JFINISHED transition. + * Eliminates one reporting-log entry, one sgeE_JATASK_MOD event, one + * jatep spool write that the next instruction would delete, and the + * duplicated script-delete + successor-release pair (sge_bury_job + * does both). 
*/ ocs::ReportingFileWriter::create_job_logs(nullptr, now, JL_FINISHED, MSG_QMASTER, qualified_hostname, - jr, jep, jatep, nullptr, MSG_LOG_WAIT4SGEDEL); + jr, jep, jatep, nullptr, MSG_LOG_EXITED); remove_from_reschedule_unknown_lists(jobid, jataskid, gdi_session); lSetUlong(jatep, JAT_status, JFINISHED); @@ -1249,53 +1268,29 @@ sge_commit_job(lListElem *jep, lListElem *jatep, lListElem *jr, sge_commit_mode_ } sge_clear_granted_resources(jep, jatep, 1, monitor, gdi_session); for_each_rw_lv(petask, lGetList(jatep, JAT_task_list)) { - sge_add_list_event(now, sgeE_JOB_FINAL_USAGE, jobid, - jataskid, - lGetString(petask, PET_id), - nullptr, session, + sge_add_list_event(now, sgeE_JOB_FINAL_USAGE, jobid, jataskid, + lGetString(petask, PET_id), nullptr, session, lGetListRW(petask, PET_scaled_usage), gdi_session); } - - sge_add_list_event(now, sgeE_JOB_FINAL_USAGE, jobid, jataskid, - nullptr, nullptr, session, lGetListRW(jatep, JAT_scaled_usage_list), gdi_session); - - spool_transaction(&answer_list, spool_get_default_context(), STC_begin); - - // @todo send event and do this earlier - sge_event_spool(&answer_list, 0, sgeE_JATASK_MOD, jobid, jataskid, nullptr, nullptr, - session, jep, jatep, nullptr, true, true, gdi_session); - - if (job_get_not_enrolled_ja_tasks(jep)) { - no_unlink = 1; - } else { - /* finished all ja-tasks => remove job script */ - for_each_rw(tmp_ja_task, lGetList(jep, JB_ja_tasks)) { - if (lGetUlong(tmp_ja_task, JAT_status) != JFINISHED) { - no_unlink = 1; - break; - } - } - } - - if (!no_unlink) { - release_successor_jobs(jep, gdi_session); - release_successor_jobs_ad(jep, gdi_session); - if ((lGetString(jep, JB_exec_file) != nullptr) && !JOB_TYPE_IS_BINARY(lGetUlong(jep, JB_type))) { - spool_delete_script(&answer_list, jobid, jep); - } - } - spool_transaction(&answer_list, spool_get_default_context(), STC_commit); - answer_list_output(&answer_list); + sge_add_list_event(now, sgeE_JOB_FINAL_USAGE, jobid, jataskid, nullptr, nullptr, session, + 
lGetListRW(jatep, JAT_scaled_usage_list), gdi_session); + + /* Book the finished job's usage into UU_/PR_/UPP_ (was the second + * chained call from callers). No events or spool here - those are + * deferred to the TET share-tree tick and spool handlers. */ + sge_book_finished_job_usage(jep, jatep, monitor, gdi_session); + + /* Bury the ja_task / job. sge_bury_job opens its own spool transaction + * around its writes (spool_delete_script + spool_delete_object) and + * emits sgeE_JATASK_DEL or sgeE_JOB_DEL. No outer transaction here + * because BerkeleyDB transactions do not nest. */ + sge_bury_job(sge_root, jep, jobid, jatep, spool_job, no_events, gdi_session); break; - case COMMIT_ST_DEBITED_EE: /* triggered by ORT_remove_job */ case COMMIT_ST_NO_RESOURCES: /* triggered by ORT_remove_immediate_job */ - ocs::ReportingFileWriter::create_job_logs(nullptr, now, JL_DELETED, MSG_SCHEDD, qualified_hostname, jr, jep, jatep, nullptr, - (mode == COMMIT_ST_DEBITED_EE) ? MSG_LOG_DELSGE : MSG_LOG_DELIMMEDIATE); - - if (mode == COMMIT_ST_NO_RESOURCES) { - sge_job_finish_event(jep, jatep, jr, commit_flags, nullptr, gdi_session); - } + ocs::ReportingFileWriter::create_job_logs(nullptr, now, JL_DELETED, MSG_SCHEDD, qualified_hostname, + jr, jep, jatep, nullptr, MSG_LOG_DELIMMEDIATE); + sge_job_finish_event(jep, jatep, jr, commit_flags, nullptr, gdi_session); sge_bury_job(sge_root, jep, jobid, jatep, spool_job, no_events, gdi_session); break; @@ -1307,6 +1302,8 @@ sge_commit_job(lListElem *jep, lListElem *jatep, lListElem *jr, sge_commit_mode_ lSetUlong(jatep, JAT_status, JIDLE); lSetUlong(jatep, JAT_state, JQUEUED | JWAITING); sge_clear_granted_resources(jep, jatep, 0, monitor, gdi_session); + // @todo: Verify: When delivery has been tried, hasn't the JATASK been spooled yet with state JTRANSFERING? 
+ sge_event_spool(&answer_list, now, sgeE_JATASK_MOD, jobid, jataskid, nullptr, nullptr, session, jep, jatep, nullptr, true, false, gdi_session); answer_list_output(&answer_list); @@ -1821,11 +1818,9 @@ sge_bury_job(const char *sge_root, lListElem *job, uint32_t job_id, lListElem *j * Remove one ja task */ if (remove_job) { - /* we might have done this before, but to make sure, that we - did not miss it, we do it again.... */ release_successor_jobs(job, gdi_session); - /* and this too, also flushes task dependency cache */ + /* also flushes task dependency cache */ release_successor_jobs_ad(job, gdi_session); /* diff --git a/source/daemons/qmaster/sge_give_jobs.h b/source/daemons/qmaster/sge_give_jobs.h index 7abe82bf3..1c196432f 100644 --- a/source/daemons/qmaster/sge_give_jobs.h +++ b/source/daemons/qmaster/sge_give_jobs.h @@ -52,8 +52,7 @@ typedef enum { COMMIT_ST_ARRIVED = 1, /* job was reported as running by execd */ COMMIT_ST_RESCHEDULED = 2, /* job gets rescheduled */ COMMIT_ST_FINISHED_FAILED = 3, /* GE job finished or failed (FINISH/ABORT) */ - COMMIT_ST_FINISHED_FAILED_EE = 4, /* GEEE job finished or failed (FINISH/ABORT) */ - COMMIT_ST_DEBITED_EE = 5, /* remove after GEEE scheduler debited usage */ + COMMIT_ST_FINISHED_FAILED_EE = 4, /* GEEE job finished or failed: books usage and buries the job */ COMMIT_ST_NO_RESOURCES = 6, /* remove interacive job (FINISH/ABORT) */ COMMIT_ST_DELIVERY_FAILED = 7, /* delivery failed and rescheduled */ COMMIT_ST_FAILED_AND_ERROR = 8, /* job failed and error state set */ diff --git a/source/daemons/qmaster/sge_job_qmaster.cc b/source/daemons/qmaster/sge_job_qmaster.cc index b8dd51fb4..9953a5786 100644 --- a/source/daemons/qmaster/sge_job_qmaster.cc +++ b/source/daemons/qmaster/sge_job_qmaster.cc @@ -4032,7 +4032,6 @@ static int sge_delete_all_tasks_of_job(const ocs::gdi::Packet *packet, lList **a for (task_number = enrolled_start; task_number <= enrolled_end; task_number += *step) { - int spool_job = 1; int is_defined = 
job_is_ja_task_defined(job, task_number); if (is_defined) { @@ -4130,9 +4129,8 @@ static int sge_delete_all_tasks_of_job(const ocs::gdi::Packet *packet, lList **a job_ja_task_send_abort_mail(job, tmp_task, packet->user, packet->host, nullptr); get_rid_of_job_due_to_qdel(job, tmp_task, alpp, packet->user, forced, monitor, packet->gdi_session); } else { - // @todo: spool_job = 1 = COMMIT_NO_SPOOLING = 0x0001 - sge_commit_job(job, tmp_task, nullptr, COMMIT_ST_FINISHED_FAILED_EE, spool_job | COMMIT_NEVER_RAN, - monitor, packet->gdi_session); + sge_commit_job(job, tmp_task, nullptr, COMMIT_ST_FINISHED_FAILED_EE, + COMMIT_DEFAULT | COMMIT_NEVER_RAN, monitor, packet->gdi_session); showmessage = 1; if (!*alltasks && showmessage) { range_list_insert_id(&range_list, nullptr, task_number); diff --git a/source/daemons/qmaster/sge_qmaster_timed_event.h b/source/daemons/qmaster/sge_qmaster_timed_event.h index c441789de..039612293 100644 --- a/source/daemons/qmaster/sge_qmaster_timed_event.h +++ b/source/daemons/qmaster/sge_qmaster_timed_event.h @@ -63,6 +63,8 @@ typedef enum { TYPE_AR_EVENT, /**/ TYPE_ENFORCE_LIMIT_EVENT, /**/ TYPE_SESSION_CLEANUP_EVENT, //< trigger that cleans up old sessions that have not been used for a longer time + TYPE_SHARETREE_SPOOL_EVENT, //< CS-1239: drain ocs::SharetreeUsage FIFOs to the spool backend + TYPE_SHARETREE_TICK_EVENT, //< CS-1239: periodic decay + batched USER_MOD/PROJECT_MOD/NEW_SHARETREE publish #if defined(OCS_WITH_OPENSSL) TYPE_SSL_CERT_RENEWAL_EVENT, #endif diff --git a/source/daemons/qmaster/sge_sched_process_events.cc b/source/daemons/qmaster/sge_sched_process_events.cc index 6ba7a9606..896e533e3 100644 --- a/source/daemons/qmaster/sge_sched_process_events.cc +++ b/source/daemons/qmaster/sge_sched_process_events.cc @@ -157,9 +157,6 @@ subscribe_scheduler(sge_evc_class_t *evc, sge_where_what_t *where_what) set_job_flushing(evc); - /* for some reason we flush sharetree changes */ - evc->ec_set_flush(evc, sgeE_NEW_SHARETREE, true, 0); 
- /* configuration changes and trigger should have immediate effevc->ect */ evc->ec_set_flush(evc, sgeE_SCHED_CONF, true, 0); evc->ec_set_flush(evc, sgeE_SCHEDDMONITOR, true, 0); diff --git a/source/daemons/qmaster/sge_thread_timer.cc b/source/daemons/qmaster/sge_thread_timer.cc index 7394c0ec5..b03260a4a 100644 --- a/source/daemons/qmaster/sge_thread_timer.cc +++ b/source/daemons/qmaster/sge_thread_timer.cc @@ -35,14 +35,28 @@ #include #include +#include "uti/sge_lock.h" #include "uti/sge_mtutil.h" #include "uti/sge_rmon_macros.h" #include "uti/sge_time.h" +#include "sgeobj/ocs_DataStore.h" +#include "sgeobj/ocs_ShareTree.h" +#include "sgeobj/ocs_UserProject.h" +#include "sgeobj/ocs_Usage.h" #include "sgeobj/sge_answer.h" #include "sgeobj/sge_conf.h" +#include "sgeobj/sge_ja_task.h" +#include "sgeobj/sge_job.h" +#include "sgeobj/sge_object.h" +#include "sgeobj/sge_schedd_conf.h" +#include "sgeobj/sge_userprj.h" #include "sgeobj/ocs_Session.h" +#include "evm/sge_event_master.h" + +#include "sched/sgeee.h" + #include "comm/cl_commlib.h" #include "uti/sge_profiling.h" @@ -52,6 +66,7 @@ #include "gdi/ocs_gdi_Packet.h" #include +#include "ocs_SharetreeUsage.h" #include "ocs_ReportingFileWriter.h" #include "ocs_security_qmaster.h" #include "setup_qmaster.h" @@ -80,6 +95,237 @@ sge_timer_cleanup_monitor(monitoring_t *monitor) { DRETURN_VOID; } +/** CS-1239: drain the per-finish user/project dirty FIFOs to the spool backend. + * + * Self-rescheduling one-shot event: pulls up to a 100ms work budget from the + * FIFOs under LOCK_GLOBAL (read), then re-arms itself at +1s while the queue + * is still non-empty (resume-drain cadence), or at +STREE_SPOOL_INTERVAL when + * caught up (idle cadence). The cadence knob lives in qmaster_params + * (mconf_get_spool_time, default 240s). 
+ */ +static void +sge_sharetree_spool_handler(te_event_t /* anEvent */, monitoring_t *monitor) { + DENTER(TOP_LAYER); + + MONITOR_WAIT_TIME(SGE_LOCK(LOCK_GLOBAL, LOCK_READ), monitor); + const int remaining = ocs::SharetreeUsage::spool_budget(100, ocs::SessionManager::GDI_SESSION_NONE); + SGE_UNLOCK(LOCK_GLOBAL, LOCK_READ); + + const uint32_t next_in_s = (remaining > 0) ? 1u : static_cast(mconf_get_spool_time()); + te_event_t ev = te_new_event(sge_get_gmt64() + sge_gmt32_to_gmt64(next_in_s), + TYPE_SHARETREE_SPOOL_EVENT, ONE_TIME_EVENT, + 0, 0, "sharetree-spool"); + te_add_event(ev); + te_free_event(&ev); + + DRETURN_VOID; +} + +/** CS-1239 step 5: periodic share-tree tick - decay UU_/PR_/UPP_, then (if + * anything is dirty) emit batched sgeE_USER_MOD / sgeE_PROJECT_MOD / + * sgeE_NEW_SHARETREE inside one event-master transaction. + * + * Replaces the previous split into TYPE_USAGE_DECAY_EVENT (decay + per-user + * MOD events) and TYPE_SHARETREE_PUBLISH_EVENT (republish). Pre-CS-1239 the + * scheduler emitted USER_MOD / PROJECT_MOD / NEW_SHARETREE as one event + * package on every share-tree-affecting order roundtrip; mirror clients + * (including the scheduler thread itself) relied on that atomicity to + * compute consistent tickets. The split path broke the invariant because + * MOD events shipped on the worker thread at job-finish time and + * NEW_SHARETREE shipped a tick later: between them the scheduler saw fresh + * usage against a stale tree and dispatched the wrong jobs. + * + * This handler restores the invariant: one tick, one transaction, one event + * package. + * + * Inside the tick (under LOCK_GLOBAL write): + * 1. Decay every master user / project via decay_userprj_usage and mark + * each event-dirty (UniqueFifo dedup absorbs the case where worker + * booking already marked the same name in this interval). 
Master is + * the sole owner of usage decay - scheduler PASS 0 and sge_share_mon + * both read UU_/PR_usage at face value with no local catch-up, so + * the mass MOD storm IS the mechanism that keeps every mirror's + * UU_/PR_usage current. + * 2. Refresh the default decay constant (formerly in sge_calc_tickets). + * 3. If anything is dirty (event-dirty queues OR the share-tree-dirty + * flag), open a commit-required transaction: + * a. Emit sgeE_USER_MOD for each event-dirty user. + * b. Emit sgeE_PROJECT_MOD for each event-dirty project. + * c. Recompute master share tree (init nodes, count active ja_tasks + * per leaf, propagate counts, m_share, calc_node_usage, targets, + * STN_version bump). + * d. Emit sgeE_NEW_SHARETREE. + * e. Commit - the events ship as one package. + * 4. Self-reschedule at +mconf_get_sharetree_tick_interval(). + * + * Note on the leaf counting at step 3c: we walk the master job list and + * count active ja_tasks per leaf via search_user_project_node, mirroring + * what the scheduler does on its mirror copy but on master state. We + * deliberately do NOT create temp "default" sibling nodes (a + * scheduler-mirror-only construct), so default-mapped users see their + * counts aggregated on the "default" leaf instead of split per-user. + * Acceptable trade-off flagged in CS-1239 step 5. + * + * Seqno: handler keeps its own monotonic counter so we never collide with + * the scheduler thread's sge_scheduling_run namespace and never get skipped + * by the "seqno already processed" idempotency guard in + * decay_userprj_usage. The calc_node_usage call inside the publish step + * passes (u_long)-1 to recompute without stamping UU_/PR_usage_seqno + * (that stamp is owned by the decay step a few lines above). + * + * Cadence: mconf_get_sharetree_tick_interval() returns the STREE_TICK_INTERVAL + * qmaster_param value (default 5 s, clamped to [1, 300]). 
Runtime changes + * take effect on the next tick; configuration_qmaster.cc also calls + * sge_reschedule_sharetree_tick() to apply tighter intervals faster. + */ +static void +sge_sharetree_tick_handler(te_event_t /* anEvent */, monitoring_t *monitor) { + DENTER(TOP_LAYER); + + static u_long tet_decay_run = 0; + ++tet_decay_run; + + const uint64_t curr_time = sge_get_gmt64(); + lList *decay_list = ocs::Usage::get_decay_list(); + + MONITOR_WAIT_TIME(SGE_LOCK(LOCK_GLOBAL, LOCK_WRITE), monitor); + + /* CS-1239: short-circuit when no master share tree is configured. The + * decay walk would mark every user / project event-dirty for nothing (no + * consumer), and the publish step has nothing to publish. Self-reschedule + * so a later qconf -Astree is picked up on the next tick. This restores + * the pre-CS-1239 implicit gate ("no UU/PR objects -> no work"; see + * calc_usage.md) by checking the direct indicator. Matching gate is in + * sge_book_finished_job_usage. */ + if (lFirst(*ocs::DataStore::get_master_list(SGE_TYPE_SHARETREE)) == nullptr) { + SGE_UNLOCK(LOCK_GLOBAL, LOCK_WRITE); + + const uint32_t next_in_s = static_cast(mconf_get_sharetree_tick_interval()); + te_event_t ev = te_new_event(sge_get_gmt64() + sge_gmt32_to_gmt64(next_in_s), + TYPE_SHARETREE_TICK_EVENT, ONE_TIME_EVENT, + 0, 0, "sharetree-tick"); + te_add_event(ev); + te_free_event(&ev); + + DRETURN_VOID; + } + + /* 1+2: decay all users / projects on the master GLOBAL store and mark each + * one event-dirty so the publish step below ships sgeE_USER_MOD / + * sgeE_PROJECT_MOD to every mirror. With CS-1239 the master is the + * sole owner of usage decay: scheduler PASS 0 and sge_share_mon both + * read UU_/PR_usage at face value (no local catch-up), so the mirror + * must carry the current decayed value or those consumers compute + * against a stale historic value. 
The mass mark is the price of "one + * authoritative decay site"; spool-dirty stays worker-only because + * the spool snapshot doesn't need to reflect every decay tick (the + * load-time stamp lets the recovery-side catch-up resolve it). + * Refresh the default decay constant in case halftime changed. */ + ocs::Usage::calculate_default_decay_constant(sconf_get_halftime()); + + bool any_object = false; + lList **user_list = ocs::DataStore::get_master_list_rw(SGE_TYPE_USER); + if (user_list != nullptr) { + for_each_rw_lv(user, *user_list) { + ocs::UserProject::decay_userprj_usage(user, true, decay_list, tet_decay_run, curr_time); + ocs::SharetreeUsage::mark_user_event_dirty(lGetString(user, UU_name)); + any_object = true; + } + } + lList **project_list = ocs::DataStore::get_master_list_rw(SGE_TYPE_PROJECT); + if (project_list != nullptr) { + for_each_rw_lv(project, *project_list) { + ocs::UserProject::decay_userprj_usage(project, false, decay_list, tet_decay_run, curr_time); + ocs::SharetreeUsage::mark_project_event_dirty(lGetString(project, PR_name)); + any_object = true; + } + } + if (any_object) { + ocs::SharetreeUsage::mark_share_tree_dirty(); + } + + /* 3: if anything is dirty, open a transaction and ship the batched + * USER_MOD / PROJECT_MOD / NEW_SHARETREE as one event package. */ + const bool tree_dirty = ocs::SharetreeUsage::consume_share_tree_dirty(); + const bool any_events = ocs::SharetreeUsage::has_event_dirty(); + + if (tree_dirty || any_events) { + lList **master_stree_list = ocs::DataStore::get_master_list_rw(SGE_TYPE_SHARETREE); + lListElem *root = (master_stree_list != nullptr) ? 
lFirstRW(*master_stree_list) : nullptr; + + sge_set_commit_required(); + + ocs::SharetreeUsage::emit_dirty_user_events(ocs::SessionManager::GDI_SESSION_NONE); + ocs::SharetreeUsage::emit_dirty_project_events(ocs::SessionManager::GDI_SESSION_NONE); + + if (root != nullptr) { + const lList *master_user_list = *ocs::DataStore::get_master_list(SGE_TYPE_USER); + const lList *master_project_list = *ocs::DataStore::get_master_list(SGE_TYPE_PROJECT); + const lList *master_job_list = *ocs::DataStore::get_master_list(SGE_TYPE_JOB); + + sge_init_share_tree_nodes(root); + ocs::ShareTree::set_node_project_flag(root, master_project_list); + + /* Active-ja_task predicate matches scheduler's job_is_active. */ + for_each_ep_lv(job, master_job_list) { + for_each_rw_lv(ja_task, lGetList(job, JB_ja_tasks)) { + const u_long status = lGetUlong(ja_task, JAT_status); + const u_long state = lGetUlong(ja_task, JAT_state); + const bool active = (status & (JRUNNING | JMIGRATING | JTRANSFERING)) + && !(state & (JSUSPENDED | JSUSPENDED_ON_THRESHOLD)); + if (!active) { + continue; + } + const char *owner = lGetString(job, JB_owner); + const char *project = lGetString(job, JB_project); + if (owner == nullptr && project == nullptr) { + continue; + } + lListElem *parent = nullptr; + lListElem *leaf = ocs::ShareTree::search_user_project_node(root, owner, project, + &parent, root); + if (leaf != nullptr) { + lSetUlong(leaf, STN_job_ref_count, + lGetUlong(leaf, STN_job_ref_count) + 1); + lSetUlong(leaf, STN_active_job_ref_count, + lGetUlong(leaf, STN_active_job_ref_count) + 1); + } + } + } + + update_job_ref_count(root); + update_active_job_ref_count(root); + lSetDouble(root, STN_m_share, 1.0); + calculate_m_shares(root); + + /* Seqno (u_long)-1 tells calc_node_usage to recurse + recompute + * without stamping UU_/PR_usage_seqno - the decay step above owns + * that stamp and we must not double-stamp inside the same tick. 
*/ + ocs::ShareTree::calc_node_usage(root, master_user_list, master_project_list, + decay_list, curr_time, nullptr, (u_long)-1); + sge_calc_node_targets(root, root); + + lAddUlong(root, STN_version, 1); + + sge_add_event(curr_time, sgeE_NEW_SHARETREE, 0, 0, nullptr, nullptr, + nullptr, root, ocs::SessionManager::GDI_SESSION_NONE); + } + + sge_commit(ocs::SessionManager::GDI_SESSION_NONE); + } + + SGE_UNLOCK(LOCK_GLOBAL, LOCK_WRITE); + + const uint32_t next_in_s = static_cast(mconf_get_sharetree_tick_interval()); + te_event_t ev = te_new_event(sge_get_gmt64() + sge_gmt32_to_gmt64(next_in_s), + TYPE_SHARETREE_TICK_EVENT, ONE_TIME_EVENT, + 0, 0, "sharetree-tick"); + te_add_event(ev); + te_free_event(&ev); + + DRETURN_VOID; +} + /****** qmaster/sge_thread_timer/sge_timer_register_event_handler() ************* * NAME * sge_timer_register_event_handler() -- register event handlers @@ -111,6 +357,10 @@ sge_timer_register_event_handler() { te_register_event_handler(ocs::SessionManager::session_cleanup_handler, TYPE_SESSION_CLEANUP_EVENT); + te_register_event_handler(sge_sharetree_spool_handler, TYPE_SHARETREE_SPOOL_EVENT); + + te_register_event_handler(sge_sharetree_tick_handler, TYPE_SHARETREE_TICK_EVENT); + /* * one time events */ @@ -175,6 +425,35 @@ void sge_timer_start_periodic_tasks() { te_add_event(ev); te_free_event(&ev); + /* CS-1239: bootstrap the share-tree spool event at +5 s so a freshly + * started qmaster drains the per-finish dirty FIFOs within seconds + * rather than waiting up to STREE_SPOOL_INTERVAL (240 s default). The + * handler self-reschedules (ONE_TIME_EVENT, not RECURRING_EVENT) so the + * next due time can depend on how much work is left after each tick. + * Same caveat as for the share-tree tick event: a runtime change to + * STREE_SPOOL_INTERVAL takes effect only on the *next* tick, so + * configuration_qmaster.cc::do_mod_config also calls + * sge_reschedule_sharetree_spool() to drop the pending event and + * re-queue at +5 s. 
*/ + ev = te_new_event(sge_get_gmt64() + sge_gmt32_to_gmt64(5), + TYPE_SHARETREE_SPOOL_EVENT, ONE_TIME_EVENT, 0, 0, "sharetree-spool"); + te_add_event(ev); + te_free_event(&ev); + + /* CS-1239: bootstrap the periodic share-tree tick (decay + batched + * USER_MOD/PROJECT_MOD/NEW_SHARETREE publish) at +5 s so a freshly + * started qmaster has STN_combined_usage / m_share populated for + * sge_share_mon within seconds. After the first tick the handler + * self-reschedules at +STREE_TICK_INTERVAL. ONE_TIME_EVENT + + * handler-side reschedule means a runtime change to STREE_TICK_INTERVAL + * takes effect on the next tick. Config changes that need to take effect + * *sooner* go through sge_reschedule_sharetree_tick() from + * configuration_qmaster.cc. */ + ev = te_new_event(sge_get_gmt64() + sge_gmt32_to_gmt64(5), + TYPE_SHARETREE_TICK_EVENT, ONE_TIME_EVENT, 0, 0, "sharetree-tick"); + te_add_event(ev); + te_free_event(&ev); + DRETURN_VOID; } diff --git a/source/daemons/qmaster/sge_thread_worker.cc b/source/daemons/qmaster/sge_thread_worker.cc index ceda75abf..c4e7a6a3b 100644 --- a/source/daemons/qmaster/sge_thread_worker.cc +++ b/source/daemons/qmaster/sge_thread_worker.cc @@ -61,6 +61,7 @@ #include "comm/cl_commlib.h" #include +#include "ocs_SharetreeUsage.h" #include "setup_qmaster.h" #include "sge_persistence_qmaster.h" #include "sge_reporting_qmaster.h" @@ -161,6 +162,12 @@ sge_worker_terminate() { sge_store_job_number(nullptr, nullptr); sge_store_ar_id(nullptr, nullptr); DPRINTF("job/ar counter were made persistent\n"); + /* CS-1239: drain anything the periodic TET flush did not get to yet. + * sge_userprj_spool below then re-spools the full master lists; the + * overlap is intentional and harmless - this hook is the canonical + * dirty-FIFO drain that survives if sge_userprj_spool is later + * dropped (CS-1239 step 8 / follow-ups). 
*/ + ocs::SharetreeUsage::spool_all(ocs::SessionManager::GDI_SESSION_NONE); sge_userprj_spool(ocs::SessionManager::GDI_SESSION_NONE); /* spool the latest usage */ DPRINTF("final job and user/project spooling has been triggered\n"); } diff --git a/source/libs/sched/sge_orders.cc b/source/libs/sched/sge_orders.cc index 1aaa435dc..7dffbe80a 100644 --- a/source/libs/sched/sge_orders.cc +++ b/source/libs/sched/sge_orders.cc @@ -367,26 +367,10 @@ sge_send_orders2master(sge_evc_class_t *evc, lList **orders) -/*-------------------------------------------------------------------- - * build a ORT_remove_job order for each finished job - *--------------------------------------------------------------------*/ -lList *create_delete_job_orders( -lList *finished_jobs, -lList *order_list -) { - DENTER(TOP_LAYER); - - for_each_ep_lv(job, finished_jobs) { - for_each_ep_lv(ja_task, lGetList(job, JB_ja_tasks)) { - DPRINTF("DELETE JOB " sge_u32 "." sge_u32 "\n", lGetUlong(job, JB_job_number), lGetUlong(ja_task, JAT_task_number)); - order_list = sge_create_orders(order_list, ORT_remove_job, job, ja_task, nullptr, true); - } - } - - DRETURN(order_list); -} - - +/* CS-1239: create_delete_job_orders removed - the worker thread now buries + * the finished job inline (sge_commit_job(COMMIT_ST_FINISHED_FAILED_EE) + * books usage and buries the job in one step), so the scheduler no longer + * needs to emit ORT_remove_job orders. 
*/ /****** sge_orders/sge_join_orders() ****************************************** * NAME diff --git a/source/libs/sched/sge_orders.h b/source/libs/sched/sge_orders.h index 055a8f8b1..ce0b5ef91 100644 --- a/source/libs/sched/sge_orders.h +++ b/source/libs/sched/sge_orders.h @@ -38,8 +38,7 @@ typedef struct { lList *configOrderList; /* Type: ORT_unsuspend_on_threshold, ORT_suspend_on_threshold */ - lList *pendingOrderList; /* Type: ORT_tickets, ORT_ptickets, ORT_remove_job, ORT_update_user_usage, - ORT_update_project_usage, ORT_share_tree, ORT_sched_conf */ + lList *pendingOrderList; /* Type: ORT_tickets, ORT_ptickets, ORT_sched_conf */ lList *jobStartOrderList; /* Type: ORT_remove_immediate_job, job start orders, job info orders */ lList *sentOrderList; /* already send job start orders, need to get a correct order amount for the profiling. It is also needed for a warring @@ -56,8 +55,6 @@ lList *sge_add_schedd_info(lList *or_list, int *global_mes_count, int *job_mes_c lList *sge_create_orders(lList *or_list, uint32_t type, const lListElem *job, const lListElem *ja_task, const lList *queue_list, bool update_execd); -lList *create_delete_job_orders(lList *finished_jobs, lList *order_list); - lList *sge_join_orders(order_t *orders); int sge_GetNumberOfOrders(order_t *orders); diff --git a/source/libs/sched/sgeee.cc b/source/libs/sched/sgeee.cc index 4b2add2ef..c46059865 100644 --- a/source/libs/sched/sgeee.cc +++ b/source/libs/sched/sgeee.cc @@ -155,15 +155,12 @@ static int sge_calc_tickets (scheduler_all_data_t *lists, int do_usage, double *max_tickets); -static lListElem *get_mod_share_tree(lListElem *node, lEnumeration *what, int seqno); static lList *sge_sort_pending_job_nodes(lListElem *root, lListElem *node, double total_share_tree_tickets); -static int sge_calc_node_targets(lListElem *root, lListElem *node, scheduler_all_data_t *lists); static int sge_calc_sharetree_targets(lListElem *root, scheduler_all_data_t *lists, lList *decay_list, uint64_t curr_time, 
u_long seqno); static int sge_init_share_tree_node_fields( lListElem *node, void *ptr ); -static int sge_init_share_tree_nodes( lListElem *root ); static double calc_pjob_override_tickets_shared( sge_ref_t *ref); @@ -197,13 +194,8 @@ static uint32_t task_ref_entries = 0; /* thread local */ static uint32_t task_ref_job_pos = 0; /* thread local */ static double Master_min_tix = 0.0; /* thread local */ static double Master_max_tix = 0.0; /* thread local */ -static uint32_t halflife = 0; /* stores the last used halflife time to detect changes thread_local*/ static int last_seqno = 0; /* stores the last used seqno for the orders thread_local*/ static uint64_t past = 0; /* stores the last re-order send time thread local */ -static lEnumeration *user_usage_what = nullptr; /* thread local */ -static lEnumeration *prj_usage_what = nullptr; /* thread local */ -static lEnumeration *share_tree_what = nullptr; /* thread local */ - /****** sgeee/tix_range_set() ************************************************** * NAME @@ -846,7 +838,7 @@ sge_unset_job_cnts(sge_ref_t *ref, int queued) { * every time a job becomes active or inactive (in adjust_m_shares). 
*--------------------------------------------------------------------*/ -static void +void calculate_m_shares( lListElem *parent_node ) { DENTER(TOP_LAYER); @@ -917,7 +909,7 @@ update_job_ref_count( lListElem *node ) * update_active_job_ref_count - update active_job_ref_count for node and descendants *--------------------------------------------------------------------*/ -static u_long +u_long update_active_job_ref_count( lListElem *node ) { int active_job_count=0; @@ -994,64 +986,12 @@ sge_init_share_tree_node_fields( lListElem *node, * that will be set and used during sge_calc_tickets *--------------------------------------------------------------------*/ -static int +int sge_init_share_tree_nodes( lListElem *root ) { return ocs::ShareTree::foreach_call_func(root, sge_init_share_tree_node_fields, nullptr); } - -/*-------------------------------------------------------------------- - * delete_debited_job_usage - deleted debitted job usage for job - *--------------------------------------------------------------------*/ - -static void -delete_debited_job_usage(lListElem *job, lListElem *user, lListElem *project, u_long seqno) -{ - const lList *upu_list; - lListElem *upu; - - DENTER(TOP_LAYER); - - DPRINTF("DDJU (1) " sge_u32 "\n", lGetUlong(job, JB_job_number)); - - if (user) { - upu_list = lGetList(user, UU_debited_job_usage); - DPRINTF("DDJU (2) " sge_u32 "\n", lGetUlong(job, JB_job_number)); - if (upu_list) { - - /* Note: In order to cause the qmaster to delete the - usage for this job, we zero out UPU_old_usage_list - for this job in the UU_debited_job_usage list */ - DPRINTF("DDJU (3) " sge_u32 "\n", lGetUlong(job, JB_job_number)); - - if ((upu = lGetElemUlongRW(upu_list, UPU_job_number, lGetUlong(job, JB_job_number)))) { - DPRINTF("DDJU (4) " sge_u32 "\n", lGetUlong(job, JB_job_number)); - lSetList(upu, UPU_old_usage_list, nullptr); - lSetUlong(user, UU_usage_seqno, seqno); - } - } - } - - if (project) { - upu_list = lGetList(project, PR_debited_job_usage); - 
if (upu_list) { - - /* Note: In order to cause the qmaster to delete the - usage for this job, we zero out UPU_old_usage_list - for this job in the PR_debited_job_usage list */ - - if ((upu = lGetElemUlongRW(upu_list, UPU_job_number, lGetUlong(job, JB_job_number)))) { - lSetList(upu, UPU_old_usage_list, nullptr); - lSetUlong(project, PR_usage_seqno, seqno); - } - } - } - - DRETURN_VOID; -} - - /*-------------------------------------------------------------------- * combine_usage - combines a node's associated user/project usage * into a single value and stores it in the node. @@ -2131,8 +2071,6 @@ sge_calc_tickets( scheduler_all_data_t *lists, all_lists = lists; - uint64_t curr_time = sge_get_gmt64(); - sge_scheduling_run++; lList *decay_list = ocs::Usage::get_decay_list(); @@ -2142,30 +2080,6 @@ sge_calc_tickets( scheduler_all_data_t *lists, * user/project usage lists (CS-1385) */ lList *usage_weight_list = sconf_get_usage_weight_list(); - // @todo CS-272: move this to sconf-MOD in worker -#if 1 - // Decay usage for all users and projects if halflife changes - if (do_usage && sconf_is() && halflife != sconf_get_halftime()) { - int oldhalflife = halflife; - halflife = sconf_get_halftime(); - /* decay up till now based on old half life (unless it's zero), - all future decay will be based on new halflife */ - if (oldhalflife == 0) { - ocs::Usage::calculate_default_decay_constant(halflife); - } else { - ocs::Usage::calculate_default_decay_constant(oldhalflife); - } - - for_each_rw_lv(user, lists->user_list) { - ocs::UserProject::decay_userprj_usage(user, true, decay_list, sge_scheduling_run, curr_time); - } - for_each_rw_lv(project, lists->project_list) { - ocs::UserProject::decay_userprj_usage(project, false, decay_list, sge_scheduling_run, curr_time); - } - } else { - ocs::Usage::calculate_default_decay_constant(sconf_get_halftime()); - } -#endif /*------------------------------------------------------------- * Init job_ref_count in each share tree node to zero @@ 
-2322,48 +2236,6 @@ sge_calc_tickets( scheduler_all_data_t *lists, calculate_m_shares(root); } - // @todo CS-272: move this to final usage reporting in report handling -#if 1 - /*----------------------------------------------------------------- - * Handle finished jobs. We add the finished job usage to the - * user and project objects and then we delete the debited job - * usage from the associated user/project object. The debited - * job usage is a usage list which is maintained in the user/project - * object to indicate how much of the job's usage has been added - * to the user/project usage so far. - * We also add a usage value called finished_jobs to the job usage - * so it will be summed and decayed in the user and project usage. - * If the finished_jobs usage value already exists, then we know - * we already handled this finished job, but the qmaster hasn't - * been updated yet. - *-----------------------------------------------------------------*/ - - if (do_usage && finished_jobs) { - for_each_rw(job, finished_jobs) { - sge_ref_t jref; - - for_each_rw_lv(ja_task, lGetList(job, JB_ja_tasks)) { - memset(&jref, 0, sizeof(jref)); - sge_set_job_refs(job, ja_task, &jref, nullptr, lists, 0); - if (lGetElemStr(lGetList(jref.ja_task, JAT_scaled_usage_list), UA_name, "finished_jobs") == nullptr) { - lListElem *u = lAddSubStr(jref.ja_task, UA_name, "finished_jobs", JAT_scaled_usage_list, UA_Type); - if (u != nullptr) { - lSetDouble(u, UA_value, 1.0); - } - ocs::Usage::decay_and_sum_usage(jref.job, jref.ja_task, jref.node, jref.user, jref.project, - decay_list, usage_weight_list, sge_scheduling_run, curr_time); - DPRINTF("DDJU (0) " sge_u32 "." 
sge_u32"\n", lGetUlong(job, JB_job_number), lGetUlong(ja_task, JAT_task_number)); - delete_debited_job_usage(jref.job, jref.user, jref.project, sge_scheduling_run); - } - } - } - } else { - DPRINTF("\n"); - DPRINTF("no DDJU: do_usage: %d finished_jobs %d\n", do_usage, (finished_jobs!=nullptr)); - DPRINTF("\n"); - } -#endif - PROF_STOP_MEASUREMENT(SGE_PROF_SCHEDLIB4); prof_init = prof_get_measurement_wallclock(SGE_PROF_SCHEDLIB4, false, nullptr); PROF_START_MEASUREMENT(SGE_PROF_SCHEDLIB4); @@ -2385,17 +2257,29 @@ sge_calc_tickets( scheduler_all_data_t *lists, // @todo CS-272: move this to online usage reporting in report handling #if 1 if (do_usage) { - ocs::Usage::decay_and_sum_usage(job_ref[job_ndx].job, job_ref[job_ndx].ja_task, job_ref[job_ndx].node, job_ref[job_ndx].user, - job_ref[job_ndx].project, decay_list, usage_weight_list, sge_scheduling_run, curr_time); + /* CS-1239: scheduler no longer decays locally - the TET share-tree + * tick is the sole decay site and ships the decayed UU_/PR_usage + * to every mirror via sgeE_USER_MOD / sgeE_PROJECT_MOD. PASS 0 + * only sums the in-flight running-job projection onto whatever + * mirror currently holds. sum_usage = book_user_project_usage + * with do_decay=false. */ + ocs::Usage::sum_usage(job_ref[job_ndx].job, job_ref[job_ndx].ja_task, + job_ref[job_ndx].user, job_ref[job_ndx].project, usage_weight_list); } #endif combine_usage(&job_ref[job_ndx]); } - if (root) - sge_calc_sharetree_targets(root, lists, decay_list, - curr_time, sge_scheduling_run); + if (root) { + /* CS-1239: pass curr_time=0 so the calc_node_usage call inside + * sge_calc_sharetree_targets does NOT decay UU_/PR_usage locally. + * Decay is owned by the TET share-tree tick handler which ships + * decayed values via sgeE_USER_MOD / sgeE_PROJECT_MOD; scheduler + * just reads what mirror has. seqno is irrelevant when now=0 + * because the per-userprj decay branch is gated on now != 0. 
*/ + sge_calc_sharetree_targets(root, lists, decay_list, 0, 0); + } PROF_STOP_MEASUREMENT(SGE_PROF_SCHEDLIB4); prof_pass0 = prof_get_measurement_wallclock(SGE_PROF_SCHEDLIB4,false, nullptr); @@ -3029,7 +2913,7 @@ sge_calc_sharetree_targets( lListElem *root, /* Calculate targeted proportions for each node */ - sge_calc_node_targets(root, root, lists); + sge_calc_node_targets(root, root); DRETURN(0); } @@ -3040,10 +2924,9 @@ sge_calc_sharetree_targets( lListElem *root, * for the sub-tree rooted at this node *--------------------------------------------------------------------*/ -static int +int sge_calc_node_targets( lListElem *root, - lListElem *node, - scheduler_all_data_t *lists ) + lListElem *node ) { const lList *children; double sum_shares, sum, compensation_factor; @@ -3227,58 +3110,13 @@ sge_calc_node_targets( lListElem *root, for_each_rw_lv(child, children) { if (lGetUlong(child, STN_job_ref_count)>0) { - sge_calc_node_targets(root, child, lists); + sge_calc_node_targets(root, child); } } DRETURN(0); } -/*-------------------------------------------------------------------- - * get_mod_share_tree - return reduced modified share tree - *--------------------------------------------------------------------*/ - -static lListElem * -get_mod_share_tree( lListElem *node, - lEnumeration *what, - int seqno ) -{ - lListElem *new_node=nullptr; - const lList *children; - lListElem *child; - - if (((children = lGetList(node, STN_children))) && - lGetNumberOfElem(children) > 0) { - - lList *child_list=nullptr; - - for_each_rw(child, children) { - lListElem *child_node; - child_node = get_mod_share_tree(child, what, seqno); - if (child_node) { - if (!child_list) - child_list = lCreateList("", STN_Type); - lAppendElem(child_list, child_node); - } - } - - if (child_list) { - new_node = lCopyElem(node); - lSetList(new_node, STN_children, child_list); - } - - } else { - - if (lGetUlong(node, STN_pass2_seqno) > (uint32_t)seqno && - lGetUlong(node, STN_temp) == 0) { - new_node = 
lCopyElem(node); - } - - } - - return new_node; -} - /****** sgeee/sge_build_sgeee_orders() ******************************************* * NAME * sge_build_sgeee_orders() -- build orders for updating qmaster @@ -3292,15 +3130,9 @@ get_mod_share_tree( lListElem *node, * FUNCTION * Builds generates the orderlist for sending the scheduling decisions * to the qmaster. The following orders are generated: -* - running job tickets -* - pending job tickets -* - delete order for finished jobs -* - update user usage order -* - update project usage order -* - update share tree order -* - update scheduler configuration order -* - orders updating user/project resource usage (ORT_update_project_usage) -* - orders updating running tickets needed for dynamic repriorization (ORT_ticket) +* - running job tickets (ORT_tickets) +* - pending job tickets (ORT_ptickets / ORT_clear_pri_info) +* - update scheduler configuration order (ORT_sched_conf) * Most orders are generated by using the sge_create_orders function. 
* * INPUTS @@ -3325,12 +3157,9 @@ sge_build_sgeee_orders(scheduler_all_data_t *lists, lList *running_jobs, lList * lList *finished_jobs, order_t *orders, bool update_usage_and_configuration, int seqno, bool update_execd) { - lCondition *user_where=nullptr; - lCondition *prj_where=nullptr; lList *up_list = nullptr; lList *order_list = nullptr; lListElem *order = nullptr; - lListElem *root = nullptr; int norders = 0; uint32_t max_pending_tasks_per_job = sconf_get_max_pending_tasks_per_job(); @@ -3338,26 +3167,6 @@ sge_build_sgeee_orders(scheduler_all_data_t *lists, lList *running_jobs, lList * DENTER(TOP_LAYER); - if (share_tree_what == nullptr) { - share_tree_what = lWhat("%T(%I %I %I %I %I %I)", STN_Type, - STN_version, STN_name, STN_job_ref_count, STN_m_share, - STN_last_actual_proportion, - STN_adjusted_current_proportion); - } - - if (user_usage_what == nullptr) { - user_usage_what = lWhat("%T(%I %I %I %I %I %I %I)", UU_Type, - UU_name, UU_usage, UU_usage_time_stamp, - UU_long_term_usage, UU_project, - UU_debited_job_usage, UU_version); - } - if (prj_usage_what == nullptr) { - prj_usage_what = lWhat("%T(%I %I %I %I %I %I %I)", PR_Type, - PR_name, PR_usage, PR_usage_time_stamp, - PR_long_term_usage, PR_project, - PR_debited_job_usage, PR_version); - } - if (orders->pendingOrderList == nullptr) { orders->pendingOrderList = lCreateList("orderlist", OR_Type); } @@ -3440,91 +3249,8 @@ sge_build_sgeee_orders(scheduler_all_data_t *lists, lList *running_jobs, lList * } - /*----------------------------------------------------------------- - * build delete job orders for finished jobs - *-----------------------------------------------------------------*/ - if (finished_jobs) { - order_list = create_delete_job_orders(finished_jobs, order_list); - } - if (update_usage_and_configuration) { - /*----------------------------------------------------------------- - * build update user usage order - *-----------------------------------------------------------------*/ - - /* NOTE: 
make sure we get all usage entries which have been decayed - or have accumulated additional usage */ - - user_where = lWhere("%T(%I > %u)", UU_Type, UU_usage_seqno, last_seqno); - - if (lists->user_list) { - norders = lGetNumberOfElem(order_list); - if ((up_list = lSelect("", lists->user_list, user_where, user_usage_what))) { - if (lGetNumberOfElem(up_list)>0) { - order = lCreateElem(OR_Type); - lSetUlong(order, OR_type, ORT_update_user_usage); - lSetList(order, OR_joker, up_list); - lAppendElem(order_list, order); - } else { - lFreeList(&up_list); - } - } - DPRINTF(" added %d orders for updating usage of user\n", - lGetNumberOfElem(order_list) - norders); - } - - /*----------------------------------------------------------------- - * build update project usage order - *-----------------------------------------------------------------*/ - if (lists->project_list) { - /* Only select projects whose usage actually changed since the last - * order generation - symmetric to user_where above. Without this - * filter every project is selected on every scheduler run, producing - * a redundant ORT_update_project_usage order and a PROJECT_MOD event - * per project per cycle even on a completely idle cluster (CS-2266). - * PR_usage_seqno is stamped (like UU_usage_seqno) only when usage is - * booked, so the empty-list guard below then skips quiet cycles. 
*/ - prj_where = lWhere("%T(%I > %u)", PR_Type, PR_usage_seqno, last_seqno); - norders = lGetNumberOfElem(order_list); - if ((up_list = lSelect("", lists->project_list, prj_where, prj_usage_what))) { - if (lGetNumberOfElem(up_list)>0) { - order = lCreateElem(OR_Type); - lSetUlong(order, OR_type, ORT_update_project_usage); - lSetList(order, OR_joker, up_list); - lAppendElem(order_list, order); - } else { - lFreeList(&up_list); - } - } - DPRINTF(" added %d orders for updating usage of project\n", - lGetNumberOfElem(order_list) - norders); - } - - lFreeWhere(&user_where); - lFreeWhere(&prj_where); - - /*----------------------------------------------------------------- - * build update share tree order - *-----------------------------------------------------------------*/ - - if (lists->share_tree && ((root = lFirstRW(lists->share_tree)))) { - lListElem *node = nullptr; - norders = lGetNumberOfElem(order_list); - - if ((node = get_mod_share_tree(root, share_tree_what, last_seqno))) { - up_list = lCreateList("", STN_Type); - lAppendElem(up_list, node); - - order = lCreateElem(OR_Type); - lSetUlong(order, OR_type, ORT_share_tree); - lSetList(order, OR_joker, up_list); - lAppendElem(order_list, order); - } - DPRINTF(" added %d orders for updating share tree\n", - lGetNumberOfElem(order_list) - norders); - } - /*----------------------------------------------------------------- * build update scheduler configuration order *-----------------------------------------------------------------*/ @@ -3643,8 +3369,10 @@ int sgeee_scheduler(scheduler_all_data_t *lists, min_tix = 0; max_tix = -1; - /* calculate tickets for pending jobs and compute their shares */ - sge_calc_tickets(lists, running_jobs, finished_jobs, pending_jobs, 1, &max_tix); + /* calculate tickets for pending jobs and compute their shares. + * CS-1239: finished-job booking moved to worker thread - pass nullptr so + * the (now inert) finished-jobs branch in sge_calc_tickets is skipped. 
*/ + sge_calc_tickets(lists, running_jobs, nullptr, pending_jobs, 1, &max_tix); /* correct the shares for the running jobs */ seqno = sge_calc_tickets(lists, running_jobs, nullptr, nullptr, 0, &max_tix); @@ -3694,8 +3422,10 @@ int sgeee_scheduler(scheduler_all_data_t *lists, } /* we are not calculation pending tickets here, only the running, finished jobs, and the sharetree changes are processed. The pending tickets are calculated at the - end of the scheduler cycle */ - sge_build_sgeee_orders(lists, running_jobs, nullptr, finished_jobs, orders, true, seqno, + end of the scheduler cycle. + CS-1239: finished-job booking + delete order moved to worker thread - pass + nullptr; the delete-order branch was removed from sge_build_sgeee_orders. */ + sge_build_sgeee_orders(lists, running_jobs, nullptr, nullptr, orders, true, seqno, update_execd); } diff --git a/source/libs/sched/sgeee.h b/source/libs/sched/sgeee.h index 83542c42f..8301be483 100644 --- a/source/libs/sched/sgeee.h +++ b/source/libs/sched/sgeee.h @@ -28,7 +28,7 @@ * * All Rights Reserved. * - * Portions of this software are Copyright (c) 2023-2025 HPC-Gridware GmbH + * Portions of this software are Copyright (c) 2023-2026 HPC-Gridware GmbH * ************************************************************************/ /*___INFO__MARK_END__*/ @@ -53,8 +53,18 @@ int sort_host_list_by_share_load ( lList *host_list, /* EH_Type */ void sge_clear_job( lListElem *job, bool is_clear_all); -void +void sge_build_sgeee_orders(scheduler_all_data_t *lists, lList *running_jobs, lList *queued_jobs, lList *finished_jobs, order_t *orders, bool update_usage_and_configuration, int seqno, bool update_execd); +/* CS-1239 step 5: share-tree compute primitives exposed so the qmaster + * TET share-tree tick handler can recompute master-sharetree node + * fields without depending on a scheduler_all_data_t. The scheduler + * itself still uses these on its mirror data store. 
*/ +int sge_init_share_tree_nodes(lListElem *root); +void calculate_m_shares(lListElem *parent_node); +u_long update_job_ref_count(lListElem *node); +u_long update_active_job_ref_count(lListElem *node); +int sge_calc_node_targets(lListElem *root, lListElem *node); + diff --git a/source/libs/sgeobj/cull/sge_order_OR_L.h b/source/libs/sgeobj/cull/sge_order_OR_L.h index a45fd2150..bf77fbd15 100644 --- a/source/libs/sgeobj/cull/sge_order_OR_L.h +++ b/source/libs/sgeobj/cull/sge_order_OR_L.h @@ -37,7 +37,6 @@ * - ORT_start_job * - ORT_tickets * - ORT_ptickets -* - ORT_remove_job * - ORT_... * * SGE_ULONG(OR_job_number) - Job Number @@ -63,10 +62,7 @@ * SGE_LIST(OR_joker) - Order Specific Data * Sublist with order specific data, depending on the order type: * - ORT_start_job: empty -* - ORT_remove_job: empty * - ORT_tickets: reduced job element -* - ORT_update_*_usage: reduced user or project object -* - ORT_share_tree: reduced share tree root node * - ORT_remove_immediate_job: empty * - ORT_job_schedd_info: scheduler messages (SME_Type) * - ORT_ptickets: reduced job element diff --git a/source/libs/sgeobj/json/OR.json b/source/libs/sgeobj/json/OR.json index f3d5f8086..bec4bf275 100644 --- a/source/libs/sgeobj/json/OR.json +++ b/source/libs/sgeobj/json/OR.json @@ -24,9 +24,6 @@ { "line": " - ORT_ptickets" }, - { - "line": " - ORT_remove_job" - }, { "line": " - ORT_..." 
} @@ -120,18 +117,9 @@ { "line": " - ORT_start_job: empty" }, - { - "line": " - ORT_remove_job: empty" - }, { "line": " - ORT_tickets: reduced job element" }, - { - "line": " - ORT_update_*_usage: reduced user or project object" - }, - { - "line": " - ORT_share_tree: reduced share tree root node" - }, { "line": " - ORT_remove_immediate_job: empty" }, diff --git a/source/libs/sgeobj/ocs_Usage.cc b/source/libs/sgeobj/ocs_Usage.cc index ea4c0c44e..6bf3382d4 100644 --- a/source/libs/sgeobj/ocs_Usage.cc +++ b/source/libs/sgeobj/ocs_Usage.cc @@ -67,8 +67,17 @@ void ocs::Usage::calculate_decay_constant(const double halftime, double *decay_rate, double *decay_constant) { if (halftime < 0) { - *decay_rate = 1.0; - *decay_constant = 0; + /* Sentinel "no decay" (used for the "finished_jobs" counter by + * get_decay_list when no halflife_decay_list is configured). + * Previously this branch set decay_constant=0 which meant + * pow(0, interval)=0 in decay_usage and zeroed the counter every + * tick - the "finished_jobs" PR_usage entry could never accumulate + * and sge_sort_pending_job_nodes' job_count denominator stayed at + * 0 + pending_position, starving leaves with many pending jobs. + * decay_constant=1.0 means pow(1, x)=1, leaving the value + * untouched - the intended "no decay" semantics for the sentinel. */ + *decay_rate = 0; + *decay_constant = 1.0; } else if (halftime == 0) { *decay_rate = 0; *decay_constant = 1.0; @@ -120,7 +129,17 @@ ocs::Usage::get_decay_list() { add_decay_element(&decay_list, lGetDouble(ep, UA_value), lGetString(ep, UA_name)); } } else { - // @todo: what is the purpose of this? + /* The "finished_jobs" counter is summed into UU_/PR_/UPP_ from each + * finished ja_task (see sge_book_finished_job_usage). It is read by + * the pending-priority formula in sge_sort_pending_job_nodes + * (sgeee.cc) as the historical-throughput denominator that keeps + * leaves with many finished jobs from being out-prioritized by + * leaves with fewer pending. 
We seed it with the "no decay" + * sentinel (halftime < 0 in calculate_decay_constant -> decay + * constant 1.0) so the counter is a stable count, not a decayed + * weight. If the admin configures halflife_decay_list explicitly, + * they are expected to provide their own "finished_jobs" entry + * with the decay they want. */ add_decay_element(&decay_list, -1, "finished_jobs"); } lFreeList(&halflife_decay_list); @@ -178,13 +197,15 @@ usage_relevant_for_sharetree(const char *name, const lList *usage_weight_list) { } /*-------------------------------------------------------------------- - * decay_and_sum_usage - accumulates and decays usage in the correct - * user and project objects for the specified job + * book_user_project_usage - shared body of decay_and_sum_usage and + * sum_usage. The decay step is gated by do_decay so the worker-thread + * booking path (CS-1239) can sum the finished job's usage without + * applying decay; decay is then done by a periodic TET task. *--------------------------------------------------------------------*/ - -void -ocs::Usage::decay_and_sum_usage(lListElem *job, lListElem *ja_task, lListElem *node, lListElem *user, lListElem *project, - lList *decay_list, const lList *usage_weight_list, u_long seqno, uint64_t curr_time) { +static void +book_user_project_usage(lListElem *job, lListElem *ja_task, lListElem *user, lListElem *project, + lList *decay_list, const lList *usage_weight_list, u_long seqno, uint64_t curr_time, + bool do_decay) { lList *job_usage_list=nullptr, *old_usage_list=nullptr, *user_usage_list=nullptr, @@ -194,7 +215,7 @@ ocs::Usage::decay_and_sum_usage(lListElem *job, lListElem *ja_task, lListElem *n lListElem *userprj = nullptr; int obj_debited_job_usage = PR_debited_job_usage; - if (!node && !user && !project) { + if (!user && !project) { return; } @@ -210,12 +231,14 @@ ocs::Usage::decay_and_sum_usage(lListElem *job, lListElem *ja_task, lListElem *n * Decay the usage for the associated user and project 
*-------------------------------------------------------------*/ - if (user) { - ocs::UserProject::decay_userprj_usage(user, true, decay_list, seqno, curr_time); - } + if (do_decay) { + if (user) { + ocs::UserProject::decay_userprj_usage(user, true, decay_list, seqno, curr_time); + } - if (project) { - ocs::UserProject::decay_userprj_usage(project, false, decay_list, seqno, curr_time); + if (project) { + ocs::UserProject::decay_userprj_usage(project, false, decay_list, seqno, curr_time); + } } /*------------------------------------------------------------- @@ -258,7 +281,7 @@ ocs::Usage::decay_and_sum_usage(lListElem *job, lListElem *ja_task, lListElem *n } if (!old_usage_list) { - old_usage_list = build_usage_list("old_usage_list", nullptr); + old_usage_list = ocs::Usage::build_usage_list("old_usage_list", nullptr); } if (user) { @@ -278,24 +301,24 @@ ocs::Usage::decay_and_sum_usage(lListElem *job, lListElem *ja_task, lListElem *n upp = lAddElemStr(&upp_list, UPP_name, project_name, UPP_Type); user_long_term_usage_list = lGetListRW(upp, UPP_long_term_usage); if (!user_long_term_usage_list) { - user_long_term_usage_list = build_usage_list("upp_long_term_usage_list", nullptr); + user_long_term_usage_list = ocs::Usage::build_usage_list("upp_long_term_usage_list", nullptr); lSetList(upp, UPP_long_term_usage, user_long_term_usage_list); } user_usage_list = lGetListRW(upp, UPP_usage); if (!user_usage_list) { - user_usage_list = build_usage_list("upp_usage_list", nullptr); + user_usage_list = ocs::Usage::build_usage_list("upp_usage_list", nullptr); lSetList(upp, UPP_usage, user_usage_list); } } else { user_long_term_usage_list = lGetListRW(user, UU_long_term_usage); if (!user_long_term_usage_list) { - user_long_term_usage_list = build_usage_list("user_long_term_usage_list", nullptr); + user_long_term_usage_list = ocs::Usage::build_usage_list("user_long_term_usage_list", nullptr); lSetList(user, UU_long_term_usage, user_long_term_usage_list); } user_usage_list = 
lGetListRW(user, UU_usage); if (!user_usage_list) { - user_usage_list = build_usage_list("user_usage_list", nullptr); + user_usage_list = ocs::Usage::build_usage_list("user_usage_list", nullptr); lSetList(user, UU_usage, user_usage_list); } } @@ -304,12 +327,12 @@ ocs::Usage::decay_and_sum_usage(lListElem *job, lListElem *ja_task, lListElem *n if (project) { project_long_term_usage_list = lGetListRW(project, PR_long_term_usage); if (!project_long_term_usage_list) { - project_long_term_usage_list = build_usage_list("project_long_term_usage_list", nullptr); + project_long_term_usage_list = ocs::Usage::build_usage_list("project_long_term_usage_list", nullptr); lSetList(project, PR_long_term_usage, project_long_term_usage_list); } project_usage_list = lGetListRW(project, PR_usage); if (!project_usage_list) { - project_usage_list = build_usage_list("project_usage_list", nullptr); + project_usage_list = ocs::Usage::build_usage_list("project_usage_list", nullptr); lSetList(project, PR_usage, project_usage_list); } } @@ -344,43 +367,43 @@ ocs::Usage::decay_and_sum_usage(lListElem *job, lListElem *ja_task, lListElem *n *---------------------------------------------------------*/ if (old_usage_list) { - old_usage = get_usage(old_usage_list, usage_name); + old_usage = ocs::Usage::get_usage(old_usage_list, usage_name); if (!old_usage) { - old_usage = create_usage_elem(usage_name); + old_usage = ocs::Usage::create_usage_elem(usage_name); lAppendElem(old_usage_list, old_usage); } } if (user_usage_list) { - user_usage = get_usage(user_usage_list, usage_name); + user_usage = ocs::Usage::get_usage(user_usage_list, usage_name); if (!user_usage) { - user_usage = create_usage_elem(usage_name); + user_usage = ocs::Usage::create_usage_elem(usage_name); lAppendElem(user_usage_list, user_usage); } } if (user_long_term_usage_list) { - user_long_term_usage = get_usage(user_long_term_usage_list, + user_long_term_usage = ocs::Usage::get_usage(user_long_term_usage_list, usage_name); if 
(!user_long_term_usage) { - user_long_term_usage = create_usage_elem(usage_name); + user_long_term_usage = ocs::Usage::create_usage_elem(usage_name); lAppendElem(user_long_term_usage_list, user_long_term_usage); } } if (project_usage_list) { - project_usage = get_usage(project_usage_list, usage_name); + project_usage = ocs::Usage::get_usage(project_usage_list, usage_name); if (!project_usage) { - project_usage = create_usage_elem(usage_name); + project_usage = ocs::Usage::create_usage_elem(usage_name); lAppendElem(project_usage_list, project_usage); } } if (project_long_term_usage_list) { project_long_term_usage = - get_usage(project_long_term_usage_list, usage_name); + ocs::Usage::get_usage(project_long_term_usage_list, usage_name); if (!project_long_term_usage) { - project_long_term_usage = create_usage_elem(usage_name); + project_long_term_usage = ocs::Usage::create_usage_elem(usage_name); lAppendElem(project_long_term_usage_list, project_long_term_usage); } @@ -464,6 +487,24 @@ ocs::Usage::decay_and_sum_usage(lListElem *job, lListElem *ja_task, lListElem *n } +/*-------------------------------------------------------------------- + * decay_and_sum_usage - accumulates and decays usage in the correct + * user and project objects for the specified job + *--------------------------------------------------------------------*/ + +void +ocs::Usage::decay_and_sum_usage(lListElem *job, lListElem *ja_task, lListElem * /* node */, lListElem *user, + lListElem *project, lList *decay_list, const lList *usage_weight_list, u_long seqno, + uint64_t curr_time) { + book_user_project_usage(job, ja_task, user, project, decay_list, usage_weight_list, seqno, curr_time, true); +} + +void +ocs::Usage::sum_usage(lListElem *job, lListElem *ja_task, lListElem *user, lListElem *project, + const lList *usage_weight_list) { + book_user_project_usage(job, ja_task, user, project, nullptr, usage_weight_list, 0, 0, false); +} + /** @brief Remove usage attributes that are irrelevant for share 
tree usage. * diff --git a/source/libs/sgeobj/ocs_Usage.h b/source/libs/sgeobj/ocs_Usage.h index d3ed04f10..cb51b7d19 100644 --- a/source/libs/sgeobj/ocs_Usage.h +++ b/source/libs/sgeobj/ocs_Usage.h @@ -54,6 +54,15 @@ namespace ocs { static void decay_and_sum_usage(lListElem *job, lListElem *ja_task, lListElem *node, lListElem *user, lListElem *project, lList *decay_list, const lList *usage_weight_list, u_long seqno, uint64_t curr_time); + /** Sum a finished job's scaled usage into UU_usage / PR_usage / UPP_usage + * without applying decay. Used by the worker-thread booking path + * introduced in CS-1239: decay is moved to a periodic Timed Event Thread + * task, and only the additive part of decay_and_sum_usage runs at finish + * time. usage_time_stamp is left untouched; the TET decay task owns it. + */ + static void sum_usage(lListElem *job, lListElem *ja_task, lListElem *user, lListElem *project, + const lList *usage_weight_list); + static bool strip_irrelevant_usage(lList *usage_list, const lList *usage_weight_list); static lList *build_usage_list(const char *name, lList *old_usage_list); diff --git a/source/libs/sgeobj/sge_conf.cc b/source/libs/sgeobj/sge_conf.cc index 9a93e50d7..54409a700 100644 --- a/source/libs/sgeobj/sge_conf.cc +++ b/source/libs/sgeobj/sge_conf.cc @@ -60,8 +60,10 @@ #include "sgeobj/ocs_TopologyString.h" #include "sgeobj/msg_sgeobjlib.h" +#include "sgeobj/cull/sge_usage_UA_L.h" #include "sgeobj/sge_conf.h" #include "sgeobj/sge_answer.h" +#include "sgeobj/sge_schedd_conf.h" #include "sgeobj/sge_userprj.h" #include "sgeobj/sge_userset.h" #include "sgeobj/sge_utility.h" @@ -71,6 +73,13 @@ #define SGE_BIN "bin" #define STREESPOOLTIMEDEF 240 +/* CS-1239: STREE_TICK_INTERVAL bounds (seconds) for the periodic share-tree + * decay + republish tick driven from the Timed Event Thread. Values outside + * [MIN, MAX] are clamped at read time. 
*/ +#define STREE_TICK_INTERVAL_DEF 5 +#define STREE_TICK_INTERVAL_MIN 1 +#define STREE_TICK_INTERVAL_MAX 300 + /* This list is *ONLY* used by the execd and should be moved eventually */ lList *Execd_Config_List = nullptr; @@ -255,6 +264,11 @@ static int scheduler_timeout = 0; */ static int spool_time = STREESPOOLTIMEDEF; +/* CS-1239: STREE_TICK_INTERVAL qmaster_param - seconds between periodic + * decay ticks driven from the Timed Event Thread. See + * mconf_get_sharetree_tick_interval(). */ +static int sharetree_tick_interval = STREE_TICK_INTERVAL_DEF; + // Maximum time in milliseconds to wait before update of secondary DS is enforced #define DEFAULT_DS_DEVIATION (1000) static int max_ds_deviation = DEFAULT_DS_DEVIATION; @@ -785,6 +799,7 @@ int merge_configuration(lList **answer_list, uint32_t progid, const char *cell_r do_authentication = true; is_monitor_message = true; spool_time = STREESPOOLTIMEDEF; + sharetree_tick_interval = STREE_TICK_INTERVAL_DEF; max_ds_deviation = DEFAULT_DS_DEVIATION; use_qidle = false; disable_reschedule = false; @@ -852,6 +867,19 @@ int merge_configuration(lList **answer_list, uint32_t progid, const char *cell_r } continue; } + /* CS-1239: STREE_TICK_INTERVAL = seconds between share-tree decay + * ticks. Out-of-range values (<= 0) reject + warn + fall back to + * the default. Values above the upper bound are clamped silently + * at read time in mconf_get_sharetree_tick_interval(). 
*/ + if (parse_int_param(s, "STREE_TICK_INTERVAL", &sharetree_tick_interval, ocs::CEntry::Type::TIME)) { + if (sharetree_tick_interval <= 0) { + answer_list_add_sprintf(answer_list, STATUS_ESYNTAX, ANSWER_QUALITY_WARNING, + MSG_CONF_INVALIDPARAM_SSI, "qmaster_params", "STREE_TICK_INTERVAL", + STREE_TICK_INTERVAL_DEF); + sharetree_tick_interval = STREE_TICK_INTERVAL_DEF; + } + continue; + } if (parse_bool_param(s, "FORBID_APPERROR", &forbid_apperror)) { continue; } @@ -2770,6 +2798,27 @@ int mconf_get_spool_time() { DRETURN(ret); } +/* CS-1239: share-tree decay tick interval in seconds, as used by the + * Timed Event Thread decay task. Returns the configured qmaster_params + * STREE_TICK_INTERVAL value, clamped into + * [STREE_TICK_INTERVAL_MIN, STREE_TICK_INTERVAL_MAX]. The configured + * value defaults to STREE_TICK_INTERVAL_DEF if not set. */ +int mconf_get_sharetree_tick_interval() { + DENTER(BASIS_LAYER); + + SGE_LOCK(LOCK_MASTER_CONF, LOCK_READ); + int value = sharetree_tick_interval; + SGE_UNLOCK(LOCK_MASTER_CONF, LOCK_READ); + + if (value < STREE_TICK_INTERVAL_MIN) { + value = STREE_TICK_INTERVAL_MIN; + } else if (value > STREE_TICK_INTERVAL_MAX) { + value = STREE_TICK_INTERVAL_MAX; + } + + DRETURN(value); +} + int mconf_get_max_ds_deviation() { int ret; diff --git a/source/libs/sgeobj/sge_conf.h b/source/libs/sgeobj/sge_conf.h index b0f01a231..40b5d0c29 100644 --- a/source/libs/sgeobj/sge_conf.h +++ b/source/libs/sgeobj/sge_conf.h @@ -180,6 +180,7 @@ bool mconf_get_set_lib_path(); bool mconf_get_inherit_env(); bool mconf_get_enable_hwloc(); int mconf_get_spool_time(); +int mconf_get_sharetree_tick_interval(); int mconf_get_max_ds_deviation(); uint32_t mconf_get_monitor_time(); bool mconf_get_do_accounting(); diff --git a/source/libs/sgeobj/sge_order.h b/source/libs/sgeobj/sge_order.h index 83eaf5aa3..d30ec57d8 100644 --- a/source/libs/sgeobj/sge_order.h +++ b/source/libs/sgeobj/sge_order.h @@ -28,7 +28,7 @@ * * All Rights Reserved. 
* - * Portions of this software are Copyright (c) 2023-2025 HPC-Gridware GmbH + * Portions of this software are Copyright (c) 2023-2026 HPC-Gridware GmbH * ************************************************************************/ /*___INFO__MARK_END__*/ @@ -41,20 +41,21 @@ * valid values for OR_type */ enum { - ORT_start_job = 1, /* 1*/ - ORT_tickets, /* 2*/ - ORT_ptickets, /* 3*/ - ORT_remove_job, /* 4*/ - ORT_update_project_usage, /* 5*/ - ORT_update_user_usage, /* 6*/ - ORT_share_tree, /* 7*/ - ORT_remove_immediate_job, /* 8*/ - ORT_sched_conf, /* 9*/ - ORT_suspend_on_threshold, /*10*/ - ORT_unsuspend_on_threshold, /*11*/ - ORT_job_schedd_info, /*12*/ - ORT_clear_pri_info /*13*/ /*the ja_task_number field has a special meaning with the order -: */ + /* CS-1239: ORT_remove_job, ORT_update_project_usage, ORT_update_user_usage + * and ORT_share_tree are gone - finished-job booking + bury moved to the + * worker thread (sge_book_finished_job_usage), and master share-tree + * recomputation + sgeE_NEW_SHARETREE emission moved to the qmaster TET + * sharetree publish handler. The scheduler no longer emits any of these + * and no handlers exist in sge_follow.cc. 
*/ + ORT_start_job = 1, + ORT_tickets, + ORT_ptickets, + ORT_remove_immediate_job, + ORT_sched_conf, + ORT_suspend_on_threshold, + ORT_unsuspend_on_threshold, + ORT_job_schedd_info, + ORT_clear_pri_info /*the ja_task_number field has a special meaning with the order: */ /* == 0 : only pending jobs are set to 0*/ /* != 0 : pending and running jobs are set to 0 */ }; diff --git a/source/libs/uti/ocs_UniqueFifo.h b/source/libs/uti/ocs_UniqueFifo.h new file mode 100644 index 000000000..312398c42 --- /dev/null +++ b/source/libs/uti/ocs_UniqueFifo.h @@ -0,0 +1,83 @@ +#pragma once +/*___INFO__MARK_BEGIN_NEW__*/ +/*************************************************************************** + * + * Copyright 2026 HPC-Gridware GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + ***************************************************************************/ +/*___INFO__MARK_END_NEW__*/ + +#include <cstddef> +#include <deque> +#include <functional> +#include <unordered_set> +#include <utility> + +namespace ocs { + /** Linked-hash FIFO: a queue that ignores duplicate pushes while preserving + * insertion order for the unique entries. Combines a std::deque (for fair + * drain in arrival order) with an std::unordered_set (for O(1) dedup on + * push); the two backing containers are kept in sync internally and + * cannot be desynced from outside.
+ * + * Useful when a producer can mark the same key dirty many times before a + * consumer drains it, and the consumer wants each key processed exactly + * once in arrival order - for example the per-user / per-project spool + * fan-in in ocs::SharetreeUsage. + * + * Not thread-safe. Callers serialise access through whatever lock already + * protects the surrounding state (LOCK_GLOBAL in the SharetreeUsage case). + */ + template <typename T, typename Hash = std::hash<T>, typename Equal = std::equal_to<T>> + class UniqueFifo { + public: + /** Enqueue value at the tail if it is not already present. A duplicate + * push is silently dropped. Returns true if value was newly added, + * false if it was already in the queue. + */ + bool push(T value) { + if (!seen_.insert(value).second) { + return false; + } + order_.push_back(std::move(value)); + return true; + } + + /** Remove and return the front element. Caller must ensure !empty(). */ + T pop_front() { + T value = std::move(order_.front()); + order_.pop_front(); + seen_.erase(value); + return value; + } + + bool empty() const noexcept { + return order_.empty(); + } + + std::size_t size() const noexcept { + return order_.size(); + } + + void clear() noexcept { + order_.clear(); + seen_.clear(); + } + + private: + std::deque<T> order_; + std::unordered_set<T, Hash, Equal> seen_; + }; +} diff --git a/test/daemons/qmaster/CMakeLists.txt b/test/daemons/qmaster/CMakeLists.txt index a43a3f46e..189ade643 100644 --- a/test/daemons/qmaster/CMakeLists.txt +++ b/test/daemons/qmaster/CMakeLists.txt @@ -30,6 +30,8 @@ add_test(NAME test_qmaster_timed_event COMMAND test_qmaster_timed_event) add_executable(test_qmaster_calendar test_qmaster_calendar.cc ../../../source/daemons/qmaster/ocs_CategoryQmaster.cc + ../../../source/daemons/qmaster/ocs_FinishedJob.cc + ../../../source/daemons/qmaster/ocs_SharetreeUsage.cc ../../../source/daemons/qmaster/ocs_ReportingFileWriter.cc ../../../source/daemons/qmaster/ocs_BaseAccountingFileWriter.cc ../../../source/daemons/qmaster/ocs_BaseReportingFileWriter.cc
diff --git a/test/libs/sgeobj/CMakeLists.txt b/test/libs/sgeobj/CMakeLists.txt index efce9384e..4008b81bf 100644 --- a/test/libs/sgeobj/CMakeLists.txt +++ b/test/libs/sgeobj/CMakeLists.txt @@ -1,7 +1,7 @@ #___INFO__MARK_BEGIN_NEW__ ########################################################################### # -# Copyright 2024 HPC-Gridware GmbH +# Copyright 2024-2026 HPC-Gridware GmbH # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -70,6 +70,11 @@ target_include_directories(test_sgeobj_role PRIVATE "./") target_link_libraries(test_sgeobj_role PRIVATE sgeobj cull comm commlists uti ${SGE_LIBS}) add_test(NAME test_sgeobj_role COMMAND test_sgeobj_role) +add_executable(test_sgeobj_usage test_sgeobj_usage.cc) +target_include_directories(test_sgeobj_usage PRIVATE "./") +target_link_libraries(test_sgeobj_usage PRIVATE sgeobj cull comm commlists uti ${SGE_LIBS}) +add_test(NAME test_sgeobj_usage COMMAND test_sgeobj_usage) + if (INSTALL_SGE_TEST) install(TARGETS test_sgeobj_eval_expression DESTINATION testbin/${SGE_ARCH}) install(TARGETS test_sgeobj_performance DESTINATION testbin/${SGE_ARCH}) @@ -81,4 +86,5 @@ if (INSTALL_SGE_TEST) install(TARGETS test_sgeobj_schedd_conf DESTINATION testbin/${SGE_ARCH}) install(TARGETS test_sgeobj_utility DESTINATION testbin/${SGE_ARCH}) install(TARGETS test_sgeobj_role DESTINATION testbin/${SGE_ARCH}) + install(TARGETS test_sgeobj_usage DESTINATION testbin/${SGE_ARCH}) endif () diff --git a/test/libs/sgeobj/test_sgeobj_usage.cc b/test/libs/sgeobj/test_sgeobj_usage.cc new file mode 100644 index 000000000..01dcd5304 --- /dev/null +++ b/test/libs/sgeobj/test_sgeobj_usage.cc @@ -0,0 +1,277 @@ +/*___INFO__MARK_BEGIN_NEW__*/ +/*************************************************************************** + * + * Copyright 2026 HPC-Gridware GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in 
compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + ***************************************************************************/ +/*___INFO__MARK_END_NEW__*/ + +#include <cmath> +#include <cstdint> +#include <cstdio> + +#include "uti/sge_component.h" +#include "uti/sge_profiling.h" +#include "uti/sge_rmon_macros.h" +#include "uti/sge_time.h" + +#include "cull/cull.h" + +#include "sgeobj/cull/sge_all_listsL.h" +#include "sgeobj/ocs_Usage.h" +#include "sgeobj/ocs_UserProject.h" + +/* CS-1239 step 10: pure-function unit tests for the usage primitives the + * worker-thread booking helper (sge_book_finished_job_usage) and the TET + * decay handler stand on. We test ocs::Usage::sum_usage and + * ocs::UserProject::decay_userprj_usage directly because they live in + * sgeobj with no qmaster / event-master dependency - the orchestration + * around them (sge_event_spool, sge_commit_job, dirty-FIFO mark) is + * exercised end-to-end by the testsuite check + * system_tests/scheduler/sharetree_usage_booking.
*/ + +static int s_fail = 0; + +#define CHECK(id, label, expr) \ + do { \ + if (!(expr)) { \ + printf("FAIL [T%02d] %s\n", (id), (label)); \ + ++s_fail; \ + } else { \ + printf("ok [T%02d] %s\n", (id), (label)); \ + } \ + } while (0) + +#define CHECK_NEAR(id, label, actual, expected, tol) \ + do { \ + const double a = (actual); \ + const double e = (expected); \ + if (std::fabs(a - e) > (tol)) { \ + printf("FAIL [T%02d] %s (got %.6f, expected %.6f +/- %.6f)\n", \ + (id), (label), a, e, (double)(tol)); \ + ++s_fail; \ + } else { \ + printf("ok [T%02d] %s (%.6f)\n", (id), (label), a); \ + } \ + } while (0) + +/* Helpers ----------------------------------------------------------------- */ + +/* Build a single-attribute scaled-usage list with cpu=value. Used both as + * JAT_scaled_usage_list on the ja_task we feed to sum_usage and as + * UU_usage on the seeded user for decay tests. */ +static lList * +make_usage_list(const char *list_name, const char *attr, double value) { + lList *l = lCreateList(list_name, UA_Type); + lListElem *e = lAddElemStr(&l, UA_name, attr, UA_Type); + lSetDouble(e, UA_value, value); + return l; +} + +static lListElem * +make_user(const char *name) { + lListElem *u = lCreateElem(UU_Type); + lSetString(u, UU_name, name); + return u; +} + +static lListElem * +make_project(const char *name) { + lListElem *p = lCreateElem(PR_Type); + lSetString(p, PR_name, name); + return p; +} + +/* Build a 1-task job. JB_job_number is what sum_usage keys debited entries + * by; ja_task carries the JAT_scaled_usage_list that becomes the finish + * delta. 
*/ +static lListElem * +make_job(uint32_t job_number, const char *owner, const char *project, + double cpu) { + lListElem *job = lCreateElem(JB_Type); + lSetUlong(job, JB_job_number, job_number); + lSetString(job, JB_owner, owner); + if (project != nullptr) { + lSetString(job, JB_project, project); + } + + lList *ja_tasks = lCreateList("ja_tasks", JAT_Type); + lListElem *ja_task = lCreateElem(JAT_Type); + lSetUlong(ja_task, JAT_task_number, 1); + lSetList(ja_task, JAT_scaled_usage_list, make_usage_list("scaled", "cpu", cpu)); + lAppendElem(ja_tasks, ja_task); + lSetList(job, JB_ja_tasks, ja_tasks); + + return job; +} + +static double +get_usage_value(const lListElem *parent, int list_field, const char *attr) { + const lList *l = lGetList(parent, list_field); + if (l == nullptr) { + return -1.0; + } + const lListElem *e = lGetElemStr(l, UA_name, attr); + if (e == nullptr) { + return -1.0; + } + return lGetDouble(e, UA_value); +} + +/* Test 1-4: sum_usage for a user-only job -------------------------------- */ + +static void +test_sum_usage_user_only(int *id) { + printf("\n--- sum_usage: user-only job ---\n"); + + lListElem *user = make_user("u1"); + lListElem *job = make_job(/*job_number=*/1, /*owner=*/"u1", + /*project=*/nullptr, /*cpu=*/10.0); + lListElem *ja_task = lFirstRW(lGetListRW(job, JB_ja_tasks)); + + ocs::Usage::sum_usage(job, ja_task, user, /*project=*/nullptr, + /*usage_weight_list=*/nullptr); + + CHECK_NEAR((*id)++, "UU_usage cpu == 10 after first finish", + get_usage_value(user, UU_usage, "cpu"), 10.0, 1e-9); + CHECK_NEAR((*id)++, "UU_long_term_usage cpu == 10", + get_usage_value(user, UU_long_term_usage, "cpu"), 10.0, 1e-9); + + /* CS-1239 sharetree routing invariant: user-only job does NOT populate + * UU_project[*].UPP_usage. 
*/ + CHECK((*id)++, "UU_project remains empty for user-only job", + lGetList(user, UU_project) == nullptr || + lGetNumberOfElem(lGetList(user, UU_project)) == 0); + + /* Debited-job entry for the finished job should be present so the + * worker thread's drop_debited_job step has something to remove. */ + const lList *debited = lGetList(user, UU_debited_job_usage); + const lListElem *upu = (debited != nullptr) ? + lGetElemUlong(debited, UPU_job_number, 1) : nullptr; + CHECK((*id)++, "UU_debited_job_usage contains job 1", upu != nullptr); + + lFreeElem(&user); + lFreeElem(&job); +} + +/* Test 5-10: sum_usage for a user+project job ---------------------------- */ + +static void +test_sum_usage_user_and_project(int *id) { + printf("\n--- sum_usage: user+project job ---\n"); + + lListElem *user = make_user("u1"); + lListElem *project = make_project("p1"); + lListElem *job = make_job(/*job_number=*/2, /*owner=*/"u1", + /*project=*/"p1", /*cpu=*/7.0); + lListElem *ja_task = lFirstRW(lGetListRW(job, JB_ja_tasks)); + + ocs::Usage::sum_usage(job, ja_task, user, project, + /*usage_weight_list=*/nullptr); + + /* PR_ side gets the full delta. */ + CHECK_NEAR((*id)++, "PR_usage cpu == 7 after first finish", + get_usage_value(project, PR_usage, "cpu"), 7.0, 1e-9); + CHECK_NEAR((*id)++, "PR_long_term_usage cpu == 7", + get_usage_value(project, PR_long_term_usage, "cpu"), 7.0, 1e-9); + + /* CS-1239 sharetree routing: project-bound finish populates + * UU_project[p1].UPP_usage, NOT UU_usage. */ + const lList *upp_list = lGetList(user, UU_project); + const lListElem *upp = (upp_list != nullptr) ?
+ lGetElemStr(upp_list, UPP_name, "p1") : nullptr; + CHECK((*id)++, "UU_project[p1] entry created", upp != nullptr); + if (upp != nullptr) { + CHECK_NEAR((*id)++, "UU_project[p1].UPP_usage cpu == 7", + get_usage_value(upp, UPP_usage, "cpu"), 7.0, 1e-9); + } else { + ++(*id); + } + CHECK((*id)++, "UU_usage stays empty for project-bound finish", + lGetList(user, UU_usage) == nullptr || + get_usage_value(user, UU_usage, "cpu") <= 0.0); + + /* Debited entry recorded on the user object (sum_usage routes debited + * to UU_ when user is non-null). */ + const lList *debited = lGetList(user, UU_debited_job_usage); + const lListElem *upu = (debited != nullptr) ? + lGetElemUlong(debited, UPU_job_number, 2) : nullptr; + CHECK((*id)++, "UU_debited_job_usage contains job 2", upu != nullptr); + + lFreeElem(&user); + lFreeElem(&project); + lFreeElem(&job); +} + +/* Test 11-13: decay_userprj_usage over wallclock time -------------------- */ + +static void +test_decay_over_interval(int *id) { + printf("\n--- decay_userprj_usage: halftime-based decay ---\n"); + + /* Seed a user with cpu=100 stamped at t0. */ + lListElem *user = make_user("u1"); + lSetList(user, UU_usage, make_usage_list("usage", "cpu", 100.0)); + + const uint64_t t0 = sge_get_gmt64(); + lSetUlong64(user, UU_usage_time_stamp, t0); + + /* 1-hour halftime via the default decay constant; nullptr decay_list + * routes through the same default. */ + ocs::Usage::calculate_default_decay_constant(/*halftime_hours=*/1); + + /* Advance time by exactly one halftime - cpu should halve. */ + const uint64_t t1 = t0 + sge_gmt32_to_gmt64(3600); + ocs::UserProject::decay_userprj_usage(user, /*is_user=*/true, + /*decay_list=*/nullptr, + /*seqno=*/1, t1); + + /* 5% tolerance: the decay primitive uses an exponential-equivalent + * approximation, not a closed-form halving.
*/ + CHECK_NEAR((*id)++, "UU_usage cpu ~= 50 after one halftime", + get_usage_value(user, UU_usage, "cpu"), 50.0, 2.5); + + /* Same seqno again: idempotent, no further decay. */ + const double after_first = get_usage_value(user, UU_usage, "cpu"); + ocs::UserProject::decay_userprj_usage(user, true, nullptr, 1, t1); + CHECK_NEAR((*id)++, "second call with same seqno is a no-op", + get_usage_value(user, UU_usage, "cpu"), after_first, 1e-9); + + /* New seqno + another halftime: cpu should halve again (~25). */ + const uint64_t t2 = t1 + sge_gmt32_to_gmt64(3600); + ocs::UserProject::decay_userprj_usage(user, true, nullptr, /*seqno=*/2, t2); + CHECK_NEAR((*id)++, "UU_usage cpu ~= 25 after two halftimes", + get_usage_value(user, UU_usage, "cpu"), 25.0, 1.5); + + lFreeElem(&user); +} + +/* main ------------------------------------------------------------------- */ + +int +main(int /*argc*/, char * /*argv*/[]) { + DENTER_MAIN(TOP_LAYER, "test_sgeobj_usage"); + component_set_daemonized(true); + sge_prof_set_enabled(false); + lInit(nmv); + + int id = 1; + test_sum_usage_user_only(&id); + test_sum_usage_user_and_project(&id); + test_decay_over_interval(&id); + + printf("\n--- summary: %s ---\n", s_fail == 0 ? "all green" : "FAILURES"); + return s_fail; +}