Skip to content

Commit cb167b3

Browse files
committed
fix(fibre): split DA Submit batches at Fibre's 128 MiB upload cap
Under sustained txsim load (~50 MiB/s) the DA submitter batched 10 block_data items into one Upload(), producing a flat payload of ~144 MB (144366912 bytes, ≈137.7 MiB). Fibre enforces a hard per-upload cap of just under 128 MiB ("blob size exceeds maximum allowed size: data size 144366912 exceeds maximum 134217723") and rejected every batched upload. With MaxPendingHeadersAndData=10, that caused 170 consecutive submissions to fail before the node halted itself with "Data exceeds DA blob size limit". Wrap the Upload call in a chunker that groups input blobs into ≤120 MiB chunks (8 MiB headroom under Fibre's cap for the per-blob length-prefix overhead added by flattenBlobs) and uploads each chunk separately. Submitted counts and BlobIDs are aggregated across chunks; on the first chunk failure, Submit returns the error with the partially-submitted count so the submitter's retry/backoff logic sees a coherent state instead of all-or-nothing. A single oversized blob (already validated against DefaultMaxBlobSize earlier in Submit) still lands alone and fails server-side, but at least it doesn't drag healthy peers into the same rejected batch.
1 parent ac2c4e5 commit cb167b3

1 file changed

Lines changed: 82 additions & 35 deletions

File tree

block/internal/da/fiber_client.go

Lines changed: 82 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -87,59 +87,106 @@ func (c *fiberDAClient) Submit(ctx context.Context, data [][]byte, _ float64, na
8787
}
8888
}
8989

90-
flat := flattenBlobs(data)
90+
// Fibre's per-upload cap is ~128 MiB (hard server-side reject:
91+
// "data size %d exceeds maximum 134217723"). flattenBlobs adds
92+
// 4 bytes per blob + 4 prefix, so we target 120 MiB per chunk
93+
// to leave overhead room and avoid borderline rejects.
94+
chunks := chunkBlobsForFibre(data, fibreUploadChunkBudget)
9195
nsID := namespace[len(namespace)-10:]
92-
uploadStart := time.Now()
93-
result, err := c.fiber.Upload(context.Background(), nsID, flat)
94-
uploadDuration := time.Since(uploadStart)
95-
if err != nil {
96-
c.logger.Warn().
97-
Dur("duration", uploadDuration).
98-
Int("flat_size", len(flat)).
99-
Int("blob_count", len(data)).
100-
Err(err).
101-
Msg("fiber upload duration (failed)")
102-
} else {
96+
97+
ids := make([][]byte, 0, len(chunks))
98+
var submitted int
99+
for chunkIdx, chunk := range chunks {
100+
flat := flattenBlobs(chunk)
101+
uploadStart := time.Now()
102+
result, err := c.fiber.Upload(context.Background(), nsID, flat)
103+
uploadDuration := time.Since(uploadStart)
104+
if err != nil {
105+
c.logger.Warn().
106+
Dur("duration", uploadDuration).
107+
Int("flat_size", len(flat)).
108+
Int("blob_count", len(chunk)).
109+
Int("chunk_idx", chunkIdx).
110+
Int("chunk_total", len(chunks)).
111+
Err(err).
112+
Msg("fiber upload duration (failed)")
113+
code := datypes.StatusError
114+
switch {
115+
case errors.Is(err, context.Canceled):
116+
code = datypes.StatusContextCanceled
117+
case errors.Is(err, context.DeadlineExceeded):
118+
code = datypes.StatusContextDeadline
119+
}
120+
c.logger.Error().Err(err).Msg("fiber upload failed")
121+
return datypes.ResultSubmit{
122+
BaseResult: datypes.BaseResult{
123+
Code: code,
124+
Message: fmt.Sprintf("fiber upload failed for blob (chunk %d/%d): %v", chunkIdx+1, len(chunks), err),
125+
SubmittedCount: uint64(submitted),
126+
BlobSize: blobSize,
127+
Timestamp: time.Now(),
128+
},
129+
}
130+
}
103131
c.logger.Info().
104132
Dur("duration", uploadDuration).
105133
Int("flat_size", len(flat)).
106-
Int("blob_count", len(data)).
134+
Int("blob_count", len(chunk)).
135+
Int("chunk_idx", chunkIdx).
136+
Int("chunk_total", len(chunks)).
107137
Msg("fiber upload duration (ok)")
108-
}
109-
if err != nil {
110-
code := datypes.StatusError
111-
switch {
112-
case errors.Is(err, context.Canceled):
113-
code = datypes.StatusContextCanceled
114-
case errors.Is(err, context.DeadlineExceeded):
115-
code = datypes.StatusContextDeadline
116-
}
117-
c.logger.Error().Err(err).Msg("fiber upload failed")
118-
return datypes.ResultSubmit{
119-
BaseResult: datypes.BaseResult{
120-
Code: code,
121-
Message: fmt.Sprintf("fiber upload failed for blob: %v", err),
122-
SubmittedCount: uint64(len(data) - 1),
123-
BlobSize: blobSize,
124-
Timestamp: time.Now(),
125-
},
126-
}
138+
ids = append(ids, result.BlobID)
139+
submitted += len(chunk)
127140
}
128141

129-
c.logger.Debug().Int("num_ids", len(data)).Uint64("height", 0 /* TODO */).Msg("fiber DA submission successful")
142+
c.logger.Debug().Int("num_ids", len(data)).Int("chunks", len(chunks)).Uint64("height", 0 /* TODO */).Msg("fiber DA submission successful")
130143

131144
return datypes.ResultSubmit{
132145
BaseResult: datypes.BaseResult{
133146
Code: datypes.StatusSuccess,
134-
IDs: [][]byte{result.BlobID},
135-
SubmittedCount: uint64(len(data)),
147+
IDs: ids,
148+
SubmittedCount: uint64(submitted),
136149
Height: 0, /* TODO */
137150
BlobSize: blobSize,
138151
Timestamp: time.Now(),
139152
},
140153
}
141154
}
142155

156+
// fibreUploadChunkBudget is the target maximum flattened size of a single
157+
// Fibre Upload call. Fibre rejects payloads above ~128 MiB
158+
// ("data size N exceeds maximum 134217723"); 120 MiB leaves slack for
159+
// flattenBlobs's per-blob length prefixes and for any future overhead.
160+
const fibreUploadChunkBudget = 120 * 1024 * 1024
161+
162+
// chunkBlobsForFibre partitions data, preserving order, into consecutive
// chunks whose estimated flattened size is at most budget bytes.
//
// The size estimate follows the accounting described for flattenBlobs in
// this file: a 4-byte count prefix per chunk plus a 4-byte length prefix
// in front of every blob.
//
// A blob that exceeds budget on its own is never split or dropped: it is
// emitted as a single-element chunk, so an oversized blob can only fail
// its own upload rather than the blobs batched alongside it.
//
// The returned chunks are capacity-capped sub-slices of data. No blob
// pointers are copied and no per-chunk backing array is allocated;
// appending to a returned chunk reallocates instead of overwriting
// neighbouring entries in data. Callers must not assign into a chunk's
// elements unless they intend to modify data as well.
//
// An empty or nil data yields a nil result.
func chunkBlobsForFibre(data [][]byte, budget int) [][][]byte {
	if len(data) == 0 {
		return nil
	}

	// prefixLen is both the per-chunk count prefix and the per-blob
	// length prefix assumed by the size estimate.
	const prefixLen = 4

	var chunks [][][]byte
	start := 0        // index in data of the first blob in the open chunk
	size := prefixLen // estimated flattened size of the open chunk

	for i, b := range data {
		entry := prefixLen + len(b)
		// Close the open chunk only if it already holds at least one
		// blob; this guarantees progress for blobs larger than budget.
		if i > start && size+entry > budget {
			chunks = append(chunks, data[start:i:i])
			start = i
			size = prefixLen
		}
		size += entry
	}

	// data is non-empty, so the open chunk always holds at least one blob.
	return append(chunks, data[start:len(data):len(data)])
}
189+
143190
// Retrieve forwards ctx, height and namespace to c.retrieve with the final
// argument fixed to true, and returns that call's result unchanged.
// NOTE(review): the meaning of the boolean flag is not visible in this
// hunk — confirm against the definition of retrieve.
func (c *fiberDAClient) Retrieve(ctx context.Context, height uint64, namespace []byte) datypes.ResultRetrieve {
144191
return c.retrieve(ctx, height, namespace, true)
145192
}

0 commit comments

Comments
 (0)