Skip to content

Commit fb7a5aa

Browse files
committed
fix: Limit concurrent schema cache loads
DISCLAIMER: This commit was authored entirely by a human without the assistance of LLMs. Triggering a schema cache reload immediately upon receipt of a notification by the listener leads to a thundering herd problem in a PostgREST cluster. This change limits the number of concurrent schema cache loading queries using advisory locks.
1 parent b3d2bfa commit fb7a5aa

3 files changed

Lines changed: 155 additions & 7 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ All notable changes to this project will be documented in this file. From versio
3232
### Fixed
3333

3434
- Fix leaking table and function names when calculating error hint by @taimoorzaeem in #4675
35+
- Limit concurrent schema cache loads by @mkleczek in #4643
3536

3637
## [14.5] - 2026-02-12
3738

src/PostgREST/AppState.hs

Lines changed: 66 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
1-
{-# LANGUAGE LambdaCase #-}
2-
{-# LANGUAGE NamedFieldPuns #-}
3-
{-# LANGUAGE RecordWildCards #-}
1+
{-# LANGUAGE LambdaCase #-}
2+
{-# LANGUAGE NamedFieldPuns #-}
3+
{-# LANGUAGE QuasiQuotes #-}
4+
{-# LANGUAGE RecordWildCards #-}
5+
{-# LANGUAGE TypeApplications #-}
46

57
module PostgREST.AppState
68
( AppState
@@ -32,7 +34,8 @@ import qualified Data.ByteString.Char8 as BS
3234
import Data.Either.Combinators (whenLeft)
3335
import qualified Hasql.Pool as SQL
3436
import qualified Hasql.Pool.Config as SQL
35-
import qualified Hasql.Session as SQL
37+
import qualified Hasql.Session as SQL hiding (statement)
38+
import qualified Hasql.Transaction as SQL hiding (sql)
3639
import qualified Hasql.Transaction.Sessions as SQL
3740
import qualified Network.HTTP.Types.Status as HTTP
3841
import qualified PostgREST.Auth.JwtCache as JwtCache
@@ -62,11 +65,17 @@ import PostgREST.Config.Database (queryDbSettings,
6265
queryRoleSettings)
6366
import PostgREST.Config.PgVersion (PgVersion (..),
6467
minimumPgVersion)
68+
import PostgREST.Metrics (MetricsState (connTrack))
6569
import PostgREST.SchemaCache (SchemaCache (..),
6670
querySchemaCache,
6771
showSummary)
6872
import PostgREST.SchemaCache.Identifiers (quoteQi)
6973

74+
import qualified Hasql.Decoders as HD
75+
import qualified Hasql.Encoders as HE
76+
import qualified Hasql.Statement as SQL
77+
import NeatInterpolation (trimming)
78+
7079
import Protolude
7180

7281
data AppState = AppState
@@ -299,7 +308,7 @@ getObserver = stateObserver
299308
-- + Because connections cache the pg catalog(see #2620)
300309
-- + For rapid recovery. Otherwise, the pool idle or lifetime timeout would have to be reached for new healthy connections to be acquired.
301310
retryingSchemaCacheLoad :: AppState -> IO ()
302-
retryingSchemaCacheLoad appState@AppState{stateObserver=observer, stateMainThreadId=mainThreadId} =
311+
retryingSchemaCacheLoad appState@AppState{stateObserver=observer, stateMainThreadId=mainThreadId, stateMetrics} =
303312
void $ retrying retryPolicy shouldRetry (\RetryStatus{rsIterNumber, rsPreviousDelay} -> do
304313
when (rsIterNumber > 0) $ do
305314
let delay = fromMaybe 0 rsPreviousDelay `div` oneSecondInUs
@@ -340,9 +349,23 @@ retryingSchemaCacheLoad appState@AppState{stateObserver=observer, stateMainThrea
340349
qSchemaCache :: IO (Maybe SchemaCache)
341350
qSchemaCache = do
342351
conf@AppConfig{..} <- getConfig appState
352+
-- Throttle concurrent schema cache loads, guarded by advisory locks.
353+
-- This is to prevent a thundering herd problem on startup or when many PostgREST
354+
-- instances receive "reload schema" notifications at the same time
355+
-- See get_lock_sql for details of the algorithm.
356+
-- Here we calculate the number of open connections passed to the query.
357+
Metrics.ConnStats connected inUse <- Metrics.connectionCounts $ connTrack stateMetrics
358+
-- Determine whether schema cache loading will create a new session
359+
let
360+
-- if all open connections are in use but the pool is not full, schema cache loading will create a new session
361+
scLoadingSessions = if connected <= inUse && inUse < configDbPoolSize then 1 else 0
362+
withTxLock = SQL.statement
363+
(fromIntegral $ connected + scLoadingSessions)
364+
(SQL.Statement get_lock_sql get_lock_params HD.noResult configDbPreparedStatements)
365+
343366
(resultTime, result) <-
344367
let transaction = if configDbPreparedStatements then SQL.transaction else SQL.unpreparedTransaction in
345-
timeItT $ usePool appState (transaction SQL.ReadCommitted SQL.Read $ querySchemaCache conf)
368+
timeItT $ usePool appState (transaction SQL.ReadCommitted SQL.Read $ withTxLock *> querySchemaCache conf)
346369
case result of
347370
Left e -> do
348371
markSchemaCachePending appState
@@ -359,6 +382,43 @@ retryingSchemaCacheLoad appState@AppState{stateObserver=observer, stateMainThrea
359382
observer . uncurry SchemaCacheLoadedObs =<< timeItT (evaluate $ showSummary sCache)
360383
markSchemaCacheLoaded appState
361384
return $ Just sCache
385+
where
386+
-- Recursive query that tries acquiring locks in order
387+
-- and waits for randomly selected lock if no attempt succeeded.
388+
-- It has a single parameter: this node's open connection count.
389+
-- It is used to estimate the number of nodes
390+
-- by counting the number of active sessions for current session_user
391+
-- and dividing it by this node open connections.
392+
-- Assuming load is uniform among cluster nodes, all should have
393+
-- statistically the same number of open connections.
394+
-- Once the number of nodes is known we calculate the number
395+
-- of locks as round(log(2, number_of_nodes))
396+
get_lock_sql = encodeUtf8 [trimming|
397+
WITH RECURSIVE attempts AS (
398+
SELECT 1 AS lock_number, pg_try_advisory_xact_lock(lock_id, 1) AS success FROM parameters
399+
UNION ALL
400+
SELECT next_lock_number AS lock_number, pg_try_advisory_xact_lock(lock_id, next_lock_number) AS success
401+
FROM
402+
parameters CROSS JOIN LATERAL (
403+
SELECT lock_number + 1 AS next_lock_number FROM attempts
404+
WHERE NOT success AND lock_number < locks_count
405+
ORDER BY lock_number DESC
406+
LIMIT 1
407+
) AS previous_attempt
408+
),
409+
counts AS (
410+
SELECT round(log(2, round(count(*)::double precision/$$1)::numeric))::int AS locks_count
411+
FROM
412+
pg_stat_activity WHERE usename = SESSION_USER
413+
),
414+
parameters AS (
415+
SELECT locks_count, 50168275 AS lock_id FROM counts WHERE locks_count > 0
416+
)
417+
SELECT pg_advisory_xact_lock(lock_id, floor(random() * locks_count)::int + 1)
418+
FROM
419+
parameters WHERE NOT EXISTS (SELECT 1 FROM attempts WHERE success) |]
420+
421+
get_lock_params = HE.param (HE.nonNullable HE.int4)
362422

363423
shouldRetry :: RetryStatus -> (Maybe PgVersion, Maybe SchemaCache) -> IO Bool
364424
shouldRetry _ (pgVer, sCache) = do

test/io/test_io.py

Lines changed: 88 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
"Unit tests for Input/Output of PostgREST seen as a black box."
22

3+
import contextlib
34
import os
45
import re
56
import signal
@@ -18,6 +19,7 @@
1819
sleep_until_postgrest_full_reload,
1920
sleep_until_postgrest_scache_reload,
2021
wait_until_exit,
22+
wait_until_status_code,
2123
)
2224

2325

@@ -1098,6 +1100,91 @@ def test_schema_cache_concurrent_notifications(slow_schema_cache_env):
10981100
assert response.status_code == 200
10991101

11001102

1103+
@pytest.mark.parametrize(
1104+
"instance_count, expected_concurrency", [(2, 2), (4, 3), (6, 4), (8, 4), (16, 5)]
1105+
)
1106+
def test_schema_cache_reload_throttled_with_advisory_locks(
1107+
instance_count, expected_concurrency, slow_schema_cache_env
1108+
):
1109+
"schema cache reloads should be throttled across instances using advisory locks"
1110+
1111+
internal_sleep_ms = int(
1112+
slow_schema_cache_env["PGRST_INTERNAL_SCHEMA_CACHE_QUERY_SLEEP"]
1113+
)
1114+
lock_wait_threshold_ms = internal_sleep_ms * 2
1115+
query_log_pattern = re.compile(r"Schema cache queried in ([\d.]+) milliseconds")
1116+
1117+
def read_available_output_lines(postgrest):
1118+
try:
1119+
output = postgrest.process.stdout.read()
1120+
except BlockingIOError:
1121+
return []
1122+
1123+
if not output:
1124+
return []
1125+
return output.decode().splitlines()
1126+
1127+
with contextlib.ExitStack() as stack:
1128+
instances = [
1129+
stack.enter_context(
1130+
run(
1131+
env=slow_schema_cache_env,
1132+
wait_for_readiness=False,
1133+
wait_max_seconds=10,
1134+
)
1135+
)
1136+
for _ in range(instance_count)
1137+
]
1138+
1139+
for postgrest in instances:
1140+
wait_until_status_code(
1141+
postgrest.admin.baseurl + "/ready", max_seconds=10, status_code=200
1142+
)
1143+
1144+
# Drop startup logs so only reload logs are parsed.
1145+
for postgrest in instances:
1146+
read_available_output_lines(postgrest)
1147+
1148+
response = instances[0].session.get("/rpc/notify_pgrst")
1149+
assert response.status_code == 204
1150+
1151+
# Wait long enough for the lock-throttled cache reloads to finish.
1152+
time.sleep((internal_sleep_ms / 1000) * 2)
1153+
1154+
reload_durations_ms = []
1155+
for postgrest in instances:
1156+
output_lines = []
1157+
for _ in range(instance_count * 2):
1158+
output_lines.extend(read_available_output_lines(postgrest))
1159+
if any(query_log_pattern.search(line) for line in output_lines):
1160+
break
1161+
time.sleep(0.2)
1162+
1163+
durations = []
1164+
for line in output_lines:
1165+
match = query_log_pattern.search(line)
1166+
if match:
1167+
durations.append(float(match.group(1)))
1168+
1169+
assert durations
1170+
reload_durations_ms.append(max(durations))
1171+
1172+
assert len(reload_durations_ms) == instance_count
1173+
1174+
# expected_concurrency instances should be fast, remaining instances should be slow
1175+
assert (
1176+
instance_count
1177+
- len(
1178+
[
1179+
duration
1180+
for duration in reload_durations_ms
1181+
if duration > lock_wait_threshold_ms
1182+
]
1183+
)
1184+
== expected_concurrency
1185+
)
1186+
1187+
11011188
def test_schema_cache_query_sleep_logs(defaultenv):
11021189
"""Schema cache sleep should be reflected in the logged query duration."""
11031190

@@ -1731,7 +1818,7 @@ def test_requests_with_resource_embedding_wait_for_schema_cache_reload(defaulten
17311818
env = {
17321819
**defaultenv,
17331820
"PGRST_DB_POOL": "2",
1734-
"PGRST_INTERNAL_SCHEMA_CACHE_RELATIONSHIP_LOAD_SLEEP": "5100",
1821+
"PGRST_INTERNAL_SCHEMA_CACHE_RELATIONSHIP_LOAD_SLEEP": "5200",
17351822
}
17361823

17371824
with run(env=env, wait_max_seconds=30) as postgrest:

0 commit comments

Comments
 (0)