Skip to content

Commit e5440c0

Browse files
committed
fix: Limit concurrent schema cache loads
Triggering schema cache reload immediately upon receival of notification by the listener leads to thundering herd problem in PostgREST cluster. This change adds limiting of number of concurrent schema cache loading queries using advisory locks.
1 parent 202f84e commit e5440c0

3 files changed

Lines changed: 158 additions & 8 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ All notable changes to this project will be documented in this file. From versio
5353

5454
- Fix unnecessary connection pool flushes during schema cache reloading by @mkleczek in #4645
5555
- Fix race condition in pool_available metric causing negative values during network instability by @mkleczek in #4622
56+
- Limit concurrent schema cache loads by @mkleczek in #4643
5657

5758
## [14.9] - 2026-04-10
5859

src/PostgREST/AppState.hs

Lines changed: 67 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
1-
{-# LANGUAGE LambdaCase #-}
2-
{-# LANGUAGE NamedFieldPuns #-}
3-
{-# LANGUAGE RecordWildCards #-}
4-
{-# LANGUAGE RecursiveDo #-}
1+
{-# LANGUAGE LambdaCase #-}
2+
{-# LANGUAGE NamedFieldPuns #-}
3+
{-# LANGUAGE QuasiQuotes #-}
4+
{-# LANGUAGE RecordWildCards #-}
5+
{-# LANGUAGE RecursiveDo #-}
6+
{-# LANGUAGE TypeApplications #-}
57

68
module PostgREST.AppState
79
( AppState
@@ -33,7 +35,8 @@ import qualified Data.ByteString.Char8 as BS
3335
import Data.Either.Combinators (whenLeft)
3436
import qualified Hasql.Pool as SQL
3537
import qualified Hasql.Pool.Config as SQL
36-
import qualified Hasql.Session as SQL
38+
import qualified Hasql.Session as SQL hiding (statement)
39+
import qualified Hasql.Transaction as SQL hiding (sql)
3740
import qualified Hasql.Transaction.Sessions as SQL
3841
import qualified Network.HTTP.Types.Status as HTTP
3942
import qualified PostgREST.Auth.JwtCache as JwtCache
@@ -63,11 +66,17 @@ import PostgREST.Config.Database (queryDbSettings,
6366
import PostgREST.Config.PgVersion (PgVersion (..),
6467
minimumPgVersion)
6568
import PostgREST.Debounce (makeDebouncer)
69+
import PostgREST.Metrics (MetricsState (connTrack))
6670
import PostgREST.SchemaCache (SchemaCache (..),
6771
querySchemaCache,
6872
showSummary)
6973
import PostgREST.SchemaCache.Identifiers (quoteQi)
7074

75+
import qualified Hasql.Decoders as HD
76+
import qualified Hasql.Encoders as HE
77+
import qualified Hasql.Statement as SQL
78+
import NeatInterpolation (trimming)
79+
7180
import Protolude
7281

7382
data AppState = AppState
@@ -303,7 +312,7 @@ getObserver = stateObserver
303312
-- + Because connections cache the pg catalog(see #2620)
304313
-- + For rapid recovery. Otherwise, the pool idle or lifetime timeout would have to be reached for new healthy connections to be acquired.
305314
retryingSchemaCacheLoad :: AppState -> IO ()
306-
retryingSchemaCacheLoad appState@AppState{stateObserver=observer, stateMainThreadId=mainThreadId} =
315+
retryingSchemaCacheLoad appState@AppState{stateObserver=observer, stateMainThreadId=mainThreadId, stateMetrics} =
307316
void $ retrying retryPolicy shouldRetry (\RetryStatus{rsIterNumber, rsPreviousDelay} -> do
308317
when (rsIterNumber > 0) $ do
309318
let delay = fromMaybe 0 rsPreviousDelay `div` oneSecondInUs
@@ -342,8 +351,22 @@ retryingSchemaCacheLoad appState@AppState{stateObserver=observer, stateMainThrea
342351
qSchemaCache :: IO (Maybe SchemaCache)
343352
qSchemaCache = do
344353
conf@AppConfig{..} <- getConfig appState
354+
-- Throttle concurrent schema cache loads, guarded by advisory locks.
355+
-- This is to prevent thundering herd problem on startup or when many PostgREST
356+
-- instances receive "reload schema" notifications at the same time
357+
-- See get_lock_sql for details of the algorithm.
358+
-- Here we calculate the number of open connections passed to the query.
359+
Metrics.ConnStats connected inUse <- Metrics.connectionCounts $ connTrack stateMetrics
360+
-- Determine whether schema cache loading will create a new session
361+
let
362+
-- if all connections in use but pool not full - schema cache loading will create session
363+
scLoadingSessions = if connected <= inUse && inUse < configDbPoolSize then 1 else 0
364+
withTxLock = SQL.statement
365+
(fromIntegral $ connected + scLoadingSessions)
366+
(SQL.Statement get_lock_sql get_lock_params HD.noResult configDbPreparedStatements)
367+
345368
(resultTime, result) <-
346-
timeItT $ usePool appState (SQL.transactionNoRetry SQL.ReadCommitted SQL.Read $ querySchemaCache conf)
369+
timeItT $ usePool appState (SQL.transactionNoRetry SQL.ReadCommitted SQL.Read $ withTxLock *> querySchemaCache conf)
347370
case result of
348371
Left e -> do
349372
markSchemaCachePending appState
@@ -365,6 +388,43 @@ retryingSchemaCacheLoad appState@AppState{stateObserver=observer, stateMainThrea
365388
observer $ SchemaCacheLoadedObs loadTime summary
366389
markSchemaCacheLoaded appState
367390
return $ Just sCache
391+
where
392+
-- Recursive query that tries acquiring locks in order
393+
-- and waits for randomly selected lock if no attempt succeeded.
394+
-- It has a single parameter: this node open connection count.
395+
-- It is used to estimate the number of nodes
396+
-- by counting the number of active sessions for current session_user
397+
-- and dividing it by this node open connections.
398+
-- Assuming load is uniform among cluster nodes, all should have
399+
-- statistically the same number of open connections.
400+
-- Once the number of nodes is known we calculate the number
401+
-- of locks as ceil(log(2, number_of_nodes))
402+
get_lock_sql = encodeUtf8 [trimming|
403+
WITH RECURSIVE attempts AS (
404+
SELECT 1 AS lock_number, pg_try_advisory_xact_lock(lock_id, 1) AS success FROM parameters
405+
UNION ALL
406+
SELECT next_lock_number AS lock_number, pg_try_advisory_xact_lock(lock_id, next_lock_number) AS success
407+
FROM
408+
parameters CROSS JOIN LATERAL (
409+
SELECT lock_number + 1 AS next_lock_number FROM attempts
410+
WHERE NOT success AND lock_number < locks_count
411+
ORDER BY lock_number DESC
412+
LIMIT 1
413+
) AS previous_attempt
414+
),
415+
counts AS (
416+
SELECT round(log(2, round(count(*)::double precision/$$1)::numeric))::int AS locks_count
417+
FROM
418+
pg_stat_activity WHERE usename = SESSION_USER
419+
),
420+
parameters AS (
421+
SELECT locks_count, 50168275 AS lock_id FROM counts WHERE locks_count > 0
422+
)
423+
SELECT pg_advisory_xact_lock(lock_id, floor(random() * locks_count)::int + 1)
424+
FROM
425+
parameters WHERE NOT EXISTS (SELECT 1 FROM attempts WHERE success) |]
426+
427+
get_lock_params = HE.param (HE.nonNullable HE.int4)
368428

369429
shouldRetry :: RetryStatus -> (Maybe PgVersion, Maybe SchemaCache) -> IO Bool
370430
shouldRetry _ (pgVer, sCache) = do

test/io/test_io.py

Lines changed: 90 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
"Unit tests for Input/Ouput of PostgREST seen as a black box."
22

3+
import contextlib
34
import os
45
import re
56
import signal
@@ -19,6 +20,7 @@
1920
sleep_until_postgrest_full_reload,
2021
sleep_until_postgrest_scache_reload,
2122
wait_until_exit,
23+
wait_until_status_code,
2224
)
2325

2426

@@ -1367,6 +1369,93 @@ def test_schema_cache_concurrent_notifications(slow_schema_cache_env):
13671369
assert response.status_code == 200
13681370

13691371

1372+
@pytest.mark.parametrize(
1373+
"instance_count, expected_concurrency", [(2, 2), (4, 3), (6, 4), (8, 4), (16, 5)]
1374+
)
1375+
def test_schema_cache_reload_throttled_with_advisory_locks(
1376+
instance_count, expected_concurrency, slow_schema_cache_env
1377+
):
1378+
"schema cache reloads should be throttled across instances"
1379+
1380+
internal_sleep_ms = int(
1381+
slow_schema_cache_env["PGRST_INTERNAL_SCHEMA_CACHE_QUERY_SLEEP"]
1382+
)
1383+
lock_wait_threshold_ms = internal_sleep_ms * 2
1384+
query_log_pattern = re.compile(r"Schema cache queried in ([\d.]+) milliseconds")
1385+
1386+
def read_available_output_lines(postgrest):
1387+
try:
1388+
output = postgrest.process.stdout.read()
1389+
except BlockingIOError:
1390+
return []
1391+
1392+
if not output:
1393+
return []
1394+
return output.decode().splitlines()
1395+
1396+
with contextlib.ExitStack() as stack:
1397+
instances = [
1398+
stack.enter_context(
1399+
run(
1400+
env=slow_schema_cache_env,
1401+
wait_for_readiness=False,
1402+
wait_max_seconds=10,
1403+
)
1404+
)
1405+
for _ in range(instance_count)
1406+
]
1407+
1408+
for postgrest in instances:
1409+
wait_until_status_code(
1410+
postgrest.admin.baseurl + "/ready", max_seconds=10, status_code=200
1411+
)
1412+
1413+
# Drop startup logs so only reload logs are parsed.
1414+
for postgrest in instances:
1415+
read_available_output_lines(postgrest)
1416+
1417+
response = instances[0].session.get("/rpc/notify_pgrst")
1418+
assert response.status_code == 204
1419+
1420+
# Wait long enough for the lock-throttled cache reloads to finish.
1421+
time.sleep((internal_sleep_ms / 1000) * 2)
1422+
1423+
reload_durations_ms = []
1424+
for postgrest in instances:
1425+
output_lines = []
1426+
for _ in range(instance_count * 2):
1427+
output_lines.extend(read_available_output_lines(postgrest))
1428+
if any(query_log_pattern.search(line) for line in output_lines):
1429+
break
1430+
time.sleep(0.2)
1431+
1432+
durations = []
1433+
for line in output_lines:
1434+
match = query_log_pattern.search(line)
1435+
if match:
1436+
durations.append(float(match.group(1)))
1437+
1438+
assert durations
1439+
reload_durations_ms.append(max(durations))
1440+
1441+
assert len(reload_durations_ms) == instance_count
1442+
1443+
# expected_concurrency instances should have
1444+
# reload_durations_ms <= lock_wait_threshold_ms
1445+
# the rest should wait
1446+
assert (
1447+
instance_count
1448+
- len(
1449+
[
1450+
duration
1451+
for duration in reload_durations_ms
1452+
if duration > lock_wait_threshold_ms
1453+
]
1454+
)
1455+
== expected_concurrency
1456+
)
1457+
1458+
13701459
def test_schema_cache_query_sleep_logs(defaultenv):
13711460
"""Schema cache sleep should be reflected in the logged query duration."""
13721461

@@ -2060,7 +2149,7 @@ def test_requests_with_resource_embedding_wait_for_schema_cache_reload(defaulten
20602149
env = {
20612150
**defaultenv,
20622151
"PGRST_DB_POOL": "2",
2063-
"PGRST_INTERNAL_SCHEMA_CACHE_RELATIONSHIP_LOAD_SLEEP": "5100",
2152+
"PGRST_INTERNAL_SCHEMA_CACHE_RELATIONSHIP_LOAD_SLEEP": "5200",
20642153
}
20652154

20662155
with run(env=env, wait_max_seconds=30) as postgrest:

0 commit comments

Comments
 (0)