Skip to content

Commit 5b7652e

Browse files
committed
refactor(test): provide means to validate metrics and observations
DISCLAIMER: This commit was authored entirely by a human without the assistance of LLMs. Some helpers for introspecting metrics already exist (they are used in the JWT cache tests). This change provides facilities to additionally validate emitted Observation events. A new Spec module is also implemented, adding basic tests of schema cache reloading - their main goal is to exercise the new infrastructure.
1 parent 2861b35 commit 5b7652e

6 files changed

Lines changed: 173 additions & 12 deletions

File tree

postgrest.cabal

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,7 @@ test-suite spec
219219
Feature.ConcurrentSpec
220220
Feature.CorsSpec
221221
Feature.ExtraSearchPathSpec
222+
Feature.MetricsSpec
222223
Feature.NoSuperuserSpec
223224
Feature.ObservabilitySpec
224225
Feature.OpenApi.DisabledOpenApiSpec

src/PostgREST/AppState.hs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ module PostgREST.AppState
1818
, init
1919
, initSockets
2020
, initWithPool
21+
, putConfig -- For tests TODO refactoring
2122
, putNextListenerDelay
2223
, putSchemaCache
2324
, putPgVersion

src/PostgREST/Observation.hs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
{-# LANGUAGE DeriveGeneric #-}
12
{-|
23
Module : PostgREST.Observation
34
Description : This module holds an Observation type which is the core of Observability for PostgREST.
@@ -55,6 +56,7 @@ data Observation
5556
| JwtCacheLookup Bool
5657
| JwtCacheEviction
5758
| WarpErrorObs Text
59+
deriving (Generic)
5860

5961
data ObsFatalError = ServerAuthError | ServerPgrstBug | ServerError42P05 | ServerError08P01
6062

test/spec/Feature/MetricsSpec.hs

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
{-# LANGUAGE DataKinds #-}
2+
{-# LANGUAGE FlexibleContexts #-}
3+
{-# LANGUAGE MonadComprehensions #-}
4+
{-# LANGUAGE NamedFieldPuns #-}
5+
{-# LANGUAGE TypeApplications #-}
6+
7+
module Feature.MetricsSpec where
8+
9+
import Data.List (lookup)
10+
import Network.Wai (Application)
11+
import qualified PostgREST.AppState as AppState
12+
import PostgREST.Config (AppConfig (configDbSchemas))
13+
import qualified PostgREST.Metrics as Metrics
14+
import PostgREST.Observation
15+
import Prometheus (getCounter, getVectorWith)
16+
import Protolude
17+
import SpecHelper
18+
import Test.Hspec (SpecWith, describe, it)
19+
import Test.Hspec.Wai (getState)
20+
21+
spec :: SpecWith (SpecState, Application)
22+
spec = describe "Server started with metrics enabled" $ do
23+
it "Should update pgrst_schema_cache_loads_total[SUCCESS]" $ do
24+
SpecState{specAppState = appState, specMetrics = metrics, specObsChan} <- getState
25+
let waitFor = waitForObs specObsChan
26+
27+
liftIO $ checkState' metrics [
28+
schemaCacheLoads "SUCCESS" (+1)
29+
] $ do
30+
AppState.schemaCacheLoader appState
31+
waitFor (1 * sec) "SchemaCacheLoadedObs" $ \x -> [ o | o@(SchemaCacheLoadedObs{}) <- pure x]
32+
33+
it "Should update pgrst_schema_cache_loads_total[ERROR]" $ do
34+
SpecState{specAppState = appState, specMetrics = metrics, specObsChan} <- getState
35+
let waitFor = waitForObs specObsChan
36+
37+
liftIO $ checkState' metrics [
38+
schemaCacheLoads "FAIL" (+1),
39+
schemaCacheLoads "SUCCESS" (+1)
40+
] $ do
41+
AppState.getConfig appState >>= \prev -> do
42+
AppState.putConfig appState $ prev { configDbSchemas = pure "bad_schema" }
43+
AppState.schemaCacheLoader appState
44+
waitFor (1 * sec) "SchemaCacheErrorObs" $ \x -> [ o | o@(SchemaCacheErrorObs{}) <- pure x]
45+
AppState.putConfig appState prev
46+
47+
-- wait up to 2 secs so that retry can happen
48+
waitFor (2 * sec) "SchemaCacheLoadedObs" $ \x -> [ o | o@(SchemaCacheLoadedObs{}) <- pure x]
49+
50+
where
51+
-- prometheus-client api to handle vectors is convoluted
52+
schemaCacheLoads label = expectField @"schemaCacheLoads" $
53+
fmap (maybe (0::Int) round . lookup label) . (`getVectorWith` getCounter)
54+
sec = 1000000

test/spec/Main.hs

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import qualified Feature.Auth.NoJwtSecretSpec
2929
import qualified Feature.ConcurrentSpec
3030
import qualified Feature.CorsSpec
3131
import qualified Feature.ExtraSearchPathSpec
32+
import qualified Feature.MetricsSpec
3233
import qualified Feature.NoSuperuserSpec
3334
import qualified Feature.ObservabilitySpec
3435
import qualified Feature.OpenApi.DisabledOpenApiSpec
@@ -68,16 +69,26 @@ import qualified Feature.Query.UpdateSpec
6869
import qualified Feature.Query.UpsertSpec
6970
import qualified Feature.RollbackSpec
7071
import qualified Feature.RpcPreRequestGucsSpec
72+
import PostgREST.Observation (Observation (HasqlPoolObs))
7173

7274

7375
main :: IO ()
7476
main = do
77+
poolChan <- newChan
78+
-- make sure poolChan is not growing indefinitely
79+
-- start a thread that drains the channel
80+
-- this is necessary because test cases operate on
81+
-- copies so poolChan is never read from
82+
void $ forkIO $ forever $ readChan poolChan
83+
metricsState <- Metrics.init (configDbPoolSize testCfg)
7584
pool <- P.acquire $ P.settings
7685
[ P.size 3
7786
, P.acquisitionTimeout 10
7887
, P.agingTimeout 60
7988
, P.idlenessTimeout 60
8089
, P.staticConnectionSettings (toUtf8 $ configDbUri testCfg)
90+
-- make sure metrics are updated and pool observations published to poolChan
91+
, P.observationHandler $ (writeChan poolChan <> Metrics.observationMetrics metricsState) . HasqlPoolObs
8192
]
8293

8394
actualPgVersion <- either (panic . show) id <$> P.use pool (queryPgVersion False)
@@ -86,7 +97,6 @@ main = do
8697
baseSchemaCache <- loadSCache pool testCfg
8798
sockets <- AppState.initSockets testCfg
8899
loggerState <- Logger.init
89-
metricsState <- Metrics.init (configDbPoolSize testCfg)
90100

91101
let
92102
initApp sCache st config = do
@@ -95,6 +105,15 @@ main = do
95105
AppState.putSchemaCache appState (Just sCache)
96106
return (st, postgrest (configLogLevel config) appState (pure ()))
97107

108+
initObservationsApp sCache config = do
109+
-- duplicate poolChan as a starting point
110+
obsChan <- dupChan poolChan
111+
stateObsChan <- newObsChan obsChan
112+
appState <- AppState.initWithPool sockets pool config loggerState metricsState (Metrics.observationMetrics metricsState <> writeChan obsChan)
113+
AppState.putPgVersion appState actualPgVersion
114+
AppState.putSchemaCache appState (Just sCache)
115+
return (SpecState appState metricsState stateObsChan, postgrest (configLogLevel config) appState (pure ()))
116+
98117
-- For tests that run with the same schema cache
99118
app = initApp baseSchemaCache ()
100119

@@ -123,6 +142,7 @@ main = do
123142
obsApp = app testObservabilityCfg
124143
serverTiming = app testCfgServerTiming
125144
aggregatesEnabled = app testCfgAggregatesEnabled
145+
observationsApp = initObservationsApp baseSchemaCache testCfg
126146

127147
extraSearchPathApp = appDbs testCfgExtraSearchPath
128148
unicodeApp = appDbs testUnicodeCfg
@@ -278,6 +298,9 @@ main = do
278298
before (initApp baseSchemaCache metricsState testCfgJwtCache) $
279299
describe "Feature.Auth.JwtCacheSpec" Feature.Auth.JwtCacheSpec.spec
280300

301+
before observationsApp $
302+
describe "Feature.MetricsSpec" Feature.MetricsSpec.spec
303+
281304
where
282305
loadSCache pool conf =
283306
either (panic.show) id <$> P.use pool (HT.transaction HT.ReadCommitted HT.Read $ querySchemaCache conf)

test/spec/SpecHelper.hs

Lines changed: 91 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
{-# LANGUAGE AllowAmbiguousTypes #-}
2+
{-# LANGUAGE DeriveAnyClass #-}
23
{-# LANGUAGE ExistentialQuantification #-}
34
{-# LANGUAGE FlexibleContexts #-}
5+
{-# LANGUAGE FlexibleInstances #-}
46
{-# LANGUAGE RankNTypes #-}
57
{-# LANGUAGE ScopedTypeVariables #-}
68
{-# LANGUAGE TupleSections #-}
79
{-# LANGUAGE TypeApplications #-}
10+
{-# LANGUAGE TypeOperators #-}
811
module SpecHelper where
912

1013
import Control.Lens ((^?))
@@ -35,17 +38,25 @@ import Test.Hspec
3538
import Test.Hspec.Wai
3639
import Text.Heredoc
3740

38-
import Data.String (String)
39-
import PostgREST.Config (AppConfig (..),
40-
JSPathExp (..),
41-
LogLevel (..),
42-
OpenAPIMode (..),
43-
Verbosity (..), parseSecret)
44-
import PostgREST.SchemaCache.Identifiers (QualifiedIdentifier (..))
45-
import Prometheus (Counter, getCounter)
46-
import Protolude hiding (get, toS)
47-
import Protolude.Conv (toS)
48-
import Test.Hspec.Expectations.Contrib (annotate)
41+
import qualified Data.List as DL
42+
import Data.String (String)
43+
import qualified Data.Text as T
44+
import qualified PostgREST.AppState as AppState
45+
import PostgREST.Config (AppConfig (..),
46+
JSPathExp (..),
47+
LogLevel (..),
48+
OpenAPIMode (..),
49+
Verbosity (..),
50+
parseSecret)
51+
import qualified PostgREST.Metrics as Metrics
52+
import PostgREST.Observation (Observation (..))
53+
import PostgREST.SchemaCache.Identifiers (QualifiedIdentifier (..))
54+
import Prometheus (Counter,
55+
getCounter)
56+
import Protolude hiding (get, toS)
57+
import Protolude.Conv (toS)
58+
import System.Timeout (timeout)
59+
import Test.Hspec.Expectations.Contrib (annotate)
4960

5061
filterAndMatchCT :: BS.ByteString -> MatchHeader
5162
filterAndMatchCT val = MatchHeader $ \headers _ ->
@@ -381,3 +392,72 @@ expectCounter :: forall s st m. (KnownSymbol s, HasField s st Counter, MonadIO m
381392
expectCounter = expectField @s intCounter
382393
where
383394
intCounter = ((round @Double @Int) <$>) . getCounter
395+
396+
data TimeoutException = TimeoutException deriving (Show, Exception)
397+
398+
accumulateUntilTimeout :: Int -> (s -> a -> s) -> s -> IO a -> IO s
399+
accumulateUntilTimeout t f start act = do
400+
tid <- myThreadId
401+
-- mask to make sure TimeoutException is not thrown before starting the loop
402+
mask $ \unmask -> do
403+
-- start timeout thread unmasking exceptions
404+
ttid <- forkIOWithUnmask ($ (threadDelay t *> throwTo tid TimeoutException))
405+
-- unmask effect
406+
unmask (fix (\loop accum -> (act >>= loop . f accum) `onTimeout` pure accum) start)
407+
-- make sure we catch timeout if happens before entering the loop
408+
`onTimeout` pure start
409+
-- make sure timer thread is killed on other exceptions
410+
-- so that it won't throw TimeoutException later
411+
`onException` killThread ttid
412+
where
413+
onTimeout m a = m `catch` \TimeoutException -> a
414+
415+
data ObsChan = ObsChan (Chan Observation) (Chan Observation)
416+
417+
newObsChan :: Chan Observation -> IO ObsChan
418+
newObsChan = fmap <$> ObsChan <*> dupChan
419+
420+
-- read messages from copy chan and once condition is met drain original to the same point
421+
-- upon timeout report error and messages remaining in the original chan
422+
-- that way we report messages since last successful read
423+
waitForObs :: HasCallStack => ObsChan -> Int -> Text -> (Observation -> Maybe a) -> IO ()
424+
waitForObs (ObsChan orig copy) t msg f =
425+
timeout t (readUntil copy *> readUntil orig) >>= maybe failTimeout mempty
426+
where
427+
failTimeout = takeUntilTimeout decisecond (readChan orig)
428+
>>= expectationFailure . DL.unlines . fmap show . (failureMessageHeader :) . fmap obsDiagMessage
429+
failureMessageHeader = "Timeout waiting for " <> msg <> " at " <> loc <> ". Remaining observations:"
430+
readUntil = void . untilM (pure . not . null . f) . readChan
431+
loc = fromMaybe "(unknown)" . head $ (T.pack . prettySrcLoc . snd <$> getCallStack callStack)
432+
-- execute effectful computation until result meets provided condition
433+
untilM cond m = fix $ \loop -> m >>= \a -> ifM (cond a) (pure a) loop
434+
-- duplicate the provided channel and construct waitFor function binding both channels
435+
-- accumulate effectful computation results into a list for the specified time
436+
takeUntilTimeout t' = fmap reverse . accumulateUntilTimeout t' (flip (:)) []
437+
obsDiagMessage (HasqlPoolObs o) = show o
438+
obsDiagMessage o@(DBListenStart host port name channel) = constrName o <> show (host, port, name, channel)
439+
obsDiagMessage o = constrName o
440+
decisecond = 100000
441+
442+
data SpecState = SpecState {
443+
specAppState :: AppState.AppState,
444+
specMetrics :: Metrics.MetricsState,
445+
specObsChan :: ObsChan
446+
}
447+
448+
-- helpers used to produce observation diagnostics in waitForObs
449+
constrName :: (HasConstructor (Rep a), Generic a)=> a -> Text
450+
constrName = genericConstrName . from
451+
452+
class HasConstructor f where
453+
genericConstrName :: f x -> Text
454+
455+
instance HasConstructor f => HasConstructor (D1 c f) where
456+
genericConstrName (M1 x) = genericConstrName x
457+
458+
instance (HasConstructor x, HasConstructor y) => HasConstructor (x :+: y) where
459+
genericConstrName (L1 l) = genericConstrName l
460+
genericConstrName (R1 r) = genericConstrName r
461+
462+
instance Constructor c => HasConstructor (C1 c f) where
463+
genericConstrName = T.pack . conName

0 commit comments

Comments
 (0)