diff --git a/architecture/evm/eth_getBlockByNumber.go b/architecture/evm/eth_getBlockByNumber.go
index aef7fa025..0c75f7546 100644
--- a/architecture/evm/eth_getBlockByNumber.go
+++ b/architecture/evm/eth_getBlockByNumber.go
@@ -575,8 +575,37 @@ func isHyperEVMSystemTransaction(tx interface{}) bool {
if !ok {
return false
}
+ from, ok := txObj["from"].(string)
+ if !ok || !isZeroishHex(from) {
+ return false
+ }
+ gas, ok := txObj["gas"].(string)
+ if !ok || !isZeroishHex(gas) {
+ return false
+ }
gasPrice, ok := txObj["gasPrice"].(string)
- return ok && isZeroishHex(gasPrice)
+ if !ok || !isZeroishHex(gasPrice) {
+ return false
+ }
+ if to, exists := txObj["to"]; exists && to != nil {
+ toStr, ok := to.(string)
+ if !ok || !isZeroishHex(toStr) {
+ return false
+ }
+ }
+ if value, exists := txObj["value"]; exists && value != nil {
+ valueStr, ok := value.(string)
+ if !ok || !isZeroishHex(valueStr) {
+ return false
+ }
+ }
+ if input, exists := txObj["input"]; exists && input != nil {
+ inputStr, ok := input.(string)
+ if !ok || !isZeroishHex(inputStr) {
+ return false
+ }
+ }
+ return true
}
// blockValidationTxLite is a minimal transaction model for block validation
diff --git a/architecture/evm/eth_getBlockByNumber_test.go b/architecture/evm/eth_getBlockByNumber_test.go
index fc41306a6..eb472c3ed 100644
--- a/architecture/evm/eth_getBlockByNumber_test.go
+++ b/architecture/evm/eth_getBlockByNumber_test.go
@@ -82,9 +82,10 @@ func TestUpstreamPostForward_HyperEVMFiltersSystemTransactions(t *testing.T) {
"number": "0x100",
"hash": "0xabc123",
"transactions": [
- {"hash": "0xsystem1", "gasPrice": "0x0"},
+ {"hash": "0xsystem1", "from": "0x0", "to": "0x0", "gas": "0x0", "gasPrice": "0x0", "input": "0x", "value": "0x0"},
+ {"hash": "0xreal-zero-price", "from": "0x1111111111111111111111111111111111111111", "to": "0x2222222222222222222222222222222222222222", "gas": "0x5208", "gasPrice": "0x0", "input": "0x", "value": "0x0"},
{"hash": "0xreal", "gasPrice": "0x1"},
- {"hash": "0xsystem2", "gasPrice": "0x00"}
+ {"hash": "0xsystem2", "from": "0x0000000000000000000000000000000000000000", "to": "0x0000000000000000000000000000000000000000", "gas": "0x00", "gasPrice": "0x00", "input": "0x00", "value": "0x00"}
]
}`
@@ -100,7 +101,7 @@ func TestUpstreamPostForward_HyperEVMFiltersSystemTransactions(t *testing.T) {
return req, resp
}
- t.Run("FiltersGasPriceZeroTransactionsOnHyperEVM", func(t *testing.T) {
+ t.Run("FiltersOnlySystemTransactionsOnHyperEVM", func(t *testing.T) {
req, resp := newResponse(t)
network := &testNetwork{cfg: &common.NetworkConfig{
Architecture: common.ArchitectureEvm,
@@ -118,9 +119,11 @@ func TestUpstreamPostForward_HyperEVMFiltersSystemTransactions(t *testing.T) {
Transactions []map[string]interface{} `json:"transactions"`
}
require.NoError(t, common.SonicCfg.Unmarshal(jrr.GetResultBytes(), &block))
- require.Len(t, block.Transactions, 1)
- assert.Equal(t, "0xreal", block.Transactions[0]["hash"])
- assert.Equal(t, "0x1", block.Transactions[0]["gasPrice"])
+ require.Len(t, block.Transactions, 2)
+ assert.Equal(t, "0xreal-zero-price", block.Transactions[0]["hash"])
+ assert.Equal(t, "0x0", block.Transactions[0]["gasPrice"])
+ assert.Equal(t, "0xreal", block.Transactions[1]["hash"])
+ assert.Equal(t, "0x1", block.Transactions[1]["gasPrice"])
})
t.Run("NetworkPostForwardFiltersCachedBlocks", func(t *testing.T) {
@@ -142,8 +145,9 @@ func TestUpstreamPostForward_HyperEVMFiltersSystemTransactions(t *testing.T) {
Transactions []map[string]interface{} `json:"transactions"`
}
require.NoError(t, common.SonicCfg.Unmarshal(jrr.GetResultBytes(), &block))
- require.Len(t, block.Transactions, 1)
- assert.Equal(t, "0xreal", block.Transactions[0]["hash"])
+ require.Len(t, block.Transactions, 2)
+ assert.Equal(t, "0xreal-zero-price", block.Transactions[0]["hash"])
+ assert.Equal(t, "0xreal", block.Transactions[1]["hash"])
})
t.Run("NetworkPostForwardFiltersBlockByHash", func(t *testing.T) {
@@ -169,8 +173,9 @@ func TestUpstreamPostForward_HyperEVMFiltersSystemTransactions(t *testing.T) {
Transactions []map[string]interface{} `json:"transactions"`
}
require.NoError(t, common.SonicCfg.Unmarshal(jrr.GetResultBytes(), &block))
- require.Len(t, block.Transactions, 1)
- assert.Equal(t, "0xreal", block.Transactions[0]["hash"])
+ require.Len(t, block.Transactions, 2)
+ assert.Equal(t, "0xreal-zero-price", block.Transactions[0]["hash"])
+ assert.Equal(t, "0xreal", block.Transactions[1]["hash"])
})
t.Run("PreservesHashOnlyTransactionsOnHyperEVM", func(t *testing.T) {
@@ -225,7 +230,42 @@ func TestUpstreamPostForward_HyperEVMFiltersSystemTransactions(t *testing.T) {
Transactions []map[string]interface{} `json:"transactions"`
}
require.NoError(t, common.SonicCfg.Unmarshal(jrr.GetResultBytes(), &block))
- require.Len(t, block.Transactions, 3)
+ require.Len(t, block.Transactions, 4)
+ })
+
+ t.Run("ValidatesOriginalBlockBeforeFilteringAllSystemTransactions", func(t *testing.T) {
+ systemOnlyBlockJSON := `{
+ "number": "0x100",
+ "hash": "0xabc123",
+ "transactionsRoot": "0x1111111111111111111111111111111111111111111111111111111111111111",
+ "transactions": [
+ {"hash": "0xsystem1", "from": "0x0", "to": "0x0", "gas": "0x0", "gasPrice": "0x0", "input": "0x", "value": "0x0"}
+ ]
+ }`
+ jrpcResp, err := common.NewJsonRpcResponseFromBytes([]byte(`1`), []byte(systemOnlyBlockJSON), nil)
+ require.NoError(t, err)
+ req := common.NewNormalizedRequest([]byte(`{"jsonrpc":"2.0","id":1,"method":"eth_getBlockByNumber","params":["0x100",true]}`))
+ req.SetDirectives(&common.RequestDirectives{ValidateTransactionsRoot: true})
+ resp := common.NewNormalizedResponse().
+ WithRequest(req).
+ WithJsonRpcResponse(jrpcResp)
+ network := &testNetwork{cfg: &common.NetworkConfig{
+ Architecture: common.ArchitectureEvm,
+ Evm: &common.EvmNetworkConfig{
+ ChainId: 999,
+ },
+ }}
+
+ filteredResp, err := upstreamPostForward_eth_getBlockByNumber(ctx, network, nil, req, resp, nil)
+ require.NoError(t, err)
+
+ jrr, err := filteredResp.JsonRpcResponse(ctx)
+ require.NoError(t, err)
+ var block struct {
+ Transactions []map[string]interface{} `json:"transactions"`
+ }
+ require.NoError(t, common.SonicCfg.Unmarshal(jrr.GetResultBytes(), &block))
+ require.Empty(t, block.Transactions)
})
}
diff --git a/common/config.go b/common/config.go
index 8989c5fb7..59769c002 100644
--- a/common/config.go
+++ b/common/config.go
@@ -1746,12 +1746,9 @@ type ConsensusPolicyConfig struct {
// the participant set so the first `maxParticipants` drawn satisfy
// every entry — without changing `maxParticipants` itself.
//
- // Best-effort and governed by the EXISTING consensus behaviors: if a
- // required group has fewer healthy upstreams than requested (or the
- // quotas can't all fit within `maxParticipants`), consensus simply
- // runs with what it can promote and the resulting participation is
- // handled by `lowParticipantsBehavior` / `agreementThreshold` exactly
- // like any other low-participation tick. Empty (default) = disabled.
+ // Reordering is best-effort, but final analysis fails closed when valid
+ // consensus responses do not satisfy every required quota. Empty
+ // (default) = disabled.
RequiredParticipants []*ConsensusRequiredParticipant `yaml:"requiredParticipants,omitempty" json:"requiredParticipants,omitempty"`
}
diff --git a/consensus/analysis.go b/consensus/analysis.go
index 8227e7108..3059307be 100644
--- a/consensus/analysis.go
+++ b/consensus/analysis.go
@@ -52,6 +52,8 @@ type consensusAnalysis struct {
originalRequest *common.NormalizedRequest
leaderUpstream common.Upstream
method string // The RPC method being called (e.g., "eth_getTransactionCount")
+ waitCapped bool
+ missingRequired []string
// Cached computed values
cachedBestNonEmpty *responseGroup
@@ -73,11 +75,16 @@ func contextFrom(ctxOrExec any) context.Context {
}
func newConsensusAnalysis(lg *zerolog.Logger, ctxOrExec any, config *config, responses []*execResult) *consensusAnalysis {
+ return newConsensusAnalysisWithOptions(lg, ctxOrExec, config, responses, false)
+}
+
+func newConsensusAnalysisWithOptions(lg *zerolog.Logger, ctxOrExec any, config *config, responses []*execResult, waitCapped bool) *consensusAnalysis {
ctx := contextFrom(ctxOrExec)
analysis := &consensusAnalysis{
config: config,
groups: make(map[string]*responseGroup),
totalParticipants: len(responses),
+ waitCapped: waitCapped,
}
// Try to extract original request and compute leader upstream once
@@ -162,10 +169,61 @@ func newConsensusAnalysis(lg *zerolog.Logger, ctxOrExec any, config *config, res
analysis.getBestError()
analysis.getBestByCount()
analysis.getBestBySize()
+ analysis.missingRequired = analysis.computeMissingRequiredParticipants()
return analysis
}
+func (a *consensusAnalysis) computeMissingRequiredParticipants() []string {
+ if a == nil || a.config == nil || len(a.config.requiredParticipants) == 0 {
+ return nil
+ }
+
+ seenByRequirement := make([]map[string]struct{}, len(a.config.requiredParticipants))
+ for _, group := range a.groups {
+ for _, result := range group.Results {
+ if result == nil || result.Upstream == nil || result.CachedResponseType == ResponseTypeInfrastructureError {
+ continue
+ }
+ upstreamID := result.Upstream.Id()
+ if upstreamID == "" {
+ upstreamID = fmt.Sprintf("%p", result.Upstream)
+ }
+ for i, required := range a.config.requiredParticipants {
+ if required == nil || required.Tag == "" || required.MinParticipants <= 0 {
+ continue
+ }
+ if !upstreamMatchesTag(result.Upstream, required.Tag) {
+ continue
+ }
+ if seenByRequirement[i] == nil {
+ seenByRequirement[i] = make(map[string]struct{})
+ }
+ seenByRequirement[i][upstreamID] = struct{}{}
+ }
+ }
+ }
+
+ var missing []string
+ for i, required := range a.config.requiredParticipants {
+ if required == nil || required.Tag == "" || required.MinParticipants <= 0 {
+ continue
+ }
+ have := len(seenByRequirement[i])
+ if have < required.MinParticipants {
+ missing = append(missing, fmt.Sprintf("%s=%d/%d", required.Tag, have, required.MinParticipants))
+ }
+ }
+ return missing
+}
+
+func (a *consensusAnalysis) hasPendingRequiredParticipants() bool {
+ if a == nil || a.config == nil || len(a.missingRequired) == 0 || a.waitCapped {
+ return false
+ }
+ return a.hasRemaining()
+}
+
func (a *consensusAnalysis) hasRemaining() bool {
return a.config.maxParticipants > a.totalParticipants
}
diff --git a/consensus/executor.go b/consensus/executor.go
index 8764f8ad6..52eb51511 100644
--- a/consensus/executor.go
+++ b/consensus/executor.go
@@ -158,8 +158,8 @@ func (e *executor) Run(
// include the configured minimum from each required group. Runs at the
// very top of consensus, before any participant slot consumes an
// upstream (req.UpstreamIdx is still 0), so the reorder takes effect.
- // Best-effort: shortfalls fall through to lowParticipantsBehavior /
- // agreementThreshold like organic low participation.
+ // Reordering is best-effort, but final analysis fails closed if valid
+ // responses do not satisfy the required tag quotas.
if len(e.config.requiredParticipants) > 0 {
if reordered := reorderForParticipantQuota(originalReq.Upstreams(), e.config.requiredParticipants); len(reordered) > 0 {
originalReq.SetUpstreams(reordered)
@@ -552,10 +552,7 @@ func (e *executor) runAnalyzer(
if waitTimer != nil {
waitTimer.Stop()
}
- if analysis == nil {
- analysis = newConsensusAnalysis(e.logger, ctx, e.config, responses)
- winner = e.determineWinner(lg, analysis)
- }
+ analysis, winner = e.finalizeAnalyzerDecision(lg, ctx, responses, analysis, winner, waitCapped, shortCircuited)
if !shortCircuited {
// Short-circuit branch already marked winners; mark here only
// for the wait-all path.
@@ -626,6 +623,24 @@ func (e *executor) runAnalyzer(
e.releaseNonWinningResponses(analysis, winner)
}
+func (e *executor) finalizeAnalyzerDecision(
+ lg *zerolog.Logger,
+ ctx context.Context,
+ responses []*execResult,
+ analysis *consensusAnalysis,
+ winner *slotResult,
+ waitCapped bool,
+ shortCircuited bool,
+) (*consensusAnalysis, *slotResult) {
+ if analysis == nil || waitCapped {
+ analysis = newConsensusAnalysisWithOptions(e.logger, ctx, e.config, responses, waitCapped)
+ }
+ if shortCircuited {
+ return analysis, winner
+ }
+ return analysis, e.determineWinner(lg, analysis)
+}
+
// releaseNonWinningResponses releases the Result pointers on every non-winning
// execResult in analysis.groups. Extracted verbatim from the previous inline
// loop in Apply() so behavior is preserved.
diff --git a/consensus/quota.go b/consensus/quota.go
index c5a6ead52..8d451d5b8 100644
--- a/consensus/quota.go
+++ b/consensus/quota.go
@@ -11,10 +11,8 @@ import "github.com/erpc/erpc/common"
// Semantics:
// - Best-effort: if a required group has fewer matching upstreams than
// requested (or several quotas can't all fit within maxParticipants),
-// it promotes everything it can and leaves the shortfall to the
-// existing lowParticipantsBehavior / agreementThreshold handling —
-// consensus is not aware this happened, it just sees fewer/uneven
-// participants like any organic low-participation tick.
+// it promotes everything it can. Final analysis enforces the quota
+// against valid responses and returns low-participants on shortfall.
// - Minimal disturbance: non-required upstreams keep their incoming
// (selection-policy) order in the remaining slots, so ranking/quality
// is preserved wherever the quota doesn't force a change. Order WITHIN
diff --git a/consensus/quota_test.go b/consensus/quota_test.go
index f2fa65ad7..9ed7a5dea 100644
--- a/consensus/quota_test.go
+++ b/consensus/quota_test.go
@@ -1,9 +1,13 @@
package consensus
import (
+ "context"
+ "sync/atomic"
"testing"
+ "time"
"github.com/erpc/erpc/common"
+ "github.com/rs/zerolog"
"github.com/stretchr/testify/require"
)
@@ -117,3 +121,91 @@ func TestReorderForParticipantQuota(t *testing.T) {
require.Equal(t, []string{"a", "b"}, idsOf(reorderForParticipantQuota(ups, reqs)))
})
}
+
+func TestRequiredParticipantQuotaFailsClosedAtAnalysis(t *testing.T) {
+ lg := zerolog.Nop()
+ cfg := &config{
+ maxParticipants: 1,
+ agreementThreshold: 1,
+ disputeBehavior: common.ConsensusDisputeBehaviorReturnError,
+ lowParticipantsBehavior: common.ConsensusLowParticipantsBehaviorAcceptMostCommonValidResult,
+ requiredParticipants: []*common.ConsensusRequiredParticipant{
+ {Tag: "tier:paid", MinParticipants: 1},
+ },
+ }
+
+ t.Run("missing required tag rejects threshold winner", func(t *testing.T) {
+ analysis := newConsensusAnalysis(&lg, context.Background(), cfg, []*execResult{{
+ Result: validResponseWithValue("0x1"),
+ Upstream: common.NewFakeUpstream("public-1", common.WithTags("tier:public")),
+ }})
+
+ winner := (&executor{}).determineWinner(&lg, analysis)
+ require.NotNil(t, winner)
+ require.Nil(t, winner.Result)
+ require.Error(t, winner.Error)
+ require.True(t, common.HasErrorCode(winner.Error, common.ErrCodeConsensusLowParticipants), "got: %v", winner.Error)
+ })
+
+ t.Run("required tag present allows threshold winner", func(t *testing.T) {
+ analysis := newConsensusAnalysis(&lg, context.Background(), cfg, []*execResult{{
+ Result: validResponseWithValue("0x1"),
+ Upstream: common.NewFakeUpstream("paid-1", common.WithTags("tier:paid")),
+ }})
+
+ winner := (&executor{}).determineWinner(&lg, analysis)
+ require.NotNil(t, winner)
+ require.NoError(t, winner.Error)
+ require.NotNil(t, winner.Result)
+ })
+}
+
+func TestRequiredParticipantQuotaWaitsForSlowRequiredParticipant(t *testing.T) {
+ logger := zerolog.New(zerolog.NewTestWriter(t))
+ paid := common.NewFakeUpstream("paid-1", common.WithTags("tier:paid"))
+ public1 := common.NewFakeUpstream("public-1", common.WithTags("tier:public"))
+ public2 := common.NewFakeUpstream("public-2", common.WithTags("tier:public"))
+
+ pol := newBuilder().
+ WithLogger(&logger).
+ WithMaxParticipants(3).
+ WithAgreementThreshold(2).
+ WithDisputeBehavior(common.ConsensusDisputeBehaviorAcceptMostCommonValidResult).
+ WithLowParticipantsBehavior(common.ConsensusLowParticipantsBehaviorAcceptMostCommonValidResult).
+ WithMaxWaitOnResult(common.NewStaticDuration(500 * time.Millisecond)).
+ WithMaxWaitOnEmpty(common.NewStaticDuration(500 * time.Millisecond)).
+ WithRequiredParticipants([]*common.ConsensusRequiredParticipant{
+ {Tag: "tier:paid", MinParticipants: 1},
+ }).
+ Build()
+
+ req := newTestRequest()
+ req.SetUpstreams([]common.Upstream{public1, public2, paid})
+
+ ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+ defer cancel()
+
+ var calls atomic.Int32
+ var paidCompleted atomic.Bool
+ resp, err := pol.Run(ctx, req, func(ctx context.Context, _ *common.NormalizedRequest) (*common.NormalizedResponse, error) {
+ switch calls.Add(1) {
+ case 1:
+ select {
+ case <-time.After(120 * time.Millisecond):
+ paidCompleted.Store(true)
+ return validResponseWithValue("0x1").SetUpstream(paid), nil
+ case <-ctx.Done():
+ return nil, ctx.Err()
+ }
+ case 2:
+ return validResponseWithValue("0x1").SetUpstream(public1), nil
+ default:
+ return validResponseWithValue("0x1").SetUpstream(public2), nil
+ }
+ })
+
+ require.NoError(t, err)
+ require.NotNil(t, resp)
+ require.True(t, paidCompleted.Load(), "slow required participant must be allowed to finish")
+ require.GreaterOrEqual(t, calls.Load(), int32(3), "third participant should start before quota is satisfied")
+}
diff --git a/consensus/rules.go b/consensus/rules.go
index b467f8c15..7b22a8439 100644
--- a/consensus/rules.go
+++ b/consensus/rules.go
@@ -2,6 +2,7 @@ package consensus
import (
"math/big"
+ "strings"
"github.com/erpc/erpc/common"
)
@@ -31,6 +32,9 @@ var consensusRules = []consensusRule{
if a.method != "eth_sendRawTransaction" {
return false
}
+ if len(a.missingRequired) > 0 {
+ return false
+ }
// Check if we have any non-empty response (tx hash)
for _, g := range a.groups {
if g.ResponseType == ResponseTypeNonEmpty && g.Count >= 1 {
@@ -58,6 +62,36 @@ var consensusRules = []consensusRule{
}
},
},
+ {
+ Description: "wait-capped low participants fail closed",
+ Condition: func(a *consensusAnalysis) bool {
+ return a.waitCapped && a.validParticipants < a.config.agreementThreshold
+ },
+ Action: func(a *consensusAnalysis) *slotResult {
+ return &slotResult{
+ Error: common.NewErrConsensusLowParticipants(
+ "consensus wait cap fired before enough valid participants responded",
+ a.participants(),
+ nil,
+ ),
+ }
+ },
+ },
+ {
+ Description: "required participant quotas must be represented in valid responses",
+ Condition: func(a *consensusAnalysis) bool {
+ return len(a.missingRequired) > 0 && !a.hasPendingRequiredParticipants()
+ },
+ Action: func(a *consensusAnalysis) *slotResult {
+ return &slotResult{
+ Error: common.NewErrConsensusLowParticipants(
+ "not enough required consensus participants: "+strings.Join(a.missingRequired, ", "),
+ a.participants(),
+ nil,
+ ),
+ }
+ },
+ },
// PreferHighestValueFor: when configured for this method, return the response with highest field values
// that meets the agreementThreshold. This rule takes precedence over all other consensus rules.
{
@@ -848,6 +882,9 @@ var shortCircuitRules = []shortCircuitRule{
if a.method != "eth_sendRawTransaction" {
return false
}
+ if len(a.missingRequired) > 0 {
+ return false
+ }
// Short-circuit as soon as we have any valid non-empty response (tx hash)
// For eth_sendRawTransaction, once a tx is accepted by any node, it will
// propagate through the network, so we don't need to wait for consensus.
@@ -863,6 +900,9 @@ var shortCircuitRules = []shortCircuitRule{
Description: "consensus-valid error meets agreement threshold -> short-circuit to error",
Reason: "consensus_error_threshold",
Condition: func(w *slotResult, a *consensusAnalysis) bool {
+ if a.hasPendingRequiredParticipants() {
+ return false
+ }
best := a.getBestByCount()
if best == nil {
return false
@@ -896,6 +936,9 @@ var shortCircuitRules = []shortCircuitRule{
Description: "winner meets agreement threshold, is non-empty, and lead is unassailable (no possible tie with remaining)",
Reason: "unassailable_lead",
Condition: func(w *slotResult, a *consensusAnalysis) bool {
+ if a.hasPendingRequiredParticipants() {
+ return false
+ }
// With remaining participants, avoid short-circuiting when a preference could still
// change the winner. In particular, when PreferLargerResponses is enabled, a later
// larger response can override a smaller above-threshold winner regardless of counts.
diff --git a/consensus/rules_sendrawtx_test.go b/consensus/rules_sendrawtx_test.go
index c3b4f63be..e9e288109 100644
--- a/consensus/rules_sendrawtx_test.go
+++ b/consensus/rules_sendrawtx_test.go
@@ -1,6 +1,7 @@
package consensus
import (
+ "context"
"errors"
"testing"
@@ -175,6 +176,61 @@ func TestSendRawTransaction_ConsensusRule(t *testing.T) {
})
}
+func TestSendRawTransaction_RequiredParticipantQuota(t *testing.T) {
+ logger := zerolog.Nop()
+ cfg := &config{
+ maxParticipants: 2,
+ agreementThreshold: 1,
+ requiredParticipants: []*common.ConsensusRequiredParticipant{
+ {Tag: "tier:paid", MinParticipants: 1},
+ },
+ }
+ req := common.NewNormalizedRequest([]byte(
+ `{"jsonrpc":"2.0","id":1,"method":"eth_sendRawTransaction","params":["0xdeadbeef"]}`,
+ ))
+ ctx := context.WithValue(context.Background(), common.RequestContextKey, req)
+ public1 := common.NewFakeUpstream("public-1", common.WithTags("tier:public"))
+ public2 := common.NewFakeUpstream("public-2", common.WithTags("tier:public"))
+ paid := common.NewFakeUpstream("paid-1", common.WithTags("tier:paid"))
+
+ t.Run("does not bypass while required participant can still respond", func(t *testing.T) {
+ resp := validResponseWithValue("0xpublic")
+ analysis := newConsensusAnalysis(&logger, ctx, cfg, []*execResult{{
+ Result: resp,
+ Upstream: public1,
+ }})
+
+ require.NotEmpty(t, analysis.missingRequired)
+ assert.False(t, consensusRules[0].Condition(analysis), "sendRawTx result rule must wait for required quota")
+ assert.False(t, shortCircuitRules[0].Condition(&slotResult{Result: resp}, analysis), "sendRawTx short-circuit must wait for required quota")
+ })
+
+ t.Run("fails closed when required participant never returned valid response", func(t *testing.T) {
+ analysis := newConsensusAnalysis(&logger, ctx, cfg, []*execResult{
+ {Result: validResponseWithValue("0xpublic1"), Upstream: public1},
+ {Result: validResponseWithValue("0xpublic2"), Upstream: public2},
+ })
+
+ winner := (&executor{}).determineWinner(&logger, analysis)
+ require.NotNil(t, winner)
+ require.Nil(t, winner.Result)
+ require.Error(t, winner.Error)
+ require.True(t, common.HasErrorCode(winner.Error, common.ErrCodeConsensusLowParticipants), "got: %v", winner.Error)
+ })
+
+ t.Run("allows first tx hash once required quota is satisfied", func(t *testing.T) {
+ resp := validResponseWithValue("0xpaid")
+ analysis := newConsensusAnalysis(&logger, ctx, cfg, []*execResult{{
+ Result: resp,
+ Upstream: paid,
+ }})
+
+ require.Empty(t, analysis.missingRequired)
+ assert.True(t, consensusRules[0].Condition(analysis), "sendRawTx result rule can apply after required quota")
+ assert.True(t, shortCircuitRules[0].Condition(&slotResult{Result: resp}, analysis), "sendRawTx short-circuit can apply after required quota")
+ })
+}
+
// TestSendRawTransaction_ShortCircuitRule tests that eth_sendRawTransaction
// short-circuits as soon as one valid response is received.
func TestSendRawTransaction_ShortCircuitRule(t *testing.T) {
diff --git a/consensus/wait_cap_test.go b/consensus/wait_cap_test.go
index 944a95b69..37de3e91c 100644
--- a/consensus/wait_cap_test.go
+++ b/consensus/wait_cap_test.go
@@ -14,17 +14,30 @@ import (
"github.com/stretchr/testify/require"
)
-// TestWaitCap_MaxWaitOnResult_BoundsTailLatency verifies that once one
-// non-empty response arrives, the analyzer resolves within maxWaitOnResult
-// even if a sibling participant is still running.
+func waitForStragglerOrCancel(ctx context.Context, d time.Duration) error {
+ timer := time.NewTimer(d)
+ defer timer.Stop()
+
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case <-timer.C:
+ return nil
+ }
+}
+
+// TestWaitCap_MaxWaitOnResult_BoundsTailLatency verifies that once enough
+// non-empty responses arrive to satisfy the threshold, the analyzer does not
+// wait for an unrelated straggler.
func TestWaitCap_MaxWaitOnResult_BoundsTailLatency(t *testing.T) {
logger := zerolog.New(zerolog.NewTestWriter(t))
pol := newBuilder().
WithLogger(&logger).
WithMaxParticipants(3).
- WithAgreementThreshold(3). // require 3 to disable short-circuit
+ WithAgreementThreshold(2).
WithLowParticipantsBehavior(common.ConsensusLowParticipantsBehaviorAcceptMostCommonValidResult).
+ WithPreferLargerResponses(true).
WithMaxWaitOnResult(common.NewStaticDuration(100 * time.Millisecond)).
Build()
@@ -34,7 +47,7 @@ func TestWaitCap_MaxWaitOnResult_BoundsTailLatency(t *testing.T) {
var slot atomic.Int32
start := time.Now()
- resp, err := pol.Run(ctx, req, func(_ context.Context, _ *common.NormalizedRequest) (*common.NormalizedResponse, error) {
+ resp, err := pol.Run(ctx, req, func(ctx context.Context, _ *common.NormalizedRequest) (*common.NormalizedResponse, error) {
idx := slot.Add(1)
switch idx {
case 1:
@@ -46,7 +59,9 @@ func TestWaitCap_MaxWaitOnResult_BoundsTailLatency(t *testing.T) {
return validResponseWithValue("0xfast"), nil
default:
// slow straggler — exceeds the cap, should be cancelled
- time.Sleep(2 * time.Second)
+ if err := waitForStragglerOrCancel(ctx, 2*time.Second); err != nil {
+ return nil, err
+ }
return validResponseWithValue("0xslow"), nil
}
})
@@ -54,10 +69,48 @@ func TestWaitCap_MaxWaitOnResult_BoundsTailLatency(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, resp)
+ assert.GreaterOrEqual(t, elapsed, 80*time.Millisecond,
+ "test must exercise maxWaitOnResult instead of short-circuiting before the cap")
assert.Less(t, elapsed, 800*time.Millisecond,
"wait cap must bound elapsed time well below the straggler's 2s")
}
+func TestWaitCap_MaxWaitOnResult_FailsClosedBelowThreshold(t *testing.T) {
+ logger := zerolog.New(zerolog.NewTestWriter(t))
+
+ pol := newBuilder().
+ WithLogger(&logger).
+ WithMaxParticipants(3).
+ WithAgreementThreshold(2).
+ WithLowParticipantsBehavior(common.ConsensusLowParticipantsBehaviorAcceptMostCommonValidResult).
+ WithMaxWaitOnResult(common.NewStaticDuration(50 * time.Millisecond)).
+ Build()
+
+ req := newTestRequest()
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ var slot atomic.Int32
+ start := time.Now()
+ resp, err := pol.Run(ctx, req, func(ctx context.Context, _ *common.NormalizedRequest) (*common.NormalizedResponse, error) {
+ idx := slot.Add(1)
+ if idx == 1 {
+ return validResponseWithValue("0xfast"), nil
+ }
+ if err := waitForStragglerOrCancel(ctx, 2*time.Second); err != nil {
+ return nil, err
+ }
+ return validResponseWithValue("0xfast"), nil
+ })
+ elapsed := time.Since(start)
+
+ require.Error(t, err)
+ require.Nil(t, resp)
+ assert.True(t, common.HasErrorCode(err, common.ErrCodeConsensusLowParticipants), "got: %v", err)
+ assert.Less(t, elapsed, 800*time.Millisecond,
+ "wait cap should still bound latency, but must fail closed below threshold")
+}
+
func TestWaitCap_MaxWaitOnResult_ReleasesLateResponses(t *testing.T) {
logger := zerolog.New(zerolog.NewTestWriter(t))
@@ -100,8 +153,9 @@ func TestWaitCap_MaxWaitOnResult_ReleasesLateResponses(t *testing.T) {
})
elapsed := time.Since(start)
- require.NoError(t, err)
- require.NotNil(t, resp)
+ require.Error(t, err)
+ require.Nil(t, resp)
+ assert.True(t, common.HasErrorCode(err, common.ErrCodeConsensusLowParticipants), "got: %v", err)
require.Less(t, elapsed, 500*time.Millisecond,
"maxWaitOnResult must return before slow participant finishes")
require.Equal(t, int32(0), slowBodyClosed.Load(), "late response should not be released before it exists")
@@ -113,6 +167,49 @@ func TestWaitCap_MaxWaitOnResult_ReleasesLateResponses(t *testing.T) {
}, 5*time.Second, 10*time.Millisecond, "late post-wait-cap response should be released exactly once")
}
+func TestWaitCap_DoesNotReplaceShortCircuitWinner(t *testing.T) {
+ logger := zerolog.New(zerolog.NewTestWriter(t))
+
+ pol := newBuilder().
+ WithLogger(&logger).
+ WithMaxParticipants(3).
+ WithAgreementThreshold(2).
+ WithMaxWaitOnResult(common.NewStaticDuration(50 * time.Millisecond)).
+ Build()
+
+ req := common.NewNormalizedRequest([]byte(
+ `{"jsonrpc":"2.0","id":1,"method":"eth_sendRawTransaction","params":["0xdeadbeef"]}`,
+ ))
+ ctx := context.WithValue(context.Background(), common.RequestContextKey, req)
+
+ var callCount atomic.Int32
+ var winnerBodyClosed atomic.Int32
+ var slowBodyClosed atomic.Int32
+ slowRelease := make(chan struct{})
+
+ resp, err := pol.Run(ctx, req, func(_ context.Context, _ *common.NormalizedRequest) (*common.NormalizedResponse, error) {
+ if callCount.Add(1) == 1 {
+ return validResponseWithCloser("0xwinner", &winnerBodyClosed), nil
+ }
+ <-slowRelease
+ return validResponseWithCloser("0xslow", &slowBodyClosed), nil
+ })
+
+ require.NoError(t, err)
+ require.NotNil(t, resp)
+ defer resp.Release()
+
+ time.Sleep(100 * time.Millisecond)
+ close(slowRelease)
+
+ require.Eventually(t, func() bool {
+ return slowBodyClosed.Load() == 1
+ }, 5*time.Second, 10*time.Millisecond, "late response should still be drained")
+ assert.Never(t, func() bool {
+ return winnerBodyClosed.Load() != 0
+ }, 200*time.Millisecond, 10*time.Millisecond, "wait-cap finalization must not replace or release short-circuit winner")
+}
+
// TestWaitCap_MaxWaitOnEmpty_TighterFloor verifies that when ONLY empty
// responses have arrived, the (typically larger) maxWaitOnEmpty cap
// applies — bounded even if no real answer is ever produced.
@@ -133,7 +230,7 @@ func TestWaitCap_MaxWaitOnEmpty_TighterFloor(t *testing.T) {
var slot atomic.Int32
start := time.Now()
- _, _ = pol.Run(ctx, req, func(_ context.Context, _ *common.NormalizedRequest) (*common.NormalizedResponse, error) {
+ _, _ = pol.Run(ctx, req, func(ctx context.Context, _ *common.NormalizedRequest) (*common.NormalizedResponse, error) {
idx := slot.Add(1)
switch idx {
case 1:
@@ -144,7 +241,9 @@ func TestWaitCap_MaxWaitOnEmpty_TighterFloor(t *testing.T) {
return validResponseWithValue(""), nil
default:
// straggler way over the cap
- time.Sleep(2 * time.Second)
+ if err := waitForStragglerOrCancel(ctx, 2*time.Second); err != nil {
+ return nil, err
+ }
return validResponseWithValue("0xslow"), nil
}
})
@@ -304,7 +403,9 @@ func TestWaitCap_ArmedByRealAttemptFailures(t *testing.T) {
// Real attempt failure: a plain transport error, not a skip.
return nil, errors.New("connection reset by peer")
}
- time.Sleep(2 * time.Second)
+ if err := waitForStragglerOrCancel(ctx, 2*time.Second); err != nil {
+ return nil, err
+ }
return validResponseWithValue("0xslow"), nil
})
elapsed := time.Since(start)
diff --git a/docs/pages/config/failsafe/consensus.mdx b/docs/pages/config/failsafe/consensus.mdx
index 4423b018c..177ebd8f1 100644
--- a/docs/pages/config/failsafe/consensus.mdx
+++ b/docs/pages/config/failsafe/consensus.mdx
@@ -128,42 +128,44 @@ https://docs.erpc.cloud/config/failsafe/consensus.llms.txt`}
**Integration point.** Consensus sits at `networkExecutor.Run`. When a `ConsensusPolicyConfig` is present and the `SkipConsensus` directive is false, the executor calls `consensus.Run(ctx, req, slotInner)` where `slotInner` is `retry(hedge(tryOneUpstream))`. Without consensus the path is `retry(hedge(runUpstreamSweep))`. Composition order: `consensus(retry(hedge(tryOneUpstream)))`. ()
-**Participant selection.** Before spawning goroutines, if `requiredParticipants` is configured, `reorderForParticipantQuota` front-loads tag-matching upstreams so the first `maxParticipants` drawn satisfy every `{tag, minParticipants}` entry. Tag matching uses exact equality first, then glob wildcards (`*`, `?`). This is best-effort: quota shortfalls are handled by `lowParticipantsBehavior`. The executor then spawns exactly `maxParticipants` goroutines, each calling `slotInner` independently. ()
+**Participant selection.** Before spawning goroutines, if `requiredParticipants` is configured, `reorderForParticipantQuota` front-loads tag-matching upstreams so the first `maxParticipants` drawn satisfy every `{tag, minParticipants}` entry. Tag matching uses exact equality first, then glob wildcards (`*`, `?`). Reordering is best-effort, but final analysis is strict: if fewer than `minParticipants` valid responses carry a required tag, consensus returns `ErrConsensusLowParticipants`. The executor then spawns exactly `maxParticipants` goroutines, each calling `slotInner` independently. ()
-**Collection and analysis loop.** The `runAnalyzer` goroutine reads responses from a buffered channel. After each response it calls `newConsensusAnalysis` (classify, hash, group) and `determineWinner` (apply 24 priority-ordered rules). If `shouldShortCircuit` returns true, the outcome is sent immediately and remaining requests are cancelled (or left running in fire-and-forget mode). After all participants respond — or a wait-cap deadline fires — the final outcome is sent to the caller's select.
+**Collection and analysis loop.** The `runAnalyzer` goroutine reads responses from a buffered channel. After each response it calls `newConsensusAnalysis` (classify, hash, group) and `determineWinner` (apply 26 priority-ordered rules). If `shouldShortCircuit` returns true, the outcome is sent immediately and remaining requests are cancelled (or left running in fire-and-forget mode). After all participants respond — or a wait-cap deadline fires — the final outcome is sent to the caller's select.
**Response classification.** Each response is classified into one of four types: `ResponseTypeNonEmpty` (successful non-null result), `ResponseTypeEmpty` (successful but null/empty result), `ResponseTypeConsensusError` (JSON-RPC execution exception, client exception, unsupported, or missing-data), `ResponseTypeInfrastructureError` (all other errors including `ErrUpstreamsExhausted`). Infrastructure errors are excluded from `validParticipants`. `ErrUpstreamsExhausted` is always infra even when it wraps execution exceptions via the shared `ErrorsByUpstream` map. () Specific error-code mappings: `ErrCodeEndpointExecutionException` (EVM revert, code 3) → `ConsensusError`; `ErrCodeEndpointClientSideException`, `ErrCodeEndpointUnsupported`, `ErrCodeEndpointMissingData` → `ConsensusError`; all other errors (timeout, network, server 500) → `InfrastructureError`; nil JSON-RPC response on a successful response object → `InfrastructureError` with hash `"error:generic"`. A successful response where `IsResultEmptyish` returns true → `ResponseTypeEmpty`.
**Hashing.** Non-error responses are hashed via `JsonRpcResponse.CanonicalHash()` or `CanonicalHashWithIgnoredFields(fields)` when `ignoreFields` is configured for the method. Errors are hashed via `errorToConsensusHash`: JSON-RPC exceptions hash as `"jsonrpc:"`; standard errors hash as their code; unknown errors hash as `"error:generic"`. Within each hash group, `LargestResult` tracks the member with the highest `CachedResponseSize`; the winner returns `group.LargestResult`, not the first response. ()
-**Rule evaluation.** `determineWinner` walks `consensusRules` in priority order (). Full 24-rule ordering:
+**Rule evaluation.** `determineWinner` walks `consensusRules` in priority order (). Full 26-rule ordering:
| Priority | Rule | Triggers when | Action |
|---|---|---|---|
| 1 | `eth_sendRawTransaction` special | Method is `eth_sendRawTransaction` AND any non-empty response exists | Return first non-empty; threshold ignored |
-| 2 | `preferHighestValueFor` | Method configured AND extractable numeric values exist | Group by numeric value; highest ≥ threshold wins; else dispute |
-| 3 | `onlyBlockHeadLeader` on dispute | `disputeBehavior == onlyBlockHeadLeader` AND no group meets threshold | Leader non-error → leader error (incl. infra) → dispute |
-| 4 | `onlyBlockHeadLeader` on low participants | `lowParticipantsBehavior == onlyBlockHeadLeader` AND low participants | Leader non-error → leader non-infra error → low-participants error |
-| 5 | `preferBlockHeadLeader` on dispute/low | `preferBlockHeadLeader` AND no group meets threshold AND leader non-error exists | Return leader non-error; else dispute |
-| 6 | `preferLarger + acceptMostCommon` below threshold | `preferLargerResponses` AND `acceptMostCommon` AND below threshold | Return largest non-empty by size |
-| 7 | `acceptMostCommon + preferNonEmpty` above threshold (empty/error leads) | `preferNonEmpty` AND `acceptMostCommon` AND threshold met AND leader is empty or error AND non-empty exists | Return best non-empty (count, then size) |
-| 8 | Tie at/above threshold (no preference) | Multiple non-error groups share best count ≥ threshold | Dispute |
-| 9 | `acceptMostCommon` below threshold: non-empty over empty | `preferNonEmpty` AND `acceptMostCommon` AND below threshold AND exactly 1 non-empty AND ≥1 empty | Return single non-empty |
-| 10 | `acceptMostCommon` below threshold: non-empty over error | `preferNonEmpty` AND `acceptMostCommon` AND below threshold AND leader is error AND non-empty exists | Return best non-empty |
-| 11 | `returnError + preferNonEmpty` (empty threshold winner) | `disputeBehavior == returnError` AND empty meets threshold AND non-empty minority exists | Dispute (escalate from empty winner) |
-| 12 | `acceptMostCommon + preferNonEmpty` above threshold (both qualify) | Both non-empty and consensus-error ≥ threshold | Return best non-empty |
-| 13 | Tie above threshold (general, no preference) | Multiple valid groups share best count ≥ threshold | Dispute |
-| 14 | `preferLarger`: above threshold, multiple groups | `preferLargerResponses` AND best meets threshold AND >1 group ≥ threshold | Return largest non-empty by size |
-| 15 | `preferLarger + acceptMostCommon`: larger exists below threshold | `preferLargerResponses` AND `acceptMostCommon` AND best meets threshold AND non-empty AND larger exists below | Return largest non-empty by size |
-| 16 | `returnError + preferLarger`: smaller wins but larger exists | `disputeBehavior == returnError` AND `preferLargerResponses` AND threshold met AND larger below threshold | Dispute |
-| 17 | `acceptMostCommon` below threshold: unique leader | `acceptMostCommon` AND best count < threshold AND best > second-best | Return best valid group |
-| 18 | `lowParticipants + acceptMostCommon` | `lowParticipantsBehavior == acceptMostCommon` AND low participants | Non-empty (if untied) → empty → error → low-participants error |
-| 19 | Threshold winner (generic) | Any valid group meets threshold | Return result or consensus-error |
-| 20 | Dispute (multiple groups, none at threshold) | Best count < threshold AND >1 valid group | Dispute error |
-| 21 | All infra errors meet threshold | `validParticipants == 0` AND infra group meets threshold | Return agreed infra error |
-| 22 | `lowParticipants + returnError` | `lowParticipantsBehavior == returnError` AND low participants | Low-participants error |
-| 23 | No responses | `len(groups) == 0` | Low-participants error |
-| 24 | Fallback | Catch-all | Low-participants error |
+| 2 | Wait-capped low participants | Wait cap fired AND `validParticipants < agreementThreshold` | Low-participants error |
+| 3 | Required-participant quota missing | Any `requiredParticipants` entry has fewer than `minParticipants` valid responses carrying its tag | Low-participants error |
+| 4 | `preferHighestValueFor` | Method configured AND extractable numeric values exist | Group by numeric value; highest ≥ threshold wins; else dispute |
+| 5 | `onlyBlockHeadLeader` on dispute | `disputeBehavior == onlyBlockHeadLeader` AND no group meets threshold | Leader non-error → leader error (incl. infra) → dispute |
+| 6 | `onlyBlockHeadLeader` on low participants | `lowParticipantsBehavior == onlyBlockHeadLeader` AND low participants | Leader non-error → leader non-infra error → low-participants error |
+| 7 | `preferBlockHeadLeader` on dispute/low | `preferBlockHeadLeader` AND no group meets threshold AND leader non-error exists | Return leader non-error; else dispute |
+| 8 | `preferLarger + acceptMostCommon` below threshold | `preferLargerResponses` AND `acceptMostCommon` AND below threshold | Return largest non-empty by size |
+| 9 | `acceptMostCommon + preferNonEmpty` above threshold (empty/error leads) | `preferNonEmpty` AND `acceptMostCommon` AND threshold met AND leader is empty or error AND non-empty exists | Return best non-empty (count, then size) |
+| 10 | Tie at/above threshold (no preference) | Multiple non-error groups share best count ≥ threshold | Dispute |
+| 11 | `acceptMostCommon` below threshold: non-empty over empty | `preferNonEmpty` AND `acceptMostCommon` AND below threshold AND exactly 1 non-empty AND ≥1 empty | Return single non-empty |
+| 12 | `acceptMostCommon` below threshold: non-empty over error | `preferNonEmpty` AND `acceptMostCommon` AND below threshold AND leader is error AND non-empty exists | Return best non-empty |
+| 13 | `returnError + preferNonEmpty` (empty threshold winner) | `disputeBehavior == returnError` AND empty meets threshold AND non-empty minority exists | Dispute (escalate from empty winner) |
+| 14 | `acceptMostCommon + preferNonEmpty` above threshold (both qualify) | Both non-empty and consensus-error ≥ threshold | Return best non-empty |
+| 15 | Tie above threshold (general, no preference) | Multiple valid groups share best count ≥ threshold | Dispute |
+| 16 | `preferLarger`: above threshold, multiple groups | `preferLargerResponses` AND best meets threshold AND >1 group ≥ threshold | Return largest non-empty by size |
+| 17 | `preferLarger + acceptMostCommon`: larger exists below threshold | `preferLargerResponses` AND `acceptMostCommon` AND best meets threshold AND non-empty AND larger exists below | Return largest non-empty by size |
+| 18 | `returnError + preferLarger`: smaller wins but larger exists | `disputeBehavior == returnError` AND `preferLargerResponses` AND threshold met AND larger below threshold | Dispute |
+| 19 | `acceptMostCommon` below threshold: unique leader | `acceptMostCommon` AND best count < threshold AND best > second-best | Return best valid group |
+| 20 | `lowParticipants + acceptMostCommon` | `lowParticipantsBehavior == acceptMostCommon` AND low participants | Non-empty (if untied) → empty → error → low-participants error |
+| 21 | Threshold winner (generic) | Any valid group meets threshold | Return result or consensus-error |
+| 22 | Dispute (multiple groups, none at threshold) | Best count < threshold AND >1 valid group | Dispute error |
+| 23 | All infra errors meet threshold | `validParticipants == 0` AND infra group meets threshold | Return agreed infra error |
+| 24 | `lowParticipants + returnError` | `lowParticipantsBehavior == returnError` AND low participants | Low-participants error |
+| 25 | No responses | `len(groups) == 0` | Low-participants error |
+| 26 | Fallback | Catch-all | Low-participants error |
**Short-circuit rules.** `shouldShortCircuit` evaluates 3 rules after each response. (1) `sendrawtx_first_success`: fires on any single non-empty for `eth_sendRawTransaction`, never blocked. (2) `consensus_error_threshold`: best group is consensus-error ≥ threshold; blocked when `preferHighestValueFor` configured, or `acceptMostCommon` active AND (`preferNonEmpty` OR `preferLargerResponses`). (3) `unassailable_lead`: best non-empty ≥ threshold with unassailable lead; blocked when `preferLargerResponses` active, `preferHighestValueFor` configured, or `preferNonEmpty` active and leader is empty.
@@ -171,7 +173,7 @@ https://docs.erpc.cloud/config/failsafe/consensus.llms.txt`}
**`eth_sendRawTransaction` broadcast mode.** Rule 1 fires as soon as any single non-empty response exists — threshold is ignored entirely. Combined with `fireAndForget: true`, eRPC returns the first accepted tx hash immediately while remaining participant goroutines continue broadcasting using `context.WithoutCancel(ctx)` as their base, surviving HTTP client disconnection. They are bounded by upstream call completion, wait-cap timers, or process exit — not by graceful shutdown signals.
-**Wait caps.** `maxWaitOnResult` (adaptive p50, default min=5ms, max=1s) arms when the first non-empty response arrives. `maxWaitOnEmpty` (adaptive p90, default min=50ms, max=2s) arms on the very first response of any kind. When a cap fires, the analyzer resolves with what it has collected, applying `disputeBehavior` or `lowParticipantsBehavior` as appropriate. `erpc_consensus_wait_capped_total{trigger}` tracks firings.
+**Wait caps.** `maxWaitOnResult` (adaptive p50, default min=5ms, max=1s) arms when the first non-empty response arrives. `maxWaitOnEmpty` (adaptive p90, default min=50ms, max=2s) arms on the very first response of any kind. When a cap fires before `validParticipants >= agreementThreshold`, the analyzer fails closed with `ErrConsensusLowParticipants`; it does not use `lowParticipantsBehavior: acceptMostCommonValidResult` to return a lone fast response. Once enough valid participants are present, the analyzer resolves with collected responses, applying normal dispute rules. `erpc_consensus_wait_capped_total{trigger}` tracks firings.
**SkipConsensus directive.** Any request can bypass consensus via `X-ERPC-Skip-Consensus: true` header, `?skip-consensus=true` query param, or `directiveDefaults.skipConsensus: true` in network/project config. When set, the executor falls through to `retry(hedge(runUpstreamSweep))`. Only `"true"` activates bypass; any other value keeps consensus active.
@@ -190,7 +192,7 @@ All fields under `networks[].failsafe[].consensus.` (or project-level `failsafe[
| `maxParticipants` | int | `5` () | Number of upstreams to fan-out to. If ≤ 0, clamped to 1. |
| `agreementThreshold` | int | `2` () | Minimum participants that must agree on the same response hash. Also used by `isLowParticipants` (`validParticipants < threshold`). |
| `disputeBehavior` | string enum | `"returnError"` () | Action when `validParticipants >= agreementThreshold` but no group meets threshold. Values: `returnError`, `acceptMostCommonValidResult`, `preferBlockHeadLeader`, `onlyBlockHeadLeader`. |
-| `lowParticipantsBehavior` | string enum | `"acceptMostCommonValidResult"` () | Action when `validParticipants < agreementThreshold` (fewer upstreams produced non-infrastructure-error responses than the threshold, typically because too few upstreams were reachable before wait caps fired). Same four values as `disputeBehavior`. Distinction: `disputeBehavior` fires when `validParticipants >= agreementThreshold` but no group meets threshold (participants disagree); `lowParticipantsBehavior` fires when there are not enough valid participants to even reach threshold. Under `acceptMostCommon`: non-empty > empty > error; ties among non-empty groups → dispute. |
+| `lowParticipantsBehavior` | string enum | `"acceptMostCommonValidResult"` () | Action when `validParticipants < agreementThreshold` after normal collection. Same four values as `disputeBehavior`. Distinction: `disputeBehavior` fires when `validParticipants >= agreementThreshold` but no group meets threshold (participants disagree); `lowParticipantsBehavior` fires when there are not enough valid participants to even reach threshold. Wait-capped low-participant rounds fail closed before this setting can pick a winner. Under `acceptMostCommon`: non-empty > empty > error; ties among non-empty groups → dispute. |
| `disputeLogLevel` | string | `"warn"` () | Zerolog level for misbehavior events. Values: `trace`, `debug`, `info`, `warn`, `error`. **Footgun**: zerolog zero value is `TraceLevel`; builder coerces unset to `WarnLevel`. Set `"trace"` explicitly for trace-level logging. |
| `ignoreFields` | `map[string][]string` | `{"eth_getLogs":["*.blockTimestamp"],"eth_getTransactionReceipt":["blockTimestamp","logs.*.blockTimestamp"],"eth_getBlockReceipts":["*.blockTimestamp","*.logs.*.blockTimestamp"]}` () | Per-method dot-path field patterns excluded from canonical hash. **Set replacement, not merge**: setting ANY entry replaces the ENTIRE default map. To add a method, re-include all three defaults. Setting `{}` disables all defaults. |
| `preferNonEmpty` | \*bool | `true` () | Prefer non-empty over empty or consensus-error in dispute resolution. Disables short-circuit when empty would lead. |
@@ -199,7 +201,7 @@ All fields under `networks[].failsafe[].consensus.` (or project-level `failsafe[
| `fireAndForget` | bool | `false` (Go zero value; no `SetDefaults` entry) | When `true`, remaining in-flight requests are NOT cancelled after short-circuit. Goroutines use `context.WithoutCancel(ctx)` and survive HTTP disconnection. **Footgun**: process shutdown signals do not reach these goroutines; budget shutdown timeouts accordingly. |
| `maxWaitOnResult` | \*AdaptiveDuration | `{quantile:0.5, min:"5ms", max:"1s"}` () | Cap after first non-empty response. Cold-start fallback is `min` (5ms). If you configure without a `min`, cold start returns 0 — no cap until data accumulates. |
| `maxWaitOnEmpty` | \*AdaptiveDuration | `{quantile:0.9, min:"50ms", max:"2s"}` () | Cap after first response of any kind. Same cold-start behavior: fallback to `min` (50ms). |
-| `requiredParticipants` | `[]*ConsensusRequiredParticipant` | `[]` (disabled) | List of `{tag, minParticipants}` entries. Front-loads tag-matching upstreams. Best-effort: shortfall falls through to `lowParticipantsBehavior`. |
+| `requiredParticipants` | `[]*ConsensusRequiredParticipant` | `[]` (disabled) | List of `{tag, minParticipants}` entries. Front-loads tag-matching upstreams, then enforces that at least `minParticipants` valid responses carry each tag. Shortfall returns `ErrConsensusLowParticipants`. |
| `requiredParticipants[].tag` | string | **Required** | Glob pattern matched against `upstream.tags[]`. Empty tag is a startup validation error. |
| `requiredParticipants[].minParticipants` | int | **Required; no default** | Must be > 0 and ≤ `maxParticipants`. Omitting or setting 0 is a startup validation error — not a silent no-op. |
| `punishMisbehavior` | \*PunishMisbehaviorConfig | `nil` (disabled) | Enables cordon punishment for persistent misbehavior. |
@@ -523,9 +525,9 @@ The 100 MiB `maxSize` guard prevents unbounded memory use during high-dispute pe
1. **`ignoreFields` is full set replacement.** Adding one method removes all three built-in entries. Spurious disputes on `blockTimestamp` fields result. Always re-include all default entries when extending the map. ()
2. **`preferLargerResponses` disables all short-circuit.** Even a clear majority above threshold does not short-circuit because a larger response may still arrive. Disable in latency-sensitive scenarios.
3. **`eth_sendRawTransaction` bypasses threshold entirely.** Rule 1 fires on any single non-empty response regardless of how many others errored. Intentional for tx broadcasting.
-4. **`preferNonEmpty` + `returnError` escalates empty threshold winner to dispute.** When empty meets threshold, non-empty minority exists, and `preferNonEmpty: true` under `disputeBehavior: returnError`, the result is a dispute — not the empty winner and not the minority non-empty. (Rule 11)
-5. **Tie among non-empty at/above threshold without preference → dispute.** Rule 8 fires before the generic threshold-winner rule 19. Enable `preferLargerResponses` to resolve ties by size.
-6. **`requiredParticipants` shortfall is silent.** No warning, no metric, no error. Operators cannot distinguish tag-quota shortfall from general upstream unavailability in current observability output.
+4. **`preferNonEmpty` + `returnError` escalates empty threshold winner to dispute.** When empty meets threshold, non-empty minority exists, and `preferNonEmpty: true` under `disputeBehavior: returnError`, the result is a dispute — not the empty winner and not the minority non-empty. (Rule 13)
+5. **Tie among non-empty at/above threshold without preference → dispute.** Rule 10 fires before the generic threshold-winner rule 21. Enable `preferLargerResponses` to resolve ties by size.
+6. **`requiredParticipants` shortfall fails closed.** Front-loading is best-effort, but final analysis requires each configured tag quota to be represented by valid responses. Shortfall returns `ErrConsensusLowParticipants` before any threshold winner can be returned.
7. **`fireAndForget` goroutines survive graceful shutdown.** `context.WithoutCancel` strips shutdown signals. Budget shutdown timeouts to account for in-flight fire-and-forget participants.
8. **S3 uses `PutObject`, not append.** With `{dateByHour}` in `filePattern`, flushes within the same hour overwrite the previous S3 object. Use `{timestampMs}` if each flush should create a new object.
9. **S3 `accessKeyID` casing is load-bearing.** YAML key must be `accessKeyID` (capital D). Lowercase `accessKeyId` silently falls back to the SDK credential chain.
diff --git a/erpc/networks_hedge_cancel_test.go b/erpc/networks_hedge_cancel_test.go
index 40773c921..665a761ce 100644
--- a/erpc/networks_hedge_cancel_test.go
+++ b/erpc/networks_hedge_cancel_test.go
@@ -107,6 +107,8 @@ func TestHedgeConsensus_AllUpstreamsMissingData_HedgeRecovery(t *testing.T) {
LowParticipantsBehavior: common.ConsensusLowParticipantsBehaviorAcceptMostCommonValidResult,
PreferNonEmpty: &common.TRUE,
PreferLargerResponses: &common.TRUE,
+ MaxWaitOnResult: common.NewStaticDuration(500 * time.Millisecond),
+ MaxWaitOnEmpty: common.NewStaticDuration(500 * time.Millisecond),
},
&common.RetryPolicyConfig{
MaxAttempts: 1,
@@ -205,6 +207,8 @@ func TestHedgeConsensus_ServerError_HedgeRecovery(t *testing.T) {
LowParticipantsBehavior: common.ConsensusLowParticipantsBehaviorAcceptMostCommonValidResult,
PreferNonEmpty: &common.TRUE,
PreferLargerResponses: &common.TRUE,
+ MaxWaitOnResult: common.NewStaticDuration(500 * time.Millisecond),
+ MaxWaitOnEmpty: common.NewStaticDuration(500 * time.Millisecond),
},
&common.RetryPolicyConfig{
MaxAttempts: 1,
@@ -364,6 +368,8 @@ func TestHedgeConsensus_OneUpstreamMissingData_OthersSucceed_StillWorks(t *testi
LowParticipantsBehavior: common.ConsensusLowParticipantsBehaviorAcceptMostCommonValidResult,
PreferNonEmpty: &common.TRUE,
PreferLargerResponses: &common.TRUE,
+ MaxWaitOnResult: common.NewStaticDuration(500 * time.Millisecond),
+ MaxWaitOnEmpty: common.NewStaticDuration(500 * time.Millisecond),
},
&common.RetryPolicyConfig{
MaxAttempts: 2,