Skip to content

Commit 8df321e

Browse files
committed
add: set keepalives options in listener connection string
This change adds keepalive options to listener connections string. The value of keepalives_idle is adjusted using a simple algorithm based on idle time tracking.
1 parent 902f6bd commit 8df321e

5 files changed

Lines changed: 156 additions & 20 deletions

File tree

src/PostgREST/App.hs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ run appState = do
8181

8282
Unix.installSignalHandlers (AppState.getMainThreadId appState) (AppState.schemaCacheLoader appState) (AppState.readInDbConfig False appState)
8383

84-
Listener.runListener appState
84+
void $ Listener.runListener appState
8585

8686
Admin.runAdmin appState adminSocket mainSocket (serverSettings conf)
8787

src/PostgREST/Config.hs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ module PostgREST.Config
2525
, readPGRSTEnvironment
2626
, toURI
2727
, parseSecret
28+
, addConnStringOption
2829
, addFallbackAppName
2930
, addTargetSessionAttrs
3031
, exampleConfigFile
@@ -619,7 +620,7 @@ pgConnString conn | uriDesignator `T.isPrefixOf` conn || shortUriDesignator `T.i
619620
-- addFallbackAppName ver "postgresql:///postgres?host=/run/user/1000/postgrest/postgrest-with-postgresql-16-BuR/socket&user=some_protected_user&password=invalid_pass"
620621
-- "postgresql:///postgres?host=/run/user/1000/postgrest/postgrest-with-postgresql-16-BuR/socket&user=some_protected_user&password=invalid_pass&fallback_application_name=PostgREST%2011.1.0%20%285a04ec7%29"
621622
addFallbackAppName :: ByteString -> Text -> Text
622-
addFallbackAppName version dbUri = addConnStringOption dbUri "fallback_application_name" pgrstVer
623+
addFallbackAppName version = addConnStringOption "fallback_application_name" pgrstVer
623624
where
624625
pgrstVer = "PostgREST " <> T.decodeUtf8 version
625626

@@ -641,10 +642,10 @@ addFallbackAppName version dbUri = addConnStringOption dbUri "fallback_applicati
641642
-- >>> addTargetSessionAttrs "host=localhost port=5432 dbname=postgres"
642643
-- "host=localhost port=5432 dbname=postgres target_session_attrs='read-write'"
643644
addTargetSessionAttrs :: Text -> Text
644-
addTargetSessionAttrs dbUri = addConnStringOption dbUri "target_session_attrs" "read-write"
645+
addTargetSessionAttrs = addConnStringOption "target_session_attrs" "read-write"
645646

646647
addConnStringOption :: Text -> Text -> Text -> Text
647-
addConnStringOption dbUri key val = dbUri <>
648+
addConnStringOption key val dbUri= dbUri <>
648649
case pgConnString dbUri of
649650
Nothing -> mempty
650651
Just PGKeyVal -> " " <> keyValFmt

src/PostgREST/Listener.hs

Lines changed: 75 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
1-
{-# LANGUAGE LambdaCase #-}
2-
{-# LANGUAGE MultiWayIf #-}
3-
{-# LANGUAGE RecordWildCards #-}
1+
{-# LANGUAGE DeriveAnyClass #-}
2+
{-# LANGUAGE LambdaCase #-}
3+
{-# LANGUAGE MultiWayIf #-}
4+
{-# LANGUAGE RecordWildCards #-}
5+
{-# LANGUAGE ScopedTypeVariables #-}
6+
{-# LANGUAGE TypeApplications #-}
47

5-
module PostgREST.Listener (runListener) where
8+
module PostgREST.Listener (runListener, runListener') where
69

710
import qualified Data.ByteString.Char8 as BS
811

@@ -19,28 +22,54 @@ import qualified PostgREST.Config as Config
1922
import Control.Arrow ((&&&))
2023
import Data.Bitraversable (bisequence)
2124
import Data.Either.Combinators (whenRight)
25+
import Data.IORef (IORef, newIORef,
26+
readIORef, writeIORef)
2227
import qualified Data.Text as T
28+
import Data.Time (UTCTime, diffUTCTime,
29+
nominalDiffTimeToSeconds)
2330
import qualified Database.PostgreSQL.LibPQ as LibPQ
2431
import qualified Hasql.Session as SQL
2532
import PostgREST.Config.Database (queryPgVersion)
2633
import PostgREST.Config.PgVersion (pgvFullName)
2734
import Protolude
35+
import System.IO.Error (isResourceVanishedError)
2836

2937
-- | Starts the Listener in a thread
30-
runListener :: AppState -> IO ()
31-
runListener appState = do
38+
-- | Returns IO action to stop the listener thread.
39+
runListener :: AppState -> IO (IO ())
40+
runListener appState = runListener' appState (15 * minute) (30 * minute)
41+
where
42+
minute = 60
43+
44+
data ListenerStopped = ListenerStopped deriving (Show, Exception)
45+
46+
runListener' :: AppState -> Int -> Int -> IO (IO ())
47+
runListener' appState initialTcpKeepAlivesIdleSec maxTcpKeepAlivesIdleSec = do
3248
AppConfig{..} <- getConfig appState
33-
when configDbChannelEnabled $
34-
void . forkIO . void $ retryingListen appState
49+
if configDbChannelEnabled then do
50+
started <- newIORef Nothing
51+
listenerThreadId <- forkIO . void $ retryingListen started initialTcpKeepAlivesIdleSec maxTcpKeepAlivesIdleSec False appState
52+
pure $ throwTo listenerThreadId ListenerStopped
53+
else
54+
mempty
3555

3656
-- | Starts a LISTEN connection and handles notifications. It recovers with exponential backoff with a cap of 32 seconds, if the LISTEN connection is lost.
3757
-- | This function never returns (but can throw) and return type enforces that.
38-
retryingListen :: AppState -> IO Void
39-
retryingListen appState = do
58+
retryingListen :: IORef (Maybe UTCTime) -> Int -> Int -> Bool -> AppState -> IO ()
59+
retryingListen lastActivity currentKeepalivesIdle maxKeepalivesIdle retryingOnIdleTimeout appState = do
4060
AppConfig{..} <- AppState.getConfig appState
4161
let
4262
dbChannel = toS configDbChannel
4363
onError err = do
64+
-- ResourceVanished should be reported when reading from socket fails
65+
-- as long as hasql-notifications does not wrap IOException in something else...
66+
let resourceVanished = maybe False isResourceVanishedError (fromException @IOException err)
67+
(newTcpIdle, newMaxKeepalivesIdle) <-
68+
if resourceVanished then do
69+
readIORef lastActivity >>= maybe (pure (currentKeepalivesIdle, maxKeepalivesIdle)) adjustTcpIdle
70+
else
71+
pure (currentKeepalivesIdle, maxKeepalivesIdle)
72+
writeIORef lastActivity Nothing
4473
AppState.putIsListenerOn appState False
4574
observer $ DBListenFail dbChannel (Right err)
4675
when (isDbListenerBug err) $
@@ -55,14 +84,14 @@ retryingListen appState = do
5584
unless (delay == maxDelay) $
5685
AppState.putNextListenerDelay appState (delay * 2)
5786
-- loop running the listener
58-
retryingListen appState
87+
retryingListen lastActivity newTcpIdle newMaxKeepalivesIdle resourceVanished appState
5988

6089
-- Execute the listener with with error handling
61-
handle onError $ do
90+
handle onError $ handle (\ListenerStopped -> pure ()) $ do
6291
-- Make sure we don't leak connections on errors
6392
bracket
6493
-- acquire connection
65-
(SQL.acquire $ toUtf8 (Config.addTargetSessionAttrs $ Config.addFallbackAppName prettyVersion configDbUri))
94+
(SQL.acquire $ toUtf8 (addKeepalivesOptions $ Config.addTargetSessionAttrs $ Config.addFallbackAppName prettyVersion configDbUri))
6695
-- release connection
6796
(`whenRight` releaseConnection) $
6897
-- use connection
@@ -82,6 +111,7 @@ retryingListen appState = do
82111
AppState.putNextListenerDelay appState 1
83112

84113
observer $ DBListenStart pqHost pqPort pgFullName dbChannel
114+
AppState.getTime appState >>= writeIORef lastActivity . pure
85115

86116
-- wait for notifications
87117
-- this will never return, in case of an error it will throw and be caught by onError
@@ -96,15 +126,46 @@ retryingListen appState = do
96126
oneSecondInMicro = 1000000
97127
maxDelay = 32
98128

99-
handleNotification channel msg =
129+
handleNotification channel msg = do
100130
if | BS.null msg -> observer (DBListenerGotSCacheMsg channel) >> cacheReloader
101131
| msg == "reload schema" -> observer (DBListenerGotSCacheMsg channel) >> cacheReloader
102132
| msg == "reload config" -> observer (DBListenerGotConfigMsg channel) >> AppState.readInDbConfig False appState
103133
| otherwise -> pure () -- Do nothing if anything else than an empty message is sent
134+
AppState.getTime appState >>= writeIORef lastActivity . Just
135+
104136

105137
cacheReloader =
106138
AppState.schemaCacheLoader appState
107139

108140
releaseConnection = void . forkIO . handle (observer . DBListenerConnectionCleanupFail) . SQL.release
109141

110142
isDbListenerBug e = "could not access status of transaction" `T.isInfixOf` show e
143+
144+
adjustTcpIdle lastActiveTime = do
145+
currentIdleSeconds <- AppState.getTime appState <&> round . nominalDiffTimeToSeconds . (`diffUTCTime` lastActiveTime)
146+
let currentIdleTimeout = currentKeepalivesIdle + keepalivesInterval * keepalivesCount
147+
-- if our idle time == current idle timeout setting it means
148+
-- we have to make it shorter
149+
if currentIdleSeconds `div` currentIdleTimeout <= 1 then
150+
-- only adjust if this is the second idle timeout failure
151+
-- this is to eliminate spurious adjustments (TODO rethink if it is really needed)
152+
if retryingOnIdleTimeout then
153+
-- try with 1/2 of current keepalive idle
154+
-- remember that it is the new maximum we can try later
155+
pure (max 1 $ currentKeepalivesIdle `div` 2, currentKeepalivesIdle)
156+
else
157+
pure (currentKeepalivesIdle, maxKeepalivesIdle)
158+
else
159+
pure (currentKeepalivesIdle + (maxKeepalivesIdle - currentKeepalivesIdle) `div` 2, maxKeepalivesIdle)
160+
161+
keepalivesInterval = max 1 $ currentKeepalivesIdle `div` (5 * keepalivesCount)
162+
keepalivesCount = 5
163+
164+
-- (Config.addConnStringOption opt val) is an endomorphism
165+
-- so it is a Monoid under function composition
166+
-- Haskell is awesome
167+
addKeepalivesOptions = appEndo $ foldMap (Endo . uncurry Config.addConnStringOption . fmap show) [
168+
("keepalives_count", keepalivesCount)
169+
, ("keepalives_interval", keepalivesInterval)
170+
, ("keepalives_idle", currentKeepalivesIdle)
171+
]

test/spec/Feature/ToxiSpec.hs

Lines changed: 75 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
{-# LANGUAGE FlexibleContexts #-}
33
{-# LANGUAGE MonadComprehensions #-}
44
{-# LANGUAGE NamedFieldPuns #-}
5+
{-# LANGUAGE NumericUnderscores #-}
56
module Feature.ToxiSpec where
67

78
import Control.Monad.Trans.Control (liftBaseDiscard)
@@ -11,7 +12,10 @@ import Protolude hiding (get)
1112
import SpecHelper
1213
import Test.Hspec (SpecWith, describe, it)
1314
import Test.Hspec.Wai
14-
import Toxiproxy (withDisabled)
15+
import Toxiproxy (withDisabled, Toxic (..), Stream (..), withToxic, ToxicType (..))
16+
import PostgREST.Observation (Observation(..))
17+
import PostgREST.Listener ( runListener, runListener' )
18+
import qualified Data.Map as M
1519

1620
spec :: SpecWith (SpecState, Application)
1721
spec = describe "Tests using Toxiproxy" $ do
@@ -32,3 +36,73 @@ spec = describe "Tests using Toxiproxy" $ do
3236
liftBaseDiscard (withDisabled specToxiProxy) $ do
3337
void $ get "/items?id=eq.5"
3438
`shouldRespondWith` 503
39+
40+
describe "Toxiproxy tests of notification listener" $ do
41+
it "should start listener" $ do
42+
SpecState {specAppState, specObsChan} <- getState
43+
let waitFor = waitForObs specObsChan
44+
45+
liftIO $ withListener specAppState $
46+
waitFor (1*sec) "DBListenStart" $ \x -> [ o | o@DBListenStart{} <- pure x]
47+
48+
it "should retry listener" $ do
49+
SpecState {specAppState, specObsChan, specToxiProxy} <- getState
50+
let waitFor = waitForObs specObsChan
51+
52+
liftIO $ bracket (
53+
withDisabled specToxiProxy $ do
54+
stopListener <- runListener specAppState
55+
(do
56+
waitFor (1*sec) "DBListenFail" $ \x -> [ o | o@DBListenFail{} <- pure x ]
57+
waitFor (1*sec) "DBListenRetry" $ \x -> [ o | o@DBListenRetry{} <- pure x ])
58+
`onException` stopListener
59+
pure stopListener)
60+
identity
61+
(const $ waitFor (2*sec) "DBListenStart" $ \x -> [ o | o@DBListenStart{} <- pure x])
62+
63+
it "should retry listener with exponential backoff when connection broken" $ do
64+
SpecState {specAppState, specObsChan, specToxiProxy} <- getState
65+
let waitFor = waitForObs specObsChan
66+
67+
liftIO $ withListener specAppState $ do
68+
waitFor (1*sec) "DBListenStart" $ \x -> [ o | o@DBListenStart{} <- pure x]
69+
withDisabled specToxiProxy $ do
70+
waitFor (1*sec) "DBListenFail" $ \x -> [ o | o@DBListenFail{} <- pure x ]
71+
waitFor (1*sec) "DBListenRetry 1" $ \x -> [ o | o@(DBListenRetry 1) <- pure x ]
72+
waitFor (2*sec) "DBListenRetry 2" $ \x -> [ o | o@(DBListenRetry 2) <- pure x ]
73+
waitFor (3*sec) "DBListenRetry 4" $ \x -> [ o | o@(DBListenRetry 4) <- pure x ]
74+
waitFor (5*sec) "DBListenStart after retries" $ \x -> [ o | o@DBListenStart{} <- pure x]
75+
waitFor (1*sec) "SchemaCacheLoadedObs" $ \x -> [ o | o@SchemaCacheLoadedObs{} <- pure x ]
76+
77+
-- this scenario cannot be tested with Toxiproxy
78+
-- because keepalives are handled by the kernel TCP/IP stack
79+
-- left here as an example and template for a test
80+
-- using some future more advanced tool
81+
it "should detect broken connection using keepalives" $ do
82+
pendingWith "Cannot be tested with Toxiproxy"
83+
SpecState {specAppState, specObsChan, specToxiProxy} <- getState
84+
let waitFor = waitForObs specObsChan
85+
86+
liftIO $ withKeepAliveListener specAppState $ do
87+
waitFor (1*sec) "DBListenStart" $ \x -> [ o | o@DBListenStart{} <- pure x]
88+
withTimedOutConnection specToxiProxy $ do
89+
waitFor (10*sec) "DBListenFail" $ \x -> [ o | o@DBListenFail{} <- pure x ]
90+
waitFor (1*sec) "DBListenRetry 1" $ \x -> [ o | o@(DBListenRetry 1) <- pure x ]
91+
waitFor (2*sec) "DBListenStart after timeout toxic" $ \x -> [ o | o@DBListenStart{} <- pure x]
92+
93+
where
94+
withListener appState = bracket (runListener appState) identity . const
95+
withKeepAliveListener appState = bracket (runListener' appState 1 1) identity . const
96+
withTimedOutConnection proxy =
97+
withToxic proxy (timeoutToxic Upstream) .
98+
withToxic proxy (timeoutToxic Downstream)
99+
timeoutToxic stream = Toxic
100+
{ toxicName = case stream of
101+
Upstream -> "listener-timeout-upstream"
102+
Downstream -> "listener-timeout-downstream"
103+
, toxicType = Timeout
104+
, toxicStream = stream
105+
, toxicToxicity = 1
106+
, toxicAttributes = M.fromList [("timeout", 0)]
107+
}
108+
sec = 1_000_000

test/spec/SpecHelper.hs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ baseCfg = let secret = encodeUtf8 "reallyreallyreallyreallyverysafe" in
149149
, configClientErrorVerbosity = Verbose
150150
, configDbAggregates = False
151151
, configDbAnonRole = Just "postgrest_test_anonymous"
152-
, configDbChannel = mempty
152+
, configDbChannel = "pgrst"
153153
, configDbChannelEnabled = True
154154
, configDbExtraSearchPath = []
155155
, configDbHoistedTxSettings = ["default_transaction_isolation","plan_filter.statement_cost_limit","statement_timeout"]

0 commit comments

Comments
 (0)