Skip to content

Commit 0daf2b5

Browse files
committed
add: set keepalives options in listener connection string
This change adds keepalive options to the listener connection string. The value of keepalives_idle is adjusted using a simple algorithm based on idle-time tracking.
1 parent 9d6712c commit 0daf2b5

File tree

5 files changed

+173
-20
lines changed

5 files changed

+173
-20
lines changed

src/PostgREST/App.hs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ run appState = do
8181

8282
Unix.installSignalHandlers (AppState.getMainThreadId appState) (AppState.schemaCacheLoader appState) (AppState.readInDbConfig False appState)
8383

84-
Listener.runListener appState
84+
void $ Listener.runListener appState
8585

8686
Admin.runAdmin appState adminSocket mainSocket (serverSettings conf)
8787

src/PostgREST/Config.hs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ module PostgREST.Config
2525
, readPGRSTEnvironment
2626
, toURI
2727
, parseSecret
28+
, addConnStringOption
2829
, addFallbackAppName
2930
, addTargetSessionAttrs
3031
, exampleConfigFile
@@ -619,7 +620,7 @@ pgConnString conn | uriDesignator `T.isPrefixOf` conn || shortUriDesignator `T.i
619620
-- addFallbackAppName ver "postgresql:///postgres?host=/run/user/1000/postgrest/postgrest-with-postgresql-16-BuR/socket&user=some_protected_user&password=invalid_pass"
620621
-- "postgresql:///postgres?host=/run/user/1000/postgrest/postgrest-with-postgresql-16-BuR/socket&user=some_protected_user&password=invalid_pass&fallback_application_name=PostgREST%2011.1.0%20%285a04ec7%29"
621622
addFallbackAppName :: ByteString -> Text -> Text
622-
addFallbackAppName version dbUri = addConnStringOption dbUri "fallback_application_name" pgrstVer
623+
addFallbackAppName version = addConnStringOption "fallback_application_name" pgrstVer
623624
where
624625
pgrstVer = "PostgREST " <> T.decodeUtf8 version
625626

@@ -641,10 +642,10 @@ addFallbackAppName version dbUri = addConnStringOption dbUri "fallback_applicati
641642
-- >>> addTargetSessionAttrs "host=localhost port=5432 dbname=postgres"
642643
-- "host=localhost port=5432 dbname=postgres target_session_attrs='read-write'"
643644
addTargetSessionAttrs :: Text -> Text
644-
addTargetSessionAttrs dbUri = addConnStringOption dbUri "target_session_attrs" "read-write"
645+
addTargetSessionAttrs = addConnStringOption "target_session_attrs" "read-write"
645646

646647
addConnStringOption :: Text -> Text -> Text -> Text
647-
addConnStringOption dbUri key val = dbUri <>
648+
addConnStringOption key val dbUri= dbUri <>
648649
case pgConnString dbUri of
649650
Nothing -> mempty
650651
Just PGKeyVal -> " " <> keyValFmt

src/PostgREST/Listener.hs

Lines changed: 89 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
1-
{-# LANGUAGE LambdaCase #-}
2-
{-# LANGUAGE MultiWayIf #-}
3-
{-# LANGUAGE RecordWildCards #-}
1+
{-# LANGUAGE DeriveAnyClass #-}
2+
{-# LANGUAGE LambdaCase #-}
3+
{-# LANGUAGE MultiWayIf #-}
4+
{-# LANGUAGE RecordWildCards #-}
5+
{-# LANGUAGE ScopedTypeVariables #-}
6+
{-# LANGUAGE TypeApplications #-}
47

5-
module PostgREST.Listener (runListener) where
8+
module PostgREST.Listener (runListener, runListener') where
69

710
import qualified Data.ByteString.Char8 as BS
811

@@ -19,28 +22,55 @@ import qualified PostgREST.Config as Config
1922
import Control.Arrow ((&&&))
2023
import Data.Bitraversable (bisequence)
2124
import Data.Either.Combinators (whenRight)
25+
import Data.IORef (IORef, newIORef,
26+
readIORef, writeIORef)
2227
import qualified Data.Text as T
28+
import Data.Time (UTCTime, diffUTCTime,
29+
nominalDiffTimeToSeconds)
2330
import qualified Database.PostgreSQL.LibPQ as LibPQ
2431
import qualified Hasql.Session as SQL
2532
import PostgREST.Config.Database (queryPgVersion)
2633
import PostgREST.Config.PgVersion (pgvFullName)
2734
import Protolude
35+
import System.IO.Error (isResourceVanishedError)
2836

2937
-- | Starts the Listener in a thread
30-
runListener :: AppState -> IO ()
31-
runListener appState = do
38+
-- | Returns an IO action that stops the listener thread.
39+
runListener :: AppState -> IO (IO ())
40+
runListener appState = runListener' appState (15 * minute) (30 * minute)
41+
where
42+
minute = 60
43+
44+
data ListenerStopped = ListenerStopped deriving (Show, Exception)
45+
46+
runListener' :: AppState -> Int -> Int -> IO (IO ())
47+
runListener' appState initialTcpKeepAlivesIdleSec maxTcpKeepAlivesIdleSec = do
3248
AppConfig{..} <- getConfig appState
33-
when configDbChannelEnabled $
34-
void . forkIO . void $ retryingListen appState
49+
if configDbChannelEnabled then do
50+
started <- newIORef Nothing
51+
listenerThreadId <- forkIO . void $ retryingListen started initialTcpKeepAlivesIdleSec maxTcpKeepAlivesIdleSec False appState
52+
pure $ throwTo listenerThreadId ListenerStopped
53+
else
54+
mempty
3555

3656
-- | Starts a LISTEN connection and handles notifications. It recovers with exponential backoff with a cap of 32 seconds, if the LISTEN connection is lost.
3757
-- | This function loops, retrying after errors, and returns only when a ListenerStopped exception is caught while the listener is running.
38-
retryingListen :: AppState -> IO Void
39-
retryingListen appState = do
58+
retryingListen :: IORef (Maybe UTCTime) -> Int -> Int -> Bool -> AppState -> IO ()
59+
retryingListen lastActivity currentKeepalivesIdle maxKeepalivesIdle retryingOnIdleTimeout appState = do
4060
AppConfig{..} <- AppState.getConfig appState
4161
let
4262
dbChannel = toS configDbChannel
4363
onError err = do
64+
-- ResourceVanished should be reported when reading from socket fails
65+
-- as long as hasql-notifications does not wrap IOException in something else...
66+
let resourceVanished = maybe False isResourceVanishedError (fromException @IOException err)
67+
(newTcpIdle, newMaxKeepalivesIdle) <-
68+
if resourceVanished then do
69+
readIORef lastActivity >>=
70+
maybe (pure (currentKeepalivesIdle, maxKeepalivesIdle)) adjustTcpIdle
71+
else
72+
pure (currentKeepalivesIdle, maxKeepalivesIdle)
73+
writeIORef lastActivity Nothing
4474
AppState.putIsListenerOn appState False
4575
observer $ DBListenFail dbChannel (Right err)
4676
when (isDbListenerBug err) $
@@ -55,14 +85,14 @@ retryingListen appState = do
5585
unless (delay == maxDelay) $
5686
AppState.putNextListenerDelay appState (delay * 2)
5787
-- loop running the listener
58-
retryingListen appState
88+
retryingListen lastActivity newTcpIdle newMaxKeepalivesIdle resourceVanished appState
5989

6090
-- Execute the listener with error handling
61-
handle onError $ do
91+
handle onError $ handle (\ListenerStopped -> pure ()) $ do
6292
-- Make sure we don't leak connections on errors
6393
bracket
6494
-- acquire connection
65-
(SQL.acquire $ toUtf8 (Config.addTargetSessionAttrs $ Config.addFallbackAppName prettyVersion configDbUri))
95+
(SQL.acquire $ toUtf8 (addKeepalivesOptions $ Config.addTargetSessionAttrs $ Config.addFallbackAppName prettyVersion configDbUri))
6696
-- release connection
6797
(`whenRight` releaseConnection) $
6898
-- use connection
@@ -82,6 +112,7 @@ retryingListen appState = do
82112
AppState.putNextListenerDelay appState 1
83113

84114
observer $ DBListenStart pqHost pqPort pgFullName dbChannel
115+
saveLastActivityTime
85116

86117
-- wait for notifications
87118
-- this will never return; in case of an error it will throw and be caught by onError
@@ -96,15 +127,59 @@ retryingListen appState = do
96127
oneSecondInMicro = 1000000
97128
maxDelay = 32
98129

99-
handleNotification channel msg =
130+
handleNotification channel msg = do
100131
if | BS.null msg -> observer (DBListenerGotSCacheMsg channel) >> cacheReloader
101132
| msg == "reload schema" -> observer (DBListenerGotSCacheMsg channel) >> cacheReloader
102133
| msg == "reload config" -> observer (DBListenerGotConfigMsg channel) >> AppState.readInDbConfig False appState
103134
| otherwise -> pure () -- Do nothing if anything else than an empty message is sent
135+
saveLastActivityTime
136+
137+
saveLastActivityTime = AppState.getTime appState >>= writeIORef lastActivity . Just
104138

105139
cacheReloader =
106140
AppState.schemaCacheLoader appState
107141

108142
releaseConnection = void . forkIO . handle (observer . DBListenerConnectionCleanupFail) . SQL.release
109143

110144
isDbListenerBug e = "could not access status of transaction" `T.isInfixOf` show e
145+
146+
-- adjust the next keepalive timeout
147+
-- This is a simple discovery mechanism that
148+
-- should converge to optimum keepalive timeout
149+
-- we calculate the time T between connection failure and last activity
150+
-- if T is <= the current timeout
151+
-- it means timeout is too long
152+
-- so we set next timeout to T/2 and max timeout to T
153+
-- max cannot be longer because we lost connection earlier
154+
-- if T is longer than current timeout
155+
-- we set timeout in between current timeout and current max
156+
adjustTcpIdle lastActiveTime = do
157+
currentIdleSeconds <- AppState.getTime appState <&> round . nominalDiffTimeToSeconds . (`diffUTCTime` lastActiveTime)
158+
let currentIdleTimeout = currentKeepalivesIdle + keepalivesInterval * keepalivesCount
159+
-- if our idle time is less than twice the current idle timeout setting it means
160+
-- we have to make it shorter
161+
if currentIdleSeconds `div` currentIdleTimeout <= 1 then
162+
-- only adjust if this is the second idle timeout failure
163+
-- this is to eliminate spurious adjustments (TODO rethink if it is really needed)
164+
if retryingOnIdleTimeout then
165+
-- try with 1/2 of current keepalive idle
166+
-- remember that it is the new maximum we can try later
167+
pure (max 1 $ currentKeepalivesIdle `div` 2, currentKeepalivesIdle)
168+
else
169+
pure (currentKeepalivesIdle, maxKeepalivesIdle)
170+
else
171+
-- we can try to make it longer
172+
-- but not longer than previously calculated maximum
173+
pure (currentKeepalivesIdle + (maxKeepalivesIdle - currentKeepalivesIdle) `div` 2, maxKeepalivesIdle)
174+
175+
keepalivesInterval = max 1 $ currentKeepalivesIdle `div` (5 * keepalivesCount)
176+
keepalivesCount = 5
177+
178+
-- (Config.addConnStringOption opt val) is an endomorphism
179+
-- so it is a Monoid under function composition
180+
-- Haskell is awesome
181+
addKeepalivesOptions = appEndo $ foldMap (Endo . uncurry Config.addConnStringOption . fmap show) [
182+
("keepalives_count", keepalivesCount)
183+
, ("keepalives_interval", keepalivesInterval)
184+
, ("keepalives_idle", currentKeepalivesIdle)
185+
]

test/spec/Feature/ToxiSpec.hs

Lines changed: 78 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,23 @@
22
{-# LANGUAGE FlexibleContexts #-}
33
{-# LANGUAGE MonadComprehensions #-}
44
{-# LANGUAGE NamedFieldPuns #-}
5+
{-# LANGUAGE NumericUnderscores #-}
56
module Feature.ToxiSpec where
67

78
import Control.Monad.Trans.Control (liftBaseDiscard)
9+
import qualified Data.Map as M
810
import Network.Wai (Application)
911
import qualified PostgREST.AppState as AppState
12+
import PostgREST.Listener (runListener,
13+
runListener')
14+
import PostgREST.Observation (Observation (..))
1015
import Protolude hiding (get)
1116
import SpecHelper
1217
import Test.Hspec (SpecWith, describe, it)
1318
import Test.Hspec.Wai
14-
import Toxiproxy (withDisabled)
19+
import Toxiproxy (Stream (..), Toxic (..),
20+
ToxicType (..),
21+
withDisabled, withToxic)
1522

1623
spec :: SpecWith (SpecState, Application)
1724
spec = describe "Tests using Toxiproxy" $ do
@@ -32,3 +39,73 @@ spec = describe "Tests using Toxiproxy" $ do
3239
liftBaseDiscard (withDisabled specToxiProxy) $ do
3340
void $ get "/items?id=eq.5"
3441
`shouldRespondWith` 503
42+
43+
describe "Toxiproxy tests of notification listener" $ do
44+
it "should start listener" $ do
45+
SpecState {specAppState, specObsChan} <- getState
46+
let waitFor = waitForObs specObsChan
47+
48+
liftIO $ withListener specAppState $
49+
waitFor (1*sec) "DBListenStart" $ \x -> [ o | o@DBListenStart{} <- pure x]
50+
51+
it "should retry listener" $ do
52+
SpecState {specAppState, specObsChan, specToxiProxy} <- getState
53+
let waitFor = waitForObs specObsChan
54+
55+
liftIO $ bracket (
56+
withDisabled specToxiProxy $ do
57+
stopListener <- runListener specAppState
58+
(do
59+
waitFor (1*sec) "DBListenFail" $ \x -> [ o | o@DBListenFail{} <- pure x ]
60+
waitFor (1*sec) "DBListenRetry" $ \x -> [ o | o@DBListenRetry{} <- pure x ])
61+
`onException` stopListener
62+
pure stopListener)
63+
identity
64+
(const $ waitFor (2*sec) "DBListenStart" $ \x -> [ o | o@DBListenStart{} <- pure x])
65+
66+
it "should retry listener with exponential backoff when connection broken" $ do
67+
SpecState {specAppState, specObsChan, specToxiProxy} <- getState
68+
let waitFor = waitForObs specObsChan
69+
70+
liftIO $ withListener specAppState $ do
71+
waitFor (1*sec) "DBListenStart" $ \x -> [ o | o@DBListenStart{} <- pure x]
72+
withDisabled specToxiProxy $ do
73+
waitFor (1*sec) "DBListenFail" $ \x -> [ o | o@DBListenFail{} <- pure x ]
74+
waitFor (1*sec) "DBListenRetry 1" $ \x -> [ o | o@(DBListenRetry 1) <- pure x ]
75+
waitFor (2*sec) "DBListenRetry 2" $ \x -> [ o | o@(DBListenRetry 2) <- pure x ]
76+
waitFor (3*sec) "DBListenRetry 4" $ \x -> [ o | o@(DBListenRetry 4) <- pure x ]
77+
waitFor (5*sec) "DBListenStart after retries" $ \x -> [ o | o@DBListenStart{} <- pure x]
78+
waitFor (1*sec) "SchemaCacheLoadedObs" $ \x -> [ o | o@SchemaCacheLoadedObs{} <- pure x ]
79+
80+
-- this scenario cannot be tested with Toxiproxy
81+
-- because keepalives are handled by the kernel TCP/IP stack
82+
-- left here as an example and template for a test
83+
-- using some future more advanced tool
84+
it "should detect broken connection using keepalives" $ do
85+
pendingWith "Cannot be tested with Toxiproxy"
86+
SpecState {specAppState, specObsChan, specToxiProxy} <- getState
87+
let waitFor = waitForObs specObsChan
88+
89+
liftIO $ withKeepAliveListener specAppState $ do
90+
waitFor (1*sec) "DBListenStart" $ \x -> [ o | o@DBListenStart{} <- pure x]
91+
withTimedOutConnection specToxiProxy $ do
92+
waitFor (10*sec) "DBListenFail" $ \x -> [ o | o@DBListenFail{} <- pure x ]
93+
waitFor (1*sec) "DBListenRetry 1" $ \x -> [ o | o@(DBListenRetry 1) <- pure x ]
94+
waitFor (2*sec) "DBListenStart after timeout toxic" $ \x -> [ o | o@DBListenStart{} <- pure x]
95+
96+
where
97+
withListener appState = bracket (runListener appState) identity . const
98+
withKeepAliveListener appState = bracket (runListener' appState 1 1) identity . const
99+
withTimedOutConnection proxy =
100+
withToxic proxy (timeoutToxic Upstream) .
101+
withToxic proxy (timeoutToxic Downstream)
102+
timeoutToxic stream = Toxic
103+
{ toxicName = case stream of
104+
Upstream -> "listener-timeout-upstream"
105+
Downstream -> "listener-timeout-downstream"
106+
, toxicType = Timeout
107+
, toxicStream = stream
108+
, toxicToxicity = 1
109+
, toxicAttributes = M.fromList [("timeout", 0)]
110+
}
111+
sec = 1_000_000

test/spec/SpecHelper.hs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ baseCfg = let secret = encodeUtf8 "reallyreallyreallyreallyverysafe" in
149149
, configClientErrorVerbosity = Verbose
150150
, configDbAggregates = False
151151
, configDbAnonRole = Just "postgrest_test_anonymous"
152-
, configDbChannel = mempty
152+
, configDbChannel = "pgrst"
153153
, configDbChannelEnabled = True
154154
, configDbExtraSearchPath = []
155155
, configDbHoistedTxSettings = ["default_transaction_isolation","plan_filter.statement_cost_limit","statement_timeout"]

0 commit comments

Comments
 (0)