1- {-# LANGUAGE LambdaCase #-}
2- {-# LANGUAGE NamedFieldPuns #-}
3- {-# LANGUAGE QuasiQuotes #-}
4- {-# LANGUAGE RecordWildCards #-}
1+ {-# LANGUAGE LambdaCase #-}
2+ {-# LANGUAGE NamedFieldPuns #-}
3+ {-# LANGUAGE QuasiQuotes #-}
4+ {-# LANGUAGE RecordWildCards #-}
5+ {-# LANGUAGE TypeApplications #-}
56
67module PostgREST.AppState
78 ( AppState
@@ -74,15 +75,21 @@ import PostgREST.SchemaCache (SchemaCache (..),
7475import PostgREST.SchemaCache.Identifiers (quoteQi )
7576import PostgREST.Unix (createAndBindDomainSocket )
7677
77- import Data.Functor.Contravariant ((>$<) )
78- import Data.Streaming.Network (bindPortTCP ,
79- bindRandomPortTCP )
80- import Data.String (IsString (.. ))
81- import qualified Hasql.Decoders as HD
82- import qualified Hasql.Encoders as HE
83- import qualified Hasql.Statement as SQL
84- import NeatInterpolation (trimming )
78+ import Control.Arrow ((&&&) )
79+ import Data.Bitraversable (bisequenceA )
80+ import Data.Streaming.Network (bindPortTCP ,
81+ bindRandomPortTCP )
82+ import Data.String (IsString (.. ))
83+ import Data.Tuple.Extra (both )
84+ import Data.UUID hiding (fromString )
85+ import qualified Focus
86+ import qualified Hasql.Decoders as HD
87+ import qualified Hasql.Encoders as HE
88+ import Hasql.Pool.Observation
89+ import qualified Hasql.Statement as SQL
90+ import NeatInterpolation (trimming )
8591import Protolude
92+ import qualified StmHamt.SizedHamt as SH
8693
8794
8895data AppState = AppState
@@ -118,6 +125,7 @@ data AppState = AppState
118125 , stateJwtCache :: JwtCache. JwtCacheState
119126 , stateLogger :: Logger. LoggerState
120127 , stateMetrics :: Metrics. MetricsState
128+ , stateConnTrack :: ConnTrack
121129 }
122130
123131-- | Schema cache status
@@ -132,7 +140,7 @@ init :: AppConfig -> IO AppState
132140init conf@ AppConfig {configLogLevel, configDbPoolSize} = do
133141 loggerState <- Logger. init
134142 metricsState <- Metrics. init configDbPoolSize
135- let observer = liftA2 (>>) ( Logger. observationLogger loggerState configLogLevel) ( Metrics. observationMetrics metricsState)
143+ let observer = Logger. observationLogger loggerState configLogLevel <> Metrics. observationMetrics metricsState
136144
137145 observer $ AppStartObs prettyVersion
138146
@@ -144,6 +152,7 @@ init conf@AppConfig{configLogLevel, configDbPoolSize} = do
144152initWithPool :: AppSockets -> SQL. Pool -> AppConfig -> Logger. LoggerState -> Metrics. MetricsState -> ObservationHandler -> IO AppState
145153initWithPool (sock, adminSock) pool conf loggerState metricsState observer = do
146154
155+ connTrack <- ConnTrack <$> SH. newIO <*> SH. newIO
147156 appState <- AppState pool
148157 <$> newIORef minimumPgVersion -- assume we're in a supported version when starting, this will be corrected on a later step
149158 <*> newIORef Nothing
@@ -157,10 +166,11 @@ initWithPool (sock, adminSock) pool conf loggerState metricsState observer = do
157166 <*> newIORef 1
158167 <*> pure sock
159168 <*> pure adminSock
160- <*> pure observer
169+ <*> pure ( observer <> trackConnections connTrack)
161170 <*> JwtCache. init conf observer
162171 <*> pure loggerState
163172 <*> pure metricsState
173+ <*> pure connTrack
164174
165175 deb <-
166176 let decisecond = 100000 in
@@ -410,11 +420,21 @@ retryingSchemaCacheLoad appState@AppState{stateObserver=observer, stateMainThrea
410420 qSchemaCache :: IO (Maybe SchemaCache )
411421 qSchemaCache = do
412422 conf@ AppConfig {.. } <- getConfig appState
413- -- Allow 10 concurrent schema cache loads, guarded by advisory locks.
414- -- This is to prevent thundering herd problem on startup or when many PostgREST
415- -- instances receive "reload schema" notifications at the same time
416- let withTxLock = SQL. statement (50168275 , 10 ) $
417- SQL. Statement get_lock_sql get_lock_params HD. noResult configDbPreparedStatements
423+ -- Throttle concurrent schema cache loads, guarded by advisory locks.
424+ -- This is to prevent thundering herd problem on startup or when many PostgREST
425+ -- instances receive "reload schema" notifications at the same time
426+ -- schema reloading session + listener session
427+ -- See get_lock_sql for details of the algorithm.
428+ -- Here we calculate the number of open connections passed to the query.
429+ (connected, inUse) <- sessionCounts appState
430+ -- Determine whether schema cache loading will create a new session
431+ let
432+ scLoadingSessions = case (connected <= inUse, inUse >= configDbPoolSize) of
433+ (True , False ) -> 1 -- all connections in use but pool not full - schema cache loading will create session
434+ _ -> 0
435+ withTxLock = SQL. statement
436+ (fromIntegral $ connected + scLoadingSessions)
437+ (SQL. Statement get_lock_sql get_lock_params HD. noResult configDbPreparedStatements)
418438
419439 (resultTime, result) <-
420440 let transaction = if configDbPreparedStatements then SQL. transaction else SQL. unpreparedTransaction in
@@ -437,23 +457,42 @@ retryingSchemaCacheLoad appState@AppState{stateObserver=observer, stateMainThrea
437457 putSCacheStatus appState SCLoaded
438458 return $ Just sCache
439459 where
440- -- recursive query that tries acquiring locks in order
441- -- and waits for randomly selected lock if no attempt succeeded
442- -- parameters are lock number and number of locks to try
460+ -- Recursive query that tries acquiring locks in order
461+ -- and waits for randomly selected lock if no attempt succeeded.
462+ -- It has a single parameter: this node open connection count.
463+ -- It is used to estimate the number of nodes
464+ -- by counting the number of active sessions for current session_user
465+ -- and dividing it by this node open connections.
466+ -- Assuming load is uniform among cluster nodes, all should have
467+ -- statistically the same number of open connections.
468+ -- Once the number of nodes is known we calculate the number
469+ -- of locks as ceil(log(2, number_of_nodes))
443470 get_lock_sql = encodeUtf8 [trimming |
444471 WITH RECURSIVE attempts AS (
445- SELECT 1 AS lock_number, pg_try_advisory_xact_lock($$1 , 1) AS success WHERE $$2 > 0
472+ SELECT 1 AS lock_number, pg_try_advisory_xact_lock(lock_id , 1) AS success FROM parameters
446473 UNION ALL
447- SELECT next_lock_number AS lock_number, pg_try_advisory_xact_lock($$1, next_lock_number) AS success FROM (
448- SELECT lock_number + 1 AS next_lock_number FROM attempts
449- WHERE NOT success AND lock_number < $$2
450- ORDER BY lock_number DESC
451- LIMIT 1
452- ) AS previous_attempt
474+ SELECT next_lock_number AS lock_number, pg_try_advisory_xact_lock(lock_id, next_lock_number) AS success
475+ FROM
476+ parameters CROSS JOIN LATERAL (
477+ SELECT lock_number + 1 AS next_lock_number FROM attempts
478+ WHERE NOT success AND lock_number < locks_count
479+ ORDER BY lock_number DESC
480+ LIMIT 1
481+ ) AS previous_attempt
482+ ),
483+ counts AS (
484+ SELECT round(log(2, round(count(*)::double precision/$$1)::numeric))::int AS locks_count
485+ FROM
486+ pg_stat_activity WHERE usename = SESSION_USER
487+ ),
488+ parameters AS (
489+ SELECT locks_count, 50168275 AS lock_id FROM counts WHERE locks_count > 0
453490 )
454- SELECT pg_advisory_xact_lock($$1, floor(random() * $$2)::int + 1) WHERE NOT EXISTS (SELECT 1 FROM attempts WHERE success) |]
491+ SELECT pg_advisory_xact_lock(lock_id, floor(random() * locks_count)::int + 1)
492+ FROM
493+ parameters WHERE NOT EXISTS (SELECT 1 FROM attempts WHERE success) |]
455494
456- get_lock_params = ( fst >$< HE. param (HE. nonNullable HE. int4)) <> ( snd >$< HE. param ( HE. nonNullable HE. int4) )
495+ get_lock_params = HE. param (HE. nonNullable HE. int4)
457496
458497 shouldRetry :: RetryStatus -> (Maybe PgVersion , Maybe SchemaCache ) -> IO Bool
459498 shouldRetry _ (pgVer, sCache) = do
@@ -511,3 +550,21 @@ readInDbConfig startingUp appState@AppState{stateObserver=observer} = do
511550 pass
512551 else
513552 observer ConfigSucceededObs
553+
554+
555+ data ConnTrack = ConnTrack { connTrackConnected :: SH. SizedHamt UUID , connTrackInUse :: SH. SizedHamt UUID }
556+
557+ trackConnections :: ConnTrack -> ObservationHandler
558+ trackConnections ConnTrack {.. } (HasqlPoolObs (ConnectionObservation uuid status)) = case status of
559+ ReadyForUseConnectionStatus -> atomically $
560+ SH. insert identity uuid connTrackConnected *>
561+ SH. focus Focus. delete identity uuid connTrackInUse
562+ TerminatedConnectionStatus _ -> atomically $
563+ SH. focus Focus. delete identity uuid connTrackConnected *>
564+ SH. focus Focus. delete identity uuid connTrackInUse
565+ InUseConnectionStatus -> atomically $ SH. insert identity uuid connTrackInUse
566+ _ -> mempty
567+ trackConnections _ _ = mempty
568+
569+ sessionCounts :: AppState -> IO (Int , Int )
570+ sessionCounts = atomically . bisequenceA . both SH. size . (connTrackConnected &&& connTrackInUse) . stateConnTrack
0 commit comments