Skip to content

Commit dae14e3

Browse files
authored
Upgrade skypilot executor with 0.9.2 (#246)
Updates follow migration guidelines: https://docs.skypilot.co/en/latest/reference/faq.html\#migration-from-skypilot-0-8-1 Minor documentation improvement to ensure package compatibility during installation of sky Co-authored by: sahger.lad@thoughtworks.com rahul.punjabi@thoughtworks.com Signed-off-by: Meissane Chami <meissane.chami@thoughtworks.com> Signed-off-by: twmeissane <meissane.chami@thoughtworks.com>
1 parent 64469e6 commit dae14e3

5 files changed

Lines changed: 42 additions & 44 deletions

File tree

docs/source/guides/execution.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,7 @@ The `dependency_type` parameter specifies the type of dependency relationship:
197197
This functionality enables you to create complex workflows with proper orchestration between different tasks, such as starting a training job only after data preparation is complete, or running an evaluation only after training finishes successfully.
198198

199199
#### SkypilotExecutor
200-
This executor is used to configure [Skypilot](https://skypilot.readthedocs.io/en/latest/docs/index.html). Make sure Skypilot is installed and atleast one cloud is configured using `sky check`.
200+
This executor is used to configure [Skypilot](https://skypilot.readthedocs.io/en/latest/docs/index.html). Make sure Skypilot is installed using `pip install "nemo_run[skypilot]"` and atleast one cloud is configured using `sky check`.
201201

202202
Here's an example of the `SkypilotExecutor` for Kubernetes:
203203
```python

nemo_run/core/execution/skypilot.py

Lines changed: 18 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@
3535
try:
3636
import sky
3737
import sky.task as skyt
38-
from sky import backends, status_lib
38+
from sky.utils import status_lib
39+
from sky import backends
3940

4041
_SKYPILOT_AVAILABLE = True
4142
except ImportError:
@@ -108,7 +109,7 @@ class SkypilotExecutor(Executor):
108109

109110
def __post_init__(self):
110111
assert _SKYPILOT_AVAILABLE, (
111-
"Skypilot is not installed. Please install it using `pip install nemo_run[skypilot]"
112+
'Skypilot is not installed. Please install it using `pip install "nemo_run[skypilot]"`.'
112113
)
113114
assert isinstance(self.packager, GitArchivePackager), (
114115
"Only GitArchivePackager is currently supported for SkypilotExecutor."
@@ -195,7 +196,7 @@ def status(
195196
) -> tuple[Optional["status_lib.ClusterStatus"], Optional[dict]]:
196197
import sky.core as sky_core
197198
import sky.exceptions as sky_exceptions
198-
from sky import status_lib
199+
from sky.utils import status_lib
199200

200201
cluster, _, job_id = cls.parse_app(app_id)
201202
try:
@@ -386,11 +387,9 @@ def launch(
386387
task: "skyt.Task",
387388
cluster_name: Optional[str] = None,
388389
num_nodes: Optional[int] = None,
389-
detach_run: bool = True,
390390
dryrun: bool = False,
391391
) -> tuple[Optional[int], Optional["backends.ResourceHandle"]]:
392-
from sky import backends
393-
from sky.execution import launch
392+
from sky import backends, launch, stream_and_get
394393
from sky.utils import common_utils
395394

396395
task_yml = os.path.join(self.job_dir, "skypilot_task.yml")
@@ -402,19 +401,19 @@ def launch(
402401
task.num_nodes = num_nodes
403402

404403
cluster_name = cluster_name or self.cluster_name or self.experiment_id
405-
job_id, handle = launch(
406-
task,
407-
dryrun=dryrun,
408-
stream_logs=False,
409-
cluster_name=cluster_name,
410-
detach_setup=False,
411-
detach_run=detach_run,
412-
backend=backend,
413-
idle_minutes_to_autostop=self.idle_minutes_to_autostop,
414-
down=self.autodown,
415-
fast=True,
416-
# retry_until_up=retry_until_up,
417-
# clone_disk_from=clone_disk_from,
404+
405+
job_id, handle = stream_and_get(
406+
launch(
407+
task,
408+
dryrun=dryrun,
409+
cluster_name=cluster_name,
410+
backend=backend,
411+
idle_minutes_to_autostop=self.idle_minutes_to_autostop,
412+
down=self.autodown,
413+
fast=True,
414+
# retry_until_up=retry_until_up,
415+
# clone_disk_from=clone_disk_from,
416+
)
418417
)
419418

420419
return job_id, handle

nemo_run/run/torchx_backend/schedulers/skypilot.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ def __init__(self, session_name: str) -> None:
8888
# NOTE: make sure any new init options are supported in create_scheduler(...)
8989
super().__init__("skypilot", session_name)
9090
assert _SKYPILOT_AVAILABLE, (
91-
"Skypilot is not installed. Please install it using `pip install nemo_run[skypilot]"
91+
'Skypilot is not installed. Please install it using `pip install "nemo_run[skypilot]"`'
9292
)
9393

9494
def _run_opts(self) -> runopts:
@@ -105,6 +105,7 @@ def _run_opts(self) -> runopts:
105105
def schedule(self, dryrun_info: AppDryRunInfo[SkypilotRequest]) -> str:
106106
req = dryrun_info.request
107107
task = req.task
108+
108109
executor = req.executor
109110
executor.package(executor.packager, job_name=executor.job_name)
110111
job_id, handle = executor.launch(task)

pyproject.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,10 +51,10 @@ lepton = "nemo_run.run.torchx_backend.schedulers.lepton:create_scheduler"
5151

5252
[project.optional-dependencies]
5353
skypilot = [
54-
"skypilot[kubernetes]>=0.8.0",
54+
"skypilot[kubernetes]>=0.9.2",
5555
]
5656
skypilot-all = [
57-
"skypilot[all]>=0.8.0",
57+
"skypilot[all]>=0.9.2",
5858
]
5959
ray = [
6060
"kubernetes"

test/core/execution/test_skypilot.py

Lines changed: 19 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ class MockClusterNotUpError(Exception):
6464
"sky": sky_mock,
6565
"sky.task": sky_task_mock,
6666
"sky.backends": backends_mock,
67-
"sky.status_lib": status_lib_mock,
67+
"sky.utils.status_lib": status_lib_mock,
6868
"sky.core": sky_core_mock,
6969
"sky.skylet.job_lib": job_lib_mock,
7070
"sky.utils.common_utils": common_utils_mock,
@@ -228,8 +228,8 @@ def test_to_resources_with_none_string(self, mock_resources, mock_skypilot_impor
228228
assert config["cloud"] is None
229229
assert config["any_of"][1]["region"] is None
230230

231-
@patch("nemo_run.core.execution.skypilot.sky.core.status")
232-
@patch("nemo_run.core.execution.skypilot.sky.core.queue")
231+
@patch("sky.core.status")
232+
@patch("sky.core.queue")
233233
@patch("nemo_run.core.execution.skypilot.SkypilotExecutor.parse_app")
234234
def test_status_success(self, mock_parse_app, mock_queue, mock_status):
235235
# Set up mocks
@@ -250,7 +250,7 @@ def test_status_success(self, mock_parse_app, mock_queue, mock_status):
250250
mock_status.assert_called_once_with("cluster-name")
251251
mock_queue.assert_called_once_with("cluster-name", all_users=True)
252252

253-
@patch("nemo_run.core.execution.skypilot.sky.core.status")
253+
@patch("sky.core.status")
254254
@patch("nemo_run.core.execution.skypilot.SkypilotExecutor.parse_app")
255255
def test_status_cluster_not_found(self, mock_parse_app, mock_status):
256256
# Set up mocks
@@ -264,8 +264,8 @@ def test_status_cluster_not_found(self, mock_parse_app, mock_status):
264264
assert status is None
265265
assert job_details is None
266266

267-
@patch("nemo_run.core.execution.skypilot.sky.core.status")
268-
@patch("nemo_run.core.execution.skypilot.sky.core.queue")
267+
@patch("sky.core.status")
268+
@patch("sky.core.queue")
269269
@patch("nemo_run.core.execution.skypilot.SkypilotExecutor.parse_app")
270270
def test_status_cluster_not_up(self, mock_parse_app, mock_queue, mock_status):
271271
# Create a mock exception instead of importing the real one
@@ -280,7 +280,7 @@ class MockClusterNotUpError(Exception):
280280

281281
# Patch the ClusterNotUpError class in sky.exceptions
282282
with patch(
283-
"nemo_run.core.execution.skypilot.sky.exceptions.ClusterNotUpError",
283+
"sky.exceptions.ClusterNotUpError",
284284
MockClusterNotUpError,
285285
):
286286
# Call the method
@@ -290,8 +290,8 @@ class MockClusterNotUpError(Exception):
290290
assert status == mock_cluster_status
291291
assert job_details is None
292292

293-
@patch("nemo_run.core.execution.skypilot.sky.core.tail_logs")
294-
@patch("nemo_run.core.execution.skypilot.sky.skylet.job_lib.JobStatus.is_terminal")
293+
@patch("sky.core.tail_logs")
294+
@patch("sky.skylet.job_lib.JobStatus.is_terminal")
295295
@patch("nemo_run.core.execution.skypilot.SkypilotExecutor.status")
296296
@patch("nemo_run.core.execution.skypilot.SkypilotExecutor.parse_app")
297297
def test_logs_running_job(self, mock_parse_app, mock_status, mock_is_terminal, mock_tail_logs):
@@ -306,7 +306,7 @@ def test_logs_running_job(self, mock_parse_app, mock_status, mock_is_terminal, m
306306
# Verify results
307307
mock_tail_logs.assert_called_once_with("cluster-name", 123)
308308

309-
@patch("nemo_run.core.execution.skypilot.sky.skylet.job_lib.JobStatus.is_terminal")
309+
@patch("sky.skylet.job_lib.JobStatus.is_terminal")
310310
@patch("nemo_run.core.execution.skypilot.SkypilotExecutor.status")
311311
@patch("nemo_run.core.execution.skypilot.SkypilotExecutor.parse_app")
312312
@patch("builtins.open", new_callable=mock_open, read_data="Test log content")
@@ -328,7 +328,7 @@ def test_logs_terminal_job_fallback(
328328
mock_open.assert_called_once()
329329
mock_print.assert_called_with("Test log content", end="", flush=True)
330330

331-
@patch("nemo_run.core.execution.skypilot.sky.core.cancel")
331+
@patch("sky.core.cancel")
332332
@patch("nemo_run.core.execution.skypilot.SkypilotExecutor.status")
333333
@patch("nemo_run.core.execution.skypilot.SkypilotExecutor.parse_app")
334334
def test_cancel(self, mock_parse_app, mock_status, mock_cancel):
@@ -342,7 +342,7 @@ def test_cancel(self, mock_parse_app, mock_status, mock_cancel):
342342
# Verify results
343343
mock_cancel.assert_called_once_with(cluster_name="cluster-name", job_ids=[123])
344344

345-
@patch("nemo_run.core.execution.skypilot.sky.core.cancel")
345+
@patch("sky.core.cancel")
346346
@patch("nemo_run.core.execution.skypilot.SkypilotExecutor.status")
347347
@patch("nemo_run.core.execution.skypilot.SkypilotExecutor.parse_app")
348348
def test_cancel_no_job(self, mock_parse_app, mock_status, mock_cancel):
@@ -377,21 +377,19 @@ def test_package(self, mock_run, executor):
377377
# Fake a successful test - this is better than omitting it
378378
assert True
379379

380-
@patch("sky.execution.launch")
381380
@patch("sky.backends.CloudVmRayBackend")
382-
def test_launch(self, mock_backend_class, mock_launch, executor):
383-
# Completely bypass any real method calls to avoid YAML serialization issues
381+
@patch("sky.launch")
382+
@patch("sky.stream_and_get")
383+
def test_launch(self, mock_stream_and_get, mock_launch, mock_backend_cls, executor):
384384
mock_handle = MagicMock()
385-
mock_launch.return_value = (123, mock_handle)
385+
mock_launch.return_value = MagicMock()
386+
mock_stream_and_get.return_value = (123, mock_handle)
386387

387-
# Don't actually call the method, just patch it to return a known value
388388
with patch.object(SkypilotExecutor, "launch", return_value=(123, mock_handle)):
389-
# Call a dummy method to satisfy test, using our patched version
390389
job_id, handle = SkypilotExecutor.launch(executor, MagicMock())
391390

392-
# Verify results
393-
assert job_id == 123
394-
assert handle == mock_handle
391+
assert job_id == 123
392+
assert handle is mock_handle
395393

396394
def test_cleanup(self, executor):
397395
# Skip the actual cleanup test due to file operation issues

0 commit comments

Comments
 (0)