Skip to content

Commit 0d6c7e3

Browse files
committed
fix(task): 立即刷新 control 保活状态
1 parent 2211849 commit 0d6c7e3

2 files changed

Lines changed: 100 additions & 0 deletions

File tree

backend/biz/task/handler/v1/task_control.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,13 @@ func (h *TaskHandler) controlPing(ctx context.Context, wsConn *ws.WebsocketManag
217217

218218
// controlKeepAlive 定期刷新空闲计时器,防止 VM 被误判空闲
219219
func (h *TaskHandler) controlKeepAlive(ctx context.Context, taskID uuid.UUID, vmID string) error {
220+
if err := h.idleRefresher.Refresh(ctx, vmID); err != nil {
221+
h.logger.WarnContext(ctx, "keepalive refresh failed", "vmID", vmID, "error", err)
222+
}
223+
if err := h.taskActivity.Refresh(ctx, taskID); err != nil {
224+
h.logger.WarnContext(ctx, "task activity refresh failed", "taskID", taskID, "error", err)
225+
}
226+
220227
idleTicker := time.NewTicker(1 * time.Minute)
221228
activityTicker := time.NewTicker(service.TaskActivityRefreshInterval)
222229
defer idleTicker.Stop()
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
package v1
2+
3+
import (
4+
"context"
5+
"io"
6+
"log/slog"
7+
"testing"
8+
"time"
9+
10+
"github.com/google/uuid"
11+
12+
"github.com/chaitin/MonkeyCode/backend/biz/task/service"
13+
)
14+
15+
func TestControlKeepAliveRefreshesImmediately(t *testing.T) {
16+
taskID := uuid.MustParse("11111111-1111-1111-1111-111111111111")
17+
vmID := "vm-1"
18+
19+
idleRefresher := &testVMIdleRefresher{ch: make(chan string, 1)}
20+
taskActivity := &testTaskActivityRefresher{ch: make(chan uuid.UUID, 1)}
21+
handler := &TaskHandler{
22+
logger: slog.New(slog.NewTextHandler(io.Discard, nil)),
23+
idleRefresher: idleRefresher,
24+
taskActivity: taskActivity,
25+
}
26+
27+
ctx, cancel := context.WithCancel(context.Background())
28+
defer cancel()
29+
30+
done := make(chan error, 1)
31+
go func() {
32+
done <- handler.controlKeepAlive(ctx, taskID, vmID)
33+
}()
34+
35+
select {
36+
case got := <-idleRefresher.ch:
37+
if got != vmID {
38+
t.Fatalf("idle refresher vm id = %q, want %q", got, vmID)
39+
}
40+
case <-time.After(100 * time.Millisecond):
41+
t.Fatal("expected idle refresher to run immediately")
42+
}
43+
44+
select {
45+
case got := <-taskActivity.ch:
46+
if got != taskID {
47+
t.Fatalf("task activity task id = %s, want %s", got, taskID)
48+
}
49+
case <-time.After(100 * time.Millisecond):
50+
t.Fatal("expected task activity refresher to run immediately")
51+
}
52+
53+
cancel()
54+
55+
select {
56+
case err := <-done:
57+
if err == nil {
58+
t.Fatal("controlKeepAlive() error = nil, want context canceled")
59+
}
60+
case <-time.After(time.Second):
61+
t.Fatal("controlKeepAlive() did not exit after cancel")
62+
}
63+
}
64+
65+
type testVMIdleRefresher struct {
66+
ch chan string
67+
}
68+
69+
func (r *testVMIdleRefresher) Refresh(_ context.Context, vmID string) error {
70+
select {
71+
case r.ch <- vmID:
72+
default:
73+
}
74+
return nil
75+
}
76+
77+
type testTaskActivityRefresher struct {
78+
ch chan uuid.UUID
79+
}
80+
81+
func (r *testTaskActivityRefresher) Refresh(_ context.Context, taskID uuid.UUID) error {
82+
select {
83+
case r.ch <- taskID:
84+
default:
85+
}
86+
return nil
87+
}
88+
89+
func (r *testTaskActivityRefresher) ForceRefresh(context.Context, uuid.UUID) error {
90+
return nil
91+
}
92+
93+
var _ service.TaskActivityRefresher = (*testTaskActivityRefresher)(nil)

0 commit comments

Comments
 (0)