Skip to content

Commit 8fffa99

Browse files
Reasnoguxi.reasnoGGXXLL
authored
refactor(pool): introduce pool.Manager (#251)
* refactor(pool): introduce pool.Manager Change-Id: I72f76d24fe267ef9e2bfc8d3df376d22d56df205 * refactor(pool): introduce pool.Manager Change-Id: I72f76d24fe267ef9e2bfc8d3df376d22d56df205 * pool manager set maxDuration default value 10 Minutes (#252) * fix: comment about provide * fix: manager's maxDuration default 10 Minutes Co-authored-by: guxi.reasno <guxi.reasno@bytedance.com> Co-authored-by: rock G <35254251+GGXXLL@users.noreply.github.com>
1 parent e7c857f commit 8fffa99

13 files changed

Lines changed: 297 additions & 193 deletions

CHANGELOG.md

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,6 @@
1616
* **cron:** add MockStartTimeFunc helper ([#243](https://github.com/DoNewsCode/core/issues/243)) (@[谷溪](guxi99@gmail.com))
1717

1818

19-
<a name="v0.13.1-beta1"></a>
20-
## [v0.13.1-beta1](https://github.com/DoNewsCode/core/compare/v0.13.0-beta2...v0.13.1-beta1) (2022-07-19)
21-
22-
2319
<a name="v0.13.0-beta2"></a>
2420
## [v0.13.0-beta2](https://github.com/DoNewsCode/core/compare/v0.13.0-beta1...v0.13.0-beta2) (2022-07-19)
2521

control/pool/dependency.go

Lines changed: 3 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -4,26 +4,7 @@ import (
44
"github.com/DoNewsCode/core/di"
55
)
66

7-
// Providers provide a *pool.Pool to the core.
8-
func Providers(options ...ProviderOptionFunc) di.Deps {
9-
return di.Deps{func() *Pool {
10-
return NewPool(options...)
11-
}}
12-
}
13-
14-
// ProviderOptionFunc is the functional option to Providers.
15-
type ProviderOptionFunc func(pool *Pool)
16-
17-
// WithConcurrency sets the maximum concurrency for the pool.
18-
func WithConcurrency(concurrency int) ProviderOptionFunc {
19-
return func(pool *Pool) {
20-
pool.concurrency = concurrency
21-
}
22-
}
23-
24-
// WithCounter sets the counter for the pool.
25-
func WithCounter(counter *Counter) ProviderOptionFunc {
26-
return func(pool *Pool) {
27-
pool.counter = counter
28-
}
7+
// Providers provide a *Manager to the core.
8+
func Providers() di.Deps {
9+
return di.Deps{NewManager}
2910
}

control/pool/example_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,10 @@ func Example() {
1818
core.WithInline("http.addr", ":9777"),
1919
core.WithInline("log.level", "none"),
2020
)
21-
c.Provide(pool.Providers(pool.WithConcurrency(1)))
21+
c.Provide(pool.Providers())
2222

23-
c.Invoke(func(p *pool.Pool, dispatcher lifecycle.HTTPServerStart) {
23+
c.Invoke(func(m *pool.Manager, dispatcher lifecycle.HTTPServerStart) {
24+
p := pool.NewPool(m, 10)
2425
dispatcher.On(func(ctx context.Context, payload lifecycle.HTTPServerStartPayload) error {
2526
go func() {
2627
if _, err := http.Get("http://localhost:9777/"); err != nil {

control/pool/manager.go

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
package pool
2+
3+
import (
4+
"context"
5+
"sync"
6+
"time"
7+
8+
"github.com/DoNewsCode/core/ctxmeta"
9+
)
10+
11+
// Manager manages a pool of workers.
12+
type Manager struct {
13+
workers chan *Worker
14+
maxDuration time.Duration
15+
startWorkerCh chan *Worker
16+
managerStoppedCh chan struct{}
17+
}
18+
19+
// NewManager returns a new manager.
20+
func NewManager() *Manager {
21+
return &Manager{
22+
workers: make(chan *Worker, 1000),
23+
startWorkerCh: make(chan *Worker),
24+
managerStoppedCh: make(chan struct{}),
25+
maxDuration: 10 * time.Minute,
26+
}
27+
}
28+
29+
// Get returns a worker from the free list. If the free list is empty, create a new one.
30+
func (m *Manager) Get() *Worker {
31+
var w *Worker
32+
select {
33+
case w = <-m.workers:
34+
default:
35+
w = NewWorker()
36+
select {
37+
case m.startWorkerCh <- w:
38+
case <-m.managerStoppedCh:
39+
w.Stop()
40+
}
41+
}
42+
return w
43+
}
44+
45+
// Release put the worker back into the free list. If the free list is full,
46+
// discard the worker. If the worker has surpassed the max duration, discard and
47+
// managerStoppedCh the worker.
48+
func (m *Manager) Release(w *Worker) {
49+
if time.Since(w.startTime) > m.maxDuration {
50+
w.Stop()
51+
return
52+
}
53+
54+
select {
55+
case m.workers <- w:
56+
default:
57+
}
58+
}
59+
60+
// Go runs function with no concurrency limit.
61+
func (m *Manager) Go(ctx context.Context, f func(context.Context)) {
62+
w := m.Get()
63+
ctx = ctxmeta.WithoutCancel(ctx)
64+
fn := func() {
65+
f(ctx)
66+
m.Release(w)
67+
}
68+
select {
69+
case w.jobCh <- fn:
70+
case <-w.stopCh: // only executed if manager.Run is cancelled
71+
fn()
72+
}
73+
}
74+
75+
// Run starts the manager. It should be called during the initialization of the program.
76+
func (m *Manager) Run(ctx context.Context) error {
77+
var wg sync.WaitGroup
78+
for {
79+
select {
80+
case w := <-m.startWorkerCh:
81+
wg.Add(1)
82+
go func(w *Worker) {
83+
w.Run(ctx)
84+
wg.Done()
85+
}(w)
86+
case <-ctx.Done():
87+
close(m.managerStoppedCh)
88+
wg.Wait()
89+
return nil
90+
}
91+
}
92+
}
93+
94+
// Module implements the di.Modular interface.
95+
func (m *Manager) Module() interface{} {
96+
return m
97+
}

control/pool/manager_test.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package pool
2+
3+
import (
4+
"context"
5+
"testing"
6+
"time"
7+
)
8+
9+
func TestManager_Go(t *testing.T) {
10+
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
11+
defer cancel()
12+
13+
m := NewManager()
14+
go m.Run(ctx)
15+
16+
var executed = make(chan struct{})
17+
m.Go(ctx, func(ctx context.Context) {
18+
close(executed)
19+
})
20+
<-executed
21+
}

control/pool/metrics.go

Lines changed: 0 additions & 48 deletions
This file was deleted.

control/pool/pool.go

Lines changed: 17 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -44,52 +44,23 @@ package pool
4444

4545
import (
4646
"context"
47-
"sync"
48-
49-
"github.com/DoNewsCode/core/ctxmeta"
50-
51-
"github.com/oklog/run"
5247
)
5348

54-
// NewPool returned func(contract.Dispatcher) *Pool
55-
func NewPool(options ...ProviderOptionFunc) *Pool {
49+
// NewPool returns *Pool
50+
func NewPool(manager *Manager, cap int) *Pool {
5651
pool := Pool{
57-
ch: make(chan job),
58-
concurrency: 10,
59-
}
60-
for _, f := range options {
61-
f(&pool)
52+
manager: manager,
53+
concurrency: make(chan struct{}, cap),
6254
}
6355
return &pool
6456
}
6557

66-
type job struct {
67-
fn func()
68-
}
69-
7058
// Pool is an async worker pool. It can be used to dispatch the async jobs from
7159
// web servers. See the package documentation about its advantage over creating a
7260
// goroutine directly.
7361
type Pool struct {
74-
ch chan job
75-
concurrency int
76-
counter *Counter
77-
}
78-
79-
// ProvideRunGroup implements core.RunProvider
80-
func (p *Pool) ProvideRunGroup(group *run.Group) {
81-
ctx, cancel := context.WithCancel(context.Background())
82-
83-
group.Add(func() error {
84-
return p.Run(ctx)
85-
}, func(err error) {
86-
cancel()
87-
})
88-
}
89-
90-
// Module implements di.Modular
91-
func (p *Pool) Module() interface{} {
92-
return p
62+
manager *Manager
63+
concurrency chan struct{}
9364
}
9465

9566
// Go dispatchers a job to the async worker pool. requestContext is the context
@@ -98,37 +69,18 @@ func (p *Pool) Module() interface{} {
9869
// nothing to do with the request. If the pool has reached max concurrency, the job will
9970
// be executed in the current goroutine. In other word, the job will be executed synchronously.
10071
func (p *Pool) Go(requestContext context.Context, function func(asyncContext context.Context)) {
101-
j := job{
102-
fn: func() {
103-
function(ctxmeta.WithoutCancel(requestContext))
104-
},
105-
}
106-
select {
107-
case p.ch <- j:
108-
default:
109-
p.counter.IncSyncJob()
110-
j.fn()
111-
}
72+
p.concurrency <- struct{}{}
73+
p.manager.Go(requestContext, func(ctx context.Context) {
74+
defer func() {
75+
<-p.concurrency
76+
}()
77+
function(ctx)
78+
})
11279
}
11380

114-
// Run starts the async worker pool and block until it finishes.
115-
func (p *Pool) Run(ctx context.Context) error {
116-
var wg sync.WaitGroup
117-
for i := 0; i < p.concurrency; i++ {
118-
wg.Add(1)
119-
go func() {
120-
defer wg.Done()
121-
for {
122-
select {
123-
case j := <-p.ch:
124-
p.counter.IncAsyncJob()
125-
j.fn()
126-
case <-ctx.Done():
127-
return
128-
}
129-
}
130-
}()
81+
// Wait waits for all the async jobs to finish.
82+
func (p *Pool) Wait() {
83+
for i := 0; i < cap(p.concurrency); i++ {
84+
p.concurrency <- struct{}{}
13185
}
132-
wg.Wait()
133-
return nil
13486
}

0 commit comments

Comments
 (0)