@@ -5,17 +5,22 @@ import (
55 "crypto/rand"
66 "encoding/hex"
77 "math/big"
8+ "net/url"
9+ "os"
810 "sync"
11+ "sync/atomic"
912 "testing"
1013 "time"
1114
1215 "github.com/ethereum/go-ethereum/accounts/abi/bind"
16+ "github.com/ethereum/go-ethereum/common"
1317 "github.com/google/uuid"
1418 "github.com/stretchr/testify/assert"
1519 "github.com/stretchr/testify/require"
1620
1721 "github.com/smartcontractkit/chainlink-common/pkg/capabilities/datastreams"
1822 "github.com/smartcontractkit/chainlink-common/pkg/logger"
23+ "github.com/smartcontractkit/chainlink-common/pkg/services/servicetest"
1924 feeds_consumer "github.com/smartcontractkit/chainlink-evm/gethwrappers/keystone/generated/feeds_consumer_1_0_0"
2025 reporttypes "github.com/smartcontractkit/chainlink-evm/pkg/mercury/v3/types"
2126 "github.com/smartcontractkit/chainlink-protos/cre/go/values"
@@ -31,6 +36,99 @@ func Test_OneAtATimeTransmissionSchedule(t *testing.T) {
3136 testTransmissionSchedule (t , "2s" , "oneAtATime" )
3237}
3338
39+ func Test_AllAtOnceZeroDelta_SubmitsDuplicateForwarderTxs (t * testing.T ) {
40+ rawDBURL , ok := os .LookupEnv ("CL_DATABASE_URL" )
41+ if ! ok {
42+ t .Skip ("CL_DATABASE_URL is required for this integration test" )
43+ }
44+ parsedDBURL , err := url .Parse (rawDBURL )
45+ if err != nil || parsedDBURL .Path == "" {
46+ t .Skip ("CL_DATABASE_URL must include a database path for this integration test" )
47+ }
48+
49+ ctx := t .Context ()
50+ lggr := logger .Test (t )
51+
52+ workflowDonConfiguration , err := framework .NewDonConfiguration (framework.NewDonConfigurationParams {
53+ Name : "Workflow" ,
54+ NumNodes : 7 ,
55+ F : 2 ,
56+ AcceptsWorkflows : true ,
57+ })
58+ require .NoError (t , err )
59+ triggerDonConfiguration , err := framework .NewDonConfiguration (framework.NewDonConfigurationParams {
60+ Name : "Trigger" ,
61+ NumNodes : 7 ,
62+ F : 2 ,
63+ })
64+ require .NoError (t , err )
65+ targetDonConfiguration , err := framework .NewDonConfiguration (framework.NewDonConfigurationParams {
66+ Name : "Target" ,
67+ NumNodes : 7 ,
68+ F : 2 ,
69+ })
70+ require .NoError (t , err )
71+
72+ triggerSink := framework .NewTriggerSink (t , "streams-trigger" , "1.0.0" )
73+ donContext := framework .CreateDonContext (ctx , t )
74+
75+ workflowDon := createKeystoneWorkflowDon (ctx , t , lggr , workflowDonConfiguration , triggerDonConfiguration , targetDonConfiguration , donContext )
76+ forwarderAddr , _ := SetupForwarderContract (t , workflowDon , donContext .EthBlockchain )
77+ _ , consumer := SetupConsumerContract (t , donContext .EthBlockchain , forwarderAddr , workflowOwnerID , workflowName )
78+ writeTargetDon := createKeystoneWriteTargetDon (ctx , t , lggr , targetDonConfiguration , donContext , forwarderAddr )
79+ triggerDon := createKeystoneTriggerDon (ctx , t , lggr , triggerDonConfiguration , donContext , triggerSink )
80+
81+ servicetest .Run (t , workflowDon )
82+ servicetest .Run (t , triggerDon )
83+ servicetest .Run (t , writeTargetDon )
84+ donContext .WaitForCapabilitiesToBeExposed (t , workflowDon , triggerDon , writeTargetDon )
85+
86+ feedID := newFeedID (t )
87+ job := createKeystoneWorkflowJob (
88+ t ,
89+ workflowName ,
90+ workflowOwnerID ,
91+ []string {feedID },
92+ consumer .Address (),
93+ "0s" ,
94+ "allAtOnce" ,
95+ )
96+ err = workflowDon .AddJob (ctx , & job )
97+ require .NoError (t , err )
98+
99+ report := createFeedReport (t , big .NewInt (1 ), 7 , feedID , triggerDonConfiguration .KeyBundles )
100+ wrappedReports , err := wrapReports ([]* datastreams.FeedReport {report }, 12 , datastreams.Metadata {})
101+ require .NoError (t , err )
102+
103+ startBlock , err := donContext .EthBlockchain .Client ().BlockNumber (ctx )
104+ require .NoError (t , err )
105+
106+ triggerSink .SendOutput (wrappedReports , uuid .New ().String ())
107+ waitForConsumerReports (t , consumer , newStreamsV1Handler ([]* datastreams.FeedReport {report }))
108+
109+ var observedTxCount atomic.Uint64
110+ require .Eventually (t , func () bool {
111+ latestBlock , blockErr := donContext .EthBlockchain .Client ().BlockNumber (ctx )
112+ if blockErr != nil || latestBlock <= startBlock {
113+ return false
114+ }
115+ txCount , countErr := countTransactionsToAddress (ctx , donContext .EthBlockchain , forwarderAddr , startBlock + 1 , latestBlock )
116+ if countErr != nil {
117+ return false
118+ }
119+ observedTxCount .Store (txCount )
120+ return txCount > 1
121+ }, 20 * time .Second , 500 * time .Millisecond , "expected duplicate forwarder tx submissions for one execution" )
122+
123+ // TODO: @ilija42 Expected behavior after fix: exactly one forwarder transaction should be submitted for a single execution.
124+ // require.EqualValues(
125+ // t,
126+ // 1,
127+ // observedTxCount.Load(),
128+ // "TODO: enforce single forwarder submission per execution under allAtOnce + low deltaStage",
129+ // )
130+ }
131+
34132func testTransmissionSchedule (t * testing.T , deltaStage string , schedule string ) {
35133 ctx := t .Context ()
36134
@@ -49,7 +147,7 @@ func testTransmissionSchedule(t *testing.T, deltaStage string, schedule string)
49147 targetDonConfiguration , triggerSink )
50148
51149 feedCount := 3
52- var feedIDs []string
150+ feedIDs := make ( []string , 0 , feedCount )
53151 for range feedCount {
54152 feedIDs = append (feedIDs , newFeedID (t ))
55153 }
@@ -161,3 +259,27 @@ func (h *streamsV1Handler) handleDone(t *testing.T) {
161259 defer h .mu .Unlock ()
162260 t .Logf ("found (%v) %d of %d" , h .found , len (h .found ), len (h .expected ))
163261}
262+
263+ func countTransactionsToAddress (
264+ ctx context.Context ,
265+ chain * framework.EthBlockchain ,
266+ target common.Address ,
267+ fromBlock uint64 ,
268+ toBlock uint64 ,
269+ ) (uint64 , error ) {
270+ var count uint64
271+ for blockNum := fromBlock ; blockNum <= toBlock ; blockNum ++ {
272+ block , err := chain .Client ().BlockByNumber (ctx , new (big.Int ).SetUint64 (blockNum ))
273+ if err != nil {
274+ return 0 , err
275+ }
276+
277+ for _ , tx := range block .Transactions () {
278+ if tx .To () != nil && * tx .To () == target {
279+ count ++
280+ }
281+ }
282+ }
283+
284+ return count , nil
285+ }
0 commit comments