Skip to content

Commit ab5ae74

Browse files
committed
refactor(test): provide means to validate metrics and observations
Some helpers are provided for introspecting metrics already (used in JWT cache tests). This change provides facilities to additionally validate emited Observation events. A new Spec module is also implemented, adding basic tests of schema cache reloading - their main goal is to excercise the new infrastructure.
1 parent 2edc44c commit ab5ae74

5 files changed

Lines changed: 120 additions & 1 deletion

File tree

postgrest.cabal

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,7 @@ test-suite spec
219219
Feature.ConcurrentSpec
220220
Feature.CorsSpec
221221
Feature.ExtraSearchPathSpec
222+
Feature.MetricsSpec
222223
Feature.NoSuperuserSpec
223224
Feature.ObservabilitySpec
224225
Feature.OpenApi.DisabledOpenApiSpec

src/PostgREST/AppState.hs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ module PostgREST.AppState
1818
, init
1919
, initSockets
2020
, initWithPool
21+
, putConfig -- For tests TODO refactoring
2122
, putNextListenerDelay
2223
, putSchemaCache
2324
, putPgVersion

test/spec/Feature/MetricsSpec.hs

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
{-# LANGUAGE DataKinds #-}
2+
{-# LANGUAGE FlexibleContexts #-}
3+
{-# LANGUAGE MonadComprehensions #-}
4+
{-# LANGUAGE TypeApplications #-}
5+
6+
module Feature.MetricsSpec where
7+
8+
import Network.Wai (Application)
9+
import qualified PostgREST.AppState as AppState
10+
import PostgREST.Config (AppConfig (configDbSchemas))
11+
import qualified PostgREST.Metrics as Metrics
12+
import PostgREST.Observation
13+
import Prometheus (getCounter, getVectorWith)
14+
import Protolude
15+
import SpecHelper
16+
import Test.Hspec (SpecWith, describe, it)
17+
18+
spec :: SpecWith (((AppState.AppState, Metrics.MetricsState), Chan Observation), Application)
19+
spec = describe "Server started with metrics enabled" $ do
20+
it "Should update pgrst_schema_cache_loads_total[SUCCESS]" $ do
21+
((appState, metrics), waitFor) <- prepareState
22+
23+
liftIO $ checkState' metrics [
24+
schemaCacheLoads "SUCCESS" (+1)
25+
] $ do
26+
AppState.schemaCacheLoader appState
27+
waitFor (1 * sec) "SchemaCacheLoadedObs" $ \x -> [ o | o@(SchemaCacheLoadedObs{}) <- pure x]
28+
29+
it "Should update pgrst_schema_cache_loads_total[ERROR]" $ do
30+
((appState, metrics), waitFor) <- prepareState
31+
32+
liftIO $ checkState' metrics [
33+
schemaCacheLoads "FAIL" (+1),
34+
schemaCacheLoads "SUCCESS" (+1)
35+
] $ do
36+
AppState.getConfig appState >>= \prev -> do
37+
AppState.putConfig appState $ prev { configDbSchemas = pure "bad_schema" }
38+
AppState.schemaCacheLoader appState
39+
waitFor (1 * sec) "SchemaCacheErrorObs" $ \x -> [ o | o@(SchemaCacheErrorObs{}) <- pure x]
40+
AppState.putConfig appState prev
41+
42+
-- wait up to 2 secs so that retry can happen
43+
waitFor (2 * sec) "SchemaCacheLoadedObs" $ \x -> [ o | o@(SchemaCacheLoadedObs{}) <- pure x]
44+
45+
where
46+
-- prometheus-client api to handle vectors is convoluted
47+
schemaCacheLoads label = expectField @"schemaCacheLoads" $
48+
(foldMap (Sum . round @Double @Int . snd) . find ((== label) . fst) <$>) . (`getVectorWith` getCounter)
49+
sec = 1000000

test/spec/Main.hs

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import qualified Feature.Auth.NoJwtSecretSpec
2929
import qualified Feature.ConcurrentSpec
3030
import qualified Feature.CorsSpec
3131
import qualified Feature.ExtraSearchPathSpec
32+
import qualified Feature.MetricsSpec
3233
import qualified Feature.NoSuperuserSpec
3334
import qualified Feature.ObservabilitySpec
3435
import qualified Feature.OpenApi.DisabledOpenApiSpec
@@ -68,16 +69,23 @@ import qualified Feature.Query.UpdateSpec
6869
import qualified Feature.Query.UpsertSpec
6970
import qualified Feature.RollbackSpec
7071
import qualified Feature.RpcPreRequestGucsSpec
72+
import PostgREST.Observation (Observation (HasqlPoolObs))
7173

7274

7375
main :: IO ()
7476
main = do
77+
poolChan <- newChan
78+
-- make sure poolChan is not growing indefinitely
79+
void $ forkIO $ fix (readChan poolChan *>)
80+
metricsState <- Metrics.init (configDbPoolSize testCfg)
7581
pool <- P.acquire $ P.settings
7682
[ P.size 3
7783
, P.acquisitionTimeout 10
7884
, P.agingTimeout 60
7985
, P.idlenessTimeout 60
8086
, P.staticConnectionSettings (toUtf8 $ configDbUri testCfg)
87+
-- make sure metrics are updated and pool observations published to poolChan
88+
, P.observationHandler $ (writeChan poolChan <> Metrics.observationMetrics metricsState) . HasqlPoolObs
8189
]
8290

8391
actualPgVersion <- either (panic . show) id <$> P.use pool (queryPgVersion False)
@@ -86,7 +94,6 @@ main = do
8694
baseSchemaCache <- loadSCache pool testCfg
8795
sockets <- AppState.initSockets testCfg
8896
loggerState <- Logger.init
89-
metricsState <- Metrics.init (configDbPoolSize testCfg)
9097

9198
let
9299
initApp sCache st config = do
@@ -95,6 +102,14 @@ main = do
95102
AppState.putSchemaCache appState (Just sCache)
96103
return (st, postgrest (configLogLevel config) appState (pure ()))
97104

105+
initObservationsApp sCache config = do
106+
-- duplicate poolChan as a starting point
107+
obsChan <- dupChan poolChan
108+
appState <- AppState.initWithPool sockets pool config loggerState metricsState (Metrics.observationMetrics metricsState <> writeChan obsChan)
109+
AppState.putPgVersion appState actualPgVersion
110+
AppState.putSchemaCache appState (Just sCache)
111+
return (((appState, metricsState), obsChan), postgrest (configLogLevel config) appState (pure ()))
112+
98113
-- For tests that run with the same schema cache
99114
app = initApp baseSchemaCache ()
100115

@@ -123,6 +138,7 @@ main = do
123138
obsApp = app testObservabilityCfg
124139
serverTiming = app testCfgServerTiming
125140
aggregatesEnabled = app testCfgAggregatesEnabled
141+
observationsApp = initObservationsApp baseSchemaCache testCfg
126142

127143
extraSearchPathApp = appDbs testCfgExtraSearchPath
128144
unicodeApp = appDbs testUnicodeCfg
@@ -278,6 +294,12 @@ main = do
278294
before (initApp baseSchemaCache metricsState testCfgJwtCache) $
279295
describe "Feature.Auth.JwtCacheSpec" Feature.Auth.JwtCacheSpec.spec
280296

297+
before (initApp baseSchemaCache metricsState testCfgJwtCache) $
298+
describe "Feature.Auth.JwtCacheSpec" Feature.Auth.JwtCacheSpec.spec
299+
300+
before observationsApp $
301+
describe "Feature.MetricsSpec" Feature.MetricsSpec.spec
302+
281303
where
282304
loadSCache pool conf =
283305
either (panic.show) id <$> P.use pool (HT.transaction HT.ReadCommitted HT.Read $ querySchemaCache conf)

test/spec/SpecHelper.hs

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
{-# LANGUAGE AllowAmbiguousTypes #-}
2+
{-# LANGUAGE DeriveAnyClass #-}
23
{-# LANGUAGE ExistentialQuantification #-}
34
{-# LANGUAGE FlexibleContexts #-}
45
{-# LANGUAGE RankNTypes #-}
@@ -41,10 +42,13 @@ import PostgREST.Config (AppConfig (..),
4142
LogLevel (..),
4243
OpenAPIMode (..),
4344
parseSecret)
45+
import PostgREST.Observation (Observation,
46+
observationMessage)
4447
import PostgREST.SchemaCache.Identifiers (QualifiedIdentifier (..))
4548
import Prometheus (Counter, getCounter)
4649
import Protolude hiding (get, toS)
4750
import Protolude.Conv (toS)
51+
import System.Timeout (timeout)
4852
import Test.Hspec.Expectations.Contrib (annotate)
4953

5054
filterAndMatchCT :: BS.ByteString -> MatchHeader
@@ -380,3 +384,45 @@ expectCounter :: forall s st m. (KnownSymbol s, HasField s st Counter, MonadIO m
380384
expectCounter = expectField @s intCounter
381385
where
382386
intCounter = ((round @Double @Int) <$>) . getCounter
387+
388+
data TimeoutException = TimeoutException deriving (Show, Exception)
389+
390+
accumulateUntilTimeout :: Int -> (s -> a -> s) -> s -> IO a -> IO s
391+
accumulateUntilTimeout t f start act = do
392+
tid <- myThreadId
393+
-- mask to make sure TimeoutException is not thrown before starting the loop
394+
mask $ \unmask -> do
395+
-- start timeout thread unmasking exceptions
396+
ttid <- forkIOWithUnmask ($ (threadDelay t *> throwTo tid TimeoutException))
397+
-- unmask effect
398+
unmask $ fix (\loop accum -> (act >>= loop . f accum) `onTimeout` pure accum) start
399+
-- make sure we catch timeout if happens bifore entering the loop
400+
`onTimeout` pure start
401+
-- make sure timer thread is killed on other exceptions
402+
-- so that it won't throw TimeoutException later
403+
`onException` killThread ttid
404+
where
405+
onTimeout m a = m `catch` \TimeoutException -> a
406+
407+
408+
prepareState :: HasCallStack => Traversable f => WaiSession (f (Chan Observation)) (f (Int -> String -> (Observation -> Maybe a) -> IO ()))
409+
prepareState = getState >>= traverse (liftA2 (<$>) waitFor (liftIO . dupChan))
410+
where
411+
-- read messages from copy chan and once condition is met drain original to the same point
412+
-- upon timeout report error and messages remaining in the original chan
413+
-- that way we report messages since last successful read
414+
waitFor orig copy t msg f =
415+
timeout t (readUntil copy *> readUntil orig) >>= maybe failTimeout mempty
416+
where
417+
failTimeout =
418+
takeUntilTimeout 100000 (readChan orig)
419+
>>= expectationFailure
420+
. ("Timeout waiting for " <> msg <> " at " <> loc <> ". Remaining observations:\n" ++)
421+
. foldMap ((++ "\n") . show . observationMessage)
422+
readUntil = void . untilM (pure . isJust . f) . readChan
423+
loc = fold (head (prettySrcLoc . snd <$> getCallStack callStack))
424+
-- execute effectful computation until result meets provided condition
425+
untilM cond m = fix $ \loop -> m >>= \a -> ifM (cond a) (pure a) loop
426+
-- duplicate the provided channel and construct wairFor function binding both channels
427+
-- accumulate effecful computation results into a list for specified time
428+
takeUntilTimeout t = fmap reverse . accumulateUntilTimeout t (flip (:)) []

0 commit comments

Comments
 (0)