Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 10 additions & 8 deletions cardano-diffusion/demo/chain-sync.hs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ import Ouroboros.Network.Protocol.BlockFetch.Type qualified as BlockFetch

import Ouroboros.Network.BlockFetch
import Ouroboros.Network.BlockFetch.Client
import Ouroboros.Network.BlockFetch.ClientRegistry (FetchClientRegistry (..))
import Ouroboros.Network.BlockFetch.ClientRegistry (KeepAliveRegistry (..))
import Ouroboros.Network.BlockFetch.ConsensusInterface (ChainSelStarvation (..),
initialWithFingerprint)
import Ouroboros.Network.DeltaQ (defaultGSV)
Expand Down Expand Up @@ -348,10 +348,10 @@ demoProtocol3 chainSync blockFetch =

-- We run `chain-sync` and `block-fetch`, without `keep-alive`, so we need to
-- register peers in `dqRegistry`.
bracketDqRegistry :: FetchClientRegistry (ConnectionId LocalAddress) header block IO
bracketDqRegistry :: KeepAliveRegistry (ConnectionId LocalAddress) IO
-> ConnectionId LocalAddress
-> IO a -> IO a
bracketDqRegistry FetchClientRegistry { fcrDqRegistry = dqRegistry } peer k =
bracketDqRegistry KeepAliveRegistry { dqRegistry } peer k =
bracket (atomically (modifyTVar dqRegistry (Map.insert peer defaultGSV)))
(\_ -> atomically (modifyTVar dqRegistry (Map.delete peer)))
(\_ -> k)
Expand All @@ -360,7 +360,8 @@ clientBlockFetch :: [FilePath]
-> Maybe SlotNo
-> IO ()
clientBlockFetch sockAddrs maxSlotNo = withIOManager $ \iocp -> do
registry <- newFetchClientRegistry
blockFetchRegistry <- newFetchClientRegistry
keepAliveRegistry <- newKeepAliveRegistry
blockHeap <- mkTestFetchedBlockHeap []

candidateChainsVar <- newTVarIO Map.empty
Expand Down Expand Up @@ -396,7 +397,7 @@ clientBlockFetch sockAddrs maxSlotNo = withIOManager $ \iocp -> do
unregister _ = atomically $
modifyTVar candidateChainsVar
(Map.delete connId)
in bracketSyncWithFetchClient registry connId $
in bracketSyncWithFetchClient blockFetchRegistry connId $
bracket register unregister $ \chainVar ->
runPeer
nullTracer -- (contramap (show . TraceLabelPeer ("chain-sync", getFilePath $ remoteAddress connId)) stdoutTracer)
Expand All @@ -410,8 +411,8 @@ clientBlockFetch sockAddrs maxSlotNo = withIOManager $ \iocp -> do
blockFetch =
InitiatorProtocolOnly $
MiniProtocolCb $ \MinimalInitiatorContext { micConnectionId = connId } channel ->
bracketDqRegistry registry connId $
bracketFetchClient registry (maxBound :: NodeToNodeVersion) connId $ \clientCtx -> do
bracketDqRegistry keepAliveRegistry connId $
bracketFetchClient blockFetchRegistry keepAliveRegistry (maxBound :: NodeToNodeVersion) connId $ \clientCtx -> do
threadDelay 1000000
runPipelinedPeer
nullTracer -- (contramap (show . TraceLabelPeer ("block-fetch", getFilePath $ remoteAddress connId)) stdoutTracer)
Expand Down Expand Up @@ -512,7 +513,8 @@ clientBlockFetch sockAddrs maxSlotNo = withIOManager $ \iocp -> do
(contramap show stdoutTracer) -- decisionTracer
(contramap show stdoutTracer) -- state tracer
blockFetchPolicy
registry
blockFetchRegistry
keepAliveRegistry
(BlockFetchConfiguration {
bfcMaxConcurrencyBulkSync = 1,
bfcMaxConcurrencyDeadline = 2,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,7 @@ applications debugTracer txSubmissionInboundTracer txSubmissionInboundDebug node
channel
-> do labelThisThread "BlockFetchClient"
bracketFetchClient (nkFetchClientRegistry nodeKernel)
(nkKeepAliveRegistry nodeKernel)
UnversionedProtocol
remoteAddress
$ \clientCtx ->
Expand Down Expand Up @@ -574,7 +575,7 @@ applications debugTracer txSubmissionInboundTracer txSubmissionInboundDebug node
remoteAddress
ctxVar
(KeepAliveInterval aaKeepAliveInterval))
bracketKeepAliveClient (nkFetchClientRegistry nodeKernel)
bracketKeepAliveClient (nkKeepAliveRegistry nodeKernel)
remoteAddress
kacApp

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
<!--
A new scriv changelog fragment.
Uncomment the section that is right (remove the HTML comment wrapper).
For top level release notes, leave all the headers commented out.
-->

### Breaking

- Split `FetchClientRegistry` into two parts
- `FetchClientRegistry` - block-fetch related
- `KeepAliveRegistry` - keep-alive related
Added `newKeepAliveRegistry` to create `KeepAliveRegistry`, it should be
called along side `newFetchClientRegistry` whenever `block-fetch` is used.
- `FetchClientRegistry` record fields where renamed, the `fcr` prefix was
dropped, `KeepAliveRegistry` field names were kept without the prefix too.

<!--
### Non-Breaking
- A bullet item for the Non-Breaking category.
-->
<!--
### Patch
- A bullet item for the Patch category.
-->
20 changes: 13 additions & 7 deletions ouroboros-network/lib/Ouroboros/Network/BlockFetch.hs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ module Ouroboros.Network.BlockFetch
, newFetchClientRegistry
, bracketFetchClient
, bracketSyncWithFetchClient
-- * The 'KeepAliveRegistry'
, KeepAliveRegistry
, newKeepAliveRegistry
, bracketKeepAliveClient
-- * Re-export types used by 'BlockFetchConsensusInterface'
, PraosFetchMode (..)
Expand All @@ -117,8 +120,9 @@ import Ouroboros.Network.Block
import Ouroboros.Network.SizeInBytes (SizeInBytes)

import Ouroboros.Network.BlockFetch.ClientRegistry (FetchClientPolicy (..),
FetchClientRegistry, bracketFetchClient, bracketKeepAliveClient,
bracketSyncWithFetchClient, newFetchClientRegistry,
FetchClientRegistry, KeepAliveRegistry, bracketFetchClient,
bracketKeepAliveClient, bracketSyncWithFetchClient,
newFetchClientRegistry, newKeepAliveRegistry,
readFetchClientsStateVars, readFetchClientsStatus, readPeerGSVs,
setFetchClientContext)
import Ouroboros.Network.BlockFetch.ConsensusInterface
Expand Down Expand Up @@ -185,14 +189,16 @@ blockFetchLogic :: forall addr header block m.
-> Tracer m (TraceLabelPeer addr (TraceFetchClientState header))
-> BlockFetchConsensusInterface addr header block m
-> FetchClientRegistry addr header block m
-> KeepAliveRegistry addr m
-> BlockFetchConfiguration
-> m Void
blockFetchLogic decisionTracer clientStateTracer
BlockFetchConsensusInterface{..}
registry
blockFetchRegistry
keepAliveRegistry
BlockFetchConfiguration{..} = do

setFetchClientContext registry clientStateTracer mkFetchClientPolicy
setFetchClientContext blockFetchRegistry clientStateTracer mkFetchClientPolicy

fetchLogicIterations
decisionTracer clientStateTracer
Expand Down Expand Up @@ -230,16 +236,16 @@ blockFetchLogic decisionTracer clientStateTracer
FetchTriggerVariables {
readStateCurrentChain = readCurrentChain,
readStateCandidateChains = readCandidateChains,
readStatePeerStatus = readFetchClientsStatus registry,
readStatePeerStatus = readFetchClientsStatus blockFetchRegistry,
readStateChainComparison = readChainComparison
}

fetchNonTriggerVariables :: FetchNonTriggerVariables addr header block m
fetchNonTriggerVariables =
FetchNonTriggerVariables {
readStateFetchedBlocks = readFetchedBlocks,
readStatePeerStateVars = readFetchClientsStateVars registry,
readStatePeerGSVs = readPeerGSVs registry,
readStatePeerStateVars = readFetchClientsStateVars blockFetchRegistry,
readStatePeerGSVs = readPeerGSVs keepAliveRegistry,
readStateFetchMode = readFetchMode,
readStateFetchedMaxSlotNo = readFetchedMaxSlotNo,
readStateChainSelStarvation = readChainSelStarvation
Expand Down
117 changes: 18 additions & 99 deletions ouroboros-network/lib/Ouroboros/Network/BlockFetch/ClientRegistry.hs
Original file line number Diff line number Diff line change
@@ -1,24 +1,24 @@
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE ScopedTypeVariables #-}

module Ouroboros.Network.BlockFetch.ClientRegistry
( -- * Registry of block fetch clients
FetchClientRegistry (..)
, newFetchClientRegistry
, bracketFetchClient
, bracketKeepAliveClient
, bracketSyncWithFetchClient
, setFetchClientContext
, FetchClientPolicy (..)
, readFetchClientsStatus
, readFetchClientsStateVars
, readPeerGSVs
-- * KeepAlive registry
, module KeepAlive
) where

import Data.Functor.Contravariant (contramap)
import Data.Map (Map)
import Data.Map qualified as Map
import Data.Set (Set)
import Data.Set qualified as Set

import Control.Concurrent.Class.MonadSTM.Strict
Expand All @@ -32,8 +32,8 @@ import Control.Monad.Class.MonadTimer.SI
import Control.Tracer (Tracer)

import Ouroboros.Network.BlockFetch.ClientState
import Ouroboros.Network.DeltaQ
import Ouroboros.Network.Diffusion.Policies (deactivateTimeout)
import Ouroboros.Network.KeepAlive.Registry as KeepAlive



Expand All @@ -48,31 +48,23 @@ import Ouroboros.Network.Diffusion.Policies (deactivateTimeout)
--
data FetchClientRegistry peer header block m =
FetchClientRegistry {
fcrCtxVar
ctxVar
:: StrictTMVar
m ( Tracer m (TraceLabelPeer peer (TraceFetchClientState header))
, STM m (FetchClientPolicy header block m)
),
fcrFetchRegistry
fetchRegistry
:: StrictTVar m (Map peer (FetchClientStateVars m header)),
fcrSyncRegistry
:: StrictTVar m (Map peer (ThreadId m, StrictTMVar m (), StrictTMVar m ())),
fcrDqRegistry
:: StrictTVar m (Map peer PeerGSV),
fcrKeepRegistry
:: StrictTVar m (Map peer (ThreadId m, StrictTMVar m ())),
fcrDying
:: StrictTVar m (Set peer)
}
syncRegistry
:: StrictTVar m (Map peer (ThreadId m, StrictTMVar m (), StrictTMVar m ()))
}


newFetchClientRegistry :: MonadSTM m
=> m (FetchClientRegistry peer header block m)
newFetchClientRegistry = FetchClientRegistry <$> newEmptyTMVarIO
<*> newTVarIO Map.empty
<*> newTVarIO Map.empty
<*> newTVarIO Map.empty
<*> newTVarIO Map.empty
<*> newTVarIO Set.empty

-- | This is needed to start a block fetch client. It provides the required
-- 'FetchClientContext'. It registers and unregisters the fetch client on
Expand All @@ -83,12 +75,13 @@ newFetchClientRegistry = FetchClientRegistry <$> newEmptyTMVarIO
bracketFetchClient :: forall m a peer header block version.
(MonadFork m, MonadMask m, MonadTimer m, Ord peer)
=> FetchClientRegistry peer header block m
-> KeepAliveRegistry peer m
-> version
-> peer
-> (FetchClientContext header block m -> m a)
-> m a
bracketFetchClient (FetchClientRegistry ctxVar
fetchRegistry syncRegistry dqRegistry keepRegistry dyingRegistry)
bracketFetchClient FetchClientRegistry { ctxVar, fetchRegistry, syncRegistry }
KeepAliveRegistry { dqRegistry, keepRegistry, dyingRegistry }
_version peer action = do
ksVar <- newEmptyTMVarIO
fst <$> generalBracket (register ksVar) (unregister ksVar) (action . fst)
Expand Down Expand Up @@ -214,7 +207,6 @@ bracketFetchClient (FetchClientRegistry ctxVar
Map.delete peer m



-- | The block fetch and chain sync clients for each peer need to synchronise
-- their startup and shutdown. This bracket operation provides that
-- synchronisation for the chain sync client.
Expand All @@ -229,8 +221,7 @@ bracketSyncWithFetchClient :: forall m a peer header block.
-> peer
-> m a
-> m a
bracketSyncWithFetchClient (FetchClientRegistry _ctxVar
_fetchRegistry syncRegistry _dqRegistry _keepRegistry _dyingRegistry) peer action = do
bracketSyncWithFetchClient FetchClientRegistry { syncRegistry } peer action = do
doneVar <- newEmptyTMVarIO
startVar <- newEmptyTMVarIO
bracket_ (register doneVar startVar) (unregister doneVar) action
Expand Down Expand Up @@ -265,68 +256,12 @@ bracketSyncWithFetchClient (FetchClientRegistry _ctxVar
assert (peer `Map.member` m) $
Map.delete peer m

bracketKeepAliveClient :: forall m a peer header block.
(MonadSTM m, MonadFork m, MonadMask m, Ord peer)
=> FetchClientRegistry peer header block m
-> peer
-> (StrictTVar m (Map peer PeerGSV) -> m a)
-> m a
bracketKeepAliveClient(FetchClientRegistry _ctxVar
_fetchRegistry _syncRegistry dqRegistry keepRegistry dyingRegistry) peer action = do
bracket_ register unregister (action dqRegistry)
where
-- the keepAliveClient will register a PeerGSV and the block fetch client will wait on it.
register :: m ()
register =
atomically $ do
-- Wait for previous keep alive client to cleanup
dr <- readTVar dqRegistry
check (peer `Map.notMember` dr)

modifyTVar dqRegistry $ \m ->
assert (peer `Map.notMember` m) $
Map.insert peer defaultGSV m

-- It is possible for the keepAlive client to keep running even without a fetch client, but
-- a fetch client shouldn't run without a keepAlive client.
unregister :: m ()
unregister = uninterruptibleMask_ $ do
fetchclient_m <- atomically $ do
fetchclients <- readTVar keepRegistry
case Map.lookup peer fetchclients of
Nothing -> do
-- If the fetch client is already dead we remove PeerGSV ourself directly.
modifyTVar dqRegistry $ \m ->
assert (peer `Map.member` m) $
Map.delete peer m
return Nothing
Just rc -> do
-- Prevent a new fetchclient from starting while we are killing the old one.
modifyTVar dyingRegistry $ \s ->
assert (peer `Set.notMember` s) $
Set.insert peer s
return $ Just rc
case fetchclient_m of
Nothing -> return ()
Just (tid, doneVar) -> do
-- Cancel the fetch client.
throwTo tid AsyncCancelled
atomically $ do
-- wait for fetch client to exit.
readTMVar doneVar
modifyTVar dqRegistry $ \m ->
assert (peer `Map.member` m) $
Map.delete peer m
modifyTVar dyingRegistry $ \s ->
assert (peer `Set.member` s) $
Set.delete peer s

setFetchClientContext :: MonadSTM m
=> FetchClientRegistry peer header block m
-> Tracer m (TraceLabelPeer peer (TraceFetchClientState header))
-> STM m (FetchClientPolicy header block m)
-> m ()
setFetchClientContext (FetchClientRegistry ctxVar _ _ _ _ _) tracer mkPolicy =
setFetchClientContext FetchClientRegistry { ctxVar } tracer mkPolicy =
atomically $ do
ok <- tryPutTMVar ctxVar (tracer, mkPolicy)
unless ok $ error "setFetchClientContext: called more than once"
Expand All @@ -337,29 +272,13 @@ setFetchClientContext (FetchClientRegistry ctxVar _ _ _ _ _) tracer mkPolicy =
readFetchClientsStatus :: MonadSTM m
=> FetchClientRegistry peer header block m
-> STM m (Map peer (PeerFetchStatus header))
readFetchClientsStatus (FetchClientRegistry _ registry _ _ _ _) =
readTVar registry >>= traverse (readTVar . fetchClientStatusVar)
readFetchClientsStatus FetchClientRegistry { fetchRegistry } =
readTVar fetchRegistry >>= traverse (readTVar . fetchClientStatusVar)

-- | A read-only 'STM' action to get the 'FetchClientStateVars' for all fetch
-- clients in the 'FetchClientRegistry'.
--
readFetchClientsStateVars :: MonadSTM m
=> FetchClientRegistry peer header block m
-> STM m (Map peer (FetchClientStateVars m header))
readFetchClientsStateVars (FetchClientRegistry _ registry _ _ _ _) = readTVar registry

-- | A read-only 'STM' action to get the 'PeerGSV's for all fetch
-- clients in the 'FetchClientRegistry'.
--
readPeerGSVs :: forall block header m peer.
( MonadSTM m, Ord peer)
=> FetchClientRegistry peer header block m
-> STM m (Map peer PeerGSV)
readPeerGSVs (FetchClientRegistry _ _ _ dqRegistry keepRegistry _) = do
dr <- readTVar dqRegistry
kr <- readTVar keepRegistry
-- The intersection gives us only the currently hot peers
return $ Map.intersection dr kr



readFetchClientsStateVars FetchClientRegistry { fetchRegistry } = readTVar fetchRegistry
Loading
Loading