Skip to content

Commit b428649

Browse files
committed
fix: Flush pool as late as possible during schema cache reloading
retryingSchemaCacheLoad flushes the pool upon every retry before it starts reloading the schema. This is too early as schema reloading might take some time during which new connections might be acquired. The consequence is that: * upon successful schema cache reload we might have some connections created with the old schema cache * we close connections upon each retry and under load we will keep closing and re-opening connections until schema cache load succeeds This change is to make sure we flush the pool only after successful schema cache querying but before loading (so that connections acquired during loading wait for it and do not interfere with timing the loading process).
1 parent 38ebc23 commit b428649

4 files changed

Lines changed: 135 additions & 55 deletions

File tree

src/PostgREST/AppState.hs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -368,8 +368,6 @@ retryingSchemaCacheLoad appState@AppState{stateObserver=observer, stateMainThrea
368368
observer $ ConnectionRetryObs delay
369369
putNextListenerDelay appState delay
370370

371-
flushPool appState
372-
373371
(,) <$> qPgVersion <*> (qInDbConfig *> qSchemaCache)
374372
)
375373
where
@@ -417,8 +415,12 @@ retryingSchemaCacheLoad appState@AppState{stateObserver=observer, stateMainThrea
417415
-- IMPORTANT: While the pending schema cache state starts from running the above querySchemaCache, only at this stage we block API requests due to the usage of an
418416
-- IORef on putSchemaCache. This is why SCacheStatus is put at SCPending here to signal the Admin server (using isPending) that we're on a recovery state.
419417
putSCacheStatus appState SCPending
420-
putSchemaCache appState $ Just sCache
421418
observer $ SchemaCacheQueriedObs resultTime
419+
putSchemaCache appState $ Just sCache
420+
-- Flush the pool after loading the schema cache to reset any stale session cache entries
421+
-- We do it after successfully querying the schema cache
422+
-- and after marking sCacheStatus as pending,
423+
flushPool appState
422424
(t, _) <- timeItT $ observer $ SchemaCacheSummaryObs $ showSummary sCache
423425
observer $ SchemaCacheLoadedObs t
424426
putSCacheStatus appState SCLoaded

test/io/postgrest.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,7 @@ def wait_until_exit(postgrest):
187187
def wait_until_status_code(url, max_seconds, status_code):
188188
"Wait for the given HTTP endpoint to return a status code"
189189
session = requests_unixsocket.Session()
190+
response = None
190191

191192
for _ in range(max_seconds * 10):
192193
try:

test/io/test_io.py

Lines changed: 125 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
sleep_until_postgrest_full_reload,
1919
sleep_until_postgrest_scache_reload,
2020
wait_until_exit,
21+
wait_until_status_code,
2122
)
2223

2324

@@ -105,6 +106,88 @@ def sleep():
105106
t.join()
106107

107108

109+
def test_pool_flushes_metric(defaultenv):
110+
"Should increase pool flushes metric when the pool is flushed"
111+
112+
with run(env=defaultenv, port=freeport()) as postgrest:
113+
response = postgrest.admin.get("/metrics")
114+
assert response.status_code == 200
115+
before = float(
116+
re.search(r"pgrst_db_pool_flushes_total ([0-9.e+-]+)", response.text).group(
117+
1
118+
)
119+
)
120+
121+
postgrest.process.send_signal(signal.SIGUSR1)
122+
sleep_until_postgrest_scache_reload()
123+
124+
response = postgrest.admin.get("/metrics")
125+
assert response.status_code == 200
126+
after = float(
127+
re.search(r"pgrst_db_pool_flushes_total ([0-9.e+-]+)", response.text).group(
128+
1
129+
)
130+
)
131+
assert after == before + 1
132+
133+
134+
def test_pool_flushes_metric_with_schema_cache_retries(defaultenv, metapostgrest):
135+
"Should flush the pool exactly once even when schema cache reload retries"
136+
137+
role = "timeout_authenticator"
138+
app_name = "pool-flush-retry"
139+
env = {
140+
**defaultenv,
141+
"PGUSER": role,
142+
"PGAPPNAME": app_name,
143+
"PGRST_INTERNAL_SCHEMA_CACHE_QUERY_SLEEP": "50",
144+
}
145+
146+
with run(env=env, port=freeport()) as postgrest:
147+
response = postgrest.admin.get("/metrics")
148+
assert response.status_code == 200
149+
before = float(
150+
re.search(r"pgrst_db_pool_flushes_total ([0-9.e+-]+)", response.text).group(
151+
1
152+
)
153+
)
154+
155+
try:
156+
# Force schema cache reload failures to trigger retries.
157+
set_statement_timeout(metapostgrest, role, 20)
158+
postgrest.process.send_signal(signal.SIGUSR1)
159+
160+
postgrest.wait_until_scache_starts_loading(max_seconds=2)
161+
162+
# Give retry loop time to run and verify it doesn't flush the pool.
163+
time.sleep(1)
164+
165+
response = postgrest.admin.get("/metrics")
166+
assert response.status_code == 200
167+
during_retries = float(
168+
re.search(
169+
r"pgrst_db_pool_flushes_total ([0-9.e+-]+)", response.text
170+
).group(1)
171+
)
172+
assert during_retries == before
173+
174+
reset_statement_timeout(metapostgrest, role)
175+
# Ensure next retry establishes fresh sessions with the reset timeout.
176+
metapostgrest.session.get(f"/rpc/terminate_pgrst?appname={app_name}")
177+
wait_until_status_code(postgrest.admin.baseurl + "/ready", 12, 200)
178+
finally:
179+
reset_statement_timeout(metapostgrest, role)
180+
181+
response = postgrest.admin.get("/metrics")
182+
assert response.status_code == 200
183+
after = float(
184+
re.search(r"pgrst_db_pool_flushes_total ([0-9.e+-]+)", response.text).group(
185+
1
186+
)
187+
)
188+
assert after == before + 1
189+
190+
108191
def test_random_port_bound(defaultenv):
109192
"PostgREST should bind to a random port when PGRST_SERVER_PORT is 0."
110193

@@ -661,55 +744,53 @@ def test_log_level(level, defaultenv):
661744
response = postgrest.session.get("/")
662745
assert response.status_code == 200
663746

664-
output = sorted(postgrest.read_stdout(nlines=7))
747+
output = postgrest.read_stdout(nlines=9)
748+
749+
def match_log(matchers):
750+
ito = iter(output)
751+
itm = iter(matchers)
752+
nextMatcher = next(itm, None)
753+
while nextMatcher is not None and (line := next(ito, None)) is not None:
754+
if re.match(nextMatcher, line) is not None:
755+
nextMatcher = next(itm, None)
756+
if nextMatcher is not None:
757+
raise AssertionError(
758+
f"Expected log line matching {nextMatcher} not found in output"
759+
)
665760

666761
if level == "crit":
667762
assert len(output) == 0
668763
elif level == "error":
669-
assert re.match(
670-
r'- - - \[.+\] "GET / HTTP/1.1" 500 \d+ "" "python-requests/.+"',
671-
output[0],
764+
match_log(
765+
[r'- - - \[.+\] "GET / HTTP/1.1" 500 \d+ "" "python-requests/.+"']
672766
)
673767
assert len(output) == 1
674768
elif level == "warn":
675-
assert re.match(
676-
r'- - - \[.+\] "GET / HTTP/1.1" 500 \d+ "" "python-requests/.+"',
677-
output[0],
678-
)
679-
assert re.match(
680-
r'- - postgrest_test_anonymous \[.+\] "GET /unknown HTTP/1.1" 404 \d+ "" "python-requests/.+"',
681-
output[1],
769+
match_log(
770+
[
771+
r'- - - \[.+\] "GET / HTTP/1.1" 500 \d+ "" "python-requests/.+"',
772+
r'- - postgrest_test_anonymous \[.+\] "GET /unknown HTTP/1.1" 404 \d+ "" "python-requests/.+"',
773+
]
682774
)
683775
assert len(output) == 2
684776
elif level == "info":
685-
assert re.match(
686-
r'- - - \[.+\] "GET / HTTP/1.1" 500 \d+ "" "python-requests/.+"',
687-
output[0],
688-
)
689-
assert re.match(
690-
r'- - postgrest_test_anonymous \[.+\] "GET / HTTP/1.1" 200 \d+ "" "python-requests/.+"',
691-
output[1],
692-
)
693-
assert re.match(
694-
r'- - postgrest_test_anonymous \[.+\] "GET /unknown HTTP/1.1" 404 \d+ "" "python-requests/.+"',
695-
output[2],
777+
match_log(
778+
[
779+
r'- - - \[.+\] "GET / HTTP/1.1" 500 \d+ "" "python-requests/.+"',
780+
r'- - postgrest_test_anonymous \[.+\] "GET /unknown HTTP/1.1" 404 \d+ "" "python-requests/.+"',
781+
r'- - postgrest_test_anonymous \[.+\] "GET / HTTP/1.1" 200 \d+ "" "python-requests/.+"',
782+
]
696783
)
697784
assert len(output) == 3
698785
elif level == "debug":
699-
assert re.match(
700-
r'- - - \[.+\] "GET / HTTP/1.1" 500 \d+ "" "python-requests/.+"',
701-
output[0],
786+
match_log(
787+
[
788+
r'- - - \[.+\] "GET / HTTP/1.1" 500 \d+ "" "python-requests/.+"',
789+
r'- - postgrest_test_anonymous \[.+\] "GET /unknown HTTP/1.1" 404 \d+ "" "python-requests/.+"',
790+
r'- - postgrest_test_anonymous \[.+\] "GET / HTTP/1.1" 200 \d+ "" "python-requests/.+"',
791+
]
702792
)
703-
assert re.match(
704-
r'- - postgrest_test_anonymous \[.+\] "GET / HTTP/1.1" 200 \d+ "" "python-requests/.+"',
705-
output[1],
706-
)
707-
assert re.match(
708-
r'- - postgrest_test_anonymous \[.+\] "GET /unknown HTTP/1.1" 404 \d+ "" "python-requests/.+"',
709-
output[2],
710-
)
711-
712-
assert len(output) == 7
793+
assert len(output) == 9
713794
assert any("Connection" and "is available" in line for line in output)
714795
assert any("Connection" and "is used" in line for line in output)
715796

@@ -1346,16 +1427,16 @@ def test_db_error_logging_to_stderr(level, defaultenv, metapostgrest):
13461427
assert response.status_code == 500
13471428

13481429
# ensure the message appears on the logs
1349-
output = sorted(postgrest.read_stdout(nlines=6))
1430+
output = postgrest.read_stdout(nlines=8)
13501431

13511432
if level == "crit":
13521433
assert len(output) == 0
13531434
elif level == "debug":
1354-
assert " 500 " in output[0]
1355-
assert "canceling statement due to statement timeout" in output[5]
1435+
assert " 500 " in output[7]
1436+
assert "canceling statement due to statement timeout" in output[6]
13561437
else:
1357-
assert " 500 " in output[0]
1358-
assert "canceling statement due to statement timeout" in output[1]
1438+
assert " 500 " in output[1]
1439+
assert "canceling statement due to statement timeout" in output[0]
13591440

13601441
reset_statement_timeout(metapostgrest, role)
13611442

@@ -1460,6 +1541,7 @@ def test_admin_metrics(defaultenv):
14601541
assert "pgrst_db_pool_waiting" in response.text
14611542
assert "pgrst_db_pool_available" in response.text
14621543
assert "pgrst_db_pool_timeouts_total" in response.text
1544+
assert "pgrst_db_pool_flushes_total" in response.text
14631545

14641546

14651547
def test_schema_cache_startup_load_with_in_db_config(defaultenv, metapostgrest):
@@ -1557,10 +1639,10 @@ def test_log_pool_req_observation(level, defaultenv):
15571639
postgrest.session.get("/authors_only", headers=headers)
15581640

15591641
if level == "debug":
1560-
output = postgrest.read_stdout(nlines=5)
1642+
output = postgrest.read_stdout(nlines=7)
1643+
assert len(output) == 7
15611644
assert pool_req in output[1]
1562-
assert pool_req_fullfill in output[4]
1563-
assert len(output) == 5
1645+
assert pool_req_fullfill in output[6]
15641646
elif level == "info":
15651647
output = postgrest.read_stdout(nlines=4)
15661648
assert len(output) == 1

test/spec/Feature/MetricsSpec.hs

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,19 +16,14 @@ import Test.Hspec (SpecWith, describe,
1616
expectationFailure, it)
1717
import Test.Hspec.Wai (getState)
1818

19-
untilM :: Int -> (a -> Bool) -> IO a -> IO (Maybe a)
19+
untilM :: Int -> (a -> Maybe b) -> IO a -> IO (Maybe b)
2020
untilM timeout cond act = rightToMaybe <$> race (threadDelay timeout) (fix $
21-
\loop -> do
22-
value <- act
23-
if cond value then
24-
pure value
25-
else
26-
loop)
21+
\loop -> act >>= maybe loop pure . cond)
2722

2823
waitForSchemaReload :: Int -> IO Observation -> IO (Maybe Observation)
2924
waitForSchemaReload timeout = untilM timeout $ \case
30-
SchemaCacheLoadedObs _ -> True
31-
_ -> False
25+
o@(SchemaCacheLoadedObs _) -> pure o
26+
_ -> empty
3227

3328
spec :: SpecWith ((MetricsState, AppState.AppState, IO Observation), Application)
3429
spec = describe "Server started with metrics enabled" $

0 commit comments

Comments
 (0)