Skip to content

Commit c3aa110

Browse files
committed
fix: implement connection tracking in metrics
Right now the metrics observation handler does not track database connections; it only updates a single Gauge based on HasqlPoolObs events. This is problematic because the Hasql pool reports various connection events in multiple phases. The connection state machine is not simple, and to precisely report the number of connections in various states it is necessary to track their lifecycles. This change adds a ConnTrack data structure and the logic to track database connection lifecycles. At the moment it precisely supports the "connected" and "inUse" connection counts. The "pgrst_db_pool_available" metric is implemented on top of ConnTrack instead of a simple Gauge.
1 parent 3b1373e commit c3aa110

4 files changed

Lines changed: 81 additions & 19 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ All notable changes to this project will be documented in this file. From versio
3333
### Fixed
3434

3535
- Fix unnecessary connection pool flushes during schema cache reloading by @mkleczek in #4645
36+
- Fix race condition in pool_available metric causing negative values during network instability by @mkleczek in #4622
3637

3738
## [14.9] - 2026-04-10
3839

postgrest.cabal

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,7 @@ library
161161
, stm-hamt >= 1.2 && < 2
162162
, focus >= 1.0 && < 2
163163
, some >= 1.0.4.1 && < 2
164+
, uuid >= 1.3 && < 2
164165
-- -fno-spec-constr may help keep compile time memory use in check,
165166
-- see https://gitlab.haskell.org/ghc/ghc/issues/16017#note_219304
166167
-- -optP-Wno-nonportable-include-path

src/PostgREST/Metrics.hs

Lines changed: 44 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,10 @@ Description : Metrics based on the Observation module. See Observation.hs.
55
-}
66
module PostgREST.Metrics
77
( init
8+
, ConnTrack
9+
, ConnStats (..)
810
, MetricsState (..)
11+
, connectionCounts
912
, observationMetrics
1013
, metricsToText
1114
) where
@@ -17,12 +20,18 @@ import Prometheus
1720

1821
import PostgREST.Observation
1922

20-
import Protolude
23+
import Control.Arrow ((&&&))
24+
import Data.Bitraversable (bisequenceA)
25+
import Data.Tuple.Extra (both)
26+
import Data.UUID (UUID)
27+
import qualified Focus
28+
import Protolude
29+
import qualified StmHamt.SizedHamt as SH
2130

2231
data MetricsState =
2332
MetricsState {
2433
poolTimeouts :: Counter,
25-
poolAvailable :: Gauge,
34+
connTrack :: ConnTrack,
2635
poolWaiting :: Gauge,
2736
poolMaxSize :: Gauge,
2837
schemaCacheLoads :: Vector Label1 Counter,
@@ -36,7 +45,7 @@ init :: Int -> IO MetricsState
3645
init configDbPoolSize = do
3746
metricState <- MetricsState <$>
3847
register (counter (Info "pgrst_db_pool_timeouts_total" "The total number of pool connection timeouts")) <*>
39-
register (gauge (Info "pgrst_db_pool_available" "Available connections in the pool")) <*>
48+
register (Metric ((identity &&& dbPoolAvailable) <$> connectionTracker)) <*>
4049
register (gauge (Info "pgrst_db_pool_waiting" "Requests waiting to acquire a pool connection")) <*>
4150
register (gauge (Info "pgrst_db_pool_max" "Max pool connections")) <*>
4251
register (vector "status" $ counter (Info "pgrst_schema_cache_loads_total" "The total number of times the schema cache was loaded")) <*>
@@ -46,20 +55,19 @@ init configDbPoolSize = do
4655
register (counter (Info "pgrst_jwt_cache_evictions_total" "The total number of JWT cache evictions"))
4756
setGauge (poolMaxSize metricState) (fromIntegral configDbPoolSize)
4857
pure metricState
58+
where
59+
dbPoolAvailable = (pure . noLabelsGroup (Info "pgrst_db_pool_available" "Available connections in the pool") GaugeType . calcAvailable <$>) . connectionCounts
60+
where
61+
calcAvailable = (configDbPoolSize -) . inUse
62+
toSample name labels = Sample name labels . encodeUtf8 . show
63+
noLabelsGroup info sampleType = SampleGroup info sampleType . pure . toSample (metricName info) mempty
4964

5065
-- Only some observations are used as metrics
5166
observationMetrics :: MetricsState -> ObservationHandler
5267
observationMetrics MetricsState{..} obs = case obs of
5368
PoolAcqTimeoutObs -> do
5469
incCounter poolTimeouts
55-
(HasqlPoolObs (SQL.ConnectionObservation _ status)) -> case status of
56-
SQL.ReadyForUseConnectionStatus -> do
57-
incGauge poolAvailable
58-
SQL.InUseConnectionStatus -> do
59-
decGauge poolAvailable
60-
SQL.TerminatedConnectionStatus _ -> do
61-
decGauge poolAvailable
62-
SQL.ConnectingConnectionStatus -> pure ()
70+
(HasqlPoolObs sqlObs) -> trackConnections connTrack sqlObs
6371
PoolRequest ->
6472
incGauge poolWaiting
6573
PoolRequestFullfilled ->
@@ -77,3 +85,28 @@ observationMetrics MetricsState{..} obs = case obs of
7785

7886
metricsToText :: IO LBS.ByteString
7987
metricsToText = exportMetricsAsText
88+
89+
-- | Point-in-time connection counts, as produced by 'connectionCounts'.
data ConnStats = ConnStats
  { connected :: Int -- ^ number of entries in the tracker's "connected" set
  , inUse     :: Int -- ^ number of entries in the tracker's "in use" set
  }
  deriving (Eq, Show)
93+
94+
-- | Connection tracker made of two sized STM hash tries holding connection
-- UUIDs: 'connTrackConnected' for connections recorded as connected and
-- 'connTrackInUse' for connections recorded as in use. Entries are added and
-- removed by 'trackConnections'; the sizes are read by 'connectionCounts'.
data ConnTrack = ConnTrack { connTrackConnected :: SH.SizedHamt UUID, connTrackInUse :: SH.SizedHamt UUID }
95+
96+
-- | Allocate a fresh tracker: one new set for connected connections and one
-- new set for in-use connections.
connectionTracker :: IO ConnTrack
connectionTracker = do
  connectedSet <- SH.newIO
  inUseSet <- SH.newIO
  pure (ConnTrack connectedSet inUseSet)
98+
99+
trackConnections :: ConnTrack -> SQL.Observation -> IO ()
100+
trackConnections ConnTrack{..} (SQL.ConnectionObservation uuid status) = case status of
101+
SQL.ReadyForUseConnectionStatus -> atomically $
102+
SH.insert identity uuid connTrackConnected *>
103+
SH.focus Focus.delete identity uuid connTrackInUse
104+
SQL.TerminatedConnectionStatus _ -> atomically $
105+
SH.focus Focus.delete identity uuid connTrackConnected *>
106+
SH.focus Focus.delete identity uuid connTrackInUse
107+
SQL.InUseConnectionStatus -> atomically $
108+
SH.insert identity uuid connTrackInUse
109+
_ -> mempty
110+
111+
-- | Read the sizes of both tracked sets in a single STM transaction and
-- return them as 'ConnStats' (connected first, then in use).
connectionCounts :: ConnTrack -> IO ConnStats
connectionCounts tracker = atomically $
  ConnStats
    <$> SH.size (connTrackConnected tracker)
    <*> SH.size (connTrackInUse tracker)

test/observability/Observation/MetricsSpec.hs

Lines changed: 35 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,17 +6,20 @@
66

77
module Observation.MetricsSpec where
88

9-
import Data.List (lookup)
10-
import Network.Wai (Application)
9+
import Data.List (lookup)
10+
import qualified Hasql.Pool.Observation as SQL
11+
import Network.Wai (Application)
1112
import ObsHelper
12-
import qualified PostgREST.AppState as AppState
13-
import PostgREST.Config (AppConfig (configDbSchemas))
14-
import qualified PostgREST.Metrics as Metrics
13+
import qualified PostgREST.AppState as AppState
14+
import PostgREST.Config (AppConfig (configDbSchemas))
15+
import PostgREST.Metrics (ConnStats (..),
16+
MetricsState (..),
17+
connectionCounts)
1518
import PostgREST.Observation
16-
import Prometheus (getCounter, getVectorWith)
19+
import Prometheus (getCounter, getVectorWith)
1720
import Protolude
18-
import Test.Hspec (SpecWith, describe, it)
19-
import Test.Hspec.Wai (getState)
21+
import Test.Hspec (SpecWith, describe, it)
22+
import Test.Hspec.Wai (getState)
2023

2124
spec :: SpecWith (SpecState, Application)
2225
spec = describe "Server started with metrics enabled" $ do
@@ -71,9 +74,33 @@ spec = describe "Server started with metrics enabled" $ do
7174
-- (there should be none but we need to verify that)
7275
threadDelay $ 1 * sec
7376

77+
it "Should track in use connections" $ do
78+
SpecState{specAppState = appState, specMetrics = metrics, specObsChan} <- getState
79+
let waitFor = waitForObs specObsChan
80+
81+
liftIO $ checkState' metrics [
82+
-- we expect in use connections to be the same once finished
83+
inUseConnections (+ 0)
84+
] $ do
85+
signal <- newEmptyMVar
86+
-- make sure waiting thread is signaled
87+
bracket_ (pure ()) (putMVar signal ()) $
88+
-- expecting one more connection in use
89+
checkState' metrics [
90+
inUseConnections (+ 1)
91+
] $ do
92+
-- start a thread hanging on a single connection until signaled
93+
void $ forkIO $ void $ AppState.usePool appState $ liftIO (readMVar signal)
94+
-- main thread waits for ConnectionObservation with InUseConnectionStatus
95+
-- after which used connections count should be incremented
96+
waitFor (1 * sec) "InUseConnectionStatus" $ \x -> [ o | o@(HasqlPoolObs (SQL.ConnectionObservation _ SQL.InUseConnectionStatus)) <- pure x]
97+
98+
-- hanging thread was signaled and should return the connection
99+
waitFor (1 * sec) "ReadyForUseConnectionStatus" $ \x -> [ o | o@(HasqlPoolObs (SQL.ConnectionObservation _ SQL.ReadyForUseConnectionStatus)) <- pure x]
74100

75101
where
76102
-- prometheus-client api to handle vectors is convoluted
77103
schemaCacheLoads label = expectField @"schemaCacheLoads" $
78104
fmap (maybe (0::Int) round . lookup label) . (`getVectorWith` getCounter)
105+
inUseConnections = expectField @"connTrack" ((inUse <$>) . connectionCounts)
79106
sec = 1000000

0 commit comments

Comments
 (0)