From ed790eba3d7fd314595113dc6d8533fc3b1cfae3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20K=C5=82eczek?= Date: Wed, 25 Feb 2026 19:36:48 +0100 Subject: [PATCH] fix: implement connection tracking in metrics Right now metrics observation handler does not track database connections but updates a single Gauge based on HasqlPoolObs events. This is problematic because Hasql pool reports various connection events in multiple phases. The connection state machine is not simple and to precisely report the number of connections in various states, it is necessary to track their lifecycles. This change adds a ConnTrack data structure and logic to track database connections lifecycles. At the moment it supports "connected" and "inUse" connection counts precisely. The "pgrst_db_pool_available" metric is implemented on top of ConnTrack instead of a simple Gauge. --- CHANGELOG.md | 1 + postgrest.cabal | 1 + src/PostgREST/Metrics.hs | 65 +++++++++++++++---- test/observability/Observation/MetricsSpec.hs | 46 ++++++++++--- 4 files changed, 92 insertions(+), 21 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 72394c45e0..425e8a2ec6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -46,6 +46,7 @@ All notable changes to this project will be documented in this file. From versio ### Fixed - Fix unnecessary connection pool flushes during schema cache reloading by @mkleczek in #4645 +- Fix race condition in pool_available metric causing negative values during network instability by @mkleczek in #4622 ## [14.9] - 2026-04-10 diff --git a/postgrest.cabal b/postgrest.cabal index 2f100b3e92..807e104383 100644 --- a/postgrest.cabal +++ b/postgrest.cabal @@ -162,6 +162,7 @@ library , stm-hamt >= 1.2 && < 2 , focus >= 1.0 && < 2 , some >= 1.0.4.1 && < 2 + , uuid >= 1.3 && < 2 -- -fno-spec-constr may help keep compile time memory use in check, -- see https://gitlab.haskell.org/ghc/ghc/issues/16017#note_219304 -- -optP-Wno-nonportable-include-path diff --git a/src/PostgREST/Metrics.hs b/src/PostgREST/Metrics.hs index 75db34b4d6..4a3ad91373 100644 --- a/src/PostgREST/Metrics.hs +++ b/src/PostgREST/Metrics.hs @@ -5,7 +5,10 @@ Description : Metrics based on the Observation module. See Observation.hs. -} module PostgREST.Metrics ( init + , ConnTrack + , ConnStats (..) , MetricsState (..) + , connectionCounts , observationMetrics , metricsToText ) where @@ -19,13 +22,18 @@ import qualified Prometheus.Metric.GHC as PMG import PostgREST.Observation - -import Protolude +import Control.Arrow ((&&&)) +import Data.Bitraversable (bisequenceA) +import Data.Tuple.Extra (both) +import Data.UUID (UUID) +import qualified Focus +import Protolude +import qualified StmHamt.SizedHamt as SH data MetricsState = MetricsState { poolTimeouts :: Counter, - poolAvailable :: Gauge, + connTrack :: ConnTrack, poolWaiting :: Gauge, poolMaxSize :: Gauge, schemaCacheLoads :: Vector Label1 Counter, @@ -40,7 +48,7 @@ init configDbPoolSize = do whenM getRTSStatsEnabled $ void $ register PMG.ghcMetrics metricState <- MetricsState <$> register (counter (Info "pgrst_db_pool_timeouts_total" "The total number of pool connection timeouts")) <*> - register (gauge (Info "pgrst_db_pool_available" "Available connections in the pool")) <*> + register (Metric ((identity &&& dbPoolAvailable) <$> connectionTracker)) <*> register (gauge (Info "pgrst_db_pool_waiting" "Requests waiting to acquire a pool connection")) <*> register (gauge (Info "pgrst_db_pool_max" "Max pool connections")) <*> register (vector "status" $ counter (Info "pgrst_schema_cache_loads_total" "The total number of times the schema cache was loaded")) <*> @@ -50,20 +58,28 @@ init configDbPoolSize = do register (counter (Info "pgrst_jwt_cache_evictions_total" "The total number of JWT cache evictions")) setGauge (poolMaxSize metricState) (fromIntegral configDbPoolSize) pure metricState + where + dbPoolAvailable = (pure . noLabelsGroup (Info "pgrst_db_pool_available" "Available connections in the pool") GaugeType . calcAvailable <$>) . connectionCounts + where + calcAvailable = liftA2 (-) connected inUse + toSample name labels = Sample name labels . encodeUtf8 . show + noLabelsGroup info sampleType = SampleGroup info sampleType . pure . toSample (metricName info) mempty -- Only some observations are used as metrics observationMetrics :: MetricsState -> ObservationHandler observationMetrics MetricsState{..} obs = case obs of PoolAcqTimeoutObs -> do incCounter poolTimeouts - (HasqlPoolObs (SQL.ConnectionObservation _ status)) -> case status of - SQL.ReadyForUseConnectionStatus _ -> do - incGauge poolAvailable - SQL.InUseConnectionStatus -> do - decGauge poolAvailable - SQL.TerminatedConnectionStatus _ -> do - decGauge poolAvailable - SQL.ConnectingConnectionStatus -> pure () + -- Handle pool observations with connection tracking + -- this is necessary because it is not possible + -- to accurately maintain open/in use conneciton counts + -- statelessly based only on pool observation events. + -- The reason is that hasql-pool emits TerminatedConnectionStatus + -- both for connections successfully established and failed when connecting. + -- When receiving TerminatedConnectionStatus we have to find out + -- if we can decrement established connection count. To do that we have to track + -- established connections. + (HasqlPoolObs sqlObs) -> trackConnections connTrack sqlObs PoolRequest -> incGauge poolWaiting PoolRequestFullfilled -> @@ -81,3 +97,28 @@ observationMetrics MetricsState{..} obs = case obs of metricsToText :: IO LBS.ByteString metricsToText = exportMetricsAsText + +data ConnStats = ConnStats { + connected :: Int, + inUse :: Int +} deriving (Eq, Show) + +data ConnTrack = ConnTrack { connTrackConnected :: SH.SizedHamt UUID, connTrackInUse :: SH.SizedHamt UUID } + +connectionTracker :: IO ConnTrack +connectionTracker = ConnTrack <$> SH.newIO <*> SH.newIO + +trackConnections :: ConnTrack -> SQL.Observation -> IO () +trackConnections ConnTrack{..} (SQL.ConnectionObservation uuid status) = case status of + SQL.ReadyForUseConnectionStatus _ -> atomically $ + SH.insert identity uuid connTrackConnected *> + SH.focus Focus.delete identity uuid connTrackInUse + SQL.TerminatedConnectionStatus _ -> atomically $ + SH.focus Focus.delete identity uuid connTrackConnected *> + SH.focus Focus.delete identity uuid connTrackInUse + SQL.InUseConnectionStatus -> atomically $ + SH.insert identity uuid connTrackInUse + _ -> mempty + +connectionCounts :: ConnTrack -> IO ConnStats +connectionCounts = atomically . fmap (uncurry ConnStats) . bisequenceA . both SH.size . (connTrackConnected &&& connTrackInUse) diff --git a/test/observability/Observation/MetricsSpec.hs b/test/observability/Observation/MetricsSpec.hs index 524e0c1018..6057756024 100644 --- a/test/observability/Observation/MetricsSpec.hs +++ b/test/observability/Observation/MetricsSpec.hs @@ -6,17 +6,21 @@ module Observation.MetricsSpec where -import Data.List (lookup) -import Network.Wai (Application) +import Data.List (lookup) +import qualified Hasql.Pool.Observation as SQL +import Network.Wai (Application) import ObsHelper -import qualified PostgREST.AppState as AppState -import PostgREST.Config (AppConfig (configDbSchemas)) -import qualified PostgREST.Metrics as Metrics +import qualified PostgREST.AppState as AppState +import PostgREST.Config (AppConfig (configDbSchemas)) +import PostgREST.Metrics (ConnStats (..), + MetricsState (..), + connectionCounts) import PostgREST.Observation -import Prometheus (getCounter, getVectorWith) -import Protolude -import Test.Hspec (SpecWith, describe, it) -import Test.Hspec.Wai (getState) +import Prometheus (getCounter, getVectorWith) +import Test.Hspec (SpecWith, describe, it) +import Test.Hspec.Wai (getState) + +import Protolude spec :: SpecWith (SpecState, Application) spec = describe "Server started with metrics enabled" $ do @@ -71,9 +75,33 @@ spec = describe "Server started with metrics enabled" $ do -- (there should be none but we need to verify that) threadDelay $ 1 * sec + it "Should track in use connections" $ do + SpecState{specAppState = appState, specMetrics = metrics, specObsChan} <- getState + let waitFor = waitForObs specObsChan + + liftIO $ checkState' metrics [ + -- we expect in use connections to be the same once finished + inUseConnections (+ 0) + ] $ do + signal <- newEmptyMVar + -- make sure waiting thread is signaled + bracket_ (pure ()) (putMVar signal ()) $ + -- expecting one more connection in use + checkState' metrics [ + inUseConnections (+ 1) + ] $ do + -- start a thread hanging on a single connection until signaled + void $ forkIO $ void $ AppState.usePool appState $ liftIO (readMVar signal) + -- main thread waits for ConnectionObservation with InUseConnectionStatus + -- after which used connections count should be incremented + waitFor (1 * sec) "InUseConnectionStatus" $ \x -> [ o | o@(HasqlPoolObs (SQL.ConnectionObservation _ SQL.InUseConnectionStatus)) <- pure x] + + -- hanging thread was signaled and should return the connection + waitFor (1 * sec) "ReadyForUseConnectionStatus" $ \x -> [ o | o@(HasqlPoolObs (SQL.ConnectionObservation _ (SQL.ReadyForUseConnectionStatus _))) <- pure x] where -- prometheus-client api to handle vectors is convoluted schemaCacheLoads label = expectField @"schemaCacheLoads" $ fmap (maybe (0::Int) round . lookup label) . (`getVectorWith` getCounter) + inUseConnections = expectField @"connTrack" ((inUse <$>) . connectionCounts) sec = 1000000