diff --git a/CHANGELOG.md b/CHANGELOG.md index 72394c45e0..425e8a2ec6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -46,6 +46,7 @@ All notable changes to this project will be documented in this file. From versio ### Fixed - Fix unnecessary connection pool flushes during schema cache reloading by @mkleczek in #4645 +- Fix race condition in pool_available metric causing negative values during network instability by @mkleczek in #4622 ## [14.9] - 2026-04-10 diff --git a/postgrest.cabal b/postgrest.cabal index 2f100b3e92..807e104383 100644 --- a/postgrest.cabal +++ b/postgrest.cabal @@ -162,6 +162,7 @@ library , stm-hamt >= 1.2 && < 2 , focus >= 1.0 && < 2 , some >= 1.0.4.1 && < 2 + , uuid >= 1.3 && < 2 -- -fno-spec-constr may help keep compile time memory use in check, -- see https://gitlab.haskell.org/ghc/ghc/issues/16017#note_219304 -- -optP-Wno-nonportable-include-path diff --git a/src/PostgREST/Metrics.hs b/src/PostgREST/Metrics.hs index 75db34b4d6..4a3ad91373 100644 --- a/src/PostgREST/Metrics.hs +++ b/src/PostgREST/Metrics.hs @@ -5,7 +5,10 @@ Description : Metrics based on the Observation module. See Observation.hs. -} module PostgREST.Metrics ( init + , ConnTrack + , ConnStats (..) , MetricsState (..) 
+ , connectionCounts , observationMetrics , metricsToText ) where @@ -19,13 +22,18 @@ import qualified Prometheus.Metric.GHC as PMG import PostgREST.Observation - -import Protolude +import Control.Arrow ((&&&)) +import Data.Bitraversable (bisequenceA) +import Data.Tuple.Extra (both) +import Data.UUID (UUID) +import qualified Focus +import Protolude +import qualified StmHamt.SizedHamt as SH data MetricsState = MetricsState { poolTimeouts :: Counter, - poolAvailable :: Gauge, + connTrack :: ConnTrack, poolWaiting :: Gauge, poolMaxSize :: Gauge, schemaCacheLoads :: Vector Label1 Counter, @@ -40,7 +48,7 @@ init configDbPoolSize = do whenM getRTSStatsEnabled $ void $ register PMG.ghcMetrics metricState <- MetricsState <$> register (counter (Info "pgrst_db_pool_timeouts_total" "The total number of pool connection timeouts")) <*> - register (gauge (Info "pgrst_db_pool_available" "Available connections in the pool")) <*> + register (Metric ((identity &&& dbPoolAvailable) <$> connectionTracker)) <*> register (gauge (Info "pgrst_db_pool_waiting" "Requests waiting to acquire a pool connection")) <*> register (gauge (Info "pgrst_db_pool_max" "Max pool connections")) <*> register (vector "status" $ counter (Info "pgrst_schema_cache_loads_total" "The total number of times the schema cache was loaded")) <*> @@ -50,20 +58,28 @@ init configDbPoolSize = do register (counter (Info "pgrst_jwt_cache_evictions_total" "The total number of JWT cache evictions")) setGauge (poolMaxSize metricState) (fromIntegral configDbPoolSize) pure metricState + where + dbPoolAvailable = (pure . noLabelsGroup (Info "pgrst_db_pool_available" "Available connections in the pool") GaugeType . calcAvailable <$>) . connectionCounts + where + calcAvailable = liftA2 (-) connected inUse + toSample name labels = Sample name labels . encodeUtf8 . show + noLabelsGroup info sampleType = SampleGroup info sampleType . pure . 
toSample (metricName info) mempty -- Only some observations are used as metrics observationMetrics :: MetricsState -> ObservationHandler observationMetrics MetricsState{..} obs = case obs of PoolAcqTimeoutObs -> do incCounter poolTimeouts - (HasqlPoolObs (SQL.ConnectionObservation _ status)) -> case status of - SQL.ReadyForUseConnectionStatus _ -> do - incGauge poolAvailable - SQL.InUseConnectionStatus -> do - decGauge poolAvailable - SQL.TerminatedConnectionStatus _ -> do - decGauge poolAvailable - SQL.ConnectingConnectionStatus -> pure () + -- Handle pool observations with connection tracking + -- this is necessary because it is not possible + -- to accurately maintain open/in use connection counts + -- statelessly based only on pool observation events. + -- The reason is that hasql-pool emits TerminatedConnectionStatus + -- both for connections successfully established and failed when connecting. + -- When receiving TerminatedConnectionStatus we have to find out + -- if we can decrement established connection count. To do that we have to track + -- established connections. 
+ (HasqlPoolObs sqlObs) -> trackConnections connTrack sqlObs PoolRequest -> incGauge poolWaiting PoolRequestFullfilled -> @@ -81,3 +97,28 @@ observationMetrics MetricsState{..} obs = case obs of metricsToText :: IO LBS.ByteString metricsToText = exportMetricsAsText + +data ConnStats = ConnStats { + connected :: Int, + inUse :: Int +} deriving (Eq, Show) + +data ConnTrack = ConnTrack { connTrackConnected :: SH.SizedHamt UUID, connTrackInUse :: SH.SizedHamt UUID } + +connectionTracker :: IO ConnTrack +connectionTracker = ConnTrack <$> SH.newIO <*> SH.newIO + +trackConnections :: ConnTrack -> SQL.Observation -> IO () +trackConnections ConnTrack{..} (SQL.ConnectionObservation uuid status) = case status of + SQL.ReadyForUseConnectionStatus _ -> atomically $ + SH.insert identity uuid connTrackConnected *> + SH.focus Focus.delete identity uuid connTrackInUse + SQL.TerminatedConnectionStatus _ -> atomically $ + SH.focus Focus.delete identity uuid connTrackConnected *> + SH.focus Focus.delete identity uuid connTrackInUse + SQL.InUseConnectionStatus -> atomically $ + SH.insert identity uuid connTrackInUse + _ -> mempty + +connectionCounts :: ConnTrack -> IO ConnStats +connectionCounts = atomically . fmap (uncurry ConnStats) . bisequenceA . both SH.size . 
(connTrackConnected &&& connTrackInUse) diff --git a/test/observability/Observation/MetricsSpec.hs b/test/observability/Observation/MetricsSpec.hs index 524e0c1018..6057756024 100644 --- a/test/observability/Observation/MetricsSpec.hs +++ b/test/observability/Observation/MetricsSpec.hs @@ -6,17 +6,21 @@ module Observation.MetricsSpec where -import Data.List (lookup) -import Network.Wai (Application) +import Data.List (lookup) +import qualified Hasql.Pool.Observation as SQL +import Network.Wai (Application) import ObsHelper -import qualified PostgREST.AppState as AppState -import PostgREST.Config (AppConfig (configDbSchemas)) -import qualified PostgREST.Metrics as Metrics +import qualified PostgREST.AppState as AppState +import PostgREST.Config (AppConfig (configDbSchemas)) +import PostgREST.Metrics (ConnStats (..), + MetricsState (..), + connectionCounts) import PostgREST.Observation -import Prometheus (getCounter, getVectorWith) -import Protolude -import Test.Hspec (SpecWith, describe, it) -import Test.Hspec.Wai (getState) +import Prometheus (getCounter, getVectorWith) +import Test.Hspec (SpecWith, describe, it) +import Test.Hspec.Wai (getState) + +import Protolude spec :: SpecWith (SpecState, Application) spec = describe "Server started with metrics enabled" $ do @@ -71,9 +75,33 @@ spec = describe "Server started with metrics enabled" $ do -- (there should be none but we need to verify that) threadDelay $ 1 * sec + it "Should track in use connections" $ do + SpecState{specAppState = appState, specMetrics = metrics, specObsChan} <- getState + let waitFor = waitForObs specObsChan + + liftIO $ checkState' metrics [ + -- we expect in use connections to be the same once finished + inUseConnections (+ 0) + ] $ do + signal <- newEmptyMVar + -- make sure waiting thread is signaled + bracket_ (pure ()) (putMVar signal ()) $ + -- expecting one more connection in use + checkState' metrics [ + inUseConnections (+ 1) + ] $ do + -- start a thread hanging on a single connection 
until signaled + void $ forkIO $ void $ AppState.usePool appState $ liftIO (readMVar signal) + -- main thread waits for ConnectionObservation with InUseConnectionStatus + -- after which used connections count should be incremented + waitFor (1 * sec) "InUseConnectionStatus" $ \x -> [ o | o@(HasqlPoolObs (SQL.ConnectionObservation _ SQL.InUseConnectionStatus)) <- pure x] + + -- hanging thread was signaled and should return the connection + waitFor (1 * sec) "ReadyForUseConnectionStatus" $ \x -> [ o | o@(HasqlPoolObs (SQL.ConnectionObservation _ (SQL.ReadyForUseConnectionStatus _))) <- pure x] where -- prometheus-client api to handle vectors is convoluted schemaCacheLoads label = expectField @"schemaCacheLoads" $ fmap (maybe (0::Int) round . lookup label) . (`getVectorWith` getCounter) + inUseConnections = expectField @"connTrack" ((inUse <$>) . connectionCounts) sec = 1000000