Skip to content

Commit 2c794b3

Browse files
authored
Revert "Update Postgres to psycopg3 (DataDog#20617)" (DataDog#20885)
This reverts commit fbb0579.
1 parent ff8bf3b commit 2c794b3

25 files changed

Lines changed: 441 additions & 426 deletions

agent_requirements.in

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ prometheus-client==0.22.1
3333
protobuf==6.31.1
3434
psutil==6.0.0
3535
psycopg2-binary==2.9.9
36-
psycopg[binary]==3.2.9
3736
pyasn1==0.4.8
3837
pycryptodomex==3.23.0
3938
pydantic==2.11.7

postgres/datadog_checks/postgres/connections.py

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
import time
99
from typing import Callable, Dict
1010

11-
import psycopg
11+
import psycopg2
1212

1313

1414
class ConnectionPoolFullError(Exception):
@@ -23,7 +23,7 @@ def __str__(self):
2323
class ConnectionInfo:
2424
def __init__(
2525
self,
26-
connection: psycopg.Connection,
26+
connection: psycopg2.extensions.connection,
2727
deadline: int,
2828
active: bool,
2929
last_accessed: int,
@@ -86,9 +86,9 @@ def _get_connection_raw(
8686
dbname: str,
8787
ttl_ms: int,
8888
timeout: int = None,
89-
startup_fn: Callable[[psycopg.Connection], None] = None,
89+
startup_fn: Callable[[psycopg2.extensions.connection], None] = None,
9090
persistent: bool = False,
91-
) -> psycopg.Connection:
91+
) -> psycopg2.extensions.connection:
9292
"""
9393
Return a connection from the pool.
9494
Pass a function to startup_func if there is an action needed with the connection
@@ -117,7 +117,7 @@ def _get_connection_raw(
117117
# if already in pool, retain persistence status
118118
persistent = conn.persistent
119119

120-
if db.info.status != psycopg.pq.ConnStatus.OK:
120+
if db.status != psycopg2.extensions.STATUS_READY:
121121
# Some transaction went wrong and the connection is in an unhealthy state. Let's fix that
122122
db.rollback()
123123

@@ -130,7 +130,6 @@ def _get_connection_raw(
130130
thread=threading.current_thread(),
131131
persistent=persistent,
132132
)
133-
134133
return db
135134

136135
@contextlib.contextmanager
@@ -139,7 +138,7 @@ def get_connection(
139138
dbname: str,
140139
ttl_ms: int,
141140
timeout: int = None,
142-
startup_fn: Callable[[psycopg.Connection], None] = None,
141+
startup_fn: Callable[[psycopg2.extensions.connection], None] = None,
143142
persistent: bool = False,
144143
):
145144
"""
@@ -148,14 +147,12 @@ def get_connection(
148147
make a new connection if the max_conn limit hasn't been reached.
149148
Blocks until a connection can be added to the pool,
150149
and optionally takes a timeout in seconds.
151-
Note that leaving a connection context here does NOT close the connection in psycopg;
150+
Note that leaving a connection context here does NOT close the connection in psycopg2;
152151
connections must be manually closed by `close_all_connections()`.
153152
"""
154153
try:
155154
with self._mu:
156-
db = self._get_connection_raw(
157-
dbname=dbname, ttl_ms=ttl_ms, timeout=timeout, startup_fn=startup_fn, persistent=persistent
158-
)
155+
db = self._get_connection_raw(dbname, ttl_ms, timeout, startup_fn, persistent)
159156
yield db
160157
finally:
161158
with self._mu:

postgres/datadog_checks/postgres/cursor.py

Lines changed: 6 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,10 @@
22
# All rights reserved
33
# Licensed under a 3-clause BSD style license (see LICENSE)
44

5-
import psycopg
5+
import psycopg2.extensions
6+
import psycopg2.extras
67

78
from datadog_checks.base.utils.db.sql_commenter import add_sql_comment
8-
from datadog_checks.postgres.encoding import decode_with_encodings
99

1010
DD_QUERY_ATTRIBUTES = {
1111
'service': 'datadog-agent',
@@ -17,41 +17,20 @@ def __init__(self, *args, **kwargs):
1717
self.__attributes = DD_QUERY_ATTRIBUTES
1818
super().__init__(*args, **kwargs)
1919

20-
def execute(self, query, params=None, ignore_query_metric=False, binary=False, prepare=None):
20+
def execute(self, query, vars=None, ignore_query_metric=False):
2121
'''
2222
When ignore is True, a /* DDIGNORE */ comment will be added to the query.
2323
This comment indicates that the query should be ignored in query metrics.
2424
'''
2525
query = add_sql_comment(query, prepand=True, **self.__attributes)
2626
if ignore_query_metric:
2727
query = '{} {}'.format('/* DDIGNORE */', query)
28-
return super().execute(query, params, binary=binary, prepare=prepare)
28+
return super().execute(query, vars)
2929

3030

31-
class CommenterCursor(BaseCommenterCursor, psycopg.ClientCursor):
31+
class CommenterCursor(BaseCommenterCursor, psycopg2.extensions.cursor):
3232
pass
3333

3434

35-
class CommenterDictCursor(BaseCommenterCursor, psycopg.ClientCursor):
35+
class CommenterDictCursor(BaseCommenterCursor, psycopg2.extras.DictCursor):
3636
pass
37-
38-
39-
class SQLASCIITextLoader(psycopg.adapt.Loader):
40-
"""
41-
Custom loader for SQLASCII encoding.
42-
"""
43-
44-
encodings = ['utf-8']
45-
format = psycopg.pq.Format.TEXT
46-
47-
def load(self, data):
48-
if isinstance(data, memoryview):
49-
# Convert memoryview to bytes
50-
data = data.tobytes()
51-
if not isinstance(data, bytes) or data is None:
52-
return data
53-
try:
54-
return decode_with_encodings(data, self.encodings)
55-
except:
56-
# Fallback to utf8 with replacement
57-
return data.decode('utf-8', errors='backslashreplace')

postgres/datadog_checks/postgres/discovery.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
from datadog_checks.base import AgentCheck
88
from datadog_checks.base.utils.discovery import Discovery
9+
from datadog_checks.postgres.cursor import CommenterCursor
910
from datadog_checks.postgres.util import DatabaseConfigurationError, warning_with_tags
1011

1112
AUTODISCOVERY_QUERY: str = """select datname from pg_catalog.pg_database where datistemplate = false;"""
@@ -71,7 +72,7 @@ def get_items(self) -> List[str]:
7172

7273
def _get_databases(self) -> List[str]:
7374
with self.db_pool.get_connection(self._db, self._default_ttl) as conn:
74-
with conn.cursor() as cursor:
75+
with conn.cursor(cursor_factory=CommenterCursor) as cursor:
7576
cursor.execute(AUTODISCOVERY_QUERY)
7677
databases = list(cursor.fetchall())
7778
databases = [

postgres/datadog_checks/postgres/explain_parameterized_queries.py

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,10 @@
55
import logging
66
import re
77

8-
import psycopg
8+
import psycopg2
99

1010
from datadog_checks.base.utils.tracking import tracked_method
11+
from datadog_checks.postgres.cursor import CommenterDictCursor
1112

1213
from .util import DBExplainError
1314
from .version_utils import V12
@@ -75,7 +76,7 @@ def explain_statement(self, dbname, statement, obfuscated_statement, query_signa
7576
if self._check.version < V12:
7677
# if pg version < 12, skip explaining parameterized queries because
7778
# plan_cache_mode is not supported
78-
e = psycopg.errors.UndefinedParameter("Unable to explain parameterized query")
79+
e = psycopg2.errors.UndefinedParameter("Unable to explain parameterized query")
7980
logger.debug(
8081
"Unable to explain parameterized query. Postgres version %s does not support plan_cache_mode",
8182
self._check.version,
@@ -85,9 +86,9 @@ def explain_statement(self, dbname, statement, obfuscated_statement, query_signa
8586

8687
try:
8788
self._create_prepared_statement(dbname, statement, obfuscated_statement, query_signature)
88-
except psycopg.errors.IndeterminateDatatype as e:
89+
except psycopg2.errors.IndeterminateDatatype as e:
8990
return None, DBExplainError.indeterminate_datatype, '{}'.format(type(e))
90-
except psycopg.errors.UndefinedFunction as e:
91+
except psycopg2.errors.UndefinedFunction as e:
9192
return None, DBExplainError.undefined_function, '{}'.format(type(e))
9293
except Exception as e:
9394
# if we fail to create a prepared statement, we cannot explain the query
@@ -187,14 +188,16 @@ def _deallocate_prepared_statement(self, dbname, query_signature):
187188
)
188189

189190
def _execute_query(self, dbname, query):
191+
# Psycopg2 connections do not get closed when context ends;
192+
# leaving context will just mark the connection as inactive in MultiDatabaseConnectionPool
190193
with self._check.db_pool.get_connection(dbname, self._check._config.idle_connection_timeout) as conn:
191-
with conn.cursor() as cursor:
194+
with conn.cursor(cursor_factory=CommenterDictCursor) as cursor:
192195
logger.debug('Executing query=[%s]', query)
193196
cursor.execute(query, ignore_query_metric=True)
194197

195198
def _execute_query_and_fetch_rows(self, dbname, query):
196199
with self._check.db_pool.get_connection(dbname, self._check._config.idle_connection_timeout) as conn:
197-
with conn.cursor() as cursor:
200+
with conn.cursor(cursor_factory=CommenterDictCursor) as cursor:
198201
cursor.execute(query, ignore_query_metric=True)
199202
return cursor.fetchall()
200203

postgres/datadog_checks/postgres/metadata.py

Lines changed: 17 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,9 @@
66
import time
77
from typing import Dict, List, Optional, Tuple, Union # noqa: F401
88

9-
import psycopg
10-
from psycopg.rows import dict_row
9+
import psycopg2
10+
11+
from datadog_checks.postgres.cursor import CommenterDictCursor
1112

1213
try:
1314
import datadog_agent
@@ -60,8 +61,7 @@
6061
"""
6162

6263
PG_EXTENSIONS_QUERY = """
63-
SELECT extname, nspname schemaname
64-
FROM pg_extension left join pg_namespace on extnamespace = pg_namespace.oid;
64+
SELECT extname, nspname schemaname FROM pg_extension left join pg_namespace on extnamespace = pg_namespace.oid;
6565
"""
6666

6767
PG_EXTENSION_LOADER_QUERY = {
@@ -256,7 +256,7 @@ def __init__(self, check, config, shutdown_callback):
256256
enabled=is_affirmative(config.resources_metadata_config.get("enabled", True)),
257257
dbms="postgres",
258258
min_collection_interval=config.min_collection_interval,
259-
expected_db_exceptions=(psycopg.errors.DatabaseError,),
259+
expected_db_exceptions=(psycopg2.errors.DatabaseError,),
260260
job_name="database-metadata",
261261
shutdown_callback=shutdown_callback,
262262
)
@@ -319,7 +319,7 @@ def report_postgres_extensions(self):
319319
@tracked_method(agent_check_getter=agent_check_getter)
320320
def _collect_postgres_extensions(self):
321321
with self._check._get_main_db() as conn:
322-
with conn.cursor(row_factory=dict_row) as cursor:
322+
with conn.cursor(cursor_factory=CommenterDictCursor) as cursor:
323323
self._time_since_last_extension_query = time.time()
324324

325325
# Get loaded extensions
@@ -394,7 +394,7 @@ def _collect_postgres_schemas(self):
394394
continue
395395

396396
with self.db_pool.get_connection(dbname, self._config.idle_connection_timeout) as conn:
397-
with conn.cursor(row_factory=dict_row) as cursor:
397+
with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor:
398398
for schema in database["schemas"]:
399399
if not self._should_collect_metadata(schema["name"], "schema"):
400400
continue
@@ -510,7 +510,9 @@ def _collect_schema_info(self):
510510
self._last_schemas_query_time = time.time()
511511
return metadata
512512

513-
def _query_database_information(self, cursor: psycopg.Cursor, dbname: str) -> Dict[str, Union[str, int]]:
513+
def _query_database_information(
514+
self, cursor: psycopg2.extensions.cursor, dbname: str
515+
) -> Dict[str, Union[str, int]]:
514516
"""
515517
Collect database info. Returns
516518
description: str
@@ -523,7 +525,7 @@ def _query_database_information(self, cursor: psycopg.Cursor, dbname: str) -> Di
523525
row = cursor.fetchone()
524526
return row
525527

526-
def _query_schema_information(self, cursor: psycopg.Cursor, dbname: str) -> Dict[str, str]:
528+
def _query_schema_information(self, cursor: psycopg2.extensions.cursor, dbname: str) -> Dict[str, str]:
527529
"""
528530
Collect user schemas. Returns
529531
id: str
@@ -639,7 +641,7 @@ def sort_tables(info):
639641
return table_info[:limit]
640642

641643
def _query_tables_for_schema(
642-
self, cursor: psycopg.Cursor, schema_id: str, dbname: str
644+
self, cursor: psycopg2.extensions.cursor, schema_id: str, dbname: str
643645
) -> List[Dict[str, Union[str, Dict]]]:
644646
"""
645647
Collect list of tables for a schema. Returns a list of dictionaries
@@ -670,7 +672,7 @@ def _query_tables_for_schema(
670672
return table_payloads
671673

672674
def _query_table_information(
673-
self, cursor: psycopg.Cursor, schema_name: str, table_info: List[Dict[str, Union[str, bool]]]
675+
self, cursor: psycopg2.extensions.cursor, schema_name: str, table_info: List[Dict[str, Union[str, bool]]]
674676
) -> List[Dict[str, Union[str, Dict]]]:
675677
"""
676678
Collect table information . Returns a dictionary
@@ -750,7 +752,7 @@ def _query_table_information(
750752
def _collect_metadata_for_database(self, dbname):
751753
metadata = {}
752754
with self.db_pool.get_connection(dbname, self._config.idle_connection_timeout) as conn:
753-
with conn.cursor(row_factory=dict_row) as cursor:
755+
with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor:
754756
database_info = self._query_database_information(cursor, dbname)
755757
metadata.update(
756758
{
@@ -771,7 +773,7 @@ def _collect_metadata_for_database(self, dbname):
771773
@tracked_method(agent_check_getter=agent_check_getter)
772774
def _collect_postgres_settings(self):
773775
with self._check._get_main_db() as conn:
774-
with conn.cursor(row_factory=dict_row) as cursor:
776+
with conn.cursor(cursor_factory=CommenterDictCursor) as cursor:
775777
# Get loaded extensions
776778
cursor.execute(PG_EXTENSIONS_QUERY)
777779
rows = cursor.fetchall()
@@ -798,14 +800,6 @@ def _collect_postgres_settings(self):
798800
)
799801
self._time_since_last_settings_query = time.time()
800802
cursor.execute(query, (self.pg_settings_ignored_patterns,))
801-
# pg3 returns a set of results for each statement in the multiple statement query
802-
# We want to retrieve the last one that actually has the settings results
803-
rows = []
804-
has_more_results = True
805-
while has_more_results:
806-
if cursor.pgresult.status == psycopg.pq.ExecStatus.TUPLES_OK:
807-
rows = cursor.fetchall()
808-
has_more_results = cursor.nextset()
809-
self._log.debug("Loaded %s rows from pg_settings", rows)
803+
rows = cursor.fetchall()
810804
self._log.debug("Loaded %s rows from pg_settings", len(rows))
811-
return rows
805+
return [dict(row) for row in rows]

0 commit comments

Comments
 (0)