Skip to content

Commit d8c2768

Browse files
authored
[CRE] Fix default stream config in Launcher (#21624)
1 parent fcffbb8 commit d8c2768

2 files changed

Lines changed: 18 additions & 2 deletions

File tree

core/capabilities/launcher.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -425,7 +425,7 @@ func (w *launcher) OnNewRegistry(ctx context.Context, localRegistry *registrysyn
425425
}
426426
if w.don2donSharedPeer != nil {
427427
donPairs := w.donPairsToUpdate(w.myPeerID, localRegistry)
428-
err := w.don2donSharedPeer.UpdateConnectionsByDONs(ctx, donPairs, defaultStreamConfig)
428+
err := w.don2donSharedPeer.UpdateConnectionsByDONs(ctx, donPairs, w.p2pStreamConfig)
429429
if err != nil {
430430
return fmt.Errorf("failed to update peer connections: %w", err)
431431
}

core/capabilities/launcher_test.go

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"google.golang.org/protobuf/proto"
1313
"google.golang.org/protobuf/types/known/durationpb"
1414

15+
"github.com/smartcontractkit/libocr/ragep2p"
1516
ragetypes "github.com/smartcontractkit/libocr/ragep2p/types"
1617

1718
"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
@@ -892,10 +893,24 @@ func TestLauncher_V2CapabilitiesAddViaCombinedClient(t *testing.T) {
892893
addCapabilityToDON(localRegistry, zoneBDonID, fullExecutableCapID, capabilities.CapabilityTypeTarget, execCfg)
893894
addCapabilityToDON(localRegistry, capDonID, fullLocalCapID, capabilities.CapabilityTypeAction, localCfg) // should be skipped
894895

896+
customStreamConfig := p2ptypes.StreamConfig{
897+
IncomingMessageBufferSize: 999,
898+
OutgoingMessageBufferSize: 888,
899+
MaxMessageLenBytes: 777777,
900+
MessageRateLimiter: ragep2p.TokenBucketParams{
901+
Rate: 50.0,
902+
Capacity: 250,
903+
},
904+
BytesRateLimiter: ragep2p.TokenBucketParams{
905+
Rate: 2500000.0,
906+
Capacity: 5000000,
907+
},
908+
}
909+
895910
sharedPeer := mocks.NewSharedPeer(t)
896911
sharedPeer.On("ID").Return(workflowDonNodes[0])
897912
sharedPeer.On("IsBootstrap").Return(false)
898-
sharedPeer.On("UpdateConnectionsByDONs", mock.Anything, mock.Anything, mock.Anything).Return(nil)
913+
sharedPeer.On("UpdateConnectionsByDONs", mock.Anything, mock.Anything, customStreamConfig).Return(nil)
899914

900915
launcher, err := NewLauncher(
901916
lggr,
@@ -907,6 +922,7 @@ func TestLauncher_V2CapabilitiesAddViaCombinedClient(t *testing.T) {
907922
&mockDonNotifier{},
908923
)
909924
require.NoError(t, err)
925+
launcher.p2pStreamConfig = customStreamConfig
910926
servicetest.Run(t, launcher)
911927

912928
dispatcher.On("SetReceiverForMethod", fullTriggerCapID, capDonID, "StreamsTrigger", mock.AnythingOfType("*remote.triggerSubscriber")).Return(nil)

0 commit comments

Comments
 (0)