forked from aws/aws-xray-sdk-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpatch.py
More file actions
69 lines (56 loc) · 2.09 KB
/
patch.py
File metadata and controls
69 lines (56 loc) · 2.09 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import copy
import re
import wrapt
from operator import methodcaller
from aws_xray_sdk.ext.dbapi2 import XRayTracedConn, XRayTracedCursor
def patch():
wrapt.wrap_function_wrapper(
'psycopg2',
'connect',
_xray_traced_connect
)
wrapt.wrap_function_wrapper(
'psycopg2.extensions',
'register_type',
_xray_register_type_fix
)
wrapt.wrap_function_wrapper(
'psycopg2.extensions',
'quote_ident',
_xray_register_type_fix
)
wrapt.wrap_function_wrapper(
'psycopg2.extras',
'register_default_jsonb',
_xray_register_default_jsonb_fix
)
def _xray_traced_connect(wrapped, instance, args, kwargs):
conn = wrapped(*args, **kwargs)
parameterized_dsn = {c[0]: c[-1] for c in map(methodcaller('split', '='), conn.dsn.split(' '))}
meta = {
'database_type': 'PostgreSQL',
'url': 'postgresql://{}@{}:{}/{}'.format(
parameterized_dsn.get('user', 'unknown'),
parameterized_dsn.get('host', 'unknown'),
parameterized_dsn.get('port', 'unknown'),
parameterized_dsn.get('dbname', 'unknown'),
),
'user': parameterized_dsn.get('user', 'unknown'),
'database_version': str(conn.server_version),
'driver_version': 'Psycopg 2'
}
return XRayTracedConn(conn, meta)
def _xray_register_type_fix(wrapped, instance, args, kwargs):
"""Send the actual connection or curser to register type."""
our_args = list(copy.copy(args))
if len(our_args) == 2 and isinstance(our_args[1], (XRayTracedConn, XRayTracedCursor)):
our_args[1] = our_args[1].__wrapped__
return wrapped(*our_args, **kwargs)
def _xray_register_default_jsonb_fix(wrapped, instance, args, kwargs):
our_kwargs = dict()
for key, value in kwargs.items():
if key == "conn_or_curs" and isinstance(value, (XRayTracedConn, XRayTracedCursor)):
# unwrap the connection or cursor to be sent to register_default_jsonb
value = value.__wrapped__
our_kwargs[key] = value
return wrapped(*args, **our_kwargs)