Skip to content

Commit fd07c70

Browse files
committed
nit: do not have uri as an argument
This is a different use case than flux-mcp, and arguably we should not be exposing extra information about the cluster. Signed-off-by: vsoch <vsoch@users.noreply.github.com>
1 parent 17d0cf6 commit fd07c70

2 files changed

Lines changed: 27 additions & 17 deletions

File tree

resource_secretary/providers/workload/flux.py

Lines changed: 26 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import os
2-
import shlex
32
import time
43
from typing import Annotated, Any, Dict, List, Mapping, Optional, Union
54

@@ -9,7 +8,7 @@
98

109
JobSubmissionResult = Annotated[
1110
Dict[str, Any],
12-
"A dictionary containing 'success' (bool), 'error' (str or None), 'job_id' (int or None), and 'uri' (str).",
11+
"A dictionary containing 'success' (bool), 'error' (str or None), and 'job_id' (int or None).",
1312
]
1413

1514
JobActionResponse = Annotated[
@@ -32,7 +31,9 @@ class FluxProvider(BaseProvider):
3231
"""
3332
The Flux provider interacts with the Flux Framework using native Python bindings.
3433
It provides modular tools for deep investigation of scheduler state,
35-
resource utilization, and queue parameters.
34+
resource utilization, and queue parameters. Unlike flux-mcp, for now we
35+
are explicitly setting the handle on probe and removing from dispatch
36+
functions, assuming this will just serve a single probed handle. That can change.
3637
"""
3738

3839
def __init__(self):
@@ -194,7 +195,6 @@ def get_resource_utilization_stats(self) -> Dict[str, Any]:
194195
def submit_job(
195196
self,
196197
command: List[str],
197-
uri: Optional[str] = None,
198198
num_tasks: int = 1,
199199
cores_per_task: int = 1,
200200
gpus_per_task: Optional[int] = None,
@@ -221,7 +221,6 @@ def submit_job(
221221
222222
Args:
223223
command: Command to execute (iterable of strings).
224-
uri: Optional Flux URI. If not provided, uses local instance.
225224
num_tasks: Number of tasks to create.
226225
cores_per_task: Number of cores to allocate per task.
227226
gpus_per_task: Number of GPUs to allocate per task.
@@ -231,8 +230,8 @@ def submit_job(
231230
environment: Mapping of environment variables for the job.
232231
env_expand: Mapping of environment variables containing mustache templates.
233232
cwd: Set the current working directory for the job.
234-
cpu_affinity: Set the cpu affinity (e.g., per-task)
235-
gpu_affinity: Set the gpu affinity (e.g., per-task)
233+
cpu_affinity: Set the cpu affinity (support for per-task)
234+
gpu_affinity: Set the gpu affinity (support for per-task)
236235
rlimits: Mapping of process resource limits (e.g. {"nofile": 12000}).
237236
name: Set a custom job name.
238237
input: Path to a file for job input.
@@ -248,6 +247,21 @@ def submit_job(
248247
"""
249248
import flux.job
250249

250+
# Sets this wrong a lot
251+
affinity_options = ["null", "None", "per-task"]
252+
if gpu_affinity and isinstance(gpu_affinity, str) and gpu_affinity not in affinity_options:
253+
return {
254+
"success": False,
255+
"error": "gpu_affinity must be unset or set to per-task",
256+
"job_id": None,
257+
}
258+
if cpu_affinity and isinstance(cpu_affinity, str) and cpu_affinity not in affinity_options:
259+
return {
260+
"success": False,
261+
"error": "cpu_affinity must be unset or set to per-task",
262+
"job_id": None,
263+
}
264+
251265
try:
252266
jobspec = flux.job.JobspecV1.from_command(
253267
command=utils.ensure_command(command),
@@ -300,18 +314,17 @@ def submit_job(
300314
jobspec.name = name
301315

302316
jobid = flux.job.submit(self.handle, jobspec)
303-
return {"success": True, "error": None, "job_id": int(jobid), "uri": uri or "local"}
317+
return {"success": True, "error": None, "job_id": int(jobid)}
304318
except Exception as e:
305-
return {"success": False, "error": str(e), "job_id": None, "uri": uri or "local"}
319+
return {"success": False, "error": str(e), "job_id": None}
306320

307321
@dispatch_tool
308-
def cancel_job(self, job_id: Union[int, str], uri: Optional[str] = None) -> JobActionResponse:
322+
def cancel_job(self, job_id: Union[int, str]) -> JobActionResponse:
309323
"""
310324
Cancels a specific Flux job.
311325
312326
Args:
313327
job_id: The ID of the job to cancel.
314-
uri: Optional Flux URI.
315328
"""
316329
import flux.job
317330

@@ -327,13 +340,12 @@ def cancel_job(self, job_id: Union[int, str], uri: Optional[str] = None) -> JobA
327340
return {"success": False, "error": str(e), "message": "Cancellation had an error."}
328341

329342
@dispatch_tool
330-
def get_job_info(self, job_id: Union[int, str], uri: Optional[str] = None) -> JobInfoResult:
343+
def get_job_info(self, job_id: Union[int, str]) -> JobInfoResult:
331344
"""
332345
Retrieves status and metadata about a specific job.
333346
334347
Args:
335348
job_id: The ID of the job.
336-
uri: Optional Flux URI.
337349
"""
338350
import flux.job
339351

@@ -345,9 +357,7 @@ def get_job_info(self, job_id: Union[int, str], uri: Optional[str] = None) -> Jo
345357
return {"success": False, "error": str(e), "info": None}
346358

347359
@dispatch_tool
348-
def get_job_logs(
349-
self, job_id: Union[int, str], uri: Optional[str] = None, delay: Optional[int] = None
350-
) -> LogLinesResult:
360+
def get_job_logs(self, job_id: Union[int, str], delay: Optional[int] = None) -> LogLinesResult:
351361
"""
352362
Retrieves the output logs (stdout/stderr) associated with a specific Flux job.
353363
If you set the delay, it will cut early and you may not get a complete log.
@@ -358,7 +368,6 @@ def get_job_logs(
358368
359369
Args:
360370
job_id: The unique identifier of the job (integer or f58 string).
361-
uri: Optional Flux handle URI. If omitted, connects to the local instance.
362371
delay: The maximum time in seconds to spend collecting logs. If None,
363372
the function blocks until the job event stream is closed.
364373

resource_secretary/utils/text.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ def ensure_dict(obj):
9494
return extract_code_block(obj)
9595
return obj
9696

97+
9798
def extract_code_block(text):
9899
"""
99100
Match block of code, assuming llm returns as markdown or code block.

0 commit comments

Comments
 (0)