-
Notifications
You must be signed in to change notification settings - Fork 1.1k
[CI Fix] Skip test_batch_diffusion on Azure and Nebius (no L4 GPUs) #9657
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -8,7 +8,9 @@ | |
| - Batch cancel: cancel a running batch job mid-flight. | ||
| - Batch HA: kill controller mid-flight, verify resume from DB. | ||
| """ | ||
| import re | ||
| import tempfile | ||
| from typing import Dict | ||
|
|
||
| import pytest | ||
| from smoke_tests import smoke_tests_utils | ||
|
|
@@ -24,6 +26,9 @@ | |
| # option to not allow shared env tests. | ||
|
|
||
|
|
||
| _CLOUD_TO_STORE = {'gcp': 'gs', 'nebius': 'nebius'} | ||
|
|
||
|
|
||
| def _storage_cmds(generic_cloud: str, bucket: str): | ||
| """Return cloud-specific storage command fragments for batch tests. | ||
|
|
||
|
|
@@ -36,6 +41,17 @@ def _storage_cmds(generic_cloud: str, bucket: str): | |
| url = f'gs://{bucket}' | ||
| return (url, f'gsutil mb {url}', f'gsutil rm -r {url}', 'gsutil cp', | ||
| 'gsutil rm', 'gsutil ls', lambda p: f'gsutil rm -r {p}') | ||
| if generic_cloud == 'nebius': | ||
| # Nebius uses S3-compatible storage but requires --profile=nebius | ||
| from sky.adaptors import nebius | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. 🟡 In-function import of `sky.adaptors.nebius`. AGENTS.md states: "Always place imports at the top of the file, not inside function definitions. [...] Only as a last resort, place the import inside the function with a comment explaining why." The `from sky.adaptors import nebius` statement is placed inside `_storage_cmds` rather than at the top of the file. Prompt for agents. Was this helpful? React with 👍 or 👎 to provide feedback. |
||
| profile = f'--profile={nebius.NEBIUS_PROFILE_NAME}' | ||
| url = f's3://{bucket}' | ||
| return (url, | ||
| f'aws s3 mb {url} {profile}', | ||
| f'aws s3 rb {url} {profile} --force', | ||
| f'aws s3 cp {profile}', f'aws s3 rm {profile}', | ||
| f'aws s3 ls {profile}', | ||
| lambda p: f'aws s3 rm {p} {profile} --recursive') | ||
| # Default to AWS | ||
| url = f's3://{bucket}' | ||
| return (url, | ||
|
|
@@ -53,7 +69,7 @@ def test_batch_simple(generic_cloud: str): | |
| pool_name = f'batch-smpl-pool-{name}' | ||
| url, create_bkt, delete_bkt, cp, rm, _, rm_r = _storage_cmds( | ||
| generic_cloud, bucket) | ||
| store = 'gs' if generic_cloud == 'gcp' else 's3' | ||
| store = _CLOUD_TO_STORE.get(generic_cloud, 's3') | ||
|
|
||
| test = smoke_tests_utils.Test( | ||
| 'batch_simple', | ||
|
|
@@ -140,15 +156,36 @@ def test_batch_simple(generic_cloud: str): | |
| # ---------- Test diffusion batch (image generation) ---------- | ||
| @pytest.mark.batch | ||
| @pytest.mark.resource_heavy | ||
| @pytest.mark.no_kubernetes # pool.yaml hardcodes L4 GPU; K8s CI clusters may not have it | ||
| @pytest.mark.no_remote_server # see note 1 above | ||
| def test_batch_diffusion(generic_cloud: str): | ||
| @pytest.mark.parametrize('accelerator', [{'azure': 'T4', 'nebius': 'L40S'}]) | ||
| def test_batch_diffusion(generic_cloud: str, accelerator: Dict[str, str]): | ||
| if generic_cloud in ('kubernetes', 'slurm'): | ||
| accelerator_str = smoke_tests_utils.get_available_gpus( | ||
| infra=generic_cloud) | ||
| if not accelerator_str: | ||
| pytest.fail(f'No GPUs available for {generic_cloud}.') | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. 🟡 When no GPUs are available for kubernetes/slurm, the test calls `pytest.fail(f'No GPUs available for {generic_cloud}.')`, so the run is reported as a failure rather than a skip. Was this helpful? React with 👍 or 👎 to provide feedback. |
||
| else: | ||
| accelerator_str = accelerator.get(generic_cloud, 'L4') | ||
|
|
||
| # Rewrite pool.yaml with the resolved accelerator for this cloud | ||
| pool_yaml_path = 'examples/batch/diffusion/pool.yaml' | ||
| with open(pool_yaml_path, 'r', encoding='utf-8') as f: | ||
| pool_content = f.read() | ||
| pool_content = re.sub(r'accelerators:\s*[^\n]+', | ||
| f'accelerators: {accelerator_str}:1', pool_content) | ||
| pool_tmp = tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', | ||
| delete=False) | ||
| pool_tmp.write(pool_content) | ||
| pool_tmp.flush() | ||
| pool_tmp_path = pool_tmp.name | ||
| pool_tmp.close() | ||
|
|
||
| name = smoke_tests_utils.get_cluster_name() | ||
| bucket = f'sky-batch-diff-{name}' | ||
| pool_name = 'diffusion-pool' | ||
| url, create_bkt, delete_bkt, cp, rm, ls, rm_r = _storage_cmds( | ||
| generic_cloud, bucket) | ||
| store = 'gs' if generic_cloud == 'gcp' else 's3' | ||
| store = _CLOUD_TO_STORE.get(generic_cloud, 's3') | ||
|
|
||
| test = smoke_tests_utils.Test( | ||
| 'batch_diffusion', | ||
|
|
@@ -158,7 +195,7 @@ def test_batch_diffusion(generic_cloud: str): | |
| f'sky serve down {pool_name} -y 2>/dev/null || true', | ||
| # --- Create GPU pool with generic_cloud --- | ||
| (f's=$(sky jobs pool apply -p {pool_name} --infra {generic_cloud}' | ||
| f' examples/batch/diffusion/pool.yaml -y); ' | ||
| f' {pool_tmp_path} -y); ' | ||
| f'echo "$s"; ' | ||
| f'echo "$s" | grep "Successfully created pool"'), | ||
| # --- Data setup (extracted from examples/batch/diffusion/) --- | ||
|
|
@@ -213,7 +250,8 @@ def test_batch_diffusion(generic_cloud: str): | |
| f' sky serve down {pool_name} -y 2>/dev/null || true;' | ||
| f' {delete_bkt};' | ||
| f' rm -f /tmp/batch-prompts-{name}.jsonl' | ||
| f' /tmp/batch-manifest-{name}.jsonl'), | ||
| f' /tmp/batch-manifest-{name}.jsonl' | ||
| f' {pool_tmp_path}'), | ||
| timeout=45 * 60, | ||
| env={ | ||
| 'SKY_BATCH_BUCKET': bucket, | ||
|
|
@@ -232,7 +270,7 @@ def test_batch_custom_formats(generic_cloud: str): | |
| pool_name = 'custom-fmt-pool' | ||
| url, create_bkt, delete_bkt, cp, _, ls, rm_r = _storage_cmds( | ||
| generic_cloud, bucket) | ||
| store = 'gs' if generic_cloud == 'gcp' else 's3' | ||
| store = _CLOUD_TO_STORE.get(generic_cloud, 's3') | ||
|
|
||
| test = smoke_tests_utils.Test( | ||
| 'batch_custom_formats', | ||
|
|
@@ -311,7 +349,7 @@ def test_batch_cancel(generic_cloud: str): | |
| pool_name = f'batch-cncl-pool-{name}' | ||
| url, create_bkt, delete_bkt, cp, rm, _, rm_r = _storage_cmds( | ||
| generic_cloud, bucket) | ||
| store = 'gs' if generic_cloud == 'gcp' else 's3' | ||
| store = _CLOUD_TO_STORE.get(generic_cloud, 's3') | ||
|
|
||
| test = smoke_tests_utils.Test( | ||
| 'batch_cancel', | ||
|
|
@@ -402,7 +440,7 @@ def test_batch_ha_kill_running(generic_cloud: str): | |
| pool_name = 'test-batch-pool' | ||
| url, create_bkt, delete_bkt, cp, rm, _, rm_r = _storage_cmds( | ||
| generic_cloud, bucket) | ||
| store = 'gs' if generic_cloud == 'gcp' else 's3' | ||
| store = _CLOUD_TO_STORE.get(generic_cloud, 's3') | ||
|
|
||
| # HA config: run the jobs controller on k8s with high_availability. | ||
| skypilot_config_path = 'tests/test_yamls/managed_jobs_ha_config.yaml' | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🔴
`_CLOUD_TO_STORE` maps nebius to `'nebius'` but batch IO classes only accept `s3://` and `gs://` prefixes

Setting `'nebius': 'nebius'` in `_CLOUD_TO_STORE` causes the test to set `SKY_BATCH_STORE='nebius'`. The Python example scripts (e.g., `examples/batch/simple/double_text.py:81`) then construct paths like `nebius://bucket/test.jsonl`. However, `JsonReader.__post_init__` (`sky/batch/io_formats.py:223-227`), `JsonWriter.__post_init__` (`sky/batch/io_formats.py:281-285`), and `ImageWriter.__post_init__` (`sky/batch/io_formats.py:327-331`) all explicitly reject any prefix other than `s3://` or `gs://`, raising `ValueError`. Additionally, `parse_cloud_path` (`sky/batch/utils.py:179-187`) also only handles `s3://` and `gs://`. This means every batch test (`test_batch_simple`, `test_batch_diffusion`, `test_batch_cancel`, `test_batch_custom_formats`, `test_batch_ha_kill_running`) will fail immediately on nebius with a `ValueError` when the example script creates a `JsonReader` or `JsonWriter` with a `nebius://` path.

Prompt for agents
Was this helpful? React with 👍 or 👎 to provide feedback.