Skip to content

Commit b8ec90a

Browse files
committed
add: set keepalives options in listener connection string
This change adds keepalive options to the listener connection string. The value of keepalives_idle is adjusted using a simple algorithm based on idle time tracking.
1 parent decba37 commit b8ec90a

4 files changed

Lines changed: 172 additions & 19 deletions

File tree

src/PostgREST/App.hs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ run appState = do
8585
NS.close mainSocket
8686
Unix.installSignalHandlers observer closeSockets (AppState.schemaCacheLoader appState) (AppState.readInDbConfig False appState)
8787

88-
Listener.runListener appState
88+
void $ Listener.runListener appState
8989

9090
Admin.runAdmin appState adminSocket mainSocket (serverSettings conf)
9191

src/PostgREST/Config.hs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ module PostgREST.Config
2626
, readPGRSTEnvironment
2727
, toURI
2828
, parseSecret
29+
, addConnStringOption
2930
, addFallbackAppName
3031
, addTargetSessionAttrs
3132
, toConnectionSettings
@@ -627,7 +628,7 @@ pgConnString conn | uriDesignator `T.isPrefixOf` conn || shortUriDesignator `T.i
627628
-- >>> addFallbackAppName ver "postgresql:///postgres?host=/run/user/1000/postgrest/postgrest-with-postgresql-16-BuR/socket&user=some_protected_user&password=invalid_pass"
628629
-- "postgresql:///postgres?host=/run/user/1000/postgrest/postgrest-with-postgresql-16-BuR/socket&user=some_protected_user&password=invalid_pass&fallback_application_name=PostgREST%2011.1.0%20%285a04ec7%29"
629630
addFallbackAppName :: ByteString -> Text -> Text
630-
addFallbackAppName version dbUri = addConnStringOption dbUri "fallback_application_name" pgrstVer
631+
addFallbackAppName version = addConnStringOption "fallback_application_name" pgrstVer
631632
where
632633
pgrstVer = "PostgREST " <> T.decodeUtf8 version
633634

@@ -649,7 +650,7 @@ addFallbackAppName version dbUri = addConnStringOption dbUri "fallback_applicati
649650
-- >>> addTargetSessionAttrs "host=localhost port=5432 dbname=postgres"
650651
-- "host=localhost port=5432 dbname=postgres target_session_attrs='read-write'"
651652
addTargetSessionAttrs :: Text -> Text
652-
addTargetSessionAttrs dbUri = addConnStringOption dbUri "target_session_attrs" "read-write"
653+
addTargetSessionAttrs = addConnStringOption "target_session_attrs" "read-write"
653654

654655
toConnectionSettings :: (Text -> Text) -> AppConfig -> [SQL.Setting]
655656
toConnectionSettings transformUri AppConfig{configDbUri, configDbPreparedStatements} =
@@ -658,7 +659,7 @@ toConnectionSettings transformUri AppConfig{configDbUri, configDbPreparedStateme
658659
]
659660

660661
addConnStringOption :: Text -> Text -> Text -> Text
661-
addConnStringOption dbUri key val = dbUri <>
662+
addConnStringOption key val dbUri= dbUri <>
662663
case pgConnString dbUri of
663664
Nothing -> mempty
664665
Just PGKeyVal -> " " <> keyValFmt

src/PostgREST/Listener.hs

Lines changed: 89 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
1-
{-# LANGUAGE LambdaCase #-}
2-
{-# LANGUAGE MultiWayIf #-}
3-
{-# LANGUAGE RecordWildCards #-}
1+
{-# LANGUAGE DeriveAnyClass #-}
2+
{-# LANGUAGE LambdaCase #-}
3+
{-# LANGUAGE MultiWayIf #-}
4+
{-# LANGUAGE RecordWildCards #-}
5+
{-# LANGUAGE ScopedTypeVariables #-}
6+
{-# LANGUAGE TypeApplications #-}
47

5-
module PostgREST.Listener (runListener) where
8+
module PostgREST.Listener (runListener, runListener') where
69

710
import qualified Data.ByteString.Char8 as BS
811

@@ -18,28 +21,55 @@ import qualified PostgREST.Config as Config
1821
import Control.Arrow ((&&&))
1922
import Data.Bitraversable (bisequence)
2023
import Data.Either.Combinators (whenRight)
24+
import Data.IORef (IORef, newIORef,
25+
readIORef, writeIORef)
2126
import qualified Data.Text as T
27+
import Data.Time (UTCTime, diffUTCTime,
28+
nominalDiffTimeToSeconds)
2229
import qualified Database.PostgreSQL.LibPQ as LibPQ
2330
import qualified Hasql.Session as SQL
2431
import PostgREST.Config.Database (queryPgVersion)
2532
import PostgREST.Config.PgVersion (pgvFullName)
2633
import Protolude
34+
import System.IO.Error (isResourceVanishedError)
2735

2836
-- | Starts the Listener in a thread
29-
runListener :: AppState -> IO ()
30-
runListener appState = do
37+
-- | Returns IO action to stop the listener thread.
38+
runListener :: AppState -> IO (IO ())
39+
runListener appState = runListener' appState (15 * minute) (30 * minute)
40+
where
41+
minute = 60
42+
43+
data ListenerStopped = ListenerStopped deriving (Show, Exception)
44+
45+
runListener' :: AppState -> Int -> Int -> IO (IO ())
46+
runListener' appState initialTcpKeepAlivesIdleSec maxTcpKeepAlivesIdleSec = do
3147
AppConfig{..} <- getConfig appState
32-
when configDbChannelEnabled $
33-
void . forkIO . void $ retryingListen appState
48+
if configDbChannelEnabled then do
49+
started <- newIORef Nothing
50+
listenerThreadId <- forkIO . void $ retryingListen started initialTcpKeepAlivesIdleSec maxTcpKeepAlivesIdleSec False appState
51+
pure $ throwTo listenerThreadId ListenerStopped
52+
else
53+
mempty
3454

3555
-- | Starts a LISTEN connection and handles notifications. It recovers with exponential backoff with a cap of 32 seconds, if the LISTEN connection is lost.
3656
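-- | This function normally does not return: errors are handled by retrying, and it only returns when the listener thread is stopped with ListenerStopped (hence the IO () return type).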
37-
retryingListen :: AppState -> IO Void
38-
retryingListen appState = do
57+
retryingListen :: IORef (Maybe UTCTime) -> Int -> Int -> Bool -> AppState -> IO ()
58+
retryingListen lastActivity currentKeepalivesIdle maxKeepalivesIdle retryingOnIdleTimeout appState = do
3959
cfg@AppConfig{..} <- AppState.getConfig appState
4060
let
4161
dbChannel = toS configDbChannel
4262
onError err = do
63+
-- ResourceVanished should be reported when reading from socket fails
64+
-- as long as hasql-notifications does not wrap IOException in something else...
65+
let resourceVanished = maybe False isResourceVanishedError (fromException @IOException err)
66+
(newTcpIdle, newMaxKeepalivesIdle) <-
67+
if resourceVanished then do
68+
readIORef lastActivity >>=
69+
maybe (pure (currentKeepalivesIdle, maxKeepalivesIdle)) adjustTcpIdle
70+
else
71+
pure (currentKeepalivesIdle, maxKeepalivesIdle)
72+
writeIORef lastActivity Nothing
4373
AppState.putIsListenerOn appState False
4474
observer $ DBListenFail dbChannel (Right err)
4575
when (isDbListenerBug err) $
@@ -54,15 +84,15 @@ retryingListen appState = do
5484
unless (delay == maxDelay) $
5585
AppState.putNextListenerDelay appState (delay * 2)
5686
-- loop running the listener
57-
retryingListen appState
87+
retryingListen lastActivity newTcpIdle newMaxKeepalivesIdle resourceVanished appState
5888

5989
-- Execute the listener with error handling
60-
handle onError $ do
90+
handle onError $ handle (\ListenerStopped -> pure ()) $ do
6191
-- Make sure we don't leak connections on errors
6292
bracket
6393
-- acquire connection
6494
(SQL.acquire $
65-
Config.toConnectionSettings Config.addTargetSessionAttrs cfg)
95+
Config.toConnectionSettings (addKeepalivesOptions . Config.addTargetSessionAttrs) cfg)
6696
-- release connection
6797
(`whenRight` releaseConnection) $
6898
-- use connection
@@ -82,6 +112,7 @@ retryingListen appState = do
82112
AppState.putNextListenerDelay appState 1
83113

84114
observer $ DBListenStart pqHost pqPort pgFullName dbChannel
115+
saveLastActivityTime
85116

86117
-- wait for notifications
87118
-- this will never return, in case of an error it will throw and be caught by onError
@@ -96,15 +127,59 @@ retryingListen appState = do
96127
oneSecondInMicro = 1000000
97128
maxDelay = 32
98129

99-
handleNotification channel msg =
130+
handleNotification channel msg = do
100131
if | BS.null msg -> observer (DBListenerGotSCacheMsg channel) >> cacheReloader
101132
| msg == "reload schema" -> observer (DBListenerGotSCacheMsg channel) >> cacheReloader
102133
| msg == "reload config" -> observer (DBListenerGotConfigMsg channel) >> AppState.readInDbConfig False appState
103134
| otherwise -> pure () -- Do nothing if anything else than an empty message is sent
135+
saveLastActivityTime
136+
137+
saveLastActivityTime = AppState.getTime appState >>= writeIORef lastActivity . Just
104138

105139
cacheReloader =
106140
AppState.schemaCacheLoader appState
107141

108142
releaseConnection = void . forkIO . handle (observer . DBListenerConnectionCleanupFail) . SQL.release
109143

110144
isDbListenerBug e = "could not access status of transaction" `T.isInfixOf` show e
145+
146+
-- adjust the next keepalive timeout
147+
-- This is a simple discovery mechanism that
148+
-- should converge to optimum keepalive timeout
149+
-- we calculate the time T between connection failure and last activity
150+
-- if T is <= the current timeout
151+
-- it means timeout is too long
152+
-- so we set next timeout to T/2 and max timeout to T
153+
-- (max cannot be longer because we lost connection earlier)
154+
-- if T is longer than current timeout
155+
-- we set timeout in between current timeout and current max
156+
adjustTcpIdle lastActiveTime = do
157+
currentIdleSeconds <- AppState.getTime appState <&> round . nominalDiffTimeToSeconds . (`diffUTCTime` lastActiveTime)
158+
let currentIdleTimeout = currentKeepalivesIdle + keepalivesInterval * keepalivesCount
159+
-- if our idle time == current idle timeout setting it means
160+
-- we have to make it shorter
161+
if currentIdleSeconds `div` currentIdleTimeout <= 1 then
162+
-- only adjust if this is the second idle timeout failure
163+
-- this is to eliminate spurious adjustments (TODO rethink if it is really needed)
164+
if retryingOnIdleTimeout then
165+
-- try with 1/2 of current keepalive idle
166+
-- remember that it is the new maximum we can try later
167+
pure (max 1 $ currentKeepalivesIdle `div` 2, currentKeepalivesIdle)
168+
else
169+
pure (currentKeepalivesIdle, maxKeepalivesIdle)
170+
else
171+
-- we can try to make it longer
172+
-- but not longer than previously calculated maximum
173+
pure (currentKeepalivesIdle + (maxKeepalivesIdle - currentKeepalivesIdle) `div` 2, maxKeepalivesIdle)
174+
175+
keepalivesInterval = max 1 $ currentKeepalivesIdle `div` (5 * keepalivesCount)
176+
keepalivesCount = 5
177+
178+
-- (Config.addConnStringOption opt val) is an endomorphism
179+
-- so it is a Monoid under function composition
180+
-- Haskell is awesome
181+
addKeepalivesOptions = appEndo $ foldMap (Endo . uncurry Config.addConnStringOption . fmap show) [
182+
("keepalives_count", keepalivesCount)
183+
, ("keepalives_interval", keepalivesInterval)
184+
, ("keepalives_idle", currentKeepalivesIdle)
185+
]

test/observability/Observation/ToxiSpec.hs

Lines changed: 78 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,23 @@
22
{-# LANGUAGE FlexibleContexts #-}
33
{-# LANGUAGE MonadComprehensions #-}
44
{-# LANGUAGE NamedFieldPuns #-}
5+
{-# LANGUAGE NumericUnderscores #-}
56
module Observation.ToxiSpec where
67

78
import Control.Monad.Trans.Control (liftBaseDiscard)
9+
import qualified Data.Map as M
810
import Network.Wai (Application)
911
import ObsHelper
1012
import qualified PostgREST.AppState as AppState
13+
import PostgREST.Listener (runListener,
14+
runListener')
15+
import PostgREST.Observation (Observation (..))
1116
import Protolude hiding (get)
1217
import Test.Hspec (SpecWith, describe, it)
1318
import Test.Hspec.Wai
14-
import Toxiproxy (withDisabled)
19+
import Toxiproxy (Stream (..), Toxic (..),
20+
ToxicType (..),
21+
withDisabled, withToxic)
1522

1623
spec :: SpecWith (SpecState, Application)
1724
spec = describe "Tests using Toxiproxy" $ do
@@ -32,3 +39,73 @@ spec = describe "Tests using Toxiproxy" $ do
3239
liftBaseDiscard (withDisabled specToxiProxy) $ do
3340
void $ get "/items?id=eq.5"
3441
`shouldRespondWith` 503
42+
43+
describe "Toxiproxy tests of notification listener" $ do
44+
it "should start listener" $ do
45+
SpecState {specAppState, specObsChan} <- getState
46+
let waitFor = waitForObs specObsChan
47+
48+
liftIO $ withListener specAppState $
49+
waitFor (1*sec) "DBListenStart" $ \x -> [ o | o@DBListenStart{} <- pure x]
50+
51+
it "should retry listener" $ do
52+
SpecState {specAppState, specObsChan, specToxiProxy} <- getState
53+
let waitFor = waitForObs specObsChan
54+
55+
liftIO $ bracket (
56+
withDisabled specToxiProxy $ do
57+
stopListener <- runListener specAppState
58+
(do
59+
waitFor (1*sec) "DBListenFail" $ \x -> [ o | o@DBListenFail{} <- pure x ]
60+
waitFor (1*sec) "DBListenRetry" $ \x -> [ o | o@DBListenRetry{} <- pure x ])
61+
`onException` stopListener
62+
pure stopListener)
63+
identity
64+
(const $ waitFor (2*sec) "DBListenStart" $ \x -> [ o | o@DBListenStart{} <- pure x])
65+
66+
it "should retry listener with exponential backoff when connection broken" $ do
67+
SpecState {specAppState, specObsChan, specToxiProxy} <- getState
68+
let waitFor = waitForObs specObsChan
69+
70+
liftIO $ withListener specAppState $ do
71+
waitFor (1*sec) "DBListenStart" $ \x -> [ o | o@DBListenStart{} <- pure x]
72+
withDisabled specToxiProxy $ do
73+
waitFor (1*sec) "DBListenFail" $ \x -> [ o | o@DBListenFail{} <- pure x ]
74+
waitFor (1*sec) "DBListenRetry 1" $ \x -> [ o | o@(DBListenRetry 1) <- pure x ]
75+
waitFor (2*sec) "DBListenRetry 2" $ \x -> [ o | o@(DBListenRetry 2) <- pure x ]
76+
waitFor (3*sec) "DBListenRetry 4" $ \x -> [ o | o@(DBListenRetry 4) <- pure x ]
77+
waitFor (5*sec) "DBListenStart after retries" $ \x -> [ o | o@DBListenStart{} <- pure x]
78+
waitFor (1*sec) "SchemaCacheLoadedObs" $ \x -> [ o | o@SchemaCacheLoadedObs{} <- pure x ]
79+
80+
-- this scenario cannot be tested with Toxiproxy
81+
-- because keepalives are handled by the kernel TCP/IP stack
82+
-- left here as an example and template for a test
83+
-- using some future more advanced tool
84+
it "should detect broken connection using keepalives" $ do
85+
pendingWith "Cannot be tested with Toxiproxy"
86+
SpecState {specAppState, specObsChan, specToxiProxy} <- getState
87+
let waitFor = waitForObs specObsChan
88+
89+
liftIO $ withKeepAliveListener specAppState $ do
90+
waitFor (1*sec) "DBListenStart" $ \x -> [ o | o@DBListenStart{} <- pure x]
91+
withTimedOutConnection specToxiProxy $ do
92+
waitFor (10*sec) "DBListenFail" $ \x -> [ o | o@DBListenFail{} <- pure x ]
93+
waitFor (1*sec) "DBListenRetry 1" $ \x -> [ o | o@(DBListenRetry 1) <- pure x ]
94+
waitFor (2*sec) "DBListenStart after timeout toxic" $ \x -> [ o | o@DBListenStart{} <- pure x]
95+
96+
where
97+
withListener appState = bracket (runListener appState) identity . const
98+
withKeepAliveListener appState = bracket (runListener' appState 1 1) identity . const
99+
withTimedOutConnection proxy =
100+
withToxic proxy (timeoutToxic Upstream) .
101+
withToxic proxy (timeoutToxic Downstream)
102+
timeoutToxic stream = Toxic
103+
{ toxicName = case stream of
104+
Upstream -> "listener-timeout-upstream"
105+
Downstream -> "listener-timeout-downstream"
106+
, toxicType = Timeout
107+
, toxicStream = stream
108+
, toxicToxicity = 1
109+
, toxicAttributes = M.fromList [("timeout", 0)]
110+
}
111+
sec = 1_000_000

0 commit comments

Comments
 (0)