Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
4abeec2
feat: implement batch size validation and automatic splitting for DA …
randygrok Jul 31, 2025
e383efb
feat: add blob size validation and end-to-end test for DA layer restart
randygrok Jul 31, 2025
e14a743
feat: implement recursive batch splitting for DA submissions
randygrok Jul 31, 2025
063c858
refactor to simplify code for retry
randygrok Jul 31, 2025
398e911
Merge branch 'main' into feat/fix-node-restart
randygrok Aug 1, 2025
cd18407
add changes from pr
randygrok Aug 1, 2025
e0806f5
add exponential backoff fully into retryStrategy
randygrok Aug 1, 2025
2d18d0e
clean exponential backoff flow
randygrok Aug 1, 2025
3aaed4d
clean a little more
randygrok Aug 1, 2025
e64dba7
move methods for better readability
randygrok Aug 1, 2025
cce5308
refactor submission handling to improve context management and error …
randygrok Aug 1, 2025
21a2880
refactor recursive to really use recursive
randygrok Aug 1, 2025
adff059
cleanup the recursive func
randygrok Aug 1, 2025
3cc3334
remove logging into client
randygrok Aug 1, 2025
7703555
add maxAttempts as a da config
randygrok Aug 1, 2025
d12b0e5
check all txs
randygrok Aug 4, 2025
1ce35f2
Merge branch 'main' into feat/fix-node-restart
randygrok Aug 4, 2025
ac17de1
fix integrations tests
randygrok Aug 4, 2025
dc29c63
increase timeout
randygrok Aug 4, 2025
672a8a7
Merge branch 'main' into feat/fix-node-restart
randygrok Aug 4, 2025
e6299cd
Merge remote-tracking branch 'origin/main' into feat/fix-node-restart
randygrok Aug 5, 2025
b1f5457
use tastora v0.1.0
randygrok Aug 5, 2025
6806e00
use version 0.2.0 of tastora
randygrok Aug 5, 2025
b379974
make SubmissionOutcome, SubmissionBatch, BatchResult private
randygrok Aug 6, 2025
6012782
refactor: reorder parameters in handleSubmissionResult function
randygrok Aug 6, 2025
8f43c2e
fix: handle errors in batch submission and recursive splitting
randygrok Aug 6, 2025
d02ff04
add unit tests for the submit half batch
randygrok Aug 6, 2025
12cbedf
refactor: rename BatchAction to batchAction
randygrok Aug 6, 2025
b8a0bcd
Merge remote-tracking branch 'origin/main' into feat/fix-node-restart
randygrok Aug 6, 2025
20bc4af
docs: update block manager documentation with MaxSubmitAttempts and M…
randygrok Aug 6, 2025
10ac5a2
fix: update README.md formatting for deploy badge
randygrok Aug 6, 2025
c1b860f
fix mod tidy
randygrok Aug 7, 2025
b9fc88d
Merge branch 'main' into feat/fix-node-restart
randygrok Aug 7, 2025
24fe0f0
Merge remote-tracking branch 'origin/main' into feat/fix-node-restart
randygrok Aug 7, 2025
fb369a7
Merge branch 'feat/fix-node-restart' of github.com-randy:evstack/ev-n…
randygrok Aug 7, 2025
0f87ffc
fix md links
randygrok Aug 7, 2025
2457ed7
Update tastora.
randygrok Aug 7, 2025
ef24e94
tidy all
randygrok Aug 7, 2025
bee0f2b
fix: update test-e2e command to use relative paths for binaries
randygrok Aug 7, 2025
d155f4e
try to fix ports issues
randygrok Aug 7, 2025
b12cdc5
Merge branch 'main' into feat/fix-node-restart
randygrok Aug 7, 2025
864385d
Merge remote-tracking branch 'origin/main' into feat/fix-node-restart
jgimeno Aug 8, 2025
992a9d6
fix: add namespace parameter to submission functions
randygrok Aug 8, 2025
9b950cc
fix: correct formatting of namespace fields in config and defaults
randygrok Aug 8, 2025
190df5b
Merge branch 'main' into feat/fix-node-restart
randygrok Aug 8, 2025
ce2b81e
Merge branch 'main' into feat/fix-node-restart
randygrok Aug 8, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,5 @@ build
.DS_Store
coverage.out
execution/evm/jwttoken
target
target
/.claude/settings.local.json
23 changes: 22 additions & 1 deletion block/submitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,28 @@ func submitToDA[T any](
m.logger.Info("DA layer submission canceled due to context cancellation", "attempt", attempt)
return nil
case coreda.StatusTooBig:
fallthrough
m.logger.Warn("DA layer submission failed due to blob size limit", "error", res.Message, "attempt", attempt, "batchSize", len(remaining))
// Record failed DA submission (will retry)
m.recordDAMetrics("submission", DAModeFail)

// Implement batch splitting when blob is too big
if len(remaining) > 1 {
// Split the batch in half to reduce size
splitPoint := len(remaining) / 2
m.logger.Info("splitting batch due to size limit", "originalSize", len(remaining), "newSize", splitPoint)

// Keep only the first half for this attempt
remaining = remaining[:splitPoint]
marshaled = marshaled[:splitPoint]
remLen = len(remaining)

// Reset backoff since we're trying with a smaller batch
backoff = 0
Comment thread
randygrok marked this conversation as resolved.
Outdated
} else {
// If we have only 1 item and it's still too big, we can't split further
m.logger.Error("single item exceeds DA blob size limit", "itemType", itemType, "attempt", attempt)
backoff = m.exponentialBackoff(backoff)
}
default:
m.logger.Error("DA layer submission failed", "error", res.Message, "attempt", attempt)
// Record failed DA submission (will retry)
Expand Down
108 changes: 102 additions & 6 deletions block/submitter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,21 +432,21 @@ func TestCreateSignedDataToSubmit(t *testing.T) {
func fillPendingHeaders(ctx context.Context, t *testing.T, pendingHeaders *PendingHeaders, chainID string, numBlocks uint64) {
t.Helper()

store := pendingHeaders.base.store
s := pendingHeaders.base.store
for i := uint64(0); i < numBlocks; i++ {
height := i + 1
header, data := types.GetRandomBlock(height, 0, chainID)
sig := &header.Signature
err := store.SaveBlockData(ctx, header, data, sig)
err := s.SaveBlockData(ctx, header, data, sig)
require.NoError(t, err, "failed to save block data for header at height %d", height)
err = store.SetHeight(ctx, height)
err = s.SetHeight(ctx, height)
require.NoError(t, err, "failed to set store height for header at height %d", height)
}
}

func fillPendingData(ctx context.Context, t *testing.T, pendingData *PendingData, chainID string, numBlocks uint64) {
t.Helper()
store := pendingData.base.store
s := pendingData.base.store
txNum := 1
for i := uint64(0); i < numBlocks; i++ {
height := i + 1
Expand All @@ -457,9 +457,9 @@ func fillPendingData(ctx context.Context, t *testing.T, pendingData *PendingData
txNum++
}
sig := &header.Signature
err := store.SaveBlockData(ctx, header, data, sig)
err := s.SaveBlockData(ctx, header, data, sig)
require.NoError(t, err, "failed to save block data for data at height %d", height)
err = store.SetHeight(ctx, height)
err = s.SetHeight(ctx, height)
require.NoError(t, err, "failed to set store height for data at height %d", height)
}
}
Expand Down Expand Up @@ -565,6 +565,102 @@ func TestSubmitDataToDA_WithMetricsRecorder(t *testing.T) {
mockSequencer.AssertExpectations(t)
}

// TestSubmitToDA_ItChunksBatchWhenSizeExceedsLimit verifies that when DA submission
// fails with StatusTooBig, the submitter automatically splits the batch in half and
// retries until successful. This prevents infinite retry loops when batches exceed
// DA layer size limits.
func TestSubmitToDA_ItChunksBatchWhenSizeExceedsLimit(t *testing.T) {

da := &mocks.MockDA{}
m := newTestManagerWithDA(t, da)
ctx := context.Background()

// Fill with items that would be too big as a single batch
largeItemCount := uint64(10)
fillPendingHeaders(ctx, t, m.pendingHeaders, "TestFix", largeItemCount)

headers, err := m.pendingHeaders.getPendingHeaders(ctx)
require.NoError(t, err)
require.Len(t, headers, int(largeItemCount))

var submitAttempts int
var batchSizes []int

// Mock DA behavior:
// - First call: full batch (10 items) -> StatusTooBig
// - Second call: split batch (5 items) -> Success
da.On("SubmitWithOptions", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Run(func(args mock.Arguments) {
submitAttempts++
blobs := args.Get(1).([]coreda.Blob)
batchSizes = append(batchSizes, len(blobs))
t.Logf("DA Submit attempt %d: batch size %d", submitAttempts, len(blobs))
}).
Return(nil, coreda.ErrBlobSizeOverLimit).Once() // First attempt fails

da.On("SubmitWithOptions", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Run(func(args mock.Arguments) {
submitAttempts++
blobs := args.Get(1).([]coreda.Blob)
batchSizes = append(batchSizes, len(blobs))
t.Logf("DA Submit attempt %d: batch size %d", submitAttempts, len(blobs))
}).
Return([]coreda.ID{getDummyID(1, []byte("id1")), getDummyID(1, []byte("id2")), getDummyID(1, []byte("id3")), getDummyID(1, []byte("id4")), getDummyID(1, []byte("id5"))}, nil).Once() // Second attempt succeeds

err = m.submitHeadersToDA(ctx, headers)

assert.NoError(t, err, "Should succeed by splitting large batch")
assert.Equal(t, 2, submitAttempts, "Should make 2 attempts: 1 large batch + 1 split batch")
assert.Equal(t, []int{10, 5}, batchSizes, "Should try full batch, then split in half")

// The first 5 items should be successfully submitted
// Note: Our current implementation only handles one split cycle per call
// The remaining 5 items would be handled in the next submission loop iteration
}

// TestSubmitToDA_SingleItemTooLarge verifies behavior when even a single item
// exceeds DA size limits and cannot be split further. This should result in
// exponential backoff and eventual failure after maxSubmitAttempts.
func TestSubmitToDA_SingleItemTooLarge(t *testing.T) {
// Skip this test normally to avoid long wait times, but keep it for manual testing
t.Skip("Skipping long-running test. Uncomment to test single item too large scenario manually.")

Comment thread
randygrok marked this conversation as resolved.
Outdated
da := &mocks.MockDA{}
m := newTestManagerWithDA(t, da)
ctx := context.Background()

// Create a single header that will always be "too big"
fillPendingHeaders(ctx, t, m.pendingHeaders, "TestSingleLarge", 1)

headers, err := m.pendingHeaders.getPendingHeaders(ctx)
require.NoError(t, err)
require.Len(t, headers, 1)

var submitAttempts int

// Mock DA to always return "too big" for this single item
da.On("SubmitWithOptions", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Run(func(args mock.Arguments) {
submitAttempts++
blobs := args.Get(1).([]coreda.Blob)
t.Logf("DA Submit attempt %d: batch size %d (single item too large)", submitAttempts, len(blobs))
}).
Return(nil, coreda.ErrBlobSizeOverLimit) // Always fails

// This should fail after maxSubmitAttempts (30) attempts
err = m.submitHeadersToDA(ctx, headers)

// Expected behavior: Should fail after exhausting all attempts
assert.Error(t, err, "Should fail when single item is too large")
assert.Contains(t, err.Error(), "failed to submit all header(s) to DA layer")
assert.Contains(t, err.Error(), "after 30 attempts") // maxSubmitAttempts

// Should have made exactly maxSubmitAttempts (30) attempts
assert.Equal(t, 30, submitAttempts, "Should make exactly maxSubmitAttempts before giving up")

da.AssertExpectations(t)
}

func getDummyID(height uint64, commitment []byte) coreda.ID {
id := make([]byte, len(commitment)+8)
binary.LittleEndian.PutUint64(id, height)
Expand Down
42 changes: 15 additions & 27 deletions da/jsonrpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,46 +140,34 @@ func (api *API) Submit(ctx context.Context, blobs []da.Blob, gasPrice float64, _
}

// SubmitWithOptions submits the Blobs to Data Availability layer with additional options.
// It checks blobs against MaxBlobSize and submits only those that fit.
// It validates the entire batch against MaxBlobSize before submission.
// If any blob or the total batch size exceeds limits, it returns ErrBlobSizeOverLimit.
func (api *API) SubmitWithOptions(ctx context.Context, inputBlobs []da.Blob, gasPrice float64, _ []byte, options []byte) ([]da.ID, error) {
maxBlobSize := api.MaxBlobSize

var (
blobsToSubmit [][]byte = make([][]byte, 0, len(inputBlobs))
currentSize uint64
oversizeBlobs int
)
if len(inputBlobs) == 0 {
return []da.ID{}, nil
}

// Validate each blob individually and calculate total size
var totalSize uint64
for i, blob := range inputBlobs {
blobLen := uint64(len(blob))
if blobLen > maxBlobSize {
api.Logger.Warn("Individual blob exceeds MaxBlobSize, cannot submit", "index", i, "blobSize", blobLen, "maxBlobSize", maxBlobSize)
oversizeBlobs++
continue
}
if currentSize+blobLen > maxBlobSize {
api.Logger.Info("Blob size limit reached for batch", "maxBlobSize", maxBlobSize, "index", i, "currentSize", currentSize, "nextBlobSize", blobLen)
break
api.Logger.Warn("Individual blob exceeds MaxBlobSize", "index", i, "blobSize", blobLen, "maxBlobSize", maxBlobSize)
Comment thread
randygrok marked this conversation as resolved.
Outdated
return nil, da.ErrBlobSizeOverLimit
}
currentSize += blobLen
blobsToSubmit = append(blobsToSubmit, blob)
totalSize += blobLen
}

if oversizeBlobs > 0 {
api.Logger.Error("Blobs exceeded size limit", "oversize_count", oversizeBlobs, "total_blobs", len(inputBlobs))
// Validate total batch size
if totalSize > maxBlobSize {
Comment thread
randygrok marked this conversation as resolved.
api.Logger.Warn("Total batch size exceeds MaxBlobSize", "totalSize", totalSize, "maxBlobSize", maxBlobSize, "numBlobs", len(inputBlobs))
return nil, da.ErrBlobSizeOverLimit
}

if len(blobsToSubmit) == 0 {
api.Logger.Info("No blobs to submit after filtering by size")
if len(inputBlobs) > 0 {
return nil, da.ErrBlobSizeOverLimit
}
return []da.ID{}, nil
}

api.Logger.Debug("Making RPC call", "method", "SubmitWithOptions", "num_blobs_original", len(inputBlobs), "num_blobs_to_submit", len(blobsToSubmit), "gas_price", gasPrice, "namespace", string(api.Namespace))
res, err := api.Internal.SubmitWithOptions(ctx, blobsToSubmit, gasPrice, api.Namespace, options)
api.Logger.Debug("Making RPC call", "method", "SubmitWithOptions", "num_blobs", len(inputBlobs), "total_size", totalSize, "gas_price", gasPrice, "namespace", string(api.Namespace))
res, err := api.Internal.SubmitWithOptions(ctx, inputBlobs, gasPrice, api.Namespace, options)
if err != nil {
if strings.Contains(err.Error(), context.Canceled.Error()) {
api.Logger.Debug("RPC call canceled due to context cancellation", "method", "SubmitWithOptions")
Expand Down
126 changes: 126 additions & 0 deletions da/jsonrpc/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package jsonrpc

import (
"context"
"testing"

logging "github.com/ipfs/go-log/v2"
"github.com/stretchr/testify/assert"

"github.com/evstack/ev-node/core/da"
)

// TestSubmitWithOptions_SizeValidation tests the corrected behavior of SubmitWithOptions
// where it validates the entire batch before submission and returns ErrBlobSizeOverLimit
// if the batch is too large, instead of silently dropping blobs.
func TestSubmitWithOptions_SizeValidation(t *testing.T) {
logger := logging.Logger("test")

testCases := []struct {
name string
maxBlobSize uint64
inputBlobs []da.Blob
expectError bool
expectedError error
description string
}{
{
name: "Empty input",
maxBlobSize: 1000,
inputBlobs: []da.Blob{},
expectError: false,
description: "Empty input should return empty result without error",
},
{
name: "Single blob within limit",
maxBlobSize: 1000,
inputBlobs: []da.Blob{make([]byte, 500)},
expectError: false,
description: "Single blob smaller than limit should succeed",
},
{
name: "Single blob exceeds limit",
maxBlobSize: 1000,
inputBlobs: []da.Blob{make([]byte, 1500)},
expectError: true,
expectedError: da.ErrBlobSizeOverLimit,
description: "Single blob larger than limit should fail",
},
{
name: "Multiple blobs within limit",
maxBlobSize: 1000,
inputBlobs: []da.Blob{make([]byte, 300), make([]byte, 400), make([]byte, 200)},
expectError: false,
description: "Multiple blobs totaling less than limit should succeed",
},
{
name: "Multiple blobs exceed total limit",
maxBlobSize: 1000,
inputBlobs: []da.Blob{make([]byte, 400), make([]byte, 400), make([]byte, 400)},
expectError: true,
expectedError: da.ErrBlobSizeOverLimit,
description: "Multiple blobs totaling more than limit should fail completely",
},
{
name: "Mixed: some blobs fit, total exceeds limit",
maxBlobSize: 1000,
inputBlobs: []da.Blob{make([]byte, 100), make([]byte, 200), make([]byte, 800)},
expectError: true,
expectedError: da.ErrBlobSizeOverLimit,
description: "Should fail completely, not partially submit blobs that fit",
},
{
name: "One blob exceeds limit individually",
maxBlobSize: 1000,
inputBlobs: []da.Blob{make([]byte, 300), make([]byte, 1500), make([]byte, 200)},
expectError: true,
expectedError: da.ErrBlobSizeOverLimit,
description: "Should fail if any individual blob exceeds limit",
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// Create API with test configuration
api := &API{
Logger: logger,
MaxBlobSize: tc.maxBlobSize,
Namespace: []byte("test"),
}

// Mock the Internal.SubmitWithOptions to always succeed if called
// This tests that our validation logic works before reaching the actual RPC call
mockCalled := false
api.Internal.SubmitWithOptions = func(ctx context.Context, blobs []da.Blob, gasPrice float64, namespace []byte, options []byte) ([]da.ID, error) {
mockCalled = true
// Return mock IDs for successful submissions
ids := make([]da.ID, len(blobs))
for i := range blobs {
ids[i] = []byte{byte(i)}
}
return ids, nil
}

// Call SubmitWithOptions
ctx := context.Background()
result, err := api.SubmitWithOptions(ctx, tc.inputBlobs, 1.0, nil, nil)

// Verify expectations
if tc.expectError {
assert.Error(t, err, tc.description)
if tc.expectedError != nil {
assert.ErrorIs(t, err, tc.expectedError, tc.description)
}
assert.Nil(t, result, "Result should be nil on error")
assert.False(t, mockCalled, "Internal RPC should not be called when validation fails")
} else {
assert.NoError(t, err, tc.description)
assert.NotNil(t, result, "Result should not be nil on success")
if len(tc.inputBlobs) > 0 {
assert.True(t, mockCalled, "Internal RPC should be called for valid submissions")
assert.Len(t, result, len(tc.inputBlobs), "Should return IDs for all submitted blobs")
}
}
})
}
}
Loading
Loading