diff --git a/cardano-diffusion/demo/chain-sync.hs b/cardano-diffusion/demo/chain-sync.hs index 32efff35323..0bceb61918c 100644 --- a/cardano-diffusion/demo/chain-sync.hs +++ b/cardano-diffusion/demo/chain-sync.hs @@ -74,7 +74,7 @@ import Ouroboros.Network.Protocol.BlockFetch.Type qualified as BlockFetch import Ouroboros.Network.BlockFetch import Ouroboros.Network.BlockFetch.Client -import Ouroboros.Network.BlockFetch.ClientRegistry (FetchClientRegistry (..)) +import Ouroboros.Network.BlockFetch.ClientRegistry (KeepAliveRegistry (..)) import Ouroboros.Network.BlockFetch.ConsensusInterface (ChainSelStarvation (..), initialWithFingerprint) import Ouroboros.Network.DeltaQ (defaultGSV) @@ -348,10 +348,10 @@ demoProtocol3 chainSync blockFetch = -- We run `chain-sync` and `block-fetch`, without `keep-alive`, so we need to -- register peers in `dqRegistry`. -bracketDqRegistry :: FetchClientRegistry (ConnectionId LocalAddress) header block IO +bracketDqRegistry :: KeepAliveRegistry (ConnectionId LocalAddress) IO -> ConnectionId LocalAddress -> IO a -> IO a -bracketDqRegistry FetchClientRegistry { fcrDqRegistry = dqRegistry } peer k = +bracketDqRegistry KeepAliveRegistry { dqRegistry } peer k = bracket (atomically (modifyTVar dqRegistry (Map.insert peer defaultGSV))) (\_ -> atomically (modifyTVar dqRegistry (Map.delete peer))) (\_ -> k) @@ -360,7 +360,8 @@ clientBlockFetch :: [FilePath] -> Maybe SlotNo -> IO () clientBlockFetch sockAddrs maxSlotNo = withIOManager $ \iocp -> do - registry <- newFetchClientRegistry + blockFetchRegistry <- newFetchClientRegistry + keepAliveRegistry <- newKeepAliveRegistry blockHeap <- mkTestFetchedBlockHeap [] candidateChainsVar <- newTVarIO Map.empty @@ -396,7 +397,7 @@ clientBlockFetch sockAddrs maxSlotNo = withIOManager $ \iocp -> do unregister _ = atomically $ modifyTVar candidateChainsVar (Map.delete connId) - in bracketSyncWithFetchClient registry connId $ + in bracketSyncWithFetchClient blockFetchRegistry connId $ bracket register unregister $ \chainVar -> runPeer nullTracer -- (contramap (show . TraceLabelPeer ("chain-sync", getFilePath $ remoteAddress connId)) stdoutTracer) @@ -410,8 +411,8 @@ clientBlockFetch sockAddrs maxSlotNo = withIOManager $ \iocp -> do blockFetch = InitiatorProtocolOnly $ MiniProtocolCb $ \MinimalInitiatorContext { micConnectionId = connId } channel -> - bracketDqRegistry registry connId $ - bracketFetchClient registry (maxBound :: NodeToNodeVersion) connId $ \clientCtx -> do + bracketDqRegistry keepAliveRegistry connId $ + bracketFetchClient blockFetchRegistry keepAliveRegistry (maxBound :: NodeToNodeVersion) connId $ \clientCtx -> do threadDelay 1000000 runPipelinedPeer nullTracer -- (contramap (show . TraceLabelPeer ("block-fetch", getFilePath $ remoteAddress connId)) stdoutTracer) @@ -512,7 +513,8 @@ clientBlockFetch sockAddrs maxSlotNo = withIOManager $ \iocp -> do (contramap show stdoutTracer) -- decisionTracer (contramap show stdoutTracer) -- state tracer blockFetchPolicy - registry + blockFetchRegistry + keepAliveRegistry (BlockFetchConfiguration { bfcMaxConcurrencyBulkSync = 1, bfcMaxConcurrencyDeadline = 2, diff --git a/cardano-diffusion/tests/lib/Test/Cardano/Network/Diffusion/Testnet/MiniProtocols.hs b/cardano-diffusion/tests/lib/Test/Cardano/Network/Diffusion/Testnet/MiniProtocols.hs index 9236cf4cf68..45b8800696c 100644 --- a/cardano-diffusion/tests/lib/Test/Cardano/Network/Diffusion/Testnet/MiniProtocols.hs +++ b/cardano-diffusion/tests/lib/Test/Cardano/Network/Diffusion/Testnet/MiniProtocols.hs @@ -511,6 +511,7 @@ applications debugTracer txSubmissionInboundTracer txSubmissionInboundDebug node channel -> do labelThisThread "BlockFetchClient" bracketFetchClient (nkFetchClientRegistry nodeKernel) + (nkKeepAliveRegistry nodeKernel) UnversionedProtocol remoteAddress $ \clientCtx -> @@ -574,7 +575,7 @@ applications debugTracer txSubmissionInboundTracer txSubmissionInboundDebug node remoteAddress ctxVar (KeepAliveInterval aaKeepAliveInterval)) - bracketKeepAliveClient (nkFetchClientRegistry nodeKernel) + bracketKeepAliveClient (nkKeepAliveRegistry nodeKernel) remoteAddress kacApp diff --git a/ouroboros-network/changelog.d/20260514_173638_coot_bracketKeepAliveClient.md b/ouroboros-network/changelog.d/20260514_173638_coot_bracketKeepAliveClient.md new file mode 100644 index 00000000000..ed6455237da --- /dev/null +++ b/ouroboros-network/changelog.d/20260514_173638_coot_bracketKeepAliveClient.md @@ -0,0 +1,29 @@ + + +### Breaking + +- Split `FetchClientRegistry` into two parts + - `FetchClientRegistry` - block-fetch related + - `KeepAliveRegistry` - keep-alive related + Added `newKeepAliveRegistry` to create `KeepAliveRegistry`, it should be + called along side `newFetchClientRegistry` whenever `block-fetch` is used. +- `FetchClientRegistry` record fields where renamed, the `fcr` prefix was + dropped, `KeepAliveRegistry` field names were kept without the prefix too. + + + diff --git a/ouroboros-network/lib/Ouroboros/Network/BlockFetch.hs b/ouroboros-network/lib/Ouroboros/Network/BlockFetch.hs index adfd52df187..d2375bae5c0 100644 --- a/ouroboros-network/lib/Ouroboros/Network/BlockFetch.hs +++ b/ouroboros-network/lib/Ouroboros/Network/BlockFetch.hs @@ -95,6 +95,9 @@ module Ouroboros.Network.BlockFetch , newFetchClientRegistry , bracketFetchClient , bracketSyncWithFetchClient + -- * The 'KeepAliveRegistry' + , KeepAliveRegistry + , newKeepAliveRegistry , bracketKeepAliveClient -- * Re-export types used by 'BlockFetchConsensusInterface' , PraosFetchMode (..) @@ -117,8 +120,9 @@ import Ouroboros.Network.Block import Ouroboros.Network.SizeInBytes (SizeInBytes) import Ouroboros.Network.BlockFetch.ClientRegistry (FetchClientPolicy (..), - FetchClientRegistry, bracketFetchClient, bracketKeepAliveClient, - bracketSyncWithFetchClient, newFetchClientRegistry, + FetchClientRegistry, KeepAliveRegistry, bracketFetchClient, + bracketKeepAliveClient, bracketSyncWithFetchClient, + newFetchClientRegistry, newKeepAliveRegistry, readFetchClientsStateVars, readFetchClientsStatus, readPeerGSVs, setFetchClientContext) import Ouroboros.Network.BlockFetch.ConsensusInterface @@ -185,14 +189,16 @@ blockFetchLogic :: forall addr header block m. -> Tracer m (TraceLabelPeer addr (TraceFetchClientState header)) -> BlockFetchConsensusInterface addr header block m -> FetchClientRegistry addr header block m + -> KeepAliveRegistry addr m -> BlockFetchConfiguration -> m Void blockFetchLogic decisionTracer clientStateTracer BlockFetchConsensusInterface{..} - registry + blockFetchRegistry + keepAliveRegistry BlockFetchConfiguration{..} = do - setFetchClientContext registry clientStateTracer mkFetchClientPolicy + setFetchClientContext blockFetchRegistry clientStateTracer mkFetchClientPolicy fetchLogicIterations decisionTracer clientStateTracer @@ -230,7 +236,7 @@ blockFetchLogic decisionTracer clientStateTracer FetchTriggerVariables { readStateCurrentChain = readCurrentChain, readStateCandidateChains = readCandidateChains, - readStatePeerStatus = readFetchClientsStatus registry, + readStatePeerStatus = readFetchClientsStatus blockFetchRegistry, readStateChainComparison = readChainComparison } @@ -238,8 +244,8 @@ blockFetchLogic decisionTracer clientStateTracer fetchNonTriggerVariables = FetchNonTriggerVariables { readStateFetchedBlocks = readFetchedBlocks, - readStatePeerStateVars = readFetchClientsStateVars registry, - readStatePeerGSVs = readPeerGSVs registry, + readStatePeerStateVars = readFetchClientsStateVars blockFetchRegistry, + readStatePeerGSVs = readPeerGSVs keepAliveRegistry, readStateFetchMode = readFetchMode, readStateFetchedMaxSlotNo = readFetchedMaxSlotNo, readStateChainSelStarvation = readChainSelStarvation diff --git a/ouroboros-network/lib/Ouroboros/Network/BlockFetch/ClientRegistry.hs b/ouroboros-network/lib/Ouroboros/Network/BlockFetch/ClientRegistry.hs index 2379cf215a8..cb398b6c909 100644 --- a/ouroboros-network/lib/Ouroboros/Network/BlockFetch/ClientRegistry.hs +++ b/ouroboros-network/lib/Ouroboros/Network/BlockFetch/ClientRegistry.hs @@ -1,4 +1,5 @@ {-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE NamedFieldPuns #-} {-# LANGUAGE ScopedTypeVariables #-} module Ouroboros.Network.BlockFetch.ClientRegistry @@ -6,19 +7,18 @@ module Ouroboros.Network.BlockFetch.ClientRegistry FetchClientRegistry (..) , newFetchClientRegistry , bracketFetchClient - , bracketKeepAliveClient , bracketSyncWithFetchClient , setFetchClientContext , FetchClientPolicy (..) , readFetchClientsStatus , readFetchClientsStateVars - , readPeerGSVs + -- * KeepAlive registry + , module KeepAlive ) where import Data.Functor.Contravariant (contramap) import Data.Map (Map) import Data.Map qualified as Map -import Data.Set (Set) import Data.Set qualified as Set import Control.Concurrent.Class.MonadSTM.Strict @@ -32,8 +32,8 @@ import Control.Monad.Class.MonadTimer.SI import Control.Tracer (Tracer) import Ouroboros.Network.BlockFetch.ClientState -import Ouroboros.Network.DeltaQ import Ouroboros.Network.Diffusion.Policies (deactivateTimeout) +import Ouroboros.Network.KeepAlive.Registry as KeepAlive @@ -48,31 +48,23 @@ import Ouroboros.Network.Diffusion.Policies (deactivateTimeout) -- data FetchClientRegistry peer header block m = FetchClientRegistry { - fcrCtxVar + ctxVar :: StrictTMVar m ( Tracer m (TraceLabelPeer peer (TraceFetchClientState header)) , STM m (FetchClientPolicy header block m) ), - fcrFetchRegistry + fetchRegistry :: StrictTVar m (Map peer (FetchClientStateVars m header)), - fcrSyncRegistry - :: StrictTVar m (Map peer (ThreadId m, StrictTMVar m (), StrictTMVar m ())), - fcrDqRegistry - :: StrictTVar m (Map peer PeerGSV), - fcrKeepRegistry - :: StrictTVar m (Map peer (ThreadId m, StrictTMVar m ())), - fcrDying - :: StrictTVar m (Set peer) - } + syncRegistry + :: StrictTVar m (Map peer (ThreadId m, StrictTMVar m (), StrictTMVar m ())) + } + newFetchClientRegistry :: MonadSTM m => m (FetchClientRegistry peer header block m) newFetchClientRegistry = FetchClientRegistry <$> newEmptyTMVarIO <*> newTVarIO Map.empty <*> newTVarIO Map.empty - <*> newTVarIO Map.empty - <*> newTVarIO Map.empty - <*> newTVarIO Set.empty -- | This is needed to start a block fetch client. It provides the required -- 'FetchClientContext'. It registers and unregisters the fetch client on @@ -83,12 +75,13 @@ newFetchClientRegistry = FetchClientRegistry <$> newEmptyTMVarIO bracketFetchClient :: forall m a peer header block version. (MonadFork m, MonadMask m, MonadTimer m, Ord peer) => FetchClientRegistry peer header block m + -> KeepAliveRegistry peer m -> version -> peer -> (FetchClientContext header block m -> m a) -> m a -bracketFetchClient (FetchClientRegistry ctxVar - fetchRegistry syncRegistry dqRegistry keepRegistry dyingRegistry) +bracketFetchClient FetchClientRegistry { ctxVar, fetchRegistry, syncRegistry } + KeepAliveRegistry { dqRegistry, keepRegistry, dyingRegistry } _version peer action = do ksVar <- newEmptyTMVarIO fst <$> generalBracket (register ksVar) (unregister ksVar) (action . fst) @@ -214,7 +207,6 @@ bracketFetchClient (FetchClientRegistry ctxVar Map.delete peer m - -- | The block fetch and chain sync clients for each peer need to synchronise -- their startup and shutdown. This bracket operation provides that -- synchronisation for the chain sync client. @@ -229,8 +221,7 @@ bracketSyncWithFetchClient :: forall m a peer header block. -> peer -> m a -> m a -bracketSyncWithFetchClient (FetchClientRegistry _ctxVar - _fetchRegistry syncRegistry _dqRegistry _keepRegistry _dyingRegistry) peer action = do +bracketSyncWithFetchClient FetchClientRegistry { syncRegistry } peer action = do doneVar <- newEmptyTMVarIO startVar <- newEmptyTMVarIO bracket_ (register doneVar startVar) (unregister doneVar) action @@ -265,68 +256,12 @@ bracketSyncWithFetchClient (FetchClientRegistry _ctxVar assert (peer `Map.member` m) $ Map.delete peer m -bracketKeepAliveClient :: forall m a peer header block. - (MonadSTM m, MonadFork m, MonadMask m, Ord peer) - => FetchClientRegistry peer header block m - -> peer - -> (StrictTVar m (Map peer PeerGSV) -> m a) - -> m a -bracketKeepAliveClient(FetchClientRegistry _ctxVar - _fetchRegistry _syncRegistry dqRegistry keepRegistry dyingRegistry) peer action = do - bracket_ register unregister (action dqRegistry) - where - -- the keepAliveClient will register a PeerGSV and the block fetch client will wait on it. - register :: m () - register = - atomically $ do - -- Wait for previous keep alive client to cleanup - dr <- readTVar dqRegistry - check (peer `Map.notMember` dr) - - modifyTVar dqRegistry $ \m -> - assert (peer `Map.notMember` m) $ - Map.insert peer defaultGSV m - - -- It is possible for the keepAlive client to keep running even without a fetch client, but - -- a fetch client shouldn't run without a keepAlive client. - unregister :: m () - unregister = uninterruptibleMask_ $ do - fetchclient_m <- atomically $ do - fetchclients <- readTVar keepRegistry - case Map.lookup peer fetchclients of - Nothing -> do - -- If the fetch client is already dead we remove PeerGSV ourself directly. - modifyTVar dqRegistry $ \m -> - assert (peer `Map.member` m) $ - Map.delete peer m - return Nothing - Just rc -> do - -- Prevent a new fetchclient from starting while we are killing the old one. - modifyTVar dyingRegistry $ \s -> - assert (peer `Set.notMember` s) $ - Set.insert peer s - return $ Just rc - case fetchclient_m of - Nothing -> return () - Just (tid, doneVar) -> do - -- Cancel the fetch client. - throwTo tid AsyncCancelled - atomically $ do - -- wait for fetch client to exit. - readTMVar doneVar - modifyTVar dqRegistry $ \m -> - assert (peer `Map.member` m) $ - Map.delete peer m - modifyTVar dyingRegistry $ \s -> - assert (peer `Set.member` s) $ - Set.delete peer s - setFetchClientContext :: MonadSTM m => FetchClientRegistry peer header block m -> Tracer m (TraceLabelPeer peer (TraceFetchClientState header)) -> STM m (FetchClientPolicy header block m) -> m () -setFetchClientContext (FetchClientRegistry ctxVar _ _ _ _ _) tracer mkPolicy = +setFetchClientContext FetchClientRegistry { ctxVar } tracer mkPolicy = atomically $ do ok <- tryPutTMVar ctxVar (tracer, mkPolicy) unless ok $ error "setFetchClientContext: called more than once" @@ -337,8 +272,8 @@ setFetchClientContext (FetchClientRegistry ctxVar _ _ _ _ _) tracer mkPolicy = readFetchClientsStatus :: MonadSTM m => FetchClientRegistry peer header block m -> STM m (Map peer (PeerFetchStatus header)) -readFetchClientsStatus (FetchClientRegistry _ registry _ _ _ _) = - readTVar registry >>= traverse (readTVar . fetchClientStatusVar) +readFetchClientsStatus FetchClientRegistry { fetchRegistry } = + readTVar fetchRegistry >>= traverse (readTVar . fetchClientStatusVar) -- | A read-only 'STM' action to get the 'FetchClientStateVars' for all fetch -- clients in the 'FetchClientRegistry'. @@ -346,20 +281,4 @@ readFetchClientsStatus (FetchClientRegistry _ registry _ _ _ _) = readFetchClientsStateVars :: MonadSTM m => FetchClientRegistry peer header block m -> STM m (Map peer (FetchClientStateVars m header)) -readFetchClientsStateVars (FetchClientRegistry _ registry _ _ _ _) = readTVar registry - --- | A read-only 'STM' action to get the 'PeerGSV's for all fetch --- clients in the 'FetchClientRegistry'. --- -readPeerGSVs :: forall block header m peer. - ( MonadSTM m, Ord peer) - => FetchClientRegistry peer header block m - -> STM m (Map peer PeerGSV) -readPeerGSVs (FetchClientRegistry _ _ _ dqRegistry keepRegistry _) = do - dr <- readTVar dqRegistry - kr <- readTVar keepRegistry - -- The intersection gives us only the currently hot peers - return $ Map.intersection dr kr - - - +readFetchClientsStateVars FetchClientRegistry { fetchRegistry } = readTVar fetchRegistry diff --git a/ouroboros-network/lib/Ouroboros/Network/KeepAlive/Registry.hs b/ouroboros-network/lib/Ouroboros/Network/KeepAlive/Registry.hs new file mode 100644 index 00000000000..d4b3ccf9dd4 --- /dev/null +++ b/ouroboros-network/lib/Ouroboros/Network/KeepAlive/Registry.hs @@ -0,0 +1,109 @@ +{-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE ScopedTypeVariables #-} + +module Ouroboros.Network.KeepAlive.Registry + ( KeepAliveRegistry (..) + , newKeepAliveRegistry + , bracketKeepAliveClient + , readPeerGSVs + ) where + +import Data.Map (Map) +import Data.Map qualified as Map +import Data.Set (Set) +import Data.Set qualified as Set + +import Control.Concurrent.Class.MonadSTM.Strict +import Control.Exception (assert) +import Control.Monad.Class.MonadAsync +import Control.Monad.Class.MonadFork +import Control.Monad.Class.MonadThrow + +import Ouroboros.Network.DeltaQ + +-- | A registry which keeps `PeerGSV` information based on `keep-alive` +-- measurements. +-- +data KeepAliveRegistry peer m = KeepAliveRegistry { + dqRegistry + :: StrictTVar m (Map peer PeerGSV), + keepRegistry + :: StrictTVar m (Map peer (ThreadId m, StrictTMVar m ())), + dyingRegistry + :: StrictTVar m (Set peer) + } + +newKeepAliveRegistry :: MonadSTM m + => m (KeepAliveRegistry peer m) +newKeepAliveRegistry = KeepAliveRegistry <$> newTVarIO Map.empty + <*> newTVarIO Map.empty + <*> newTVarIO Set.empty + +bracketKeepAliveClient :: forall m a peer. + (MonadSTM m, MonadFork m, MonadMask m, Ord peer) + => KeepAliveRegistry peer m + -> peer + -> (StrictTVar m (Map peer PeerGSV) -> m a) + -> m a +bracketKeepAliveClient KeepAliveRegistry { dqRegistry, keepRegistry, dyingRegistry } peer action = do + bracket_ register unregister (action dqRegistry) + where + -- the keepAliveClient will register a PeerGSV and the block fetch client will wait on it. + register :: m () + register = + atomically $ do + -- Wait for previous keep alive client to cleanup + dr <- readTVar dqRegistry + check (peer `Map.notMember` dr) + + modifyTVar dqRegistry $ \m -> + assert (peer `Map.notMember` m) $ + Map.insert peer defaultGSV m + + -- It is possible for the keepAlive client to keep running even without a fetch client, but + -- a fetch client shouldn't run without a keepAlive client. + unregister :: m () + unregister = uninterruptibleMask_ $ do + fetchclient_m <- atomically $ do + fetchclients <- readTVar keepRegistry + case Map.lookup peer fetchclients of + Nothing -> do + -- If the fetch client is already dead we remove PeerGSV ourself directly. + modifyTVar dqRegistry $ \m -> + assert (peer `Map.member` m) $ + Map.delete peer m + return Nothing + Just rc -> do + -- Prevent a new fetchclient from starting while we are killing the old one. + modifyTVar dyingRegistry $ \s -> + assert (peer `Set.notMember` s) $ + Set.insert peer s + return $ Just rc + case fetchclient_m of + Nothing -> return () + Just (tid, doneVar) -> do + -- Cancel the fetch client. + throwTo tid AsyncCancelled + atomically $ do + -- wait for fetch client to exit. + readTMVar doneVar + modifyTVar dqRegistry $ \m -> + assert (peer `Map.member` m) $ + Map.delete peer m + modifyTVar dyingRegistry $ \s -> + assert (peer `Set.member` s) $ + Set.delete peer s + +-- | A read-only 'STM' action to get the 'PeerGSV's for all fetch +-- clients in the 'FetchClientRegistry'. +-- +readPeerGSVs :: forall m peer. + ( MonadSTM m, Ord peer) + => KeepAliveRegistry peer m + -> STM m (Map peer PeerGSV) +readPeerGSVs KeepAliveRegistry { dqRegistry, keepRegistry } = do + dr <- readTVar dqRegistry + kr <- readTVar keepRegistry + -- The intersection gives us only the currently hot peers + return $ Map.intersection dr kr diff --git a/ouroboros-network/ouroboros-network.cabal b/ouroboros-network/ouroboros-network.cabal index 74de4ca71a5..99feb26d15d 100644 --- a/ouroboros-network/ouroboros-network.cabal +++ b/ouroboros-network/ouroboros-network.cabal @@ -233,6 +233,7 @@ library Ouroboros.Network.Diffusion.Utils Ouroboros.Network.ExitPolicy Ouroboros.Network.KeepAlive + Ouroboros.Network.KeepAlive.Registry Ouroboros.Network.PeerSelection Ouroboros.Network.PeerSelection.Churn Ouroboros.Network.PeerSelection.Governor diff --git a/ouroboros-network/tests/lib/Ouroboros/Network/BlockFetch/Examples.hs b/ouroboros-network/tests/lib/Ouroboros/Network/BlockFetch/Examples.hs index b496bc09a6c..14c13c76c2c 100644 --- a/ouroboros-network/tests/lib/Ouroboros/Network/BlockFetch/Examples.hs +++ b/ouroboros-network/tests/lib/Ouroboros/Network/BlockFetch/Examples.hs @@ -84,7 +84,8 @@ blockFetchExample0 fetchMode decisionTracer clientStateTracer clientMsgTracer controlMessageSTM currentChain candidateChain = do - registry <- newFetchClientRegistry :: m (FetchClientRegistry Int BlockHeader Block m) + blockFetchRegistry <- newFetchClientRegistry :: m (FetchClientRegistry Int BlockHeader Block m) + keepAliveRegistry <- newKeepAliveRegistry :: m (KeepAliveRegistry Int m) blockHeap <- mkTestFetchedBlockHeap (anchoredChainPoints currentChain) (clientAsync, serverAsync, syncClientAsync, keepAliveAsync) @@ -93,14 +94,16 @@ blockFetchExample0 fetchMode decisionTracer clientStateTracer clientMsgTracer (contramap (TraceLabelPeer peerno) serverMsgTracer) (maxBound :: NodeToNodeVersion) clientDelay serverDelay - registry peerno + blockFetchRegistry + keepAliveRegistry + peerno (blockFetchClient (maxBound :: NodeToNodeVersion) controlMessageSTM nullTracer) (mockBlockFetchServer1 candidateChain) fetchAsync <- async $ do threadId <- myThreadId labelThread threadId "block-fetch-logic" - blockFetch registry blockHeap + blockFetch blockFetchRegistry keepAliveRegistry blockHeap driverAsync <- async $ do threadId <- myThreadId labelThread threadId "driver" @@ -132,13 +135,15 @@ blockFetchExample0 fetchMode decisionTracer clientStateTracer clientMsgTracer : map blockPoint (AnchoredFragment.toOldestFirst c) blockFetch :: FetchClientRegistry Int BlockHeader Block m + -> KeepAliveRegistry Int m -> TestFetchedBlockHeap m Block -> m () - blockFetch registry blockHeap = + blockFetch blockFetchRegistry keepAliveRegistry blockHeap = blockFetchLogic decisionTracer clientStateTracer (sampleBlockFetchPolicy1 fetchMode headerForgeUTCTime blockHeap currentChainHeaders candidateChainHeaders) - registry + blockFetchRegistry + keepAliveRegistry (BlockFetchConfiguration { bfcMaxConcurrencyBulkSync = 1, bfcMaxConcurrencyDeadline = 2, @@ -198,7 +203,8 @@ blockFetchExample1 fetchMode decisionTracer clientStateTracer clientMsgTracer controlMessageVar <- newTVarIO Continue let controlMessageSTM = readTVar controlMessageVar - registry <- newFetchClientRegistry + blockFetchRegistry <- newFetchClientRegistry + keepAliveRegistry <- newKeepAliveRegistry blockHeap <- mkTestFetchedBlockHeap (anchoredChainPoints currentChain) peerAsyncs <- sequence @@ -207,7 +213,9 @@ blockFetchExample1 fetchMode decisionTracer clientStateTracer clientMsgTracer (contramap (TraceLabelPeer peerno) serverMsgTracer) (maxBound :: NodeToNodeVersion) clientDelay serverDelay - registry peerno + blockFetchRegistry + keepAliveRegistry + peerno (blockFetchClient (maxBound :: NodeToNodeVersion) controlMessageSTM nullTracer) (mockBlockFetchServer1 candidateChain) | (peerno, candidateChain) <- zip [1..] candidateChains @@ -215,7 +223,7 @@ blockFetchExample1 fetchMode decisionTracer clientStateTracer clientMsgTracer fetchAsync <- async $ do threadId <- myThreadId labelThread threadId "block-fetch-logic" - blockFetch registry blockHeap + blockFetch blockFetchRegistry keepAliveRegistry blockHeap driverAsync <- async $ do threadId <- myThreadId labelThread threadId "block-fetch-driver" @@ -246,13 +254,15 @@ blockFetchExample1 fetchMode decisionTracer clientStateTracer clientMsgTracer : map blockPoint (AnchoredFragment.toOldestFirst c) blockFetch :: FetchClientRegistry Int BlockHeader Block m + -> KeepAliveRegistry Int m -> TestFetchedBlockHeap m Block -> m () - blockFetch registry blockHeap = + blockFetch blockFetchRegistry keepAliveRegistry blockHeap = blockFetchLogic decisionTracer clientStateTracer (sampleBlockFetchPolicy1 fetchMode headerForgeUTCTime blockHeap currentChainHeaders candidateChainHeaders) - registry + blockFetchRegistry + keepAliveRegistry (BlockFetchConfiguration { bfcMaxConcurrencyBulkSync = 1, bfcMaxConcurrencyDeadline = 2, @@ -355,13 +365,14 @@ runFetchClient :: ( MonadAsync m => Tracer m (TraceSendRecv (BlockFetch block point)) -> version -> FetchClientRegistry peerid header block m + -> KeepAliveRegistry peerid m -> peerid -> Channel m LBS.ByteString -> ( FetchClientContext header block m -> ClientPipelined (BlockFetch block point) BFIdle m a) -> m a -runFetchClient tracer version registry peerid channel client = - bracketFetchClient registry version peerid $ \clientCtx -> +runFetchClient tracer version blockFetchRegistry keepAliveRegistry peerid channel client = + bracketFetchClient blockFetchRegistry keepAliveRegistry version peerid $ \clientCtx -> fst <$> runPipelinedPeerWithLimits tracer codec (byteLimitsBlockFetch (fromIntegral . LBS.length)) timeLimitsBlockFetch channel (client clientCtx) @@ -418,6 +429,7 @@ runFetchClientAndServerAsync -> Maybe DiffTime -- ^ client's channel delay -> Maybe DiffTime -- ^ server's channel delay -> FetchClientRegistry peerid header block m + -> KeepAliveRegistry peerid m -> peerid -> ( FetchClientContext header block m -> ClientPipelined (BlockFetch block (Point block)) BFIdle m a) @@ -426,7 +438,9 @@ runFetchClientAndServerAsync runFetchClientAndServerAsync clientTracer serverTracer version clientDelay serverDelay - registry peerid client server = do + blockFetchRegistry + keepAliveRegistry + peerid client server = do (clientChannel, serverChannel) <- createConnectedChannels clientAsync <- async $ do @@ -435,7 +449,9 @@ runFetchClientAndServerAsync clientTracer serverTracer runFetchClient clientTracer version - registry peerid + blockFetchRegistry + keepAliveRegistry + peerid (fromMaybe id (delayChannel <$> clientDelay) clientChannel) client @@ -454,13 +470,14 @@ runFetchClientAndServerAsync clientTracer serverTracer threadId <- myThreadId labelThread threadId ("registry-" ++ show peerid) bracketSyncWithFetchClient - registry peerid + blockFetchRegistry + peerid (forever (threadDelay 1000) >> return ()) keepAliveAsync <- async $ do threadId <- myThreadId labelThread threadId ("keep-alive-" ++ show peerid) bracketKeepAliveClient - registry peerid + keepAliveRegistry peerid (\_ -> forever (threadDelay 1000) >> return ()) return (clientAsync, serverAsync, syncClientAsync, keepAliveAsync) diff --git a/ouroboros-network/tests/lib/Test/Ouroboros/Network/BlockFetch.hs b/ouroboros-network/tests/lib/Test/Ouroboros/Network/BlockFetch.hs index 11680fbb195..bb81343772c 100644 --- a/ouroboros-network/tests/lib/Test/Ouroboros/Network/BlockFetch.hs +++ b/ouroboros-network/tests/lib/Test/Ouroboros/Network/BlockFetch.hs @@ -829,20 +829,26 @@ unit_bracketSyncWithFetchClient step = do -> ((forall c. m c -> m c) -> m d) -> m (Either SomeException a, Either SomeException b) testSkeleton withFetchTestAction withSyncTestAction withKeepAliveTestAction = do - registry <- newFetchClientRegistry - setFetchClientContext registry nullTracer dummyPolicy + blockFetchRegistry <- newFetchClientRegistry + keepAliveRegistry <- newKeepAliveRegistry + setFetchClientContext blockFetchRegistry nullTracer dummyPolicy fetchStatePeerChainsVar <- newTVarIO Map.empty let peer = "thepeer" fetch :: m a - fetch = withFetchTestAction $ \body -> - bracketFetchClient registry (maxBound @NodeToNodeVersion) peer $ \_ -> - body + fetch = withFetchTestAction + $ \body -> + bracketFetchClient + blockFetchRegistry + keepAliveRegistry + (maxBound @NodeToNodeVersion) + peer + $ \_ -> body sync :: m b sync = withSyncTestAction $ \body -> - bracketSyncWithFetchClient registry peer $ + bracketSyncWithFetchClient blockFetchRegistry peer $ bracket_ (atomically (modifyTVar fetchStatePeerChainsVar (Map.insert peer ()))) @@ -852,13 +858,13 @@ unit_bracketSyncWithFetchClient step = do keep :: m d keep = withKeepAliveTestAction $ \body -> - bracketKeepAliveClient registry peer $ const body + bracketKeepAliveClient keepAliveRegistry peer $ const body logic :: (Map String (PeerFetchStatus BlockHeader), Map String ()) -> m () logic fingerprint = do fingerprint' <- atomically $ do - fetchStatePeerStates <- readFetchClientsStatus registry + fetchStatePeerStates <- readFetchClientsStatus blockFetchRegistry fetchStatePeerChains <- readTVar fetchStatePeerChainsVar let fingerprint' = (fetchStatePeerStates, fetchStatePeerChains) check (fingerprint' /= fingerprint) @@ -894,11 +900,11 @@ unit_bracketSyncWithFetchClient step = do syncRes <- waitCatch syncAsync void $ waitCatch keepAsync atomically $ do - fr <- readTVar $ fcrFetchRegistry registry - sr <- readTVar $ fcrSyncRegistry registry - dr <- readTVar $ fcrDqRegistry registry - kr <- readTVar $ fcrKeepRegistry registry - yr <- readTVar $ fcrDying registry + fr <- readTVar $ fetchRegistry blockFetchRegistry + sr <- readTVar $ syncRegistry blockFetchRegistry + dr <- readTVar $ dqRegistry keepAliveRegistry + kr <- readTVar $ keepRegistry keepAliveRegistry + yr <- readTVar $ dyingRegistry keepAliveRegistry if and [Map.null fr, Map.null sr, Map.null dr, Map.null kr, Set.null yr] then return () else error "state leak" diff --git a/ouroboros-network/tests/lib/Test/Ouroboros/Network/Diffusion/Node.hs b/ouroboros-network/tests/lib/Test/Ouroboros/Network/Diffusion/Node.hs index f4d87d94668..09a6723921f 100644 --- a/ouroboros-network/tests/lib/Test/Ouroboros/Network/Diffusion/Node.hs +++ b/ouroboros-network/tests/lib/Test/Ouroboros/Network/Diffusion/Node.hs @@ -361,6 +361,7 @@ run blockGeneratorArgs ni na tracerBlockFetch (blockFetchPolicy nodeKernel) (nkFetchClientRegistry nodeKernel) + (nkKeepAliveRegistry nodeKernel) (BlockFetchConfiguration { bfcMaxConcurrencyBulkSync = 1, bfcMaxConcurrencyDeadline = 2, diff --git a/ouroboros-network/tests/lib/Test/Ouroboros/Network/Diffusion/Node/Kernel.hs b/ouroboros-network/tests/lib/Test/Ouroboros/Network/Diffusion/Node/Kernel.hs index 63b18872538..21683fc3854 100644 --- a/ouroboros-network/tests/lib/Test/Ouroboros/Network/Diffusion/Node/Kernel.hs +++ b/ouroboros-network/tests/lib/Test/Ouroboros/Network/Diffusion/Node/Kernel.hs @@ -294,6 +294,9 @@ data NodeKernel header block s txid m = NodeKernel { nkFetchClientRegistry :: FetchClientRegistry NtNAddr header block m, + nkKeepAliveRegistry + :: KeepAliveRegistry NtNAddr m, + nkPeerSharingRegistry :: PeerSharingRegistry NtNAddr m, @@ -338,6 +341,7 @@ newNodeKernel psRng txSeed txs = do <$> newTVarIO Map.empty <*> newTVarIO (ChainProducerState Chain.Genesis Map.empty 0) <*> newFetchClientRegistry + <*> newKeepAliveRegistry <*> newPeerSharingRegistry <*> ChainDB.newChainDB <*> newPeerSharingAPI publicStateVar psRng diff --git a/ouroboros-network/tests/lib/Test/Ouroboros/Network/KeepAlive.hs b/ouroboros-network/tests/lib/Test/Ouroboros/Network/KeepAlive.hs index f011d5417f0..6a4aa55c619 100644 --- a/ouroboros-network/tests/lib/Test/Ouroboros/Network/KeepAlive.hs +++ b/ouroboros-network/tests/lib/Test/Ouroboros/Network/KeepAlive.hs @@ -43,7 +43,7 @@ tests = testGroup "KeepAlive" [ testProperty "KeepAlive Convergence" prop_keepAlive_convergence] runKeepAliveClient - :: forall m peer header block. + :: forall m peer. ( MonadAsync m , MonadEvaluate m , MonadFork m @@ -55,7 +55,7 @@ runKeepAliveClient => Tracer m (TraceKeepAliveClient peer) -> StdGen -> ControlMessageSTM m - -> FetchClientRegistry peer header block m + -> KeepAliveRegistry peer m -> peer -> Channel m BL.ByteString -> KeepAliveInterval @@ -94,7 +94,7 @@ runKeepAliveServer channel = $ keepAliveServer runKeepAliveClientAndServer - :: forall m peer header block. + :: forall m peer. ( MonadAsync m , MonadDelay m , MonadEvaluate m @@ -110,7 +110,7 @@ runKeepAliveClientAndServer -> Int -> Tracer m (TraceKeepAliveClient peer) -> ControlMessageSTM m - -> FetchClientRegistry peer header block m + -> KeepAliveRegistry peer m -> peer -> KeepAliveInterval -> m (Async m ((), Maybe BL.ByteString), Async m ((), Maybe BL.ByteString)) @@ -149,7 +149,7 @@ prop_keepAlive_convergenceM -> Int -> m () prop_keepAlive_convergenceM tracer (NetworkDelay nd) seed = do - registry <- newFetchClientRegistry + registry <- newKeepAliveRegistry controlMessageV <- newTVarIO Continue let controlMessageSTM = readTVar controlMessageV clientId = "client"