Skip to content
Merged
3 changes: 2 additions & 1 deletion src/components/c2d/compute_engine_base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ export abstract class C2DEngine {
owner: string,
maxJobDuration: number,
resources: ComputeResourceRequest[],
payment: DBComputeJobPayment
payment: DBComputeJobPayment,
jobId: string
): Promise<ComputeJob[]>

public abstract stopComputeJob(
Expand Down
6 changes: 3 additions & 3 deletions src/components/c2d/compute_engine_docker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ import {
} from 'fs'
import { pipeline } from 'node:stream/promises'
import { CORE_LOGGER } from '../../utils/logging/common.js'
import { generateUniqueID } from '../database/sqliteCompute.js'
import { AssetUtils } from '../../utils/asset.js'
import { FindDdoHandler } from '../core/handler/ddoHandler.js'
import { OceanNode } from '../../OceanNode.js'
Expand Down Expand Up @@ -310,11 +309,12 @@ export class C2DEngineDocker extends C2DEngine {
owner: string,
maxJobDuration: number,
resources: ComputeResourceRequest[],
payment: DBComputeJobPayment
payment: DBComputeJobPayment,
jobId: string
): Promise<ComputeJob[]> {
if (!this.docker) return []
const isFree: boolean = !(payment && payment.lockTx)
const jobId = generateUniqueID()

// C2D - Check image, check arhitecture, etc
const image = getAlgorithmImage(algorithm)
// ex: node@sha256:1155995dda741e93afe4b1c6ced2d01734a6ec69865cc0997daf1f4db7259a36
Expand Down
25 changes: 16 additions & 9 deletions src/components/core/compute/startCompute.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import { FindDdoHandler } from '../handler/ddoHandler.js'
import { isOrderingAllowedForAsset } from '../handler/downloadHandler.js'
import { DDOManager } from '@oceanprotocol/ddo-js'
import { getNonceAsNumber, checkNonce, NonceResponse } from '../utils/nonceHandler.js'
import { createHash } from 'crypto'
import { generateUniqueID } from '../../database/sqliteCompute.js'

export class PaidComputeStartHandler extends CommandHandler {
validate(command: PaidComputeStartCommand): ValidateParams {
Expand Down Expand Up @@ -353,12 +353,7 @@ export class PaidComputeStartHandler extends CommandHandler {
resources
}
// job ID unicity
const timestamp =
BigInt(Date.now()) * 1_000_000n + (process.hrtime.bigint() % 1_000_000n)
const random = Math.random()
const jobId = createHash('sha256')
.update(JSON.stringify(s) + timestamp.toString() + random.toString())
.digest('hex')
const jobId = generateUniqueID(s)
// let's calculate payment needed based on resources request and maxJobDuration
const cost = engine.calculateResourcesCost(
task.payment.resources,
Expand Down Expand Up @@ -400,7 +395,8 @@ export class PaidComputeStartHandler extends CommandHandler {
token: task.payment.token,
lockTx: agreementId,
claimTx: null
}
},
jobId
)
CORE_LOGGER.logMessage(
'ComputeStartCommand Response: ' + JSON.stringify(response, null, 2),
Expand Down Expand Up @@ -561,6 +557,16 @@ export class FreeComputeStartHandler extends CommandHandler {
error: null
}
} */
const s = {
assets: task.datasets,
algorithm: task.algorithm,
output: task.output,
environment: task.environment,
owner: task.consumerAddress,
maxJobDuration: task.maxJobDuration,
resources: task.resources
}
const jobId = generateUniqueID(s)
const response = await engine.startComputeJob(
task.datasets,
task.algorithm,
Expand All @@ -569,7 +575,8 @@ export class FreeComputeStartHandler extends CommandHandler {
task.consumerAddress,
task.maxJobDuration,
task.resources,
null
null,
jobId
)

CORE_LOGGER.logMessage(
Expand Down
3 changes: 3 additions & 0 deletions src/components/core/utils/validateOrders.ts
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,9 @@ export async function validateOrderTransaction(
const currentTimestamp = Math.floor(Date.now() / 1000)

const timeElapsed = currentTimestamp - eventTimestamp
CORE_LOGGER.logMessage(
`serviceTimeout ${serviceTimeout} vs. timeElapsed ${timeElapsed} when validating order.`
)

if (serviceTimeout !== 0 && timeElapsed > serviceTimeout) {
return {
Expand Down
33 changes: 28 additions & 5 deletions src/components/database/sqliteCompute.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import {
} from '../../@types/C2D/C2D.js'
import sqlite3, { RunResult } from 'sqlite3'
import { DATABASE_LOGGER } from '../../utils/logging/common.js'
import { v4 as uuidv4 } from 'uuid'
import { createHash } from 'crypto'

interface ComputeDatabaseProvider {
newJob(job: DBComputeJob): Promise<string>
Expand All @@ -18,8 +18,14 @@ interface ComputeDatabaseProvider {
getFinishedJobs(): Promise<DBComputeJob[]>
}

export function generateUniqueID(): string {
return uuidv4()
export function generateUniqueID(jobStructure: any): string {
const timestamp =
BigInt(Date.now()) * 1_000_000n + (process.hrtime.bigint() % 1_000_000n)
const random = Math.random()
const jobId = createHash('sha256')
.update(JSON.stringify(jobStructure) + timestamp.toString() + random.toString())
.digest('hex')
return jobId
}

function getInternalStructure(job: DBComputeJob): any {
Expand Down Expand Up @@ -138,8 +144,25 @@ export class SQLiteCompute implements ComputeDatabaseProvider {
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);
`
const jobId = job.jobId || generateUniqueID()
job.jobId = jobId
let jobId: string
if (!job.jobId) {
const jobStructure = {
assets: job.assets,
algorithm: job.algorithm,
output: {},
environment: job.environment,
owner: job.owner,
maxJobDuration: job.maxJobDuration,
chainId: job.payment?.chainId || null,
agreementId: job.agreementId,
resources: job.resources
}
jobId = generateUniqueID(jobStructure)
job.jobId = jobId
} else {
jobId = job.jobId
}

return new Promise<string>((resolve, reject) => {
this.db.run(
insertSQL,
Expand Down
19 changes: 18 additions & 1 deletion src/test/unit/compute.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import {
convertStringToArray,
STRING_SEPARATOR
} from '../../components/database/sqliteCompute.js'
import os from 'os'
import {
buildEnvOverrideConfig,
OverrideEnvConfig,
Expand All @@ -29,7 +30,6 @@ import { OceanNodeConfig } from '../../@types/OceanNode.js'
import { ENVIRONMENT_VARIABLES } from '../../utils/constants.js'
import { completeDBComputeJob, dockerImageManifest } from '../data/assets.js'
import { omitDBComputeFieldsFromComputeJob } from '../../components/c2d/index.js'
import os from 'os'
import { checkManifestPlatform } from '../../components/c2d/compute_engine_docker.js'

describe('Compute Jobs Database', () => {
Expand All @@ -46,6 +46,7 @@ describe('Compute Jobs Database', () => {
documentId: 'did:op:12345',
serviceId: '0x12345abc'
}

before(async () => {
envOverrides = buildEnvOverrideConfig(
[ENVIRONMENT_VARIABLES.DOCKER_COMPUTE_ENVIRONMENTS],
Expand Down Expand Up @@ -83,6 +84,14 @@ describe('Compute Jobs Database', () => {
isStarted: false,
containerImage: 'some container image',
resources: [],
environment: 'some environment',
agreementId: '0xe2DD09d719Da89e5a3D0F2549c7E24566e947260fdc',
payment: {
token: '0x123',
lockTx: '0xe2DD09d719Da89e5a3D0F2549c7E24566e947260fdc',
claimTx: '0xe2DD09d719Da89e5a3D0F2549c7E24566e947260fdc',
chainId: 8996
},
isFree: false,
algoStartTimestamp: '0',
algoStopTimestamp: '0'
Expand Down Expand Up @@ -139,10 +148,18 @@ describe('Compute Jobs Database', () => {
stopRequested: false,
algorithm,
assets: [dataset],
environment: 'some environment',
isRunning: false,
isStarted: false,
containerImage: 'another container image',
resources: [],
agreementId: '0xe2DD09d719Da89e5a3D0F2549c7E24566e947260fdc',
payment: {
token: '0x123',
lockTx: '0xe2DD09d719Da89e5a3D0F2549c7E24566e947260fdc',
claimTx: '0xe2DD09d719Da89e5a3D0F2549c7E24566e947260fdc',
chainId: 8996
},
isFree: false,
algoStartTimestamp: '0',
algoStopTimestamp: '0'
Expand Down
Loading