forked from googleapis/python-spanner-sqlalchemy
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_insertmany.py
More file actions
191 lines (177 loc) · 7.22 KB
/
test_insertmany.py
File metadata and controls
191 lines (177 loc) · 7.22 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
# Copyright 2025 Google LLC All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import uuid
from unittest import mock
import sqlalchemy
from sqlalchemy.orm import Session
from sqlalchemy.testing import eq_, is_instance_of
from google.cloud.spanner_v1 import (
ExecuteSqlRequest,
CommitRequest,
RollbackRequest,
BeginTransactionRequest,
CreateSessionRequest,
)
from test.mockserver_tests.mock_server_test_base import (
MockServerTestBase,
add_result,
)
import google.cloud.spanner_v1.types.type as spanner_type
import google.cloud.spanner_v1.types.result_set as result_set
class TestInsertmany(MockServerTestBase):
    """Checks which requests reach the mock Spanner service on ORM inserts."""

    @mock.patch.object(uuid, "uuid4", mock.MagicMock(side_effect=["a", "b"]))
    def test_insertmany_with_uuid_sentinels(self):
        """Two UUID-keyed ORM objects are written with one bulk INSERT."""
        from test.mockserver_tests.insertmany_model import SingerUUID

        self.add_uuid_insert_result(
            "INSERT INTO singers_uuid (id, name) "
            "VALUES (@a0, @a1), (@a2, @a3) "
            "THEN RETURN inserted_at, id"
        )

        with Session(self.create_engine()) as session:
            for singer_name in ("a", "b"):
                session.add(SingerUUID(name=singer_name))
            session.commit()

        # Exactly one ExecuteSql request means both rows shared one statement.
        self._expect_requests(
            CreateSessionRequest,
            BeginTransactionRequest,
            ExecuteSqlRequest,
            CommitRequest,
        )

    def test_no_insertmany_with_bit_reversed_id(self):
        """Rows with bit-reversed integer PKs are inserted one at a time.

        SQLAlchemy's insertmany support needs either incrementing primary
        keys or sentinel values supplied by the client (UUIDs, for
        example). Spanner's bit-reversed integer primary keys do not
        satisfy that ordering requirement, so a bulk insert must not be
        attempted for them.
        """
        from test.mockserver_tests.insertmany_model import SingerIntID

        self.add_int_id_insert_result(
            "INSERT INTO singers_int_id (name) "
            "VALUES (@a0) "
            "THEN RETURN id, inserted_at"
        )

        with Session(self.create_engine()) as session:
            for singer_name in ("a", "b"):
                session.add(SingerIntID(name=singer_name))
            try:
                session.commit()
            except sqlalchemy.exc.SAWarning:
                # Expected: both rows come back with the same PK, because
                # the mock server cannot currently map one query to two
                # different results. That is acceptable here -- the only
                # thing under test is that two INSERTs are generated
                # rather than one.
                pass

        # Two ExecuteSql requests: one single-row INSERT per object.
        self._expect_requests(
            CreateSessionRequest,
            BeginTransactionRequest,
            ExecuteSqlRequest,
            ExecuteSqlRequest,
            RollbackRequest,
        )

    def add_uuid_insert_result(self, sql):
        """Register the two-row (inserted_at, id) result returned for ``sql``."""
        self._register_result(
            sql,
            columns=[
                ("inserted_at", spanner_type.TypeCode.TIMESTAMP),
                ("id", spanner_type.TypeCode.STRING),
            ],
            rows=[
                ("2020-06-02T23:58:40Z", "a"),
                ("2020-06-02T23:58:41Z", "b"),
            ],
        )

    def add_int_id_insert_result(self, sql):
        """Register the one-row (id, inserted_at) result returned for ``sql``."""
        self._register_result(
            sql,
            columns=[
                ("id", spanner_type.TypeCode.INT64),
                ("inserted_at", spanner_type.TypeCode.TIMESTAMP),
            ],
            rows=[
                ("1", "2020-06-02T23:58:40Z"),
            ],
        )

    def _expect_requests(self, *expected_types):
        """Assert the recorded requests match ``expected_types`` in order.

        The number of recorded requests is compared first, then each
        request is checked against the type expected at its position.
        """
        received = self.spanner_service.requests
        eq_(len(expected_types), len(received))
        for position, expected_type in enumerate(expected_types):
            is_instance_of(received[position], expected_type)

    @staticmethod
    def _register_result(sql, columns, rows):
        """Build a result set and register it as the response for ``sql``.

        ``columns`` is a sequence of ``(name, type_code)`` pairs describing
        the row type; ``rows`` is a list of value tuples in column order.
        """
        fields = [
            spanner_type.StructType.Field(
                {
                    "name": column_name,
                    "type": spanner_type.Type({"code": type_code}),
                }
            )
            for column_name, type_code in columns
        ]
        metadata = result_set.ResultSetMetadata(
            {"row_type": spanner_type.StructType({"fields": fields})}
        )
        result = result_set.ResultSet({"metadata": metadata})
        result.rows.extend(rows)
        add_result(sql, result)