Skip to content

Commit 82842fc

Browse files
committed
feat: 完善 FSC Gateway Daemon,添加 CLI 接口(submit/query/status)和任务存储
1 parent f2f7d5e commit 82842fc

1 file changed

Lines changed: 105 additions & 6 deletions

File tree

fsc/fsc-gateway-daemon.ts

Lines changed: 105 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,17 +7,21 @@
77
* - 分发任务到 Redis 队列
88
* - 收集结果
99
* - Session 管理
10+
* - CLI 接口:submit/query/status
1011
*/
1112

1213
import { createClient } from 'redis';
1314
import winston from 'winston';
15+
import { randomUUID } from 'crypto';
1416

1517
// ============ 配置 ============
1618
const REDIS_HOST = process.env.REDIS_HOST || '127.0.0.1';
1719
const REDIS_PORT = parseInt(process.env.REDIS_PORT || '6379');
1820
const TASK_QUEUE = 'fsc:task_queue';
1921
const RESULT_QUEUE = 'fsc:result_queue';
2022
const FAILED_QUEUE = 'fsc:failed_tasks';
23+
const TASK_STORE_PREFIX = 'fsc:task:';
24+
const RESULT_STORE_PREFIX = 'fsc:result:';
2125

2226
// ============ Logger ============
2327
const logger = winston.createLogger({
@@ -72,9 +76,42 @@ interface TaskResult {
7276
timestamp: number;
7377
}
7478

79+
// ============ 存储任务和结果 ============
80+
async function storeTask(task: Task) {
81+
await redis.set(`${TASK_STORE_PREFIX}${task.id}`, JSON.stringify({
82+
...task,
83+
createdAt: Date.now(),
84+
status: 'queued'
85+
}));
86+
}
87+
88+
async function getTask(taskId: string): Promise<Task & { createdAt: number; status: string } | null> {
89+
const data = await redis.get(`${TASK_STORE_PREFIX}${taskId}`);
90+
return data ? JSON.parse(data) : null;
91+
}
92+
93+
async function storeResult(result: TaskResult) {
94+
await redis.set(`${RESULT_STORE_PREFIX}${result.taskId}`, JSON.stringify(result));
95+
// 更新任务状态
96+
const task = await getTask(result.taskId);
97+
if (task) {
98+
await redis.set(`${TASK_STORE_PREFIX}${result.taskId}`, JSON.stringify({
99+
...task,
100+
status: result.status,
101+
completedAt: result.timestamp
102+
}));
103+
}
104+
}
105+
106+
async function getResult(taskId: string): Promise<TaskResult | null> {
107+
const data = await redis.get(`${RESULT_STORE_PREFIX}${taskId}`);
108+
return data ? JSON.parse(data) : null;
109+
}
110+
75111
// ============ 提交任务 ============
76112
async function submitTask(task: Task): Promise<string> {
77113
logger.info(`[Gateway] Submitting task ${task.id}`);
114+
await storeTask(task);
78115
await redis.rPush(TASK_QUEUE, JSON.stringify(task));
79116
return task.id;
80117
}
@@ -96,8 +133,8 @@ async function resultCollectorLoop() {
96133
const taskResult = JSON.parse(result.element) as TaskResult;
97134
logger.info(`[Gateway] Received result for task ${taskResult.taskId}: ${taskResult.status}`);
98135

99-
// TODO: 持久化结果、通知订阅者、更新 Session
100-
// 暂时先打日志
136+
// 存储结果
137+
await storeResult(taskResult);
101138
} catch (error) {
102139
logger.error('Result collector error:', error);
103140
await new Promise(resolve => setTimeout(resolve, 1000));
@@ -107,6 +144,70 @@ async function resultCollectorLoop() {
107144
logger.info('Result collector exited');
108145
}
109146

147+
// ============ CLI 接口 ============
148+
async function handleCli() {
149+
const args = process.argv.slice(2);
150+
151+
if (args.length === 0) {
152+
// 启动 daemon 模式
153+
await main();
154+
return;
155+
}
156+
157+
// CLI 命令模式
158+
await redis.connect();
159+
160+
try {
161+
if (args[0] === 'submit') {
162+
// 提交任务:submit <image> <command...>
163+
const image = args[1];
164+
const commands = args.slice(2);
165+
166+
if (!image || commands.length === 0) {
167+
console.error('Usage: submit <image> <command...>');
168+
process.exit(1);
169+
}
170+
171+
const taskId = randomUUID();
172+
const task: Task = { id: taskId, image, commands, timeoutSeconds: 300 };
173+
await submitTask(task);
174+
console.log(`Task submitted: ${taskId}`);
175+
process.exit(0);
176+
177+
} else if (args[0] === 'query') {
178+
// 查询任务:query <taskId>
179+
const taskId = args[1];
180+
if (!taskId) {
181+
console.error('Usage: query <taskId>');
182+
process.exit(1);
183+
}
184+
185+
const task = await getTask(taskId);
186+
const result = await getResult(taskId);
187+
188+
console.log('Task:', task);
189+
console.log('Result:', result);
190+
process.exit(0);
191+
192+
} else if (args[0] === 'status') {
193+
// 查看状态:status
194+
const queueLen = await redis.lLen(TASK_QUEUE);
195+
const resultLen = await redis.lLen(RESULT_QUEUE);
196+
const failedLen = await redis.lLen(FAILED_QUEUE);
197+
198+
console.log(`Queue: ${queueLen} pending, ${resultLen} results, ${failedLen} failed`);
199+
process.exit(0);
200+
201+
} else {
202+
console.error('Unknown command:', args[0]);
203+
console.error('Commands: submit | query | status');
204+
process.exit(1);
205+
}
206+
} finally {
207+
await redis.quit();
208+
}
209+
}
210+
110211
// ============ 健康检查 ============
111212
setInterval(async () => {
112213
try {
@@ -149,7 +250,5 @@ async function main() {
149250
});
150251
}
151252

152-
main().catch((error) => {
153-
logger.error('Fatal error:', error);
154-
process.exit(1);
155-
});
253+
// ============ 入口 ============
254+
handleCli();

0 commit comments

Comments
 (0)