Skip to content

Commit 1a4ff75

Browse files
committed
improve dcm tests to reduce flakiness in endpoints
Addresses a race when patching the Endpoints resource. This update adds polling to ensure the Endpoints resource has been created. It also dumps all Endpoints and EndpointSlice resources in the test namespace when a test fails, which helps to correlate the current state with what is expected in the test scenario. Example of the deployment state log: ``` deployment state: replicas=4 pods=route-scale-in-5b7b4f6b8c-9ncjg/Running/10.128.1.27 // route-scale-in-5b7b4f6b8c-ck45n/Running/10.128.1.28 // route-scale-in-5b7b4f6b8c-srffr/Running/10.128.1.29 // route-scale-in-5b7b4f6b8c-v9hm2/Running/10.128.1.26 ``` Example of the Endpoints and EndpointSlice resources listed if the test fails: ``` Endpoints: NAME ADDRESSES NOT READY ADDRESSES PORTS route-scale-in 10.128.1.26,10.128.1.27,10.128.1.28,10.128.1.29 9376 route-scale-in-khbh5 10.128.1.26 9376 EndpointSlices: NAME SERVICE ADDRESSES NOT READY ADDRESSES PORTS route-scale-in-khbh5-8ncmh route-scale-in-khbh5 10.128.1.26 9376 route-scale-in-rrhzv route-scale-in 10.128.1.27,10.128.1.28,10.128.1.29,10.128.1.26 9376 ``` https://redhat.atlassian.net/browse/OCPBUGS-85426
1 parent 428b9a0 commit 1a4ff75

2 files changed

Lines changed: 143 additions & 19 deletions

File tree

test/extended/router/config_manager_ingress.go

Lines changed: 83 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ import (
3939
exutil "github.com/openshift/origin/test/extended/util"
4040
)
4141

42+
// execPodRef defines the attributes of the router pod used to execute local HTTP requests
4243
type execPodRef struct {
4344
types.NamespacedName
4445
ipAddress string
@@ -47,21 +48,40 @@ type execPodRef struct {
4748
var _ = g.Describe("[sig-network-edge][Feature:Router][apigroup:route.openshift.io][OCPFeatureGate:IngressControllerDynamicConfigurationManager]", func() {
4849
defer g.GinkgoRecover()
4950

51+
// dcmIngressTimeout defines the maximum amount of time to wait for test operations to complete.
5052
const dcmIngressTimeout = 2 * time.Minute
53+
// maxDynamicServers defines the number of empty dynamic servers to be allocated when a reload happens.
54+
// Some tests use this value as a premise to limit the number of new servers on scale-out operations.
5155
const maxDynamicServers = 4
5256

5357
ctx := context.Background()
5458
oc := exutil.NewCLIWithPodSecurityLevel("router-dcm-ingress", api.LevelPrivileged).AsAdmin()
5559
kubeClient := oc.AdminKubeClient()
60+
routeClient := oc.AdminRouteClient()
5661

57-
// variables updated on every new test
58-
var (
59-
execPod execPodRef
60-
controller types.NamespacedName
61-
routeSelectorSet labels.Set
62-
)
62+
// execPod is the pod used to run requests against the router.
63+
var execPod execPodRef
64+
// controller is the fully qualified name of the ingress controller resource used in the current test.
65+
var controller types.NamespacedName
66+
// routeSelectorSet is the label key/value pair that the ingress controller uses to filter route resources.
67+
// All route resources should add this label, so the router can find and process them.
68+
var routeSelectorSet labels.Set
6369

6470
g.AfterEach(func() {
71+
if g.CurrentSpecReport().Failed() {
72+
routes, _ := routeClient.RouteV1().Routes(oc.Namespace()).List(ctx, metav1.ListOptions{})
73+
if routes != nil {
74+
outputIngress(routes.Items...)
75+
}
76+
endpoints, _ := kubeClient.CoreV1().Endpoints(oc.Namespace()).List(ctx, metav1.ListOptions{})
77+
if endpoints != nil {
78+
outputEndpoints(endpoints.Items...)
79+
}
80+
epsList, _ := kubeClient.DiscoveryV1().EndpointSlices(oc.Namespace()).List(ctx, metav1.ListOptions{})
81+
if epsList != nil {
82+
outputEndpointSlice(epsList.Items...)
83+
}
84+
}
6585
if controller.Name != "" {
6686
err := oc.AdminOperatorClient().OperatorV1().IngressControllers(controller.Namespace).Delete(ctx, controller.Name, *metav1.NewDeleteOptions(1))
6787
o.Expect(err).NotTo(o.HaveOccurred())
@@ -73,7 +93,7 @@ var _ = g.Describe("[sig-network-edge][Feature:Router][apigroup:route.openshift.
7393
nsOperator := "openshift-ingress-operator"
7494
controllerName := names.SimpleNameGenerator.GenerateName("e2e-dcm-")
7595

76-
// ... and its router is created on router's namespace
96+
// ... and its router and service are created in router's namespace
7797
nsRouter := "openshift-ingress"
7898
svcName := "router-internal-" + controllerName
7999

@@ -189,12 +209,12 @@ var _ = g.Describe("[sig-network-edge][Feature:Router][apigroup:route.openshift.
189209
// scaling-out to 4 replicas, one at a time
190210
for replicas := initReplicas + 1; replicas <= 4; replicas++ {
191211

192-
g.By(fmt.Sprintf("scaling-out to %d replicas", replicas))
212+
framework.Logf("scaling-out to %d replicas", replicas)
193213

194214
currentServers, err := builder.scaleDeployment(ctx, replicas, dcmIngressTimeout)
195215
o.Expect(err).NotTo(o.HaveOccurred())
196216

197-
g.By("waiting router to add all the backend servers to the load balance")
217+
framework.Logf("waiting router to add all the backend servers to the load balance")
198218

199219
// router should eventually reach all the known replicas
200220
eventuallyRouteAllServers(execPod, hostname, false, currentServers, dcmIngressTimeout)
@@ -235,7 +255,7 @@ var _ = g.Describe("[sig-network-edge][Feature:Router][apigroup:route.openshift.
235255
// instead of HAProxy removing it from the balance due to health-check starting to fail.
236256
for replicas := initReplicas - 1; replicas >= 1; replicas-- {
237257

238-
g.By(fmt.Sprintf("scaling-in to %d replicas", replicas))
258+
framework.Logf("scaling-in to %d replicas", replicas)
239259

240260
currentServers, err := builder.scaleInEndpoints(ctx, serviceName, replicas)
241261
o.Expect(err).NotTo(o.HaveOccurred())
@@ -471,6 +491,7 @@ var _ = g.Describe("[sig-network-edge][Feature:Router][apigroup:route.openshift.
471491

472492
// ... k8s recreates it and we wait for it to be fully functional
473493
err = builder.waitDeployment(replicas, dcmIngressTimeout)
494+
builder.printDeploymentState(ctx)
474495
o.Expect(err).NotTo(o.HaveOccurred())
475496
}
476497

@@ -637,7 +658,7 @@ var _ = g.Describe("[sig-network-edge][Feature:Router][apigroup:route.openshift.
637658
// Iterates over a number of scaling operations, always checking if the change is being applied.
638659
for i, replicas := range changingReplicas {
639660

640-
g.By(fmt.Sprintf("iteration %d, scaling from %d to %d replicas", i+1, prevReplicas, replicas))
661+
framework.Logf("iteration %d, scaling from %d to %d replicas", i+1, prevReplicas, replicas)
641662

642663
currentServers, err := builder.scaleDeployment(ctx, replicas, dcmIngressTimeout)
643664
o.Expect(err).NotTo(o.HaveOccurred())
@@ -778,7 +799,12 @@ func (r *routeStackBuilder) createDeploymentStack(ctx context.Context, routetype
778799
if err = r.waitDeployment(replicas, timeout); err != nil {
779800
return nil, err
780801
}
781-
return r.exposeDeployment(ctx)
802+
backendServers, err = r.exposeDeployment(ctx)
803+
if err != nil {
804+
return nil, err
805+
}
806+
r.printDeploymentState(ctx)
807+
return backendServers, nil
782808
}
783809

784810
// scaleDeployment scales-in/out the common deployment to the specified replicas. It waits for all the pods to be created and returns their names.
@@ -796,6 +822,7 @@ func (r *routeStackBuilder) scaleDeployment(ctx context.Context, replicas int, t
796822
}
797823
return len(backendServers) == replicas, nil
798824
})
825+
r.printDeploymentState(ctx)
799826
return backendServers, err
800827
}
801828

@@ -824,7 +851,14 @@ func (r *routeStackBuilder) createDetachedService(ctx context.Context) (serviceN
824851
}
825852

826853
// we also need the deprecated Endpoints API, since router still uses it depending on the ROUTER_WATCH_ENDPOINTS envvar
827-
epCurrent, err := r.kubeClient.CoreV1().Endpoints(svcCurrent.Namespace).Get(ctx, svcCurrent.Name, metav1.GetOptions{})
854+
var epCurrent *corev1.Endpoints
855+
err = wait.PollUntilContextTimeout(ctx, time.Second, 5*time.Second, false, func(ctx context.Context) (done bool, err error) {
856+
epCurrent, err = r.kubeClient.CoreV1().Endpoints(svcCurrent.Namespace).Get(ctx, svcCurrent.Name, metav1.GetOptions{})
857+
if err != nil {
858+
framework.Logf("error fetching Endpoints: %s", err.Error())
859+
}
860+
return err == nil, nil
861+
})
828862
if err != nil {
829863
return "", err
830864
}
@@ -910,7 +944,7 @@ func (r *routeStackBuilder) scaleInEndpoints(ctx context.Context, detachedServic
910944
if err != nil {
911945
return err
912946
}
913-
// deleting addresses, from all subnets, whose IP address is not found in the patched `eps`
947+
// deleting addresses, from all subsets, whose IP address is not found in the patched `eps`
914948
for i := range ep.Subsets {
915949
ss := &ep.Subsets[i]
916950
ss.Addresses = slices.DeleteFunc(ss.Addresses, func(addr corev1.EndpointAddress) bool {
@@ -939,6 +973,8 @@ func (r *routeStackBuilder) createServeHostnameDeployment(replicas int) error {
939973

940974
// createDeployment creates the deployment resource. It should be called just once, since it uses the OC namespace and the common resource name.
941975
func (r *routeStackBuilder) createDeployment(image string, replicas, port int, cmd ...string) error {
976+
// This deployment is safe under Single Node OpenShift (SNO): although it can create more replicas,
977+
// those replicas do not configure anti-affinity, so all of them can run on the same node.
942978
runArgs := []string{"deployment", r.resourceName, "--image", image, "--replicas", strconv.Itoa(replicas), "--port", strconv.Itoa(port), "--"}
943979
runArgs = append(runArgs, cmd...)
944980
return r.oc.Run("create").Args(runArgs...).Execute()
@@ -953,6 +989,25 @@ func (r *routeStackBuilder) exposeDeployment(ctx context.Context) (backendServer
953989
return r.fetchServiceReplicas(ctx)
954990
}
955991

992+
// printDeploymentState outputs the pod names, status, and their IP addresses. Best effort, it outputs the error instead in case it happens.
993+
// It requires that `exposeDeployment()` was already called.
994+
func (r *routeStackBuilder) printDeploymentState(ctx context.Context) {
995+
pods, err := r.fetchPods(ctx)
996+
if err != nil {
997+
framework.Logf("deployment state: error reading deployment pods: %v", err)
998+
return
999+
}
1000+
var podDescription []string
1001+
for _, pod := range pods {
1002+
var podIPs []string
1003+
for _, ip := range pod.Status.PodIPs {
1004+
podIPs = append(podIPs, ip.IP)
1005+
}
1006+
podDescription = append(podDescription, fmt.Sprintf("%s/%s/%s", pod.Name, pod.Status.Phase, strings.Join(podIPs, ",")))
1007+
}
1008+
framework.Logf("deployment state: replicas=%d pods=%s", len(pods), strings.Join(podDescription, " // "))
1009+
}
1010+
9561011
// fetchEndpointSlice fetches the EndpointSlice of the provided service name. It currently supports only one EndpointSlice instance for simplicity.
9571012
func (r *routeStackBuilder) fetchEndpointSlice(ctx context.Context, serviceName string) (*discoveryv1.EndpointSlice, error) {
9581013
listOpts := metav1.ListOptions{LabelSelector: discoveryv1.LabelServiceName + "=" + serviceName}
@@ -967,8 +1022,8 @@ func (r *routeStackBuilder) fetchEndpointSlice(ctx context.Context, serviceName
9671022
return &epsList.Items[0], nil
9681023
}
9691024

970-
// fetchServiceReplicas fetches the pod names from the exposed common deployment. It requires that `exposeDeployment()` was already called.
971-
func (r *routeStackBuilder) fetchServiceReplicas(ctx context.Context) ([]string, error) {
1025+
// fetchPods fetches the pods from the exposed common deployment. It requires that `exposeDeployment()` was already called.
1026+
func (r *routeStackBuilder) fetchPods(ctx context.Context) ([]corev1.Pod, error) {
9721027
svc, err := r.kubeClient.CoreV1().Services(r.namespace).Get(ctx, r.resourceName, metav1.GetOptions{})
9731028
if err != nil {
9741029
return nil, err
@@ -978,9 +1033,18 @@ func (r *routeStackBuilder) fetchServiceReplicas(ctx context.Context) ([]string,
9781033
if err != nil {
9791034
return nil, err
9801035
}
981-
backendServers := make([]string, len(pods.Items))
982-
for i := range pods.Items {
983-
backendServers[i] = pods.Items[i].Name
1036+
return pods.Items, nil
1037+
}
1038+
1039+
// fetchServiceReplicas fetches the pod names from the exposed common deployment. It requires that `exposeDeployment()` was already called.
1040+
func (r *routeStackBuilder) fetchServiceReplicas(ctx context.Context) ([]string, error) {
1041+
pods, err := r.fetchPods(ctx)
1042+
if err != nil {
1043+
return nil, err
1044+
}
1045+
backendServers := make([]string, len(pods))
1046+
for i := range pods {
1047+
backendServers[i] = pods[i].Name
9841048
}
9851049
return backendServers, nil
9861050
}

test/extended/router/stress.go

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"bytes"
55
"context"
66
"fmt"
7+
"strconv"
78
"strings"
89
"text/tabwriter"
910
"time"
@@ -16,6 +17,7 @@ import (
1617

1718
appsv1 "k8s.io/api/apps/v1"
1819
corev1 "k8s.io/api/core/v1"
20+
discoveryv1 "k8s.io/api/discovery/v1"
1921
rbacv1 "k8s.io/api/rbac/v1"
2022
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2123
"k8s.io/apimachinery/pkg/runtime"
@@ -656,6 +658,64 @@ func outputIngress(routes ...routev1.Route) {
656658
e2e.Logf("Routes:\n%s", b.String())
657659
}
658660

661+
func outputEndpoints(endpoints ...corev1.Endpoints) {
662+
b := &bytes.Buffer{}
663+
w := tabwriter.NewWriter(b, 0, 0, 2, ' ', 0)
664+
fmt.Fprintf(w, "NAME\tADDRESSES\tNOT READY ADDRESSES\tPORTS\n")
665+
for _, ep := range endpoints {
666+
for _, ss := range ep.Subsets {
667+
resumeAddrs := func(addrs []corev1.EndpointAddress) string {
668+
var addrList []string
669+
for _, addr := range addrs {
670+
val := "-"
671+
if addr.IP != "" {
672+
val = addr.IP
673+
} else if addr.Hostname != "" {
674+
val = addr.Hostname
675+
}
676+
addrList = append(addrList, val)
677+
}
678+
return strings.Join(addrList, ",")
679+
}
680+
var portList []string
681+
for _, port := range ss.Ports {
682+
portList = append(portList, strconv.Itoa(int(port.Port)))
683+
}
684+
fmt.Fprintf(w, "%s\t%s\t%s\t%s\n", ep.Name, resumeAddrs(ss.Addresses), resumeAddrs(ss.NotReadyAddresses), strings.Join(portList, ","))
685+
}
686+
}
687+
w.Flush()
688+
e2e.Logf("Endpoints:\n%s", b.String())
689+
}
690+
691+
func outputEndpointSlice(epss ...discoveryv1.EndpointSlice) {
692+
b := &bytes.Buffer{}
693+
w := tabwriter.NewWriter(b, 0, 0, 2, ' ', 0)
694+
fmt.Fprintf(w, "NAME\tSERVICE\tADDRESSES\tNOT READY ADDRESSES\tPORTS\n")
695+
for _, eps := range epss {
696+
var addrList, notReadyAddrList []string
697+
for _, ep := range eps.Endpoints {
698+
addrs := strings.Join(ep.Addresses, "+")
699+
if ready := ep.Conditions.Ready; ready == nil || *ready == true {
700+
addrList = append(addrList, addrs)
701+
} else {
702+
notReadyAddrList = append(notReadyAddrList, addrs)
703+
}
704+
}
705+
var portList []string
706+
for _, port := range eps.Ports {
707+
val := "-"
708+
if port.Port != nil {
709+
val = strconv.Itoa(int(*port.Port))
710+
}
711+
portList = append(portList, val)
712+
}
713+
fmt.Fprintf(w, "%s\t%s\t%s\t%s\t%s\n", eps.Name, eps.Labels[discoveryv1.LabelServiceName], strings.Join(addrList, ","), strings.Join(notReadyAddrList, ","), strings.Join(portList, ","))
714+
}
715+
w.Flush()
716+
e2e.Logf("EndpointSlices:\n%s", b.String())
717+
}
718+
659719
// findMostRecentConditionTime returns the time of the most recent condition.
660720
func findMostRecentConditionTime(conditions []routev1.RouteIngressCondition) time.Time {
661721
var recent time.Time

0 commit comments

Comments
 (0)