Skip to content

Commit 7f35484

Browse files
committed
Add Stoppable and Run to cc
1 parent 6baab38 commit 7f35484

2 files changed

Lines changed: 86 additions & 2 deletions

File tree

cc.go

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
package cc
22

3-
import "sync"
4-
import "github.com/fluxio/multierror"
3+
import (
4+
"sync"
5+
6+
"github.com/fluxio/multierror"
7+
)
58

69
// Pool manages a pool of concurrent workers. It works a bit like a Waitgroup, but with error reporting and concurrency limits
710
// You create one with New, and run functions with Run. Then you wait on it like a regular WaitGroup.
@@ -56,3 +59,34 @@ func (p *Pool) Run(fn func() error) {
5659
p.wg.Done()
5760
}()
5861
}
62+
63+
// Stoppable is a function that can be stopped with the method Stop. You can also listen on the Stopped channel to see when it has been stopped.
64+
// Stoppable is different from a context cancelation because it waits until the function has cleaned up before broadcasting on the Stopped channel
65+
type Stoppable struct {
66+
Stopped chan struct{}
67+
stop chan struct{}
68+
once sync.Once
69+
}
70+
71+
// Stop signals the provided function that it needs to stop
72+
func (s *Stoppable) Stop() {
73+
s.once.Do(func() {
74+
close(s.stop)
75+
})
76+
}
77+
78+
// Run creates a new stoppable function from the provided func. When you call the Stop method on the returned Stoppable the stop channel fed to the provided func is closed,
79+
// signaling the need to stop. When the provided func returns the Stopped channel on the
80+
// returned Stoppable is closed as well, broadcasting the message that it has finished
81+
func Run(fn func(stop chan struct{})) (s *Stoppable) {
82+
s = &Stoppable{
83+
Stopped: make(chan struct{}),
84+
stop: make(chan struct{}),
85+
}
86+
87+
go func() {
88+
fn(s.stop)
89+
close(s.Stopped)
90+
}()
91+
return s
92+
}

cc_test.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,43 @@ func Example() {
2929
//: fail1
3030
}
3131

32+
func ExampleStoppable() {
33+
stoppable := cc.Run(func(stop chan struct{}) {
34+
i := 0
35+
L:
36+
for {
37+
select {
38+
case <-stop:
39+
fmt.Println("receive stop signal")
40+
break L
41+
default:
42+
i++
43+
time.Sleep(250 * time.Millisecond)
44+
fmt.Println(i)
45+
}
46+
}
47+
fmt.Println("finished with", i)
48+
})
49+
50+
go func() {
51+
time.Sleep(1 * time.Second)
52+
fmt.Println("send stop signal")
53+
stoppable.Stop()
54+
stoppable.Stop() // It shouldn't explode even if you attempt to close it multiple times
55+
}()
56+
57+
<-stoppable.Stopped
58+
fmt.Println("stopped finally")
59+
// Output: 1
60+
// 2
61+
// 3
62+
// send stop signal
63+
// 4
64+
// receive stop signal
65+
// finished with 4
66+
// stopped finally
67+
}
68+
3269
func TestRace(t *testing.T) {
3370
p := cc.New(4)
3471

@@ -40,3 +77,16 @@ func TestRace(t *testing.T) {
4077

4178
p.Wait()
4279
}
80+
81+
func Benchmark(b *testing.B) {
82+
for i := 0; i < b.N; i++ {
83+
p := cc.New(4)
84+
for i := 0; i < 1000; i++ {
85+
p.Run(func() error {
86+
time.Sleep(1 * time.Millisecond)
87+
return nil
88+
})
89+
}
90+
p.Wait()
91+
}
92+
}

0 commit comments

Comments
 (0)