-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_sqlalchemy_bind_manager.py
More file actions
160 lines (130 loc) · 4.82 KB
/
test_sqlalchemy_bind_manager.py
File metadata and controls
160 lines (130 loc) · 4.82 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
from unittest.mock import patch
import pytest
from sqlalchemy import MetaData
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import Session, registry
from sqlalchemy_bind_manager import (
SQLAlchemyBindManager,
SQLAlchemyConfig,
)
from sqlalchemy_bind_manager.exceptions import (
InvalidConfigError,
NotInitializedBindError,
)
@pytest.mark.parametrize(
"supplied_config",
[
{"bind_name": "Invalid Config"},
{
"valid": SQLAlchemyConfig(
engine_url="sqlite://",
engine_options=dict(connect_args={"check_same_thread": False}),
session_options=dict(expire_on_commit=False),
),
"invalid": "Invalid Config",
},
"Invalid single config",
],
)
def test_invalid_config_raises_exception(supplied_config):
# We consciously ignore the type to supply an invalid config
with pytest.raises(InvalidConfigError):
SQLAlchemyBindManager(supplied_config) # type: ignore
def test_initialised_bind_raises_error(single_config):
sa_manager = SQLAlchemyBindManager(single_config)
with pytest.raises(NotInitializedBindError):
sa_manager.get_session("uninitialised bind")
def test_single_config_creates_default_bind(single_config):
sa_manager = SQLAlchemyBindManager(single_config)
assert len(sa_manager.get_binds()) == 1
default_bind = sa_manager.get_bind()
assert default_bind is not None
assert isinstance(sa_manager.get_mapper(), registry)
assert isinstance(sa_manager.get_session(), Session)
assert sa_manager.get_session().get_bind() == default_bind.engine
def test_multiple_binds(multiple_config):
sa_manager = SQLAlchemyBindManager(multiple_config)
assert len(sa_manager.get_binds()) == 2
mappers_metadata = sa_manager.get_bind_mappers_metadata()
assert len(mappers_metadata) == 2
for key in ["default", "async"]:
assert key in mappers_metadata
assert isinstance(mappers_metadata[key], MetaData)
default_bind = sa_manager.get_bind()
assert default_bind is not None
assert isinstance(sa_manager.get_mapper(), registry)
assert isinstance(sa_manager.get_session(), Session)
async_bind = sa_manager.get_bind("async")
assert async_bind is not None
assert isinstance(sa_manager.get_mapper("async"), registry)
assert isinstance(sa_manager.get_session("async"), AsyncSession)
async def test_engine_is_disposed_on_cleanup(multiple_config):
sa_manager = SQLAlchemyBindManager(multiple_config)
sync_engine = sa_manager.get_bind("default").engine
async_engine = sa_manager.get_bind("async").engine
original_sync_dispose = sync_engine.dispose
original_async_dispose = async_engine.dispose
with (
patch.object(
sync_engine,
"dispose",
wraps=original_sync_dispose,
) as mocked_dispose,
patch.object(
type(async_engine),
"dispose",
wraps=original_async_dispose,
) as mocked_async_dispose,
):
sa_manager = None
mocked_dispose.assert_called_once()
mocked_async_dispose.assert_called()
def test_engine_is_disposed_on_cleanup_even_if_no_loop(multiple_config):
sa_manager = SQLAlchemyBindManager(multiple_config)
sync_engine = sa_manager.get_bind("default").engine
async_engine = sa_manager.get_bind("async").engine
original_sync_dispose = sync_engine.dispose
original_async_dispose = async_engine.dispose
with (
patch.object(
sync_engine,
"dispose",
wraps=original_sync_dispose,
) as mocked_dispose,
patch.object(
type(async_engine),
"dispose",
wraps=original_async_dispose,
) as mocked_async_dispose,
):
sa_manager = None
mocked_dispose.assert_called_once()
mocked_async_dispose.assert_called()
def test_engine_is_disposed_on_cleanup_even_if_loop_search_errors_out(
multiple_config,
):
sa_manager = SQLAlchemyBindManager(multiple_config)
sync_engine = sa_manager.get_bind("default").engine
async_engine = sa_manager.get_bind("async").engine
original_sync_dispose = sync_engine.dispose
original_async_dispose = async_engine.dispose
with (
patch.object(
sync_engine,
"dispose",
wraps=original_sync_dispose,
) as mocked_dispose,
patch.object(
type(async_engine),
"dispose",
wraps=original_async_dispose,
) as mocked_async_dispose,
patch(
"asyncio.get_event_loop",
side_effect=RuntimeError(),
) as mocked_get_event_loop,
):
sa_manager = None
mocked_get_event_loop.assert_called_once()
mocked_dispose.assert_called_once()
mocked_async_dispose.assert_called()