@@ -60,9 +60,12 @@ import qualified Cardano.Ledger.State as L
6060
6161import Prelude
6262
63+ import Control.Applicative ((<|>) )
64+ import Control.Concurrent.STM (TVar , modifyTVar' , newTVarIO , readTVar , writeTVar )
65+ import qualified Control.Concurrent.STM as STM
6366import Control.Monad
67+ import Control.Monad.Trans.Maybe (MaybeT (.. ), mapMaybeT , runMaybeT )
6468import Control.Monad.Trans.Resource
65- import Data.IORef
6669import Data.List (sortOn )
6770import qualified Data.Map as Map
6871import Data.Map.Strict (Map )
@@ -87,6 +90,8 @@ import qualified Hedgehog as H
8790import Hedgehog.Extras (MonadAssertion )
8891import qualified Hedgehog.Extras as H
8992
93+ import UnliftIO.STM (atomically , readTVarIO , registerDelay )
94+
9095-- | Block and wait for the desired epoch.
9196waitUntilEpoch
9297 :: HasCallStack
@@ -152,8 +157,10 @@ instance Show TestnetWaitPeriod where
152157 WaitForBlocks n -> " WaitForBlocks " <> show n
153158 WaitForSlots n -> " WaitForSlots " <> show n
154159
155- -- | Core retry loop. Repeats the action every 300ms until it returns 'Right'
156- -- or the timeout is reached, in which case the last 'Left' is returned.
160+ -- | Core retry loop. Returns early on 'Right'; on 'Left', blocks via STM until
161+ -- the 'EpochStateView' is updated (with a safety fallback timeout) and retries.
162+ -- Gives up and returns the last 'Left' once the 'TestnetWaitPeriod' deadline is
163+ -- exceeded.
157164retryUntilRightM
158165 :: HasCallStack
159166 => MonadIO m
@@ -167,13 +174,18 @@ retryUntilRightM esv timeout act = withFrozenCallStack $ do
167174 startingValue <- getCurrentValue
168175 go $ startingValue + timeoutW64
169176 where
170- go deadline = act >>= \ case
171- r@ (Right _) -> pure r
172- l@ (Left _) -> do
173- cv <- getCurrentValue
174- if cv > deadline
175- then pure l
176- else H. threadDelay 300_000 >> go deadline
177+ go deadline = do
178+ -- Sample the version before running 'act' so that any update landing during 'act'
179+ -- makes 'awaitStateUpdateTimeout' return without blocking, rather than waiting for
180+ -- the next update and adding a block/epoch of latency.
181+ versionBeforeAct <- readTVarIO $ epochStateVersion esv
182+ act >>= \ case
183+ r@ (Right _) -> pure r
184+ l@ (Left _) -> do
185+ cv <- getCurrentValue
186+ if cv > deadline
187+ then pure l
188+ else awaitStateUpdateTimeout esv 300 versionBeforeAct *> go deadline
177189
178190 (getCurrentValue, timeoutW64) = case timeout of
179191 WaitForEpochs (EpochInterval n) -> (unEpochNo <$> getCurrentEpochNo esv, fromIntegral n)
@@ -227,13 +239,47 @@ data EpochStateStatus
227239 -- ^ The background thread encountered an error while folding blocks
228240
229241-- | A read-only mutable pointer to an epoch state, updated automatically
230- newtype EpochStateView = EpochStateView
231- { epochStateView :: IORef (Either EpochStateStatus (AnyNewEpochState , SlotNo , BlockNo ))
242+ data EpochStateView = EpochStateView
243+ { epochStateView :: ! ( TVar (Either EpochStateStatus (AnyNewEpochState , SlotNo , BlockNo ) ))
232244 -- ^ Automatically updated current NewEpochState. 'Left' indicates the state is not yet available
233245 -- (either not initialised or an error occurred). 'Right' contains the latest epoch state.
234246 -- Use 'getEpochState', 'getBlockNumber', 'getSlotNumber' to access the values.
247+ , epochStateVersion :: ! (TVar Word64 )
248+ -- ^ Monotonically increasing counter, bumped on every state write.
249+ -- Used by 'awaitStateUpdateTimeout' to block until the next update.
235250 }
236251
252+ -- | Block until the epoch state version advances past the provided previously sampled
253+ -- version, or until the fallback timeout expires. Returns immediately if the current
254+ -- version already differs, so callers can sample before running an action and avoid
255+ -- missing updates that land during the action. Returns 'Nothing' on timeout.
256+ -- All threads blocked on the same 'EpochStateView' wake up on each update.
257+ awaitStateUpdateTimeout
258+ :: MonadIO m
259+ => EpochStateView
260+ -> DTC. NominalDiffTime -- ^ Fallback timeout
261+ -> Word64 -- ^ Previously sampled version
262+ -> m (Maybe (Either EpochStateStatus (AnyNewEpochState , SlotNo , BlockNo )))
263+ awaitStateUpdateTimeout EpochStateView {epochStateVersion, epochStateView} timeout sinceVersion = runMaybeT $ fastResult <|> awaitedResult
264+ where
265+ -- Fast path: if the version already differs, read state and version atomically and return
266+ -- without allocating a 'registerDelay' timer. This avoids accumulating timer-queue entries
267+ -- when callers sample a stale version and an update has already landed.
268+ fastResult = mapMaybeT atomically $ do
269+ v <- lift $ readTVar epochStateVersion
270+ guard $ v /= sinceVersion
271+ lift $ readTVar epochStateView
272+
273+ awaitedResult = MaybeT $ do
274+ timedOutVar <- registerDelay . ceiling $ timeout * 1_000_000
275+ atomically $ do
276+ v <- readTVar epochStateVersion
277+ timedOut <- readTVar timedOutVar
278+ case (v /= sinceVersion, timedOut) of
279+ (True , _) -> Just <$> readTVar epochStateView
280+ (_, True ) -> pure Nothing
281+ _ -> STM. retry
282+
237283-- | Get epoch state from the view. If the state isn't available, retry waiting up to 25 seconds. Fails
238284-- immediately if the background thread encountered an error, or after 25 seconds if not yet initialised.
239285getEpochState
@@ -266,37 +312,56 @@ getSlotNumber
266312getSlotNumber epochStateView =
267313 withFrozenCallStack $ (\ (_, slotNumber, _) -> slotNumber) <$> getEpochStateDetails epochStateView
268314
269- -- | Utility function for accessing epoch state in 'IORef' .
270- -- Retries every 0.5s for up to 25 seconds while not initialised.
271- -- Fails immediately if the background fold thread encountered an error.
315+ -- | Access the current epoch state. Returns immediately if state is already available .
316+ -- Blocks up to 25 seconds waiting for initialisation if the background thread has not yet
317+ -- received any epoch state. Fails immediately if the background thread encountered an error.
272318getEpochStateDetails
273319 :: HasCallStack
274320 => MonadAssertion m
275321 => MonadTest m
276322 => MonadIO m
277323 => EpochStateView
278324 -> m (AnyNewEpochState , SlotNo , BlockNo )
279- getEpochStateDetails EpochStateView {epochStateView} =
280- withFrozenCallStack $ do
281- deadline <- liftIO $ DTC. addUTCTime 25 <$> DTC. getCurrentTime
282- go deadline
325+ getEpochStateDetails EpochStateView {epochStateView} = withFrozenCallStack $
326+ -- Fast path: read the TVar outside STM block so we don't register a pointless
327+ -- 'initTimeoutSeconds' timer on every call. These getters run inside tight
328+ -- retry loops, and the unused timer-queue entries would otherwise accumulate.
329+ readTVarIO epochStateView
330+ >>= awaitForState
331+ >>= failEpochStateFoldError
283332 where
284- go deadline = do
285- result <- H. evalIO $ readIORef epochStateView
286- case result of
287- Left (EpochStateFoldError err) -> do
288- H. note_ $ " EpochStateView background thread failed: " <> docToString (prettyError err)
289- H. failure
290- Left EpochStateNotInitialised -> do
291- currentTime <- liftIO DTC. getCurrentTime
292- if currentTime < deadline
293- then do
294- H. threadDelay 500_000
295- go deadline
296- else do
297- H. note_ " EpochStateView has not been initialised within 25 seconds"
298- H. failure
299- Right details -> pure details
333+ initTimeoutSeconds :: Int
334+ initTimeoutSeconds = 25
335+
336+ awaitForState
337+ :: MonadIO n
338+ => Either EpochStateStatus (AnyNewEpochState , SlotNo , BlockNo )
339+ -> n (Either EpochStateStatus (AnyNewEpochState , SlotNo , BlockNo ))
340+ awaitForState = \ case
341+ Left EpochStateNotInitialised -> do
342+ -- register delay only when we're starting to retry
343+ timedOutVar <- registerDelay $ initTimeoutSeconds * 1_000_000
344+ atomically $ do
345+ state' <- readTVar epochStateView
346+ state' <$ case state' of
347+ -- retry until timeout
348+ Left EpochStateNotInitialised -> readTVar timedOutVar >>= guard
349+ _ -> pure ()
350+ state -> pure state
351+
352+ failEpochStateFoldError
353+ :: (HasCallStack , MonadTest n )
354+ => Either EpochStateStatus (AnyNewEpochState , SlotNo , BlockNo )
355+ -> n (AnyNewEpochState , SlotNo , BlockNo )
356+ failEpochStateFoldError = \ case
357+ Right details -> pure details
358+ Left (EpochStateFoldError err) -> do
359+ H. note_ $ " EpochStateView background thread failed: " <> docToString (prettyError err)
360+ H. failure
361+ Left EpochStateNotInitialised -> do
362+ H. note_ $ " EpochStateView has not been initialised within " <> show initTimeoutSeconds <> " seconds"
363+ H. failure
364+
300365
301366-- | Create a background thread listening for new epoch states. New epoch states are available to access
302367-- through 'EpochStateView', using query functions.
@@ -311,16 +376,21 @@ getEpochStateView
311376 -> SocketPath -- ^ node socket path
312377 -> m EpochStateView
313378getEpochStateView nodeConfigFile socketPath = withFrozenCallStack $ do
314- epochStateView <- H. evalIO $ newIORef $ Left EpochStateNotInitialised
379+ epochStateView <- H. evalIO $ newTVarIO $ Left EpochStateNotInitialised
380+ epochStateVersion <- H. evalIO $ newTVarIO 0
315381 void . asyncRegister_ $ do
316382 result <- runExceptT $ foldEpochState nodeConfigFile socketPath QuickValidation (EpochNo maxBound ) ()
317383 $ \ epochState slotNumber blockNumber -> do
318- liftIOAnnotated . writeIORef epochStateView $ Right (epochState, slotNumber, blockNumber)
384+ liftIOAnnotated . atomically $ do
385+ writeTVar epochStateView $ Right (epochState, slotNumber, blockNumber)
386+ modifyTVar' epochStateVersion (+ 1 )
319387 pure ConditionNotMet
320388 case result of
321- Left err -> writeIORef epochStateView $ Left $ EpochStateFoldError err
389+ Left err -> atomically $ do
390+ writeTVar epochStateView $ Left $ EpochStateFoldError err
391+ modifyTVar' epochStateVersion (+ 1 )
322392 Right _ -> pure ()
323- pure $ EpochStateView epochStateView
393+ pure $ EpochStateView epochStateView epochStateVersion
324394
325395-- | Retrieve all UTxOs map from the epoch state view.
326396findAllUtxos
0 commit comments