Skip to content

Commit 10cc43e

Browse files
committed
feat(executor): add Options pattern, fix data races, update docs
- add WithConcurrency, WithProfiler, WithTracer, WithPanicHandler options - internalize validator (unexported types, package-level func) - fix data race in TestTaskflowNotInFlow (atomic.Int32) - fix data race in TestPoolSequentialExec (sync.WaitGroup) - update README and llms.txt with Options and Tracing sections
1 parent ac58e4f commit 10cc43e

14 files changed

Lines changed: 1474 additions & 538 deletions

README.md

Lines changed: 52 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ func main() {
118118
}
119119
done.Succeed(sortTasks...)
120120

121-
executor := gtf.NewExecutor(1000)
121+
executor := gtf.NewExecutor(1000, gtf.WithProfiler())
122122

123123
executor.Run(tf).Wait()
124124

@@ -156,6 +156,30 @@ ok github.com/noneback/go-taskflow/benchmark 5.606s
156156

157157
Conditional nodes in go-taskflow behave similarly to those in [taskflow-cpp](https://github.com/taskflow/taskflow). They participate in both conditional control and looping. To avoid common pitfalls, refer to the [Conditional Tasking documentation](https://taskflow.github.io/taskflow/ConditionalTasking.html).
158158

159+
## Executor Options
160+
161+
`NewExecutor` accepts functional options to configure behavior:
162+
163+
```go
164+
import "runtime"
165+
166+
executor := gtf.NewExecutor(0,
167+
gtf.WithConcurrency(uint(runtime.NumCPU()*4)), // override concurrency
168+
gtf.WithProfiler(), // enable flamegraph profiling
169+
gtf.WithTracer(), // enable Chrome Trace recording
170+
gtf.WithPanicHandler(func(task string, r interface{}) {
171+
log.Printf("task %s panicked: %v", task, r)
172+
}),
173+
)
174+
```
175+
176+
| Option | Description |
177+
|:---|:---|
178+
| `WithConcurrency(n uint)` | Set max goroutine concurrency (overrides positional arg). Enables `NewExecutor(0, WithConcurrency(n))` style. |
179+
| `WithProfiler()` | Enable flamegraph profiling. Required before calling `executor.Profile()`. |
180+
| `WithTracer()` | Enable Chrome Trace recording. Required before calling `executor.Trace()`. |
181+
| `WithPanicHandler(fn)` | Custom panic handler invoked on task panic. Replaces default log output. Graph is still canceled. |
182+
159183
## Error Handling in go-taskflow
160184

161185
In Go, `errors` are values, and it is the user's responsibility to handle them appropriately. Only unrecovered `panic` events are managed by the framework. If a `panic` occurs, the entire parent graph is canceled, leaving the remaining tasks incomplete. This behavior may evolve in the future. If you have suggestions, feel free to share them.
@@ -173,6 +197,14 @@ tf.NewTask("not interrupt", func() {
173197
})
174198
```
175199

200+
Alternatively, use `WithPanicHandler` to centralize panic handling across all tasks:
201+
202+
```go
203+
executor := gtf.NewExecutor(1000, gtf.WithPanicHandler(func(task string, r interface{}) {
204+
log.Printf("task %s panicked: %v", task, r)
205+
}))
206+
```
207+
176208
## Visualizing Taskflows
177209

178210
To generate a visual representation of a taskflow, use the `Dump` method:
@@ -189,9 +221,12 @@ The `Dump` method generates raw strings in DOT format. Use the `dot` tool to cre
189221

190222
## Profiling Taskflows
191223

192-
To profile a taskflow, use the `Profile` method:
224+
To profile a taskflow, first enable the profiler with `WithProfiler()`, then call `Profile`:
193225

194226
```go
227+
executor := gtf.NewExecutor(1000, gtf.WithProfiler())
228+
executor.Run(tf).Wait()
229+
195230
if err := executor.Profile(os.Stdout); err != nil {
196231
log.Fatal(err)
197232
}
@@ -201,6 +236,21 @@ The `Profile` method generates raw strings in flamegraph format. Use the `flameg
201236

202237
![flg](image/fl.svg)
203238

239+
## Tracing Taskflows
240+
241+
To capture Chrome Trace events, enable the tracer with `WithTracer()`, then call `Trace`:
242+
243+
```go
244+
executor := gtf.NewExecutor(1000, gtf.WithTracer())
245+
executor.Run(tf).Wait()
246+
247+
if err := executor.Trace(os.Stdout); err != nil {
248+
log.Fatal(err)
249+
}
250+
```
251+
252+
The output is in [Chrome Trace Event format](https://docs.google.com/document/d/1CvAClvFfyA5R-PhYUmn5OOQtYMH4h6I0nSsKchNAySU). Open it in `chrome://tracing` or [Perfetto UI](https://ui.perfetto.dev/) for visualization.
253+
204254
## Stargazer
205255

206256
[![Star History Chart](https://api.star-history.com/svg?repos=noneback/go-taskflow&type=Date)](https://star-history.com/#noneback/go-taskflow&Date)

executor.go

Lines changed: 122 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -17,32 +17,76 @@ import (
1717
type Executor interface {
1818
Wait() // Wait block until all tasks finished
1919
Profile(w io.Writer) error // Profile write flame graph raw text into w
20+
Trace(w io.Writer) error // Trace write Chrome Trace Event data into w
2021
Run(tf *TaskFlow) Executor // Run start to schedule and execute taskflow
2122
}
2223

2324
type innerExecutorImpl struct {
24-
concurrency uint
25-
pool *utils.Copool
26-
wq *utils.Queue[*innerNode]
27-
wg *sync.WaitGroup
28-
profiler *profiler
29-
mu *sync.Mutex
25+
concurrency uint
26+
pool *utils.Copool
27+
wq *utils.Queue[*innerNode]
28+
wg *sync.WaitGroup
29+
profiler *profiler
30+
tracer *tracer
31+
panicHandler func(task string, r interface{})
32+
mu *sync.Mutex
3033
}
3134

32-
// NewExecutor return a Executor with a specified max goroutine concurrency(recommend a value bigger than Runtime.NumCPU, **MUST** bigger than num(subflows). )
33-
func NewExecutor(concurrency uint) Executor {
34-
if concurrency == 0 {
35-
panic("executor concurrency cannot be zero")
35+
// Option configures executor behavior.
36+
type Option func(*innerExecutorImpl)
37+
38+
// WithProfiler enables flame graph profiling for task execution analysis.
39+
func WithProfiler() Option {
40+
return func(e *innerExecutorImpl) {
41+
e.profiler = newProfiler()
42+
}
43+
}
44+
45+
// WithTracer enables Chrome Trace Event recording for task execution analysis.
46+
// The trace output can be visualized in chrome://tracing or Perfetto UI.
47+
func WithTracer() Option {
48+
return func(e *innerExecutorImpl) {
49+
e.tracer = newTracer()
50+
}
51+
}
52+
53+
// WithConcurrency sets the max goroutine concurrency, overriding the positional argument.
54+
// This enables NewExecutor(0, WithConcurrency(runtime.NumCPU())) usage.
55+
// Recommend a value bigger than runtime.NumCPU and MUST bigger than num(subflows).
56+
func WithConcurrency(n uint) Option {
57+
return func(e *innerExecutorImpl) {
58+
e.concurrency = n
3659
}
37-
t := newProfiler()
38-
return &innerExecutorImpl{
60+
}
61+
62+
// WithPanicHandler sets a custom handler invoked when a task panics.
63+
// The handler receives the task name and the recovered panic value.
64+
// The graph is still canceled after a panic regardless of the handler.
65+
// Without this option, panics are logged via log.Printf (default behavior).
66+
func WithPanicHandler(fn func(task string, r interface{})) Option {
67+
return func(e *innerExecutorImpl) {
68+
e.panicHandler = fn
69+
}
70+
}
71+
72+
// NewExecutor returns an Executor with the specified concurrency and options.
73+
// concurrency can be 0 when WithConcurrency option is provided.
74+
// Recommend concurrency > runtime.NumCPU and MUST > num(subflows).
75+
func NewExecutor(concurrency uint, opts ...Option) Executor {
76+
e := &innerExecutorImpl{
3977
concurrency: concurrency,
40-
pool: utils.NewCopool(concurrency),
4178
wq: utils.NewQueue[*innerNode](false),
4279
wg: &sync.WaitGroup{},
43-
profiler: t,
4480
mu: &sync.Mutex{},
4581
}
82+
for _, opt := range opts {
83+
opt(e)
84+
}
85+
if e.concurrency == 0 {
86+
panic("executor concurrency cannot be zero")
87+
}
88+
e.pool = utils.NewCopool(e.concurrency)
89+
return e
4690
}
4791

4892
// Run start to schedule and execute taskflow
@@ -97,22 +141,49 @@ func (e *innerExecutorImpl) sche_successors(node *innerNode) {
97141
e.schedule(candidate...)
98142
}
99143

144+
// record submits the span to active observers.
145+
// ok=true means the node completed without panic; profiler only records successful spans.
146+
func (e *innerExecutorImpl) record(s *span, ok bool) {
147+
if ok && e.profiler != nil {
148+
e.profiler.AddSpan(s)
149+
}
150+
if e.tracer != nil {
151+
e.tracer.AddEvent(s)
152+
}
153+
}
154+
155+
// getDependentNames extracts predecessor task names from a node.
156+
func getDependentNames(node *innerNode) []string {
157+
if len(node.dependents) == 0 {
158+
return nil
159+
}
160+
names := make([]string, len(node.dependents))
161+
for i, dep := range node.dependents {
162+
names[i] = dep.name
163+
}
164+
return names
165+
}
166+
100167
func (e *innerExecutorImpl) invokeStatic(node *innerNode, parentSpan *span, p *Static) func() {
101168
return func() {
102169
span := span{extra: attr{
103170
typ: nodeStatic,
104171
name: node.name,
105-
}, begin: time.Now(), parent: parentSpan}
172+
}, begin: time.Now(), parent: parentSpan, dependents: getDependentNames(node)}
106173

107174
defer func() {
108175
span.cost = time.Since(span.begin)
109-
if r := recover(); r != nil {
176+
r := recover()
177+
if r != nil {
110178
node.g.canceled.Store(true)
111-
log.Printf("graph %v is canceled, since static node %v panics", node.g.name, node.name)
112-
log.Printf("[recovered] static node %s, panic: %v, stack: %s", node.name, r, debug.Stack())
113-
} else {
114-
e.profiler.AddSpan(&span) // remove canceled node span
179+
if e.panicHandler != nil {
180+
e.panicHandler(node.name, r)
181+
} else {
182+
log.Printf("graph %v is canceled, since static node %v panics", node.g.name, node.name)
183+
log.Printf("[recovered] static node %s, panic: %v, stack: %s", node.name, r, debug.Stack())
184+
}
115185
}
186+
e.record(&span, r == nil)
116187

117188
node.drop()
118189
e.sche_successors(node)
@@ -132,18 +203,22 @@ func (e *innerExecutorImpl) invokeSubflow(node *innerNode, parentSpan *span, p *
132203
span := span{extra: attr{
133204
typ: nodeSubflow,
134205
name: node.name,
135-
}, begin: time.Now(), parent: parentSpan}
206+
}, begin: time.Now(), parent: parentSpan, dependents: getDependentNames(node)}
136207

137208
defer func() {
138209
span.cost = time.Since(span.begin)
139-
if r := recover(); r != nil {
140-
log.Printf("graph %v is canceled, since subflow %v panics", node.g.name, node.name)
141-
log.Printf("[recovered] subflow %s, panic: %v, stack: %s", node.name, r, debug.Stack())
210+
r := recover()
211+
if r != nil {
212+
if e.panicHandler != nil {
213+
e.panicHandler(node.name, r)
214+
} else {
215+
log.Printf("graph %v is canceled, since subflow %v panics", node.g.name, node.name)
216+
log.Printf("[recovered] subflow %s, panic: %v, stack: %s", node.name, r, debug.Stack())
217+
}
142218
node.g.canceled.Store(true)
143219
p.g.canceled.Store(true)
144-
} else {
145-
e.profiler.AddSpan(&span) // remove canceled node span
146220
}
221+
e.record(&span, r == nil)
147222

148223
e.scheduleGraph(node.g, p.g, &span)
149224
node.drop()
@@ -168,17 +243,21 @@ func (e *innerExecutorImpl) invokeCondition(node *innerNode, parentSpan *span, p
168243
span := span{extra: attr{
169244
typ: nodeCondition,
170245
name: node.name,
171-
}, begin: time.Now(), parent: parentSpan}
246+
}, begin: time.Now(), parent: parentSpan, dependents: getDependentNames(node)}
172247

173248
defer func() {
174249
span.cost = time.Since(span.begin)
175-
if r := recover(); r != nil {
250+
r := recover()
251+
if r != nil {
176252
node.g.canceled.Store(true)
177-
log.Printf("graph %v is canceled, since condition node %v panics", node.g.name, node.name)
178-
log.Printf("[recovered] condition node %s, panic: %v, stack: %s", node.name, r, debug.Stack())
179-
} else {
180-
e.profiler.AddSpan(&span) // remove canceled node span
253+
if e.panicHandler != nil {
254+
e.panicHandler(node.name, r)
255+
} else {
256+
log.Printf("graph %v is canceled, since condition node %v panics", node.g.name, node.name)
257+
log.Printf("[recovered] condition node %s, panic: %v, stack: %s", node.name, r, debug.Stack())
258+
}
181259
}
260+
e.record(&span, r == nil)
182261
node.drop()
183262
// e.sche_successors(node)
184263
node.g.deref()
@@ -261,5 +340,16 @@ func (e *innerExecutorImpl) Wait() {
261340

262341
// Profile write flame graph raw text into w
263342
func (e *innerExecutorImpl) Profile(w io.Writer) error {
343+
if e.profiler == nil {
344+
return nil
345+
}
264346
return e.profiler.draw(w)
265347
}
348+
349+
// Trace write Chrome Trace Event data into w
350+
func (e *innerExecutorImpl) Trace(w io.Writer) error {
351+
if e.tracer == nil {
352+
return nil
353+
}
354+
return e.tracer.draw(w)
355+
}

executor_test.go

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,16 @@ import (
44
"fmt"
55
"os"
66
"runtime"
7+
"sync/atomic"
78
"testing"
89
"time"
910

1011
gotaskflow "github.com/noneback/go-taskflow"
12+
"github.com/noneback/go-taskflow/utils"
1113
)
1214

1315
func TestExecutor(t *testing.T) {
14-
executor := gotaskflow.NewExecutor(uint(runtime.NumCPU()))
16+
executor := gotaskflow.NewExecutor(uint(runtime.NumCPU()), gotaskflow.WithProfiler())
1517
tf := gotaskflow.NewTaskFlow("G")
1618
A, B, C :=
1719
tf.NewTask("A", func() {
@@ -91,3 +93,51 @@ func TestPanicInSubflow(t *testing.T) {
9193
make_install.Precede(relink)
9294
executor.Run(tf).Wait()
9395
}
96+
97+
// TestWithConcurrencyOption verifies that NewExecutor(0, WithConcurrency(n)) works correctly.
98+
func TestWithConcurrencyOption(t *testing.T) {
99+
executor := gotaskflow.NewExecutor(0, gotaskflow.WithConcurrency(uint(runtime.NumCPU())))
100+
tf := gotaskflow.NewTaskFlow("G")
101+
102+
var count atomic.Int32
103+
A := tf.NewTask("A", func() { count.Add(1) })
104+
B := tf.NewTask("B", func() { count.Add(1) })
105+
C := tf.NewTask("C", func() { count.Add(1) })
106+
A.Precede(B)
107+
C.Precede(B)
108+
109+
executor.Run(tf).Wait()
110+
111+
if count.Load() != 3 {
112+
t.Errorf("expected count=3, got %d", count.Load())
113+
}
114+
}
115+
116+
// TestWithPanicHandlerOption verifies that a custom panic handler is invoked on task panic.
117+
func TestWithPanicHandlerOption(t *testing.T) {
118+
var panickedTask string
119+
var panicVal interface{}
120+
121+
executor := gotaskflow.NewExecutor(10, gotaskflow.WithPanicHandler(func(task string, r interface{}) {
122+
panickedTask = task
123+
panicVal = r
124+
}))
125+
tf := gotaskflow.NewTaskFlow("G")
126+
tf.NewTask("boom", func() { panic("test panic") })
127+
128+
executor.Run(tf).Wait()
129+
130+
if panickedTask != "boom" {
131+
t.Errorf("expected panickedTask=%q, got %q", "boom", panickedTask)
132+
}
133+
if panicVal != "test panic" {
134+
t.Errorf("expected panicVal=%q, got %v", "test panic", panicVal)
135+
}
136+
}
137+
138+
// TestWithConcurrencyPanic verifies that NewExecutor(0) without WithConcurrency still panics.
139+
func TestWithConcurrencyPanic(t *testing.T) {
140+
utils.AssertPanics(t, "concurrency zero", func() {
141+
gotaskflow.NewExecutor(0)
142+
})
143+
}

0 commit comments

Comments
 (0)