Skip to content

Commit 113aa3f

Browse files
authored
Extend CacheRuntime phase 2.4: add dataset related labels to nodes and support app pod affinity (#5836)
* add dataset related labels to nodes and support app pod affinity mark thin runtime reference not support cache runtime Signed-off-by: xliuqq <xlzq1992@gmail.com> * fix test and lint Signed-off-by: xliuqq <xlzq1992@gmail.com> --------- Signed-off-by: xliuqq <xlzq1992@gmail.com>
1 parent 813d429 commit 113aa3f

16 files changed

Lines changed: 380 additions & 288 deletions

File tree

pkg/ddc/base/runtime.go

Lines changed: 75 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@ import (
2424
datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1"
2525
"github.com/fluid-cloudnative/fluid/pkg/common"
2626
"github.com/fluid-cloudnative/fluid/pkg/utils"
27+
"github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient"
2728
"github.com/pkg/errors"
29+
corev1 "k8s.io/api/core/v1"
2830
"k8s.io/apimachinery/pkg/api/resource"
2931
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3032
"k8s.io/apimachinery/pkg/types"
@@ -52,8 +54,6 @@ type Conventions interface {
5254

5355
GetDatasetNumLabelName() string
5456

55-
GetWorkerStatefulsetName() string
56-
5757
GetExclusiveLabelValue() string
5858
}
5959

@@ -63,6 +63,9 @@ type Conventions interface {
6363
type RuntimeInfoInterface interface {
6464
Conventions
6565

66+
// GetWorkerPods returns the worker object and selector for runtime workers.
67+
GetWorkerPods(client client.Client) ([]corev1.Pod, error)
68+
6669
GetTieredStoreInfo() TieredStoreInfo
6770

6871
GetName() string
@@ -375,6 +378,21 @@ func (info *RuntimeInfo) SetupFuseCleanPolicy(policy datav1alpha1.FuseCleanPolic
375378
info.fuse.CleanPolicy = policy
376379
}
377380

381+
func (info *RuntimeInfo) GetWorkerPods(client client.Client) ([]corev1.Pod, error) {
382+
workers, err := kubeclient.GetStatefulSet(client, info.GetWorkerStatefulsetName(), info.GetNamespace())
383+
if err != nil {
384+
return nil, err
385+
}
386+
workerSelector, err := metav1.LabelSelectorAsSelector(workers.Spec.Selector)
387+
if err != nil {
388+
return nil, err
389+
}
390+
391+
workerPods, err := kubeclient.GetPodsForStatefulSet(client, workers, workerSelector)
392+
393+
return workerPods, err
394+
}
395+
378396
func (info *RuntimeInfo) GetFuseCleanPolicy() datav1alpha1.FuseCleanPolicy {
379397
return info.fuse.CleanPolicy
380398
}
@@ -598,66 +616,97 @@ func GetRuntimeInfo(reader client.Reader, name, namespace string) (runtimeInfo R
598616
return runtimeInfo, err
599617
}
600618

601-
// GetRuntimeStatus gets the runtime status according to the runtime type, name, and namespace.
602-
// This function is primarily responsible for retrieving the current status of a specific runtime
603-
// based on its type from the Kubernetes cluster.
604-
//
605-
// Parameters:
606-
// - client (client.Client): The Kubernetes client used to interact with the API server.
607-
// - runtimeType (string): The type of the runtime (e.g., AlluxioRuntime, JindoRuntime, GooseFSRuntime).
608-
// - name (string): The name of the runtime.
609-
// - namespace (string): The namespace where the runtime is located.
610-
//
611-
// Returns:
612-
// - status (*datav1alpha1.RuntimeStatus): The status of the requested runtime.
613-
// - err (error): Returns an error if the runtime status cannot be retrieved or the runtime type is unsupported, otherwise returns nil.
614-
func GetRuntimeStatus(client client.Client, runtimeType, name, namespace string) (status *datav1alpha1.RuntimeStatus, err error) {
619+
// RuntimeStatusAccessor provides a unified interface to access common status fields across different runtime types
620+
type RuntimeStatusAccessor interface {
621+
// GetCacheAffinity returns the cache affinity from the runtime status
622+
GetCacheAffinity() (*corev1.NodeAffinity, error)
623+
}
624+
625+
// GetRuntimeStatusAccessor returns a unified status accessor for the given runtime
626+
func GetRuntimeStatusAccessor(client client.Client, runtimeType, name, namespace string) (RuntimeStatusAccessor, error) {
627+
switch runtimeType {
628+
case common.AlluxioRuntime, common.JindoRuntime, common.GooseFSRuntime,
629+
common.JuiceFSRuntime, common.EFCRuntime, common.ThinRuntime, common.VineyardRuntime:
630+
status, err := GetDDCRuntimeStatus(client, runtimeType, name, namespace)
631+
if err != nil {
632+
return nil, err
633+
}
634+
return &DDCRuntimeStatusAccessor{status: status}, nil
635+
case common.CacheRuntime:
636+
runtime, err := utils.GetCacheRuntime(client, name, namespace)
637+
if err != nil {
638+
return nil, err
639+
}
640+
return &CacheRuntimeStatusAccessor{status: &runtime.Status}, nil
641+
default:
642+
return nil, fmt.Errorf("fail to get runtime status accessor for runtime type: %s", runtimeType)
643+
}
644+
}
645+
646+
// GetDDCRuntimeStatus retrieves the runtime object based on runtime type for DDC-based runtimes
647+
func GetDDCRuntimeStatus(client client.Client, runtimeType, name, namespace string) (*datav1alpha1.RuntimeStatus, error) {
615648
switch runtimeType {
616649
case common.AlluxioRuntime:
617650
runtime, err := utils.GetAlluxioRuntime(client, name, namespace)
618651
if err != nil {
619-
return status, err
652+
return nil, err
620653
}
621654
return &runtime.Status, nil
622655
case common.JindoRuntime:
623656
runtime, err := utils.GetJindoRuntime(client, name, namespace)
624657
if err != nil {
625-
return status, err
658+
return nil, err
626659
}
627660
return &runtime.Status, nil
628661
case common.GooseFSRuntime:
629662
runtime, err := utils.GetGooseFSRuntime(client, name, namespace)
630663
if err != nil {
631-
return status, err
664+
return nil, err
632665
}
633666
return &runtime.Status, nil
634667
case common.JuiceFSRuntime:
635668
runtime, err := utils.GetJuiceFSRuntime(client, name, namespace)
636669
if err != nil {
637-
return status, err
670+
return nil, err
638671
}
639672
return &runtime.Status, nil
640673
case common.EFCRuntime:
641674
runtime, err := utils.GetEFCRuntime(client, name, namespace)
642675
if err != nil {
643-
return status, err
676+
return nil, err
644677
}
645678
return &runtime.Status, nil
646679
case common.ThinRuntime:
647680
runtime, err := utils.GetThinRuntime(client, name, namespace)
648681
if err != nil {
649-
return status, err
682+
return nil, err
650683
}
651684
return &runtime.Status, nil
652685
case common.VineyardRuntime:
653686
runtime, err := utils.GetVineyardRuntime(client, name, namespace)
654687
if err != nil {
655-
return status, err
688+
return nil, err
656689
}
657690
return &runtime.Status, nil
658-
// TODO: how to handle with cache runtime? (currently used in app pod affinity scene)
659691
default:
660-
err = fmt.Errorf("fail to get runtimeInfo for runtime type: %s", runtimeType)
661-
return nil, err
692+
return nil, fmt.Errorf("unsupported DDC runtime type: %s", runtimeType)
662693
}
663694
}
695+
696+
// DDCRuntimeStatusAccessor implements RuntimeStatusAccessor for DDC-based runtimes (Alluxio, Jindo, GooseFS, etc.)
697+
type DDCRuntimeStatusAccessor struct {
698+
status *datav1alpha1.RuntimeStatus
699+
}
700+
701+
func (d *DDCRuntimeStatusAccessor) GetCacheAffinity() (*corev1.NodeAffinity, error) {
702+
return d.status.CacheAffinity, nil
703+
}
704+
705+
// CacheRuntimeStatusAccessor implements RuntimeStatusAccessor for CacheRuntime
706+
type CacheRuntimeStatusAccessor struct {
707+
status *datav1alpha1.CacheRuntimeStatus
708+
}
709+
710+
func (c *CacheRuntimeStatusAccessor) GetCacheAffinity() (*corev1.NodeAffinity, error) {
711+
return c.status.CacheAffinity, nil
712+
}

pkg/ddc/base/runtime_conventions_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,9 @@ var _ = Describe("RuntimeInfo.GetWorkerStatefulsetName", func() {
2828
DescribeTable("returns correct statefulset name",
2929
func(runtimeName, runtimeType, suffix string) {
3030
info, err := BuildRuntimeInfo(runtimeName, testNamespace, runtimeType)
31+
realInfo := info.(*RuntimeInfo)
3132
Expect(err).NotTo(HaveOccurred())
32-
Expect(info.GetWorkerStatefulsetName()).To(Equal(runtimeName + suffix))
33+
Expect(realInfo.GetWorkerStatefulsetName()).To(Equal(runtimeName + suffix))
3334
},
3435
Entry("JindoRuntime uses jindofs suffix", "mydata", common.JindoRuntime, "-jindofs-worker"),
3536
Entry("JindoCacheEngineImpl uses jindofs suffix", "cache", common.JindoCacheEngineImpl, "-jindofs-worker"),

0 commit comments

Comments
 (0)