Skip to content

Commit 2c6d081

Browse files
authored
Sevey/shutdown management (#1434)
<!-- Please read and fill out this form before submitting your PR. Please make sure you have reviewed our contributors guide before submitting your first PR. --> ## Overview <!-- Please provide an explanation of the PR, including the appropriate context, background, goal, and rationale. If there is an issue with this information, please provide a tl;dr and link the issue. --> Context #1412 Closes #1412 Tested by running the test suite in a loop until the runner timeout with no panics for writing to closed files: https://github.com/rollkit/rollkit/actions/runs/7426536284/job/20210452949?pr=1434 ## Checklist <!-- Please complete the checklist to ensure that the PR is ready to be reviewed. IMPORTANT: PRs should be left in Draft until the below checklist is completed. --> - [x] New and updated code has appropriate documentation - [ ] New and updated code has new and/or updated testing - [ ] Required CI checks are passing - [ ] Visual proof for any user facing features like CLI or documentation updates - [x] Linked issues closed with keywords <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit - **New Features** - Implemented a new thread management system to enhance app performance and stability. - **Refactor** - Improved internal thread handling mechanisms for better resource management. - **Tests** - Added tests for the new thread management features to ensure reliability. <!-- end of auto-generated comment: release notes by coderabbit.ai -->
1 parent a2cbfc2 commit 2c6d081

3 files changed

Lines changed: 72 additions & 9 deletions

File tree

node/full.go

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import (
3434
"github.com/rollkit/rollkit/state/txindex"
3535
"github.com/rollkit/rollkit/state/txindex/kv"
3636
"github.com/rollkit/rollkit/store"
37+
"github.com/rollkit/rollkit/types"
3738
)
3839

3940
// prefixes used in KV store to separate main node data from DALC data
@@ -82,8 +83,9 @@ type FullNode struct {
8283

8384
// keep context here only because of API compatibility
8485
// - it's used in `OnStart` (defined in service.Service interface)
85-
ctx context.Context
86-
cancel context.CancelFunc
86+
ctx context.Context
87+
cancel context.CancelFunc
88+
threadManager *types.ThreadManager
8789
}
8890

8991
// newFullNode creates a new Rollkit full node.
@@ -175,6 +177,7 @@ func newFullNode(
175177
bSyncService: blockSyncService,
176178
ctx: ctx,
177179
cancel: cancel,
180+
threadManager: types.NewThreadManager(),
178181
}
179182

180183
node.BaseService = *service.NewBaseService(logger, "Node", node)
@@ -339,15 +342,15 @@ func (n *FullNode) OnStart() error {
339342

340343
if n.nodeConfig.Aggregator {
341344
n.Logger.Info("working in aggregator mode", "block time", n.nodeConfig.BlockTime)
342-
go n.blockManager.AggregationLoop(n.ctx, n.nodeConfig.LazyAggregator)
343-
go n.blockManager.BlockSubmissionLoop(n.ctx)
344-
go n.headerPublishLoop(n.ctx)
345-
go n.blockPublishLoop(n.ctx)
345+
n.threadManager.Go(func() { n.blockManager.AggregationLoop(n.ctx, n.nodeConfig.LazyAggregator) })
346+
n.threadManager.Go(func() { n.blockManager.BlockSubmissionLoop(n.ctx) })
347+
n.threadManager.Go(func() { n.headerPublishLoop(n.ctx) })
348+
n.threadManager.Go(func() { n.blockPublishLoop(n.ctx) })
346349
return nil
347350
}
348-
go n.blockManager.RetrieveLoop(n.ctx)
349-
go n.blockManager.BlockStoreRetrieveLoop(n.ctx)
350-
go n.blockManager.SyncLoop(n.ctx, n.cancel)
351+
n.threadManager.Go(func() { n.blockManager.RetrieveLoop(n.ctx) })
352+
n.threadManager.Go(func() { n.blockManager.BlockStoreRetrieveLoop(n.ctx) })
353+
n.threadManager.Go(func() { n.blockManager.SyncLoop(n.ctx, n.cancel) })
351354
return nil
352355
}
353356

@@ -369,6 +372,8 @@ func (n *FullNode) GetGenesisChunks() ([]string, error) {
369372
func (n *FullNode) OnStop() {
370373
n.Logger.Info("halting full node...")
371374
n.cancel()
375+
n.threadManager.Wait()
376+
n.Logger.Info("shutting down full node sub services...")
372377
err := n.p2pClient.Close()
373378
err = multierr.Append(err, n.hSyncService.Stop())
374379
err = multierr.Append(err, n.bSyncService.Stop())

types/threadmanager.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package types
2+
3+
import "sync"
4+
5+
// ThreadManager is a simple wrapper around sync.WaitGroup to make it easier to
6+
// manage threads
7+
type ThreadManager struct {
8+
wg sync.WaitGroup
9+
}
10+
11+
// NewThreadManager creates a new ThreadManager
12+
func NewThreadManager() *ThreadManager {
13+
return &ThreadManager{}
14+
}
15+
16+
// Wait blocks until all goroutines have called Done on the WaitGroup
17+
func (tm *ThreadManager) Wait() {
18+
tm.wg.Wait()
19+
}
20+
21+
// Go launches a goroutine and adds it to the WaitGroup
22+
func (tm *ThreadManager) Go(f func()) {
23+
tm.wg.Add(1)
24+
go func() {
25+
defer tm.wg.Done()
26+
f()
27+
}()
28+
}

types/threadmanager_test.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package types
2+
3+
import (
4+
"testing"
5+
"time"
6+
)
7+
8+
func TestThreadManager(t *testing.T) {
9+
tm := NewThreadManager()
10+
start := time.Now()
11+
12+
// Launch a goroutine that waits for 10ms
13+
tm.Go(func() {
14+
time.Sleep(10 * time.Millisecond)
15+
})
16+
// Launch a goroutine that waits for 100ms
17+
tm.Go(func() {
18+
time.Sleep(100 * time.Millisecond)
19+
})
20+
21+
// This will block until the goroutines have called Done on the
22+
// WaitGroup
23+
//
24+
// This also implicitly verifies that we are properly calling Done(),
25+
// otherwise this test would time out the test suite
26+
tm.Wait()
27+
if time.Since(start) < 100*time.Millisecond {
28+
t.Errorf("WaitGroup returned before all goroutines are done")
29+
}
30+
}

0 commit comments

Comments
 (0)