-
Notifications
You must be signed in to change notification settings - Fork 374
Expand file tree
/
Copy pathconftest.py
More file actions
157 lines (127 loc) · 5.66 KB
/
conftest.py
File metadata and controls
157 lines (127 loc) · 5.66 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
from __future__ import annotations
import typing as t
import pytest
import pathlib
import os
import logging
from pytest import FixtureRequest
from sqlmesh import Config, EngineAdapter
from sqlmesh.core.constants import SQLMESH_PATH
from sqlmesh.core.config.connection import (
ConnectionConfig,
AthenaConnectionConfig,
DuckDBConnectionConfig,
)
from sqlmesh.core.engine_adapter import AthenaEngineAdapter
from sqlmesh.core.config import load_config_from_paths
from tests.core.engine_adapter.integration import (
TestContext,
generate_pytest_params,
ENGINES,
IntegrationTestEngine,
)
logger = logging.getLogger(__name__)
@pytest.fixture
def config(tmp_path: pathlib.Path) -> Config:
    """Load the integration-test Config.

    Merges the project-level ``config.yaml`` that sits next to this file with
    the user's personal ``~/.sqlmesh/config.yaml`` (if present), and injects
    the per-test ``tmp_path`` as a config variable so gateways can point local
    state at a unique temporary directory.
    """
    return load_config_from_paths(
        Config,
        project_paths=[
            # Idiomatic pathlib equivalent of os.path.join(os.path.dirname(__file__), ...)
            pathlib.Path(__file__).parent / "config.yaml",
        ],
        personal_paths=[(SQLMESH_PATH / "config.yaml").expanduser()],
        variables={"tmp_path": str(tmp_path)},
    )
@pytest.fixture
def create_engine_adapter(
    request: pytest.FixtureRequest,
    testrun_uid: str,
    config: Config,
) -> t.Callable[[str, str], EngineAdapter]:
    """Return a factory ``(engine_name, gateway) -> EngineAdapter``.

    The factory builds an adapter from the named gateway's connection config
    and applies per-engine setup:

    - athena: scopes the S3 warehouse location to this test run + test name.
    - trino: leaves the default batch size alone (see inline comment).
    - duckdb: removes stale local database files from previous runs.

    Note: ``testrun_uid`` comes from the pytest-xdist plugin.
    """

    def _create(engine_name: str, gateway: str) -> EngineAdapter:
        assert gateway in config.gateways
        connection_config = config.gateways[gateway].connection
        assert isinstance(connection_config, ConnectionConfig)
        engine_adapter = connection_config.create_engine_adapter()
        if engine_name == "athena":
            assert isinstance(connection_config, AthenaConnectionConfig)
            assert isinstance(engine_adapter, AthenaEngineAdapter)
            # S3 files need to go into a unique location for each test run
            # This is because DROP TABLE on a Hive table just drops the table from the metastore
            # The files still exist in S3, so if you CREATE TABLE to the same location, the old data shows back up
            # Note that the `testrun_uid` fixture comes from the xdist plugin
            if connection_config.s3_warehouse_location:
                engine_adapter.s3_warehouse_location = os.path.join(
                    connection_config.s3_warehouse_location,
                    f"testrun_{testrun_uid}",
                    request.node.originalname,
                )
        # Trino: If we batch up the requests then when running locally we get a table not found error after creating the
        # table and then immediately after trying to insert rows into it. There seems to be a delay between when the
        # metastore is made aware of the table and when it responds that it exists. I'm hoping this is not an issue
        # in practice on production machines.
        if engine_name != "trino":
            engine_adapter.DEFAULT_BATCH_SIZE = 1
        # Clear out any local db files that may have been left over from previous runs
        if engine_name == "duckdb":
            assert isinstance(connection_config, DuckDBConnectionConfig)
            for raw_path in [
                v for v in (connection_config.catalogs or {}).values() if isinstance(v, str)
            ]:
                pathlib.Path(raw_path).unlink(missing_ok=True)
        return engine_adapter

    return _create
@pytest.fixture
def create_test_context(
    request: FixtureRequest,
    create_engine_adapter: t.Callable[[str, str], EngineAdapter],
    tmp_path: pathlib.Path,
) -> t.Callable[[IntegrationTestEngine, str, str, str], t.Iterable[TestContext]]:
    """Return a generator factory that yields a ready-to-use TestContext.

    The factory initializes the context, yields it inside an engine session,
    and best-effort cleans it up afterwards. Init/cleanup failures are logged
    rather than raised so that pytest-retry can re-run the test (see inline
    comments).
    """

    def _create(
        engine: IntegrationTestEngine, gateway: str, test_type: str, table_format: str
    ) -> t.Iterable[TestContext]:
        is_remote = request.node.get_closest_marker("remote") is not None
        engine_adapter = create_engine_adapter(engine.engine, gateway)
        ctx = TestContext(
            test_type,
            engine_adapter,
            f"{engine.engine}_{table_format}",
            gateway,
            tmp_path=tmp_path,
            is_remote=is_remote,
        )
        try:
            ctx.init()
        except Exception:
            # Narrowed from a bare `except:` so KeyboardInterrupt/SystemExit still propagate.
            # pytest-retry doesn't work if there are errors in fixture setup (ref: https://github.com/str0zzapreti/pytest-retry/issues/33 )
            # what we can do is log the exception and return a partially-initialized context to the test, which should
            # throw an exception when it tries to access something that didn't init properly and thus trigger pytest-retry to retry
            logger.exception("Context init failed")
        with ctx.engine_adapter.session({}):
            yield ctx
        try:
            ctx.cleanup()
        except Exception:
            # We need to catch this exception because if there is an error during teardown, pytest-retry aborts immediately
            # instead of retrying
            logger.exception("Context cleanup failed")

    return _create
@pytest.fixture(
    params=list(generate_pytest_params(ENGINES, query=True, show_variant_in_test_id=False))
)
def ctx(
    request: FixtureRequest,
    # Annotation fixed: the factory takes (engine, gateway, test_type, table_format).
    create_test_context: t.Callable[[IntegrationTestEngine, str, str, str], t.Iterable[TestContext]],
) -> t.Iterable[TestContext]:
    """Yield a TestContext for every engine's query-based test variant.

    ``request.param`` is the tuple produced by ``generate_pytest_params`` and
    is unpacked directly into the context factory.
    """
    yield from create_test_context(*request.param)
@pytest.fixture(params=list(generate_pytest_params(ENGINES, query=False, df=True)))
def ctx_df(
    request: FixtureRequest,
    # Annotation fixed: the factory takes (engine, gateway, test_type, table_format).
    create_test_context: t.Callable[[IntegrationTestEngine, str, str, str], t.Iterable[TestContext]],
) -> t.Iterable[TestContext]:
    """Yield a TestContext for every engine's DataFrame-based test variant.

    ``request.param`` is the tuple produced by ``generate_pytest_params`` and
    is unpacked directly into the context factory.
    """
    yield from create_test_context(*request.param)
@pytest.fixture(params=list(generate_pytest_params(ENGINES, query=True, df=True)))
def ctx_query_and_df(
    request: FixtureRequest,
    # Annotation fixed: the factory takes (engine, gateway, test_type, table_format).
    create_test_context: t.Callable[[IntegrationTestEngine, str, str, str], t.Iterable[TestContext]],
) -> t.Iterable[TestContext]:
    """Yield a TestContext covering both query and DataFrame test variants.

    ``request.param`` is the tuple produced by ``generate_pytest_params`` and
    is unpacked directly into the context factory.
    """
    yield from create_test_context(*request.param)