Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion FUTURE.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
* [x] 支持随机调度策略
* [ ] 支持任务数量优先调度策略
* [ ] 支持任务时间优先调度策略
* [ ] 支持 worker 动态扩缩容
* [x] 支持 worker 动态扩缩容
* [x] 支持查询可用的 worker 数量
* [x] 支持设置 worker 任务队列大小
* [x] 支持设置 panic 处理函数
Expand Down
6 changes: 6 additions & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
## ✒ 历史版本的特性介绍 (Features in old versions)

### v0.2.5-alpha

> 此版本发布于 2025-06-30

* 自动清理 worker 机制,避免浪费 worker 资源

### v0.2.4-alpha

> 此版本发布于 2025-06-28
Expand Down
5 changes: 3 additions & 2 deletions README.en.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
* Supports multiple scheduling strategies, including round robin, random, etc.
* Supports spin lock with backoff strategy.
* Supports getting the number of workers available in the executor.
* Supports dynamic scaling of workers in the executor.

_Check [HISTORY.md](./HISTORY.md) and [FUTURE.md](./FUTURE.md) to know about more information._

Expand All @@ -41,7 +42,7 @@ func main() {

for i := 0; i < 20; i++ {
limiter.Go(func() {
fmt.Println("limiter --> ", time.Now())
fmt.Printf("limiter --> %s\n", time.Now())
time.Sleep(time.Second)
})
}
Expand All @@ -54,7 +55,7 @@ func main() {

for i := 0; i < 20; i++ {
executor.Submit(func() {
fmt.Println("executor --> ", time.Now())
fmt.Printf("executor --> %s\n", time.Now())
time.Sleep(time.Second)
})
}
Expand Down
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
* 支持多种调度策略,包括轮询、随机等。
* 支持使用退避策略的自旋锁。
* 支持查询可用的 worker 数量。
* 支持 worker 动态扩缩容。

_历史版本的特性请查看 [HISTORY.md](./HISTORY.md)。未来版本的新特性和计划请查看 [FUTURE.md](./FUTURE.md)。_

Expand All @@ -41,7 +42,7 @@ func main() {

for i := 0; i < 20; i++ {
limiter.Go(func() {
fmt.Println("limiter --> ", time.Now())
fmt.Printf("limiter --> %s\n", time.Now())
time.Sleep(time.Second)
})
}
Expand All @@ -54,7 +55,7 @@ func main() {

for i := 0; i < 20; i++ {
executor.Submit(func() {
fmt.Println("executor --> ", time.Now())
fmt.Printf("executor --> %s\n", time.Now())
time.Sleep(time.Second)
})
}
Expand Down
6 changes: 3 additions & 3 deletions _examples/basic.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2023 FishGoddess. All rights reserved.
// Copyright 2025s FishGoddess. All rights reserved.
// Use of this source code is governed by a MIT style
// license that can be found in the LICENSE file.

Expand All @@ -17,7 +17,7 @@ func main() {

for i := 0; i < 20; i++ {
limiter.Go(func() {
fmt.Println("limiter --> ", time.Now())
fmt.Printf("limiter --> %s\n", time.Now())
time.Sleep(time.Second)
})
}
Expand All @@ -30,7 +30,7 @@ func main() {

for i := 0; i < 20; i++ {
executor.Submit(func() {
fmt.Println("executor --> ", time.Now())
fmt.Printf("executor --> %s\n", time.Now())
time.Sleep(time.Second)
})
}
Expand Down
6 changes: 3 additions & 3 deletions _examples/performance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func BenchmarkExecutor(b *testing.B) {

// go test -v -run=none -bench=^BenchmarkExecutorTime$ -benchmem -benchtime=1s
func BenchmarkExecutorTime(b *testing.B) {
executor := goes.NewExecutor(size)
executor := goes.NewExecutor(workerNum)

num := uint32(0)
task := func() {
Expand All @@ -104,7 +104,7 @@ func BenchmarkExecutorTime(b *testing.B) {

// // go test -v -run=none -bench=^BenchmarkAntsPool$ -benchmem -benchtime=1s
// func BenchmarkAntsPool(b *testing.B) {
// pool, _ := ants.NewPool(size)
// pool, _ := ants.NewPool(workerNum)
//
// num := uint32(0)
// task := func() {
Expand All @@ -123,7 +123,7 @@ func BenchmarkExecutorTime(b *testing.B) {
//
// // go test -v -run=none -bench=^BenchmarkAntsPoolTime$ -benchmem -benchtime=1s
// func BenchmarkAntsPoolTime(b *testing.B) {
// pool, _ := ants.NewPool(size)
// pool, _ := ants.NewPool(workerNum)
//
// num := uint32(0)
// task := func() {
Expand Down
58 changes: 58 additions & 0 deletions _examples/purge.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright 2025 FishGoddess. All rights reserved.
// Use of this source code is governed by a MIT style
// license that can be found in the LICENSE file.

package main

import (
"fmt"
"math/rand"
"time"

"github.com/FishGoddess/goes"
)

func watchExecutorWorkers(executor *goes.Executor) {
for {
fmt.Printf("workers --> %d\n", executor.AvailableWorkers())
time.Sleep(time.Second)
}
}

func main() {
// Creates a purge-active executor with 16 workers.
// The purge task will be executed every purge interval and purge workers not in lifetime.
purgeInterval := time.Minute
workerLifetime := time.Minute

executor := goes.NewExecutor(16, goes.WithPurgeActive(purgeInterval, workerLifetime))
defer executor.Close()

go watchExecutorWorkers(executor)

// Submit some tasks.
for i := 0; i < 200; i++ {
no := i

executor.Submit(func() {
r := 3000 + rand.Intn(1000)
fmt.Printf("task %d --> %d\n", no, r)
time.Sleep(time.Duration(r) * time.Millisecond)
})
}

time.Sleep(time.Minute + 5*time.Second)
fmt.Println("Wait! You will see the workers are decreasing and increasing...")

for i := 0; i < 10; i++ {
no := i

executor.Submit(func() {
r := 10000 + rand.Intn(1000)
fmt.Printf("task %d --> %d\n", no, r)
time.Sleep(time.Duration(r) * time.Millisecond)
})
}

time.Sleep(2 * time.Minute)
}
2 changes: 1 addition & 1 deletion _examples/spin_lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,5 @@ func main() {
}

wg.Wait()
fmt.Println("total is %d", total)
fmt.Printf("total is %d\n", total)
}
4 changes: 2 additions & 2 deletions _icons/coverage.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
23 changes: 20 additions & 3 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,17 @@ package goes

import (
"sync"
"time"

"github.com/FishGoddess/goes/pkg/spinlock"
)

type config struct {
workerNum int
workerQueueSize int
workerLifetime time.Duration
purgeInterval time.Duration
nowFunc func() time.Time
recoverFunc func(r any)
newLockerFunc func() sync.Locker
newSchedulerFunc func(workers ...*worker) scheduler
Expand All @@ -22,16 +26,29 @@ func newDefaultConfig(workerNum int) *config {
return &config{
workerNum: workerNum,
workerQueueSize: 64,
workerLifetime: 0,
purgeInterval: 0,
nowFunc: nil,
recoverFunc: nil,
newLockerFunc: nil,
newSchedulerFunc: nil,
}
}

func (c *config) recover(r any) {
if c.recoverFunc != nil {
c.recoverFunc(r)
func (c *config) now() time.Time {
if c.nowFunc == nil {
return time.Now()
}

return c.nowFunc()
}

func (c *config) recoverable() bool {
return c.recoverFunc != nil
}

func (c *config) recover(r any) {
c.recoverFunc(r)
}

func (c *config) newLocker() sync.Locker {
Expand Down
47 changes: 44 additions & 3 deletions config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"sync"
"testing"
"time"

"github.com/FishGoddess/goes/pkg/spinlock"
)
Expand All @@ -22,23 +23,62 @@ func TestNewDefaultConfig(t *testing.T) {
}
}

// go test -v -cover -run=^TestConfigNow$
func TestConfigNow(t *testing.T) {
workerNum := 16
conf := newDefaultConfig(workerNum)

got := conf.now().Unix()
want := time.Now().Unix()
if got != want {
t.Fatalf("got %v != want %v", got, want)
}

wantTime := time.Unix(123456789, 0)
conf.nowFunc = func() time.Time {
return wantTime
}

got = conf.now().Unix()
want = wantTime.Unix()
if got != want {
t.Fatalf("got %v != want %v", got, want)
}
}

// go test -v -cover -run=^TestConfigRecover$
func TestConfigRecover(t *testing.T) {
workerNum := 16
conf := newDefaultConfig(workerNum)
conf.recover(0)

if conf.recoverable() {
t.Fatalf("conf.recoverable() is wrong")
}

got := 0
conf.recoverFunc = func(r any) {
got = r.(int)
}

if !conf.recoverable() {
t.Fatalf("conf.recoverable() is wrong")
}

want := 1
conf.recover(want)

if got != want {
t.Fatalf("got %d != want %d", got, want)
}

defer func() {
if r := recover(); r == nil {
t.Fatal("conf.recover should panic")
}
}()

conf.recoverFunc = nil
conf.recover(0)
}

// go test -v -cover -run=^TestConfigNewLocker$
Expand Down Expand Up @@ -90,7 +130,8 @@ func TestConfigNewScheduler(t *testing.T) {
t.Fatalf("got %T is not *roundRobinScheduler", got)
}

if len(scheduler.workers) != workerNum {
t.Fatalf("len(scheduler.workers) %d != workerNum %d", len(scheduler.workers), workerNum)
gotLen := len(scheduler.workers)
if gotLen != workerNum {
t.Fatalf("gotLen %d != workerNum %d", gotLen, workerNum)
}
}
Loading
Loading