Skip to content

Commit 1986e09

Browse files
authored
fix(copool): deadlock may occur when goroutines end too quick under certain situation (#100)
1 parent 51d5d70 commit 1986e09

File tree

6 files changed

+54
-14
lines changed

6 files changed

+54
-14
lines changed

.github/workflows/go.yml

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,16 +23,15 @@ jobs:
2323
ref: ${{ github.event.pull_request.head.sha }}
2424
persist-credentials: false # otherwise, the token used is the GITHUB_TOKEN, instead of your personal access token.
2525
fetch-depth: 0 # otherwise, there would be errors pushing refs to the destination repository.
26-
26+
2727
- name: Setup go
2828
uses: actions/setup-go@v4
2929
with:
3030
go-version-file: 'go.mod'
3131

3232
- name: Run Test
3333
run: |
34-
go test -count=100 -timeout=1800s -v ./... -covermode=count -coverprofile=coverage.out
35-
34+
go test -count=150 -timeout=3600s -v ./... -covermode=count -coverprofile=coverage.out
3635
- name: Upload coverage to Codecov
3736
uses: codecov/codecov-action@v4
3837
with:

benchmark/benchmark_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import (
77
gotaskflow "github.com/noneback/go-taskflow"
88
)
99

10-
var executor = gotaskflow.NewExecutor(6400)
10+
var executor = gotaskflow.NewExecutor(1)
1111

1212
func BenchmarkC32(b *testing.B) {
1313
tf := gotaskflow.NewTaskFlow("G")

taskflow_test.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -721,3 +721,45 @@ func TestSequencialTaskingPanic(t *testing.T) {
721721
t.Fail()
722722
}
723723
}
724+
func TestDeadlock(t *testing.T) {
725+
// BUG: https://github.com/noneback/go-taskflow/issues/99
726+
tf := gotaskflow.NewTaskFlow("G1")
727+
exe := gotaskflow.NewExecutor(1)
728+
N := 100
729+
prev := tf.NewTask("N0", func() {})
730+
for i := 1; i < 32; i++ {
731+
next := tf.NewTask(fmt.Sprintf("N%d", i), func() {})
732+
prev.Precede(next)
733+
prev = next
734+
}
735+
736+
for i := 0; i < N; i++ {
737+
exe.Run(tf).Wait()
738+
}
739+
740+
tf = gotaskflow.NewTaskFlow("G2")
741+
742+
layersCount := 8
743+
layerNodesCount := 8
744+
745+
var curLayer, upperLayer []*gotaskflow.Task
746+
747+
for i := 0; i < layersCount; i++ {
748+
for j := 0; j < layerNodesCount; j++ {
749+
task := tf.NewTask(fmt.Sprintf("N%d", i*layersCount+j), func() {})
750+
751+
for i := range upperLayer {
752+
upperLayer[i].Precede(task)
753+
}
754+
755+
curLayer = append(curLayer, task)
756+
}
757+
758+
upperLayer = curLayer
759+
curLayer = []*gotaskflow.Task{}
760+
}
761+
762+
for i := 0; i < N; i++ {
763+
exe.Run(tf).Wait()
764+
}
765+
}

utils/copool.go

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ type Copool struct {
2424
cap uint
2525
taskQ *Queue[*cotask]
2626
corun atomic.Int32
27-
coworker atomic.Int32
27+
coworker uint
2828
mu *sync.Mutex
2929
taskObjPool *ObjectPool[*cotask]
3030
}
@@ -36,7 +36,7 @@ func NewCopool(cap uint) *Copool {
3636
taskQ: NewQueue[*cotask](false),
3737
cap: cap,
3838
corun: atomic.Int32{},
39-
coworker: atomic.Int32{},
39+
coworker: 0,
4040
mu: &sync.Mutex{},
4141
taskObjPool: NewObjectPool(func() *cotask {
4242
return &cotask{}
@@ -74,16 +74,15 @@ func (cp *Copool) CtxGo(ctx *context.Context, f func()) {
7474
cp.mu.Lock()
7575
cp.taskQ.Put(task)
7676

77-
if cp.coworker.Load() == 0 || cp.taskQ.Len() != 0 && uint(cp.coworker.Load()) < uint(cp.cap) {
77+
if cp.coworker == 0 || cp.taskQ.Len() != 0 && cp.coworker < cp.cap {
78+
cp.coworker++
7879
cp.mu.Unlock()
79-
cp.coworker.Add(1)
8080

8181
go func() {
82-
defer cp.coworker.Add(-1)
83-
8482
for {
8583
cp.mu.Lock()
8684
if cp.taskQ.Len() == 0 {
85+
cp.coworker--
8786
cp.mu.Unlock()
8887
return
8988
}

visualizer_dot.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ type DotEdge struct {
3232
attributes map[string]string
3333
}
3434

35-
func NewDotGraph(name string) *DotGraph {
35+
func newDotGraph(name string) *DotGraph {
3636
return &DotGraph{
3737
name: name,
3838
isSubgraph: false,
@@ -230,7 +230,7 @@ func (v *dotVizer) visualizeG(g *eGraph, parentGraph *DotGraph) error {
230230

231231
// Visualize generates raw dag text in dot format and writes to writer
232232
func (v *dotVizer) Visualize(tf *TaskFlow, writer io.Writer) error {
233-
graph := NewDotGraph(tf.graph.name)
233+
graph := newDotGraph(tf.graph.name)
234234
err := v.visualizeG(tf.graph, graph)
235235
if err != nil {
236236
return fmt.Errorf("visualize %v -> %w", tf.graph.name, err)

visualizer_dot_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import (
77
)
88

99
func TestDotGraph_String(t *testing.T) {
10-
graph := NewDotGraph("test_graph")
10+
graph := newDotGraph("test_graph")
1111
graph.attributes["rankdir"] = "LR"
1212

1313
nodeA := graph.CreateNode("A")
@@ -38,7 +38,7 @@ func TestDotGraph_String(t *testing.T) {
3838
}
3939

4040
func TestDotGraph_SubGraph(t *testing.T) {
41-
graph := NewDotGraph("main_graph")
41+
graph := newDotGraph("main_graph")
4242

4343
nodeA := graph.CreateNode("A")
4444
nodeB := graph.CreateNode("B")

0 commit comments

Comments
 (0)