@@ -22,6 +22,7 @@ import GHC.IO.Exception (IOErrorType (..))
2222import System.IO.Error (ioeGetErrorType )
2323
2424import Control.Monad.Except (liftEither )
25+ import Control.Monad.Extra (whenJust )
2526import Data.Either.Combinators (mapLeft , whenLeft )
2627import Data.Maybe (fromJust )
2728import Data.String (IsString (.. ))
@@ -60,31 +61,50 @@ import PostgREST.SchemaCache (SchemaCache (..))
6061import PostgREST.TimeIt (timeItT )
6162import PostgREST.Version (docsVersion , prettyVersion )
6263
63- import qualified Data.ByteString.Char8 as BS
64- import qualified Data.List as L
65- import qualified Network.HTTP.Types as HTTP
66- import Protolude hiding (Handler )
64+ import qualified Data.ByteString.Char8 as BS
65+ import qualified Data.List as L
66+ import Data.Streaming.Network (HostPreference ,
67+ bindPortGenEx )
68+ import qualified Data.Text as T
69+ import qualified Network.HTTP.Types as HTTP
70+ import qualified Network.Socket as NS
71+ import PostgREST.Unix (createAndBindDomainSocketNoListen )
72+ import Protolude hiding (Handler )
6773
6874type Handler = ExceptT Error
6975
7076run :: AppState -> IO ()
7177run appState = do
7278 conf@ AppConfig {.. } <- AppState. getConfig appState
7379
74- AppState. schemaCacheLoader appState -- Loads the initial SchemaCache
75- Unix. installSignalHandlers (AppState. getMainThreadId appState) (AppState. schemaCacheLoader appState) (AppState. readInDbConfig False appState)
80+ mainSocket <- initServerSocket conf
81+ adminSocket <- initAdminServerSocket conf
82+ let closeSockets = do
83+ whenJust adminSocket NS. close
84+ NS. close mainSocket
85+ Unix. installSignalHandlers closeSockets (AppState. schemaCacheLoader appState) (AppState. readInDbConfig False appState)
86+
87+ Admin. runAdmin appState adminSocket mainSocket (serverSettings conf)
7688
7789 Listener. runListener appState
7890
79- Admin. runAdmin appState (serverSettings conf)
91+ -- Kick off and wait for the initial SchemaCache load before listening on
92+ -- the main API socket.
93+ AppState. schemaCacheLoader appState
94+ AppState. waitForSchemaCacheLoaded appState
95+ void $ listenSocket mainSocket
8096
8197 let app = postgrest configLogLevel appState (AppState. schemaCacheLoader appState)
8298
8399 do
84- address <- resolveSocketToAddress ( AppState. getSocketREST appState)
100+ address <- resolveSocketToAddress mainSocket
85101 observer $ AppServerAddressObs address
86102
87- Warp. runSettingsSocket (serverSettings conf & setOnException onWarpException) (AppState. getSocketREST appState) app
103+ -- Hardcoding maximum graceful shutdown timeout (arbitrary set to 5 seconds)
104+ -- This is unfortunate but necessary becase graceful shutdowns don't work with HTTP keep-alive
105+ -- causing Warp to handle requests on already opened connections even if the listen socket is closed
106+ -- See: https://github.com/yesodweb/wai/issues/853
107+ Warp. runSettingsSocket (serverSettings conf & setOnException onWarpException) mainSocket app
88108 where
89109 observer = AppState. getObserver appState
90110
@@ -229,3 +249,35 @@ addRetryHint delay response = do
229249
230250isServiceUnavailable :: Wai. Response -> Bool
231251isServiceUnavailable response = Wai. responseStatus response == HTTP. status503
252+
253+ initServerSocket :: AppConfig -> IO NS. Socket
254+ initServerSocket AppConfig {.. } = case configServerUnixSocket of
255+ -- I'm not using `streaming-commons`' bindPath function here because it's not defined for Windows,
256+ -- but we need to have runtime error if we try to use it in Windows, not compile time error
257+ Just path -> createAndBindDomainSocketNoListen path configServerUnixSocketMode
258+ Nothing ->
259+ bindPortTCPWithoutListen configServerPort (fromString $ T. unpack configServerHost)
260+
261+ initAdminServerSocket :: AppConfig -> IO (Maybe NS. Socket )
262+ initAdminServerSocket AppConfig {.. } =
263+ traverse (`bindPortTCPWithReusePort` adminHost) configAdminServerPort
264+ where
265+ adminHost = fromString $ T. unpack configAdminServerHost
266+
267+ bindPortTCPWithReusePort :: Int -> HostPreference -> IO NS. Socket
268+ bindPortTCPWithReusePort port hostPreference
269+ = bindPortTCPWithoutListen port hostPreference >>= listenSocket
270+
271+ bindPortTCPWithoutListen :: Int -> HostPreference -> IO NS. Socket
272+ bindPortTCPWithoutListen port hostPreference = do
273+ -- Some unix variants can expose ReusePort but reject it at runtime.
274+ -- Fall back to binding without ReusePort when that happens.
275+ socketWithReusePort <- try (bindPortGenEx reusePortOpts NS. Stream port hostPreference) :: IO (Either SomeException NS. Socket )
276+ either (const $ bindPortGenEx [] NS. Stream port hostPreference) pure socketWithReusePort
277+ where
278+ reusePortOpts = [(NS. ReusePort , 1 )]
279+
280+ listenSocket :: NS. Socket -> IO NS. Socket
281+ listenSocket sock = do
282+ NS. listen sock (max 2048 NS. maxListenQueue)
283+ pure sock
0 commit comments