Skip to content

Commit c2c52c8

Browse files
committed
Prune async upload jobs and test polling flow
1 parent 2b22aa0 commit c2c52c8

2 files changed

Lines changed: 179 additions & 0 deletions

File tree

packages/cli/src/index.ts

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2589,6 +2589,8 @@ function startUiSiteServer(args: {
25892589
lastErrorAt: null as string | null,
25902590
lastSuccessAt: null as string | null
25912591
};
2592+
const UPLOAD_JOB_TTL_MS = 10 * 60 * 1000;
2593+
const UPLOAD_JOB_MAX_COUNT = 128;
25922594
const uploadJobs = new Map<
25932595
string,
25942596
{
@@ -2616,6 +2618,30 @@ function startUiSiteServer(args: {
26162618
const faucetTargetEth = faucet?.targetWei ? Number(faucet.targetWei / 10n ** 18n) : 10;
26172619
const manifestPaths = new Set(['/manifest.json', '/.well-known/tokenhost/manifest.json']);
26182620

2621+
function cleanupUploadJobs() {
2622+
const now = Date.now();
2623+
for (const [jobId, job] of uploadJobs.entries()) {
2624+
const updatedAt = Date.parse(String(job.updatedAt ?? job.createdAt ?? ''));
2625+
if (Number.isFinite(updatedAt) && now - updatedAt > UPLOAD_JOB_TTL_MS) {
2626+
uploadJobs.delete(jobId);
2627+
}
2628+
}
2629+
2630+
if (uploadJobs.size <= UPLOAD_JOB_MAX_COUNT) return;
2631+
2632+
const ordered = [...uploadJobs.entries()].sort((a, b) => {
2633+
const aTime = Date.parse(String(a[1].updatedAt ?? a[1].createdAt ?? '')) || 0;
2634+
const bTime = Date.parse(String(b[1].updatedAt ?? b[1].createdAt ?? '')) || 0;
2635+
return aTime - bTime;
2636+
});
2637+
2638+
while (ordered.length > UPLOAD_JOB_MAX_COUNT) {
2639+
const oldest = ordered.shift();
2640+
if (!oldest) break;
2641+
uploadJobs.delete(oldest[0]);
2642+
}
2643+
}
2644+
26192645
function contentTypeForPath(filePath: string): string {
26202646
const ext = path.extname(filePath).toLowerCase();
26212647
switch (ext) {
@@ -2944,6 +2970,7 @@ function startUiSiteServer(args: {
29442970

29452971
if (pathname === uploadPath || pathname === uploadStatusPath) {
29462972
(async () => {
2973+
cleanupUploadJobs();
29472974
const enabled = Boolean(upload?.enabled);
29482975
const requestedUploadJobId = String(requestUrl.searchParams.get('jobId') ?? '').trim();
29492976
if (req.method === 'GET' || req.method === 'HEAD') {

test/integration/testCliUploadPreviewIntegration.js

Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,4 +216,156 @@ describe('CLI local preview upload integration', function () {
216216
preview.kill('SIGINT');
217217
}
218218
});
219+
220+
it('supports async foc-process uploads with job polling', async function () {
221+
this.timeout(180000);
222+
223+
const dir = fs.mkdtempSync(path.join(os.tmpdir(), 'th-upload-preview-foc-async-'));
224+
const schemaPath = path.join(dir, 'schema.json');
225+
const outDir = path.join(dir, 'out');
226+
const fakeCliPath = path.join(dir, 'fake-foc-cli.mjs');
227+
writeJson(schemaPath, uploadSchema());
228+
229+
fs.writeFileSync(
230+
fakeCliPath,
231+
`#!/usr/bin/env node
232+
import fs from 'fs';
233+
const args = process.argv.slice(2);
234+
const cmd = args[0];
235+
if (cmd === 'wallet' && args[1] === 'init') {
236+
const keyIndex = args.indexOf('--privateKey');
237+
const privateKey = keyIndex >= 0 ? String(args[keyIndex + 1] || '') : '';
238+
const configDir = process.env.XDG_CONFIG_HOME
239+
? process.env.XDG_CONFIG_HOME + '/foc-cli-nodejs'
240+
: process.env.HOME + '/.config/foc-cli-nodejs';
241+
fs.mkdirSync(configDir, { recursive: true });
242+
fs.writeFileSync(configDir + '/config.json', JSON.stringify({ privateKey }, null, 2));
243+
process.stdout.write(JSON.stringify({ ok: true, data: { privateKey } }));
244+
process.exit(0);
245+
}
246+
if (cmd === 'upload') {
247+
const fileArg = args.find((value) => value && !value.startsWith('-') && value !== 'upload') || '';
248+
const stat = fs.statSync(fileArg);
249+
await new Promise((resolve) => setTimeout(resolve, 250));
250+
process.stdout.write(JSON.stringify({
251+
ok: true,
252+
data: {
253+
status: 'uploaded',
254+
result: {
255+
pieceCid: 'bafkfakeasyncuploadcid',
256+
pieceScannerUrl: 'https://scanner.example/piece/bafkfakeasyncuploadcid',
257+
size: stat.size,
258+
copyResults: [
259+
{
260+
url: 'https://uploads.example.test/piece/bafkfakeasyncuploadcid',
261+
providerRole: 'primary'
262+
}
263+
],
264+
copyFailures: []
265+
},
266+
processLog: [
267+
{ step: 'Reading file', status: 'done' },
268+
{ step: 'Uploading file', status: 'done' }
269+
]
270+
}
271+
}));
272+
process.exit(0);
273+
}
274+
process.stderr.write(JSON.stringify({ ok: false, error: { message: 'unsupported fake foc command', args } }));
275+
process.exit(1);
276+
`
277+
);
278+
fs.chmodSync(fakeCliPath, 0o755);
279+
280+
const buildRes = runTh(['build', schemaPath, '--out', outDir], process.cwd(), {
281+
TH_UPLOAD_RUNNER: 'foc-process',
282+
TH_UPLOAD_PROVIDER: 'foc'
283+
});
284+
expect(buildRes.status, buildRes.stderr || buildRes.stdout).to.equal(0);
285+
286+
const port = 45200 + Math.floor(Math.random() * 1000);
287+
const host = '127.0.0.1';
288+
const baseUrl = `http://${host}:${port}`;
289+
290+
const preview = spawn(
291+
'node',
292+
[
293+
path.resolve('packages/cli/dist/index.js'),
294+
'preview',
295+
outDir,
296+
'--host',
297+
host,
298+
'--port',
299+
String(port),
300+
'--no-deploy',
301+
'--no-start-anvil',
302+
'--no-faucet'
303+
],
304+
{
305+
cwd: process.cwd(),
306+
stdio: ['ignore', 'pipe', 'pipe'],
307+
env: {
308+
...process.env,
309+
TH_UPLOAD_RUNNER: 'foc-process',
310+
TH_UPLOAD_PROVIDER: 'foc',
311+
TH_UPLOAD_FOC_COMMAND: `node ${fakeCliPath}`,
312+
TH_UPLOAD_FOC_DEBUG: '1',
313+
TH_UPLOAD_FOC_COPIES: '1',
314+
TH_UPLOAD_FOC_CHAIN: '314159',
315+
PRIVATE_KEY: 'fff91c6963a11a8ff48f13297185f110678b47086992b0f1612b7a1467d11f0c',
316+
XDG_CONFIG_HOME: path.join(dir, 'xdg-config')
317+
}
318+
}
319+
);
320+
321+
try {
322+
await waitForOutput(preview, new RegExp(`http://${host}:${port}/`), 60000);
323+
324+
const uploadStatus = await request(`${baseUrl}/__tokenhost/upload`);
325+
expect(uploadStatus.status).to.equal(200);
326+
expect(uploadStatus.json?.ok).to.equal(true);
327+
expect(uploadStatus.json?.runnerMode).to.equal('foc-process');
328+
expect(uploadStatus.json?.provider).to.equal('filecoin_onchain_cloud');
329+
330+
const payload = Buffer.from('fake-foc-async-upload', 'utf-8');
331+
const accepted = await request(`${baseUrl}/__tokenhost/upload`, {
332+
method: 'POST',
333+
headers: {
334+
'content-type': 'image/png',
335+
'x-tokenhost-upload-filename': 'async.png',
336+
'x-tokenhost-upload-size': String(payload.length),
337+
'x-tokenhost-upload-mode': 'async'
338+
},
339+
body: payload
340+
});
341+
342+
expect(accepted.status).to.equal(202);
343+
expect(accepted.json?.ok).to.equal(true);
344+
expect(accepted.json?.pending).to.equal(true);
345+
expect(String(accepted.json?.jobId || '')).to.not.equal('');
346+
expect(String(accepted.json?.statusUrl || '')).to.match(/^\/__tokenhost\/upload\?jobId=/);
347+
348+
let completed = null;
349+
for (let attempt = 0; attempt < 20; attempt += 1) {
350+
await new Promise((resolve) => setTimeout(resolve, 250));
351+
const polled = await request(`${baseUrl}${accepted.json.statusUrl}`);
352+
expect(polled.status).to.equal(200);
353+
if (polled.json?.pending) continue;
354+
completed = polled.json;
355+
break;
356+
}
357+
358+
expect(completed?.ok).to.equal(true);
359+
expect(completed?.done).to.equal(true);
360+
expect(completed?.upload?.url).to.equal('https://uploads.example.test/piece/bafkfakeasyncuploadcid');
361+
expect(completed?.upload?.cid).to.equal('bafkfakeasyncuploadcid');
362+
363+
const finalStatus = await request(`${baseUrl}/__tokenhost/upload`);
364+
expect(finalStatus.status).to.equal(200);
365+
expect(finalStatus.json?.lastError).to.equal(null);
366+
expect(String(finalStatus.json?.lastSuccessAt || '')).to.not.equal('');
367+
} finally {
368+
preview.kill('SIGINT');
369+
}
370+
});
219371
});

0 commit comments

Comments
 (0)