Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions manifests/00-cluster-role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,12 @@ rules:
- monitoring.coreos.com
resources:
- servicemonitors
- podmonitors
verbs:
- create
- get
- list
- watch
- update

- apiGroups:
Expand Down
99 changes: 99 additions & 0 deletions pkg/operator/controller/gateway-podmonitor/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package gatewaypodmonitor

import (
"context"

logf "github.com/openshift/cluster-ingress-operator/pkg/log"
"github.com/openshift/cluster-ingress-operator/pkg/manifests"
operatorcontroller "github.com/openshift/cluster-ingress-operator/pkg/operator/controller"

"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
gatewayapiv1 "sigs.k8s.io/gateway-api/apis/v1"
)

const (
controllerName = "gateway_podmonitor_controller"
)

var log = logf.Logger.WithName(controllerName)

func NewUnmanaged(mgr manager.Manager) (controller.Controller, error) {
operatorCache := mgr.GetCache()
reconciler := &reconciler{
client: mgr.GetClient(),
cache: operatorCache,
}
c, err := controller.NewUnmanaged(controllerName, controller.Options{Reconciler: reconciler})
if err != nil {
return nil, err
}

isOperandNamespace := predicate.NewPredicateFuncs(func(o client.Object) bool {
return o.GetNamespace() == operatorcontroller.DefaultOperandNamespace
})

if err := c.Watch(source.Kind[client.Object](operatorCache, &gatewayapiv1.Gateway{}, &handler.EnqueueRequestForObject{}, isOperandNamespace)); err != nil {
return nil, err
}

podMonitor := &unstructured.Unstructured{}
podMonitor.SetGroupVersionKind(schema.GroupVersionKind{
Group: "monitoring.coreos.com",
Kind: "PodMonitor",
Version: "v1",
})
if err := c.Watch(source.Kind[client.Object](operatorCache, podMonitor, enqueueRequestForOwningGateway(), isOperandNamespace)); err != nil {
return nil, err
}

return c, nil
}

func enqueueRequestForOwningGateway() handler.EventHandler {
return handler.EnqueueRequestsFromMapFunc(
func(ctx context.Context, a client.Object) []reconcile.Request {
labels := a.GetLabels()
if gatewayName, ok := labels[manifests.OwningGatewayLabel]; ok {
log.Info("Queueing gateway", "related object", a.GetNamespace()+"/"+a.GetName())
return []reconcile.Request{{NamespacedName: types.NamespacedName{
Name: gatewayName,
Namespace: operatorcontroller.DefaultOperandNamespace,
}}}
}
return []reconcile.Request{}
})
Comment thread
rikatz marked this conversation as resolved.
}

type reconciler struct {
client client.Client
cache cache.Cache
}

func (r *reconciler) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
log.Info("Reconciling gateway", "request", request)

gateway := gatewayapiv1.Gateway{}
if err := r.cache.Get(ctx, request.NamespacedName, &gateway); err != nil {
if errors.IsNotFound(err) {
return reconcile.Result{}, nil
}
return reconcile.Result{}, err
}

if _, _, err := r.ensureGatewayPodMonitor(ctx, &gateway); err != nil {
return reconcile.Result{}, err
}

return reconcile.Result{}, nil
}
150 changes: 150 additions & 0 deletions pkg/operator/controller/gateway-podmonitor/podmonitor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
package gatewaypodmonitor

import (
"context"
"fmt"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/openshift/cluster-ingress-operator/pkg/manifests"
operatorcontroller "github.com/openshift/cluster-ingress-operator/pkg/operator/controller"

"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
gatewayapiv1 "sigs.k8s.io/gateway-api/apis/v1"
)

func (r *reconciler) ensureGatewayPodMonitor(ctx context.Context, gateway *gatewayapiv1.Gateway) (bool, *unstructured.Unstructured, error) {
name := operatorcontroller.GatewayPodMonitorName(gateway)
have, current, err := r.currentGatewayPodMonitor(ctx, name)
if err != nil {
return false, nil, err
}

ownerRef := metav1.OwnerReference{
APIVersion: gatewayapiv1.GroupVersion.String(),
Kind: "Gateway",
Name: gateway.Name,
UID: gateway.UID,
}
desired := desiredGatewayPodMonitor(name, ownerRef)

switch {
case !have:
if err := r.client.Create(ctx, desired); err != nil {
if errors.IsAlreadyExists(err) {
return r.currentGatewayPodMonitor(ctx, name)
}
return false, nil, fmt.Errorf("failed to create gateway podmonitor: %w", err)
}
log.Info("Created gateway podmonitor", "namespace", desired.GetNamespace(), "name", desired.GetName())
return r.currentGatewayPodMonitor(ctx, name)
default:
if updated, err := r.updateGatewayPodMonitor(ctx, current, desired); err != nil {
return true, current, fmt.Errorf("failed to update gateway podmonitor: %w", err)
} else if updated {
return r.currentGatewayPodMonitor(ctx, name)
}
}

return true, current, err
}

func desiredGatewayPodMonitor(name types.NamespacedName, ownerRef metav1.OwnerReference) *unstructured.Unstructured {
// Use []interface{} for all slice fields to avoid DeepCopy failures
// and comparison issues with API-returned objects.
pm := &unstructured.Unstructured{
Object: map[string]interface{}{
"metadata": map[string]interface{}{
"namespace": name.Namespace,
"name": name.Name,
"labels": map[string]interface{}{
manifests.OwningGatewayLabel: ownerRef.Name,
},
},
"spec": map[string]interface{}{
"selector": map[string]interface{}{
"matchLabels": map[string]interface{}{
operatorcontroller.GatewayNameLabelKey: ownerRef.Name,
},
},
"podMetricsEndpoints": []interface{}{
map[string]interface{}{
"path": "/stats/prometheus",
"interval": "60s",
"port": "http-envoy-prom",
"metricRelabelings": []interface{}{
map[string]interface{}{
"action": "keep",
"sourceLabels": []interface{}{"__name__"},
"regex": "istio_.*|envoy_cluster_upstream_cx_active|envoy_cluster_upstream_cx_total|envoy_cluster_upstream_rq_total|envoy_listener_downstream_cx_active|envoy_listener_http_downstream_rq|envoy_server_memory_allocated|envoy_server_memory_heap_size|envoy_server_uptime",
},
},
},
},
},
},
}
pm.SetGroupVersionKind(schema.GroupVersionKind{
Group: "monitoring.coreos.com",
Kind: "PodMonitor",
Version: "v1",
})
pm.SetOwnerReferences([]metav1.OwnerReference{ownerRef})
return pm
}

func (r *reconciler) currentGatewayPodMonitor(ctx context.Context, name types.NamespacedName) (bool, *unstructured.Unstructured, error) {
pm := &unstructured.Unstructured{}
pm.SetGroupVersionKind(schema.GroupVersionKind{
Group: "monitoring.coreos.com",
Kind: "PodMonitor",
Version: "v1",
})
if err := r.client.Get(ctx, name, pm); err != nil {
if errors.IsNotFound(err) {
return false, nil, nil
}
return false, nil, err
}
return true, pm, nil
}

func (r *reconciler) updateGatewayPodMonitor(ctx context.Context, current, desired *unstructured.Unstructured) (bool, error) {
changed, updated := gatewayPodMonitorChanged(current, desired)
if !changed {
return false, nil
}
diff := cmp.Diff(current, updated, cmpopts.EquateEmpty())
if err := r.client.Update(ctx, updated); err != nil {
return false, err
}
log.Info("Updated gateway podmonitor", "namespace", updated.GetNamespace(), "name", updated.GetName(), "diff", diff)
return true, nil
}

func gatewayPodMonitorChanged(current, desired *unstructured.Unstructured) (bool, *unstructured.Unstructured) {
changed := false
updated := current.DeepCopy()

if !cmp.Equal(current.Object["spec"], desired.Object["spec"], cmpopts.EquateEmpty()) {
changed = true
updated.Object["spec"] = desired.Object["spec"]
}
if !cmp.Equal(current.GetLabels(), desired.GetLabels(), cmpopts.EquateEmpty()) {
changed = true
updated.SetLabels(desired.GetLabels())
}
if !cmp.Equal(current.GetOwnerReferences(), desired.GetOwnerReferences(), cmpopts.EquateEmpty()) {
changed = true
updated.SetOwnerReferences(desired.GetOwnerReferences())
}

if !changed {
return false, nil
}
return true, updated
}
7 changes: 7 additions & 0 deletions pkg/operator/controller/names.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,13 @@ func GatewayNetworkPolicyName(gateway *gatewayapiv1.Gateway) types.NamespacedNam
}
}

func GatewayPodMonitorName(gateway *gatewayapiv1.Gateway) types.NamespacedName {
return types.NamespacedName{
Namespace: gateway.Namespace,
Name: fmt.Sprintf("%s-monitor", gateway.Name),
}
}

func IstiodNetworkPolicyName() types.NamespacedName {
return types.NamespacedName{
Name: "istiod-allow",
Expand Down
7 changes: 7 additions & 0 deletions pkg/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
dnscontroller "github.com/openshift/cluster-ingress-operator/pkg/operator/controller/dns"
gatewaylabelercontroller "github.com/openshift/cluster-ingress-operator/pkg/operator/controller/gateway-labeler"
gatewaynetworkpolicycontroller "github.com/openshift/cluster-ingress-operator/pkg/operator/controller/gateway-networkpolicy"
gatewaypodmonitorcontroller "github.com/openshift/cluster-ingress-operator/pkg/operator/controller/gateway-podmonitor"
gatewayservicednscontroller "github.com/openshift/cluster-ingress-operator/pkg/operator/controller/gateway-service-dns"
gatewaystatuscontroller "github.com/openshift/cluster-ingress-operator/pkg/operator/controller/gateway-status"
gatewayapicontroller "github.com/openshift/cluster-ingress-operator/pkg/operator/controller/gatewayapi"
Expand Down Expand Up @@ -354,6 +355,11 @@ func New(config operatorconfig.Config, kubeConfig *rest.Config) (*Operator, erro
return nil, fmt.Errorf("failed to create gateway-networkpolicy controller: %w", err)
}

gatewayPodMonitorController, err := gatewaypodmonitorcontroller.NewUnmanaged(mgr)
if err != nil {
return nil, fmt.Errorf("failed to create gateway-podmonitor controller: %w", err)
}

// Set up the gatewayapi controller.
if _, err := gatewayapicontroller.New(mgr, gatewayapicontroller.Config{
MarketplaceEnabled: marketplaceEnabled,
Expand All @@ -365,6 +371,7 @@ func New(config operatorconfig.Config, kubeConfig *rest.Config) (*Operator, erro
gatewayLabelController,
gatewayStatusController,
gatewayNetworkPolicyController,
gatewayPodMonitorController,
},
}); err != nil {
return nil, fmt.Errorf("failed to create gatewayapi controller: %w", err)
Expand Down
70 changes: 70 additions & 0 deletions test/e2e/gateway_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/storage/names"
Expand Down Expand Up @@ -1575,6 +1577,9 @@ func ensureGatewayObjectSuccess(t *testing.T, ns *corev1.Namespace) []string {
}
assertHorizontalPodAutoscalerEnabled(t, operatorcontroller.DefaultOperandNamespace, testGatewayName, operatorcontroller.OpenShiftDefaultGatewayClassName, expectedMinReplicas)

t.Log("Verifying PodMonitor is created for the gateway...")
assertGatewayPodMonitor(t, operatorcontroller.DefaultOperandNamespace, testGatewayName)

t.Log("Making sure the httproute is created and accepted...")
_, err = assertHttpRouteSuccessful(t, ns.Name, "test-httproute", gateway)
if err != nil {
Expand All @@ -1589,3 +1594,68 @@ func ensureGatewayObjectSuccess(t *testing.T, ns *corev1.Namespace) []string {

return errs
}

// assertGatewayPodMonitor verifies that a PodMonitor resource is created for
// the given gateway with the expected spec, labels, and ownerReference.
func assertGatewayPodMonitor(t *testing.T, namespace, gatewayName string) {
t.Helper()

podMonitorName := types.NamespacedName{
Namespace: namespace,
Name: fmt.Sprintf("%s-monitor", gatewayName),
}

pm := &unstructured.Unstructured{}
pm.SetGroupVersionKind(schema.GroupVersionKind{
Group: "monitoring.coreos.com",
Kind: "PodMonitor",
Version: "v1",
})

err := wait.PollUntilContextTimeout(context.Background(), 2*time.Second, 2*time.Minute, false, func(ctx context.Context) (bool, error) {
if err := kclient.Get(ctx, podMonitorName, pm); err != nil {
t.Logf("Failed to get PodMonitor %v: %v; retrying...", podMonitorName, err)
return false, nil
}
return true, nil
})
if !assert.NoError(t, err, "PodMonitor %v should exist", podMonitorName) {
return
}

// Verify the owning-gateway label used by the controller to map
// PodMonitor events back to the owning Gateway for reconciliation.
labels := pm.GetLabels()
assert.Equal(t, gatewayName, labels["ingress.openshift.io/owning-gateway"],
"PodMonitor should have owning-gateway label set to the gateway name")

// Verify the ownerReference used by Kubernetes GC for cascading
// deletion when the Gateway is removed.
ownerRefs := pm.GetOwnerReferences()
if assert.Len(t, ownerRefs, 1, "PodMonitor should have exactly one ownerReference") {
assert.Equal(t, "Gateway", ownerRefs[0].Kind, "ownerReference should point to a Gateway")
assert.Equal(t, gatewayName, ownerRefs[0].Name, "ownerReference should point to the test gateway")
}

// Verify the pod selector matches the gateway name label.
selector, found, err := unstructured.NestedMap(pm.Object, "spec", "selector", "matchLabels")
if assert.NoError(t, err) && assert.True(t, found, "PodMonitor should have spec.selector.matchLabels") {
assert.Equal(t, gatewayName, selector["gateway.networking.k8s.io/gateway-name"],
"PodMonitor selector should match the gateway name")
}

// Verify podMetricsEndpoints configuration.
endpoints, found, err := unstructured.NestedSlice(pm.Object, "spec", "podMetricsEndpoints")
if assert.NoError(t, err) && assert.True(t, found, "PodMonitor should have spec.podMetricsEndpoints") {
if assert.Len(t, endpoints, 1, "PodMonitor should have exactly one endpoint") {
ep, ok := endpoints[0].(map[string]interface{})
if assert.True(t, ok, "endpoint should be a map") {
assert.Equal(t, "/stats/prometheus", ep["path"], "endpoint path should be /stats/prometheus")
assert.Equal(t, "60s", ep["interval"], "endpoint interval should be 60s")
assert.Equal(t, "http-envoy-prom", ep["port"], "endpoint port should be http-envoy-prom")
}
}
}

t.Logf("PodMonitor %v verified successfully", podMonitorName)
}