Skip to content

Commit 40ddae6

Browse files
committed
feat: 实现文件管理接口
1 parent d80931c commit 40ddae6

File tree

11 files changed

+438
-0
lines changed

11 files changed

+438
-0
lines changed
Lines changed: 328 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,328 @@
1+
package v1
2+
3+
import (
4+
"context"
5+
"encoding/base64"
6+
"fmt"
7+
"io"
8+
"log/slog"
9+
"strings"
10+
"time"
11+
12+
"github.com/GoYoko/web"
13+
"github.com/labstack/echo/v4"
14+
"github.com/samber/do"
15+
16+
"github.com/chaitin/MonkeyCode/backend/domain"
17+
"github.com/chaitin/MonkeyCode/backend/errcode"
18+
"github.com/chaitin/MonkeyCode/backend/middleware"
19+
"github.com/chaitin/MonkeyCode/backend/pkg/taskflow"
20+
)
21+
22+
// FileHandler VM 内文件管理处理器
23+
type FileHandler struct {
24+
logger *slog.Logger
25+
taskflow taskflow.Clienter
26+
usecase domain.HostUsecase
27+
}
28+
29+
// NewFileHandler 创建文件管理处理器
30+
func NewFileHandler(i *do.Injector) (*FileHandler, error) {
31+
w := do.MustInvoke[*web.Web](i)
32+
auth := do.MustInvoke[*middleware.AuthMiddleware](i)
33+
34+
f := &FileHandler{
35+
logger: do.MustInvoke[*slog.Logger](i).With("module", "handler.file"),
36+
taskflow: do.MustInvoke[taskflow.Clienter](i),
37+
usecase: do.MustInvoke[domain.HostUsecase](i),
38+
}
39+
40+
g := w.Group("/api/v1/users")
41+
g.Use(auth.Auth())
42+
43+
g.GET("/folders", web.BindHandler(f.ListFolder))
44+
g.POST("/folders", web.BindHandler(f.Mkdir))
45+
g.PUT("/files/move", web.BindHandler(f.Move))
46+
g.POST("/files/copy", web.BindHandler(f.Copy))
47+
g.DELETE("/files", web.BindHandler(f.Delete))
48+
g.PUT("/files/save", web.BindHandler(f.Save))
49+
g.POST("/files/upload", web.BaseHandler(f.Upload))
50+
g.GET("/files/download", web.BindHandler(f.Download))
51+
52+
return f, nil
53+
}
54+
55+
func wraperr(err error, path string) error {
56+
if err == nil {
57+
return err
58+
}
59+
if strings.Contains(err.Error(), "permission denied") {
60+
return errcode.ErrFilePermisionDenied.Wrap(err).WithParam("file", path)
61+
}
62+
if strings.Contains(err.Error(), "stream not found") {
63+
return errcode.ErrStreamDisconnect.Wrap(err)
64+
}
65+
if strings.Contains(err.Error(), "virtual_machine not found") {
66+
return errcode.ErrVmRemoved.Wrap(err)
67+
}
68+
return errcode.ErrFileOp.Wrap(err)
69+
}
70+
71+
// ListFolder 目录列表
72+
func (f *FileHandler) ListFolder(c *web.Context, req domain.FilePathReq) error {
73+
user := middleware.GetUser(c)
74+
return wraperr(f.usecase.WithVMPermission(c.Request().Context(), user.ID, req.ID, func(v *domain.VirtualMachine) error {
75+
fs, err := f.taskflow.FileManager().Operate(c.Request().Context(), taskflow.FileReq{
76+
ID: req.ID,
77+
Operate: taskflow.FileOpList,
78+
Path: req.Path,
79+
})
80+
if err != nil {
81+
return err
82+
}
83+
return c.Success(fs)
84+
}), req.Path)
85+
}
86+
87+
// Mkdir 创建目录
88+
func (f *FileHandler) Mkdir(c *web.Context, req domain.FilePathReq) error {
89+
user := middleware.GetUser(c)
90+
return wraperr(f.usecase.WithVMPermission(c.Request().Context(), user.ID, req.ID, func(v *domain.VirtualMachine) error {
91+
_, err := f.taskflow.FileManager().Operate(c.Request().Context(), taskflow.FileReq{
92+
ID: req.ID,
93+
Operate: taskflow.FileOpMkdir,
94+
Path: req.Path,
95+
})
96+
if err != nil {
97+
f.logger.With("error", err).ErrorContext(c.Request().Context(), "failed to mkdir")
98+
return err
99+
}
100+
return c.Success(nil)
101+
}), req.Path)
102+
}
103+
104+
// Move 移动文件/目录
105+
func (f *FileHandler) Move(c *web.Context, req domain.FileChangeReq) error {
106+
user := middleware.GetUser(c)
107+
return wraperr(f.usecase.WithVMPermission(c.Request().Context(), user.ID, req.ID, func(v *domain.VirtualMachine) error {
108+
_, err := f.taskflow.FileManager().Operate(c.Request().Context(), taskflow.FileReq{
109+
ID: req.ID,
110+
Operate: taskflow.FileOpMove,
111+
Source: req.Source,
112+
Target: req.Target,
113+
})
114+
if err != nil {
115+
return err
116+
}
117+
return c.Success(nil)
118+
}), req.Source)
119+
}
120+
121+
// Copy 复制文件/目录
122+
func (f *FileHandler) Copy(c *web.Context, req domain.FileChangeReq) error {
123+
user := middleware.GetUser(c)
124+
return wraperr(f.usecase.WithVMPermission(c.Request().Context(), user.ID, req.ID, func(v *domain.VirtualMachine) error {
125+
_, err := f.taskflow.FileManager().Operate(c.Request().Context(), taskflow.FileReq{
126+
ID: req.ID,
127+
Operate: taskflow.FileOpCopy,
128+
Source: req.Source,
129+
Target: req.Target,
130+
})
131+
if err != nil {
132+
return err
133+
}
134+
return c.Success(nil)
135+
}), req.Source)
136+
}
137+
138+
// Delete 删除文件/目录
139+
func (f *FileHandler) Delete(c *web.Context, req domain.FilePathReq) error {
140+
user := middleware.GetUser(c)
141+
return wraperr(f.usecase.WithVMPermission(c.Request().Context(), user.ID, req.ID, func(v *domain.VirtualMachine) error {
142+
_, err := f.taskflow.FileManager().Operate(c.Request().Context(), taskflow.FileReq{
143+
ID: req.ID,
144+
Operate: taskflow.FileOpDelete,
145+
Path: req.Path,
146+
})
147+
if err != nil {
148+
return err
149+
}
150+
return c.Success(nil)
151+
}), req.Path)
152+
}
153+
154+
// Save 保存文件内容
155+
func (f *FileHandler) Save(c *web.Context, req domain.FileSaveReq) error {
156+
user := middleware.GetUser(c)
157+
return wraperr(f.usecase.WithVMPermission(c.Request().Context(), user.ID, req.ID, func(v *domain.VirtualMachine) error {
158+
_, err := f.taskflow.FileManager().Operate(c.Request().Context(), taskflow.FileReq{
159+
ID: req.ID,
160+
Operate: taskflow.FileOpSave,
161+
Path: req.Path,
162+
Content: req.Content,
163+
})
164+
if err != nil {
165+
return err
166+
}
167+
return c.Success(nil)
168+
}), req.Path)
169+
}
170+
171+
// Upload 上传文件到 VM
172+
func (f *FileHandler) Upload(c *web.Context) error {
173+
id := c.QueryParam("id")
174+
path := c.QueryParam("path")
175+
f.logger.With("id", id, "path", path).DebugContext(c.Request().Context(), "upload file")
176+
177+
user := middleware.GetUser(c)
178+
if err := f.usecase.WithVMPermission(c.Request().Context(), user.ID, id, func(v *domain.VirtualMachine) error {
179+
return nil
180+
}); err != nil {
181+
return wraperr(err, path)
182+
}
183+
184+
fh, err := c.FormFile("file")
185+
if err != nil {
186+
return err
187+
}
188+
ff, err := fh.Open()
189+
if err != nil {
190+
return fmt.Errorf("failed to open file %s", err)
191+
}
192+
defer ff.Close()
193+
194+
ctx := c.Request().Context()
195+
const (
196+
uploadChunkSize = 1 * 1024 * 1024
197+
uploadQueueDepth = 16
198+
)
199+
data := make(chan []byte, uploadQueueDepth)
200+
errChan := make(chan error, 1)
201+
done := make(chan struct{})
202+
203+
// 创建可取消的 context 给 reader,确保 Upload 返回后能终止 reader
204+
readerCtx, cancelReader := context.WithCancel(ctx)
205+
defer cancelReader()
206+
207+
go func() {
208+
buf := make([]byte, uploadChunkSize)
209+
defer close(data)
210+
defer close(done)
211+
for {
212+
n, err := ff.Read(buf)
213+
if n > 0 {
214+
chunk := make([]byte, n)
215+
copy(chunk, buf[:n])
216+
select {
217+
case data <- chunk:
218+
case <-readerCtx.Done():
219+
return
220+
}
221+
}
222+
if err == nil {
223+
continue
224+
}
225+
if err == io.EOF {
226+
break
227+
}
228+
f.logger.With("error", err).ErrorContext(ctx, "failed to read upload file")
229+
select {
230+
case errChan <- err:
231+
default:
232+
}
233+
return
234+
}
235+
}()
236+
237+
uploadErr := f.taskflow.FileManager().Upload(ctx, taskflow.FileReq{
238+
ID: id,
239+
Path: path,
240+
}, data)
241+
242+
// Upload 返回后立即取消 reader,确保 goroutine 能退出
243+
cancelReader()
244+
245+
if uploadErr != nil {
246+
// Best effort: unblock a stalled reader so we can return the upload error promptly.
247+
if closeErr := ff.Close(); closeErr != nil {
248+
f.logger.With("error", closeErr).DebugContext(ctx, "failed to close upload file after cancel")
249+
}
250+
}
251+
252+
waitTimeout := 5 * time.Second
253+
if uploadErr != nil {
254+
waitTimeout = 200 * time.Millisecond
255+
}
256+
257+
// 等待 goroutine 完成(带超时保护)
258+
timer := time.NewTimer(waitTimeout)
259+
defer timer.Stop()
260+
select {
261+
case <-done:
262+
case <-timer.C:
263+
f.logger.ErrorContext(ctx, "timeout waiting for reader goroutine to exit")
264+
if uploadErr != nil {
265+
return wraperr(uploadErr, path)
266+
}
267+
return fmt.Errorf("timeout waiting for file reader to complete")
268+
}
269+
270+
// 检查文件读取错误
271+
select {
272+
case err := <-errChan:
273+
if uploadErr == nil {
274+
return wraperr(err, path)
275+
}
276+
// 两个错误都存在,优先返回 upload 错误
277+
f.logger.With("read_error", err, "upload_error", uploadErr).ErrorContext(ctx, "multiple errors during upload")
278+
return wraperr(uploadErr, path)
279+
default:
280+
}
281+
282+
if uploadErr != nil {
283+
return wraperr(uploadErr, path)
284+
}
285+
286+
return c.Success(nil)
287+
}
288+
289+
// Download 下载文件
290+
func (f *FileHandler) Download(c *web.Context, req domain.FilePathReq) error {
291+
user := middleware.GetUser(c)
292+
if err := f.usecase.WithVMPermission(c.Request().Context(), user.ID, req.ID, func(v *domain.VirtualMachine) error {
293+
return nil
294+
}); err != nil {
295+
if strings.Contains(err.Error(), "virtual_machine not found") {
296+
c.Response().Header().Set("X-Internal-Error", base64.StdEncoding.EncodeToString([]byte("开发环境已被回收")))
297+
return errcode.ErrVmRemoved.Wrap(err)
298+
}
299+
c.Response().Header().Set("X-Internal-Error", base64.StdEncoding.EncodeToString([]byte(err.Error())))
300+
return errcode.ErrFilePermisionDenied.Wrap(err).WithParam("file", req.Path)
301+
}
302+
303+
c.Response().Header().Set("Content-Disposition", "attachment; filename=\"\"")
304+
c.Response().Header().Set(echo.HeaderContentType, "application/octet-stream")
305+
306+
err := f.taskflow.FileManager().Download(c.Request().Context(), taskflow.FileReq{
307+
ID: req.ID,
308+
Path: req.Path,
309+
}, func(size uint64, b []byte) error {
310+
f.logger.With("size", size, "len", len(b)).DebugContext(c.Request().Context(), "download file chunk")
311+
if size > 0 {
312+
c.Response().Header().Set(echo.HeaderContentLength, fmt.Sprintf("%d", size))
313+
}
314+
if len(b) > 0 {
315+
if _, err := c.Response().Writer.Write(b); err != nil {
316+
return err
317+
}
318+
}
319+
return nil
320+
})
321+
if err != nil {
322+
c.Response().Header().Set("X-Internal-Error", base64.StdEncoding.EncodeToString([]byte(err.Error())))
323+
f.logger.With("error", err, "req", req).ErrorContext(c.Request().Context(), "failed to download file")
324+
return err
325+
}
326+
327+
return nil
328+
}

backend/biz/file/register.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package file
2+
3+
import (
4+
"github.com/samber/do"
5+
6+
v1 "github.com/chaitin/MonkeyCode/backend/biz/file/handler/v1"
7+
)
8+
9+
// RegisterFile 注册文件管理模块
10+
func RegisterFile(i *do.Injector) {
11+
do.Provide(i, v1.NewFileHandler)
12+
do.MustInvoke[*v1.FileHandler](i)
13+
}

backend/biz/register.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package biz
33
import (
44
"github.com/samber/do"
55

6+
"github.com/chaitin/MonkeyCode/backend/biz/file"
67
"github.com/chaitin/MonkeyCode/backend/biz/git"
78
"github.com/chaitin/MonkeyCode/backend/biz/host"
89
"github.com/chaitin/MonkeyCode/backend/biz/notify"
@@ -38,5 +39,8 @@ func RegisterAll(i *do.Injector) error {
3839
// 注册 host 模块
3940
host.RegisterHost(i)
4041

42+
// 注册 file 模块(依赖 HostUsecase)
43+
file.RegisterFile(i)
44+
4145
return nil
4246
}

backend/biz/task/handler/v1/task.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,10 +83,31 @@ func NewTaskHandler(i *do.Injector) (*TaskHandler, error) {
8383
v1.GET("/stream", web.BindHandler(h.Stream))
8484
v1.POST("", web.BindHandler(h.Create))
8585
v1.PUT("/stop", web.BindHandler(h.Stop))
86+
v1.DELETE("/:id", web.BindHandler(h.Delete))
8687

8788
return h, nil
8889
}
8990

91+
// Delete 删除任务
92+
//
93+
// @Summary 删除任务
94+
// @Description 删除任务。任务处于运行中(pending/processing)或虚拟机仍在线时不允许删除。
95+
// @Tags 【用户】任务管理
96+
// @Accept json
97+
// @Produce json
98+
// @Security MonkeyCodeAIAuth
99+
// @Param id path string true "任务 ID"
100+
// @Success 200 {object} web.Resp{} "成功"
101+
// @Failure 500 {object} web.Resp "服务器内部错误"
102+
// @Router /api/v1/users/tasks/{id} [delete]
103+
func (h *TaskHandler) Delete(c *web.Context, req domain.IDReq[uuid.UUID]) error {
104+
user := middleware.GetUser(c)
105+
if err := h.usecase.Delete(c.Request().Context(), user, req.ID); err != nil {
106+
return err
107+
}
108+
return c.Success(nil)
109+
}
110+
90111
// Stop 停止任务
91112
//
92113
// @Summary 停止任务

0 commit comments

Comments
 (0)