-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdb.py
More file actions
46 lines (36 loc) · 1.2 KB
/
Copy pathdb.py
File metadata and controls
46 lines (36 loc) · 1.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
from __future__ import annotations
from contextlib import contextmanager
from typing import Any, cast
import psycopg
from psycopg import sql
from psycopg.rows import dict_row
from config import (
POSTGRES_DB,
POSTGRES_HOST,
POSTGRES_PASSWORD,
POSTGRES_PORT,
POSTGRES_SCHEMA,
POSTGRES_USER,
)
from observability.logger import get_runtime_logger
logger = get_runtime_logger(__name__)
DB_DSN = f"postgresql://{POSTGRES_USER}:{POSTGRES_PASSWORD}@{POSTGRES_HOST}:{POSTGRES_PORT}/{POSTGRES_DB}"
logger.info("[db] Configured POSTGRES_SCHEMA=%r", POSTGRES_SCHEMA)
def _set_search_path(conn: psycopg.Connection) -> None:
if not POSTGRES_SCHEMA:
logger.warning("[db] POSTGRES_SCHEMA is empty, using default search_path")
return
with conn.cursor() as cur:
cur.execute(
sql.SQL("SET search_path TO {}, public").format(sql.Identifier(POSTGRES_SCHEMA))
)
@contextmanager
def get_conn():
if not POSTGRES_PASSWORD:
raise RuntimeError("POSTGRES_PASSWORD environment variable is not set")
conn = psycopg.connect(DB_DSN, row_factory=cast(Any, dict_row))
_set_search_path(conn)
try:
yield conn
finally:
conn.close()