From 503c30bdfa95bacd8ba4d61b387baf4a1905b4d7 Mon Sep 17 00:00:00 2001 From: Hemil Desai Date: Tue, 13 May 2025 08:02:08 -0700 Subject: [PATCH 1/2] Add cancel to docker executor Signed-off-by: Hemil Desai --- nemo_run/run/torchx_backend/schedulers/docker.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/nemo_run/run/torchx_backend/schedulers/docker.py b/nemo_run/run/torchx_backend/schedulers/docker.py index ac3a14b3..853bca79 100644 --- a/nemo_run/run/torchx_backend/schedulers/docker.py +++ b/nemo_run/run/torchx_backend/schedulers/docker.py @@ -61,9 +61,9 @@ def __init__(self, session_name: str) -> None: self._scheduled_reqs: list[DockerJobRequest] = [] def _submit_dryrun(self, app: AppDef, cfg: Executor) -> AppDryRunInfo[DockerJobRequest]: # type: ignore - assert isinstance(cfg, DockerExecutor), ( - f"{cfg.__class__} not supported for docker scheduler." - ) + assert isinstance( + cfg, DockerExecutor + ), f"{cfg.__class__} not supported for docker scheduler." executor = cfg if len(app.roles) > 1: @@ -254,6 +254,13 @@ def local_logs(container: DockerContainer): else: return logs + def _cancel_existing(self, app_id: str) -> None: + req = DockerJobRequest.load(app_id=app_id) + if not req: + return None + for container in req.containers: + container.delete(client=self._docker_client, id=req.id) + def close(self) -> None: # terminate all apps for req in self._scheduled_reqs: From 1022bfdb297c827c4cab1e85bc8d83e86acbf88e Mon Sep 17 00:00:00 2001 From: Hemil Desai Date: Tue, 13 May 2025 08:05:16 -0700 Subject: [PATCH 2/2] fix Signed-off-by: Hemil Desai --- nemo_run/run/torchx_backend/schedulers/docker.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/nemo_run/run/torchx_backend/schedulers/docker.py b/nemo_run/run/torchx_backend/schedulers/docker.py index 853bca79..e3124a02 100644 --- a/nemo_run/run/torchx_backend/schedulers/docker.py +++ b/nemo_run/run/torchx_backend/schedulers/docker.py @@ -61,9 +61,9 @@ def __init__(self, session_name: str) -> None: self._scheduled_reqs: list[DockerJobRequest] = [] def _submit_dryrun(self, app: AppDef, cfg: Executor) -> AppDryRunInfo[DockerJobRequest]: # type: ignore - assert isinstance( - cfg, DockerExecutor - ), f"{cfg.__class__} not supported for docker scheduler." + assert isinstance(cfg, DockerExecutor), ( + f"{cfg.__class__} not supported for docker scheduler." + ) executor = cfg if len(app.roles) > 1: