Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 99 additions & 7 deletions pkg/cache/nodeinfo.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package cache

import (
"bytes"
"fmt"
"log"
"sort"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -30,6 +32,11 @@ type NodeInfo struct {
rwmu *sync.RWMutex
}

type DeviceIndexAvailableMem struct {
Idx int
AvailableMem uint
}

// Create Node Level
func NewNodeInfo(node *v1.Node) *NodeInfo {
log.Printf("debug: NewNodeInfo() creates nodeInfo for %s", node.Name)
Expand Down Expand Up @@ -246,13 +253,66 @@ func (n *NodeInfo) Allocate(clientset *kubernetes.Clientset, pod *v1.Pod) (err e
return err
}

// 计算pod的副本名称
func (n *NodeInfo) getPodReplicaName(podNs string, podName string) string {
var buffer bytes.Buffer
podNameItems := strings.Split(podName, "-")
buffer.WriteString(podNs)
if len(podNameItems) > 1 {
for i := 0; i < len(podNameItems)-1; i++ {
buffer.WriteString(podNameItems[i])
}
} else {
buffer.WriteString(podNameItems[0])
}
return buffer.String()
}

// 判断 当前GPU设备是否包含相同的副本
func (n *NodeInfo) deviceContainsSamePodReplica(deviceInfo *DeviceInfo, podNs string, podName string) bool {
result := false
podReplicaName := n.getPodReplicaName(podNs, podName)
for _, podInfo := range deviceInfo.podMap {
existsPodReplicaName := n.getPodReplicaName(podInfo.Namespace, podInfo.Name)
if strings.Compare(podReplicaName, existsPodReplicaName) == 0 {
result = true
break
}
}
return result
}

// 获取用户指定的GPU 设备 index
func (n *NodeInfo) getPodSpecialGPUIdx(pod *v1.Pod) (found bool, specialIdx int) {
found = false
specialIdx = -1
containers := pod.Spec.Containers
for i := 0; i < len(containers); i++ {
container := containers[i]
envs := container.Env
for j := 0; j < len(envs); j++ {
if strings.Compare(utils.EnvSpecialGPUIndex, envs[j].Name) == 0 {
tempIdx, err := strconv.Atoi(envs[j].Value)
if err == nil {
specialIdx = tempIdx
found = true
break
} else {
log.Printf("[%s] env value error, will ignore [%s]", utils.EnvSpecialGPUIndex, envs[j].Value)
}
}
}

}
return found, specialIdx
}

// allocate the GPU ID to the pod
func (n *NodeInfo) allocateGPUID(pod *v1.Pod) (candidateDevID int, found bool) {

reqGPU := uint(0)
found = false
candidateDevID = -1
candidateGPUMemory := uint(0)
availableGPUs := n.getAvailableGPUs()

reqGPU = uint(utils.GetGPUMemoryFromPodResource(pod))
Expand All @@ -261,19 +321,51 @@ func (n *NodeInfo) allocateGPUID(pod *v1.Pod) (candidateDevID int, found bool) {
log.Printf("info: reqGPU for pod %s in ns %s: %d", pod.Name, pod.Namespace, reqGPU)
log.Printf("info: AvailableGPUs: %v in node %s", availableGPUs, n.name)
if len(availableGPUs) > 0 {
var availableMemList []DeviceIndexAvailableMem // 所有可用的设备列表
var availableAndNotContainsReplicaList []DeviceIndexAvailableMem // 所有可用的,并且未包含相同副本pod的

useSpecialGPU, specialGPUIdx := n.getPodSpecialGPUIdx(pod) // 检查是否指定了固定的gpu设备

for devID := 0; devID < len(n.devs); devID++ {
availableGPU, ok := availableGPUs[devID]
if ok {
if availableGPU >= reqGPU {
if candidateDevID == -1 || candidateGPUMemory > availableGPU {
candidateDevID = devID
candidateGPUMemory = availableGPU
if availableGPU >= reqGPU { // 满足设备可用显存大小
if useSpecialGPU {
// 指定 gpu设备index
if specialGPUIdx == devID {
candidateDevID = specialGPUIdx
found = true
break
}
} else {
// 未指定gpu设备
// 检查当前dev设备中,是否已经调度了同一个类型的pod
availableMemList = append(availableMemList, DeviceIndexAvailableMem{Idx: devID, AvailableMem: availableGPU})
deviceInfo := n.devs[devID]
if !n.deviceContainsSamePodReplica(deviceInfo, pod.Namespace, pod.Name) {
// 未包含pod副本
availableAndNotContainsReplicaList = append(availableAndNotContainsReplicaList, DeviceIndexAvailableMem{Idx: devID, AvailableMem: availableGPU})
}
}

found = true
}
}
}
// 对于未包含pod副本的设备,找出一个availableGPU最大的一个,最空闲的一个
if !found && len(availableAndNotContainsReplicaList) > 0 {
sort.Slice(availableAndNotContainsReplicaList, func(i, j int) bool {
return availableAndNotContainsReplicaList[i].AvailableMem < availableAndNotContainsReplicaList[j].AvailableMem
})
candidateDevID = availableAndNotContainsReplicaList[len(availableAndNotContainsReplicaList)-1].Idx // 最空闲的一个
found = true
}
// 如果通过以上策略没有找到设备,则找出一个availableGPU最大的一个,最空闲的一个
if !found && len(availableMemList) > 0 {
sort.Slice(availableMemList, func(i, j int) bool {
return availableMemList[i].AvailableMem < availableMemList[j].AvailableMem
})
candidateDevID = availableMemList[len(availableMemList)-1].Idx // 最空闲的一个
found = true
}
}

if found {
Expand Down
1 change: 1 addition & 0 deletions pkg/utils/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,5 @@ const (
EnvResourceByDev = "ALIYUN_COM_GPU_MEM_DEV"
EnvAssignedFlag = "ALIYUN_COM_GPU_MEM_ASSIGNED"
EnvResourceAssumeTime = "ALIYUN_COM_GPU_MEM_ASSUME_TIME"
EnvSpecialGPUIndex = "ALIYUN_COM_GPU_SPECIAL_IDX"
)