Skip to content

Commit f5d0f87

Browse files
authored
Fix bad content parsing results (#902)
* Send headers after computing * Send headers after computing
1 parent bc5558a commit f5d0f87

4 files changed

Lines changed: 26 additions & 14 deletions

File tree

src/components/c2d/compute_engine_base.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ export abstract class C2DEngine {
7373
consumerAddress: string,
7474
jobId: string,
7575
index: number
76-
): Promise<Readable>
76+
): Promise<{ stream: Readable; headers: any }>
7777

7878
public abstract cleanupExpiredStorage(job: DBComputeJob): Promise<boolean>
7979

src/components/c2d/compute_engine_docker.ts

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -421,7 +421,7 @@ export class C2DEngineDocker extends C2DEngine {
421421
consumerAddress: string,
422422
jobId: string,
423423
index: number
424-
): Promise<Readable> {
424+
): Promise<{ stream: Readable; headers: any }> {
425425
const jobs = await this.db.getJob(jobId, null, consumerAddress)
426426
if (jobs.length === 0) {
427427
return null
@@ -430,14 +430,24 @@ export class C2DEngineDocker extends C2DEngine {
430430
for (const i of results) {
431431
if (i.index === index) {
432432
if (i.type === 'algorithmLog') {
433-
return createReadStream(
434-
this.getC2DConfig().tempFolder + '/' + jobId + '/data/logs/algorithm.log'
435-
)
433+
return {
434+
stream: createReadStream(
435+
this.getC2DConfig().tempFolder + '/' + jobId + '/data/logs/algorithm.log'
436+
),
437+
headers: {
438+
'Content-Type': 'text/plain'
439+
}
440+
}
436441
}
437442
if (i.type === 'output') {
438-
return createReadStream(
439-
this.getC2DConfig().tempFolder + '/' + jobId + '/data/outputs/outputs.tar'
440-
)
443+
return {
444+
stream: createReadStream(
445+
this.getC2DConfig().tempFolder + '/' + jobId + '/data/outputs/outputs.tar'
446+
),
447+
headers: {
448+
'Content-Type': 'application/octet-stream'
449+
}
450+
}
441451
}
442452
}
443453
}

src/components/c2d/compute_engine_opf_k8.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -276,7 +276,7 @@ export class C2DEngineOPFK8 extends C2DEngine {
276276
consumerAddress: string,
277277
jobId: string,
278278
index: number
279-
): Promise<Readable> {
279+
): Promise<{ stream: Readable; headers: any }> {
280280
const nonce: number = new Date().getTime()
281281
const config = await getConfiguration()
282282
// signature check on operator service is only owner + jobId
@@ -307,7 +307,10 @@ export class C2DEngineOPFK8 extends C2DEngine {
307307
const message = `Exception on getComputeJobResult. Status: ${response.status}, ${response.statusText}`
308308
throw new Error(message)
309309
}
310-
return response.data
310+
return {
311+
stream: response.data,
312+
headers: response.headers
313+
}
311314
} catch (e) {
312315
console.error(e)
313316
}

src/components/core/compute/getResults.ts

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -90,16 +90,15 @@ export class ComputeGetResultHandler extends CommandHandler {
9090
jobId,
9191
task.index
9292
)
93-
const anyResp: any = respStream as any
9493
const response: P2PCommandResponse = {
95-
stream: respStream,
94+
stream: respStream.stream,
9695
status: {
9796
httpStatus: 200
9897
}
9998
}
10099
// need to pass the headers properly
101-
if (anyResp.headers) {
102-
response.status.headers = anyResp.headers
100+
if (respStream.headers) {
101+
response.status.headers = respStream.headers
103102
}
104103
return response
105104
} catch (error) {

0 commit comments

Comments
 (0)