Skip to content
31 changes: 30 additions & 1 deletion architecture/evm/eth_getBlockByNumber.go
Original file line number Diff line number Diff line change
Expand Up @@ -575,8 +575,37 @@ func isHyperEVMSystemTransaction(tx interface{}) bool {
if !ok {
return false
}
from, ok := txObj["from"].(string)
if !ok || !isZeroishHex(from) {
return false
}
gas, ok := txObj["gas"].(string)
if !ok || !isZeroishHex(gas) {
return false
}
gasPrice, ok := txObj["gasPrice"].(string)
return ok && isZeroishHex(gasPrice)
if !ok || !isZeroishHex(gasPrice) {
return false
}
if to, exists := txObj["to"]; exists && to != nil {
toStr, ok := to.(string)
if !ok || !isZeroishHex(toStr) {
return false
}
}
if value, exists := txObj["value"]; exists && value != nil {
valueStr, ok := value.(string)
if !ok || !isZeroishHex(valueStr) {
return false
}
}
if input, exists := txObj["input"]; exists && input != nil {
inputStr, ok := input.(string)
if !ok || !isZeroishHex(inputStr) {
return false
}
}
return true
}

// blockValidationTxLite is a minimal transaction model for block validation
Expand Down
62 changes: 51 additions & 11 deletions architecture/evm/eth_getBlockByNumber_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,10 @@ func TestUpstreamPostForward_HyperEVMFiltersSystemTransactions(t *testing.T) {
"number": "0x100",
"hash": "0xabc123",
"transactions": [
{"hash": "0xsystem1", "gasPrice": "0x0"},
{"hash": "0xsystem1", "from": "0x0", "to": "0x0", "gas": "0x0", "gasPrice": "0x0", "input": "0x", "value": "0x0"},
{"hash": "0xreal-zero-price", "from": "0x1111111111111111111111111111111111111111", "to": "0x2222222222222222222222222222222222222222", "gas": "0x5208", "gasPrice": "0x0", "input": "0x", "value": "0x0"},
{"hash": "0xreal", "gasPrice": "0x1"},
{"hash": "0xsystem2", "gasPrice": "0x00"}
{"hash": "0xsystem2", "from": "0x0000000000000000000000000000000000000000", "to": "0x0000000000000000000000000000000000000000", "gas": "0x00", "gasPrice": "0x00", "input": "0x00", "value": "0x00"}
]
}`

Expand All @@ -100,7 +101,7 @@ func TestUpstreamPostForward_HyperEVMFiltersSystemTransactions(t *testing.T) {
return req, resp
}

t.Run("FiltersGasPriceZeroTransactionsOnHyperEVM", func(t *testing.T) {
t.Run("FiltersOnlySystemTransactionsOnHyperEVM", func(t *testing.T) {
req, resp := newResponse(t)
network := &testNetwork{cfg: &common.NetworkConfig{
Architecture: common.ArchitectureEvm,
Expand All @@ -118,9 +119,11 @@ func TestUpstreamPostForward_HyperEVMFiltersSystemTransactions(t *testing.T) {
Transactions []map[string]interface{} `json:"transactions"`
}
require.NoError(t, common.SonicCfg.Unmarshal(jrr.GetResultBytes(), &block))
require.Len(t, block.Transactions, 1)
assert.Equal(t, "0xreal", block.Transactions[0]["hash"])
assert.Equal(t, "0x1", block.Transactions[0]["gasPrice"])
require.Len(t, block.Transactions, 2)
assert.Equal(t, "0xreal-zero-price", block.Transactions[0]["hash"])
assert.Equal(t, "0x0", block.Transactions[0]["gasPrice"])
assert.Equal(t, "0xreal", block.Transactions[1]["hash"])
assert.Equal(t, "0x1", block.Transactions[1]["gasPrice"])
})

t.Run("NetworkPostForwardFiltersCachedBlocks", func(t *testing.T) {
Expand All @@ -142,8 +145,9 @@ func TestUpstreamPostForward_HyperEVMFiltersSystemTransactions(t *testing.T) {
Transactions []map[string]interface{} `json:"transactions"`
}
require.NoError(t, common.SonicCfg.Unmarshal(jrr.GetResultBytes(), &block))
require.Len(t, block.Transactions, 1)
assert.Equal(t, "0xreal", block.Transactions[0]["hash"])
require.Len(t, block.Transactions, 2)
assert.Equal(t, "0xreal-zero-price", block.Transactions[0]["hash"])
assert.Equal(t, "0xreal", block.Transactions[1]["hash"])
})

t.Run("NetworkPostForwardFiltersBlockByHash", func(t *testing.T) {
Expand All @@ -169,8 +173,9 @@ func TestUpstreamPostForward_HyperEVMFiltersSystemTransactions(t *testing.T) {
Transactions []map[string]interface{} `json:"transactions"`
}
require.NoError(t, common.SonicCfg.Unmarshal(jrr.GetResultBytes(), &block))
require.Len(t, block.Transactions, 1)
assert.Equal(t, "0xreal", block.Transactions[0]["hash"])
require.Len(t, block.Transactions, 2)
assert.Equal(t, "0xreal-zero-price", block.Transactions[0]["hash"])
assert.Equal(t, "0xreal", block.Transactions[1]["hash"])
})

t.Run("PreservesHashOnlyTransactionsOnHyperEVM", func(t *testing.T) {
Expand Down Expand Up @@ -225,7 +230,42 @@ func TestUpstreamPostForward_HyperEVMFiltersSystemTransactions(t *testing.T) {
Transactions []map[string]interface{} `json:"transactions"`
}
require.NoError(t, common.SonicCfg.Unmarshal(jrr.GetResultBytes(), &block))
require.Len(t, block.Transactions, 3)
require.Len(t, block.Transactions, 4)
})

t.Run("ValidatesOriginalBlockBeforeFilteringAllSystemTransactions", func(t *testing.T) {
systemOnlyBlockJSON := `{
"number": "0x100",
"hash": "0xabc123",
"transactionsRoot": "0x1111111111111111111111111111111111111111111111111111111111111111",
"transactions": [
{"hash": "0xsystem1", "from": "0x0", "to": "0x0", "gas": "0x0", "gasPrice": "0x0", "input": "0x", "value": "0x0"}
]
}`
jrpcResp, err := common.NewJsonRpcResponseFromBytes([]byte(`1`), []byte(systemOnlyBlockJSON), nil)
require.NoError(t, err)
req := common.NewNormalizedRequest([]byte(`{"jsonrpc":"2.0","id":1,"method":"eth_getBlockByNumber","params":["0x100",true]}`))
req.SetDirectives(&common.RequestDirectives{ValidateTransactionsRoot: true})
resp := common.NewNormalizedResponse().
WithRequest(req).
WithJsonRpcResponse(jrpcResp)
network := &testNetwork{cfg: &common.NetworkConfig{
Architecture: common.ArchitectureEvm,
Evm: &common.EvmNetworkConfig{
ChainId: 999,
},
}}

filteredResp, err := upstreamPostForward_eth_getBlockByNumber(ctx, network, nil, req, resp, nil)
require.NoError(t, err)

jrr, err := filteredResp.JsonRpcResponse(ctx)
require.NoError(t, err)
var block struct {
Transactions []map[string]interface{} `json:"transactions"`
}
require.NoError(t, common.SonicCfg.Unmarshal(jrr.GetResultBytes(), &block))
require.Empty(t, block.Transactions)
})
}

Expand Down
9 changes: 3 additions & 6 deletions common/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -1746,12 +1746,9 @@ type ConsensusPolicyConfig struct {
// the participant set so the first `maxParticipants` drawn satisfy
// every entry — without changing `maxParticipants` itself.
//
// Best-effort and governed by the EXISTING consensus behaviors: if a
// required group has fewer healthy upstreams than requested (or the
// quotas can't all fit within `maxParticipants`), consensus simply
// runs with what it can promote and the resulting participation is
// handled by `lowParticipantsBehavior` / `agreementThreshold` exactly
// like any other low-participation tick. Empty (default) = disabled.
// Reordering is best-effort, but final analysis fails closed when valid
// consensus responses do not satisfy every required quota. Empty
// (default) = disabled.
RequiredParticipants []*ConsensusRequiredParticipant `yaml:"requiredParticipants,omitempty" json:"requiredParticipants,omitempty"`
}

Expand Down
58 changes: 58 additions & 0 deletions consensus/analysis.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ type consensusAnalysis struct {
originalRequest *common.NormalizedRequest
leaderUpstream common.Upstream
method string // The RPC method being called (e.g., "eth_getTransactionCount")
waitCapped bool
missingRequired []string

// Cached computed values
cachedBestNonEmpty *responseGroup
Expand All @@ -73,11 +75,16 @@ func contextFrom(ctxOrExec any) context.Context {
}

func newConsensusAnalysis(lg *zerolog.Logger, ctxOrExec any, config *config, responses []*execResult) *consensusAnalysis {
return newConsensusAnalysisWithOptions(lg, ctxOrExec, config, responses, false)
}

func newConsensusAnalysisWithOptions(lg *zerolog.Logger, ctxOrExec any, config *config, responses []*execResult, waitCapped bool) *consensusAnalysis {
ctx := contextFrom(ctxOrExec)
analysis := &consensusAnalysis{
config: config,
groups: make(map[string]*responseGroup),
totalParticipants: len(responses),
waitCapped: waitCapped,
}

// Try to extract original request and compute leader upstream once
Expand Down Expand Up @@ -162,10 +169,61 @@ func newConsensusAnalysis(lg *zerolog.Logger, ctxOrExec any, config *config, res
analysis.getBestError()
analysis.getBestByCount()
analysis.getBestBySize()
analysis.missingRequired = analysis.computeMissingRequiredParticipants()

return analysis
}

func (a *consensusAnalysis) computeMissingRequiredParticipants() []string {
if a == nil || a.config == nil || len(a.config.requiredParticipants) == 0 {
return nil
}

seenByRequirement := make([]map[string]struct{}, len(a.config.requiredParticipants))
for _, group := range a.groups {
for _, result := range group.Results {
if result == nil || result.Upstream == nil || result.CachedResponseType == ResponseTypeInfrastructureError {
continue
}
upstreamID := result.Upstream.Id()
if upstreamID == "" {
upstreamID = fmt.Sprintf("%p", result.Upstream)
}
for i, required := range a.config.requiredParticipants {
if required == nil || required.Tag == "" || required.MinParticipants <= 0 {
continue
}
if !upstreamMatchesTag(result.Upstream, required.Tag) {
continue
}
if seenByRequirement[i] == nil {
seenByRequirement[i] = make(map[string]struct{})
}
seenByRequirement[i][upstreamID] = struct{}{}
}
}
}

var missing []string
for i, required := range a.config.requiredParticipants {
if required == nil || required.Tag == "" || required.MinParticipants <= 0 {
continue
}
have := len(seenByRequirement[i])
if have < required.MinParticipants {
missing = append(missing, fmt.Sprintf("%s=%d/%d", required.Tag, have, required.MinParticipants))
}
}
return missing
}

func (a *consensusAnalysis) hasPendingRequiredParticipants() bool {
if a == nil || a.config == nil || len(a.missingRequired) == 0 || a.waitCapped {
return false
}
return a.hasRemaining()
}

func (a *consensusAnalysis) hasRemaining() bool {
return a.config.maxParticipants > a.totalParticipants
}
Expand Down
27 changes: 21 additions & 6 deletions consensus/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,8 @@ func (e *executor) Run(
// include the configured minimum from each required group. Runs at the
// very top of consensus, before any participant slot consumes an
// upstream (req.UpstreamIdx is still 0), so the reorder takes effect.
// Best-effort: shortfalls fall through to lowParticipantsBehavior /
// agreementThreshold like organic low participation.
// Reordering is best-effort, but final analysis fails closed if valid
// responses do not satisfy the required tag quotas.
if len(e.config.requiredParticipants) > 0 {
if reordered := reorderForParticipantQuota(originalReq.Upstreams(), e.config.requiredParticipants); len(reordered) > 0 {
originalReq.SetUpstreams(reordered)
Expand Down Expand Up @@ -552,10 +552,7 @@ func (e *executor) runAnalyzer(
if waitTimer != nil {
waitTimer.Stop()
}
if analysis == nil {
analysis = newConsensusAnalysis(e.logger, ctx, e.config, responses)
winner = e.determineWinner(lg, analysis)
}
analysis, winner = e.finalizeAnalyzerDecision(lg, ctx, responses, analysis, winner, waitCapped, shortCircuited)
if !shortCircuited {
// Short-circuit branch already marked winners; mark here only
// for the wait-all path.
Expand Down Expand Up @@ -626,6 +623,24 @@ func (e *executor) runAnalyzer(
e.releaseNonWinningResponses(analysis, winner)
}

func (e *executor) finalizeAnalyzerDecision(
lg *zerolog.Logger,
ctx context.Context,
responses []*execResult,
analysis *consensusAnalysis,
winner *slotResult,
waitCapped bool,
shortCircuited bool,
) (*consensusAnalysis, *slotResult) {
if analysis == nil || waitCapped {
analysis = newConsensusAnalysisWithOptions(e.logger, ctx, e.config, responses, waitCapped)
}
if shortCircuited {
return analysis, winner
}
return analysis, e.determineWinner(lg, analysis)
}

// releaseNonWinningResponses releases the Result pointers on every non-winning
// execResult in analysis.groups. Extracted verbatim from the previous inline
// loop in Apply() so behavior is preserved.
Expand Down
6 changes: 2 additions & 4 deletions consensus/quota.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,8 @@ import "github.com/erpc/erpc/common"
// Semantics:
// - Best-effort: if a required group has fewer matching upstreams than
// requested (or several quotas can't all fit within maxParticipants),
// it promotes everything it can and leaves the shortfall to the
// existing lowParticipantsBehavior / agreementThreshold handling —
// consensus is not aware this happened, it just sees fewer/uneven
// participants like any organic low-participation tick.
// it promotes everything it can. Final analysis enforces the quota
// against valid responses and returns low-participants on shortfall.
// - Minimal disturbance: non-required upstreams keep their incoming
// (selection-policy) order in the remaining slots, so ranking/quality
// is preserved wherever the quota doesn't force a change. Order WITHIN
Expand Down
Loading
Loading