diff --git a/CHANGELOG.md b/CHANGELOG.md index a801b9e64..a913391fa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ ### Fixes +- Fix `workflow_job` Python model submission method failing with dictionary attribute error ([#1360](https://github.com/databricks/dbt-databricks/issues/1360)) - Fix column order mismatch in microbatch and replace_where incremental strategies by using INSERT BY NAME syntax ([#1338](https://github.com/databricks/dbt-databricks/issues/1338)) ## dbt-databricks 1.11.6 (Mar 10, 2026) diff --git a/dbt/adapters/databricks/api_client.py b/dbt/adapters/databricks/api_client.py index 1ccdcba6e..49b0cc00d 100644 --- a/dbt/adapters/databricks/api_client.py +++ b/dbt/adapters/databricks/api_client.py @@ -801,7 +801,15 @@ def create(self, job_spec: dict[str, Any]) -> str: try: # Convert job_spec to be compatible with jobs.create converted_job_spec = self._convert_job_spec_for_create(job_spec) - create_response = self.workspace_client.jobs.create(**converted_job_spec) + # Convert plain dicts to proper SDK dataclass objects via JobSettings. + # The SDK's jobs.create() iterates tasks calling `v.as_dict()` on each + # (confirmed in JobsAPI.create source). Plain dicts have no .as_dict(), + # which caused the original AttributeError. as_shallow_dict() is the + # correct choice: it unpacks the top-level fields as kwargs (matching + # the jobs.create() signature) while keeping nested values as typed + # Task / NotebookTask / etc. dataclasses that have .as_dict(). + job_settings = JobSettings.from_dict(converted_job_spec) + create_response = self.workspace_client.jobs.create(**job_settings.as_shallow_dict()) job_id = str(create_response.job_id) logger.info(f"New workflow created with job id {job_id}") return job_id diff --git a/tests/unit/api_client/test_workflow_job_api.py b/tests/unit/api_client/test_workflow_job_api.py index 8a427a861..ce7ed18a4 100644 --- a/tests/unit/api_client/test_workflow_job_api.py +++ b/tests/unit/api_client/test_workflow_job_api.py @@ -1,7 +1,7 @@ -from unittest.mock import Mock +from unittest.mock import Mock, patch import pytest -from databricks.sdk.service.jobs import BaseJob, CreateResponse, JobSettings, QueueSettings +from databricks.sdk.service.jobs import BaseJob, CreateResponse, JobSettings, QueueSettings, Task from dbt_common.exceptions import DbtRuntimeError from dbt.adapters.databricks.api_client import WorkflowJobApi @@ -59,7 +59,15 @@ def test_create__success(self, api, workspace_client): result = api.create(job_spec) assert result == "456" - workspace_client.jobs.create.assert_called_once_with(**job_spec) + workspace_client.jobs.create.assert_called_once() + # create() converts the dict via JobSettings.from_dict().as_shallow_dict(). + # The SDK's jobs.create() calls v.as_dict() on each Task internally, so + # tasks must be proper Task dataclasses, not plain dicts. + # Note: as_shallow_dict() omits None/empty fields, so an empty tasks list + # is not included in the kwargs (the SDK treats missing as empty). + call_kwargs = workspace_client.jobs.create.call_args[1] + assert call_kwargs["name"] == "test_job" + assert call_kwargs.get("tasks") in (None, []) def test_create__job_spec_conversion(self, api, workspace_client): mock_create_response = Mock(spec=CreateResponse) @@ -89,21 +97,42 @@ def test_create__job_spec_conversion(self, api, workspace_client): assert result == "789" workspace_client.jobs.create.assert_called_once() - # Verify the call was made with converted parameters + # Tasks are converted to SDK Task dataclasses via JobSettings.from_dict().as_shallow_dict(). + # The SDK's jobs.create() calls v.as_dict() on each task internally, so + # we verify: (a) they are Task instances, (b) attributes are correct, + # (c) .as_dict() works (proving the original AttributeError is fixed). call_kwargs = workspace_client.jobs.create.call_args[1] assert call_kwargs["name"] == "test_job" - assert len(call_kwargs["tasks"]) == 2 - - # Check first task conversion - task1 = call_kwargs["tasks"][0] - assert task1["task_key"] == "task1" - assert "cluster_id" not in task1 # Should be removed - assert task1["existing_cluster_id"] == "test-cluster-id" # Should be converted - - # Check second task remains unchanged - task2 = call_kwargs["tasks"][1] - assert task2["task_key"] == "task2" - assert task2["existing_cluster_id"] == "already-correct" + tasks = call_kwargs["tasks"] + assert len(tasks) == 2 + + task1 = tasks[0] + assert isinstance(task1, Task) + assert task1.task_key == "task1" + assert task1.existing_cluster_id == "test-cluster-id" + # Verify .as_dict() works — this was the root cause of the bug + task1_dict = task1.as_dict() + assert task1_dict["task_key"] == "task1" + assert task1_dict["existing_cluster_id"] == "test-cluster-id" + + task2 = tasks[1] + assert isinstance(task2, Task) + assert task2.task_key == "task2" + assert task2.existing_cluster_id == "already-correct" + assert task2.as_dict()["existing_cluster_id"] == "already-correct" + + def test_create__invalid_job_spec_raises(self, api, workspace_client): + """If JobSettings.from_dict() raises (e.g. on malformed input), create() + wraps it in a DbtRuntimeError via the existing except block.""" + with patch( + "dbt.adapters.databricks.api_client.JobSettings.from_dict", + side_effect=Exception("malformed spec"), + ): + with pytest.raises(DbtRuntimeError) as exc_info: + api.create({"name": "bad_job", "tasks": ["not-a-dict"]}) + + assert "Error creating Workflow" in str(exc_info.value) + workspace_client.jobs.create.assert_not_called() def test_update_job_settings__exception(self, api, workspace_client): workspace_client.jobs.reset.side_effect = Exception("API Error")