Skip to content

Commit 78e7994

Browse files
Ganesha UpadhyayaGanesha Upadhyaya
authored andcommitted
add prepare and process proposal abci calls to executor
1 parent a13eb84 commit 78e7994

7 files changed

Lines changed: 143 additions & 13 deletions

File tree

block/manager.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -602,7 +602,10 @@ func (m *Manager) publishBlock(ctx context.Context) error {
602602
block = pendingBlock
603603
} else {
604604
m.logger.Info("Creating and publishing block", "height", newHeight)
605-
block = m.createBlock(newHeight, lastCommit, lastHeaderHash)
605+
block, err = m.createBlock(ctx, newHeight, lastCommit, lastHeaderHash)
606+
if err != nil {
607+
return nil
608+
}
606609
m.logger.Debug("block info", "num_tx", len(block.Data.Txs))
607610

608611
block.SignedHeader.DataHash, err = block.Data.Hash()
@@ -775,10 +778,10 @@ func (m *Manager) getLastBlockTime() time.Time {
775778
return m.lastState.LastBlockTime
776779
}
777780

778-
func (m *Manager) createBlock(height uint64, lastCommit *types.Commit, lastHeaderHash types.Hash) *types.Block {
781+
func (m *Manager) createBlock(ctx context.Context, height uint64, lastCommit *types.Commit, lastHeaderHash types.Hash) (*types.Block, error) {
779782
m.lastStateMtx.RLock()
780783
defer m.lastStateMtx.RUnlock()
781-
return m.executor.CreateBlock(height, lastCommit, lastHeaderHash, m.lastState)
784+
return m.executor.CreateBlock(ctx, height, lastCommit, lastHeaderHash, m.lastState)
782785
}
783786

784787
func (m *Manager) applyBlock(ctx context.Context, block *types.Block) (types.State, *abci.ResponseFinalizeBlock, error) {

node/full_client_test.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -476,6 +476,8 @@ func TestTx(t *testing.T) {
476476

477477
mockApp := &mocks.Application{}
478478
mockApp.On("InitChain", mock.Anything, mock.Anything).Return(&abci.ResponseInitChain{}, nil)
479+
mockApp.On("PrepareProposal", mock.Anything, mock.Anything).Return(prepareProposalResponse).Maybe()
480+
mockApp.On("ProcessProposal", mock.Anything, mock.Anything).Return(&abci.ResponseProcessProposal{Status: abci.ResponseProcessProposal_ACCEPT}, nil)
479481
key, _, _ := crypto.GenerateEd25519Key(crand.Reader)
480482
genesisValidators, signingKey := getGenesisValidatorSetWithSigner(1)
481483
node, err := newFullNode(context.Background(), config.NodeConfig{
@@ -808,7 +810,8 @@ func createApp(require *require.Assertions, vKeyToRemove cmcrypto.PrivKey, wg *s
808810

809811
pbValKey, err := encoding.PubKeyToProto(vKeyToRemove.PubKey())
810812
require.NoError(err)
811-
813+
app.On("PrepareProposal", mock.Anything, mock.Anything).Return(prepareProposalResponse).Maybe()
814+
app.On("ProcessProposal", mock.Anything, mock.Anything).Return(&abci.ResponseProcessProposal{Status: abci.ResponseProcessProposal_ACCEPT}, nil)
812815
app.On("FinalizeBlock", mock.Anything, mock.Anything).Return(finalizeBlockResponse).Times(2)
813816
app.On("FinalizeBlock", mock.Anything, mock.Anything).Return(
814817
func(_ context.Context, req *abci.RequestFinalizeBlock) (*abci.ResponseFinalizeBlock, error) {
@@ -909,6 +912,8 @@ func TestMempool2Nodes(t *testing.T) {
909912

910913
app := &mocks.Application{}
911914
app.On("InitChain", mock.Anything, mock.Anything).Return(&abci.ResponseInitChain{}, nil)
915+
app.On("PrepareProposal", mock.Anything, mock.Anything).Return(prepareProposalResponse)
916+
app.On("ProcessProposal", mock.Anything, mock.Anything).Return(&abci.ResponseProcessProposal{Status: abci.ResponseProcessProposal_ACCEPT}, nil)
912917
app.On("CheckTx", mock.Anything, &abci.RequestCheckTx{Tx: []byte("bad")}).Return(&abci.ResponseCheckTx{Code: 1}, nil)
913918
app.On("CheckTx", mock.Anything, &abci.RequestCheckTx{Tx: []byte("good")}).Return(&abci.ResponseCheckTx{Code: 0}, nil)
914919
key1, _, _ := crypto.GenerateEd25519Key(crand.Reader)
@@ -1123,6 +1128,8 @@ func TestFutureGenesisTime(t *testing.T) {
11231128
wg.Add(1)
11241129
mockApp := &mocks.Application{}
11251130
mockApp.On("InitChain", mock.Anything, mock.Anything).Return(&abci.ResponseInitChain{}, nil)
1131+
mockApp.On("PrepareProposal", mock.Anything, mock.Anything).Return(prepareProposalResponse).Maybe()
1132+
mockApp.On("ProcessProposal", mock.Anything, mock.Anything).Return(&abci.ResponseProcessProposal{Status: abci.ResponseProcessProposal_ACCEPT}, nil)
11261133
mockApp.On("FinalizeBlock", mock.Anything, mock.Anything).Return(finalizeBlockResponse).Run(func(_ mock.Arguments) {
11271134
beginBlockTime = time.Now()
11281135
wg.Done()

node/full_node_integration_test.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,13 +32,21 @@ import (
3232
testutils "github.com/celestiaorg/utils/test"
3333
)
3434

35+
func prepareProposalResponse(_ context.Context, req *abci.RequestPrepareProposal) (*abci.ResponsePrepareProposal, error) {
36+
return &abci.ResponsePrepareProposal{
37+
Txs: req.Txs,
38+
}, nil
39+
}
40+
3541
func TestAggregatorMode(t *testing.T) {
3642
assert := assert.New(t)
3743
require := require.New(t)
3844

3945
app := &mocks.Application{}
4046
app.On("InitChain", mock.Anything, mock.Anything).Return(&abci.ResponseInitChain{}, nil)
4147
app.On("CheckTx", mock.Anything, mock.Anything).Return(&abci.ResponseCheckTx{}, nil)
48+
app.On("PrepareProposal", mock.Anything, mock.Anything).Return(prepareProposalResponse).Maybe()
49+
app.On("ProcessProposal", mock.Anything, mock.Anything).Return(&abci.ResponseProcessProposal{Status: abci.ResponseProcessProposal_ACCEPT}, nil)
4250
app.On("FinalizeBlock", mock.Anything, mock.Anything).Return(finalizeBlockResponse)
4351
app.On("Commit", mock.Anything, mock.Anything).Return(&abci.ResponseCommit{}, nil)
4452

@@ -99,6 +107,8 @@ func TestTxGossipingAndAggregation(t *testing.T) {
99107
beginCnt := 0
100108
endCnt := 0
101109
commitCnt := 0
110+
// prepareProposal := 0
111+
// processProposal := 0
102112
for _, call := range app.Calls {
103113
switch call.Method {
104114
case "FinalizeBlock":
@@ -107,13 +117,19 @@ func TestTxGossipingAndAggregation(t *testing.T) {
107117
endCnt++
108118
case "Commit":
109119
commitCnt++
120+
// case "PrepareProposal":
121+
// prepareProposal++
122+
// case "ProcessProposal":
123+
// processProposal++
110124
}
111125
}
112126
aggregatorHeight := nodes[0].Store.Height()
113127
adjustedHeight := int(aggregatorHeight - 3) // 3 is completely arbitrary
114128
assert.GreaterOrEqual(beginCnt, adjustedHeight)
115129
assert.GreaterOrEqual(endCnt, adjustedHeight)
116130
assert.GreaterOrEqual(commitCnt, adjustedHeight)
131+
// assert.GreaterOrEqual(prepareProposal, adjustedHeight)
132+
// assert.GreaterOrEqual(processProposal, adjustedHeight)
117133

118134
// assert that all blocks known to node are same as produced by aggregator
119135
for h := uint64(1); h <= nodes[i].Store.Height(); h++ {
@@ -133,6 +149,8 @@ func TestLazyAggregator(t *testing.T) {
133149
app := &mocks.Application{}
134150
app.On("InitChain", mock.Anything, mock.Anything).Return(&abci.ResponseInitChain{}, nil)
135151
app.On("CheckTx", mock.Anything, mock.Anything).Return(&abci.ResponseCheckTx{}, nil)
152+
app.On("PrepareProposal", mock.Anything, mock.Anything).Return(prepareProposalResponse).Maybe()
153+
app.On("ProcessProposal", mock.Anything, mock.Anything).Return(&abci.ResponseProcessProposal{Status: abci.ResponseProcessProposal_ACCEPT}, nil)
136154
app.On("FinalizeBlock", mock.Anything, mock.Anything).Return(finalizeBlockResponse)
137155
app.On("Commit", mock.Anything, mock.Anything).Return(&abci.ResponseCommit{}, nil)
138156

@@ -536,6 +554,8 @@ func createNode(ctx context.Context, n int, aggregator bool, isLight bool, keys
536554
app.On("InitChain", mock.Anything, mock.Anything).Return(&abci.ResponseInitChain{}, nil)
537555
app.On("CheckTx", mock.Anything, mock.Anything).Return(&abci.ResponseCheckTx{}, nil)
538556
app.On("Commit", mock.Anything, mock.Anything).Return(&abci.ResponseCommit{}, nil)
557+
app.On("PrepareProposal", mock.Anything, mock.Anything).Return(prepareProposalResponse).Maybe()
558+
app.On("ProcessProposal", mock.Anything, mock.Anything).Return(&abci.ResponseProcessProposal{Status: abci.ResponseProcessProposal_ACCEPT}, nil)
539559
app.On("FinalizeBlock", mock.Anything, mock.Anything).Return(finalizeBlockResponse)
540560

541561
if ctx == nil {

rpc/json/test_helpers.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,20 @@ import (
2222
"github.com/rollkit/rollkit/test/mocks"
2323
)
2424

25+
func prepareProposalResponse(_ context.Context, req *abci.RequestPrepareProposal) (*abci.ResponsePrepareProposal, error) {
26+
return &abci.ResponsePrepareProposal{
27+
Txs: req.Txs,
28+
}, nil
29+
}
30+
2531
// copied from rpc
2632
func getRPC(t *testing.T) (*mocks.Application, rpcclient.Client) {
2733
t.Helper()
2834
require := require.New(t)
2935
app := &mocks.Application{}
3036
app.On("InitChain", mock.Anything, mock.Anything).Return(&abci.ResponseInitChain{}, nil)
37+
app.On("PrepareProposal", mock.Anything, mock.Anything).Return(prepareProposalResponse).Maybe()
38+
app.On("ProcessProposal", mock.Anything, mock.Anything).Return(&abci.ResponseProcessProposal{Status: abci.ResponseProcessProposal_ACCEPT}, nil)
3139
app.On("FinalizeBlock", mock.Anything, mock.Anything).Return(
3240
func(_ context.Context, req *abci.RequestFinalizeBlock) (*abci.ResponseFinalizeBlock, error) {
3341
txResults := make([]*abci.ExecTxResult, len(req.Txs))

state/executor.go

Lines changed: 72 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -86,8 +86,13 @@ func (e *BlockExecutor) InitChain(genesis *cmtypes.GenesisDoc) (*abci.ResponseIn
8686
}
8787

8888
// CreateBlock reaps transactions from mempool and builds a block.
89-
func (e *BlockExecutor) CreateBlock(height uint64, lastCommit *types.Commit, lastHeaderHash types.Hash, state types.State) *types.Block {
89+
func (e *BlockExecutor) CreateBlock(ctx context.Context, height uint64, lastCommit *types.Commit, lastHeaderHash types.Hash, state types.State) (*types.Block, error) {
9090
maxBytes := state.ConsensusParams.Block.MaxBytes
91+
emptyMaxBytes := maxBytes == -1
92+
if emptyMaxBytes {
93+
maxBytes = int64(cmtypes.MaxBlockSizeBytes)
94+
}
95+
9196
maxGas := state.ConsensusParams.Block.MaxGas
9297

9398
mempoolTxs := e.mempool.ReapMaxBytesMaxGas(maxBytes, maxGas)
@@ -121,16 +126,80 @@ func (e *BlockExecutor) CreateBlock(height uint64, lastCommit *types.Commit, las
121126
// Evidence: types.EvidenceData{Evidence: nil},
122127
},
123128
}
129+
130+
rpp, err := e.proxyApp.PrepareProposal(
131+
ctx,
132+
&abci.RequestPrepareProposal{
133+
MaxTxBytes: maxBytes,
134+
Txs: mempoolTxs.ToSliceOfBytes(),
135+
LocalLastCommit: abci.ExtendedCommitInfo{},
136+
Misbehavior: []abci.Misbehavior{},
137+
Height: int64(block.Height()),
138+
Time: block.Time(),
139+
NextValidatorsHash: block.SignedHeader.NextAggregatorsHash,
140+
ProposerAddress: e.proposerAddress,
141+
},
142+
)
143+
if err != nil {
144+
// The App MUST ensure that only valid (and hence 'processable') transactions
145+
// enter the mempool. Hence, at this point, we can't have any non-processable
146+
// transaction causing an error.
147+
//
148+
// Also, the App can simply skip any transaction that could cause any kind of trouble.
149+
// Either way, we cannot recover in a meaningful way, unless we skip proposing
150+
// this block, repair what caused the error and try again. Hence, we return an
151+
// error for now (the production code calling this function is expected to panic).
152+
return nil, err
153+
}
154+
155+
txl := cmtypes.ToTxs(rpp.Txs)
156+
if err := txl.Validate(maxBytes); err != nil {
157+
return nil, err
158+
}
159+
160+
block.Data.Txs = toRollkitTxs(txl)
124161
block.SignedHeader.LastCommitHash = lastCommit.GetCommitHash(&block.SignedHeader.Header, e.proposerAddress)
125162
block.SignedHeader.LastHeaderHash = lastHeaderHash
126163
block.SignedHeader.AggregatorsHash = state.Validators.Hash()
127164

128-
return block
165+
return block, nil
166+
}
167+
168+
func (e *BlockExecutor) ProcessProposal(
169+
block *types.Block,
170+
state types.State,
171+
) (bool, error) {
172+
resp, err := e.proxyApp.ProcessProposal(context.TODO(), &abci.RequestProcessProposal{
173+
Hash: block.Hash(),
174+
Height: int64(block.Height()),
175+
Time: block.Time(),
176+
Txs: block.Data.Txs.ToSliceOfBytes(),
177+
ProposedLastCommit: abci.CommitInfo{},
178+
Misbehavior: []abci.Misbehavior{},
179+
ProposerAddress: e.proposerAddress,
180+
NextValidatorsHash: block.SignedHeader.NextAggregatorsHash,
181+
})
182+
if err != nil {
183+
return false, err
184+
}
185+
if resp.IsStatusUnknown() {
186+
panic(fmt.Sprintf("ProcessProposal responded with status %s", resp.Status.String()))
187+
}
188+
189+
return resp.IsAccepted(), nil
129190
}
130191

131192
// ApplyBlock validates and executes the block.
132193
func (e *BlockExecutor) ApplyBlock(ctx context.Context, state types.State, block *types.Block) (types.State, *abci.ResponseFinalizeBlock, error) {
133-
err := e.Validate(state, block)
194+
isAppValid, err := e.ProcessProposal(block, state)
195+
if err != nil {
196+
return types.State{}, nil, err
197+
}
198+
if !isAppValid {
199+
return types.State{}, nil, fmt.Errorf("error while processing the proposal: %v", err)
200+
}
201+
202+
err = e.Validate(state, block)
134203
if err != nil {
135204
return types.State{}, nil, err
136205
}

state/executor_test.go

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,12 @@ import (
2525
"github.com/rollkit/rollkit/types"
2626
)
2727

28+
func prepareProposalResponse(_ context.Context, req *abci.RequestPrepareProposal) (*abci.ResponsePrepareProposal, error) {
29+
return &abci.ResponsePrepareProposal{
30+
Txs: req.Txs,
31+
}, nil
32+
}
33+
2834
func doTestCreateBlock(t *testing.T) {
2935
assert := assert.New(t)
3036
require := require.New(t)
@@ -33,7 +39,8 @@ func doTestCreateBlock(t *testing.T) {
3339

3440
app := &mocks.Application{}
3541
app.On("CheckTx", mock.Anything, mock.Anything).Return(&abci.ResponseCheckTx{}, nil)
36-
42+
app.On("PrepareProposal", mock.Anything, mock.Anything).Return(prepareProposalResponse)
43+
app.On("ProcessProposal", mock.Anything, mock.Anything).Return(&abci.ResponseProcessProposal{Status: abci.ResponseProcessProposal_ACCEPT}, nil)
3744
fmt.Println("App On CheckTx")
3845
client, err := proxy.NewLocalClientCreator(app).NewABCIClient()
3946
fmt.Println("Created New Local Client")
@@ -65,15 +72,17 @@ func doTestCreateBlock(t *testing.T) {
6572
state.NextValidators = cmtypes.NewValidatorSet(validators)
6673

6774
// empty block
68-
block := executor.CreateBlock(1, &types.Commit{}, []byte{}, state)
75+
block, err := executor.CreateBlock(context.Background(), 1, &types.Commit{}, []byte{}, state)
76+
require.NoError(err)
6977
require.NotNil(block)
7078
assert.Empty(block.Data.Txs)
7179
assert.Equal(uint64(1), block.Height())
7280

7381
// one small Tx
7482
err = mpool.CheckTx([]byte{1, 2, 3, 4}, func(r *abci.ResponseCheckTx) {}, mempool.TxInfo{})
7583
require.NoError(err)
76-
block = executor.CreateBlock(2, &types.Commit{}, []byte{}, state)
84+
block, err = executor.CreateBlock(context.Background(), 2, &types.Commit{}, []byte{}, state)
85+
require.NoError(err)
7786
require.NotNil(block)
7887
assert.Equal(uint64(2), block.Height())
7988
assert.Len(block.Data.Txs, 1)
@@ -83,7 +92,8 @@ func doTestCreateBlock(t *testing.T) {
8392
require.NoError(err)
8493
err = mpool.CheckTx(make([]byte, 100), func(r *abci.ResponseCheckTx) {}, mempool.TxInfo{})
8594
require.NoError(err)
86-
block = executor.CreateBlock(3, &types.Commit{}, []byte{}, state)
95+
block, err = executor.CreateBlock(context.Background(), 3, &types.Commit{}, []byte{}, state)
96+
require.NoError(err)
8797
require.NotNil(block)
8898
assert.Len(block.Data.Txs, 2)
8999
}
@@ -105,6 +115,8 @@ func doTestApplyBlock(t *testing.T) {
105115
app := &mocks.Application{}
106116
app.On("CheckTx", mock.Anything, mock.Anything).Return(&abci.ResponseCheckTx{}, nil)
107117
app.On("Commit", mock.Anything, mock.Anything).Return(&abci.ResponseCommit{}, nil)
118+
app.On("PrepareProposal", mock.Anything, mock.Anything).Return(prepareProposalResponse)
119+
app.On("ProcessProposal", mock.Anything, mock.Anything).Return(&abci.ResponseProcessProposal{Status: abci.ResponseProcessProposal_ACCEPT}, nil)
108120
app.On("FinalizeBlock", mock.Anything, mock.Anything).Return(
109121
func(_ context.Context, req *abci.RequestFinalizeBlock) (*abci.ResponseFinalizeBlock, error) {
110122
txResults := make([]*abci.ExecTxResult, len(req.Txs))
@@ -167,7 +179,8 @@ func doTestApplyBlock(t *testing.T) {
167179

168180
err = mpool.CheckTx([]byte{1, 2, 3, 4}, func(r *abci.ResponseCheckTx) {}, mempool.TxInfo{})
169181
require.NoError(err)
170-
block := executor.CreateBlock(1, &types.Commit{Signatures: []types.Signature{types.Signature([]byte{1, 1, 1})}}, []byte{}, state)
182+
block, err := executor.CreateBlock(context.Background(), 1, &types.Commit{Signatures: []types.Signature{types.Signature([]byte{1, 1, 1})}}, []byte{}, state)
183+
require.NoError(err)
171184
require.NotNil(block)
172185
assert.Equal(uint64(1), block.Height())
173186
assert.Len(block.Data.Txs, 1)
@@ -196,7 +209,8 @@ func doTestApplyBlock(t *testing.T) {
196209
require.NoError(mpool.CheckTx([]byte{5, 6, 7, 8, 9}, func(r *abci.ResponseCheckTx) {}, mempool.TxInfo{}))
197210
require.NoError(mpool.CheckTx([]byte{1, 2, 3, 4, 5}, func(r *abci.ResponseCheckTx) {}, mempool.TxInfo{}))
198211
require.NoError(mpool.CheckTx(make([]byte, 90), func(r *abci.ResponseCheckTx) {}, mempool.TxInfo{}))
199-
block = executor.CreateBlock(2, &types.Commit{Signatures: []types.Signature{types.Signature([]byte{1, 1, 1})}}, []byte{}, newState)
212+
block, err = executor.CreateBlock(context.Background(), 2, &types.Commit{Signatures: []types.Signature{types.Signature([]byte{1, 1, 1})}}, []byte{}, newState)
213+
require.NoError(err)
200214
require.NotNil(block)
201215
assert.Equal(uint64(2), block.Height())
202216
assert.Len(block.Data.Txs, 3)

types/tx.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,15 @@ func (tx Tx) Hash() []byte {
2424
return tmhash.Sum(tx)
2525
}
2626

27+
// ToSliceOfBytes converts a Txs to slice of byte slices.
28+
func (txs Txs) ToSliceOfBytes() [][]byte {
29+
txBzs := make([][]byte, len(txs))
30+
for i := 0; i < len(txs); i++ {
31+
txBzs[i] = txs[i]
32+
}
33+
return txBzs
34+
}
35+
2736
// Proof returns a simple merkle proof for this node.
2837
// Panics if i < 0 or i >= len(txs)
2938
// TODO: optimize this!

0 commit comments

Comments
 (0)