Skip to content

Commit 7cedcef

Browse files
authored
Merge pull request #1289 from oceanprotocol/cpu-affinity
2 parents 702a059 + bf1a460 commit 7cedcef

2 files changed

Lines changed: 121 additions & 3 deletions

File tree

src/components/c2d/compute_engine_docker.ts

Lines changed: 111 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,14 +69,19 @@ export class C2DEngineDocker extends C2DEngine {
6969
private retentionDays: number
7070
private cleanupInterval: number
7171
private paymentClaimInterval: number
72+
private cpuAllocations: Map<string, number[]> = new Map()
73+
private envCpuCores: number[] = []
74+
private cpuOffset: number
7275
public constructor(
7376
clusterConfig: C2DClusterInfo,
7477
db: C2DDatabase,
7578
escrow: Escrow,
7679
keyManager: KeyManager,
77-
dockerRegistryAuths: dockerRegistrysAuth
80+
dockerRegistryAuths: dockerRegistrysAuth,
81+
cpuOffset: number = 0
7882
) {
7983
super(clusterConfig, db, escrow, keyManager, dockerRegistryAuths)
84+
this.cpuOffset = cpuOffset
8085

8186
this.docker = null
8287
if (clusterConfig.connection.socketPath) {
@@ -247,7 +252,15 @@ export class C2DEngineDocker extends C2DEngine {
247252
}
248253
this.envs[0].resources.push(cpuResources)
249254
this.envs[0].resources.push(ramResources)
250-
/* TODO - get namedresources & discreete one
255+
// Build the list of physical CPU core indices for this environment
256+
this.envCpuCores = Array.from(
257+
{ length: cpuResources.total },
258+
(_, i) => this.cpuOffset + i
259+
)
260+
CORE_LOGGER.info(
261+
`CPU affinity: environment cores ${this.envCpuCores[0]}-${this.envCpuCores[this.envCpuCores.length - 1]} (offset=${this.cpuOffset}, total=${cpuResources.total})`
262+
)
263+
/* TODO - get namedresources & discreete one
251264
if (sysinfo.GenericResources) {
252265
for (const [key, value] of Object.entries(sysinfo.GenericResources)) {
253266
for (const [type, val] of Object.entries(value)) {
@@ -301,6 +314,9 @@ export class C2DEngineDocker extends C2DEngine {
301314
this.envs[0].id =
302315
this.getC2DConfig().hash + '-' + create256Hash(JSON.stringify(this.envs[0].fees))
303316

317+
// Rebuild CPU allocations from running containers (handles node restart)
318+
await this.rebuildCpuAllocations()
319+
304320
// only now set the timer
305321
if (!this.cronTimer) {
306322
this.setNewTimer()
@@ -1646,6 +1662,11 @@ export class C2DEngineDocker extends C2DEngine {
16461662
if (cpus && cpus > 0) {
16471663
hostConfig.CpuPeriod = 100000 // 100 miliseconds is usually the default
16481664
hostConfig.CpuQuota = Math.floor(cpus * hostConfig.CpuPeriod)
1665+
// Pin the container to specific physical CPU cores
1666+
const cpusetStr = this.allocateCpus(job.jobId, cpus)
1667+
if (cpusetStr) {
1668+
hostConfig.CpusetCpus = cpusetStr
1669+
}
16491670
}
16501671
const containerInfo: ContainerCreateOptions = {
16511672
name: job.jobId + '-algoritm',
@@ -1922,6 +1943,93 @@ export class C2DEngineDocker extends C2DEngine {
19221943
}
19231944

19241945
// eslint-disable-next-line require-await
1946+
/**
 * Parses a cpuset list string (for example "0-3,8,10-11") into the individual
 * CPU core indices it names.
 *
 * Each comma-separated segment must be either a single non-negative integer or
 * an inclusive "start-end" range. Segments that do not match (empty segments
 * from a trailing comma, dangling ranges such as "0-", non-numeric text) are
 * skipped instead of being coerced: Number('') is 0, which would otherwise
 * report core 0 as allocated when it is not. A reversed range ("3-1") yields
 * no cores.
 *
 * @param cpuset - cpuset list string; may be empty or undefined-ish
 * @returns the distinct core indices, in order of first appearance
 */
private parseCpusetString(cpuset: string): number[] {
  if (!cpuset) return []

  // A Set keeps first-appearance order while dropping cores named twice
  // (e.g. "0-2,1"), so one core is never counted as two allocations.
  const cores = new Set<number>()
  const segmentPattern = /^(\d+)\s*(?:-\s*(\d+))?$/

  for (const rawPart of cpuset.split(',')) {
    const match = segmentPattern.exec(rawPart.trim())
    if (!match) continue

    const start = Number(match[1])
    const end = match[2] === undefined ? start : Number(match[2])
    for (let core = start; core <= end; core++) {
      cores.add(core)
    }
  }

  return Array.from(cores)
}
1961+
1962+
/**
 * Reserves free CPU cores from this environment for a job and records the
 * reservation in `cpuAllocations`.
 *
 * The request is rounded up to whole cores, because a container can only be
 * pinned to whole cores. Without the rounding, a fractional request such as
 * 1.5 never satisfies the `length === count` stop condition, so the scan would
 * collect every free core and hand all of them to a single job.
 *
 * @param jobId - job the cores are reserved for (key in `cpuAllocations`)
 * @param count - number of CPUs requested; may be fractional
 * @returns a comma-separated cpuset string (e.g. "2,3"), or null when the
 *          environment has no known cores, the request is not a positive
 *          finite number, or not enough cores are free
 */
private allocateCpus(jobId: string, count: number): string | null {
  const needed = Math.ceil(count)
  if (this.envCpuCores.length === 0 || !Number.isFinite(needed) || needed <= 0) {
    return null
  }

  // Collect every core currently reserved by any job.
  const usedCores = new Set<number>()
  for (const cores of this.cpuAllocations.values()) {
    for (const core of cores) {
      usedCores.add(core)
    }
  }

  // Take the first `needed` free cores, in environment order.
  const freeCores: number[] = []
  for (const core of this.envCpuCores) {
    if (usedCores.has(core)) continue
    freeCores.push(core)
    if (freeCores.length === needed) break
  }

  if (freeCores.length < needed) {
    CORE_LOGGER.warn(
      `CPU affinity: not enough free cores for job ${jobId} (requested=${count}, available=${freeCores.length}/${this.envCpuCores.length})`
    )
    return null
  }

  this.cpuAllocations.set(jobId, freeCores)
  const cpusetStr = freeCores.join(',')
  CORE_LOGGER.info(`CPU affinity: allocated cores [${cpusetStr}] to job ${jobId}`)
  return cpusetStr
}
1992+
1993+
/**
 * Drops the CPU core reservation recorded for a job, if there is one.
 * Does nothing (and logs nothing) when the job holds no reservation.
 *
 * @param jobId - job whose entry in `cpuAllocations` should be removed
 */
private releaseCpus(jobId: string): void {
  const reserved = this.cpuAllocations.get(jobId)
  if (!reserved) return

  const coreList = reserved.join(',')
  CORE_LOGGER.info(`CPU affinity: released cores [${coreList}] from job ${jobId}`)
  this.cpuAllocations.delete(jobId)
}
2002+
2003+
/**
 * Repopulates `cpuAllocations` from the containers of jobs the database
 * reports as running, so reservations are known again after a node restart.
 *
 * For each running job the "<jobId>-algoritm" container is inspected and its
 * `HostConfig.CpusetCpus` value, when present and non-empty after parsing, is
 * recorded as that job's allocation. A failure while inspecting one job is
 * ignored and the next job is processed; a failure fetching the job list is
 * logged and ends the rebuild.
 */
private async rebuildCpuAllocations(): Promise<void> {
  // Nothing to track when the environment has no known cores.
  if (this.envCpuCores.length === 0) return

  try {
    const runningJobs = await this.db.getRunningJobs(this.getC2DConfig().hash)
    for (const job of runningJobs) {
      try {
        const details = await this.docker.getContainer(job.jobId + '-algoritm').inspect()
        const cpuset = details.HostConfig?.CpusetCpus
        if (!cpuset) continue

        const recovered = this.parseCpusetString(cpuset)
        if (recovered.length === 0) continue

        this.cpuAllocations.set(job.jobId, recovered)
        CORE_LOGGER.info(
          `CPU affinity: recovered allocation [${cpuset}] for running job ${job.jobId}`
        )
      } catch (e) {
        // Container may not exist yet (e.g., job is in pull/build phase)
      }
    }
  } catch (e) {
    CORE_LOGGER.error(`CPU affinity: failed to rebuild allocations: ${e.message}`)
  }
}
2032+
19252033
private async cleanupJob(job: DBComputeJob) {
19262034
// cleaning up
19272035
// - claim payment or release lock
@@ -1930,6 +2038,7 @@ export class C2DEngineDocker extends C2DEngine {
19302038
// - delete container
19312039

19322040
this.jobImageSizes.delete(job.jobId)
2041+
this.releaseCpus(job.jobId)
19332042

19342043
try {
19352044
const container = await this.docker.getContainer(job.jobId + '-algoritm')

src/components/c2d/compute_engines.ts

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ export class C2DEngines {
2121
// if yes, do not create multiple engines
2222
if (config && config.c2dClusters) {
2323
this.engines = []
24+
let cpuOffset = 0
2425
for (const cluster of config.c2dClusters) {
2526
if (cluster.type === C2DClusterType.DOCKER) {
2627
this.engines.push(
@@ -29,9 +30,17 @@ export class C2DEngines {
2930
db,
3031
escrow,
3132
keyManager,
32-
config.dockerRegistrysAuth
33+
config.dockerRegistrysAuth,
34+
cpuOffset
3335
)
3436
)
37+
// Advance the CPU offset by this cluster's configured CPU total
38+
if (cluster.connection?.resources) {
39+
const cpuRes = cluster.connection.resources.find((r: any) => r.id === 'cpu')
40+
if (cpuRes?.total) {
41+
cpuOffset += cpuRes.total
42+
}
43+
}
3544
}
3645
}
3746
}

0 commit comments

Comments
 (0)