Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions test/e2e/apisix/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,8 @@ func TestAPISIXE2E(t *testing.T) {
_, _ = fmt.Fprintf(GinkgoWriter, "Starting APISIX standalone e2e suite\n")
RunSpecs(t, "apisix standalone e2e suite")
}

// Tear down any prewarmed environments left in the pools when the suite ends.
var _ = AfterSuite(func() {
scaffold.ShutdownAllPools()
})
16 changes: 13 additions & 3 deletions test/e2e/crds/v2/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -2201,9 +2201,19 @@ spec:
&apiv2.ApisixRoute{}, fmt.Sprintf(apisixRouteSpec, s.Namespace()))

By("check upstreams")
upstreams, err := s.DefaultDataplaneResource().Upstream().List(context.Background())
Expect(err).ShouldNot(HaveOccurred())
Expect(upstreams).Should(HaveLen(4))
// Poll instead of asserting once: the controller syncs the four
// upstreams to the data plane asynchronously after the ApisixRoute is
// applied, so a single immediate check can race and observe fewer.
s.RetryAssertion(func() error {
upstreams, err := s.DefaultDataplaneResource().Upstream().List(context.Background())
if err != nil {
return err
}
if len(upstreams) != 4 {
return fmt.Errorf("expected 4 upstreams, got %d", len(upstreams))
}
return nil
}).ShouldNot(HaveOccurred(), "waiting for 4 upstreams to be synced")

By("verify ApisixRoute works")
s.RequestAssert(&scaffold.RequestAssert{
Expand Down
17 changes: 14 additions & 3 deletions test/e2e/framework/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,13 @@ func (f *Framework) ensureService(name, namespace string, desiredEndpoints int)
return f.ensureServiceWithTimeout(name, namespace, desiredEndpoints, 120)
}

// EnsureServiceReadyE waits until the named Service has the desired number of
// ready endpoints, returning an error instead of failing the test. It is used by
// the prewarm pool, whose background workers must not call Ginkgo assertions.
func (f *Framework) EnsureServiceReadyE(namespace, name string, desiredEndpoints int) error {
return f.ensureService(name, namespace, desiredEndpoints)
}

func (f *Framework) ensureServiceWithTimeout(name, namespace string, desiredEndpoints, timeout int) error {
backoff := wait.Backoff{
Duration: 6 * time.Second,
Expand Down Expand Up @@ -290,10 +297,14 @@ func WaitPodsAvailable(t testing.TestingT, kubeOps *k8s.KubectlOptions, opts met
}

func waitExponentialBackoff(condFunc func() (bool, error)) error {
// Poll at a fixed 2s interval up to ~180s. The previous exponential schedule
// (500ms, factor 2, 8 steps) polled at 7.5/15.5/31.5/63.5s, i.e. sparsely
// exactly during the 10-30s window when pods usually become ready, adding up
// to ~15s of needless waiting per call.
backoff := wait.Backoff{
Duration: 500 * time.Millisecond,
Factor: 2,
Steps: 8,
Duration: 2 * time.Second,
Factor: 1,
Steps: 90,
}
return wait.ExponentialBackoff(backoff, condFunc)
}
Expand Down
28 changes: 28 additions & 0 deletions test/e2e/scaffold/apisix_deployer.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,34 @@ func NewAPISIXDeployer(s *Scaffold) Deployer {
}

func (s *APISIXDeployer) BeforeEach() {
// Fast path: pick up a prewarmed environment so the deploy/readiness
// latency is paid by a background worker (overlapping the previous spec)
// instead of on this spec's critical path. Only the default profile is
// pooled; anything else, or any provisioning failure, falls back to the
// synchronous deploy below.
if prewarmEnabled() && isPoolable(s.opts) && specPrewarmable() {
fw, opts := s.Framework, s.opts
pool := getOrStartPool(profileKey(opts), prewarmDepth(), func() *pooledEnv {
return provisionAPISIXEnv(fw, opts)
})
env := pool.acquire()
if env != nil && env.err == nil {
if err := s.loadPooledEnv(env); err != nil {
s.Logf("prewarm tunnel setup failed, falling back to synchronous deploy: %v", err)
destroyPooledEnv(env)
} else {
return
}
} else if env != nil && env.err != nil {
s.Logf("prewarm provision failed, falling back to synchronous deploy: %v", env.err)
destroyPooledEnv(env)
}
}

s.beforeEachSync()
}

func (s *APISIXDeployer) beforeEachSync() {
s.runtimeOpts = s.opts
s.namespace = fmt.Sprintf("ingress-apisix-e2e-tests-%s-%d", s.runtimeOpts.Name, time.Now().Nanosecond())
s.kubectlOptions = &k8s.KubectlOptions{
Expand Down
273 changes: 273 additions & 0 deletions test/e2e/scaffold/apisix_prewarm.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,273 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package scaffold

import (
"bytes"
"fmt"
"os"
"strings"
"sync/atomic"
"time"

"github.com/gruntwork-io/terratest/modules/k8s"
. "github.com/onsi/ginkgo/v2" //nolint:staticcheck
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/ptr"

"github.com/apache/apisix-ingress-controller/test/e2e/framework"
)

var nsCounter int64

// defaultProfileName is the scaffold Options.Name used by the default profile.
const defaultProfileName = "default"

// streamRouteLabels marks Gateway API stream-route specs (TCP/TLS/UDP).
var streamRouteLabels = map[string]struct{}{
"tcproute": {},
"tlsroute": {},
"udproute": {},
}

// specPrewarmable reports whether the currently running spec may be served from
// the prewarm pool. Stream-route specs (Gateway API TCP/TLS/UDP routes and the
// ApisixRoute stream-route suite) are excluded and fall back to synchronous
// deployment: under apisix-standalone a stream route served from a prewarmed
// data plane is flaky, because the data plane's stream subsystem races with the
// concurrent background provisioning of the next pooled environment, whereas
// HTTP routes tolerate that contention.
func specPrewarmable() bool {
report := CurrentSpecReport()
for _, label := range report.Labels() {
if _, ok := streamRouteLabels[label]; ok {
return false
}
}
for _, text := range report.ContainerHierarchyTexts {
if strings.Contains(text, "StreamRoute") {
return false
}
}
return true
}

// isPoolable reports whether an environment with these options can be served
// from the prewarm pool. Only the default profile is pooled; webhook-enabled
// and custom-keyed environments fall back to synchronous deployment.
func isPoolable(o Options) bool {
return !o.SkipHooks &&
!o.EnableWebhook &&
o.ControllerName == "" &&
o.APISIXAdminAPIKey == ""
}

// profileKey identifies the pool an environment belongs to. Within a process
// all default scaffolds share one pool.
func profileKey(o Options) string {
name := o.Name
if name == "" {
name = defaultProfileName
}
return "name=" + name
}

func formatRegistry(workloadTemplate string) string {
if customRegistry, ok := os.LookupEnv("REGISTRY"); ok {
return strings.ReplaceAll(workloadTemplate, "127.0.0.1:5000", customRegistry)
}
return workloadTemplate
}

// provisionAPISIXEnv builds a complete default-profile environment using
// error-returning primitives only, so it is safe to run in a background
// goroutine. Any failure is captured in pooledEnv.err for the caller to handle.
func provisionAPISIXEnv(fw *framework.Framework, opts Options) *pooledEnv {
t := &bgTestingT{}
env := &pooledEnv{}

name := opts.Name
if name == "" {
name = defaultProfileName
}
ns := fmt.Sprintf("ingress-apisix-e2e-tests-%s-p%d-%d",
name, GinkgoParallelProcess(), atomic.AddInt64(&nsCounter, 1))
env.namespace = ns
env.kubectlOptions = &k8s.KubectlOptions{
ConfigPath: GetKubeconfig(),
Namespace: ns,
}
env.adminKey = getEnvOrDefault("APISIX_ADMIN_KEY", "edd1c9f034335f136f87ad84b625c8f1")
env.controllerName = fmt.Sprintf("%s/%s", DefaultControllerName, ns)

if err := k8s.CreateNamespaceE(t, env.kubectlOptions, ns); err != nil {
env.err = fmt.Errorf("creating namespace: %w", err)
return env
}

// 1) Data plane (APISIX, plus etcd when the provider needs it).
svc, err := provisionDataplane(t, env, opts)
if err != nil {
env.err = err
return env
}
env.dataplaneService = svc

// 2) Ingress controller.
if err := provisionIngress(t, env); err != nil {
env.err = err
return env
}

// 3) httpbin test backend.
httpbinSvc, err := provisionHTTPBIN(fw, t, env)
if err != nil {
env.err = err
return env
}
env.httpbinService = httpbinSvc

return env
}

func provisionDataplane(t *bgTestingT, env *pooledEnv, _ Options) (*corev1.Service, error) {
serviceName := framework.ProviderType
configProvider := framework.ConfigProviderTypeYaml
if framework.ProviderType == framework.ProviderTypeAPISIX {
configProvider = framework.ConfigProviderTypeEtcd
}

if configProvider == framework.ConfigProviderTypeEtcd {
if err := k8s.KubectlApplyFromStringE(t, env.kubectlOptions, framework.EtcdSpec); err != nil {
return nil, fmt.Errorf("applying etcd: %w", err)
}
if err := framework.WaitPodsAvailable(t, env.kubectlOptions, metav1.ListOptions{
LabelSelector: "app=etcd",
}); err != nil {
return nil, fmt.Errorf("waiting for etcd pod: %w", err)
}
}

deployOpts := APISIXDeployOptions{
Namespace: env.namespace,
AdminKey: env.adminKey,
ServiceName: serviceName,
ServiceHTTPPort: 9080,
ServiceHTTPSPort: 9443,
ConfigProvider: configProvider,
Replicas: ptr.To(1),
}
buf := bytes.NewBuffer(nil)
if err := framework.APISIXStandaloneTpl.Execute(buf, &deployOpts); err != nil {
return nil, fmt.Errorf("rendering apisix template: %w", err)
}
if err := k8s.KubectlApplyFromStringE(t, env.kubectlOptions, buf.String()); err != nil {
return nil, fmt.Errorf("applying apisix: %w", err)
}
if err := framework.WaitPodsAvailable(t, env.kubectlOptions, metav1.ListOptions{
LabelSelector: "app.kubernetes.io/name=apisix",
}); err != nil {
return nil, fmt.Errorf("waiting for apisix pod: %w", err)
}

svc, err := k8s.GetServiceE(t, env.kubectlOptions, serviceName)
if err != nil {
return nil, fmt.Errorf("getting dataplane service: %w", err)
}
return svc, nil
}

func provisionIngress(t *bgTestingT, env *pooledEnv) error {
opts := framework.IngressDeployOpts{
ControllerName: env.controllerName,
ProviderType: framework.ProviderType,
ProviderSyncPeriod: 1 * time.Hour,
Namespace: env.namespace,
Replicas: ptr.To(1),
WebhookEnable: false,
}
buf := bytes.NewBuffer(nil)
if err := framework.IngressSpecTpl.Execute(buf, opts); err != nil {
return fmt.Errorf("rendering ingress template: %w", err)
}
if err := k8s.KubectlApplyFromStringE(t, env.kubectlOptions, buf.String()); err != nil {
return fmt.Errorf("applying ingress controller: %w", err)
}
if err := framework.WaitPodsAvailable(t, env.kubectlOptions, metav1.ListOptions{
LabelSelector: "control-plane=controller-manager",
}); err != nil {
return fmt.Errorf("waiting for controller pod: %w", err)
}
return nil
}

func provisionHTTPBIN(fw *framework.Framework, t *bgTestingT, env *pooledEnv) (*corev1.Service, error) {
deployment := fmt.Sprintf(formatRegistry(_httpbinDeploymentTemplate), 1)
if err := k8s.KubectlApplyFromStringE(t, env.kubectlOptions, deployment); err != nil {
return nil, fmt.Errorf("applying httpbin deployment: %w", err)
}
if err := k8s.KubectlApplyFromStringE(t, env.kubectlOptions, _httpService); err != nil {
return nil, fmt.Errorf("applying httpbin service: %w", err)
}
if err := fw.EnsureServiceReadyE(env.namespace, HTTPBinServiceName, 1); err != nil {
return nil, fmt.Errorf("waiting for httpbin endpoints: %w", err)
}
svc, err := k8s.GetServiceE(t, env.kubectlOptions, HTTPBinServiceName)
if err != nil {
return nil, fmt.Errorf("getting httpbin service: %w", err)
}
return svc, nil
}

// loadPooledEnv installs a prewarmed environment onto the deployer's scaffold so
// the rest of the spec behaves exactly as if it had been deployed synchronously.
//
// Port-forward tunnels are (re)created here, on the spec's critical path, rather
// than reused from the background prewarm worker. A port-forward opened during
// prewarm can be left in a broken state when it is established before the data
// plane's listener is fully serving (e.g. the TLS stream listener) and then sits
// idle in the pool buffer until a spec picks it up, surfacing as an EOF on first
// use. Recreating them here against a fully-provisioned data plane matches the
// synchronous path exactly; tunnel setup is cheap (~1-2s) relative to the
// deploy/readiness latency that prewarm hides.
func (s *APISIXDeployer) loadPooledEnv(env *pooledEnv) error {
s.runtimeOpts = s.opts
s.namespace = env.namespace
s.kubectlOptions = env.kubectlOptions
s.runtimeOpts.ControllerName = env.controllerName
s.runtimeOpts.APISIXAdminAPIKey = env.adminKey
s.dataplaneService = env.dataplaneService
s.httpbinService = env.httpbinService
s.finalizers = nil
s.additionalGateways = make(map[string]*GatewayResources)

apisixTunnels, err := s.createDataplaneTunnels(env.dataplaneService, s.kubectlOptions, env.dataplaneService.Name)
if err != nil {
return fmt.Errorf("creating dataplane tunnels: %w", err)
}
s.apisixTunnels = apisixTunnels

adminTunnel, err := s.createAdminTunnel(env.dataplaneService)
if err != nil {
return fmt.Errorf("creating admin tunnel: %w", err)
}
s.adminTunnel = adminTunnel

return nil
}
Loading
Loading