Skip to content

Commit 47b2313

Browse files
committed
♻️ AgentTaskRunRepo 改用 OPFS 存储
AgentTaskRun 原本存在 chrome.storage.local(键前缀 agent_task_run:), 存在几个问题:配额压力、listRuns 全表扫描、缓存常驻内存。 改为 OPFS 后: - 存储路径:agents/task_runs/{taskId}.json(内含 run[] 降序) - listRuns O(全表) → O(单文件 parse) - 环形缓冲:每任务保留最近 100 条(append 时自动裁剪) - Agent 体系存储后端统一到 OPFS 改动: - agent_task.ts: AgentTaskRunRepo extends Repo → extends OPFSRepo - task_scheduler.ts: updateRun 签名加 taskId 参数 - 测试:添加 OPFS mock + 2 个新用例(环形缓冲 / 更新不存在 id) Agent 功能尚未发布,无需考虑旧数据迁移。 Typecheck 0 错误,1736 测试全绿(+2 新用例)
1 parent 7e2450f commit 47b2313

4 files changed

Lines changed: 208 additions & 38 deletions

File tree

src/app/repo/agent_task.test.ts

Lines changed: 117 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,82 @@
1-
import { describe, expect, it, beforeEach } from "vitest";
1+
import { describe, expect, it, vi, beforeEach } from "vitest";
22
import { AgentTaskRepo, AgentTaskRunRepo } from "./agent_task";
33
import type { AgentTask, AgentTaskRun } from "@App/app/service/agent/core/types";
44

5+
// Mock OPFS 文件系统
6+
function createMockOPFS() {
7+
function createMockWritable() {
8+
let data: any = null;
9+
return {
10+
write: vi.fn(async (content: any) => {
11+
data = content;
12+
}),
13+
close: vi.fn(async () => {}),
14+
getData: () => data,
15+
};
16+
}
17+
18+
function createMockFileHandle(name: string, dir: Map<string, any>) {
19+
return {
20+
kind: "file" as const,
21+
getFile: vi.fn(async () => {
22+
const content = dir.get(name);
23+
if (typeof content === "string") return new Blob([content], { type: "application/json" });
24+
return new Blob([""], { type: "application/json" });
25+
}),
26+
createWritable: vi.fn(async () => {
27+
const writable = createMockWritable();
28+
const origClose = writable.close;
29+
writable.close = vi.fn(async () => {
30+
const written = writable.getData();
31+
dir.set(name, written);
32+
await origClose();
33+
});
34+
return writable;
35+
}),
36+
};
37+
}
38+
39+
function createMockDirHandle(store: Map<string, any>): any {
40+
return {
41+
kind: "directory" as const,
42+
getDirectoryHandle: vi.fn(async (name: string, opts?: { create?: boolean }) => {
43+
if (!store.has("__dir__" + name)) {
44+
if (opts?.create) {
45+
store.set("__dir__" + name, new Map());
46+
} else {
47+
throw new Error("Not found");
48+
}
49+
}
50+
return createMockDirHandle(store.get("__dir__" + name));
51+
}),
52+
getFileHandle: vi.fn(async (name: string, opts?: { create?: boolean }) => {
53+
if (!store.has(name) && !opts?.create) {
54+
throw new Error("Not found");
55+
}
56+
if (!store.has(name)) {
57+
store.set(name, "");
58+
}
59+
return createMockFileHandle(name, store);
60+
}),
61+
removeEntry: vi.fn(async (name: string) => {
62+
store.delete(name);
63+
store.delete("__dir__" + name);
64+
}),
65+
};
66+
}
67+
68+
const rootStore = new Map<string, any>();
69+
const mockRoot = createMockDirHandle(rootStore);
70+
71+
Object.defineProperty(navigator, "storage", {
72+
value: {
73+
getDirectory: vi.fn(async () => mockRoot),
74+
},
75+
configurable: true,
76+
writable: true,
77+
});
78+
}
79+
580
function makeTask(overrides: Partial<AgentTask> = {}): AgentTask {
681
return {
782
id: "task-1",
@@ -26,45 +101,46 @@ function makeRun(overrides: Partial<AgentTaskRun> = {}): AgentTaskRun {
26101
};
27102
}
28103

29-
describe.concurrent("AgentTaskRepo", () => {
104+
describe("AgentTaskRepo", () => {
30105
let repo: AgentTaskRepo;
31106

32107
beforeEach(() => {
108+
createMockOPFS();
33109
repo = new AgentTaskRepo();
34110
});
35111

36-
it.concurrent("saveTask / getTask CRUD", async () => {
37-
const task = makeTask();
112+
it("saveTask / getTask CRUD", async () => {
113+
const task = makeTask({ id: "crud-task" });
38114
await repo.saveTask(task);
39-
const result = await repo.getTask("task-1");
115+
const result = await repo.getTask("crud-task");
40116
expect(result).toBeDefined();
41117
expect(result!.name).toBe("测试任务");
42118
});
43119

44-
it.concurrent("listTasks 返回所有任务", async () => {
45-
await repo.saveTask(makeTask({ id: "t-a", name: "A" }));
46-
await repo.saveTask(makeTask({ id: "t-b", name: "B" }));
120+
it("listTasks 返回所有任务", async () => {
121+
await repo.saveTask(makeTask({ id: "list-a", name: "A" }));
122+
await repo.saveTask(makeTask({ id: "list-b", name: "B" }));
47123
const list = await repo.listTasks();
48124
expect(list.length).toBeGreaterThanOrEqual(2);
49125
const names = list.map((t) => t.name);
50126
expect(names).toContain("A");
51127
expect(names).toContain("B");
52128
});
53129

54-
it.concurrent("removeTask 删除任务", async () => {
130+
it("removeTask 删除任务", async () => {
55131
const task = makeTask({ id: "t-del" });
56132
await repo.saveTask(task);
57133
await repo.removeTask("t-del");
58134
const result = await repo.getTask("t-del");
59135
expect(result).toBeUndefined();
60136
});
61137

62-
it.concurrent("removeTask 同时清理关联的 runs", async () => {
138+
it("removeTask 同时清理关联的 runs", async () => {
63139
const taskId = "t-clean";
64140
await repo.saveTask(makeTask({ id: taskId }));
65141
const runRepo = new AgentTaskRunRepo();
66-
await runRepo.appendRun(makeRun({ id: "r1", taskId }));
67-
await runRepo.appendRun(makeRun({ id: "r2", taskId }));
142+
await runRepo.appendRun(makeRun({ id: "r1", taskId, starttime: 1000 }));
143+
await runRepo.appendRun(makeRun({ id: "r2", taskId, starttime: 2000 }));
68144

69145
await repo.removeTask(taskId);
70146

@@ -73,28 +149,29 @@ describe.concurrent("AgentTaskRepo", () => {
73149
});
74150
});
75151

76-
describe.concurrent("AgentTaskRunRepo", () => {
152+
describe("AgentTaskRunRepo", () => {
77153
let repo: AgentTaskRunRepo;
78154

79155
beforeEach(() => {
156+
createMockOPFS();
80157
repo = new AgentTaskRunRepo();
81158
});
82159

83-
it.concurrent("appendRun / listRuns", async () => {
160+
it("appendRun / listRuns 按 starttime 降序", async () => {
84161
const taskId = "task-run-test";
85162
await repo.appendRun(makeRun({ id: "r-a", taskId, starttime: 1000 }));
86163
await repo.appendRun(makeRun({ id: "r-b", taskId, starttime: 2000 }));
87164
await repo.appendRun(makeRun({ id: "r-c", taskId, starttime: 3000 }));
88165

89166
const runs = await repo.listRuns(taskId);
90167
expect(runs.length).toBe(3);
91-
// 按 starttime 降序
168+
// 按 starttime 降序(最新在前)
92169
expect(runs[0].id).toBe("r-c");
93170
expect(runs[1].id).toBe("r-b");
94171
expect(runs[2].id).toBe("r-a");
95172
});
96173

97-
it.concurrent("listRuns 限制返回条数", async () => {
174+
it("listRuns 限制返回条数", async () => {
98175
const taskId = "task-limit";
99176
for (let i = 0; i < 5; i++) {
100177
await repo.appendRun(makeRun({ id: `rl-${i}`, taskId, starttime: i * 1000 }));
@@ -103,7 +180,7 @@ describe.concurrent("AgentTaskRunRepo", () => {
103180
expect(runs.length).toBe(3);
104181
});
105182

106-
it.concurrent("clearRuns 清理指定任务的运行历史", async () => {
183+
it("clearRuns 清理指定任务的运行历史", async () => {
107184
const taskId = "task-clear";
108185
await repo.appendRun(makeRun({ id: "rc-1", taskId }));
109186
await repo.appendRun(makeRun({ id: "rc-2", taskId }));
@@ -119,11 +196,31 @@ describe.concurrent("AgentTaskRunRepo", () => {
119196
expect(otherRuns).toHaveLength(1);
120197
});
121198

122-
it.concurrent("updateRun 更新运行状态", async () => {
199+
it("updateRun 更新运行状态", async () => {
123200
await repo.appendRun(makeRun({ id: "r-upd", taskId: "t-upd" }));
124-
await repo.updateRun("r-upd", { status: "success", endtime: Date.now() });
201+
await repo.updateRun("t-upd", "r-upd", { status: "success", endtime: 99999 });
125202
const runs = await repo.listRuns("t-upd");
126203
expect(runs[0].status).toBe("success");
127-
expect(runs[0].endtime).toBeDefined();
204+
expect(runs[0].endtime).toBe(99999);
205+
});
206+
207+
it("updateRun 找不到 id 时静默忽略", async () => {
208+
await repo.appendRun(makeRun({ id: "r-exists", taskId: "t-miss" }));
209+
await repo.updateRun("t-miss", "r-nonexistent", { status: "error" });
210+
const runs = await repo.listRuns("t-miss");
211+
expect(runs).toHaveLength(1);
212+
expect(runs[0].status).toBe("running");
213+
});
214+
215+
it("appendRun 超过 MAX_RUNS_PER_TASK 时裁剪最老记录", async () => {
216+
const taskId = "task-ring";
217+
for (let i = 0; i < 105; i++) {
218+
await repo.appendRun(makeRun({ id: `rr-${i}`, taskId, starttime: i }));
219+
}
220+
const runs = await repo.listRuns(taskId, 200);
221+
expect(runs.length).toBe(100);
222+
// 最新的在前,最老 5 条被裁剪掉(rr-0 ~ rr-4)
223+
expect(runs[0].id).toBe("rr-104");
224+
expect(runs[99].id).toBe("rr-5");
128225
});
129226
});

src/app/repo/agent_task.ts

Lines changed: 25 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import type { AgentTask, AgentTaskRun } from "@App/app/service/agent/core/types";
22
import { Repo } from "./repo";
3+
import { OPFSRepo } from "./opfs_repo";
34

45
export class AgentTaskRepo extends Repo<AgentTask> {
56
constructor() {
@@ -27,32 +28,41 @@ export class AgentTaskRepo extends Repo<AgentTask> {
2728
}
2829
}
2930

30-
export class AgentTaskRunRepo extends Repo<AgentTaskRun> {
31+
const MAX_RUNS_PER_TASK = 100;
32+
33+
export class AgentTaskRunRepo extends OPFSRepo {
3134
constructor() {
32-
super("agent_task_run:");
33-
this.enableCache();
35+
super("task_runs");
36+
}
37+
38+
private filename(taskId: string): string {
39+
return `${taskId}.json`;
3440
}
3541

3642
async appendRun(run: AgentTaskRun): Promise<void> {
37-
await this._save(run.id, run);
43+
const runs = await this.readJsonFile<AgentTaskRun[]>(this.filename(run.taskId), []);
44+
runs.unshift(run);
45+
// 环形缓冲:超过上限时裁剪最老的记录
46+
if (runs.length > MAX_RUNS_PER_TASK) {
47+
runs.length = MAX_RUNS_PER_TASK;
48+
}
49+
await this.writeJsonFile(this.filename(run.taskId), runs);
3850
}
3951

40-
async updateRun(id: string, data: Partial<AgentTaskRun>): Promise<void> {
41-
await this.update(id, data);
52+
async updateRun(taskId: string, id: string, data: Partial<AgentTaskRun>): Promise<void> {
53+
const runs = await this.readJsonFile<AgentTaskRun[]>(this.filename(taskId), []);
54+
const idx = runs.findIndex((r) => r.id === id);
55+
if (idx < 0) return;
56+
Object.assign(runs[idx], data);
57+
await this.writeJsonFile(this.filename(taskId), runs);
4258
}
4359

4460
async listRuns(taskId: string, limit = 50): Promise<AgentTaskRun[]> {
45-
const all = await this.find((_key, value) => value.taskId === taskId);
46-
// 按 starttime 降序排列
47-
all.sort((a, b) => b.starttime - a.starttime);
48-
return all.slice(0, limit);
61+
const runs = await this.readJsonFile<AgentTaskRun[]>(this.filename(taskId), []);
62+
return runs.slice(0, limit);
4963
}
5064

5165
async clearRuns(taskId: string): Promise<void> {
52-
const all = await this.find((_key, value) => value.taskId === taskId);
53-
if (all.length > 0) {
54-
const keys = all.map((r) => r.id);
55-
await this.deletes(keys);
56-
}
66+
await this.deleteFile(this.filename(taskId));
5767
}
5868
}

src/app/service/agent/core/task_scheduler.test.ts

Lines changed: 65 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,67 @@ import { AgentTaskScheduler } from "./task_scheduler";
33
import { AgentTaskRepo, AgentTaskRunRepo } from "@App/app/repo/agent_task";
44
import type { AgentTask } from "@App/app/service/agent/core/types";
55

6+
// Mock OPFS 文件系统(AgentTaskRunRepo 使用 OPFS 存储)
7+
function createMockOPFS() {
8+
function createMockWritable() {
9+
let data: any = null;
10+
return {
11+
write: vi.fn(async (content: any) => {
12+
data = content;
13+
}),
14+
close: vi.fn(async () => {}),
15+
getData: () => data,
16+
};
17+
}
18+
function createMockFileHandle(name: string, dir: Map<string, any>) {
19+
return {
20+
kind: "file" as const,
21+
getFile: vi.fn(async () => {
22+
const content = dir.get(name);
23+
if (typeof content === "string") return new Blob([content], { type: "application/json" });
24+
return new Blob([""], { type: "application/json" });
25+
}),
26+
createWritable: vi.fn(async () => {
27+
const writable = createMockWritable();
28+
const origClose = writable.close;
29+
writable.close = vi.fn(async () => {
30+
dir.set(name, writable.getData());
31+
await origClose();
32+
});
33+
return writable;
34+
}),
35+
};
36+
}
37+
function createMockDirHandle(store: Map<string, any>): any {
38+
return {
39+
kind: "directory" as const,
40+
getDirectoryHandle: vi.fn(async (name: string, opts?: { create?: boolean }) => {
41+
if (!store.has("__dir__" + name)) {
42+
if (opts?.create) store.set("__dir__" + name, new Map());
43+
else throw new Error("Not found");
44+
}
45+
return createMockDirHandle(store.get("__dir__" + name));
46+
}),
47+
getFileHandle: vi.fn(async (name: string, opts?: { create?: boolean }) => {
48+
if (!store.has(name) && !opts?.create) throw new Error("Not found");
49+
if (!store.has(name)) store.set(name, "");
50+
return createMockFileHandle(name, store);
51+
}),
52+
removeEntry: vi.fn(async (name: string) => {
53+
store.delete(name);
54+
store.delete("__dir__" + name);
55+
}),
56+
};
57+
}
58+
const rootStore = new Map<string, any>();
59+
const mockRoot = createMockDirHandle(rootStore);
60+
Object.defineProperty(navigator, "storage", {
61+
value: { getDirectory: vi.fn(async () => mockRoot) },
62+
configurable: true,
63+
writable: true,
64+
});
65+
}
66+
667
function makeTask(overrides: Partial<AgentTask> = {}): AgentTask {
768
return {
869
id: "task-1",
@@ -29,6 +90,7 @@ describe("AgentTaskScheduler", () => {
2990
let scheduler: AgentTaskScheduler;
3091

3192
beforeEach(() => {
93+
createMockOPFS();
3294
repo = new AgentTaskRepo();
3395
runRepo = new AgentTaskRunRepo();
3496
internalExecutor = vi
@@ -88,10 +150,11 @@ describe("AgentTaskScheduler", () => {
88150

89151
await scheduler.tick();
90152

91-
// 等待第一次执行开始
153+
// 等待 executor 被调用(表明已经过 appendRun)
92154
await vi.waitFor(() => {
93-
expect(scheduler.isRunning("running-1")).toBe(true);
155+
expect(internalExecutor).toHaveBeenCalledTimes(1);
94156
});
157+
expect(scheduler.isRunning("running-1")).toBe(true);
95158

96159
// 再次 tick 不应重复执行
97160
await scheduler.tick();

src/app/service/agent/core/task_scheduler.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ export class AgentTaskScheduler {
8989
task.lastRunError = run.error;
9090
} finally {
9191
// 更新 run 记录
92-
await this.runRepo.updateRun(run.id, {
92+
await this.runRepo.updateRun(task.id, run.id, {
9393
status: run.status,
9494
endtime: run.endtime,
9595
error: run.error,

0 commit comments

Comments
 (0)