@@ -22,6 +22,9 @@ import GHC.IO.Exception (IOErrorType (..))
2222import System.IO.Error (ioeGetErrorType )
2323
2424import Control.Monad.Except (liftEither )
25+ import Control.Monad.Extra (whenJust )
26+ import Data.IORef (atomicWriteIORef , newIORef ,
27+ readIORef )
2528import Data.Either.Combinators (mapLeft , whenLeft )
2629import Data.Maybe (fromJust )
2730import Data.String (IsString (.. ))
@@ -62,8 +65,8 @@ import PostgREST.Version (docsVersion, prettyVersion)
6265
6366import qualified Data.ByteString.Char8 as BS
6467import qualified Data.List as L
65- import Data.Streaming.Network (bindPortTCP ,
66- bindRandomPortTCP )
68+ import Data.Streaming.Network (HostPreference ,
69+ bindPortGenEx )
6770import qualified Data.Text as T
6871import qualified Network.HTTP.Types as HTTP
6972import qualified Network.Socket as NS
@@ -76,21 +79,36 @@ run :: AppState -> IO ()
7679run appState = do
7780 conf@ AppConfig {.. } <- AppState. getConfig appState
7881
79- AppState. schemaCacheLoader appState -- Loads the initial SchemaCache
80- (mainSocket, adminSocket) <- initSockets conf
82+ mainSocketRef <- newIORef Nothing
83+ adminSocket <- initAdminServerSocket conf
8184
82- Unix. installSignalHandlers (AppState. getMainThreadId appState) (AppState. schemaCacheLoader appState) (AppState. readInDbConfig False appState)
85+ let closeSockets = do
86+ whenJust adminSocket NS. close
87+ readIORef mainSocketRef >>= foldMap NS. close
88+ Unix. installSignalHandlers closeSockets (AppState. schemaCacheLoader appState) (AppState. readInDbConfig False appState)
89+
90+ Admin. runAdmin appState adminSocket (readIORef mainSocketRef) (serverSettings conf)
8391
8492 Listener. runListener appState
8593
86- Admin. runAdmin appState adminSocket mainSocket (serverSettings conf)
94+ -- Kick off and wait for the initial SchemaCache load before creating the
95+ -- main API socket.
96+ AppState. schemaCacheLoader appState
97+ AppState. waitForSchemaCacheLoaded appState
98+
99+ mainSocket <- initServerSocket conf
100+ atomicWriteIORef mainSocketRef $ Just mainSocket
87101
88102 let app = postgrest configLogLevel appState (AppState. schemaCacheLoader appState)
89103
90104 do
91105 address <- resolveSocketToAddress mainSocket
92106 observer $ AppServerAddressObs address
93107
108+ -- Hardcoding maximum graceful shutdown timeout (arbitrary set to 5 seconds)
109+ -- This is unfortunate but necessary becase graceful shutdowns don't work with HTTP keep-alive
110+ -- causing Warp to handle requests on already opened connections even if the listen socket is closed
111+ -- See: https://github.com/yesodweb/wai/issues/853
94112 Warp. runSettingsSocket (serverSettings conf & setOnException onWarpException) mainSocket app
95113 where
96114 observer = AppState. getObserver appState
@@ -237,39 +255,27 @@ addRetryHint delay response = do
237255isServiceUnavailable :: Wai. Response -> Bool
238256isServiceUnavailable response = Wai. responseStatus response == HTTP. status503
239257
240- type AppSockets = (NS. Socket , Maybe NS. Socket )
241-
242- initSockets :: AppConfig -> IO AppSockets
243- initSockets AppConfig {.. } = do
244- let
245- cfg'usp = configServerUnixSocket
246- cfg'uspm = configServerUnixSocketMode
247- cfg'host = configServerHost
248- cfg'port = configServerPort
249- cfg'adminHost = configAdminServerHost
250- cfg'adminPort = configAdminServerPort
251-
252- sock <- case cfg'usp of
253- -- I'm not using `streaming-commons`' bindPath function here because it's not defined for Windows,
254- -- but we need to have runtime error if we try to use it in Windows, not compile time error
255- Just path -> createAndBindDomainSocket path cfg'uspm
256- Nothing -> do
257- (_, sock) <-
258- if cfg'port /= 0
259- then do
260- sock <- bindPortTCP cfg'port (fromString $ T. unpack cfg'host)
261- pure (cfg'port, sock)
262- else do
263- -- explicitly bind to a random port, returning bound port number
264- (num, sock) <- bindRandomPortTCP (fromString $ T. unpack cfg'host)
265- pure (num, sock)
266- pure sock
267-
268- adminSock <- case cfg'adminPort of
269- Just adminPort -> do
270- adminSock <- bindPortTCP adminPort (fromString $ T. unpack cfg'adminHost)
271- pure $ Just adminSock
272- Nothing -> pure Nothing
273-
274- pure (sock, adminSock)
275-
258+ initServerSocket :: AppConfig -> IO NS. Socket
259+ initServerSocket AppConfig {.. } = case configServerUnixSocket of
260+ -- I'm not using `streaming-commons`' bindPath function here because it's not defined for Windows,
261+ -- but we need to have runtime error if we try to use it in Windows, not compile time error
262+ Just path -> createAndBindDomainSocket path configServerUnixSocketMode
263+ Nothing ->
264+ bindPortTCPWithReusePort configServerPort (fromString $ T. unpack configServerHost)
265+
266+ initAdminServerSocket :: AppConfig -> IO (Maybe NS. Socket )
267+ initAdminServerSocket AppConfig {.. } =
268+ traverse (`bindPortTCPWithReusePort` adminHost) configAdminServerPort
269+ where
270+ adminHost = fromString $ T. unpack configAdminServerHost
271+
272+ bindPortTCPWithReusePort :: Int -> HostPreference -> IO NS. Socket
273+ bindPortTCPWithReusePort port hostPreference = do
274+ -- Some unix variants can expose ReusePort but reject it at runtime.
275+ -- Fall back to binding without ReusePort when that happens.
276+ try (bindPortGenEx reusePortOpts NS. Stream port hostPreference) :: IO (Either SomeException NS. Socket )
277+ >>= either (const $ bindPortGenEx [] NS. Stream port hostPreference) pure
278+ >>= listenSocket
279+ where
280+ reusePortOpts = [(NS. ReusePort , 1 )]
281+ listenSocket sock = NS. listen sock (max 2048 NS. maxListenQueue) $> sock
0 commit comments