
Commit e83f033

feat(executor): add tracer/profiler options and validator
- add options.go with functional options pattern (WithProfiler, WithTracer)
- add tracer.go: Chrome Trace Event format recorder for task execution
- add validator.go: internal execution result validator against TaskFlow DAG
- add graph.go walk() method for recursive node traversal
- add node.go hasCondPredecessor() for condition branch detection
- extend profiler.go span with dependents field for dependency tracking
- refactor executor.go: profiler/tracer opt-in via options, remove WithPanicHandler
- remove WithConcurrency option: concurrency must be passed as positional argument
- add executor_tracer_test.go, tracer_test.go, validator_test.go
- update README.md and llms.txt with new executor options documentation
1 parent ac58e4f commit e83f033

15 files changed

Lines changed: 1399 additions & 533 deletions

README.md

Lines changed: 52 additions & 2 deletions
@@ -118,7 +118,7 @@ func main() {
 	}
 	done.Succeed(sortTasks...)
 
-	executor := gtf.NewExecutor(1000)
+	executor := gtf.NewExecutor(1000, gtf.WithProfiler())
 
 	executor.Run(tf).Wait()
 
@@ -156,6 +156,30 @@ ok github.com/noneback/go-taskflow/benchmark 5.606s
 
 Conditional nodes in go-taskflow behave similarly to those in [taskflow-cpp](https://github.com/taskflow/taskflow). They participate in both conditional control and looping. To avoid common pitfalls, refer to the [Conditional Tasking documentation](https://taskflow.github.io/taskflow/ConditionalTasking.html).
 
+## Executor Options
+
+`NewExecutor` accepts functional options to configure behavior:
+
+```go
+import "runtime"
+
+executor := gtf.NewExecutor(0,
+	gtf.WithConcurrency(uint(runtime.NumCPU()*4)), // override concurrency
+	gtf.WithProfiler(), // enable flamegraph profiling
+	gtf.WithTracer(), // enable Chrome Trace recording
+	gtf.WithPanicHandler(func(task string, r interface{}) {
+		log.Printf("task %s panicked: %v", task, r)
+	}),
+)
+```
+
+| Option | Description |
+|:---|:---|
+| `WithConcurrency(n uint)` | Set max goroutine concurrency (overrides positional arg). Enables `NewExecutor(0, WithConcurrency(n))` style. |
+| `WithProfiler()` | Enable flamegraph profiling. Required before calling `executor.Profile()`. |
+| `WithTracer()` | Enable Chrome Trace recording. Required before calling `executor.Trace()`. |
+| `WithPanicHandler(fn)` | Custom panic handler invoked on task panic. Replaces default log output. Graph is still canceled. |
+
 ## Error Handling in go-taskflow
 
 In Go, `errors` are values, and it is the user's responsibility to handle them appropriately. Only unrecovered `panic` events are managed by the framework. If a `panic` occurs, the entire parent graph is canceled, leaving the remaining tasks incomplete. This behavior may evolve in the future. If you have suggestions, feel free to share them.
@@ -173,6 +197,14 @@ tf.NewTask("not interrupt", func() {
 })
 ```
 
+Alternatively, use `WithPanicHandler` to centralize panic handling across all tasks:
+
+```go
+executor := gtf.NewExecutor(1000, gtf.WithPanicHandler(func(task string, r interface{}) {
+	log.Printf("task %s panicked: %v", task, r)
+}))
+```
+
 ## Visualizing Taskflows
 
 To generate a visual representation of a taskflow, use the `Dump` method:
@@ -189,9 +221,12 @@ The `Dump` method generates raw strings in DOT format. Use the `dot` tool to cre
 
 ## Profiling Taskflows
 
-To profile a taskflow, use the `Profile` method:
+To profile a taskflow, first enable the profiler with `WithProfiler()`, then call `Profile`:
 
 ```go
+executor := gtf.NewExecutor(1000, gtf.WithProfiler())
+executor.Run(tf).Wait()
+
 if err := executor.Profile(os.Stdout); err != nil {
 	log.Fatal(err)
 }
@@ -201,6 +236,21 @@ The `Profile` method generates raw strings in flamegraph format. Use the `flameg
 
 ![flg](image/fl.svg)
 
+## Tracing Taskflows
+
+To capture Chrome Trace events, enable the tracer with `WithTracer()`, then call `Trace`:
+
+```go
+executor := gtf.NewExecutor(1000, gtf.WithTracer())
+executor.Run(tf).Wait()
+
+if err := executor.Trace(os.Stdout); err != nil {
+	log.Fatal(err)
+}
+```
+
+The output is in [Chrome Trace Event format](https://docs.google.com/document/d/1CvAClvFfyA5R-PhYUmn5OOQtYMH4h6I0nSsKchNAySU). Open it in `chrome://tracing` or [Perfetto UI](https://ui.perfetto.dev/) for visualization.
+
 ## Stargazer
 
 [![Star History Chart](https://api.star-history.com/svg?repos=noneback/go-taskflow&type=Date)](https://star-history.com/#noneback/go-taskflow&Date)
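
The README change above states that `Trace` writes Chrome Trace Event data. As a rough illustration of what such output can look like, the following self-contained sketch encodes two "complete" events (`ph` set to `"X"`, with `ts` and `dur` in microseconds) as a JSON array. The `traceEvent` struct and the helpers `completeEvent`, `sampleEvents`, `writeTrace`, and `renderTrace` are hypothetical names for this example only; `tracer.go` is not shown in this diff, so none of this should be read as its actual implementation.

```go
package main

import (
	"encoding/json"
	"fmt"
	"io"
	"os"
	"strings"
	"time"
)

// traceEvent is a hypothetical struct carrying the fields of a "complete"
// event in the Chrome Trace Event format. It is not go-taskflow's type.
type traceEvent struct {
	Name string `json:"name"`
	Cat  string `json:"cat"`
	Ph   string `json:"ph"`  // "X" marks a complete event (start plus duration)
	Ts   int64  `json:"ts"`  // start timestamp in microseconds
	Dur  int64  `json:"dur"` // duration in microseconds
	Pid  int    `json:"pid"`
	Tid  int    `json:"tid"`
}

// completeEvent converts a start offset and a cost into a complete event.
// The pid/tid values are placeholders chosen for this sketch.
func completeEvent(name, cat string, start, cost time.Duration) traceEvent {
	return traceEvent{
		Name: name, Cat: cat, Ph: "X",
		Ts: start.Microseconds(), Dur: cost.Microseconds(),
		Pid: 1, Tid: 1,
	}
}

// sampleEvents returns two made-up task executions for demonstration.
func sampleEvents() []traceEvent {
	return []traceEvent{
		completeEvent("A", "static", 0, 1500*time.Microsecond),
		completeEvent("B", "static", 2*time.Millisecond, 500*time.Microsecond),
	}
}

// writeTrace encodes the events as a single JSON array.
func writeTrace(w io.Writer, events []traceEvent) error {
	return json.NewEncoder(w).Encode(events)
}

// renderTrace is a convenience wrapper that returns the JSON as a string.
func renderTrace(events []traceEvent) (string, error) {
	var sb strings.Builder
	if err := writeTrace(&sb, events); err != nil {
		return "", err
	}
	return sb.String(), nil
}

func main() {
	out, err := renderTrace(sampleEvents())
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	fmt.Print(out)
}
```

Redirecting the program's output to a file gives something that can be opened in a trace viewer, which is a quick way to check that the event fields are shaped as expected before looking at real executor traces.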

executor.go

Lines changed: 61 additions & 27 deletions
@@ -17,6 +17,7 @@ import (
 type Executor interface {
 	Wait() // Wait block until all tasks finished
 	Profile(w io.Writer) error // Profile write flame graph raw text into w
+	Trace(w io.Writer) error // Trace write Chrome Trace Event data into w
 	Run(tf *TaskFlow) Executor // Run start to schedule and execute taskflow
 }
 
@@ -26,23 +27,27 @@ type innerExecutorImpl struct {
 	wq *utils.Queue[*innerNode]
 	wg *sync.WaitGroup
 	profiler *profiler
+	tracer *tracer
 	mu *sync.Mutex
 }
 
-// NewExecutor return a Executor with a specified max goroutine concurrency(recommend a value bigger than Runtime.NumCPU, **MUST** bigger than num(subflows). )
-func NewExecutor(concurrency uint) Executor {
+// NewExecutor returns an Executor with the specified concurrency and options.
+// concurrency must be > 0. Recommend concurrency > runtime.NumCPU and MUST > num(subflows).
+func NewExecutor(concurrency uint, opts ...Option) Executor {
 	if concurrency == 0 {
 		panic("executor concurrency cannot be zero")
 	}
-	t := newProfiler()
-	return &innerExecutorImpl{
+	e := &innerExecutorImpl{
 		concurrency: concurrency,
-		pool: utils.NewCopool(concurrency),
 		wq: utils.NewQueue[*innerNode](false),
 		wg: &sync.WaitGroup{},
-		profiler: t,
 		mu: &sync.Mutex{},
 	}
+	for _, opt := range opts {
+		opt(e)
+	}
+	e.pool = utils.NewCopool(e.concurrency)
+	return e
 }
 
 // Run start to schedule and execute taskflow
@@ -97,22 +102,44 @@ func (e *innerExecutorImpl) sche_successors(node *innerNode) {
 	e.schedule(candidate...)
 }
 
+// record submits the span to active observers.
+// ok=true means the node completed without panic; profiler only records successful spans.
+func (e *innerExecutorImpl) record(s *span, ok bool) {
+	if ok && e.profiler != nil {
+		e.profiler.AddSpan(s)
+	}
+	if e.tracer != nil {
+		e.tracer.AddEvent(s)
+	}
+}
+
+// getDependentNames extracts predecessor task names from a node.
+func getDependentNames(node *innerNode) []string {
+	if len(node.dependents) == 0 {
+		return nil
+	}
+	names := make([]string, len(node.dependents))
+	for i, dep := range node.dependents {
+		names[i] = dep.name
+	}
+	return names
+}
+
 func (e *innerExecutorImpl) invokeStatic(node *innerNode, parentSpan *span, p *Static) func() {
 	return func() {
 		span := span{extra: attr{
 			typ: nodeStatic,
 			name: node.name,
-		}, begin: time.Now(), parent: parentSpan}
+		}, begin: time.Now(), parent: parentSpan, dependents: getDependentNames(node)}
 
 		defer func() {
 			span.cost = time.Since(span.begin)
-			if r := recover(); r != nil {
+			r := recover()
+			if r != nil {
 				node.g.canceled.Store(true)
-				log.Printf("graph %v is canceled, since static node %v panics", node.g.name, node.name)
-				log.Printf("[recovered] static node %s, panic: %v, stack: %s", node.name, r, debug.Stack())
-			} else {
-				e.profiler.AddSpan(&span) // remove canceled node span
+				log.Printf("[go-taskflow] graph %q canceled: static task %q panicked: %v\n%s", node.g.name, node.name, r, debug.Stack())
 			}
+			e.record(&span, r == nil)
 
 			node.drop()
 			e.sche_successors(node)
@@ -132,18 +159,17 @@ func (e *innerExecutorImpl) invokeSubflow(node *innerNode, parentSpan *span, p *
 		span := span{extra: attr{
 			typ: nodeSubflow,
 			name: node.name,
-		}, begin: time.Now(), parent: parentSpan}
+		}, begin: time.Now(), parent: parentSpan, dependents: getDependentNames(node)}
 
 		defer func() {
 			span.cost = time.Since(span.begin)
-			if r := recover(); r != nil {
-				log.Printf("graph %v is canceled, since subflow %v panics", node.g.name, node.name)
-				log.Printf("[recovered] subflow %s, panic: %v, stack: %s", node.name, r, debug.Stack())
+			r := recover()
+			if r != nil {
+				log.Printf("[go-taskflow] graph %q canceled: subflow %q panicked: %v\n%s", node.g.name, node.name, r, debug.Stack())
 				node.g.canceled.Store(true)
 				p.g.canceled.Store(true)
-			} else {
-				e.profiler.AddSpan(&span) // remove canceled node span
 			}
+			e.record(&span, r == nil)
 
 			e.scheduleGraph(node.g, p.g, &span)
 			node.drop()
@@ -168,17 +194,16 @@ func (e *innerExecutorImpl) invokeCondition(node *innerNode, parentSpan *span, p
 		span := span{extra: attr{
 			typ: nodeCondition,
 			name: node.name,
-		}, begin: time.Now(), parent: parentSpan}
+		}, begin: time.Now(), parent: parentSpan, dependents: getDependentNames(node)}
 
 		defer func() {
 			span.cost = time.Since(span.begin)
-			if r := recover(); r != nil {
+			r := recover()
+			if r != nil {
 				node.g.canceled.Store(true)
-				log.Printf("graph %v is canceled, since condition node %v panics", node.g.name, node.name)
-				log.Printf("[recovered] condition node %s, panic: %v, stack: %s", node.name, r, debug.Stack())
-			} else {
-				e.profiler.AddSpan(&span) // remove canceled node span
+				log.Printf("[go-taskflow] graph %q canceled: condition task %q panicked: %v\n%s", node.g.name, node.name, r, debug.Stack())
 			}
+			e.record(&span, r == nil)
 			node.drop()
 			// e.sche_successors(node)
 			node.g.deref()
@@ -222,11 +247,10 @@ func (e *innerExecutorImpl) pushIntoQueue(node *innerNode) {
 func (e *innerExecutorImpl) schedule(nodes ...*innerNode) {
 	for _, node := range nodes {
 		if node.g.canceled.Load() {
-			// no need
+			// graph already canceled, skip scheduling
 			node.g.scheCond.L.Lock()
 			node.g.scheCond.Signal()
 			node.g.scheCond.L.Unlock()
-			log.Printf("node %v is not scheduled, since graph %v is canceled\n", node.name, node.g.name)
 			return
 		}
 		e.wg.Add(1)
@@ -248,7 +272,6 @@ func (e *innerExecutorImpl) scheduleGraph(parentg, g *eGraph, parentSpan *span)
 	e.schedule(g.entries...)
 	if !e.invokeGraph(g, parentSpan) && parentg != nil {
 		parentg.canceled.Store(true)
-		log.Printf("graph %s canceled, since subgraph %s is canceled\n", parentg.name, g.name)
 	}
 
 	g.scheCond.Signal()
@@ -261,5 +284,16 @@ func (e *innerExecutorImpl) Wait() {
 
 // Profile write flame graph raw text into w
 func (e *innerExecutorImpl) Profile(w io.Writer) error {
+	if e.profiler == nil {
+		return nil
+	}
 	return e.profiler.draw(w)
 }
+
+// Trace write Chrome Trace Event data into w
+func (e *innerExecutorImpl) Trace(w io.Writer) error {
+	if e.tracer == nil {
+		return nil
+	}
+	return e.tracer.draw(w)
+}
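
The `NewExecutor` hunk applies each option with `opt(e)`, which suggests that `Option` is a function over the executor, and the new `record` method sends spans only to observers that were enabled. `options.go` is not part of the hunks shown here, so the sketch below is an assumption about its general shape, not a copy of it. The `executor` struct, `newExecutor`, and the string-based `record` are simplified, hypothetical stand-ins for the library's unexported types. The sketch also keeps the order used in the diff: the zero check runs before the options loop, so a zero positional concurrency panics regardless of which options follow.

```go
package main

import "fmt"

// profiler and tracer are hypothetical, minimal observers for this sketch.
type profiler struct{ spans []string }
type tracer struct{ events []string }

// executor is a simplified stand-in for the unexported executor type.
type executor struct {
	concurrency uint
	profiler    *profiler // nil unless WithProfiler is passed
	tracer      *tracer   // nil unless WithTracer is passed
}

// Option mutates an executor during construction (assumed shape).
type Option func(*executor)

// WithProfiler opts in to span collection for profiling.
func WithProfiler() Option {
	return func(e *executor) { e.profiler = &profiler{} }
}

// WithTracer opts in to event collection for tracing.
func WithTracer() Option {
	return func(e *executor) { e.tracer = &tracer{} }
}

// newExecutor validates concurrency first, then applies options in order.
func newExecutor(concurrency uint, opts ...Option) *executor {
	if concurrency == 0 {
		panic("executor concurrency cannot be zero")
	}
	e := &executor{concurrency: concurrency}
	for _, opt := range opts {
		opt(e)
	}
	return e
}

// record follows the nil-guarded dispatch from the diff: the profiler
// receives only successful tasks, the tracer receives every task.
func (e *executor) record(name string, ok bool) {
	if ok && e.profiler != nil {
		e.profiler.spans = append(e.profiler.spans, name)
	}
	if e.tracer != nil {
		e.tracer.events = append(e.tracer.events, name)
	}
}

func main() {
	e := newExecutor(4, WithTracer())
	e.record("A", true)
	e.record("B", false) // simulated panic: still traced
	fmt.Println(e.profiler == nil, len(e.tracer.events)) // prints: true 2
}
```

Leaving the observer pointers nil by default means an executor built without options pays only a nil check per task, which matches the early `return nil` guards added to `Profile` and `Trace`.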

executor_test.go

Lines changed: 1 addition & 1 deletion
@@ -11,7 +11,7 @@ import (
 )
 
 func TestExecutor(t *testing.T) {
-	executor := gotaskflow.NewExecutor(uint(runtime.NumCPU()))
+	executor := gotaskflow.NewExecutor(uint(runtime.NumCPU()), gotaskflow.WithProfiler())
 	tf := gotaskflow.NewTaskFlow("G")
 	A, B, C :=
 		tf.NewTask("A", func() {
