Skip to content

Commit 2cb6503

Browse files
committed
add: use SO_REUSEPORT on platform supporting it
2 parents 03ddd0b + ce24174 commit 2cb6503

7 files changed

Lines changed: 117 additions & 93 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ All notable changes to this project will be documented in this file. From versio
1313
- Optimize requests with `Prefer: count=exact` that do not use ranges or `db-max-rows` by @laurenceisla in #3957
1414
+ Removed unnecessary double count when building the `Content-Range`.
1515
- Add config `client_error_verbosity` to customize error verbosity by @taimoorzaeem in #4088, #3980, #3824
16+
- Use SO_REUSEPORT on platforms supporting it by @mkleczek in #4703 #4694
1617

1718
### Changed
1819

src/PostgREST/Admin.hs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,8 @@ admin appState socketREST req respond = do
4444
respond $ Wai.responseLBS (if isMainAppReachable then HTTP.status200 else HTTP.status500) [] mempty
4545
["ready"] ->
4646
let
47-
status | not isMainAppReachable = HTTP.status500
48-
| isPending = HTTP.status503
47+
status | isPending = HTTP.status503
48+
| not isMainAppReachable = HTTP.status500
4949
| isLoaded = HTTP.status200
5050
| otherwise = HTTP.status500
5151
in

src/PostgREST/App.hs

Lines changed: 42 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -63,12 +63,12 @@ import PostgREST.Version (docsVersion, prettyVersion)
6363

6464
import qualified Data.ByteString.Char8 as BS
6565
import qualified Data.List as L
66-
import Data.Streaming.Network (bindPortTCP,
67-
bindRandomPortTCP)
66+
import Data.Streaming.Network (HostPreference,
67+
bindPortGenEx)
6868
import qualified Data.Text as T
6969
import qualified Network.HTTP.Types as HTTP
7070
import qualified Network.Socket as NS
71-
import PostgREST.Unix (createAndBindDomainSocket)
71+
import PostgREST.Unix (createAndBindDomainSocketNoListen)
7272
import Protolude hiding (Handler)
7373

7474
type Handler = ExceptT Error
@@ -77,16 +77,22 @@ run :: AppState -> IO ()
7777
run appState = do
7878
conf@AppConfig{..} <- AppState.getConfig appState
7979

80-
AppState.schemaCacheLoader appState -- Loads the initial SchemaCache
81-
(mainSocket, adminSocket) <- initSockets conf
80+
mainSocket <- initServerSocket conf
81+
adminSocket <- initAdminServerSocket conf
8282
let closeSockets = do
8383
whenJust adminSocket NS.close
8484
NS.close mainSocket
8585
Unix.installSignalHandlers closeSockets (AppState.schemaCacheLoader appState) (AppState.readInDbConfig False appState)
8686

87+
Admin.runAdmin appState adminSocket mainSocket (serverSettings conf)
88+
8789
Listener.runListener appState
8890

89-
Admin.runAdmin appState adminSocket mainSocket (serverSettings conf)
91+
-- Kick off and wait for the initial SchemaCache load before listening on
92+
-- the main API socket.
93+
AppState.schemaCacheLoader appState
94+
AppState.waitForSchemaCacheLoaded appState
95+
void $ listenSocket mainSocket
9096

9197
let app = postgrest configLogLevel appState (AppState.schemaCacheLoader appState)
9298

@@ -244,39 +250,34 @@ addRetryHint delay response = do
244250
isServiceUnavailable :: Wai.Response -> Bool
245251
isServiceUnavailable response = Wai.responseStatus response == HTTP.status503
246252

247-
type AppSockets = (NS.Socket, Maybe NS.Socket)
248-
249-
initSockets :: AppConfig -> IO AppSockets
250-
initSockets AppConfig{..} = do
251-
let
252-
cfg'usp = configServerUnixSocket
253-
cfg'uspm = configServerUnixSocketMode
254-
cfg'host = configServerHost
255-
cfg'port = configServerPort
256-
cfg'adminHost = configAdminServerHost
257-
cfg'adminPort = configAdminServerPort
258-
259-
sock <- case cfg'usp of
260-
-- I'm not using `streaming-commons`' bindPath function here because it's not defined for Windows,
261-
-- but we need to have runtime error if we try to use it in Windows, not compile time error
262-
Just path -> createAndBindDomainSocket path cfg'uspm
263-
Nothing -> do
264-
(_, sock) <-
265-
if cfg'port /= 0
266-
then do
267-
sock <- bindPortTCP cfg'port (fromString $ T.unpack cfg'host)
268-
pure (cfg'port, sock)
269-
else do
270-
-- explicitly bind to a random port, returning bound port number
271-
(num, sock) <- bindRandomPortTCP (fromString $ T.unpack cfg'host)
272-
pure (num, sock)
273-
pure sock
274-
275-
adminSock <- case cfg'adminPort of
276-
Just adminPort -> do
277-
adminSock <- bindPortTCP adminPort (fromString $ T.unpack cfg'adminHost)
278-
pure $ Just adminSock
279-
Nothing -> pure Nothing
280-
281-
pure (sock, adminSock)
253+
initServerSocket :: AppConfig -> IO NS.Socket
254+
initServerSocket AppConfig{..} = case configServerUnixSocket of
255+
-- I'm not using `streaming-commons`' bindPath function here because it's not defined for Windows,
256+
-- but we need to have runtime error if we try to use it in Windows, not compile time error
257+
Just path -> createAndBindDomainSocketNoListen path configServerUnixSocketMode
258+
Nothing ->
259+
bindPortTCPWithoutListen configServerPort (fromString $ T.unpack configServerHost)
260+
261+
initAdminServerSocket :: AppConfig -> IO (Maybe NS.Socket)
262+
initAdminServerSocket AppConfig{..} =
263+
traverse (`bindPortTCPWithReusePort` adminHost) configAdminServerPort
264+
where
265+
adminHost = fromString $ T.unpack configAdminServerHost
266+
267+
bindPortTCPWithReusePort :: Int -> HostPreference -> IO NS.Socket
268+
bindPortTCPWithReusePort port hostPreference
269+
= bindPortTCPWithoutListen port hostPreference >>= listenSocket
270+
271+
bindPortTCPWithoutListen :: Int -> HostPreference -> IO NS.Socket
272+
bindPortTCPWithoutListen port hostPreference = do
273+
-- Some unix variants can expose ReusePort but reject it at runtime.
274+
-- Fall back to binding without ReusePort when that happens.
275+
socketWithReusePort <- try (bindPortGenEx reusePortOpts NS.Stream port hostPreference) :: IO (Either SomeException NS.Socket)
276+
either (const $ bindPortGenEx [] NS.Stream port hostPreference) pure socketWithReusePort
277+
where
278+
reusePortOpts = [(NS.ReusePort, 1)]
282279

280+
listenSocket :: NS.Socket -> IO NS.Socket
281+
listenSocket sock = do
282+
NS.listen sock (max 2048 NS.maxListenQueue)
283+
pure sock

src/PostgREST/AppState.hs

Lines changed: 31 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ module PostgREST.AppState
2222
, usePool
2323
, readInDbConfig
2424
, schemaCacheLoader
25+
, waitForSchemaCacheLoaded
2526
, getObserver
2627
, isLoaded
2728
, isPending
@@ -76,7 +77,7 @@ data AppState = AppState
7677
-- | Schema cache
7778
, stateSchemaCache :: IORef (Maybe SchemaCache)
7879
-- | The schema cache status
79-
, stateSCacheStatus :: IORef SchemaCacheStatus
80+
, stateSCacheStatus :: SchemaCacheStatus
8081
-- | State of the LISTEN channel
8182
, stateIsListenerOn :: IORef Bool
8283
-- | starts the connection worker with a debounce
@@ -99,11 +100,11 @@ data AppState = AppState
99100
, stateMetrics :: Metrics.MetricsState
100101
}
101102

102-
-- | Schema cache status
103-
data SchemaCacheStatus
104-
= SCLoaded
105-
| SCPending
106-
deriving Eq
103+
-- | Schema cache status.
104+
-- Empty means pending and full means loaded.
105+
newtype SchemaCacheStatus = SchemaCacheStatus
106+
{ getSCStatusMVar :: MVar ()
107+
}
107108

108109
init :: AppConfig -> IO AppState
109110
init conf@AppConfig{configLogLevel, configDbPoolSize} = do
@@ -122,7 +123,7 @@ initWithPool pool conf loggerState metricsState observer = do
122123
appState <- AppState pool
123124
<$> newIORef minimumPgVersion -- assume we're in a supported version when starting, this will be corrected on a later step
124125
<*> newIORef Nothing
125-
<*> newIORef SCPending
126+
<*> newSchemaCacheStatus
126127
<*> newIORef False
127128
<*> pure (pure ())
128129
<*> newIORef conf
@@ -240,6 +241,9 @@ putSchemaCache appState = atomicWriteIORef (stateSchemaCache appState)
240241
schemaCacheLoader :: AppState -> IO ()
241242
schemaCacheLoader = debouncedSCacheLoader
242243

244+
waitForSchemaCacheLoaded :: AppState -> IO ()
245+
waitForSchemaCacheLoaded = void . readMVar . getSCStatusMVar . stateSCacheStatus
246+
243247
getNextDelay :: AppState -> IO Int
244248
getNextDelay = readIORef . stateNextDelay
245249

@@ -277,18 +281,15 @@ putIsListenerOn = atomicWriteIORef . stateIsListenerOn
277281

278282
isLoaded :: AppState -> IO Bool
279283
isLoaded x = do
280-
scacheStatus <- readIORef $ stateSCacheStatus x
284+
scacheLoaded <- isSchemaCacheLoaded x
281285
connEstablished <- isConnEstablished x
282-
return $ scacheStatus == SCLoaded && connEstablished
286+
return $ scacheLoaded && connEstablished
283287

284288
isPending :: AppState -> IO Bool
285289
isPending x = do
286-
scacheStatus <- readIORef $ stateSCacheStatus x
290+
scacheLoaded <- isSchemaCacheLoaded x
287291
connEstablished <- isConnEstablished x
288-
return $ scacheStatus == SCPending || not connEstablished
289-
290-
putSCacheStatus :: AppState -> SchemaCacheStatus -> IO ()
291-
putSCacheStatus = atomicWriteIORef . stateSCacheStatus
292+
return $ not scacheLoaded || not connEstablished
292293

293294
getObserver :: AppState -> ObservationHandler
294295
getObserver = stateObserver
@@ -347,19 +348,19 @@ retryingSchemaCacheLoad appState@AppState{stateObserver=observer, stateMainThrea
347348
timeItT $ usePool appState (transaction SQL.ReadCommitted SQL.Read $ querySchemaCache conf)
348349
case result of
349350
Left e -> do
350-
putSCacheStatus appState SCPending
351+
markSchemaCachePending appState
351352
putSchemaCache appState Nothing
352353
observer $ SchemaCacheErrorObs configDbSchemas configDbExtraSearchPath e
353354
return Nothing
354355

355356
Right sCache -> do
356357
-- IMPORTANT: While the pending schema cache state starts from running the above querySchemaCache, only at this stage we block API requests due to the usage of an
357-
-- IORef on putSchemaCache. This is why SCacheStatus is put at SCPending here to signal the Admin server (using isPending) that we're on a recovery state.
358-
putSCacheStatus appState SCPending
358+
-- IORef on putSchemaCache. This is why schema cache status is marked as pending here to signal the Admin server (using isPending) that we're on a recovery state.
359+
markSchemaCachePending appState
359360
putSchemaCache appState $ Just sCache
360361
observer $ SchemaCacheQueriedObs resultTime
361362
observer . uncurry SchemaCacheLoadedObs =<< timeItT (evaluate $ showSummary sCache)
362-
putSCacheStatus appState SCLoaded
363+
markSchemaCacheLoaded appState
363364
return $ Just sCache
364365

365366
shouldRetry :: RetryStatus -> (Maybe PgVersion, Maybe SchemaCache) -> IO Bool
@@ -375,6 +376,18 @@ retryingSchemaCacheLoad appState@AppState{stateObserver=observer, stateMainThrea
375376

376377
oneSecondInUs = 1000000 -- one second in microseconds
377378

379+
newSchemaCacheStatus :: IO SchemaCacheStatus
380+
newSchemaCacheStatus = SchemaCacheStatus <$> newEmptyMVar
381+
382+
markSchemaCachePending :: AppState -> IO ()
383+
markSchemaCachePending = void . tryTakeMVar . getSCStatusMVar . stateSCacheStatus
384+
385+
markSchemaCacheLoaded :: AppState -> IO ()
386+
markSchemaCacheLoaded = void . (`tryPutMVar` ()) . getSCStatusMVar . stateSCacheStatus
387+
388+
isSchemaCacheLoaded :: AppState -> IO Bool
389+
isSchemaCacheLoaded = fmap not . isEmptyMVar . getSCStatusMVar . stateSCacheStatus
390+
378391
-- | Reads the in-db config and reads the config file again
379392
-- | We don't retry reading the in-db config after it fails immediately, because it could have user errors. We just report the error and continue.
380393
readInDbConfig :: Bool -> AppState -> IO ()

src/PostgREST/Unix.hs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
module PostgREST.Unix
44
( installSignalHandlers
5-
, createAndBindDomainSocket
5+
, createAndBindDomainSocketNoListen
66
) where
77

88
#ifndef mingw32_HOST_OS
@@ -32,16 +32,16 @@ installSignalHandlers interrupt usr1 usr2 = do
3232
installSignalHandlers _ _ _ = pass
3333
#endif
3434

35-
-- | Create a unix domain socket and bind it to the given path.
35+
-- | Create a unix domain socket and bind it to the given path, without
36+
-- | listening yet.
3637
-- | The socket file will be deleted if it already exists.
37-
createAndBindDomainSocket :: String -> FileMode -> IO NS.Socket
38-
createAndBindDomainSocket path mode = do
38+
createAndBindDomainSocketNoListen :: String -> FileMode -> IO NS.Socket
39+
createAndBindDomainSocketNoListen path mode = do
3940
unless NS.isUnixDomainSocketAvailable $
4041
panic "Cannot run with unix socket on non-unix platforms. Consider deleting the `server-unix-socket` config entry in order to continue."
4142
deleteSocketFileIfExist path
4243
sock <- NS.socket NS.AF_UNIX NS.Stream NS.defaultProtocol
4344
NS.bind sock $ NS.SockAddrUnix path
44-
NS.listen sock (max 2048 NS.maxListenQueue)
4545
setFileMode path mode
4646
return sock
4747
where

test/io/postgrest.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ def run(
8888
admin_port=None,
8989
host=None,
9090
wait_for_readiness=True,
91-
wait_max_seconds=1,
91+
wait_max_seconds=3,
9292
no_pool_connection_available=False,
9393
no_startup_stdout=True,
9494
):
@@ -188,6 +188,7 @@ def wait_until_exit(postgrest, timeout=1):
188188
def wait_until_status_code(url, max_seconds, status_code):
189189
"Wait for the given HTTP endpoint to return a status code"
190190
session = requests_unixsocket.Session()
191+
response = None
191192

192193
for _ in range(max_seconds * 10):
193194
try:

0 commit comments

Comments
 (0)