Skip to content

Commit fb25598

Browse files
committed
fix
1 parent b4bd5ea commit fb25598

9 files changed

Lines changed: 412 additions & 51 deletions

File tree

block/internal/executing/executor.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -381,7 +381,6 @@ func (e *Executor) executionLoop() {
381381
} else {
382382
delay = time.Until(currentState.LastBlockTime.Add(e.config.Node.BlockTime.Duration))
383383
}
384-
385384
if delay > 0 {
386385
e.logger.Info().Dur("delay", delay).Msg("waiting to start block production")
387386
select {

block/internal/syncing/syncer.go

Lines changed: 32 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,8 @@ func (s *Syncer) SetBlockSyncer(bs BlockSyncer) {
164164

165165
// Start begins the syncing component
166166
func (s *Syncer) Start(ctx context.Context) error {
167-
s.ctx, s.cancel = context.WithCancel(ctx)
167+
ctx, cancel := context.WithCancel(ctx)
168+
s.ctx, s.cancel = ctx, cancel
168169

169170
if err := s.initializeState(); err != nil {
170171
return fmt.Errorf("failed to initialize syncer state: %w", err)
@@ -195,7 +196,7 @@ func (s *Syncer) Start(ctx context.Context) error {
195196
}
196197

197198
// Start main processing loop
198-
s.wg.Go(s.processLoop)
199+
s.wg.Go(func() { s.processLoop(ctx) })
199200

200201
// Start dedicated workers for DA, and pending processing
201202
s.startSyncWorkers(ctx)
@@ -342,38 +343,37 @@ func (s *Syncer) initializeState() error {
342343
}
343344

344345
// processLoop is the main coordination loop for processing events
345-
func (s *Syncer) processLoop() {
346+
func (s *Syncer) processLoop(ctx context.Context) {
346347
s.logger.Info().Msg("starting process loop")
347348
defer s.logger.Info().Msg("process loop stopped")
348349

349350
for {
350351
select {
351-
case <-s.ctx.Done():
352+
case <-ctx.Done():
352353
return
353354
case heightEvent, ok := <-s.heightInCh:
354355
if ok {
355-
s.processHeightEvent(s.ctx, &heightEvent)
356+
s.processHeightEvent(ctx, &heightEvent)
356357
}
357358
}
358359
}
359360
}
360361

361362
func (s *Syncer) startSyncWorkers(ctx context.Context) {
362-
_ = ctx
363363
s.wg.Add(3)
364-
go s.daWorkerLoop()
365-
go s.pendingWorkerLoop()
364+
go s.daWorkerLoop(ctx)
365+
go s.pendingWorkerLoop(ctx)
366366
go s.p2pWorkerLoop(ctx)
367367
}
368368

369-
func (s *Syncer) daWorkerLoop() {
369+
func (s *Syncer) daWorkerLoop(ctx context.Context) {
370370
defer s.wg.Done()
371371

372372
s.logger.Info().Msg("starting DA worker")
373373
defer s.logger.Info().Msg("DA worker stopped")
374374

375375
for {
376-
err := s.fetchDAUntilCaughtUp()
376+
err := s.fetchDAUntilCaughtUp(ctx)
377377

378378
var backoff time.Duration
379379
if err == nil {
@@ -389,7 +389,7 @@ func (s *Syncer) daWorkerLoop() {
389389
}
390390

391391
select {
392-
case <-s.ctx.Done():
392+
case <-ctx.Done():
393393
return
394394
case <-time.After(backoff):
395395
}
@@ -402,11 +402,11 @@ func (s *Syncer) HasReachedDAHead() bool {
402402
return s.daHeadReached.Load()
403403
}
404404

405-
func (s *Syncer) fetchDAUntilCaughtUp() error {
405+
func (s *Syncer) fetchDAUntilCaughtUp(ctx context.Context) error {
406406
for {
407407
select {
408-
case <-s.ctx.Done():
409-
return s.ctx.Err()
408+
case <-ctx.Done():
409+
return ctx.Err()
410410
default:
411411
}
412412

@@ -424,7 +424,7 @@ func (s *Syncer) fetchDAUntilCaughtUp() error {
424424
daHeight = max(s.daRetrieverHeight.Load(), s.cache.DaHeight())
425425
}
426426

427-
events, err := s.daRetriever.RetrieveFromDA(s.ctx, daHeight)
427+
events, err := s.daRetriever.RetrieveFromDA(ctx, daHeight)
428428
if err != nil {
429429
switch {
430430
case errors.Is(err, datypes.ErrBlobNotFound):
@@ -446,7 +446,7 @@ func (s *Syncer) fetchDAUntilCaughtUp() error {
446446

447447
// Process DA events
448448
for _, event := range events {
449-
if err := s.pipeEvent(s.ctx, event); err != nil {
449+
if err := s.pipeEvent(ctx, event); err != nil {
450450
return err
451451
}
452452
}
@@ -467,7 +467,7 @@ func (s *Syncer) fetchDAUntilCaughtUp() error {
467467
}
468468
}
469469

470-
func (s *Syncer) pendingWorkerLoop() {
470+
func (s *Syncer) pendingWorkerLoop(ctx context.Context) {
471471
defer s.wg.Done()
472472

473473
s.logger.Info().Msg("starting pending worker")
@@ -478,10 +478,10 @@ func (s *Syncer) pendingWorkerLoop() {
478478

479479
for {
480480
select {
481-
case <-s.ctx.Done():
481+
case <-ctx.Done():
482482
return
483483
case <-ticker.C:
484-
s.processPendingEvents()
484+
s.processPendingEvents(ctx)
485485
}
486486
}
487487
}
@@ -503,7 +503,7 @@ func (s *Syncer) p2pWorkerLoop(ctx context.Context) {
503503
currentHeight, err := s.store.Height(ctx)
504504
if err != nil {
505505
logger.Error().Err(err).Msg("failed to get current height for P2P worker")
506-
if !s.sleepOrDone(50 * time.Millisecond) {
506+
if !s.sleepOrDone(ctx, 50*time.Millisecond) {
507507
return
508508
}
509509
continue
@@ -525,13 +525,13 @@ func (s *Syncer) p2pWorkerLoop(ctx context.Context) {
525525
logger.Warn().Err(err).Uint64("height", targetHeight).Msg("P2P handler failed to process height")
526526
}
527527

528-
if !s.sleepOrDone(50 * time.Millisecond) {
528+
if !s.sleepOrDone(ctx, 50*time.Millisecond) {
529529
return
530530
}
531531
continue
532532
}
533533

534-
if err := s.waitForStoreHeight(targetHeight); err != nil {
534+
if err := s.waitForStoreHeight(ctx, targetHeight); err != nil {
535535
if errors.Is(err, context.Canceled) {
536536
return
537537
}
@@ -1078,8 +1078,8 @@ func (s *Syncer) sendCriticalError(err error) {
10781078

10791079
// processPendingEvents fetches and processes pending events from cache
10801080
// optimistically fetches the next events from cache until no matching heights are found
1081-
func (s *Syncer) processPendingEvents() {
1082-
currentHeight, err := s.store.Height(s.ctx)
1081+
func (s *Syncer) processPendingEvents(ctx context.Context) {
1082+
currentHeight, err := s.store.Height(ctx)
10831083
if err != nil {
10841084
s.logger.Error().Err(err).Msg("failed to get current height for pending events")
10851085
return
@@ -1104,7 +1104,7 @@ func (s *Syncer) processPendingEvents() {
11041104
case s.heightInCh <- heightEvent:
11051105
// Event was successfully sent and already removed by GetNextPendingEvent
11061106
s.logger.Debug().Uint64("height", nextHeight).Msg("sent pending event to processing")
1107-
case <-s.ctx.Done():
1107+
case <-ctx.Done():
11081108
s.cache.SetPendingEvent(nextHeight, event)
11091109
return
11101110
default:
@@ -1116,9 +1116,9 @@ func (s *Syncer) processPendingEvents() {
11161116
}
11171117
}
11181118

1119-
func (s *Syncer) waitForStoreHeight(target uint64) error {
1119+
func (s *Syncer) waitForStoreHeight(ctx context.Context, target uint64) error {
11201120
for {
1121-
currentHeight, err := s.store.Height(s.ctx)
1121+
currentHeight, err := s.store.Height(ctx)
11221122
if err != nil {
11231123
return err
11241124
}
@@ -1127,20 +1127,20 @@ func (s *Syncer) waitForStoreHeight(target uint64) error {
11271127
return nil
11281128
}
11291129

1130-
if !s.sleepOrDone(10 * time.Millisecond) {
1131-
if s.ctx.Err() != nil {
1132-
return s.ctx.Err()
1130+
if !s.sleepOrDone(ctx, 10*time.Millisecond) {
1131+
if ctx.Err() != nil {
1132+
return ctx.Err()
11331133
}
11341134
}
11351135
}
11361136
}
11371137

1138-
func (s *Syncer) sleepOrDone(duration time.Duration) bool {
1138+
func (s *Syncer) sleepOrDone(ctx context.Context, duration time.Duration) bool {
11391139
timer := time.NewTimer(duration)
11401140
defer timer.Stop()
11411141

11421142
select {
1143-
case <-s.ctx.Done():
1143+
case <-ctx.Done():
11441144
return false
11451145
case <-timer.C:
11461146
return true

block/internal/syncing/syncer_backoff_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,7 @@ func TestSyncer_BackoffResetOnSuccess(t *testing.T) {
220220
Return(nil, datypes.ErrBlobNotFound).Once()
221221

222222
// Start process loop to handle events
223-
go syncer.processLoop()
223+
go syncer.processLoop(ctx)
224224

225225
// Run workers
226226
syncer.startSyncWorkers(ctx)
@@ -293,7 +293,7 @@ func TestSyncer_BackoffBehaviorIntegration(t *testing.T) {
293293
}).
294294
Return(nil, datypes.ErrBlobNotFound).Once()
295295

296-
go syncer.processLoop()
296+
go syncer.processLoop(ctx)
297297
syncer.startSyncWorkers(ctx)
298298
<-ctx.Done()
299299
syncer.wg.Wait()

block/internal/syncing/syncer_benchmark_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,8 @@ func BenchmarkSyncerIO(b *testing.B) {
4343
fixt := newBenchFixture(b, spec.heights, spec.shuffledTx, spec.daDelay, spec.execDelay, true)
4444

4545
// run both loops
46-
go fixt.s.processLoop()
47-
fixt.s.startSyncWorkers(b.Context())
46+
go fixt.s.processLoop(fixt.s.ctx)
47+
fixt.s.startSyncWorkers(fixt.s.ctx)
4848

4949
require.Eventually(b, func() bool {
5050
processedHeight, _ := fixt.s.store.Height(b.Context())

block/internal/syncing/syncer_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -309,7 +309,7 @@ func TestSyncer_processPendingEvents(t *testing.T) {
309309
cm.SetPendingEvent(1, evt1)
310310
cm.SetPendingEvent(2, evt2)
311311

312-
s.processPendingEvents()
312+
s.processPendingEvents(s.ctx)
313313

314314
// should have forwarded height 2 and removed both
315315
select {
@@ -416,8 +416,8 @@ func TestSyncLoopPersistState(t *testing.T) {
416416
}).
417417
Return(nil, datypes.ErrHeightFromFuture)
418418

419-
go syncerInst1.processLoop()
420-
syncerInst1.startSyncWorkers(t.Context())
419+
go syncerInst1.processLoop(ctx)
420+
syncerInst1.startSyncWorkers(ctx)
421421
syncerInst1.wg.Wait()
422422
requireEmptyChan(t, errorCh)
423423

@@ -480,7 +480,7 @@ func TestSyncLoopPersistState(t *testing.T) {
480480

481481
// when it starts, it should fetch from the last height it stopped at
482482
t.Log("sync workers on instance2 started")
483-
syncerInst2.startSyncWorkers(t.Context())
483+
syncerInst2.startSyncWorkers(ctx)
484484
syncerInst2.wg.Wait()
485485

486486
t.Log("sync workers exited")

node/failover.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -180,9 +180,9 @@ func setupFailoverState(
180180
}
181181

182182
func (f *failoverState) Run(pCtx context.Context) (multiErr error) {
183-
stopService := func(stoppable func(context.Context) error, name string) {
183+
stopService := func(stoppable func(context.Context) error, name string) { //nolint:contextcheck // shutdown uses context.Background intentionally
184184
// parent context is cancelled already, so we need to create a new one
185-
shutdownCtx, done := context.WithTimeout(context.WithoutCancel(pCtx), 3*time.Second)
185+
shutdownCtx, done := context.WithTimeout(context.Background(), 3*time.Second) //nolint:contextcheck // intentional: need fresh context for graceful shutdown after cancellation
186186
defer done()
187187

188188
if err := stoppable(shutdownCtx); err != nil && !errors.Is(err, context.Canceled) {
@@ -233,8 +233,8 @@ func (f *failoverState) Run(pCtx context.Context) (multiErr error) {
233233
}
234234

235235
wg.Go(func() error {
236-
defer func() {
237-
shutdownCtx, done := context.WithTimeout(context.WithoutCancel(ctx), 3*time.Second)
236+
defer func() { //nolint:contextcheck // shutdown uses context.Background intentionally
237+
shutdownCtx, done := context.WithTimeout(context.Background(), 3*time.Second)
238238
defer done()
239239
_ = f.rpcServer.Shutdown(shutdownCtx)
240240
}()

node/full.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -307,7 +307,7 @@ func (n *FullNode) Run(parentCtx context.Context) error {
307307
n.Logger.Info().Msg("halting full node and its sub services...")
308308

309309
// Use a timeout context to ensure shutdown doesn't hang
310-
shutdownCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), 9*time.Second)
310+
shutdownCtx, cancel := context.WithTimeout(context.Background(), 9*time.Second) //nolint:contextcheck // intentional: need fresh context for graceful shutdown after cancellation
311311
defer cancel()
312312

313313
var shutdownMultiErr error // Variable to accumulate multiple errors
@@ -323,7 +323,7 @@ func (n *FullNode) Run(parentCtx context.Context) error {
323323

324324
// Shutdown Prometheus Server
325325
if n.prometheusSrv != nil {
326-
err := n.prometheusSrv.Shutdown(shutdownCtx)
326+
err := n.prometheusSrv.Shutdown(shutdownCtx) //nolint:contextcheck // shutdownCtx is intentionally from context.Background
327327
// http.ErrServerClosed is expected on graceful shutdown
328328
if err != nil && !errors.Is(err, http.ErrServerClosed) {
329329
shutdownMultiErr = errors.Join(shutdownMultiErr, fmt.Errorf("shutting down Prometheus server: %w", err))
@@ -334,7 +334,7 @@ func (n *FullNode) Run(parentCtx context.Context) error {
334334

335335
// Shutdown Pprof Server
336336
if n.pprofSrv != nil {
337-
err := n.pprofSrv.Shutdown(shutdownCtx)
337+
err := n.pprofSrv.Shutdown(shutdownCtx) //nolint:contextcheck // shutdownCtx is intentionally from context.Background
338338
if err != nil && !errors.Is(err, http.ErrServerClosed) {
339339
shutdownMultiErr = errors.Join(shutdownMultiErr, fmt.Errorf("shutting down pprof server: %w", err))
340340
} else {

node/light.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -134,13 +134,13 @@ func (ln *LightNode) Run(parentCtx context.Context) error {
134134

135135
ln.Logger.Info().Msg("halting light node and its sub services...")
136136

137-
shutdownCtx, cancel := context.WithTimeout(context.WithoutCancel(parentCtx), 2*time.Second)
137+
shutdownCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second) //nolint:contextcheck // intentional: need fresh context for graceful shutdown after cancellation
138138
defer cancel()
139139

140140
var multiErr error
141141

142142
// Stop Header Sync Service
143-
err = ln.hSyncService.Stop(shutdownCtx)
143+
err = ln.hSyncService.Stop(shutdownCtx) //nolint:contextcheck // shutdownCtx is intentionally from context.Background
144144
if err != nil {
145145
if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
146146
multiErr = errors.Join(multiErr, fmt.Errorf("stopping header sync service: %w", err))
@@ -151,7 +151,7 @@ func (ln *LightNode) Run(parentCtx context.Context) error {
151151

152152
// Shutdown RPC Server
153153
if ln.rpcServer != nil {
154-
err = ln.rpcServer.Shutdown(shutdownCtx)
154+
err = ln.rpcServer.Shutdown(shutdownCtx) //nolint:contextcheck // shutdownCtx is intentionally from context.Background
155155
if err != nil && !errors.Is(err, http.ErrServerClosed) {
156156
multiErr = errors.Join(multiErr, fmt.Errorf("shutting down RPC server: %w", err))
157157
} else {

0 commit comments

Comments
 (0)