11{-# LANGUAGE FlexibleContexts #-}
2+ {-# LANGUAGE NamedFieldPuns #-}
23{-# LANGUAGE ScopedTypeVariables #-}
34
45module Ouroboros.Network.BlockFetch.ClientRegistry
56 ( -- * Registry of block fetch clients
67 FetchClientRegistry (.. )
78 , newFetchClientRegistry
89 , bracketFetchClient
9- , bracketKeepAliveClient
1010 , bracketSyncWithFetchClient
1111 , setFetchClientContext
1212 , FetchClientPolicy (.. )
1313 , readFetchClientsStatus
1414 , readFetchClientsStateVars
15- , readPeerGSVs
15+ -- * KeepAlive registry
16+ , module KeepAlive
1617 ) where
1718
1819import Data.Functor.Contravariant (contramap )
1920import Data.Map (Map )
2021import Data.Map qualified as Map
21- import Data.Set (Set )
2222import Data.Set qualified as Set
2323
2424import Control.Concurrent.Class.MonadSTM.Strict
@@ -32,8 +32,8 @@ import Control.Monad.Class.MonadTimer.SI
3232import Control.Tracer (Tracer )
3333
3434import Ouroboros.Network.BlockFetch.ClientState
35- import Ouroboros.Network.DeltaQ
3635import Ouroboros.Network.Diffusion.Policies (deactivateTimeout )
36+ import Ouroboros.Network.KeepAlive.Registry as KeepAlive
3737
3838
3939
@@ -48,31 +48,23 @@ import Ouroboros.Network.Diffusion.Policies (deactivateTimeout)
4848--
4949data FetchClientRegistry peer header block m =
5050 FetchClientRegistry {
51- fcrCtxVar
51+ ctxVar
5252 :: StrictTMVar
5353 m ( Tracer m (TraceLabelPeer peer (TraceFetchClientState header))
5454 , STM m (FetchClientPolicy header block m)
5555 ),
56- fcrFetchRegistry
56+ fetchRegistry
5757 :: StrictTVar m (Map peer (FetchClientStateVars m header)),
58- fcrSyncRegistry
59- :: StrictTVar m (Map peer (ThreadId m, StrictTMVar m (), StrictTMVar m ())),
60- fcrDqRegistry
61- :: StrictTVar m (Map peer PeerGSV),
62- fcrKeepRegistry
63- :: StrictTVar m (Map peer (ThreadId m, StrictTMVar m ())),
64- fcrDying
65- :: StrictTVar m (Set peer)
66- }
58+ syncRegistry
59+ :: StrictTVar m (Map peer (ThreadId m, StrictTMVar m (), StrictTMVar m ()))
60+ }
61+
6762
6863newFetchClientRegistry :: MonadSTM m
6964 => m (FetchClientRegistry peer header block m )
7065newFetchClientRegistry = FetchClientRegistry <$> newEmptyTMVarIO
7166 <*> newTVarIO Map. empty
7267 <*> newTVarIO Map. empty
73- <*> newTVarIO Map. empty
74- <*> newTVarIO Map. empty
75- <*> newTVarIO Set. empty
7668
7769-- | This is needed to start a block fetch client. It provides the required
7870-- 'FetchClientContext'. It registers and unregisters the fetch client on
@@ -83,12 +75,13 @@ newFetchClientRegistry = FetchClientRegistry <$> newEmptyTMVarIO
8375bracketFetchClient :: forall m a peer header block version .
8476 (MonadFork m , MonadMask m , MonadTimer m , Ord peer )
8577 => FetchClientRegistry peer header block m
78+ -> KeepAliveRegistry peer m
8679 -> version
8780 -> peer
8881 -> (FetchClientContext header block m -> m a )
8982 -> m a
90- bracketFetchClient ( FetchClientRegistry ctxVar
91- fetchRegistry syncRegistry dqRegistry keepRegistry dyingRegistry)
83+ bracketFetchClient FetchClientRegistry { ctxVar, fetchRegistry, syncRegistry }
84+ KeepAliveRegistry { dqRegistry, keepRegistry, dyingRegistry }
9285 _version peer action = do
9386 ksVar <- newEmptyTMVarIO
9487 fst <$> generalBracket (register ksVar) (unregister ksVar) (action . fst )
@@ -214,7 +207,6 @@ bracketFetchClient (FetchClientRegistry ctxVar
214207 Map. delete peer m
215208
216209
217-
218210-- | The block fetch and chain sync clients for each peer need to synchronise
219211-- their startup and shutdown. This bracket operation provides that
220212-- synchronisation for the chain sync client.
@@ -229,8 +221,7 @@ bracketSyncWithFetchClient :: forall m a peer header block.
229221 -> peer
230222 -> m a
231223 -> m a
232- bracketSyncWithFetchClient (FetchClientRegistry _ctxVar
233- _fetchRegistry syncRegistry _dqRegistry _keepRegistry _dyingRegistry) peer action = do
224+ bracketSyncWithFetchClient FetchClientRegistry { syncRegistry } peer action = do
234225 doneVar <- newEmptyTMVarIO
235226 startVar <- newEmptyTMVarIO
236227 bracket_ (register doneVar startVar) (unregister doneVar) action
@@ -265,68 +256,12 @@ bracketSyncWithFetchClient (FetchClientRegistry _ctxVar
265256 assert (peer `Map.member` m) $
266257 Map. delete peer m
267258
268- bracketKeepAliveClient :: forall m a peer header block .
269- (MonadSTM m , MonadFork m , MonadMask m , Ord peer )
270- => FetchClientRegistry peer header block m
271- -> peer
272- -> (StrictTVar m (Map peer PeerGSV ) -> m a )
273- -> m a
274- bracketKeepAliveClient(FetchClientRegistry _ctxVar
275- _fetchRegistry _syncRegistry dqRegistry keepRegistry dyingRegistry) peer action = do
276- bracket_ register unregister (action dqRegistry)
277- where
278- -- the keepAliveClient will register a PeerGSV and the block fetch client will wait on it.
279- register :: m ()
280- register =
281- atomically $ do
282- -- Wait for previous keep alive client to cleanup
283- dr <- readTVar dqRegistry
284- check (peer `Map.notMember` dr)
285-
286- modifyTVar dqRegistry $ \ m ->
287- assert (peer `Map.notMember` m) $
288- Map. insert peer defaultGSV m
289-
290- -- It is possible for the keepAlive client to keep running even without a fetch client, but
291- -- a fetch client shouldn't run without a keepAlive client.
292- unregister :: m ()
293- unregister = uninterruptibleMask_ $ do
294- fetchclient_m <- atomically $ do
295- fetchclients <- readTVar keepRegistry
296- case Map. lookup peer fetchclients of
297- Nothing -> do
298- -- If the fetch client is already dead we remove PeerGSV ourself directly.
299- modifyTVar dqRegistry $ \ m ->
300- assert (peer `Map.member` m) $
301- Map. delete peer m
302- return Nothing
303- Just rc -> do
304- -- Prevent a new fetchclient from starting while we are killing the old one.
305- modifyTVar dyingRegistry $ \ s ->
306- assert (peer `Set.notMember` s) $
307- Set. insert peer s
308- return $ Just rc
309- case fetchclient_m of
310- Nothing -> return ()
311- Just (tid, doneVar) -> do
312- -- Cancel the fetch client.
313- throwTo tid AsyncCancelled
314- atomically $ do
315- -- wait for fetch client to exit.
316- readTMVar doneVar
317- modifyTVar dqRegistry $ \ m ->
318- assert (peer `Map.member` m) $
319- Map. delete peer m
320- modifyTVar dyingRegistry $ \ s ->
321- assert (peer `Set.member` s) $
322- Set. delete peer s
323-
324259setFetchClientContext :: MonadSTM m
325260 => FetchClientRegistry peer header block m
326261 -> Tracer m (TraceLabelPeer peer (TraceFetchClientState header ))
327262 -> STM m (FetchClientPolicy header block m )
328263 -> m ()
329- setFetchClientContext ( FetchClientRegistry ctxVar _ _ _ _ _) tracer mkPolicy =
264+ setFetchClientContext FetchClientRegistry { ctxVar } tracer mkPolicy =
330265 atomically $ do
331266 ok <- tryPutTMVar ctxVar (tracer, mkPolicy)
332267 unless ok $ error " setFetchClientContext: called more than once"
@@ -337,29 +272,13 @@ setFetchClientContext (FetchClientRegistry ctxVar _ _ _ _ _) tracer mkPolicy =
337272readFetchClientsStatus :: MonadSTM m
338273 => FetchClientRegistry peer header block m
339274 -> STM m (Map peer (PeerFetchStatus header ))
340- readFetchClientsStatus ( FetchClientRegistry _ registry _ _ _ _) =
341- readTVar registry >>= traverse (readTVar . fetchClientStatusVar)
275+ readFetchClientsStatus FetchClientRegistry { fetchRegistry } =
276+ readTVar fetchRegistry >>= traverse (readTVar . fetchClientStatusVar)
342277
343278-- | A read-only 'STM' action to get the 'FetchClientStateVars' for all fetch
344279-- clients in the 'FetchClientRegistry'.
345280--
346281readFetchClientsStateVars :: MonadSTM m
347282 => FetchClientRegistry peer header block m
348283 -> STM m (Map peer (FetchClientStateVars m header ))
349- readFetchClientsStateVars (FetchClientRegistry _ registry _ _ _ _) = readTVar registry
350-
351- -- | A read-only 'STM' action to get the 'PeerGSV's for all fetch
352- -- clients in the 'FetchClientRegistry'.
353- --
354- readPeerGSVs :: forall block header m peer .
355- ( MonadSTM m , Ord peer )
356- => FetchClientRegistry peer header block m
357- -> STM m (Map peer PeerGSV )
358- readPeerGSVs (FetchClientRegistry _ _ _ dqRegistry keepRegistry _) = do
359- dr <- readTVar dqRegistry
360- kr <- readTVar keepRegistry
361- -- The intersection gives us only the currently hot peers
362- return $ Map. intersection dr kr
363-
364-
365-
284+ readFetchClientsStateVars FetchClientRegistry { fetchRegistry } = readTVar fetchRegistry
0 commit comments