-
Notifications
You must be signed in to change notification settings - Fork 9
Expand file tree
/
Copy pathtest_pipeline_dsl_cli.py
More file actions
95 lines (73 loc) · 4.23 KB
/
Copy pathtest_pipeline_dsl_cli.py
File metadata and controls
95 lines (73 loc) · 4.23 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
from pathlib import Path
from unittest.mock import Mock, patch
import yaml
from click.testing import CliRunner
from clarifai.cli.pipeline import compile, upload
def test_upload_python_pipeline_file_generates_and_uploads_directory(tmp_path: Path):
pipeline_file = tmp_path / 'pipeline_def.py'
pipeline_file.write_text('pipeline = None\n', encoding='utf-8')
runner = CliRunner()
with (
patch('clarifai.runners.pipelines.load_pipeline_from_file') as mock_loader,
patch('clarifai.runners.pipelines.pipeline_builder.upload_pipeline') as mock_upload,
):
mock_pipeline = Mock()
mock_pipeline.id = 'test-pipeline'
mock_loader.return_value = mock_pipeline
result = runner.invoke(upload, [str(pipeline_file)])
expected_dir = str(tmp_path / 'generated-test-pipeline')
assert result.exit_code == 0
mock_loader.assert_called_once_with(str(pipeline_file))
mock_pipeline.generate.assert_called_once_with(expected_dir)
mock_upload.assert_called_once_with(expected_dir, no_lockfile=False)
def test_generate_python_pipeline_file_writes_output(tmp_path: Path):
pipeline_file = tmp_path / 'pipeline_def.py'
pipeline_file.write_text('pipeline = None\n', encoding='utf-8')
output_dir = tmp_path / 'generated'
runner = CliRunner()
with patch('clarifai.runners.pipelines.load_pipeline_from_file') as mock_loader:
mock_pipeline = Mock()
mock_pipeline.generate.return_value = str(output_dir / 'config.yaml')
mock_pipeline.nodes = [] # no managed steps → no Dockerfiles expected
mock_loader.return_value = mock_pipeline
result = runner.invoke(compile, [str(pipeline_file), '--output-dir', str(output_dir)])
assert result.exit_code == 0
mock_loader.assert_called_once_with(str(pipeline_file))
mock_pipeline.generate.assert_called_once_with(str(output_dir))
def test_generate_real_example_pipeline_writes_mixed_step_config(tmp_path: Path):
repo_root = Path(__file__).resolve().parents[2]
pipeline_file = repo_root / 'examples' / 'pipeline_dsl_text_pipeline.py'
output_dir = tmp_path / 'generated'
runner = CliRunner()
result = runner.invoke(compile, [str(pipeline_file), '--output-dir', str(output_dir)])
assert result.exit_code == 0, result.output
config = yaml.safe_load((output_dir / 'config.yaml').read_text(encoding='utf-8'))
argo_spec = yaml.safe_load(config['pipeline']['orchestration_spec']['argo_orchestration_spec'])
step_groups = argo_spec['spec']['templates'][0]['steps']
tasks = {entry['name']: entry for group in step_groups for entry in group}
assert config['pipeline']['step_directories'] == ['prepare-text', 'assemble-report']
assert tasks['summarize']['templateRef']['name'].endswith('/versions/summary-v1')
assert tasks['classify-sentiment']['templateRef']['name'].endswith('/versions/sentiment-v3')
assert (output_dir / 'prepare-text' / '1' / 'text_utils.py').exists()
assert not (output_dir / 'summarize').exists()
assert not (output_dir / 'classify-sentiment').exists()
# compile must also generate Dockerfiles for locally managed steps
assert (output_dir / 'prepare-text' / 'Dockerfile').exists()
assert (output_dir / 'assemble-report' / 'Dockerfile').exists()
def test_compile_generates_dockerfiles_for_managed_steps(tmp_path: Path):
"""compile writes a Dockerfile next to each locally managed step directory."""
repo_root = Path(__file__).resolve().parents[2]
pipeline_file = repo_root / 'examples' / 'pipeline_dsl_text_pipeline.py'
output_dir = tmp_path / 'compiled'
runner = CliRunner()
result = runner.invoke(compile, [str(pipeline_file), '--output-dir', str(output_dir)])
assert result.exit_code == 0, result.output
for step_id in ('prepare-text', 'assemble-report'):
dockerfile = output_dir / step_id / 'Dockerfile'
assert dockerfile.exists(), f'Dockerfile missing for step {step_id!r}'
content = dockerfile.read_text(encoding='utf-8')
assert 'FROM ' in content
assert 'COPY --link=true 1 /home/nonroot/main/1' in content
# Pre-existing (non-managed) steps must NOT get a Dockerfile.
assert not (output_dir / 'summarize').exists()
assert not (output_dir / 'classify-sentiment').exists()