Skip to content
Merged
34 changes: 34 additions & 0 deletions doc/markdown/man/man5/sge_conf.md
Original file line number Diff line number Diff line change
Expand Up @@ -1307,6 +1307,40 @@ Examples:
usage_patterns=gpu:nvidia.*|amd.*
usage_patterns=gpu:nvidia.*;power:power-*

***online_usage***

When set to a non-empty value, xxQS_NAMExx writes a continuous stream of *online_usage* records to the
JSONL reporting file. One record is generated for every job report received by xxqs_name_sxx_qmaster(8)
from xxqs_name_sxx_execd(8) for a running job. The value of this parameter selects which usage variables
shall be included in each record. The format is:

<var>[|<var>[|...]]

Variables are separated by `|`. There is no closed list of accepted names: any token that is a valid
xxQS_NAMExx *complex_name* (see xxqs_name_sxx_types(1)) is accepted by the configuration parser.
A record will carry only those configured variables that the execution daemon actually reports for
the job — names the execd does not report are silently skipped. The set of variables reported by
xxqs_name_sxx_execd(8) today includes *cpu*, *mem*, *io*, *iow*, *ioops*, *vmem*, *maxvmem*, *rss*,
*maxrss*, *pss*, *maxpss*, *smem*, *pmem*, *wallclock*.

Examples:

online_usage=cpu|mem|maxvmem|wallclock
online_usage=cpu|mem|io|iow|ioops|vmem|rss|maxrss

If the parameter is absent or set to an empty value, no *online_usage* records will be written. This
is the default — the feature must be opted into explicitly.

For tightly-integrated parallel jobs the granularity of *online_usage* records follows the same
convention as the accounting record: if the parallel environment has *accounting_summary* set to
*TRUE*, one record per ja_task is written and pe_task usage is aggregated into it; otherwise one
record is written per pe_task (and one for the master task).

*online_usage* records are written only to the new JSONL reporting file. The deprecated colon-separated
reporting file (see *old_reporting*) does not carry *online_usage* records.

See xxqs_name_sxx_reporting(5) for the *online_usage* record schema.

## finished_jobs

Note: Deprecated, may be removed in future release. xxQS_NAMExx stores a certain number of *just finished* jobs to
Expand Down
38 changes: 38 additions & 0 deletions doc/markdown/man/man5/sge_reporting.md
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,44 @@ fine-grained resource usage monitoring over time.

For the contents and structure of the accounting records see xxqs_name_sxx_accounting(5).

## online_usage

Records of type online_usage are written for every job report that xxqs_name_sxx_qmaster(8) receives
from xxqs_name_sxx_execd(8) for a running job, when *online_usage* in *reporting_params* has been
configured with a non-empty list of usage variables (see xxqs_name_sxx_sge_conf(5)). They provide a
continuous time series of scaled usage values per running job and are intended for live monitoring
and for being consumed by a downstream database writer.

For tightly-integrated parallel jobs, the granularity is driven by the parallel environment's
*accounting_summary* attribute (see xxqs_name_sxx_sge_pe(5)):

* `accounting_summary=TRUE` — one record per ja_task. Usage values are aggregated across all pe_tasks
of the ja_task — the same convention the *acct* record follows.
* `accounting_summary=FALSE` — one record per pe_task and one for the master task.

online_usage records have the following fields:

* job_number
The job number.

* task_number
The array task id. 0 for non-array jobs, otherwise the index of the array task.

* pe_taskid
The task id of a parallel sub-task. Present only when the record describes the usage of a single
pe_task. Absent when the record represents the master task or an aggregated ja_task-level record
for a PE with *accounting_summary=TRUE*.

* usage
A JSON object containing the usage variables configured in
`reporting_params=...online_usage=<var>[|<var>...]` that are present in the job report. Each entry
is the scaled usage value of one variable. Variables that are configured but not yet reported by
the execd are silently skipped (no `null` value is emitted). For the list of valid variable names
see the description of *online_usage* in xxqs_name_sxx_sge_conf(5).

The online_usage record type is emitted only to the JSONL reporting file. The deprecated colon-separated
reporting format does not carry online_usage records.

## queue

Records of type queue contain state information for queues (queue instances). A queue record has the following fields:
Expand Down
23 changes: 23 additions & 0 deletions source/daemons/qmaster/configuration_qmaster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
#include <cerrno>
#include <cstring>
#include <cstdlib>
#include <string>
#include <vector>
#include <unistd.h>
#include <pwd.h>

Expand Down Expand Up @@ -480,6 +482,27 @@ check_config(lList **alpp, lListElem *conf) {
answer_list_add(alpp, SGE_EVENT, STATUS_EEXIST, ANSWER_QUALITY_ERROR);
DRETURN(STATUS_EEXIST);
}
} else if (!strcmp(name, "reporting_params")) {
// Validate parameters whose grammar we can fail-fast on. Today this
// covers only online_usage; other reporting_params tokens are
// sanity-checked at merge time in sge_conf.cc::merge_configuration().
struct saved_vars_s *context = nullptr;
bool rp_valid = true;
for (const char *s = sge_strtok_r(value, PARAMS_DELIMITER, &context);
rp_valid && s != nullptr;
s = sge_strtok_r(nullptr, PARAMS_DELIMITER, &context)) {
std::string online_usage_str;
if (parse_string_param(s, "online_usage", online_usage_str)) {
std::vector<std::string> discard;
if (!parse_online_usage_value(alpp, online_usage_str.c_str(), discard)) {
rp_valid = false;
}
}
}
sge_free_saved_vars(context);
if (!rp_valid) {
DRETURN(STATUS_EEXIST);
}
} else if (!strcmp(name, "admin_user")) {
struct passwd pw_struct;
char *buffer;
Expand Down
29 changes: 28 additions & 1 deletion source/daemons/qmaster/job_report_qmaster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
*
* All Rights Reserved.
*
* Portions of this software are Copyright (c) 2023-2025 HPC-Gridware GmbH
* Portions of this software are Copyright (c) 2023-2026 HPC-Gridware GmbH
*
************************************************************************/
/*___INFO__MARK_END__*/
Expand All @@ -43,6 +43,7 @@
#include "sgeobj/ocs_DataStore.h"
#include "sgeobj/sge_answer.h"
#include "sgeobj/sge_ja_task.h"
#include "sgeobj/sge_pe.h"
#include "sgeobj/sge_pe_task.h"
#include "sgeobj/sge_usage.h"
#include "sgeobj/sge_host.h"
Expand Down Expand Up @@ -383,6 +384,32 @@ void process_job_report(lListElem *report, lListElem *hep, char *rhost, char *co
}
}

/*
* online_usage record: emit a JSONL snapshot of the scaled
* usage on every job report from execd, gated by the
* reporting_params=online_usage list. Honour the PE's
* accounting_summary flag with the same convention as
* sge_write_rusage / create_acct_records:
* - pe_task JR + accounting_summary TRUE -> skip
* (aggregated record will be written on the master JR)
* - pe_task JR + accounting_summary FALSE -> per-pe_task
* - master JR -> ja_task,
* summing pe_task usage when accounting_summary TRUE
*/
if (ocs::ReportingFileWriter::is_online_usage_required()) {
const bool do_accounting_summary =
pe_do_accounting_summary(lGetObject(jatep, JAT_pe_object));
if (pe_task_id_str != nullptr) {
if (petask != nullptr && !do_accounting_summary) {
ocs::ReportingFileWriter::create_online_usage_records(nullptr, jr, jep, jatep,
petask, false);
}
} else {
ocs::ReportingFileWriter::create_online_usage_records(nullptr, jr, jep, jatep,
nullptr, do_accounting_summary);
}
}

/*
* once a day write an intermediate usage record to the
* reporting file to have correct daily usage reporting with
Expand Down
123 changes: 123 additions & 0 deletions source/daemons/qmaster/ocs_JsonReportingFileWriter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include "sgeobj/sge_qinstance.h"
#include "sgeobj/sge_qinstance_state.h"
#include "sgeobj/sge_str.h"
#include "sgeobj/sge_usage.h"

#include "uti/sge_lock.h"
#include "uti/sge_log.h"
Expand Down Expand Up @@ -76,6 +77,128 @@ namespace ocs {
DRETURN(ret);
}

/**
* Emit a single JSONL online_usage record for the running job.
*
* When pe_task is non-null the record carries that pe_task's scaled
* usage and a pe_taskid field. When pe_task is null the record carries
* the ja_task's scaled usage; if aggregate_pe_tasks is set, scaled
* usage of every pe_task in `ja_task->JAT_task_list` is summed into the
* ja_task value (matching the convention used by sge_write_rusage when
* the PE has `accounting_summary` set). Only the variables listed in
* `reporting_params=online_usage=...` are emitted under `usage`;
* variables missing from the scaled usage list are silently skipped
* (no `null` field).
*
* If none of the configured variables have any data yet (e.g. right
* after job start, before the execd has produced its first usage
* report), no record is emitted at all — empty-`usage` records would
* clutter the reporting file without adding information.
*
* @see ReportingFileWriter::create_online_usage_records()
*/
bool
JsonReportingFileWriter::create_online_usage_record(lList **answer_list, lListElem *job_report,
lListElem *job, lListElem *ja_task, lListElem *pe_task,
bool aggregate_pe_tasks) {
DENTER(TOP_LAYER);

if (job == nullptr || ja_task == nullptr) {
DRETURN(true);
}

// take a snapshot of the configured variable names
std::vector<std::string> vars;
sge_mutex_lock(config_mutex_name.c_str(), __func__, __LINE__, &config_mutex);
vars = online_usage_vars;
sge_mutex_unlock(config_mutex_name.c_str(), __func__, __LINE__, &config_mutex);

if (vars.empty()) {
DRETURN(true);
}

// pe_task != nullptr: report scaled usage of the pe_task,
// otherwise the (possibly aggregated) scaled usage on the ja_task
const lList *primary_usage = (pe_task != nullptr)
? lGetList(pe_task, PET_scaled_usage)
: lGetList(ja_task, JAT_scaled_usage_list);

// aggregation only applies to ja_task-level records
const bool aggregate = (pe_task == nullptr) && aggregate_pe_tasks;

// Suppress empty-usage records: if none of the configured variables
// have data yet, do not emit anything.
bool any_data = false;
for (const auto &name : vars) {
if (lGetElemStr(primary_usage, UA_name, name.c_str()) != nullptr) {
any_data = true;
break;
}
if (aggregate) {
const lListElem *pe_task_iter;
for_each_ep(pe_task_iter, lGetList(ja_task, JAT_task_list)) {
if (lGetSubStr(pe_task_iter, UA_name, name.c_str(), PET_scaled_usage) != nullptr) {
any_data = true;
break;
}
}
if (any_data) {
break;
}
}
}
if (!any_data) {
DRETURN(true);
}

rapidjson::StringBuffer stringBuffer;
rapidjson::Writer<rapidjson::StringBuffer> writer(stringBuffer);

writer.StartObject();
write_json(writer, "time", sge_get_gmt64());
write_json(writer, "type", "online_usage");
write_json(writer, "job_number", lGetUlong(job, JB_job_number));
int task_number = job_is_array(job) ? (int) lGetUlong(ja_task, JAT_task_number) : 0;
write_json(writer, "task_number", task_number);
if (pe_task != nullptr) {
write_json(writer, "pe_taskid", lGetString(pe_task, PET_id));
}

writer.Key("usage");
writer.StartObject();
for (const auto &name : vars) {
double total = 0.0;
bool found_any = false;

const lListElem *u = lGetElemStr(primary_usage, UA_name, name.c_str());
if (u != nullptr) {
total = lGetDouble(u, UA_value);
found_any = true;
}

if (aggregate) {
const lListElem *pe_task_iter;
for_each_ep(pe_task_iter, lGetList(ja_task, JAT_task_list)) {
const lListElem *pu = lGetSubStr(pe_task_iter, UA_name, name.c_str(), PET_scaled_usage);
if (pu != nullptr) {
total += lGetDouble(pu, UA_value);
found_any = true;
}
}
}

if (found_any) {
write_json(writer, name.c_str(), total);
}
}
writer.EndObject();

writer.EndObject();
create_record(stringBuffer);

DRETURN(true);
}

void
JsonReportingFileWriter::create_record(rapidjson::StringBuffer &stringBuffer) {
stringBuffer.Put('\n');
Expand Down
6 changes: 5 additions & 1 deletion source/daemons/qmaster/ocs_JsonReportingFileWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
/*___INFO__MARK_BEGIN_NEW__*/
/***************************************************************************
*
* Copyright 2024 HPC-Gridware GmbH
* Copyright 2024,2026 HPC-Gridware GmbH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -48,6 +48,10 @@ namespace ocs {
create_acct_record(lList **answer_list, lListElem *job_report, lListElem *job,
lListElem *ja_task, bool intermediate) override;

bool
create_online_usage_record(lList **answer_list, lListElem *job_report, lListElem *job,
lListElem *ja_task, lListElem *pe_task, bool aggregate_pe_tasks) override;

bool
create_host_record(lList **answer_list, const lListElem *host, u_long64 report_time) override;

Expand Down
Loading