|
| 1 | +from typing import Dict, Any, Optional |
| 2 | +from .base_job_handler import BaseJobHandler |
| 3 | + |
| 4 | + |
| 5 | +class MpmJobHandler(BaseJobHandler): |
| 6 | + """Custom handler for MPM (Material Point Method) jobs.""" |
| 7 | + |
| 8 | + def generate_job_info( |
| 9 | + self, |
| 10 | + tapis_client, |
| 11 | + input_uri: str, |
| 12 | + input_file: str, |
| 13 | + job_name: Optional[str] = None, |
| 14 | + max_minutes: Optional[int] = None, |
| 15 | + node_count: Optional[int] = None, |
| 16 | + cores_per_node: Optional[int] = None, |
| 17 | + queue: Optional[str] = None, |
| 18 | + allocation: Optional[str] = None, |
| 19 | + ) -> Dict[str, Any]: |
| 20 | + # Get app info for the single MPM app |
| 21 | + app_info = tapis_client.apps.getApp(appId="mpm") |
| 22 | + |
| 23 | + # Create the base job info |
| 24 | + job_info = { |
| 25 | + "name": job_name, |
| 26 | + "appId": app_info.id, |
| 27 | + "appVersion": app_info.version, |
| 28 | + "execSystemId": app_info.jobAttributes.execSystemId, |
| 29 | + "maxMinutes": max_minutes or app_info.jobAttributes.maxMinutes, |
| 30 | + "archiveOnAppError": app_info.jobAttributes.archiveOnAppError, |
| 31 | + "fileInputs": [{"name": "Input Directory", "sourceUrl": input_uri}], |
| 32 | + "execSystemLogicalQueue": queue |
| 33 | + or app_info.jobAttributes.execSystemLogicalQueue, |
| 34 | + "nodeCount": node_count or 1, # Default to 1 if not specified |
| 35 | + "coresPerNode": cores_per_node or 1, # Default to 1 if not specified |
| 36 | + "parameterSet": { |
| 37 | + "appArgs": [{"name": "Input Script", "arg": input_file}], |
| 38 | + "schedulerOptions": [], |
| 39 | + }, |
| 40 | + } |
| 41 | + |
| 42 | + # Add TACC allocation if provided |
| 43 | + if allocation: |
| 44 | + job_info["parameterSet"]["schedulerOptions"].append( |
| 45 | + {"name": "TACC Allocation", "arg": f"-A {allocation}"} |
| 46 | + ) |
| 47 | + |
| 48 | + return job_info |
0 commit comments