Skip to content

Commit a4aa58b

Browse files
mkleczeksteve-chavez
authored andcommitted
refactor: move socket creation and management to App module
Right now listening sockets initialization, management and usage is split between App, AppState and Admin modules: they are created in AppState.init and remembered in AppState but used only in App and Admin. It has several negative consequences: - sockets are initialized even if not needed (eg. command line invocations like dump-config or dump-schema) - it is impossible to start listening on a socket after initial schema cache load because it requires AppState This change decouples listen socket management from AppState. Sockets are created only when needed (ie. not in command line tools invocation) and passed to admin application and to Warp by the App module.
1 parent e741c1b commit a4aa58b

4 files changed

Lines changed: 64 additions & 78 deletions

File tree

src/PostgREST/Admin.hs

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,22 +19,23 @@ import PostgREST.Observation (Observation (..))
1919

2020
import qualified PostgREST.AppState as AppState
2121

22-
import Protolude
22+
import qualified Network.Socket as NS
23+
import Protolude
2324

24-
runAdmin :: AppState -> Warp.Settings -> IO ()
25-
runAdmin appState settings = do
26-
whenJust (AppState.getSocketAdmin appState) $ \adminSocket -> do
25+
runAdmin :: AppState -> Maybe NS.Socket -> NS.Socket -> Warp.Settings -> IO ()
26+
runAdmin appState maybeAdminSocket socketREST settings = do
27+
whenJust maybeAdminSocket $ \adminSocket -> do
2728
address <- resolveSocketToAddress adminSocket
2829
observer $ AdminStartObs address
2930
void . forkIO $ Warp.runSettingsSocket settings adminSocket adminApp
3031
where
31-
adminApp = admin appState
32+
adminApp = admin appState socketREST
3233
observer = AppState.getObserver appState
3334

3435
-- | PostgREST admin application
35-
admin :: AppState.AppState -> Wai.Application
36-
admin appState req respond = do
37-
isMainAppReachable <- isRight <$> reachMainApp (AppState.getSocketREST appState)
36+
admin :: AppState.AppState -> NS.Socket -> Wai.Application
37+
admin appState socketREST req respond = do
38+
isMainAppReachable <- isRight <$> reachMainApp socketREST
3839
isLoaded <- AppState.isLoaded appState
3940
isPending <- AppState.isPending appState
4041

src/PostgREST/App.hs

Lines changed: 51 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -60,10 +60,15 @@ import PostgREST.SchemaCache (SchemaCache (..))
6060
import PostgREST.TimeIt (timeItT)
6161
import PostgREST.Version (docsVersion, prettyVersion)
6262

63-
import qualified Data.ByteString.Char8 as BS
64-
import qualified Data.List as L
65-
import qualified Network.HTTP.Types as HTTP
66-
import Protolude hiding (Handler)
63+
import qualified Data.ByteString.Char8 as BS
64+
import qualified Data.List as L
65+
import Data.Streaming.Network (bindPortTCP,
66+
bindRandomPortTCP)
67+
import qualified Data.Text as T
68+
import qualified Network.HTTP.Types as HTTP
69+
import qualified Network.Socket as NS
70+
import PostgREST.Unix (createAndBindDomainSocket)
71+
import Protolude hiding (Handler)
6772

6873
type Handler = ExceptT Error
6974

@@ -72,19 +77,21 @@ run appState = do
7277
conf@AppConfig{..} <- AppState.getConfig appState
7378

7479
AppState.schemaCacheLoader appState -- Loads the initial SchemaCache
80+
(mainSocket, adminSocket) <- initSockets conf
81+
7582
Unix.installSignalHandlers (AppState.getMainThreadId appState) (AppState.schemaCacheLoader appState) (AppState.readInDbConfig False appState)
7683

7784
Listener.runListener appState
7885

79-
Admin.runAdmin appState (serverSettings conf)
86+
Admin.runAdmin appState adminSocket mainSocket (serverSettings conf)
8087

8188
let app = postgrest configLogLevel appState (AppState.schemaCacheLoader appState)
8289

8390
do
84-
address <- resolveSocketToAddress (AppState.getSocketREST appState)
91+
address <- resolveSocketToAddress mainSocket
8592
observer $ AppServerAddressObs address
8693

87-
Warp.runSettingsSocket (serverSettings conf & setOnException onWarpException) (AppState.getSocketREST appState) app
94+
Warp.runSettingsSocket (serverSettings conf & setOnException onWarpException) mainSocket app
8895
where
8996
observer = AppState.getObserver appState
9097

@@ -229,3 +236,40 @@ addRetryHint delay response = do
229236

230237
isServiceUnavailable :: Wai.Response -> Bool
231238
isServiceUnavailable response = Wai.responseStatus response == HTTP.status503
239+
240+
type AppSockets = (NS.Socket, Maybe NS.Socket)
241+
242+
initSockets :: AppConfig -> IO AppSockets
243+
initSockets AppConfig{..} = do
244+
let
245+
cfg'usp = configServerUnixSocket
246+
cfg'uspm = configServerUnixSocketMode
247+
cfg'host = configServerHost
248+
cfg'port = configServerPort
249+
cfg'adminHost = configAdminServerHost
250+
cfg'adminPort = configAdminServerPort
251+
252+
sock <- case cfg'usp of
253+
-- I'm not using `streaming-commons`' bindPath function here because it's not defined for Windows,
254+
-- but we need to have runtime error if we try to use it in Windows, not compile time error
255+
Just path -> createAndBindDomainSocket path cfg'uspm
256+
Nothing -> do
257+
(_, sock) <-
258+
if cfg'port /= 0
259+
then do
260+
sock <- bindPortTCP cfg'port (fromString $ T.unpack cfg'host)
261+
pure (cfg'port, sock)
262+
else do
263+
-- explicitly bind to a random port, returning bound port number
264+
(num, sock) <- bindRandomPortTCP (fromString $ T.unpack cfg'host)
265+
pure (num, sock)
266+
pure sock
267+
268+
adminSock <- case cfg'adminPort of
269+
Just adminPort -> do
270+
adminSock <- bindPortTCP adminPort (fromString $ T.unpack cfg'adminHost)
271+
pure $ Just adminSock
272+
Nothing -> pure Nothing
273+
274+
pure (sock, adminSock)
275+

src/PostgREST/AppState.hs

Lines changed: 3 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,7 @@ module PostgREST.AppState
1313
, getNextListenerDelay
1414
, getTime
1515
, getJwtCacheState
16-
, getSocketREST
17-
, getSocketAdmin
1816
, init
19-
, initSockets
2017
, initWithPool
2118
, putNextListenerDelay
2219
, putSchemaCache
@@ -32,13 +29,11 @@ module PostgREST.AppState
3229

3330
import qualified Data.ByteString.Char8 as BS
3431
import Data.Either.Combinators (whenLeft)
35-
import qualified Data.Text as T (unpack)
3632
import qualified Hasql.Pool as SQL
3733
import qualified Hasql.Pool.Config as SQL
3834
import qualified Hasql.Session as SQL
3935
import qualified Hasql.Transaction.Sessions as SQL
4036
import qualified Network.HTTP.Types.Status as HTTP
41-
import qualified Network.Socket as NS
4237
import qualified PostgREST.Auth.JwtCache as JwtCache
4338
import qualified PostgREST.Error as Error
4439
import qualified PostgREST.Logger as Logger
@@ -70,10 +65,7 @@ import PostgREST.SchemaCache (SchemaCache (..),
7065
querySchemaCache,
7166
showSummary)
7267
import PostgREST.SchemaCache.Identifiers (quoteQi)
73-
import PostgREST.Unix (createAndBindDomainSocket)
7468

75-
import Data.Streaming.Network (bindPortTCP, bindRandomPortTCP)
76-
import Data.String (IsString (..))
7769
import Protolude
7870

7971
data AppState = AppState
@@ -99,10 +91,6 @@ data AppState = AppState
9991
, stateNextDelay :: IORef Int
10092
-- | Keeps track of the next delay for the listener
10193
, stateNextListenerDelay :: IORef Int
102-
-- | Network socket for REST API
103-
, stateSocketREST :: NS.Socket
104-
-- | Network socket for the admin UI
105-
, stateSocketAdmin :: Maybe NS.Socket
10694
-- | Observation handler
10795
, stateObserver :: ObservationHandler
10896
-- | JWT Cache
@@ -117,8 +105,6 @@ newtype SchemaCacheStatus = SchemaCacheStatus
117105
{ getSCStatusMVar :: MVar ()
118106
}
119107

120-
type AppSockets = (NS.Socket, Maybe NS.Socket)
121-
122108
init :: AppConfig -> IO AppState
123109
init conf@AppConfig{configLogLevel, configDbPoolSize} = do
124110
loggerState <- Logger.init
@@ -128,12 +114,10 @@ init conf@AppConfig{configLogLevel, configDbPoolSize} = do
128114
observer $ AppStartObs prettyVersion
129115

130116
pool <- initPool conf observer
131-
(sock, adminSock) <- initSockets conf
132-
state' <- initWithPool (sock, adminSock) pool conf loggerState metricsState observer
133-
pure state' { stateSocketREST = sock, stateSocketAdmin = adminSock}
117+
initWithPool pool conf loggerState metricsState observer --{ stateSocketREST = sock, stateSocketAdmin = adminSock}
134118

135-
initWithPool :: AppSockets -> SQL.Pool -> AppConfig -> Logger.LoggerState -> Metrics.MetricsState -> ObservationHandler -> IO AppState
136-
initWithPool (sock, adminSock) pool conf loggerState metricsState observer = do
119+
initWithPool :: SQL.Pool -> AppConfig -> Logger.LoggerState -> Metrics.MetricsState -> ObservationHandler -> IO AppState
120+
initWithPool pool conf loggerState metricsState observer = do
137121

138122
appState <- AppState pool
139123
<$> newIORef minimumPgVersion -- assume we're in a supported version when starting, this will be corrected on a later step
@@ -146,8 +130,6 @@ initWithPool (sock, adminSock) pool conf loggerState metricsState observer = do
146130
<*> myThreadId
147131
<*> newIORef 0
148132
<*> newIORef 1
149-
<*> pure sock
150-
<*> pure adminSock
151133
<*> pure observer
152134
<*> JwtCache.init conf observer
153135
<*> pure loggerState
@@ -166,40 +148,6 @@ initWithPool (sock, adminSock) pool conf loggerState metricsState observer = do
166148
destroy :: AppState -> IO ()
167149
destroy = destroyPool
168150

169-
initSockets :: AppConfig -> IO AppSockets
170-
initSockets AppConfig{..} = do
171-
let
172-
cfg'usp = configServerUnixSocket
173-
cfg'uspm = configServerUnixSocketMode
174-
cfg'host = configServerHost
175-
cfg'port = configServerPort
176-
cfg'adminHost = configAdminServerHost
177-
cfg'adminPort = configAdminServerPort
178-
179-
sock <- case cfg'usp of
180-
-- I'm not using `streaming-commons`' bindPath function here because it's not defined for Windows,
181-
-- but we need to have runtime error if we try to use it in Windows, not compile time error
182-
Just path -> createAndBindDomainSocket path cfg'uspm
183-
Nothing -> do
184-
(_, sock) <-
185-
if cfg'port /= 0
186-
then do
187-
sock <- bindPortTCP cfg'port (fromString $ T.unpack cfg'host)
188-
pure (cfg'port, sock)
189-
else do
190-
-- explicitly bind to a random port, returning bound port number
191-
(num, sock) <- bindRandomPortTCP (fromString $ T.unpack cfg'host)
192-
pure (num, sock)
193-
pure sock
194-
195-
adminSock <- case cfg'adminPort of
196-
Just adminPort -> do
197-
adminSock <- bindPortTCP adminPort (fromString $ T.unpack cfg'adminHost)
198-
pure $ Just adminSock
199-
Nothing -> pure Nothing
200-
201-
pure (sock, adminSock)
202-
203151
initPool :: AppConfig -> ObservationHandler -> IO SQL.Pool
204152
initPool AppConfig{..} observer = do
205153
SQL.acquire $ SQL.settings
@@ -313,12 +261,6 @@ getTime = stateGetTime
313261
getJwtCacheState :: AppState -> JwtCacheState
314262
getJwtCacheState = stateJwtCache
315263

316-
getSocketREST :: AppState -> NS.Socket
317-
getSocketREST = stateSocketREST
318-
319-
getSocketAdmin :: AppState -> Maybe NS.Socket
320-
getSocketAdmin = stateSocketAdmin
321-
322264
getMainThreadId :: AppState -> ThreadId
323265
getMainThreadId = stateMainThreadId
324266

test/spec/Main.hs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,13 +84,12 @@ main = do
8484

8585
-- cached schema cache so most tests run fast
8686
baseSchemaCache <- loadSCache pool testCfg
87-
sockets <- AppState.initSockets testCfg
8887
loggerState <- Logger.init
8988
metricsState <- Metrics.init (configDbPoolSize testCfg)
9089

9190
let
9291
initApp sCache st config = do
93-
appState <- AppState.initWithPool sockets pool config loggerState metricsState (Metrics.observationMetrics metricsState)
92+
appState <- AppState.initWithPool pool config loggerState metricsState (Metrics.observationMetrics metricsState)
9493
AppState.putPgVersion appState actualPgVersion
9594
AppState.putSchemaCache appState (Just sCache)
9695
return (st, postgrest (configLogLevel config) appState (pure ()))

0 commit comments

Comments
 (0)