Skip to content

Commit 319743a

Browse files
nfrisbyclaude
authored andcommitted
Leios N: Reception arrival-time on recv (demo-tuning)
Wrap `Channel m a` recv values in a new `Reception a = MkReception !(IntMap Time) !a`, recording per-chunk arrival times. In the mux demuxer the chunk's arrival time is paired with its starting byte offset; codec/driver layers either strip the time map (the codec sees only bytes) or propagate it through `wrapMiniProtocolTrailing` at the mini-protocol callback boundary. Tests/demos/benches use `IntMap.empty` when synthesising trailing. This is the network half of the November Leios demo's arrival-time plumbing; consumers (e.g. Praos block-arrival tracing on the consensus side) will be wired up separately. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
1 parent 55c16cf commit 319743a

37 files changed

Lines changed: 775 additions & 436 deletions

File tree

.github/CODEOWNERS

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
# Note: later rules override earlier rules.
22

33
# Default
4-
* @intersectmbo/ouroboros-network-maintainers
4+
# FIXME: disabled codeowners for leios-prototype
5+
# * @intersectmbo/ouroboros-network-maintainers
56

67
# CICD
78
nix/ @intersectmbo/core-tech-release-1 @intersectmbo/ouroboros-network-maintainers

cardano-diffusion/demo/chain-sync.hs

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -398,6 +398,7 @@ clientBlockFetch sockAddrs maxSlotNo = withIOManager $ \iocp -> do
398398
(Map.delete connId)
399399
in bracketSyncWithFetchClient registry connId $
400400
bracket register unregister $ \chainVar ->
401+
wrapMiniProtocolTrailing $
401402
runPeer
402403
nullTracer -- (contramap (show . TraceLabelPeer ("chain-sync", getFilePath $ remoteAddress connId)) stdoutTracer)
403404
codecChainSync
@@ -413,12 +414,14 @@ clientBlockFetch sockAddrs maxSlotNo = withIOManager $ \iocp -> do
413414
bracketDqRegistry registry connId $
414415
bracketFetchClient registry (maxBound :: NodeToNodeVersion) connId $ \clientCtx -> do
415416
threadDelay 1000000
416-
runPipelinedPeer
417-
nullTracer -- (contramap (show . TraceLabelPeer ("block-fetch", getFilePath $ remoteAddress connId)) stdoutTracer)
418-
codecBlockFetch
419-
channel
420-
(blockFetchClient (maxBound :: NodeToNodeVersion) continueUntilMaxSlot
421-
nullTracer clientCtx)
417+
wrapMiniProtocolTrailing
418+
( runPipelinedPeer
419+
nullTracer -- (contramap (show . TraceLabelPeer ("block-fetch", getFilePath $ remoteAddress connId)) stdoutTracer)
420+
codecBlockFetch
421+
channel
422+
(blockFetchClient (maxBound :: NodeToNodeVersion) continueUntilMaxSlot
423+
nullTracer clientCtx)
424+
)
422425

423426
blockFetchPolicy :: BlockFetchConsensusInterface
424427
LocalConnectionId BlockHeader Block IO

cardano-diffusion/protocols/tests-lib/Cardano/Network/Protocol/Handshake/Test.hs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import Codec.CBOR.Term qualified as CBOR
2424
import Control.DeepSeq (NFData)
2525
import Control.Monad.Class.MonadAsync
2626
import Control.Monad.Class.MonadThrow
27+
import Control.Monad.Class.MonadTime.SI
2728
import Control.Monad.IOSim (runSimOrThrow)
2829
import Control.Tracer (nullTracer)
2930

@@ -334,9 +335,10 @@ prop_query_version_NodeToClient_SimNet
334335
-- | Run a query for the server's supported version.
335336
--
336337
prop_peerSharing_symmetric ::
337-
( MonadAsync m
338-
, MonadCatch m
339-
, MonadEvaluate m
338+
( MonadAsync m
339+
, MonadCatch m
340+
, MonadEvaluate m
341+
, MonadMonotonicTime m
340342
)
341343
=> m (Channel m ByteString, Channel m ByteString)
342344
-> Codec (Handshake NodeToNodeVersion CBOR.Term)

cardano-diffusion/tests/lib/Test/Cardano/Network/Diffusion/Testnet/MiniProtocols.hs

Lines changed: 135 additions & 110 deletions
Original file line numberDiff line numberDiff line change
@@ -470,17 +470,19 @@ applications debugTracer txSubmissionInboundTracer txSubmissionInboundDebug node
470470
bracket (registerClientChains nodeKernel (remoteAddress connId))
471471
(\_ -> unregisterClientChains nodeKernel (remoteAddress connId))
472472
(\chainVar ->
473-
runPeerWithLimitsRnd
474-
(((prettyShow connId ++) . (" " ++) . show) `contramap` debugTracer)
475-
(mkStdGen 0) -- TODO
476-
chainSyncCodec
477-
(chainSyncSizeLimits limits)
478-
(chainSyncTimeLimits limits extraFlags)
479-
channel
480-
(chainSyncClientPeer $
481-
chainSyncClientExample
482-
chainVar
483-
client)
473+
wrapMiniProtocolTrailing
474+
( runPeerWithLimitsRnd
475+
(((prettyShow connId ++) . (" " ++) . show) `contramap` debugTracer)
476+
(mkStdGen 0) -- TODO
477+
chainSyncCodec
478+
(chainSyncSizeLimits limits)
479+
(chainSyncTimeLimits limits extraFlags)
480+
channel
481+
(chainSyncClientPeer $
482+
chainSyncClientExample
483+
chainVar
484+
client)
485+
)
484486
)
485487

486488
chainSyncResponder
@@ -489,16 +491,18 @@ applications debugTracer txSubmissionInboundTracer txSubmissionInboundDebug node
489491
\ ResponderContext { rcConnectionId = connId }
490492
channel -> do
491493
labelThisThread "ChainSyncServer"
492-
runPeerWithLimitsRnd
493-
(((prettyShow connId ++) . (" " ++) . show) `contramap` debugTracer)
494-
(mkStdGen 0)
495-
chainSyncCodec
496-
(chainSyncSizeLimits limits)
497-
(chainSyncTimeLimits limits IsNotTrustable)
498-
channel
499-
(chainSyncServerPeer
500-
(chainSyncServerExample
501-
() (nkChainProducerState nodeKernel) toHeader))
494+
wrapMiniProtocolTrailing
495+
( runPeerWithLimitsRnd
496+
(((prettyShow connId ++) . (" " ++) . show) `contramap` debugTracer)
497+
(mkStdGen 0)
498+
chainSyncCodec
499+
(chainSyncSizeLimits limits)
500+
(chainSyncTimeLimits limits IsNotTrustable)
501+
channel
502+
(chainSyncServerPeer
503+
(chainSyncServerExample
504+
() (nkChainProducerState nodeKernel) toHeader))
505+
)
502506

503507
blockFetchInitiator
504508
:: MiniProtocolCb (ExpandedInitiatorContext NtNAddr PeerTrustable m) ByteString m ()
@@ -514,15 +518,17 @@ applications debugTracer txSubmissionInboundTracer txSubmissionInboundDebug node
514518
UnversionedProtocol
515519
remoteAddress
516520
$ \clientCtx ->
517-
runPeerWithLimits
518-
(((prettyShow connId ++) . (" " ++) . show) `contramap` debugTracer)
519-
blockFetchCodec
520-
(blockFetchSizeLimits limits)
521-
(blockFetchTimeLimits limits)
522-
channel
523-
(forgetPipelined []
524-
$ blockFetchClient UnversionedProtocol controlMessageSTM
525-
nullTracer clientCtx)
521+
wrapMiniProtocolTrailing
522+
( runPeerWithLimits
523+
(((prettyShow connId ++) . (" " ++) . show) `contramap` debugTracer)
524+
blockFetchCodec
525+
(blockFetchSizeLimits limits)
526+
(blockFetchTimeLimits limits)
527+
channel
528+
(forgetPipelined []
529+
$ blockFetchClient UnversionedProtocol controlMessageSTM
530+
nullTracer clientCtx)
531+
)
526532

527533
blockFetchResponder
528534
:: MiniProtocolCb (ResponderContext NtNAddr) ByteString m ()
@@ -531,22 +537,24 @@ applications debugTracer txSubmissionInboundTracer txSubmissionInboundDebug node
531537
\ ResponderContext { rcConnectionId = connId }
532538
channel -> do
533539
labelThisThread "BlockFetchServer"
534-
runPeerWithLimits
535-
(((prettyShow connId ++) . (" " ++) . show) `contramap` debugTracer)
536-
blockFetchCodec
537-
(blockFetchSizeLimits limits)
538-
(blockFetchTimeLimits limits)
539-
channel
540-
(blockFetchServerPeer $
541-
blockFetchServer
542-
(constantRangeRequests $ \(ChainRange from to) -> do
543-
nkChainProducer <- Pipes.lift
544-
$ readTVarIO (nkChainProducerState nodeKernel)
545-
Pipes.each $ fromMaybe []
546-
$ Chain.selectBlockRange (chainState nkChainProducer)
547-
from
548-
to
549-
)
540+
wrapMiniProtocolTrailing
541+
( runPeerWithLimits
542+
(((prettyShow connId ++) . (" " ++) . show) `contramap` debugTracer)
543+
blockFetchCodec
544+
(blockFetchSizeLimits limits)
545+
(blockFetchTimeLimits limits)
546+
channel
547+
(blockFetchServerPeer $
548+
blockFetchServer
549+
(constantRangeRequests $ \(ChainRange from to) -> do
550+
nkChainProducer <- Pipes.lift
551+
$ readTVarIO (nkChainProducerState nodeKernel)
552+
Pipes.each $ fromMaybe []
553+
$ Chain.selectBlockRange (chainState nkChainProducer)
554+
from
555+
to
556+
)
557+
)
550558
)
551559

552560
keepAliveInitiator
@@ -560,20 +568,22 @@ applications debugTracer txSubmissionInboundTracer txSubmissionInboundDebug node
560568
channel
561569
-> do labelThisThread "KeepAliveClient"
562570
let kacApp =
563-
\ctxVar -> runPeerWithLimits
564-
(((prettyShow connId ++) . (" " ++) . show) `contramap` debugTracer)
565-
keepAliveCodec
566-
(keepAliveSizeLimits limits)
567-
(keepAliveTimeLimits limits)
568-
channel
569-
(keepAliveClientPeer $
570-
keepAliveClient
571-
nullTracer
572-
aaKeepAliveStdGen
573-
controlMessageSTM
574-
remoteAddress
575-
ctxVar
576-
(KeepAliveInterval aaKeepAliveInterval))
571+
\ctxVar -> wrapMiniProtocolTrailing
572+
( runPeerWithLimits
573+
(((prettyShow connId ++) . (" " ++) . show) `contramap` debugTracer)
574+
keepAliveCodec
575+
(keepAliveSizeLimits limits)
576+
(keepAliveTimeLimits limits)
577+
channel
578+
(keepAliveClientPeer $
579+
keepAliveClient
580+
nullTracer
581+
aaKeepAliveStdGen
582+
controlMessageSTM
583+
remoteAddress
584+
ctxVar
585+
(KeepAliveInterval aaKeepAliveInterval))
586+
)
577587
bracketKeepAliveClient (nkFetchClientRegistry nodeKernel)
578588
remoteAddress
579589
kacApp
@@ -584,13 +594,15 @@ applications debugTracer txSubmissionInboundTracer txSubmissionInboundDebug node
584594
\ ResponderContext { rcConnectionId = connId }
585595
channel -> do
586596
labelThisThread "KeepAliveServer"
587-
runPeerWithLimits
588-
(((prettyShow connId ++) . (" " ++) . show) `contramap` debugTracer)
589-
keepAliveCodec
590-
(keepAliveSizeLimits limits)
591-
(keepAliveTimeLimits limits)
592-
channel
593-
(keepAliveServerPeer keepAliveServer)
597+
wrapMiniProtocolTrailing
598+
( runPeerWithLimits
599+
(((prettyShow connId ++) . (" " ++) . show) `contramap` debugTracer)
600+
keepAliveCodec
601+
(keepAliveSizeLimits limits)
602+
(keepAliveTimeLimits limits)
603+
channel
604+
(keepAliveServerPeer keepAliveServer)
605+
)
594606

595607
pingPongInitiator
596608
:: MiniProtocolCb (ExpandedInitiatorContext NtNAddr PeerTrustable m) ByteString m ()
@@ -627,25 +639,29 @@ applications debugTracer txSubmissionInboundTracer txSubmissionInboundDebug node
627639
if continue
628640
then return pingPongClient
629641
else return $ PingPong.SendMsgDone ()
630-
in runPeerWithLimits
631-
(((prettyShow connId ++) . (" " ++) . show) `contramap` debugTracer)
632-
pingPongCodec
633-
(pingPongSizeLimits limits)
634-
(pingPongTimeLimits limits)
635-
channel
636-
(pingPongClientPeer pingPongClient)
642+
in wrapMiniProtocolTrailing
643+
( runPeerWithLimits
644+
(((prettyShow connId ++) . (" " ++) . show) `contramap` debugTracer)
645+
pingPongCodec
646+
(pingPongSizeLimits limits)
647+
(pingPongTimeLimits limits)
648+
channel
649+
(pingPongClientPeer pingPongClient)
650+
)
637651

638652
pingPongResponder
639653
:: MiniProtocolCb (ResponderContext NtNAddr) ByteString m ()
640654
pingPongResponder = MiniProtocolCb $
641655
\ResponderContext { rcConnectionId = connId } channel ->
642-
runPeerWithLimits
643-
(((prettyShow connId ++) . (" " ++) . show) `contramap` debugTracer)
644-
pingPongCodec
645-
(pingPongSizeLimits limits)
646-
(pingPongTimeLimits limits)
647-
channel
648-
(pingPongServerPeer pingPongServerStandard)
656+
wrapMiniProtocolTrailing
657+
( runPeerWithLimits
658+
(((prettyShow connId ++) . (" " ++) . show) `contramap` debugTracer)
659+
pingPongCodec
660+
(pingPongSizeLimits limits)
661+
(pingPongTimeLimits limits)
662+
channel
663+
(pingPongServerPeer pingPongServerStandard)
664+
)
649665

650666

651667
peerSharingInitiator
@@ -661,13 +677,15 @@ applications debugTracer txSubmissionInboundTracer txSubmissionInboundDebug node
661677
bracketPeerSharingClient (nkPeerSharingRegistry nodeKernel) them
662678
$ \controller -> do
663679
psClient <- peerSharingClient controlMessageSTM controller
664-
runPeerWithLimits
665-
(((prettyShow connId ++) . (" " ++) . show) `contramap` debugTracer)
666-
peerSharingCodec
667-
(peerSharingSizeLimits limits)
668-
(peerSharingTimeLimits limits)
669-
channel
670-
(peerSharingClientPeer psClient)
680+
wrapMiniProtocolTrailing
681+
( runPeerWithLimits
682+
(((prettyShow connId ++) . (" " ++) . show) `contramap` debugTracer)
683+
peerSharingCodec
684+
(peerSharingSizeLimits limits)
685+
(peerSharingTimeLimits limits)
686+
channel
687+
(peerSharingClientPeer psClient)
688+
)
671689

672690
peerSharingResponder
673691
:: PeerSharingAPI NtNAddr s m
@@ -676,14 +694,17 @@ applications debugTracer txSubmissionInboundTracer txSubmissionInboundDebug node
676694
\ ResponderContext { rcConnectionId = connId }
677695
channel -> do
678696
labelThisThread "PeerSharingServer"
679-
runPeerWithLimits
680-
(((prettyShow connId ++) . (" " ++) . show) `contramap` debugTracer)
681-
peerSharingCodec
682-
(peerSharingSizeLimits limits)
683-
(peerSharingTimeLimits limits)
684-
channel
685-
$ peerSharingServerPeer
686-
$ peerSharingServer psAPI
697+
wrapMiniProtocolTrailing
698+
( runPeerWithLimits
699+
(((prettyShow connId ++) . (" " ++) . show) `contramap` debugTracer)
700+
peerSharingCodec
701+
(peerSharingSizeLimits limits)
702+
(peerSharingTimeLimits limits)
703+
channel
704+
( peerSharingServerPeer
705+
$ peerSharingServer psAPI
706+
)
707+
)
687708

688709
txSubmissionInitiator
689710
:: TxDecisionPolicy
@@ -705,13 +726,15 @@ applications debugTracer txSubmissionInboundTracer txSubmissionInboundDebug node
705726
(maxBound :: UnversionedProtocol)
706727
controlMessageSTM
707728
labelThisThread "TxSubmissionClient"
708-
runPeerWithLimits
709-
(((prettyShow connId ++) . (" " ++) . show) `contramap` debugTracer)
710-
txSubmissionCodec
711-
(txSubmissionSizeLimits limits)
712-
(txSubmissionTimeLimits limits)
713-
channel
714-
(txSubmissionClientPeer client)
729+
wrapMiniProtocolTrailing
730+
( runPeerWithLimits
731+
(((prettyShow connId ++) . (" " ++) . show) `contramap` debugTracer)
732+
txSubmissionCodec
733+
(txSubmissionSizeLimits limits)
734+
(txSubmissionTimeLimits limits)
735+
channel
736+
(txSubmissionClientPeer client)
737+
)
715738

716739
txSubmissionResponder
717740
:: Mempool m TxId (Tx TxId)
@@ -738,13 +761,15 @@ applications debugTracer txSubmissionInboundTracer txSubmissionInboundDebug node
738761
(getMempoolWriter duplicateTxVar mempool)
739762
api
740763
labelThisThread "TxSubmissionServer"
741-
runPipelinedPeerWithLimits
742-
(((prettyShow connId ++) . (" " ++) . show) `contramap` debugTracer)
743-
txSubmissionCodec
744-
(txSubmissionSizeLimits limits)
745-
(txSubmissionTimeLimits limits)
746-
channel
747-
(txSubmissionServerPeerPipelined server)
764+
wrapMiniProtocolTrailing
765+
( runPipelinedPeerWithLimits
766+
(((prettyShow connId ++) . (" " ++) . show) `contramap` debugTracer)
767+
txSubmissionCodec
768+
(txSubmissionSizeLimits limits)
769+
(txSubmissionTimeLimits limits)
770+
channel
771+
(txSubmissionServerPeerPipelined server)
772+
)
748773

749774
--
750775
-- Orphaned Instances

0 commit comments

Comments
 (0)