|
1 | 1 | """CLI test module.""" |
2 | 2 |
|
3 | 3 | import base64 |
| 4 | +import os |
4 | 5 | from io import BytesIO |
5 | 6 | from pathlib import Path |
6 | 7 | from tempfile import mkdtemp |
7 | 8 | from unittest import TestCase |
8 | | -from unittest.mock import patch |
| 9 | +from unittest.mock import MagicMock, patch |
9 | 10 | from zipfile import ZipFile |
10 | 11 |
|
11 | 12 | import pytest |
12 | 13 | from click.testing import CliRunner |
13 | 14 |
|
14 | | -from openhexa.cli.cli import pipelines_download, pipelines_run, workspaces_add |
| 15 | +from openhexa.cli.cli import pipelines_download, pipelines_push, pipelines_run, workspaces_add |
| 16 | +from openhexa.sdk.pipelines import Pipeline |
| 17 | + |
| 18 | +python_file_name = "pipeline.py" |
| 19 | +python_code = "print('pipeline.py file')" |
| 20 | +version = "v1.0" |
| 21 | +pipeline_name = "MyPipeline" |
| 22 | + |
| 23 | + |
| 24 | +def create_zip_with_pipeline(): |
| 25 | + """Create a zip file containing the pipeline.py file.""" |
| 26 | + zip_buffer = BytesIO() |
| 27 | + fake_zipfile = ZipFile(zip_buffer, "w") |
| 28 | + fake_zipfile.writestr(python_file_name, python_code) |
| 29 | + fake_zipfile.close() |
| 30 | + return zip_buffer |
| 31 | + |
| 32 | + |
| 33 | +def setup_graphql_response(zip_buffer=create_zip_with_pipeline()): |
| 34 | + """Set up the mock GraphQL response pipelines.""" |
| 35 | + return { |
| 36 | + "pipelineByCode": {"currentVersion": {"zipfile": base64.b64encode(zip_buffer.getvalue()).decode()}}, |
| 37 | + "pipelines": {"items": []}, # (empty workspace initially) |
| 38 | + } |
15 | 39 |
|
16 | 40 |
|
17 | 41 | @pytest.mark.usefixtures("settings") |
@@ -58,52 +82,65 @@ def test_download_pipeline_no_pipeline(self, mock_graphql): |
58 | 82 | def test_download_pipeline_overwrite(self, mock_graphql): |
59 | 83 | """Test the download pipeline command with an existing director with and without content.""" |
60 | 84 | with self.runner.isolated_filesystem() as tmp: |
61 | | - # Create a zipfile with a pipeline.py file in a buffer |
62 | | - zip_buffer = BytesIO() |
63 | | - fake_zipfile = ZipFile(zip_buffer, "w") |
64 | | - fake_zipfile.writestr("pipeline.py", "print('pipeline.py file')") |
65 | | - fake_zipfile.close() |
66 | | - |
67 | | - # Known Pipeline & non empty directory |
68 | | - with open(tmp + "/pipeline.py", "w") as f: |
| 85 | + # Known Pipeline & non-empty directory |
| 86 | + with open(Path(tmp) / python_file_name, "w") as f: |
69 | 87 | f.write("<content>") |
70 | | - mock_graphql.return_value = { |
71 | | - "pipelineByCode": {"currentVersion": {"zipfile": base64.b64encode(zip_buffer.getvalue()).decode()}} |
72 | | - } |
| 88 | + |
| 89 | + mock_graphql.return_value = setup_graphql_response() |
| 90 | + |
73 | 91 | result = self.runner.invoke(pipelines_download, ["test_pipeline", tmp], input="N\n") |
74 | 92 | self.assertIn("Overwrite the files?", result.output) |
75 | 93 | self.assertIn(f"{tmp} is not empty", result.output) |
76 | | - self.assertEqual(open(tmp + "/pipeline.py").read(), "<content>") |
| 94 | + self.assertEqual(open(Path(tmp) / python_file_name).read(), "<content>") |
77 | 95 |
|
78 | 96 | # Overwrite the files in the directory |
79 | | - mock_graphql.return_value = { |
80 | | - "pipelineByCode": {"currentVersion": {"zipfile": base64.b64encode(zip_buffer.getvalue()).decode()}} |
81 | | - } |
| 97 | + mock_graphql.return_value = setup_graphql_response() |
| 98 | + |
82 | 99 | result = self.runner.invoke(pipelines_download, ["test_pipeline", tmp], input="y\n") |
83 | 100 | self.assertIn("Overwrite the files?", result.output) |
84 | | - self.assertEqual(open(tmp + "/pipeline.py").read(), "print('pipeline.py file')") |
| 101 | + self.assertEqual(open(Path(tmp) / python_file_name).read(), python_code) |
85 | 102 |
|
86 | 103 | @patch("openhexa.cli.api.graphql") |
87 | 104 | def test_download_pipeline(self, mock_graphql): |
88 | 105 | """Test the download pipeline command.""" |
89 | 106 | with self.runner.isolated_filesystem() as tmp: |
90 | 107 | # Create a zipfile with a pipeline.py file in a buffer |
91 | | - zip_buffer = BytesIO() |
92 | | - fake_zipfile = ZipFile(zip_buffer, "w") |
93 | | - fake_zipfile.writestr("pipeline.py", "print('pipeline.py file')") |
94 | | - fake_zipfile.close() |
95 | | - |
96 | | - mock_graphql.return_value = { |
97 | | - "pipelineByCode": {"currentVersion": {"zipfile": base64.b64encode(zip_buffer.getvalue()).decode()}} |
98 | | - } |
| 108 | + mock_graphql.return_value = setup_graphql_response() |
| 109 | + |
99 | 110 | result = self.runner.invoke(pipelines_download, ["test_pipeline", tmp]) |
100 | 111 | self.assertEqual(result.exit_code, 0) |
101 | | - self.assertTrue((Path(tmp) / "pipeline.py").exists()) |
102 | | - self.assertEqual(open(tmp + "/pipeline.py").read(), "print('pipeline.py file')") |
| 112 | + path = Path(tmp) / python_file_name |
| 113 | + self.assertTrue(path.exists()) |
| 114 | + self.assertEqual(open(path).read(), python_code) |
| 115 | + |
| 116 | + @patch("openhexa.cli.api.graphql") |
| 117 | + @patch("openhexa.cli.cli.get_pipeline") |
| 118 | + @patch("openhexa.cli.cli.upload_pipeline") |
| 119 | + @patch.dict(os.environ, {"HEXA_API_URL": "https://www.bluesquarehub.com/", "HEXA_WORKSPACE": "workspace"}) |
| 120 | + def test_push_pipeline(self, mock_upload_pipeline, mock_get_pipeline, mock_graphql): |
| 121 | + """Test pushing a pipeline.""" |
| 122 | + with self.runner.isolated_filesystem() as tmp: |
| 123 | + with open(Path(tmp) / python_file_name, "w") as f: |
| 124 | + f.write(python_code) |
| 125 | + mock_graphql.return_value = setup_graphql_response() |
| 126 | + mock_pipeline = MagicMock(spec=Pipeline) |
| 127 | + mock_pipeline.code = pipeline_name |
| 128 | + mock_get_pipeline.return_value = mock_pipeline |
| 129 | + |
| 130 | + result = self.runner.invoke(pipelines_push, [tmp, "--name", version]) |
| 131 | + self.assertEqual(result.exit_code, 0) |
| 132 | + self.assertIn( |
| 133 | + ( |
| 134 | + f"✅ New version '{version}' created! " |
| 135 | + f"You can view the pipeline in OpenHEXA on https://www.bluesquarehub.com/workspaces/workspace/pipelines/{pipeline_name}" |
| 136 | + ), |
| 137 | + result.output, |
| 138 | + ) |
| 139 | + self.assertTrue(mock_upload_pipeline.called) |
103 | 140 |
|
104 | 141 | @patch("openhexa.cli.api.graphql") |
105 | 142 | def test_workspaces_add_not_found(self, mock_graphql): |
106 | | - """Test the add workspace command when the workspae doesn't exist on the current server.""" |
| 143 | + """Test the add workspace command when the workspace doesn't exist on the current server.""" |
107 | 144 | with self.runner.isolated_filesystem(): |
108 | 145 | mock_graphql.return_value = {"workspace": None} |
109 | 146 | result = self.runner.invoke(workspaces_add, ["test_workspace"], input="random_token \n") |
|
0 commit comments