2 changes: 2 additions & 0 deletions .gitignore
@@ -0,0 +1,2 @@
.idea
migrate
59 changes: 59 additions & 0 deletions README.md
@@ -14,3 +14,62 @@ go build migrate
docker run --rm -v $(pwd):/app -w /app golang:latest \
bash -c 'GOOS=linux GOARCH=amd64 go build -o migrate_linux_amd64'
```

## ARM NodePool Example

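The following NodePool fragment requests arm64 Linux nodes on spot or on-demand capacity, excludes accelerated instance categories (p, g, gr, inf, a) and two legacy instance types, caps instances at 32 GiB of memory and 16 vCPUs, and taints the nodes so that only workloads tolerating `node.cloudpilot.ai/arch-arm64` are scheduled onto them:
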
```yaml
template:
  metadata:
    labels:
      node.cloudpilot.ai/managed: "true"
  spec:
    taints:
      - key: node.cloudpilot.ai/arch-arm64
        effect: NoSchedule
    requirements:
      - key: karpenter.k8s.aws/instance-category
        operator: NotIn
        values:
          - p
          - g
          - gr
          - inf
          - a
      - key: kubernetes.io/arch
        operator: In
        values:
          - arm64
      - key: kubernetes.io/os
        operator: In
        values:
          - linux
      - key: karpenter.sh/capacity-type
        operator: In
        values:
          - spot
          - on-demand
      - key: karpenter.k8s.aws/instance-memory
        operator: Lt
        values:
          - "32769"
      - key: karpenter.k8s.aws/instance-cpu
        operator: Lt
        values:
          - "17"
      - key: beta.kubernetes.io/instance-type
        operator: NotIn
        values:
          - c1.medium
          - m1.small
    nodeClassRef:
      kind: EC2NodeClass
      name: cloudpilot
      group: karpenter.k8s.aws
    expireAfter: Never
disruption:
  consolidateAfter: 60m
  consolidationPolicy: WhenEmptyOrUnderutilized
  budgets:
    - nodes: "2"
      weight: 2
```
177 changes: 177 additions & 0 deletions arm_check.go
@@ -0,0 +1,177 @@
package main

import (
"context"
"fmt"
"io"
"math/rand"
"strings"
"sync"
"time"

"github.com/awslabs/amazon-ecr-credential-helper/ecr-login"
"github.com/chrismellard/docker-credential-acr-env/pkg/credhelper"
"github.com/google/go-containerregistry/pkg/authn"
"github.com/google/go-containerregistry/pkg/authn/github"
"github.com/google/go-containerregistry/pkg/name"
"github.com/google/go-containerregistry/pkg/v1/google"
"github.com/google/go-containerregistry/pkg/v1/remote"
"github.com/google/go-containerregistry/pkg/v1/types"
corev1 "k8s.io/api/core/v1"
)

type ArmResult struct {
Supported bool
Err error
}

const MaxConcurrent = 7

var armResultCache = make(map[string]ArmResult)
var armResultCacheMutex sync.RWMutex

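// CheckAllWorkloadsArm checks arm64 support for all workloads concurrently,
// bounded by MaxConcurrent, caching results per workload so the same workload
// is never checked against the registry twice.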
func CheckAllWorkloadsArm(workloads []Workload) []ArmResult {
results := make([]ArmResult, len(workloads))

var wg sync.WaitGroup
sem := make(chan struct{}, MaxConcurrent)

for idx := range workloads {
wg.Add(1)

go func(i int) {
defer wg.Done()

sem <- struct{}{}
defer func() { <-sem }()

cacheKey := fmt.Sprintf("%s:%s:%s", workloads[i].Kind, workloads[i].Name, workloads[i].Namespace)

armResultCacheMutex.RLock()
cachedResult, found := armResultCache[cacheKey]
armResultCacheMutex.RUnlock()

if found {
results[i] = cachedResult
return
}

var supported bool
var err error

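			// Retry with random jitter (up to 800ms) while the registry
			// responds with TOOMANYREQUESTS rate-limit errors.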
for {
supported, err = CheckWorkloadSupportsArm(&workloads[i])
if err != nil && strings.Contains(err.Error(), "TOOMANYREQUESTS") {
time.Sleep(time.Millisecond * time.Duration(rand.Int63n(800)))
} else {
break
}
}
result := ArmResult{Supported: supported, Err: err}
results[i] = result

armResultCacheMutex.Lock()
armResultCache[cacheKey] = result
armResultCacheMutex.Unlock()
}(idx)
}
wg.Wait()
return results
}

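// CheckWorkloadSupportsArm reports whether every container and init container
// image in the workload's pod template has a linux/arm64 variant.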
func CheckWorkloadSupportsArm(w *Workload) (bool, error) {
var podSpec *corev1.PodSpec
switch w.Kind {
case WorkloadDeployment:
podSpec = &w.deployment.Spec.Template.Spec
case WorkloadStatefulSet:
podSpec = &w.statefulSet.Spec.Template.Spec
default:
return false, fmt.Errorf("unsupported workload kind: %s", w.Kind)
}
images := getPodTemplateImages(*podSpec)

supportArm := true
for _, image := range images {
ret, err := imageSupportsArm64(image)
if err != nil {
return false, fmt.Errorf("failed to check image %s for arm64 support: %w", image, err)
}
		if !ret {
supportArm = false
break
}
}
return supportArm, nil
}

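// getPodTemplateImages collects the images of all containers and init
// containers in the given pod spec.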
func getPodTemplateImages(podSpec corev1.PodSpec) []string {
var images []string
for _, c := range podSpec.Containers {
images = append(images, c.Image)
}
for _, c := range podSpec.InitContainers {
images = append(images, c.Image)
}
return images
}

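// keychain resolves registry credentials for ECR, ACR, GCR/Artifact Registry,
// and GHCR, falling back to the local Docker config.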
var keychain = authn.NewMultiKeychain(
authn.NewKeychainFromHelper(ecr.NewECRHelper(ecr.WithLogger(io.Discard))), // ECR
authn.NewKeychainFromHelper(credhelper.NewACRCredentialsHelper()), // ACR
google.Keychain, // GCR & Artifact Registry
github.Keychain, // GHCR
authn.DefaultKeychain, // local ~/.docker/config.json
)

// imageSupportsArm64 checks whether the given container image supports the linux/arm64
// platform. It returns true if the image manifest list (index) contains an arm64
// variant, or if the single-arch image itself is built for arm64.
func imageSupportsArm64(imageRef string) (bool, error) {
// Parse an arbitrary image reference (registry/name:tag or digest).
ref, err := name.ParseReference(imageRef, name.WeakValidation)
if err != nil {
return false, fmt.Errorf("failed to parse image reference: %w", err)
}

// Pull the descriptor (manifest or index) from the remote registry.
remoteOpts := []remote.Option{
remote.WithAuthFromKeychain(keychain),
remote.WithContext(context.Background()),
}
desc, err := remote.Get(ref, remoteOpts...)
if err != nil {
return false, fmt.Errorf("failed to fetch image descriptor: %w", err)
}

mt := desc.Descriptor.MediaType
// Handle multi-arch images (OCI index / Docker manifest list).
if mt == types.OCIImageIndex || mt == types.DockerManifestList {
idx, err := desc.ImageIndex()
if err != nil {
return false, fmt.Errorf("failed to load image index: %w", err)
}
indexManifest, err := idx.IndexManifest()
if err != nil {
return false, fmt.Errorf("failed to read index manifest: %w", err)
}
for _, manifest := range indexManifest.Manifests {
plat := manifest.Platform
if plat != nil && plat.Architecture == "arm64" && strings.EqualFold(plat.OS, "linux") {
return true, nil // linux/arm64 variant found
}
}
return false, nil // no arm64 variant in the index
}

// Handle single-arch images.
img, err := desc.Image()
if err != nil {
return false, fmt.Errorf("failed to load image: %w", err)
}
cfg, err := img.ConfigFile()
if err != nil {
return false, fmt.Errorf("failed to read image config: %w", err)
}
return cfg.Architecture == "arm64", nil
}
83 changes: 83 additions & 0 deletions arm_patch.go
@@ -0,0 +1,83 @@
package main

import (
"context"
"fmt"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

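// patchWorkloadARMAffinity adds an arm64 node-affinity preference and
// toleration to each selected Deployment or StatefulSet, logging failures
// instead of aborting the batch.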
func patchWorkloadARMAffinity(selectedWorkloads []Workload) {
for _, workload := range selectedWorkloads {
var err error
switch workload.Kind {
case WorkloadDeployment:
err = patchDeploymentARMAffinity(&workload)
case WorkloadStatefulSet:
err = patchStatefulSetARMAffinity(&workload)
}
if err != nil {
fmt.Printf("Failed to patch %s workload %s/%s: %v\n", workload.Kind,
workload.Namespace, workload.Name, err)
}
}
}

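// patchDeploymentARMAffinity fetches the Deployment, adds the arm64
// preference and toleration if they are not already present, and patches
// the resource with the result.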
func patchDeploymentARMAffinity(workload *Workload) error {
ctx := context.Background()
deployment, err := kubeClient.AppsV1().Deployments(workload.Namespace).
Get(ctx, workload.Name, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("get deployment: %w", err)
}

newDeployment := deployment.DeepCopy()
newDeployment.Spec.Template.Spec.Affinity = ensurePreferAffinity(newDeployment.Spec.Template.Spec.Affinity)

if HasArm64Preference(newDeployment.Spec.Template.Spec.Affinity) {
fmt.Printf("workload %s %s/%s already has arm preference, skip the prefer affinity\n",
workload.Kind, workload.Namespace, workload.Name)
} else {
newDeployment.Spec.Template.Spec.Affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution =
AddArm64Preference(newDeployment.Spec.Template.Spec.Affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution)
}
if CheckWorkloadHasARM64Toleration(newDeployment.Spec.Template.Spec.Tolerations) {
fmt.Printf("workload %s %s/%s already has arm64 toleration, skip it\n",
workload.Kind, workload.Namespace, workload.Name)
} else {
newDeployment.Spec.Template.Spec.Tolerations = AddARM64Toleration(newDeployment.Spec.Template.Spec.Tolerations)
}

return patchResource(ctx, deployment, newDeployment, workload.Namespace, workload.Name, workload.Kind)
}

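// patchStatefulSetARMAffinity fetches the StatefulSet, adds the arm64
// preference and toleration if they are not already present, and patches
// the resource with the result.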
func patchStatefulSetARMAffinity(workload *Workload) error {
ctx := context.Background()

ss, err := kubeClient.AppsV1().
StatefulSets(workload.Namespace).
Get(ctx, workload.Name, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("get statefulset: %w", err)
}

newSS := ss.DeepCopy()
newSS.Spec.Template.Spec.Affinity = ensurePreferAffinity(newSS.Spec.Template.Spec.Affinity)

if HasArm64Preference(newSS.Spec.Template.Spec.Affinity) {
fmt.Printf("workload %s %s/%s already has arm preference, skip the prefer affinity\n",
workload.Kind, workload.Namespace, workload.Name)
} else {
newSS.Spec.Template.Spec.Affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution =
AddArm64Preference(newSS.Spec.Template.Spec.Affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution)
}

if CheckWorkloadHasARM64Toleration(newSS.Spec.Template.Spec.Tolerations) {
fmt.Printf("workload %s %s/%s already has arm64 toleration, skip it\n",
workload.Kind, workload.Namespace, workload.Name)
} else {
newSS.Spec.Template.Spec.Tolerations = AddARM64Toleration(newSS.Spec.Template.Spec.Tolerations)
}

return patchResource(ctx, ss, newSS, workload.Namespace, workload.Name, workload.Kind)
}
62 changes: 62 additions & 0 deletions arm_rollback.go
@@ -0,0 +1,62 @@
package main

import (
"context"
"fmt"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

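// rollbackWorkloadARMAffinity removes the arm64 node-affinity preference and
// toleration from each selected Deployment or StatefulSet.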
func rollbackWorkloadARMAffinity(selectedWorkloads []Workload) {
for _, workload := range selectedWorkloads {
var err error
switch workload.Kind {
case WorkloadDeployment:
err = rollbackDeploymentARMAffinity(&workload)
case WorkloadStatefulSet:
err = rollbackStatefulSetARMAffinity(&workload)
}
if err != nil {
fmt.Printf("Failed to rollback %s workload %s/%s, err: %v\n", workload.Kind,
workload.Namespace, workload.Name, err)
}
}
}

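// rollbackDeploymentARMAffinity removes the arm64 preference and toleration
// from the Deployment and patches the resource with the result.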
func rollbackDeploymentARMAffinity(workload *Workload) error {
ctx := context.Background()
deployment, err := kubeClient.AppsV1().Deployments(workload.Namespace).
Get(ctx, workload.Name, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("get deployment: %w", err)
}

newDeployment := deployment.DeepCopy()
newDeployment.Spec.Template.Spec.Affinity = ensurePreferAffinity(newDeployment.Spec.Template.Spec.Affinity)

newDeployment.Spec.Template.Spec.Affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution =
RemoveArm64Preference(newDeployment.Spec.Template.Spec.Affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution)
newDeployment.Spec.Template.Spec.Tolerations = RemoveARM64Toleration(newDeployment.Spec.Template.Spec.Tolerations)

return patchResource(ctx, deployment, newDeployment, workload.Namespace, workload.Name, workload.Kind)
}

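// rollbackStatefulSetARMAffinity removes the arm64 preference and toleration
// from the StatefulSet and patches the resource with the result.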
func rollbackStatefulSetARMAffinity(workload *Workload) error {
ctx := context.Background()

ss, err := kubeClient.AppsV1().
StatefulSets(workload.Namespace).
Get(ctx, workload.Name, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("get statefulset: %w", err)
}

newSS := ss.DeepCopy()
newSS.Spec.Template.Spec.Affinity = ensurePreferAffinity(newSS.Spec.Template.Spec.Affinity)

newSS.Spec.Template.Spec.Affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution =
RemoveArm64Preference(newSS.Spec.Template.Spec.Affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution)
newSS.Spec.Template.Spec.Tolerations = RemoveARM64Toleration(newSS.Spec.Template.Spec.Tolerations)

return patchResource(ctx, ss, newSS, workload.Namespace, workload.Name, workload.Kind)
}