55import time
66from urllib .parse import urlparse
77
8- import psycopg as pg
9- from psycopg import ClientCursor
10- from psycopg .rows import dict_row
8+ import psycopg2 as pg
9+ from psycopg2 import extras as pgextras
1110
1211from datadog_checks .base import AgentCheck , ConfigurationError , is_affirmative
1312from datadog_checks .pgbouncer .metrics import (
@@ -74,50 +73,51 @@ def _collect_stats(self, db):
7473 metric_scope .append (SERVERS_METRICS )
7574
7675 try :
77- for scope in metric_scope :
78- descriptors = scope ['descriptors' ]
79- metrics = scope ['metrics' ]
80- query = scope ['query' ]
76+ with db .cursor (cursor_factory = pgextras .DictCursor ) as cursor :
77+ for scope in metric_scope :
78+ descriptors = scope ['descriptors' ]
79+ metrics = scope ['metrics' ]
80+ query = scope ['query' ]
81+
82+ try :
83+ self .log .debug ("Running query: %s" , query )
84+ cursor .execute (query )
85+ rows = self .iter_rows (cursor )
86+
87+ except Exception as e :
88+ self .log .exception ("Not all metrics may be available: %s" , str (e ))
89+
90+ else :
91+ for row in rows :
92+ if 'key' in row : # We are processing "config metrics"
93+ # Make a copy of the row to allow mutation
94+ # (a `psycopg2.lib.extras.DictRow` object doesn't accept a new key)
95+ row = row .copy ()
96+ # We flip/rotate the row: row value becomes the column name
97+ row [row ['key' ]] = row ['value' ]
98+ # Skip the "pgbouncer" database
99+ elif row .get ('database' ) == self .DB_NAME :
100+ continue
101+
102+ tags = list (self .tags )
103+ tags += ["%s:%s" % (tag , row [column ]) for (column , tag ) in descriptors if column in row ]
104+ for column , (name , reporter ) in metrics :
105+ if column in row :
106+ value = row [column ]
107+ if column in ['connect_time' , 'request_time' ]:
108+ self .log .debug ("Parsing timestamp; original value: %s" , value )
109+ # First get rid of any UTC suffix.
110+ value = re .findall (r'^[^ ]+ [^ ]+' , value )[0 ]
111+ value = time .strptime (value , '%Y-%m-%d %H:%M:%S' )
112+ value = time .mktime (value )
113+ reporter (self , name , value , tags )
114+
115+ if not rows :
116+ self .log .warning ("No results were found for query: %s" , query )
117+
118+ except pg .Error :
119+ self .log .exception ("Connection error" )
81120
82- try :
83- cursor = db .cursor (row_factory = dict_row )
84- self .log .debug ("Running query: %s" , query )
85- cursor .execute (query )
86- rows = self .iter_rows (cursor )
87-
88- except (pg .InterfaceError , pg .OperationalError ) as e :
89- self .log .error ("Not all metrics may be available: %s" , e )
90- raise ShouldReconnectException
91-
92- else :
93- for row in rows :
94- if 'key' in row : # We are processing "config metrics"
95- # Make a copy of the row to allow mutation
96- row = row .copy ()
97- # We flip/rotate the row: row value becomes the column name
98- row [row ['key' ]] = row ['value' ]
99- # Skip the "pgbouncer" database
100- elif row .get ('database' ) == self .DB_NAME :
101- continue
102-
103- tags = list (self .tags )
104- tags += ["%s:%s" % (tag , row [column ]) for (column , tag ) in descriptors if column in row ]
105- for column , (name , reporter ) in metrics :
106- if column in row :
107- value = row [column ]
108- if column in ['connect_time' , 'request_time' ]:
109- self .log .debug ("Parsing timestamp; original value: %s" , value )
110- # First get rid of any UTC suffix.
111- value = re .findall (r'^[^ ]+ [^ ]+' , value )[0 ]
112- value = time .strptime (value , '%Y-%m-%d %H:%M:%S' )
113- value = time .mktime (value )
114- reporter (self , name , value , tags )
115-
116- if not rows :
117- self .log .warning ("No results were found for query: %s" , query )
118-
119- except pg .Error as e :
120- self .log .error ("Not all metrics may be available: %s" , e )
121121 raise ShouldReconnectException
122122
123123 def iter_rows (self , cursor ):
@@ -138,25 +138,21 @@ def iter_rows(self, cursor):
138138
139139 def _get_connect_kwargs (self ):
140140 """
141- Get the params to pass to psycopg .connect() based on passed-in vals
141+ Get the params to pass to psycopg2 .connect() based on passed-in vals
142142 from yaml settings file
143143 """
144- # It's important to set the client_encoding to utf-8
145- # PGBouncer defaults to an encoding of 'UNICODE`, which will cause psycopg to error out
146144 if self .database_url :
147- return {'conninfo ' : self .database_url , 'client_encoding' : 'utf-8' }
145+ return {'dsn ' : self .database_url }
148146
149147 if self .host in ('localhost' , '127.0.0.1' ) and self .password == '' :
150148 # Use ident method
151- return {'conninfo ' : "user={} dbname={} client_encoding=utf-8 " .format (self .user , self .DB_NAME )}
149+ return {'dsn ' : "user={} dbname={}" .format (self .user , self .DB_NAME )}
152150
153151 args = {
154152 'host' : self .host ,
155153 'user' : self .user ,
156154 'password' : self .password ,
157- 'dbname' : self .DB_NAME ,
158- 'cursor_factory' : ClientCursor ,
159- 'client_encoding' : 'utf-8' ,
155+ 'database' : self .DB_NAME ,
160156 }
161157 if self .port :
162158 args ['port' ] = self .port
@@ -168,9 +164,8 @@ def _new_connection(self):
168164 connection = None
169165 try :
170166 connect_kwargs = self ._get_connect_kwargs ()
171- # Somewhat counterintuitively, we need to set autocommit to True to avoid a BEGIN/COMMIT block
172- # https://www.psycopg.org/psycopg3/docs/basic/transactions.html#autocommit-transactions
173- connection = pg .connect (** connect_kwargs , autocommit = True )
167+ connection = pg .connect (** connect_kwargs )
168+ connection .set_isolation_level (pg .extensions .ISOLATION_LEVEL_AUTOCOMMIT )
174169 return connection
175170 except Exception :
176171 if connection :
@@ -201,7 +196,6 @@ def _get_redacted_dsn(self):
201196 return self .database_url
202197
203198 def _close_connection (self ):
204- """Close the connection to PgBouncer"""
205199 if self .connection :
206200 try :
207201 self .connection .close ()
@@ -244,31 +238,18 @@ def _collect_metadata(self, db):
244238 self .set_metadata ('version' , pgbouncer_version )
245239
246240 def get_version (self , db ):
247- """
248- Get the version of PgBouncer.
249- """
250241 if not db :
251242 self .log .warning ("Cannot get version: no active connection" )
252243 return None
253244
254245 regex = r'\d+\.\d+\.\d+'
255- try :
256- with db .cursor () as cursor :
257- # This command was added in pgbouncer 1.12
258- cursor .execute ('SHOW VERSION;' )
259- result = cursor .fetchone ()
260- if result :
261- # Result looks like: ['PgBouncer 1.2.3 ...']
262- version_string = result [0 ]
263- return re .findall (regex , version_string )[0 ]
264- else :
265- self .log .debug ("No version found in result: %s" , result )
266- return None
267- except pg .ProgrammingError as e :
268- # This is expected on versions of pgbouncer < 1.12
269- self .log .debug ("Cannot retrieve pgbouncer version using `SHOW VERSION;`: %s" , e )
270- return None
271- except Exception as e :
272- self .log .error ("An unexpected error occurred when retrieving pgbouncer version: %s" , e )
273- return None
274- return None
246+ with db .cursor (cursor_factory = pgextras .DictCursor ) as cursor :
247+ cursor .execute ('SHOW VERSION;' )
248+ if db .notices :
249+ data = db .notices [0 ]
250+ else :
251+ data = cursor .fetchone ()[0 ]
252+ res = re .findall (regex , data )
253+ if res :
254+ return res [0 ]
255+ self .log .debug ("Couldn't detect version from %s" , data )
0 commit comments