Skip to content

Commit 7d95a9e

Browse files
Enhance executeJobsAcrossNodeSources: specify concurrent dispatch to chosen or auto-detected nodes and metrics gathering (#69)
1 parent 10134cd commit 7d95a9e

1 file changed

Lines changed: 49 additions & 18 deletions

File tree

proactive/ProactiveGateway.py

Lines changed: 49 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -982,29 +982,62 @@ def getJobResultMap(self, job_id, timeout=60000):
982982
return self.proactive_scheduler_client.waitForJob(str(job_id), timeout).getResultMap()
983983

984984
def executeJobsAcrossNodeSources(self, proactive_jobs, node_sources=None):
985+
"""
986+
Executes the given proactive jobs on all available node sources or on a specified subset.
987+
988+
Args:
989+
proactive_jobs (iterable): Jobs to be executed (must not be empty).
990+
node_sources (list[str], optional): Specific node source names to use. If None, will auto-discover all available sources.
991+
992+
Returns:
993+
List[dict]: A list of result dictionaries, each containing:
994+
- 'job_id': the ID of the job
995+
- 'job_state': final state of the job ('FINISHED', 'CANCELED', or 'FAILED')
996+
- 'hardware_metrics': dict with 'cpu_usage' and 'ram_usage' averaged over the job duration
997+
998+
Raises:
999+
ValueError: If proactive_jobs is empty, or node_sources is provided but empty.
1000+
RuntimeError: If no node sources are available or none of the specified sources are valid.
1001+
"""
9851002
if not proactive_jobs:
9861003
raise ValueError("proactive_jobs cannot be empty")
9871004

988-
if node_sources:
989-
logging.info(f'Executing {len(proactive_jobs)} jobs across {len(node_sources)} node sources')
990-
else:
991-
logging.info(f'Executing {len(proactive_jobs)} jobs with default node source behavior')
992-
9931005
monitoring_client = self.getProactiveMonitoringClient()
9941006
job_results = []
9951007
job_queue = list(proactive_jobs)
9961008
active_jobs = {}
997-
target_sources = node_sources if node_sources else [f"default-{i}" for i in range(len(proactive_jobs))]
9981009

999-
while job_queue or active_jobs:
1000-
logging.debug(f'Jobs in queue: {len(job_queue)}, Active jobs: {len(active_jobs)}')
1010+
# Get available node sources
1011+
available_nodes = monitoring_client.list_proactive_jmx_urls()
1012+
available_sources_set = {node["nodeSource"] for node in available_nodes}
1013+
1014+
if node_sources is None:
1015+
target_sources = list(available_sources_set)
1016+
if not target_sources:
1017+
raise RuntimeError("No available node sources were found.")
1018+
logging.info("Detected %d available node source%s: %s", len(target_sources), "" if len(target_sources) == 1 else "s", ", ".join(target_sources))
1019+
else:
1020+
if not node_sources:
1021+
raise ValueError("The node_sources list is empty.")
1022+
1023+
# Filter only those that are actually available
1024+
valid_sources = [src for src in node_sources if src in available_sources_set]
1025+
invalid_sources = [src for src in node_sources if src not in available_sources_set]
1026+
1027+
for src in invalid_sources:
1028+
logging.warning(f"Node source '{src}' is not available and will be skipped.")
10011029

1030+
if not valid_sources:
1031+
raise RuntimeError("None of the specified node sources are currently available.")
1032+
1033+
target_sources = valid_sources
1034+
logging.info(f"Using valid node sources: {target_sources}")
1035+
1036+
while job_queue or active_jobs:
10021037
completed_sources = []
10031038
for source, job_id in active_jobs.items():
10041039
job_status = self.getJobStatus(job_id)
10051040
if job_status.upper() in ["FINISHED", "CANCELED", "FAILED"]:
1006-
logging.info(f'Job {job_id} on source {source} completed with status: {job_status}')
1007-
10081041
metrics = {}
10091042
job_info = self.getJobInfo(job_id)
10101043
start_time = job_info.getStartTime()
@@ -1041,23 +1074,21 @@ def executeJobsAcrossNodeSources(self, proactive_jobs, node_sources=None):
10411074
for source in completed_sources:
10421075
del active_jobs[source]
10431076

1044-
available_sources = [ns for ns in target_sources if ns not in active_jobs]
1077+
available_slots = [ns for ns in target_sources if ns not in active_jobs]
10451078

1046-
for source in available_sources:
1079+
for source in available_slots:
10471080
if not job_queue:
10481081
break
10491082

10501083
job = job_queue.pop(0)
1051-
if node_sources:
1052-
job.addGenericInformation("NODE_SOURCE", source)
1053-
1084+
job.addGenericInformation("NODE_SOURCE", source)
10541085
try:
1055-
logging.info(f'Submitting job {job.getJobName()} {"to " + source if node_sources else ""}')
1086+
logging.info(f'Submitting job {job.getJobName()} to {source}')
10561087
job_id = self.submitJob(job)
10571088
active_jobs[source] = job_id
1058-
logging.info(f'Job {job_id} submitted {"to " + source if node_sources else ""}')
1089+
logging.info(f'Job {job_id} submitted to {source}')
10591090
except Exception as e:
1060-
logging.error(f'Error submitting job {"to " + source if node_sources else ""}: {e}')
1091+
logging.error(f'Error submitting job to {source}: {e}')
10611092
job_queue.append(job)
10621093

10631094
if job_queue or active_jobs:

0 commit comments

Comments
 (0)