Skip to content

Commit 20471f1

Browse files
kvzclaude
andcommitted
Merge origin/cli: use got.stream for downloads
Resolved conflict in assemblies-create.ts by adopting the cleaner got.stream + pipeline approach for downloads while preserving the concurrency limiting and fresh stream creation from the local branch. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
2 parents 13b3547 + d73f212 commit 20471f1

2 files changed

Lines changed: 57 additions & 42 deletions

File tree

src/cli/assemblies-create.ts

Lines changed: 23 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
import EventEmitter from 'node:events'
22
import fs from 'node:fs'
33
import fsp from 'node:fs/promises'
4-
import http from 'node:http'
5-
import https from 'node:https'
64
import path from 'node:path'
75
import process from 'node:process'
86
import type { Readable, Writable } from 'node:stream'
7+
import { pipeline } from 'node:stream/promises'
98
import tty from 'node:tty'
109
import { promisify } from 'node:util'
10+
import got from 'got'
1111
import { tryCatch } from '../alphalib/tryCatch.ts'
1212
import type { StepsInput } from '../alphalib/types/template.ts'
1313
import type { CreateAssemblyParams } from '../apiTypes.ts'
@@ -785,25 +785,17 @@ export default async function run(
785785
}
786786

787787
outputctl.debug(`DOWNLOADING ${stepResult.name} to ${outPath}`)
788-
await new Promise<void>((dlResolve, dlReject) => {
789-
const get = resultUrl.startsWith('https') ? https.get : http.get
790-
const req = get(resultUrl, { signal: abortController.signal }, (res) => {
791-
if (res.statusCode !== 200) {
792-
const msg = `Server returned http status ${res.statusCode}`
793-
outputctl.error(msg)
794-
return dlReject(new Error(msg))
795-
}
796-
const outStream = fs.createWriteStream(outPath)
797-
res.pipe(outStream)
798-
outStream.on('finish', () => dlResolve())
799-
outStream.on('error', dlReject)
800-
})
801-
req.on('error', (err) => {
802-
if (err.name === 'AbortError') return dlResolve()
803-
outputctl.error(err.message)
804-
dlReject(err)
805-
})
806-
})
788+
const [dlErr] = await tryCatch(
789+
pipeline(
790+
got.stream(resultUrl, { signal: abortController.signal }),
791+
fs.createWriteStream(outPath),
792+
),
793+
)
794+
if (dlErr) {
795+
if (dlErr.name === 'AbortError') continue
796+
outputctl.error(dlErr.message)
797+
throw dlErr
798+
}
807799
}
808800
}
809801
}
@@ -814,6 +806,7 @@ export default async function run(
814806
await fsp.unlink(inPath)
815807
}
816808
}
809+
return assembly
817810
})()
818811

819812
jobsPromise.add(singleAssemblyPromise)
@@ -912,27 +905,15 @@ export default async function run(
912905

913906
if (outStream != null && resulturl && !superceded) {
914907
outputctl.debug('DOWNLOADING')
915-
await new Promise<void>((dlResolve, dlReject) => {
916-
const get = resulturl.startsWith('https') ? https.get : http.get
917-
const req = get(resulturl, { signal: abortController.signal }, (res) => {
918-
if (res.statusCode !== 200) {
919-
const msg = `Server returned http status ${res.statusCode}`
920-
outputctl.error(msg)
921-
return dlReject(new Error(msg))
922-
}
923-
924-
if (superceded) return dlResolve()
925-
926-
res.pipe(outStream)
927-
outStream.on('finish', () => res.unpipe())
928-
res.on('end', () => dlResolve())
929-
})
930-
req.on('error', (err) => {
931-
if (err.name === 'AbortError') return dlResolve()
932-
outputctl.error(err.message)
933-
dlReject(err)
934-
})
935-
})
908+
const [dlErr] = await tryCatch(
909+
pipeline(got.stream(resulturl, { signal: abortController.signal }), outStream),
910+
)
911+
if (dlErr) {
912+
if (dlErr.name !== 'AbortError') {
913+
outputctl.error(dlErr.message)
914+
throw dlErr
915+
}
916+
}
936917
}
937918

938919
outputctl.debug(`COMPLETED ${inPath ?? 'null'} ${outPath ?? 'null'}`)

test/e2e/cli/assemblies.test.ts

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import crypto from 'node:crypto'
12
import fsp from 'node:fs/promises'
23
import process from 'node:process'
34
import { promisify } from 'node:util'
@@ -230,6 +231,39 @@ describe('assemblies', () => {
230231
}),
231232
)
232233

234+
it(
235+
'should download file with correct md5 hash',
236+
testCase(async (client) => {
237+
const infile = await imgPromise()
238+
const steps = await stepsPromise()
239+
240+
const output = new OutputCtl()
241+
const { results } = await assembliesCreate(output, client, {
242+
steps,
243+
inputs: [infile],
244+
output: 'out-md5.jpg',
245+
})
246+
247+
// Get the assembly result to find the expected md5hash
248+
// The results array contains assembly statuses
249+
const assemblyResult = results[0] as {
250+
results?: Record<string, Array<{ md5hash?: string }>>
251+
}
252+
expect(assemblyResult).to.have.property('results')
253+
const resultSteps = Object.values(assemblyResult.results ?? {})
254+
expect(resultSteps.length).to.be.greaterThan(0)
255+
const firstResult = resultSteps[0]?.[0]
256+
expect(firstResult).to.have.property('md5hash')
257+
const expectedMd5 = firstResult?.md5hash
258+
259+
// Calculate md5 of downloaded file
260+
const downloadedBuffer = await fsp.readFile('out-md5.jpg')
261+
const actualMd5 = crypto.createHash('md5').update(downloadedBuffer).digest('hex')
262+
263+
expect(actualMd5).to.equal(expectedMd5)
264+
}),
265+
)
266+
233267
it(
234268
'should handle multiple inputs',
235269
testCase(async (client) => {

0 commit comments

Comments
 (0)