diff --git a/cabal.project b/cabal.project index b99426749..371576f0b 100644 --- a/cabal.project +++ b/cabal.project @@ -39,11 +39,6 @@ package cardano-db-tool package cardano-smash-server ghc-options: -Wall -Werror -Wredundant-constraints -Wincomplete-uni-patterns -Wincomplete-record-updates -Wpartial-fields -Wunused-imports -Wunused-packages -package blockio - -- Use serial block IO to avoid requiring liburing (not available in devx CI). - -- TODO: revert when CI provides liburing or switch to io_uring for better LSM perf. - flags: +serialblockio - package cardano-node -- We are using cardano-node as a library and we never use the systemd scribe, so there -- is no benefit to linking against it diff --git a/cardano-chain-gen/test/Test/Cardano/Db/Mock/Config.hs b/cardano-chain-gen/test/Test/Cardano/Db/Mock/Config.hs index ad3ec337a..bc69d8768 100644 --- a/cardano-chain-gen/test/Test/Cardano/Db/Mock/Config.hs +++ b/cardano-chain-gen/test/Test/Cardano/Db/Mock/Config.hs @@ -281,8 +281,15 @@ withConfig staticDir mutableDir cmdLineArgs config action = do {-# ANN withConfig ("HLint: ignore Redundant pure" :: String) #-} mkSyncNodeConfig :: FilePath -> CommandLineArgs -> IO SyncNodeConfig -mkSyncNodeConfig configFilePath cmdLineArgs = - readSyncNodeConfig $ mkConfigFile configDir configFilename +mkSyncNodeConfig configFilePath cmdLineArgs = do + cfg <- readSyncNodeConfig $ mkConfigFile configDir configFilename + -- Allow env-var override of the ledger backend so CI can run the + -- full suite against both "inmemory" and "lsm" without per-test config changes. + mBackend <- lookupEnv "DB_SYNC_TEST_LEDGER_BACKEND" + pure $ case mBackend of + Just "lsm" -> cfg {dncLedgerBackend = LedgerBackendLSM Nothing} + Just "inmemory" -> cfg {dncLedgerBackend = LedgerBackendInMemory} + _ -> cfg where configFilename = claConfigFilename cmdLineArgs configDir = mkConfigDir configFilePath diff --git a/cardano-db-sync/app/cardano-db-sync.hs b/cardano-db-sync/app/cardano-db-sync.hs index 3f070f07d..3739b8d89 100644 --- a/cardano-db-sync/app/cardano-db-sync.hs +++ b/cardano-db-sync/app/cardano-db-sync.hs @@ -67,9 +67,10 @@ dbSyncMain = do stateDirErrorMsg :: [Char] stateDirErrorMsg = - "Error: If not using --state-dir then make sure to have ledger disabled. " - <> "For more details view https://github.com/IntersectMBO/cardano-db-sync/blob" - <> "/master/doc/syncing-and-rollbacks.md#ledger-state" + "Error: --state-dir is required when ledger is enabled. " + <> "Either provide --state-dir or set \"ledger\": \"disable\" in the config. " + <> "See https://github.com/IntersectMBO/cardano-db-sync/blob" + <> "/master/doc/configuration.md#ledger" --------------------------------------------------------------------------------------------------- -- Command Line Configurations diff --git a/cardano-db-sync/cardano-db-sync.cabal b/cardano-db-sync/cardano-db-sync.cabal index b382a588d..46accabeb 100644 --- a/cardano-db-sync/cardano-db-sync.cabal +++ b/cardano-db-sync/cardano-db-sync.cabal @@ -108,6 +108,7 @@ library Cardano.DbSync.Era.Util Cardano.DbSync.Ledger.Event + Cardano.DbSync.Ledger.Snapshot Cardano.DbSync.Ledger.State Cardano.DbSync.Ledger.Types @@ -190,6 +191,8 @@ library , either , extra , filepath + , fs-api + , lsm-tree , groups , hasql , http-client @@ -204,6 +207,8 @@ library , ouroboros-consensus , ouroboros-consensus:cardano , ouroboros-consensus:diffusion + , ouroboros-consensus:lsm + , resource-registry , ouroboros-consensus:protocol , ouroboros-network:api , ouroboros-network:framework @@ -211,6 +216,7 @@ library , plutus-ledger-api , prometheus , psqueues + , random , random-shuffle , scientific , serialise @@ -250,7 +256,11 @@ executable cardano-db-sync -Wno-unsafe -threaded -rtsopts - "-with-rtsopts=-A16m -N3 --disable-delayed-os-memory-return" + + if arch(arm) + ghc-options: "-with-rtsopts=-T -I0 -A16m -N1 --disable-delayed-os-memory-return" + else + ghc-options: "-with-rtsopts=-T -I0 -A16m -qg1 -qb1 -N2 --disable-delayed-os-memory-return" autogen-modules: Paths_cardano_db_sync MigrationValidations @@ -289,7 +299,11 @@ executable http-get-json-metadata -Wno-unsafe -threaded -rtsopts - "-with-rtsopts=-A16m -N3 --disable-delayed-os-memory-return" + + if arch(arm) + ghc-options: "-with-rtsopts=-T -I0 -A16m -N1 --disable-delayed-os-memory-return" + else + ghc-options: "-with-rtsopts=-T -I0 -A16m -qg1 -qb1 -N2 --disable-delayed-os-memory-return" build-depends: base , ansi-terminal @@ -318,7 +332,11 @@ executable test-http-get-json-metadata -Wno-unsafe -threaded -rtsopts - "-with-rtsopts=-A16m -N3 --disable-delayed-os-memory-return" + + if arch(arm) + ghc-options: "-with-rtsopts=-T -I0 -A16m -N1 --disable-delayed-os-memory-return" + else + ghc-options: "-with-rtsopts=-T -I0 -A16m -qg1 -qb1 -N2 --disable-delayed-os-memory-return" build-depends: base , bytestring diff --git a/cardano-db-sync/src/Cardano/DbSync.hs b/cardano-db-sync/src/Cardano/DbSync.hs index 06a620341..992ddbcc6 100644 --- a/cardano-db-sync/src/Cardano/DbSync.hs +++ b/cardano-db-sync/src/Cardano/DbSync.hs @@ -45,7 +45,7 @@ import Cardano.Slotting.Slot (EpochNo (..)) import qualified Cardano.Db as DB import Cardano.DbSync.Api -import Cardano.DbSync.Api.Types (InsertOptions (..), RunMigration, SyncEnv (..), SyncOptions (..), envLedgerEnv) +import Cardano.DbSync.Api.Types (InsertOptions (..), LedgerEnv (..), RunMigration, SyncEnv (..), SyncOptions (..), envLedgerEnv) import Cardano.DbSync.Config (configureLogging) import Cardano.DbSync.Config.Cardano import Cardano.DbSync.Config.Types @@ -54,6 +54,7 @@ import Cardano.DbSync.DbEvent import Cardano.DbSync.Era import Cardano.DbSync.Error import Cardano.DbSync.Ledger.State +import Cardano.DbSync.Ledger.Types (HasLedgerEnv (..)) import Cardano.DbSync.OffChain (runFetchOffChainPoolThread, runFetchOffChainVoteThread) import Cardano.DbSync.Rollback (handlePostRollbackSnapshots, unsafeRollback) import Cardano.DbSync.Sync (runSyncNodeClient) @@ -248,6 +249,10 @@ runSyncNode metricsSetters trce iomgr dbConnSetting runNearTipMigrationFnc syncN -- communication channel between datalayer thread and chainsync-client thread threadChannels <- liftIO newThreadChannels + -- 'finally' on the worker pool ensures the LSM session (and any other + -- backend resources) are closed even when db-sync is cancelled or + -- crashes — important for tests that restart db-sync in the same + -- process and need the OS file lock to be released. liftIO $ mapConcurrently_ id @@ -257,6 +262,7 @@ runSyncNode metricsSetters trce iomgr dbConnSetting runNearTipMigrationFnc syncN , runFetchOffChainVoteThread syncEnv , runLedgerStateWriteThread (getTrace syncEnv) (envLedgerEnv syncEnv) ] + `finally` closeLedgerEnv syncEnv ) where useShelleyInit :: SyncNodeConfig -> Bool @@ -353,3 +359,14 @@ txOutConfigToTableType config = case config of TxOutConsumed _ (UseTxOutAddress flag) -> if flag then DB.TxOutVariantAddress else DB.TxOutVariantCore TxOutConsumedPrune _ (UseTxOutAddress flag) -> if flag then DB.TxOutVariantAddress else DB.TxOutVariantCore TxOutConsumedBootstrap _ (UseTxOutAddress flag) -> if flag then DB.TxOutVariantAddress else DB.TxOutVariantCore + +-- | Release backend resources held by the ledger environment. +-- Currently this closes the LSM session (no-op for InMemory and NoLedger). +closeLedgerEnv :: SyncEnv -> IO () +closeLedgerEnv syncEnv = case envLedgerEnv syncEnv of + HasLedger le -> do + let trce = leTrace le + logInfo trce "closeLedgerEnv: closing LSM session..." + leClose le + logInfo trce "closeLedgerEnv: closed." + NoLedger _ -> pure () diff --git a/cardano-db-sync/src/Cardano/DbSync/Api.hs b/cardano-db-sync/src/Cardano/DbSync/Api.hs index 858ff91ae..18d97e2d4 100644 --- a/cardano-db-sync/src/Cardano/DbSync/Api.hs +++ b/cardano-db-sync/src/Cardano/DbSync/Api.hs @@ -85,14 +85,14 @@ import Cardano.DbSync.Error import Cardano.DbSync.Ledger.Event (LedgerEvent (..)) import Cardano.DbSync.Ledger.State ( getHeaderHash, - hashToAnnotation, listKnownSnapshots, mkHasLedgerEnv, ) -import Cardano.DbSync.Ledger.Types (HasLedgerEnv (..), LedgerStateFile (..), SnapshotPoint (..)) +import Cardano.DbSync.Ledger.Types (HasLedgerEnv (..), SnapshotPoint (..)) import Cardano.DbSync.LocalStateQuery import Cardano.DbSync.Types import Cardano.DbSync.Util +import Ouroboros.Consensus.Storage.LedgerDB.Snapshots (DiskSnapshot (..)) setConsistentLevel :: SyncEnv -> ConsistentLevel -> IO () setConsistentLevel env cst = do @@ -341,7 +341,15 @@ mkSyncEnv metricSetters trce dbEnv syncOptions protoInfo nw maxLovelaceSupply nw else pure useNoCache consistentLevelVar <- newTVarIO Unchecked indexesVar <- newTVarIO $ enpForceIndexes syncNP - bts <- getBootstrapInProgress trce (isTxOutConsumedBootstrap' syncNodeConfigFromFile) dbEnv + let bootstrapFlag = isTxOutConsumedBootstrap' syncNodeConfigFromFile + case (bootstrapFlag, dncLedgerBackend syncNodeConfigFromFile) of + (True, LedgerBackendLSM _) -> + DB.logAndThrowIO trce $ + "bootstrap-tx-out is not supported with ledger_backend=lsm. " + <> "The bootstrap path reads the full UTxO from the in-memory ledger state, " + <> "which is empty under LSM. Use ledger_backend=inmemory or disable bootstrap." + _ -> pure () + bts <- getBootstrapInProgress trce bootstrapFlag dbEnv bootstrapVar <- newTVarIO bts -- Offline Pool + Anchor queues opwq <- newTBQueueIO 1000 @@ -363,6 +371,7 @@ mkSyncEnv metricSetters trce dbEnv syncOptions protoInfo nw maxLovelaceSupply nw maxLovelaceSupply systemStart syncOptions + (dncLedgerBackend syncNodeConfigFromFile) (Nothing, False) -> NoLedger <$> mkNoLedgerEnv trce protoInfo nw systemStart (Just _, False) -> do logWarning trce $ @@ -472,11 +481,11 @@ verifySnapshotPoint env snapPoints = catMaybes <$> mapM validLedgerFileToPoint snapPoints where validLedgerFileToPoint :: SnapshotPoint -> IO (Maybe (CardanoPoint, Bool)) - validLedgerFileToPoint (OnDisk lsf) = do - hashes <- getSlotHash (envDbEnv env) (lsfSlotNo lsf) - let valid = find (\(_, h) -> lsfHash lsf == hashToAnnotation h) hashes - case valid of - Just (slot, hash) | slot == lsfSlotNo lsf -> pure $ convertToDiskPoint slot hash + validLedgerFileToPoint (OnDisk ds) = do + let slot = SlotNo (dsNumber ds) + hashes <- getSlotHash (envDbEnv env) slot + case hashes of + [(s, _h)] | s == slot -> pure $ convertToDiskPoint slot _h _ -> pure Nothing validLedgerFileToPoint (InMemory pnt) = do case pnt of diff --git a/cardano-db-sync/src/Cardano/DbSync/Config.hs b/cardano-db-sync/src/Cardano/DbSync/Config.hs index 8f0471a9b..2b1886263 100644 --- a/cardano-db-sync/src/Cardano/DbSync/Config.hs +++ b/cardano-db-sync/src/Cardano/DbSync/Config.hs @@ -87,6 +87,7 @@ coalesceConfig pcfg ncfg adjustGenesisPath = do , dncInsertOptions = extractInsertOptions pcfg , dncIpfsGateway = endsInSlash <$> pcIpfsGateway pcfg , dncSnapshotInterval = pcSnapshotInterval pcfg + , dncLedgerBackend = pcLedgerBackend pcfg } mkAdjustPath :: SyncPreConfig -> (FilePath -> FilePath) diff --git a/cardano-db-sync/src/Cardano/DbSync/Config/Types.hs b/cardano-db-sync/src/Cardano/DbSync/Config/Types.hs index d808c4399..e032770b9 100644 --- a/cardano-db-sync/src/Cardano/DbSync/Config/Types.hs +++ b/cardano-db-sync/src/Cardano/DbSync/Config/Types.hs @@ -30,6 +30,7 @@ module Cardano.DbSync.Config.Types ( TxOutConfig (..), UseTxOutAddress (..), ForceTxIn (..), + LedgerBackend (..), LedgerInsertConfig (..), ShelleyInsertConfig (..), RewardsConfig (..), @@ -150,6 +151,7 @@ data SyncNodeConfig = SyncNodeConfig , dncInsertOptions :: !SyncInsertOptions , dncIpfsGateway :: [Text] , dncSnapshotInterval :: !SnapshotIntervalConfig + , dncLedgerBackend :: !LedgerBackend } data SyncPreConfig = SyncPreConfig @@ -163,6 +165,7 @@ data SyncPreConfig = SyncPreConfig , pcInsertConfig :: !SyncInsertConfig , pcIpfsGateway :: ![Text] , pcSnapshotInterval :: !SnapshotIntervalConfig + , pcLedgerBackend :: !LedgerBackend } deriving (Show) @@ -224,6 +227,20 @@ newtype UseTxOutAddress = UseTxOutAddress {unUseTxOutAddress :: Bool} deriving (Eq, Show) deriving newtype (ToJSON, FromJSON) +-- | Choose the backend for storing ledger tables (UTxO set). +-- 'LedgerBackendInMemory' keeps everything in RAM (current default). +-- 'LedgerBackendLSM' uses LSM trees on disk for lower memory usage. +data LedgerBackend + = LedgerBackendInMemory + | LedgerBackendLSM (Maybe FilePath) + deriving (Eq, Show) + +instance FromJSON LedgerBackend where + parseJSON = Aeson.withText "LedgerBackend" $ \case + "inmemory" -> pure LedgerBackendInMemory + "lsm" -> pure (LedgerBackendLSM Nothing) + other -> fail $ "unexpected ledger_backend: " <> show other <> ". Expected \"inmemory\" or \"lsm\"." + data LedgerInsertConfig = LedgerEnable | LedgerDisable @@ -423,6 +440,7 @@ parseGenSyncNodeConfig o = <*> o .:? "insert_options" .!= def <*> o .:? "ipfs_gateway" .!= ["https://ipfs.io/ipfs"] <*> o .:? "snapshot_interval" .!= def + <*> o .:? "ledger_backend" .!= LedgerBackendInMemory instance FromJSON SyncProtocol where parseJSON o = diff --git a/cardano-db-sync/src/Cardano/DbSync/Ledger/Snapshot.hs b/cardano-db-sync/src/Cardano/DbSync/Ledger/Snapshot.hs new file mode 100644 index 000000000..473b4ce26 --- /dev/null +++ b/cardano-db-sync/src/Cardano/DbSync/Ledger/Snapshot.hs @@ -0,0 +1,244 @@ +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE ScopedTypeVariables #-} +{-# LANGUAGE NoImplicitPrelude #-} + +-- | Snapshot operations for db-sync, using consensus snapshot format. +-- +-- This module replaces the old custom .lstate snapshot format with +-- consensus's directory-based format ([_suffix]/ containing +-- state, meta, utxoSize files). +module Cardano.DbSync.Ledger.Snapshot ( + -- * Migration + migrateOldSnapshots, + + -- * Save + saveCurrentLedgerState, + saveCleanupState, + snapshotWriteLoop, + runLedgerStateWriteThread, + + -- * Load + loadSnapshotFromDisk, + findStateFromSnapshot, + + -- * List / Cleanup + listDiskSnapshots, + deleteNewerSnapshots, + + -- * Snapshot points + listKnownSnapshots, + getSlotNoSnapshot, +) where + +import Cardano.BM.Trace (Trace, logInfo, logWarning) +import Cardano.DbSync.Api.Types (LedgerEnv (..)) +import Cardano.DbSync.Config.Types (LedgerStateDir (..)) +import Cardano.DbSync.Ledger.Types +import Cardano.DbSync.Types (CardanoPoint) +import Cardano.Prelude hiding (atomically) +import Cardano.Slotting.Slot (EpochNo (..), WithOrigin (..)) +import Control.Concurrent.Class.MonadSTM.Strict (atomically, readTVarIO, writeTVar) +import Control.Concurrent.STM.TBQueue (readTBQueue, writeTBQueue) +import qualified Data.List as List +import qualified Data.Strict.Maybe as Strict +import Data.Time.Clock (diffUTCTime, getCurrentTime) +import qualified Database.LSMTree as LSMTree +import qualified Ouroboros.Consensus.Ledger.Abstract as Consensus +import Ouroboros.Consensus.Ledger.Extended (ExtLedgerState (..)) +import Ouroboros.Consensus.Storage.LedgerDB.Snapshots +import qualified Ouroboros.Consensus.Storage.LedgerDB.V2.LedgerSeq as Consensus (StateRef (..)) +import Ouroboros.Network.Block +import qualified Ouroboros.Network.Point as Point +import System.Directory (doesDirectoryExist, listDirectory, removeFile) +import System.FilePath (takeExtension, ()) + +-- | Remove old .lstate files from a previous db-sync version. +-- This is a one-time migration on first startup with the new format. +migrateOldSnapshots :: LedgerStateDir -> Trace IO Text -> IO () +migrateOldSnapshots (LedgerStateDir stateDir) tracer = do + exists <- doesDirectoryExist stateDir + when exists $ do + files <- listDirectory stateDir + let oldFiles = filter (\f -> takeExtension f == ".lstate") files + unless (null oldFiles) $ do + logInfo tracer $ "Migrating: removing " <> textShow (length oldFiles) <> " old .lstate snapshots" + forM_ oldFiles $ \f -> + handle (\(_ :: IOException) -> pure ()) $ removeFile (stateDir f) + +-- | Queue a snapshot for async writing. +saveCurrentLedgerState :: HasLedgerEnv -> DbSyncStateRef -> Maybe EpochNo -> IO () +saveCurrentLedgerState env lState _mEpochNo = do + -- Don't store genesis + case Consensus.ledgerTipSlot $ ledgerState (clsState (srState lState)) of + Origin -> pure () + At _ -> do + atomically $ writeTVar (srCanClose lState) False + atomically $ writeTBQueue (leSnapshotQueue env) lState + +-- | Save a snapshot and clean up old ones. +saveCleanupState :: HasLedgerEnv -> DbSyncStateRef -> Maybe EpochNo -> IO () +saveCleanupState = saveCurrentLedgerState + +-- | The write thread that takes snapshots from the queue. +runLedgerStateWriteThread :: Trace IO Text -> LedgerEnv -> IO () +runLedgerStateWriteThread tracer lenv = + case lenv of + HasLedger le -> snapshotWriteLoop tracer le + NoLedger _ -> forever $ threadDelay 600000000 + +-- | Write loop: read from queue, take snapshot via consensus SnapshotManager. +snapshotWriteLoop :: Trace IO Text -> HasLedgerEnv -> IO () +snapshotWriteLoop tracer env = loop + where + loop :: IO () + loop = do + ledger <- atomically $ readTBQueue (leSnapshotQueue env) + startTime <- getCurrentTime + let cRef = toConsensusStateRef ledger + mResult <- takeSnapshot (leSnapshotManager env) Nothing cRef + atomically $ writeTVar (srCanClose ledger) True + endTime <- getCurrentTime + case mResult of + Nothing -> pure () + Just (ds, _pt) -> do + logInfo tracer $ + mconcat + [ "Wrote snapshot " + , textShow (snapshotToDirName ds) + , " in " + , textShow (diffUTCTime endTime startTime) + ] + -- Trim old snapshots after writing the new one + let policy = + SnapshotPolicy + { onDiskNumSnapshots = 3 + , onDiskShouldTakeSnapshot = \_ _ -> True + } + deleted <- trimSnapshots (leSnapshotManager env) policy + unless (null deleted) $ + logInfo tracer $ + "Trimmed " <> textShow (length deleted) <> " old snapshots" + loop + +-- | Load a snapshot from disk using consensus APIs. +-- Returns a DbSyncStateRef with clsEpochBlockNo = 0 +-- (will be corrected after first epoch boundary). +loadSnapshotFromDisk :: + HasLedgerEnv -> + DiskSnapshot -> + IO (Either Text DbSyncStateRef) +loadSnapshotFromDisk env ds = do + startTime <- getCurrentTime + eResult <- + handle (\(err :: LSMTree.SnapshotDoesNotExistError) -> pure $ Left $ textShow err) $ + leLoadSnapshot env ds + endTime <- getCurrentTime + case eResult of + Left err -> + pure $ + Left $ + "Failed to load snapshot " + <> textShow (snapshotToDirName ds) + <> ": " + <> err + Right cRef -> do + logInfo (leTrace env) $ + mconcat + [ "Loaded snapshot " + , textShow (snapshotToDirName ds) + , " in " + , textShow (diffUTCTime endTime startTime) + ] + let Consensus.StateRef st _ = cRef + Right <$> fromConsensusStateRef (deriveEpochBlockNo st) cRef + +-- | Try to find a snapshot matching the given point, or fall back to genesis. +findStateFromSnapshot :: + HasLedgerEnv -> + CardanoPoint -> + IO (Either [DiskSnapshot] DbSyncStateRef) +findStateFromSnapshot env point = do + snapshots <- listSnapshots (leSnapshotManager env) + case getPoint point of + Origin -> do + -- Delete all snapshots and start from genesis + forM_ snapshots $ safeDeleteSnapshot (leSnapshotManager env) + Right <$> initCardanoLedgerState env + At blk -> do + let targetSlot = Point.blockPointSlot blk + -- Delete snapshots newer than the target + let (newer, rest) = List.span (\ds -> SlotNo (dsNumber ds) > targetSlot) snapshots + forM_ newer $ \ds -> do + logInfo (leTrace env) $ "Deleting newer snapshot: " <> textShow (snapshotToDirName ds) + safeDeleteSnapshot (leSnapshotManager env) ds + -- Try to find a matching snapshot + case List.find (\ds -> SlotNo (dsNumber ds) == targetSlot) rest of + Just ds -> do + result <- loadSnapshotFromDisk env ds + case result of + Right sr -> pure $ Right sr + Left err -> do + logWarning (leTrace env) $ "Failed to load snapshot: " <> err <> ". Trying older." + safeDeleteSnapshot (leSnapshotManager env) ds + let older = List.filter (\d -> SlotNo (dsNumber d) < targetSlot) rest + pure $ Left older + Nothing -> do + let older = List.filter (\ds -> SlotNo (dsNumber ds) < targetSlot) rest + case older of + [] -> pure $ Left [] + _ -> pure $ Left older + +-- | List snapshots using the consensus snapshot manager. +listDiskSnapshots :: HasLedgerEnv -> IO [DiskSnapshot] +listDiskSnapshots env = listSnapshots (leSnapshotManager env) + +-- | Safely delete a snapshot, ignoring missing LSM snapshot errors. +-- Works around a consensus bug where 'deleteSnapshotIfTemporary' calls +-- 'LSM.deleteSnapshot' unconditionally, which throws 'SnapshotDoesNotExistError' +-- if the LSM snapshot is missing (e.g. after a crash during snapshot write). +safeDeleteSnapshot :: SnapshotManager IO IO blk st -> DiskSnapshot -> IO () +safeDeleteSnapshot sm ds = + handle (\(_ :: LSMTree.SnapshotDoesNotExistError) -> pure ()) $ + deleteSnapshotIfTemporary sm ds + +-- | Delete snapshot directories newer than the given point. +deleteNewerSnapshots :: HasLedgerEnv -> CardanoPoint -> IO () +deleteNewerSnapshots env point = do + snapshots <- listDiskSnapshots env + case getPoint point of + Origin -> + forM_ snapshots $ safeDeleteSnapshot (leSnapshotManager env) + At blk -> do + let targetSlot = Point.blockPointSlot blk + newer = filter (\ds -> SlotNo (dsNumber ds) > targetSlot) snapshots + forM_ newer $ safeDeleteSnapshot (leSnapshotManager env) + +-- | List known snapshot points (both in-memory and on-disk). +listKnownSnapshots :: HasLedgerEnv -> IO [SnapshotPoint] +listKnownSnapshots env = do + inMem <- fmap InMemory <$> listMemorySnapshots env + onDisk <- fmap OnDisk <$> listDiskSnapshots env + pure $ List.sortOn (Down . getSlotNoSnapshot) $ inMem <> onDisk + +getSlotNoSnapshot :: SnapshotPoint -> WithOrigin SlotNo +getSlotNoSnapshot (OnDisk ds) = At $ SlotNo (dsNumber ds) +getSlotNoSnapshot (InMemory cp) = pointSlot cp + +listMemorySnapshots :: HasLedgerEnv -> IO [CardanoPoint] +listMemorySnapshots env = do + mState <- readTVarIO (leStateVar env) + case mState of + Strict.Nothing -> pure [] + Strict.Just ledgerDB -> + pure $ + filter + notGenesis + (castPoint . Consensus.getTip . clsState . srState <$> getEdgePoints ledgerDB) + where + getEdgePoints ldb = + case toList $ ledgerDbCheckpoints ldb of + [] -> [] + [a] -> [a] + (h : ls) -> catMaybes [Just h, lastMay ls] + notGenesis GenesisPoint = False + notGenesis (BlockPoint _ _) = True diff --git a/cardano-db-sync/src/Cardano/DbSync/Ledger/State.hs b/cardano-db-sync/src/Cardano/DbSync/Ledger/State.hs index bf131fb16..9690d2c73 100644 --- a/cardano-db-sync/src/Cardano/DbSync/Ledger/State.hs +++ b/cardano-db-sync/src/Cardano/DbSync/Ledger/State.hs @@ -1,14 +1,15 @@ {-# LANGUAGE BangPatterns #-} {-# LANGUAGE CPP #-} {-# LANGUAGE DataKinds #-} +{-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE FlexibleInstances #-} {-# LANGUAGE GADTs #-} {-# LANGUAGE LambdaCase #-} {-# LANGUAGE MultiParamTypeClasses #-} {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE RankNTypes #-} -{-# LANGUAGE RecordWildCards #-} {-# LANGUAGE ScopedTypeVariables #-} +{-# LANGUAGE TupleSections #-} {-# LANGUAGE TypeApplications #-} {-# LANGUAGE TypeFamilies #-} {-# LANGUAGE NoImplicitPrelude #-} @@ -20,29 +21,27 @@ module Cardano.DbSync.Ledger.State ( getGovExpiresAt, mkHasLedgerEnv, applyBlockAndSnapshot, - listLedgerStateFilesOrdered, - listKnownSnapshots, - loadLedgerStateFromFile, - findLedgerStateFile, + initCardanoLedgerState, loadLedgerAtPoint, hashToAnnotation, getHeaderHash, - runLedgerStateWriteThread, getStakeSlice, findProposedCommittee, writeLedgerState, - saveCleanupState, + ledgerDbCurrent, + + -- * Re-exports from Snapshot + module Cardano.DbSync.Ledger.Snapshot, ) where -import Cardano.BM.Trace (Trace, logInfo, logWarning) -import Cardano.Binary (Decoder, DecoderError) -import qualified Cardano.Binary as Serialize -import Cardano.DbSync.Api.Types (InsertOptions (..), LedgerEnv (..), SyncOptions (..)) +import Cardano.BM.Trace (Trace, logInfo) +import Cardano.DbSync.Api.Types (InsertOptions (..), SyncOptions (..)) import Cardano.DbSync.Config.Types import qualified Cardano.DbSync.Era.Cardano.Util as Cardano import qualified Cardano.DbSync.Era.Shelley.Generic as Generic import Cardano.DbSync.Error (SyncNodeError (..), fromEitherSTM) import Cardano.DbSync.Ledger.Event +import Cardano.DbSync.Ledger.Snapshot import Cardano.DbSync.Ledger.Types import Cardano.DbSync.StateQuery import Cardano.DbSync.Types @@ -63,7 +62,6 @@ import Cardano.Slotting.Slot ( EpochNo (..), SlotNo (..), WithOrigin (..), - at, fromWithOrigin, ) import Control.Concurrent.Class.MonadSTM.Strict ( @@ -72,31 +70,28 @@ import Control.Concurrent.Class.MonadSTM.Strict ( readTVar, writeTVar, ) -import Control.Concurrent.STM.TBQueue (TBQueue, newTBQueueIO, readTBQueue, writeTBQueue) -import qualified Control.Exception as Exception +import Control.Concurrent.STM.TBQueue (newTBQueueIO) +import Control.ResourceRegistry (runWithTempRegistry) +import qualified Control.Tracer as Tracer import qualified Data.ByteString.Base16 as Base16 import qualified Data.ByteString.Char8 as BS -import qualified Data.ByteString.Lazy.Char8 as LBS import qualified Data.ByteString.Short as SBS import qualified Data.List as List import qualified Data.Map.Strict as Map +import Data.Sequence.Strict (StrictSeq (..)) +import qualified Data.Sequence.Strict as SSeq import qualified Data.Set as Set import qualified Data.Strict.Maybe as Strict import qualified Data.Text as Text -import Data.Time.Clock (UTCTime, diffUTCTime, getCurrentTime) +import Data.Time.Clock (UTCTime, getCurrentTime) import GHC.IO.Exception (userError) import Lens.Micro ((%~), (^.), (^?)) import Ouroboros.Consensus.Block ( - CodecConfig, - Point (..), WithOrigin (..), blockHash, blockIsEBB, blockPrevHash, - castPoint, - pointSlot, ) -import Ouroboros.Consensus.Block.Abstract (ConvertRawHash (..)) import Ouroboros.Consensus.BlockchainTime.WallClock.Types (SystemStart (..)) import Ouroboros.Consensus.Cardano.Block (ConwayEra, LedgerState (..)) import Ouroboros.Consensus.Cardano.CanHardFork () @@ -108,22 +103,24 @@ import Ouroboros.Consensus.HardFork.Combinator.State (epochInfoLedger) import qualified Ouroboros.Consensus.HardFork.History as History import Ouroboros.Consensus.Ledger.Abstract (LedgerResult) import qualified Ouroboros.Consensus.Ledger.Abstract as Consensus -import Ouroboros.Consensus.Ledger.Basics (EmptyMK, KeysMK, LedgerTables) +import Ouroboros.Consensus.Ledger.Basics (EmptyMK) import Ouroboros.Consensus.Ledger.Extended (ExtLedgerCfg (..), ExtLedgerState (..)) -import qualified Ouroboros.Consensus.Ledger.Extended as Consensus -import Ouroboros.Consensus.Ledger.Tables.Utils (applyDiffsMK, forgetLedgerTables, restrictValuesMK) +import Ouroboros.Consensus.Ledger.Tables.Utils (forgetLedgerTables) import qualified Ouroboros.Consensus.Node.ProtocolInfo as Consensus import Ouroboros.Consensus.Shelley.Ledger.Block import qualified Ouroboros.Consensus.Shelley.Ledger.Ledger as Consensus -import Ouroboros.Consensus.Storage.Serialisation (DecodeDisk (..), EncodeDisk (..)) -import Ouroboros.Network.AnchoredSeq (AnchoredSeq (..)) -import qualified Ouroboros.Network.AnchoredSeq as AS -import Ouroboros.Network.Block (HeaderHash, Point (..)) -import qualified Ouroboros.Network.Point as Point -import System.Directory (doesFileExist, listDirectory, removeFile) -import System.FilePath (dropExtension, takeExtension, ()) +import Ouroboros.Consensus.Storage.LedgerDB.Snapshots +import Ouroboros.Consensus.Storage.LedgerDB.V2.Backend hiding (Trace) +import qualified Ouroboros.Consensus.Storage.LedgerDB.V2.InMemory as InMem +import qualified Ouroboros.Consensus.Storage.LedgerDB.V2.LSM as LSM +import Ouroboros.Consensus.Storage.LedgerDB.V2.LedgerSeq hiding (StateRef) +import Ouroboros.Network.Block (HeaderHash, pointSlot) +import System.FS.API (SomeHasFS (..), mkFsPath) +import System.FS.API.Types (MountPoint (..)) +import System.FS.IO (ioHasFS) +import System.FilePath (splitDirectories, ()) import System.Mem (performMajorGC) -import Prelude (String, id) +import System.Random (genWord64, newStdGen) -- Note: The decision on whether a ledger-state is written to disk is based on the block number -- rather than the slot number because while the block number is fully populated (for every block @@ -135,24 +132,59 @@ import Prelude (String, id) {- HLINT ignore "Reduce duplication" -} {- HLINT ignore "Use readTVarIO" -} -pushLedgerDB :: LedgerDB -> CardanoLedgerState -> LedgerDB +-- -- | Create a simple in-memory 'LedgerTablesHandle' from 'LedgerTables ValuesMK'. +-- -- Each handle closes over an IORef that is written to at most once (by pushDiffs). +-- mkHandleFromValues :: +-- LedgerTables (ExtLedgerState CardanoBlock) ValuesMK -> +-- IO (LedgerTablesHandle IO (ExtLedgerState CardanoBlock)) +-- mkHandleFromValues tables = do +-- ref <- newIORef tables +-- pure +-- LedgerTablesHandle +-- { close = pure () +-- , transfer = \_ -> pure () +-- , duplicate = \reg -> do +-- current <- readIORef ref +-- (rk, h) <- allocate reg (\_ -> mkHandleFromValues current) close +-- pure (rk, h) +-- , read = \_st keys -> do +-- vals <- readIORef ref +-- pure $ ltliftA2 restrictValuesMK vals keys +-- , readRange = \_st _ -> do +-- vals <- readIORef ref +-- pure (vals, Nothing) +-- , readAll = \_st -> readIORef ref +-- , pushDiffs = \_oldSt newSt -> do +-- let diffs = ltprj newSt +-- vals <- readIORef ref +-- writeIORef ref $ ltliftA2 applyDiffsMK vals diffs +-- , takeHandleSnapshot = \_st _name -> pure Nothing +-- , tablesSize = pure 0 +-- } + +-- | Push a new DbSyncStateRef and prune old ones. Returns the new DB and +-- any pruned DbSyncStateRefs whose handles should be closed. +pushLedgerDB :: LedgerDB -> DbSyncStateRef -> (LedgerDB, [DbSyncStateRef]) pushLedgerDB db st = pruneLedgerDb - 10 + 100 db - { ledgerDbCheckpoints = ledgerDbCheckpoints db :> st + { ledgerDbCheckpoints = st SSeq.<| ledgerDbCheckpoints db } --- | Prune snapshots until at we have at most @k@ snapshots in the LedgerDB, --- excluding the snapshots stored at the anchor. -pruneLedgerDb :: Word64 -> LedgerDB -> LedgerDB +-- | Prune snapshots until we have at most @k@ snapshots in the LedgerDB. +-- Returns the pruned DB and the dropped DbSyncStateRefs whose handles should be closed. +pruneLedgerDb :: Word64 -> LedgerDB -> (LedgerDB, [DbSyncStateRef]) pruneLedgerDb k db = - db {ledgerDbCheckpoints = AS.anchorNewest k (ledgerDbCheckpoints db)} + let (!kept, !dropped) = SSeq.splitAt (fromIntegral k) (ledgerDbCheckpoints db) + in (db {ledgerDbCheckpoints = kept}, toList dropped) {-# INLINE pruneLedgerDb #-} --- | The ledger state at the tip of the chain -ledgerDbCurrent :: LedgerDB -> CardanoLedgerState -ledgerDbCurrent = either id id . AS.head . ledgerDbCheckpoints +-- | The current DbSyncStateRef at the tip of the chain +ledgerDbCurrent :: LedgerDB -> DbSyncStateRef +ledgerDbCurrent db = case ledgerDbCheckpoints db of + sr SSeq.:<| _ -> sr + SSeq.Empty -> panic "ledgerDbCurrent: empty LedgerDB" mkHasLedgerEnv :: Trace IO Text -> @@ -162,11 +194,63 @@ mkHasLedgerEnv :: Word64 -> SystemStart -> SyncOptions -> + LedgerBackend -> IO HasLedgerEnv -mkHasLedgerEnv trce protoInfo dir nw maxLovelaceSupply systemStart syncOptions = do +mkHasLedgerEnv trce protoInfo dir nw maxLovelaceSupply systemStart syncOptions backend = do + -- Clean up any legacy .lstate files from previous versions + migrateOldSnapshots dir trce svar <- newTVarIO Strict.Nothing intervar <- newTVarIO Strict.Nothing - swQueue <- newTBQueueIO 5 -- Should be relatively shallow. + snapQueue <- newTBQueueIO 5 + + let codecConfig = configCodec $ Consensus.pInfoConfig protoInfo + someHasFS = SomeHasFS $ ioHasFS (MountPoint $ unLedgerStateDir dir) + snapTracer = Tracer.nullTracer -- TODO: wire up snapshot tracing + (snapMgr, initGen, loadSnap, closeBackend) <- case backend of + LedgerBackendInMemory -> do + res <- + runWithTempRegistry $ + (,()) + <$> mkResources (Proxy @CardanoBlock) Tracer.nullTracer InMem.InMemArgs someHasFS + let sm = snapshotManager (Proxy @CardanoBlock) res codecConfig snapTracer someHasFS + ig = do + let initState = Consensus.pInfoInitLedger protoInfo + createAndPopulateStateRefFromGenesis Tracer.nullTracer res initState + ld ds = do + eResult <- + runExceptT $ + openStateRefFromSnapshot Tracer.nullTracer codecConfig someHasFS res ds + case eResult of + Left err -> pure $ Left $ textShow err + Right (cRef, _pt) -> pure $ Right cRef + pure (sm, ig, ld, pure ()) + LedgerBackendLSM mPath -> do + let lsmPath = fromMaybe (unLedgerStateDir dir "lsm") mPath + salt <- fst . genWord64 <$> newStdGen + let args = LSM.LSMArgs (mkFsPath $ splitDirectories lsmPath) salt (LSM.stdMkBlockIOFS lsmPath) + res <- + runWithTempRegistry $ + (,()) + <$> mkResources (Proxy @CardanoBlock) Tracer.nullTracer args someHasFS + let sm = snapshotManager (Proxy @CardanoBlock) res codecConfig snapTracer someHasFS + ig = do + let initState = Consensus.pInfoInitLedger protoInfo + createAndPopulateStateRefFromGenesis Tracer.nullTracer res initState + ld ds = do + eResult <- + runExceptT $ + openStateRefFromSnapshot Tracer.nullTracer codecConfig someHasFS res ds + case eResult of + Left err -> pure $ Left $ textShow err + Right (cRef, _pt) -> pure $ Right cRef + -- Close the LSM session on shutdown to release the file lock. + -- mkResources allocates the session via 'allocateTemp ... impossibleToNotTransfer', + -- which means it is intentionally NOT closed when runWithTempRegistry exits + -- (similar to bracketOnError). We release it explicitly here so the lock + -- is freed even when the same process restarts db-sync (test scenario). + let releaseLsm = releaseResources (Proxy @CardanoBlock) res + pure (sm, ig, ld, releaseLsm) + pure HasLedgerEnv { leTrace = trce @@ -181,26 +265,21 @@ mkHasLedgerEnv trce protoInfo dir nw maxLovelaceSupply systemStart syncOptions = , leSnapshotNearTipEpoch = sicNearTipEpoch $ soptSnapshotInterval syncOptions , leInterpreter = intervar , leStateVar = svar - , leStateWriteQueue = swQueue + , leSnapshotQueue = snapQueue + , leLedgerBackend = backend + , leSnapshotManager = snapMgr + , leInitGenesis = initGen + , leLoadSnapshot = loadSnap + , leClose = closeBackend } -initCardanoLedgerState :: Consensus.ProtocolInfo CardanoBlock -> CardanoLedgerState -initCardanoLedgerState pInfo = - CardanoLedgerState - { clsState = forgetLedgerTables initState - , clsTables = Consensus.projectLedgerTables initState - , clsEpochBlockNo = GenesisEpochBlockNo - } - where - initState = Consensus.pInfoInitLedger pInfo - getTopLevelconfigHasLedger :: HasLedgerEnv -> TopLevelConfig CardanoBlock getTopLevelconfigHasLedger = Consensus.pInfoConfig . leProtocolInfo readCurrentStateUnsafe :: HasLedgerEnv -> IO (ExtLedgerState CardanoBlock EmptyMK) readCurrentStateUnsafe hle = atomically - (clsState . ledgerDbCurrent <$> readStateUnsafe hle) + (clsState . srState . ledgerDbCurrent <$> readStateUnsafe hle) -- TODO make this type safe. We make the assumption here that the first message of -- the chainsync protocol is 'RollbackTo'. @@ -213,61 +292,59 @@ readStateUnsafe env = do applyBlockAndSnapshot :: HasLedgerEnv -> CardanoBlock -> Bool -> IO (ApplyResult, Bool) applyBlockAndSnapshot ledgerEnv blk isCons = do - (oldState, appResult) <- applyBlock ledgerEnv blk + (oldRef, appResult, pruned) <- applyBlock ledgerEnv blk -- 864000 seconds = 10 days; consider synced "near tip" if within 10 days of current time - tookSnapshot <- storeSnapshotAndCleanupMaybe ledgerEnv oldState appResult isCons (isSyncedWithinSeconds (apSlotDetails appResult) 864000) + tookSnapshot <- storeSnapshotAndCleanupMaybe ledgerEnv oldRef appResult isCons (isSyncedWithinSeconds (apSlotDetails appResult) 864000) + -- Close pruned states. If a snapshot was taken, wait for the snapshot thread + -- to finish writing before closing, since it may still need the table handles. + forM_ pruned $ \sr -> do + atomically $ readTVar (srCanClose sr) >>= check + close (srTables sr) pure (appResult, tookSnapshot) --- The function 'tickThenReapply' does zero validation, so add minimal validation ('blockPrevHash' --- matches the tip hash of the 'LedgerState'). This was originally for debugging but the check is --- cheap enough to keep. -applyBlock :: HasLedgerEnv -> CardanoBlock -> IO (CardanoLedgerState, ApplyResult) +-- | Apply a block: delegates to tickThenReapplyCheckHash which handles +-- LedgerDB reads, handle duplication, block application, and LedgerDB update. +-- Returns the old DbSyncStateRef (for snapshotting), ApplyResult, and pruned DbSyncStateRefs (to close). +applyBlock :: HasLedgerEnv -> CardanoBlock -> IO (DbSyncStateRef, ApplyResult, [DbSyncStateRef]) applyBlock env blk = do time <- getCurrentTime - atomically $ do - -- Read the current ledger state - !ledgerDB <- readStateUnsafe env - let oldState = ledgerDbCurrent ledgerDB - -- Calculate ledger diffs - !result <- - fromEitherSTM $ - tickThenReapplyCheckHash - (ExtLedgerCfg (getTopLevelconfigHasLedger env)) - blk - oldState - -- Extract the ledger events - let ledgerEventsFull = mapMaybe (convertAuxLedgerEvent (leHasRewards env)) (Consensus.lrEvents result) - -- Find the deposits - let (ledgerEvents, deposits) = splitDeposits ledgerEventsFull - -- Calculate DRep distribution - let !newLedgerState = finaliseDrepDistr $ clsState (Consensus.lrResult result) - -- Apply the ledger diffs - -- Construct the new ledger state - !details <- getSlotDetails env (ledgerState newLedgerState) time (cardanoBlockSlotNo blk) - !newEpoch <- fromEitherSTM $ mkOnNewEpoch (clsState oldState) newLedgerState (findAdaPots ledgerEvents) - let !newEpochBlockNo = applyToEpochBlockNo (isJust $ blockIsEBB blk) (isJust newEpoch) (clsEpochBlockNo oldState) - let !newState = CardanoLedgerState newLedgerState (clsTables $ Consensus.lrResult result) newEpochBlockNo - -- Add the new ledger state to the in-memory db - let !ledgerDB' = pushLedgerDB ledgerDB newState - writeTVar (leStateVar env) (Strict.Just ledgerDB') - let !appResult = - if leUseLedger env - then - ApplyResult - { apPrices = getPrices newState - , apGovExpiresAfter = getGovExpiration newState - , apPoolsRegistered = getRegisteredPools oldState - , apNewEpoch = maybeToStrict newEpoch - , apOldLedger = Strict.Just oldState - , apDeposits = maybeToStrict $ Generic.getDeposits newLedgerState - , apSlotDetails = details - , apStakeSlice = getStakeSlice env newState False - , apEvents = ledgerEvents - , apGovActionState = getGovState newLedgerState - , apDepositsMap = DepositsMap deposits - } - else defaultApplyResult details - pure (oldState, appResult) + !result <- + either throwIO pure + =<< tickThenReapplyCheckHash + env + (ExtLedgerCfg (getTopLevelconfigHasLedger env)) + blk + let (oldRef, newResult, pruned) = result + !oldCls = srState oldRef + -- Build ApplyResult (STM for slot details and epoch detection) + appResult <- atomically $ do + let ledgerEventsFull = mapMaybe (convertAuxLedgerEvent (leHasRewards env)) (Consensus.lrEvents newResult) + (ledgerEvents, deposits) = splitDeposits ledgerEventsFull + !rawLedgerState = clsState (Consensus.lrResult newResult) + !details <- getSlotDetails env (ledgerState rawLedgerState) time (cardanoBlockSlotNo blk) + !newEpoch <- fromEitherSTM $ mkOnNewEpoch (clsState oldCls) rawLedgerState (findAdaPots ledgerEvents) + let !newLedgerState = case newEpoch of + Just _ -> finaliseDrepDistr rawLedgerState + Nothing -> rawLedgerState + !newState = (Consensus.lrResult newResult) {clsState = newLedgerState} + pure $ + if leUseLedger env + then + ApplyResult + { apPrices = getPrices newState + , apGovExpiresAfter = getGovExpiration newState + , apPoolsRegistered = getRegisteredPools oldCls + , apNewEpoch = maybeToStrict newEpoch + , apOldLedger = Strict.Just oldCls + , apDeposits = maybeToStrict $ Generic.getDeposits newLedgerState + , apSlotDetails = details + , apStakeSlice = getStakeSlice env newState False + , apEvents = ledgerEvents + , apGovActionState = getGovState newLedgerState + , apDepositsMap = DepositsMap deposits + } + else defaultApplyResult details + pure (oldRef, appResult, pruned) where mkOnNewEpoch :: ExtLedgerState CardanoBlock mk -> ExtLedgerState CardanoBlock mk -> Maybe AdaPots -> Either SyncNodeError (Maybe Generic.NewEpoch) mkOnNewEpoch oldState newState mPots = do @@ -304,13 +381,6 @@ applyBlock env blk = do fromIntegral (leMaxSupply env) - unCoin (sumAdaPots adaPots) } - applyToEpochBlockNo :: Bool -> Bool -> EpochBlockNo -> EpochBlockNo - applyToEpochBlockNo True _ _ = EBBEpochBlockNo - applyToEpochBlockNo _ True _ = EpochBlockNo 0 - applyToEpochBlockNo _ _ (EpochBlockNo n) = EpochBlockNo (n + 1) - applyToEpochBlockNo _ _ GenesisEpochBlockNo = EpochBlockNo 0 - applyToEpochBlockNo _ _ EBBEpochBlockNo = EpochBlockNo 0 - getDrepState :: ExtLedgerState CardanoBlock mk -> Maybe (DRepPulsingState ConwayEra) getDrepState ls = ls ^? newEpochStateT . Shelley.newEpochStateDRepPulsingStateL @@ -333,11 +403,11 @@ getStakeSlice env cls isMigration = n (clsState cls) isMigration - _ -> Generic.NoSlices + ByronEpochBlockNo -> Generic.NoSlices storeSnapshotAndCleanupMaybe :: HasLedgerEnv -> - CardanoLedgerState -> + DbSyncStateRef -> ApplyResult -> Bool -> SyncState -> @@ -355,390 +425,63 @@ storeSnapshotAndCleanupMaybe env oldState appResult isCons syncState = pure True _ -> pure False -saveCurrentLedgerState :: HasLedgerEnv -> CardanoLedgerState -> Maybe EpochNo -> IO () -saveCurrentLedgerState env lState mEpochNo = do - case mkLedgerStateFilename (leDir env) (clsState lState) mEpochNo of - Origin -> pure () -- we don't store genesis - At file -> do - exists <- doesFileExist file - if exists - then - logInfo (leTrace env) $ - mconcat - ["File ", Text.pack file, " exists"] - else atomically $ writeTBQueue (leStateWriteQueue env) (file, lState) - -runLedgerStateWriteThread :: Trace IO Text -> LedgerEnv -> IO () -runLedgerStateWriteThread tracer lenv = - case lenv of - HasLedger le -> ledgerStateWriteLoop tracer (leStateWriteQueue le) (configCodec $ getTopLevelconfigHasLedger le) - NoLedger _ -> forever $ threadDelay 600000000 -- 10 minutes - -ledgerStateWriteLoop :: Trace IO Text -> TBQueue (FilePath, CardanoLedgerState) -> CodecConfig CardanoBlock -> IO () -ledgerStateWriteLoop tracer swQueue codecConfig = - loop - where - loop :: IO () - loop = do - (file, ledger) <- atomically $ readTBQueue swQueue -- Blocks until the queue has elements. - writeLedgerStateFile file ledger - loop - - writeLedgerStateFile :: FilePath -> CardanoLedgerState -> IO () - writeLedgerStateFile file ledger = do - startTime <- getCurrentTime - -- TODO: write the builder directly. - -- BB.writeFile file $ toBuilder $ - LBS.writeFile file $ - Serialize.serialize $ - encodeCardanoLedgerState - ( Consensus.encodeExtLedgerState - (encodeDisk codecConfig) - (encodeDisk codecConfig) - (encodeDisk codecConfig) - . forgetLedgerTables - ) - ledger - endTime <- getCurrentTime - logInfo tracer $ - mconcat - [ "Asynchronously wrote a ledger snapshot to " - , Text.pack file - , " in " - , textShow (diffUTCTime endTime startTime) - , "." - ] - -mkLedgerStateFilename :: LedgerStateDir -> ExtLedgerState CardanoBlock mk -> Maybe EpochNo -> WithOrigin FilePath -mkLedgerStateFilename dir ledger mEpochNo = - lsfFilePath - . dbPointToFileName dir mEpochNo - <$> getPoint (Consensus.ledgerTipPoint @CardanoBlock (ledgerState ledger)) - -saveCleanupState :: HasLedgerEnv -> CardanoLedgerState -> Maybe EpochNo -> IO () -saveCleanupState env ledger mEpochNo = do - let st = clsState ledger - saveCurrentLedgerState env ledger mEpochNo - cleanupLedgerStateFiles env $ - fromWithOrigin (SlotNo 0) (Consensus.ledgerTipSlot $ ledgerState st) - hashToAnnotation :: ByteString -> ByteString hashToAnnotation = Base16.encode . BS.take 5 -mkRawHash :: HeaderHash CardanoBlock -> ByteString -mkRawHash = toRawHash (Proxy @CardanoBlock) - -mkShortHash :: HeaderHash CardanoBlock -> ByteString -mkShortHash = hashToAnnotation . mkRawHash - -dbPointToFileName :: LedgerStateDir -> Maybe EpochNo -> Point.Block SlotNo (HeaderHash CardanoBlock) -> LedgerStateFile -dbPointToFileName (LedgerStateDir stateDir) mEpochNo (Point.Block slot hash) = - LedgerStateFile - { lsfSlotNo = slot - , lsfHash = shortHash - , lsNewEpoch = maybeToStrict mEpochNo - , lsfFilePath = - mconcat - [ stateDir show (unSlotNo slot) - , "-" - , BS.unpack shortHash - , epochSuffix - , ".lstate" - ] - } - where - shortHash :: ByteString - shortHash = mkShortHash hash - - epochSuffix :: String - epochSuffix = - case mEpochNo of - Nothing -> "" - Just epoch -> "-" ++ show (unEpochNo epoch) - -parseLedgerStateFileName :: LedgerStateDir -> FilePath -> Maybe LedgerStateFile -parseLedgerStateFileName (LedgerStateDir stateDir) fp = - case break (== '-') (dropExtension fp) of - (slotStr, '-' : hashEpoch) -> do - slot <- readMaybe slotStr - case break (== '-') hashEpoch of - (hash, '-' : suffix) | Just epochNo <- readMaybe suffix -> do - Just $ build (BS.pack hash) slot (Just epochNo) - (hash, "") -> - Just $ build (BS.pack hash) slot Nothing - _otherwise -> Nothing - _otherwise -> Nothing - where - build :: ByteString -> Word64 -> Maybe Word64 -> LedgerStateFile - build hash slot mEpochNo = - LedgerStateFile - { lsfSlotNo = SlotNo slot - , lsfHash = hash - , lsNewEpoch = maybeToStrict $ EpochNo <$> mEpochNo - , lsfFilePath = stateDir fp - } - -- ------------------------------------------------------------------------------------------------- -cleanupLedgerStateFiles :: HasLedgerEnv -> SlotNo -> IO () -cleanupLedgerStateFiles env slotNo = do - files <- listLedgerStateFilesOrdered (leDir env) - let (epochBoundary, valid, invalid) = foldr groupFiles ([], [], []) files - -- Remove invalid (ie SlotNo >= current) ledger state files (occurs on rollback). - deleteAndLogFiles env "invalid" invalid - -- Remove all but 2 most recent state files. - deleteAndLogStateFile env "old" (List.drop 2 valid) - -- Remove all but 3 most recent epoch boundary state files. - deleteAndLogStateFile env "old epoch boundary" (List.drop 3 epochBoundary) - where - groupFiles :: - LedgerStateFile -> - ([LedgerStateFile], [LedgerStateFile], [FilePath]) -> - ([LedgerStateFile], [LedgerStateFile], [FilePath]) -- (epochBoundary, valid, invalid) - groupFiles lFile (epochBoundary, regularFile, invalid) - | lsfSlotNo lFile > slotNo = - (epochBoundary, regularFile, lsfFilePath lFile : invalid) - | Strict.Just _ <- lsNewEpoch lFile = - (lFile : epochBoundary, regularFile, invalid) - | otherwise = - (epochBoundary, lFile : regularFile, invalid) - -loadLedgerAtPoint :: HasLedgerEnv -> CardanoPoint -> IO (Either [LedgerStateFile] CardanoLedgerState) +loadLedgerAtPoint :: HasLedgerEnv -> CardanoPoint -> IO (Either [DiskSnapshot] CardanoLedgerState) loadLedgerAtPoint hasLedgerEnv point = do mLedgerDB <- atomically $ readTVar $ leStateVar hasLedgerEnv - -- First try to find the ledger in memory - let mAnchoredSeq = rollbackLedger mLedgerDB - case mAnchoredSeq of + let (mStates, dropped) = rollbackLedger mLedgerDB + case mStates of Nothing -> do - -- Ledger states are growing to become very big in memory. - -- Before parsing the new ledger state we need to make sure the old states - -- are or can be garbage collected. writeLedgerState hasLedgerEnv Strict.Nothing + closeDroppedHandles dropped performMajorGC - mst <- findStateFromPoint hasLedgerEnv point + mst <- findStateFromSnapshot hasLedgerEnv point case mst of - Right st -> do - writeLedgerState hasLedgerEnv (Strict.Just . LedgerDB $ AS.Empty st) - logInfo (leTrace hasLedgerEnv) $ mconcat ["Found snapshot file for ", renderPoint point] - pure $ Right st - Left lsfs -> pure $ Left lsfs - Just anchoredSeq' -> do + Right sr -> do + writeLedgerState hasLedgerEnv (Strict.Just . LedgerDB $ SSeq.singleton sr) + logInfo (leTrace hasLedgerEnv) $ mconcat ["Found snapshot for ", renderPoint point] + pure $ Right (srState sr) + Left dss -> pure $ Left dss + Just states' -> do logInfo (leTrace hasLedgerEnv) $ mconcat ["Found in memory ledger snapshot at ", renderPoint point] - let ledgerDB' = LedgerDB anchoredSeq' - let st = ledgerDbCurrent ledgerDB' - deleteNewerFiles hasLedgerEnv point + let ledgerDB' = LedgerDB states' + let sr = ledgerDbCurrent ledgerDB' + deleteNewerSnapshots hasLedgerEnv point writeLedgerState hasLedgerEnv $ Strict.Just ledgerDB' - pure $ Right st + closeDroppedHandles dropped + pure $ Right (srState sr) where + -- \| Returns (kept states or Nothing, dropped states to close) rollbackLedger :: Strict.Maybe LedgerDB -> - Maybe (AnchoredSeq (WithOrigin SlotNo) CardanoLedgerState CardanoLedgerState) + (Maybe (StrictSeq DbSyncStateRef), [DbSyncStateRef]) rollbackLedger mLedgerDB = case mLedgerDB of - Strict.Nothing -> Nothing + Strict.Nothing -> (Nothing, []) Strict.Just ledgerDB -> - AS.rollback (pointSlot point) (const True) (ledgerDbCheckpoints ledgerDB) - -deleteNewerFiles :: HasLedgerEnv -> CardanoPoint -> IO () -deleteNewerFiles env point = do - files <- listLedgerStateFilesOrdered (leDir env) - -- Genesis can be reproduced from configuration. - -- TODO: We can make this a monadic action (reread config from disk) to save some memory. - case getPoint point of - Origin -> do - deleteAndLogStateFile env "newer" files - At blk -> do - let (newerFiles, _found, _olderFiles) = - findLedgerStateFile files (Point.blockPointSlot blk, mkRawHash $ Point.blockPointHash blk) - deleteAndLogStateFile env "newer" newerFiles - -deleteAndLogFiles :: HasLedgerEnv -> Text -> [FilePath] -> IO () -deleteAndLogFiles env descr files = - case files of - [] -> pure () - [fl] -> do - logInfo (leTrace env) $ mconcat ["Removing ", descr, " file ", Text.pack fl] - safeRemoveFile fl - _ -> do - logInfo (leTrace env) $ mconcat ["Removing ", descr, " files ", textShow files] - mapM_ safeRemoveFile files - -deleteAndLogStateFile :: HasLedgerEnv -> Text -> [LedgerStateFile] -> IO () -deleteAndLogStateFile env descr lsfs = deleteAndLogFiles env descr (lsfFilePath <$> lsfs) - -findStateFromPoint :: HasLedgerEnv -> CardanoPoint -> IO (Either [LedgerStateFile] CardanoLedgerState) -findStateFromPoint env point = do - files <- listLedgerStateFilesOrdered (leDir env) - -- Genesis can be reproduced from configuration. - -- TODO: We can make this a monadic action (reread config from disk) to save some memory. - case getPoint point of - Origin -> do - deleteAndLogStateFile env "newer" files - pure . Right $ initCardanoLedgerState (leProtocolInfo env) - At blk -> do - let (newerFiles, found, olderFiles) = - findLedgerStateFile files (Point.blockPointSlot blk, mkRawHash $ Point.blockPointHash blk) - deleteAndLogStateFile env "newer" newerFiles - case found of - Just lsf -> do - mState <- loadLedgerStateFromFile (leTrace env) (getTopLevelconfigHasLedger env) False point lsf - case mState of - Left err -> do - deleteLedgerFile err lsf - logNewerFiles olderFiles - pure $ Left olderFiles - Right st -> pure $ Right st - Nothing -> do - logNewerFiles olderFiles - pure $ Left olderFiles - where - deleteLedgerFile :: Text -> LedgerStateFile -> IO () - deleteLedgerFile err lsf = do - logWarning (leTrace env) $ - mconcat - [ "Failed to parse ledger state file " - , Text.pack (lsfFilePath lsf) - , " with error '" - , err - , "'. Deleting it." - ] - safeRemoveFile $ lsfFilePath lsf - - logNewerFiles :: [LedgerStateFile] -> IO () - logNewerFiles lsfs = - logWarning (leTrace env) $ - case lsfs of - [] -> "Rollback failed. No more ledger state files." - (x : _) -> mconcat ["Needs to Rollback further to slot ", textShow (unSlotNo $ lsfSlotNo x)] - --- Splits the files based on the comparison with the given point. It will return --- a list of newer files, a file at the given point if found and a list of older --- files. All lists of files should be ordered most recent first. --- --- Newer files can be deleted --- File at the exact point can be used to initial the LedgerState --- Older files can be used to rollback even further. --- --- Files with same slot, but different hash are considered newer. -findLedgerStateFile :: - [LedgerStateFile] -> - (SlotNo, ByteString) -> - ([LedgerStateFile], Maybe LedgerStateFile, [LedgerStateFile]) -findLedgerStateFile files pointPair = - go [] files - where - go newerFiles [] = (reverse newerFiles, Nothing, []) - go newerFiles (file : rest) = - case comparePointToFile file pointPair of - EQ -> (reverse newerFiles, Just file, rest) -- found the file we were looking for - LT -> (reverse newerFiles, Nothing, file : rest) -- found an older file first - GT -> go (file : newerFiles) rest -- keep looking on older files - -comparePointToFile :: LedgerStateFile -> (SlotNo, ByteString) -> Ordering -comparePointToFile lsf (blSlotNo, blHash) = - case compare (lsfSlotNo lsf) blSlotNo of - EQ -> - if hashToAnnotation blHash == lsfHash lsf - then EQ - else GT - x -> x - -loadLedgerStateFromFile :: Trace IO Text -> TopLevelConfig CardanoBlock -> Bool -> CardanoPoint -> LedgerStateFile -> IO (Either Text CardanoLedgerState) -loadLedgerStateFromFile tracer config delete point lsf = do - mst <- safeReadFile (lsfFilePath lsf) - case mst of - Left err -> when delete (safeRemoveFile $ lsfFilePath lsf) >> pure (Left err) - Right st -> pure $ Right st - where - safeReadFile :: FilePath -> IO (Either Text CardanoLedgerState) - safeReadFile fp = do - startTime <- getCurrentTime - mbs <- Exception.try $ BS.readFile fp - case mbs of - Left (err :: IOException) -> pure $ Left (Text.pack $ displayException err) - Right bs -> do - mediumTime <- getCurrentTime - case decode bs of - Left err -> pure $ Left $ textShow err - Right ls -> do - endTime <- getCurrentTime - logInfo tracer $ - mconcat - [ "Found snapshot file for " - , renderPoint point - , ". It took " - , textShow (diffUTCTime mediumTime startTime) - , " to read from disk and " - , textShow (diffUTCTime endTime mediumTime) - , " to parse." - ] - pure $ Right ls - - codecConfig :: CodecConfig CardanoBlock - codecConfig = configCodec config - - decode :: ByteString -> Either DecoderError CardanoLedgerState - decode = do - Serialize.decodeFullDecoder - "Ledger state file" - decodeState - . LBS.fromStrict - - decodeState :: (forall s. Decoder s CardanoLedgerState) - decodeState = - decodeCardanoLedgerState $ - Consensus.decodeExtLedgerState - (decodeDisk codecConfig) - (decodeDisk codecConfig) - (decodeDisk codecConfig) - -getSlotNoSnapshot :: SnapshotPoint -> WithOrigin SlotNo -getSlotNoSnapshot (OnDisk lsf) = at $ lsfSlotNo lsf -getSlotNoSnapshot (InMemory cp) = pointSlot cp - -listKnownSnapshots :: HasLedgerEnv -> IO [SnapshotPoint] -listKnownSnapshots env = do - inMem <- fmap InMemory <$> listMemorySnapshots env - onDisk <- fmap OnDisk <$> listLedgerStateFilesOrdered (leDir env) - pure $ List.sortOn (Down . getSlotNoSnapshot) $ inMem <> onDisk - -listMemorySnapshots :: HasLedgerEnv -> IO [CardanoPoint] -listMemorySnapshots env = do - mState <- atomically $ readTVar $ leStateVar env - case mState of - Strict.Nothing -> pure [] - Strict.Just ledgerDB -> - pure $ - filter - notGenesis - (castPoint . Consensus.getTip . clsState <$> getEdgePoints ledgerDB) - where - getEdgePoints ldb = - case AS.toNewestFirst $ ledgerDbCheckpoints ldb of - [] -> [] - [a] -> [a] - (h : ls) -> catMaybes [Just h, lastMay ls] - notGenesis GenesisPoint = False - notGenesis (BlockPoint _ _) = True - --- Get a list of the ledger state files order most recent -listLedgerStateFilesOrdered :: LedgerStateDir -> IO [LedgerStateFile] -listLedgerStateFilesOrdered dir = do - files <- filter isLedgerStateFile <$> listDirectory (unLedgerStateDir dir) - pure . List.sortBy revSlotNoOrder $ mapMaybe (parseLedgerStateFileName dir) files - where - isLedgerStateFile :: FilePath -> Bool - isLedgerStateFile fp = takeExtension fp == ".lstate" - - revSlotNoOrder :: LedgerStateFile -> LedgerStateFile -> Ordering - revSlotNoOrder a b = compare (lsfSlotNo b) (lsfSlotNo a) + let allEntries = toList $ ledgerDbCheckpoints ledgerDB + (newer, older) = + List.span + (\sr -> Consensus.getTipSlot (clsState (srState sr)) > pointSlot point) + allEntries + kept = SSeq.fromList older + in if SSeq.null kept + then (Nothing, allEntries) + else (Just kept, newer) + + -- \| Close handles from dropped states. + -- Waits for the snapshot writer to finish if any handle is still being used. + closeDroppedHandles :: [DbSyncStateRef] -> IO () + closeDroppedHandles refs = forM_ refs $ \sr -> do + atomically $ readTVar (srCanClose sr) >>= check + close (srTables sr) writeLedgerState :: HasLedgerEnv -> Strict.Maybe LedgerDB -> IO () writeLedgerState env mLedgerDb = atomically $ writeTVar (leStateVar env) mLedgerDb --- | Remove given file path and ignore any IOEXceptions. -safeRemoveFile :: FilePath -> IO () -safeRemoveFile fp = handle (\(_ :: IOException) -> pure ()) $ removeFile fp - getRegisteredPools :: CardanoLedgerState -> Set.Set PoolKeyHash getRegisteredPools st = case ledgerState $ clsState st of @@ -765,6 +508,17 @@ getRegisteredPoolShelley lState = Consensus.shelleyLedgerState lState in certState ^. Shelley.certPStateL . Shelley.psStakePoolsL +isByronLedger :: ExtLedgerState CardanoBlock mk -> Bool +isByronLedger st = case ledgerState st of + LedgerStateByron _ -> True + _ -> False + +applyToEpochBlockNo :: Bool -> Bool -> EpochBlockNo -> EpochBlockNo +applyToEpochBlockNo True _ _ = ByronEpochBlockNo -- Byron era +applyToEpochBlockNo _ True _ = EpochBlockNo 0 -- Shelley+ new epoch +applyToEpochBlockNo _ _ (EpochBlockNo n) = EpochBlockNo (n + 1) +applyToEpochBlockNo _ _ ByronEpochBlockNo = EpochBlockNo 0 -- first Shelley block + ledgerEpochNo :: HasLedgerEnv -> ExtLedgerState CardanoBlock mk -> Either SyncNodeError (Maybe EpochNo) ledgerEpochNo env cls = case Consensus.ledgerTipSlot (ledgerState cls) of @@ -777,51 +531,67 @@ ledgerEpochNo env cls = epochInfo :: EpochInfo (Except Consensus.PastHorizonException) epochInfo = epochInfoLedger (configLedger $ getTopLevelconfigHasLedger env) (hardForkLedgerStatePerEra $ ledgerState cls) --- Like 'Consensus.tickThenReapply' but also checks that the previous hash from the block matches --- the head hash of the ledger state. +-- | Apply a block to the current LedgerDB state. Reads the current DbSyncStateRef, +-- duplicates its handle, applies the block, pushes diffs, creates a new DbSyncStateRef, +-- and updates the LedgerDB. Returns the old DbSyncStateRef (for snapshotting), +-- the new CardanoLedgerState (for ApplyResult), and pruned DbSyncStateRefs (to close). tickThenReapplyCheckHash :: + HasLedgerEnv -> ExtLedgerCfg CardanoBlock -> CardanoBlock -> - CardanoLedgerState -> - Either - SyncNodeError - ( LedgerResult - (ExtLedgerState CardanoBlock) - CardanoLedgerState + IO + ( Either + SyncNodeError + ( DbSyncStateRef -- old state ref (for snapshotting) + , LedgerResult (ExtLedgerState CardanoBlock) CardanoLedgerState -- new state + , [DbSyncStateRef] -- pruned refs to close + ) ) -tickThenReapplyCheckHash cfg block state'@CardanoLedgerState {..} = - if blockPrevHash block == Consensus.ledgerTipHash (ledgerState clsState) - then - let - -- Get utxo keys set to update - keys :: LedgerTables (ExtLedgerState CardanoBlock) KeysMK - keys = Consensus.getBlockKeySets block - -- Get the current ledger tables - ledgerTables = Consensus.getLedgerTables clsTables - -- Limit ledger tables to utxo keys above - restrictedTables = restrictValuesMK ledgerTables (Consensus.getLedgerTables keys) - -- Attach the tables back to the ledger state - ledgerState' = Consensus.withLedgerTables clsState (Consensus.LedgerTables restrictedTables) - -- Apply the block - newLedgerState = - Consensus.tickThenReapplyLedgerResult Consensus.ComputeLedgerEvents cfg block ledgerState' - in - Right $ - fmap - ( \stt -> - state' - { clsState = forgetLedgerTables stt - , clsTables = - Consensus.LedgerTables - . applyDiffsMK ledgerTables - . Consensus.getLedgerTables - . Consensus.projectLedgerTables - $ stt - } - ) - newLedgerState +tickThenReapplyCheckHash env cfg block = do + -- Read the current state from LedgerDB + (ledgerDB, oldRef) <- atomically $ do + !db <- readStateUnsafe env + pure (db, ledgerDbCurrent db) + let !oldCls = srState oldRef + if blockPrevHash block == Consensus.ledgerTipHash (ledgerState (clsState oldCls)) + then do + -- Create a new handle first, then read from it + let keys = Consensus.getBlockKeySets block + restrictedTables <- read (srTables oldRef) (clsState oldCls) keys + -- Attach the tables to the ledger state and apply the block + let ledgerState' = Consensus.withLedgerTables (clsState oldCls) restrictedTables + newLedgerResult = + Consensus.tickThenReapplyLedgerResult Consensus.ComputeLedgerEvents cfg block ledgerState' + newLedgerState = forgetLedgerTables $ Consensus.lrResult newLedgerResult + isNewEpoch = case (ledgerEpochNo env (clsState oldCls), ledgerEpochNo env newLedgerState) of + (Right oldE, Right newE) -> oldE /= newE + _ -> False + !newEpochBlockNo = applyToEpochBlockNo (isByronLedger newLedgerState) isNewEpoch (clsEpochBlockNo oldCls) + -- Build pure CardanoLedgerState from result + newCls = + fmap + ( \stt -> + CardanoLedgerState + { clsState = forgetLedgerTables stt + , clsEpochBlockNo = newEpochBlockNo + } + ) + newLedgerResult + -- Push diffs to the new handle + newHandle <- duplicateWithDiffs (srTables oldRef) (clsState oldCls) (Consensus.lrResult newLedgerResult) + -- Create new DbSyncStateRef and push to LedgerDB + canClose <- newTVarIO True + let !newRef = + DbSyncStateRef + { srState = Consensus.lrResult newCls + , srTables = newHandle + , srCanClose = canClose + } + let (!ledgerDB', !prunedRefs) = pushLedgerDB ledgerDB newRef + atomically $ writeTVar (leStateVar env) (Strict.Just ledgerDB') + pure $ Right (oldRef, newCls, prunedRefs) else - Left $ + pure . Left $ SNErrLedgerState $ mconcat [ "Ledger state hash mismatch. Ledger head is slot " @@ -829,11 +599,11 @@ tickThenReapplyCheckHash cfg block state'@CardanoLedgerState {..} = ( unSlotNo $ fromWithOrigin (SlotNo 0) - (Consensus.ledgerTipSlot $ ledgerState clsState) + (Consensus.ledgerTipSlot $ ledgerState (clsState oldCls)) ) , " hash " , Text.unpack $ - renderByteArray (Cardano.unChainHash (Consensus.ledgerTipHash $ ledgerState clsState)) + renderByteArray (Cardano.unChainHash (Consensus.ledgerTipHash $ ledgerState (clsState oldCls))) , " but block previous hash is " , Text.unpack $ renderByteArray (Cardano.unChainHash $ blockPrevHash block) diff --git a/cardano-db-sync/src/Cardano/DbSync/Ledger/Types.hs b/cardano-db-sync/src/Cardano/DbSync/Ledger/Types.hs index 477236c75..93d130cdb 100644 --- a/cardano-db-sync/src/Cardano/DbSync/Ledger/Types.hs +++ b/cardano-db-sync/src/Cardano/DbSync/Ledger/Types.hs @@ -1,7 +1,6 @@ {-# LANGUAGE DataKinds #-} {-# LANGUAGE FlexibleInstances #-} {-# LANGUAGE MultiParamTypeClasses #-} -{-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE RankNTypes #-} {-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE TypeFamilies #-} @@ -11,8 +10,7 @@ module Cardano.DbSync.Ledger.Types where import Cardano.BM.Trace (Trace) -import Cardano.Binary (Decoder, Encoding, FromCBOR (..), ToCBOR (..)) -import Cardano.DbSync.Config.Types (LedgerStateDir) +import Cardano.DbSync.Config.Types (LedgerBackend (..), LedgerStateDir) import qualified Cardano.DbSync.Era.Shelley.Generic as Generic import Cardano.DbSync.Ledger.Event (LedgerEvent) import Cardano.DbSync.Types ( @@ -29,40 +27,46 @@ import Cardano.Ledger.Conway.Governance import Cardano.Ledger.Credential (Credential (..)) import Cardano.Ledger.Keys (KeyRole (..)) import Cardano.Ledger.Shelley.LedgerState (NewEpochState ()) +import qualified Cardano.Ledger.Shelley.LedgerState as Shelley import Cardano.Prelude hiding (atomically) import Cardano.Slotting.Slot ( EpochNo (..), - SlotNo (..), - WithOrigin (..), ) import Control.Concurrent.Class.MonadSTM.Strict ( StrictTVar, + newTVarIO, ) import Control.Concurrent.STM.TBQueue (TBQueue) +import Data.List.NonEmpty () import qualified Data.Map.Strict as Map import Data.SOP.Functors (Flip (..)) import Data.SOP.Strict +import Data.Sequence.Strict (StrictSeq) import qualified Data.Set as Set import qualified Data.Strict.Maybe as Strict -import Lens.Micro (Traversal') +import Lens.Micro (Traversal', (^.)) import Ouroboros.Consensus.BlockchainTime.WallClock.Types (SystemStart (..)) import Ouroboros.Consensus.Cardano.Block hiding (CardanoBlock, CardanoLedgerState) import Ouroboros.Consensus.HardFork.Combinator.Basics (LedgerState (..)) -import Ouroboros.Consensus.Ledger.Abstract (getTipSlot) -import Ouroboros.Consensus.Ledger.Basics (EmptyMK, LedgerTables, ValuesMK) +import Ouroboros.Consensus.Ledger.Abstract () +import Ouroboros.Consensus.Ledger.Basics (EmptyMK) import Ouroboros.Consensus.Ledger.Extended (ExtLedgerState (..)) -import Ouroboros.Consensus.Ledger.Tables (valuesMKDecoder, valuesMKEncoder) import qualified Ouroboros.Consensus.Node.ProtocolInfo as Consensus import Ouroboros.Consensus.Shelley.Ledger (LedgerState (..), ShelleyBlock) -import Ouroboros.Network.AnchoredSeq (Anchorable (..), AnchoredSeq (..)) -import Prelude (fail, id) +import Ouroboros.Consensus.Storage.LedgerDB.Snapshots (DiskSnapshot, SnapshotManager) +import Ouroboros.Consensus.Storage.LedgerDB.V2.LedgerSeq (LedgerTablesHandle (..)) +import qualified Ouroboros.Consensus.Storage.LedgerDB.V2.LedgerSeq as Consensus (StateRef (..)) +import Prelude (id) -------------------------------------------------------------------------- -- Ledger Types -------------------------------------------------------------------------- +-- | Consensus StateRef type used for snapshot operations. +type ConsensusStateRef = Consensus.StateRef IO (ExtLedgerState CardanoBlock) + data HasLedgerEnv = HasLedgerEnv - { leTrace :: Trace IO Text + { leTrace :: !(Trace IO Text) , leUseLedger :: !Bool , leHasRewards :: !Bool , leProtocolInfo :: !(Consensus.ProtocolInfo CardanoBlock) @@ -74,64 +78,86 @@ data HasLedgerEnv = HasLedgerEnv , leSnapshotNearTipEpoch :: !Word64 , leInterpreter :: !(StrictTVar IO (Strict.Maybe CardanoInterpreter)) , leStateVar :: !(StrictTVar IO (Strict.Maybe LedgerDB)) - , leStateWriteQueue :: !(TBQueue (FilePath, CardanoLedgerState)) + , leSnapshotQueue :: !(TBQueue DbSyncStateRef) + -- ^ Queue for async snapshot writing + , leLedgerBackend :: !LedgerBackend + , leSnapshotManager :: !(SnapshotManager IO IO CardanoBlock ConsensusStateRef) + -- ^ Consensus snapshot manager for save/load/list/cleanup + , leInitGenesis :: !(IO ConsensusStateRef) + -- ^ Create the initial consensus StateRef from genesis + , leLoadSnapshot :: !(DiskSnapshot -> IO (Either Text ConsensusStateRef)) + -- ^ Load a snapshot from disk using the appropriate backend + , leClose :: !(IO ()) + -- ^ Release backend resources (LSM session, registry, etc.). Call on shutdown. } +-- | Pure ledger state, stored in LedgerDB checkpoints. +-- Does not hold handle or close-related resources. +-- | Block number within the current epoch. Used by getStakeSlice +-- to insert epoch stake incrementally. +data EpochBlockNo + = -- | Shelley+: block number within epoch + EpochBlockNo !Word64 + | -- | Byron: no stake slicing needed + ByronEpochBlockNo + data CardanoLedgerState = CardanoLedgerState { clsState :: !(ExtLedgerState CardanoBlock EmptyMK) - , clsTables :: !(LedgerTables (ExtLedgerState CardanoBlock) ValuesMK) , clsEpochBlockNo :: !EpochBlockNo } --- The height of the block in the current Epoch. We maintain this --- data next to the ledger state and store it in the same blob file. -data EpochBlockNo - = GenesisEpochBlockNo - | EBBEpochBlockNo - | EpochBlockNo !Word64 - -instance ToCBOR EpochBlockNo where - toCBOR GenesisEpochBlockNo = toCBOR (0 :: Word8) - toCBOR EBBEpochBlockNo = toCBOR (1 :: Word8) - toCBOR (EpochBlockNo n) = - toCBOR (2 :: Word8) <> toCBOR n - -instance FromCBOR EpochBlockNo where - fromCBOR = do - tag :: Word8 <- fromCBOR - case tag of - 0 -> pure GenesisEpochBlockNo - 1 -> pure EBBEpochBlockNo - 2 -> EpochBlockNo <$> fromCBOR - n -> fail $ "unexpected EpochBlockNo value " <> show n - -encodeCardanoLedgerState :: - (ExtLedgerState CardanoBlock EmptyMK -> Encoding) -> - CardanoLedgerState -> - Encoding -encodeCardanoLedgerState encodeExt cls = - mconcat - [ encodeExt (clsState cls) - , toCBOR (clsEpochBlockNo cls) - , valuesMKEncoder (clsState cls) (clsTables cls) - ] - -decodeCardanoLedgerState :: - (forall s. Decoder s (ExtLedgerState CardanoBlock EmptyMK)) -> - (forall s. Decoder s CardanoLedgerState) -decodeCardanoLedgerState decodeExt = do - lState <- decodeExt - eBlockNo <- fromCBOR - lTables <- valuesMKDecoder lState - pure $ CardanoLedgerState lState lTables eBlockNo - -data LedgerStateFile = LedgerStateFile - { lsfSlotNo :: !SlotNo - , lsfHash :: !ByteString - , lsNewEpoch :: !(Strict.Maybe EpochNo) - , lsfFilePath :: !FilePath +-- | Full state with handle, used during block application and snapshotting. +data DbSyncStateRef = DbSyncStateRef + { srState :: !CardanoLedgerState + , srTables :: !(LedgerTablesHandle IO (ExtLedgerState CardanoBlock)) + , srCanClose :: !(StrictTVar IO Bool) } - deriving (Show) + +-- | Derive EpochBlockNo from the ledger state. +-- For Shelley+, sums BlocksMade (nesBcur) to get the block count in the current epoch. +-- For Byron, returns ByronEpochBlockNo. +deriveEpochBlockNo :: ExtLedgerState CardanoBlock mk -> EpochBlockNo +deriveEpochBlockNo st = + case ledgerState st of + LedgerStateByron _ -> ByronEpochBlockNo + LedgerStateShelley sls -> countBlocks sls + LedgerStateAllegra als -> countBlocks als + LedgerStateMary mls -> countBlocks mls + LedgerStateAlonzo als -> countBlocks als + LedgerStateBabbage bls -> countBlocks bls + LedgerStateConway cls -> countBlocks cls + LedgerStateDijkstra dls -> countBlocks dls + where + countBlocks :: LedgerState (ShelleyBlock p era) mk -> EpochBlockNo + countBlocks lstate = + let nes = shelleyLedgerState lstate + bm = nes ^. Shelley.nesBcurL + in EpochBlockNo $ fromIntegral $ sum bm + +-- | Convert a db-sync StateRef to a consensus StateRef for snapshot operations. +toConsensusStateRef :: DbSyncStateRef -> ConsensusStateRef +toConsensusStateRef sr = Consensus.StateRef (clsState $ srState sr) (srTables sr) + +-- | Convert a consensus StateRef to a db-sync DbSyncStateRef. +fromConsensusStateRef :: EpochBlockNo -> ConsensusStateRef -> IO DbSyncStateRef +fromConsensusStateRef ebn (Consensus.StateRef st tbl) = do + canClose <- newTVarIO True + pure + DbSyncStateRef + { srState = + CardanoLedgerState + { clsState = st + , clsEpochBlockNo = ebn + } + , srTables = tbl + , srCanClose = canClose + } + +-- | Create initial db-sync state from genesis using the consensus API. +initCardanoLedgerState :: HasLedgerEnv -> IO DbSyncStateRef +initCardanoLedgerState env = do + consensusRef <- leInitGenesis env + fromConsensusStateRef ByronEpochBlockNo consensusRef newtype DepositsMap = DepositsMap {unDepositsMap :: Map ByteString Coin} @@ -196,15 +222,13 @@ updatedCommittee membersToRemove membersToAdd newQuorum committee = newCommitteeMembers newQuorum +-- | In-memory ledger DB. Checkpoints are stored newest-first. +-- Uses StrictSeq for strict spine and elements. newtype LedgerDB = LedgerDB - { ledgerDbCheckpoints :: AnchoredSeq (WithOrigin SlotNo) CardanoLedgerState CardanoLedgerState + { ledgerDbCheckpoints :: StrictSeq DbSyncStateRef } -instance Anchorable (WithOrigin SlotNo) CardanoLedgerState CardanoLedgerState where - asAnchor = id - getAnchorMeasure _ = getTipSlot . clsState - -data SnapshotPoint = OnDisk LedgerStateFile | InMemory CardanoPoint +data SnapshotPoint = OnDisk DiskSnapshot | InMemory CardanoPoint -- | Per-era pure getters and setters on @NewEpochState@. Note this is a bit of an abuse -- of the cardano-ledger/ouroboros-consensus public APIs, because ledger state is not diff --git a/cardano-db-sync/src/Cardano/DbSync/LocalStateQuery.hs b/cardano-db-sync/src/Cardano/DbSync/LocalStateQuery.hs index 60213d335..7da383e4d 100644 --- a/cardano-db-sync/src/Cardano/DbSync/LocalStateQuery.hs +++ b/cardano-db-sync/src/Cardano/DbSync/LocalStateQuery.hs @@ -55,7 +55,7 @@ import qualified Ouroboros.Network.Protocol.LocalStateQuery.Client as StateQuery import Ouroboros.Network.Protocol.LocalStateQuery.Type (AcquireFailure, Target (..)) data NoLedgerEnv = NoLedgerEnv - { nleTracer :: Trace IO Text + { nleTracer :: !(Trace IO Text) , nleSystemStart :: !SystemStart , nleQueryVar :: StateQueryTMVar CardanoBlock CardanoInterpreter , nleHistoryInterpreterVar :: StrictTVar IO (Strict.Maybe CardanoInterpreter) diff --git a/cardano-db-sync/src/Cardano/DbSync/Rollback.hs b/cardano-db-sync/src/Cardano/DbSync/Rollback.hs index 900cfd772..85049039a 100644 --- a/cardano-db-sync/src/Cardano/DbSync/Rollback.hs +++ b/cardano-db-sync/src/Cardano/DbSync/Rollback.hs @@ -29,11 +29,12 @@ import Cardano.DbSync.Api.Types (LedgerEnv (..), SyncEnv (..)) import Cardano.DbSync.Cache import Cardano.DbSync.DbEvent (liftDbLookup) import Cardano.DbSync.Error (SyncNodeError (..), logAndThrowIO, mkSyncNodeCallStack) -import Cardano.DbSync.Ledger.State (listKnownSnapshots, loadLedgerAtPoint, saveCleanupState, writeLedgerState) -import Cardano.DbSync.Ledger.Types (CardanoLedgerState (..), SnapshotPoint (..)) +import Cardano.DbSync.Ledger.State (ledgerDbCurrent, listKnownSnapshots, loadLedgerAtPoint, saveCleanupState, writeLedgerState) +import Cardano.DbSync.Ledger.Types (CardanoLedgerState (..), HasLedgerEnv (..), SnapshotPoint (..)) import Cardano.DbSync.Types import Cardano.DbSync.Util import Cardano.DbSync.Util.Constraint (addConstraintsIfNotExist) +import Control.Concurrent.Class.MonadSTM.Strict (readTVarIO) rollbackFromBlockNo :: SyncEnv -> @@ -197,10 +198,14 @@ handlePostRollbackSnapshots syncEnv mRollbackSlot = do -- Try to load ledger state at the database tip eitherLedgerState <- loadLedgerAtPoint hle dbTipPoint case eitherLedgerState of - Right loadedState -> do + Right _loadedState -> do logInfo trce $ "Successfully loaded ledger state at " <> renderPoint dbTipPoint logInfo trce "Creating new snapshot at database tip after rollback" - saveCleanupState hle loadedState Nothing + -- Read the current StateRef from LedgerDB for snapshotting + mDb <- readTVarIO (leStateVar hle) + case mDb of + Strict.Just db -> saveCleanupState hle (ledgerDbCurrent db) Nothing + Strict.Nothing -> logWarning trce "No LedgerDB available for snapshot" logInfo trce "Snapshot created successfully" Left lsFiles -> do logWarning trce $ "Failed to load ledger state at database tip. Missing snapshot files: " <> textShow (length lsFiles) diff --git a/cardano-db-sync/test/Cardano/DbSync/Gen.hs b/cardano-db-sync/test/Cardano/DbSync/Gen.hs index 4206c98d2..44db42c43 100644 --- a/cardano-db-sync/test/Cardano/DbSync/Gen.hs +++ b/cardano-db-sync/test/Cardano/DbSync/Gen.hs @@ -59,6 +59,7 @@ syncPreConfig = <*> syncInsertConfig <*> Gen.list (Range.linear 0 10) (Gen.text (Range.linear 0 100) Gen.unicode) <*> snapshotIntervalConfig + <*> pure LedgerBackendInMemory snapshotIntervalConfig :: Gen SnapshotIntervalConfig snapshotIntervalConfig = @@ -109,6 +110,7 @@ syncNodeConfig loggingCfg = <*> syncInsertOptions <*> pure [] <*> snapshotIntervalConfig + <*> pure LedgerBackendInMemory syncInsertConfig :: Gen SyncInsertConfig syncInsertConfig = diff --git a/cardano-db-tool/cardano-db-tool.cabal b/cardano-db-tool/cardano-db-tool.cabal index eb6d7052b..625dbe20a 100644 --- a/cardano-db-tool/cardano-db-tool.cabal +++ b/cardano-db-tool/cardano-db-tool.cabal @@ -69,8 +69,9 @@ library , cardano-prelude , containers , cardano-diffusion - , contra-tracer , extra + , filepath + , fs-api , ouroboros-consensus , ouroboros-consensus:cardano , ouroboros-network:api diff --git a/cardano-db-tool/src/Cardano/DbTool/PrepareSnapshot.hs b/cardano-db-tool/src/Cardano/DbTool/PrepareSnapshot.hs index e439c1c0a..67cd727e9 100644 --- a/cardano-db-tool/src/Cardano/DbTool/PrepareSnapshot.hs +++ b/cardano-db-tool/src/Cardano/DbTool/PrepareSnapshot.hs @@ -5,14 +5,16 @@ module Cardano.DbTool.PrepareSnapshot ( import Cardano.Db import Cardano.DbSync.Config.Types hiding (LogFileDir) -import Cardano.DbSync.Ledger.State -import Cardano.DbSync.Ledger.Types (LedgerStateFile (..)) import Cardano.Prelude (Word64, fromMaybe) import Control.Monad -import qualified Data.ByteString.Base16 as Base16 import Data.Version (makeVersion, showVersion, versionBranch) +import Ouroboros.Consensus.Storage.LedgerDB.Snapshots (DiskSnapshot (..), defaultListSnapshots, snapshotToDirName) import Ouroboros.Network.Block hiding (blockHash) import Paths_cardano_db_tool (version) +import System.FS.API (SomeHasFS (..)) +import System.FS.API.Types (MountPoint (..)) +import System.FS.IO (ioHasFS) +import System.FilePath (()) import System.Info (arch, os) newtype PrepareSnapshotArgs = PrepareSnapshotArgs @@ -21,55 +23,51 @@ newtype PrepareSnapshotArgs = PrepareSnapshotArgs runPrepareSnapshot :: PrepareSnapshotArgs -> IO () runPrepareSnapshot args = do - ledgerFiles <- listLedgerStateFilesOrdered (unPrepareSnapshotArgs args) + let someHasFS = SomeHasFS $ ioHasFS (MountPoint $ unLedgerStateDir (unPrepareSnapshotArgs args)) + snapshots <- defaultListSnapshots someHasFS mblock <- runDbStandaloneSilent queryLatestBlock case mblock of Just block | Just bSlotNo <- SlotNo <$> blockSlotNo block -> do - let bHash = blockHash block - let (newerFiles, mfile, olderFiles) = findLedgerStateFile ledgerFiles (bSlotNo, bHash) - printNewerSnapshots newerFiles - case (mfile, olderFiles) of - (Just file, _) -> do + let targetSlot = unSlotNo bSlotNo + -- Find the snapshot matching the DB tip slot, or the closest older one + let (newer, rest) = span (\ds -> dsNumber ds > targetSlot) snapshots + (matching, older) = case rest of + (ds : os') | dsNumber ds == targetSlot -> ([ds], os') + _ -> ([], rest) + printNewerSnapshots newer + case (matching, older) of + (file : _, _) -> do let bblockNo = fromMaybe 0 $ blockBlockNo block - printCreateSnapshot bblockNo (lsfFilePath file) + printCreateSnapshot bblockNo (unLedgerStateDir (unPrepareSnapshotArgs args) snapshotToDirName file) (_, file : _) -> do - -- We couldn't find the tip of the db, so we return a list of - -- the available ledger files, before this tip. putStrLn $ concat - [ "Ledger and db don't match. DB tip is at " + [ "Ledger and db don't match. DB tip is at slot " , show bSlotNo - , " " - , show (hashToAnnotation bHash) - , " (full " - , show (Base16.encode bHash) - , ")" - , " and the closest ledger state file is at " - , show (lsfSlotNo file) - , " " - , show (lsfHash file) + , " and the closest snapshot is at slot " + , show (dsNumber file) , ". DBSync no longer requires them to match and " , "no rollback will be performed." ] let bblockNo = fromMaybe 0 $ blockBlockNo block - printCreateSnapshot bblockNo (lsfFilePath file) + printCreateSnapshot bblockNo (unLedgerStateDir (unPrepareSnapshotArgs args) snapshotToDirName file) (_, []) -> - putStrLn "No ledger state file before the tip found. Snapshots without ledger are not supported yet." + putStrLn "No snapshot before the tip found. Snapshots without ledger are not supported yet." _ -> do putStrLn "The db is empty. You need to sync from genesis and then create a snapshot." where - printNewerSnapshots :: [LedgerStateFile] -> IO () + printNewerSnapshots :: [DiskSnapshot] -> IO () printNewerSnapshots newerFiles = do unless (null newerFiles) $ putStrLn $ concat - [ "There are newer ledger state files, which are ignored: " - , show newerFiles + [ "There are newer snapshots, which are ignored: " + , show (map snapshotToDirName newerFiles) , "\n" ] - printCreateSnapshot :: Word64 -> FilePath -> IO () - printCreateSnapshot bblockNo fp = do + printCreateSnapshot :: Word64 -> String -> IO () + printCreateSnapshot bblockNo snapshotName = do let schemaVersion = makeVersion $ take 2 $ versionBranch version cmdStr = "Create a snapshot with:\n" @@ -85,5 +83,5 @@ runPrepareSnapshot args = do , "-block-" , show bblockNo , "-" ++ arch ++ " " - , fp + , snapshotName ] diff --git a/cardano-db-tool/src/Cardano/DbTool/Validate/Ledger.hs b/cardano-db-tool/src/Cardano/DbTool/Validate/Ledger.hs index a49cc5089..474ba5260 100644 --- a/cardano-db-tool/src/Cardano/DbTool/Validate/Ledger.hs +++ b/cardano-db-tool/src/Cardano/DbTool/Validate/Ledger.hs @@ -5,22 +5,22 @@ module Cardano.DbTool.Validate.Ledger ( import qualified Cardano.Db as DB import Cardano.DbSync.Config -import Cardano.DbSync.Config.Cardano import Cardano.DbSync.Error -import Cardano.DbSync.Ledger.State -import Cardano.DbSync.Ledger.Types (CardanoLedgerState (..), LedgerStateFile (..)) +import Cardano.DbSync.Ledger.Types (CardanoLedgerState (..)) import Cardano.DbSync.Tracing.ToObjectOrphans () import Cardano.DbTool.Validate.Balance (ledgerAddrBalance) import Cardano.DbTool.Validate.Util import Cardano.Network.NodeToClient (withIOManager) -import Control.Monad (when) import Control.Monad.Trans.Except (runExceptT) -import Control.Tracer (nullTracer) import Data.Text (Text) import qualified Data.Text as Text import Ouroboros.Consensus.Cardano.Node () import Ouroboros.Consensus.Ledger.Extended +import Ouroboros.Consensus.Storage.LedgerDB.Snapshots (defaultListSnapshots) import Ouroboros.Network.Block +import System.FS.API (SomeHasFS (..)) +import System.FS.API.Types (MountPoint (..)) +import System.FS.IO (ioHasFS) import Prelude data LedgerValidationParams = LedgerValidationParams @@ -30,33 +30,35 @@ data LedgerValidationParams = LedgerValidationParams } validateLedger :: LedgerValidationParams -> DB.TxOutVariantType -> IO () -validateLedger params txOutVariantType = +validateLedger params _txOutVariantType = withIOManager $ \_ -> do - enc <- readSyncNodeConfig (vpConfigFile params) - genCfg <- runOrThrowIO $ runExceptT $ readCardanoGenesisConfig enc - ledgerFiles <- listLedgerStateFilesOrdered (vpLedgerStateDir params) - slotNo <- SlotNo <$> DB.runDbStandaloneSilent DB.queryLatestSlotNo - validate params txOutVariantType genCfg slotNo ledgerFiles + -- TODO: Reimplement using consensus snapshot loading APIs (openStateRefFromSnapshot). + _enc <- readSyncNodeConfig (vpConfigFile params) + _genCfg <- runOrThrowIO (runExceptT (readCardanoGenesisConfig _enc)) + let someHasFS = SomeHasFS (ioHasFS (MountPoint (unLedgerStateDir (vpLedgerStateDir params)))) + snapshots <- defaultListSnapshots someHasFS + _slotNo <- SlotNo <$> DB.runDbStandaloneSilent DB.queryLatestSlotNo + putStrLn ("Found " <> show (length snapshots) <> " snapshots. Ledger validation not yet reimplemented for consensus snapshot format.") -validate :: LedgerValidationParams -> DB.TxOutVariantType -> GenesisConfig -> SlotNo -> [LedgerStateFile] -> IO () -validate params txOutVariantType genCfg slotNo ledgerFiles = - go ledgerFiles True - where - go :: [LedgerStateFile] -> Bool -> IO () - go [] _ = putStrLn $ redText "No ledger found" - go (ledgerFile : rest) logFailure = do - let ledgerSlot = lsfSlotNo ledgerFile - if ledgerSlot <= slotNo - then do - -- TODO fix GenesisPoint. This is only used for logging - Right state <- loadLedgerStateFromFile nullTracer (mkTopLevelConfig genCfg) False GenesisPoint ledgerFile - validateBalance txOutVariantType ledgerSlot (vpAddressUtxo params) state - else do - when logFailure . putStrLn $ redText "Ledger is newer than DB. Trying an older ledger." - go rest False +-- TODO: Reimplement using DiskSnapshot and consensus APIs +-- _validate :: LedgerValidationParams -> DB.TxOutVariantType -> GenesisConfig -> SlotNo -> [LedgerStateFile] -> IO () +-- _validate params txOutVariantType genCfg slotNo ledgerFiles = +-- go ledgerFiles True +-- where +-- go :: [LedgerStateFile] -> Bool -> IO () +-- go [] _ = putStrLn $ redText "No ledger found" +-- go (ledgerFile : rest) logFailure = do +-- let ledgerSlot = lsfSlotNo ledgerFile +-- if ledgerSlot <= slotNo +-- then do +-- Right state <- loadLedgerStateFromFile nullTracer (mkTopLevelConfig genCfg) False GenesisPoint ledgerFile +-- validateBalance txOutVariantType ledgerSlot (vpAddressUtxo params) state +-- else do +-- when logFailure . putStrLn $ redText "Ledger is newer than DB. Trying an older ledger." +-- go rest False -validateBalance :: DB.TxOutVariantType -> SlotNo -> Text -> CardanoLedgerState -> IO () -validateBalance txOutVariantType slotNo addr st = do +_validateBalance :: DB.TxOutVariantType -> SlotNo -> Text -> CardanoLedgerState -> IO () +_validateBalance txOutVariantType slotNo addr st = do balanceDB <- DB.runDbStandaloneSilent $ DB.queryAddressBalanceAtSlot txOutVariantType addr (unSlotNo slotNo) let eiBalanceLedger = DB.word64ToAda <$> ledgerAddrBalance addr (ledgerState $ clsState st) case eiBalanceLedger of diff --git a/doc/Readme.md b/doc/Readme.md index 99139b2ca..4b58252ba 100644 --- a/doc/Readme.md +++ b/doc/Readme.md @@ -30,7 +30,7 @@ This directory contains various documentation files for setting up, configuring, 13. [Schema Management](https://github.com/IntersectMBO/cardano-db-sync/blob/master/doc/schema-management.md) - Instructions on managing the database schema and creating migrations, covering tools and techniques for making schema changes and ensuring they are applied correctly. -14. [Syncing and Rollbacks](https://github.com/IntersectMBO/cardano-db-sync/blob/master/doc/syncing-and-rollbacks.md) - Details on the syncing procedure and handling rollbacks, explaining how the node syncs with the blockchain and manages rollbacks in case of errors or inconsistencies. +14. [State Snapshot](https://github.com/IntersectMBO/cardano-db-sync/blob/master/doc/state-snapshot.md) - Creating, restoring and converting ledger state snapshots, including LSM and InMemory backends. 15. [Manual Rollbacks](https://github.com/IntersectMBO/cardano-db-sync/blob/master/doc/manual-rollbacks.md) - Guide to performing manual rollbacks using cardano-db-sync or cardano-db-tool, including use cases, step-by-step instructions, and best practices for rolling back the database to fix incorrect data or recover from issues. diff --git a/doc/configuration.md b/doc/configuration.md index 378294800..8a35b5f87 100644 --- a/doc/configuration.md +++ b/doc/configuration.md @@ -696,3 +696,35 @@ Epoch threshold used to determine snapshot behavior. When syncing reaches this e - **Higher `near_tip_epoch` value**: Delay frequent snapshots until later in the chain, improving sync speed for longer. During initial sync (before reaching `near_tip_epoch`), snapshots are taken every 10 epochs - **Near tip detection**: Automatically switches to epoch-based snapshots when within 10 days of current time, regardless of epoch number +## Ledger Backend + +The `ledger_backend` top-level config option selects how the UTxO set is stored. + +| Value | Description | +| :----------- | :--------------------------------------------------- | +| `"inmemory"` | UTxO set kept in RAM (default) | +| `"lsm"` | UTxO set stored on disk using LSM trees | + +### Example + +```json +{ + "ledger_backend": "lsm" +} +``` + +### InMemory (default) + +Keeps the full UTxO set in memory. Faster but uses significantly more RAM +(~16GB on mainnet at tip). + +### LSM + +Stores the UTxO set on disk using LSM trees, reducing RAM usage to ~2-3GB. +The LSM database is stored in `/lsm/`. Requires more disk I/O +but is suitable for memory-constrained environments. + +Snapshots are compatible between backends and can be converted using the +`snapshot-converter` tool shipped with `cardano-node`. See +[State Snapshot](state-snapshot.md#converting-between-backends) for details. + diff --git a/doc/state-snapshot.md b/doc/state-snapshot.md index 582073bd5..b3b88cf98 100644 --- a/doc/state-snapshot.md +++ b/doc/state-snapshot.md @@ -1,63 +1,78 @@ # State Snapshot -As the size of the blockchain itself and the number of transactions and other data on the chain -increases, the time required to sync the full chain increases. At epoch 266 it was about 18 hours. -The other issue is that most major upgrades also update the database schema meaning the database -needs to be synced from scatch. +State snapshots bundle the PostgreSQL database and the ledger state, allowing +`db-sync` to resume syncing without starting from genesis. -To overcome these issues, we are providing a `cardano-db-sync` state snapshot, which should -drastically reduce the time required to get `db-sync` back up and running after the database is -dropped and recreated. This snapshot is compatible with both `cardano-db-sync` and -`cardano-db-sync` with --no-epoch-table (which doesn't maintain the extra `epoch` table). +## Prerequisites -**Note:** It is **not** possible to create a snapshot from one version of the database schema and -restore it so it can be used with a `db-sync` that uses another version of the schema. +- `cardano-db-tool` and `scripts/postgresql-setup.sh` available +- Sufficient disk space for the dump + ledger state +- Node tip should be ahead of the snapshot point, otherwise db-sync may need to roll back -All of the following assumes that the executable `cardano-db-tool` and the script -`postgresql-setup.sh` is available on the machine where the snapshot is being created or restored. +## Constraints -Currently (at epoch 269), creating a snapshot takes about 15 minutes and restoring one takes about -45 minutes. +- Not portable across schema versions or possibly CPU architectures +- The ledger snapshot format matches `cardano-node` (consensus format) -## Things to note: -* Snapshots (because they depend on the database schema) are not portable across `db-sync` versions. -* Snapshots (because they include a snapshot of the ledger state) are not portable across CPU - architectures (ie it is not possible to create a snapshot on `x86_64` and expect it to work - correctly on say `arm64`). -* Creating and restoring snapshots requires significant amounts of free disk space (at epoch 269 - it required about 10G). If there is insufficient disk space, `gzip` can give some odd error - messages. -* node tip should be ahead of the snapshot point during restoration otherwise `cardano-db-sync` will - roll back to genesis +## Backends -# Creating a Snapshot +- **InMemory** -- UTxO in memory, serialized to `/tables` +- **LSM** -- UTxO on disk via LSM trees, stored in `/lsm/` -To create a snapshot, the `cardano-db-sync` executable should be stopped. Taking a snapshot is -then a two step process: +Both backends produce snapshots in consensus directory format: `/state`, `meta`, `utxoSize`. + +## Creating + +Stop `db-sync`, then: + +``` +PGPASSFILE=config/pgpass-mainnet cardano-db-tool prepare-snapshot --state-dir +``` + +This prints the create command, e.g.: ``` -PGPASSFILE=config/pgpass-mainnet cardano-db-tool prepare-snapshot --state-dir ledger-state/mainnet/ +PGPASSFILE=config/pgpass-mainnet scripts/postgresql-setup.sh \ + --create-snapshot db-sync-snapshot-schema-13.7-block-5796064-x86_64 / ``` -which will then print out the command (combining the database schema version with the block number -in the database with the slot number used by the ledger state and the ) required to generated the snapshot: + +For LSM, the script auto-detects and bundles `lsm/snapshots//` and `lsm/metadata`. + +## Restoring + ``` -PGPASSFILE=config/pgpass-mainnet scripts/postgresql-setup.sh --create-snapshot \ - db-sync-snapshot-schema-9-block-5796064-x86_64 ledger-state/mainnet/31021676-f3873e4bec.lstate +PGPASSFILE=config/pgpass-mainnet scripts/postgresql-setup.sh \ + --restore-snapshot db-sync-snapshot.tgz ``` -# Restoring from a Snapshot +Creates `` if needed. For LSM, restores both ledger state and LSM database. + +## Converting between backends + +The `snapshot-converter` tool (shipped with `cardano-node`) converts between InMemory and LSM. +Use the same node version db-sync was built against. -Restoring the state from a snapsot will drop the current database, recreate the tables and then -populate them. It can be done as simply as: ``` -PGPASSFILE=config/pgpass-mainnet scripts/postgresql-setup.sh --restore-snapshot \ - db-sync-snapshot-schema-9-block-5796064-x86_64 ledger-state/mainnet +# InMemory -> LSM +snapshot-converter \ + --input-mem / \ + --output-lsm-snapshot / \ + --output-lsm-database /lsm \ + --config + +# LSM -> InMemory +snapshot-converter \ + --input-lsm-snapshot / \ + --input-lsm-database /lsm \ + --output-mem / \ + --config ``` -Once the script has completed successfully, `db-sync` can be restarted and it should continue -syncing from the block number listed in the state snapshot file name. +The `` directory name must match the slot number in the ledger state. +The `--config` flag takes the **node** config, not db-sync config. +Converting does not modify the source. -# Mainnet Snapshots Location +## Mainnet Snapshots -`Mainnet` snapshots can be found [here](https://update-cardano-mainnet.iohk.io/cardano-db-sync/index.html#). -They are also linked from the `cardano-db-sync` [releases page](https://github.com/IntersectMBO/cardano-db-sync/releases) +Available at the [downloads page](https://update-cardano-mainnet.iohk.io/cardano-db-sync/index.html#) +and linked from [releases](https://github.com/IntersectMBO/cardano-db-sync/releases). diff --git a/doc/syncing-and-rollbacks.md b/doc/syncing-and-rollbacks.md deleted file mode 100644 index 057a2d754..000000000 --- a/doc/syncing-and-rollbacks.md +++ /dev/null @@ -1,96 +0,0 @@ -# Syncing and Rollbacks - -`cardano-db-sync` gets it blocks from a `cardano-node` running on the same machine via a local -domain socket and the `LocalChainSync` protocol provided by the `cardano-node` itself but -implemented in the `ouroboros-network` repository. - - -## Syncing Procedure - -When `db-sync` starts up, it figures out its latest block number and block hash, sends it the -the `node` using the `LocalChainSync` protocol. The `node` then sends back either a rollback -message or a new block message where the new block correctly extends the existing chain that -`db-sync` has already collected. - -Ignoring rollbacks for now, the `node` will continue sending `db-sync` blocks that extend -`db-sync`'s existing chain. For each new block, `db-sync` checks that the block's previous -block hash is already in the database, and then inserts the block, extracts the transactions -contained in the block and inserts them, and so on. - - -## Ledger State - -Currently `db-sync` creates and maintains its own copy of ledger state, and stores a number -of recent ledger state files in the directory specified by the `--state-dir` command line -argument to `db-sync`. This ledger state directory must persist across machine reboots. -Each ledger state is valid only for a specific block. It is not valid for any block before -or any block after the block it is valid for. - -The option `--state-dir` can be ommited when one doesn't want to use local ledger, for the omittion to work a `--disable-ledger` flag should be used, more information on what this flag does can be found [here](./configuration.md#--disable-ledger). - - -## Concurrency - -`db-sync` currently runs in three threads and size bounded queues are used to communicate -between them. Attempts to add a new element to a full bounded queue, will result in the -insert blocking until an element is removed from the other end. The main queues of interest -are: - -* The `DbAction` queue. -* The pool offchain metadata request queue. -* The pool offchain metadata response queue. - -Due to limitations of PostgreSQL itself, all database operations are done in a single thread. -If database operations are attempted from more than one thread PostgreSQL returns "failure -to acquire lock" error messages. - -The main threads are: - -* The database insert thread. This thread retrieves data from the `DbAction` and pool - offchain metadata response queue and inserts the data into the database. - -* The `node` communication thread which retrieves blocks from the `node` and places them in - the `DbAction` queue. -* The pool metadata thread, which reads the request queue for metadata to fetch, fetches it - and posts a response in the response queue. - - -## Rollbacks - -Rollbacks are a feature of the vast majority of blockchains. In addition to the normal -blockchain rollbacks, `db-sync` has a second potential source of rollbacks. On start up, -if `db-sync` cannot find a ledger state file at all, it will try to rollback to genesis. -This can happen for instance in a Docker container if the ledger state directory is not -persisted across reboots. - -In `db-sync` version `8.0.0` and earlier rollbacks were performed in Haskell code. That -means that if a block was to be rolled back, the database would be queried for all transactions -in that block and all the sub-components of all the transactions. A PostgreSQL delete would -then be issued for each sub-component, each transaction and the block itself. Obviously -this was inefficient and slow. - -For `db-sync` version `9.0.0`, a newer version of the `Persistent` library was available that -makes rollbacks much more efficient. In this version of `db-sync` and later tables that -reference/index other tables are linked, with foreign keys. This means that `db-sync` can issue a -delete operation on a single object and then PostgreSQL will recursively delete all the objects that -reference the object to be deleted as well as the object itself. This leads to significantly -improved rollback times. During development it is often useful to be able to rollback 10000 -or more blocks, even though that is guaranteed to be an invalid rollback according to the ledger -rules. With this new version of `Persistent` a rollback of 10000 blocks could be done in -minutes whereas previously it was several hours. - -For `db-sync` versions 13.1, we removed all foreign keys and switched back to perform rollbacks from -Haskell, but in a much more effecient way. Instead of, for example, finding all transactions from -all the blocks that needs deletion, we found only the oldest one and delete with a single query -every transaction after it. We use the property that fields like `tx.block_id` are non-decreasing, -meaning newer entries will have bigger values. So if we simply find the oldest transaction that -needs to be deleted, it's easy to delete everything with a single query. - -### Manual Rollbacks - -In addition to automatic rollbacks that occur during normal chain-sync operations, you can perform -manual rollbacks using either `cardano-db-sync --rollback-to-slot` or `cardano-db-tool rollback`. -These are useful for testing, fixing incorrect data after bug fixes, or recovering from corrupted -database states. - -For detailed information on performing manual rollbacks, see [Manual Rollbacks](manual-rollbacks.md). diff --git a/scripts/postgresql-setup.sh b/scripts/postgresql-setup.sh index 58531ea39..fd381b957 100755 --- a/scripts/postgresql-setup.sh +++ b/scripts/postgresql-setup.sh @@ -160,17 +160,42 @@ function dump_schema { function create_snapshot { tgz_file=$1.tgz - ledger_file=$2 + ledger_path=$2 tmp_dir=$(mktemp "${directory}" -t db-sync-snapshot-XXXXXXXXXX) echo $"Working directory: ${tmp_dir}" - pg_dump --no-owner --schema=public --jobs="${numcores}" "${PGDATABASE}" --format=directory --file="${tmp_dir}/db/" - if [ -n "${ledger_file}" ]; then - lstate_gz_file=$(basename "${ledger_file}").gz - gzip --to-stdout "${ledger_file}" > "${tmp_dir}/${lstate_gz_file}" + # Process ledger state first to fail fast on missing/corrupt snapshots + if [ -n "${ledger_path}" ]; then + if [ -d "${ledger_path}" ]; then + # New consensus directory format: tar+gzip the snapshot directory + ledger_dir_name=$(basename "${ledger_path}") + state_dir=$(dirname "${ledger_path}") + lsm_snap_dir="${state_dir}/lsm/snapshots/${ledger_dir_name}" + if [ -d "${lsm_snap_dir}" ]; then + echo "Detected LSM backend snapshot at slot ${ledger_dir_name}" + # LSM backend: bundle ledger snapshot, LSM table snapshot, and LSM session metadata (salt). + # The active/ directory is not needed — it gets cleared on session restore. + tar czf "${tmp_dir}/${ledger_dir_name}.tar.gz" \ + -C "${state_dir}" "${ledger_dir_name}" "lsm/snapshots/${ledger_dir_name}" "lsm/metadata" + else + echo "Detected InMemory backend snapshot at slot ${ledger_dir_name}" + # InMemory backend: just the ledger snapshot directory + tar czf "${tmp_dir}/${ledger_dir_name}.tar.gz" -C "${state_dir}" "${ledger_dir_name}" + fi + elif [ -f "${ledger_path}" ]; then + echo "Detected legacy .lstate snapshot: $(basename "${ledger_path}")" + # Legacy .lstate file format + lstate_gz_file=$(basename "${ledger_path}").gz + gzip --to-stdout "${ledger_path}" > "${tmp_dir}/${lstate_gz_file}" + else + echo "Ledger state path '${ledger_path}' does not exist." + rm "${recursive}" "${force}" "${tmp_dir}" + exit 1 + fi fi + pg_dump --no-owner --schema=public --jobs="${numcores}" "${PGDATABASE}" --format=directory --file="${tmp_dir}/db/" # Use plain tar here because the database dump files and the ledger state file are already gzipped. Disable Shellcheck SC2046 to avoid empty '' getting added while quoting # shellcheck disable=SC2046 - tar cvf - --directory "${tmp_dir}" "db" $( [ -n "${lstate_gz_file:-}" ] && [ -f "/${tmp_dir}/${lstate_gz_file}" ] && echo "${lstate_gz_file}" ) | tee "${tgz_file}.tmp" | \ + tar cvf - --directory "${tmp_dir}" $(ls "${tmp_dir}") | tee "${tgz_file}.tmp" | \ sha256sum | head -c 64 | sed -e "s/$/ ${tgz_file}\n/" > "${tgz_file}.sha256sum" mv "${tgz_file}.tmp" "${tgz_file}" rm "${recursive}" "${force}" "${tmp_dir}" @@ -183,7 +208,13 @@ function create_snapshot { } function restore_snapshot { - file_count=$(find "$2" -type f -name '*.lstate' | wc -l) + # Create ledger state dir if it doesn't exist + if ! test -d "$2" ; then + echo "Creating ledger state directory: $2" + mkdir -p "$2" + fi + # Check ledger state dir is empty (both old .lstate files and new snapshot dirs) + file_count=$(find "$2" -maxdepth 1 -type f -name '*.lstate' -o -type d -name '[0-9]*' 2>/dev/null | wc -l) if test "${file_count}" -gt 0 ; then echo "Ledger state directory ($2) is not empty. Please empty it and then retry." exit 1 @@ -191,11 +222,26 @@ function restore_snapshot { tmp_dir=$(mktemp "${directory}" -t db-sync-snapshot-XXXXXXXXXX) tar xvf "$1" --directory "$tmp_dir" if test -d "${tmp_dir}/db/" ; then - # New pg_dump format - lstate_gz_file=$(find "${tmp_dir}/" -iname "*.lstate.gz") - if [ -n "${lstate_gz_file:-}" ] ; then - lstate_file=$(basename "${lstate_gz_file}" | sed 's/.gz$//') - gunzip --to-stdout "${lstate_gz_file}" > "$2/${lstate_file}" + # Check for new consensus snapshot directory format (tar.gz named after slot number) + snapshot_tgz=$(find "${tmp_dir}/" -maxdepth 1 -name '[0-9]*.tar.gz' | head -1) + if [ -n "${snapshot_tgz:-}" ] ; then + # New consensus directory format: extract snapshot tar.gz to ledger state dir + tar xzf "${snapshot_tgz}" -C "$2/" + # For LSM: create the active/ directory if lsm/ was restored (required by LSM session) + if [ -d "$2/lsm" ] && [ ! -d "$2/lsm/active" ]; then + echo "Restored LSM backend snapshot: $(basename "${snapshot_tgz}" .tar.gz)" + mkdir -p "$2/lsm/active" + else + echo "Restored InMemory backend snapshot: $(basename "${snapshot_tgz}" .tar.gz)" + fi + else + # Legacy .lstate file format + lstate_gz_file=$(find "${tmp_dir}/" -iname "*.lstate.gz") + if [ -n "${lstate_gz_file:-}" ] ; then + lstate_file=$(basename "${lstate_gz_file}" | sed 's/.gz$//') + echo "Restored legacy .lstate snapshot: ${lstate_file}" + gunzip --to-stdout "${lstate_gz_file}" > "$2/${lstate_file}" + fi fi # Important: specify --schema=public below to skip over `create schema public` @@ -313,10 +359,7 @@ case "${1:-""}" in echo "Second argument should be the snapshot file name (without extension)." exit 1 fi - if test -n "${3:-}" && test -d "${3}" ; then - echo "Third argument provided is a directory but expecting a file." - exit 1 - fi + # Third argument can be a file (.lstate) or directory (consensus snapshot format) create_snapshot "$2" "${3:-}" ;; --restore-snapshot)