Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ All notable changes to this project will be documented in this file. From versio
- Shutdown should wait for in flight requests by @mkleczek in #4702
- Remove automatic transaction retries on `40001 (serialization_failure)` errors to prevent replication lag by @laurenceisla in #3673
- Fix unexpected results when embedding and filtering the same table more than once by @laurenceisla in #4075
- Stop reporting 503s errors unnecessarily while the schema cache is loading at startup by @mkleczek in #4880

### Changed

Expand Down
16 changes: 8 additions & 8 deletions src/PostgREST/Admin.hs
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,20 @@ import qualified PostgREST.AppState as AppState
import qualified Network.Socket as NS
import Protolude

runAdmin :: AppState -> Maybe NS.Socket -> NS.Socket -> Warp.Settings -> IO ()
runAdmin appState maybeAdminSocket socketREST settings = do
runAdmin :: AppState -> Maybe NS.Socket -> IO (Maybe NS.Socket) -> Warp.Settings -> IO ()
runAdmin appState maybeAdminSocket getSocketREST settings = do
whenJust maybeAdminSocket $ \adminSocket -> do
address <- resolveSocketToAddress adminSocket
observer $ AdminStartObs address
void . forkIO $ Warp.runSettingsSocket settings adminSocket adminApp
where
adminApp = admin appState socketREST
adminApp = admin appState getSocketREST
observer = AppState.getObserver appState

-- | PostgREST admin application
admin :: AppState.AppState -> NS.Socket -> Wai.Application
admin appState socketREST req respond = do
isMainAppReachable <- isRight <$> reachMainApp socketREST
admin :: AppState.AppState -> IO (Maybe NS.Socket) -> Wai.Application
admin appState getSocketREST req respond = do
isMainAppReachable <- getSocketREST >>= maybe (pure False) (fmap isRight . reachMainApp)
isLoaded <- AppState.isLoaded appState
isPending <- AppState.isPending appState

Expand All @@ -44,8 +44,8 @@ admin appState socketREST req respond = do
respond $ Wai.responseLBS (if isMainAppReachable then HTTP.status200 else HTTP.status500) [] mempty
["ready"] ->
let
status | not isMainAppReachable = HTTP.status500
| isPending = HTTP.status503
status | isPending = HTTP.status503
| not isMainAppReachable = HTTP.status500
| isLoaded = HTTP.status200
| otherwise = HTTP.status500
in
Expand Down
77 changes: 32 additions & 45 deletions src/PostgREST/App.hs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import System.IO.Error (ioeGetErrorType)
import Control.Monad.Except (liftEither)
import Control.Monad.Extra (whenJust)
import Data.Either.Combinators (mapLeft, whenLeft)
import Data.IORef (atomicWriteIORef, newIORef,
readIORef)
import Data.String (IsString (..))
import Network.Wai.Handler.Warp (defaultSettings, setHost,
setOnException, setPort,
Expand Down Expand Up @@ -63,11 +65,10 @@ import PostgREST.Version (docsVersion, prettyVersion)

import qualified Data.ByteString.Char8 as BS
import qualified Data.List as L
import Data.Streaming.Network (bindPortTCP,
bindRandomPortTCP)
import Data.Streaming.Network (bindPortTCP)
import qualified Data.Text as T
import qualified Network.HTTP.Types as HTTP
import qualified Network.HTTP.Types.Header as HTTP (hVary)
import qualified Network.HTTP.Types.Header as HTTP
import qualified Network.Socket as NS
import PostgREST.Unix (createAndBindDomainSocket)
import Protolude hiding (Handler)
Expand All @@ -76,22 +77,30 @@ run :: AppState -> IO ()
run appState = do
conf <- AppState.getConfig appState

AppState.schemaCacheLoader appState -- Loads the initial SchemaCache
(mainSocket, adminSocket) <- initSockets conf
mainSocketRef <- newIORef Nothing
adminSocket <- initAdminServerSocket conf

let closeSockets = do
whenJust adminSocket NS.close
NS.close mainSocket
readIORef mainSocketRef >>= foldMap NS.close
Unix.installSignalHandlers observer closeSockets (AppState.schemaCacheLoader appState) (AppState.readInDbConfig False appState)

Admin.runAdmin appState adminSocket (readIORef mainSocketRef) (serverSettings conf)

Listener.runListener appState

Admin.runAdmin appState adminSocket mainSocket (serverSettings conf)
-- Kick off and wait for the initial SchemaCache load before creating the
-- main API socket.
AppState.schemaCacheLoader appState
AppState.waitForSchemaCacheInit appState

mainSocket <- initServerSocket conf
atomicWriteIORef mainSocketRef $ Just mainSocket

let app = postgrest appState (AppState.schemaCacheLoader appState)

do
address <- resolveSocketToAddress mainSocket
observer $ AppServerAddressObs address
address <- resolveSocketToAddress mainSocket
observer $ AppServerAddressObs address

Warp.runSettingsSocket (serverSettings conf & setOnException onWarpException) mainSocket app
where
Expand Down Expand Up @@ -258,38 +267,16 @@ addRetryHint delay response = do
isServiceUnavailable :: Wai.Response -> Bool
isServiceUnavailable response = Wai.responseStatus response == HTTP.status503

type AppSockets = (NS.Socket, Maybe NS.Socket)

initSockets :: AppConfig -> IO AppSockets
initSockets AppConfig{..} = do
let
cfg'usp = configServerUnixSocket
cfg'uspm = configServerUnixSocketMode
cfg'host = configServerHost
cfg'port = configServerPort
cfg'adminHost = configAdminServerHost
cfg'adminPort = configAdminServerPort

sock <- case cfg'usp of
-- I'm not using `streaming-commons`' bindPath function here because it's not defined for Windows,
-- but we need to have runtime error if we try to use it in Windows, not compile time error
Just path -> createAndBindDomainSocket path cfg'uspm
Nothing -> do
(_, sock) <-
if cfg'port /= 0
then do
sock <- bindPortTCP cfg'port (fromString $ T.unpack cfg'host)
pure (cfg'port, sock)
else do
-- explicitly bind to a random port, returning bound port number
(num, sock) <- bindRandomPortTCP (fromString $ T.unpack cfg'host)
pure (num, sock)
pure sock

adminSock <- case cfg'adminPort of
Just adminPort -> do
adminSock <- bindPortTCP adminPort (fromString $ T.unpack cfg'adminHost)
pure $ Just adminSock
Nothing -> pure Nothing

pure (sock, adminSock)
initServerSocket :: AppConfig -> IO NS.Socket
initServerSocket AppConfig{..} = case configServerUnixSocket of
-- I'm not using `streaming-commons`' bindPath function here because it's not defined for Windows,
-- but we need to have runtime error if we try to use it in Windows, not compile time error
Just path -> createAndBindDomainSocket path configServerUnixSocketMode
Nothing -> bindPortTCP configServerPort (fromString $ T.unpack configServerHost)

initAdminServerSocket :: AppConfig -> IO (Maybe NS.Socket)
initAdminServerSocket AppConfig{..} =
traverse (`bindPortTCP` adminHost) configAdminServerPort
where
adminHost = fromString $ T.unpack configAdminServerHost

23 changes: 17 additions & 6 deletions src/PostgREST/AppState.hs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ module PostgREST.AppState
, getObserver
, isLoaded
, isPending
, waitForSchemaCacheInit
) where

import qualified Data.ByteString.Char8 as BS
Expand All @@ -53,6 +54,9 @@ import Data.IORef (IORef, atomicWriteIORef, newIORef,
readIORef)
import Data.Time.Clock (UTCTime, getCurrentTime)

import Control.Concurrent.STM (TMVar, newEmptyTMVarIO,
putTMVar, readTMVar,
tryReadTMVar, tryTakeTMVar)
import PostgREST.Auth.JwtCache (JwtCacheState, update)
import PostgREST.Config (AppConfig (..),
readAppConfig,
Expand Down Expand Up @@ -102,9 +106,11 @@ data AppState = AppState
}

-- | Schema cache status.
-- Empty means pending and full means loaded.
-- Empty means initial loading on startup, False means pending and True means loaded.
-- "Initial" state is needed so that we can wait with application socket listening
-- until after initial schema cache querying.
newtype SchemaCacheStatus = SchemaCacheStatus
{ getSCStatusMVar :: MVar ()
{ getSCStatusTMVar :: TMVar Bool
}

init :: AppConfig -> IO AppState
Expand Down Expand Up @@ -380,16 +386,21 @@ retryingSchemaCacheLoad appState@AppState{stateObserver=observer, stateMainThrea
oneSecondInUs = 1000000 -- one second in microseconds

newSchemaCacheStatus :: IO SchemaCacheStatus
newSchemaCacheStatus = SchemaCacheStatus <$> newEmptyMVar
newSchemaCacheStatus = SchemaCacheStatus <$> newEmptyTMVarIO

markSchemaCachePending :: AppState -> IO ()
markSchemaCachePending = void . tryTakeMVar . getSCStatusMVar . stateSCacheStatus
markSchemaCachePending = atomically . liftA2 (*>) tryTakeTMVar (`putTMVar` False) . getSCStatusTMVar . stateSCacheStatus

markSchemaCacheLoaded :: AppState -> IO ()
markSchemaCacheLoaded = void . (`tryPutMVar` ()) . getSCStatusMVar . stateSCacheStatus
markSchemaCacheLoaded = atomically . liftA2 (*>) tryTakeTMVar (`putTMVar` True) . getSCStatusTMVar . stateSCacheStatus

isSchemaCacheLoaded :: AppState -> IO Bool
isSchemaCacheLoaded = fmap not . isEmptyMVar . getSCStatusMVar . stateSCacheStatus
isSchemaCacheLoaded = atomically . (pure . fromMaybe False <=< tryReadTMVar) . getSCStatusTMVar . stateSCacheStatus

-- | Wait for initial schema cache load to either finish or retry
-- | We wait until scStatusTMVar is not empty.
waitForSchemaCacheInit :: AppState -> IO ()
waitForSchemaCacheInit = atomically . void . readTMVar . getSCStatusTMVar . stateSCacheStatus

-- | Reads the in-db config and reads the config file again
-- | We don't retry reading the in-db config after it fails immediately, because it could have user errors. We just report the error and continue.
Expand Down
6 changes: 3 additions & 3 deletions src/PostgREST/SchemaCache.hs
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,9 @@ maxDbTablesForFuzzySearch = 500
querySchemaCache :: AppConfig -> SQL.Transaction SchemaCache
querySchemaCache conf@AppConfig{..} = do
SQL.sql "set local schema ''" -- This voids the search path. The following queries need this for getting the fully qualified name(schema.name) of every db object
_ <-
let sleepCall = SQL.Statement "select pg_sleep($1 / 1000.0)" (param HE.int4) HD.noResult True in
for_ configInternalSCQuerySleep (`SQL.statement` sleepCall) -- only used for testing
tabs <- sqlTimedStmt gucTbls conf allTables
keyDeps <- sqlTimedStmt gucKDeps conf allViewsKeyDependencies
m2oRels <- sqlTimedStmt gucRels mempty allM2OandO2ORels
Expand All @@ -167,9 +170,6 @@ querySchemaCache conf@AppConfig{..} = do
tzones <- if configDbTimezoneEnabled
then sqlTimedStmt gucTzones mempty timezones
else pure S.empty
_ <-
let sleepCall = SQL.Statement "select pg_sleep($1 / 1000.0)" (param HE.int4) HD.noResult True in
for_ configInternalSCQuerySleep (`SQL.statement` sleepCall) -- only used for testing

qsTime <-
if isLogDebug
Expand Down
40 changes: 28 additions & 12 deletions test/io/test_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import subprocess
import time
import pytest
import requests

from config import CONFIGSDIR, FIXTURES, SECRET
from util import (
Expand Down Expand Up @@ -1090,7 +1091,7 @@ def test_empty_schema_cache_log_contains_jwt_role(defaultenv):

env = {
**defaultenv,
"PGRST_INTERNAL_SCHEMA_CACHE_QUERY_SLEEP": "1000",
"PGRST_DB_SCHEMAS": "non_existent_schema_aaaa",
"PGRST_JWT_SECRET": SECRET,
}
headers = jwtauthheader({"role": "postgrest_test_author"}, SECRET)
Expand Down Expand Up @@ -1543,14 +1544,19 @@ def test_log_postgrest_host_and_port(host, defaultenv):
with run(
env=defaultenv, host=host, port=port, no_startup_stdout=False
) as postgrest:
output = postgrest.read_stdout(nlines=10)
output = postgrest.read_stdout(nlines=11)

# Cannot assume a particular log entry order
# Listening on a socket happens after schema querying
# but is concurrent to the schema loading process
# and migh happen before or after writing of the
# "Schema cache loaded" log entry
if is_unix:
re.match(r'API server listening on "/tmp/.*\.sock"', output[2])
match_log(output, [r".*API server listening on .*/tmp/.*\.sock"])
elif is_ipv6(host):
assert f"API server listening on [{host}]:{port}" in output[2]
match_log(output, [r".*API server listening on \[.+]:\d+"])
else: # IPv4
assert f"API server listening on {host}:{port}" in output[2]
match_log(output, [r".*API server listening on .+:\d+"])


def test_succeed_w_role_having_superuser_settings(defaultenv):
Expand Down Expand Up @@ -1898,17 +1904,24 @@ def test_pgrst_log_503_client_error_to_stderr(defaultenv):
assert any(log_message in line for line in output)


def test_log_error_when_empty_schema_cache_on_startup_to_stderr(defaultenv):
"Should log the 503 error message when there is an empty schema cache on startup"
def test_log_error_when_schema_cache_load_error_on_startup_to_stderr(defaultenv):
"Should log the 503 error message when there is an error loading schema cache on startup"

env = {
**defaultenv,
"PGRST_INTERNAL_SCHEMA_CACHE_QUERY_SLEEP": "300",
"PGRST_INTERNAL_SCHEMA_CACHE_QUERY_SLEEP": "1000",
"PGRST_DB_SCHEMAS": "non_existent_schema_aaaa",
}

with run(env=env, wait_for=None) as postgrest:
postgrest.wait_until_scache_starts_loading()

# First call should fail with connection refused
with pytest.raises(requests.ConnectionError):
postgrest.session.get("/projects")

# Next call should return 503
time.sleep(1)
response = postgrest.session.get("/projects")
assert response.status_code == 503

Expand All @@ -1920,7 +1933,7 @@ def test_log_error_when_empty_schema_cache_on_startup_to_stderr(defaultenv):


def test_no_double_schema_cache_reload_on_empty_schema(defaultenv):
"Should only load the schema cache once on a 503 error when there's an empty schema cache on startup"
"Should only load the schema cache once when there's an empty schema cache on startup"

env = {
**defaultenv,
Expand All @@ -1930,12 +1943,15 @@ def test_no_double_schema_cache_reload_on_empty_schema(defaultenv):
with run(env=env, port=freeport(), wait_for=None) as postgrest:
postgrest.wait_until_scache_starts_loading()

response = postgrest.session.get("/projects")
assert response.status_code == 503
with pytest.raises(requests.ConnectionError):
postgrest.session.get("/projects")

# Should wait enough time to load the schema cache twice to guarantee that the test is valid
time.sleep(1)

response = postgrest.session.get("/projects")
assert response.status_code == 200

response = postgrest.admin.get("/metrics")
assert response.status_code == 200
assert 'pgrst_schema_cache_loads_total{status="SUCCESS"} 1.0' in response.text
Expand Down Expand Up @@ -2017,7 +2033,7 @@ def test_schema_cache_error_observation(defaultenv):
output = postgrest.read_stdout(nlines=9)
assert (
"Failed to load the schema cache using db-schemas=public and db-extra-search-path=x"
in output[7]
in output[6]
)


Expand Down