Skip to content

Commit 405075a

Browse files
committed
fix: shutdown should wait for in flight requests
3 parents 6257a16 + d0efc95 + f589074 commit 405075a

8 files changed

Lines changed: 125 additions & 83 deletions

File tree

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,10 @@ All notable changes to this project will be documented in this file. From versio
1919
- Log error when `db-schemas` config contains schema `pg_catalog` or `information_schema` by @taimoorzaeem in #4359
2020
+ Now fails at startup. Prior to this, it failed with `PGRST205` on requests related to these schemas.
2121

22+
### Fixed
23+
24+
- Shutdown should wait for in flight requests by @mkleczek in #4702
25+
2226
## [14.6] - 2026-03-06
2327

2428
### Fixed

nix/overlays/haskell-packages.nix

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,53 @@ let
7878
hasql-pool = lib.dontCheck prev.hasql-pool_1_0_1;
7979
hasql-transaction = lib.dontCheck prev.hasql-transaction_1_1_0_1;
8080
postgresql-binary = lib.dontCheck (lib.doJailbreak prev.postgresql-binary_0_13_1_3);
81+
82+
http2 =
83+
prev.callHackageDirect
84+
{
85+
pkg = "http2";
86+
ver = "5.4.0";
87+
sha256 = "sha256-PeEWVd61bQ8G7LvfLeXklzXqNJFaAjE2ecRMWJZESPE=";
88+
}
89+
{ };
90+
91+
http-semantics =
92+
prev.callHackageDirect
93+
{
94+
pkg = "http-semantics";
95+
ver = "0.4.0";
96+
sha256 = "sha256-rh0z51EKvsu5rQd5n2z3fSRjjEObouNZSBPO9NFYOF0=";
97+
}
98+
{ };
99+
100+
network-run =
101+
prev.callHackageDirect
102+
{
103+
pkg = "network-run";
104+
ver = "0.5.0";
105+
sha256 = "sha256-vbXh+CzxDsGApjqHxCYf/ijpZtUCApFbkcF5gyN0THU=";
106+
}
107+
{ };
108+
109+
time-manager =
110+
prev.callHackageDirect
111+
{
112+
pkg = "time-manager";
113+
ver = "0.2.4";
114+
sha256 = "sha256-sAt/331YLQ2IU3z90aKYSq1nxoazv87irsuJp7ZG3pw=";
115+
}
116+
{ };
117+
118+
warp =
119+
lib.dontCheck (prev.callCabal2nixWithOptions "warp"
120+
(super.fetchFromGitHub {
121+
owner = "mkleczek";
122+
repo = "wai";
123+
rev = "e9c9e784dee1b54461b94089175a969eb647d262";
124+
#sha256 = "sha256-AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA=";
125+
sha256 = "sha256-q8vgvsNeKM/fHp3H6+W/tR4HicioqeaU9Eo3rb13bLo=";
126+
}) "--subpath=warp"
127+
{ });
81128
};
82129
in
83130
{

src/PostgREST/Admin.hs

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,22 +19,23 @@ import PostgREST.Observation (Observation (..))
1919

2020
import qualified PostgREST.AppState as AppState
2121

22-
import Protolude
22+
import qualified Network.Socket as NS
23+
import Protolude
2324

24-
runAdmin :: AppState -> Warp.Settings -> IO ()
25-
runAdmin appState settings = do
26-
whenJust (AppState.getSocketAdmin appState) $ \adminSocket -> do
25+
runAdmin :: AppState -> Maybe NS.Socket -> NS.Socket -> Warp.Settings -> IO ()
26+
runAdmin appState maybeAdminSocket socketREST settings = do
27+
whenJust maybeAdminSocket $ \adminSocket -> do
2728
address <- resolveSocketToAddress adminSocket
2829
observer $ AdminStartObs address
2930
void . forkIO $ Warp.runSettingsSocket settings adminSocket adminApp
3031
where
31-
adminApp = admin appState
32+
adminApp = admin appState socketREST
3233
observer = AppState.getObserver appState
3334

3435
-- | PostgREST admin application
35-
admin :: AppState.AppState -> Wai.Application
36-
admin appState req respond = do
37-
isMainAppReachable <- isRight <$> reachMainApp (AppState.getSocketREST appState)
36+
admin :: AppState.AppState -> NS.Socket -> Wai.Application
37+
admin appState socketREST req respond = do
38+
isMainAppReachable <- isRight <$> reachMainApp socketREST
3839
isLoaded <- AppState.isLoaded appState
3940
isPending <- AppState.isPending appState
4041

src/PostgREST/App.hs

Lines changed: 59 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import GHC.IO.Exception (IOErrorType (..))
2222
import System.IO.Error (ioeGetErrorType)
2323

2424
import Control.Monad.Except (liftEither)
25+
import Control.Monad.Extra (whenJust)
2526
import Data.Either.Combinators (mapLeft, whenLeft)
2627
import Data.Maybe (fromJust)
2728
import Data.String (IsString (..))
@@ -60,10 +61,15 @@ import PostgREST.SchemaCache (SchemaCache (..))
6061
import PostgREST.TimeIt (timeItT)
6162
import PostgREST.Version (docsVersion, prettyVersion)
6263

63-
import qualified Data.ByteString.Char8 as BS
64-
import qualified Data.List as L
65-
import qualified Network.HTTP.Types as HTTP
66-
import Protolude hiding (Handler)
64+
import qualified Data.ByteString.Char8 as BS
65+
import qualified Data.List as L
66+
import Data.Streaming.Network (bindPortTCP,
67+
bindRandomPortTCP)
68+
import qualified Data.Text as T
69+
import qualified Network.HTTP.Types as HTTP
70+
import qualified Network.Socket as NS
71+
import PostgREST.Unix (createAndBindDomainSocket)
72+
import Protolude hiding (Handler)
6773

6874
type Handler = ExceptT Error
6975

@@ -72,19 +78,27 @@ run appState = do
7278
conf@AppConfig{..} <- AppState.getConfig appState
7379

7480
AppState.schemaCacheLoader appState -- Loads the initial SchemaCache
75-
Unix.installSignalHandlers (AppState.getMainThreadId appState) (AppState.schemaCacheLoader appState) (AppState.readInDbConfig False appState)
81+
(mainSocket, adminSocket) <- initSockets conf
82+
let closeSockets = do
83+
whenJust adminSocket NS.close
84+
NS.close mainSocket
85+
Unix.installSignalHandlers closeSockets (AppState.schemaCacheLoader appState) (AppState.readInDbConfig False appState)
7686

7787
Listener.runListener appState
7888

79-
Admin.runAdmin appState (serverSettings conf)
89+
Admin.runAdmin appState adminSocket mainSocket (serverSettings conf)
8090

8191
let app = postgrest configLogLevel appState (AppState.schemaCacheLoader appState)
8292

8393
do
84-
address <- resolveSocketToAddress (AppState.getSocketREST appState)
94+
address <- resolveSocketToAddress mainSocket
8595
observer $ AppServerAddressObs address
8696

87-
Warp.runSettingsSocket (serverSettings conf & setOnException onWarpException) (AppState.getSocketREST appState) app
97+
-- Hardcoding maximum graceful shutdown timeout (arbitrary set to 5 seconds)
98+
-- This is unfortunate but necessary becase graceful shutdowns don't work with HTTP keep-alive
99+
-- causing Warp to handle requests on already opened connections even if the listen socket is closed
100+
-- See: https://github.com/yesodweb/wai/issues/853
101+
Warp.runSettingsSocket (serverSettings conf & setOnException onWarpException) mainSocket app
88102
where
89103
observer = AppState.getObserver appState
90104

@@ -229,3 +243,40 @@ addRetryHint delay response = do
229243

230244
isServiceUnavailable :: Wai.Response -> Bool
231245
isServiceUnavailable response = Wai.responseStatus response == HTTP.status503
246+
247+
type AppSockets = (NS.Socket, Maybe NS.Socket)
248+
249+
initSockets :: AppConfig -> IO AppSockets
250+
initSockets AppConfig{..} = do
251+
let
252+
cfg'usp = configServerUnixSocket
253+
cfg'uspm = configServerUnixSocketMode
254+
cfg'host = configServerHost
255+
cfg'port = configServerPort
256+
cfg'adminHost = configAdminServerHost
257+
cfg'adminPort = configAdminServerPort
258+
259+
sock <- case cfg'usp of
260+
-- I'm not using `streaming-commons`' bindPath function here because it's not defined for Windows,
261+
-- but we need to have runtime error if we try to use it in Windows, not compile time error
262+
Just path -> createAndBindDomainSocket path cfg'uspm
263+
Nothing -> do
264+
(_, sock) <-
265+
if cfg'port /= 0
266+
then do
267+
sock <- bindPortTCP cfg'port (fromString $ T.unpack cfg'host)
268+
pure (cfg'port, sock)
269+
else do
270+
-- explicitly bind to a random port, returning bound port number
271+
(num, sock) <- bindRandomPortTCP (fromString $ T.unpack cfg'host)
272+
pure (num, sock)
273+
pure sock
274+
275+
adminSock <- case cfg'adminPort of
276+
Just adminPort -> do
277+
adminSock <- bindPortTCP adminPort (fromString $ T.unpack cfg'adminHost)
278+
pure $ Just adminSock
279+
Nothing -> pure Nothing
280+
281+
pure (sock, adminSock)
282+

src/PostgREST/AppState.hs

Lines changed: 3 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,7 @@ module PostgREST.AppState
1313
, getNextListenerDelay
1414
, getTime
1515
, getJwtCacheState
16-
, getSocketREST
17-
, getSocketAdmin
1816
, init
19-
, initSockets
2017
, initWithPool
2118
, putNextListenerDelay
2219
, putSchemaCache
@@ -32,13 +29,11 @@ module PostgREST.AppState
3229

3330
import qualified Data.ByteString.Char8 as BS
3431
import Data.Either.Combinators (whenLeft)
35-
import qualified Data.Text as T (unpack)
3632
import qualified Hasql.Pool as SQL
3733
import qualified Hasql.Pool.Config as SQL
3834
import qualified Hasql.Session as SQL
3935
import qualified Hasql.Transaction.Sessions as SQL
4036
import qualified Network.HTTP.Types.Status as HTTP
41-
import qualified Network.Socket as NS
4237
import qualified PostgREST.Auth.JwtCache as JwtCache
4338
import qualified PostgREST.Error as Error
4439
import qualified PostgREST.Logger as Logger
@@ -70,10 +65,7 @@ import PostgREST.SchemaCache (SchemaCache (..),
7065
querySchemaCache,
7166
showSummary)
7267
import PostgREST.SchemaCache.Identifiers (quoteQi)
73-
import PostgREST.Unix (createAndBindDomainSocket)
7468

75-
import Data.Streaming.Network (bindPortTCP, bindRandomPortTCP)
76-
import Data.String (IsString (..))
7769
import Protolude
7870

7971
data AppState = AppState
@@ -99,10 +91,6 @@ data AppState = AppState
9991
, stateNextDelay :: IORef Int
10092
-- | Keeps track of the next delay for the listener
10193
, stateNextListenerDelay :: IORef Int
102-
-- | Network socket for REST API
103-
, stateSocketREST :: NS.Socket
104-
-- | Network socket for the admin UI
105-
, stateSocketAdmin :: Maybe NS.Socket
10694
-- | Observation handler
10795
, stateObserver :: ObservationHandler
10896
-- | JWT Cache
@@ -117,8 +105,6 @@ data SchemaCacheStatus
117105
| SCPending
118106
deriving Eq
119107

120-
type AppSockets = (NS.Socket, Maybe NS.Socket)
121-
122108
init :: AppConfig -> IO AppState
123109
init conf@AppConfig{configLogLevel, configDbPoolSize} = do
124110
loggerState <- Logger.init
@@ -128,12 +114,10 @@ init conf@AppConfig{configLogLevel, configDbPoolSize} = do
128114
observer $ AppStartObs prettyVersion
129115

130116
pool <- initPool conf observer
131-
(sock, adminSock) <- initSockets conf
132-
state' <- initWithPool (sock, adminSock) pool conf loggerState metricsState observer
133-
pure state' { stateSocketREST = sock, stateSocketAdmin = adminSock}
117+
initWithPool pool conf loggerState metricsState observer --{ stateSocketREST = sock, stateSocketAdmin = adminSock}
134118

135-
initWithPool :: AppSockets -> SQL.Pool -> AppConfig -> Logger.LoggerState -> Metrics.MetricsState -> ObservationHandler -> IO AppState
136-
initWithPool (sock, adminSock) pool conf loggerState metricsState observer = do
119+
initWithPool :: SQL.Pool -> AppConfig -> Logger.LoggerState -> Metrics.MetricsState -> ObservationHandler -> IO AppState
120+
initWithPool pool conf loggerState metricsState observer = do
137121

138122
appState <- AppState pool
139123
<$> newIORef minimumPgVersion -- assume we're in a supported version when starting, this will be corrected on a later step
@@ -146,8 +130,6 @@ initWithPool (sock, adminSock) pool conf loggerState metricsState observer = do
146130
<*> myThreadId
147131
<*> newIORef 0
148132
<*> newIORef 1
149-
<*> pure sock
150-
<*> pure adminSock
151133
<*> pure observer
152134
<*> JwtCache.init conf observer
153135
<*> pure loggerState
@@ -166,40 +148,6 @@ initWithPool (sock, adminSock) pool conf loggerState metricsState observer = do
166148
destroy :: AppState -> IO ()
167149
destroy = destroyPool
168150

169-
initSockets :: AppConfig -> IO AppSockets
170-
initSockets AppConfig{..} = do
171-
let
172-
cfg'usp = configServerUnixSocket
173-
cfg'uspm = configServerUnixSocketMode
174-
cfg'host = configServerHost
175-
cfg'port = configServerPort
176-
cfg'adminHost = configAdminServerHost
177-
cfg'adminPort = configAdminServerPort
178-
179-
sock <- case cfg'usp of
180-
-- I'm not using `streaming-commons`' bindPath function here because it's not defined for Windows,
181-
-- but we need to have runtime error if we try to use it in Windows, not compile time error
182-
Just path -> createAndBindDomainSocket path cfg'uspm
183-
Nothing -> do
184-
(_, sock) <-
185-
if cfg'port /= 0
186-
then do
187-
sock <- bindPortTCP cfg'port (fromString $ T.unpack cfg'host)
188-
pure (cfg'port, sock)
189-
else do
190-
-- explicitly bind to a random port, returning bound port number
191-
(num, sock) <- bindRandomPortTCP (fromString $ T.unpack cfg'host)
192-
pure (num, sock)
193-
pure sock
194-
195-
adminSock <- case cfg'adminPort of
196-
Just adminPort -> do
197-
adminSock <- bindPortTCP adminPort (fromString $ T.unpack cfg'adminHost)
198-
pure $ Just adminSock
199-
Nothing -> pure Nothing
200-
201-
pure (sock, adminSock)
202-
203151
initPool :: AppConfig -> ObservationHandler -> IO SQL.Pool
204152
initPool AppConfig{..} observer = do
205153
SQL.acquire $ SQL.settings
@@ -313,12 +261,6 @@ getTime = stateGetTime
313261
getJwtCacheState :: AppState -> JwtCacheState
314262
getJwtCacheState = stateJwtCache
315263

316-
getSocketREST :: AppState -> NS.Socket
317-
getSocketREST = stateSocketREST
318-
319-
getSocketAdmin :: AppState -> Maybe NS.Socket
320-
getSocketAdmin = stateSocketAdmin
321-
322264
getMainThreadId :: AppState -> ThreadId
323265
getMainThreadId = stateMainThreadId
324266

src/PostgREST/Unix.hs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,9 @@ import System.Directory (removeFile)
1818
import System.IO.Error (isDoesNotExistError)
1919

2020
-- | Set signal handlers, only for systems with signals
21-
installSignalHandlers :: ThreadId -> IO () -> IO () -> IO ()
21+
installSignalHandlers :: IO () -> IO () -> IO () -> IO ()
2222
#ifndef mingw32_HOST_OS
23-
installSignalHandlers tid usr1 usr2 = do
24-
let interrupt = throwTo tid UserInterrupt
23+
installSignalHandlers interrupt usr1 usr2 = do
2524
install Signals.sigINT interrupt
2625
install Signals.sigTERM interrupt
2726
install Signals.sigUSR1 usr1

test/io/test_io.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,6 @@ def sleep():
105105
t.join()
106106

107107

108-
@pytest.mark.xfail(reason="Graceful shutdown is currently failing", strict=True)
109108
def test_graceful_shutdown_waits_for_in_flight_request(defaultenv):
110109
"SIGTERM should allow in-flight requests to finish before exiting"
111110

test/spec/Main.hs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,13 +84,12 @@ main = do
8484

8585
-- cached schema cache so most tests run fast
8686
baseSchemaCache <- loadSCache pool testCfg
87-
sockets <- AppState.initSockets testCfg
8887
loggerState <- Logger.init
8988
metricsState <- Metrics.init (configDbPoolSize testCfg)
9089

9190
let
9291
initApp sCache st config = do
93-
appState <- AppState.initWithPool sockets pool config loggerState metricsState (Metrics.observationMetrics metricsState)
92+
appState <- AppState.initWithPool pool config loggerState metricsState (Metrics.observationMetrics metricsState)
9493
AppState.putPgVersion appState actualPgVersion
9594
AppState.putSchemaCache appState (Just sCache)
9695
return (st, postgrest (configLogLevel config) appState (pure ()))

0 commit comments

Comments
 (0)