Skip to content

Commit 39e391c

Browse files
committed
fix: Flush pool as late as possible during schema cache reloading
DISCLAIMER: This commit was authored entirely by a human without the assistance of LLMs. retryingSchemaCacheLoad flushes the pool upon every retry before it starts reloading the schema. This is too early as schema reloading might take some time during which new connections might be acquired. The consequence is that: * upon successful schema cache reload we might have some connections created with the old schema cache * we close connections upon each retry and under load we will keep closing and re-opening connections until schema cache load succeeds This change is to make sure we flush the pool only after successful schema cache querying but before loading (so that connections acquired during loading wait for it and do not interfere with timing the loading process).
1 parent 91c475b commit 39e391c

3 files changed

Lines changed: 57 additions & 6 deletions

File tree

src/PostgREST/AppState.hs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -309,8 +309,6 @@ retryingSchemaCacheLoad appState@AppState{stateObserver=observer, stateMainThrea
309309
let delay = fromMaybe 0 rsPreviousDelay `div` oneSecondInUs
310310
observer $ ConnectionRetryObs delay
311311

312-
flushPool appState
313-
314312
(,) <$> qPgVersion <*> (qInDbConfig *> qSchemaCache)
315313
)
316314
where
@@ -359,6 +357,10 @@ retryingSchemaCacheLoad appState@AppState{stateObserver=observer, stateMainThrea
359357
-- IORef on putSchemaCache. This is why schema cache status is marked as pending here to signal the Admin server (using isPending) that we're on a recovery state.
360358
markSchemaCachePending appState
361359
putSchemaCache appState $ Just sCache
360+
-- Flush the pool after loading the schema cache to reset any stale session cache entries
361+
-- We do it after successfully querying the schema cache
362+
-- and after marking sCacheStatus as pending,
363+
flushPool appState
362364
observer $ SchemaCacheQueriedObs resultTime
363365
observer . uncurry SchemaCacheLoadedObs =<< timeItT (evaluate $ showSummary sCache)
364366
markSchemaCacheLoaded appState

test/io/test_io.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -714,7 +714,7 @@ def test_log_level(level, defaultenv):
714714
response = postgrest.session.get("/")
715715
assert response.status_code == 200
716716

717-
output = postgrest.read_stdout(nlines=7)
717+
output = postgrest.read_stdout(nlines=9)
718718

719719
if level == "crit":
720720
assert len(output) == 0
@@ -752,7 +752,7 @@ def test_log_level(level, defaultenv):
752752
r'- - postgrest_test_anonymous \[.+\] "GET / HTTP/1.1" 200 \d+ "" "python-requests/.+"',
753753
],
754754
)
755-
assert len(output) == 7
755+
assert len(output) == 9
756756
assert any("Connection" and "is available" in line for line in output)
757757
assert any("Connection" and "is used" in line for line in output)
758758

@@ -1389,7 +1389,7 @@ def test_db_error_logging_to_stderr(level, defaultenv, metapostgrest):
13891389
assert response.status_code == 500
13901390

13911391
# ensure the message appears on the logs
1392-
output = postgrest.read_stdout(nlines=6)
1392+
output = postgrest.read_stdout(nlines=8)
13931393

13941394
if level == "crit":
13951395
assert len(output) == 0
@@ -1606,7 +1606,7 @@ def test_log_pool_req_observation(level, defaultenv):
16061606

16071607
if level == "debug":
16081608
output = postgrest.read_stdout(nlines=7)
1609-
assert len(output) == 6
1609+
assert len(output) == 7
16101610
match_log(output, [pool_req, pool_req_fullfill])
16111611
elif level == "info":
16121612
output = postgrest.read_stdout(nlines=4)
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
{-# LANGUAGE DataKinds #-}
2+
{-# LANGUAGE MonadComprehensions #-}
3+
{-# LANGUAGE NamedFieldPuns #-}
4+
module Feature.SchemaCacheSpec where
5+
6+
import Network.Wai (Application)
7+
import qualified PostgREST.AppState as AppState
8+
import PostgREST.Config (configDbSchemas)
9+
import PostgREST.Observation
10+
import Protolude
11+
import SpecHelper
12+
import Test.Hspec (SpecWith, describe, it)
13+
import Test.Hspec.Wai (getState)
14+
15+
spec :: SpecWith (SpecState, Application)
16+
spec = describe "Server started with metrics enabled" $ do
17+
18+
it "Should emit PoolFlushed, SchemaCacheQueriedObs and SchemaCacheLoadedObs when schema cache is reloaded" $ do
19+
SpecState{specAppState = appState, specObsChan} <- getState
20+
let waitFor = waitForObs specObsChan
21+
22+
liftIO $ do
23+
AppState.schemaCacheLoader appState
24+
25+
waitFor (1 * sec) "PoolFlushed" $ \x -> [ o | o@PoolFlushed <- pure x ]
26+
waitFor (1 * sec) "SchemaCacheQueriedObs" $ \x -> [ o | o@SchemaCacheQueriedObs{} <- pure x ]
27+
waitFor (1 * sec) "SchemaCacheLoadedObs" $ \x -> [ o | o@SchemaCacheLoadedObs{} <- pure x ]
28+
29+
30+
it "Should flush pool once when schema reloading retries" $ do
31+
SpecState{specAppState = appState, specObsChan} <- getState
32+
let waitFor = waitForObs specObsChan
33+
34+
liftIO $ do
35+
AppState.getConfig appState >>= \cfg -> do
36+
AppState.putConfig appState $ cfg { configDbSchemas = pure "bad_schema" }
37+
AppState.schemaCacheLoader appState
38+
39+
waitFor (1 * sec) "SchemaCacheErrorObs" $ \x -> [ o | o@SchemaCacheErrorObs{} <- pure x ]
40+
41+
-- Restore configuration
42+
AppState.putConfig appState cfg
43+
44+
-- Wait for 2 seconds so that retry can happen
45+
waitFor (2 * sec) "PoolFlushed" $ \x -> [ o | o@PoolFlushed <- pure x ]
46+
waitFor (1 * sec) "SchemaCacheQueriedObs" $ \x -> [ o | o@SchemaCacheQueriedObs{} <- pure x ]
47+
waitFor (1 * sec) "SchemaCacheLoadedObs" $ \x -> [ o | o@SchemaCacheLoadedObs{} <- pure x ]
48+
where
49+
sec = 1000000

0 commit comments

Comments
 (0)