Skip to content

Commit f9ebfcb

Browse files
authored
Merge pull request #522 from chaitin/feat/backend-vm-recycle-check-token-20260416
feat(host): 增加回收态虚拟机回收补偿逻辑
2 parents 7902d19 + 35aa214 commit f9ebfcb

6 files changed

Lines changed: 507 additions & 33 deletions

File tree

backend/biz/host/handler/v1/internal.go

Lines changed: 35 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"github.com/chaitin/MonkeyCode/backend/domain"
2323
etypes "github.com/chaitin/MonkeyCode/backend/ent/types"
2424
"github.com/chaitin/MonkeyCode/backend/pkg/cvt"
25+
"github.com/chaitin/MonkeyCode/backend/pkg/entx"
2526
"github.com/chaitin/MonkeyCode/backend/pkg/lifecycle"
2627
"github.com/chaitin/MonkeyCode/backend/pkg/taskflow"
2728
"github.com/chaitin/MonkeyCode/backend/pkg/ws"
@@ -33,6 +34,10 @@ type InternalHostHandler struct {
3334
repo domain.HostRepo
3435
teamRepo domain.TeamHostRepo
3536
redis *redis.Client
37+
getAgentToken agentTokenGetter
38+
limiter *redis.Client
39+
vmDeleter taskflow.VirtualMachiner
40+
skipSoftDelete func(context.Context) context.Context
3641
cache *cache.Cache
3742
taskLifecycle *lifecycle.Manager[uuid.UUID, consts.TaskStatus, lifecycle.TaskMetadata]
3843
hostUsecase domain.HostUsecase
@@ -43,16 +48,22 @@ type InternalHostHandler struct {
4348

4449
func NewInternalHostHandler(i *do.Injector) (*InternalHostHandler, error) {
4550
w := do.MustInvoke[*web.Web](i)
51+
tf := do.MustInvoke[taskflow.Clienter](i)
52+
rdb := do.MustInvoke[*redis.Client](i)
4653

4754
h := &InternalHostHandler{
4855
logger: do.MustInvoke[*slog.Logger](i).With("module", "InternalHostHandler"),
4956
repo: do.MustInvoke[domain.HostRepo](i),
5057
teamRepo: do.MustInvoke[domain.TeamHostRepo](i),
51-
redis: do.MustInvoke[*redis.Client](i),
58+
redis: rdb,
59+
getAgentToken: defaultAgentTokenGetter(rdb),
60+
limiter: rdb,
61+
vmDeleter: tf.VirtualMachiner(),
62+
skipSoftDelete: entx.SkipSoftDelete,
5263
cache: cache.New(15*time.Minute, 10*time.Minute),
64+
taskLifecycle: do.MustInvoke[*lifecycle.Manager[uuid.UUID, consts.TaskStatus, lifecycle.TaskMetadata]](i),
5365
hostUsecase: do.MustInvoke[domain.HostUsecase](i),
5466
taskConns: do.MustInvoke[*ws.TaskConn](i),
55-
taskLifecycle: do.MustInvoke[*lifecycle.Manager[uuid.UUID, consts.TaskStatus, lifecycle.TaskMetadata]](i),
5667
projectUsecase: do.MustInvoke[domain.ProjectUsecase](i),
5768
tokenProvider: do.MustInvoke[*gituc.TokenProvider](i),
5869
}
@@ -178,55 +189,46 @@ func (h *InternalHostHandler) CheckToken(c *web.Context, req taskflow.CheckToken
178189
func (h *InternalHostHandler) agentAuth(ctx context.Context, token, mid string) (*taskflow.Token, error) {
179190
// 1) 优先从 Redis 读取一次性 agent token,并清除
180191
key := fmt.Sprintf("agent:token:%s", token)
181-
luaGetDel := `
182-
local v = redis.call('GET', KEYS[1])
183-
if v then
184-
redis.call('DEL', KEYS[1])
185-
return v
186-
end
187-
return nil
188-
`
189-
res, err := h.redis.Eval(ctx, luaGetDel, []string{key}).Result()
190-
h.logger.With("mid", mid, "key", key, "res", res, "error", err).DebugContext(ctx, "agent auth...")
192+
res, err := h.getAgentToken(ctx, key)
193+
h.logger.With("mid", mid, "key", key, "hit", err == nil, "error", err).DebugContext(ctx, "agent auth...")
191194
if err == nil {
192-
if b, ok := res.(string); ok && b != "" {
193-
var t taskflow.Token
194-
if uerr := json.Unmarshal([]byte(b), &t); uerr != nil {
195-
h.logger.With("error", uerr, "token", token).ErrorContext(ctx, "failed to unmarshal token from redis")
196-
return nil, uerr
197-
}
195+
var t taskflow.Token
196+
if uerr := json.Unmarshal([]byte(res), &t); uerr != nil {
197+
h.logger.With("error", uerr, "token", token).ErrorContext(ctx, "failed to unmarshal token from redis")
198+
return nil, uerr
199+
}
198200

199-
if mid != "" {
200-
if err := h.repo.UpdateVirtualMachine(ctx, token, func(up *db.VirtualMachineUpdateOne) error {
201-
up.SetMachineID(mid)
202-
return nil
203-
}); err != nil {
204-
h.logger.With("error", err, "token", token).ErrorContext(ctx, "failed to update virtual machine machine id")
205-
return nil, err
206-
}
201+
if mid != "" {
202+
if err := h.repo.UpdateVirtualMachine(ctx, token, func(up *db.VirtualMachineUpdateOne) error {
203+
up.SetMachineID(mid)
204+
return nil
205+
}); err != nil {
206+
h.logger.With("error", err, "token", token).ErrorContext(ctx, "failed to update virtual machine machine id")
207+
return nil, err
207208
}
208-
209-
return &t, nil
210209
}
210+
211+
return &t, nil
211212
} else if !errors.Is(err, redis.Nil) {
212213
h.logger.With("error", err, "token", token).ErrorContext(ctx, "failed to get redis token via lua, fallback to db")
213214
}
214215

215216
// 2) Redis 没值时根据数据库校验 token
216-
vm, err := h.repo.GetVirtualMachine(ctx, token)
217+
vm, err := h.repo.GetVirtualMachine(h.skipSoftDelete(ctx), token)
217218
if err != nil {
218219
return nil, err
219220
}
220221

222+
if vm.IsRecycled {
223+
h.tryRecycledVMDelete(ctx, vm, mid)
224+
return nil, errAgentVMRecycled
225+
}
226+
221227
// 机器码绑定校验
222228
if mid != "" && vm.MachineID != "" && vm.MachineID != mid {
223229
return nil, fmt.Errorf("mismatch machine id")
224230
}
225231

226-
if vm.IsRecycled {
227-
return nil, fmt.Errorf("vm is recycled")
228-
}
229-
230232
if vm.Edges.Host == nil {
231233
return nil, fmt.Errorf("no host found for vm")
232234
}
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
package v1
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
"time"
8+
9+
"github.com/redis/go-redis/v9"
10+
11+
"github.com/chaitin/MonkeyCode/backend/db"
12+
"github.com/chaitin/MonkeyCode/backend/pkg/taskflow"
13+
)
14+
15+
const (
16+
recycledDeleteRetryTTL = 30 * time.Second
17+
recycledDeleteTimeout = 5 * time.Second
18+
)
19+
20+
var errAgentVMRecycled = errors.New("agent vm is recycled")
21+
22+
type agentTokenGetter func(ctx context.Context, key string) (string, error)
23+
24+
func defaultAgentTokenGetter(rdb *redis.Client) agentTokenGetter {
25+
const luaGetDel = `
26+
local v = redis.call('GET', KEYS[1])
27+
if v then
28+
redis.call('DEL', KEYS[1])
29+
return v
30+
end
31+
return nil
32+
`
33+
return func(ctx context.Context, key string) (string, error) {
34+
res, err := rdb.Eval(ctx, luaGetDel, []string{key}).Result()
35+
if err != nil {
36+
return "", err
37+
}
38+
39+
b, ok := res.(string)
40+
if !ok || b == "" {
41+
return "", redis.Nil
42+
}
43+
return b, nil
44+
}
45+
}
46+
47+
func (h *InternalHostHandler) tryRecycledVMDelete(ctx context.Context, vm *db.VirtualMachine, machineID string) {
48+
if h.limiter == nil || h.vmDeleter == nil {
49+
h.logger.WarnContext(ctx, "skip recycled vm delete retry", "vm_id", vm.ID, "machine_id", machineID, "error", "missing dependency")
50+
return
51+
}
52+
53+
key := fmt.Sprintf("vm:recycle:retry:%s", vm.ID)
54+
ok, err := h.limiter.SetNX(ctx, key, "1", recycledDeleteRetryTTL).Result()
55+
if err != nil || !ok {
56+
h.logger.WarnContext(ctx, "skip recycled vm delete retry", "vm_id", vm.ID, "machine_id", machineID, "rate_limited", !ok, "error", err)
57+
return
58+
}
59+
60+
go func() {
61+
deleteCtx, cancel := context.WithTimeout(context.Background(), recycledDeleteTimeout)
62+
defer cancel()
63+
64+
err := h.vmDeleter.Delete(deleteCtx, &taskflow.DeleteVirtualMachineReq{
65+
UserID: vm.UserID.String(),
66+
HostID: vm.HostID,
67+
ID: vm.EnvironmentID,
68+
})
69+
if err != nil {
70+
h.logger.ErrorContext(deleteCtx, "reissue recycled vm delete failed", "vm_id", vm.ID, "machine_id", machineID, "error", err)
71+
return
72+
}
73+
h.logger.InfoContext(deleteCtx, "reissue recycled vm delete success", "vm_id", vm.ID, "machine_id", machineID)
74+
}()
75+
}

0 commit comments

Comments
 (0)