Skip to content

Commit 441a997

Browse files
committed
fixup: move peer score to local state
Since peer score is now only a peer local thing move it into PeerTxLocalState and update it outside of atomic.
1 parent 7d0e755 commit 441a997

8 files changed

Lines changed: 146 additions & 179 deletions

File tree

cardano-diffusion/tests/lib/Test/Cardano/Network/Diffusion/Testnet/MiniProtocols.hs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -729,6 +729,7 @@ applications debugTracer txSubmissionInboundTracer _txSubmissionInboundDebug nod
729729
let server = txSubmissionInboundV2
730730
txSubmissionInboundTracer
731731
NoTxSubmissionInitDelay
732+
aaTxDecisionPolicy
732733
(getMempoolReader mempool)
733734
(getMempoolWriter duplicateTxVar mempool)
734735
getTxSize

ouroboros-network/bench/Bench/TxSubmissionV2Server.hs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ runDirectServerBenchmark
9595
txSubmissionInboundV2
9696
nullTracer
9797
NoTxSubmissionInitDelay
98+
defaultTxDecisionPolicy
9899
(getMempoolReader inboundMempool)
99100
(getMempoolWriter duplicateTxIdsVar inboundMempool)
100101
getTxSize

ouroboros-network/lib/Ouroboros/Network/TxSubmission/Inbound/V2.hs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ module Ouroboros.Network.TxSubmission.Inbound.V2
1616
, TxSubmissionInitDelay (..)
1717
) where
1818

19-
import Data.Functor (void)
2019
import Data.List.NonEmpty qualified as NonEmpty
2120
import Data.Map.Strict qualified as Map
2221
import Data.Sequence.Strict qualified as StrictSeq
@@ -37,6 +36,7 @@ import Ouroboros.Network.Protocol.TxSubmission2.Type (NumTxIdsToAck,
3736
SizeInBytes)
3837
import Ouroboros.Network.TxSubmission.Inbound.V2.Policy
3938
import Ouroboros.Network.TxSubmission.Inbound.V2.Registry as V2
39+
import Ouroboros.Network.TxSubmission.Inbound.V2.State qualified as State
4040
import Ouroboros.Network.TxSubmission.Inbound.V2.Types as V2
4141
import Ouroboros.Network.TxSubmission.Mempool.Reader
4242

@@ -88,6 +88,7 @@ txSubmissionInboundV2
8888
)
8989
=> Tracer m (TraceTxSubmissionInbound txid tx)
9090
-> TxSubmissionInitDelay
91+
-> TxDecisionPolicy
9192
-> TxSubmissionMempoolReader txid tx idx m
9293
-> TxSubmissionMempoolWriter txid tx idx m err
9394
-> (tx -> SizeInBytes)
@@ -96,6 +97,7 @@ txSubmissionInboundV2
9697
txSubmissionInboundV2
9798
tracer
9899
initDelay
100+
policy
99101
TxSubmissionMempoolReader { mempoolGetSnapshot }
100102
TxSubmissionMempoolWriter { txId, mempoolAddTxs }
101103
txSize
@@ -106,7 +108,6 @@ txSubmissionInboundV2
106108
applyReceivedTxIds,
107109
applyReceivedTxs,
108110
applySubmittedTxs,
109-
countRejectedTxs,
110111
resolveTxRequest,
111112
resolveBufferedTxs,
112113
startSubmittingTxs,
@@ -136,7 +137,7 @@ txSubmissionInboundV2
136137
addCounters mempty { txPipelineWaitMs =
137138
diffTimeToMillis (now `diffTime` startTime) }
138139
pure $ peerState { peerDownloadStartTime = Nothing }
139-
(peerAction, peerState'') <- runNextPeerAction now peerState'
140+
(peerAction, peerState'') <- runNextPeerAction now (State.drainPeerScore policy now peerState')
140141
case peerAction of
141142
PeerDoNothing generation mDelay -> do
142143
awaitSharedChange generation mDelay
@@ -183,7 +184,8 @@ txSubmissionInboundV2
183184
delta = end `diffTime` start
184185

185186
addCounters mempty { txSubmissionWaitMs = diffTimeToMillis delta }
186-
score <- countRejectedTxs end rejectedCount
187+
peerState' <- applySubmittedTxs end resolvedTxKeys (fmap fst rejectedTxs) peerState
188+
let (score, peerState'') = State.applyPeerRejections policy end rejectedCount peerState'
187189
traceWith tracer $
188190
TraceTxSubmissionProcessed ProcessedTxCount {
189191
ptxcAccepted = length acceptedTxs,
@@ -194,9 +196,7 @@ txSubmissionInboundV2
194196
traceWith tracer (TraceTxInboundAddedToMempool (snd <$> acceptedTxs) delta)
195197
unless (null rejectedForTrace) $
196198
traceWith tracer (TraceTxInboundRejectedFromMempool rejectedForTrace delta)
197-
198-
peerState' <- applySubmittedTxs end resolvedTxKeys (fmap fst rejectedTxs) peerState
199-
continueWithStateM k peerState'
199+
continueWithStateM k peerState''
200200

201201
-- Request transaction bodies from the peer.
202202
requestTxBodies :: forall (n :: N).
@@ -224,7 +224,7 @@ txSubmissionInboundV2
224224
continueAfterReplies Zero = serverIdle
225225
continueAfterReplies n@Succ{} = StatefulM $ \peerState -> do
226226
now <- getMonotonicTime
227-
(peerAction, peerState') <- runNextPeerActionPipelined now peerState
227+
(peerAction, peerState') <- runNextPeerActionPipelined now (State.drainPeerScore policy now peerState)
228228
case peerAction of
229229
PeerSubmitTxs txKeys ->
230230
continueWithStateM (submitBufferedTxs txKeys (continueAfterReplies n)) peerState'
@@ -241,7 +241,7 @@ txSubmissionInboundV2
241241
-> StatefulM (PeerTxLocalState tx) (S n) txid tx m
242242
continueAfterBodyRequests n = StatefulM $ \peerState -> do
243243
now <- getMonotonicTime
244-
(peerAction, peerState') <- runNextPeerActionPipelined now peerState
244+
(peerAction, peerState') <- runNextPeerActionPipelined now (State.drainPeerScore policy now peerState)
245245
case peerAction of
246246
PeerSubmitTxs txKeys ->
247247
continueWithStateM (submitBufferedTxs txKeys (continueAfterReplies n)) peerState'
@@ -331,9 +331,9 @@ txSubmissionInboundV2
331331
throwIO protocolError
332332
now <- getMonotonicTime
333333
(penaltyCount, peerState') <- applyReceivedTxs now [ (txId tx, tx) | tx <- txs ] peerState
334-
unless (penaltyCount == 0) $
335-
void $ countRejectedTxs now penaltyCount
336-
continueWithStateM (continueAfterReplies n) peerState'
334+
let peerState'' | penaltyCount == 0 = peerState'
335+
| otherwise = snd (State.applyPeerRejections policy now penaltyCount peerState')
336+
continueWithStateM (continueAfterReplies n) peerState''
337337

338338
-- Partition submitted transactions into accepted and rejected groups
339339
classifySubmittedTxs :: [(TxKey, txid)]

ouroboros-network/lib/Ouroboros/Network/TxSubmission/Inbound/V2/Registry.hs

Lines changed: 14 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -111,12 +111,6 @@ data PeerTxAPI m txid tx = PeerTxAPI {
111111
-> PeerTxLocalState tx
112112
-> m (PeerTxLocalState tx),
113113

114-
-- | Update the peer's rejection score based on the number of txs rejected
115-
-- by the mempool, or late/missing delivieries.
116-
countRejectedTxs :: Time
117-
-> Int
118-
-> m Double,
119-
120114
-- | Resolve txids and advertised sizes for a batch of tx keys to request.
121115
resolveTxRequest :: PeerTxLocalState tx
122116
-> [TxKey]
@@ -155,8 +149,7 @@ withPeer
155149
withPeer policy TxSubmissionMempoolReader { mempoolGetSnapshot } sharedStateVar countersVar peeraddr io =
156150
bracket
157151
(do
158-
now <- getMonotonicTime
159-
atomically $ modifyTVar sharedStateVar (registerPeer now)
152+
atomically $ modifyTVar sharedStateVar registerPeer
160153
pure PeerTxAPI {
161154
awaitSharedChange = awaitSharedChangeImp sharedStateVar peeraddr
162155
, runNextPeerAction = runNextPeerActionImp policy sharedStateVar countersVar peeraddr
@@ -167,7 +160,6 @@ withPeer policy TxSubmissionMempoolReader { mempoolGetSnapshot } sharedStateVar
167160
, applyReceivedTxs = applyReceivedTxsImp policy mempoolGetSnapshot sharedStateVar
168161
countersVar peeraddr
169162
, applySubmittedTxs = applySubmittedTxsImp policy sharedStateVar countersVar peeraddr
170-
, countRejectedTxs = countRejectedTxsImp policy sharedStateVar peeraddr
171163
, resolveTxRequest = resolveTxRequestImp sharedStateVar
172164
, resolveBufferedTxs = resolveBufferedTxsImp sharedStateVar
173165
, startSubmittingTxs = atomically . modifyTVar sharedStateVar .
@@ -180,16 +172,15 @@ withPeer policy TxSubmissionMempoolReader { mempoolGetSnapshot } sharedStateVar
180172
atomically $ modifyTVar sharedStateVar (unregisterPeer now))
181173
io
182174
where
183-
registerPeer :: Time -> SharedTxState peeraddr txid -> SharedTxState peeraddr txid
184-
registerPeer now st@SharedTxState { sharedPeers, sharedGeneration } =
175+
registerPeer :: SharedTxState peeraddr txid -> SharedTxState peeraddr txid
176+
registerPeer st@SharedTxState { sharedPeers, sharedGeneration } =
185177
st {
186178
sharedPeers = Map.insert peeraddr sharedPeerState sharedPeers,
187179
sharedGeneration = sharedGeneration + 1
188180
}
189181
where
190182
sharedPeerState = SharedPeerState {
191183
sharedPeerPhase = PeerIdle,
192-
sharedPeerScore = emptyPeerScore now,
193184
sharedPeerAdvertisedTxKeys = IntSet.empty,
194185
sharedPeerGeneration = 0
195186
}
@@ -337,7 +328,7 @@ runNextPeerActionImp policy sharedStateVar countersVar peeraddr now peerState =
337328
let sharedGeneration0 = sharedGeneration sharedState
338329
(peerAction, peerState', sharedState') = State.nextPeerAction now policy peeraddr
339330
peerState sharedState
340-
sharedState'' = updatePeerPhase now policy peeraddr
331+
sharedState'' = updatePeerPhase peeraddr
341332
(peerPhaseForActionIdle peerAction) sharedState'
342333
writeSharedStateIfChanged sharedStateVar sharedGeneration0 sharedState''
343334
updateCountersForAction countersVar peerAction
@@ -364,7 +355,7 @@ runNextPeerActionPipelinedImp policy sharedStateVar countersVar peeraddr now pee
364355
let sharedGeneration0 = sharedGeneration sharedState
365356
(peerAction, peerState', sharedState') = State.nextPeerActionPipelined now policy
366357
peeraddr peerState sharedState
367-
sharedState'' = updatePeerPhase now policy peeraddr
358+
sharedState'' = updatePeerPhase peeraddr
368359
(peerPhaseForActionPipelined peeraddr peerAction sharedState')
369360
sharedState'
370361
writeSharedStateIfChanged sharedStateVar sharedGeneration0 sharedState''
@@ -466,42 +457,6 @@ applySubmittedTxsImp policy sharedStateVar countersVar peeraddr now acceptedTxs
466457
, txsRejected = fromIntegral (length rejectedTxs) })
467458
return peerState'
468459

469-
-- | Update the peer's rejection score based on the number of txs rejected
470-
-- by the mempool.
471-
-- Returns the new score value for tracing. The score
472-
-- decays over time and affects fallback peer selection when leases expire.
473-
countRejectedTxsImp :: ( MonadSTM m
474-
, Ord peeraddr)
475-
=> TxDecisionPolicy
476-
-> SharedTxStateVar m peeraddr txid
477-
-> peeraddr
478-
-> Time
479-
-> Int
480-
-> m Double
481-
countRejectedTxsImp TxDecisionPolicy { scoreRate, scoreMax } sharedStateVar peeraddr now
482-
rejectedCount = atomically $ stateTVar sharedStateVar $
483-
updatePeerRejects (fromIntegral rejectedCount)
484-
where
485-
updatePeerRejects n sharedState =
486-
case Map.lookup peeraddr (sharedPeers sharedState) of
487-
Nothing -> (0, sharedState) -- TODO this is an invariant violation
488-
Just sharedPeerState@SharedPeerState { sharedPeerScore } ->
489-
let sharedPeerScore' = updateRejects n sharedPeerScore
490-
sharedPeerState' = sharedPeerState { sharedPeerScore = sharedPeerScore' }
491-
sharedState' = sharedState {
492-
sharedPeers = Map.insert peeraddr sharedPeerState' (sharedPeers sharedState),
493-
sharedGeneration = sharedGeneration sharedState + 1
494-
} in
495-
(peerScoreValue sharedPeerScore', sharedState')
496-
497-
updateRejects 0 ps@PeerScore { peerScoreValue = 0 } = ps { peerScoreTs = now }
498-
updateRejects n ps@PeerScore { peerScoreValue, peerScoreTs } =
499-
let duration = diffTime now peerScoreTs
500-
!drain = realToFrac duration * scoreRate
501-
!drained = max 0 (peerScoreValue - drain) in
502-
ps { peerScoreValue = min scoreMax (drained + n)
503-
, peerScoreTs = now }
504-
505460
-- | Resolve txids and advertised sizes for a batch of tx keys to request.
506461
--
507462
-- Looks up the real txid and size from peer-local state for building the
@@ -548,55 +503,35 @@ resolveBufferedTxsImp sharedStateVar peerState txKeys = atomically $ do
548503

549504
-- | Update a peer's phase.
550505
--
551-
-- A phase change always bumps the shared generation and normalizes the moving
552-
-- peer's score by draining it to @now@. In addition:
506+
-- A phase change always bumps the shared generation. In addition:
553507
--
554508
-- * When a peer becomes 'PeerIdle', bump that peer's own generation so a
555509
-- 'PeerDoNothing' action computed before the phase change does not put that
556510
-- same peer thread to sleep on a stale generation. This makes its next
557511
-- 'awaitSharedChange' return immediately and re-run scheduling as an idle
558512
-- claimant.
559-
-- * When a peer becomes 'PeerIdle', bump that peer's own generation so it
560-
-- immediately re-runs scheduling against any txs whose score-derived claim
561-
-- delay may already have elapsed.
513+
-- * When a peer leaves idle, bump idle advertisers so they can immediately
514+
-- compete for any leases the departing peer held.
562515
updatePeerPhase
563516
:: Ord peeraddr
564-
=> Time
565-
-> TxDecisionPolicy
566-
-> peeraddr
517+
=> peeraddr
567518
-> PeerPhase
568519
-> SharedTxState peeraddr txid
569520
-> SharedTxState peeraddr txid
570-
updatePeerPhase now policy peeraddr peerPhaseNew
521+
updatePeerPhase peeraddr peerPhaseNew
571522
st@SharedTxState { sharedPeers, sharedGeneration } =
572523
case Map.lookup peeraddr sharedPeers of
573524
Just sharedPeerState ->
574525
let peerPhaseOld = sharedPeerPhase sharedPeerState in
575526
if peerPhaseOld /= peerPhaseNew
576527
then
577-
let sharedPeerScore' =
578-
normalizePeerScore (sharedPeerScore sharedPeerState)
579-
sharedPeerState' =
580-
sharedPeerState {
581-
sharedPeerPhase = peerPhaseNew,
582-
sharedPeerScore = sharedPeerScore'
583-
}
584-
in
585-
let st' = st { sharedPeers = Map.insert peeraddr
586-
sharedPeerState' sharedPeers
587-
, sharedGeneration = sharedGeneration + 1 } in
588-
bumpIdlePeerGenerations (phaseWakePeers peerPhaseOld) st'
528+
let sharedPeerState' = sharedPeerState { sharedPeerPhase = peerPhaseNew }
529+
st' = st { sharedPeers = Map.insert peeraddr sharedPeerState' sharedPeers
530+
, sharedGeneration = sharedGeneration + 1 }
531+
in bumpIdlePeerGenerations (phaseWakePeers peerPhaseOld) st'
589532
else st
590533
_ -> st -- TODO error?
591534
where
592-
normalizePeerScore ps@PeerScore { peerScoreValue }
593-
| peerScoreValue == 0 = ps
594-
| otherwise =
595-
let PeerScore { peerScoreTs } = ps
596-
!drain = realToFrac (diffTime now peerScoreTs) * scoreRate policy
597-
!drained = max 0 (peerScoreValue - drain)
598-
in ps { peerScoreValue = drained, peerScoreTs = now }
599-
600535
phaseWakePeers peerPhaseOld
601536
| peerPhaseOld /= PeerIdle
602537
, peerPhaseNew == PeerIdle = Set.singleton peeraddr

ouroboros-network/lib/Ouroboros/Network/TxSubmission/Inbound/V2/State.hs

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ module Ouroboros.Network.TxSubmission.Inbound.V2.State
1212
, advertisingPeersForTxKeysExcept
1313
, advertisingPeersForTxExcept
1414
, removeAdvertisingPeersForResolvedTx
15+
, drainPeerScore
16+
, applyPeerRejections
1517
) where
1618

1719
import Control.Monad.Class.MonadTime.SI (DiffTime, Time, addTime, diffTime)
@@ -74,7 +76,7 @@ mkPeerActionContext now policy peeraddr peerState sharedState =
7476
pacPeerState = peerState',
7577
pacSharedState = sharedState',
7678
pacSharedPeerState = sharedPeerState',
77-
pacClaimDelay = peerClaimDelay policy now (sharedPeerScore sharedPeerState')
79+
pacClaimDelay = peerClaimDelay policy now (peerScore peerState')
7880
}
7981
where
8082
-- Remove expired retained TX keys from all shared state tables.
@@ -544,6 +546,40 @@ peerClaimDelay policy currentTime peerScore
544546
-- Delay contribution in milliseconds is peerScore / 20, then converted to seconds.
545547
realToFrac . (/ 20000) $ currentPeerScore policy currentTime peerScore
546548

549+
-- | Decay the peer's score to @now@, updating the timestamp.
550+
drainPeerScore :: TxDecisionPolicy
551+
-> Time
552+
-> PeerTxLocalState tx
553+
-> PeerTxLocalState tx
554+
drainPeerScore policy now peerState@PeerTxLocalState { peerScore }
555+
| peerScoreValue peerScore == 0 =
556+
peerState { peerScore = peerScore { peerScoreTs = now } }
557+
| otherwise =
558+
let drained = currentPeerScore policy now peerScore in
559+
peerState { peerScore = PeerScore { peerScoreValue = drained, peerScoreTs = now } }
560+
{-# INLINE drainPeerScore #-}
561+
562+
-- | Apply a rejection penalty to the peer's local score.
563+
-- Returns the new score value (for tracing) and the updated local state.
564+
applyPeerRejections :: TxDecisionPolicy
565+
-> Time
566+
-> Int
567+
-> PeerTxLocalState tx
568+
-> (Double, PeerTxLocalState tx)
569+
applyPeerRejections TxDecisionPolicy { scoreRate, scoreMax } now rejectedCount
570+
peerState@PeerTxLocalState { peerScore } =
571+
(peerScoreValue peerScore', peerState { peerScore = peerScore' })
572+
where
573+
n = fromIntegral rejectedCount :: Double
574+
peerScore' = applyRejects n peerScore
575+
applyRejects 0 ps@PeerScore { peerScoreValue = 0 } = ps { peerScoreTs = now }
576+
applyRejects n' ps@PeerScore { peerScoreValue, peerScoreTs } =
577+
let duration = diffTime now peerScoreTs
578+
!drain = realToFrac duration * scoreRate
579+
!drained = max 0 (peerScoreValue - drain) in
580+
ps { peerScoreValue = min scoreMax (drained + n'), peerScoreTs = now }
581+
{-# INLINE applyPeerRejections #-}
582+
547583
txClaimReadyAt :: DiffTime -> TxEntry peeraddr -> Time
548584
txClaimReadyAt claimDelay TxEntry { txLease } =
549585
addTime claimDelay claimableAt

ouroboros-network/lib/Ouroboros/Network/TxSubmission/Inbound/V2/Types.hs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -375,7 +375,7 @@ emptyPeerScore scoreTs = PeerScore {
375375
--
376376
-- These are the pieces of state that naturally belong to the worker
377377
-- thread handling one peer. Shared arbitration state such as peer
378-
-- phase and peer score is kept separately in 'SharedPeerState'.
378+
-- phase is kept separately in 'SharedPeerState'.
379379
data PeerTxLocalState tx = PeerTxLocalState {
380380
-- | Unacknowledged txids in the order advertised by the peer.
381381
peerUnacknowledgedTxIds :: !(StrictSeq TxKey),
@@ -397,7 +397,11 @@ data PeerTxLocalState tx = PeerTxLocalState {
397397

398398
-- | Time at which the first outstanding body-request batch was
399399
-- sent in the current download episode.
400-
peerDownloadStartTime :: !(Maybe Time)
400+
peerDownloadStartTime :: !(Maybe Time),
401+
402+
-- | Usefulness score for this peer, tracking rejection penalties and
403+
-- time-based decay.
404+
peerScore :: !PeerScore
401405
}
402406
deriving stock (Eq, Show, Generic)
403407
deriving anyclass (NFData, NoThunks)
@@ -411,14 +415,14 @@ emptyPeerTxLocalState = PeerTxLocalState {
411415
peerRequestedTxsSize = 0,
412416
peerRequestedTxIds = 0,
413417
peerDownloadedTxs = IntMap.empty,
414-
peerDownloadStartTime = Nothing
418+
peerDownloadStartTime = Nothing,
419+
peerScore = emptyPeerScore (Time 0)
415420
}
416421

417422
-- | Small shared view of peer state used for lease claiming and peer
418423
-- selection.
419424
data SharedPeerState = SharedPeerState {
420425
sharedPeerPhase :: !PeerPhase,
421-
sharedPeerScore :: !PeerScore,
422426
sharedPeerAdvertisedTxKeys :: !IntSet,
423427
sharedPeerGeneration :: !Word64
424428
}

ouroboros-network/tests/lib/Test/Ouroboros/Network/TxSubmission/AppV2.hs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,7 @@ runTxSubmission tracer _tracerTxLogic st0 txDecisionPolicy = do
209209
let server =
210210
txSubmissionInboundV2 sayTracer
211211
NoTxSubmissionInitDelay
212+
txDecisionPolicy
212213
(getMempoolReader inboundMempool)
213214
(getMempoolWriter duplicateTxIdsVar inboundMempool)
214215
getTxSize

0 commit comments

Comments
 (0)