Skip to content

Commit fb7c0f4

Browse files
fix(proxy): add backpressure handling to prevent hang on large responses (#16)
* fix(proxy): add backpressure handling to prevent hang on large responses

  When forwarding large responses, the proxy's send to the destination socket can block: the receiver's TCP buffer fills up, which fills the proxy's send buffer, which stalls the sender via flow control. With no EVENT_WRITE handling the proxy had no way to retry the stalled send, causing a deadlock for responses larger than the TCP send buffer (~400KB on macOS, ~256KB on Linux).

  Catch BlockingIOError explicitly (before the generic OSError handler) and register the socket for EVENT_WRITE so the selector retries the flush when buffer space becomes available. Also add return guards after connection close in the EVENT_READ path to prevent fall-through into the now-stale redirect_conn state.

  Add test_various_payload_sizes covering 1B, 1KB, 100KB, 1MB, 10MB and 10k/100k rows, over both plain and SSL connections, to catch regressions.

  Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix(proxy): handle ssl.SSLWantWriteError for large SSL responses

  ssl.SSLSocket.send() raises ssl.SSLWantWriteError (not BlockingIOError) when the underlying TCP buffer is full on a non-blocking SSL socket. SSLWantWriteError is a subclass of OSError, so it was caught by the generic connection-close handler, closing the connection mid-response. The client socket stayed open, leaving the caller hanging indefinitely.

  Catch SSLWantWriteError alongside BlockingIOError in both send paths so SSL connections correctly register EVENT_WRITE and retry when buffer space becomes available.

  Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* chore: add changelog and bump version to 0.3.2

  Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

---------

Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 6160719 commit fb7c0f4

4 files changed

Lines changed: 96 additions & 1 deletion

File tree

README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,9 @@ Or run it without activating the venv:
6262
```
6363

6464
## Changelog
65+
- v0.3.2
66+
- Fix proxy hang on large responses by adding backpressure handling [#16](https://github.com/localstack/postgresql-proxy/pull/16)
67+
- Reduce SSL connection overhead by setting `TCP_NODELAY` [#15](https://github.com/localstack/postgresql-proxy/pull/15)
6568
- v0.3.1
6669
- Fix SSL COPY stalls by draining pending SSL buffer after recv [#11](https://github.com/localstack/postgresql-proxy/pull/11)
6770
- Fix intermittent `BlockingIOError` on macOS during SSL negotiation

postgresql_proxy/proxy.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -253,10 +253,29 @@ def service_connection(self, key: SelectorKeyProxy, mask):
253253
LOG.debug('%s connection closing %s', conn.name, conn.address)
254254
# A file object shall be unregistered prior to being closed.
255255
sock.close()
256+
return
256257
except OSError as e:
257258
# it means the socket was closed by peer
258259
LOG.debug('%s connection closed by peer %s: %s', conn.name, conn.address, e)
259260
self._unregister_conn(conn)
261+
return
262+
263+
if mask & selectors.EVENT_WRITE:
264+
# Socket has buffer space — flush this connection's backlogged output.
265+
try:
266+
while conn.out_bytes:
267+
sent = sock.send(conn.out_bytes)
268+
conn.sent(sent)
269+
# All data drained; stop watching for writability.
270+
conn.events = selectors.EVENT_READ
271+
self.selector.modify(sock, selectors.EVENT_READ, data=conn)
272+
except (BlockingIOError, ssl.SSLWantWriteError):
273+
pass # Still full; will retry on the next EVENT_WRITE notification.
274+
except OSError as e:
275+
LOG.debug('%s closed while flushing backlog: %s', conn.name, e)
276+
self._unregister_conn(conn)
277+
sock.close()
278+
return
260279

261280
next_conn = conn.redirect_conn
262281
if next_conn and next_conn.out_bytes:
@@ -265,6 +284,15 @@ def service_connection(self, key: SelectorKeyProxy, mask):
265284
LOG.debug('sending to %s:\n%s', next_conn.name, next_conn.out_bytes)
266285
sent = next_conn.sock.send(next_conn.out_bytes)
267286
next_conn.sent(sent)
287+
# All sent; clear write interest if it was previously registered.
288+
if next_conn.events & selectors.EVENT_WRITE:
289+
next_conn.events = selectors.EVENT_READ
290+
self.selector.modify(next_conn.sock, selectors.EVENT_READ, data=next_conn)
291+
except (BlockingIOError, ssl.SSLWantWriteError):
292+
# next_conn's send buffer is full — register for writability so we retry when there's space.
293+
if not (next_conn.events & selectors.EVENT_WRITE):
294+
next_conn.events = selectors.EVENT_READ | selectors.EVENT_WRITE
295+
self.selector.modify(next_conn.sock, next_conn.events, data=next_conn)
268296
except OSError:
269297
# If one side is closed, close the other one
270298
# this can happen when the client disconnects while postgres is still returning a response

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
setup(
99
name='postgresql-proxy',
10-
version='0.3.1',
10+
version='0.3.2',
1111
description='Postgresql Proxy',
1212
packages=find_packages(exclude=('tests', 'tests.*')),
1313
install_requires=install_requires,

tests/test_proxy.py

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,70 @@ def test_repeated_connect_query_smoke_no_hang(postgres_settings, plain_proxy_por
226226
assert cur.fetchone() == (i,)
227227

228228

229+
@pytest.mark.timeout(60)
@pytest.mark.parametrize("sslmode", ["disable", "require"])
@pytest.mark.parametrize(
    ["sql", "expected"],
    [
        pytest.param(
            "SELECT 1",
            [(1,)],
            id="tiny-1B",
        ),
        pytest.param(
            "SELECT repeat('x', 1024)",
            [("x" * 1024,)],
            id="small-1KB",
        ),
        pytest.param(
            "SELECT repeat('x', 102400)",
            [("x" * 102400,)],
            id="medium-100KB",
        ),
        pytest.param(
            "SELECT repeat('x', 1048576)",
            [("x" * 1048576,)],
            id="large-1MB",
        ),
        pytest.param(
            "SELECT repeat('x', 10485760)",
            [("x" * 10485760,)],
            id="xlarge-10MB",
        ),
        pytest.param(
            "SELECT i FROM generate_series(1, 10000) AS t(i)",
            [(i,) for i in range(1, 10001)],
            id="rows-10k",
        ),
        pytest.param(
            "SELECT i FROM generate_series(1, 100000) AS t(i)",
            [(i,) for i in range(1, 100001)],
            id="rows-100k",
        ),
    ],
)
def test_various_payload_sizes(
    postgres_settings,
    plain_proxy_port,
    ssl_proxy_port,
    sslmode,
    sql,
    expected,
):
    """Fetch result sets of increasing size through the proxy and compare them.

    Each query is run once over a plain connection (``sslmode="disable"``,
    via ``plain_proxy_port``) and once over SSL (``sslmode="require"``, via
    ``ssl_proxy_port``). The fetched rows must equal ``expected`` exactly.
    The 60 second timeout marker turns a stalled transfer into a test
    failure instead of a hung test run.
    """
    port = plain_proxy_port if sslmode == "disable" else ssl_proxy_port
    conn = psycopg2.connect(
        host="127.0.0.1",
        port=port,
        user=postgres_settings["user"],
        password=postgres_settings["password"],
        dbname=postgres_settings["dbname"],
        sslmode=sslmode,
        connect_timeout=3,
    )
    # psycopg2's ``with connection:`` only ends the transaction; it does not
    # close the connection. Close it explicitly so each parametrized case
    # releases its proxied socket instead of leaving it open until GC.
    try:
        with conn.cursor() as cur:
            cur.execute(sql)
            rows = cur.fetchall()
    finally:
        conn.close()

    assert rows == expected
291+
292+
229293
@pytest.mark.timeout(60)
230294
def test_psql_ssl_file_batch_stress_no_hang(postgres_settings, ssl_proxy_port):
231295
if shutil.which("psql") is None:

0 commit comments

Comments
 (0)