-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Expand file tree
/
Copy pathutils.py
More file actions
180 lines (150 loc) · 6.07 KB
/
Copy pathutils.py
File metadata and controls
180 lines (150 loc) · 6.07 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
# (C) Datadog, Inc. 2019-present
# All rights reserved
# Licensed under a 3-clause BSD style license (see LICENSE)
import json
import threading
import time
import psycopg
import pytest
from datadog_checks.base import AgentCheck
from .common import PASSWORD_ADMIN, POSTGRES_VERSION, USER_ADMIN
def _version_gate(bound, relation='over'):
    """Build a ``skipif`` marker that gates a test on ``POSTGRES_VERSION``.

    With ``relation='over'`` the test is skipped when the version is below
    ``bound`` (so it runs on ``bound`` itself and anything newer). With any
    other ``relation`` the test is skipped when the version is ``bound`` or
    newer. In both cases the test is skipped when ``POSTGRES_VERSION`` is
    ``None``. The skip condition is evaluated once, when this module is
    imported.
    """
    if relation == 'over':
        unmet = POSTGRES_VERSION is None or float(POSTGRES_VERSION) < bound
    else:
        unmet = POSTGRES_VERSION is None or float(POSTGRES_VERSION) >= bound
    return pytest.mark.skipif(
        unmet,
        reason=f'This test is for {relation} {bound} only (make sure POSTGRES_VERSION is set)',
    )


# "over N" markers run the test on version N and above.
requires_over_10 = _version_gate(10)
requires_over_11 = _version_gate(11)
requires_over_12 = _version_gate(12)
requires_over_13 = _version_gate(13)
requires_over_14 = _version_gate(14)
requires_over_15 = _version_gate(15)
requires_over_16 = _version_gate(16)
# "under N" runs the test only on versions strictly below N.
requires_under_17 = _version_gate(17, relation='under')
requires_over_17 = _version_gate(17)
def _get_conn(db_instance, dbname=None, user=None, password=None, application_name='test', autocommit=True):
    """Open a psycopg connection to the server described by ``db_instance``.

    ``host`` and ``port`` always come from ``db_instance``. ``dbname``,
    ``user`` and ``password`` are used when truthy; otherwise the instance's
    ``dbname``, ``username`` and ``password`` entries are used instead.

    Returns the open connection; the caller is responsible for closing it.
    """
    connect_kwargs = {
        'host': db_instance['host'],
        'port': db_instance['port'],
        # Fall back to the instance settings only when no override is given.
        'dbname': dbname if dbname else db_instance['dbname'],
        'user': user if user else db_instance['username'],
        'password': password if password else db_instance['password'],
        'application_name': application_name,
        'autocommit': autocommit,
    }
    return psycopg.connect(**connect_kwargs)
def _get_superconn(db_instance, application_name='test', autocommit=True):
    """Open a connection to ``db_instance`` using the admin credentials.

    Same as ``_get_conn`` but always authenticates as ``USER_ADMIN`` with
    ``PASSWORD_ADMIN``; the database name still comes from ``db_instance``.
    """
    admin_credentials = {'user': USER_ADMIN, 'password': PASSWORD_ADMIN}
    return _get_conn(
        db_instance,
        application_name=application_name,
        autocommit=autocommit,
        **admin_credentials,
    )
def lock_table(pg_instance, table, lock_mode, application_name='test'):
    """Take a lock on ``table`` from a dedicated superuser connection.

    ``table`` and ``lock_mode`` are interpolated verbatim into a
    ``LOCK ... IN ... MODE`` statement, so they must be trusted values.

    Returns the connection with its transaction still open. The lock is held
    until the caller ends that transaction or closes the connection.

    If either statement fails, the connection is closed before the error is
    re-raised, so a failed call does not leak a session.
    """
    lock_conn = _get_superconn(pg_instance, application_name)
    try:
        with lock_conn.cursor() as cur:
            # The connection is opened in autocommit mode, so an explicit BEGIN
            # is needed for the lock to outlive the LOCK statement itself.
            cur.execute('BEGIN')
            cur.execute(f'lock {table} IN {lock_mode} MODE')
    except Exception:
        lock_conn.close()
        raise
    return lock_conn
def kill_session(pg_instance, query_pattern, application_name='test'):
    """Cancel the current query of every other backend matching ``query_pattern``.

    ``query_pattern`` is a POSIX regular expression matched case-insensitively
    (``~*``) against ``pg_stat_activity.query``. The backend running this
    statement is excluded. Despite the name, this calls ``pg_cancel_backend``,
    which cancels the running query rather than terminating the session.

    The pattern is sent as a bound parameter, so it may safely contain quotes.
    """
    cancel_sql = (
        "SELECT pg_cancel_backend(pid) "
        "FROM pg_stat_activity "
        "WHERE query ~* %s AND pid != pg_backend_pid()"
    )
    with _get_superconn(pg_instance, application_name) as conn:
        with conn.cursor() as cur:
            cur.execute(cancel_sql, (query_pattern,))
def kill_vacuum(pg_instance, application_name='test'):
    """Cancel every running statement whose text starts with ``vacuum``."""
    vacuum_pattern = '^vacuum'
    kill_session(pg_instance, vacuum_pattern, application_name)
def _wait_for_value(db_instance, lower_threshold, query, attempts=10, application_name='test'):
    """Poll ``query`` until its value exceeds ``lower_threshold``.

    ``query`` must yield at least one row; the first column of the first row
    is compared against ``lower_threshold``. The query is re-run, with a 0.1s
    pause after each run, until the value is strictly greater than the
    threshold or ``attempts`` runs have been made.

    Returns ``None`` in both cases, so callers cannot tell a timeout from
    success. The connection is always closed, even if the query fails.
    """
    value = 0
    current_attempt = 0
    # Stats tables behave slightly differently from normal tables: repeating
    # the same query within a transaction yields the same value, even though
    # the transaction is READ COMMITTED. To avoid this, the connection is
    # deliberately not used in a `with` statement, which would create a
    # transaction block around the loop.
    conn = _get_superconn(db_instance, application_name)
    try:
        while value <= lower_threshold and current_attempt < attempts:
            with conn.cursor() as cur:
                cur.execute(query)
                value = cur.fetchall()[0][0]
            time.sleep(0.1)
            current_attempt += 1
    finally:
        conn.close()
def run_query_thread(pg_instance, query, application_name='test', init_statements=None):
    """Run ``query`` on a superuser connection in a background thread.

    ``init_statements``, when given, are executed in order on the same cursor
    before ``query``. A ``QueryCanceled`` error raised by ``query`` itself is
    swallowed; any other error propagates inside the worker thread.

    Returns the already-started ``threading.Thread``; callers should ``join``
    it. The worker's connection is closed whether or not a statement fails.
    """

    def run_query():
        conn = _get_superconn(pg_instance, application_name)
        try:
            with conn.cursor() as cur:
                for stmt in init_statements or ():
                    cur.execute(stmt)
                try:
                    cur.execute(query)
                except psycopg.errors.QueryCanceled:
                    # Cancellation is an expected outcome here: callers may
                    # set a statement timeout or cancel the backend on purpose.
                    pass
        finally:
            conn.close()

    # Start thread
    thread = threading.Thread(target=run_query)
    thread.start()
    return thread
def run_vacuum_thread(pg_instance, vacuum_query, application_name='test'):
    """Run ``vacuum_query`` in a background thread with throttling settings.

    Before the vacuum, the session sets a 2s ``statement_timeout`` plus
    ``vacuum_cost_delay=100`` and ``vacuum_cost_limit=1``. Presumably this
    keeps the vacuum slow enough to be observed while bounding how long it
    can run; verify against the tests that use it.

    Returns the started thread.
    """
    session_setup = [
        "set statement_timeout='2s'",
        'set vacuum_cost_delay=100',
        'set vacuum_cost_limit=1',
    ]
    return run_query_thread(
        pg_instance,
        vacuum_query,
        application_name,
        init_statements=session_setup,
    )
def run_one_check(check: AgentCheck, cancel=True):
    """
    Execute one run of ``check`` and, unless ``cancel`` is falsy, cancel it right after.
    cancel() joins all threads and nulls futures, so no extra .result() calls needed.
    """
    check.run()
    if not cancel:
        return
    check.cancel()
def normalize_object(obj):
    """Return a copy of ``obj`` with every list put into a canonical order.

    Dicts are rebuilt with normalized values. Lists are normalized element by
    element and then sorted by each element's JSON text (``sort_keys=True``),
    so two lists holding the same items in different orders compare equal.
    Tuples keep their order but have their elements normalized. Anything else
    is returned unchanged.
    """
    if isinstance(obj, dict):
        rebuilt = {}
        for key, value in obj.items():
            rebuilt[key] = normalize_object(value)
        return rebuilt

    if isinstance(obj, list):
        items = [normalize_object(element) for element in obj]
        # Order by serialized form so heterogeneous items are still comparable.
        items.sort(key=lambda element: json.dumps(element, sort_keys=True))
        return items

    if isinstance(obj, tuple):
        return tuple(map(normalize_object, obj))

    return obj
class WaitGroup(object):
    """Counter that lets threads wait for a group of tasks, like Go's sync.WaitGroup.

    Call ``add(n)`` before starting ``n`` tasks, ``done()`` as each finishes,
    and ``wait()`` to block until the counter is no longer positive.
    """

    def __init__(self):
        # Number of outstanding tasks; only touched while holding ``cv``.
        self.count = 0
        self.cv = threading.Condition()

    def add(self, n):
        """Adjust the counter by ``n`` (which may be negative)."""
        with self.cv:
            self.count += n
            # A negative delta can bring the counter to zero, so waiters
            # must be woken here as well as in done().
            if self.count <= 0:
                self.cv.notify_all()

    def done(self):
        """Mark one task as finished, waking waiters when none remain."""
        with self.cv:
            self.count -= 1
            if self.count <= 0:
                self.cv.notify_all()

    def wait(self):
        """Block until the counter is zero or below."""
        with self.cv:
            while self.count > 0:
                self.cv.wait()