Skip to content

Commit 6456e0d

Browse files
committed
add(observation): db pool flushes observation
DISCLAIMER: This commit was authored entirely by a human without the assistance of LLMs. Emit a dedicated PoolFlushed observation when the DB pool is released during schema cache reload.
2 parents 7fe90e8 + 4a9c91a commit 6456e0d

9 files changed

Lines changed: 264 additions & 34 deletions

File tree

postgrest.cabal

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -307,6 +307,8 @@ test-suite observability
307307
main-is: Main.hs
308308
other-modules: ObsHelper
309309
Observation.JwtCache
310+
Observation.MetricsSpec
311+
Observation.SchemaCacheSpec
310312
build-depends: base >= 4.9 && < 4.20
311313
, base64-bytestring >= 1 && < 1.3
312314
, bytestring >= 0.10.8 && < 0.13
@@ -321,6 +323,7 @@ test-suite observability
321323
, postgrest
322324
, prometheus-client >= 1.1.1 && < 1.2.0
323325
, protolude >= 0.3.1 && < 0.4
326+
, text >= 1.2.2 && < 2.2
324327
, wai >= 3.2.1 && < 3.3
325328
ghc-options: -threaded -O0 -Werror -Wall -fwarn-identities
326329
-fno-spec-constr -optP-Wno-nonportable-include-path

src/PostgREST/AppState.hs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ module PostgREST.AppState
1515
, getJwtCacheState
1616
, init
1717
, initWithPool
18+
, putConfig -- For tests TODO refactoring
1819
, putNextListenerDelay
1920
, putSchemaCache
2021
, putPgVersion
@@ -218,10 +219,14 @@ usePool AppState{stateObserver=observer, stateMainThreadId=mainThreadId, ..} ses
218219

219220
-- | Flush the connection pool so that any future use of the pool will
220221
-- use connections freshly established after this call.
222+
-- Emits a PoolFlushed observation.
221223
flushPool :: AppState -> IO ()
222-
flushPool AppState{..} = SQL.release statePool
224+
flushPool AppState{..} = do
225+
SQL.release statePool
226+
stateObserver PoolFlushed
223227

224228
-- | Destroy the pool on shutdown.
229+
-- Differs from flushPool in not emitting a PoolFlushed observation.
225230
destroyPool :: AppState -> IO ()
226231
destroyPool AppState{..} = SQL.release statePool
227232

src/PostgREST/Logger.hs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,9 @@ observationLogger loggerState logLevel obs = case obs of
111111
o@PoolRequestFullfilled ->
112112
when (logLevel >= LogDebug) $ do
113113
logWithZTime loggerState $ observationMessages o
114+
o@PoolFlushed ->
115+
when (logLevel >= LogDebug) $ do
116+
logWithZTime loggerState $ observationMessages o
114117
o@JwtCacheEviction ->
115118
when (logLevel >= LogDebug) $ do
116119
logWithZTime loggerState $ observationMessages o
@@ -224,6 +227,8 @@ observationMessages = \case
224227
pure "Trying to borrow a connection from pool"
225228
PoolRequestFullfilled ->
226229
pure "Borrowed a connection from the pool"
230+
PoolFlushed ->
231+
pure "Database connection pool flushed"
227232
JwtCacheLookup _ ->
228233
pure "Looked up a JWT in JWT cache"
229234
JwtCacheEviction ->

src/PostgREST/Observation.hs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
{-# LANGUAGE DeriveGeneric #-}
12
{-|
23
Module : PostgREST.Observation
34
Description : This module holds an Observation type which is the core of Observability for PostgREST.
@@ -52,10 +53,12 @@ data Observation
5253
| HasqlPoolObs SQL.Observation
5354
| PoolRequest
5455
| PoolRequestFullfilled
56+
| PoolFlushed
5557
| JwtCacheLookup Bool
5658
| JwtCacheEviction
5759
| TerminationUnixSignalObs Text
5860
| WarpErrorObs Text
61+
deriving (Generic)
5962

6063
data ObsFatalError = ServerAuthError | ServerPgrstBug | ServerError42P05 | ServerError08P01
6164

test/observability/Main.hs

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,39 +15,57 @@ import qualified PostgREST.Metrics as Metrics
1515
import PostgREST.SchemaCache (querySchemaCache)
1616

1717
import qualified Observation.JwtCache
18+
import qualified Observation.MetricsSpec
1819

19-
import ObsHelper
20-
import Protolude hiding (toList, toS)
21-
import Test.Hspec
20+
import qualified Observation.SchemaCacheSpec
21+
import ObsHelper
22+
import PostgREST.Observation (Observation (HasqlPoolObs))
23+
import Protolude hiding (toList, toS)
24+
import Test.Hspec
2225

2326
main :: IO ()
2427
main = do
28+
poolChan <- newChan
29+
-- make sure poolChan is not growing indefinitely
30+
-- start a thread that drains the channel
31+
-- this is necessary because test cases operate on
32+
-- copies so poolChan is never read from
33+
void $ forkIO $ forever $ readChan poolChan
34+
metricsState <- Metrics.init (configDbPoolSize testCfg)
2535
pool <- P.acquire $ P.settings
2636
[ P.size 3
2737
, P.acquisitionTimeout 10
2838
, P.agingTimeout 60
2939
, P.idlenessTimeout 60
3040
, P.staticConnectionSettings (toUtf8 $ configDbUri testCfg)
41+
-- make sure metrics are updated and pool observations published to poolChan
42+
, P.observationHandler $ (writeChan poolChan <> Metrics.observationMetrics metricsState) . HasqlPoolObs
3143
]
3244

3345
actualPgVersion <- either (panic . show) id <$> P.use pool (queryPgVersion False)
3446

3547
-- cached schema cache so most tests run fast
3648
baseSchemaCache <- loadSCache pool testCfg
3749
loggerState <- Logger.init
38-
metricsState <- Metrics.init (configDbPoolSize testCfg)
3950

4051
let
41-
initApp sCache st config = do
42-
appState <- AppState.initWithPool pool config loggerState metricsState (Metrics.observationMetrics metricsState)
52+
initApp sCache config = do
53+
-- duplicate poolChan as a starting point
54+
obsChan <- dupChan poolChan
55+
stateObsChan <- newObsChan obsChan
56+
appState <- AppState.initWithPool pool config loggerState metricsState (Metrics.observationMetrics metricsState <> writeChan obsChan)
4357
AppState.putPgVersion appState actualPgVersion
4458
AppState.putSchemaCache appState (Just sCache)
45-
return (st, postgrest (configLogLevel config) appState (pure ()))
59+
return (SpecState appState metricsState stateObsChan, postgrest (configLogLevel config) appState (pure ()))
4660

4761
-- Run all test modules
4862
hspec $ do
49-
before (initApp baseSchemaCache metricsState testCfgJwtCache) $
63+
before (initApp baseSchemaCache testCfgJwtCache) $
5064
describe "Observation.JwtCacheObs" Observation.JwtCache.spec
65+
before (initApp baseSchemaCache testCfg) $
66+
describe "Feature.MetricsSpec" Observation.MetricsSpec.spec
67+
before (initApp baseSchemaCache testCfg) $
68+
describe "Feature.SchemaCacheSpec" Observation.SchemaCacheSpec.spec
5169

5270
where
5371
loadSCache pool conf =

test/observability/ObsHelper.hs

Lines changed: 100 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,70 @@
11
{-# LANGUAGE AllowAmbiguousTypes #-}
2+
{-# LANGUAGE DeriveAnyClass #-}
23
{-# LANGUAGE ExistentialQuantification #-}
34
{-# LANGUAGE FlexibleContexts #-}
5+
{-# LANGUAGE FlexibleInstances #-}
6+
{-# LANGUAGE LambdaCase #-}
7+
{-# LANGUAGE RankNTypes #-}
48
{-# LANGUAGE ScopedTypeVariables #-}
59
{-# LANGUAGE TupleSections #-}
610
{-# LANGUAGE TypeApplications #-}
11+
{-# LANGUAGE TypeOperators #-}
712
module ObsHelper where
813

9-
import qualified Data.ByteString.Base64 as B64 (decodeLenient)
10-
import qualified Data.ByteString.Char8 as BS
11-
import qualified Data.ByteString.Lazy as BL
12-
import qualified Jose.Jwa as JWT
13-
import qualified Jose.Jws as JWT
14-
import qualified Jose.Jwt as JWT
14+
import qualified Data.ByteString as BS
15+
import qualified Data.ByteString.Base64 as B64
16+
import qualified Data.ByteString.Lazy as BL
17+
import qualified Data.List as DL
18+
import Data.List.NonEmpty (fromList)
19+
import Data.String (String)
20+
import qualified Data.Text as T
21+
import qualified Jose.Jwa as JWT
22+
import qualified Jose.Jws as JWT
23+
import qualified Jose.Jwt as JWT
24+
import Network.HTTP.Types
25+
import qualified PostgREST.AppState as AppState
26+
import PostgREST.Config (AppConfig (..),
27+
JSPathExp (..),
28+
LogLevel (..),
29+
OpenAPIMode (..),
30+
Verbosity (..),
31+
parseSecret)
32+
import qualified PostgREST.Metrics as Metrics
33+
import PostgREST.Observation (Observation (..))
34+
import Prometheus (Counter, getCounter)
35+
import Protolude hiding (get, toS)
36+
import System.Timeout (timeout)
37+
import Test.Hspec
38+
import Test.Hspec.Expectations.Contrib (annotate)
39+
40+
-- helpers used to produce observation diagnostics in waitForObs
41+
class HasConstructor f where
42+
genericConstrName :: f x -> Text
43+
44+
instance HasConstructor f => HasConstructor (D1 c f) where
45+
genericConstrName (M1 x) = genericConstrName x
46+
47+
instance (HasConstructor x, HasConstructor y) => HasConstructor (x :+: y) where
48+
genericConstrName (L1 l) = genericConstrName l
49+
genericConstrName (R1 r) = genericConstrName r
50+
51+
instance Constructor c => HasConstructor (C1 c f) where
52+
genericConstrName = T.pack . conName
53+
54+
data SpecState = SpecState {
55+
specAppState :: AppState.AppState,
56+
specMetrics :: Metrics.MetricsState,
57+
specObsChan :: ObsChan
58+
}
1559

16-
import PostgREST.Config (AppConfig (..), JSPathExp (..),
17-
LogLevel (..), OpenAPIMode (..),
18-
Verbosity (..), parseSecret)
60+
data StateCheck st m = forall a. StateCheck (st -> (String, m a)) (a -> a -> Expectation)
1961

20-
import Data.List.NonEmpty (fromList)
21-
import Data.String (String)
22-
import Prometheus (Counter, getCounter)
23-
import Test.Hspec.Expectations.Contrib (annotate)
62+
data TimeoutException = TimeoutException deriving (Show, Exception)
2463

25-
import Network.HTTP.Types
26-
import Protolude
27-
import Test.Hspec
28-
import Test.Hspec.Wai
64+
data ObsChan = ObsChan (Chan Observation) (Chan Observation)
2965

66+
constrName :: (HasConstructor (Rep a), Generic a)=> a -> Text
67+
constrName = genericConstrName . from
3068

3169
baseCfg :: AppConfig
3270
baseCfg = let secret = encodeUtf8 "reallyreallyreallyreallyverysafe" in
@@ -109,18 +147,12 @@ generateJWT claims =
109147
either mempty JWT.unJwt $ JWT.hmacEncode JWT.HS256 generateSecret (BL.toStrict claims)
110148

111149
-- state check helpers
112-
113-
data StateCheck st m = forall a. StateCheck (st -> (String, m a)) (a -> a -> Expectation)
114-
115150
stateCheck :: (Show a, Eq a) => (c -> m a) -> (st -> (String, c)) -> (a -> a) -> StateCheck st m
116151
stateCheck extractValue extractComponent expect = StateCheck (second extractValue . extractComponent) (flip shouldBe . expect)
117152

118153
expectField :: forall s st a c m. (KnownSymbol s, Show a, Eq a, HasField s st c) => (c -> m a) -> (a -> a) -> StateCheck st m
119154
expectField extractValue = stateCheck extractValue ((symbolVal (Proxy @s),) . getField @s)
120155

121-
checkState :: (Traversable t) => t (StateCheck st (WaiSession st)) -> WaiSession st b -> WaiSession st ()
122-
checkState checks act = getState >>= flip (`checkState'` checks) act
123-
124156
checkState' :: (Traversable t, MonadIO m) => st -> t (StateCheck st m) -> m b -> m ()
125157
checkState' initialState checks act = do
126158
expectations <- traverse (\(StateCheck g expect) -> let (msg, m) = g initialState in m >>= createExpectation msg m . expect) checks
@@ -133,3 +165,48 @@ expectCounter :: forall s st m. (KnownSymbol s, HasField s st Counter, MonadIO m
133165
expectCounter = expectField @s intCounter
134166
where
135167
intCounter = ((round @Double @Int) <$>) . getCounter
168+
169+
accumulateUntilTimeout :: Int -> (s -> a -> s) -> s -> IO a -> IO s
170+
accumulateUntilTimeout t f start act = do
171+
tid <- myThreadId
172+
-- mask to make sure TimeoutException is not thrown before starting the loop
173+
mask $ \unmask -> do
174+
-- start timeout thread unmasking exceptions
175+
ttid <- forkIOWithUnmask ($ (threadDelay t *> throwTo tid TimeoutException))
176+
-- unmask effect
177+
unmask (fix (\loop accum -> (act >>= loop . f accum) `onTimeout` pure accum) start)
178+
-- make sure we catch timeout if happens before entering the loop
179+
`onTimeout` pure start
180+
-- make sure timer thread is killed on other exceptions
181+
-- so that it won't throw TimeoutException later
182+
`onException` killThread ttid
183+
where
184+
onTimeout m a = m `catch` \TimeoutException -> a
185+
186+
newObsChan :: Chan Observation -> IO ObsChan
187+
newObsChan = fmap <$> ObsChan <*> dupChan
188+
189+
-- read messages from copy chan and once condition is met drain original to the same point
190+
-- upon timeout report error and messages remaining in the original chan
191+
-- that way we report messages since last successful read
192+
waitForObs :: HasCallStack => ObsChan -> Int -> Text -> (Observation -> Maybe a) -> IO ()
193+
waitForObs (ObsChan orig copy) t msg f =
194+
timeout t (readUntil copy *> readUntil orig) >>= maybe failTimeout mempty
195+
where
196+
failTimeout = takeUntilTimeout decisecond (readChan orig)
197+
>>= expectationFailure . DL.unlines . fmap show . (failureMessageHeader :) . fmap obsDiagMessage
198+
failureMessageHeader = "Timeout waiting for " <> msg <> " at " <> loc <> ". Remaining observations:"
199+
readUntil = void . untilM (pure . not . null . f) . readChan
200+
loc = fromMaybe "(unknown)" . head $ (T.pack . prettySrcLoc . snd <$> getCallStack callStack)
201+
-- execute effectful computation until result meets provided condition
202+
untilM cond m = fix $ \loop -> m >>= \a -> ifM (cond a) (pure a) loop
203+
-- duplicate the provided channel and construct waitFor function binding both channels
204+
-- accumulate effectful computation results into a list for the specified time
205+
takeUntilTimeout t' = fmap reverse . accumulateUntilTimeout t' (flip (:)) []
206+
decisecond = 100000
207+
208+
obsDiagMessage :: Observation -> Text
209+
obsDiagMessage = \case
210+
(HasqlPoolObs o) -> show o
211+
o@(DBListenStart host port name channel) -> constrName o <> show (host, port, name, channel)
212+
o -> constrName o

test/observability/Observation/JwtCache.hs

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
{-# LANGUAGE DataKinds #-}
2+
{-# LANGUAGE NamedFieldPuns #-}
23
{-# LANGUAGE TypeApplications #-}
34
module Observation.JwtCache where
45

@@ -13,9 +14,11 @@ import PostgREST.Metrics (MetricsState (..))
1314
import Protolude
1415
import Test.Hspec.Wai.JSON (json)
1516

16-
spec :: SpecWith (MetricsState, Application)
17+
spec :: SpecWith (SpecState, Application)
1718
spec = describe "Server started with JWT and metrics enabled" $ do
1819
it "Should not have JWT in cache" $ do
20+
expectCounters <- checkState' . specMetrics <$> getState
21+
1922
let auth = genToken [json|{"exp": 9999999999, "role": "postgrest_test_author", "id": "jdoe1"}|]
2023

2124
expectCounters
@@ -27,6 +30,8 @@ spec = describe "Server started with JWT and metrics enabled" $ do
2730
request methodGet "/authors_only" [auth] "" `shouldRespondWith` 200
2831

2932
it "Should have JWT in cache" $ do
33+
expectCounters <- checkState' . specMetrics <$> getState
34+
3035
let auth = genToken [json|{"exp": 9999999999, "role": "postgrest_test_author", "id": "jdoe2"}|]
3136

3237
expectCounters
@@ -39,6 +44,8 @@ spec = describe "Server started with JWT and metrics enabled" $ do
3944
*> request methodGet "/authors_only" [auth] "" `shouldRespondWith` 200
4045

4146
it "Should not cache invalid JWTs" $ do
47+
expectCounters <- checkState' . specMetrics <$> getState
48+
4249
let auth = authHeaderJWT "some random bytes"
4350

4451
expectCounters
@@ -51,6 +58,8 @@ spec = describe "Server started with JWT and metrics enabled" $ do
5158
*> request methodGet "/authors_only" [auth] "" `shouldRespondWith` 401
5259

5360
it "Should cache expired JWTs" $ do
61+
expectCounters <- checkState' . specMetrics <$> getState
62+
5463
let auth = genToken [json|{"exp": 1, "role": "postgrest_test_author", "id": "jdoe2"}|]
5564

5665
expectCounters
@@ -63,6 +72,8 @@ spec = describe "Server started with JWT and metrics enabled" $ do
6372
*> request methodGet "/authors_only" [auth] "" `shouldRespondWith` 401
6473

6574
it "Should evict entries from the JWT cache (jwt cache max is 2)" $ do
75+
expectCounters <- checkState' . specMetrics <$> getState
76+
6677
let jwt1 = genToken [json|{"exp": 9999999999, "role": "postgrest_test_author", "id": "jdoe3"}|]
6778
jwt2 = genToken [json|{"exp": 9999999999, "role": "postgrest_test_author", "id": "jdoe4"}|]
6879
jwt3 = genToken [json|{"exp": 9999999999, "role": "postgrest_test_author", "id": "jdoe5"}|]
@@ -82,6 +93,8 @@ spec = describe "Server started with JWT and metrics enabled" $ do
8293
*> request methodGet "/authors_only" [jwt3] ""
8394

8495
it "Should not evict entries from the JWT cache in FIFO order" $ do
96+
expectCounters <- checkState' . specMetrics <$> getState
97+
8598
let jwt1 = genToken [json|{"exp": 9999999999, "role": "postgrest_test_author", "id": "jdoe6"}|]
8699
jwt2 = genToken [json|{"exp": 9999999999, "role": "postgrest_test_author", "id": "jdoe7"}|]
87100
jwt3 = genToken [json|{"exp": 9999999999, "role": "postgrest_test_author", "id": "jdoe8"}|]
@@ -108,6 +121,8 @@ spec = describe "Server started with JWT and metrics enabled" $ do
108121
-- The test case was added based on coverage report
109122
-- showing this scenario was not covered by previous tests
110123
it "Should evict entries even though all were hit" $ do
124+
expectCounters <- checkState' . specMetrics <$> getState
125+
111126
let jwt1 = genToken [json|{"exp": 9999999999, "role": "postgrest_test_author", "id": "jdoe9"}|]
112127
jwt2 = genToken [json|{"exp": 9999999999, "role": "postgrest_test_author", "id": "jdoe10"}|]
113128
jwt3 = genToken [json|{"exp": 9999999999, "role": "postgrest_test_author", "id": "jdoe11"}|]
@@ -135,4 +150,3 @@ spec = describe "Server started with JWT and metrics enabled" $ do
135150
requests = expectCounter @"jwtCacheRequests"
136151
hits = expectCounter @"jwtCacheHits"
137152
evictions = expectCounter @"jwtCacheEvictions"
138-
expectCounters = checkState

0 commit comments

Comments
 (0)