-
Notifications
You must be signed in to change notification settings - Fork 10
Expand file tree
/
Copy pathtest_app.py
More file actions
2159 lines (1830 loc) · 91.9 KB
/
test_app.py
File metadata and controls
2159 lines (1830 loc) · 91.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""Tests for the Flask monitoring backend."""
import importlib
import os
import threading
import pytest
import json
import psycopg2
from unittest.mock import patch, mock_open, Mock, MagicMock, call
import app as app_module
from app import (
app,
read_version_file,
smart_truncate_query,
_escape_prometheus_label,
escape_promql_label,
escape_promql_regex_literal,
)
@pytest.fixture
def client():
"""Create test client."""
original_trigger_migration_applied = app_module._trigger_migration_applied
original_active_minutes = app_module.QUERYID_ACTIVE_MINUTES
original_retention_hours = app_module.QUERYID_RETENTION_HOURS
original_retention_max_iterations = app_module.QUERYID_RETENTION_MAX_ITERATIONS
cleanup_was_running = app_module._cleanup_running.is_set()
app.config['TESTING'] = True
# Reset the lazy migration flag so tests are independent
app_module._trigger_migration_applied = True # Skip migration in tests
app_module._cleanup_running.clear()
try:
with app.test_client() as client:
yield client
finally:
app_module._trigger_migration_applied = original_trigger_migration_applied
app_module.QUERYID_ACTIVE_MINUTES = original_active_minutes
app_module.QUERYID_RETENTION_HOURS = original_retention_hours
app_module.QUERYID_RETENTION_MAX_ITERATIONS = original_retention_max_iterations
app_module._cleanup_running.clear()
if cleanup_was_running:
app_module._cleanup_running.set()
@pytest.fixture
def debug_mode():
"""Enable the /execute-query debug endpoint with a known secret key.
Yields the app module so tests can assert on module-level state if needed.
Proper fixture teardown ensures env/module state is always restored.
"""
import os
import app as app_module
os.environ['ENABLE_DEBUG'] = 'true'
app_module._DEBUG_SECRET_KEY = 'test-secret'
yield app_module
os.environ.pop('ENABLE_DEBUG', None)
app_module._DEBUG_SECRET_KEY = ''
@pytest.fixture
def debug_mode_no_key():
"""Enable debug mode but with no secret key configured (tests the 403 path)."""
import os
import app as app_module
os.environ['ENABLE_DEBUG'] = 'true'
app_module._DEBUG_SECRET_KEY = ''
yield app_module
os.environ.pop('ENABLE_DEBUG', None)
app_module._DEBUG_SECRET_KEY = ''
class TestVersionEndpoint:
"""Tests for the /version endpoint."""
def test_version_endpoint_returns_json(self, client):
"""Test that /version returns valid JSON."""
response = client.get('/version')
assert response.status_code == 200
assert response.content_type == 'application/json'
def test_version_endpoint_returns_array(self, client):
"""Test that /version returns array for Grafana Infinity datasource."""
response = client.get('/version')
data = json.loads(response.data)
assert isinstance(data, list)
assert len(data) == 1
def test_version_endpoint_contains_version_field(self, client):
"""Test that /version response contains version field."""
response = client.get('/version')
data = json.loads(response.data)
assert 'version' in data[0]
def test_version_endpoint_contains_build_ts_field(self, client):
"""Test that /version response contains build_ts field."""
response = client.get('/version')
data = json.loads(response.data)
assert 'build_ts' in data[0]
def test_version_endpoint_contains_display_field(self, client):
"""Test that /version response contains pre-formatted display field."""
response = client.get('/version')
data = json.loads(response.data)
assert 'display' in data[0]
assert 'PostgresAI v' in data[0]['display']
class TestReadVersionFile:
"""Tests for the read_version_file function."""
def test_read_version_file_success(self):
"""Test reading version file successfully."""
mock_content = "1.2.3"
with patch("builtins.open", mock_open(read_data=mock_content)):
result = read_version_file("/VERSION")
assert result == "1.2.3"
def test_read_version_file_strips_whitespace(self):
"""Test that version file content is stripped."""
mock_content = " 1.2.3\n "
with patch("builtins.open", mock_open(read_data=mock_content)):
result = read_version_file("/VERSION")
assert result == "1.2.3"
def test_read_version_file_not_found_returns_default(self):
"""Test that missing file returns default value."""
with patch("builtins.open", side_effect=FileNotFoundError()):
result = read_version_file("/VERSION")
assert result == "unknown"
def test_read_version_file_custom_default(self):
"""Test custom default value when file not found."""
with patch("builtins.open", side_effect=FileNotFoundError()):
result = read_version_file("/VERSION", default="0.0.0")
assert result == "0.0.0"
class TestHealthEndpoint:
"""Tests for the /health endpoint."""
@patch('app.get_prometheus_client')
def test_health_endpoint_healthy(self, mock_prom, client):
"""Test /health returns healthy when Prometheus is reachable."""
mock_prom.return_value.get_current_metric_value.return_value = [{'value': 1}]
response = client.get('/health')
assert response.status_code == 200
data = json.loads(response.data)
assert data['status'] == 'healthy'
@patch('app.get_prometheus_client')
def test_health_endpoint_unhealthy(self, mock_prom, client):
"""Test /health returns unhealthy when Prometheus is unreachable."""
mock_prom.return_value.get_current_metric_value.side_effect = Exception("Connection failed")
response = client.get('/health')
assert response.status_code == 500
data = json.loads(response.data)
assert data['status'] == 'unhealthy'
class TestSmartTruncateQuery:
"""Tests for the smart_truncate_query function."""
def test_empty_query(self):
"""Test that empty queries return empty string."""
assert smart_truncate_query('') == ''
assert smart_truncate_query(None) == ''
def test_short_query_unchanged(self):
"""Test that short queries are returned unchanged."""
short_query = "SELECT * FROM users"
assert smart_truncate_query(short_query, max_length=50) == short_query
def test_strip_leading_block_comment(self):
"""Test that leading block comments are stripped."""
query = "/* comment */ SELECT * FROM users"
result = smart_truncate_query(query, max_length=50)
assert result == "SELECT * FROM users"
assert "/*" not in result
def test_strip_multiple_block_comments(self):
"""Test that multiple leading block comments are stripped."""
query = "/* c1 */ /* c2 */ SELECT id FROM orders"
result = smart_truncate_query(query, max_length=50)
assert result == "SELECT id FROM orders"
def test_strip_leading_line_comment(self):
"""Test that leading single-line comments are stripped."""
query = "-- this is a comment\nSELECT * FROM users"
result = smart_truncate_query(query, max_length=50)
assert "SELECT" in result
assert "--" not in result
def test_strip_mixed_comments(self):
"""Test that mixed comments are stripped."""
query = "/* block */ -- line\nSELECT * FROM users"
result = smart_truncate_query(query, max_length=50)
assert "SELECT" in result
assert "/*" not in result
assert "--" not in result
def test_select_with_from_extraction(self):
"""Test that SELECT queries show FROM clause tables."""
query = "SELECT id, name, email, created_at, updated_at, status FROM users WHERE active = true"
result = smart_truncate_query(query, max_length=40)
# Smart truncation outputs lowercase
assert "select" in result
assert "from" in result
assert "users" in result
assert "..." in result
assert len(result) <= 40
def test_select_multiple_tables(self):
"""Test extraction of multiple tables from FROM clause."""
query = "SELECT a.id, b.name FROM users a, orders b WHERE a.id = b.user_id"
result = smart_truncate_query(query, max_length=60)
# Smart truncation outputs lowercase
assert "from" in result
assert "users" in result
def test_cte_extraction(self):
"""Test that CTEs are shown with their names."""
query = "WITH active_users AS (SELECT * FROM users WHERE active) SELECT * FROM active_users"
result = smart_truncate_query(query, max_length=70)
# Smart truncation outputs lowercase
assert "with" in result
assert "active_users" in result
# Should extract CTE name and the main FROM clause
assert "select ... from" in result
def test_multiple_ctes(self):
"""Test extraction of multiple CTE names."""
query = "WITH cte1 AS (SELECT * FROM a), cte2 AS (SELECT * FROM b) SELECT * FROM cte1, cte2 WHERE id = 1"
result = smart_truncate_query(query, max_length=50)
# Smart truncation outputs lowercase
assert "with" in result
assert "cte1" in result
assert "select ... from" in result
def test_insert_query(self):
"""Test that INSERT queries show target table."""
query = "INSERT INTO users (id, name, email) VALUES (1, 'John', 'john@example.com')"
result = smart_truncate_query(query, max_length=40)
# Smart truncation outputs lowercase
assert "insert into" in result
assert "users" in result
def test_update_query(self):
"""Test that UPDATE queries show target table."""
query = "UPDATE users SET email = 'new@example.com', status = 'active' WHERE id = 123"
result = smart_truncate_query(query, max_length=40)
# Smart truncation outputs lowercase
assert "update" in result
assert "users" in result
def test_delete_query(self):
"""Test that DELETE queries show target table."""
query = "DELETE FROM audit_logs WHERE created_at < '2024-01-01'"
result = smart_truncate_query(query, max_length=40)
# Smart truncation outputs lowercase
assert "delete from" in result
assert "audit_logs" in result
def test_fallback_on_unknown_query(self):
"""Test fallback to simple truncation for unknown query types."""
query = "VACUUM ANALYZE users"
result = smart_truncate_query(query, max_length=15)
assert len(result) <= 15
assert result.endswith('...')
def test_whitespace_normalization(self):
"""Test that excessive whitespace is normalized."""
query = "SELECT * FROM users WHERE id = 1"
result = smart_truncate_query(query, max_length=50)
assert " " not in result # No double spaces
def test_respects_max_length(self):
"""Test that result never exceeds max_length."""
query = "SELECT very_long_column_name_1, very_long_column_name_2 FROM extremely_long_table_name WHERE condition"
for max_len in [20, 30, 40, 50]:
result = smart_truncate_query(query, max_length=max_len)
assert len(result) <= max_len, f"Result '{result}' exceeds max_length {max_len}"
def test_pgss_comment_stripping(self):
"""Test stripping of pg_stat_statements style comments."""
query = "/* pgwatch_monitor_user */ SELECT count(*) FROM pg_stat_activity WHERE state = 'active'"
result = smart_truncate_query(query, max_length=50)
assert "pgwatch" not in result
# Smart truncation outputs lowercase
assert "select" in result
assert "pg_stat_activity" in result
def test_complex_query_with_joins(self):
"""Test handling of queries with JOINs."""
query = "SELECT u.id, o.total FROM users u JOIN orders o ON u.id = o.user_id WHERE o.status = 'completed'"
result = smart_truncate_query(query, max_length=40)
# Smart truncation outputs lowercase
assert "select" in result
assert "from" in result
assert "users" in result
def test_select_without_from_clause(self):
"""Test SELECT queries that call functions without FROM clause."""
query = "select current_database() as tag_datname, case when pg_is_in_recovery() then (pg_last_wal_replay_lsn() - $1) % ($2^$3)::bigint else (pg_current_wal_lsn() - $1) % ($2^$3)::bigint end"
result = smart_truncate_query(query, max_length=100)
# Should fall back to simple truncation since no FROM clause
assert len(result) <= 100
assert result.endswith('...')
# Should start with select, not a comment
assert result.lower().startswith('select')
def test_select_without_from_short_truncation(self):
"""Test SELECT without FROM with shorter max_length."""
query = "SELECT current_database(), pg_is_in_recovery(), now()"
result = smart_truncate_query(query, max_length=30)
assert len(result) <= 30
# Should either fit or be truncated with ...
if len(query) > 30:
assert result.endswith('...')
def test_inline_block_comment_stripped(self):
"""Test that inline block comments (not just leading) are stripped."""
query = "SELECT /* inline comment */ id FROM users"
result = smart_truncate_query(query, max_length=50)
assert "/*" not in result
assert "inline" not in result
assert "SELECT" in result
assert "users" in result
def test_block_comment_with_newlines(self):
"""Test block comments containing newlines are stripped."""
query = """/* This is a
multi-line
comment */ SELECT * FROM users"""
result = smart_truncate_query(query, max_length=50)
assert "/*" not in result
assert "multi-line" not in result
assert "SELECT" in result
def test_pgwatch_comment_at_start(self):
"""Test pgwatch-style comments at query start are stripped."""
query = "/* First we have to remove them from the extension */ ALTER EXTENSION pg_stat_statements DROP VIEW pg_stat_statements"
result = smart_truncate_query(query, max_length=100)
# Comment should be stripped
assert "/*" not in result
assert "First we have to" not in result
# ALTER should be at the start
assert result.upper().startswith("ALTER")
def test_comment_followed_by_alter(self):
"""Test ALTER statement after comment is properly shown."""
query = "/* comment */ ALTER TABLE users ADD COLUMN email VARCHAR(255)"
result = smart_truncate_query(query, max_length=60)
assert "/*" not in result
assert "ALTER" in result
def test_comment_followed_by_create(self):
"""Test CREATE statement after comment is properly shown."""
query = "/* setup */ CREATE INDEX idx_users_email ON users(email)"
result = smart_truncate_query(query, max_length=60)
assert "/*" not in result
assert "CREATE" in result
def test_lowercase_output_for_select(self):
"""Test that smart truncation outputs lowercase for SELECT queries."""
query = "SELECT id, name FROM users WHERE active = true"
result = smart_truncate_query(query, max_length=40)
# Should output lowercase "select ... from"
assert "select" in result
assert "from" in result
def test_lowercase_output_for_insert(self):
"""Test that smart truncation outputs lowercase for INSERT queries."""
query = "INSERT INTO users (id, name) VALUES (1, 'test')"
result = smart_truncate_query(query, max_length=30)
assert "insert into" in result
def test_lowercase_output_for_update(self):
"""Test that smart truncation outputs lowercase for UPDATE queries."""
query = "UPDATE users SET name = 'new' WHERE id = 1"
result = smart_truncate_query(query, max_length=30)
assert "update" in result
def test_lowercase_output_for_delete(self):
"""Test that smart truncation outputs lowercase for DELETE queries."""
query = "DELETE FROM users WHERE id = 1 AND status = 'inactive' AND created_at < now()"
result = smart_truncate_query(query, max_length=30)
assert "delete from" in result
def test_lowercase_output_for_cte(self):
"""Test that smart truncation outputs lowercase for CTE queries."""
query = "WITH active_users AS (SELECT * FROM users) SELECT * FROM active_users"
result = smart_truncate_query(query, max_length=60)
assert "with" in result
assert "select" in result
def test_multiple_inline_comments(self):
"""Test multiple inline comments are all stripped."""
query = "SELECT /* c1 */ id, /* c2 */ name FROM /* c3 */ users"
result = smart_truncate_query(query, max_length=50)
assert "/*" not in result
assert "c1" not in result
assert "c2" not in result
assert "c3" not in result
def test_line_comment_at_end(self):
"""Test line comment at end of query is stripped."""
query = "SELECT * FROM users -- get all users"
result = smart_truncate_query(query, max_length=50)
assert "--" not in result
assert "get all users" not in result
def test_very_long_query_respects_limit(self):
"""Test that very long queries respect the max_length limit."""
long_query = "SELECT " + ", ".join([f"column_{i}" for i in range(100)]) + " FROM very_long_table_name"
for max_len in [30, 60, 100]:
result = smart_truncate_query(long_query, max_length=max_len)
assert len(result) <= max_len, f"Result length {len(result)} exceeds max {max_len}"
class TestEscapePrometheusLabel:
"""Tests for the _escape_prometheus_label function."""
def test_empty_string(self):
"""Test empty string returns empty."""
assert _escape_prometheus_label('') == ''
def test_none_returns_empty(self):
"""Test None input returns empty string."""
assert _escape_prometheus_label(None) == ''
def test_simple_string_unchanged(self):
"""Test simple strings without special chars are unchanged."""
assert _escape_prometheus_label('hello world') == 'hello world'
def test_escapes_backslash(self):
"""Test backslashes are escaped."""
assert _escape_prometheus_label('path\\to\\file') == 'path\\\\to\\\\file'
def test_escapes_double_quote(self):
"""Test double quotes are escaped."""
assert _escape_prometheus_label('say "hello"') == 'say \\"hello\\"'
def test_replaces_newline_with_space(self):
"""Test newlines are replaced with spaces."""
assert _escape_prometheus_label('line1\nline2') == 'line1 line2'
def test_combined_special_chars(self):
"""Test string with multiple special characters."""
input_str = 'path\\file "name"\nend'
result = _escape_prometheus_label(input_str)
assert '\\\\' in result # Escaped backslash
assert '\\"' in result # Escaped quote
assert '\n' not in result # No raw newlines
def test_sql_query_escaping(self):
"""Test escaping of SQL query text."""
query = 'SELECT * FROM "users" WHERE name = \'test\''
result = _escape_prometheus_label(query)
assert '\\"' in result # Double quotes escaped
assert "'" in result # Single quotes unchanged (not escaped in Prometheus)
def test_multiline_query(self):
"""Test multiline SQL query has newlines replaced."""
query = "SELECT *\nFROM users\nWHERE id = 1"
result = _escape_prometheus_label(query)
assert '\n' not in result
assert 'SELECT * FROM users WHERE id = 1' == result
def test_backslash_before_quote(self):
"""Test backslash before quote is escaped correctly."""
# Input: \" should become \\\"
assert _escape_prometheus_label('\\"') == '\\\\\\"'
def test_unicode_preserved(self):
"""Test unicode characters are preserved."""
assert _escape_prometheus_label('héllo wörld') == 'héllo wörld'
assert _escape_prometheus_label('日本語') == '日本語'
def test_carriage_return(self):
"""Test carriage returns are handled."""
result = _escape_prometheus_label('line1\r\nline2')
# Should not contain raw \r or \n
assert '\r' not in result
assert '\n' not in result
def test_tab_character(self):
"""Test tab characters are handled."""
result = _escape_prometheus_label('col1\tcol2')
# Tabs should be replaced with space
assert '\t' not in result
class TestQueryInfoMetricsEndpoint:
"""Tests for the /query_info_metrics endpoint."""
@patch('app.psycopg2.connect')
def test_get_query_texts_uses_explicit_max_age_with_db_filter(self, mock_connect):
"""Direct sink lookup passes explicit max_age_hours after db_name."""
mock_cursor = mock_connect.return_value.cursor.return_value.__enter__.return_value
mock_cursor.__iter__ = lambda self: iter([])
result = app_module.get_query_texts_from_sink(db_name='mydb', max_age_hours=48)
assert result == {}
query, params = mock_cursor.execute.call_args[0]
assert 'dbname = %s' in query
assert 'make_interval(hours => %s)' in query
assert params == ('mydb', 48)
@patch('app.psycopg2.connect')
def test_get_query_texts_defaults_max_age_to_retention_window(self, mock_connect):
"""Direct sink lookup defaults max_age_hours to QUERYID_RETENTION_HOURS."""
mock_cursor = mock_connect.return_value.cursor.return_value.__enter__.return_value
mock_cursor.__iter__ = lambda self: iter([])
result = app_module.get_query_texts_from_sink(max_age_hours=None)
assert result == {}
query, params = mock_cursor.execute.call_args[0]
assert 'dbname = %s' not in query
assert 'make_interval(hours => %s)' in query
assert params == (app_module.QUERYID_RETENTION_HOURS,)
@patch('app.psycopg2.connect')
def test_endpoint_returns_prometheus_format(self, mock_connect, client):
"""Test endpoint returns Prometheus exposition format."""
# Mock database connection
mock_cursor = mock_connect.return_value.cursor.return_value.__enter__.return_value
mock_cursor.__iter__ = lambda self: iter([
{'queryid': '123', 'query': 'SELECT * FROM users'}
])
response = client.get('/query_info_metrics')
assert response.status_code == 200
assert response.content_type == 'text/plain; charset=utf-8'
@patch('app.psycopg2.connect')
def test_endpoint_includes_help_and_type(self, mock_connect, client):
"""Test endpoint includes HELP and TYPE comments."""
mock_cursor = mock_connect.return_value.cursor.return_value.__enter__.return_value
mock_cursor.__iter__ = lambda self: iter([])
response = client.get('/query_info_metrics')
data = response.data.decode('utf-8')
assert '# HELP pgwatch_query_info' in data
assert '# TYPE pgwatch_query_info gauge' in data
@patch('app.psycopg2.connect')
def test_displayname_has_queryid_prefix(self, mock_connect, client):
"""Test that displayname labels include queryid prefix."""
mock_cursor = mock_connect.return_value.cursor.return_value.__enter__.return_value
mock_cursor.__iter__ = lambda self: iter([
{'queryid': '12345', 'query': 'SELECT * FROM users WHERE id = 1'}
])
response = client.get('/query_info_metrics')
data = response.data.decode('utf-8')
# All displayname variants should have "queryid | " prefix
assert '12345 | ' in data
# The queryid should appear in displayname labels
assert 'displayname="12345 |' in data
@patch('app.psycopg2.connect')
def test_all_truncation_levels_present(self, mock_connect, client):
"""Test that all truncation level labels are present."""
mock_cursor = mock_connect.return_value.cursor.return_value.__enter__.return_value
mock_cursor.__iter__ = lambda self: iter([
{'queryid': '999', 'query': 'SELECT very_long_column_name FROM very_long_table_name WHERE condition = true'}
])
response = client.get('/query_info_metrics')
data = response.data.decode('utf-8')
# Check all 8 label types are present
assert 'displayname="' in data
assert 'displayname_medium="' in data
assert 'displayname_long="' in data
assert 'displayname_raw_short="' in data
assert 'displayname_raw_medium="' in data
assert 'displayname_raw_long="' in data
assert 'displayname_full="' in data
assert 'displayname_queryid="' in data
@patch('app.psycopg2.connect')
def test_smart_truncation_strips_comments(self, mock_connect, client):
"""Test smart truncation removes comments from displayname."""
mock_cursor = mock_connect.return_value.cursor.return_value.__enter__.return_value
mock_cursor.__iter__ = lambda self: iter([
{'queryid': '111', 'query': '/* comment */ SELECT * FROM users'}
])
response = client.get('/query_info_metrics')
data = response.data.decode('utf-8')
# Comment should not appear in smart truncation displayname
# Note: We check for the escaped version since Prometheus labels are escaped
assert '/* comment */' not in data or 'displayname_raw' in data.split('/* comment */')[0]
@patch('app.psycopg2.connect')
def test_db_name_filter(self, mock_connect, client):
"""Test db_name parameter filters results."""
mock_cursor = mock_connect.return_value.cursor.return_value.__enter__.return_value
mock_cursor.__iter__ = lambda self: iter([])
response = client.get('/query_info_metrics?db_name=mydb')
assert response.status_code == 200
# Verify the main query (last execute call) was called with db_name filter
call_args = mock_cursor.execute.call_args
assert call_args is not None
query = call_args[0][0]
assert 'dbname = %s' in query
@patch('app.psycopg2.connect')
def test_db_name_all_skips_filter(self, mock_connect, client):
"""Test db_name='all' skips the filter."""
mock_cursor = mock_connect.return_value.cursor.return_value.__enter__.return_value
mock_cursor.__iter__ = lambda self: iter([])
response = client.get('/query_info_metrics?db_name=all')
assert response.status_code == 200
# Verify the main query (last execute call) was called without db_name filter
call_args = mock_cursor.execute.call_args
query = call_args[0][0]
assert 'dbname = %s' not in query
@patch('app.psycopg2.connect')
def test_db_name_variable_skips_filter(self, mock_connect, client):
"""Test db_name starting with $ skips the filter."""
mock_cursor = mock_connect.return_value.cursor.return_value.__enter__.return_value
mock_cursor.__iter__ = lambda self: iter([])
response = client.get('/query_info_metrics?db_name=$db_name')
assert response.status_code == 200
# Verify the main query (last execute call) was called without db_name filter
call_args = mock_cursor.execute.call_args
query = call_args[0][0]
assert 'dbname = %s' not in query
@patch('app.psycopg2.connect')
def test_time_filter_applied(self, mock_connect, client):
"""Test that the time filter is applied and QUERYID_ACTIVE_MINUTES flows through."""
mock_cursor = mock_connect.return_value.cursor.return_value.__enter__.return_value
mock_cursor.__iter__ = lambda self: iter([])
# Temporarily override the active minutes constant
original_minutes = app_module.QUERYID_ACTIVE_MINUTES
app_module.QUERYID_ACTIVE_MINUTES = 42
try:
response = client.get('/query_info_metrics')
assert response.status_code == 200
# Verify the main query includes a time filter
call_args = mock_cursor.execute.call_args
query = call_args[0][0]
assert "make_interval" in query
assert "mins" in query
# Verify the parameter value is passed correctly
params = call_args[0][1]
assert params == (42,)
finally:
app_module.QUERYID_ACTIVE_MINUTES = original_minutes
@patch('app.psycopg2.connect')
def test_time_filter_applied_with_db_filter(self, mock_connect, client):
"""Time filter + db_name filter path: both dbname and make_interval land in the query, params pair correctly."""
mock_cursor = mock_connect.return_value.cursor.return_value.__enter__.return_value
mock_cursor.__iter__ = lambda self: iter([])
original_minutes = app_module.QUERYID_ACTIVE_MINUTES
app_module.QUERYID_ACTIVE_MINUTES = 42
try:
response = client.get('/query_info_metrics?db_name=mydb')
assert response.status_code == 200
call_args = mock_cursor.execute.call_args
query = call_args[0][0]
params = call_args[0][1]
assert 'dbname = %s' in query
assert 'make_interval(mins => %s)' in query
# The db_filter branch passes (db_name, minutes) in that order.
assert params == ('mydb', 42)
finally:
app_module.QUERYID_ACTIVE_MINUTES = original_minutes
@patch('app._run_retention_cleanup')
@patch('app.psycopg2.connect')
def test_retention_cleanup_runs_in_background(self, mock_connect, mock_cleanup, client):
"""Test that retention cleanup is triggered and the running flag is cleared."""
mock_cursor = mock_connect.return_value.cursor.return_value.__enter__.return_value
mock_cursor.__iter__ = lambda self: iter([])
mock_cursor.rowcount = 0
started = threading.Event()
release = threading.Event()
threads = []
original_thread = threading.Thread
def cleanup_side_effect():
started.set()
release.wait(timeout=5)
def make_thread(*args, **kwargs):
thread = original_thread(*args, **kwargs)
threads.append(thread)
return thread
mock_cleanup.side_effect = cleanup_side_effect
with patch('app.threading.Thread', side_effect=make_thread):
response = client.get('/query_info_metrics')
assert response.status_code == 200
assert started.wait(timeout=2)
assert app_module._cleanup_running.is_set() is True
release.set()
for thread in threads:
thread.join(timeout=2)
assert thread.is_alive() is False
mock_cleanup.assert_called_once()
assert app_module._cleanup_running.is_set() is False
def test_start_retention_cleanup_thread_coalesces_concurrent_callers(self):
"""Concurrent scrapes should start at most one in-process cleanup thread."""
app_module._cleanup_running.clear()
start_barrier = threading.Barrier(3)
cleanup_started = threading.Event()
release_cleanup = threading.Event()
cleanup_finished = threading.Event()
errors = []
def guarded_cleanup():
cleanup_started.set()
try:
release_cleanup.wait(timeout=5)
finally:
app_module._cleanup_running.clear()
cleanup_finished.set()
def caller():
try:
start_barrier.wait(timeout=2)
app_module._start_retention_cleanup_thread()
except Exception as exc:
errors.append(exc)
with patch('app._run_retention_cleanup_guarded', side_effect=guarded_cleanup) as mock_guarded:
callers = [threading.Thread(target=caller) for _ in range(2)]
for thread in callers:
thread.start()
start_barrier.wait(timeout=2)
assert cleanup_started.wait(timeout=2)
for thread in callers:
thread.join(timeout=2)
assert thread.is_alive() is False
assert errors == []
assert mock_guarded.call_count == 1
release_cleanup.set()
assert cleanup_finished.wait(timeout=2)
assert app_module._cleanup_running.is_set() is False
@patch('app.psycopg2.connect')
def test_retention_cleanup_thread_start_failure_clears_flag(self, mock_connect, client):
"""Thread.start failure must not leave cleanup disabled forever."""
mock_cursor = mock_connect.return_value.cursor.return_value.__enter__.return_value
mock_cursor.__iter__ = lambda self: iter([])
broken_thread = Mock()
broken_thread.start.side_effect = RuntimeError("thread quota exhausted")
with patch('app.threading.Thread', return_value=broken_thread), \
patch('app.logger.warning') as mock_warning:
response = client.get('/query_info_metrics')
assert response.status_code == 200
assert app_module._cleanup_running.is_set() is False
warning_text = ' '.join(str(c) for c in mock_warning.call_args_list)
assert 'Failed to start retention cleanup thread' in warning_text
assert 'thread quota exhausted' in warning_text
@patch('app.psycopg2.connect')
def test_handles_db_connection_error(self, mock_connect, client):
"""Test endpoint handles database connection errors gracefully."""
mock_connect.side_effect = Exception("Connection refused")
response = client.get('/query_info_metrics')
# Should still return 200 with empty metrics (graceful degradation)
assert response.status_code == 200
data = response.data.decode('utf-8')
assert '# HELP pgwatch_query_info' in data
@patch('app.psycopg2.connect')
def test_special_chars_in_query_escaped(self, mock_connect, client):
"""Test special characters in query are escaped in Prometheus labels."""
mock_cursor = mock_connect.return_value.cursor.return_value.__enter__.return_value
mock_cursor.__iter__ = lambda self: iter([
{'queryid': '222', 'query': 'SELECT "column" FROM users\nWHERE x = 1'}
])
response = client.get('/query_info_metrics')
data = response.data.decode('utf-8')
# Raw newlines should be replaced
assert 'users\nWHERE' not in data
# Double quotes should be escaped
assert '\\"column\\"' in data or 'column' in data
_GUARD_PROSRC = """
declare queryid_value text;
begin
queryid_value := new.data->>'queryid';
perform pg_advisory_xact_lock(lock_key);
delete from public.pgss_queryid_queries
where dbname = new.dbname
and data->>'queryid' = queryid_value
and time <= new.time;
...
end;
"""
_UNPATCHED_PROSRC = """
declare queryid_value text;
begin
queryid_value := new.data->>'queryid';
insert into pgss_queryid_queries values (...);
return null;
end;
"""
class TestTriggerMigration:
"""Tests for the verify-only _apply_trigger_migration function.
Migration no longer does DDL — the bootstrap role's init.sql owns
that responsibility. This function verifies the deployed function
body contains the advisory-lock dedup path and the trigger exists.
"""
def _make_mock_conn(self, fetchone_sequence):
"""Build a mock connection whose cursor().fetchone() returns successive tuples."""
app_module._trigger_migration_warning_state['last_at'] = 0.0
mock_conn = Mock()
mock_cursor = Mock()
mock_cursor.fetchone.side_effect = list(fetchone_sequence)
mock_conn.cursor.return_value.__enter__ = Mock(return_value=mock_cursor)
mock_conn.cursor.return_value.__exit__ = Mock(return_value=False)
return mock_conn, mock_cursor
def test_verifies_function_body_and_trigger_when_both_present(self):
"""Guard present in function body + trigger exists → flag set, no DDL."""
app_module._trigger_migration_applied = False
mock_conn, mock_cursor = self._make_mock_conn([
(True,), # pg_try_advisory_lock
(_GUARD_PROSRC,), # pg_proc.prosrc with guard
(1,), # trigger exists
])
with patch('app.psycopg2.connect', return_value=mock_conn):
app_module._apply_trigger_migration()
calls = [c[0][0] for c in mock_cursor.execute.call_args_list]
# Purely read-only: no CREATE / ALTER / DROP.
assert not any('create or replace function' in c.lower() for c in calls)
assert not any('create trigger' in c.lower() for c in calls)
assert not any('alter function' in c.lower() for c in calls)
assert app_module._trigger_migration_applied is True
def test_missing_guard_logs_warning_and_sets_flag(self):
"""Deployed function body missing advisory-lock dedup → warn, don't retry-spam."""
app_module._trigger_migration_applied = False
mock_conn, mock_cursor = self._make_mock_conn([
(True,), # advisory lock
(_UNPATCHED_PROSRC,), # prosrc without guard
])
with patch('app.psycopg2.connect', return_value=mock_conn), \
patch('app.logger.warning') as mock_warning:
app_module._apply_trigger_migration()
# Verification failures are retryable so a later init.sql repair is detected.
assert app_module._trigger_migration_applied is False
# A clear remediation hint in the log.
warning_text = ' '.join(str(c) for c in mock_warning.call_args_list)
assert 'advisory-lock dedup' in warning_text
assert 'init.sql' in warning_text
def test_missing_function_logs_warning_and_sets_flag(self):
"""Function doesn't exist at all → warn, then retry on a later request."""
app_module._trigger_migration_applied = False
mock_conn, mock_cursor = self._make_mock_conn([
(True,), # advisory lock
None, # pg_proc lookup returns no row
])
with patch('app.psycopg2.connect', return_value=mock_conn), \
patch('app.logger.warning') as mock_warning:
app_module._apply_trigger_migration()
assert app_module._trigger_migration_applied is False
warning_text = ' '.join(str(c) for c in mock_warning.call_args_list)
assert 'public.enforce_queryid_uniqueness' in warning_text
assert 'init.sql' in warning_text
def test_missing_trigger_logs_warning_and_sets_flag(self):
"""Function exists and is guarded but trigger is absent → warn, then retry."""
app_module._trigger_migration_applied = False
mock_conn, mock_cursor = self._make_mock_conn([
(True,), # advisory lock
(_GUARD_PROSRC,), # prosrc with guard
None, # trigger lookup returns no row
])
with patch('app.psycopg2.connect', return_value=mock_conn), \
patch('app.logger.warning') as mock_warning:
app_module._apply_trigger_migration()
assert app_module._trigger_migration_applied is False
warning_text = ' '.join(str(c) for c in mock_warning.call_args_list)
assert 'enforce_queryid_uniqueness_trigger' in warning_text
assert 'init.sql' in warning_text
def test_does_not_transfer_function_ownership(self):
"""Security: verification path must never issue ALTER FUNCTION ... OWNER."""
app_module._trigger_migration_applied = False
mock_conn, mock_cursor = self._make_mock_conn([
(True,), (_GUARD_PROSRC,), (1,),
])
with patch('app.psycopg2.connect', return_value=mock_conn):
app_module._apply_trigger_migration()
calls = [c[0][0].lower() for c in mock_cursor.execute.call_args_list]
assert not any('owner to' in c for c in calls)
def test_advisory_lock_taken_and_released(self):
"""Migration grabs pg_try_advisory_lock and releases it on completion."""
app_module._trigger_migration_applied = False
mock_conn, mock_cursor = self._make_mock_conn([
(True,), (_GUARD_PROSRC,), (1,),
])
with patch('app.psycopg2.connect', return_value=mock_conn):
app_module._apply_trigger_migration()
calls = [c[0][0] for c in mock_cursor.execute.call_args_list]
assert any('pg_try_advisory_lock' in c for c in calls)
assert any('pg_advisory_unlock' in c for c in calls)
def test_advisory_lock_denied_skips_verification(self):
"""When another worker holds the advisory lock, skip without error; flag stays False."""
app_module._trigger_migration_applied = False
mock_conn, mock_cursor = self._make_mock_conn([
(False,), # lock denied
])
with patch('app.psycopg2.connect', return_value=mock_conn):
app_module._apply_trigger_migration()
calls = [c[0][0] for c in mock_cursor.execute.call_args_list]
assert any('pg_try_advisory_lock' in c for c in calls)
assert not any('pg_proc' in c for c in calls)
assert not any('pg_trigger' in c for c in calls)
# Flag stays False so the next request retries.
assert app_module._trigger_migration_applied is False
def test_db_error_leaves_flag_false(self):
"""On DB error, flag remains False so retry is possible."""
app_module._trigger_migration_applied = False
with patch('app.psycopg2.connect', side_effect=Exception("connection refused")):
app_module._apply_trigger_migration()
assert app_module._trigger_migration_applied is False
def test_successful_run_sets_flag_so_subsequent_calls_are_noop(self):
"""Once verification succeeds, subsequent calls must not touch the DB."""
app_module._trigger_migration_applied = False
mock_conn, mock_cursor = self._make_mock_conn([
(True,), (_GUARD_PROSRC,), (1,),
])
with patch('app.psycopg2.connect', return_value=mock_conn) as mock_connect:
app_module._apply_trigger_migration()
first_call_count = mock_connect.call_count
# Second invocation should early-return without opening a new connection.
app_module._apply_trigger_migration()
assert mock_connect.call_count == first_call_count
class TestRetentionCleanup:
"""Tests for the _run_retention_cleanup function."""