Skip to content

Commit f577ea6

Browse files
committed
fix: Start listening after schema cache load
This change ensures PostgREST starts listening on a server socket only after it loaded the schema cache and is ready to handle requests. It is no longer going to return 503 errors during startup until the schema cache is loaded.
1 parent e7900f7 commit f577ea6

5 files changed

Lines changed: 55 additions & 107 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ All notable changes to this project will be documented in this file. From versio
2121
- Shutdown should wait for in flight requests by @mkleczek in #4702
2222
- Remove automatic transaction retries on `40001 (serialization_failure)` errors to prevent replication lag by @laurenceisla in #3673
2323
- Fix unexpected results when embedding and filtering the same table more than once by @laurenceisla in #4075
24+
- Do not listen on main socket until schema cache is loaded by @mkleczek in #4880
2425

2526

2627
### Changed

src/PostgREST/Admin.hs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,20 +22,20 @@ import qualified PostgREST.AppState as AppState
2222
import qualified Network.Socket as NS
2323
import Protolude
2424

25-
runAdmin :: AppState -> Maybe NS.Socket -> NS.Socket -> Warp.Settings -> IO ()
26-
runAdmin appState maybeAdminSocket socketREST settings = do
25+
runAdmin :: AppState -> Maybe NS.Socket -> IO (Maybe NS.Socket) -> Warp.Settings -> IO ()
26+
runAdmin appState maybeAdminSocket getSocketREST settings = do
2727
whenJust maybeAdminSocket $ \adminSocket -> do
2828
address <- resolveSocketToAddress adminSocket
2929
observer $ AdminStartObs address
3030
void . forkIO $ Warp.runSettingsSocket settings adminSocket adminApp
3131
where
32-
adminApp = admin appState socketREST
32+
adminApp = admin appState getSocketREST
3333
observer = AppState.getObserver appState
3434

3535
-- | PostgREST admin application
36-
admin :: AppState.AppState -> NS.Socket -> Wai.Application
37-
admin appState socketREST req respond = do
38-
isMainAppReachable <- isRight <$> reachMainApp socketREST
36+
admin :: AppState.AppState -> IO (Maybe NS.Socket) -> Wai.Application
37+
admin appState getSocketREST req respond = do
38+
isMainAppReachable <- getSocketREST >>= maybe (pure False) (fmap isRight . reachMainApp)
3939
isLoaded <- AppState.isLoaded appState
4040
isPending <- AppState.isPending appState
4141

@@ -44,8 +44,8 @@ admin appState socketREST req respond = do
4444
respond $ Wai.responseLBS (if isMainAppReachable then HTTP.status200 else HTTP.status500) [] mempty
4545
["ready"] ->
4646
let
47-
status | not isMainAppReachable = HTTP.status500
48-
| isPending = HTTP.status503
47+
status | isPending = HTTP.status503
48+
| not isMainAppReachable = HTTP.status500
4949
| isLoaded = HTTP.status200
5050
| otherwise = HTTP.status500
5151
in

src/PostgREST/App.hs

Lines changed: 32 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ import System.IO.Error (ioeGetErrorType)
2525
import Control.Monad.Except (liftEither)
2626
import Control.Monad.Extra (whenJust)
2727
import Data.Either.Combinators (mapLeft, whenLeft)
28+
import Data.IORef (atomicWriteIORef, newIORef,
29+
readIORef)
2830
import Data.String (IsString (..))
2931
import Network.Wai.Handler.Warp (defaultSettings, setHost,
3032
setOnException, setPort,
@@ -63,11 +65,10 @@ import PostgREST.Version (docsVersion, prettyVersion)
6365

6466
import qualified Data.ByteString.Char8 as BS
6567
import qualified Data.List as L
66-
import Data.Streaming.Network (bindPortTCP,
67-
bindRandomPortTCP)
68+
import Data.Streaming.Network (bindPortTCP)
6869
import qualified Data.Text as T
6970
import qualified Network.HTTP.Types as HTTP
70-
import qualified Network.HTTP.Types.Header as HTTP (hVary)
71+
import qualified Network.HTTP.Types.Header as HTTP
7172
import qualified Network.Socket as NS
7273
import PostgREST.Unix (createAndBindDomainSocket)
7374
import Protolude hiding (Handler)
@@ -76,22 +77,30 @@ run :: AppState -> IO ()
7677
run appState = do
7778
conf <- AppState.getConfig appState
7879

79-
AppState.schemaCacheLoader appState -- Loads the initial SchemaCache
80-
(mainSocket, adminSocket) <- initSockets conf
80+
mainSocketRef <- newIORef Nothing
81+
adminSocket <- initAdminServerSocket conf
82+
8183
let closeSockets = do
8284
whenJust adminSocket NS.close
83-
NS.close mainSocket
85+
readIORef mainSocketRef >>= foldMap NS.close
8486
Unix.installSignalHandlers observer closeSockets (AppState.schemaCacheLoader appState) (AppState.readInDbConfig False appState)
8587

88+
Admin.runAdmin appState adminSocket (readIORef mainSocketRef) (serverSettings conf)
89+
8690
Listener.runListener appState
8791

88-
Admin.runAdmin appState adminSocket mainSocket (serverSettings conf)
92+
-- Kick off and wait for the initial SchemaCache load before creating the
93+
-- main API socket.
94+
AppState.schemaCacheLoader appState
95+
AppState.waitForSchemaCacheLoaded appState
96+
97+
mainSocket <- initServerSocket conf
98+
atomicWriteIORef mainSocketRef $ Just mainSocket
8999

90100
let app = postgrest appState (AppState.schemaCacheLoader appState)
91101

92-
do
93-
address <- resolveSocketToAddress mainSocket
94-
observer $ AppServerAddressObs address
102+
address <- resolveSocketToAddress mainSocket
103+
observer $ AppServerAddressObs address
95104

96105
Warp.runSettingsSocket (serverSettings conf & setOnException onWarpException) mainSocket app
97106
where
@@ -258,38 +267,16 @@ addRetryHint delay response = do
258267
isServiceUnavailable :: Wai.Response -> Bool
259268
isServiceUnavailable response = Wai.responseStatus response == HTTP.status503
260269

261-
type AppSockets = (NS.Socket, Maybe NS.Socket)
262-
263-
initSockets :: AppConfig -> IO AppSockets
264-
initSockets AppConfig{..} = do
265-
let
266-
cfg'usp = configServerUnixSocket
267-
cfg'uspm = configServerUnixSocketMode
268-
cfg'host = configServerHost
269-
cfg'port = configServerPort
270-
cfg'adminHost = configAdminServerHost
271-
cfg'adminPort = configAdminServerPort
272-
273-
sock <- case cfg'usp of
274-
-- I'm not using `streaming-commons`' bindPath function here because it's not defined for Windows,
275-
-- but we need to have runtime error if we try to use it in Windows, not compile time error
276-
Just path -> createAndBindDomainSocket path cfg'uspm
277-
Nothing -> do
278-
(_, sock) <-
279-
if cfg'port /= 0
280-
then do
281-
sock <- bindPortTCP cfg'port (fromString $ T.unpack cfg'host)
282-
pure (cfg'port, sock)
283-
else do
284-
-- explicitly bind to a random port, returning bound port number
285-
(num, sock) <- bindRandomPortTCP (fromString $ T.unpack cfg'host)
286-
pure (num, sock)
287-
pure sock
288-
289-
adminSock <- case cfg'adminPort of
290-
Just adminPort -> do
291-
adminSock <- bindPortTCP adminPort (fromString $ T.unpack cfg'adminHost)
292-
pure $ Just adminSock
293-
Nothing -> pure Nothing
294-
295-
pure (sock, adminSock)
270+
initServerSocket :: AppConfig -> IO NS.Socket
271+
initServerSocket AppConfig{..} = case configServerUnixSocket of
272+
-- I'm not using `streaming-commons`' bindPath function here because it's not defined for Windows,
273+
-- but we need to have runtime error if we try to use it in Windows, not compile time error
274+
Just path -> createAndBindDomainSocket path configServerUnixSocketMode
275+
Nothing -> bindPortTCP configServerPort (fromString $ T.unpack configServerHost)
276+
277+
initAdminServerSocket :: AppConfig -> IO (Maybe NS.Socket)
278+
initAdminServerSocket AppConfig{..} =
279+
traverse (`bindPortTCP` adminHost) configAdminServerPort
280+
where
281+
adminHost = fromString $ T.unpack configAdminServerHost
282+

src/PostgREST/AppState.hs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ module PostgREST.AppState
2727
, getObserver
2828
, isLoaded
2929
, isPending
30+
, waitForSchemaCacheLoaded
3031
) where
3132

3233
import qualified Data.ByteString.Char8 as BS
@@ -394,6 +395,9 @@ markSchemaCacheLoaded = void . (`tryPutMVar` ()) . getSCStatusMVar . stateSCache
394395
isSchemaCacheLoaded :: AppState -> IO Bool
395396
isSchemaCacheLoaded = fmap not . isEmptyMVar . getSCStatusMVar . stateSCacheStatus
396397

398+
waitForSchemaCacheLoaded :: AppState -> IO ()
399+
waitForSchemaCacheLoaded = atomically . void . readTMVar . stateSchemaCache
400+
397401
-- | Reads the in-db config and reads the config file again
398402
-- | We don't retry reading the in-db config after it fails immediately, because it could have user errors. We just report the error and continue.
399403
readInDbConfig :: Bool -> AppState -> IO ()

test/io/test_io.py

Lines changed: 10 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import subprocess
77
import time
88
import pytest
9+
import requests
910

1011
from config import CONFIGSDIR, FIXTURES, SECRET
1112
from util import Thread, jwtauthheader, parse_server_timings_header, relativeSeconds
@@ -1085,33 +1086,6 @@ def test_invalid_rpc_method_log_contains_role(defaultenv):
10851086
)
10861087

10871088

1088-
def test_empty_schema_cache_log_contains_jwt_role(defaultenv):
1089-
"Requests are logged with the role when the schema cache is empty on startup"
1090-
1091-
env = {
1092-
**defaultenv,
1093-
"PGRST_INTERNAL_SCHEMA_CACHE_QUERY_SLEEP": "1000",
1094-
"PGRST_JWT_SECRET": SECRET,
1095-
}
1096-
headers = jwtauthheader({"role": "postgrest_test_author"}, SECRET)
1097-
1098-
with run(env=env, wait_for_readiness=False) as postgrest:
1099-
postgrest.wait_until_scache_starts_loading()
1100-
1101-
response = postgrest.session.get("/authors_only", headers=headers)
1102-
assert response.status_code == 503
1103-
1104-
output = drain_stdout(postgrest)
1105-
1106-
assert any(
1107-
re.match(
1108-
r'- - postgrest_test_author \[.+\] "GET /authors_only HTTP/1.1" 503 \d+ "" "python-requests/.+"',
1109-
line,
1110-
)
1111-
for line in output
1112-
)
1113-
1114-
11151089
def test_no_pool_connection_required_on_bad_http_logic(defaultenv):
11161090
"no pool connection should be consumed for failing on invalid http logic"
11171091

@@ -1518,9 +1492,9 @@ def test_log_postgrest_host_and_port(host, defaultenv):
15181492
if is_unix:
15191493
re.match(r'API server listening on "/tmp/.*\.sock"', output[2])
15201494
elif is_ipv6(host):
1521-
assert f"API server listening on [{host}]:{port}" in output[2]
1495+
assert f"API server listening on [{host}]:{port}" in output[9]
15221496
else: # IPv4
1523-
assert f"API server listening on {host}:{port}" in output[2]
1497+
assert f"API server listening on {host}:{port}" in output[9]
15241498

15251499

15261500
def test_succeed_w_role_having_superuser_settings(defaultenv):
@@ -1868,29 +1842,8 @@ def test_pgrst_log_503_client_error_to_stderr(defaultenv):
18681842
assert any(log_message in line for line in output)
18691843

18701844

1871-
def test_log_error_when_empty_schema_cache_on_startup_to_stderr(defaultenv):
1872-
"Should log the 503 error message when there is an empty schema cache on startup"
1873-
1874-
env = {
1875-
**defaultenv,
1876-
"PGRST_INTERNAL_SCHEMA_CACHE_QUERY_SLEEP": "300",
1877-
}
1878-
1879-
with run(env=env, wait_for_readiness=False) as postgrest:
1880-
postgrest.wait_until_scache_starts_loading()
1881-
1882-
response = postgrest.session.get("/projects")
1883-
assert response.status_code == 503
1884-
1885-
output_start = postgrest.read_stdout(nlines=10)
1886-
1887-
log_err_message = '{"code":"PGRST002","details":null,"hint":null,"message":"Could not query the database for the schema cache. Retrying."}'
1888-
1889-
assert any(log_err_message in line for line in output_start)
1890-
1891-
18921845
def test_no_double_schema_cache_reload_on_empty_schema(defaultenv):
1893-
"Should only load the schema cache once on a 503 error when there's an empty schema cache on startup"
1846+
"Should only load the schema cache once when there's an empty schema cache on startup"
18941847

18951848
env = {
18961849
**defaultenv,
@@ -1900,12 +1853,15 @@ def test_no_double_schema_cache_reload_on_empty_schema(defaultenv):
19001853
with run(env=env, port=freeport(), wait_for_readiness=False) as postgrest:
19011854
postgrest.wait_until_scache_starts_loading()
19021855

1903-
response = postgrest.session.get("/projects")
1904-
assert response.status_code == 503
1856+
with pytest.raises(requests.ConnectionError):
1857+
postgrest.session.get("/projects")
19051858

19061859
# Should wait enough time to load the schema cache twice to guarantee that the test is valid
19071860
time.sleep(1)
19081861

1862+
response = postgrest.session.get("/projects")
1863+
assert response.status_code == 200
1864+
19091865
response = postgrest.admin.get("/metrics")
19101866
assert response.status_code == 200
19111867
assert 'pgrst_schema_cache_loads_total{status="SUCCESS"} 1.0' in response.text
@@ -1987,7 +1943,7 @@ def test_schema_cache_error_observation(defaultenv):
19871943
output = postgrest.read_stdout(nlines=9)
19881944
assert (
19891945
"Failed to load the schema cache using db-schemas=public and db-extra-search-path=x"
1990-
in output[7]
1946+
in output[6]
19911947
)
19921948

19931949

0 commit comments

Comments
 (0)