Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 47 additions & 9 deletions tests/smoke_tests/test_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
- Batch cancel: cancel a running batch job mid-flight.
- Batch HA: kill controller mid-flight, verify resume from DB.
"""
import re
import tempfile
from typing import Dict

import pytest
from smoke_tests import smoke_tests_utils
Expand All @@ -24,6 +26,9 @@
# option to not allow shared env tests.


_CLOUD_TO_STORE = {'gcp': 'gs', 'nebius': 'nebius'}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 _CLOUD_TO_STORE maps nebius to 'nebius' but batch IO classes only accept s3:// and gs:// prefixes

Setting `'nebius': 'nebius'` in `_CLOUD_TO_STORE` causes the test to set `SKY_BATCH_STORE='nebius'`. The Python example scripts (e.g., `examples/batch/simple/double_text.py:81`) then construct paths like `nebius://bucket/test.jsonl`. However, `JsonReader.__post_init__` (`sky/batch/io_formats.py:223-227`), `JsonWriter.__post_init__` (`sky/batch/io_formats.py:281-285`), and `ImageWriter.__post_init__` (`sky/batch/io_formats.py:327-331`) all explicitly reject any prefix other than `s3://` or `gs://`, raising `ValueError`. In addition, `parse_cloud_path` (`sky/batch/utils.py:179-187`) handles only `s3://` and `gs://`. This means every batch test (`test_batch_simple`, `test_batch_diffusion`, `test_batch_cancel`, `test_batch_custom_formats`, `test_batch_ha_kill_running`) will fail immediately on Nebius with a `ValueError` when the example script creates a `JsonReader` or `JsonWriter` with a `nebius://` path.

Prompt for agents
The _CLOUD_TO_STORE mapping sets nebius store to 'nebius', which causes example scripts to construct nebius://bucket/... paths. But the batch IO format classes (JsonReader, JsonWriter, ImageWriter in sky/batch/io_formats.py) and utility functions (parse_cloud_path in sky/batch/utils.py) only support s3:// and gs:// prefixes. There are two possible approaches:

1. Remove 'nebius' from _CLOUD_TO_STORE so it falls through to the default 's3', since Nebius uses S3-compatible storage. This requires verifying that the batch framework's internal S3 operations (using aws.client('s3') via sky/adaptors/aws) work correctly with Nebius credentials on Nebius worker machines.

2. Update the batch IO format classes and utility functions to support nebius:// paths, translating them to s3:// internally (similar to how sky/cloud_stores.py:589-590 does source.replace('nebius://', 's3://')).

Option 1 is simpler if Nebius workers already have proper S3-compatible credentials configured.
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.



def _storage_cmds(generic_cloud: str, bucket: str):
"""Return cloud-specific storage command fragments for batch tests.

Expand All @@ -36,6 +41,17 @@ def _storage_cmds(generic_cloud: str, bucket: str):
url = f'gs://{bucket}'
return (url, f'gsutil mb {url}', f'gsutil rm -r {url}', 'gsutil cp',
'gsutil rm', 'gsutil ls', lambda p: f'gsutil rm -r {p}')
if generic_cloud == 'nebius':
# Nebius uses S3-compatible storage but requires --profile=nebius
from sky.adaptors import nebius
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 In-function import of nebius module without explanatory comment violates AGENTS.md rule

AGENTS.md states: "Always place imports at the top of the file, not inside function definitions. [...] Only as a last resort, place the import inside the function with a comment explaining why." The statement `from sky.adaptors import nebius` at line 46 sits inside the `_storage_cmds` function with no such comment. The module `sky/adaptors/nebius.py` only imports lightweight SkyPilot internals (no heavy third-party SDK at module level), so there is no practical reason it cannot be imported at the top of the file — indeed, `tests/smoke_tests/test_mount_and_storage.py:47` already imports it at the top level.

Prompt for agents
Move the import 'from sky.adaptors import nebius' from inside _storage_cmds() (line 46) to the top-level imports section of the file (around line 18, with the other sky imports). The nebius adaptor module only imports lightweight SkyPilot internals at module level, so top-level import is safe. See tests/smoke_tests/test_mount_and_storage.py:47 for the existing pattern.
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

profile = f'--profile={nebius.NEBIUS_PROFILE_NAME}'
url = f's3://{bucket}'
return (url,
f'aws s3 mb {url} {profile}',
f'aws s3 rb {url} {profile} --force',
f'aws s3 cp {profile}', f'aws s3 rm {profile}',
f'aws s3 ls {profile}',
lambda p: f'aws s3 rm {p} {profile} --recursive')
# Default to AWS
url = f's3://{bucket}'
return (url,
Expand All @@ -53,7 +69,7 @@ def test_batch_simple(generic_cloud: str):
pool_name = f'batch-smpl-pool-{name}'
url, create_bkt, delete_bkt, cp, rm, _, rm_r = _storage_cmds(
generic_cloud, bucket)
store = 'gs' if generic_cloud == 'gcp' else 's3'
store = _CLOUD_TO_STORE.get(generic_cloud, 's3')

test = smoke_tests_utils.Test(
'batch_simple',
Expand Down Expand Up @@ -140,15 +156,36 @@ def test_batch_simple(generic_cloud: str):
# ---------- Test diffusion batch (image generation) ----------
@pytest.mark.batch
@pytest.mark.resource_heavy
@pytest.mark.no_kubernetes # pool.yaml hardcodes L4 GPU; K8s CI clusters may not have it
@pytest.mark.no_remote_server # see note 1 above
def test_batch_diffusion(generic_cloud: str):
@pytest.mark.parametrize('accelerator', [{'azure': 'T4', 'nebius': 'L40S'}])
def test_batch_diffusion(generic_cloud: str, accelerator: Dict[str, str]):
if generic_cloud in ('kubernetes', 'slurm'):
accelerator_str = smoke_tests_utils.get_available_gpus(
infra=generic_cloud)
if not accelerator_str:
pytest.fail(f'No GPUs available for {generic_cloud}.')
Copy link
Copy Markdown
Contributor

@devin-ai-integration devin-ai-integration Bot May 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 pytest.fail used instead of pytest.skip when GPUs are unavailable, inconsistent with codebase pattern

When no GPUs are available for kubernetes/slurm, the test calls `pytest.fail()` (line 152), which marks the test as FAILED. The established pattern throughout the test suite (e.g., `tests/smoke_tests/test_cluster_job.py:63-64`) is to use `pytest.skip()` in this scenario, since the test's prerequisites are not met rather than the test itself being broken. As written, this will cause false CI failures on k8s/slurm environments without GPUs.

Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

else:
accelerator_str = accelerator.get(generic_cloud, 'L4')

# Rewrite pool.yaml with the resolved accelerator for this cloud
pool_yaml_path = 'examples/batch/diffusion/pool.yaml'
with open(pool_yaml_path, 'r', encoding='utf-8') as f:
pool_content = f.read()
pool_content = re.sub(r'accelerators:\s*[^\n]+',
f'accelerators: {accelerator_str}:1', pool_content)
pool_tmp = tempfile.NamedTemporaryFile(mode='w', suffix='.yaml',
delete=False)
pool_tmp.write(pool_content)
pool_tmp.flush()
pool_tmp_path = pool_tmp.name
pool_tmp.close()

name = smoke_tests_utils.get_cluster_name()
bucket = f'sky-batch-diff-{name}'
pool_name = 'diffusion-pool'
url, create_bkt, delete_bkt, cp, rm, ls, rm_r = _storage_cmds(
generic_cloud, bucket)
store = 'gs' if generic_cloud == 'gcp' else 's3'
store = _CLOUD_TO_STORE.get(generic_cloud, 's3')

test = smoke_tests_utils.Test(
'batch_diffusion',
Expand All @@ -158,7 +195,7 @@ def test_batch_diffusion(generic_cloud: str):
f'sky serve down {pool_name} -y 2>/dev/null || true',
# --- Create GPU pool with generic_cloud ---
(f's=$(sky jobs pool apply -p {pool_name} --infra {generic_cloud}'
f' examples/batch/diffusion/pool.yaml -y); '
f' {pool_tmp_path} -y); '
f'echo "$s"; '
f'echo "$s" | grep "Successfully created pool"'),
# --- Data setup (extracted from examples/batch/diffusion/) ---
Expand Down Expand Up @@ -213,7 +250,8 @@ def test_batch_diffusion(generic_cloud: str):
f' sky serve down {pool_name} -y 2>/dev/null || true;'
f' {delete_bkt};'
f' rm -f /tmp/batch-prompts-{name}.jsonl'
f' /tmp/batch-manifest-{name}.jsonl'),
f' /tmp/batch-manifest-{name}.jsonl'
f' {pool_tmp_path}'),
timeout=45 * 60,
env={
'SKY_BATCH_BUCKET': bucket,
Expand All @@ -232,7 +270,7 @@ def test_batch_custom_formats(generic_cloud: str):
pool_name = 'custom-fmt-pool'
url, create_bkt, delete_bkt, cp, _, ls, rm_r = _storage_cmds(
generic_cloud, bucket)
store = 'gs' if generic_cloud == 'gcp' else 's3'
store = _CLOUD_TO_STORE.get(generic_cloud, 's3')

test = smoke_tests_utils.Test(
'batch_custom_formats',
Expand Down Expand Up @@ -311,7 +349,7 @@ def test_batch_cancel(generic_cloud: str):
pool_name = f'batch-cncl-pool-{name}'
url, create_bkt, delete_bkt, cp, rm, _, rm_r = _storage_cmds(
generic_cloud, bucket)
store = 'gs' if generic_cloud == 'gcp' else 's3'
store = _CLOUD_TO_STORE.get(generic_cloud, 's3')

test = smoke_tests_utils.Test(
'batch_cancel',
Expand Down Expand Up @@ -402,7 +440,7 @@ def test_batch_ha_kill_running(generic_cloud: str):
pool_name = 'test-batch-pool'
url, create_bkt, delete_bkt, cp, rm, _, rm_r = _storage_cmds(
generic_cloud, bucket)
store = 'gs' if generic_cloud == 'gcp' else 's3'
store = _CLOUD_TO_STORE.get(generic_cloud, 's3')

# HA config: run the jobs controller on k8s with high_availability.
skypilot_config_path = 'tests/test_yamls/managed_jobs_ha_config.yaml'
Expand Down
Loading