-
Notifications
You must be signed in to change notification settings - Fork 194
Expand file tree
/
Copy pathtest_workflow_job_api.py
More file actions
187 lines (143 loc) · 7.58 KB
/
test_workflow_job_api.py
File metadata and controls
187 lines (143 loc) · 7.58 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
from unittest.mock import Mock, patch
import pytest
from databricks.sdk.service.jobs import BaseJob, CreateResponse, JobSettings, QueueSettings, Task
from dbt_common.exceptions import DbtRuntimeError
from dbt.adapters.databricks.api_client import WorkflowJobApi
class TestWorkflowJobApi:
@pytest.fixture
def workspace_client(self):
return Mock()
@pytest.fixture
def api(self, workspace_client):
return WorkflowJobApi(workspace_client)
def test_search_by_name__exception(self, api, workspace_client):
workspace_client.jobs.list.side_effect = Exception("API Error")
with pytest.raises(DbtRuntimeError) as exc_info:
api.search_by_name("test_job")
assert "Error fetching job by name" in str(exc_info.value)
def test_search_by_name__success(self, api, workspace_client):
mock_job = Mock(spec=BaseJob)
mock_job.as_dict.return_value = {"job_id": 123, "name": "test_job"}
workspace_client.jobs.list.return_value = [mock_job]
result = api.search_by_name("test_job")
assert result == [{"job_id": 123, "name": "test_job"}]
workspace_client.jobs.list.assert_called_once_with(name="test_job")
def test_search_by_name__empty_results(self, api, workspace_client):
workspace_client.jobs.list.return_value = []
result = api.search_by_name("nonexistent_job")
assert result == []
workspace_client.jobs.list.assert_called_once_with(name="nonexistent_job")
def test_create__exception(self, api, workspace_client):
workspace_client.jobs.create.side_effect = Exception("API Error")
with pytest.raises(DbtRuntimeError) as exc_info:
api.create({"name": "test_job"})
assert "Error creating Workflow" in str(exc_info.value)
def test_create__success(self, api, workspace_client):
mock_create_response = Mock(spec=CreateResponse)
mock_create_response.job_id = 456
workspace_client.jobs.create.return_value = mock_create_response
job_spec = {"name": "test_job", "tasks": []}
result = api.create(job_spec)
assert result == "456"
workspace_client.jobs.create.assert_called_once()
# create() converts the dict via JobSettings.from_dict().as_shallow_dict().
# The SDK's jobs.create() calls v.as_dict() on each Task internally, so
# tasks must be proper Task dataclasses, not plain dicts.
# Note: as_shallow_dict() omits None/empty fields, so an empty tasks list
# is not included in the kwargs (the SDK treats missing as empty).
call_kwargs = workspace_client.jobs.create.call_args[1]
assert call_kwargs["name"] == "test_job"
assert call_kwargs.get("tasks") in (None, [])
def test_create__job_spec_conversion(self, api, workspace_client):
mock_create_response = Mock(spec=CreateResponse)
mock_create_response.job_id = 789
workspace_client.jobs.create.return_value = mock_create_response
# Test job_spec with tasks that need cluster_id conversion
job_spec = {
"name": "test_job",
"tasks": [
{
"task_key": "task1",
"notebook_task": {"notebook_path": "/path/to/notebook"},
"cluster_id": "test-cluster-id", # Should be converted to existing_cluster_id
"libraries": [{"pypi": {"package": "requests"}}],
},
{
"task_key": "task2",
"notebook_task": {"notebook_path": "/path/to/notebook2"},
"existing_cluster_id": "already-correct", # Should remain unchanged
},
],
}
result = api.create(job_spec)
assert result == "789"
workspace_client.jobs.create.assert_called_once()
# Tasks are converted to SDK Task dataclasses via JobSettings.from_dict().as_shallow_dict().
# The SDK's jobs.create() calls v.as_dict() on each task internally, so
# we verify: (a) they are Task instances, (b) attributes are correct,
# (c) .as_dict() works (proving the original AttributeError is fixed).
call_kwargs = workspace_client.jobs.create.call_args[1]
assert call_kwargs["name"] == "test_job"
tasks = call_kwargs["tasks"]
assert len(tasks) == 2
task1 = tasks[0]
assert isinstance(task1, Task)
assert task1.task_key == "task1"
assert task1.existing_cluster_id == "test-cluster-id"
# Verify .as_dict() works — this was the root cause of the bug
task1_dict = task1.as_dict()
assert task1_dict["task_key"] == "task1"
assert task1_dict["existing_cluster_id"] == "test-cluster-id"
task2 = tasks[1]
assert isinstance(task2, Task)
assert task2.task_key == "task2"
assert task2.existing_cluster_id == "already-correct"
assert task2.as_dict()["existing_cluster_id"] == "already-correct"
def test_create__invalid_job_spec_raises(self, api, workspace_client):
"""If JobSettings.from_dict() raises (e.g. on malformed input), create()
wraps it in a DbtRuntimeError via the existing except block."""
with patch(
"dbt.adapters.databricks.api_client.JobSettings.from_dict",
side_effect=Exception("malformed spec"),
):
with pytest.raises(DbtRuntimeError) as exc_info:
api.create({"name": "bad_job", "tasks": ["not-a-dict"]})
assert "Error creating Workflow" in str(exc_info.value)
workspace_client.jobs.create.assert_not_called()
def test_update_job_settings__exception(self, api, workspace_client):
workspace_client.jobs.reset.side_effect = Exception("API Error")
with pytest.raises(DbtRuntimeError) as exc_info:
api.update_job_settings("123", {"name": "updated_job"})
assert "Error updating Workflow" in str(exc_info.value)
def test_update_job_settings__success(self, api, workspace_client):
job_spec = {"name": "updated_job", "tasks": []}
api.update_job_settings("123", job_spec)
workspace_client.jobs.reset.assert_called_once()
call_args = workspace_client.jobs.reset.call_args
assert call_args[1]["job_id"] == 123
# The new_settings should be a JobSettings object
assert isinstance(call_args[1]["new_settings"], JobSettings)
def test_run__exception(self, api, workspace_client):
workspace_client.jobs.run_now.side_effect = Exception("API Error")
with pytest.raises(DbtRuntimeError) as exc_info:
api.run("123")
assert "Error triggering run for workflow" in str(exc_info.value)
def test_run__success(self, api, workspace_client):
mock_run_result = Mock()
mock_run_result.run_id = 789
workspace_client.jobs.run_now.return_value = mock_run_result
result = api.run("123", enable_queueing=True)
assert result == "789"
workspace_client.jobs.run_now.assert_called_once()
call_args = workspace_client.jobs.run_now.call_args
assert call_args[1]["job_id"] == 123
assert isinstance(call_args[1]["queue"], QueueSettings)
assert call_args[1]["queue"].enabled is True
def test_run__disable_queueing(self, api, workspace_client):
mock_run_result = Mock()
mock_run_result.run_id = 789
workspace_client.jobs.run_now.return_value = mock_run_result
result = api.run("123", enable_queueing=False)
assert result == "789"
call_args = workspace_client.jobs.run_now.call_args
assert call_args[1]["queue"].enabled is False