Skip to content

Commit bf1a460

Browse files
committed
set cpu pinning for envs, release cpu once the job is done, handle the case when the node restarts
1 parent 8d43849 commit bf1a460

2 files changed

Lines changed: 121 additions & 3 deletions

File tree

src/components/c2d/compute_engine_docker.ts

Lines changed: 111 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,14 +69,19 @@ export class C2DEngineDocker extends C2DEngine {
6969
private retentionDays: number
7070
private cleanupInterval: number
7171
private paymentClaimInterval: number
72+
private cpuAllocations: Map<string, number[]> = new Map()
73+
private envCpuCores: number[] = []
74+
private cpuOffset: number
7275
public constructor(
7376
clusterConfig: C2DClusterInfo,
7477
db: C2DDatabase,
7578
escrow: Escrow,
7679
keyManager: KeyManager,
77-
dockerRegistryAuths: dockerRegistrysAuth
80+
dockerRegistryAuths: dockerRegistrysAuth,
81+
cpuOffset: number = 0
7882
) {
7983
super(clusterConfig, db, escrow, keyManager, dockerRegistryAuths)
84+
this.cpuOffset = cpuOffset
8085

8186
this.docker = null
8287
if (clusterConfig.connection.socketPath) {
@@ -247,7 +252,15 @@ export class C2DEngineDocker extends C2DEngine {
247252
}
248253
this.envs[0].resources.push(cpuResources)
249254
this.envs[0].resources.push(ramResources)
250-
/* TODO - get namedresources & discreete one
255+
// Build the list of physical CPU core indices for this environment
256+
this.envCpuCores = Array.from(
257+
{ length: cpuResources.total },
258+
(_, i) => this.cpuOffset + i
259+
)
260+
CORE_LOGGER.info(
261+
`CPU affinity: environment cores ${this.envCpuCores[0]}-${this.envCpuCores[this.envCpuCores.length - 1]} (offset=${this.cpuOffset}, total=${cpuResources.total})`
262+
)
263+
/* TODO - get namedresources & discreete one
251264
if (sysinfo.GenericResources) {
252265
for (const [key, value] of Object.entries(sysinfo.GenericResources)) {
253266
for (const [type, val] of Object.entries(value)) {
@@ -301,6 +314,9 @@ export class C2DEngineDocker extends C2DEngine {
301314
this.envs[0].id =
302315
this.getC2DConfig().hash + '-' + create256Hash(JSON.stringify(this.envs[0].fees))
303316

317+
// Rebuild CPU allocations from running containers (handles node restart)
318+
await this.rebuildCpuAllocations()
319+
304320
// only now set the timer
305321
if (!this.cronTimer) {
306322
this.setNewTimer()
@@ -1636,6 +1652,11 @@ export class C2DEngineDocker extends C2DEngine {
16361652
if (cpus && cpus > 0) {
16371653
hostConfig.CpuPeriod = 100000 // 100 miliseconds is usually the default
16381654
hostConfig.CpuQuota = Math.floor(cpus * hostConfig.CpuPeriod)
1655+
// Pin the container to specific physical CPU cores
1656+
const cpusetStr = this.allocateCpus(job.jobId, cpus)
1657+
if (cpusetStr) {
1658+
hostConfig.CpusetCpus = cpusetStr
1659+
}
16391660
}
16401661
const containerInfo: ContainerCreateOptions = {
16411662
name: job.jobId + '-algoritm',
@@ -1912,6 +1933,93 @@ export class C2DEngineDocker extends C2DEngine {
19121933
}
19131934

19141935
// eslint-disable-next-line require-await
1936+
private parseCpusetString(cpuset: string): number[] {
1937+
const cores: number[] = []
1938+
if (!cpuset) return cores
1939+
for (const part of cpuset.split(',')) {
1940+
if (part.includes('-')) {
1941+
const [start, end] = part.split('-').map(Number)
1942+
for (let i = start; i <= end; i++) {
1943+
cores.push(i)
1944+
}
1945+
} else {
1946+
cores.push(Number(part))
1947+
}
1948+
}
1949+
return cores
1950+
}
1951+
1952+
private allocateCpus(jobId: string, count: number): string | null {
1953+
if (this.envCpuCores.length === 0 || count <= 0) return null
1954+
1955+
const usedCores = new Set<number>()
1956+
for (const cores of this.cpuAllocations.values()) {
1957+
for (const core of cores) {
1958+
usedCores.add(core)
1959+
}
1960+
}
1961+
1962+
const freeCores: number[] = []
1963+
for (const core of this.envCpuCores) {
1964+
if (!usedCores.has(core)) {
1965+
freeCores.push(core)
1966+
if (freeCores.length === count) break
1967+
}
1968+
}
1969+
1970+
if (freeCores.length < count) {
1971+
CORE_LOGGER.warn(
1972+
`CPU affinity: not enough free cores for job ${jobId} (requested=${count}, available=${freeCores.length}/${this.envCpuCores.length})`
1973+
)
1974+
return null
1975+
}
1976+
1977+
this.cpuAllocations.set(jobId, freeCores)
1978+
const cpusetStr = freeCores.join(',')
1979+
CORE_LOGGER.info(`CPU affinity: allocated cores [${cpusetStr}] to job ${jobId}`)
1980+
return cpusetStr
1981+
}
1982+
1983+
private releaseCpus(jobId: string): void {
1984+
const cores = this.cpuAllocations.get(jobId)
1985+
if (cores) {
1986+
CORE_LOGGER.info(
1987+
`CPU affinity: released cores [${cores.join(',')}] from job ${jobId}`
1988+
)
1989+
this.cpuAllocations.delete(jobId)
1990+
}
1991+
}
1992+
1993+
/**
1994+
* On startup, inspects running Docker containers to rebuild the CPU allocation map.
1995+
*/
1996+
private async rebuildCpuAllocations(): Promise<void> {
1997+
if (this.envCpuCores.length === 0) return
1998+
try {
1999+
const jobs = await this.db.getRunningJobs(this.getC2DConfig().hash)
2000+
for (const job of jobs) {
2001+
try {
2002+
const container = this.docker.getContainer(job.jobId + '-algoritm')
2003+
const info = await container.inspect()
2004+
const cpuset = info.HostConfig?.CpusetCpus
2005+
if (cpuset) {
2006+
const cores = this.parseCpusetString(cpuset)
2007+
if (cores.length > 0) {
2008+
this.cpuAllocations.set(job.jobId, cores)
2009+
CORE_LOGGER.info(
2010+
`CPU affinity: recovered allocation [${cpuset}] for running job ${job.jobId}`
2011+
)
2012+
}
2013+
}
2014+
} catch (e) {
2015+
// Container may not exist yet (e.g., job is in pull/build phase)
2016+
}
2017+
}
2018+
} catch (e) {
2019+
CORE_LOGGER.error(`CPU affinity: failed to rebuild allocations: ${e.message}`)
2020+
}
2021+
}
2022+
19152023
private async cleanupJob(job: DBComputeJob) {
19162024
// cleaning up
19172025
// - claim payment or release lock
@@ -1920,6 +2028,7 @@ export class C2DEngineDocker extends C2DEngine {
19202028
// - delete container
19212029

19222030
this.jobImageSizes.delete(job.jobId)
2031+
this.releaseCpus(job.jobId)
19232032

19242033
try {
19252034
const container = await this.docker.getContainer(job.jobId + '-algoritm')

src/components/c2d/compute_engines.ts

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ export class C2DEngines {
2121
// if yes, do not create multiple engines
2222
if (config && config.c2dClusters) {
2323
this.engines = []
24+
let cpuOffset = 0
2425
for (const cluster of config.c2dClusters) {
2526
if (cluster.type === C2DClusterType.DOCKER) {
2627
this.engines.push(
@@ -29,9 +30,17 @@ export class C2DEngines {
2930
db,
3031
escrow,
3132
keyManager,
32-
config.dockerRegistrysAuth
33+
config.dockerRegistrysAuth,
34+
cpuOffset
3335
)
3436
)
37+
// Advance the CPU offset by this cluster's configured CPU total
38+
if (cluster.connection?.resources) {
39+
const cpuRes = cluster.connection.resources.find((r: any) => r.id === 'cpu')
40+
if (cpuRes?.total) {
41+
cpuOffset += cpuRes.total
42+
}
43+
}
3544
}
3645
}
3746
}

0 commit comments

Comments
 (0)