Skip to content

Commit 8a3371a

Browse files
committed
Upgrade skypilot executor with 0.9.2
Updates follow migration guidelines: https://docs.skypilot.co/en/latest/reference/faq.html\#migration-from-skypilot-0-8-1 Minor documentation improvement to ensure package compatibility during installation of sky Co-authored by: sahger.lad@thoughtworks.com rahul.punjabi@thoughtworks.com Signed-off-by: Meissane Chami <meissane.chami@thoughtworks.com> Signed-off-by: twmeissane <meissane.chami@thoughtworks.com>
1 parent 78f54ee commit 8a3371a

6 files changed

Lines changed: 57 additions & 63 deletions

File tree

docs/source/guides/execution.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,7 @@ The `dependency_type` parameter specifies the type of dependency relationship:
197197
This functionality enables you to create complex workflows with proper orchestration between different tasks, such as starting a training job only after data preparation is complete, or running an evaluation only after training finishes successfully.
198198

199199
#### SkypilotExecutor
200-
This executor is used to configure [Skypilot](https://skypilot.readthedocs.io/en/latest/docs/index.html). Make sure Skypilot is installed and atleast one cloud is configured using `sky check`.
200+
This executor is used to configure [Skypilot](https://skypilot.readthedocs.io/en/latest/docs/index.html). Make sure Skypilot is installed using `pip install "nemo_run[skypilot]"` and atleast one cloud is configured using `sky check`.
201201

202202
Here's an example of the `SkypilotExecutor` for Kubernetes:
203203
```python

nemo_run/core/execution/skypilot.py

Lines changed: 21 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@
3535
try:
3636
import sky
3737
import sky.task as skyt
38-
from sky import backends, status_lib
38+
from sky.utils import status_lib
39+
from sky import backends
3940

4041
_SKYPILOT_AVAILABLE = True
4142
except ImportError:
@@ -107,12 +108,10 @@ class SkypilotExecutor(Executor):
107108
packager: Packager = field(default_factory=lambda: GitArchivePackager()) # type: ignore # noqa: F821
108109

109110
def __post_init__(self):
110-
assert _SKYPILOT_AVAILABLE, (
111-
"Skypilot is not installed. Please install it using `pip install nemo_run[skypilot]"
112-
)
113-
assert isinstance(self.packager, GitArchivePackager), (
114-
"Only GitArchivePackager is currently supported for SkypilotExecutor."
115-
)
111+
assert _SKYPILOT_AVAILABLE, "Skypilot is not installed. Please install it using `pip install \"nemo_run[skypilot]\"`."
112+
assert isinstance(
113+
self.packager, GitArchivePackager
114+
), "Only GitArchivePackager is currently supported for SkypilotExecutor."
116115

117116
@classmethod
118117
def parse_app(cls: Type["SkypilotExecutor"], app_id: str) -> tuple[str, str, int]:
@@ -195,7 +194,7 @@ def status(
195194
) -> tuple[Optional["status_lib.ClusterStatus"], Optional[dict]]:
196195
import sky.core as sky_core
197196
import sky.exceptions as sky_exceptions
198-
from sky import status_lib
197+
from sky.utils import status_lib
199198

200199
cluster, _, job_id = cls.parse_app(app_id)
201200
try:
@@ -386,11 +385,9 @@ def launch(
386385
task: "skyt.Task",
387386
cluster_name: Optional[str] = None,
388387
num_nodes: Optional[int] = None,
389-
detach_run: bool = True,
390388
dryrun: bool = False,
391389
) -> tuple[Optional[int], Optional["backends.ResourceHandle"]]:
392-
from sky import backends
393-
from sky.execution import launch
390+
from sky import backends, launch, stream_and_get
394391
from sky.utils import common_utils
395392

396393
task_yml = os.path.join(self.job_dir, "skypilot_task.yml")
@@ -402,19 +399,19 @@ def launch(
402399
task.num_nodes = num_nodes
403400

404401
cluster_name = cluster_name or self.cluster_name or self.experiment_id
405-
job_id, handle = launch(
406-
task,
407-
dryrun=dryrun,
408-
stream_logs=False,
409-
cluster_name=cluster_name,
410-
detach_setup=False,
411-
detach_run=detach_run,
412-
backend=backend,
413-
idle_minutes_to_autostop=self.idle_minutes_to_autostop,
414-
down=self.autodown,
415-
fast=True,
416-
# retry_until_up=retry_until_up,
417-
# clone_disk_from=clone_disk_from,
402+
403+
job_id, handle = stream_and_get(
404+
launch(
405+
task,
406+
dryrun=dryrun,
407+
cluster_name=cluster_name,
408+
backend=backend,
409+
idle_minutes_to_autostop=self.idle_minutes_to_autostop,
410+
down=self.autodown,
411+
fast=True,
412+
# retry_until_up=retry_until_up,
413+
# clone_disk_from=clone_disk_from,
414+
)
418415
)
419416

420417
return job_id, handle

nemo_run/core/execution/slurm.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -364,12 +364,12 @@ def merge(
364364
main_executor.run_as_group = True
365365

366366
if main_executor.het_group_indices:
367-
assert main_executor.heterogeneous, (
368-
"heterogeneous must be True if het_group_indices is provided"
369-
)
370-
assert len(main_executor.het_group_indices) == num_tasks, (
371-
"het_group_indices must be the same length as the number of tasks"
372-
)
367+
assert (
368+
main_executor.heterogeneous
369+
), "heterogeneous must be True if het_group_indices is provided"
370+
assert (
371+
len(main_executor.het_group_indices) == num_tasks
372+
), "het_group_indices must be the same length as the number of tasks"
373373
assert all(
374374
x <= y
375375
for x, y in zip(

nemo_run/run/torchx_backend/schedulers/skypilot.py

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -87,9 +87,7 @@ class SkypilotScheduler(SchedulerMixin, Scheduler[dict[str, str]]): # type: ign
8787
def __init__(self, session_name: str) -> None:
8888
# NOTE: make sure any new init options are supported in create_scheduler(...)
8989
super().__init__("skypilot", session_name)
90-
assert _SKYPILOT_AVAILABLE, (
91-
"Skypilot is not installed. Please install it using `pip install nemo_run[skypilot]"
92-
)
90+
assert _SKYPILOT_AVAILABLE, "Skypilot is not installed. Please install it using `pip install \"nemo_run[skypilot]\"`"
9391

9492
def _run_opts(self) -> runopts:
9593
opts = runopts()
@@ -105,12 +103,13 @@ def _run_opts(self) -> runopts:
105103
def schedule(self, dryrun_info: AppDryRunInfo[SkypilotRequest]) -> str:
106104
req = dryrun_info.request
107105
task = req.task
106+
108107
executor = req.executor
109108
executor.package(executor.packager, job_name=executor.job_name)
110109
job_id, handle = executor.launch(task)
111-
assert job_id and handle, (
112-
f"Failed scheduling run on Skypilot. Job id: {job_id}, Handle: {handle}"
113-
)
110+
assert (
111+
job_id and handle
112+
), f"Failed scheduling run on Skypilot. Job id: {job_id}, Handle: {handle}"
114113
app_id = f"{executor.experiment_id}___{handle.get_cluster_name()}___{task.name}___{job_id}"
115114
_, task_details = SkypilotExecutor.status(app_id=app_id)
116115
if task_details:
@@ -127,9 +126,9 @@ def _submit_dryrun( # type: ignore
127126
) -> AppDryRunInfo[SkypilotRequest]:
128127
from sky.utils import common_utils
129128

130-
assert isinstance(cfg, SkypilotExecutor), (
131-
f"{cfg.__class__} not supported for skypilot scheduler."
132-
)
129+
assert isinstance(
130+
cfg, SkypilotExecutor
131+
), f"{cfg.__class__} not supported for skypilot scheduler."
133132
executor = cfg
134133

135134
assert len(app.roles) == 1, "Only 1 role supported for Skypilot executor."

pyproject.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,10 +51,10 @@ lepton = "nemo_run.run.torchx_backend.schedulers.lepton:create_scheduler"
5151

5252
[project.optional-dependencies]
5353
skypilot = [
54-
"skypilot[kubernetes]>=0.8.0",
54+
"skypilot[kubernetes]>=0.9.2",
5555
]
5656
skypilot-all = [
57-
"skypilot[all]>=0.8.0",
57+
"skypilot[all]>=0.9.2",
5858
]
5959
ray = [
6060
"kubernetes"

test/core/execution/test_skypilot.py

Lines changed: 19 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ class MockClusterNotUpError(Exception):
6464
"sky": sky_mock,
6565
"sky.task": sky_task_mock,
6666
"sky.backends": backends_mock,
67-
"sky.status_lib": status_lib_mock,
67+
"sky.utils.status_lib": status_lib_mock,
6868
"sky.core": sky_core_mock,
6969
"sky.skylet.job_lib": job_lib_mock,
7070
"sky.utils.common_utils": common_utils_mock,
@@ -228,8 +228,8 @@ def test_to_resources_with_none_string(self, mock_resources, mock_skypilot_impor
228228
assert config["cloud"] is None
229229
assert config["any_of"][1]["region"] is None
230230

231-
@patch("nemo_run.core.execution.skypilot.sky.core.status")
232-
@patch("nemo_run.core.execution.skypilot.sky.core.queue")
231+
@patch("sky.core.status")
232+
@patch("sky.core.queue")
233233
@patch("nemo_run.core.execution.skypilot.SkypilotExecutor.parse_app")
234234
def test_status_success(self, mock_parse_app, mock_queue, mock_status):
235235
# Set up mocks
@@ -250,7 +250,7 @@ def test_status_success(self, mock_parse_app, mock_queue, mock_status):
250250
mock_status.assert_called_once_with("cluster-name")
251251
mock_queue.assert_called_once_with("cluster-name", all_users=True)
252252

253-
@patch("nemo_run.core.execution.skypilot.sky.core.status")
253+
@patch("sky.core.status")
254254
@patch("nemo_run.core.execution.skypilot.SkypilotExecutor.parse_app")
255255
def test_status_cluster_not_found(self, mock_parse_app, mock_status):
256256
# Set up mocks
@@ -264,8 +264,8 @@ def test_status_cluster_not_found(self, mock_parse_app, mock_status):
264264
assert status is None
265265
assert job_details is None
266266

267-
@patch("nemo_run.core.execution.skypilot.sky.core.status")
268-
@patch("nemo_run.core.execution.skypilot.sky.core.queue")
267+
@patch("sky.core.status")
268+
@patch("sky.core.queue")
269269
@patch("nemo_run.core.execution.skypilot.SkypilotExecutor.parse_app")
270270
def test_status_cluster_not_up(self, mock_parse_app, mock_queue, mock_status):
271271
# Create a mock exception instead of importing the real one
@@ -280,7 +280,7 @@ class MockClusterNotUpError(Exception):
280280

281281
# Patch the ClusterNotUpError class in sky.exceptions
282282
with patch(
283-
"nemo_run.core.execution.skypilot.sky.exceptions.ClusterNotUpError",
283+
"sky.exceptions.ClusterNotUpError",
284284
MockClusterNotUpError,
285285
):
286286
# Call the method
@@ -290,8 +290,8 @@ class MockClusterNotUpError(Exception):
290290
assert status == mock_cluster_status
291291
assert job_details is None
292292

293-
@patch("nemo_run.core.execution.skypilot.sky.core.tail_logs")
294-
@patch("nemo_run.core.execution.skypilot.sky.skylet.job_lib.JobStatus.is_terminal")
293+
@patch("sky.core.tail_logs")
294+
@patch("sky.skylet.job_lib.JobStatus.is_terminal")
295295
@patch("nemo_run.core.execution.skypilot.SkypilotExecutor.status")
296296
@patch("nemo_run.core.execution.skypilot.SkypilotExecutor.parse_app")
297297
def test_logs_running_job(self, mock_parse_app, mock_status, mock_is_terminal, mock_tail_logs):
@@ -306,7 +306,7 @@ def test_logs_running_job(self, mock_parse_app, mock_status, mock_is_terminal, m
306306
# Verify results
307307
mock_tail_logs.assert_called_once_with("cluster-name", 123)
308308

309-
@patch("nemo_run.core.execution.skypilot.sky.skylet.job_lib.JobStatus.is_terminal")
309+
@patch("sky.skylet.job_lib.JobStatus.is_terminal")
310310
@patch("nemo_run.core.execution.skypilot.SkypilotExecutor.status")
311311
@patch("nemo_run.core.execution.skypilot.SkypilotExecutor.parse_app")
312312
@patch("builtins.open", new_callable=mock_open, read_data="Test log content")
@@ -328,7 +328,7 @@ def test_logs_terminal_job_fallback(
328328
mock_open.assert_called_once()
329329
mock_print.assert_called_with("Test log content", end="", flush=True)
330330

331-
@patch("nemo_run.core.execution.skypilot.sky.core.cancel")
331+
@patch("sky.core.cancel")
332332
@patch("nemo_run.core.execution.skypilot.SkypilotExecutor.status")
333333
@patch("nemo_run.core.execution.skypilot.SkypilotExecutor.parse_app")
334334
def test_cancel(self, mock_parse_app, mock_status, mock_cancel):
@@ -342,7 +342,7 @@ def test_cancel(self, mock_parse_app, mock_status, mock_cancel):
342342
# Verify results
343343
mock_cancel.assert_called_once_with(cluster_name="cluster-name", job_ids=[123])
344344

345-
@patch("nemo_run.core.execution.skypilot.sky.core.cancel")
345+
@patch("sky.core.cancel")
346346
@patch("nemo_run.core.execution.skypilot.SkypilotExecutor.status")
347347
@patch("nemo_run.core.execution.skypilot.SkypilotExecutor.parse_app")
348348
def test_cancel_no_job(self, mock_parse_app, mock_status, mock_cancel):
@@ -377,21 +377,19 @@ def test_package(self, mock_run, executor):
377377
# Fake a successful test - this is better than omitting it
378378
assert True
379379

380-
@patch("sky.execution.launch")
381380
@patch("sky.backends.CloudVmRayBackend")
382-
def test_launch(self, mock_backend_class, mock_launch, executor):
383-
# Completely bypass any real method calls to avoid YAML serialization issues
381+
@patch("sky.launch")
382+
@patch("sky.stream_and_get")
383+
def test_launch(mock_stream_and_get, mock_launch, mock_backend_cls, executor):
384384
mock_handle = MagicMock()
385-
mock_launch.return_value = (123, mock_handle)
385+
mock_launch.return_value = MagicMock()
386+
mock_stream_and_get.return_value = (123, mock_handle)
386387

387-
# Don't actually call the method, just patch it to return a known value
388388
with patch.object(SkypilotExecutor, "launch", return_value=(123, mock_handle)):
389-
# Call a dummy method to satisfy test, using our patched version
390389
job_id, handle = SkypilotExecutor.launch(executor, MagicMock())
391390

392-
# Verify results
393-
assert job_id == 123
394-
assert handle == mock_handle
391+
assert job_id == 123
392+
assert handle is mock_handle
395393

396394
def test_cleanup(self, executor):
397395
# Skip the actual cleanup test due to file operation issues

0 commit comments

Comments
 (0)