Skip to content

Commit 50f945f

Browse files
committed
fix: Limit concurrent schema cache loads
DISCLAIMER: This commit was authored entirely by a human without the assistance of LLMs. Triggering schema cache reload immediately upon receival of notification by the listener leads to thundering herd problem in PostgREST cluster. This change adds limiting of number of concurrent schema cache loading queries using advisory locks.
1 parent 200830f commit 50f945f

3 files changed

Lines changed: 162 additions & 10 deletions

File tree

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,10 @@ All notable changes to this project will be documented in this file. From versio
1414
+ Removed unnecessary double count when building the `Content-Range`.
1515
- Add config `client_error_verbosity` to customize error verbosity by @taimoorzaeem in #4088, #3980, #3824
1616

17+
### Fixed
18+
19+
- Limit concurrent schema cache loads by @mkleczek in #4643
20+
1721
### Changed
1822

1923
- Log error when `db-schemas` config contains schema `pg_catalog` or `information_schema` by @taimoorzaeem in #4359

src/PostgREST/AppState.hs

Lines changed: 70 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
1-
{-# LANGUAGE LambdaCase #-}
2-
{-# LANGUAGE NamedFieldPuns #-}
3-
{-# LANGUAGE RecordWildCards #-}
1+
{-# LANGUAGE LambdaCase #-}
2+
{-# LANGUAGE NamedFieldPuns #-}
3+
{-# LANGUAGE QuasiQuotes #-}
4+
{-# LANGUAGE RecordWildCards #-}
5+
{-# LANGUAGE TypeApplications #-}
46

57
module PostgREST.AppState
68
( AppState
@@ -36,7 +38,8 @@ import Data.Either.Combinators (whenLeft)
3638
import qualified Data.Text as T (unpack)
3739
import qualified Hasql.Pool as SQL
3840
import qualified Hasql.Pool.Config as SQL
39-
import qualified Hasql.Session as SQL
41+
import qualified Hasql.Session as SQL hiding (statement)
42+
import qualified Hasql.Transaction as SQL hiding (sql)
4043
import qualified Hasql.Transaction.Sessions as SQL
4144
import qualified Network.HTTP.Types.Status as HTTP
4245
import qualified Network.Socket as NS
@@ -73,9 +76,16 @@ import PostgREST.SchemaCache (SchemaCache (..),
7376
import PostgREST.SchemaCache.Identifiers (quoteQi)
7477
import PostgREST.Unix (createAndBindDomainSocket)
7578

76-
import Data.Streaming.Network (bindPortTCP, bindRandomPortTCP)
77-
import Data.String (IsString (..))
78-
import Protolude
79+
import Data.Streaming.Network (bindPortTCP,
80+
bindRandomPortTCP)
81+
import Data.String (IsString (..))
82+
import qualified Hasql.Decoders as HD
83+
import qualified Hasql.Encoders as HE
84+
import qualified Hasql.Statement as SQL
85+
import NeatInterpolation (trimming)
86+
import PostgREST.Metrics (MetricsState (connTrack))
87+
import Protolude
88+
7989

8090
data AppState = AppState
8191
-- | Database connection pool
@@ -360,7 +370,7 @@ getObserver = stateObserver
360370
-- + Because connections cache the pg catalog(see #2620)
361371
-- + For rapid recovery. Otherwise, the pool idle or lifetime timeout would have to be reached for new healthy connections to be acquired.
362372
retryingSchemaCacheLoad :: AppState -> IO ()
363-
retryingSchemaCacheLoad appState@AppState{stateObserver=observer, stateMainThreadId=mainThreadId} =
373+
retryingSchemaCacheLoad appState@AppState{stateObserver=observer, stateMainThreadId=mainThreadId, stateMetrics} =
364374
void $ retrying retryPolicy shouldRetry (\RetryStatus{rsIterNumber, rsPreviousDelay} -> do
365375
when (rsIterNumber > 0) $ do
366376
let delay = fromMaybe 0 rsPreviousDelay `div` oneSecondInUs
@@ -401,9 +411,23 @@ retryingSchemaCacheLoad appState@AppState{stateObserver=observer, stateMainThrea
401411
qSchemaCache :: IO (Maybe SchemaCache)
402412
qSchemaCache = do
403413
conf@AppConfig{..} <- getConfig appState
414+
-- Throttle concurrent schema cache loads, guarded by advisory locks.
415+
-- This is to prevent thundering herd problem on startup or when many PostgREST
416+
-- instances receive "reload schema" notifications at the same time
417+
-- See get_lock_sql for details of the algorithm.
418+
-- Here we calculate the number of open connections passed to the query.
419+
Metrics.ConnStats connected inUse <- Metrics.connectionCounts $ connTrack stateMetrics
420+
-- Determine whether schema cache loading will create a new session
421+
let
422+
-- if all connections in use but pool not full - schema cache loading will create session
423+
scLoadingSessions = if connected <= inUse && inUse < configDbPoolSize then 1 else 0
424+
withTxLock = SQL.statement
425+
(fromIntegral $ connected + scLoadingSessions)
426+
(SQL.Statement get_lock_sql get_lock_params HD.noResult configDbPreparedStatements)
427+
404428
(resultTime, result) <-
405429
let transaction = if configDbPreparedStatements then SQL.transaction else SQL.unpreparedTransaction in
406-
timeItT $ usePool appState (transaction SQL.ReadCommitted SQL.Read $ querySchemaCache conf)
430+
timeItT $ usePool appState (transaction SQL.ReadCommitted SQL.Read $ withTxLock *> querySchemaCache conf)
407431
case result of
408432
Left e -> do
409433
putSCacheStatus appState SCPending
@@ -420,6 +444,43 @@ retryingSchemaCacheLoad appState@AppState{stateObserver=observer, stateMainThrea
420444
observer . uncurry SchemaCacheLoadedObs =<< timeItT (evaluate $ showSummary sCache)
421445
putSCacheStatus appState SCLoaded
422446
return $ Just sCache
447+
where
448+
-- Recursive query that tries acquiring locks in order
449+
-- and waits for randomly selected lock if no attempt succeeded.
450+
-- It has a single parameter: this node open connection count.
451+
-- It is used to estimate the number of nodes
452+
-- by counting the number of active sessions for current session_user
453+
-- and dividing it by this node open connections.
454+
-- Assuming load is uniform among cluster nodes, all should have
455+
-- statistically the same number of open connections.
456+
-- Once the number of nodes is known we calculate the number
457+
-- of locks as ceil(log(2, number_of_nodes))
458+
get_lock_sql = encodeUtf8 [trimming|
459+
WITH RECURSIVE attempts AS (
460+
SELECT 1 AS lock_number, pg_try_advisory_xact_lock(lock_id, 1) AS success FROM parameters
461+
UNION ALL
462+
SELECT next_lock_number AS lock_number, pg_try_advisory_xact_lock(lock_id, next_lock_number) AS success
463+
FROM
464+
parameters CROSS JOIN LATERAL (
465+
SELECT lock_number + 1 AS next_lock_number FROM attempts
466+
WHERE NOT success AND lock_number < locks_count
467+
ORDER BY lock_number DESC
468+
LIMIT 1
469+
) AS previous_attempt
470+
),
471+
counts AS (
472+
SELECT round(log(2, round(count(*)::double precision/$$1)::numeric))::int AS locks_count
473+
FROM
474+
pg_stat_activity WHERE usename = SESSION_USER
475+
),
476+
parameters AS (
477+
SELECT locks_count, 50168275 AS lock_id FROM counts WHERE locks_count > 0
478+
)
479+
SELECT pg_advisory_xact_lock(lock_id, floor(random() * locks_count)::int + 1)
480+
FROM
481+
parameters WHERE NOT EXISTS (SELECT 1 FROM attempts WHERE success) |]
482+
483+
get_lock_params = HE.param (HE.nonNullable HE.int4)
423484

424485
shouldRetry :: RetryStatus -> (Maybe PgVersion, Maybe SchemaCache) -> IO Bool
425486
shouldRetry _ (pgVer, sCache) = do

test/io/test_io.py

Lines changed: 88 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
"Unit tests for Input/Ouput of PostgREST seen as a black box."
22

3+
import contextlib
34
import os
45
import re
56
import signal
@@ -18,6 +19,7 @@
1819
sleep_until_postgrest_full_reload,
1920
sleep_until_postgrest_scache_reload,
2021
wait_until_exit,
22+
wait_until_status_code,
2123
)
2224

2325

@@ -1058,6 +1060,91 @@ def test_schema_cache_concurrent_notifications(slow_schema_cache_env):
10581060
assert response.status_code == 200
10591061

10601062

1063+
@pytest.mark.parametrize(
1064+
"instance_count, expected_concurrency", [(2, 2), (4, 3), (6, 4), (8, 4), (16, 5)]
1065+
)
1066+
def test_schema_cache_reload_throttled_with_advisory_locks(
1067+
instance_count, expected_concurrency, slow_schema_cache_env
1068+
):
1069+
"schema cache reloads should be throttled across instances if instance count > 10"
1070+
1071+
internal_sleep_ms = int(
1072+
slow_schema_cache_env["PGRST_INTERNAL_SCHEMA_CACHE_QUERY_SLEEP"]
1073+
)
1074+
lock_wait_threshold_ms = internal_sleep_ms * 2
1075+
query_log_pattern = re.compile(r"Schema cache queried in ([\d.]+) milliseconds")
1076+
1077+
def read_available_output_lines(postgrest):
1078+
try:
1079+
output = postgrest.process.stdout.read()
1080+
except BlockingIOError:
1081+
return []
1082+
1083+
if not output:
1084+
return []
1085+
return output.decode().splitlines()
1086+
1087+
with contextlib.ExitStack() as stack:
1088+
instances = [
1089+
stack.enter_context(
1090+
run(
1091+
env=slow_schema_cache_env,
1092+
wait_for_readiness=False,
1093+
wait_max_seconds=10,
1094+
)
1095+
)
1096+
for _ in range(instance_count)
1097+
]
1098+
1099+
for postgrest in instances:
1100+
wait_until_status_code(
1101+
postgrest.admin.baseurl + "/ready", max_seconds=10, status_code=200
1102+
)
1103+
1104+
# Drop startup logs so only reload logs are parsed.
1105+
for postgrest in instances:
1106+
read_available_output_lines(postgrest)
1107+
1108+
response = instances[0].session.get("/rpc/notify_pgrst")
1109+
assert response.status_code == 204
1110+
1111+
# Wait long enough for the lock-throttled cache reloads to finish.
1112+
time.sleep((internal_sleep_ms / 1000) * 2)
1113+
1114+
reload_durations_ms = []
1115+
for postgrest in instances:
1116+
output_lines = []
1117+
for _ in range(instance_count * 2):
1118+
output_lines.extend(read_available_output_lines(postgrest))
1119+
if any(query_log_pattern.search(line) for line in output_lines):
1120+
break
1121+
time.sleep(0.2)
1122+
1123+
durations = []
1124+
for line in output_lines:
1125+
match = query_log_pattern.search(line)
1126+
if match:
1127+
durations.append(float(match.group(1)))
1128+
1129+
assert durations
1130+
reload_durations_ms.append(max(durations))
1131+
1132+
assert len(reload_durations_ms) == instance_count
1133+
1134+
# 10 instances should be fast, remaining instances should be slow
1135+
assert (
1136+
instance_count
1137+
- len(
1138+
[
1139+
duration
1140+
for duration in reload_durations_ms
1141+
if duration > lock_wait_threshold_ms
1142+
]
1143+
)
1144+
== expected_concurrency
1145+
)
1146+
1147+
10611148
def test_schema_cache_query_sleep_logs(defaultenv):
10621149
"""Schema cache sleep should be reflected in the logged query duration."""
10631150

@@ -1691,7 +1778,7 @@ def test_requests_with_resource_embedding_wait_for_schema_cache_reload(defaulten
16911778
env = {
16921779
**defaultenv,
16931780
"PGRST_DB_POOL": "2",
1694-
"PGRST_INTERNAL_SCHEMA_CACHE_RELATIONSHIP_LOAD_SLEEP": "5100",
1781+
"PGRST_INTERNAL_SCHEMA_CACHE_RELATIONSHIP_LOAD_SLEEP": "5200",
16951782
}
16961783

16971784
with run(env=env, wait_max_seconds=30) as postgrest:

0 commit comments

Comments
 (0)