Skip to content

Commit cefc5d3

Browse files
committed
add(observation): db pool flushes observation
Emit a dedicated PoolFlushed observation when the DB pool is released during schema cache reload.
1 parent 02feaf0 commit cefc5d3

7 files changed

Lines changed: 167 additions & 46 deletions

File tree

postgrest.cabal

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,7 @@ test-suite spec
258258
Feature.Query.UpsertSpec
259259
Feature.RollbackSpec
260260
Feature.RpcPreRequestGucsSpec
261+
Feature.SchemaCacheSpec
261262
SpecHelper
262263
build-depends: base >= 4.9 && < 4.20
263264
, aeson >= 2.0.3 && < 2.3

src/PostgREST/AppState.hs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ module PostgREST.AppState
1818
, init
1919
, initSockets
2020
, initWithPool
21+
, putConfig -- For testing, TODO refactoring
2122
, putNextListenerDelay
2223
, putSchemaCache
2324
, putPgVersion
@@ -270,10 +271,14 @@ usePool AppState{stateObserver=observer, stateMainThreadId=mainThreadId, ..} ses
270271

271272
-- | Flush the connection pool so that any future use of the pool will
272273
-- use connections freshly established after this call.
274+
-- | Emits PoolFlushed observation
273275
flushPool :: AppState -> IO ()
274-
flushPool AppState{..} = SQL.release statePool
276+
flushPool AppState{..} = do
277+
SQL.release statePool
278+
stateObserver PoolFlushed
275279

276280
-- | Destroy the pool on shutdown.
281+
-- | Differs from flushPool in not emiting PoolFlushed observation.
277282
destroyPool :: AppState -> IO ()
278283
destroyPool AppState{..} = SQL.release statePool
279284

src/PostgREST/Logger.hs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,9 @@ observationLogger loggerState logLevel obs = case obs of
102102
o@PoolRequestFullfilled ->
103103
when (logLevel >= LogDebug) $ do
104104
logWithZTime loggerState $ observationMessage o
105+
o@PoolFlushed ->
106+
when (logLevel >= LogDebug) $ do
107+
logWithZTime loggerState $ observationMessage o
105108
o@JwtCacheEviction ->
106109
when (logLevel >= LogDebug) $ do
107110
logWithZTime loggerState $ observationMessage o

src/PostgREST/Observation.hs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ data Observation
6363
| HasqlPoolObs SQL.Observation
6464
| PoolRequest
6565
| PoolRequestFullfilled
66+
| PoolFlushed
6667
| JwtCacheLookup Bool
6768
| JwtCacheEviction
6869
| WarpErrorObs Text
@@ -157,6 +158,8 @@ observationMessage = \case
157158
"Trying to borrow a connection from pool"
158159
PoolRequestFullfilled ->
159160
"Borrowed a connection from the pool"
161+
PoolFlushed ->
162+
"Database connection pool flushed"
160163
JwtCacheLookup _ ->
161164
"Looked up a JWT in JWT cache"
162165
JwtCacheEviction ->

test/io/test_io.py

Lines changed: 52 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,19 @@
2121
)
2222

2323

24+
def match_log(output, matchers):
25+
ito = iter(output)
26+
itm = iter(matchers)
27+
nextMatcher = next(itm, None)
28+
while nextMatcher is not None and (line := next(ito, None)) is not None:
29+
if re.match(nextMatcher, line) is not None:
30+
nextMatcher = next(itm, None)
31+
if nextMatcher is not None:
32+
raise AssertionError(
33+
f"Expected log line matching {nextMatcher} not found in output"
34+
)
35+
36+
2437
def test_connect_with_dburi(dburi, defaultenv):
2538
"Connecting with db-uri instead of LIPQ* environment variables should work."
2639
defaultenv_without_libpq = {
@@ -661,54 +674,44 @@ def test_log_level(level, defaultenv):
661674
response = postgrest.session.get("/")
662675
assert response.status_code == 200
663676

664-
output = sorted(postgrest.read_stdout(nlines=7))
677+
output = postgrest.read_stdout(nlines=7)
665678

666679
if level == "crit":
667680
assert len(output) == 0
668681
elif level == "error":
669-
assert re.match(
670-
r'- - - \[.+\] "GET / HTTP/1.1" 500 \d+ "" "python-requests/.+"',
671-
output[0],
682+
match_log(
683+
output,
684+
[r'- - - \[.+\] "GET / HTTP/1.1" 500 \d+ "" "python-requests/.+"'],
672685
)
673686
assert len(output) == 1
674687
elif level == "warn":
675-
assert re.match(
676-
r'- - - \[.+\] "GET / HTTP/1.1" 500 \d+ "" "python-requests/.+"',
677-
output[0],
678-
)
679-
assert re.match(
680-
r'- - postgrest_test_anonymous \[.+\] "GET /unknown HTTP/1.1" 404 \d+ "" "python-requests/.+"',
681-
output[1],
688+
match_log(
689+
output,
690+
[
691+
r'- - - \[.+\] "GET / HTTP/1.1" 500 \d+ "" "python-requests/.+"',
692+
r'- - postgrest_test_anonymous \[.+\] "GET /unknown HTTP/1.1" 404 \d+ "" "python-requests/.+"',
693+
],
682694
)
683695
assert len(output) == 2
684696
elif level == "info":
685-
assert re.match(
686-
r'- - - \[.+\] "GET / HTTP/1.1" 500 \d+ "" "python-requests/.+"',
687-
output[0],
688-
)
689-
assert re.match(
690-
r'- - postgrest_test_anonymous \[.+\] "GET / HTTP/1.1" 200 \d+ "" "python-requests/.+"',
691-
output[1],
692-
)
693-
assert re.match(
694-
r'- - postgrest_test_anonymous \[.+\] "GET /unknown HTTP/1.1" 404 \d+ "" "python-requests/.+"',
695-
output[2],
697+
match_log(
698+
output,
699+
[
700+
r'- - - \[.+\] "GET / HTTP/1.1" 500 \d+ "" "python-requests/.+"',
701+
r'- - postgrest_test_anonymous \[.+\] "GET /unknown HTTP/1.1" 404 \d+ "" "python-requests/.+"',
702+
r'- - postgrest_test_anonymous \[.+\] "GET / HTTP/1.1" 200 \d+ "" "python-requests/.+"',
703+
],
696704
)
697705
assert len(output) == 3
698706
elif level == "debug":
699-
assert re.match(
700-
r'- - - \[.+\] "GET / HTTP/1.1" 500 \d+ "" "python-requests/.+"',
701-
output[0],
707+
match_log(
708+
output,
709+
[
710+
r'- - - \[.+\] "GET / HTTP/1.1" 500 \d+ "" "python-requests/.+"',
711+
r'- - postgrest_test_anonymous \[.+\] "GET /unknown HTTP/1.1" 404 \d+ "" "python-requests/.+"',
712+
r'- - postgrest_test_anonymous \[.+\] "GET / HTTP/1.1" 200 \d+ "" "python-requests/.+"',
713+
],
702714
)
703-
assert re.match(
704-
r'- - postgrest_test_anonymous \[.+\] "GET / HTTP/1.1" 200 \d+ "" "python-requests/.+"',
705-
output[1],
706-
)
707-
assert re.match(
708-
r'- - postgrest_test_anonymous \[.+\] "GET /unknown HTTP/1.1" 404 \d+ "" "python-requests/.+"',
709-
output[2],
710-
)
711-
712715
assert len(output) == 7
713716
assert any("Connection" and "is available" in line for line in output)
714717
assert any("Connection" and "is used" in line for line in output)
@@ -1346,16 +1349,21 @@ def test_db_error_logging_to_stderr(level, defaultenv, metapostgrest):
13461349
assert response.status_code == 500
13471350

13481351
# ensure the message appears on the logs
1349-
output = sorted(postgrest.read_stdout(nlines=6))
1352+
output = postgrest.read_stdout(nlines=6)
13501353

13511354
if level == "crit":
13521355
assert len(output) == 0
13531356
elif level == "debug":
1354-
assert " 500 " in output[0]
1355-
assert "canceling statement due to statement timeout" in output[5]
1357+
match_log(
1358+
output,
1359+
[
1360+
r".*canceling statement due to statement timeout.*",
1361+
r".*500.*",
1362+
],
1363+
)
13561364
else:
1357-
assert " 500 " in output[0]
1358-
assert "canceling statement due to statement timeout" in output[1]
1365+
assert " 500 " in output[1]
1366+
assert "canceling statement due to statement timeout" in output[0]
13591367

13601368
reset_statement_timeout(metapostgrest, role)
13611369

@@ -1549,18 +1557,17 @@ def test_log_pool_req_observation(level, defaultenv):
15491557

15501558
headers = jwtauthheader({"role": "postgrest_test_author"}, SECRET)
15511559

1552-
pool_req = "Trying to borrow a connection from pool"
1553-
pool_req_fullfill = "Borrowed a connection from the pool"
1560+
pool_req = r".*Trying to borrow a connection from pool.*"
1561+
pool_req_fullfill = r".*Borrowed a connection from the pool.*"
15541562

15551563
with run(env=env) as postgrest:
15561564

15571565
postgrest.session.get("/authors_only", headers=headers)
15581566

15591567
if level == "debug":
1560-
output = postgrest.read_stdout(nlines=5)
1561-
assert pool_req in output[1]
1562-
assert pool_req_fullfill in output[4]
1563-
assert len(output) == 5
1568+
output = postgrest.read_stdout(nlines=7)
1569+
assert len(output) == 6
1570+
match_log(output, [pool_req, pool_req_fullfill])
15641571
elif level == "info":
15651572
output = postgrest.read_stdout(nlines=4)
15661573
assert len(output) == 1
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
{-# LANGUAGE DataKinds #-}
2+
{-# LANGUAGE DeriveAnyClass #-}
3+
{-# LANGUAGE MonadComprehensions #-}
4+
module Feature.SchemaCacheSpec
5+
6+
where
7+
8+
import Network.Wai (Application)
9+
10+
import qualified PostgREST.AppState as AppState
11+
import PostgREST.Config (configDbSchemas)
12+
import PostgREST.Observation
13+
import Protolude
14+
import System.Timeout (timeout)
15+
import Test.Hspec (SpecWith, describe, it)
16+
import Test.Hspec.Expectations
17+
import Test.Hspec.Wai (getState)
18+
19+
data TimeoutException = TimeoutException deriving (Show, Exception)
20+
21+
accumulateUntilTimeout :: Int -> (s -> a -> s) -> s -> IO a -> IO s
22+
accumulateUntilTimeout t f start act = do
23+
tid <- myThreadId
24+
-- mask to make sure TimeoutException is not thrown before starting the loop
25+
mask $ \unmask -> do
26+
-- start timeout thread unmasking exceptions
27+
ttid <- forkIOWithUnmask ($ (threadDelay t *> throwTo tid TimeoutException))
28+
-- unmask effect
29+
unmask $ fix (\loop accum -> (act >>= loop . f accum) `onTimeout` pure accum) start
30+
-- make sure we catch timeout if happens bifore entering the loop
31+
`onTimeout` pure start
32+
-- make sure timer thread is killed on other exceptions
33+
-- so that it won't throw TimeoutException later
34+
`onException` killThread ttid
35+
where
36+
onTimeout m a = m `catch` \TimeoutException -> a
37+
38+
spec :: SpecWith ((AppState.AppState, Chan Observation), Application)
39+
spec = describe "Server started with metrics enabled" $ do
40+
41+
it "Should emit PoolFlushed, SchemaCacheQueriedObs and SchemaCacheLoadedObs when schema cache is reloaded" $ do
42+
(appState, waitFor) <- prepareState
43+
44+
liftIO $ do
45+
AppState.schemaCacheLoader appState
46+
47+
waitFor (1 * sec) "PoolFlushed" $ \x -> [ o | o@PoolFlushed <- pure x ]
48+
waitFor (1 * sec) "SchemaCacheQueriedObs" $ \x -> [ o | o@SchemaCacheQueriedObs{} <- pure x ]
49+
waitFor (1 * sec) "SchemaCacheLoadedObs" $ \x -> [ o | o@SchemaCacheLoadedObs{} <- pure x ]
50+
51+
52+
it "Should flush pool multiple times when schema reloading retries" $ do
53+
(appState, waitFor) <- prepareState
54+
55+
liftIO $ do
56+
AppState.getConfig appState >>= \cfg -> do
57+
AppState.putConfig appState $ cfg { configDbSchemas = pure "bad_schema" }
58+
AppState.schemaCacheLoader appState
59+
60+
waitFor (1 * sec) "PoolFlushed 1" $ \x -> [ o | o@PoolFlushed <- pure x ]
61+
waitFor (1 * sec) "SchemaCacheErrorObs" $ \x -> [ o | o@SchemaCacheErrorObs{} <- pure x ]
62+
63+
-- Restore configuration
64+
AppState.putConfig appState cfg
65+
66+
-- Wait for 2 seconds so that retry can happen
67+
waitFor (2 * sec) "PoolFlushed 2" $ \x -> [ o | o@PoolFlushed <- pure x ]
68+
waitFor (1 * sec) "SchemaCacheQueriedObs" $ \x -> [ o | o@SchemaCacheQueriedObs{} <- pure x ]
69+
waitFor (1 * sec) "SchemaCacheLoadedObs" $ \x -> [ o | o@SchemaCacheLoadedObs{} <- pure x ]
70+
71+
where
72+
sec = 1000000
73+
-- duplicate the provided channel and construct wairFor function binding both channels
74+
prepareState = getState >>= traverse (liftA2 (<$>) waitFor (liftIO . dupChan))
75+
where
76+
-- execute effectful computation until result meets provided condition
77+
untilM cond m = fix $ \loop -> m >>= \a -> ifM (cond a) (pure a) loop
78+
-- accumulate effecful computation results into a list for specified time
79+
takeUntilTimeout t = fmap reverse . accumulateUntilTimeout t (flip (:)) []
80+
-- read messages from copy chan and once condition is met drain original to the same point
81+
-- upon timeout report error and messages remaining in the original chan
82+
-- that way we report messages since last successful read
83+
waitFor original copy t msg f =
84+
timeout t (readUntil copy *> readUntil original) >>= maybe failTimeout mempty
85+
where
86+
failTimeout = takeUntilTimeout 100000 (readChan original) >>=
87+
expectationFailure .
88+
("Timeout waiting for " <> msg <> ". Remaining observations:\n" ++) .
89+
foldMap ((++ "\n") . show . observationMessage)
90+
readUntil = void . untilM (pure . isJust . f) . readChan

test/spec/Main.hs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ import qualified Feature.Query.UpdateSpec
6868
import qualified Feature.Query.UpsertSpec
6969
import qualified Feature.RollbackSpec
7070
import qualified Feature.RpcPreRequestGucsSpec
71+
import qualified Feature.SchemaCacheSpec
7172

7273

7374
main :: IO ()
@@ -95,6 +96,13 @@ main = do
9596
AppState.putSchemaCache appState (Just sCache)
9697
return (st, postgrest (configLogLevel config) appState (pure ()))
9798

99+
initObservationsApp sCache config = do
100+
obsChan <- newChan
101+
appState <- AppState.initWithPool sockets pool config loggerState metricsState (Metrics.observationMetrics metricsState <> writeChan obsChan)
102+
AppState.putPgVersion appState actualPgVersion
103+
AppState.putSchemaCache appState (Just sCache)
104+
return ((appState, obsChan), postgrest (configLogLevel config) appState (pure ()))
105+
98106
-- For tests that run with the same schema cache
99107
app = initApp baseSchemaCache ()
100108

@@ -123,6 +131,7 @@ main = do
123131
obsApp = app testObservabilityCfg
124132
serverTiming = app testCfgServerTiming
125133
aggregatesEnabled = app testCfgAggregatesEnabled
134+
observationsApp = initObservationsApp baseSchemaCache testCfg
126135

127136
extraSearchPathApp = appDbs testCfgExtraSearchPath
128137
unicodeApp = appDbs testUnicodeCfg
@@ -278,6 +287,9 @@ main = do
278287
before (initApp baseSchemaCache metricsState testCfgJwtCache) $
279288
describe "Feature.Auth.JwtCacheSpec" Feature.Auth.JwtCacheSpec.spec
280289

290+
before observationsApp $
291+
describe "Feature.SchemaCacheSpec" Feature.SchemaCacheSpec.spec
292+
281293
where
282294
loadSCache pool conf =
283295
either (panic.show) id <$> P.use pool (HT.transaction HT.ReadCommitted HT.Read $ querySchemaCache conf)

0 commit comments

Comments
 (0)