Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

### Fixes

- Fix `workflow_job` Python model submission method failing with dictionary attribute error ([#1360](https://github.com/databricks/dbt-databricks/issues/1360))
- Fix column order mismatch in microbatch and replace_where incremental strategies by using INSERT BY NAME syntax ([#1338](https://github.com/databricks/dbt-databricks/issues/1338))

## dbt-databricks 1.11.6 (Mar 10, 2026)
Expand Down
10 changes: 9 additions & 1 deletion dbt/adapters/databricks/api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -801,7 +801,15 @@ def create(self, job_spec: dict[str, Any]) -> str:
try:
# Convert job_spec to be compatible with jobs.create
converted_job_spec = self._convert_job_spec_for_create(job_spec)
create_response = self.workspace_client.jobs.create(**converted_job_spec)
# Convert plain dicts to proper SDK dataclass objects via JobSettings.
# The SDK's jobs.create() iterates tasks calling `v.as_dict()` on each
# (confirmed in JobsAPI.create source). Plain dicts have no .as_dict(),
# which caused the original AttributeError. as_shallow_dict() is the
# correct choice: it unpacks the top-level fields as kwargs (matching
# the jobs.create() signature) while keeping nested values as typed
# Task / NotebookTask / etc. dataclasses that have .as_dict().
job_settings = JobSettings.from_dict(converted_job_spec)
create_response = self.workspace_client.jobs.create(**job_settings.as_shallow_dict())
job_id = str(create_response.job_id)
logger.info(f"New workflow created with job id {job_id}")
return job_id
Expand Down
61 changes: 45 additions & 16 deletions tests/unit/api_client/test_workflow_job_api.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from unittest.mock import Mock
from unittest.mock import Mock, patch

import pytest
from databricks.sdk.service.jobs import BaseJob, CreateResponse, JobSettings, QueueSettings
from databricks.sdk.service.jobs import BaseJob, CreateResponse, JobSettings, QueueSettings, Task
from dbt_common.exceptions import DbtRuntimeError

from dbt.adapters.databricks.api_client import WorkflowJobApi
Expand Down Expand Up @@ -59,7 +59,15 @@ def test_create__success(self, api, workspace_client):
result = api.create(job_spec)

assert result == "456"
workspace_client.jobs.create.assert_called_once_with(**job_spec)
workspace_client.jobs.create.assert_called_once()
# create() converts the dict via JobSettings.from_dict().as_shallow_dict().
# The SDK's jobs.create() calls v.as_dict() on each Task internally, so
# tasks must be proper Task dataclasses, not plain dicts.
# Note: as_shallow_dict() omits None/empty fields, so an empty tasks list
# is not included in the kwargs (the SDK treats missing as empty).
call_kwargs = workspace_client.jobs.create.call_args[1]
assert call_kwargs["name"] == "test_job"
assert call_kwargs.get("tasks") in (None, [])

def test_create__job_spec_conversion(self, api, workspace_client):
mock_create_response = Mock(spec=CreateResponse)
Expand Down Expand Up @@ -89,21 +97,42 @@ def test_create__job_spec_conversion(self, api, workspace_client):
assert result == "789"
workspace_client.jobs.create.assert_called_once()

# Verify the call was made with converted parameters
# Tasks are converted to SDK Task dataclasses via JobSettings.from_dict().as_shallow_dict().
# The SDK's jobs.create() calls v.as_dict() on each task internally, so
# we verify: (a) they are Task instances, (b) attributes are correct,
# (c) .as_dict() works (proving the original AttributeError is fixed).
call_kwargs = workspace_client.jobs.create.call_args[1]
assert call_kwargs["name"] == "test_job"
assert len(call_kwargs["tasks"]) == 2

# Check first task conversion
task1 = call_kwargs["tasks"][0]
assert task1["task_key"] == "task1"
assert "cluster_id" not in task1 # Should be removed
assert task1["existing_cluster_id"] == "test-cluster-id" # Should be converted

# Check second task remains unchanged
task2 = call_kwargs["tasks"][1]
assert task2["task_key"] == "task2"
assert task2["existing_cluster_id"] == "already-correct"
tasks = call_kwargs["tasks"]
assert len(tasks) == 2

task1 = tasks[0]
assert isinstance(task1, Task)
assert task1.task_key == "task1"
assert task1.existing_cluster_id == "test-cluster-id"
# Verify .as_dict() works — this was the root cause of the bug
task1_dict = task1.as_dict()
assert task1_dict["task_key"] == "task1"
assert task1_dict["existing_cluster_id"] == "test-cluster-id"

task2 = tasks[1]
assert isinstance(task2, Task)
assert task2.task_key == "task2"
assert task2.existing_cluster_id == "already-correct"
assert task2.as_dict()["existing_cluster_id"] == "already-correct"

def test_create__invalid_job_spec_raises(self, api, workspace_client):
    """If JobSettings.from_dict() blows up on malformed input, create() must
    wrap the failure in a DbtRuntimeError and never reach the Jobs API."""
    from_dict_path = "dbt.adapters.databricks.api_client.JobSettings.from_dict"
    # Simulate the SDK rejecting the spec before any network call happens.
    with (
        patch(from_dict_path, side_effect=Exception("malformed spec")),
        pytest.raises(DbtRuntimeError) as exc_info,
    ):
        api.create({"name": "bad_job", "tasks": ["not-a-dict"]})

    # The generic except block in create() wraps the error with this prefix,
    # and the SDK endpoint must not have been hit.
    assert "Error creating Workflow" in str(exc_info.value)
    workspace_client.jobs.create.assert_not_called()

def test_update_job_settings__exception(self, api, workspace_client):
workspace_client.jobs.reset.side_effect = Exception("API Error")
Expand Down