From b21cb0945ec740f9891eb32f23804e4d75d02ba7 Mon Sep 17 00:00:00 2001 From: FishGoddess <1149062639@qq.com> Date: Sun, 29 Jun 2025 00:08:46 +0800 Subject: [PATCH 01/11] =?UTF-8?q?=E5=AE=8C=E5=96=84=20recover=20=E5=A4=84?= =?UTF-8?q?=E7=90=86=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config.go | 8 +++++--- config_test.go | 18 +++++++++++++++++- worker.go | 12 +++++++----- worker_test.go | 11 +++++++++++ 4 files changed, 40 insertions(+), 9 deletions(-) diff --git a/config.go b/config.go index 36fa1cd..00fd523 100644 --- a/config.go +++ b/config.go @@ -28,10 +28,12 @@ func newDefaultConfig(workerNum int) *config { } } +func (c *config) recoverable() bool { + return c.recoverFunc != nil +} + func (c *config) recover(r any) { - if c.recoverFunc != nil { - c.recoverFunc(r) - } + c.recoverFunc(r) } func (c *config) newLocker() sync.Locker { diff --git a/config_test.go b/config_test.go index ef0e8fc..917e862 100644 --- a/config_test.go +++ b/config_test.go @@ -26,19 +26,35 @@ func TestNewDefaultConfig(t *testing.T) { func TestConfigRecover(t *testing.T) { workerNum := 16 conf := newDefaultConfig(workerNum) - conf.recover(0) + + if conf.recoverable() { + t.Fatalf("conf.recoverable() is wrong") + } got := 0 conf.recoverFunc = func(r any) { got = r.(int) } + if !conf.recoverable() { + t.Fatalf("conf.recoverable() is wrong") + } + want := 1 conf.recover(want) if got != want { t.Fatalf("got %d != want %d", got, want) } + + defer func() { + if r := recover(); r == nil { + t.Fatal("conf.recover should panic") + } + }() + + conf.recoverFunc = nil + conf.recover(0) } // go test -v -cover -run=^TestConfigNewLocker$ diff --git a/worker.go b/worker.go index 65a84d1..77f05d2 100644 --- a/worker.go +++ b/worker.go @@ -33,11 +33,13 @@ func (w *worker) WaitingTasks() int { } func (w *worker) handle(task Task) { - defer func() { - if r := recover(); r != nil { - w.executor.conf.recover(r) - } - }() + if w.executor.conf.recoverable() { + defer func() { + if r := recover(); r != nil { + w.executor.conf.recover(r) + } + }() + } task() } diff --git a/worker_test.go b/worker_test.go index fffc97b..75546cc 100644 --- a/worker_test.go +++ b/worker_test.go @@ -36,6 +36,17 @@ func TestWorkerHandle(t *testing.T) { if got != want { t.Fatalf("got %d != want %d", got, want) } + + defer func() { + if r := recover(); r == nil { + t.Fatal("worker.handle should panic") + } + }() + + worker.executor.conf.recoverFunc = nil + worker.handle(func() { + panic(want) + }) } // go test -v -cover -run=^TestWorkerWaitingTasks$ From f75a02db78cfe749910d0db14ec4d0cf0e4ac227 Mon Sep 17 00:00:00 2001 From: FishGoddess <1149062639@qq.com> Date: Sun, 29 Jun 2025 00:33:14 +0800 Subject: [PATCH 02/11] =?UTF-8?q?=E4=BC=98=E5=8C=96=E9=94=81=E7=B2=92?= =?UTF-8?q?=E5=BA=A6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- _examples/performance_test.go | 6 +++--- executor.go | 15 ++++++++------- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/_examples/performance_test.go b/_examples/performance_test.go index fc27056..6ea4521 100644 --- a/_examples/performance_test.go +++ b/_examples/performance_test.go @@ -84,7 +84,7 @@ func BenchmarkExecutor(b *testing.B) { // go test -v -run=none -bench=^BenchmarkExecutorTime$ -benchmem -benchtime=1s func BenchmarkExecutorTime(b *testing.B) { - executor := goes.NewExecutor(size) + executor := goes.NewExecutor(workerNum) num := uint32(0) task := func() { @@ -104,7 +104,7 @@ func BenchmarkExecutorTime(b *testing.B) { // // go test -v -run=none -bench=^BenchmarkAntsPool$ -benchmem -benchtime=1s // func BenchmarkAntsPool(b *testing.B) { -// pool, _ := ants.NewPool(size) +// pool, _ := ants.NewPool(workerNum) // // num := uint32(0) // task := func() { @@ -123,7 +123,7 @@ func BenchmarkExecutorTime(b *testing.B) { // // // go test -v -run=none -bench=^BenchmarkAntsPoolTime$ -benchmem -benchtime=1s // func BenchmarkAntsPoolTime(b *testing.B) { -// pool, _ := ants.NewPool(size) +// pool, _ := ants.NewPool(workerNum) // // num := uint32(0) // task := func() { diff --git a/executor.go b/executor.go index c800006..630b599 100644 --- a/executor.go +++ b/executor.go @@ -77,30 +77,31 @@ func (e *Executor) AvailableWorkers() int { // Submit submits a task to be handled by workers. func (e *Executor) Submit(task Task) error { e.lock.Lock() - defer e.lock.Unlock() if e.closed { + e.lock.Unlock() + return ErrExecutorIsClosed } worker := e.scheduler.Get() if worker == nil { + e.lock.Unlock() + return ErrWorkerIsNil } - // We don't need to create a new worker if we got a worker with no tasks. + // 1. We don't need to create a new worker if we got a worker with no tasks. + // 2. The number of workers has reached the limit, so we can only use the worker we got. if worker.WaitingTasks() <= 0 || len(e.workers) >= e.conf.workerNum { - worker.Accept(task) - return nil - } + e.lock.Unlock() - // The number of workers has reached the limit, so we can only use the worker we got. - if len(e.workers) >= e.conf.workerNum { worker.Accept(task) return nil } worker = e.spawnWorker() + e.lock.Unlock() worker.Accept(task) return nil } From 3d4ac1361c953dc46a76da61ebfffcc6eb9eb646 Mon Sep 17 00:00:00 2001 From: FishGoddess <1149062639@qq.com> Date: Sun, 29 Jun 2025 00:45:17 +0800 Subject: [PATCH 03/11] =?UTF-8?q?=E5=BC=95=E7=94=A8=E7=BD=AE=E7=A9=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- executor.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/executor.go b/executor.go index 630b599..e4711d8 100644 --- a/executor.go +++ b/executor.go @@ -102,6 +102,7 @@ func (e *Executor) Submit(task Task) error { worker = e.spawnWorker() e.lock.Unlock() + worker.Accept(task) return nil } @@ -122,4 +123,6 @@ func (e *Executor) Close() { e.Wait() e.closed = true + e.workers = nil + e.scheduler.Set(nil) } From d6cc8af2e0506ab5030aac0c3ff338addaadc04f Mon Sep 17 00:00:00 2001 From: FishGoddess <1149062639@qq.com> Date: Sun, 29 Jun 2025 01:20:42 +0800 Subject: [PATCH 04/11] =?UTF-8?q?=E8=B0=83=E6=95=B4=E5=8D=95=E6=B5=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config_test.go | 5 +++-- executor.go | 2 +- executor_test.go | 25 +++++++++++++--------- option_test.go | 10 +++++---- random_test.go | 12 +++++++---- round_robin_test.go | 12 +++++++---- worker.go | 19 ++++++++++++++--- worker_test.go | 51 +++++++++++++++++++++++++++++++++++++-------- 8 files changed, 99 insertions(+), 37 deletions(-) diff --git a/config_test.go b/config_test.go index 917e862..044492a 100644 --- a/config_test.go +++ b/config_test.go @@ -106,7 +106,8 @@ func TestConfigNewScheduler(t *testing.T) { t.Fatalf("got %T is not *roundRobinScheduler", got) } - if len(scheduler.workers) != workerNum { - t.Fatalf("len(scheduler.workers) %d != workerNum %d", len(scheduler.workers), workerNum) + gotLen := len(scheduler.workers) + if gotLen != workerNum { + t.Fatalf("gotLen %d != workerNum %d", gotLen, workerNum) } } diff --git a/executor.go b/executor.go index e4711d8..6cf0057 100644 --- a/executor.go +++ b/executor.go @@ -66,7 +66,7 @@ func (e *Executor) spawnWorker() *worker { return worker } -// AvailableWorkers returns the number of workers available in the executor. +// AvailableWorkers returns the number of workers available. func (e *Executor) AvailableWorkers() int { e.lock.Lock() defer e.lock.Unlock() diff --git a/executor_test.go b/executor_test.go index 6166ac6..5517664 100644 --- a/executor_test.go +++ b/executor_test.go @@ -77,14 +77,16 @@ func TestExecutorAvailableWorkers(t *testing.T) { t.Fatalf("len(executor.workers) %d != 1", len(executor.workers)) } - if executor.AvailableWorkers() != 1 { - t.Fatalf("executor.AvailableWorkers() %d != 1", executor.AvailableWorkers()) + got := executor.AvailableWorkers() + if got != 1 { + t.Fatalf("got %d != 1", got) } executor.workers = make([]*worker, workerNum) - if executor.AvailableWorkers() != workerNum { - t.Fatalf("executor.AvailableWorkers() %d != workerNum %d", executor.AvailableWorkers(), workerNum) + got = executor.AvailableWorkers() + if got != workerNum { + t.Fatalf("got %d != workerNum %d", got, workerNum) } } @@ -93,8 +95,9 @@ func TestExecutorSpawnWorker(t *testing.T) { workerNum := 16 executor := NewExecutor(workerNum) - if executor.AvailableWorkers() != 1 { - t.Fatalf("executor.AvailableWorkers() %d != 1", executor.AvailableWorkers()) + got := executor.AvailableWorkers() + if got != 1 { + t.Fatalf("got %d != 1", got) } for range workerNum { @@ -102,8 +105,9 @@ func TestExecutorSpawnWorker(t *testing.T) { time.Sleep(time.Millisecond) } - if executor.AvailableWorkers() != 1 { - t.Fatalf("executor.AvailableWorkers() %d != 1", executor.AvailableWorkers()) + got = executor.AvailableWorkers() + if got != 1 { + t.Fatalf("got %d != 1", got) } for range workerNum * 2 { @@ -112,7 +116,8 @@ func TestExecutorSpawnWorker(t *testing.T) { }) } - if executor.AvailableWorkers() != workerNum { - t.Fatalf("executor.AvailableWorkers() %d != workerNum %d", executor.AvailableWorkers(), workerNum) + got = executor.AvailableWorkers() + if got != workerNum { + t.Fatalf("got %d != workerNum %d", got, workerNum) } } diff --git a/option_test.go b/option_test.go index 24f779a..e7f4a4c 100644 --- a/option_test.go +++ b/option_test.go @@ -86,8 +86,9 @@ func TestWithRoundRobinScheduler(t *testing.T) { t.Fatalf("got %T is not *roundRobinScheduler", got) } - if cap(scheduler.workers) != workerNum { - t.Fatalf("cap(scheduler.workers) %d != workerNum %d", cap(scheduler.workers), workerNum) + gotCap := cap(scheduler.workers) + if gotCap != workerNum { + t.Fatalf("gotCap %d != workerNum %d", gotCap, workerNum) } } @@ -105,7 +106,8 @@ func TestWithRandomScheduler(t *testing.T) { t.Fatalf("got %T is not *randomScheduler", got) } - if cap(scheduler.workers) != workerNum { - t.Fatalf("cap(scheduler.workers) %d != workerNum %d", cap(scheduler.workers), workerNum) + gotCap := cap(scheduler.workers) + if gotCap != workerNum { + t.Fatalf("gotCap %d != workerNum %d", gotCap, workerNum) } } diff --git a/random_test.go b/random_test.go index f43f6d2..56f2fe0 100644 --- a/random_test.go +++ b/random_test.go @@ -22,8 +22,10 @@ func TestRandomScheduler(t *testing.T) { t.Fatalf("scheduler.workers %p != workers %p", scheduler.workers, workers) } - if len(scheduler.workers) != len(workers) { - t.Fatalf("len(scheduler.workers) %d != len(workers) %d", len(scheduler.workers), len(workers)) + got := len(scheduler.workers) + want := len(workers) + if got != want { + t.Fatalf("got %d != want %d", got, want) } scheduler.Set(workers) @@ -31,8 +33,10 @@ func TestRandomScheduler(t *testing.T) { t.Fatalf("scheduler.workers %p != workers %p", scheduler.workers, workers) } - if len(scheduler.workers) != len(workers) { - t.Fatalf("len(scheduler.workers) %d != len(workers) %d", len(scheduler.workers), len(workers)) + got = len(scheduler.workers) + want = len(workers) + if got != want { + t.Fatalf("got %d != want %d", got, want) } for i, worker := range workers { diff --git a/round_robin_test.go b/round_robin_test.go index 441b10b..5e2363b 100644 --- a/round_robin_test.go +++ b/round_robin_test.go @@ -22,8 +22,10 @@ func TestRoundRobinScheduler(t *testing.T) { t.Fatalf("scheduler.workers %p != workers %p", scheduler.workers, workers) } - if len(scheduler.workers) != len(workers) { - t.Fatalf("len(scheduler.workers) %d != len(workers) %d", len(scheduler.workers), len(workers)) + got := len(scheduler.workers) + want := len(workers) + if got != want { + t.Fatalf("got %d != want %d", got, want) } scheduler.Set(workers) @@ -31,8 +33,10 @@ func TestRoundRobinScheduler(t *testing.T) { t.Fatalf("scheduler.workers %p != workers %p", scheduler.workers, workers) } - if len(scheduler.workers) != len(workers) { - t.Fatalf("len(scheduler.workers) %d != len(workers) %d", len(scheduler.workers), len(workers)) + got = len(scheduler.workers) + want = len(workers) + if got != want { + t.Fatalf("got %d != want %d", got, want) } for i, worker := range workers { diff --git a/worker.go b/worker.go index 77f05d2..9c42bdb 100644 --- a/worker.go +++ b/worker.go @@ -4,6 +4,8 @@ package goes +import "time" + type scheduler interface { // Set sets the workers to scheduler. Set(workers []*worker) @@ -13,8 +15,9 @@ type scheduler interface { } type worker struct { - executor *Executor - taskQueue chan Task + executor *Executor + taskQueue chan Task + acceptTime time.Time } func newWorker(executor *Executor) *worker { @@ -27,11 +30,21 @@ func newWorker(executor *Executor) *worker { return w } -// WaitingTasks returns the number of tasks waiting in the worker. +// WaitingTasks returns the number of tasks waiting. func (w *worker) WaitingTasks() int { return len(w.taskQueue) } +// AcceptTime returns the accept time of worker. +func (w *worker) AcceptTime() time.Time { + return w.acceptTime +} + +// SetAcceptTime sets the accept time of worker. +func (w *worker) SetAcceptTime(t time.Time) { + w.acceptTime = t +} + func (w *worker) handle(task Task) { if w.executor.conf.recoverable() { defer func() { diff --git a/worker_test.go b/worker_test.go index 75546cc..4a710bd 100644 --- a/worker_test.go +++ b/worker_test.go @@ -4,7 +4,10 @@ package goes -import "testing" +import ( + "testing" + "time" +) // go test -v -cover -run=^TestWorkerHandle$ func TestWorkerHandle(t *testing.T) { @@ -54,22 +57,52 @@ func TestWorkerWaitingTasks(t *testing.T) { taskQueue := make(chan Task, 4) worker := &worker{taskQueue: taskQueue} - if worker.WaitingTasks() != len(taskQueue) { - t.Fatalf("got %d != want %d", worker.WaitingTasks(), len(taskQueue)) + got := worker.WaitingTasks() + want := len(taskQueue) + if got != want { + t.Fatalf("got %d != want %d", got, want) } - if worker.WaitingTasks() != 0 { - t.Fatalf("got %d != 0", worker.WaitingTasks()) + got = worker.WaitingTasks() + if got != 0 { + t.Fatalf("got %d != 0", got) } taskQueue <- nil taskQueue <- nil - if worker.WaitingTasks() != len(taskQueue) { - t.Fatalf("got %d != want %d", worker.WaitingTasks(), len(taskQueue)) + got = worker.WaitingTasks() + want = len(taskQueue) + if got != want { + t.Fatalf("got %d != want %d", got, want) + } + + got = worker.WaitingTasks() + if got != 2 { + t.Fatalf("got %d != 2", got) + } +} + +// go test -v -cover -run=^TestWorkerAcceptTime$ +func TestWorkerAcceptTime(t *testing.T) { + acceptTime := time.Now() + worker := &worker{acceptTime: acceptTime} + + got := worker.AcceptTime() + if got != acceptTime { + t.Fatalf("got %v != acceptTime %v", got, acceptTime) + } + + acceptTime = time.Unix(123456789, 0) + worker.SetAcceptTime(acceptTime) + + got = worker.acceptTime + if got != acceptTime { + t.Fatalf("got %v != acceptTime %v", got, acceptTime) } - if worker.WaitingTasks() != 2 { - t.Fatalf("got %d != 2", worker.WaitingTasks()) + got = worker.AcceptTime() + if got != acceptTime { + t.Fatalf("got %v != acceptTime %v", got, acceptTime) } } From 5194795b6a89d92a1939b2cc0c0d9368420c1c0e Mon Sep 17 00:00:00 2001 From: FishGoddess <1149062639@qq.com> Date: Sun, 29 Jun 2025 01:56:25 +0800 Subject: [PATCH 05/11] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E6=97=B6=E9=97=B4?= =?UTF-8?q?=E8=8E=B7=E5=8F=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config.go | 11 +++++ config_test.go | 24 ++++++++++ executor.go | 4 ++ pkg/fastclock/fast_clock.go | 76 ++++++++++++++++++++++++++++++++ pkg/fastclock/fast_clock_test.go | 73 ++++++++++++++++++++++++++++++ 5 files changed, 188 insertions(+) create mode 100644 pkg/fastclock/fast_clock.go create mode 100644 pkg/fastclock/fast_clock_test.go diff --git a/config.go b/config.go index 00fd523..a18b25f 100644 --- a/config.go +++ b/config.go @@ -6,6 +6,7 @@ package goes import ( "sync" + "time" "github.com/FishGoddess/goes/pkg/spinlock" ) @@ -13,6 +14,7 @@ import ( type config struct { workerNum int workerQueueSize int + nowFunc func() time.Time recoverFunc func(r any) newLockerFunc func() sync.Locker newSchedulerFunc func(workers ...*worker) scheduler @@ -22,12 +24,21 @@ func newDefaultConfig(workerNum int) *config { return &config{ workerNum: workerNum, workerQueueSize: 64, + nowFunc: nil, recoverFunc: nil, newLockerFunc: nil, newSchedulerFunc: nil, } } +func (c *config) now() time.Time { + if c.nowFunc == nil { + return time.Now() + } + + return c.nowFunc() +} + func (c *config) recoverable() bool { return c.recoverFunc != nil } diff --git a/config_test.go b/config_test.go index 044492a..26ce362 100644 --- a/config_test.go +++ b/config_test.go @@ -8,6 +8,7 @@ import ( "fmt" "sync" "testing" + "time" "github.com/FishGoddess/goes/pkg/spinlock" ) @@ -22,6 +23,29 @@ func TestNewDefaultConfig(t *testing.T) { } } +// go test -v -cover -run=^TestConfigNow$ +func TestConfigNow(t *testing.T) { + workerNum := 16 + conf := newDefaultConfig(workerNum) + + got := conf.now().Unix() + want := time.Now().Unix() + if got != want { + t.Fatalf("got %v != want %v", got, want) + } + + wantTime := time.Unix(123456789, 0) + conf.nowFunc = func() time.Time { + return wantTime + } + + got = conf.now().Unix() + want = wantTime.Unix() + if got != want { + t.Fatalf("got %v != want %v", got, want) + } +} + // go test -v -cover -run=^TestConfigRecover$ func TestConfigRecover(t *testing.T) { workerNum := 16 diff --git a/executor.go b/executor.go index 6cf0057..d62e537 100644 --- a/executor.go +++ b/executor.go @@ -91,9 +91,12 @@ func (e *Executor) Submit(task Task) error { return ErrWorkerIsNil } + now := e.conf.now() + // 1. We don't need to create a new worker if we got a worker with no tasks. // 2. The number of workers has reached the limit, so we can only use the worker we got. if worker.WaitingTasks() <= 0 || len(e.workers) >= e.conf.workerNum { + worker.SetAcceptTime(now) e.lock.Unlock() worker.Accept(task) @@ -101,6 +104,7 @@ func (e *Executor) Submit(task Task) error { } worker = e.spawnWorker() + worker.SetAcceptTime(now) e.lock.Unlock() worker.Accept(task) diff --git a/pkg/fastclock/fast_clock.go b/pkg/fastclock/fast_clock.go new file mode 100644 index 0000000..5bd523b --- /dev/null +++ b/pkg/fastclock/fast_clock.go @@ -0,0 +1,76 @@ +// Copyright 2025 FishGoddess. All rights reserved. +// Use of this source code is governed by a MIT style +// license that can be found in the LICENSE file. + +package fastclock + +import ( + "sync" + "sync/atomic" + "time" +) + +const ( + duration = time.Second + adjustment = 59 +) + +// fastClock is a clock for getting current time faster. +// It caches current time in nanos and updates it in fixed duration, so it's not a precise way to get current time. +// In fact, we don't recommend you to use it unless you do need a fast way to get current time even the time is "incorrect". +// According to our benchmarks, it does run faster than time.Now: +// +// In my linux server with 2 cores: +// BenchmarkTimeNow-2 18361782 63.08 ns/op 0 B/op 0 allocs/op +// BenchmarkFastClockNow-2 303623113 3.92 ns/op 0 B/op 0 allocs/op +// BenchmarkFastClockNowNanos-2 477787526 2.51 ns/op 0 B/op 0 allocs/op +// +// However, the performance of time.Now is faster enough for 99.9% situations, so we hope you never use it :) +type fastClock struct { + nanos int64 +} + +func newClock() *fastClock { + clock := &fastClock{ + nanos: time.Now().UnixNano(), + } + + go clock.start() + return clock +} + +func (fc *fastClock) start() { + for { + for range adjustment { + time.Sleep(duration) + atomic.AddInt64(&fc.nanos, int64(duration)) + } + + time.Sleep(duration) + atomic.StoreInt64(&fc.nanos, time.Now().UnixNano()) + } +} + +func (fc *fastClock) nowNanos() int64 { + return atomic.LoadInt64(&fc.nanos) +} + +var ( + clock *fastClock + clockOnce sync.Once +) + +// Now returns the current time from fast clock. +func Now() time.Time { + nanos := NowNanos() + return time.Unix(0, nanos) +} + +// NowNanos returns the current time in nanos from fast clock. +func NowNanos() int64 { + clockOnce.Do(func() { + clock = newClock() + }) + + return clock.nowNanos() +} diff --git a/pkg/fastclock/fast_clock_test.go b/pkg/fastclock/fast_clock_test.go new file mode 100644 index 0000000..d8f2aba --- /dev/null +++ b/pkg/fastclock/fast_clock_test.go @@ -0,0 +1,73 @@ +// Copyright 2025 FishGoddess. All rights reserved. +// Use of this source code is governed by a MIT style +// license that can be found in the LICENSE file. + +package fastclock + +import ( + "math" + "math/rand" + "testing" + "time" +) + +// go test -v -cover -run=^TestNow$ +func TestNow(t *testing.T) { + for i := 0; i < 10; i++ { + got := Now() + gap := time.Since(got) + t.Logf("got: %v, gap: %v", got, gap) + + if math.Abs(float64(gap.Nanoseconds())) > float64(duration)*1.2 { + t.Errorf("now %v is wrong", got) + } + + time.Sleep(time.Duration(rand.Int63n(int64(duration)))) + } +} + +// go test -v -cover -run=^TestNowNanos$ +func TestNowNanos(t *testing.T) { + for i := 0; i < 10; i++ { + gotNanos := NowNanos() + got := time.Unix(0, gotNanos) + gap := time.Since(got) + t.Logf("got: %v, gap: %v", got, gap) + + if math.Abs(float64(gap.Nanoseconds())) > float64(duration)*1.2 { + t.Errorf("now %v is wrong", got) + } + + time.Sleep(time.Duration(rand.Int63n(int64(duration)))) + } +} + +// go test -v -run=none -bench=^BenchmarkTimeNow$ -benchmem -benchtime=1s +func BenchmarkTimeNow(b *testing.B) { + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + time.Now() + } +} + +// go test -v -run=none -bench=^BenchmarkFastClockNow$ -benchmem -benchtime=1s +func BenchmarkFastClockNow(b *testing.B) { + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + Now() + } +} + +// go test -v -run=none -bench=^BenchmarkFastClockNowNanos$ -benchmem -benchtime=1s +func BenchmarkFastClockNowNanos(b *testing.B) { + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + NowNanos() + } +} From 92df7d8e498e3a1086020c1a087d482098110050 Mon Sep 17 00:00:00 2001 From: FishGoddess <1149062639@qq.com> Date: Sun, 29 Jun 2025 16:06:22 +0800 Subject: [PATCH 06/11] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E9=80=89=E9=A1=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- executor.go | 13 ++---- option.go | 8 ++++ option_test.go | 13 ++++++ pkg/fastclock/fast_clock.go | 76 -------------------------------- pkg/fastclock/fast_clock_test.go | 73 ------------------------------ 5 files changed, 24 insertions(+), 159 deletions(-) delete mode 100644 pkg/fastclock/fast_clock.go delete mode 100644 pkg/fastclock/fast_clock_test.go diff --git a/executor.go b/executor.go index d62e537..5ec2434 100644 --- a/executor.go +++ b/executor.go @@ -91,20 +91,13 @@ func (e *Executor) Submit(task Task) error { return ErrWorkerIsNil } - now := e.conf.now() - // 1. We don't need to create a new worker if we got a worker with no tasks. // 2. The number of workers has reached the limit, so we can only use the worker we got. - if worker.WaitingTasks() <= 0 || len(e.workers) >= e.conf.workerNum { - worker.SetAcceptTime(now) - e.lock.Unlock() - - worker.Accept(task) - return nil + if worker.WaitingTasks() > 0 && len(e.workers) < e.conf.workerNum { + worker = e.spawnWorker() } - worker = e.spawnWorker() - worker.SetAcceptTime(now) + worker.SetAcceptTime(e.conf.now()) e.lock.Unlock() worker.Accept(task) diff --git a/option.go b/option.go index a8b7ba6..69ba575 100644 --- a/option.go +++ b/option.go @@ -6,6 +6,7 @@ package goes import ( "sync" + "time" "github.com/FishGoddess/goes/pkg/spinlock" ) @@ -24,6 +25,13 @@ func WithWorkerQueueSize(size int) Option { } } +// WithNowFunc sets the now function. +func WithNowFunc(nowFunc func() time.Time) Option { + return func(conf *config) { + conf.nowFunc = nowFunc + } +} + // WithRecoverFunc sets the recover function. func WithRecoverFunc(recoverFunc func(r any)) Option { return func(conf *config) { diff --git a/option_test.go b/option_test.go index e7f4a4c..ab01e33 100644 --- a/option_test.go +++ b/option_test.go @@ -8,6 +8,7 @@ import ( "fmt" "sync" "testing" + "time" "github.com/FishGoddess/goes/pkg/spinlock" ) @@ -24,6 +25,18 @@ func TestWithWorkerQueueSize(t *testing.T) { } } +// go test -v -cover -run=^TestWithNowFunc$ +func TestWithNowFunc(t *testing.T) { + workerNum := 16 + nowFunc := func() time.Time { return time.Now() } + conf := newDefaultConfig(workerNum) + WithNowFunc(nowFunc)(conf) + + if fmt.Sprintf("%p", conf.nowFunc) != fmt.Sprintf("%p", nowFunc) { + t.Fatalf("conf.nowFunc %p != nowFunc %p", conf.nowFunc, nowFunc) + } +} + // go test -v -cover -run=^TestWithRecoverFunc$ func TestWithRecoverFunc(t *testing.T) { workerNum := 16 diff --git a/pkg/fastclock/fast_clock.go b/pkg/fastclock/fast_clock.go deleted file mode 100644 index 5bd523b..0000000 --- a/pkg/fastclock/fast_clock.go +++ /dev/null @@ -1,76 +0,0 @@ -// Copyright 2025 FishGoddess. All rights reserved. -// Use of this source code is governed by a MIT style -// license that can be found in the LICENSE file. - -package fastclock - -import ( - "sync" - "sync/atomic" - "time" -) - -const ( - duration = time.Second - adjustment = 59 -) - -// fastClock is a clock for getting current time faster. -// It caches current time in nanos and updates it in fixed duration, so it's not a precise way to get current time. -// In fact, we don't recommend you to use it unless you do need a fast way to get current time even the time is "incorrect". -// According to our benchmarks, it does run faster than time.Now: -// -// In my linux server with 2 cores: -// BenchmarkTimeNow-2 18361782 63.08 ns/op 0 B/op 0 allocs/op -// BenchmarkFastClockNow-2 303623113 3.92 ns/op 0 B/op 0 allocs/op -// BenchmarkFastClockNowNanos-2 477787526 2.51 ns/op 0 B/op 0 allocs/op -// -// However, the performance of time.Now is faster enough for 99.9% situations, so we hope you never use it :) -type fastClock struct { - nanos int64 -} - -func newClock() *fastClock { - clock := &fastClock{ - nanos: time.Now().UnixNano(), - } - - go clock.start() - return clock -} - -func (fc *fastClock) start() { - for { - for range adjustment { - time.Sleep(duration) - atomic.AddInt64(&fc.nanos, int64(duration)) - } - - time.Sleep(duration) - atomic.StoreInt64(&fc.nanos, time.Now().UnixNano()) - } -} - -func (fc *fastClock) nowNanos() int64 { - return atomic.LoadInt64(&fc.nanos) -} - -var ( - clock *fastClock - clockOnce sync.Once -) - -// Now returns the current time from fast clock. -func Now() time.Time { - nanos := NowNanos() - return time.Unix(0, nanos) -} - -// NowNanos returns the current time in nanos from fast clock. -func NowNanos() int64 { - clockOnce.Do(func() { - clock = newClock() - }) - - return clock.nowNanos() -} diff --git a/pkg/fastclock/fast_clock_test.go b/pkg/fastclock/fast_clock_test.go deleted file mode 100644 index d8f2aba..0000000 --- a/pkg/fastclock/fast_clock_test.go +++ /dev/null @@ -1,73 +0,0 @@ -// Copyright 2025 FishGoddess. All rights reserved. -// Use of this source code is governed by a MIT style -// license that can be found in the LICENSE file. - -package fastclock - -import ( - "math" - "math/rand" - "testing" - "time" -) - -// go test -v -cover -run=^TestNow$ -func TestNow(t *testing.T) { - for i := 0; i < 10; i++ { - got := Now() - gap := time.Since(got) - t.Logf("got: %v, gap: %v", got, gap) - - if math.Abs(float64(gap.Nanoseconds())) > float64(duration)*1.2 { - t.Errorf("now %v is wrong", got) - } - - time.Sleep(time.Duration(rand.Int63n(int64(duration)))) - } -} - -// go test -v -cover -run=^TestNowNanos$ -func TestNowNanos(t *testing.T) { - for i := 0; i < 10; i++ { - gotNanos := NowNanos() - got := time.Unix(0, gotNanos) - gap := time.Since(got) - t.Logf("got: %v, gap: %v", got, gap) - - if math.Abs(float64(gap.Nanoseconds())) > float64(duration)*1.2 { - t.Errorf("now %v is wrong", got) - } - - time.Sleep(time.Duration(rand.Int63n(int64(duration)))) - } -} - -// go test -v -run=none -bench=^BenchmarkTimeNow$ -benchmem -benchtime=1s -func BenchmarkTimeNow(b *testing.B) { - b.ReportAllocs() - b.ResetTimer() - - for i := 0; i < b.N; i++ { - time.Now() - } -} - -// go test -v -run=none -bench=^BenchmarkFastClockNow$ -benchmem -benchtime=1s -func BenchmarkFastClockNow(b *testing.B) { - b.ReportAllocs() - b.ResetTimer() - - for i := 0; i < b.N; i++ { - Now() - } -} - -// go test -v -run=none -bench=^BenchmarkFastClockNowNanos$ -benchmem -benchtime=1s -func BenchmarkFastClockNowNanos(b *testing.B) { - b.ReportAllocs() - b.ResetTimer() - - for i := 0; i < b.N; i++ { - NowNanos() - } -} From 298852758f6c04e757f766571939d43bebeca4e3 Mon Sep 17 00:00:00 2001 From: FishGoddess <1149062639@qq.com> Date: Sun, 29 Jun 2025 20:58:58 +0800 Subject: [PATCH 07/11] purge --- FUTURE.md | 2 ++ config.go | 4 ++++ executor.go | 58 +++++++++++++++++++++++++++++++++++++++++++++++++++-- 3 files changed, 62 insertions(+), 2 deletions(-) diff --git a/FUTURE.md b/FUTURE.md index e2a93d4..7aa7348 100755 --- a/FUTURE.md +++ b/FUTURE.md @@ -12,6 +12,8 @@ * [x] 支持设置 worker 任务队列大小 * [x] 支持设置 panic 处理函数 * [x] 支持自定义 sync.Locker 实现 +* [ ] 增加 worker 内存复用池 +* [ ] 增加 workers 切片内存复用池 * [x] 完善单元测试,将覆盖率提高到 95% 以上 ### v0.1.0 diff --git a/config.go b/config.go index a18b25f..1750ebe 100644 --- a/config.go +++ b/config.go @@ -14,6 +14,8 @@ import ( type config struct { workerNum int workerQueueSize int + workerLifetime time.Duration + purgeInterval time.Duration nowFunc func() time.Time recoverFunc func(r any) newLockerFunc func() sync.Locker @@ -24,6 +26,8 @@ func newDefaultConfig(workerNum int) *config { return &config{ workerNum: workerNum, workerQueueSize: 64, + workerLifetime: 0, + purgeInterval: 0, nowFunc: nil, recoverFunc: nil, newLockerFunc: nil, diff --git a/executor.go b/executor.go index 5ec2434..ffbb795 100644 --- a/executor.go +++ b/executor.go @@ -7,6 +7,7 @@ package goes import ( "errors" "sync" + "time" ) var ( @@ -24,6 +25,7 @@ type Executor struct { workers []*worker scheduler scheduler + closeCh chan struct{} closed bool wg sync.WaitGroup @@ -50,11 +52,13 @@ func NewExecutor(workerNum int, opts ...Option) *Executor { conf: conf, workers: workers, scheduler: conf.newScheduler(), + closeCh: make(chan struct{}, 1), closed: false, lock: conf.newLocker(), } executor.spawnWorker() + executor.runPurgeTask() return executor } @@ -66,6 +70,54 @@ func (e *Executor) spawnWorker() *worker { return worker } +func (e *Executor) purgeActive() bool { + return e.conf.purgeInterval > 0 && e.conf.workerLifetime > 0 +} + +func (e *Executor) purgeWorkers() { + e.lock.Lock() + defer e.lock.Unlock() + + now := e.conf.now() + oldWorkers := e.workers + newWorkers := make([]*worker, 0, len(oldWorkers)) + for _, worker := range oldWorkers { + if worker.WaitingTasks() <= 0 && now.Sub(worker.AcceptTime()) >= e.conf.workerLifetime { + worker.Done() + } else { + newWorkers = append(newWorkers, worker) + } + } + + e.workers = newWorkers + e.scheduler.Set(e.workers) + + // Avoid gc leaks. + for i := range oldWorkers { + oldWorkers[i] = nil + } +} + +func (e *Executor) runPurgeTask() { + if !e.purgeActive() { + return + } + + go func() { + ticker := time.NewTicker(e.conf.purgeInterval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + e.purgeWorkers() + case <-e.closeCh: + return + } + } + }() +} + // AvailableWorkers returns the number of workers available. func (e *Executor) AvailableWorkers() int { e.lock.Lock() @@ -97,9 +149,11 @@ func (e *Executor) Submit(task Task) error { worker = e.spawnWorker() } - worker.SetAcceptTime(e.conf.now()) - e.lock.Unlock() + if e.purgeActive() { + worker.SetAcceptTime(e.conf.now()) + } + e.lock.Unlock() worker.Accept(task) return nil } From e177ab7f22810b4bf8bcf60556bffea190a0a6c5 Mon Sep 17 00:00:00 2001 From: FishGoddess <1149062639@qq.com> Date: Sun, 29 Jun 2025 23:47:52 +0800 Subject: [PATCH 08/11] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E5=8A=A8=E6=80=81?= =?UTF-8?q?=E6=89=A9=E7=BC=A9=E5=AE=B9=E7=9A=84=E6=9C=BA=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- FUTURE.md | 4 +- README.en.md | 1 + README.md | 1 + _icons/coverage.svg | 4 +- executor.go | 46 ++++++++++++++---- executor_test.go | 114 ++++++++++++++++++++++++++++++++++++++++++++ option.go | 8 ++++ option_test.go | 17 +++++++ 8 files changed, 181 insertions(+), 14 deletions(-) diff --git a/FUTURE.md b/FUTURE.md index 7aa7348..f61d3ac 100755 --- a/FUTURE.md +++ b/FUTURE.md @@ -7,13 +7,11 @@ * [x] 支持随机调度策略 * [ ] 支持任务数量优先调度策略 * [ ] 支持任务时间优先调度策略 -* [ ] 支持 worker 动态扩缩容 +* [x] 支持 worker 动态扩缩容 * [x] 支持查询可用的 worker 数量 * [x] 支持设置 worker 任务队列大小 * [x] 支持设置 panic 处理函数 * [x] 支持自定义 sync.Locker 实现 -* [ ] 增加 worker 内存复用池 -* [ ] 增加 workers 切片内存复用池 * [x] 完善单元测试,将覆盖率提高到 95% 以上 ### v0.1.0 diff --git a/README.en.md b/README.en.md index f6cc33d..23f26b0 100755 --- a/README.en.md +++ b/README.en.md @@ -16,6 +16,7 @@ * Supports multiple scheduling strategies, including round robin, random, etc. * Supports spin lock with backoff strategy. * Supports getting the number of workers available in the executor. +* Supports dynamic scaling of workers in the executor. _Check [HISTORY.md](./HISTORY.md) and [FUTURE.md](./FUTURE.md) to know about more information._ diff --git a/README.md b/README.md index 6705db8..e4dbc3f 100755 --- a/README.md +++ b/README.md @@ -16,6 +16,7 @@ * 支持多种调度策略,包括轮询、随机等。 * 支持使用退避策略的自旋锁。 * 支持查询可用的 worker 数量。 +* 支持 worker 动态扩缩容。 _历史版本的特性请查看 [HISTORY.md](./HISTORY.md)。未来版本的新特性和计划请查看 [FUTURE.md](./FUTURE.md)。_ diff --git a/_icons/coverage.svg b/_icons/coverage.svg index 4c5544e..be8453f 100644 --- a/_icons/coverage.svg +++ b/_icons/coverage.svg @@ -10,7 +10,7 @@ coverage coverage - 96% - 96% + 98% + 98% \ No newline at end of file diff --git a/executor.go b/executor.go index ffbb795..db121de 100644 --- a/executor.go +++ b/executor.go @@ -47,6 +47,14 @@ func NewExecutor(workerNum int, opts ...Option) *Executor { panic("goes: worker's queue size <= 0") } + if conf.purgeInterval > 0 && conf.purgeInterval < time.Minute { + panic("goes: executor's purge interval < 1 minute") + } + + if conf.workerLifetime > 0 && conf.workerLifetime < time.Minute { + panic("goes: executor's worker lifetime < 1 minute") + } + workers := make([]*worker, 0, conf.workerNum) executor := &Executor{ conf: conf, @@ -79,10 +87,33 @@ func (e *Executor) purgeWorkers() { defer e.lock.Unlock() now := e.conf.now() - oldWorkers := e.workers - newWorkers := make([]*worker, 0, len(oldWorkers)) - for _, worker := range oldWorkers { - if worker.WaitingTasks() <= 0 && now.Sub(worker.AcceptTime()) >= e.conf.workerLifetime { + purgeable := false + + isPurgeable := func(worker *worker) bool { + return worker.WaitingTasks() <= 0 && now.Sub(worker.AcceptTime()) >= e.conf.workerLifetime + } + + // Check if we need to purge workers. + for _, worker := range e.workers { + if isPurgeable(worker) { + purgeable = true + break + } + } + + if !purgeable { + return + } + + // Purge workers and we will keep one worker at least. + newWorkers := make([]*worker, 0, len(e.workers)) + for _, worker := range e.workers { + if len(newWorkers) <= 0 { + newWorkers = append(newWorkers, worker) + continue + } + + if isPurgeable(worker) { worker.Done() } else { newWorkers = append(newWorkers, worker) @@ -91,11 +122,6 @@ func (e *Executor) purgeWorkers() { e.workers = newWorkers e.scheduler.Set(e.workers) - - // Avoid gc leaks. - for i := range oldWorkers { - oldWorkers[i] = nil - } } func (e *Executor) runPurgeTask() { @@ -173,6 +199,8 @@ func (e *Executor) Close() { } e.Wait() + + e.closeCh <- struct{}{} e.closed = true e.workers = nil e.scheduler.Set(nil) diff --git a/executor_test.go b/executor_test.go index 5517664..90a1af1 100644 --- a/executor_test.go +++ b/executor_test.go @@ -5,11 +5,59 @@ package goes import ( + "fmt" "sync" "testing" "time" ) +// go test -v -cover -run=^TestNewExecutor$ +func TestNewExecutor(t *testing.T) { + workerNum := 16 + + t.Run("ok", func(t *testing.T) { + defer func() { + if r := recover(); r != nil { + t.Fatalf("r should not panic") + } + }() + + executor := NewExecutor(workerNum, WithWorkerQueueSize(256), WithPurgeTask(time.Minute, time.Minute)) + defer executor.Close() + }) + + testCase := func(t *testing.T, workerNum int, opts ...Option) { + defer func() { + if r := recover(); r == nil { + t.Fatalf("r should panic") + } + }() + + executor := NewExecutor(workerNum, opts...) + defer executor.Close() + } + + t.Run("worker num", func(t *testing.T) { + testCase(t, 0) + }) + + t.Run("worker queue size", func(t *testing.T) { + testCase(t, workerNum, WithWorkerQueueSize(0)) + }) + + t.Run("purge task 1", func(t *testing.T) { + testCase(t, workerNum, WithPurgeTask(time.Millisecond, 0)) + }) + + t.Run("purge task 2", func(t *testing.T) { + testCase(t, workerNum, WithPurgeTask(0, time.Millisecond)) + }) + + t.Run("purge task 3", func(t *testing.T) { + testCase(t, workerNum, WithPurgeTask(time.Millisecond, time.Millisecond)) + }) +} + // go test -v -cover -run=^TestExecutor$ func TestExecutor(t *testing.T) { workerNum := 16 @@ -121,3 +169,69 @@ func TestExecutorSpawnWorker(t *testing.T) { t.Fatalf("got %d != workerNum %d", got, workerNum) } } + +// go test -v -cover -run=^TestExecutorPurgeWorkers$ +func TestExecutorPurgeWorkers(t *testing.T) { + testCase := func(t *testing.T, workerNum int) { + executor := NewExecutor(workerNum) + executor.conf.purgeInterval = time.Millisecond + executor.conf.workerLifetime = 2 * time.Millisecond + executor.runPurgeTask() + + got := executor.AvailableWorkers() + if got != 1 { + t.Fatalf("got %d != 1", got) + } + + for range workerNum * 2 { + executor.Submit(func() { + time.Sleep(time.Millisecond) + }) + } + + got = executor.AvailableWorkers() + if got != workerNum { + t.Fatalf("got %d != workerNum %d", got, workerNum) + } + + time.Sleep(500 * time.Microsecond) + + got = executor.AvailableWorkers() + if got != workerNum { + t.Fatalf("got %d != workerNum %d", got, workerNum) + } + + time.Sleep(5 * time.Millisecond) + + got = executor.AvailableWorkers() + if got != 1 { + t.Fatalf("got %d != workerNum %d", got, workerNum) + } + + for range workerNum * 2 { + executor.Submit(func() { + time.Sleep(time.Millisecond) + }) + } + + got = executor.AvailableWorkers() + if got != workerNum { + t.Fatalf("got %d != workerNum %d", got, workerNum) + } + + time.Sleep(5 * time.Millisecond) + + got = executor.AvailableWorkers() + if got != 1 { + t.Fatalf("got %d != workerNum %d", got, workerNum) + } + } + + workerNums := []int{1, 2, 4, 16, 64, 256, 1024} + for _, workerNum := range workerNums { + name := fmt.Sprintf("worker num %d", workerNum) + t.Run(name, func(t *testing.T) { + testCase(t, workerNum) + }) + } +} diff --git a/option.go b/option.go index 69ba575..bd61e9e 100644 --- a/option.go +++ b/option.go @@ -25,6 +25,14 @@ func WithWorkerQueueSize(size int) Option { } } +// WithPurgeTask sets the purge interval of executor and the lifetime of worker. +func WithPurgeTask(purgeInterval time.Duration, workerLifetime time.Duration) Option { + return func(conf *config) { + conf.purgeInterval = purgeInterval + conf.workerLifetime = workerLifetime + } +} + // WithNowFunc sets the now function. func WithNowFunc(nowFunc func() time.Time) Option { return func(conf *config) { diff --git a/option_test.go b/option_test.go index ab01e33..f1032ee 100644 --- a/option_test.go +++ b/option_test.go @@ -25,6 +25,23 @@ func TestWithWorkerQueueSize(t *testing.T) { } } +// go test -v -cover -run=^TestWithPurgeTask$ +func TestWithPurgeTask(t *testing.T) { + workerNum := 16 + purgeInterval := time.Minute + workerLifetime := 3 * time.Minute + conf := newDefaultConfig(workerNum) + WithPurgeTask(purgeInterval, workerLifetime)(conf) + + if conf.purgeInterval != purgeInterval { + t.Fatalf("conf.purgeInterval %d != purgeInterval %d", conf.purgeInterval, purgeInterval) + } + + if conf.workerLifetime != workerLifetime { + t.Fatalf("conf.workerLifetime %d != workerLifetime %d", conf.workerLifetime, workerLifetime) + } +} + // go test -v -cover -run=^TestWithNowFunc$ func TestWithNowFunc(t *testing.T) { workerNum := 16 From e6dd875e1988a6d495467f427c86a53a506797dc Mon Sep 17 00:00:00 2001 From: FishGoddess <1149062639@qq.com> Date: Mon, 30 Jun 2025 00:39:14 +0800 Subject: [PATCH 09/11] =?UTF-8?q?=E8=87=AA=E5=8A=A8=E6=B8=85=E7=90=86?= =?UTF-8?q?=E6=9C=BA=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- HISTORY.md | 6 +++++ _examples/basic.go | 6 ++--- _examples/purge.go | 58 ++++++++++++++++++++++++++++++++++++++++++ _examples/spin_lock.go | 2 +- executor_test.go | 12 ++++----- option.go | 4 +-- option_test.go | 6 ++--- 7 files changed, 79 insertions(+), 15 deletions(-) create mode 100644 _examples/purge.go diff --git a/HISTORY.md b/HISTORY.md index 5f4adc8..09a0ce7 100755 --- a/HISTORY.md +++ b/HISTORY.md @@ -1,5 +1,11 @@ ## ✒ 历史版本的特性介绍 (Features in old versions) +### v0.2.5-alpha + +> 此版本发布于 2025-06-30 + +* 自动清理 worker 机制,避免浪费 worker 资源 + ### v0.2.4-alpha > 此版本发布于 2025-06-28 diff --git a/_examples/basic.go b/_examples/basic.go index 94594d5..4885d57 100644 --- a/_examples/basic.go +++ b/_examples/basic.go @@ -1,4 +1,4 @@ -// Copyright 2023 FishGoddess. All rights reserved. +// Copyright 2025s FishGoddess. All rights reserved. // Use of this source code is governed by a MIT style // license that can be found in the LICENSE file. @@ -17,7 +17,7 @@ func main() { for i := 0; i < 20; i++ { limiter.Go(func() { - fmt.Println("limiter --> ", time.Now()) + fmt.Printf("limiter --> %s\n", time.Now()) time.Sleep(time.Second) }) } @@ -30,7 +30,7 @@ func main() { for i := 0; i < 20; i++ { executor.Submit(func() { - fmt.Println("executor --> ", time.Now()) + fmt.Printf("executor --> %s\n", time.Now()) time.Sleep(time.Second) }) } diff --git a/_examples/purge.go b/_examples/purge.go new file mode 100644 index 0000000..76d3e87 --- /dev/null +++ b/_examples/purge.go @@ -0,0 +1,58 @@ +// Copyright 2025 FishGoddess. All rights reserved. +// Use of this source code is governed by a MIT style +// license that can be found in the LICENSE file. + +package main + +import ( + "fmt" + "math/rand" + "time" + + "github.com/FishGoddess/goes" +) + +func watchExecutorWorkers(executor *goes.Executor) { + for { + fmt.Printf("workers --> %d\n", executor.AvailableWorkers()) + time.Sleep(time.Second) + } +} + +func main() { + // Creates a purge-active executor with 16 workers. + // The purge task will be executed every purge interval and purge workers not in lifetime. + purgeInterval := time.Minute + workerLifetime := time.Minute + + executor := goes.NewExecutor(16, goes.WithPurgeActive(purgeInterval, workerLifetime)) + defer executor.Close() + + go watchExecutorWorkers(executor) + + // Submit some tasks. + for i := 0; i < 200; i++ { + no := i + + executor.Submit(func() { + r := 3000 + rand.Intn(1000) + fmt.Printf("task %d --> %d\n", no, r) + time.Sleep(time.Duration(r) * time.Millisecond) + }) + } + + time.Sleep(time.Minute + 5*time.Second) + fmt.Println("Wait! You will see the workers are decreasing and increasing...") + + for i := 0; i < 10; i++ { + no := i + + executor.Submit(func() { + r := 10000 + rand.Intn(1000) + fmt.Printf("task %d --> %d\n", no, r) + time.Sleep(time.Duration(r) * time.Millisecond) + }) + } + + time.Sleep(2 * time.Minute) +} diff --git a/_examples/spin_lock.go b/_examples/spin_lock.go index 87cc047..bb4bbb3 100644 --- a/_examples/spin_lock.go +++ b/_examples/spin_lock.go @@ -29,5 +29,5 @@ func main() { } wg.Wait() - fmt.Println("total is %d", total) + fmt.Printf("total is %d\n", total) } diff --git a/executor_test.go b/executor_test.go index 90a1af1..7a9bc3c 100644 --- a/executor_test.go +++ b/executor_test.go @@ -22,7 +22,7 @@ func TestNewExecutor(t *testing.T) { } }() - executor := NewExecutor(workerNum, WithWorkerQueueSize(256), WithPurgeTask(time.Minute, time.Minute)) + executor := NewExecutor(workerNum, WithWorkerQueueSize(256), WithPurgeActive(time.Minute, time.Minute)) defer executor.Close() }) @@ -46,15 +46,15 @@ func TestNewExecutor(t *testing.T) { }) t.Run("purge task 1", func(t *testing.T) { - testCase(t, workerNum, WithPurgeTask(time.Millisecond, 0)) + testCase(t, workerNum, WithPurgeActive(time.Millisecond, 0)) }) t.Run("purge task 2", func(t *testing.T) { - testCase(t, workerNum, WithPurgeTask(0, time.Millisecond)) + testCase(t, workerNum, WithPurgeActive(0, time.Millisecond)) }) t.Run("purge task 3", func(t *testing.T) { - testCase(t, workerNum, WithPurgeTask(time.Millisecond, time.Millisecond)) + testCase(t, workerNum, WithPurgeActive(time.Millisecond, time.Millisecond)) }) } @@ -170,8 +170,8 @@ func TestExecutorSpawnWorker(t *testing.T) { } } -// go test -v -cover -run=^TestExecutorPurgeWorkers$ -func TestExecutorPurgeWorkers(t *testing.T) { +// go test -v -cover -run=^TestExecutorDynamicScaling$ +func TestExecutorDynamicScaling(t *testing.T) { testCase := func(t *testing.T, workerNum int) { executor := NewExecutor(workerNum) executor.conf.purgeInterval = time.Millisecond diff --git a/option.go b/option.go index bd61e9e..2924716 100644 --- a/option.go +++ b/option.go @@ -25,8 +25,8 @@ func WithWorkerQueueSize(size int) Option { } } -// WithPurgeTask sets the purge interval of executor and the lifetime of worker. -func WithPurgeTask(purgeInterval time.Duration, workerLifetime time.Duration) Option { +// WithPurgeActive sets the purge interval of executor and the lifetime of worker. +func WithPurgeActive(purgeInterval time.Duration, workerLifetime time.Duration) Option { return func(conf *config) { conf.purgeInterval = purgeInterval conf.workerLifetime = workerLifetime diff --git a/option_test.go b/option_test.go index f1032ee..de3c571 100644 --- a/option_test.go +++ b/option_test.go @@ -25,13 +25,13 @@ func TestWithWorkerQueueSize(t *testing.T) { } } -// go test -v -cover -run=^TestWithPurgeTask$ -func TestWithPurgeTask(t *testing.T) { +// go test -v -cover -run=^TestWithPurgeActive$ +func TestWithPurgeActive(t *testing.T) { workerNum := 16 purgeInterval := time.Minute workerLifetime := 3 * time.Minute conf := newDefaultConfig(workerNum) - WithPurgeTask(purgeInterval, workerLifetime)(conf) + WithPurgeActive(purgeInterval, workerLifetime)(conf) if conf.purgeInterval != purgeInterval { t.Fatalf("conf.purgeInterval %d != purgeInterval %d", conf.purgeInterval, purgeInterval) From f26023aa390fce0fda993a34c9e0a20c9bc44308 Mon Sep 17 00:00:00 2001 From: FishGoddess <1149062639@qq.com> Date: Mon, 30 Jun 2025 00:51:41 +0800 Subject: [PATCH 10/11] =?UTF-8?q?=E8=B0=83=E6=95=B4=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- executor.go | 5 ++--- executor_test.go | 1 + 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/executor.go b/executor.go index db121de..bfbab7e 100644 --- a/executor.go +++ b/executor.go @@ -198,10 +198,9 @@ func (e *Executor) Close() { worker.Done() } - e.Wait() - - e.closeCh <- struct{}{} + close(e.closeCh) e.closed = true e.workers = nil e.scheduler.Set(nil) + e.Wait() } diff --git a/executor_test.go b/executor_test.go index 7a9bc3c..f3cbb50 100644 --- a/executor_test.go +++ b/executor_test.go @@ -177,6 +177,7 @@ func TestExecutorDynamicScaling(t *testing.T) { executor.conf.purgeInterval = time.Millisecond executor.conf.workerLifetime = 2 * time.Millisecond executor.runPurgeTask() + defer executor.Close() got := executor.AvailableWorkers() if got != 1 { From cbfe540ae1e5c2d42d3d0b3da8f26861572935b2 Mon Sep 17 00:00:00 2001 From: FishGoddess <1149062639@qq.com> Date: Mon, 30 Jun 2025 00:52:23 +0800 Subject: [PATCH 11/11] =?UTF-8?q?=E8=B0=83=E6=95=B4=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.en.md | 4 ++-- README.md | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/README.en.md b/README.en.md index 23f26b0..2fc4abc 100755 --- a/README.en.md +++ b/README.en.md @@ -42,7 +42,7 @@ func main() { for i := 0; i < 20; i++ { limiter.Go(func() { - fmt.Println("limiter --> ", time.Now()) + fmt.Printf("limiter --> %s\n", time.Now()) time.Sleep(time.Second) }) } @@ -55,7 +55,7 @@ func main() { for i := 0; i < 20; i++ { executor.Submit(func() { - fmt.Println("executor --> ", time.Now()) + fmt.Printf("executor --> %s\n", time.Now()) time.Sleep(time.Second) }) } diff --git a/README.md b/README.md index e4dbc3f..3240461 100755 --- a/README.md +++ b/README.md @@ -42,7 +42,7 @@ func main() { for i := 0; i < 20; i++ { limiter.Go(func() { - fmt.Println("limiter --> ", time.Now()) + fmt.Printf("limiter --> %s\n", time.Now()) time.Sleep(time.Second) }) } @@ -55,7 +55,7 @@ func main() { for i := 0; i < 20; i++ { executor.Submit(func() { - fmt.Println("executor --> ", time.Now()) + fmt.Printf("executor --> %s\n", time.Now()) time.Sleep(time.Second) }) }