forked from googleapis/python-spanner-sqlalchemy
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsample_helper.py
More file actions
100 lines (87 loc) · 3.2 KB
/
sample_helper.py
File metadata and controls
100 lines (87 loc) · 3.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
# Copyright 2024 Google LLC All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from typing import Callable
from google.api_core.client_options import ClientOptions
from google.api_core.exceptions import AlreadyExists
from google.auth.credentials import AnonymousCredentials
from google.cloud.spanner_v1 import Client
from google.cloud.spanner_v1.database import Database
from sqlalchemy import create_engine
from sqlalchemy.dialects import registry
from testcontainers.core.container import DockerContainer
from testcontainers.core.waiting_utils import wait_for_logs
from model import Base
def run_sample(sample_method: Callable):
registry.register(
"spanner",
"google.cloud.sqlalchemy_spanner.sqlalchemy_spanner",
"SpannerDialect",
)
emulator = None
if os.getenv("USE_EXISTING_EMULATOR") == "true":
if os.getenv("SPANNER_EMULATOR_HOST") is None:
os.environ["SPANNER_EMULATOR_HOST"] = "localhost:9010"
_create_instance_and_database("9010")
else:
os.environ["SPANNER_EMULATOR_HOST"] = ""
emulator, port = start_emulator()
os.environ["SPANNER_EMULATOR_HOST"] = "localhost:" + str(port)
try:
_create_tables()
sample_method()
finally:
if emulator is not None:
emulator.stop()
def start_emulator() -> (DockerContainer, str):
emulator = DockerContainer(
"gcr.io/cloud-spanner-emulator/emulator"
).with_exposed_ports(9010)
emulator.start()
wait_for_logs(emulator, "gRPC server listening at 0.0.0.0:9010")
port = str(emulator.get_exposed_port(9010))
_create_instance_and_database(port)
return emulator, port
def _create_instance_and_database(port: str):
client = Client(
project="sample-project",
credentials=AnonymousCredentials(),
client_options=ClientOptions(
api_endpoint="localhost:" + port,
),
)
configs = list(client.list_instance_configs())
instance_config = configs[0].name
instance_id = "sample-instance"
database_id = "sample-database"
instance = client.instance(instance_id, instance_config)
try:
created_op = instance.create()
created_op.result(1800) # block until completion
except AlreadyExists:
# Ignore
print("Using existing instance")
database: Database = instance.database(database_id)
if database.exists():
database.drop()
created_op = database.create()
created_op.result(1800)
def _create_tables():
engine = create_engine(
"spanner:///projects/sample-project/"
"instances/sample-instance/"
"databases/sample-database",
echo=True,
)
Base.metadata.create_all(engine)