-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdatabase.py
More file actions
324 lines (257 loc) · 11.2 KB
/
database.py
File metadata and controls
324 lines (257 loc) · 11.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
import psycopg2
import json
from contextlib import contextmanager
from f.shared.otel_logging import get_logger
logger = get_logger(__name__)
@contextmanager
def get_db_connection(db_config):
"""Context manager for database connections"""
connection = None
try:
connection = psycopg2.connect(**db_config)
yield connection
except psycopg2.OperationalError as e:
logger.error(f"Database connection error: {e}")
logger.error(f"Database config: {db_config}")
raise
finally:
if connection:
connection.close()
logger.debug("Closed database connection")
def execute_query(db_config, query, with_result=False):
"""Execute a database query within a transaction"""
try:
with get_db_connection(db_config) as connection:
with connection.cursor() as cursor:
cursor.execute(query)
if with_result:
return cursor.fetchall()
connection.commit()
return None
except Exception as e:
logger.error(f"Query execution failed: {e}")
raise
def execute_ddl_query(db_config, query):
"""Execute DDL statements that require autocommit (CREATE DATABASE, etc.)"""
try:
with get_db_connection(db_config) as connection:
connection.set_session(autocommit=True)
with connection.cursor() as cursor:
cursor.execute(query)
logger.debug(f"DDL executed: {query[:50]}...")
except Exception as e:
logger.error(f"DDL execution failed: {e}")
raise
class ArchiveDatabaseRepository:
"""Repository for archive database operations"""
def __init__(self, base_config):
self.base_config = base_config.copy()
def create_breeder_state_table(self, breeder_id):
"""Create the breeder state table for shutdown signaling"""
db_config = self.base_config.copy()
db_config['database'] = breeder_id
query = """
CREATE TABLE IF NOT EXISTS breeder_state (
id SERIAL PRIMARY KEY,
shutdown_requested BOOLEAN DEFAULT FALSE,
updated_at TIMESTAMPTZ DEFAULT NOW()
);
INSERT INTO breeder_state (shutdown_requested) VALUES (FALSE)
ON CONFLICT DO NOTHING;
"""
execute_query(db_config, query)
logger.info(f"Created breeder state table for: {breeder_id}")
def set_shutdown_requested(self, breeder_id, value=True):
"""Set or clear the shutdown requested flag in the breeder's archive database
Args:
breeder_id: The breeder database name
value: True to set shutdown flag, False to clear it
"""
db_config = self.base_config.copy()
db_config['database'] = breeder_id
query = f"""
UPDATE breeder_state SET shutdown_requested = {str(value).upper()}, updated_at = NOW();
"""
execute_query(db_config, query)
logger.info(f"Set shutdown_requested={value} in archive DB for breeder: {breeder_id}")
def get_shutdown_requested(self, breeder_id):
"""Check if shutdown has been requested for a breeder"""
db_config = self.base_config.copy()
db_config['database'] = breeder_id
query = """
SELECT shutdown_requested FROM breeder_state;
"""
result = execute_query(db_config, query, with_result=True)
if result and len(result) > 0:
return result[0][0]
return False
def create_database(self, breeder_id):
"""Create a new database for a breeder"""
db_config = self.base_config.copy()
db_config['database'] = "archive_db"
query = f"CREATE DATABASE {breeder_id};"
execute_ddl_query(db_config, query)
logger.info(f"Created archive database: {breeder_id}")
def drop_database(self, breeder_id):
"""Drop a breeder database"""
db_config = self.base_config.copy()
db_config['database'] = "archive_db"
query = f"DROP DATABASE IF EXISTS {breeder_id};"
execute_ddl_query(db_config, query)
logger.info(f"Dropped archive database: {breeder_id}")
def get_connection_url(self, breeder_id):
"""Get PostgreSQL connection URL for a breeder database"""
return (
f"postgresql://{self.base_config['user']}:"
f"{self.base_config['password']}@"
f"{self.base_config['host']}:"
f"{self.base_config['port']}/"
f"{breeder_id}"
)
class MetadataDatabaseRepository:
"""Repository for metadata database operations"""
def __init__(self, base_config):
self.base_config = base_config.copy()
self.breeder_table_name = 'breeder_meta_data'
self.credentials_table_name = 'credentials'
def _get_db_config(self):
"""Get database config with metadata database name"""
db_config = self.base_config.copy()
db_config['database'] = 'meta_data'
return db_config
def create_table(self):
"""Create the breeder metadata table"""
db_config = self._get_db_config()
query = f"""
CREATE TABLE IF NOT EXISTS {self.breeder_table_name}
(
id uuid PRIMARY KEY,
name VARCHAR(255) NOT NULL DEFAULT '',
creation_tsz TIMESTAMPTZ,
definition jsonb NOT NULL
);
"""
execute_query(db_config, query)
logger.info(f"Ensured metadata table exists: {self.breeder_table_name}")
def create_credentials_table(self):
"""Create the credentials catalog table"""
db_config = self._get_db_config()
query = f"""
CREATE TABLE IF NOT EXISTS {self.credentials_table_name}
(
id uuid PRIMARY KEY,
name VARCHAR(255) UNIQUE NOT NULL,
credential_type VARCHAR(50) NOT NULL,
description TEXT,
windmill_variable VARCHAR(255) NOT NULL,
store_type VARCHAR(50) DEFAULT 'windmill_variable',
metadata JSONB,
created_at TIMESTAMPTZ DEFAULT NOW(),
last_used_at TIMESTAMPTZ,
last_verified_at TIMESTAMPTZ
);
"""
execute_query(db_config, query)
logger.info(f"Ensured credentials table exists: {self.credentials_table_name}")
def insert_breeder_meta(self, breeder_id, name, creation_ts, meta_state):
"""Insert breeder metadata"""
db_config = self._get_db_config()
json_string = json.dumps(meta_state).replace("'", "''")
name_escaped = name.replace("'", "''")
query = f"""
INSERT INTO {self.breeder_table_name} (id, name, creation_tsz, definition)
VALUES('{breeder_id}', '{name_escaped}', '{creation_ts}', '{json_string}');
"""
execute_query(db_config, query)
logger.info(f"Inserted metadata for breeder: {breeder_id}")
def update_breeder_meta(self, breeder_id, meta_state):
"""Update breeder metadata (e.g., to add job IDs)"""
db_config = self._get_db_config()
json_string = json.dumps(meta_state).replace("'", "''")
query = f"""
UPDATE {self.breeder_table_name}
SET definition = '{json_string}'
WHERE id = '{breeder_id}';
"""
execute_query(db_config, query)
logger.info(f"Updated metadata for breeder: {breeder_id}")
def remove_breeder_meta(self, breeder_id):
"""Remove breeder metadata"""
db_config = self._get_db_config()
query = f"DELETE FROM {self.breeder_table_name} WHERE id = '{breeder_id}';"
execute_query(db_config, query)
logger.info(f"Removed metadata for breeder: {breeder_id}")
def fetch_meta_data(self, breeder_id):
"""Fetch metadata for a specific breeder"""
db_config = self._get_db_config()
query = f"""
SELECT id, name, creation_tsz, definition FROM {self.breeder_table_name} WHERE id = '{breeder_id}';
"""
return execute_query(db_config, query, with_result=True)
def fetch_breeders_list(self):
"""Fetch list of all breeders"""
db_config = self._get_db_config()
query = f"""
SELECT id, name, creation_tsz FROM {self.breeder_table_name};
"""
result = execute_query(db_config, query, with_result=True)
return result if result else []
# Credential management methods
def insert_credential(self, credential_id, name, credential_type, description, windmill_variable, store_type='windmill_variable', metadata=None):
"""Insert credential catalog entry"""
db_config = self._get_db_config()
metadata_json = json.dumps(metadata) if metadata else 'NULL'
description_escaped = "'" + description.replace("'", "''") + "'" if description else 'NULL'
query = f"""
INSERT INTO {self.credentials_table_name}
(id, name, credential_type, description, windmill_variable, store_type, metadata)
VALUES('{credential_id}', '{name}', '{credential_type}', {description_escaped},
'{windmill_variable}', '{store_type}', {metadata_json}::jsonb);
"""
execute_query(db_config, query)
logger.info(f"Inserted credential catalog entry: {name}")
def fetch_credentials_list(self):
"""Fetch list of all credentials"""
db_config = self._get_db_config()
query = f"""
SELECT id, name, credential_type, description, windmill_variable, created_at, last_used_at
FROM {self.credentials_table_name}
ORDER BY created_at DESC;
"""
result = execute_query(db_config, query, with_result=True)
return result if result else []
def fetch_credential_by_id(self, credential_id):
"""Fetch credential by ID"""
db_config = self._get_db_config()
query = f"""
SELECT id, name, credential_type, description, windmill_variable, store_type, metadata, created_at, last_used_at
FROM {self.credentials_table_name}
WHERE id = '{credential_id}';
"""
result = execute_query(db_config, query, with_result=True)
return result[0] if result else None
def fetch_credential_by_name(self, name):
"""Fetch credential by name"""
db_config = self._get_db_config()
query = f"""
SELECT id, name, credential_type, description, windmill_variable, store_type, metadata, created_at, last_used_at
FROM {self.credentials_table_name}
WHERE name = '{name}';
"""
result = execute_query(db_config, query, with_result=True)
return result[0] if result else None
def delete_credential(self, credential_id):
"""Delete credential from catalog"""
db_config = self._get_db_config()
query = f"DELETE FROM {self.credentials_table_name} WHERE id = '{credential_id}';"
execute_query(db_config, query)
logger.info(f"Deleted credential catalog entry: {credential_id}")
def update_credential_last_used(self, credential_id):
"""Update the last_used_at timestamp for a credential"""
db_config = self._get_db_config()
query = f"""
UPDATE {self.credentials_table_name}
SET last_used_at = NOW()
WHERE id = '{credential_id}';
"""
execute_query(db_config, query)