Skip to content

Commit 37e2338

Browse files
committed
fix: Start listening after schema cache load
This change ensures PostgREST starts listening on the server socket only after it has loaded the schema cache and is ready to handle requests. It no longer returns 503 errors during startup while the schema cache is still loading; connections are refused instead until the cache is ready.
1 parent f90d7d7 commit 37e2338

6 files changed

Lines changed: 91 additions & 74 deletions

File tree

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ All notable changes to this project will be documented in this file. From versio
2121
- Shutdown should wait for in flight requests by @mkleczek in #4702
2222
- Remove automatic transaction retries on `40001 (serialization_failure)` errors to prevent replication lag by @laurenceisla in #3673
2323
- Fix unexpected results when embedding and filtering the same table more than once by @laurenceisla in #4075
24+
- When starting up, PostgREST will now give `connection refused` errors until its schema cache is loaded by @mkleczek in #4880
25+
It used to give 503 status responses.
2426

2527

2628
### Changed
@@ -33,6 +35,7 @@ All notable changes to this project will be documented in this file. From versio
3335
- Build the minimal docker image for aarch64-linux by @wolfgangwalther in #4193
3436
- The name of an embedded table can no longer be used in filters if it has an alias by @laurenceisla in #4075
3537
+ e.g. `?select=alias:table(*)&table.id=eq.1` is not possible anymore, use `?select=alias:table(*)&alias.id=eq.1` instead.
38+
- Do not listen on main socket until schema cache is loaded by @mkleczek in #4880
3639

3740
## [14.11] - 2026-05-04
3841

src/PostgREST/Admin.hs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,20 +22,20 @@ import qualified PostgREST.AppState as AppState
2222
import qualified Network.Socket as NS
2323
import Protolude
2424

25-
runAdmin :: AppState -> Maybe NS.Socket -> NS.Socket -> Warp.Settings -> IO ()
26-
runAdmin appState maybeAdminSocket socketREST settings = do
25+
runAdmin :: AppState -> Maybe NS.Socket -> IO (Maybe NS.Socket) -> Warp.Settings -> IO ()
26+
runAdmin appState maybeAdminSocket getSocketREST settings = do
2727
whenJust maybeAdminSocket $ \adminSocket -> do
2828
address <- resolveSocketToAddress adminSocket
2929
observer $ AdminStartObs address
3030
void . forkIO $ Warp.runSettingsSocket settings adminSocket adminApp
3131
where
32-
adminApp = admin appState socketREST
32+
adminApp = admin appState getSocketREST
3333
observer = AppState.getObserver appState
3434

3535
-- | PostgREST admin application
36-
admin :: AppState.AppState -> NS.Socket -> Wai.Application
37-
admin appState socketREST req respond = do
38-
isMainAppReachable <- isRight <$> reachMainApp socketREST
36+
admin :: AppState.AppState -> IO (Maybe NS.Socket) -> Wai.Application
37+
admin appState getSocketREST req respond = do
38+
isMainAppReachable <- getSocketREST >>= maybe (pure False) (fmap isRight . reachMainApp)
3939
isLoaded <- AppState.isLoaded appState
4040
isPending <- AppState.isPending appState
4141

@@ -44,8 +44,8 @@ admin appState socketREST req respond = do
4444
respond $ Wai.responseLBS (if isMainAppReachable then HTTP.status200 else HTTP.status500) [] mempty
4545
["ready"] ->
4646
let
47-
status | not isMainAppReachable = HTTP.status500
48-
| isPending = HTTP.status503
47+
status | isPending = HTTP.status503
48+
| not isMainAppReachable = HTTP.status500
4949
| isLoaded = HTTP.status200
5050
| otherwise = HTTP.status500
5151
in

src/PostgREST/App.hs

Lines changed: 32 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ import System.IO.Error (ioeGetErrorType)
2525
import Control.Monad.Except (liftEither)
2626
import Control.Monad.Extra (whenJust)
2727
import Data.Either.Combinators (mapLeft, whenLeft)
28+
import Data.IORef (atomicWriteIORef, newIORef,
29+
readIORef)
2830
import Data.String (IsString (..))
2931
import Network.Wai.Handler.Warp (defaultSettings, setHost,
3032
setOnException, setPort,
@@ -63,11 +65,10 @@ import PostgREST.Version (docsVersion, prettyVersion)
6365

6466
import qualified Data.ByteString.Char8 as BS
6567
import qualified Data.List as L
66-
import Data.Streaming.Network (bindPortTCP,
67-
bindRandomPortTCP)
68+
import Data.Streaming.Network (bindPortTCP)
6869
import qualified Data.Text as T
6970
import qualified Network.HTTP.Types as HTTP
70-
import qualified Network.HTTP.Types.Header as HTTP (hVary)
71+
import qualified Network.HTTP.Types.Header as HTTP
7172
import qualified Network.Socket as NS
7273
import PostgREST.Unix (createAndBindDomainSocket)
7374
import Protolude hiding (Handler)
@@ -76,22 +77,30 @@ run :: AppState -> IO ()
7677
run appState = do
7778
conf <- AppState.getConfig appState
7879

79-
AppState.schemaCacheLoader appState -- Loads the initial SchemaCache
80-
(mainSocket, adminSocket) <- initSockets conf
80+
mainSocketRef <- newIORef Nothing
81+
adminSocket <- initAdminServerSocket conf
82+
8183
let closeSockets = do
8284
whenJust adminSocket NS.close
83-
NS.close mainSocket
85+
readIORef mainSocketRef >>= foldMap NS.close
8486
Unix.installSignalHandlers observer closeSockets (AppState.schemaCacheLoader appState) (AppState.readInDbConfig False appState)
8587

88+
Admin.runAdmin appState adminSocket (readIORef mainSocketRef) (serverSettings conf)
89+
8690
Listener.runListener appState
8791

88-
Admin.runAdmin appState adminSocket mainSocket (serverSettings conf)
92+
-- Kick off and wait for the initial SchemaCache load before creating the
93+
-- main API socket.
94+
AppState.schemaCacheLoader appState
95+
AppState.waitForSchemaCacheInit appState
96+
97+
mainSocket <- initServerSocket conf
98+
atomicWriteIORef mainSocketRef $ Just mainSocket
8999

90100
let app = postgrest appState (AppState.schemaCacheLoader appState)
91101

92-
do
93-
address <- resolveSocketToAddress mainSocket
94-
observer $ AppServerAddressObs address
102+
address <- resolveSocketToAddress mainSocket
103+
observer $ AppServerAddressObs address
95104

96105
Warp.runSettingsSocket (serverSettings conf & setOnException onWarpException) mainSocket app
97106
where
@@ -258,38 +267,16 @@ addRetryHint delay response = do
258267
isServiceUnavailable :: Wai.Response -> Bool
259268
isServiceUnavailable response = Wai.responseStatus response == HTTP.status503
260269

261-
type AppSockets = (NS.Socket, Maybe NS.Socket)
262-
263-
initSockets :: AppConfig -> IO AppSockets
264-
initSockets AppConfig{..} = do
265-
let
266-
cfg'usp = configServerUnixSocket
267-
cfg'uspm = configServerUnixSocketMode
268-
cfg'host = configServerHost
269-
cfg'port = configServerPort
270-
cfg'adminHost = configAdminServerHost
271-
cfg'adminPort = configAdminServerPort
272-
273-
sock <- case cfg'usp of
274-
-- I'm not using `streaming-commons`' bindPath function here because it's not defined for Windows,
275-
-- but we need to have runtime error if we try to use it in Windows, not compile time error
276-
Just path -> createAndBindDomainSocket path cfg'uspm
277-
Nothing -> do
278-
(_, sock) <-
279-
if cfg'port /= 0
280-
then do
281-
sock <- bindPortTCP cfg'port (fromString $ T.unpack cfg'host)
282-
pure (cfg'port, sock)
283-
else do
284-
-- explicitly bind to a random port, returning bound port number
285-
(num, sock) <- bindRandomPortTCP (fromString $ T.unpack cfg'host)
286-
pure (num, sock)
287-
pure sock
288-
289-
adminSock <- case cfg'adminPort of
290-
Just adminPort -> do
291-
adminSock <- bindPortTCP adminPort (fromString $ T.unpack cfg'adminHost)
292-
pure $ Just adminSock
293-
Nothing -> pure Nothing
294-
295-
pure (sock, adminSock)
270+
initServerSocket :: AppConfig -> IO NS.Socket
271+
initServerSocket AppConfig{..} = case configServerUnixSocket of
272+
-- I'm not using `streaming-commons`' bindPath function here because it's not defined for Windows,
273+
-- but we need to have runtime error if we try to use it in Windows, not compile time error
274+
Just path -> createAndBindDomainSocket path configServerUnixSocketMode
275+
Nothing -> bindPortTCP configServerPort (fromString $ T.unpack configServerHost)
276+
277+
initAdminServerSocket :: AppConfig -> IO (Maybe NS.Socket)
278+
initAdminServerSocket AppConfig{..} =
279+
traverse (`bindPortTCP` adminHost) configAdminServerPort
280+
where
281+
adminHost = fromString $ T.unpack configAdminServerHost
282+

src/PostgREST/AppState.hs

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ module PostgREST.AppState
2727
, getObserver
2828
, isLoaded
2929
, isPending
30+
, waitForSchemaCacheInit
3031
) where
3132

3233
import qualified Data.ByteString.Char8 as BS
@@ -53,6 +54,9 @@ import Data.IORef (IORef, atomicWriteIORef, newIORef,
5354
readIORef)
5455
import Data.Time.Clock (UTCTime, getCurrentTime)
5556

57+
import Control.Concurrent.STM (TMVar, newEmptyTMVarIO,
58+
putTMVar, readTMVar,
59+
tryReadTMVar, tryTakeTMVar)
5660
import PostgREST.Auth.JwtCache (JwtCacheState, update)
5761
import PostgREST.Config (AppConfig (..),
5862
readAppConfig,
@@ -102,9 +106,11 @@ data AppState = AppState
102106
}
103107

104108
-- | Schema cache status.
105-
-- Empty means pending and full means loaded.
109+
-- Empty means initial loading on startup, False means pending and True means loaded.
110+
-- "Initial" state is needed so that we can wait with application socket listening
111+
-- until after initial schema cache querying.
106112
newtype SchemaCacheStatus = SchemaCacheStatus
107-
{ getSCStatusMVar :: MVar ()
113+
{ getSCStatusTMVar :: TMVar Bool
108114
}
109115

110116
init :: AppConfig -> IO AppState
@@ -380,16 +386,21 @@ retryingSchemaCacheLoad appState@AppState{stateObserver=observer, stateMainThrea
380386
oneSecondInUs = 1000000 -- one second in microseconds
381387

382388
newSchemaCacheStatus :: IO SchemaCacheStatus
383-
newSchemaCacheStatus = SchemaCacheStatus <$> newEmptyMVar
389+
newSchemaCacheStatus = SchemaCacheStatus <$> newEmptyTMVarIO
384390

385391
markSchemaCachePending :: AppState -> IO ()
386-
markSchemaCachePending = void . tryTakeMVar . getSCStatusMVar . stateSCacheStatus
392+
markSchemaCachePending = atomically . liftA2 (*>) tryTakeTMVar (`putTMVar` False) . getSCStatusTMVar . stateSCacheStatus
387393

388394
markSchemaCacheLoaded :: AppState -> IO ()
389-
markSchemaCacheLoaded = void . (`tryPutMVar` ()) . getSCStatusMVar . stateSCacheStatus
395+
markSchemaCacheLoaded = atomically . liftA2 (*>) tryTakeTMVar (`putTMVar` True) . getSCStatusTMVar . stateSCacheStatus
390396

391397
isSchemaCacheLoaded :: AppState -> IO Bool
392-
isSchemaCacheLoaded = fmap not . isEmptyMVar . getSCStatusMVar . stateSCacheStatus
398+
isSchemaCacheLoaded = atomically . (pure . fromMaybe False <=< tryReadTMVar) . getSCStatusTMVar . stateSCacheStatus
399+
400+
-- | Wait for initial schema cache load to either finish or retry
401+
-- | We wait until scStatusTMVar is not empty.
402+
waitForSchemaCacheInit :: AppState -> IO ()
403+
waitForSchemaCacheInit = atomically . void . readTMVar . getSCStatusTMVar . stateSCacheStatus
393404

394405
-- | Reads the in-db config and reads the config file again
395406
-- | We don't retry reading the in-db config after it fails immediately, because it could have user errors. We just report the error and continue.

src/PostgREST/SchemaCache.hs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,9 @@ maxDbTablesForFuzzySearch = 500
158158
querySchemaCache :: AppConfig -> SQL.Transaction SchemaCache
159159
querySchemaCache conf@AppConfig{..} = do
160160
SQL.sql "set local schema ''" -- This voids the search path. The following queries need this for getting the fully qualified name(schema.name) of every db object
161+
_ <-
162+
let sleepCall = SQL.Statement "select pg_sleep($1 / 1000.0)" (param HE.int4) HD.noResult True in
163+
for_ configInternalSCQuerySleep (`SQL.statement` sleepCall) -- only used for testing
161164
tabs <- sqlTimedStmt gucTbls conf allTables
162165
keyDeps <- sqlTimedStmt gucKDeps conf allViewsKeyDependencies
163166
m2oRels <- sqlTimedStmt gucRels mempty allM2OandO2ORels
@@ -168,9 +171,6 @@ querySchemaCache conf@AppConfig{..} = do
168171
tzones <- if configDbTimezoneEnabled
169172
then sqlTimedStmt gucTzones mempty timezones
170173
else pure S.empty
171-
_ <-
172-
let sleepCall = SQL.Statement "select pg_sleep($1 / 1000.0)" (param HE.int4) HD.noResult True in
173-
for_ configInternalSCQuerySleep (`SQL.statement` sleepCall) -- only used for testing
174174

175175
qsTime <-
176176
if isLogDebug

test/io/test_io.py

Lines changed: 28 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import subprocess
77
import time
88
import pytest
9+
import requests
910

1011
from config import CONFIGSDIR, FIXTURES, SECRET
1112
from util import Thread, jwtauthheader, parse_server_timings_header, relativeSeconds
@@ -1090,7 +1091,7 @@ def test_empty_schema_cache_log_contains_jwt_role(defaultenv):
10901091

10911092
env = {
10921093
**defaultenv,
1093-
"PGRST_INTERNAL_SCHEMA_CACHE_QUERY_SLEEP": "1000",
1094+
"PGRST_DB_SCHEMAS": "non_existent_schema_aaaa",
10941095
"PGRST_JWT_SECRET": SECRET,
10951096
}
10961097
headers = jwtauthheader({"role": "postgrest_test_author"}, SECRET)
@@ -1513,14 +1514,19 @@ def test_log_postgrest_host_and_port(host, defaultenv):
15131514
with run(
15141515
env=defaultenv, host=host, port=port, no_startup_stdout=False
15151516
) as postgrest:
1516-
output = postgrest.read_stdout(nlines=10)
1517+
output = postgrest.read_stdout(nlines=11)
15171518

1519+
# Cannot assume a particular log entry order
1520+
# Listening on a socket happens after schema querying
1521+
# but is concurrent to the schema loading process
1522+
# and might happen before or after writing of the
1523+
# "Schema cache loaded" log entry
15181524
if is_unix:
1519-
re.match(r'API server listening on "/tmp/.*\.sock"', output[2])
1525+
match_log(output, [r".*API server listening on .*/tmp/.*\.sock"])
15201526
elif is_ipv6(host):
1521-
assert f"API server listening on [{host}]:{port}" in output[2]
1527+
match_log(output, [r".*API server listening on \[.+]:\d+"])
15221528
else: # IPv4
1523-
assert f"API server listening on {host}:{port}" in output[2]
1529+
match_log(output, [r".*API server listening on .+:\d+"])
15241530

15251531

15261532
def test_succeed_w_role_having_superuser_settings(defaultenv):
@@ -1868,17 +1874,24 @@ def test_pgrst_log_503_client_error_to_stderr(defaultenv):
18681874
assert any(log_message in line for line in output)
18691875

18701876

1871-
def test_log_error_when_empty_schema_cache_on_startup_to_stderr(defaultenv):
1872-
"Should log the 503 error message when there is an empty schema cache on startup"
1877+
def test_log_error_when_schema_cache_load_error_on_startup_to_stderr(defaultenv):
1878+
"Should log the 503 error message when there is an error loading schema cache on startup"
18731879

18741880
env = {
18751881
**defaultenv,
1876-
"PGRST_INTERNAL_SCHEMA_CACHE_QUERY_SLEEP": "300",
1882+
"PGRST_INTERNAL_SCHEMA_CACHE_QUERY_SLEEP": "1000",
1883+
"PGRST_DB_SCHEMAS": "non_existent_schema_aaaa",
18771884
}
18781885

18791886
with run(env=env, wait_for_readiness=False) as postgrest:
18801887
postgrest.wait_until_scache_starts_loading()
18811888

1889+
# First call should fail with connection refused
1890+
with pytest.raises(requests.ConnectionError):
1891+
postgrest.session.get("/projects")
1892+
1893+
# Next call should return 503
1894+
time.sleep(1)
18821895
response = postgrest.session.get("/projects")
18831896
assert response.status_code == 503
18841897

@@ -1890,7 +1903,7 @@ def test_log_error_when_empty_schema_cache_on_startup_to_stderr(defaultenv):
18901903

18911904

18921905
def test_no_double_schema_cache_reload_on_empty_schema(defaultenv):
1893-
"Should only load the schema cache once on a 503 error when there's an empty schema cache on startup"
1906+
"Should only load the schema cache once when there's an empty schema cache on startup"
18941907

18951908
env = {
18961909
**defaultenv,
@@ -1900,12 +1913,15 @@ def test_no_double_schema_cache_reload_on_empty_schema(defaultenv):
19001913
with run(env=env, port=freeport(), wait_for_readiness=False) as postgrest:
19011914
postgrest.wait_until_scache_starts_loading()
19021915

1903-
response = postgrest.session.get("/projects")
1904-
assert response.status_code == 503
1916+
with pytest.raises(requests.ConnectionError):
1917+
postgrest.session.get("/projects")
19051918

19061919
# Should wait enough time to load the schema cache twice to guarantee that the test is valid
19071920
time.sleep(1)
19081921

1922+
response = postgrest.session.get("/projects")
1923+
assert response.status_code == 200
1924+
19091925
response = postgrest.admin.get("/metrics")
19101926
assert response.status_code == 200
19111927
assert 'pgrst_schema_cache_loads_total{status="SUCCESS"} 1.0' in response.text
@@ -1987,7 +2003,7 @@ def test_schema_cache_error_observation(defaultenv):
19872003
output = postgrest.read_stdout(nlines=9)
19882004
assert (
19892005
"Failed to load the schema cache using db-schemas=public and db-extra-search-path=x"
1990-
in output[7]
2006+
in output[6]
19912007
)
19922008

19932009

0 commit comments

Comments
 (0)