Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,13 @@ export P2P_BOOTSTRAP_NODES=
export P2P_FILTER_ANNOUNCED_ADDRESSES=

## compute
# Example with cross-resource constraints (constraints are optional and backwards-compatible):
# export DOCKER_COMPUTE_ENVIRONMENTS='[{"socketPath":"/var/run/docker.sock","storageExpiry":604800,"maxJobDuration":3600,"minJobDuration":60,"fees":{"1":[{"feeToken":"0x123","prices":[{"id":"cpu","price":1},{"id":"ram","price":0.1},{"id":"disk","price":0.01}]}]},"resources":[{"id":"cpu","total":8,"max":8,"min":1,"constraints":[{"id":"ram","min":1,"max":3},{"id":"disk","min":10,"max":100}]},{"id":"ram","total":32,"max":32,"min":1},{"id":"disk","total":500,"max":500,"min":10},{"id":"gpu","total":4,"max":4,"min":0,"constraints":[{"id":"ram","min":8,"max":32},{"id":"cpu","min":2,"max":4}]}]}]'
# Each environment defines its own resources (CPU, RAM, disk, GPUs) with full configuration.
# CPU, RAM, and disk are per-env exclusive: inUse is tracked only within the environment where the job runs.
# A global check ensures the aggregate usage across all environments does not exceed physical capacity.
# GPUs are shared-exclusive: if a job on envA uses gpu0, it shows as in-use on envB too.
# CPU cores are automatically partitioned across environments based on each env's cpu.total.
# CPU and RAM defaults are auto-detected from the system when not configured.
# export DOCKER_COMPUTE_ENVIRONMENTS='[{"socketPath":"/var/run/docker.sock","environments":[{"id":"envA","storageExpiry":604800,"maxJobDuration":3600,"minJobDuration":60,"resources":[{"id":"cpu","total":4,"max":4,"min":1,"type":"cpu"},{"id":"ram","total":16,"max":16,"min":1,"type":"ram"},{"id":"disk","total":500,"max":500,"min":10,"type":"disk"},{"id":"gpu0","total":1,"max":1,"min":0,"type":"gpu","init":{"deviceRequests":{"Driver":"nvidia","DeviceIDs":["0"],"Capabilities":[["gpu"]]}}}],"fees":{"1":[{"feeToken":"0x123","prices":[{"id":"cpu","price":1},{"id":"ram","price":0.1},{"id":"disk","price":0.01},{"id":"gpu0","price":5}]}]}}]}]'
export DOCKER_COMPUTE_ENVIRONMENTS=


2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ jobs:
DB_PASSWORD: 'changeme'
MAX_REQ_PER_MINUTE: 320
MAX_CONNECTIONS_PER_MINUTE: 320
DOCKER_COMPUTE_ENVIRONMENTS: '[{"socketPath":"/var/run/docker.sock","resources":[{"id":"disk","total":10}],"storageExpiry":604800,"maxJobDuration":3600,"minJobDuration": 60,"fees":{"8996":[{"prices":[{"id":"cpu","price":1}]}]},"free":{"maxJobDuration":60,"minJobDuration": 10,"maxJobs":3,"resources":[{"id":"cpu","max":1},{"id":"ram","max":1},{"id":"disk","max":1}]}}]'
DOCKER_COMPUTE_ENVIRONMENTS: '[{"socketPath":"/var/run/docker.sock","environments":[{"storageExpiry":604800,"maxJobDuration":3600,"minJobDuration":60,"resources":[{"id":"cpu","total":4,"max":4,"min":1,"type":"cpu"},{"id":"ram","total":10,"max":10,"min":1,"type":"ram"},{"id":"disk","total":10,"max":10,"min":0,"type":"disk"}],"fees":{"8996":[{"prices":[{"id":"cpu","price":1}]}]},"free":{"maxJobDuration":60,"maxJobs":3,"resources":[{"id":"cpu","max":1},{"id":"ram","max":1},{"id":"disk","max":1}]}}]}]'
DOCKER_REGISTRY_AUTHS: ${{ env.DOCKER_REGISTRY_AUTHS }}
- name: Check Ocean Node is running
run: |
Expand Down
84 changes: 42 additions & 42 deletions config.json
Original file line number Diff line number Diff line change
Expand Up @@ -93,57 +93,57 @@
"claimDurationTimeout": 3600,
"validateUnsignedDDO": true,
"jwtSecret": "ocean-node-secret",
"enableBenchmark": true,
"dockerComputeEnvironments": [
{
"socketPath": "/var/run/docker.sock",
"resources": [
"environments": [
{
"id": "disk",
"total": 1
}
],
"storageExpiry": 604800,
"maxJobDuration": 3600,
"minJobDuration": 60,
"access": {
"addresses": [],
"accessLists": []
},
"fees": {
"8996": [
{
"prices": [
"storageExpiry": 604800,
"maxJobDuration": 3600,
"minJobDuration": 60,
"resources": [
{
"id": "disk",
"total": 1
}
],
"access": {
"addresses": [],
"accessLists": []
},
"fees": {
"8996": [
{
"id": "cpu",
"price": 1
"prices": [
{
"id": "cpu",
"price": 1
}
]
}
]
}
]
},
"free": {
"maxJobDuration": 3600,
"minJobDuration": 60,
"maxJobs": 3,
"access": {
"addresses": [],
"accessLists": []
},
"resources": [
{
"id": "cpu",
"max": 1
},
{
"id": "ram",
"max": 1
},
{
"id": "disk",
"max": 1
"free": {
"maxJobDuration": 3600,
"maxJobs": 3,
"resources": [
{
"id": "cpu",
"max": 1
},
{
"id": "ram",
"max": 1
},
{
"id": "disk",
"max": 1
}
]
}
]
}
}
]
}
]
}
22 changes: 14 additions & 8 deletions src/@types/C2D/C2D.ts
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,19 @@ export interface ComputeEnvironment extends ComputeEnvironmentBaseConfig {
runMaxWaitTimeFree: number
}

/**
 * Configuration of a single compute environment, as listed under
 * `C2DDockerConfig.environments`.
 *
 * Every field is optional at the type level.
 * NOTE(review): defaults and validation are not visible in this file — confirm
 * where missing values are filled in.
 */
export interface C2DEnvironmentConfig {
// Identifier of the environment (sample configs use values such as "envA").
id?: string
// Free-form description — presumably for display only; verify against consumers.
description?: string
// NOTE(review): sample configs use 604800 — looks like seconds (7 days); confirm.
storageExpiry?: number
// NOTE(review): sample configs use 60 — presumably seconds; confirm.
minJobDuration?: number
// NOTE(review): sample configs use 3600 — presumably seconds; confirm.
maxJobDuration?: number
// NOTE(review): presumably the maximum number of jobs for this environment — confirm.
maxJobs?: number
// Fee structure; sample configs key it by strings such as "1" / "8996"
// (presumably chain ids — confirm), each holding per-resource prices.
fees?: ComputeEnvFeesStructure
// Access list; sample configs give it `addresses` and `accessLists` arrays.
access?: ComputeAccessList
// Options for free jobs; sample configs set maxJobDuration, maxJobs and per-resource `max`.
free?: ComputeEnvironmentFreeOptions
// Resources defined by this environment (sample configs list cpu, ram, disk and GPUs).
resources?: ComputeResource[]
}

export interface C2DDockerConfig {
socketPath: string
protocol: string
Expand All @@ -148,19 +161,12 @@ export interface C2DDockerConfig {
caPath: string
certPath: string
keyPath: string
storageExpiry?: number
maxJobDuration?: number
minJobDuration?: number
maxJobs?: number
fees: ComputeEnvFeesStructure
resources?: ComputeResource[] // optional, owner can overwrite
free?: ComputeEnvironmentFreeOptions
access: ComputeAccessList
imageRetentionDays?: number // Default: 7 days
imageCleanupInterval?: number // Default: 86400 seconds (24 hours)
paymentClaimInterval?: number // Default: 3600 seconds (1 hours)
scanImages?: boolean
scanImageDBUpdateInterval?: number // Default: 12 hours
environments: C2DEnvironmentConfig[]
}

export type ComputeResultType =
Expand Down
1 change: 1 addition & 0 deletions src/@types/OceanNode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ export interface OceanNodeConfig {
jwtSecret?: string
httpCertPath?: string
httpKeyPath?: string
enableBenchmark?: boolean
}

export interface P2PStatusResponse {
Expand Down
75 changes: 63 additions & 12 deletions src/components/c2d/compute_engine_base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,9 @@ export abstract class C2DEngine {
} catch (e) {
CORE_LOGGER.error('Failed to get running jobs:' + e.message)
}

const envResourceMap = new Map((env.resources || []).map((r) => [r.id, r]))

let totalJobs = 0
let totalFreeJobs = 0
let queuedJobs = 0
Expand All @@ -324,9 +327,13 @@ export abstract class C2DEngine {
let maxWaitTimeFree = 0
let maxRunningTime = 0
let maxRunningTimeFree = 0

for (const job of jobs) {
if (job.environment === env.id) {
if (job.queueMaxWaitTime === 0) {
const isThisEnv = job.environment === env.id
const isRunning = job.queueMaxWaitTime === 0

if (isThisEnv) {
if (isRunning) {
const timeElapsed = job.buildStartTimestamp
? new Date().getTime() / 1000 - Number.parseFloat(job?.buildStartTimestamp)
: new Date().getTime() / 1000 - Number.parseFloat(job?.algoStartTimestamp)
Expand All @@ -336,23 +343,31 @@ export abstract class C2DEngine {
totalFreeJobs++
maxRunningTimeFree += job.maxJobDuration - timeElapsed
}
} else {
queuedJobs++
maxWaitTime += job.maxJobDuration
if (job.isFree) {
queuedFreeJobs++
maxWaitTimeFree += job.maxJobDuration
}
}
}

for (const resource of job.resources) {
if (isRunning) {
for (const resource of job.resources) {
const envRes = envResourceMap.get(resource.id)
if (envRes) {
// GPUs are shared-exclusive: inUse tracked globally across all envs
// Everything else (cpu, ram, disk) is per-env exclusive
const isSharedExclusive = envRes.type === 'gpu'
if (!isSharedExclusive && !isThisEnv) continue
if (!(resource.id in usedResources)) usedResources[resource.id] = 0
usedResources[resource.id] += resource.amount
if (job.isFree) {
if (!(resource.id in usedFreeResources)) usedFreeResources[resource.id] = 0
usedFreeResources[resource.id] += resource.amount
}
}
} else {
// queued job
queuedJobs++
maxWaitTime += job.maxJobDuration
if (job.isFree) {
queuedFreeJobs++
maxWaitTimeFree += job.maxJobDuration
}
}
}
}
Expand All @@ -370,12 +385,41 @@ export abstract class C2DEngine {
}
}

// Per-resource-id upper bound; checkGlobalResourceAvailability clamps the summed
// `total` of a resource across environments to this value when an entry exists.
// NOTE(review): where this map gets populated is not visible here — confirm.
protected physicalLimits: Map<string, number> = new Map()

/**
 * Ensures that `amount` of `resourceId` is still free when all environments
 * are considered together.
 *
 * The `total` and `inUse` values of the matching resource are summed over
 * every environment that declares it. If `physicalLimits` holds an entry for
 * the resource and the summed total exceeds it, the total is clamped to that
 * entry before the remaining amount is computed.
 *
 * @param allEnvironments environments whose resources are aggregated
 * @param resourceId id of the resource being requested
 * @param amount requested quantity of the resource
 * @throws Error when the aggregated remainder is smaller than `amount`
 */
private checkGlobalResourceAvailability(
  allEnvironments: ComputeEnvironment[],
  resourceId: string,
  amount: number
) {
  let summedTotal = 0
  let summedInUse = 0
  for (const environment of allEnvironments) {
    const entry = this.getResource(environment.resources, resourceId)
    // environments that do not declare this resource contribute nothing
    if (!entry) continue
    summedTotal += entry.total || 0
    summedInUse += entry.inUse || 0
  }

  // configured totals may add up to more than the registered limit: clamp to it
  const limit = this.physicalLimits.get(resourceId)
  const capacity = limit !== undefined && summedTotal > limit ? limit : summedTotal

  const globalRemainder = capacity - summedInUse
  if (!(globalRemainder < amount)) return

  throw new Error(
    `Not enough available ${resourceId} globally (remaining: ${globalRemainder}, requested: ${amount})`
  )
}

// overridden by each engine if required
// eslint-disable-next-line require-await
public async checkIfResourcesAreAvailable(
resourcesRequest: ComputeResourceRequest[],
env: ComputeEnvironment,
isFree: boolean
isFree: boolean,
allEnvironments?: ComputeEnvironment[]
) {
// Filter out resources with amount 0 as they're not actually being requested
const activeResources = resourcesRequest.filter((r) => r.amount > 0)
Expand All @@ -385,6 +429,13 @@ export abstract class C2DEngine {
if (!envResource) throw new Error(`No such resource ${request.id}`)
if (envResource.total - envResource.inUse < request.amount)
throw new Error(`Not enough available ${request.id}`)

// Global check for non-GPU resources (cpu, ram, disk are per-env exclusive)
// GPUs are shared-exclusive so their inUse already reflects global usage
if (allEnvironments && envResource.type !== 'gpu') {
this.checkGlobalResourceAvailability(allEnvironments, request.id, request.amount)
}

if (isFree) {
if (!env.free) throw new Error(`No free resources`)
envResource = this.getResource(env.free?.resources, request.id)
Expand Down
Loading
Loading