Skip to content

Commit 2204ec9

Browse files
e2e tests
1 parent 955027e commit 2204ec9

File tree

2 files changed

+330
-0
lines changed

2 files changed

+330
-0
lines changed

tests/e2e/server.py

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
import os
2+
import sys
3+
4+
# Use LockedMmapedValue for this server
5+
os.environ['PROMETHEUS_VALUE_CLASS'] = 'prometheus_client.values.LockedMmapedValue'
6+
7+
import http.server
8+
import json
9+
from urllib.parse import urlparse, parse_qs
10+
from prometheus_client import CollectorRegistry, Counter, Gauge, Histogram, generate_latest, values
11+
from prometheus_client.multiprocess import MultiProcessAggregateCollector
12+
13+
# Module-level test metrics: one counter, one histogram, and a gauge for
# every supported multiprocess aggregation mode.
C = Counter('c', 'test counter', ['l'])
G_SUM = Gauge('g_sum', 'test gauge sum', ['l'], multiprocess_mode='sum')
G_MAX = Gauge('g_max', 'test gauge max', ['l'], multiprocess_mode='max')
G_MIN = Gauge('g_min', 'test gauge min', ['l'], multiprocess_mode='min')
G_MOSTRECENT = Gauge('g_mostrecent', 'test gauge mostrecent', ['l'], multiprocess_mode='mostrecent')
G_ALL = Gauge('g_all', 'test gauge all', ['l'], multiprocess_mode='all')
G_LIVESUM = Gauge('g_livesum', 'test gauge livesum', ['l'], multiprocess_mode='livesum')
G_LIVEMAX = Gauge('g_livemax', 'test gauge livemax', ['l'], multiprocess_mode='livemax')
G_LIVEMIN = Gauge('g_livemin', 'test gauge livemin', ['l'], multiprocess_mode='livemin')
G_LIVEMOSTRECENT = Gauge('g_livemostrecent', 'test gauge livemostrecent', ['l'], multiprocess_mode='livemostrecent')
G_LIVEALL = Gauge('g_liveall', 'test gauge liveall', ['l'], multiprocess_mode='liveall')
H = Histogram('h', 'test histogram', ['l'], buckets=(1.0, 5.0, 10.0))

# Lookup table the HTTP handler uses to resolve a ?name=... query parameter
# to the corresponding metric object.
METRICS = dict(
    c=C,
    g_sum=G_SUM,
    g_max=G_MAX,
    g_min=G_MIN,
    g_mostrecent=G_MOSTRECENT,
    g_all=G_ALL,
    g_livesum=G_LIVESUM,
    g_livemax=G_LIVEMAX,
    g_livemin=G_LIVEMIN,
    g_livemostrecent=G_LIVEMOSTRECENT,
    g_liveall=G_LIVEALL,
    h=H,
)
41+
42+
class MetricHandler(http.server.BaseHTTPRequestHandler):
    """HTTP handler for the e2e test server.

    Exposes:
      * ``/metrics`` — aggregated multiprocess metrics in exposition format.
      * ``/inc``, ``/set``, ``/observe`` — mutate a metric named by the
        ``name`` query parameter, with optional JSON ``labels`` and a float
        ``value`` (default 1).
    """

    def send_ok(self, data=b'OK', content_type='text/plain'):
        """Send a 200 response with *data* as the body."""
        self.send_response(200)
        self.send_header('Content-Type', content_type)
        self.end_headers()
        self.wfile.write(data)

    def send_error(self, code=404, message=None, explain=None):
        """Send an empty error response.

        Accepts the same optional *message*/*explain* arguments as
        BaseHTTPRequestHandler.send_error: the base class calls this
        override with a message on malformed requests, so the previous
        one-argument signature raised TypeError there.  The extra
        arguments are intentionally ignored — this handler sends no
        error body.
        """
        self.send_response(code)
        self.end_headers()

    def do_GET(self):
        parsed_url = urlparse(self.path)
        query = parse_qs(parsed_url.query)
        path = parsed_url.path

        if path == '/metrics':
            # Fresh registry per scrape; the collector aggregates the
            # per-process files under PROMETHEUS_MULTIPROC_DIR.
            registry = CollectorRegistry()
            MultiProcessAggregateCollector(registry)
            self.send_ok(generate_latest(registry))
        elif path in ('/inc', '/set', '/observe'):
            name = query.get('name', [None])[0]
            try:
                labels = json.loads(query.get('labels', ['{}'])[0])
                value = float(query.get('value', query.get('amount', [1]))[0])
            except ValueError:
                # Malformed labels JSON (JSONDecodeError subclasses
                # ValueError) or a non-numeric value: report 400 instead
                # of crashing the request with an unhandled exception.
                self.send_error(400)
                return

            if name not in METRICS:
                self.send_error(400)
                return

            m = METRICS[name]
            metric_with_labels = m.labels(**labels) if labels else m

            if path == '/inc':
                metric_with_labels.inc(value)
            elif path == '/set':
                metric_with_labels.set(value)
            elif path == '/observe':
                metric_with_labels.observe(value)

            self.send_ok()
        else:
            self.send_error()
86+
if __name__ == '__main__':
    # The single CLI argument is the TCP port to listen on.
    port = int(sys.argv[1])
    httpd = http.server.HTTPServer(('127.0.0.1', port), MetricHandler)
    print(f'Starting server on port {port}')
    # Serve until terminated by the parent test process.
    httpd.serve_forever()

tests/e2e/test_multi_process.py

Lines changed: 240 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,240 @@
1+
import os
2+
import sys
3+
import subprocess
4+
import time
5+
import unittest
6+
import shutil
7+
import urllib.request
8+
import tempfile
9+
import json
10+
from prometheus_client.parser import text_string_to_metric_families
11+
12+
class TestMultiProcessAggregate(unittest.TestCase):
    """End-to-end tests for multiprocess metric aggregation.

    Each test spawns real server subprocesses (tests/e2e/server.py) sharing
    one PROMETHEUS_MULTIPROC_DIR, mutates metrics over HTTP, and asserts on
    the aggregated /metrics output — including behaviour across process
    death and restart.
    """

    def setUp(self):
        # Fresh multiprocess dir per test so mmap files never leak between tests.
        self.tmpdir = tempfile.mkdtemp()
        os.environ['PROMETHEUS_MULTIPROC_DIR'] = self.tmpdir
        self.processes = []

    def tearDown(self):
        for p in self.processes:
            p.terminate()
            p.wait()
        shutil.rmtree(self.tmpdir)

    def start_server(self, port):
        """Spawn a server subprocess on *port* and block until /metrics responds.

        The process is registered in self.processes so tearDown cleans it up.
        """
        # We need to make sure prometheus_client is in PYTHONPATH
        env = os.environ.copy()
        env['PYTHONPATH'] = os.getcwd()
        print(f"DEBUG: Starting server on port {port} with PROMETHEUS_MULTIPROC_DIR={env.get('PROMETHEUS_MULTIPROC_DIR')}")
        p = subprocess.Popen([sys.executable, 'tests/e2e/server.py', str(port)], env=env)
        self.processes.append(p)
        # Wait for server to start.  urllib's URLError subclasses OSError,
        # so this catches connection-refused without the bare `except:`
        # that would also have swallowed KeyboardInterrupt.
        max_retries = 10
        for _ in range(max_retries):
            try:
                urllib.request.urlopen(f'http://127.0.0.1:{port}/metrics', timeout=1)
                break
            except OSError:
                time.sleep(0.5)
        else:
            self.fail(f"Server on port {port} failed to start")
        return p

    def get_metrics(self, port):
        """Scrape /metrics on *port*; return {(sample_name, labels_tuple): value}."""
        content = urllib.request.urlopen(f'http://127.0.0.1:{port}/metrics').read().decode()
        print(f"DEBUG: Metrics from {port}:\n{content}")
        families = text_string_to_metric_families(content)
        metrics = {}
        for family in families:
            for sample in family.samples:
                # Store by (name, labels_tuple)
                labels = tuple(sorted(sample.labels.items()))
                metrics[(sample.name, labels)] = sample.value
        return metrics

    def call_metric(self, port, action, name, labels, value):
        """Invoke /inc, /set or /observe for metric *name* on the server at *port*."""
        import urllib.parse
        labels_json = json.dumps(labels)
        labels_encoded = urllib.parse.quote(labels_json)
        url = f'http://127.0.0.1:{port}/{action}?name={name}&labels={labels_encoded}&value={value}'
        return urllib.request.urlopen(url).read()

    def test_aggregation_and_modes(self):
        port1 = 12345
        port2 = 12346

        # Start two servers
        p1 = self.start_server(port1)
        p2 = self.start_server(port2)

        labels_dict = {"l": "v"}
        labels_tuple = (("l", "v"),)

        # 1. Test Counters (Aggregate by sum)
        self.call_metric(port1, 'inc', 'c', labels_dict, 10)
        time.sleep(1)
        self.call_metric(port2, 'inc', 'c', labels_dict, 20)
        time.sleep(1)

        # 2. Test Gauges (Various modes)
        # sum
        self.call_metric(port1, 'set', 'g_sum', labels_dict, 10)
        self.call_metric(port2, 'set', 'g_sum', labels_dict, 20)
        # max
        self.call_metric(port1, 'set', 'g_max', labels_dict, 10)
        self.call_metric(port2, 'set', 'g_max', labels_dict, 20)
        # min
        self.call_metric(port1, 'set', 'g_min', labels_dict, 10)
        self.call_metric(port2, 'set', 'g_min', labels_dict, 20)
        # mostrecent
        self.call_metric(port1, 'set', 'g_mostrecent', labels_dict, 10)
        time.sleep(0.1)  # Ensure different timestamp if possible (though mmap might not have high res)
        self.call_metric(port2, 'set', 'g_mostrecent', labels_dict, 20)

        # 3. Test Histograms
        self.call_metric(port1, 'observe', 'h', labels_dict, 2)
        self.call_metric(port2, 'observe', 'h', labels_dict, 6)

        # Check metrics while both are alive
        m = self.get_metrics(port1)
        self.assertEqual(m[('c_total', labels_tuple)], 30.0)
        self.assertEqual(m[('g_sum', labels_tuple)], 30.0)
        self.assertEqual(m[('g_max', labels_tuple)], 20.0)
        self.assertEqual(m[('g_min', labels_tuple)], 10.0)
        self.assertEqual(m[('g_mostrecent', labels_tuple)], 20.0)
        self.assertEqual(m[('h_count', labels_tuple)], 2.0)
        self.assertEqual(m[('h_sum', labels_tuple)], 8.0)
        self.assertEqual(m[('h_bucket', labels_tuple + (('le', '1.0'),))], 0.0)
        self.assertEqual(m[('h_bucket', labels_tuple + (('le', '5.0'),))], 1.0)
        self.assertEqual(m[('h_bucket', labels_tuple + (('le', '10.0'),))], 2.0)
        self.assertEqual(m[('h_bucket', labels_tuple + (('le', '+Inf'),))], 2.0)

        # Kill port2 server
        p2.terminate()
        p2.wait()
        self.processes.remove(p2)

        # Check metrics from surviving server (should be aggregated)
        m = self.get_metrics(port1)
        self.assertEqual(m[('c_total', labels_tuple)], 30.0)
        self.assertEqual(m[('g_sum', labels_tuple)], 30.0)
        self.assertEqual(m[('g_max', labels_tuple)], 20.0)
        self.assertEqual(m[('g_min', labels_tuple)], 10.0)
        self.assertEqual(m[('g_mostrecent', labels_tuple)], 20.0)
        self.assertEqual(m[('h_count', labels_tuple)], 2.0)
        self.assertEqual(m[('h_sum', labels_tuple)], 8.0)
        self.assertEqual(m[('h_bucket', labels_tuple + (('le', '1.0'),))], 0.0)
        self.assertEqual(m[('h_bucket', labels_tuple + (('le', '5.0'),))], 1.0)
        self.assertEqual(m[('h_bucket', labels_tuple + (('le', '10.0'),))], 2.0)
        self.assertEqual(m[('h_bucket', labels_tuple + (('le', '+Inf'),))], 2.0)

        # Ensure aggregate.db exists
        self.assertTrue(os.path.exists(os.path.join(self.tmpdir, 'aggregate.db')))

        # Kill surviving server
        p1.terminate()
        p1.wait()
        self.processes.remove(p1)

        # Start new server p3
        port3 = 12347
        p3 = self.start_server(port3)

        # Check metrics from p3 (should read from aggregate.db)
        m = self.get_metrics(port3)
        self.assertEqual(m[('c_total', labels_tuple)], 30.0)
        self.assertEqual(m[('g_sum', labels_tuple)], 30.0)
        self.assertEqual(m[('g_max', labels_tuple)], 20.0)
        self.assertEqual(m[('g_min', labels_tuple)], 10.0)
        self.assertEqual(m[('g_mostrecent', labels_tuple)], 20.0)
        self.assertEqual(m[('h_count', labels_tuple)], 2.0)

        # Add more to p3
        self.call_metric(port3, 'inc', 'c', labels_dict, 5)
        m = self.get_metrics(port3)
        self.assertEqual(m[('c_total', labels_tuple)], 35.0)

    def test_live_gauges(self):
        # Test various live gauge modes
        modes = {
            'g_livesum': 30.0,
            'g_livemax': 20.0,
            'g_livemin': 10.0,
            'g_livemostrecent': 20.0,
        }

        for name, expected_sum in modes.items():
            port1 = 12348
            port2 = 12349
            p1 = self.start_server(port1)
            p2 = self.start_server(port2)

            labels_dict = {"l": "live"}
            labels_tuple = (("l", "live"),)

            # Set live gauges
            self.call_metric(port1, 'set', name, labels_dict, 10)
            if name == 'g_livemostrecent':
                time.sleep(0.1)
            self.call_metric(port2, 'set', name, labels_dict, 20)

            m = self.get_metrics(port1)
            self.assertEqual(m[(name, labels_tuple)], expected_sum, f"Failed for {name} with both alive")

            # Kill p2
            p2.terminate()
            p2.wait()
            self.processes.remove(p2)

            # Live gauge should only reflect p1 now, as p2 is dead and live gauges are not aggregated into aggregate.db
            m = self.get_metrics(port1)
            self.assertEqual(m[(name, labels_tuple)], 10.0, f"Failed for {name} after p2 death")

            # Cleanup for next iteration
            p1.terminate()
            p1.wait()
            self.processes.remove(p1)
            shutil.rmtree(self.tmpdir)
            self.tmpdir = tempfile.mkdtemp()
            os.environ['PROMETHEUS_MULTIPROC_DIR'] = self.tmpdir

    def test_live_all_gauge(self):
        # Test liveall mode separately as it keeps PID labels
        port1 = 12350
        port2 = 12351
        p1 = self.start_server(port1)
        p2 = self.start_server(port2)

        pid1 = str(p1.pid)
        pid2 = str(p2.pid)

        labels_dict = {"l": "liveall"}

        self.call_metric(port1, 'set', 'g_liveall', labels_dict, 10)
        self.call_metric(port2, 'set', 'g_liveall', labels_dict, 20)

        m = self.get_metrics(port1)
        # Should have two entries with different pids
        expected_metrics = {
            (('l', 'liveall'), ('pid', pid1)): 10.0,
            (('l', 'liveall'), ('pid', pid2)): 20.0,
        }
        for labels, val in expected_metrics.items():
            self.assertEqual(m.get(('g_liveall', labels)), val, f"Missing or incorrect metric for pid {labels}")

        # Kill p2
        p2.terminate()
        p2.wait()
        self.processes.remove(p2)

        # Now should only have one entry (p1's)
        m = self.get_metrics(port1)
        self.assertEqual(m.get(('g_liveall', (('l', 'liveall'), ('pid', pid1)))), 10.0)
        self.assertNotIn(('g_liveall', (('l', 'liveall'), ('pid', pid2))), m)
238+
239+
# Allow running this module directly (outside a pytest/unittest discovery run).
if __name__ == '__main__':
    unittest.main()

0 commit comments

Comments
 (0)