Skip to content

Commit bbade43

Browse files
authored
Fix: Spark partition by on state tables (#1238)
* convert strings to columns * dry up mock engine adapter creation
1 parent cf534c2 commit bbade43

9 files changed

Lines changed: 245 additions & 392 deletions

File tree

sqlmesh/core/engine_adapter/spark.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,7 @@ def create_state_table(
212212
self.create_table(
213213
table_name,
214214
columns_to_types,
215-
partitioned_by=primary_key,
215+
partitioned_by=[exp.column(x) for x in primary_key] if primary_key else None,
216216
)
217217

218218
def create_view(

tests/conftest.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323

2424
pytest_plugins = ["tests.common_fixtures"]
2525

26+
T = t.TypeVar("T", bound=EngineAdapter)
27+
2628

2729
class DuckDBMetadata:
2830
def __init__(self, engine_adapter: EngineAdapter):
@@ -244,6 +246,17 @@ def sushi_fixed_date_data_validator(sushi_context_fixed_date: Context) -> SushiD
244246
return SushiDataValidator.from_context(sushi_context_fixed_date)
245247

246248

249+
@pytest.fixture
250+
def make_mocked_engine_adapter(mocker: MockerFixture) -> t.Callable:
251+
def _make_function(klass: t.Type[T], dialect: t.Optional[str] = None) -> T:
252+
connection_mock = mocker.NonCallableMock()
253+
cursor_mock = mocker.Mock()
254+
connection_mock.cursor.return_value = cursor_mock
255+
return klass(lambda: connection_mock, dialect=dialect or klass.DIALECT)
256+
257+
return _make_function
258+
259+
247260
def delete_cache(project_paths: str | t.List[str]) -> None:
248261
for path in ensure_list(project_paths):
249262
try:

0 commit comments

Comments
 (0)