Skip to content

Commit d5f981c

Browse files
authored
fix(fiber): split DA Submit at Fibre's 128 MiB upload cap + duration log (#3307)
* feat(fibre): log per-Submit upload duration

  The Fibre Submit path was opaque: failures showed up as DeadlineExceeded with no signal of how long the upload actually took, and successes only logged at debug level inside the upstream library. During load-test debugging this turned into a guessing game — was the cluster slow, the deadline too tight, or something stuck mid-RPC?

  Add a single info-level (warn-on-failure) log line in fiberDAClient.Submit covering the Upload call: duration, flat blob bytes, blob count. Cheap (one time.Since) and gives the operator concrete numbers — e.g. "17 blobs / 115 MiB / 1.5 s" — to reason about whether RPCTimeout, pending cap, or batch sizing is the right knob to turn next.

* fix(fibre): split DA Submit batches at Fibre's 128 MiB upload cap

  Under sustained txsim load (~50 MiB/s) the DA submitter batched 10 block_data items into one Upload(), producing a flat payload of 144 MB (144366912 bytes, ≈137.7 MiB). Fibre's per-upload cap is hard at ~128 MiB ("blob size exceeds maximum allowed size: data size 144366912 exceeds maximum 134217723") and rejected every batched upload. With MaxPendingHeadersAndData=10 that took down 170 consecutive submissions before the node halted itself with "Data exceeds DA blob size limit".

  Wrap the Upload call in a chunker that groups input blobs into ≤120 MiB chunks (8 MiB headroom under Fibre's cap for the per-blob length-prefix overhead added by flattenBlobs) and uploads each chunk separately. Aggregates submitted counts and BlobIDs across chunks; on first chunk failure, returns the error with the partially-submitted count so the submitter's retry/backoff logic sees a coherent state instead of all-or-nothing. Single oversized blobs (already validated against DefaultMaxBlobSize earlier in Submit) still land alone and fail server-side, but at least don't drag healthy peers into the same rejected batch.

* fix(evnode-fibre): cap per-block data at 100 MiB to fit a Fibre upload

  Companion to the submitter chunking fix. The submitter can split a multi-blob batch into ≤120 MiB Fibre uploads, but a *single* block_data item that exceeds 128 MiB still ends up alone in its own chunk and fails server-side ("blob size exceeds maximum allowed size"). Lower the per-block cap to 100 MiB so under high-throughput txsim a single block can't grow past Fibre's hard limit, and update the comment to explain the relationship between this cap and Fibre's ~128 MiB upload reject threshold.
1 parent 91089a5 commit d5f981c

2 files changed

Lines changed: 91 additions & 23 deletions

File tree

block/internal/da/fiber_client.go

Lines changed: 85 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -87,43 +87,106 @@ func (c *fiberDAClient) Submit(ctx context.Context, data [][]byte, _ float64, na
8787
}
8888
}
8989

90-
flat := flattenBlobs(data)
90+
// Fibre's per-upload cap is ~128 MiB (hard server-side reject:
91+
// "data size %d exceeds maximum 134217723"). flattenBlobs adds
92+
// 4 bytes per blob + 4 prefix, so we target 120 MiB per chunk
93+
// to leave overhead room and avoid borderline rejects.
94+
chunks := chunkBlobsForFibre(data, fibreUploadChunkBudget)
9195
nsID := namespace[len(namespace)-10:]
92-
result, err := c.fiber.Upload(context.Background(), nsID, flat)
93-
if err != nil {
94-
code := datypes.StatusError
95-
switch {
96-
case errors.Is(err, context.Canceled):
97-
code = datypes.StatusContextCanceled
98-
case errors.Is(err, context.DeadlineExceeded):
99-
code = datypes.StatusContextDeadline
100-
}
101-
c.logger.Error().Err(err).Msg("fiber upload failed")
102-
return datypes.ResultSubmit{
103-
BaseResult: datypes.BaseResult{
104-
Code: code,
105-
Message: fmt.Sprintf("fiber upload failed for blob: %v", err),
106-
SubmittedCount: uint64(len(data) - 1),
107-
BlobSize: blobSize,
108-
Timestamp: time.Now(),
109-
},
96+
97+
ids := make([][]byte, 0, len(chunks))
98+
var submitted int
99+
for chunkIdx, chunk := range chunks {
100+
flat := flattenBlobs(chunk)
101+
uploadStart := time.Now()
102+
result, err := c.fiber.Upload(context.Background(), nsID, flat)
103+
uploadDuration := time.Since(uploadStart)
104+
if err != nil {
105+
c.logger.Warn().
106+
Dur("duration", uploadDuration).
107+
Int("flat_size", len(flat)).
108+
Int("blob_count", len(chunk)).
109+
Int("chunk_idx", chunkIdx).
110+
Int("chunk_total", len(chunks)).
111+
Err(err).
112+
Msg("fiber upload duration (failed)")
113+
code := datypes.StatusError
114+
switch {
115+
case errors.Is(err, context.Canceled):
116+
code = datypes.StatusContextCanceled
117+
case errors.Is(err, context.DeadlineExceeded):
118+
code = datypes.StatusContextDeadline
119+
}
120+
c.logger.Error().Err(err).Msg("fiber upload failed")
121+
return datypes.ResultSubmit{
122+
BaseResult: datypes.BaseResult{
123+
Code: code,
124+
Message: fmt.Sprintf("fiber upload failed for blob (chunk %d/%d): %v", chunkIdx+1, len(chunks), err),
125+
SubmittedCount: uint64(submitted),
126+
BlobSize: blobSize,
127+
Timestamp: time.Now(),
128+
},
129+
}
110130
}
131+
c.logger.Info().
132+
Dur("duration", uploadDuration).
133+
Int("flat_size", len(flat)).
134+
Int("blob_count", len(chunk)).
135+
Int("chunk_idx", chunkIdx).
136+
Int("chunk_total", len(chunks)).
137+
Msg("fiber upload duration (ok)")
138+
ids = append(ids, result.BlobID)
139+
submitted += len(chunk)
111140
}
112141

113-
c.logger.Debug().Int("num_ids", len(data)).Uint64("height", 0 /* TODO */).Msg("fiber DA submission successful")
142+
c.logger.Debug().Int("num_ids", len(data)).Int("chunks", len(chunks)).Uint64("height", 0 /* TODO */).Msg("fiber DA submission successful")
114143

115144
return datypes.ResultSubmit{
116145
BaseResult: datypes.BaseResult{
117146
Code: datypes.StatusSuccess,
118-
IDs: [][]byte{result.BlobID},
119-
SubmittedCount: uint64(len(data)),
147+
IDs: ids,
148+
SubmittedCount: uint64(submitted),
120149
Height: 0, /* TODO */
121150
BlobSize: blobSize,
122151
Timestamp: time.Now(),
123152
},
124153
}
125154
}
126155

156+
// fibreUploadChunkBudget is the target maximum flattened size of a single
157+
// Fibre Upload call. Fibre rejects payloads above ~128 MiB
158+
// ("data size N exceeds maximum 134217723"); 120 MiB leaves slack for
159+
// flattenBlobs's per-blob length prefixes and for any future overhead.
160+
const fibreUploadChunkBudget = 120 * 1024 * 1024
161+
162+
// chunkBlobsForFibre partitions data, preserving order, into consecutive
// chunks whose estimated flattened size is at most budget bytes. The estimate
// is a 4-byte count prefix per chunk plus a 4-byte length prefix per blob,
// which is the overhead attributed to flattenBlobs.
//
// A blob that exceeds budget on its own is neither dropped nor split: it is
// emitted as a single-blob chunk. That upload is still expected to be rejected
// by the server, but it no longer takes the blobs around it down with it.
//
// The returned chunks are sub-slices of data rather than copies, so no
// per-chunk allocation is made. Each is capped with a full slice expression,
// so appending to one chunk reallocates instead of overwriting the first blob
// of the next chunk. Callers must not reorder or replace elements of data
// while the chunks are in use.
//
// It returns nil when data is empty.
func chunkBlobsForFibre(data [][]byte, budget int) [][][]byte {
	if len(data) == 0 {
		return nil
	}

	const (
		countPrefixSize = 4 // one per flattened chunk
		lenPrefixSize   = 4 // one per blob inside a chunk
	)

	var chunks [][][]byte
	start := 0 // index in data of the first blob of the open chunk
	size := countPrefixSize
	for i, b := range data {
		entry := lenPrefixSize + len(b)
		// Close the open chunk only if it already holds a blob; an empty
		// chunk always accepts the next blob, which is how an oversized
		// blob ends up alone instead of looping forever.
		if i > start && size+entry > budget {
			chunks = append(chunks, data[start:i:i])
			start = i
			size = countPrefixSize
		}
		size += entry
	}

	// data is non-empty, so the open chunk always holds at least one blob.
	return append(chunks, data[start:len(data):len(data)])
}
189+
127190
// Retrieve returns the result of c.retrieve for the given height and
// namespace, passing true as the final argument.
// NOTE(review): what that flag selects is defined by retrieve, which is not
// visible here — confirm before relying on it.
func (c *fiberDAClient) Retrieve(ctx context.Context, height uint64, namespace []byte) datypes.ResultRetrieve {
128191
return c.retrieve(ctx, height, namespace, true)
129192
}

tools/celestia-node-fiber/cmd/evnode-fibre/main.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -302,7 +302,12 @@ func run(cli cliFlags) error {
302302
// Fiber-tuned profile: BatchingStrategy=adaptive, BatchMaxDelay=1.5s,
303303
// DA.BlockTime=1s, MaxPendingHeadersAndData=0, plus 100 MiB blob cap.
304304
cfg.ApplyFiberDefaults()
305-
block.SetMaxBlobSize(120 * 1024 * 1024)
305+
// 100 MiB — bounded by Fibre's hard ~128 MiB per-upload cap (we
306+
// hit `data size exceeds maximum 134217723` at 128 MiB - 5 B).
307+
// Set the per-block data cap below that so each block_data item
308+
// fits in a single Fibre upload after the submitter splits a
309+
// multi-blob batch into ≤120 MiB chunks.
310+
block.SetMaxBlobSize(100 * 1024 * 1024)
306311
cfg.P2P.ListenAddress = cli.p2pListen
307312
cfg.P2P.DisableConnectionGater = true
308313
cfg.RPC.Address = cli.rpcListen

0 commit comments

Comments
 (0)