Skip to content

Commit f3e0692

Browse files
committed
fix: Limit concurrent schema cache loads
Triggering a schema cache reload immediately upon receipt of a notification by the listener leads to a thundering herd problem in a PostgREST cluster. This change limits the number of concurrent schema cache loading queries using advisory locks.
1 parent 7e9df64 commit f3e0692

3 files changed

Lines changed: 164 additions & 10 deletions

File tree

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,10 @@ All notable changes to this project will be documented in this file. From versio
1313
- Optimize requests with `Prefer: count=exact` that do not use ranges or `db-max-rows` by @laurenceisla in #3957
1414
+ Removed unnecessary double count when building the `Content-Range`.
1515

16+
### Fixed
17+
18+
- Limit concurrent schema cache loads by @mkleczek in #4643
19+
1620
### Changed
1721

1822
- Log error when `db-schemas` config contains schema `pg_catalog` or `information_schema` by @taimoorzaeem in #4359

src/PostgREST/AppState.hs

Lines changed: 72 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
1-
{-# LANGUAGE LambdaCase #-}
2-
{-# LANGUAGE NamedFieldPuns #-}
3-
{-# LANGUAGE RecordWildCards #-}
1+
{-# LANGUAGE LambdaCase #-}
2+
{-# LANGUAGE NamedFieldPuns #-}
3+
{-# LANGUAGE QuasiQuotes #-}
4+
{-# LANGUAGE RecordWildCards #-}
5+
{-# LANGUAGE TypeApplications #-}
46

57
module PostgREST.AppState
68
( AppState
@@ -36,7 +38,8 @@ import Data.Either.Combinators (whenLeft)
3638
import qualified Data.Text as T (unpack)
3739
import qualified Hasql.Pool as SQL
3840
import qualified Hasql.Pool.Config as SQL
39-
import qualified Hasql.Session as SQL
41+
import qualified Hasql.Session as SQL hiding (statement)
42+
import qualified Hasql.Transaction as SQL hiding (sql)
4043
import qualified Hasql.Transaction.Sessions as SQL
4144
import qualified Network.HTTP.Types.Status as HTTP
4245
import qualified Network.Socket as NS
@@ -73,9 +76,16 @@ import PostgREST.SchemaCache (SchemaCache (..),
7376
import PostgREST.SchemaCache.Identifiers (quoteQi)
7477
import PostgREST.Unix (createAndBindDomainSocket)
7578

76-
import Data.Streaming.Network (bindPortTCP, bindRandomPortTCP)
77-
import Data.String (IsString (..))
78-
import Protolude
79+
import Data.Streaming.Network (bindPortTCP,
80+
bindRandomPortTCP)
81+
import Data.String (IsString (..))
82+
import qualified Hasql.Decoders as HD
83+
import qualified Hasql.Encoders as HE
84+
import qualified Hasql.Statement as SQL
85+
import NeatInterpolation (trimming)
86+
import PostgREST.Metrics (MetricsState (connTrack))
87+
import Protolude
88+
7989

8090
data AppState = AppState
8191
-- | Database connection pool
@@ -360,7 +370,7 @@ getObserver = stateObserver
360370
-- + Because connections cache the pg catalog(see #2620)
361371
-- + For rapid recovery. Otherwise, the pool idle or lifetime timeout would have to be reached for new healthy connections to be acquired.
362372
retryingSchemaCacheLoad :: AppState -> IO ()
363-
retryingSchemaCacheLoad appState@AppState{stateObserver=observer, stateMainThreadId=mainThreadId} =
373+
retryingSchemaCacheLoad appState@AppState{stateObserver=observer, stateMainThreadId=mainThreadId, stateMetrics} =
364374
void $ retrying retryPolicy shouldRetry (\RetryStatus{rsIterNumber, rsPreviousDelay} -> do
365375
when (rsIterNumber > 0) $ do
366376
let delay = fromMaybe 0 rsPreviousDelay `div` oneSecondInUs
@@ -402,9 +412,25 @@ retryingSchemaCacheLoad appState@AppState{stateObserver=observer, stateMainThrea
402412
qSchemaCache :: IO (Maybe SchemaCache)
403413
qSchemaCache = do
404414
conf@AppConfig{..} <- getConfig appState
415+
-- Throttle concurrent schema cache loads, guarded by advisory locks.
416+
-- This is to prevent thundering herd problem on startup or when many PostgREST
417+
-- instances receive "reload schema" notifications at the same time
418+
-- schema reloading session + listener session
419+
-- See get_lock_sql for details of the algorithm.
420+
-- Here we calculate the number of open connections passed to the query.
421+
Metrics.ConnStats connected inUse <- Metrics.connectionCounts $ connTrack stateMetrics
422+
-- Determine whether schema cache loading will create a new session
423+
let
424+
scLoadingSessions = case (connected <= inUse, inUse >= configDbPoolSize) of
425+
(True, False) -> 1 -- all connections in use but pool not full - schema cache loading will create session
426+
_ -> 0
427+
withTxLock = SQL.statement
428+
(fromIntegral $ connected + scLoadingSessions)
429+
(SQL.Statement get_lock_sql get_lock_params HD.noResult configDbPreparedStatements)
430+
405431
(resultTime, result) <-
406432
let transaction = if configDbPreparedStatements then SQL.transaction else SQL.unpreparedTransaction in
407-
timeItT $ usePool appState (transaction SQL.ReadCommitted SQL.Read $ querySchemaCache conf)
433+
timeItT $ usePool appState (transaction SQL.ReadCommitted SQL.Read $ withTxLock *> querySchemaCache conf)
408434
case result of
409435
Left e -> do
410436
putSCacheStatus appState SCPending
@@ -422,6 +448,43 @@ retryingSchemaCacheLoad appState@AppState{stateObserver=observer, stateMainThrea
422448
observer $ SchemaCacheLoadedObs t
423449
putSCacheStatus appState SCLoaded
424450
return $ Just sCache
451+
where
452+
-- Recursive query that tries acquiring locks in order
453+
-- and waits for randomly selected lock if no attempt succeeded.
454+
-- It has a single parameter: this node open connection count.
455+
-- It is used to estimate the number of nodes
456+
-- by counting the number of active sessions for current session_user
457+
-- and dividing it by this node open connections.
458+
-- Assuming load is uniform among cluster nodes, all should have
459+
-- statistically the same number of open connections.
460+
-- Once the number of nodes is known we calculate the number
461+
-- of locks as ceil(log(2, number_of_nodes))
462+
get_lock_sql = encodeUtf8 [trimming|
463+
WITH RECURSIVE attempts AS (
464+
SELECT 1 AS lock_number, pg_try_advisory_xact_lock(lock_id, 1) AS success FROM parameters
465+
UNION ALL
466+
SELECT next_lock_number AS lock_number, pg_try_advisory_xact_lock(lock_id, next_lock_number) AS success
467+
FROM
468+
parameters CROSS JOIN LATERAL (
469+
SELECT lock_number + 1 AS next_lock_number FROM attempts
470+
WHERE NOT success AND lock_number < locks_count
471+
ORDER BY lock_number DESC
472+
LIMIT 1
473+
) AS previous_attempt
474+
),
475+
counts AS (
476+
SELECT round(log(2, round(count(*)::double precision/$$1)::numeric))::int AS locks_count
477+
FROM
478+
pg_stat_activity WHERE usename = SESSION_USER
479+
),
480+
parameters AS (
481+
SELECT locks_count, 50168275 AS lock_id FROM counts WHERE locks_count > 0
482+
)
483+
SELECT pg_advisory_xact_lock(lock_id, floor(random() * locks_count)::int + 1)
484+
FROM
485+
parameters WHERE NOT EXISTS (SELECT 1 FROM attempts WHERE success) |]
486+
487+
get_lock_params = HE.param (HE.nonNullable HE.int4)
425488

426489
shouldRetry :: RetryStatus -> (Maybe PgVersion, Maybe SchemaCache) -> IO Bool
427490
shouldRetry _ (pgVer, sCache) = do

test/io/test_io.py

Lines changed: 88 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
"Unit tests for Input/Ouput of PostgREST seen as a black box."
22

3+
import contextlib
34
import os
45
import re
56
import signal
@@ -18,6 +19,7 @@
1819
sleep_until_postgrest_full_reload,
1920
sleep_until_postgrest_scache_reload,
2021
wait_until_exit,
22+
wait_until_status_code,
2123
)
2224

2325

@@ -1058,6 +1060,91 @@ def test_schema_cache_concurrent_notifications(slow_schema_cache_env):
10581060
assert response.status_code == 200
10591061

10601062

1063+
@pytest.mark.parametrize(
1064+
"instance_count, expected_concurrency", [(2, 2), (4, 3), (6, 4), (8, 4), (16, 5)]
1065+
)
1066+
def test_schema_cache_reload_throttled_with_advisory_locks(
1067+
instance_count, expected_concurrency, slow_schema_cache_env
1068+
):
1069+
"schema cache reloads should be throttled across instances if instance count > 10"
1070+
1071+
internal_sleep_ms = int(
1072+
slow_schema_cache_env["PGRST_INTERNAL_SCHEMA_CACHE_QUERY_SLEEP"]
1073+
)
1074+
lock_wait_threshold_ms = internal_sleep_ms * 2
1075+
query_log_pattern = re.compile(r"Schema cache queried in ([\d.]+) milliseconds")
1076+
1077+
def read_available_output_lines(postgrest):
1078+
try:
1079+
output = postgrest.process.stdout.read()
1080+
except BlockingIOError:
1081+
return []
1082+
1083+
if not output:
1084+
return []
1085+
return output.decode().splitlines()
1086+
1087+
with contextlib.ExitStack() as stack:
1088+
instances = [
1089+
stack.enter_context(
1090+
run(
1091+
env=slow_schema_cache_env,
1092+
wait_for_readiness=False,
1093+
wait_max_seconds=10,
1094+
)
1095+
)
1096+
for _ in range(instance_count)
1097+
]
1098+
1099+
for postgrest in instances:
1100+
wait_until_status_code(
1101+
postgrest.admin.baseurl + "/ready", max_seconds=10, status_code=200
1102+
)
1103+
1104+
# Drop startup logs so only reload logs are parsed.
1105+
for postgrest in instances:
1106+
read_available_output_lines(postgrest)
1107+
1108+
response = instances[0].session.get("/rpc/notify_pgrst")
1109+
assert response.status_code == 204
1110+
1111+
# Wait long enough for the lock-throttled cache reloads to finish.
1112+
time.sleep((internal_sleep_ms / 1000) * 2)
1113+
1114+
reload_durations_ms = []
1115+
for postgrest in instances:
1116+
output_lines = []
1117+
for _ in range(instance_count * 2):
1118+
output_lines.extend(read_available_output_lines(postgrest))
1119+
if any(query_log_pattern.search(line) for line in output_lines):
1120+
break
1121+
time.sleep(0.2)
1122+
1123+
durations = []
1124+
for line in output_lines:
1125+
match = query_log_pattern.search(line)
1126+
if match:
1127+
durations.append(float(match.group(1)))
1128+
1129+
assert durations
1130+
reload_durations_ms.append(max(durations))
1131+
1132+
assert len(reload_durations_ms) == instance_count
1133+
1134+
# 10 instances should be fast, remaining instances should be slow
1135+
assert (
1136+
instance_count
1137+
- len(
1138+
[
1139+
duration
1140+
for duration in reload_durations_ms
1141+
if duration > lock_wait_threshold_ms
1142+
]
1143+
)
1144+
== expected_concurrency
1145+
)
1146+
1147+
10611148
def test_schema_cache_query_sleep_logs(defaultenv):
10621149
"""Schema cache sleep should be reflected in the logged query duration."""
10631150

@@ -1691,7 +1778,7 @@ def test_requests_with_resource_embedding_wait_for_schema_cache_reload(defaulten
16911778
env = {
16921779
**defaultenv,
16931780
"PGRST_DB_POOL": "2",
1694-
"PGRST_INTERNAL_SCHEMA_CACHE_RELATIONSHIP_LOAD_SLEEP": "5100",
1781+
"PGRST_INTERNAL_SCHEMA_CACHE_RELATIONSHIP_LOAD_SLEEP": "5200",
16951782
}
16961783

16971784
with run(env=env, wait_max_seconds=30) as postgrest:

0 commit comments

Comments
 (0)