Skip to content

Commit a817ecf

Browse files
committed
improve dcm tests to reduce flakiness in endpoints
Addressing a race when patching endpoints resource: * Polling EndpointSlice resource to ensure its controller created it after we create the Endpoints resource; * Polling EndpointSlice, waiting for it to be updated after we update the Endpoint resource. Also, dumping all endpoints and endpointSlice in the test namespace in case of a test failure, which helps to correlate the current state and what should be expected in the test scenario. Example of deployments state log: ``` deployment state: replicas=4 pods=route-scale-in-5b7b4f6b8c-9ncjg/Running/10.128.1.27 // route-scale-in-5b7b4f6b8c-ck45n/Running/10.128.1.28 // route-scale-in-5b7b4f6b8c-srffr/Running/10.128.1.29 // route-scale-in-5b7b4f6b8c-v9hm2/Running/10.128.1.26 ``` Example of Endpoints and EndpointSlice resources listed if the test fails: ``` Endpoints: NAME ADDRESSES NOT READY ADDRESSES PORTS route-scale-in 10.128.1.26,10.128.1.27,10.128.1.28,10.128.1.29 9376 route-scale-in-khbh5 10.128.1.26 9376 EndpointSlices: NAME SERVICE ADDRESSES NOT READY ADDRESSES PORTS route-scale-in-khbh5-8ncmh route-scale-in-khbh5 10.128.1.26 9376 route-scale-in-rrhzv route-scale-in 10.128.1.27,10.128.1.28,10.128.1.29,10.128.1.26 9376 ``` https://redhat.atlassian.net/browse/OCPBUGS-85426
1 parent b3b98a0 commit a817ecf

2 files changed

Lines changed: 200 additions & 49 deletions

File tree

test/extended/router/config_manager_ingress.go

Lines changed: 140 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ import (
3939
exutil "github.com/openshift/origin/test/extended/util"
4040
)
4141

42+
// execPodRef defines the attributes of the router pod used to execute local HTTP requests
4243
type execPodRef struct {
4344
types.NamespacedName
4445
ipAddress string
@@ -47,21 +48,46 @@ type execPodRef struct {
4748
var _ = g.Describe("[sig-network-edge][Feature:Router][apigroup:route.openshift.io][OCPFeatureGate:IngressControllerDynamicConfigurationManager]", func() {
4849
defer g.GinkgoRecover()
4950

51+
// dcmIngressTimeout defines the maximum amount of time to wait for test operations to complete.
5052
const dcmIngressTimeout = 2 * time.Minute
53+
// maxDynamicServers defines the number of empty dynamic servers to be allocated when a reload happens.
54+
// Some tests use this value as a premise to limit the number of new servers on scale-out operations.
5155
const maxDynamicServers = 4
5256

5357
ctx := context.Background()
5458
oc := exutil.NewCLIWithPodSecurityLevel("router-dcm-ingress", api.LevelPrivileged).AsAdmin()
5559
kubeClient := oc.AdminKubeClient()
60+
routeClient := oc.AdminRouteClient()
5661

57-
// variables updated on every new test
58-
var (
59-
execPod execPodRef
60-
controller types.NamespacedName
61-
routeSelectorSet labels.Set
62-
)
62+
// execPod is the pod used to run requests against the router.
63+
var execPod execPodRef
64+
// controller is the fully qualified name of the ingress controller resource used in the current test.
65+
var controller types.NamespacedName
66+
// routeSelectorSet is the label key/value pair that the ingress controller uses to filter route resources.
67+
// All route resources should add this label, so the router can find and process them.
68+
var routeSelectorSet labels.Set
6369

6470
g.AfterEach(func() {
71+
if g.CurrentSpecReport().Failed() {
72+
routes, err := routeClient.RouteV1().Routes(oc.Namespace()).List(ctx, metav1.ListOptions{})
73+
if err != nil {
74+
framework.Logf("failed to list Routes in namespace %q: %v", oc.Namespace(), err)
75+
} else {
76+
outputIngress(routes.Items...)
77+
}
78+
endpoints, err := kubeClient.CoreV1().Endpoints(oc.Namespace()).List(ctx, metav1.ListOptions{})
79+
if err != nil {
80+
framework.Logf("failed to list Endpoints in namespace %q: %v", oc.Namespace(), err)
81+
} else {
82+
outputEndpoints(endpoints.Items...)
83+
}
84+
epsList, err := kubeClient.DiscoveryV1().EndpointSlices(oc.Namespace()).List(ctx, metav1.ListOptions{})
85+
if err != nil {
86+
framework.Logf("failed to list EndpointSlices in namespace %q: %v", oc.Namespace(), err)
87+
} else {
88+
outputEndpointSlice(epsList.Items...)
89+
}
90+
}
6591
if controller.Name != "" {
6692
err := oc.AdminOperatorClient().OperatorV1().IngressControllers(controller.Namespace).Delete(ctx, controller.Name, *metav1.NewDeleteOptions(1))
6793
o.Expect(err).NotTo(o.HaveOccurred())
@@ -73,7 +99,7 @@ var _ = g.Describe("[sig-network-edge][Feature:Router][apigroup:route.openshift.
7399
nsOperator := "openshift-ingress-operator"
74100
controllerName := names.SimpleNameGenerator.GenerateName("e2e-dcm-")
75101

76-
// ... and its router is created on router's namespace
102+
// ... and its router and service are created in router's namespace
77103
nsRouter := "openshift-ingress"
78104
svcName := "router-internal-" + controllerName
79105

@@ -189,12 +215,12 @@ var _ = g.Describe("[sig-network-edge][Feature:Router][apigroup:route.openshift.
189215
// scaling-out to 4 replicas, one at a time
190216
for replicas := initReplicas + 1; replicas <= 4; replicas++ {
191217

192-
g.By(fmt.Sprintf("scaling-out to %d replicas", replicas))
218+
framework.Logf("scaling-out to %d replicas", replicas)
193219

194220
currentServers, err := builder.scaleDeployment(ctx, replicas, dcmIngressTimeout)
195221
o.Expect(err).NotTo(o.HaveOccurred())
196222

197-
g.By("waiting router to add all the backend servers to the load balance")
223+
framework.Logf("waiting router to add all the backend servers to the load balance")
198224

199225
// router should eventually reach all the known replicas
200226
eventuallyRouteAllServers(execPod, hostname, false, currentServers, dcmIngressTimeout)
@@ -235,9 +261,9 @@ var _ = g.Describe("[sig-network-edge][Feature:Router][apigroup:route.openshift.
235261
// instead of HAProxy removing it from the balance due to health-check starting to fail.
236262
for replicas := initReplicas - 1; replicas >= 1; replicas-- {
237263

238-
g.By(fmt.Sprintf("scaling-in to %d replicas", replicas))
264+
framework.Logf("scaling-in to %d replicas", replicas)
239265

240-
currentServers, err := builder.scaleInEndpoints(ctx, serviceName, replicas)
266+
currentServers, err := builder.scaleInEndpoints(ctx, serviceName, replicas, dcmIngressTimeout)
241267
o.Expect(err).NotTo(o.HaveOccurred())
242268

243269
g.By("ensure that router removed the missing backend servers from the load balance")
@@ -471,6 +497,7 @@ var _ = g.Describe("[sig-network-edge][Feature:Router][apigroup:route.openshift.
471497

472498
// ... k8s recreates it and we wait for it to be fully functional
473499
err = builder.waitDeployment(replicas, dcmIngressTimeout)
500+
builder.printDeploymentState(ctx)
474501
o.Expect(err).NotTo(o.HaveOccurred())
475502
}
476503

@@ -637,7 +664,7 @@ var _ = g.Describe("[sig-network-edge][Feature:Router][apigroup:route.openshift.
637664
// Iterates over a number of scaling operations, always checking if the change is being applied.
638665
for i, replicas := range changingReplicas {
639666

640-
g.By(fmt.Sprintf("iteration %d, scaling from %d to %d replicas", i+1, prevReplicas, replicas))
667+
framework.Logf("iteration %d, scaling from %d to %d replicas", i+1, prevReplicas, replicas)
641668

642669
currentServers, err := builder.scaleDeployment(ctx, replicas, dcmIngressTimeout)
643670
o.Expect(err).NotTo(o.HaveOccurred())
@@ -778,7 +805,12 @@ func (r *routeStackBuilder) createDeploymentStack(ctx context.Context, routetype
778805
if err = r.waitDeployment(replicas, timeout); err != nil {
779806
return nil, err
780807
}
781-
return r.exposeDeployment(ctx)
808+
backendServers, err = r.exposeDeployment(ctx)
809+
if err != nil {
810+
return nil, err
811+
}
812+
r.printDeploymentState(ctx)
813+
return backendServers, nil
782814
}
783815

784816
// scaleDeployment scales-in/out the common deployment to the specified replicas. It waits for all the pods to be created and returns their names.
@@ -796,6 +828,7 @@ func (r *routeStackBuilder) scaleDeployment(ctx context.Context, replicas int, t
796828
}
797829
return len(backendServers) == replicas, nil
798830
})
831+
r.printDeploymentState(ctx)
799832
return backendServers, err
800833
}
801834

@@ -824,7 +857,14 @@ func (r *routeStackBuilder) createDetachedService(ctx context.Context) (serviceN
824857
}
825858

826859
// we also need the deprecated Endpoints API, since router still uses it depending on the ROUTER_WATCH_ENDPOINTS envvar
827-
epCurrent, err := r.kubeClient.CoreV1().Endpoints(svcCurrent.Namespace).Get(ctx, svcCurrent.Name, metav1.GetOptions{})
860+
var epCurrent *corev1.Endpoints
861+
err = wait.PollUntilContextTimeout(ctx, time.Second, 10*time.Second, false, func(ctx context.Context) (done bool, err error) {
862+
epCurrent, err = r.kubeClient.CoreV1().Endpoints(svcCurrent.Namespace).Get(ctx, svcCurrent.Name, metav1.GetOptions{})
863+
if err != nil {
864+
framework.Logf("error fetching Endpoints: %s", err.Error())
865+
}
866+
return err == nil, nil
867+
})
828868
if err != nil {
829869
return "", err
830870
}
@@ -841,7 +881,7 @@ func (r *routeStackBuilder) createDetachedService(ctx context.Context) (serviceN
841881
}
842882

843883
// An EndpointSlice is usually created as soon as the Endpoints resource is created. Let's wait for it, and create it ourselves in case it is missing
844-
err = wait.PollUntilContextTimeout(ctx, time.Second, 5*time.Second, false, func(ctx context.Context) (done bool, err error) {
884+
err = wait.PollUntilContextTimeout(ctx, time.Second, 10*time.Second, false, func(ctx context.Context) (done bool, err error) {
845885
_, err = r.fetchEndpointSlice(ctx, serviceName)
846886
if err != nil {
847887
framework.Logf("error fetching EndpointSlice: %s", err.Error())
@@ -881,49 +921,70 @@ func (r *routeStackBuilder) createDetachedService(ctx context.Context) (serviceN
881921
// scaleInEndpoints changes the number of replicas of an endpoint and EndpointSlice. This only works on services
882922
// without selector created via `createDetachedService()`. It is useful as a way to scale-in a service and route without
883923
// removing the underlying pods of a deployment.
884-
func (r *routeStackBuilder) scaleInEndpoints(ctx context.Context, detachedServiceName string, replicas int) (backendServers []string, err error) {
885-
var eps *discoveryv1.EndpointSlice
924+
func (r *routeStackBuilder) scaleInEndpoints(ctx context.Context, detachedServiceName string, replicas int, timeout time.Duration) (backendServers []string, err error) {
925+
var targetAddresses []corev1.EndpointAddress
926+
927+
// update Endpoints with the desired number of replicas
886928
err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
887-
eps, err = r.fetchEndpointSlice(ctx, detachedServiceName)
929+
ep, err := r.kubeClient.CoreV1().Endpoints(r.namespace).Get(ctx, detachedServiceName, metav1.GetOptions{})
888930
if err != nil {
889931
return err
890932
}
891-
if count := len(eps.Endpoints); count < replicas {
892-
return fmt.Errorf("endpoints can only be scaled-in. found %d replicas, want %d", count, replicas)
933+
if count := len(ep.Subsets); count != 1 {
934+
return fmt.Errorf("expected one subset in endpoints, found %d", count)
893935
}
894-
eps.Endpoints = eps.Endpoints[:replicas]
895-
_, err = r.kubeClient.DiscoveryV1().EndpointSlices(eps.Namespace).Update(ctx, eps, metav1.UpdateOptions{})
896-
if err != nil {
897-
return err
898-
}
899-
backendServers = make([]string, len(eps.Endpoints))
900-
for i, ep := range eps.Endpoints {
901-
backendServers[i] = ep.TargetRef.Name
936+
ss := &ep.Subsets[0]
937+
if count := len(ss.Addresses); count < replicas {
938+
return fmt.Errorf("endpoints can only be scaled-in. found %d replicas in Endpoints, want %d", count, replicas)
902939
}
903-
return nil
940+
targetAddresses = ss.Addresses[:replicas]
941+
ss.Addresses = targetAddresses
942+
_, err = r.kubeClient.CoreV1().Endpoints(ep.Namespace).Update(ctx, ep, metav1.UpdateOptions{})
943+
return err
904944
})
905945
if err != nil {
906946
return nil, err
907947
}
908-
err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
909-
ep, err := r.kubeClient.CoreV1().Endpoints(r.namespace).Get(ctx, detachedServiceName, metav1.GetOptions{})
948+
949+
// Build backend server list from the preserved addresses
950+
backendServers = make([]string, replicas)
951+
for i, addr := range targetAddresses {
952+
backendServers[i] = addr.TargetRef.Name
953+
}
954+
955+
// The EndpointSlice mirroring controller will sync Endpoints -> EndpointSlice for selector-less services.
956+
// Wait for the controller to mirror the change.
957+
err = wait.PollUntilContextTimeout(ctx, 2*time.Second, timeout, false, func(ctx context.Context) (done bool, err error) {
958+
eps, err := r.fetchEndpointSlice(ctx, detachedServiceName)
910959
if err != nil {
911-
return err
960+
framework.Logf("error fetching EndpointSlice while waiting for mirroring: %s", err.Error())
961+
return false, nil
912962
}
913-
// deleting addresses, from all subnets, whose IP address is not found in the patched `eps`
914-
for i := range ep.Subsets {
915-
ss := &ep.Subsets[i]
916-
ss.Addresses = slices.DeleteFunc(ss.Addresses, func(addr corev1.EndpointAddress) bool {
917-
return !slices.ContainsFunc(eps.Endpoints, func(e discoveryv1.Endpoint) bool {
918-
return addr.IP == e.Addresses[0]
919-
})
920-
})
963+
// Verify the endpoints match by IP to ensure we have the right ones, not just the right count
964+
if !endpointSliceMatchesAddresses(eps.Endpoints, targetAddresses) {
965+
framework.Logf("EndpointSlice and Endpoints addresses do not match, waiting for mirroring controller. eps=%v ep=%v", eps.Endpoints, targetAddresses)
966+
return false, nil
921967
}
922-
_, err = r.kubeClient.CoreV1().Endpoints(ep.Namespace).Update(ctx, ep, metav1.UpdateOptions{})
923-
return err
924-
968+
return true, nil
925969
})
926-
return backendServers, err
970+
if err != nil {
971+
return nil, fmt.Errorf("EndpointSlice mirroring did not converge: %w", err)
972+
}
973+
974+
return backendServers, nil
975+
}
976+
977+
// endpointSliceMatchesAddresses verifies that the provided endpoints contain the same addresses as the target list
978+
func endpointSliceMatchesAddresses(eps []discoveryv1.Endpoint, targetAddresses []corev1.EndpointAddress) bool {
979+
if len(eps) != len(targetAddresses) {
980+
return false
981+
}
982+
for _, ep := range eps {
983+
if len(ep.Addresses) == 0 || !slices.ContainsFunc(targetAddresses, func(addr corev1.EndpointAddress) bool { return addr.IP == ep.Addresses[0] }) {
984+
return false
985+
}
986+
}
987+
return true
927988
}
928989

929990
// waitDeployment waits the common deployment to report all its replicas as ready.
@@ -939,6 +1000,8 @@ func (r *routeStackBuilder) createServeHostnameDeployment(replicas int) error {
9391000

9401001
// createDeployment creates the deployment resource. It should be called just once, since it uses the OC namespace and the common resource name.
9411002
func (r *routeStackBuilder) createDeployment(image string, replicas, port int, cmd ...string) error {
1003+
// This deployment is safe under Single Node OpenShift (SNO): although it can create more replicas,
1004+
// those replicas do not configure anti-affinity, so all of them can run on the same node.
9421005
runArgs := []string{"deployment", r.resourceName, "--image", image, "--replicas", strconv.Itoa(replicas), "--port", strconv.Itoa(port), "--"}
9431006
runArgs = append(runArgs, cmd...)
9441007
return r.oc.Run("create").Args(runArgs...).Execute()
@@ -953,6 +1016,25 @@ func (r *routeStackBuilder) exposeDeployment(ctx context.Context) (backendServer
9531016
return r.fetchServiceReplicas(ctx)
9541017
}
9551018

1019+
// printDeploymentState outputs the pod names, status, and their IP addresses. It is best effort: if the pods cannot be read, it logs the error instead.
1020+
// It requires that `exposeDeployment()` was already called.
1021+
func (r *routeStackBuilder) printDeploymentState(ctx context.Context) {
1022+
pods, err := r.fetchPods(ctx)
1023+
if err != nil {
1024+
framework.Logf("deployment state: error reading deployment pods: %v", err)
1025+
return
1026+
}
1027+
var podDescription []string
1028+
for _, pod := range pods {
1029+
var podIPs []string
1030+
for _, ip := range pod.Status.PodIPs {
1031+
podIPs = append(podIPs, ip.IP)
1032+
}
1033+
podDescription = append(podDescription, fmt.Sprintf("%s/%s/%s", pod.Name, pod.Status.Phase, strings.Join(podIPs, ",")))
1034+
}
1035+
framework.Logf("deployment state: replicas=%d pods=%s", len(pods), strings.Join(podDescription, " // "))
1036+
}
1037+
9561038
// fetchEndpointSlice fetches the EndpointSlice of the provided service name. It currently supports only one EndpointSlice instance for simplicity.
9571039
func (r *routeStackBuilder) fetchEndpointSlice(ctx context.Context, serviceName string) (*discoveryv1.EndpointSlice, error) {
9581040
listOpts := metav1.ListOptions{LabelSelector: discoveryv1.LabelServiceName + "=" + serviceName}
@@ -967,8 +1049,8 @@ func (r *routeStackBuilder) fetchEndpointSlice(ctx context.Context, serviceName
9671049
return &epsList.Items[0], nil
9681050
}
9691051

970-
// fetchServiceReplicas fetches the pod names from the exposed common deployment. It requires that `exposeDeployment()` was already called.
971-
func (r *routeStackBuilder) fetchServiceReplicas(ctx context.Context) ([]string, error) {
1052+
// fetchPods fetches the pods from the exposed common deployment. It requires that `exposeDeployment()` was already called.
1053+
func (r *routeStackBuilder) fetchPods(ctx context.Context) ([]corev1.Pod, error) {
9721054
svc, err := r.kubeClient.CoreV1().Services(r.namespace).Get(ctx, r.resourceName, metav1.GetOptions{})
9731055
if err != nil {
9741056
return nil, err
@@ -978,9 +1060,18 @@ func (r *routeStackBuilder) fetchServiceReplicas(ctx context.Context) ([]string,
9781060
if err != nil {
9791061
return nil, err
9801062
}
981-
backendServers := make([]string, len(pods.Items))
982-
for i := range pods.Items {
983-
backendServers[i] = pods.Items[i].Name
1063+
return pods.Items, nil
1064+
}
1065+
1066+
// fetchServiceReplicas fetches the pod names from the exposed common deployment. It requires that `exposeDeployment()` was already called.
1067+
func (r *routeStackBuilder) fetchServiceReplicas(ctx context.Context) ([]string, error) {
1068+
pods, err := r.fetchPods(ctx)
1069+
if err != nil {
1070+
return nil, err
1071+
}
1072+
backendServers := make([]string, len(pods))
1073+
for i := range pods {
1074+
backendServers[i] = pods[i].Name
9841075
}
9851076
return backendServers, nil
9861077
}

0 commit comments

Comments
 (0)