1+ from collections .abc import Generator
12import contextlib
23import os
34import shutil
89import threading
910import time
1011
12+ import psycopg
1113import psycopg2
1214import pytest
1315
1416from postgresql_proxy import config_schema as cfg
1517from postgresql_proxy .proxy import Proxy
1618
1719
20+ class _QuerySpy :
21+ """Minimal query interceptor that records every query it sees."""
22+
23+ def __init__ (self ):
24+ self .captured : list [str ] = []
25+
26+ def capture (self , query : str , context ) -> str :
27+ self .captured .append (query )
28+ return query
29+
30+
1831def _get_free_tcp_port () -> int :
1932 with socket .socket (socket .AF_INET , socket .SOCK_STREAM ) as sock :
2033 sock .bind (("127.0.0.1" , 0 ))
@@ -125,8 +138,18 @@ def _temporary_server_cert_pair():
125138
126139
127140@contextlib .contextmanager
128- def _run_proxy (postgres_settings , ssl_context : ssl .SSLContext | None = None ):
141+ def _run_proxy (
142+ postgres_settings ,
143+ ssl_context : ssl .SSLContext | None = None ,
144+ * ,
145+ plugins : dict | None = None ,
146+ query_interceptors : list [dict ] | None = None ,
147+ ) -> Generator [int , None , None ]:
148+ """Start a proxy in a background thread and yield its listening port."""
129149 proxy_port = _get_free_tcp_port ()
150+ commands_config : dict = {}
151+ if query_interceptors :
152+ commands_config ["queries" ] = query_interceptors
130153 instance = cfg .InstanceSettings (
131154 {
132155 "listen" : {"name" : "proxy" , "host" : "127.0.0.1" , "port" : proxy_port },
@@ -135,14 +158,13 @@ def _run_proxy(postgres_settings, ssl_context: ssl.SSLContext | None = None):
135158 "host" : postgres_settings ["host" ],
136159 "port" : postgres_settings ["port" ],
137160 },
138- # Keep interceptors active with default no-op behavior.
139- "intercept" : {"commands" : {}, "responses" : {}},
161+ "intercept" : {"commands" : commands_config , "responses" : {}},
140162 }
141163 )
142164 if not hasattr (instance .intercept .responses , "parameter_status" ):
143165 instance .intercept .responses .parameter_status = []
144166
145- proxy = Proxy (instance , plugins = {}, debug = True , ssl_context = ssl_context )
167+ proxy = Proxy (instance , plugins = plugins or {}, debug = True , ssl_context = ssl_context )
146168 thread = threading .Thread (
147169 target = proxy .listen , kwargs = {"max_connections" : 32 }, daemon = True
148170 )
@@ -332,3 +354,90 @@ def test_psql_ssl_file_batch_stress_no_hang(postgres_settings, ssl_proxy_port):
332354 "psql -f batch succeeded but expected marker missing "
333355 f"(run={ run_idx + 1 } , { elapsed = :.2f} s) stdout_tail={ out_tail } "
334356 )
357+
358+
359+ def test_extended_query_protocol_parse_packet_with_high_oid_params_passes_through_proxy (
360+ postgres_settings ,
361+ ):
362+ """Regression: proxy must not corrupt Extended Query Protocol Parse packets.
363+
364+ psycopg v3 sends Parse → Bind → Execute for parameterized queries. The Parse body
365+ ends with binary uint32 OIDs; jsonb OID 3802 (0x00000EDA) contains 0xDA which is
366+ not valid UTF-8. The old interceptor sliced the body incorrectly and crashed on
367+ decode, causing the connection to hang or drop.
368+
369+ A _QuerySpy is wired into the proxy to verify the interceptor receives the correct
370+ SQL text — not corrupted bytes from the binary OID suffix.
371+ """
372+ spy = _QuerySpy ()
373+ with _run_proxy (
374+ postgres_settings ,
375+ plugins = {"spy" : spy },
376+ query_interceptors = [{"plugin" : "spy" , "function" : "capture" }],
377+ ) as proxy_port :
378+ with psycopg .connect (
379+ host = "127.0.0.1" ,
380+ port = proxy_port ,
381+ user = postgres_settings ["user" ],
382+ password = postgres_settings ["password" ],
383+ dbname = postgres_settings ["dbname" ],
384+ sslmode = "disable" ,
385+ ) as conn :
386+ with conn .cursor () as cur :
387+ cur .execute (
388+ "DROP TABLE IF EXISTS _test_jsonb_proxy_params;"
389+ "CREATE TABLE _test_jsonb_proxy_params "
390+ "(id serial PRIMARY KEY, data jsonb, label text);"
391+ )
392+
393+ cur .execute (
394+ "INSERT INTO _test_jsonb_proxy_params (data, label) "
395+ "VALUES (%s, %s) RETURNING id" ,
396+ (psycopg .types .json .Jsonb ({"key" : "value" }), "hello" ),
397+ )
398+ row = cur .fetchone ()
399+
400+ assert row is not None and row [0 ] >= 1
401+
402+ # Verify the interceptor received clean SQL — no binary OID bytes leaked in.
403+ insert_queries = [
404+ q for q in spy .captured if "INSERT INTO _test_jsonb_proxy_params" in q
405+ ]
406+ assert insert_queries
407+ assert all ("\x00 " not in q for q in insert_queries ), (
408+ "null byte leaked into intercepted query"
409+ )
410+
411+
412+ def test_extended_query_protocol_named_prepared_statement_passes_through_proxy (
413+ postgres_settings , plain_proxy_port
414+ ):
415+ """Parse packets with a non-empty statement name must also be relayed correctly.
416+
417+ The statement_name field precedes the query text in the Parse body. The fix uses
418+ find(b'\\ x00') to locate boundaries, so named statements work the same as anonymous
419+ ones (empty name).
420+ """
421+ with psycopg .connect (
422+ host = "127.0.0.1" ,
423+ port = plain_proxy_port ,
424+ user = postgres_settings ["user" ],
425+ password = postgres_settings ["password" ],
426+ dbname = postgres_settings ["dbname" ],
427+ sslmode = "disable" ,
428+ # Prepare after the first execution of the same query (i.e. on 2nd run).
429+ prepare_threshold = 1 ,
430+ ) as conn :
431+ with conn .cursor () as cur :
432+ # Execute twice so psycopg can promote the query to a named statement.
433+ for val in (1 , 2 ):
434+ cur .execute ("SELECT %s::int + 1" , (val ,))
435+ result = cur .fetchone ()
436+ assert result == (val + 1 ,)
437+
438+ # Verify psycopg created a named prepared statement in this session.
439+ cur .execute (
440+ "SELECT count(*) FROM pg_prepared_statements WHERE name LIKE '_pg3_%'"
441+ )
442+ prepared_count = cur .fetchone ()
443+ assert prepared_count is not None and prepared_count [0 ] >= 1
0 commit comments