Skip to content

Commit d05470f

Browse files
fix: defer and classify stream errors in identifySyncedStreams to prevent aggressive removal (#5009)
1 parent 00e2c2f commit d05470f

2 files changed

Lines changed: 111 additions & 26 deletions

File tree

api/service/synchronize/stagedstreamsync/stage_bodies.go

Lines changed: 57 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"github.com/harmony-one/harmony/core"
1212
"github.com/harmony-one/harmony/core/types"
1313
"github.com/harmony-one/harmony/internal/utils"
14+
"github.com/harmony-one/harmony/p2p/stream/common/requestmanager"
1415
syncProto "github.com/harmony-one/harmony/p2p/stream/protocols/sync"
1516
sttypes "github.com/harmony-one/harmony/p2p/stream/types"
1617
"github.com/ledgerwatch/erigon-lib/kv"
@@ -183,20 +184,24 @@ func (b *StageBodies) Exec(ctx context.Context, firstCycle bool, invalidBlockRev
183184
return nil
184185
}
185186

186-
// IdentifySyncedStreams roughly find the synced streams.
187+
// identifySyncedStreams queries all available streams for their current block number
188+
// and returns those at or above targetHeight.
189+
// Results map: streamID → error (nil = synced, non-nil = failure reason).
190+
// Streams below target or with context errors are not recorded.
191+
// Failed streams are only punished when synced streams exist; otherwise the
192+
// stream pool is preserved to avoid cascading removal during systemic issues.
187193
func (b *StageBodies) identifySyncedStreams(ctx context.Context, s *StageState, targetHeight uint64, excludeIDs []sttypes.StreamID) (streams []sttypes.StreamID, err error) {
194+
results := sttypes.NewSafeMap[sttypes.StreamID, error]()
188195
var (
189-
synced = make(map[sttypes.StreamID]uint64)
190-
lock sync.Mutex
191-
wg sync.WaitGroup
196+
wg sync.WaitGroup
197+
syncedCount int32
198+
failedCount int32
192199
)
193200

194201
numStreams := b.configs.protocol.NumStreams()
195202
streamIDs := b.configs.protocol.GetStreamIDs()
196203

197-
// ask all streams for height
198204
for i := 0; i < numStreams; i++ {
199-
// skip excluded streams
200205
excluded := false
201206
if len(excludeIDs) > 0 {
202207
for _, excludedStreamID := range excludeIDs {
@@ -222,44 +227,70 @@ func (b *StageBodies) identifySyncedStreams(ctx context.Context, s *StageState,
222227
bn, _, err = b.configs.protocol.GetCurrentBlockNumber(ctx, syncProto.WithWhitelist([]sttypes.StreamID{stid}))
223228
}
224229
if err != nil {
225-
b.configs.logger.Err(err).Str("streamID", string(stid)).
226-
Msg(WrapStagedSyncMsg("[identifySyncedStreams] getCurrentNumber request failed"))
227-
228230
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
229-
// Do not remove stream when failure is due to context cancelation or deadline; only mark as failed.
230-
b.configs.protocol.StreamFailed(stid, "getCurrentNumber request failed")
231-
} else {
232-
b.configs.protocol.RemoveStream(stid, "getCurrentNumber request failed")
231+
return
233232
}
233+
results.Set(stid, err)
234+
atomic.AddInt32(&failedCount, 1)
234235
return
235236
}
236237

237-
if bn < targetHeight {
238-
return
238+
if bn >= targetHeight {
239+
results.Set(stid, nil)
240+
atomic.AddInt32(&syncedCount, 1)
239241
}
240-
241-
lock.Lock()
242-
synced[stid] = bn
243-
lock.Unlock()
244242
}(stID, targetHeight)
245243
}
246244

247-
// Wait for all goroutines to finish
248245
wg.Wait()
249246

250-
// If no valid block number results were received, return an error
251-
if len(synced) == 0 {
247+
if syncedCount == 0 {
248+
if failedCount > 0 {
249+
b.configs.logger.Warn().
250+
Int32("failedStreams", failedCount).
251+
Msg(WrapStagedSyncMsg("[identifySyncedStreams] no synced streams found; skipping punishment to preserve stream pool"))
252+
}
252253
return nil, ErrZeroBlockResponse
253254
}
254255

255-
// Compute synced streams array
256-
for st := range synced {
257-
streams = append(streams, st)
258-
}
256+
streams = make([]sttypes.StreamID, 0, syncedCount)
257+
results.Iterate(func(stid sttypes.StreamID, err error) {
258+
if err == nil {
259+
streams = append(streams, stid)
260+
} else {
261+
b.handleIdentifyStreamFailure(s, stid, err)
262+
}
263+
})
259264

260265
return streams, nil
261266
}
262267

268+
func (b *StageBodies) handleIdentifyStreamFailure(s *StageState, stid sttypes.StreamID, err error) {
269+
severity := requestmanager.ClassifyRequestError(err)
270+
271+
switch severity {
272+
case requestmanager.RequestErrorSkip:
273+
b.configs.logger.Debug().Err(err).Str("streamID", string(stid)).
274+
Msg(WrapStagedSyncMsg("[identifySyncedStreams] skipping non-stream error"))
275+
276+
case requestmanager.RequestErrorCritical:
277+
b.configs.logger.Warn().Err(err).Str("streamID", string(stid)).
278+
Msg(WrapStagedSyncMsg("[identifySyncedStreams] removing stream due to critical error"))
279+
if s.state.bnCache != nil {
280+
s.state.bnCache.RemoveStream(stid)
281+
}
282+
b.configs.protocol.RemoveStream(stid, "identifySyncedStreams: critical protocol error")
283+
284+
default:
285+
b.configs.logger.Info().Err(err).Str("streamID", string(stid)).
286+
Msg(WrapStagedSyncMsg("[identifySyncedStreams] marking stream as failed"))
287+
if s.state.bnCache != nil {
288+
s.state.bnCache.InvalidateStream(stid)
289+
}
290+
b.configs.protocol.StreamFailed(stid, "identifySyncedStreams: request failed")
291+
}
292+
}
293+
263294
func (b *StageBodies) runDownloadLoop(ctx context.Context, tx kv.RwTx, gbm *downloadManager, s *StageState, wl []sttypes.StreamID, startBlockNumber uint64, startTime time.Time) {
264295
currentBlock := startBlockNumber
265296
concurrency := s.state.config.Concurrency

p2p/stream/common/requestmanager/types.go

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package requestmanager
22

33
import (
44
"container/list"
5+
"strings"
56
"sync"
67
"sync/atomic"
78
"time"
@@ -22,6 +23,59 @@ var (
2223
ErrNoAvailableStream = errors.New("no available stream")
2324
)
2425

26+
// RequestErrorSeverity grades a request error by how it should affect the
// stream that produced it.
type RequestErrorSeverity int

const (
	// RequestErrorSkip marks an error that is not the stream's fault
	// (system-level: stream busy, queue full, no available streams).
	// It is the zero value of RequestErrorSeverity.
	RequestErrorSkip RequestErrorSeverity = iota

	// RequestErrorLow marks a transient problem on the stream
	// (timeout, write failure, peer error response).
	RequestErrorLow

	// RequestErrorCritical marks a fundamental problem with the stream
	// (protocol mismatch, malformed response).
	RequestErrorCritical
)
42+
43+
// ClassifyRequestError classifies request-level errors by severity.
44+
// Unlike ClassifyStreamError (transport-level), this handles errors from
45+
// DoRequest and protocol response parsing.
46+
func ClassifyRequestError(err error) RequestErrorSeverity {
47+
if err == nil {
48+
return RequestErrorSkip
49+
}
50+
51+
if errors.Is(err, ErrQueueFull) || errors.Is(err, ErrClosed) || errors.Is(err, ErrNoAvailableStream) {
52+
return RequestErrorSkip
53+
}
54+
55+
lower := strings.ToLower(err.Error())
56+
57+
// Dispatch-level: stream busy or unavailable
58+
if strings.Contains(lower, "no more available") ||
59+
strings.Contains(lower, "too many requests") {
60+
return RequestErrorSkip
61+
}
62+
63+
// Transient I/O or timeout
64+
if strings.Contains(lower, "request timeout") ||
65+
strings.Contains(lower, "write bytes") ||
66+
strings.Contains(lower, "stream removed") {
67+
return RequestErrorLow
68+
}
69+
70+
// Protocol mismatch: stream responded with unexpected type
71+
if strings.Contains(lower, "not sync response") ||
72+
strings.Contains(lower, "response not get") {
73+
return RequestErrorCritical
74+
}
75+
76+
return RequestErrorLow
77+
}
78+
2579
// stream is the wrapped version of sttypes.Stream.
2680
// TODO: enable stream handle multiple pending requests at the same time
2781
type stream struct {

0 commit comments

Comments
 (0)