Skip to content

Commit a96c490

Browse files
committed
fix: Limit concurrent schema cache loads
Triggering a schema cache reload immediately upon receipt of a notification by the listener leads to a thundering herd problem in a PostgREST cluster. This change limits the number of concurrent schema cache loading queries using advisory locks.
1 parent aa50c06 commit a96c490

3 files changed

Lines changed: 126 additions & 5 deletions

File tree

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,10 @@ All notable changes to this project will be documented in this file. From versio
1313
- Optimize requests with `Prefer: count=exact` that do not use ranges or `db-max-rows` by @laurenceisla in #3957
1414
+ Removed unnecessary double count when building the `Content-Range`.
1515

16+
### Fixed
17+
18+
- Limit concurrent schema cache loads by @mkleczek in #4643
19+
1620
### Changed
1721

1822
- Log error when `db-schemas` config contains schema `pg_catalog` or `information_schema` by @taimoorzaeem in #4359

src/PostgREST/AppState.hs

Lines changed: 38 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
{-# LANGUAGE LambdaCase #-}
22
{-# LANGUAGE NamedFieldPuns #-}
3+
{-# LANGUAGE QuasiQuotes #-}
34
{-# LANGUAGE RecordWildCards #-}
45

56
module PostgREST.AppState
@@ -35,7 +36,8 @@ import Data.Either.Combinators (whenLeft)
3536
import qualified Data.Text as T (unpack)
3637
import qualified Hasql.Pool as SQL
3738
import qualified Hasql.Pool.Config as SQL
38-
import qualified Hasql.Session as SQL
39+
import qualified Hasql.Session as SQL hiding (statement)
40+
import qualified Hasql.Transaction as SQL hiding (sql)
3941
import qualified Hasql.Transaction.Sessions as SQL
4042
import qualified Network.HTTP.Types.Status as HTTP
4143
import qualified Network.Socket as NS
@@ -72,9 +74,16 @@ import PostgREST.SchemaCache (SchemaCache (..),
7274
import PostgREST.SchemaCache.Identifiers (quoteQi)
7375
import PostgREST.Unix (createAndBindDomainSocket)
7476

75-
import Data.Streaming.Network (bindPortTCP, bindRandomPortTCP)
76-
import Data.String (IsString (..))
77-
import Protolude
77+
import Data.Functor.Contravariant ((>$<))
78+
import Data.Streaming.Network (bindPortTCP,
79+
bindRandomPortTCP)
80+
import Data.String (IsString (..))
81+
import qualified Hasql.Decoders as HD
82+
import qualified Hasql.Encoders as HE
83+
import qualified Hasql.Statement as SQL
84+
import NeatInterpolation (trimming)
85+
import Protolude
86+
7887

7988
data AppState = AppState
8089
-- | Database connection pool
@@ -401,9 +410,15 @@ retryingSchemaCacheLoad appState@AppState{stateObserver=observer, stateMainThrea
401410
qSchemaCache :: IO (Maybe SchemaCache)
402411
qSchemaCache = do
403412
conf@AppConfig{..} <- getConfig appState
413+
-- Allow 10 concurrent schema cache loads, guarded by advisory locks.
414+
-- This is to prevent thundering herd problem on startup or when many PostgREST
415+
-- instances receive "reload schema" notifications at the same time
416+
let withTxLock = SQL.statement (50168275, 10) $
417+
SQL.Statement get_lock_sql get_lock_params HD.noResult configDbPreparedStatements
418+
404419
(resultTime, result) <-
405420
let transaction = if configDbPreparedStatements then SQL.transaction else SQL.unpreparedTransaction in
406-
timeItT $ usePool appState (transaction SQL.ReadCommitted SQL.Read $ querySchemaCache conf)
421+
timeItT $ usePool appState (transaction SQL.ReadCommitted SQL.Read $ withTxLock *> querySchemaCache conf)
407422
case result of
408423
Left e -> do
409424
putSCacheStatus appState SCPending
@@ -421,6 +436,24 @@ retryingSchemaCacheLoad appState@AppState{stateObserver=observer, stateMainThrea
421436
observer $ SchemaCacheLoadedObs t
422437
putSCacheStatus appState SCLoaded
423438
return $ Just sCache
439+
where
440+
-- recursive query that tries acquiring locks in order
441+
-- and waits for randomly selected lock if no attempt succeeded
442+
-- parameters are lock number and number of locks to try
443+
get_lock_sql = encodeUtf8 [trimming|
444+
WITH RECURSIVE attempts AS (
445+
SELECT 1 AS lock_number, pg_try_advisory_xact_lock($$1, 1) AS success WHERE $$2 > 0
446+
UNION ALL
447+
SELECT next_lock_number AS lock_number, pg_try_advisory_xact_lock($$1, next_lock_number) AS success FROM (
448+
SELECT lock_number + 1 AS next_lock_number FROM attempts
449+
WHERE NOT success AND lock_number < $$2
450+
ORDER BY lock_number DESC
451+
LIMIT 1
452+
) AS previous_attempt
453+
)
454+
SELECT pg_advisory_xact_lock($$1, floor(random() * $$2)::int + 1) WHERE NOT EXISTS (SELECT 1 FROM attempts WHERE success) |]
455+
456+
get_lock_params = (fst >$< HE.param (HE.nonNullable HE.int4)) <> (snd >$< HE.param (HE.nonNullable HE.int4))
424457

425458
shouldRetry :: RetryStatus -> (Maybe PgVersion, Maybe SchemaCache) -> IO Bool
426459
shouldRetry _ (pgVer, sCache) = do

test/io/test_io.py

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
"Unit tests for Input/Ouput of PostgREST seen as a black box."
22

3+
import contextlib
34
import os
45
import re
56
import signal
@@ -18,6 +19,7 @@
1819
sleep_until_postgrest_full_reload,
1920
sleep_until_postgrest_scache_reload,
2021
wait_until_exit,
22+
wait_until_status_code,
2123
)
2224

2325

@@ -1058,6 +1060,88 @@ def test_schema_cache_concurrent_notifications(slow_schema_cache_env):
10581060
assert response.status_code == 200
10591061

10601062

1063+
@pytest.mark.parametrize("instance_count", [10, 11])
1064+
def test_schema_cache_reload_throttled_with_advisory_locks(
1065+
instance_count, slow_schema_cache_env
1066+
):
1067+
"schema cache reloads should be throttled across instances if instance count > 10"
1068+
1069+
internal_sleep_ms = int(
1070+
slow_schema_cache_env["PGRST_INTERNAL_SCHEMA_CACHE_QUERY_SLEEP"]
1071+
)
1072+
lock_wait_threshold_ms = internal_sleep_ms * 2
1073+
query_log_pattern = re.compile(r"Schema cache queried in ([\d.]+) milliseconds")
1074+
1075+
def read_available_output_lines(postgrest):
1076+
try:
1077+
output = postgrest.process.stdout.read()
1078+
except BlockingIOError:
1079+
return []
1080+
1081+
if not output:
1082+
return []
1083+
return output.decode().splitlines()
1084+
1085+
with contextlib.ExitStack() as stack:
1086+
instances = [
1087+
stack.enter_context(
1088+
run(
1089+
env=slow_schema_cache_env,
1090+
wait_for_readiness=False,
1091+
wait_max_seconds=10,
1092+
)
1093+
)
1094+
for _ in range(instance_count)
1095+
]
1096+
1097+
for postgrest in instances:
1098+
wait_until_status_code(
1099+
postgrest.admin.baseurl + "/ready", max_seconds=10, status_code=200
1100+
)
1101+
1102+
# Drop startup logs so only reload logs are parsed.
1103+
for postgrest in instances:
1104+
read_available_output_lines(postgrest)
1105+
1106+
response = instances[0].session.get("/rpc/notify_pgrst")
1107+
assert response.status_code == 204
1108+
1109+
# Wait long enough for the lock-throttled cache reloads to finish.
1110+
time.sleep((internal_sleep_ms / 1000) * 2)
1111+
1112+
reload_durations_ms = []
1113+
for postgrest in instances:
1114+
output_lines = []
1115+
for _ in range(5):
1116+
output_lines.extend(read_available_output_lines(postgrest))
1117+
if any(query_log_pattern.search(line) for line in output_lines):
1118+
break
1119+
time.sleep(0.2)
1120+
1121+
durations = []
1122+
for line in output_lines:
1123+
match = query_log_pattern.search(line)
1124+
if match:
1125+
durations.append(float(match.group(1)))
1126+
1127+
assert durations
1128+
reload_durations_ms.append(max(durations))
1129+
1130+
assert len(reload_durations_ms) == instance_count
1131+
1132+
# 10 instances should be fast, remaining instances should be slow
1133+
assert (
1134+
len(
1135+
[
1136+
duration
1137+
for duration in reload_durations_ms
1138+
if duration > lock_wait_threshold_ms
1139+
]
1140+
)
1141+
== instance_count - 10
1142+
)
1143+
1144+
10611145
def test_schema_cache_query_sleep_logs(defaultenv):
10621146
"""Schema cache sleep should be reflected in the logged query duration."""
10631147

0 commit comments

Comments
 (0)