From c442894d737f0bf0a755207557779cb2579f7304 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20K=C5=82eczek?= Date: Tue, 5 May 2026 06:26:38 +0200 Subject: [PATCH 1/2] fix: Start listening after schema cache load This change ensures PostgREST starts listening on a server socket only after it loaded the schema cache and is ready to handle requests. It is no longer going to return 503 errors during startup until the schema cache is loaded. --- CHANGELOG.md | 1 + src/PostgREST/Admin.hs | 16 ++++++------ src/PostgREST/App.hs | 47 ++++++++++++++++++++---------------- src/PostgREST/AppState.hs | 23 +++++++++++++----- src/PostgREST/SchemaCache.hs | 6 ++--- test/io/test_io.py | 40 +++++++++++++++++++++--------- 6 files changed, 83 insertions(+), 50 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 575c86f68d..f526d35a03 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,7 @@ All notable changes to this project will be documented in this file. From versio - Shutdown should wait for in flight requests by @mkleczek in #4702 - Remove automatic transaction retries on `40001 (serialization_failure)` errors to prevent replication lag by @laurenceisla in #3673 - Fix unexpected results when embedding and filtering the same table more than once by @laurenceisla in #4075 +- Stop reporting 503s errors unnecessarily while the schema cache is loading at startup by @mkleczek in #4880 ### Changed diff --git a/src/PostgREST/Admin.hs b/src/PostgREST/Admin.hs index 99733a6995..f8501417be 100644 --- a/src/PostgREST/Admin.hs +++ b/src/PostgREST/Admin.hs @@ -22,20 +22,20 @@ import qualified PostgREST.AppState as AppState import qualified Network.Socket as NS import Protolude -runAdmin :: AppState -> Maybe NS.Socket -> NS.Socket -> Warp.Settings -> IO () -runAdmin appState maybeAdminSocket socketREST settings = do +runAdmin :: AppState -> Maybe NS.Socket -> IO (Maybe NS.Socket) -> Warp.Settings -> IO () +runAdmin appState maybeAdminSocket getSocketREST settings = do whenJust maybeAdminSocket $ \adminSocket -> do address <- resolveSocketToAddress adminSocket observer $ AdminStartObs address void . forkIO $ Warp.runSettingsSocket settings adminSocket adminApp where - adminApp = admin appState socketREST + adminApp = admin appState getSocketREST observer = AppState.getObserver appState -- | PostgREST admin application -admin :: AppState.AppState -> NS.Socket -> Wai.Application -admin appState socketREST req respond = do - isMainAppReachable <- isRight <$> reachMainApp socketREST +admin :: AppState.AppState -> IO (Maybe NS.Socket) -> Wai.Application +admin appState getSocketREST req respond = do + isMainAppReachable <- getSocketREST >>= maybe (pure False) (fmap isRight . reachMainApp) isLoaded <- AppState.isLoaded appState isPending <- AppState.isPending appState @@ -44,8 +44,8 @@ admin appState socketREST req respond = do respond $ Wai.responseLBS (if isMainAppReachable then HTTP.status200 else HTTP.status500) [] mempty ["ready"] -> let - status | not isMainAppReachable = HTTP.status500 - | isPending = HTTP.status503 + status | isPending = HTTP.status503 + | not isMainAppReachable = HTTP.status500 | isLoaded = HTTP.status200 | otherwise = HTTP.status500 in diff --git a/src/PostgREST/App.hs b/src/PostgREST/App.hs index 25f7f03a23..ee4e526daa 100644 --- a/src/PostgREST/App.hs +++ b/src/PostgREST/App.hs @@ -25,6 +25,8 @@ import System.IO.Error (ioeGetErrorType) import Control.Monad.Except (liftEither) import Control.Monad.Extra (whenJust) import Data.Either.Combinators (mapLeft, whenLeft) +import Data.IORef (atomicWriteIORef, newIORef, + readIORef) import Data.String (IsString (..)) import Network.Wai.Handler.Warp (defaultSettings, setHost, setOnException, setPort, @@ -67,7 +69,7 @@ import Data.Streaming.Network (bindPortTCP, bindRandomPortTCP) import qualified Data.Text as T import qualified Network.HTTP.Types as HTTP -import qualified Network.HTTP.Types.Header as HTTP (hVary) +import qualified Network.HTTP.Types.Header as HTTP import qualified Network.Socket as NS import PostgREST.Unix (createAndBindDomainSocket) import Protolude hiding (Handler) @@ -76,22 +78,30 @@ run :: AppState -> IO () run appState = do conf <- AppState.getConfig appState - AppState.schemaCacheLoader appState -- Loads the initial SchemaCache - (mainSocket, adminSocket) <- initSockets conf + mainSocketRef <- newIORef Nothing + adminSocket <- initAdminServerSocket conf + let closeSockets = do whenJust adminSocket NS.close - NS.close mainSocket + readIORef mainSocketRef >>= foldMap NS.close Unix.installSignalHandlers observer closeSockets (AppState.schemaCacheLoader appState) (AppState.readInDbConfig False appState) + Admin.runAdmin appState adminSocket (readIORef mainSocketRef) (serverSettings conf) + Listener.runListener appState - Admin.runAdmin appState adminSocket mainSocket (serverSettings conf) + -- Kick off and wait for the initial SchemaCache load before creating the + -- main API socket. + AppState.schemaCacheLoader appState + AppState.waitForSchemaCacheInit appState + + mainSocket <- initServerSocket conf + atomicWriteIORef mainSocketRef $ Just mainSocket let app = postgrest appState (AppState.schemaCacheLoader appState) - do - address <- resolveSocketToAddress mainSocket - observer $ AppServerAddressObs address + address <- resolveSocketToAddress mainSocket + observer $ AppServerAddressObs address Warp.runSettingsSocket (serverSettings conf & setOnException onWarpException) mainSocket app where @@ -258,19 +268,15 @@ addRetryHint delay response = do isServiceUnavailable :: Wai.Response -> Bool isServiceUnavailable response = Wai.responseStatus response == HTTP.status503 -type AppSockets = (NS.Socket, Maybe NS.Socket) - -initSockets :: AppConfig -> IO AppSockets -initSockets AppConfig{..} = do +initServerSocket :: AppConfig -> IO NS.Socket +initServerSocket AppConfig{..} = do let cfg'usp = configServerUnixSocket cfg'uspm = configServerUnixSocketMode cfg'host = configServerHost cfg'port = configServerPort - cfg'adminHost = configAdminServerHost - cfg'adminPort = configAdminServerPort - sock <- case cfg'usp of + case cfg'usp of -- I'm not using `streaming-commons`' bindPath function here because it's not defined for Windows, -- but we need to have runtime error if we try to use it in Windows, not compile time error Just path -> createAndBindDomainSocket path cfg'uspm @@ -286,10 +292,9 @@ initSockets AppConfig{..} = do pure (num, sock) pure sock - adminSock <- case cfg'adminPort of - Just adminPort -> do - adminSock <- bindPortTCP adminPort (fromString $ T.unpack cfg'adminHost) - pure $ Just adminSock - Nothing -> pure Nothing +initAdminServerSocket :: AppConfig -> IO (Maybe NS.Socket) +initAdminServerSocket AppConfig{..} = + traverse (`bindPortTCP` adminHost) configAdminServerPort + where + adminHost = fromString $ T.unpack configAdminServerHost - pure (sock, adminSock) diff --git a/src/PostgREST/AppState.hs b/src/PostgREST/AppState.hs index fdaac1a908..f975ea4134 100644 --- a/src/PostgREST/AppState.hs +++ b/src/PostgREST/AppState.hs @@ -27,6 +27,7 @@ module PostgREST.AppState , getObserver , isLoaded , isPending + , waitForSchemaCacheInit ) where import qualified Data.ByteString.Char8 as BS @@ -53,6 +54,9 @@ import Data.IORef (IORef, atomicWriteIORef, newIORef, readIORef) import Data.Time.Clock (UTCTime, getCurrentTime) +import Control.Concurrent.STM (TMVar, newEmptyTMVarIO, + putTMVar, readTMVar, + tryReadTMVar, tryTakeTMVar) import PostgREST.Auth.JwtCache (JwtCacheState, update) import PostgREST.Config (AppConfig (..), readAppConfig, @@ -102,9 +106,11 @@ data AppState = AppState } -- | Schema cache status. --- Empty means pending and full means loaded. +-- Empty means initial loading on startup, False means pending and True means loaded. +-- "Initial" state is needed so that we can wait with application socket listening +-- until after initial schema cache querying. newtype SchemaCacheStatus = SchemaCacheStatus - { getSCStatusMVar :: MVar () + { getSCStatusTMVar :: TMVar Bool } init :: AppConfig -> IO AppState @@ -380,16 +386,21 @@ retryingSchemaCacheLoad appState@AppState{stateObserver=observer, stateMainThrea oneSecondInUs = 1000000 -- one second in microseconds newSchemaCacheStatus :: IO SchemaCacheStatus -newSchemaCacheStatus = SchemaCacheStatus <$> newEmptyMVar +newSchemaCacheStatus = SchemaCacheStatus <$> newEmptyTMVarIO markSchemaCachePending :: AppState -> IO () -markSchemaCachePending = void . tryTakeMVar . getSCStatusMVar . stateSCacheStatus +markSchemaCachePending = atomically . liftA2 (*>) tryTakeTMVar (`putTMVar` False) . getSCStatusTMVar . stateSCacheStatus markSchemaCacheLoaded :: AppState -> IO () -markSchemaCacheLoaded = void . (`tryPutMVar` ()) . getSCStatusMVar . stateSCacheStatus +markSchemaCacheLoaded = atomically . liftA2 (*>) tryTakeTMVar (`putTMVar` True) . getSCStatusTMVar . stateSCacheStatus isSchemaCacheLoaded :: AppState -> IO Bool -isSchemaCacheLoaded = fmap not . isEmptyMVar . getSCStatusMVar . stateSCacheStatus +isSchemaCacheLoaded = atomically . (pure . fromMaybe False <=< tryReadTMVar) . getSCStatusTMVar . stateSCacheStatus + +-- | Wait for initial schema cache load to either finish or retry +-- | We wait until scStatusTMVar is not empty. +waitForSchemaCacheInit :: AppState -> IO () +waitForSchemaCacheInit = atomically . void . readTMVar . getSCStatusTMVar . stateSCacheStatus -- | Reads the in-db config and reads the config file again -- | We don't retry reading the in-db config after it fails immediately, because it could have user errors. We just report the error and continue. diff --git a/src/PostgREST/SchemaCache.hs b/src/PostgREST/SchemaCache.hs index 997d16ab23..c85dfe878e 100644 --- a/src/PostgREST/SchemaCache.hs +++ b/src/PostgREST/SchemaCache.hs @@ -157,6 +157,9 @@ maxDbTablesForFuzzySearch = 500 querySchemaCache :: AppConfig -> SQL.Transaction SchemaCache querySchemaCache conf@AppConfig{..} = do SQL.sql "set local schema ''" -- This voids the search path. The following queries need this for getting the fully qualified name(schema.name) of every db object + _ <- + let sleepCall = SQL.Statement "select pg_sleep($1 / 1000.0)" (param HE.int4) HD.noResult True in + for_ configInternalSCQuerySleep (`SQL.statement` sleepCall) -- only used for testing tabs <- sqlTimedStmt gucTbls conf allTables keyDeps <- sqlTimedStmt gucKDeps conf allViewsKeyDependencies m2oRels <- sqlTimedStmt gucRels mempty allM2OandO2ORels @@ -167,9 +170,6 @@ querySchemaCache conf@AppConfig{..} = do tzones <- if configDbTimezoneEnabled then sqlTimedStmt gucTzones mempty timezones else pure S.empty - _ <- - let sleepCall = SQL.Statement "select pg_sleep($1 / 1000.0)" (param HE.int4) HD.noResult True in - for_ configInternalSCQuerySleep (`SQL.statement` sleepCall) -- only used for testing qsTime <- if isLogDebug diff --git a/test/io/test_io.py b/test/io/test_io.py index aa3e923dcd..a506992587 100644 --- a/test/io/test_io.py +++ b/test/io/test_io.py @@ -6,6 +6,7 @@ import subprocess import time import pytest +import requests from config import CONFIGSDIR, FIXTURES, SECRET from util import ( @@ -1090,7 +1091,7 @@ def test_empty_schema_cache_log_contains_jwt_role(defaultenv): env = { **defaultenv, - "PGRST_INTERNAL_SCHEMA_CACHE_QUERY_SLEEP": "1000", + "PGRST_DB_SCHEMAS": "non_existent_schema_aaaa", "PGRST_JWT_SECRET": SECRET, } headers = jwtauthheader({"role": "postgrest_test_author"}, SECRET) @@ -1543,14 +1544,19 @@ def test_log_postgrest_host_and_port(host, defaultenv): with run( env=defaultenv, host=host, port=port, no_startup_stdout=False ) as postgrest: - output = postgrest.read_stdout(nlines=10) + output = postgrest.read_stdout(nlines=11) + # Cannot assume a particular log entry order + # Listening on a socket happens after schema querying + # but is concurrent to the schema loading process + # and migh happen before or after writing of the + # "Schema cache loaded" log entry if is_unix: - re.match(r'API server listening on "/tmp/.*\.sock"', output[2]) + match_log(output, [r".*API server listening on .*/tmp/.*\.sock"]) elif is_ipv6(host): - assert f"API server listening on [{host}]:{port}" in output[2] + match_log(output, [r".*API server listening on \[.+]:\d+"]) else: # IPv4 - assert f"API server listening on {host}:{port}" in output[2] + match_log(output, [r".*API server listening on .+:\d+"]) def test_succeed_w_role_having_superuser_settings(defaultenv): @@ -1898,17 +1904,24 @@ def test_pgrst_log_503_client_error_to_stderr(defaultenv): assert any(log_message in line for line in output) -def test_log_error_when_empty_schema_cache_on_startup_to_stderr(defaultenv): - "Should log the 503 error message when there is an empty schema cache on startup" +def test_log_error_when_schema_cache_load_error_on_startup_to_stderr(defaultenv): + "Should log the 503 error message when there is an error loading schema cache on startup" env = { **defaultenv, - "PGRST_INTERNAL_SCHEMA_CACHE_QUERY_SLEEP": "300", + "PGRST_INTERNAL_SCHEMA_CACHE_QUERY_SLEEP": "1000", + "PGRST_DB_SCHEMAS": "non_existent_schema_aaaa", } with run(env=env, wait_for=None) as postgrest: postgrest.wait_until_scache_starts_loading() + # First call should fail with connection refused + with pytest.raises(requests.ConnectionError): + postgrest.session.get("/projects") + + # Next call should return 503 + time.sleep(1) response = postgrest.session.get("/projects") assert response.status_code == 503 @@ -1920,7 +1933,7 @@ def test_log_error_when_empty_schema_cache_on_startup_to_stderr(defaultenv): def test_no_double_schema_cache_reload_on_empty_schema(defaultenv): - "Should only load the schema cache once on a 503 error when there's an empty schema cache on startup" + "Should only load the schema cache once when there's an empty schema cache on startup" env = { **defaultenv, @@ -1930,12 +1943,15 @@ def test_no_double_schema_cache_reload_on_empty_schema(defaultenv): with run(env=env, port=freeport(), wait_for=None) as postgrest: postgrest.wait_until_scache_starts_loading() - response = postgrest.session.get("/projects") - assert response.status_code == 503 + with pytest.raises(requests.ConnectionError): + postgrest.session.get("/projects") # Should wait enough time to load the schema cache twice to guarantee that the test is valid time.sleep(1) + response = postgrest.session.get("/projects") + assert response.status_code == 200 + response = postgrest.admin.get("/metrics") assert response.status_code == 200 assert 'pgrst_schema_cache_loads_total{status="SUCCESS"} 1.0' in response.text @@ -2017,7 +2033,7 @@ def test_schema_cache_error_observation(defaultenv): output = postgrest.read_stdout(nlines=9) assert ( "Failed to load the schema cache using db-schemas=public and db-extra-search-path=x" - in output[7] + in output[6] ) From 841ccef849c80f4daccb097fd877794e72a79280 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20K=C5=82eczek?= Date: Tue, 5 May 2026 06:26:38 +0200 Subject: [PATCH 2/2] refactor: Simplify App.initServerSocket This change gets rid of unnecessary explicit bindRandomPortTCP in initServerSocket. Returned port value was ignored in removed code anyway as assigned port retrieval from an open socket is handled elsewhere. --- src/PostgREST/App.hs | 30 ++++++------------------------ 1 file changed, 6 insertions(+), 24 deletions(-) diff --git a/src/PostgREST/App.hs b/src/PostgREST/App.hs index ee4e526daa..aa73354781 100644 --- a/src/PostgREST/App.hs +++ b/src/PostgREST/App.hs @@ -65,8 +65,7 @@ import PostgREST.Version (docsVersion, prettyVersion) import qualified Data.ByteString.Char8 as BS import qualified Data.List as L -import Data.Streaming.Network (bindPortTCP, - bindRandomPortTCP) +import Data.Streaming.Network (bindPortTCP) import qualified Data.Text as T import qualified Network.HTTP.Types as HTTP import qualified Network.HTTP.Types.Header as HTTP @@ -269,28 +268,11 @@ isServiceUnavailable :: Wai.Response -> Bool isServiceUnavailable response = Wai.responseStatus response == HTTP.status503 initServerSocket :: AppConfig -> IO NS.Socket -initServerSocket AppConfig{..} = do - let - cfg'usp = configServerUnixSocket - cfg'uspm = configServerUnixSocketMode - cfg'host = configServerHost - cfg'port = configServerPort - - case cfg'usp of - -- I'm not using `streaming-commons`' bindPath function here because it's not defined for Windows, - -- but we need to have runtime error if we try to use it in Windows, not compile time error - Just path -> createAndBindDomainSocket path cfg'uspm - Nothing -> do - (_, sock) <- - if cfg'port /= 0 - then do - sock <- bindPortTCP cfg'port (fromString $ T.unpack cfg'host) - pure (cfg'port, sock) - else do - -- explicitly bind to a random port, returning bound port number - (num, sock) <- bindRandomPortTCP (fromString $ T.unpack cfg'host) - pure (num, sock) - pure sock +initServerSocket AppConfig{..} = case configServerUnixSocket of + -- I'm not using `streaming-commons`' bindPath function here because it's not defined for Windows, + -- but we need to have runtime error if we try to use it in Windows, not compile time error + Just path -> createAndBindDomainSocket path configServerUnixSocketMode + Nothing -> bindPortTCP configServerPort (fromString $ T.unpack configServerHost) initAdminServerSocket :: AppConfig -> IO (Maybe NS.Socket) initAdminServerSocket AppConfig{..} =