diff --git a/architecture/evm/eth_getBlockByNumber.go b/architecture/evm/eth_getBlockByNumber.go index aef7fa025..0c75f7546 100644 --- a/architecture/evm/eth_getBlockByNumber.go +++ b/architecture/evm/eth_getBlockByNumber.go @@ -575,8 +575,37 @@ func isHyperEVMSystemTransaction(tx interface{}) bool { if !ok { return false } + from, ok := txObj["from"].(string) + if !ok || !isZeroishHex(from) { + return false + } + gas, ok := txObj["gas"].(string) + if !ok || !isZeroishHex(gas) { + return false + } gasPrice, ok := txObj["gasPrice"].(string) - return ok && isZeroishHex(gasPrice) + if !ok || !isZeroishHex(gasPrice) { + return false + } + if to, exists := txObj["to"]; exists && to != nil { + toStr, ok := to.(string) + if !ok || !isZeroishHex(toStr) { + return false + } + } + if value, exists := txObj["value"]; exists && value != nil { + valueStr, ok := value.(string) + if !ok || !isZeroishHex(valueStr) { + return false + } + } + if input, exists := txObj["input"]; exists && input != nil { + inputStr, ok := input.(string) + if !ok || !isZeroishHex(inputStr) { + return false + } + } + return true } // blockValidationTxLite is a minimal transaction model for block validation diff --git a/architecture/evm/eth_getBlockByNumber_test.go b/architecture/evm/eth_getBlockByNumber_test.go index fc41306a6..eb472c3ed 100644 --- a/architecture/evm/eth_getBlockByNumber_test.go +++ b/architecture/evm/eth_getBlockByNumber_test.go @@ -82,9 +82,10 @@ func TestUpstreamPostForward_HyperEVMFiltersSystemTransactions(t *testing.T) { "number": "0x100", "hash": "0xabc123", "transactions": [ - {"hash": "0xsystem1", "gasPrice": "0x0"}, + {"hash": "0xsystem1", "from": "0x0", "to": "0x0", "gas": "0x0", "gasPrice": "0x0", "input": "0x", "value": "0x0"}, + {"hash": "0xreal-zero-price", "from": "0x1111111111111111111111111111111111111111", "to": "0x2222222222222222222222222222222222222222", "gas": "0x5208", "gasPrice": "0x0", "input": "0x", "value": "0x0"}, {"hash": "0xreal", "gasPrice": "0x1"}, - {"hash": "0xsystem2", "gasPrice": "0x00"} + {"hash": "0xsystem2", "from": "0x0000000000000000000000000000000000000000", "to": "0x0000000000000000000000000000000000000000", "gas": "0x00", "gasPrice": "0x00", "input": "0x00", "value": "0x00"} ] }` @@ -100,7 +101,7 @@ func TestUpstreamPostForward_HyperEVMFiltersSystemTransactions(t *testing.T) { return req, resp } - t.Run("FiltersGasPriceZeroTransactionsOnHyperEVM", func(t *testing.T) { + t.Run("FiltersOnlySystemTransactionsOnHyperEVM", func(t *testing.T) { req, resp := newResponse(t) network := &testNetwork{cfg: &common.NetworkConfig{ Architecture: common.ArchitectureEvm, @@ -118,9 +119,11 @@ func TestUpstreamPostForward_HyperEVMFiltersSystemTransactions(t *testing.T) { Transactions []map[string]interface{} `json:"transactions"` } require.NoError(t, common.SonicCfg.Unmarshal(jrr.GetResultBytes(), &block)) - require.Len(t, block.Transactions, 1) - assert.Equal(t, "0xreal", block.Transactions[0]["hash"]) - assert.Equal(t, "0x1", block.Transactions[0]["gasPrice"]) + require.Len(t, block.Transactions, 2) + assert.Equal(t, "0xreal-zero-price", block.Transactions[0]["hash"]) + assert.Equal(t, "0x0", block.Transactions[0]["gasPrice"]) + assert.Equal(t, "0xreal", block.Transactions[1]["hash"]) + assert.Equal(t, "0x1", block.Transactions[1]["gasPrice"]) }) t.Run("NetworkPostForwardFiltersCachedBlocks", func(t *testing.T) { @@ -142,8 +145,9 @@ func TestUpstreamPostForward_HyperEVMFiltersSystemTransactions(t *testing.T) { Transactions []map[string]interface{} `json:"transactions"` } require.NoError(t, common.SonicCfg.Unmarshal(jrr.GetResultBytes(), &block)) - require.Len(t, block.Transactions, 1) - assert.Equal(t, "0xreal", block.Transactions[0]["hash"]) + require.Len(t, block.Transactions, 2) + assert.Equal(t, "0xreal-zero-price", block.Transactions[0]["hash"]) + assert.Equal(t, "0xreal", block.Transactions[1]["hash"]) }) t.Run("NetworkPostForwardFiltersBlockByHash", func(t *testing.T) { @@ -169,8 +173,9 @@ func TestUpstreamPostForward_HyperEVMFiltersSystemTransactions(t *testing.T) { Transactions []map[string]interface{} `json:"transactions"` } require.NoError(t, common.SonicCfg.Unmarshal(jrr.GetResultBytes(), &block)) - require.Len(t, block.Transactions, 1) - assert.Equal(t, "0xreal", block.Transactions[0]["hash"]) + require.Len(t, block.Transactions, 2) + assert.Equal(t, "0xreal-zero-price", block.Transactions[0]["hash"]) + assert.Equal(t, "0xreal", block.Transactions[1]["hash"]) }) t.Run("PreservesHashOnlyTransactionsOnHyperEVM", func(t *testing.T) { @@ -225,7 +230,42 @@ func TestUpstreamPostForward_HyperEVMFiltersSystemTransactions(t *testing.T) { Transactions []map[string]interface{} `json:"transactions"` } require.NoError(t, common.SonicCfg.Unmarshal(jrr.GetResultBytes(), &block)) - require.Len(t, block.Transactions, 3) + require.Len(t, block.Transactions, 4) + }) + + t.Run("ValidatesOriginalBlockBeforeFilteringAllSystemTransactions", func(t *testing.T) { + systemOnlyBlockJSON := `{ + "number": "0x100", + "hash": "0xabc123", + "transactionsRoot": "0x1111111111111111111111111111111111111111111111111111111111111111", + "transactions": [ + {"hash": "0xsystem1", "from": "0x0", "to": "0x0", "gas": "0x0", "gasPrice": "0x0", "input": "0x", "value": "0x0"} + ] + }` + jrpcResp, err := common.NewJsonRpcResponseFromBytes([]byte(`1`), []byte(systemOnlyBlockJSON), nil) + require.NoError(t, err) + req := common.NewNormalizedRequest([]byte(`{"jsonrpc":"2.0","id":1,"method":"eth_getBlockByNumber","params":["0x100",true]}`)) + req.SetDirectives(&common.RequestDirectives{ValidateTransactionsRoot: true}) + resp := common.NewNormalizedResponse(). + WithRequest(req). + WithJsonRpcResponse(jrpcResp) + network := &testNetwork{cfg: &common.NetworkConfig{ + Architecture: common.ArchitectureEvm, + Evm: &common.EvmNetworkConfig{ + ChainId: 999, + }, + }} + + filteredResp, err := upstreamPostForward_eth_getBlockByNumber(ctx, network, nil, req, resp, nil) + require.NoError(t, err) + + jrr, err := filteredResp.JsonRpcResponse(ctx) + require.NoError(t, err) + var block struct { + Transactions []map[string]interface{} `json:"transactions"` + } + require.NoError(t, common.SonicCfg.Unmarshal(jrr.GetResultBytes(), &block)) + require.Empty(t, block.Transactions) }) } diff --git a/common/config.go b/common/config.go index 8989c5fb7..59769c002 100644 --- a/common/config.go +++ b/common/config.go @@ -1746,12 +1746,9 @@ type ConsensusPolicyConfig struct { // the participant set so the first `maxParticipants` drawn satisfy // every entry — without changing `maxParticipants` itself. // - // Best-effort and governed by the EXISTING consensus behaviors: if a - // required group has fewer healthy upstreams than requested (or the - // quotas can't all fit within `maxParticipants`), consensus simply - // runs with what it can promote and the resulting participation is - // handled by `lowParticipantsBehavior` / `agreementThreshold` exactly - // like any other low-participation tick. Empty (default) = disabled. + // Reordering is best-effort, but final analysis fails closed when valid + // consensus responses do not satisfy every required quota. Empty + // (default) = disabled. RequiredParticipants []*ConsensusRequiredParticipant `yaml:"requiredParticipants,omitempty" json:"requiredParticipants,omitempty"` } diff --git a/consensus/analysis.go b/consensus/analysis.go index 8227e7108..3059307be 100644 --- a/consensus/analysis.go +++ b/consensus/analysis.go @@ -52,6 +52,8 @@ type consensusAnalysis struct { originalRequest *common.NormalizedRequest leaderUpstream common.Upstream method string // The RPC method being called (e.g., "eth_getTransactionCount") + waitCapped bool + missingRequired []string // Cached computed values cachedBestNonEmpty *responseGroup @@ -73,11 +75,16 @@ func contextFrom(ctxOrExec any) context.Context { } func newConsensusAnalysis(lg *zerolog.Logger, ctxOrExec any, config *config, responses []*execResult) *consensusAnalysis { + return newConsensusAnalysisWithOptions(lg, ctxOrExec, config, responses, false) +} + +func newConsensusAnalysisWithOptions(lg *zerolog.Logger, ctxOrExec any, config *config, responses []*execResult, waitCapped bool) *consensusAnalysis { ctx := contextFrom(ctxOrExec) analysis := &consensusAnalysis{ config: config, groups: make(map[string]*responseGroup), totalParticipants: len(responses), + waitCapped: waitCapped, } // Try to extract original request and compute leader upstream once @@ -162,10 +169,61 @@ func newConsensusAnalysis(lg *zerolog.Logger, ctxOrExec any, config *config, res analysis.getBestError() analysis.getBestByCount() analysis.getBestBySize() + analysis.missingRequired = analysis.computeMissingRequiredParticipants() return analysis } +func (a *consensusAnalysis) computeMissingRequiredParticipants() []string { + if a == nil || a.config == nil || len(a.config.requiredParticipants) == 0 { + return nil + } + + seenByRequirement := make([]map[string]struct{}, len(a.config.requiredParticipants)) + for _, group := range a.groups { + for _, result := range group.Results { + if result == nil || result.Upstream == nil || result.CachedResponseType == ResponseTypeInfrastructureError { + continue + } + upstreamID := result.Upstream.Id() + if upstreamID == "" { + upstreamID = fmt.Sprintf("%p", result.Upstream) + } + for i, required := range a.config.requiredParticipants { + if required == nil || required.Tag == "" || required.MinParticipants <= 0 { + continue + } + if !upstreamMatchesTag(result.Upstream, required.Tag) { + continue + } + if seenByRequirement[i] == nil { + seenByRequirement[i] = make(map[string]struct{}) + } + seenByRequirement[i][upstreamID] = struct{}{} + } + } + } + + var missing []string + for i, required := range a.config.requiredParticipants { + if required == nil || required.Tag == "" || required.MinParticipants <= 0 { + continue + } + have := len(seenByRequirement[i]) + if have < required.MinParticipants { + missing = append(missing, fmt.Sprintf("%s=%d/%d", required.Tag, have, required.MinParticipants)) + } + } + return missing +} + +func (a *consensusAnalysis) hasPendingRequiredParticipants() bool { + if a == nil || a.config == nil || len(a.missingRequired) == 0 || a.waitCapped { + return false + } + return a.hasRemaining() +} + func (a *consensusAnalysis) hasRemaining() bool { return a.config.maxParticipants > a.totalParticipants } diff --git a/consensus/executor.go b/consensus/executor.go index 8764f8ad6..52eb51511 100644 --- a/consensus/executor.go +++ b/consensus/executor.go @@ -158,8 +158,8 @@ func (e *executor) Run( // include the configured minimum from each required group. Runs at the // very top of consensus, before any participant slot consumes an // upstream (req.UpstreamIdx is still 0), so the reorder takes effect. - // Best-effort: shortfalls fall through to lowParticipantsBehavior / - // agreementThreshold like organic low participation. + // Reordering is best-effort, but final analysis fails closed if valid + // responses do not satisfy the required tag quotas. if len(e.config.requiredParticipants) > 0 { if reordered := reorderForParticipantQuota(originalReq.Upstreams(), e.config.requiredParticipants); len(reordered) > 0 { originalReq.SetUpstreams(reordered) @@ -552,10 +552,7 @@ func (e *executor) runAnalyzer( if waitTimer != nil { waitTimer.Stop() } - if analysis == nil { - analysis = newConsensusAnalysis(e.logger, ctx, e.config, responses) - winner = e.determineWinner(lg, analysis) - } + analysis, winner = e.finalizeAnalyzerDecision(lg, ctx, responses, analysis, winner, waitCapped, shortCircuited) if !shortCircuited { // Short-circuit branch already marked winners; mark here only // for the wait-all path. @@ -626,6 +623,24 @@ func (e *executor) runAnalyzer( e.releaseNonWinningResponses(analysis, winner) } +func (e *executor) finalizeAnalyzerDecision( + lg *zerolog.Logger, + ctx context.Context, + responses []*execResult, + analysis *consensusAnalysis, + winner *slotResult, + waitCapped bool, + shortCircuited bool, +) (*consensusAnalysis, *slotResult) { + if analysis == nil || waitCapped { + analysis = newConsensusAnalysisWithOptions(e.logger, ctx, e.config, responses, waitCapped) + } + if shortCircuited { + return analysis, winner + } + return analysis, e.determineWinner(lg, analysis) +} + // releaseNonWinningResponses releases the Result pointers on every non-winning // execResult in analysis.groups. Extracted verbatim from the previous inline // loop in Apply() so behavior is preserved. diff --git a/consensus/quota.go b/consensus/quota.go index c5a6ead52..8d451d5b8 100644 --- a/consensus/quota.go +++ b/consensus/quota.go @@ -11,10 +11,8 @@ import "github.com/erpc/erpc/common" // Semantics: // - Best-effort: if a required group has fewer matching upstreams than // requested (or several quotas can't all fit within maxParticipants), -// it promotes everything it can and leaves the shortfall to the -// existing lowParticipantsBehavior / agreementThreshold handling — -// consensus is not aware this happened, it just sees fewer/uneven -// participants like any organic low-participation tick. +// it promotes everything it can. Final analysis enforces the quota +// against valid responses and returns low-participants on shortfall. // - Minimal disturbance: non-required upstreams keep their incoming // (selection-policy) order in the remaining slots, so ranking/quality // is preserved wherever the quota doesn't force a change. Order WITHIN diff --git a/consensus/quota_test.go b/consensus/quota_test.go index f2fa65ad7..9ed7a5dea 100644 --- a/consensus/quota_test.go +++ b/consensus/quota_test.go @@ -1,9 +1,13 @@ package consensus import ( + "context" + "sync/atomic" "testing" + "time" "github.com/erpc/erpc/common" + "github.com/rs/zerolog" "github.com/stretchr/testify/require" ) @@ -117,3 +121,91 @@ func TestReorderForParticipantQuota(t *testing.T) { require.Equal(t, []string{"a", "b"}, idsOf(reorderForParticipantQuota(ups, reqs))) }) } + +func TestRequiredParticipantQuotaFailsClosedAtAnalysis(t *testing.T) { + lg := zerolog.Nop() + cfg := &config{ + maxParticipants: 1, + agreementThreshold: 1, + disputeBehavior: common.ConsensusDisputeBehaviorReturnError, + lowParticipantsBehavior: common.ConsensusLowParticipantsBehaviorAcceptMostCommonValidResult, + requiredParticipants: []*common.ConsensusRequiredParticipant{ + {Tag: "tier:paid", MinParticipants: 1}, + }, + } + + t.Run("missing required tag rejects threshold winner", func(t *testing.T) { + analysis := newConsensusAnalysis(&lg, context.Background(), cfg, []*execResult{{ + Result: validResponseWithValue("0x1"), + Upstream: common.NewFakeUpstream("public-1", common.WithTags("tier:public")), + }}) + + winner := (&executor{}).determineWinner(&lg, analysis) + require.NotNil(t, winner) + require.Nil(t, winner.Result) + require.Error(t, winner.Error) + require.True(t, common.HasErrorCode(winner.Error, common.ErrCodeConsensusLowParticipants), "got: %v", winner.Error) + }) + + t.Run("required tag present allows threshold winner", func(t *testing.T) { + analysis := newConsensusAnalysis(&lg, context.Background(), cfg, []*execResult{{ + Result: validResponseWithValue("0x1"), + Upstream: common.NewFakeUpstream("paid-1", common.WithTags("tier:paid")), + }}) + + winner := (&executor{}).determineWinner(&lg, analysis) + require.NotNil(t, winner) + require.NoError(t, winner.Error) + require.NotNil(t, winner.Result) + }) +} + +func TestRequiredParticipantQuotaWaitsForSlowRequiredParticipant(t *testing.T) { + logger := zerolog.New(zerolog.NewTestWriter(t)) + paid := common.NewFakeUpstream("paid-1", common.WithTags("tier:paid")) + public1 := common.NewFakeUpstream("public-1", common.WithTags("tier:public")) + public2 := common.NewFakeUpstream("public-2", common.WithTags("tier:public")) + + pol := newBuilder(). + WithLogger(&logger). + WithMaxParticipants(3). + WithAgreementThreshold(2). + WithDisputeBehavior(common.ConsensusDisputeBehaviorAcceptMostCommonValidResult). + WithLowParticipantsBehavior(common.ConsensusLowParticipantsBehaviorAcceptMostCommonValidResult). + WithMaxWaitOnResult(common.NewStaticDuration(500 * time.Millisecond)). + WithMaxWaitOnEmpty(common.NewStaticDuration(500 * time.Millisecond)). + WithRequiredParticipants([]*common.ConsensusRequiredParticipant{ + {Tag: "tier:paid", MinParticipants: 1}, + }). + Build() + + req := newTestRequest() + req.SetUpstreams([]common.Upstream{public1, public2, paid}) + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + var calls atomic.Int32 + var paidCompleted atomic.Bool + resp, err := pol.Run(ctx, req, func(ctx context.Context, _ *common.NormalizedRequest) (*common.NormalizedResponse, error) { + switch calls.Add(1) { + case 1: + select { + case <-time.After(120 * time.Millisecond): + paidCompleted.Store(true) + return validResponseWithValue("0x1").SetUpstream(paid), nil + case <-ctx.Done(): + return nil, ctx.Err() + } + case 2: + return validResponseWithValue("0x1").SetUpstream(public1), nil + default: + return validResponseWithValue("0x1").SetUpstream(public2), nil + } + }) + + require.NoError(t, err) + require.NotNil(t, resp) + require.True(t, paidCompleted.Load(), "slow required participant must be allowed to finish") + require.GreaterOrEqual(t, calls.Load(), int32(3), "third participant should start before quota is satisfied") +} diff --git a/consensus/rules.go b/consensus/rules.go index b467f8c15..7b22a8439 100644 --- a/consensus/rules.go +++ b/consensus/rules.go @@ -2,6 +2,7 @@ package consensus import ( "math/big" + "strings" "github.com/erpc/erpc/common" ) @@ -31,6 +32,9 @@ var consensusRules = []consensusRule{ if a.method != "eth_sendRawTransaction" { return false } + if len(a.missingRequired) > 0 { + return false + } // Check if we have any non-empty response (tx hash) for _, g := range a.groups { if g.ResponseType == ResponseTypeNonEmpty && g.Count >= 1 { @@ -58,6 +62,36 @@ var consensusRules = []consensusRule{ } }, }, + { + Description: "wait-capped low participants fail closed", + Condition: func(a *consensusAnalysis) bool { + return a.waitCapped && a.validParticipants < a.config.agreementThreshold + }, + Action: func(a *consensusAnalysis) *slotResult { + return &slotResult{ + Error: common.NewErrConsensusLowParticipants( + "consensus wait cap fired before enough valid participants responded", + a.participants(), + nil, + ), + } + }, + }, + { + Description: "required participant quotas must be represented in valid responses", + Condition: func(a *consensusAnalysis) bool { + return len(a.missingRequired) > 0 && !a.hasPendingRequiredParticipants() + }, + Action: func(a *consensusAnalysis) *slotResult { + return &slotResult{ + Error: common.NewErrConsensusLowParticipants( + "not enough required consensus participants: "+strings.Join(a.missingRequired, ", "), + a.participants(), + nil, + ), + } + }, + }, // PreferHighestValueFor: when configured for this method, return the response with highest field values // that meets the agreementThreshold. This rule takes precedence over all other consensus rules. { @@ -848,6 +882,9 @@ var shortCircuitRules = []shortCircuitRule{ if a.method != "eth_sendRawTransaction" { return false } + if len(a.missingRequired) > 0 { + return false + } // Short-circuit as soon as we have any valid non-empty response (tx hash) // For eth_sendRawTransaction, once a tx is accepted by any node, it will // propagate through the network, so we don't need to wait for consensus. @@ -863,6 +900,9 @@ var shortCircuitRules = []shortCircuitRule{ Description: "consensus-valid error meets agreement threshold -> short-circuit to error", Reason: "consensus_error_threshold", Condition: func(w *slotResult, a *consensusAnalysis) bool { + if a.hasPendingRequiredParticipants() { + return false + } best := a.getBestByCount() if best == nil { return false @@ -896,6 +936,9 @@ var shortCircuitRules = []shortCircuitRule{ Description: "winner meets agreement threshold, is non-empty, and lead is unassailable (no possible tie with remaining)", Reason: "unassailable_lead", Condition: func(w *slotResult, a *consensusAnalysis) bool { + if a.hasPendingRequiredParticipants() { + return false + } // With remaining participants, avoid short-circuiting when a preference could still // change the winner. In particular, when PreferLargerResponses is enabled, a later // larger response can override a smaller above-threshold winner regardless of counts. diff --git a/consensus/rules_sendrawtx_test.go b/consensus/rules_sendrawtx_test.go index c3b4f63be..e9e288109 100644 --- a/consensus/rules_sendrawtx_test.go +++ b/consensus/rules_sendrawtx_test.go @@ -1,6 +1,7 @@ package consensus import ( + "context" "errors" "testing" @@ -175,6 +176,61 @@ func TestSendRawTransaction_ConsensusRule(t *testing.T) { }) } +func TestSendRawTransaction_RequiredParticipantQuota(t *testing.T) { + logger := zerolog.Nop() + cfg := &config{ + maxParticipants: 2, + agreementThreshold: 1, + requiredParticipants: []*common.ConsensusRequiredParticipant{ + {Tag: "tier:paid", MinParticipants: 1}, + }, + } + req := common.NewNormalizedRequest([]byte( + `{"jsonrpc":"2.0","id":1,"method":"eth_sendRawTransaction","params":["0xdeadbeef"]}`, + )) + ctx := context.WithValue(context.Background(), common.RequestContextKey, req) + public1 := common.NewFakeUpstream("public-1", common.WithTags("tier:public")) + public2 := common.NewFakeUpstream("public-2", common.WithTags("tier:public")) + paid := common.NewFakeUpstream("paid-1", common.WithTags("tier:paid")) + + t.Run("does not bypass while required participant can still respond", func(t *testing.T) { + resp := validResponseWithValue("0xpublic") + analysis := newConsensusAnalysis(&logger, ctx, cfg, []*execResult{{ + Result: resp, + Upstream: public1, + }}) + + require.NotEmpty(t, analysis.missingRequired) + assert.False(t, consensusRules[0].Condition(analysis), "sendRawTx result rule must wait for required quota") + assert.False(t, shortCircuitRules[0].Condition(&slotResult{Result: resp}, analysis), "sendRawTx short-circuit must wait for required quota") + }) + + t.Run("fails closed when required participant never returned valid response", func(t *testing.T) { + analysis := newConsensusAnalysis(&logger, ctx, cfg, []*execResult{ + {Result: validResponseWithValue("0xpublic1"), Upstream: public1}, + {Result: validResponseWithValue("0xpublic2"), Upstream: public2}, + }) + + winner := (&executor{}).determineWinner(&logger, analysis) + require.NotNil(t, winner) + require.Nil(t, winner.Result) + require.Error(t, winner.Error) + require.True(t, common.HasErrorCode(winner.Error, common.ErrCodeConsensusLowParticipants), "got: %v", winner.Error) + }) + + t.Run("allows first tx hash once required quota is satisfied", func(t *testing.T) { + resp := validResponseWithValue("0xpaid") + analysis := newConsensusAnalysis(&logger, ctx, cfg, []*execResult{{ + Result: resp, + Upstream: paid, + }}) + + require.Empty(t, analysis.missingRequired) + assert.True(t, consensusRules[0].Condition(analysis), "sendRawTx result rule can apply after required quota") + assert.True(t, shortCircuitRules[0].Condition(&slotResult{Result: resp}, analysis), "sendRawTx short-circuit can apply after required quota") + }) +} + // TestSendRawTransaction_ShortCircuitRule tests that eth_sendRawTransaction // short-circuits as soon as one valid response is received. func TestSendRawTransaction_ShortCircuitRule(t *testing.T) { diff --git a/consensus/wait_cap_test.go b/consensus/wait_cap_test.go index 944a95b69..37de3e91c 100644 --- a/consensus/wait_cap_test.go +++ b/consensus/wait_cap_test.go @@ -14,17 +14,30 @@ import ( "github.com/stretchr/testify/require" ) -// TestWaitCap_MaxWaitOnResult_BoundsTailLatency verifies that once one -// non-empty response arrives, the analyzer resolves within maxWaitOnResult -// even if a sibling participant is still running. +func waitForStragglerOrCancel(ctx context.Context, d time.Duration) error { + timer := time.NewTimer(d) + defer timer.Stop() + + select { + case <-ctx.Done(): + return ctx.Err() + case <-timer.C: + return nil + } +} + +// TestWaitCap_MaxWaitOnResult_BoundsTailLatency verifies that once enough +// non-empty responses arrive to satisfy the threshold, the analyzer does not +// wait for an unrelated straggler. func TestWaitCap_MaxWaitOnResult_BoundsTailLatency(t *testing.T) { logger := zerolog.New(zerolog.NewTestWriter(t)) pol := newBuilder(). WithLogger(&logger). WithMaxParticipants(3). - WithAgreementThreshold(3). // require 3 to disable short-circuit + WithAgreementThreshold(2). WithLowParticipantsBehavior(common.ConsensusLowParticipantsBehaviorAcceptMostCommonValidResult). + WithPreferLargerResponses(true). WithMaxWaitOnResult(common.NewStaticDuration(100 * time.Millisecond)). Build() @@ -34,7 +47,7 @@ func TestWaitCap_MaxWaitOnResult_BoundsTailLatency(t *testing.T) { var slot atomic.Int32 start := time.Now() - resp, err := pol.Run(ctx, req, func(_ context.Context, _ *common.NormalizedRequest) (*common.NormalizedResponse, error) { + resp, err := pol.Run(ctx, req, func(ctx context.Context, _ *common.NormalizedRequest) (*common.NormalizedResponse, error) { idx := slot.Add(1) switch idx { case 1: @@ -46,7 +59,9 @@ func TestWaitCap_MaxWaitOnResult_BoundsTailLatency(t *testing.T) { return validResponseWithValue("0xfast"), nil default: // slow straggler — exceeds the cap, should be cancelled - time.Sleep(2 * time.Second) + if err := waitForStragglerOrCancel(ctx, 2*time.Second); err != nil { + return nil, err + } return validResponseWithValue("0xslow"), nil } }) @@ -54,10 +69,48 @@ func TestWaitCap_MaxWaitOnResult_BoundsTailLatency(t *testing.T) { require.NoError(t, err) require.NotNil(t, resp) + assert.GreaterOrEqual(t, elapsed, 80*time.Millisecond, + "test must exercise maxWaitOnResult instead of short-circuiting before the cap") assert.Less(t, elapsed, 800*time.Millisecond, "wait cap must bound elapsed time well below the straggler's 2s") } +func TestWaitCap_MaxWaitOnResult_FailsClosedBelowThreshold(t *testing.T) { + logger := zerolog.New(zerolog.NewTestWriter(t)) + + pol := newBuilder(). + WithLogger(&logger). + WithMaxParticipants(3). + WithAgreementThreshold(2). + WithLowParticipantsBehavior(common.ConsensusLowParticipantsBehaviorAcceptMostCommonValidResult). + WithMaxWaitOnResult(common.NewStaticDuration(50 * time.Millisecond)). + Build() + + req := newTestRequest() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var slot atomic.Int32 + start := time.Now() + resp, err := pol.Run(ctx, req, func(ctx context.Context, _ *common.NormalizedRequest) (*common.NormalizedResponse, error) { + idx := slot.Add(1) + if idx == 1 { + return validResponseWithValue("0xfast"), nil + } + if err := waitForStragglerOrCancel(ctx, 2*time.Second); err != nil { + return nil, err + } + return validResponseWithValue("0xfast"), nil + }) + elapsed := time.Since(start) + + require.Error(t, err) + require.Nil(t, resp) + assert.True(t, common.HasErrorCode(err, common.ErrCodeConsensusLowParticipants), "got: %v", err) + assert.Less(t, elapsed, 800*time.Millisecond, + "wait cap should still bound latency, but must fail closed below threshold") +} + func TestWaitCap_MaxWaitOnResult_ReleasesLateResponses(t *testing.T) { logger := zerolog.New(zerolog.NewTestWriter(t)) @@ -100,8 +153,9 @@ func TestWaitCap_MaxWaitOnResult_ReleasesLateResponses(t *testing.T) { }) elapsed := time.Since(start) - require.NoError(t, err) - require.NotNil(t, resp) + require.Error(t, err) + require.Nil(t, resp) + assert.True(t, common.HasErrorCode(err, common.ErrCodeConsensusLowParticipants), "got: %v", err) require.Less(t, elapsed, 500*time.Millisecond, "maxWaitOnResult must return before slow participant finishes") require.Equal(t, int32(0), slowBodyClosed.Load(), "late response should not be released before it exists") @@ -113,6 +167,49 @@ func TestWaitCap_MaxWaitOnResult_ReleasesLateResponses(t *testing.T) { }, 5*time.Second, 10*time.Millisecond, "late post-wait-cap response should be released exactly once") } +func TestWaitCap_DoesNotReplaceShortCircuitWinner(t *testing.T) { + logger := zerolog.New(zerolog.NewTestWriter(t)) + + pol := newBuilder(). + WithLogger(&logger). + WithMaxParticipants(3). + WithAgreementThreshold(2). + WithMaxWaitOnResult(common.NewStaticDuration(50 * time.Millisecond)). + Build() + + req := common.NewNormalizedRequest([]byte( + `{"jsonrpc":"2.0","id":1,"method":"eth_sendRawTransaction","params":["0xdeadbeef"]}`, + )) + ctx := context.WithValue(context.Background(), common.RequestContextKey, req) + + var callCount atomic.Int32 + var winnerBodyClosed atomic.Int32 + var slowBodyClosed atomic.Int32 + slowRelease := make(chan struct{}) + + resp, err := pol.Run(ctx, req, func(_ context.Context, _ *common.NormalizedRequest) (*common.NormalizedResponse, error) { + if callCount.Add(1) == 1 { + return validResponseWithCloser("0xwinner", &winnerBodyClosed), nil + } + <-slowRelease + return validResponseWithCloser("0xslow", &slowBodyClosed), nil + }) + + require.NoError(t, err) + require.NotNil(t, resp) + defer resp.Release() + + time.Sleep(100 * time.Millisecond) + close(slowRelease) + + require.Eventually(t, func() bool { + return slowBodyClosed.Load() == 1 + }, 5*time.Second, 10*time.Millisecond, "late response should still be drained") + assert.Never(t, func() bool { + return winnerBodyClosed.Load() != 0 + }, 200*time.Millisecond, 10*time.Millisecond, "wait-cap finalization must not replace or release short-circuit winner") +} + // TestWaitCap_MaxWaitOnEmpty_TighterFloor verifies that when ONLY empty // responses have arrived, the (typically larger) maxWaitOnEmpty cap // applies — bounded even if no real answer is ever produced. @@ -133,7 +230,7 @@ func TestWaitCap_MaxWaitOnEmpty_TighterFloor(t *testing.T) { var slot atomic.Int32 start := time.Now() - _, _ = pol.Run(ctx, req, func(_ context.Context, _ *common.NormalizedRequest) (*common.NormalizedResponse, error) { + _, _ = pol.Run(ctx, req, func(ctx context.Context, _ *common.NormalizedRequest) (*common.NormalizedResponse, error) { idx := slot.Add(1) switch idx { case 1: @@ -144,7 +241,9 @@ func TestWaitCap_MaxWaitOnEmpty_TighterFloor(t *testing.T) { return validResponseWithValue(""), nil default: // straggler way over the cap - time.Sleep(2 * time.Second) + if err := waitForStragglerOrCancel(ctx, 2*time.Second); err != nil { + return nil, err + } return validResponseWithValue("0xslow"), nil } }) @@ -304,7 +403,9 @@ func TestWaitCap_ArmedByRealAttemptFailures(t *testing.T) { // Real attempt failure: a plain transport error, not a skip. return nil, errors.New("connection reset by peer") } - time.Sleep(2 * time.Second) + if err := waitForStragglerOrCancel(ctx, 2*time.Second); err != nil { + return nil, err + } return validResponseWithValue("0xslow"), nil }) elapsed := time.Since(start) diff --git a/docs/pages/config/failsafe/consensus.mdx b/docs/pages/config/failsafe/consensus.mdx index 4423b018c..177ebd8f1 100644 --- a/docs/pages/config/failsafe/consensus.mdx +++ b/docs/pages/config/failsafe/consensus.mdx @@ -128,42 +128,44 @@ https://docs.erpc.cloud/config/failsafe/consensus.llms.txt`} **Integration point.** Consensus sits at `networkExecutor.Run`. When a `ConsensusPolicyConfig` is present and the `SkipConsensus` directive is false, the executor calls `consensus.Run(ctx, req, slotInner)` where `slotInner` is `retry(hedge(tryOneUpstream))`. Without consensus the path is `retry(hedge(runUpstreamSweep))`. Composition order: `consensus(retry(hedge(tryOneUpstream)))`. () -**Participant selection.** Before spawning goroutines, if `requiredParticipants` is configured, `reorderForParticipantQuota` front-loads tag-matching upstreams so the first `maxParticipants` drawn satisfy every `{tag, minParticipants}` entry. Tag matching uses exact equality first, then glob wildcards (`*`, `?`). This is best-effort: quota shortfalls are handled by `lowParticipantsBehavior`. The executor then spawns exactly `maxParticipants` goroutines, each calling `slotInner` independently. () +**Participant selection.** Before spawning goroutines, if `requiredParticipants` is configured, `reorderForParticipantQuota` front-loads tag-matching upstreams so the first `maxParticipants` drawn satisfy every `{tag, minParticipants}` entry. Tag matching uses exact equality first, then glob wildcards (`*`, `?`). Reordering is best-effort, but final analysis is strict: if fewer than `minParticipants` valid responses carry a required tag, consensus returns `ErrConsensusLowParticipants`. The executor then spawns exactly `maxParticipants` goroutines, each calling `slotInner` independently. () -**Collection and analysis loop.** The `runAnalyzer` goroutine reads responses from a buffered channel. After each response it calls `newConsensusAnalysis` (classify, hash, group) and `determineWinner` (apply 24 priority-ordered rules). If `shouldShortCircuit` returns true, the outcome is sent immediately and remaining requests are cancelled (or left running in fire-and-forget mode). After all participants respond — or a wait-cap deadline fires — the final outcome is sent to the caller's select. +**Collection and analysis loop.** The `runAnalyzer` goroutine reads responses from a buffered channel. After each response it calls `newConsensusAnalysis` (classify, hash, group) and `determineWinner` (apply 26 priority-ordered rules). If `shouldShortCircuit` returns true, the outcome is sent immediately and remaining requests are cancelled (or left running in fire-and-forget mode). After all participants respond — or a wait-cap deadline fires — the final outcome is sent to the caller's select. **Response classification.** Each response is classified into one of four types: `ResponseTypeNonEmpty` (successful non-null result), `ResponseTypeEmpty` (successful but null/empty result), `ResponseTypeConsensusError` (JSON-RPC execution exception, client exception, unsupported, or missing-data), `ResponseTypeInfrastructureError` (all other errors including `ErrUpstreamsExhausted`). Infrastructure errors are excluded from `validParticipants`. `ErrUpstreamsExhausted` is always infra even when it wraps execution exceptions via the shared `ErrorsByUpstream` map. () Specific error-code mappings: `ErrCodeEndpointExecutionException` (EVM revert, code 3) → `ConsensusError`; `ErrCodeEndpointClientSideException`, `ErrCodeEndpointUnsupported`, `ErrCodeEndpointMissingData` → `ConsensusError`; all other errors (timeout, network, server 500) → `InfrastructureError`; nil JSON-RPC response on a successful response object → `InfrastructureError` with hash `"error:generic"`. A successful response where `IsResultEmptyish` returns true → `ResponseTypeEmpty`. **Hashing.** Non-error responses are hashed via `JsonRpcResponse.CanonicalHash()` or `CanonicalHashWithIgnoredFields(fields)` when `ignoreFields` is configured for the method. Errors are hashed via `errorToConsensusHash`: JSON-RPC exceptions hash as `"jsonrpc:"`; standard errors hash as their code; unknown errors hash as `"error:generic"`. Within each hash group, `LargestResult` tracks the member with the highest `CachedResponseSize`; the winner returns `group.LargestResult`, not the first response. () -**Rule evaluation.** `determineWinner` walks `consensusRules` in priority order (). Full 24-rule ordering: +**Rule evaluation.** `determineWinner` walks `consensusRules` in priority order (). Full 26-rule ordering: | Priority | Rule | Triggers when | Action | |---|---|---|---| | 1 | `eth_sendRawTransaction` special | Method is `eth_sendRawTransaction` AND any non-empty response exists | Return first non-empty; threshold ignored | -| 2 | `preferHighestValueFor` | Method configured AND extractable numeric values exist | Group by numeric value; highest ≥ threshold wins; else dispute | -| 3 | `onlyBlockHeadLeader` on dispute | `disputeBehavior == onlyBlockHeadLeader` AND no group meets threshold | Leader non-error → leader error (incl. infra) → dispute | -| 4 | `onlyBlockHeadLeader` on low participants | `lowParticipantsBehavior == onlyBlockHeadLeader` AND low participants | Leader non-error → leader non-infra error → low-participants error | -| 5 | `preferBlockHeadLeader` on dispute/low | `preferBlockHeadLeader` AND no group meets threshold AND leader non-error exists | Return leader non-error; else dispute | -| 6 | `preferLarger + acceptMostCommon` below threshold | `preferLargerResponses` AND `acceptMostCommon` AND below threshold | Return largest non-empty by size | -| 7 | `acceptMostCommon + preferNonEmpty` above threshold (empty/error leads) | `preferNonEmpty` AND `acceptMostCommon` AND threshold met AND leader is empty or error AND non-empty exists | Return best non-empty (count, then size) | -| 8 | Tie at/above threshold (no preference) | Multiple non-error groups share best count ≥ threshold | Dispute | -| 9 | `acceptMostCommon` below threshold: non-empty over empty | `preferNonEmpty` AND `acceptMostCommon` AND below threshold AND exactly 1 non-empty AND ≥1 empty | Return single non-empty | -| 10 | `acceptMostCommon` below threshold: non-empty over error | `preferNonEmpty` AND `acceptMostCommon` AND below threshold AND leader is error AND non-empty exists | Return best non-empty | -| 11 | `returnError + preferNonEmpty` (empty threshold winner) | `disputeBehavior == returnError` AND empty meets threshold AND non-empty minority exists | Dispute (escalate from empty winner) | -| 12 | `acceptMostCommon + preferNonEmpty` above threshold (both qualify) | Both non-empty and consensus-error ≥ threshold | Return best non-empty | -| 13 | Tie above threshold (general, no preference) | Multiple valid groups share best count ≥ threshold | Dispute | -| 14 | `preferLarger`: above threshold, multiple groups | `preferLargerResponses` AND best meets threshold AND >1 group ≥ threshold | Return largest non-empty by size | -| 15 | `preferLarger + acceptMostCommon`: larger exists below threshold | `preferLargerResponses` AND `acceptMostCommon` AND best meets threshold AND non-empty AND larger exists below | Return largest non-empty by size | -| 16 | `returnError + preferLarger`: smaller wins but larger exists | `disputeBehavior == returnError` AND `preferLargerResponses` AND threshold met AND larger below threshold | Dispute | -| 17 | `acceptMostCommon` below threshold: unique leader | `acceptMostCommon` AND best count < threshold AND best > second-best | Return best valid group | -| 18 | `lowParticipants + acceptMostCommon` | `lowParticipantsBehavior == acceptMostCommon` AND low participants | Non-empty (if untied) → empty → error → low-participants error | -| 19 | Threshold winner (generic) | Any valid group meets threshold | Return result or consensus-error | -| 20 | Dispute (multiple groups, none at threshold) | Best count < threshold AND >1 valid group | Dispute error | -| 21 | All infra errors meet threshold | `validParticipants == 0` AND infra group meets threshold | Return agreed infra error | -| 22 | `lowParticipants + returnError` | `lowParticipantsBehavior == returnError` AND low participants | Low-participants error | -| 23 | No responses | `len(groups) == 0` | Low-participants error | -| 24 | Fallback | Catch-all | Low-participants error | +| 2 | Wait-capped low participants | Wait cap fired AND `validParticipants < agreementThreshold` | Low-participants error | +| 3 | Required-participant quota missing | Any `requiredParticipants` entry has fewer than `minParticipants` valid responses carrying its tag | Low-participants error | +| 4 | `preferHighestValueFor` | Method configured AND extractable numeric values exist | Group by numeric value; highest ≥ threshold wins; else dispute | +| 5 | `onlyBlockHeadLeader` on dispute | `disputeBehavior == onlyBlockHeadLeader` AND no group meets threshold | Leader non-error → leader error (incl. infra) → dispute | +| 6 | `onlyBlockHeadLeader` on low participants | `lowParticipantsBehavior == onlyBlockHeadLeader` AND low participants | Leader non-error → leader non-infra error → low-participants error | +| 7 | `preferBlockHeadLeader` on dispute/low | `preferBlockHeadLeader` AND no group meets threshold AND leader non-error exists | Return leader non-error; else dispute | +| 8 | `preferLarger + acceptMostCommon` below threshold | `preferLargerResponses` AND `acceptMostCommon` AND below threshold | Return largest non-empty by size | +| 9 | `acceptMostCommon + preferNonEmpty` above threshold (empty/error leads) | `preferNonEmpty` AND `acceptMostCommon` AND threshold met AND leader is empty or error AND non-empty exists | Return best non-empty (count, then size) | +| 10 | Tie at/above threshold (no preference) | Multiple non-error groups share best count ≥ threshold | Dispute | +| 11 | `acceptMostCommon` below threshold: non-empty over empty | `preferNonEmpty` AND `acceptMostCommon` AND below threshold AND exactly 1 non-empty AND ≥1 empty | Return single non-empty | +| 12 | `acceptMostCommon` below threshold: non-empty over error | `preferNonEmpty` AND `acceptMostCommon` AND below threshold AND leader is error AND non-empty exists | Return best non-empty | +| 13 | `returnError + preferNonEmpty` (empty threshold winner) | `disputeBehavior == returnError` AND empty meets threshold AND non-empty minority exists | Dispute (escalate from empty winner) | +| 14 | `acceptMostCommon + preferNonEmpty` above threshold (both qualify) | Both non-empty and consensus-error ≥ threshold | Return best non-empty | +| 15 | Tie above threshold (general, no preference) | Multiple valid groups share best count ≥ threshold | Dispute | +| 16 | `preferLarger`: above threshold, multiple groups | `preferLargerResponses` AND best meets threshold AND >1 group ≥ threshold | Return largest non-empty by size | +| 17 | `preferLarger + acceptMostCommon`: larger exists below threshold | `preferLargerResponses` AND `acceptMostCommon` AND best meets threshold AND non-empty AND larger exists below | Return largest non-empty by size | +| 18 | `returnError + preferLarger`: smaller wins but larger exists | `disputeBehavior == returnError` AND `preferLargerResponses` AND threshold met AND larger below threshold | Dispute | +| 19 | `acceptMostCommon` below threshold: unique leader | `acceptMostCommon` AND best count < threshold AND best > second-best | Return best valid group | +| 20 | `lowParticipants + acceptMostCommon` | `lowParticipantsBehavior == acceptMostCommon` AND low participants | Non-empty (if untied) → empty → error → low-participants error | +| 21 | Threshold winner (generic) | Any valid group meets threshold | Return result or consensus-error | +| 22 | Dispute (multiple groups, none at threshold) | Best count < threshold AND >1 valid group | Dispute error | +| 23 | All infra errors meet threshold | `validParticipants == 0` AND infra group meets threshold | Return agreed infra error | +| 24 | `lowParticipants + returnError` | `lowParticipantsBehavior == returnError` AND low participants | Low-participants error | +| 25 | No responses | `len(groups) == 0` | Low-participants error | +| 26 | Fallback | Catch-all | Low-participants error | **Short-circuit rules.** `shouldShortCircuit` evaluates 3 rules after each response. (1) `sendrawtx_first_success`: fires on any single non-empty for `eth_sendRawTransaction`, never blocked. (2) `consensus_error_threshold`: best group is consensus-error ≥ threshold; blocked when `preferHighestValueFor` configured, or `acceptMostCommon` active AND (`preferNonEmpty` OR `preferLargerResponses`). (3) `unassailable_lead`: best non-empty ≥ threshold with unassailable lead; blocked when `preferLargerResponses` active, `preferHighestValueFor` configured, or `preferNonEmpty` active and leader is empty. @@ -171,7 +173,7 @@ https://docs.erpc.cloud/config/failsafe/consensus.llms.txt`} **`eth_sendRawTransaction` broadcast mode.** Rule 1 fires as soon as any single non-empty response exists — threshold is ignored entirely. Combined with `fireAndForget: true`, eRPC returns the first accepted tx hash immediately while remaining participant goroutines continue broadcasting using `context.WithoutCancel(ctx)` as their base, surviving HTTP client disconnection. They are bounded by upstream call completion, wait-cap timers, or process exit — not by graceful shutdown signals. -**Wait caps.** `maxWaitOnResult` (adaptive p50, default min=5ms, max=1s) arms when the first non-empty response arrives. `maxWaitOnEmpty` (adaptive p90, default min=50ms, max=2s) arms on the very first response of any kind. When a cap fires, the analyzer resolves with what it has collected, applying `disputeBehavior` or `lowParticipantsBehavior` as appropriate. `erpc_consensus_wait_capped_total{trigger}` tracks firings. +**Wait caps.** `maxWaitOnResult` (adaptive p50, default min=5ms, max=1s) arms when the first non-empty response arrives. `maxWaitOnEmpty` (adaptive p90, default min=50ms, max=2s) arms on the very first response of any kind. When a cap fires before `validParticipants >= agreementThreshold`, the analyzer fails closed with `ErrConsensusLowParticipants`; it does not use `lowParticipantsBehavior: acceptMostCommonValidResult` to return a lone fast response. Once enough valid participants are present, the analyzer resolves with collected responses, applying normal dispute rules. `erpc_consensus_wait_capped_total{trigger}` tracks firings. **SkipConsensus directive.** Any request can bypass consensus via `X-ERPC-Skip-Consensus: true` header, `?skip-consensus=true` query param, or `directiveDefaults.skipConsensus: true` in network/project config. When set, the executor falls through to `retry(hedge(runUpstreamSweep))`. Only `"true"` activates bypass; any other value keeps consensus active. @@ -190,7 +192,7 @@ All fields under `networks[].failsafe[].consensus.` (or project-level `failsafe[ | `maxParticipants` | int | `5` () | Number of upstreams to fan-out to. If ≤ 0, clamped to 1. | | `agreementThreshold` | int | `2` () | Minimum participants that must agree on the same response hash. Also used by `isLowParticipants` (`validParticipants < threshold`). | | `disputeBehavior` | string enum | `"returnError"` () | Action when `validParticipants >= agreementThreshold` but no group meets threshold. Values: `returnError`, `acceptMostCommonValidResult`, `preferBlockHeadLeader`, `onlyBlockHeadLeader`. | -| `lowParticipantsBehavior` | string enum | `"acceptMostCommonValidResult"` () | Action when `validParticipants < agreementThreshold` (fewer upstreams produced non-infrastructure-error responses than the threshold, typically because too few upstreams were reachable before wait caps fired). Same four values as `disputeBehavior`. Distinction: `disputeBehavior` fires when `validParticipants >= agreementThreshold` but no group meets threshold (participants disagree); `lowParticipantsBehavior` fires when there are not enough valid participants to even reach threshold. Under `acceptMostCommon`: non-empty > empty > error; ties among non-empty groups → dispute. | +| `lowParticipantsBehavior` | string enum | `"acceptMostCommonValidResult"` () | Action when `validParticipants < agreementThreshold` after normal collection. Same four values as `disputeBehavior`. Distinction: `disputeBehavior` fires when `validParticipants >= agreementThreshold` but no group meets threshold (participants disagree); `lowParticipantsBehavior` fires when there are not enough valid participants to even reach threshold. Wait-capped low-participant rounds fail closed before this setting can pick a winner. Under `acceptMostCommon`: non-empty > empty > error; ties among non-empty groups → dispute. | | `disputeLogLevel` | string | `"warn"` () | Zerolog level for misbehavior events. Values: `trace`, `debug`, `info`, `warn`, `error`. **Footgun**: zerolog zero value is `TraceLevel`; builder coerces unset to `WarnLevel`. Set `"trace"` explicitly for trace-level logging. | | `ignoreFields` | `map[string][]string` | `{"eth_getLogs":["*.blockTimestamp"],"eth_getTransactionReceipt":["blockTimestamp","logs.*.blockTimestamp"],"eth_getBlockReceipts":["*.blockTimestamp","*.logs.*.blockTimestamp"]}` () | Per-method dot-path field patterns excluded from canonical hash. **Set replacement, not merge**: setting ANY entry replaces the ENTIRE default map. To add a method, re-include all three defaults. Setting `{}` disables all defaults. | | `preferNonEmpty` | \*bool | `true` () | Prefer non-empty over empty or consensus-error in dispute resolution. Disables short-circuit when empty would lead. | @@ -199,7 +201,7 @@ All fields under `networks[].failsafe[].consensus.` (or project-level `failsafe[ | `fireAndForget` | bool | `false` (Go zero value; no `SetDefaults` entry) | When `true`, remaining in-flight requests are NOT cancelled after short-circuit. Goroutines use `context.WithoutCancel(ctx)` and survive HTTP disconnection. **Footgun**: process shutdown signals do not reach these goroutines; budget shutdown timeouts accordingly. | | `maxWaitOnResult` | \*AdaptiveDuration | `{quantile:0.5, min:"5ms", max:"1s"}` () | Cap after first non-empty response. Cold-start fallback is `min` (5ms). If you configure without a `min`, cold start returns 0 — no cap until data accumulates. | | `maxWaitOnEmpty` | \*AdaptiveDuration | `{quantile:0.9, min:"50ms", max:"2s"}` () | Cap after first response of any kind. Same cold-start behavior: fallback to `min` (50ms). | -| `requiredParticipants` | `[]*ConsensusRequiredParticipant` | `[]` (disabled) | List of `{tag, minParticipants}` entries. Front-loads tag-matching upstreams. Best-effort: shortfall falls through to `lowParticipantsBehavior`. | +| `requiredParticipants` | `[]*ConsensusRequiredParticipant` | `[]` (disabled) | List of `{tag, minParticipants}` entries. Front-loads tag-matching upstreams, then enforces that at least `minParticipants` valid responses carry each tag. Shortfall returns `ErrConsensusLowParticipants`. | | `requiredParticipants[].tag` | string | **Required** | Glob pattern matched against `upstream.tags[]`. Empty tag is a startup validation error. | | `requiredParticipants[].minParticipants` | int | **Required; no default** | Must be > 0 and ≤ `maxParticipants`. Omitting or setting 0 is a startup validation error — not a silent no-op. | | `punishMisbehavior` | \*PunishMisbehaviorConfig | `nil` (disabled) | Enables cordon punishment for persistent misbehavior. | @@ -523,9 +525,9 @@ The 100 MiB `maxSize` guard prevents unbounded memory use during high-dispute pe 1. **`ignoreFields` is full set replacement.** Adding one method removes all three built-in entries. Spurious disputes on `blockTimestamp` fields result. Always re-include all default entries when extending the map. () 2. **`preferLargerResponses` disables all short-circuit.** Even a clear majority above threshold does not short-circuit because a larger response may still arrive. Disable in latency-sensitive scenarios. 3. **`eth_sendRawTransaction` bypasses threshold entirely.** Rule 1 fires on any single non-empty response regardless of how many others errored. Intentional for tx broadcasting. -4. **`preferNonEmpty` + `returnError` escalates empty threshold winner to dispute.** When empty meets threshold, non-empty minority exists, and `preferNonEmpty: true` under `disputeBehavior: returnError`, the result is a dispute — not the empty winner and not the minority non-empty. (Rule 11) -5. **Tie among non-empty at/above threshold without preference → dispute.** Rule 8 fires before the generic threshold-winner rule 19. Enable `preferLargerResponses` to resolve ties by size. -6. **`requiredParticipants` shortfall is silent.** No warning, no metric, no error. Operators cannot distinguish tag-quota shortfall from general upstream unavailability in current observability output. +4. **`preferNonEmpty` + `returnError` escalates empty threshold winner to dispute.** When empty meets threshold, non-empty minority exists, and `preferNonEmpty: true` under `disputeBehavior: returnError`, the result is a dispute — not the empty winner and not the minority non-empty. (Rule 13) +5. **Tie among non-empty at/above threshold without preference → dispute.** Rule 10 fires before the generic threshold-winner rule 21. Enable `preferLargerResponses` to resolve ties by size. +6. **`requiredParticipants` shortfall fails closed.** Front-loading is best-effort, but final analysis requires each configured tag quota to be represented by valid responses. Shortfall returns `ErrConsensusLowParticipants` before any threshold winner can be returned. 7. **`fireAndForget` goroutines survive graceful shutdown.** `context.WithoutCancel` strips shutdown signals. Budget shutdown timeouts to account for in-flight fire-and-forget participants. 8. **S3 uses `PutObject`, not append.** With `{dateByHour}` in `filePattern`, flushes within the same hour overwrite the previous S3 object. Use `{timestampMs}` if each flush should create a new object. 9. **S3 `accessKeyID` casing is load-bearing.** YAML key must be `accessKeyID` (capital D). Lowercase `accessKeyId` silently falls back to the SDK credential chain. diff --git a/erpc/networks_hedge_cancel_test.go b/erpc/networks_hedge_cancel_test.go index 40773c921..665a761ce 100644 --- a/erpc/networks_hedge_cancel_test.go +++ b/erpc/networks_hedge_cancel_test.go @@ -107,6 +107,8 @@ func TestHedgeConsensus_AllUpstreamsMissingData_HedgeRecovery(t *testing.T) { LowParticipantsBehavior: common.ConsensusLowParticipantsBehaviorAcceptMostCommonValidResult, PreferNonEmpty: &common.TRUE, PreferLargerResponses: &common.TRUE, + MaxWaitOnResult: common.NewStaticDuration(500 * time.Millisecond), + MaxWaitOnEmpty: common.NewStaticDuration(500 * time.Millisecond), }, &common.RetryPolicyConfig{ MaxAttempts: 1, @@ -205,6 +207,8 @@ func TestHedgeConsensus_ServerError_HedgeRecovery(t *testing.T) { LowParticipantsBehavior: common.ConsensusLowParticipantsBehaviorAcceptMostCommonValidResult, PreferNonEmpty: &common.TRUE, PreferLargerResponses: &common.TRUE, + MaxWaitOnResult: common.NewStaticDuration(500 * time.Millisecond), + MaxWaitOnEmpty: common.NewStaticDuration(500 * time.Millisecond), }, &common.RetryPolicyConfig{ MaxAttempts: 1, @@ -364,6 +368,8 @@ func TestHedgeConsensus_OneUpstreamMissingData_OthersSucceed_StillWorks(t *testi LowParticipantsBehavior: common.ConsensusLowParticipantsBehaviorAcceptMostCommonValidResult, PreferNonEmpty: &common.TRUE, PreferLargerResponses: &common.TRUE, + MaxWaitOnResult: common.NewStaticDuration(500 * time.Millisecond), + MaxWaitOnEmpty: common.NewStaticDuration(500 * time.Millisecond), }, &common.RetryPolicyConfig{ MaxAttempts: 2,