Skip to content

Commit 9cc8e04

Browse files
committed
refactor(test): provide means to validate metrics and observations
Some helpers are provided for introspecting metrics already (used in JWT cache tests). This change provides facilities to additionally validate emited Observation events. A new Spec module is also implemented, adding basic tests of schema cache reloading - their main goal is to excercise the new infrastructure.
1 parent 78f231c commit 9cc8e04

5 files changed

Lines changed: 129 additions & 1 deletion

File tree

postgrest.cabal

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,7 @@ test-suite spec
219219
Feature.ConcurrentSpec
220220
Feature.CorsSpec
221221
Feature.ExtraSearchPathSpec
222+
Feature.MetricsSpec
222223
Feature.NoSuperuserSpec
223224
Feature.ObservabilitySpec
224225
Feature.OpenApi.DisabledOpenApiSpec

src/PostgREST/AppState.hs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ module PostgREST.AppState
1818
, init
1919
, initSockets
2020
, initWithPool
21+
, putConfig -- For tests TODO refactoring
2122
, putNextListenerDelay
2223
, putSchemaCache
2324
, putPgVersion

test/spec/Feature/MetricsSpec.hs

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
{-# LANGUAGE DataKinds #-}
2+
{-# LANGUAGE FlexibleContexts #-}
3+
{-# LANGUAGE ImpredicativeTypes #-}
4+
{-# LANGUAGE MonadComprehensions #-}
5+
{-# LANGUAGE TypeApplications #-}
6+
7+
module Feature.MetricsSpec where
8+
9+
import Data.List (lookup)
10+
import Network.Wai (Application)
11+
import qualified PostgREST.AppState as AppState
12+
import PostgREST.Config (AppConfig (configDbSchemas))
13+
import qualified PostgREST.Metrics as Metrics
14+
import PostgREST.Observation
15+
import Prometheus (getCounter, getVectorWith)
16+
import Protolude
17+
import SpecHelper
18+
import Test.Hspec (SpecWith, describe, it)
19+
import Test.Hspec.Wai (getState)
20+
21+
spec :: SpecWith ((AppState.AppState, Metrics.MetricsState, ObsChan), Application)
22+
spec = describe "Server started with metrics enabled" $ do
23+
it "Should update pgrst_schema_cache_loads_total[SUCCESS]" $ do
24+
(appState, metrics, obsChan) <- getState
25+
let waitFor = waitForObs obsChan
26+
27+
liftIO $ checkState' metrics [
28+
schemaCacheLoads "SUCCESS" (+1)
29+
] $ do
30+
AppState.schemaCacheLoader appState
31+
waitFor (1 * sec) "SchemaCacheLoadedObs" $ \x -> [ o | o@(SchemaCacheLoadedObs{}) <- Just x]
32+
33+
it "Should update pgrst_schema_cache_loads_total[ERROR]" $ do
34+
(appState, metrics, obsChan) <- getState
35+
let waitFor = waitForObs obsChan
36+
37+
liftIO $ checkState' metrics [
38+
schemaCacheLoads "FAIL" (+1),
39+
schemaCacheLoads "SUCCESS" (+1)
40+
] $ do
41+
AppState.getConfig appState >>= \prev -> do
42+
AppState.putConfig appState $ prev { configDbSchemas = pure "bad_schema" }
43+
AppState.schemaCacheLoader appState
44+
waitFor (1 * sec) "SchemaCacheErrorObs" $ \x -> [ o | o@(SchemaCacheErrorObs{}) <- Just x]
45+
AppState.putConfig appState prev
46+
47+
-- wait up to 2 secs so that retry can happen
48+
waitFor (2 * sec) "SchemaCacheLoadedObs" $ \x -> [ o | o@(SchemaCacheLoadedObs{}) <- Just x]
49+
50+
where
51+
-- prometheus-client api to handle vectors is convoluted
52+
schemaCacheLoads label = expectField @"schemaCacheLoads" $
53+
fmap (maybe (0::Int) round . lookup label) . (`getVectorWith` getCounter)
54+
sec = 1000000

test/spec/Main.hs

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import qualified Feature.Auth.NoJwtSecretSpec
2929
import qualified Feature.ConcurrentSpec
3030
import qualified Feature.CorsSpec
3131
import qualified Feature.ExtraSearchPathSpec
32+
import qualified Feature.MetricsSpec
3233
import qualified Feature.NoSuperuserSpec
3334
import qualified Feature.ObservabilitySpec
3435
import qualified Feature.OpenApi.DisabledOpenApiSpec
@@ -68,16 +69,26 @@ import qualified Feature.Query.UpdateSpec
6869
import qualified Feature.Query.UpsertSpec
6970
import qualified Feature.RollbackSpec
7071
import qualified Feature.RpcPreRequestGucsSpec
72+
import PostgREST.Observation (Observation (HasqlPoolObs))
7173

7274

7375
main :: IO ()
7476
main = do
77+
poolChan <- newChan
78+
-- make sure poolChan is not growing indefinitely
79+
-- start a thread that drains the channel
80+
-- this is necessary because test cases operate on
81+
-- copies so poolChan is never read from
82+
void $ forkIO $ forever $ readChan poolChan
83+
metricsState <- Metrics.init (configDbPoolSize testCfg)
7584
pool <- P.acquire $ P.settings
7685
[ P.size 3
7786
, P.acquisitionTimeout 10
7887
, P.agingTimeout 60
7988
, P.idlenessTimeout 60
8089
, P.staticConnectionSettings (toUtf8 $ configDbUri testCfg)
90+
-- make sure metrics are updated and pool observations published to poolChan
91+
, P.observationHandler $ (writeChan poolChan <> Metrics.observationMetrics metricsState) . HasqlPoolObs
8192
]
8293

8394
actualPgVersion <- either (panic . show) id <$> P.use pool (queryPgVersion False)
@@ -86,7 +97,6 @@ main = do
8697
baseSchemaCache <- loadSCache pool testCfg
8798
sockets <- AppState.initSockets testCfg
8899
loggerState <- Logger.init
89-
metricsState <- Metrics.init (configDbPoolSize testCfg)
90100

91101
let
92102
initApp sCache st config = do
@@ -95,6 +105,15 @@ main = do
95105
AppState.putSchemaCache appState (Just sCache)
96106
return (st, postgrest (configLogLevel config) appState (pure ()))
97107

108+
initObservationsApp sCache config = do
109+
-- duplicate poolChan as a starting point
110+
obsChan <- dupChan poolChan
111+
stateObsChan <- newObsChan obsChan
112+
appState <- AppState.initWithPool sockets pool config loggerState metricsState (Metrics.observationMetrics metricsState <> writeChan obsChan)
113+
AppState.putPgVersion appState actualPgVersion
114+
AppState.putSchemaCache appState (Just sCache)
115+
return ((appState, metricsState, stateObsChan), postgrest (configLogLevel config) appState (pure ()))
116+
98117
-- For tests that run with the same schema cache
99118
app = initApp baseSchemaCache ()
100119

@@ -123,6 +142,7 @@ main = do
123142
obsApp = app testObservabilityCfg
124143
serverTiming = app testCfgServerTiming
125144
aggregatesEnabled = app testCfgAggregatesEnabled
145+
observationsApp = initObservationsApp baseSchemaCache testCfg
126146

127147
extraSearchPathApp = appDbs testCfgExtraSearchPath
128148
unicodeApp = appDbs testUnicodeCfg
@@ -278,6 +298,9 @@ main = do
278298
before (initApp baseSchemaCache metricsState testCfgJwtCache) $
279299
describe "Feature.Auth.JwtCacheSpec" Feature.Auth.JwtCacheSpec.spec
280300

301+
before observationsApp $
302+
describe "Feature.MetricsSpec" Feature.MetricsSpec.spec
303+
281304
where
282305
loadSCache pool conf =
283306
either (panic.show) id <$> P.use pool (HT.transaction HT.ReadCommitted HT.Read $ querySchemaCache conf)

test/spec/SpecHelper.hs

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
{-# LANGUAGE AllowAmbiguousTypes #-}
2+
{-# LANGUAGE DeriveAnyClass #-}
23
{-# LANGUAGE ExistentialQuantification #-}
34
{-# LANGUAGE FlexibleContexts #-}
45
{-# LANGUAGE RankNTypes #-}
@@ -41,10 +42,13 @@ import PostgREST.Config (AppConfig (..),
4142
LogLevel (..),
4243
OpenAPIMode (..),
4344
Verbosity (..), parseSecret)
45+
import PostgREST.Observation (Observation,
46+
observationMessage)
4447
import PostgREST.SchemaCache.Identifiers (QualifiedIdentifier (..))
4548
import Prometheus (Counter, getCounter)
4649
import Protolude hiding (get, toS)
4750
import Protolude.Conv (toS)
51+
import System.Timeout (timeout)
4852
import Test.Hspec.Expectations.Contrib (annotate)
4953

5054
filterAndMatchCT :: BS.ByteString -> MatchHeader
@@ -381,3 +385,48 @@ expectCounter :: forall s st m. (KnownSymbol s, HasField s st Counter, MonadIO m
381385
expectCounter = expectField @s intCounter
382386
where
383387
intCounter = ((round @Double @Int) <$>) . getCounter
388+
389+
data TimeoutException = TimeoutException deriving (Show, Exception)
390+
391+
accumulateUntilTimeout :: Int -> (s -> a -> s) -> s -> IO a -> IO s
392+
accumulateUntilTimeout t f start act = do
393+
tid <- myThreadId
394+
-- mask to make sure TimeoutException is not thrown before starting the loop
395+
mask $ \unmask -> do
396+
-- start timeout thread unmasking exceptions
397+
ttid <- forkIOWithUnmask ($ (threadDelay t *> throwTo tid TimeoutException))
398+
-- unmask effect
399+
unmask (fix (\loop accum -> (act >>= loop . f accum) `onTimeout` pure accum) start)
400+
-- make sure we catch timeout if happens before entering the loop
401+
`onTimeout` pure start
402+
-- make sure timer thread is killed on other exceptions
403+
-- so that it won't throw TimeoutException later
404+
`onException` killThread ttid
405+
where
406+
onTimeout m a = m `catch` \TimeoutException -> a
407+
408+
data ObsChan = ObsChan (Chan Observation) (Chan Observation)
409+
410+
newObsChan :: Chan Observation -> IO ObsChan
411+
newObsChan = fmap <$> ObsChan <*> dupChan
412+
413+
-- read messages from copy chan and once condition is met drain original to the same point
414+
-- upon timeout report error and messages remaining in the original chan
415+
-- that way we report messages since last successful read
416+
waitForObs :: (HasCallStack, Foldable f) => ObsChan -> Int -> String -> (Observation -> f a) -> IO ()
417+
waitForObs (ObsChan orig copy) t msg f =
418+
timeout t (readUntil copy *> readUntil orig) >>= maybe failTimeout mempty
419+
where
420+
failTimeout =
421+
takeUntilTimeout decisecond (readChan orig)
422+
>>= expectationFailure
423+
. ("Timeout waiting for " <> msg <> " at " <> loc <> ". Remaining observations:\n" ++)
424+
. foldMap ((++ "\n") . show . observationMessage)
425+
readUntil = void . untilM (pure . not . null . f) . readChan
426+
loc = fromMaybe "(unknown)" . head $ (prettySrcLoc . snd <$> getCallStack callStack)
427+
-- execute effectful computation until result meets provided condition
428+
untilM cond m = fix $ \loop -> m >>= \a -> ifM (cond a) (pure a) loop
429+
-- duplicate the provided channel and construct wairFor function binding both channels
430+
-- accumulate effecful computation results into a list for specified time
431+
takeUntilTimeout t' = fmap reverse . accumulateUntilTimeout t' (flip (:)) []
432+
decisecond = 100000

0 commit comments

Comments
 (0)