Skip to content

Commit 54a2d0f

Browse files
committed
Trying with OpenFOAM requirements
1 parent 239815f commit 54a2d0f

1 file changed

Lines changed: 148 additions & 85 deletions

File tree

dapi/jobs.py

Lines changed: 148 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -32,12 +32,13 @@
3232
]
3333

3434

35-
# --- generate_job_request function (Production Ready) ---
3635
def generate_job_request(
3736
tapis_client: Tapis,
3837
app_id: str,
3938
input_dir_uri: str,
40-
script_filename: Optional[str] = None,
39+
script_filename: Optional[
40+
str
41+
] = None, # Default is None, important for apps like OpenFOAM
4142
app_version: Optional[str] = None,
4243
job_name: Optional[str] = None,
4344
description: Optional[str] = None,
@@ -55,21 +56,23 @@ def generate_job_request(
5556
extra_env_vars: Optional[List[Dict[str, Any]]] = None,
5657
extra_scheduler_options: Optional[List[Dict[str, Any]]] = None,
5758
script_param_names: List[str] = ["Input Script", "Main Script", "tclScript"],
58-
input_dir_param_name: str = "Input Directory",
59+
input_dir_param_name: str = "Input Directory", # Caller MUST override if app uses a different name (e.g., "Case Directory")
5960
allocation_param_name: str = "TACC Allocation",
6061
) -> Dict[str, Any]:
6162
"""Generate a Tapis job request dictionary based on app definition and inputs.
6263
6364
Creates a properly formatted job request dictionary by retrieving the specified
6465
application details and applying user-provided overrides and additional parameters.
65-
The function automatically maps the script filename and input directory to the
66-
appropriate app parameters.
66+
The function automatically maps the script filename (if provided) and input
67+
directory to the appropriate app parameters.
6768
6869
Args:
6970
tapis_client (Tapis): Authenticated Tapis client instance.
7071
app_id (str): The ID of the Tapis application to use for the job.
7172
input_dir_uri (str): Tapis URI to the input directory containing job files.
72-
script_filename (str): Name of the main script file to execute.
73+
script_filename (str, optional): Name of the main script file to execute.
74+
If None (default), no script parameter is added. This is suitable for
75+
apps like OpenFOAM that don't take a script argument.
7376
app_version (str, optional): Specific app version to use. If None, uses latest.
7477
job_name (str, optional): Custom job name. If None, auto-generates based on app ID and timestamp.
7578
description (str, optional): Job description. If None, uses app description.
@@ -87,36 +90,28 @@ def generate_job_request(
8790
defaults to "${EffectiveUserId}/tapis-jobs-archive/${JobCreateDate}/${JobUUID}".
8891
extra_file_inputs (List[Dict[str, Any]], optional): Additional file inputs beyond the main input directory.
8992
extra_app_args (List[Dict[str, Any]], optional): Additional application arguments.
93+
Use for parameters expected in 'appArgs' by the Tapis app.
9094
extra_env_vars (List[Dict[str, Any]], optional): Additional environment variables.
95+
Use for parameters expected in 'envVariables' by the Tapis app (e.g., OpenFOAM solver, mesh).
96+
Each item should be a dict like {"key": "VAR_NAME", "value": "var_value"}.
9197
extra_scheduler_options (List[Dict[str, Any]], optional): Additional scheduler options.
92-
script_param_names (List[str], optional): Parameter names to check for script placement.
93-
Defaults to ["Input Script", "Main Script", "tclScript"].
94-
input_dir_param_name (str, optional): Parameter name for input directory.
95-
Defaults to "Input Directory".
98+
script_param_names (List[str], optional): Parameter names/keys to check for script placement
99+
if script_filename is provided. Defaults to ["Input Script", "Main Script", "tclScript"].
100+
input_dir_param_name (str, optional): The 'name' of the fileInput in the Tapis app definition
101+
that corresponds to input_dir_uri. Defaults to "Input Directory".
102+
CRITICAL: Must match the app's definition (e.g., "Case Directory" for OpenFOAM).
96103
allocation_param_name (str, optional): Parameter name for TACC allocation.
97104
Defaults to "TACC Allocation".
98105
99106
Returns:
100107
Dict[str, Any]: Complete job request dictionary ready for submission to Tapis.
101-
Contains all necessary job configuration including file inputs, parameters,
102-
and resource requirements.
103108
104109
Raises:
105110
AppDiscoveryError: If the specified app cannot be found or details cannot be retrieved.
106-
ValueError: If required parameters are missing, invalid, or script parameter cannot be placed.
111+
ValueError: If required parameters are missing, invalid, or if script_filename is provided
112+
but a suitable placement (matching script_param_names) cannot be found in the app's
113+
parameterSet.
107114
JobSubmissionError: If unexpected errors occur during job request generation.
108-
109-
Example:
110-
>>> job_request = generate_job_request(
111-
... tapis_client=client,
112-
... app_id="matlab-r2023a",
113-
... input_dir_uri="tapis://designsafe.storage.default/username/input/",
114-
... script_filename="run_analysis.m",
115-
... max_minutes=120,
116-
... node_count=2,
117-
... allocation="MyProject-123"
118-
... )
119-
>>> print(job_request["name"]) # "matlab-r2023a-20231201_143022"
120115
"""
121116
print(f"Generating job request for app '{app_id}'...")
122117
try:
@@ -136,32 +131,23 @@ def generate_job_request(
136131
description or app_details.description or f"dapi job for {app_details.id}"
137132
)
138133

139-
# Handle archive system configuration
140134
archive_system_id = None
141135
archive_system_dir = None
142-
143136
if archive_system:
144137
if archive_system.lower() == "designsafe":
145138
archive_system_id = "designsafe.storage.default"
146-
# Handle archive path configuration
147139
if archive_path:
148-
# Check if it's a full path or just a directory name
149140
if archive_path.startswith("/") or archive_path.startswith("${"):
150-
# Full path provided
151141
archive_system_dir = archive_path
152142
else:
153-
# Directory name provided, construct the full path
154143
archive_system_dir = f"${{EffectiveUserId}}/{archive_path}/${{JobCreateDate}}/${{JobUUID}}"
155144
else:
156-
# Default path for DesignSafe
157145
archive_system_dir = "${EffectiveUserId}/tapis-jobs-archive/${JobCreateDate}/${JobUUID}"
158146
else:
159-
# Use the provided archive system as-is
160147
archive_system_id = archive_system
161148
if archive_path:
162149
archive_system_dir = archive_path
163150
else:
164-
# Use app defaults
165151
archive_system_id = getattr(job_attrs, "archiveSystemId", None)
166152
archive_system_dir = getattr(job_attrs, "archiveSystemDir", None)
167153

@@ -203,94 +189,169 @@ def generate_job_request(
203189
"fileInputs": [],
204190
"parameterSet": {"appArgs": [], "envVariables": [], "schedulerOptions": []},
205191
}
192+
193+
# --- Handle main input directory ---
194+
# The 'name' of this fileInput in the job request MUST match a fileInput 'name' in the app definition.
195+
# The caller is responsible for providing the correct 'input_dir_param_name'.
206196
main_input_target_path = None
207197
main_input_automount = True
198+
found_input_def = False
208199
if hasattr(job_attrs, "fileInputs") and job_attrs.fileInputs:
209200
for fi_def in job_attrs.fileInputs:
210201
if getattr(fi_def, "name", "").lower() == input_dir_param_name.lower():
211202
main_input_target_path = getattr(fi_def, "targetPath", None)
212203
main_input_automount = getattr(fi_def, "autoMountLocal", True)
204+
found_input_def = True
205+
print(
206+
f"Configuring main input '{input_dir_param_name}' with targetPath: '{main_input_target_path}', autoMount: {main_input_automount}"
207+
)
213208
break
209+
if not found_input_def:
210+
print(
211+
f"Warning: The provided input_dir_param_name '{input_dir_param_name}' was not found in the app's fileInput definitions. "
212+
f"The job request will use '{input_dir_param_name}' as the fileInput name. "
213+
f"Ensure this name is valid for the app '{app_id}'. App fileInputs: {getattr(job_attrs, 'fileInputs', 'Not defined')}"
214+
)
215+
214216
main_input_dict = {
215-
"name": input_dir_param_name,
217+
"name": input_dir_param_name, # Critical: This name must be defined in the app.
216218
"sourceUrl": input_dir_uri,
217219
"autoMountLocal": main_input_automount,
218220
}
219-
if main_input_target_path:
221+
if (
222+
main_input_target_path
223+
): # Add targetPath only if the app definition provided one for this input
220224
main_input_dict["targetPath"] = main_input_target_path
221225
job_req["fileInputs"].append(main_input_dict)
226+
222227
if extra_file_inputs:
223228
job_req["fileInputs"].extend(extra_file_inputs)
229+
230+
# --- Handle script parameter placement ---
224231
script_param_added = False
225-
if hasattr(param_set_def, "appArgs") and param_set_def.appArgs:
226-
for arg_def in param_set_def.appArgs:
227-
arg_name = getattr(arg_def, "name", "")
228-
if arg_name in script_param_names:
229-
print(
230-
f"Placing script '{script_filename}' in appArgs: '{arg_name}'"
231-
)
232-
job_req["parameterSet"]["appArgs"].append(
233-
{"name": arg_name, "arg": script_filename}
234-
)
235-
script_param_added = True
236-
break
237-
if (
238-
not script_param_added
239-
and hasattr(param_set_def, "envVariables")
240-
and param_set_def.envVariables
241-
):
242-
for var_def in param_set_def.envVariables:
243-
var_key = getattr(var_def, "key", "")
244-
if var_key in script_param_names:
245-
print(
246-
f"Placing script '{script_filename}' in envVariables: '{var_key}'"
247-
)
248-
job_req["parameterSet"]["envVariables"].append(
249-
{"key": var_key, "value": script_filename}
250-
)
251-
script_param_added = True
252-
break
253-
if not script_param_added:
254-
# Check if appArgs is empty - if so, skip script parameter requirement
255-
if hasattr(param_set_def, "appArgs") and param_set_def.appArgs == []:
256-
print(
257-
"App has empty appArgs list - skipping script parameter placement"
258-
)
259-
else:
232+
if script_filename is not None: # Only process if a script filename is provided
233+
# Try to place in appArgs
234+
if hasattr(param_set_def, "appArgs") and param_set_def.appArgs:
235+
for arg_def in param_set_def.appArgs:
236+
arg_name = getattr(arg_def, "name", "")
237+
if arg_name in script_param_names:
238+
print(
239+
f"Placing script '{script_filename}' in appArgs: '{arg_name}'"
240+
)
241+
job_req["parameterSet"]["appArgs"].append(
242+
{"name": arg_name, "arg": script_filename}
243+
)
244+
script_param_added = True
245+
break
246+
247+
# If not placed in appArgs, try envVariables
248+
if (
249+
not script_param_added
250+
and hasattr(param_set_def, "envVariables")
251+
and param_set_def.envVariables
252+
):
253+
for var_def in param_set_def.envVariables:
254+
var_key = getattr(var_def, "key", "")
255+
if var_key in script_param_names:
256+
print(
257+
f"Placing script '{script_filename}' in envVariables: '{var_key}'"
258+
)
259+
job_req["parameterSet"]["envVariables"].append(
260+
{"key": var_key, "value": script_filename}
261+
)
262+
script_param_added = True
263+
break
264+
265+
if not script_param_added:
266+
# If script_filename was provided but could not be placed.
267+
app_args_details = getattr(param_set_def, "appArgs", [])
268+
env_vars_details = getattr(param_set_def, "envVariables", [])
269+
defined_app_arg_names = [
270+
getattr(a, "name", None) for a in app_args_details
271+
]
272+
defined_env_var_keys = [
273+
getattr(e, "key", None) for e in env_vars_details
274+
]
260275
raise ValueError(
261-
f"Could not find where to place the script parameter..."
276+
f"script_filename '{script_filename}' was provided, but no matching parameter "
277+
f"(expected names/keys from script_param_names: {script_param_names}) was found "
278+
f"in the app's defined parameterSet. "
279+
f"App's defined appArgs names: {defined_app_arg_names}. "
280+
f"App's defined envVariables keys: {defined_env_var_keys}."
262281
)
282+
else:
283+
print("script_filename is None, skipping script parameter placement.")
284+
285+
# --- Handle extra parameters ---
263286
if extra_app_args:
264287
job_req["parameterSet"]["appArgs"].extend(extra_app_args)
265-
if extra_env_vars:
288+
if extra_env_vars: # For OpenFOAM, parameters like solver, mesh, decomp go here
266289
job_req["parameterSet"]["envVariables"].extend(extra_env_vars)
290+
291+
# --- Handle scheduler options and allocation ---
267292
fixed_sched_opt_names = []
268293
if (
269294
hasattr(param_set_def, "schedulerOptions")
270295
and param_set_def.schedulerOptions
271296
):
272297
for sched_opt_def in param_set_def.schedulerOptions:
273-
if getattr(sched_opt_def, "inputMode", None) == "FIXED":
274-
fixed_sched_opt_names.append(getattr(sched_opt_def, "name", ""))
298+
# Check if inputMode is FIXED for this specific option definition
299+
if getattr(sched_opt_def, "inputMode", None) == "FIXED" and hasattr(
300+
sched_opt_def, "name"
301+
):
302+
fixed_sched_opt_names.append(getattr(sched_opt_def, "name"))
303+
275304
if allocation:
276-
if allocation_param_name in fixed_sched_opt_names:
277-
print(
278-
f"Warning: App definition marks '{allocation_param_name}' as FIXED..."
279-
)
280-
else:
281-
print(f"Adding allocation: {allocation}")
305+
# Check if the app itself defines an allocation parameter that is FIXED
306+
allocation_is_fixed_by_app = False
307+
if (
308+
hasattr(param_set_def, "schedulerOptions")
309+
and param_set_def.schedulerOptions
310+
):
311+
for sched_opt_def in param_set_def.schedulerOptions:
312+
# Assuming allocation is identified by allocation_param_name
313+
if (
314+
getattr(sched_opt_def, "name", "") == allocation_param_name
315+
and getattr(sched_opt_def, "inputMode", None) == "FIXED"
316+
):
317+
allocation_is_fixed_by_app = True
318+
print(
319+
f"Warning: App definition marks '{allocation_param_name}' as FIXED with value '{getattr(sched_opt_def, 'arg', '')}'. "
320+
f"User-provided allocation '{allocation}' will be ignored."
321+
)
322+
break
323+
324+
if not allocation_is_fixed_by_app:
325+
# If user provides an allocation and it's not fixed by the app, add/override it.
326+
# Remove any existing scheduler option with the same name before adding the new one.
327+
job_req["parameterSet"]["schedulerOptions"] = [
328+
opt
329+
for opt in job_req["parameterSet"]["schedulerOptions"]
330+
if getattr(opt, "name", opt.get("name"))
331+
!= allocation_param_name # Handle both Tapis objects and dicts
332+
]
333+
print(f"Adding/Updating TACC allocation: {allocation}")
282334
job_req["parameterSet"]["schedulerOptions"].append(
283335
{"name": allocation_param_name, "arg": f"-A {allocation}"}
284336
)
337+
285338
if extra_scheduler_options:
286339
for extra_opt in extra_scheduler_options:
287340
opt_name = extra_opt.get("name")
288341
if opt_name and opt_name in fixed_sched_opt_names:
289342
print(
290-
f"Warning: Skipping user-provided scheduler option '{opt_name}'..."
343+
f"Warning: Skipping user-provided scheduler option '{opt_name}' because it is marked as FIXED in the app definition."
291344
)
292345
else:
346+
# Avoid duplicates if user tries to override allocation via extra_scheduler_options
347+
if opt_name == allocation_param_name and allocation:
348+
print(
349+
f"Note: Allocation '{allocation}' is already being handled. Skipping duplicate allocation from extra_scheduler_options."
350+
)
351+
continue
293352
job_req["parameterSet"]["schedulerOptions"].append(extra_opt)
353+
354+
# --- Clean up empty parameterSet sections ---
294355
if not job_req["parameterSet"]["appArgs"]:
295356
del job_req["parameterSet"]["appArgs"]
296357
if not job_req["parameterSet"]["envVariables"]:
@@ -299,9 +360,11 @@ def generate_job_request(
299360
del job_req["parameterSet"]["schedulerOptions"]
300361
if not job_req["parameterSet"]:
301362
del job_req["parameterSet"]
363+
302364
final_job_req = {k: v for k, v in job_req.items() if v is not None}
303365
print("Job request dictionary generated successfully.")
304366
return final_job_req
367+
305368
except (AppDiscoveryError, ValueError) as e:
306369
print(f"ERROR: Failed to generate job request: {e}")
307370
raise
@@ -310,7 +373,7 @@ def generate_job_request(
310373
raise JobSubmissionError(f"Unexpected error generating job request: {e}") from e
311374

312375

313-
# --- submit_job_request function (Production Ready) ---
376+
# --- submit_job_request function ---
314377
def submit_job_request(
315378
tapis_client: Tapis, job_request: Dict[str, Any]
316379
) -> "SubmittedJob":
@@ -367,7 +430,7 @@ def submit_job_request(
367430
raise JobSubmissionError(f"Unexpected error during job submission: {e}") from e
368431

369432

370-
# --- SubmittedJob Class (Production Ready) ---
433+
# --- SubmittedJob Class ---
371434
class SubmittedJob:
372435
"""Represents a submitted Tapis job with methods for monitoring and management.
373436
@@ -1048,7 +1111,7 @@ def cancel(self):
10481111
) from e
10491112

10501113

1051-
# --- Standalone Helper Functions (Production Ready) ---
1114+
# --- Standalone Helper Functions ---
10521115
def get_job_status(t: Tapis, job_uuid: str) -> str:
10531116
"""Get the current status of a job by UUID.
10541117

0 commit comments

Comments
 (0)