Skip to content

Commit 748885e

Browse files
committed
add a grace for Jobs that may start in Unknown
Signed-off-by: Prekshi Vyas <prekshivyas@gmail.com>
1 parent c99ddc1 commit 748885e

1 file changed

Lines changed: 75 additions & 75 deletions

File tree

nemo_run/core/execution/lepton.py

Lines changed: 75 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -83,83 +83,83 @@ def copy_directory_data_command(self, local_dir_path: str, dest_path: str) -> Li
8383
return full_command
8484

8585
def move_data(self, sleep: float = 10, timeout: int = 600, poll_interval: int = 5, unknowns_grace_period: int = 60) -> None:
86-
"""
87-
Moves job directory into remote storage and deletes the workload after completion.
88-
"""
89-
client = APIClient()
90-
cmd = self.copy_directory_data_command(self.job_dir, self.lepton_job_dir)
91-
node_group_id = self._node_group_id(client)
92-
valid_node_ids = self._valid_node_ids(node_group_id, client)
93-
94-
job_spec = LeptonJobUserSpec(
95-
resource_shape="cpu.small",
96-
affinity=LeptonResourceAffinity(
97-
allowed_dedicated_node_groups=[node_group_id.metadata.id_],
98-
allowed_nodes_in_node_group=valid_node_ids,
99-
),
100-
container=LeptonContainer(
101-
image="busybox:1.37.0",
102-
command=cmd,
103-
),
104-
completions=1,
105-
parallelism=1,
106-
mounts=[Mount(**mount) for mount in self.mounts],
107-
)
108-
109-
custom_name = f"data-mover-{int(datetime.now().timestamp())}"
110-
111-
job = LeptonJob(
112-
metadata=Metadata(
113-
id=custom_name,
114-
name=custom_name,
115-
visibility=LeptonVisibility("private"),
116-
),
117-
spec=job_spec,
118-
)
119-
120-
response = client.job.create(job)
121-
job_id = response.metadata.id_
122-
123-
start_time = time.time()
124-
unknown_start_time = None
125-
count = 0
126-
127-
while True:
128-
if time.time() - start_time > timeout:
129-
raise TimeoutError(f"Job {job_id} did not complete within {timeout} seconds.")
86+
"""
87+
Moves job directory into remote storage and deletes the workload after completion.
88+
"""
89+
client = APIClient()
90+
cmd = self.copy_directory_data_command(self.job_dir, self.lepton_job_dir)
91+
node_group_id = self._node_group_id(client)
92+
valid_node_ids = self._valid_node_ids(node_group_id, client)
93+
94+
job_spec = LeptonJobUserSpec(
95+
resource_shape="cpu.small",
96+
affinity=LeptonResourceAffinity(
97+
allowed_dedicated_node_groups=[node_group_id.metadata.id_],
98+
allowed_nodes_in_node_group=valid_node_ids,
99+
),
100+
container=LeptonContainer(
101+
image="busybox:1.37.0",
102+
command=cmd,
103+
),
104+
completions=1,
105+
parallelism=1,
106+
mounts=[Mount(**mount) for mount in self.mounts],
107+
)
108+
109+
custom_name = f"data-mover-{int(datetime.now().timestamp())}"
110+
111+
job = LeptonJob(
112+
metadata=Metadata(
113+
id=custom_name,
114+
name=custom_name,
115+
visibility=LeptonVisibility("private"),
116+
),
117+
spec=job_spec,
118+
)
119+
120+
response = client.job.create(job)
121+
job_id = response.metadata.id_
122+
123+
start_time = time.time()
124+
unknown_start_time = None
125+
count = 0
126+
127+
while True:
128+
if time.time() - start_time > timeout:
129+
raise TimeoutError(f"Job {job_id} did not complete within {timeout} seconds.")
130+
131+
current_job = client.job.get(job_id)
132+
current_job_status = current_job.status.state
130133

131-
current_job = client.job.get(job_id)
132-
current_job_status = current_job.status.state
133-
134-
if count > 0:
135-
if current_job_status == LeptonJobState.Completed:
136-
break
137-
elif current_job_status == LeptonJobState.Failed:
138-
break
139-
elif current_job_status == LeptonJobState.Unknown:
140-
if unknown_start_time is None:
141-
unknown_start_time = time.time()
142-
logging.warning(f"Job {job_id} entered Unknown state, giving it {unknowns_grace_period} seconds to recover...")
143-
144-
elif time.time() - unknown_start_time > unknowns_grace_period:
145-
logging.error(f"Job {job_id} has been in Unknown state for more than {unknowns_grace_period} seconds")
134+
if count > 0:
135+
if current_job_status == LeptonJobState.Completed:
136+
break
137+
elif current_job_status == LeptonJobState.Failed:
146138
break
147-
else:
148-
if unknown_start_time is not None:
149-
logging.info(f"Job {job_id} recovered from Unknown state to {current_job_status}")
150-
unknown_start_time = None
151-
152-
count += 1
153-
time.sleep(poll_interval)
154-
155-
if current_job_status != LeptonJobState.Completed:
156-
raise RuntimeError(f"Job {job_id} failed with status: {current_job_status}")
157-
158-
delete_success = client.job.delete(job_id)
159-
if delete_success:
160-
logging.info(f"Successfully deleted job {job_id}")
161-
else:
162-
logging.error(f"Failed to delete job {job_id}")
139+
elif current_job_status == LeptonJobState.Unknown:
140+
if unknown_start_time is None:
141+
unknown_start_time = time.time()
142+
logging.warning(f"Job {job_id} entered Unknown state, giving it {unknowns_grace_period} seconds to recover...")
143+
144+
elif time.time() - unknown_start_time > unknowns_grace_period:
145+
logging.error(f"Job {job_id} has been in Unknown state for more than {unknowns_grace_period} seconds")
146+
break
147+
else:
148+
if unknown_start_time is not None:
149+
logging.info(f"Job {job_id} recovered from Unknown state to {current_job_status}")
150+
unknown_start_time = None
151+
152+
count += 1
153+
time.sleep(poll_interval)
154+
155+
if current_job_status != LeptonJobState.Completed:
156+
raise RuntimeError(f"Job {job_id} failed with status: {current_job_status}")
157+
158+
delete_success = client.job.delete(job_id)
159+
if delete_success:
160+
logging.info(f"Successfully deleted job {job_id}")
161+
else:
162+
logging.error(f"Failed to delete job {job_id}")
163163

164164
def _node_group_id(self, client: APIClient) -> DedicatedNodeGroup:
165165
"""

0 commit comments

Comments
 (0)