Skip to content

Commit c8c7f14

Browse files
fix(proxy): add backpressure handling to prevent hang on large responses
When forwarding large responses, the proxy's send to the destination socket can block: the receiver's TCP buffer fills up, which fills the proxy's send buffer, which stalls the sender via flow control. With no EVENT_WRITE handling the proxy had no way to retry the stalled send, causing a deadlock for responses larger than the TCP send buffer (~400KB on macOS, ~256KB on Linux). Catch BlockingIOError explicitly (before the generic OSError handler) and register the socket for EVENT_WRITE so the selector retries the flush when buffer space becomes available. Also add return guards after connection close in the EVENT_READ path to prevent fall-through into the now-stale redirect_conn state. Add test_various_payload_sizes covering 1B, 1KB, 100KB, 1MB, 10MB and 10k/100k rows, over both plain and SSL connections, to catch regressions. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent eb000fc commit c8c7f14

2 files changed

Lines changed: 92 additions & 0 deletions

File tree

postgresql_proxy/proxy.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -251,10 +251,29 @@ def service_connection(self, key: SelectorKeyProxy, mask):
251251
LOG.debug('%s connection closing %s', conn.name, conn.address)
252252
# A file object shall be unregistered prior to being closed.
253253
sock.close()
254+
return
254255
except OSError as e:
255256
# it means the socket was closed by peer
256257
LOG.debug('%s connection closed by peer %s: %s', conn.name, conn.address, e)
257258
self._unregister_conn(conn)
259+
return
260+
261+
if mask & selectors.EVENT_WRITE:
262+
# Socket has buffer space — flush this connection's backlogged output.
263+
try:
264+
while conn.out_bytes:
265+
sent = sock.send(conn.out_bytes)
266+
conn.sent(sent)
267+
# All data drained; stop watching for writability.
268+
conn.events = selectors.EVENT_READ
269+
self.selector.modify(sock, selectors.EVENT_READ, data=conn)
270+
except BlockingIOError:
271+
pass # Still full; will retry on the next EVENT_WRITE notification.
272+
except OSError as e:
273+
LOG.debug('%s closed while flushing backlog: %s', conn.name, e)
274+
self._unregister_conn(conn)
275+
sock.close()
276+
return
258277

259278
next_conn = conn.redirect_conn
260279
if next_conn and next_conn.out_bytes:
@@ -263,6 +282,15 @@ def service_connection(self, key: SelectorKeyProxy, mask):
263282
LOG.debug('sending to %s:\n%s', next_conn.name, next_conn.out_bytes)
264283
sent = next_conn.sock.send(next_conn.out_bytes)
265284
next_conn.sent(sent)
285+
# All sent; clear write interest if it was previously registered.
286+
if next_conn.events & selectors.EVENT_WRITE:
287+
next_conn.events = selectors.EVENT_READ
288+
self.selector.modify(next_conn.sock, selectors.EVENT_READ, data=next_conn)
289+
except BlockingIOError:
290+
# next_conn's send buffer is full — register for writability so we retry when there's space.
291+
if not (next_conn.events & selectors.EVENT_WRITE):
292+
next_conn.events = selectors.EVENT_READ | selectors.EVENT_WRITE
293+
self.selector.modify(next_conn.sock, next_conn.events, data=next_conn)
266294
except OSError:
267295
# If one side is closed, close the other one
268296
# this can happen in the case where the client disconnects, and postgres still return a response

tests/test_proxy.py

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,70 @@ def test_repeated_connect_query_smoke_no_hang(postgres_settings, plain_proxy_por
226226
assert cur.fetchone() == (i,)
227227

228228

229+
@pytest.mark.timeout(60)
230+
@pytest.mark.parametrize("sslmode", ["disable", "require"])
231+
@pytest.mark.parametrize(
232+
["sql", "expected"],
233+
[
234+
pytest.param(
235+
"SELECT 1",
236+
[(1,)],
237+
id="tiny-1B",
238+
),
239+
pytest.param(
240+
"SELECT repeat('x', 1024)",
241+
[("x" * 1024,)],
242+
id="small-1KB",
243+
),
244+
pytest.param(
245+
"SELECT repeat('x', 102400)",
246+
[("x" * 102400,)],
247+
id="medium-100KB",
248+
),
249+
pytest.param(
250+
"SELECT repeat('x', 1048576)",
251+
[("x" * 1048576,)],
252+
id="large-1MB",
253+
),
254+
pytest.param(
255+
"SELECT repeat('x', 10485760)",
256+
[("x" * 10485760,)],
257+
id="xlarge-10MB",
258+
),
259+
pytest.param(
260+
"SELECT i FROM generate_series(1, 10000) AS t(i)",
261+
[(i,) for i in range(1, 10001)],
262+
id="rows-10k",
263+
),
264+
pytest.param(
265+
"SELECT i FROM generate_series(1, 100000) AS t(i)",
266+
[(i,) for i in range(1, 100001)],
267+
id="rows-100k",
268+
),
269+
]
270+
)
271+
def test_various_payload_sizes(
272+
postgres_settings,
273+
plain_proxy_port,
274+
ssl_proxy_port,
275+
sslmode,
276+
sql,
277+
expected,
278+
):
279+
with psycopg2.connect(
280+
host="127.0.0.1",
281+
port=plain_proxy_port if sslmode == "disable" else ssl_proxy_port,
282+
user=postgres_settings["user"],
283+
password=postgres_settings["password"],
284+
dbname=postgres_settings["dbname"],
285+
sslmode=sslmode,
286+
connect_timeout=3,
287+
) as conn:
288+
with conn.cursor() as cur:
289+
cur.execute(sql)
290+
assert cur.fetchall() == expected
291+
292+
229293
@pytest.mark.timeout(60)
230294
def test_psql_ssl_file_batch_stress_no_hang(postgres_settings, ssl_proxy_port):
231295
if shutil.which("psql") is None:

0 commit comments

Comments
 (0)