Skip to content

Commit 202f84e

Browse files
committed
fix: implement connection tracking in metrics
Right now metrics observation handler does not track database connections but updates a single Gauge based on HasqlPoolObs events. This is problematic because Hasql pool reports various connection events in multiple phases. The connection state machine is not simple and to precisely report the number of connections in various states, it is necessary to track their lifecycles. This change adds a ConnTrack data structure and logic to track database connections lifecycles. At the moment it supports "connected" and "inUse" connection counts precisely. The "pgrst_db_pool_available" metric is implemented on top of ConnTrack instead of a simple Gauge.
1 parent 187ea6d commit 202f84e

4 files changed

Lines changed: 99 additions & 21 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ All notable changes to this project will be documented in this file. From versio
5252
### Fixed
5353

5454
- Fix unnecessary connection pool flushes during schema cache reloading by @mkleczek in #4645
55+
- Fix race condition in pool_available metric causing negative values during network instability by @mkleczek in #4622
5556

5657
## [14.9] - 2026-04-10
5758

postgrest.cabal

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,7 @@ library
163163
, stm-hamt >= 1.2 && < 2
164164
, focus >= 1.0 && < 2
165165
, some >= 1.0.4.1 && < 2
166+
, uuid >= 1.3 && < 2
166167
-- -fno-spec-constr may help keep compile time memory use in check,
167168
-- see https://gitlab.haskell.org/ghc/ghc/issues/16017#note_219304
168169
-- -optP-Wno-nonportable-include-path

src/PostgREST/Metrics.hs

Lines changed: 53 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,10 @@ Description : Metrics based on the Observation module. See Observation.hs.
55
-}
66
module PostgREST.Metrics
77
( init
8+
, ConnTrack
9+
, ConnStats (..)
810
, MetricsState (..)
11+
, connectionCounts
912
, observationMetrics
1013
, metricsToText
1114
) where
@@ -19,13 +22,18 @@ import qualified Prometheus.Metric.GHC as PMG
1922

2023
import PostgREST.Observation
2124

22-
23-
import Protolude
25+
import Control.Arrow ((&&&))
26+
import Data.Bitraversable (bisequenceA)
27+
import Data.Tuple.Extra (both)
28+
import Data.UUID (UUID)
29+
import qualified Focus
30+
import Protolude
31+
import qualified StmHamt.SizedHamt as SH
2432

2533
data MetricsState =
2634
MetricsState {
2735
poolTimeouts :: Counter,
28-
poolAvailable :: Gauge,
36+
connTrack :: ConnTrack,
2937
poolWaiting :: Gauge,
3038
poolMaxSize :: Gauge,
3139
schemaCacheLoads :: Vector Label1 Counter,
@@ -40,7 +48,7 @@ init configDbPoolSize = do
4048
whenM getRTSStatsEnabled $ void $ register PMG.ghcMetrics
4149
metricState <- MetricsState <$>
4250
register (counter (Info "pgrst_db_pool_timeouts_total" "The total number of pool connection timeouts")) <*>
43-
register (gauge (Info "pgrst_db_pool_available" "Available connections in the pool")) <*>
51+
register (Metric ((identity &&& dbPoolAvailable) <$> connectionTracker)) <*>
4452
register (gauge (Info "pgrst_db_pool_waiting" "Requests waiting to acquire a pool connection")) <*>
4553
register (gauge (Info "pgrst_db_pool_max" "Max pool connections")) <*>
4654
register (vector "status" $ counter (Info "pgrst_schema_cache_loads_total" "The total number of times the schema cache was loaded")) <*>
@@ -50,20 +58,28 @@ init configDbPoolSize = do
5058
register (counter (Info "pgrst_jwt_cache_evictions_total" "The total number of JWT cache evictions"))
5159
setGauge (poolMaxSize metricState) (fromIntegral configDbPoolSize)
5260
pure metricState
61+
where
62+
dbPoolAvailable = (pure . noLabelsGroup (Info "pgrst_db_pool_available" "Available connections in the pool") GaugeType . calcAvailable <$>) . connectionCounts
63+
where
64+
calcAvailable = liftA2 (-) connected inUse
65+
toSample name labels = Sample name labels . encodeUtf8 . show
66+
noLabelsGroup info sampleType = SampleGroup info sampleType . pure . toSample (metricName info) mempty
5367

5468
-- Only some observations are used as metrics
5569
observationMetrics :: MetricsState -> ObservationHandler
5670
observationMetrics MetricsState{..} obs = case obs of
5771
PoolAcqTimeoutObs -> do
5872
incCounter poolTimeouts
59-
(HasqlPoolObs (SQL.ConnectionObservation _ status)) -> case status of
60-
SQL.ReadyForUseConnectionStatus _ -> do
61-
incGauge poolAvailable
62-
SQL.InUseConnectionStatus -> do
63-
decGauge poolAvailable
64-
SQL.TerminatedConnectionStatus _ -> do
65-
decGauge poolAvailable
66-
SQL.ConnectingConnectionStatus -> pure ()
73+
-- Handle pool observations with connection tracking
74+
-- this is necessary because it is not possible
75+
-- to accurately maintain open/in use conneciton counts
76+
-- statelessly based only on pool observation events.
77+
-- The reason is that hasql-pool emits TerminatedConnectionStatus
78+
-- both for connections successfully established and failed when connecting.
79+
-- When receiving TerminatedConnectionStatus we have to find out
80+
-- if we can decrement established connection count. To do that we have to track
81+
-- established connections.
82+
(HasqlPoolObs sqlObs) -> trackConnections connTrack sqlObs
6783
PoolRequest ->
6884
incGauge poolWaiting
6985
PoolRequestFullfilled ->
@@ -81,3 +97,28 @@ observationMetrics MetricsState{..} obs = case obs of
8197

8298
metricsToText :: IO LBS.ByteString
8399
metricsToText = exportMetricsAsText
100+
101+
data ConnStats = ConnStats {
102+
connected :: Int,
103+
inUse :: Int
104+
} deriving (Eq, Show)
105+
106+
data ConnTrack = ConnTrack { connTrackConnected :: SH.SizedHamt UUID, connTrackInUse :: SH.SizedHamt UUID }
107+
108+
connectionTracker :: IO ConnTrack
109+
connectionTracker = ConnTrack <$> SH.newIO <*> SH.newIO
110+
111+
trackConnections :: ConnTrack -> SQL.Observation -> IO ()
112+
trackConnections ConnTrack{..} (SQL.ConnectionObservation uuid status) = case status of
113+
SQL.ReadyForUseConnectionStatus _ -> atomically $
114+
SH.insert identity uuid connTrackConnected *>
115+
SH.focus Focus.delete identity uuid connTrackInUse
116+
SQL.TerminatedConnectionStatus _ -> atomically $
117+
SH.focus Focus.delete identity uuid connTrackConnected *>
118+
SH.focus Focus.delete identity uuid connTrackInUse
119+
SQL.InUseConnectionStatus -> atomically $
120+
SH.insert identity uuid connTrackInUse
121+
_ -> mempty
122+
123+
connectionCounts :: ConnTrack -> IO ConnStats
124+
connectionCounts = atomically . fmap (uncurry ConnStats) . bisequenceA . both SH.size . (connTrackConnected &&& connTrackInUse)

test/observability/Observation/MetricsSpec.hs

Lines changed: 44 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,17 +6,21 @@
66

77
module Observation.MetricsSpec where
88

9-
import Data.List (lookup)
10-
import Network.Wai (Application)
9+
import Data.List (lookup)
10+
import qualified Hasql.Pool.Observation as SQL
11+
import Network.Wai (Application)
1112
import ObsHelper
12-
import qualified PostgREST.AppState as AppState
13-
import PostgREST.Config (AppConfig (configDbSchemas))
14-
import qualified PostgREST.Metrics as Metrics
13+
import qualified PostgREST.AppState as AppState
14+
import PostgREST.Config (AppConfig (configDbSchemas))
15+
import PostgREST.Metrics (ConnStats (..),
16+
MetricsState (..),
17+
connectionCounts)
1518
import PostgREST.Observation
16-
import Prometheus (getCounter, getVectorWith)
17-
import Protolude
18-
import Test.Hspec (SpecWith, describe, it)
19-
import Test.Hspec.Wai (getState)
19+
import Prometheus (getCounter, getVectorWith)
20+
import Test.Hspec (SpecWith, describe, it)
21+
import Test.Hspec.Wai (getState)
22+
23+
import Protolude
2024

2125
spec :: SpecWith (SpecState, Application)
2226
spec = describe "Server started with metrics enabled" $ do
@@ -71,9 +75,40 @@ spec = describe "Server started with metrics enabled" $ do
7175
-- (there should be none but we need to verify that)
7276
threadDelay $ 1 * sec
7377

78+
-- The test verifies we properly count in use connections
79+
-- The idea is to fork a worker thread that
80+
-- borrows connection from the pool and waits for a signal to release it
81+
-- Main thread checks that
82+
-- in use connections counter is incremented by worker
83+
-- then it signals the worker to release the connection
84+
-- and finally verifies that in use connection counter is back to original value
85+
it "Should track in use connections" $ do
86+
SpecState{specAppState = appState, specMetrics = metrics, specObsChan} <- getState
87+
let waitFor = waitForObs specObsChan
88+
89+
liftIO $ checkState' metrics [
90+
-- we expect in use connections to be the same once finished
91+
inUseConnections (+ 0)
92+
] $ do
93+
signal <- newEmptyMVar
94+
-- make sure waiting thread is signaled
95+
(`finally` tryPutMVar signal ()) $
96+
-- expecting one more connection in use
97+
checkState' metrics [
98+
inUseConnections (+ 1)
99+
] $ do
100+
-- start a thread hanging on a single connection until signaled
101+
void $ forkIO $ void $ AppState.usePool appState $ liftIO (readMVar signal)
102+
-- main thread waits for ConnectionObservation with InUseConnectionStatus
103+
-- after which used connections count should be incremented
104+
waitFor (1 * sec) "InUseConnectionStatus" $ \x -> [ o | o@(HasqlPoolObs (SQL.ConnectionObservation _ SQL.InUseConnectionStatus)) <- pure x]
105+
106+
-- hanging thread was signaled and should return the connection
107+
waitFor (1 * sec) "ReadyForUseConnectionStatus" $ \x -> [ o | o@(HasqlPoolObs (SQL.ConnectionObservation _ (SQL.ReadyForUseConnectionStatus _))) <- pure x]
74108

75109
where
76110
-- prometheus-client api to handle vectors is convoluted
77111
schemaCacheLoads label = expectField @"schemaCacheLoads" $
78112
fmap (maybe (0::Int) round . lookup label) . (`getVectorWith` getCounter)
113+
inUseConnections = expectField @"connTrack" ((inUse <$>) . connectionCounts)
79114
sec = 1000000

0 commit comments

Comments
 (0)