Skip to content

Commit cbcb4b0

Browse files
committed
#### Version 0.9.9
* feature: TaskContext增加TimeoutContext与TimeoutCancel属性,用于超时控制,应用可根据需要从TaskContext获取 * feature: TaskInfo增加SetTimeout,用于设置超时时间,单位为秒 * refactor: 移除TaskInfo.Context() * 2020-04-25 16:00 at ShangHai
1 parent 29b4882 commit cbcb4b0

10 files changed

Lines changed: 263 additions & 157 deletions

File tree

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
* 支持Exception、OnBegin、OnEnd注入点
1414
* 支持单独执行TaskHandler
1515
* 支持代码级重设Task的相关设置
16+
* 支持超时控制
1617
* 内建Task运行计数信息,包含执行与异常计数
1718
* 内建针对Task与Counter的OutputHttpHandler,可方便与WebServer自动集成
1819

context.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package task
2+
3+
import "context"
4+
5+
//Task上下文信息
6+
type TaskContext struct {
7+
TaskID string
8+
TaskData interface{} //用于当前Task全局设置的数据项
9+
Message interface{} //用于每次Task执行上下文消息传输
10+
IsEnd bool //如果设置该属性为true,则停止当次任务的后续执行,一般用在OnBegin中
11+
Error error
12+
Header map[string]interface{}
13+
TimeoutContext context.Context
14+
TimeoutCancel context.CancelFunc
15+
doneChan chan struct{}
16+
}
17+
18+
func (c TaskContext) reset() {
19+
c.TaskID = ""
20+
c.TaskData = nil
21+
c.Message = nil
22+
c.IsEnd = false
23+
c.Error = nil
24+
c.Header = nil
25+
c.TimeoutContext = nil
26+
c.TimeoutCancel = nil
27+
c.doneChan = nil
28+
}

crontask.go

Lines changed: 59 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ func (task *CronTask) GetConfig() *TaskConfig {
3131
DueTime: task.DueTime,
3232
Interval: 0,
3333
Express: task.RawExpress,
34-
TaskData: task.Context().TaskData,
34+
TaskData: task.TaskData,
3535
}
3636
}
3737

@@ -58,7 +58,7 @@ func (task *CronTask) Reset(conf *TaskConfig) error {
5858
task.Stop()
5959
task.IsRun = conf.IsRun
6060
if conf.TaskData != nil {
61-
task.Context().TaskData = conf.TaskData
61+
task.TaskData = conf.TaskData
6262
}
6363
if conf.Handler != nil {
6464
task.handler = conf.Handler
@@ -104,23 +104,20 @@ func (task *CronTask) Start() {
104104
// no recover panic
105105
// support for #6 新增RunOnce方法建议
106106
func (task *CronTask) RunOnce() error {
107-
err := task.handler(task.context)
107+
err := task.handler(task.getTaskContext())
108108
return err
109109
}
110110

111111
// NewCronTask create new cron task
112112
func NewCronTask(taskID string, isRun bool, express string, handler TaskHandle, taskData interface{}) (Task, error) {
113-
context := new(TaskContext)
114-
context.TaskID = taskID
115-
context.TaskData = taskData
116-
117113
task := new(CronTask)
118114
task.initCounters()
119-
task.taskID = context.TaskID
115+
task.taskID = taskID
120116
task.TaskType = TaskType_Cron
121117
task.IsRun = isRun
122118
task.handler = handler
123119
task.RawExpress = express
120+
task.TaskData = taskData
124121
expressList := strings.Split(express, " ")
125122
if len(expressList) != 6 {
126123
return nil, errors.New("express is wrong format => not 6 parts")
@@ -133,7 +130,6 @@ func NewCronTask(taskID string, isRun bool, express string, handler TaskHandle,
133130
task.time_Second = parseExpress(expressList[0], ExpressType_Second)
134131

135132
task.State = TaskState_Init
136-
task.context = context
137133
return task, nil
138134
}
139135

@@ -155,48 +151,71 @@ func startCronTask(task *CronTask) {
155151
}
156152

157153
func doCronTask(task *CronTask) {
154+
taskCtx := task.getTaskContext()
158155
defer func() {
159-
task.Context().Header = nil
160-
task.Context().Error = nil
156+
if taskCtx.TimeoutCancel != nil {
157+
taskCtx.TimeoutCancel()
158+
}
159+
task.putTaskContext(taskCtx)
161160
if err := recover(); err != nil {
162161
task.CounterInfo().ErrorCounter.Inc(1)
163-
task.taskService.Logger().Debug(fmt.Sprint(task.TaskID, " cron handler recover error => ", err))
162+
task.taskService.Logger().Debug(fmt.Sprint(task.TaskID(), " cron handler recover error => ", err))
164163
if task.taskService.ExceptionHandler != nil {
165-
task.taskService.ExceptionHandler(task.Context(), fmt.Errorf("%v", err))
164+
task.taskService.ExceptionHandler(taskCtx, fmt.Errorf("%v", err))
166165
}
167166
//goroutine panic, restart cron task
168167
startCronTask(task)
169-
task.taskService.Logger().Debug(fmt.Sprint(task.TaskID, " goroutine panic, restart CronTask"))
168+
task.taskService.Logger().Debug(fmt.Sprint(task.TaskID(), " goroutine panic, restart CronTask"))
170169
}
171170
}()
172-
now := time.Now()
173-
task.context.Header = make(map[string]interface{})
174-
if task.time_WeekDay.IsMatch(now) &&
175-
task.time_Month.IsMatch(now) &&
176-
task.time_Day.IsMatch(now) &&
177-
task.time_Hour.IsMatch(now) &&
178-
task.time_Minute.IsMatch(now) &&
179-
task.time_Second.IsMatch(now) {
180-
181-
//inc run counter
182-
task.CounterInfo().RunCounter.Inc(1)
183-
//do log
184-
if task.taskService != nil && task.taskService.OnBeforeHandler != nil {
185-
task.taskService.OnBeforeHandler(task.Context())
186-
}
187-
var err error
188-
if !task.Context().IsEnd {
189-
err = task.handler(task.Context())
190-
}
191-
if err != nil {
192-
task.Context().Error = err
193-
task.CounterInfo().ErrorCounter.Inc(1)
194-
if task.taskService != nil && task.taskService.ExceptionHandler != nil {
195-
task.taskService.ExceptionHandler(task.Context(), err)
171+
172+
handler := func() {
173+
defer func() {
174+
if task.Timeout > 0 {
175+
taskCtx.doneChan <- struct{}{}
176+
}
177+
}()
178+
now := time.Now()
179+
if task.time_WeekDay.IsMatch(now) &&
180+
task.time_Month.IsMatch(now) &&
181+
task.time_Day.IsMatch(now) &&
182+
task.time_Hour.IsMatch(now) &&
183+
task.time_Minute.IsMatch(now) &&
184+
task.time_Second.IsMatch(now) {
185+
186+
//inc run counter
187+
task.CounterInfo().RunCounter.Inc(1)
188+
//do log
189+
if task.taskService != nil && task.taskService.OnBeforeHandler != nil {
190+
task.taskService.OnBeforeHandler(taskCtx)
191+
}
192+
var err error
193+
if !taskCtx.IsEnd {
194+
err = task.handler(taskCtx)
195+
}
196+
if err != nil {
197+
taskCtx.Error = err
198+
task.CounterInfo().ErrorCounter.Inc(1)
199+
if task.taskService != nil && task.taskService.ExceptionHandler != nil {
200+
task.taskService.ExceptionHandler(taskCtx, err)
201+
}
202+
}
203+
if task.taskService != nil && task.taskService.OnEndHandler != nil {
204+
task.taskService.OnEndHandler(taskCtx)
196205
}
197206
}
198-
if task.taskService != nil && task.taskService.OnEndHandler != nil {
199-
task.taskService.OnEndHandler(task.Context())
207+
}
208+
209+
if task.Timeout > 0 {
210+
go handler()
211+
select {
212+
case <-taskCtx.TimeoutContext.Done():
213+
task.taskService.Logger().Debug(fmt.Sprint(task.TaskID(), "do handler timeout."))
214+
case <-taskCtx.doneChan:
215+
return
200216
}
217+
} else {
218+
handler()
201219
}
220+
202221
}

example/normal/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ func main() {
5555
if exists {
5656
err = t.RunOnce()
5757
if err != nil {
58-
fmt.Println(t.Context(), "RunOnce error =>", err)
58+
fmt.Println(t.GetConfig(), "RunOnce error =>", err)
5959
}
6060
}
6161

example/timeout/main.go

Lines changed: 51 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,64 @@
11
package main
22

33
import (
4-
"context"
54
"fmt"
5+
. "github.com/devfeel/dottask"
66
"time"
77
)
88

9-
func main() {
10-
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
11-
defer cancel()
12-
query(ctx)
9+
var service *TaskService
1310

14-
time.Sleep(time.Hour)
15-
}
11+
var firstLoopTimeout = 0
12+
var firstCronTimeout = 0
13+
14+
var patchLoop = 0
15+
var patchCron = 0
1616

17-
func query(ctx context.Context) {
18-
notifyChan := make(chan string)
19-
doing := func() {
17+
func Job_Timeout_Test(ctx *TaskContext) error {
18+
patchCron += 1
19+
patch := patchLoop
20+
21+
if firstLoopTimeout <= 0 {
22+
firstLoopTimeout = 1
23+
time.Sleep(time.Second * 15)
24+
} else {
2025
time.Sleep(time.Second)
21-
fmt.Println("1:", time.Now())
22-
time.Sleep(time.Second * 3)
23-
fmt.Println("2:", time.Now())
24-
time.Sleep(time.Second * 5)
25-
fmt.Println("3:", time.Now())
26-
notifyChan <- "1"
2726
}
28-
go doing()
29-
select {
30-
case <-notifyChan:
31-
fmt.Println("done")
32-
case <-ctx.Done():
33-
fmt.Println("timeout")
27+
28+
fmt.Println(time.Now().String(), " => Job_Timeout_Test", patch)
29+
return nil
30+
}
31+
32+
func Loop_Timeout_Test(ctx *TaskContext) error {
33+
patchLoop += 1
34+
patch := patchLoop
35+
if firstCronTimeout <= 0 {
36+
firstCronTimeout = 1
37+
time.Sleep(time.Second * 20)
38+
}
39+
40+
fmt.Println(time.Now().String(), " => Loop_Timeout_Test", patch)
41+
return nil
42+
}
43+
44+
func main() {
45+
service = StartNewService()
46+
47+
t1, err := service.CreateCronTask("test-timeout-cron", true, "*/10 * * * * *", Job_Timeout_Test, nil)
48+
if err != nil {
49+
fmt.Println("service.CreateCronTask error! => ", err.Error())
50+
}
51+
t1.SetTimeout(5)
52+
t2, err := service.CreateLoopTask("test-timeout-loop", true, 0, 10000, Loop_Timeout_Test, nil)
53+
if err != nil {
54+
fmt.Println("service.CreateLoopTask error! => ", err.Error())
3455
}
56+
t2.SetTimeout(5)
57+
58+
service.StartAllTask()
59+
60+
for {
61+
time.Sleep(time.Hour)
62+
}
63+
3564
}

0 commit comments

Comments
 (0)