Skip to content

Commit 46ed210

Browse files
committed
Support multiple parallel functions to StoppableFunc
1 parent 52a2be9 commit 46ed210

2 files changed

Lines changed: 85 additions & 4 deletions

File tree

cc.go

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,10 @@ func (p *Pool) Run(fn func() error) {
4747
}()
4848
}
4949

50+
// StoppableFunc is a func that receives a stop channel that when closed broadcasts the
51+
// need to wrap up and stop the function
52+
type StoppableFunc func(stop chan struct{})
53+
5054
// Stoppable is a function that can be stopped with the method Stop. You can also listen on the Stopped channel to see when it has been stopped.
5155
// Stoppable is different from a context cancelation because it waits until the function has cleaned up before broadcasting on the Stopped channel
5256
type Stoppable struct {
@@ -62,18 +66,34 @@ func (s *Stoppable) Stop() {
6266
})
6367
}
6468

65-
// Run creates a new stoppable function from the provided func. When you call the Stop method on the returned Stoppable the stop channel fed to the provided func is closed,
66-
// signaling the need to stop. When the provided func returns the Stopped channel on the
69+
// Run creates a new stoppable function from the provided funcs. When you call the Stop method on the returned Stoppable the stop channel fed to the provided funcs is closed,
70+
// signaling the need to stop. When all the provided func return the Stopped channel on the
6771
// returned Stoppable is closed as well, broadcasting the message that it has finished
68-
func Run(fn func(stop chan struct{})) (s *Stoppable) {
72+
func Run(fns ...StoppableFunc) (s *Stoppable) {
6973
s = &Stoppable{
7074
Stopped: make(chan struct{}),
7175
stop: make(chan struct{}),
7276
}
7377

78+
stoppedList := []chan struct{}{}
79+
80+
for _, fn := range fns {
81+
stopped := make(chan struct{})
82+
stoppedList = append(stoppedList, stopped)
83+
84+
go func(fn StoppableFunc, stopped chan struct{}) {
85+
fn(s.stop)
86+
s.Stop()
87+
close(stopped)
88+
}(fn, stopped)
89+
}
90+
7491
go func() {
75-
fn(s.stop)
92+
for _, stopped := range stoppedList {
93+
<-stopped
94+
}
7695
close(s.Stopped)
7796
}()
97+
7898
return s
7999
}

cc_test.go

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,67 @@ func ExampleStoppable() {
6666
// stopped finally
6767
}
6868

69+
func ExampleStoppable_multiple() {
70+
stoppable := cc.Run(func(stop chan struct{}) {
71+
i := 0
72+
L:
73+
for {
74+
select {
75+
case <-stop:
76+
fmt.Println("first received stop signal")
77+
break L
78+
default:
79+
i++
80+
time.Sleep(250 * time.Millisecond)
81+
fmt.Println("first: ", i)
82+
}
83+
}
84+
fmt.Println("first finished with", i)
85+
}, func(stop chan struct{}) {
86+
time.Sleep(50 * time.Millisecond)
87+
i := 0
88+
L:
89+
90+
for {
91+
select {
92+
case <-stop:
93+
fmt.Println("second received stop signal")
94+
break L
95+
default:
96+
i++
97+
time.Sleep(250 * time.Millisecond)
98+
fmt.Println("second: ", i)
99+
}
100+
}
101+
fmt.Println("second finished with", i)
102+
})
103+
104+
go func() {
105+
time.Sleep(1 * time.Second)
106+
fmt.Println("send stop signal")
107+
stoppable.Stop()
108+
stoppable.Stop() // It shouldn't explode even if you attempt to close it multiple times
109+
}()
110+
111+
<-stoppable.Stopped
112+
fmt.Println("stopped finally")
113+
// Output: first: 1
114+
// second: 1
115+
// first: 2
116+
// second: 2
117+
// first: 3
118+
// second: 3
119+
// send stop signal
120+
// first: 4
121+
// first received stop signal
122+
// first finished with 4
123+
// second: 4
124+
// second received stop signal
125+
// second finished with 4
126+
// stopped finally
127+
128+
}
129+
69130
func TestRace(t *testing.T) {
70131
p := cc.New(4)
71132

0 commit comments

Comments
 (0)