Skip to content

Commit 555e12d

Browse files
authored
Merge pull request #531 from chaitin/feat/task-last-active-time
feat(task): 增加任务最近活跃时间
2 parents 3cf3f2c + 0d6c7e3 commit 555e12d

File tree

21 files changed

+605
-38
lines changed

21 files changed

+605
-38
lines changed

backend/biz/task/handler/v1/task.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ type TaskHandler struct {
4646
taskConns *ws.TaskConn
4747
controlConns *ws.ControlConn
4848
taskSummary *service.TaskSummaryService
49+
taskActivity service.TaskActivityRefresher
4950
idleRefresher vmidle.VMIdleRefresher
5051
activeRepo domain.UserActiveRepo
5152
}
@@ -64,6 +65,7 @@ func NewTaskHandler(i *do.Injector) (*TaskHandler, error) {
6465
tc := do.MustInvoke[*ws.TaskConn](i)
6566
cc := do.MustInvoke[*ws.ControlConn](i)
6667
ts := do.MustInvoke[*service.TaskSummaryService](i)
68+
ta := do.MustInvoke[service.TaskActivityRefresher](i)
6769
ir := do.MustInvoke[vmidle.VMIdleRefresher](i)
6870

6971
// Optional deps
@@ -91,6 +93,7 @@ func NewTaskHandler(i *do.Injector) (*TaskHandler, error) {
9193
taskConns: tc,
9294
controlConns: cc,
9395
taskSummary: ts,
96+
taskActivity: ta,
9497
idleRefresher: ir,
9598
activeRepo: activeRepo,
9699
}

backend/biz/task/handler/v1/task_control.go

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,9 @@ import (
1010
"golang.org/x/sync/errgroup"
1111

1212
"github.com/GoYoko/web"
13+
"github.com/google/uuid"
1314

15+
"github.com/chaitin/MonkeyCode/backend/biz/task/service"
1416
"github.com/chaitin/MonkeyCode/backend/consts"
1517
"github.com/chaitin/MonkeyCode/backend/domain"
1618
"github.com/chaitin/MonkeyCode/backend/middleware"
@@ -130,6 +132,9 @@ func (h *TaskHandler) Control(c *web.Context, req domain.TaskControlReq) error {
130132

131133
logger := h.logger.With("task_id", task.ID, "fn", "task.control")
132134
taskID := task.ID.String()
135+
if err := h.taskActivity.Refresh(c.Request().Context(), task.ID); err != nil {
136+
logger.WarnContext(c.Request().Context(), "failed to refresh task last active on control connect", "error", err)
137+
}
133138

134139
// 连接建立:刷新空闲计时器
135140
if vm := task.VirtualMachine; vm != nil {
@@ -182,7 +187,7 @@ func (h *TaskHandler) Control(c *web.Context, req domain.TaskControlReq) error {
182187
// 定期刷新空闲计时器,保持 VM 活跃
183188
if vm := task.VirtualMachine; vm != nil {
184189
g.Go(func() error {
185-
return h.controlKeepAlive(ctx, vm.ID)
190+
return h.controlKeepAlive(ctx, task.ID, vm.ID)
186191
})
187192
}
188193

@@ -211,17 +216,30 @@ func (h *TaskHandler) controlPing(ctx context.Context, wsConn *ws.WebsocketManag
211216
}
212217

213218
// controlKeepAlive 定期刷新空闲计时器,防止 VM 被误判空闲
214-
func (h *TaskHandler) controlKeepAlive(ctx context.Context, vmID string) error {
215-
ticker := time.NewTicker(1 * time.Minute)
216-
defer ticker.Stop()
219+
func (h *TaskHandler) controlKeepAlive(ctx context.Context, taskID uuid.UUID, vmID string) error {
220+
if err := h.idleRefresher.Refresh(ctx, vmID); err != nil {
221+
h.logger.WarnContext(ctx, "keepalive refresh failed", "vmID", vmID, "error", err)
222+
}
223+
if err := h.taskActivity.Refresh(ctx, taskID); err != nil {
224+
h.logger.WarnContext(ctx, "task activity refresh failed", "taskID", taskID, "error", err)
225+
}
226+
227+
idleTicker := time.NewTicker(1 * time.Minute)
228+
activityTicker := time.NewTicker(service.TaskActivityRefreshInterval)
229+
defer idleTicker.Stop()
230+
defer activityTicker.Stop()
217231
for {
218232
select {
219233
case <-ctx.Done():
220234
return ctx.Err()
221-
case <-ticker.C:
235+
case <-idleTicker.C:
222236
if err := h.idleRefresher.Refresh(ctx, vmID); err != nil {
223237
h.logger.WarnContext(ctx, "keepalive refresh failed", "vmID", vmID, "error", err)
224238
}
239+
case <-activityTicker.C:
240+
if err := h.taskActivity.Refresh(ctx, taskID); err != nil {
241+
h.logger.WarnContext(ctx, "task activity refresh failed", "taskID", taskID, "error", err)
242+
}
225243
}
226244
}
227245
}
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
package v1
2+
3+
import (
4+
"context"
5+
"io"
6+
"log/slog"
7+
"testing"
8+
"time"
9+
10+
"github.com/google/uuid"
11+
12+
"github.com/chaitin/MonkeyCode/backend/biz/task/service"
13+
)
14+
15+
func TestControlKeepAliveRefreshesImmediately(t *testing.T) {
16+
taskID := uuid.MustParse("11111111-1111-1111-1111-111111111111")
17+
vmID := "vm-1"
18+
19+
idleRefresher := &testVMIdleRefresher{ch: make(chan string, 1)}
20+
taskActivity := &testTaskActivityRefresher{ch: make(chan uuid.UUID, 1)}
21+
handler := &TaskHandler{
22+
logger: slog.New(slog.NewTextHandler(io.Discard, nil)),
23+
idleRefresher: idleRefresher,
24+
taskActivity: taskActivity,
25+
}
26+
27+
ctx, cancel := context.WithCancel(context.Background())
28+
defer cancel()
29+
30+
done := make(chan error, 1)
31+
go func() {
32+
done <- handler.controlKeepAlive(ctx, taskID, vmID)
33+
}()
34+
35+
select {
36+
case got := <-idleRefresher.ch:
37+
if got != vmID {
38+
t.Fatalf("idle refresher vm id = %q, want %q", got, vmID)
39+
}
40+
case <-time.After(100 * time.Millisecond):
41+
t.Fatal("expected idle refresher to run immediately")
42+
}
43+
44+
select {
45+
case got := <-taskActivity.ch:
46+
if got != taskID {
47+
t.Fatalf("task activity task id = %s, want %s", got, taskID)
48+
}
49+
case <-time.After(100 * time.Millisecond):
50+
t.Fatal("expected task activity refresher to run immediately")
51+
}
52+
53+
cancel()
54+
55+
select {
56+
case err := <-done:
57+
if err == nil {
58+
t.Fatal("controlKeepAlive() error = nil, want context canceled")
59+
}
60+
case <-time.After(time.Second):
61+
t.Fatal("controlKeepAlive() did not exit after cancel")
62+
}
63+
}
64+
65+
type testVMIdleRefresher struct {
66+
ch chan string
67+
}
68+
69+
func (r *testVMIdleRefresher) Refresh(_ context.Context, vmID string) error {
70+
select {
71+
case r.ch <- vmID:
72+
default:
73+
}
74+
return nil
75+
}
76+
77+
type testTaskActivityRefresher struct {
78+
ch chan uuid.UUID
79+
}
80+
81+
func (r *testTaskActivityRefresher) Refresh(_ context.Context, taskID uuid.UUID) error {
82+
select {
83+
case r.ch <- taskID:
84+
default:
85+
}
86+
return nil
87+
}
88+
89+
func (r *testTaskActivityRefresher) ForceRefresh(context.Context, uuid.UUID) error {
90+
return nil
91+
}
92+
93+
var _ service.TaskActivityRefresher = (*testTaskActivityRefresher)(nil)

backend/biz/task/register.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
func ProvideTask(i *do.Injector) {
1414
do.Provide(i, usecase.NewTaskUsecase)
1515
do.Provide(i, usecase.NewGitTaskUsecase)
16+
do.Provide(i, service.NewTaskActivityRefresher)
1617
do.Provide(i, service.NewTaskSummaryService)
1718
do.Provide(i, v1.NewTaskHandler)
1819
do.Provide(i, repo.NewTaskRepo)

backend/biz/task/repo/task.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -261,6 +261,15 @@ func (t *TaskRepo) Update(ctx context.Context, _ *domain.User, id uuid.UUID, fn
261261
})
262262
}
263263

264+
func (t *TaskRepo) RefreshLastActiveAt(ctx context.Context, id uuid.UUID, at time.Time, minInterval time.Duration) error {
265+
up := t.db.Task.Update().Where(task.ID(id))
266+
if minInterval > 0 {
267+
up = up.Where(task.LastActiveAtLT(at.Add(-minInterval)))
268+
}
269+
_, err := up.SetLastActiveAt(at).Save(ctx)
270+
return err
271+
}
272+
264273
// Delete implements domain.TaskRepo.
265274
func (t *TaskRepo) Delete(ctx context.Context, user *domain.User, id uuid.UUID) error {
266275
_, err := t.db.Task.Delete().
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package service
2+
3+
import (
4+
"context"
5+
"time"
6+
7+
"github.com/google/uuid"
8+
"github.com/samber/do"
9+
10+
"github.com/chaitin/MonkeyCode/backend/domain"
11+
)
12+
13+
const TaskActivityRefreshInterval = 5 * time.Minute
14+
15+
type TaskActivityRefresher interface {
16+
Refresh(ctx context.Context, taskID uuid.UUID) error
17+
ForceRefresh(ctx context.Context, taskID uuid.UUID) error
18+
}
19+
20+
type taskActivityRefresher struct {
21+
repo taskActivityRepo
22+
clock func() time.Time
23+
}
24+
25+
type taskActivityRepo interface {
26+
RefreshLastActiveAt(ctx context.Context, id uuid.UUID, at time.Time, minInterval time.Duration) error
27+
}
28+
29+
func NewTaskActivityRefresher(i *do.Injector) (TaskActivityRefresher, error) {
30+
return &taskActivityRefresher{
31+
repo: do.MustInvoke[domain.TaskRepo](i),
32+
clock: time.Now,
33+
}, nil
34+
}
35+
36+
func (r *taskActivityRefresher) Refresh(ctx context.Context, taskID uuid.UUID) error {
37+
return r.repo.RefreshLastActiveAt(ctx, taskID, r.clock(), TaskActivityRefreshInterval)
38+
}
39+
40+
func (r *taskActivityRefresher) ForceRefresh(ctx context.Context, taskID uuid.UUID) error {
41+
return r.repo.RefreshLastActiveAt(ctx, taskID, r.clock(), 0)
42+
}
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
package service
2+
3+
import (
4+
"context"
5+
"testing"
6+
"time"
7+
8+
"github.com/google/uuid"
9+
)
10+
11+
func TestTaskActivityRefresherRefreshUsesThrottleInterval(t *testing.T) {
12+
taskID := uuid.MustParse("11111111-1111-1111-1111-111111111111")
13+
now := time.Unix(1_700_000_000, 0).UTC()
14+
repo := &taskActivityRepoStub{}
15+
refresher := &taskActivityRefresher{
16+
repo: repo,
17+
clock: func() time.Time { return now },
18+
}
19+
20+
if err := refresher.Refresh(context.Background(), taskID); err != nil {
21+
t.Fatalf("Refresh() error = %v", err)
22+
}
23+
24+
if repo.taskID != taskID {
25+
t.Fatalf("task id = %s, want %s", repo.taskID, taskID)
26+
}
27+
if !repo.at.Equal(now) {
28+
t.Fatalf("refresh time = %s, want %s", repo.at, now)
29+
}
30+
if repo.minInterval != TaskActivityRefreshInterval {
31+
t.Fatalf("min interval = %s, want %s", repo.minInterval, TaskActivityRefreshInterval)
32+
}
33+
}
34+
35+
func TestTaskActivityRefresherForceRefreshBypassesThrottle(t *testing.T) {
36+
taskID := uuid.MustParse("22222222-2222-2222-2222-222222222222")
37+
now := time.Unix(1_700_000_100, 0).UTC()
38+
repo := &taskActivityRepoStub{}
39+
refresher := &taskActivityRefresher{
40+
repo: repo,
41+
clock: func() time.Time { return now },
42+
}
43+
44+
if err := refresher.ForceRefresh(context.Background(), taskID); err != nil {
45+
t.Fatalf("ForceRefresh() error = %v", err)
46+
}
47+
48+
if repo.taskID != taskID {
49+
t.Fatalf("task id = %s, want %s", repo.taskID, taskID)
50+
}
51+
if !repo.at.Equal(now) {
52+
t.Fatalf("refresh time = %s, want %s", repo.at, now)
53+
}
54+
if repo.minInterval != 0 {
55+
t.Fatalf("min interval = %s, want 0", repo.minInterval)
56+
}
57+
}
58+
59+
type taskActivityRepoStub struct {
60+
taskID uuid.UUID
61+
at time.Time
62+
minInterval time.Duration
63+
}
64+
65+
func (s *taskActivityRepoStub) RefreshLastActiveAt(_ context.Context, taskID uuid.UUID, at time.Time, minInterval time.Duration) error {
66+
s.taskID = taskID
67+
s.at = at
68+
s.minInterval = minInterval
69+
return nil
70+
}

0 commit comments

Comments
 (0)