@@ -60,9 +60,12 @@ import qualified Cardano.Ledger.State as L
6060
6161import Prelude
6262
63+ import Control.Applicative ((<|>) )
64+ import Control.Concurrent.STM (STM , TVar , modifyTVar' , newTVarIO , readTVar , writeTVar )
65+ import qualified Control.Concurrent.STM as STM
6366import Control.Monad
67+ import Control.Monad.Trans.Maybe (MaybeT (.. ), mapMaybeT , runMaybeT )
6468import Control.Monad.Trans.Resource
65- import Data.IORef
6669import Data.List (sortOn )
6770import qualified Data.Map as Map
6871import Data.Map.Strict (Map )
@@ -87,6 +90,8 @@ import qualified Hedgehog as H
8790import Hedgehog.Extras (MonadAssertion )
8891import qualified Hedgehog.Extras as H
8992
93+ import UnliftIO.STM (atomically , readTVarIO , registerDelay )
94+
9095-- | Block and wait for the desired epoch.
9196waitUntilEpoch
9297 :: HasCallStack
@@ -152,8 +157,10 @@ instance Show TestnetWaitPeriod where
152157 WaitForBlocks n -> " WaitForBlocks " <> show n
153158 WaitForSlots n -> " WaitForSlots " <> show n
154159
155- -- | Core retry loop. Repeats the action every 300ms until it returns 'Right'
156- -- or the timeout is reached, in which case the last 'Left' is returned.
160+ -- | Core retry loop. Returns early on 'Right'; on 'Left', blocks via STM until
161+ -- the 'EpochStateView' is updated (with a safety fallback timeout) and retries.
162+ -- Gives up and returns the last 'Left' once the 'TestnetWaitPeriod' deadline is
163+ -- exceeded.
157164retryUntilRightM
158165 :: HasCallStack
159166 => MonadIO m
@@ -167,13 +174,18 @@ retryUntilRightM esv timeout act = withFrozenCallStack $ do
167174 startingValue <- getCurrentValue
168175 go $ startingValue + timeoutW64
169176 where
170- go deadline = act >>= \ case
171- r@ (Right _) -> pure r
172- l@ (Left _) -> do
173- cv <- getCurrentValue
174- if cv > deadline
175- then pure l
176- else H. threadDelay 300_000 >> go deadline
177+ go deadline = do
178+ -- Sample the version before running 'act' so that any update landing during 'act'
179+ -- makes 'awaitStateUpdateTimeout' return without blocking, rather than waiting for
180+ -- the next update and adding a block/epoch of latency.
181+ versionBeforeAct <- readTVarIO $ epochStateVersion esv
182+ act >>= \ case
183+ r@ (Right _) -> pure r
184+ l@ (Left _) -> do
185+ cv <- getCurrentValue
186+ if cv > deadline
187+ then pure l
188+ else awaitStateUpdateTimeout esv 300 versionBeforeAct *> go deadline
177189
178190 (getCurrentValue, timeoutW64) = case timeout of
179191 WaitForEpochs (EpochInterval n) -> (unEpochNo <$> getCurrentEpochNo esv, fromIntegral n)
@@ -227,13 +239,57 @@ data EpochStateStatus
227239 -- ^ The background thread encountered an error while folding blocks
228240
-- | Read-only handle onto the most recent epoch state. The contents are refreshed
-- automatically; consumers only ever read from it.
data EpochStateView = EpochStateView
  { epochStateView :: !(TVar (Either EpochStateStatus (AnyNewEpochState, SlotNo, BlockNo)))
  -- ^ The current 'NewEpochState' together with its slot and block number.
  -- 'Left' means no state can be served (not initialised yet, or an error occurred);
  -- 'Right' carries the latest epoch state.
  -- Read it through 'getEpochState', 'getBlockNumber' and 'getSlotNumber'.
  , epochStateVersion :: !(TVar Word64)
  -- ^ Counter that 'writeEpochStateView' increments on every state write.
  -- 'awaitStateUpdateTimeout' compares it against a previously sampled value
  -- to block until the next update.
  }
236251
-- | Store a new epoch state value and advance the version counter.
-- Both writes belong to the same 'STM' action, so a transaction reading the
-- state and the version never observes one updated without the other.
writeEpochStateView
  :: EpochStateView
  -> Either EpochStateStatus (AnyNewEpochState, SlotNo, BlockNo)
  -- ^ new state value
  -> STM ()
writeEpochStateView esv newState =
  writeTVar (epochStateView esv) newState
    *> modifyTVar' (epochStateVersion esv) (+ 1)
261+
-- | Wait for the epoch state version to differ from a previously sampled version,
-- giving up once the fallback timeout has elapsed.
--
-- If the version already differs on entry the current state is returned right away,
-- which lets callers sample the version /before/ running an action without losing
-- updates that land while the action runs. Yields 'Nothing' when the timeout fires
-- first. Every thread blocked on the same 'EpochStateView' is woken by each update.
awaitStateUpdateTimeout
  :: MonadIO m
  => EpochStateView
  -> DTC.NominalDiffTime -- ^ Fallback timeout
  -> Word64 -- ^ Previously sampled version
  -> m (Maybe (Either EpochStateStatus (AnyNewEpochState, SlotNo, BlockNo)))
awaitStateUpdateTimeout esv timeout sinceVersion =
  runMaybeT $ alreadyAdvanced <|> afterWaiting
  where
    -- Single transactional read of version and state; produces a value only when
    -- the version is no longer the sampled one.
    stateIfAdvanced :: MaybeT STM (Either EpochStateStatus (AnyNewEpochState, SlotNo, BlockNo))
    stateIfAdvanced = do
      currentVersion <- lift . readTVar $ epochStateVersion esv
      guard $ currentVersion /= sinceVersion
      lift . readTVar $ epochStateView esv

    -- Fast path: no 'registerDelay' timer is created when an update has already
    -- landed, so stale samples do not pile up timer-queue entries.
    alreadyAdvanced = mapMaybeT atomically stateIfAdvanced

    -- Slow path: block until either the version moves or the timer fires.
    afterWaiting = MaybeT $ do
      timedOutVar <- registerDelay . ceiling $ timeout * 1_000_000
      atomically $
        runMaybeT stateIfAdvanced >>= \case
          Just newState -> pure $ Just newState
          Nothing -> do
            timedOut <- readTVar timedOutVar
            if timedOut
              then pure Nothing
              else STM.retry
292+
237293-- | Get epoch state from the view. If the state isn't available, retry waiting up to 25 seconds. Fails
238294-- immediately if the background thread encountered an error, or after 25 seconds if not yet initialised.
239295getEpochState
@@ -266,37 +322,56 @@ getSlotNumber
266322getSlotNumber epochStateView =
267323 withFrozenCallStack $ (\ (_, slotNumber, _) -> slotNumber) <$> getEpochStateDetails epochStateView
268324
-- | Fetch the current epoch state together with its slot and block number.
--
-- * If a state is already present it is returned without blocking.
-- * If the view is still uninitialised, blocks for up to 25 seconds until the
--   state changes, then fails the test if it is still uninitialised.
-- * If the background thread reported an error, fails the test immediately.
getEpochStateDetails
  :: HasCallStack
  => MonadAssertion m
  => MonadTest m
  => MonadIO m
  => EpochStateView
  -> m (AnyNewEpochState, SlotNo, BlockNo)
getEpochStateDetails esv = withFrozenCallStack $ do
  -- Plain read first: the timer below is only registered when we really have
  -- to wait, so callers polling this getter in tight retry loops do not leave
  -- unused timer-queue entries behind.
  snapshot <- readTVarIO stateVar
  settled <- case snapshot of
    Left EpochStateNotInitialised -> waitForInitialisation
    _ -> pure snapshot
  case settled of
    Right details -> pure details
    Left (EpochStateFoldError err) -> do
      H.note_ $ " EpochStateView background thread failed: " <> docToString (prettyError err)
      H.failure
    Left EpochStateNotInitialised -> do
      H.note_ $ " EpochStateView has not been initialised within " <> show initTimeoutSeconds <> " seconds"
      H.failure
  where
    stateVar = epochStateView esv

    initTimeoutSeconds :: Int
    initTimeoutSeconds = 25

    -- Blocks in STM until the state stops being 'EpochStateNotInitialised' or the
    -- timer fires; in the latter case the (still uninitialised) state is returned.
    waitForInitialisation = do
      timedOutVar <- registerDelay $ initTimeoutSeconds * 1_000_000
      atomically $ do
        current <- readTVar stateVar
        case current of
          Left EpochStateNotInitialised -> do
            timedOut <- readTVar timedOutVar
            if timedOut
              then pure current
              else STM.retry
          _ -> pure current
374+
300375
301376-- | Create a background thread listening for new epoch states. New epoch states are available to access
302377-- through 'EpochStateView', using query functions.
@@ -311,16 +386,16 @@ getEpochStateView
311386 -> SocketPath -- ^ node socket path
312387 -> m EpochStateView
getEpochStateView nodeConfigFile socketPath = withFrozenCallStack $ do
  -- The view starts out uninitialised with version 0.
  esv <- H.evalIO $ do
    stateVar <- newTVarIO $ Left EpochStateNotInitialised
    EpochStateView stateVar <$> newTVarIO 0
  void . asyncRegister_ $ do
    foldResult <-
      runExceptT $
        foldEpochState nodeConfigFile socketPath QuickValidation (EpochNo maxBound) () $
          \epochState slotNumber blockNumber ->
            -- Publish every new state; the callback always answers 'ConditionNotMet'.
            ConditionNotMet
              <$ liftIOAnnotated (atomically . writeEpochStateView esv $ Right (epochState, slotNumber, blockNumber))
    -- A failed fold is recorded in the view so that readers can report it.
    either
      (atomically . writeEpochStateView esv . Left . EpochStateFoldError)
      (const $ pure ())
      foldResult
  pure esv
324399
325400-- | Retrieve all UTxOs map from the epoch state view.
326401findAllUtxos
0 commit comments