Skip to content

Commit 0fabb69

Browse files
committed
block-fetch: split FetchClientRegistry
As a consequence `bracketFetchClient` is decoupled from `bracketKeepAlive`.
1 parent e0eafca commit 0fabb69

10 files changed

Lines changed: 153 additions & 79 deletions

File tree

cardano-diffusion/demo/chain-sync.hs

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ import Ouroboros.Network.Protocol.BlockFetch.Type qualified as BlockFetch
7474

7575
import Ouroboros.Network.BlockFetch
7676
import Ouroboros.Network.BlockFetch.Client
77-
import Ouroboros.Network.BlockFetch.ClientRegistry (FetchClientRegistry (..))
77+
import Ouroboros.Network.BlockFetch.ClientRegistry (KeepAliveRegistry (..))
7878
import Ouroboros.Network.BlockFetch.ConsensusInterface (ChainSelStarvation (..),
7979
initialWithFingerprint)
8080
import Ouroboros.Network.DeltaQ (defaultGSV)
@@ -348,10 +348,10 @@ demoProtocol3 chainSync blockFetch =
348348

349349
-- We run `chain-sync` and `block-fetch`, without `keep-alive`, so we need to
350350
-- register peers in `dqRegistry`.
351-
bracketDqRegistry :: FetchClientRegistry (ConnectionId LocalAddress) header block IO
351+
bracketDqRegistry :: KeepAliveRegistry (ConnectionId LocalAddress) IO
352352
-> ConnectionId LocalAddress
353353
-> IO a -> IO a
354-
bracketDqRegistry FetchClientRegistry { fcrDqRegistry = dqRegistry } peer k =
354+
bracketDqRegistry KeepAliveRegistry { dqRegistry } peer k =
355355
bracket (atomically (modifyTVar dqRegistry (Map.insert peer defaultGSV)))
356356
(\_ -> atomically (modifyTVar dqRegistry (Map.delete peer)))
357357
(\_ -> k)
@@ -360,7 +360,8 @@ clientBlockFetch :: [FilePath]
360360
-> Maybe SlotNo
361361
-> IO ()
362362
clientBlockFetch sockAddrs maxSlotNo = withIOManager $ \iocp -> do
363-
registry <- newFetchClientRegistry
363+
blockFetchRegistry <- newFetchClientRegistry
364+
keepAliveRegistry <- newKeepAliveRegistry
364365
blockHeap <- mkTestFetchedBlockHeap []
365366

366367
candidateChainsVar <- newTVarIO Map.empty
@@ -396,7 +397,7 @@ clientBlockFetch sockAddrs maxSlotNo = withIOManager $ \iocp -> do
396397
unregister _ = atomically $
397398
modifyTVar candidateChainsVar
398399
(Map.delete connId)
399-
in bracketSyncWithFetchClient registry connId $
400+
in bracketSyncWithFetchClient blockFetchRegistry connId $
400401
bracket register unregister $ \chainVar ->
401402
runPeer
402403
nullTracer -- (contramap (show . TraceLabelPeer ("chain-sync", getFilePath $ remoteAddress connId)) stdoutTracer)
@@ -410,8 +411,8 @@ clientBlockFetch sockAddrs maxSlotNo = withIOManager $ \iocp -> do
410411
blockFetch =
411412
InitiatorProtocolOnly $
412413
MiniProtocolCb $ \MinimalInitiatorContext { micConnectionId = connId } channel ->
413-
bracketDqRegistry registry connId $
414-
bracketFetchClient registry (maxBound :: NodeToNodeVersion) connId $ \clientCtx -> do
414+
bracketDqRegistry keepAliveRegistry connId $
415+
bracketFetchClient blockFetchRegistry keepAliveRegistry (maxBound :: NodeToNodeVersion) connId $ \clientCtx -> do
415416
threadDelay 1000000
416417
runPipelinedPeer
417418
nullTracer -- (contramap (show . TraceLabelPeer ("block-fetch", getFilePath $ remoteAddress connId)) stdoutTracer)
@@ -512,7 +513,8 @@ clientBlockFetch sockAddrs maxSlotNo = withIOManager $ \iocp -> do
512513
(contramap show stdoutTracer) -- decisionTracer
513514
(contramap show stdoutTracer) -- state tracer
514515
blockFetchPolicy
515-
registry
516+
blockFetchRegistry
517+
keepAliveRegistry
516518
(BlockFetchConfiguration {
517519
bfcMaxConcurrencyBulkSync = 1,
518520
bfcMaxConcurrencyDeadline = 2,

cardano-diffusion/tests/lib/Test/Cardano/Network/Diffusion/Testnet/MiniProtocols.hs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -511,6 +511,7 @@ applications debugTracer txSubmissionInboundTracer txSubmissionInboundDebug node
511511
channel
512512
-> do labelThisThread "BlockFetchClient"
513513
bracketFetchClient (nkFetchClientRegistry nodeKernel)
514+
(nkKeepAliveRegistry nodeKernel)
514515
UnversionedProtocol
515516
remoteAddress
516517
$ \clientCtx ->
@@ -574,7 +575,7 @@ applications debugTracer txSubmissionInboundTracer txSubmissionInboundDebug node
574575
remoteAddress
575576
ctxVar
576577
(KeepAliveInterval aaKeepAliveInterval))
577-
bracketKeepAliveClient (nkFetchClientRegistry nodeKernel)
578+
bracketKeepAliveClient (nkKeepAliveRegistry nodeKernel)
578579
remoteAddress
579580
kacApp
580581

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
<!--
2+
A new scriv changelog fragment.
3+
4+
Uncomment the section that is right (remove the HTML comment wrapper).
5+
For top level release notes, leave all the headers commented out.
6+
-->
7+
8+
### Breaking
9+
10+
- Split `FetchClientRegistry` into two parts
11+
- `FetchClientRegistry` - block-fetch related
12+
- `KeepAliveRegistry` - keep-alive related
13+
Added `newKeepAliveRegistry` to create `KeepAliveRegistry`, it should be
14+
called along side `newFetchClientRegistry` whenever `block-fetch` is used.
15+
- `FetchClientRegistry` record fields where renamed, the `fcr` prefix was
16+
dropped, `KeepAliveRegistry` field names were kept without the prefix too.
17+
18+
<!--
19+
### Non-Breaking
20+
21+
- A bullet item for the Non-Breaking category.
22+
23+
-->
24+
<!--
25+
### Patch
26+
27+
- A bullet item for the Patch category.
28+
29+
-->

ouroboros-network/lib/Ouroboros/Network/BlockFetch.hs

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,9 @@ module Ouroboros.Network.BlockFetch
9595
, newFetchClientRegistry
9696
, bracketFetchClient
9797
, bracketSyncWithFetchClient
98+
-- * The 'KeepAliveRegistry'
99+
, KeepAliveRegistry
100+
, newKeepAliveRegistry
98101
, bracketKeepAliveClient
99102
-- * Re-export types used by 'BlockFetchConsensusInterface'
100103
, PraosFetchMode (..)
@@ -117,8 +120,9 @@ import Ouroboros.Network.Block
117120
import Ouroboros.Network.SizeInBytes (SizeInBytes)
118121

119122
import Ouroboros.Network.BlockFetch.ClientRegistry (FetchClientPolicy (..),
120-
FetchClientRegistry, bracketFetchClient, bracketKeepAliveClient,
121-
bracketSyncWithFetchClient, newFetchClientRegistry,
123+
FetchClientRegistry, KeepAliveRegistry, bracketFetchClient,
124+
bracketKeepAliveClient, bracketSyncWithFetchClient,
125+
newFetchClientRegistry, newKeepAliveRegistry,
122126
readFetchClientsStateVars, readFetchClientsStatus, readPeerGSVs,
123127
setFetchClientContext)
124128
import Ouroboros.Network.BlockFetch.ConsensusInterface
@@ -185,14 +189,16 @@ blockFetchLogic :: forall addr header block m.
185189
-> Tracer m (TraceLabelPeer addr (TraceFetchClientState header))
186190
-> BlockFetchConsensusInterface addr header block m
187191
-> FetchClientRegistry addr header block m
192+
-> KeepAliveRegistry addr m
188193
-> BlockFetchConfiguration
189194
-> m Void
190195
blockFetchLogic decisionTracer clientStateTracer
191196
BlockFetchConsensusInterface{..}
192-
registry
197+
blockFetchRegistry
198+
keepAliveRegistry
193199
BlockFetchConfiguration{..} = do
194200

195-
setFetchClientContext registry clientStateTracer mkFetchClientPolicy
201+
setFetchClientContext blockFetchRegistry clientStateTracer mkFetchClientPolicy
196202

197203
fetchLogicIterations
198204
decisionTracer clientStateTracer
@@ -230,16 +236,16 @@ blockFetchLogic decisionTracer clientStateTracer
230236
FetchTriggerVariables {
231237
readStateCurrentChain = readCurrentChain,
232238
readStateCandidateChains = readCandidateChains,
233-
readStatePeerStatus = readFetchClientsStatus registry,
239+
readStatePeerStatus = readFetchClientsStatus blockFetchRegistry,
234240
readStateChainComparison = readChainComparison
235241
}
236242

237243
fetchNonTriggerVariables :: FetchNonTriggerVariables addr header block m
238244
fetchNonTriggerVariables =
239245
FetchNonTriggerVariables {
240246
readStateFetchedBlocks = readFetchedBlocks,
241-
readStatePeerStateVars = readFetchClientsStateVars registry,
242-
readStatePeerGSVs = readPeerGSVs registry,
247+
readStatePeerStateVars = readFetchClientsStateVars blockFetchRegistry,
248+
readStatePeerGSVs = readPeerGSVs keepAliveRegistry,
243249
readStateFetchMode = readFetchMode,
244250
readStateFetchedMaxSlotNo = readFetchedMaxSlotNo,
245251
readStateChainSelStarvation = readChainSelStarvation

ouroboros-network/lib/Ouroboros/Network/BlockFetch/ClientRegistry.hs

Lines changed: 37 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
{-# LANGUAGE FlexibleContexts #-}
2+
{-# LANGUAGE NamedFieldPuns #-}
23
{-# LANGUAGE ScopedTypeVariables #-}
34

45
module Ouroboros.Network.BlockFetch.ClientRegistry
56
( -- * Registry of block fetch clients
67
FetchClientRegistry (..)
78
, newFetchClientRegistry
9+
, KeepAliveRegistry (..)
10+
, newKeepAliveRegistry
811
, bracketFetchClient
912
, bracketKeepAliveClient
1013
, bracketSyncWithFetchClient
@@ -48,31 +51,38 @@ import Ouroboros.Network.Diffusion.Policies (deactivateTimeout)
4851
--
4952
data FetchClientRegistry peer header block m =
5053
FetchClientRegistry {
51-
fcrCtxVar
54+
ctxVar
5255
:: StrictTMVar
5356
m ( Tracer m (TraceLabelPeer peer (TraceFetchClientState header))
5457
, STM m (FetchClientPolicy header block m)
5558
),
56-
fcrFetchRegistry
59+
fetchRegistry
5760
:: StrictTVar m (Map peer (FetchClientStateVars m header)),
58-
fcrSyncRegistry
59-
:: StrictTVar m (Map peer (ThreadId m, StrictTMVar m (), StrictTMVar m ())),
60-
fcrDqRegistry
61-
:: StrictTVar m (Map peer PeerGSV),
62-
fcrKeepRegistry
63-
:: StrictTVar m (Map peer (ThreadId m, StrictTMVar m ())),
64-
fcrDying
65-
:: StrictTVar m (Set peer)
66-
}
61+
syncRegistry
62+
:: StrictTVar m (Map peer (ThreadId m, StrictTMVar m (), StrictTMVar m ()))
63+
}
64+
6765

6866
newFetchClientRegistry :: MonadSTM m
6967
=> m (FetchClientRegistry peer header block m)
7068
newFetchClientRegistry = FetchClientRegistry <$> newEmptyTMVarIO
7169
<*> newTVarIO Map.empty
7270
<*> newTVarIO Map.empty
73-
<*> newTVarIO Map.empty
74-
<*> newTVarIO Map.empty
75-
<*> newTVarIO Set.empty
71+
72+
data KeepAliveRegistry peer m = KeepAliveRegistry {
73+
dqRegistry
74+
:: StrictTVar m (Map peer PeerGSV),
75+
keepRegistry
76+
:: StrictTVar m (Map peer (ThreadId m, StrictTMVar m ())),
77+
dyingRegistry
78+
:: StrictTVar m (Set peer)
79+
}
80+
81+
newKeepAliveRegistry :: MonadSTM m
82+
=> m (KeepAliveRegistry peer m)
83+
newKeepAliveRegistry = KeepAliveRegistry <$> newTVarIO Map.empty
84+
<*> newTVarIO Map.empty
85+
<*> newTVarIO Set.empty
7686

7787
-- | This is needed to start a block fetch client. It provides the required
7888
-- 'FetchClientContext'. It registers and unregisters the fetch client on
@@ -83,12 +93,13 @@ newFetchClientRegistry = FetchClientRegistry <$> newEmptyTMVarIO
8393
bracketFetchClient :: forall m a peer header block version.
8494
(MonadFork m, MonadMask m, MonadTimer m, Ord peer)
8595
=> FetchClientRegistry peer header block m
96+
-> KeepAliveRegistry peer m
8697
-> version
8798
-> peer
8899
-> (FetchClientContext header block m -> m a)
89100
-> m a
90-
bracketFetchClient (FetchClientRegistry ctxVar
91-
fetchRegistry syncRegistry dqRegistry keepRegistry dyingRegistry)
101+
bracketFetchClient FetchClientRegistry { ctxVar, fetchRegistry, syncRegistry }
102+
KeepAliveRegistry { dqRegistry, keepRegistry, dyingRegistry }
92103
_version peer action = do
93104
ksVar <- newEmptyTMVarIO
94105
fst <$> generalBracket (register ksVar) (unregister ksVar) (action . fst)
@@ -214,7 +225,6 @@ bracketFetchClient (FetchClientRegistry ctxVar
214225
Map.delete peer m
215226

216227

217-
218228
-- | The block fetch and chain sync clients for each peer need to synchronise
219229
-- their startup and shutdown. This bracket operation provides that
220230
-- synchronisation for the chain sync client.
@@ -229,8 +239,7 @@ bracketSyncWithFetchClient :: forall m a peer header block.
229239
-> peer
230240
-> m a
231241
-> m a
232-
bracketSyncWithFetchClient (FetchClientRegistry _ctxVar
233-
_fetchRegistry syncRegistry _dqRegistry _keepRegistry _dyingRegistry) peer action = do
242+
bracketSyncWithFetchClient FetchClientRegistry { syncRegistry } peer action = do
234243
doneVar <- newEmptyTMVarIO
235244
startVar <- newEmptyTMVarIO
236245
bracket_ (register doneVar startVar) (unregister doneVar) action
@@ -267,12 +276,11 @@ bracketSyncWithFetchClient (FetchClientRegistry _ctxVar
267276

268277
bracketKeepAliveClient :: forall m a peer header block.
269278
(MonadSTM m, MonadFork m, MonadMask m, Ord peer)
270-
=> FetchClientRegistry peer header block m
279+
=> KeepAliveRegistry peer m
271280
-> peer
272281
-> (StrictTVar m (Map peer PeerGSV) -> m a)
273282
-> m a
274-
bracketKeepAliveClient(FetchClientRegistry _ctxVar
275-
_fetchRegistry _syncRegistry dqRegistry keepRegistry dyingRegistry) peer action = do
283+
bracketKeepAliveClient KeepAliveRegistry { dqRegistry, keepRegistry, dyingRegistry } peer action = do
276284
bracket_ register unregister (action dqRegistry)
277285
where
278286
-- the keepAliveClient will register a PeerGSV and the block fetch client will wait on it.
@@ -326,7 +334,7 @@ setFetchClientContext :: MonadSTM m
326334
-> Tracer m (TraceLabelPeer peer (TraceFetchClientState header))
327335
-> STM m (FetchClientPolicy header block m)
328336
-> m ()
329-
setFetchClientContext (FetchClientRegistry ctxVar _ _ _ _ _) tracer mkPolicy =
337+
setFetchClientContext FetchClientRegistry { ctxVar } tracer mkPolicy =
330338
atomically $ do
331339
ok <- tryPutTMVar ctxVar (tracer, mkPolicy)
332340
unless ok $ error "setFetchClientContext: called more than once"
@@ -337,25 +345,25 @@ setFetchClientContext (FetchClientRegistry ctxVar _ _ _ _ _) tracer mkPolicy =
337345
readFetchClientsStatus :: MonadSTM m
338346
=> FetchClientRegistry peer header block m
339347
-> STM m (Map peer (PeerFetchStatus header))
340-
readFetchClientsStatus (FetchClientRegistry _ registry _ _ _ _) =
341-
readTVar registry >>= traverse (readTVar . fetchClientStatusVar)
348+
readFetchClientsStatus FetchClientRegistry { fetchRegistry } =
349+
readTVar fetchRegistry >>= traverse (readTVar . fetchClientStatusVar)
342350

343351
-- | A read-only 'STM' action to get the 'FetchClientStateVars' for all fetch
344352
-- clients in the 'FetchClientRegistry'.
345353
--
346354
readFetchClientsStateVars :: MonadSTM m
347355
=> FetchClientRegistry peer header block m
348356
-> STM m (Map peer (FetchClientStateVars m header))
349-
readFetchClientsStateVars (FetchClientRegistry _ registry _ _ _ _) = readTVar registry
357+
readFetchClientsStateVars FetchClientRegistry { fetchRegistry } = readTVar fetchRegistry
350358

351359
-- | A read-only 'STM' action to get the 'PeerGSV's for all fetch
352360
-- clients in the 'FetchClientRegistry'.
353361
--
354-
readPeerGSVs :: forall block header m peer.
362+
readPeerGSVs :: forall m peer.
355363
( MonadSTM m, Ord peer)
356-
=> FetchClientRegistry peer header block m
364+
=> KeepAliveRegistry peer m
357365
-> STM m (Map peer PeerGSV)
358-
readPeerGSVs (FetchClientRegistry _ _ _ dqRegistry keepRegistry _) = do
366+
readPeerGSVs KeepAliveRegistry { dqRegistry, keepRegistry } = do
359367
dr <- readTVar dqRegistry
360368
kr <- readTVar keepRegistry
361369
-- The intersection gives us only the currently hot peers

0 commit comments

Comments
 (0)