From afc6cdc51265c9bb0285191754fa366a09b6dc5f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=82=9F=E7=A9=BA?= Date: Tue, 17 Jun 2025 16:10:56 +0800 Subject: [PATCH 1/9] feat: Add support for ApisixUpstream reconciliation and enhance upstream handling - Remove legacy ApisixUpstream controller. - Update related CRDs and deepcopy functions. - Add new translator logic for ApisixUpstream. - Introduce indexing and mapping logic for ApisixUpstream in ApisixRoute reconciliation. --- api/adc/types.go | 102 ++++++++ api/adc/zz_generated.deepcopy.go | 232 ++++++++++++++++++ api/v2/apisixupstream_types.go | 9 +- api/v2/shared_types.go | 48 ++++ api/v2/zz_generated.deepcopy.go | 3 +- .../apisix.apache.org_apisixupstreams.yaml | 64 +---- internal/controller/apisixroute_controller.go | 42 ++++ .../controller/apisixupstream_controller.go | 69 ------ internal/controller/indexer/indexer.go | 20 +- .../provider/adc/translator/apisixupstream.go | 147 +++++++++++ internal/provider/provider.go | 3 + 11 files changed, 596 insertions(+), 143 deletions(-) delete mode 100644 internal/controller/apisixupstream_controller.go create mode 100644 internal/provider/adc/translator/apisixupstream.go diff --git a/api/adc/types.go b/api/adc/types.go index d9b0dab6a..48a26d629 100644 --- a/api/adc/types.go +++ b/api/adc/types.go @@ -206,6 +206,108 @@ type Upstream struct { Timeout *Timeout `json:"timeout,omitempty" yaml:"timeout,omitempty"` Type UpstreamType `json:"type,omitempty" yaml:"type,omitempty"` UpstreamHost string `json:"upstream_host,omitempty" yaml:"upstream_host,omitempty"` + + Checks *UpstreamHealthCheck `json:"checks,omitempty" yaml:"checks,omitempty"` + TLS *ClientTLS `json:"tls,omitempty" yaml:"tls,omitempty"` + // for Service Discovery + DiscoveryType string `json:"discovery_type,omitempty" yaml:"discovery_type,omitempty"` + DiscoveryArgs map[string]string `json:"discovery_args,omitempty" yaml:"discovery_args,omitempty"` +} + +// UpstreamHealthCheck defines the active and/or passive health check for an Upstream, +// with the upstream health check feature, pods can be kicked out or joined in quickly, +// if the feedback of Kubernetes liveness/readiness probe is long. +// +k8s:deepcopy-gen=true +type UpstreamHealthCheck struct { + Active *UpstreamActiveHealthCheck `json:"active" yaml:"active"` + Passive *UpstreamPassiveHealthCheck `json:"passive,omitempty" yaml:"passive,omitempty"` +} + +// ClientTLS is tls cert and key use in mTLS +// +k8s:deepcopy-gen=true +type ClientTLS struct { + Cert string `json:"client_cert,omitempty" yaml:"client_cert,omitempty"` + Key string `json:"client_key,omitempty" yaml:"client_key,omitempty"` +} + +// UpstreamActiveHealthCheck defines the active kind of upstream health check. +// +k8s:deepcopy-gen=true +type UpstreamActiveHealthCheck struct { + Type string `json:"type,omitempty" yaml:"type,omitempty"` + Timeout int `json:"timeout,omitempty" yaml:"timeout,omitempty"` + Concurrency int `json:"concurrency,omitempty" yaml:"concurrency,omitempty"` + Host string `json:"host,omitempty" yaml:"host,omitempty"` + Port int32 `json:"port,omitempty" yaml:"port,omitempty"` + HTTPPath string `json:"http_path,omitempty" yaml:"http_path,omitempty"` + HTTPSVerifyCert bool `json:"https_verify_certificate,omitempty" yaml:"https_verify_certificate,omitempty"` + HTTPRequestHeaders []string `json:"req_headers,omitempty" yaml:"req_headers,omitempty"` + Healthy UpstreamActiveHealthCheckHealthy `json:"healthy,omitempty" yaml:"healthy,omitempty"` + Unhealthy UpstreamActiveHealthCheckUnhealthy `json:"unhealthy,omitempty" yaml:"unhealthy,omitempty"` +} + +// UpstreamPassiveHealthCheck defines the passive kind of upstream health check. +// +k8s:deepcopy-gen=true +type UpstreamPassiveHealthCheck struct { + Type string `json:"type,omitempty" yaml:"type,omitempty"` + Healthy UpstreamPassiveHealthCheckHealthy `json:"healthy,omitempty" yaml:"healthy,omitempty"` + Unhealthy UpstreamPassiveHealthCheckUnhealthy `json:"unhealthy,omitempty" yaml:"unhealthy,omitempty"` +} + +// UpstreamActiveHealthCheckHealthy defines the conditions to judge whether +// an upstream node is healthy with the active manner. +// +k8s:deepcopy-gen=true +type UpstreamActiveHealthCheckHealthy struct { + UpstreamPassiveHealthCheckHealthy `json:",inline" yaml:",inline"` + + Interval int `json:"interval,omitempty" yaml:"interval,omitempty"` +} + +// UpstreamPassiveHealthCheckHealthy defines the conditions to judge whether +// an upstream node is healthy with the passive manner. +// +k8s:deepcopy-gen=true +type UpstreamPassiveHealthCheckHealthy struct { + HTTPStatuses []int `json:"http_statuses,omitempty" yaml:"http_statuses,omitempty"` + Successes int `json:"successes,omitempty" yaml:"successes,omitempty"` +} + +// UpstreamPassiveHealthCheckUnhealthy defines the conditions to judge whether +// an upstream node is unhealthy with the passive manager. +// +k8s:deepcopy-gen=true +type UpstreamPassiveHealthCheckUnhealthy struct { + HTTPStatuses []int `json:"http_statuses,omitempty" yaml:"http_statuses,omitempty"` + HTTPFailures int `json:"http_failures,omitempty" yaml:"http_failures,omitempty"` + TCPFailures int `json:"tcp_failures,omitempty" yaml:"tcp_failures,omitempty"` + Timeouts int `json:"timeouts,omitempty" yaml:"timeouts,omitempty"` +} + +// UpstreamActiveHealthCheckUnhealthy defines the conditions to judge whether +// an upstream node is unhealthy with the active manager. +// +k8s:deepcopy-gen=true +type UpstreamActiveHealthCheckUnhealthy struct { + UpstreamPassiveHealthCheckUnhealthy `json:",inline" yaml:",inline"` + + Interval int `json:"interval,omitempty" yaml:"interval,omitempty"` +} + +// TrafficSplitConfig is the config of traffic-split plugin. +// +k8s:deepcopy-gen=true +type TrafficSplitConfig struct { + Rules []TrafficSplitConfigRule `json:"rules"` +} + +// TrafficSplitConfigRule is the rule config in traffic-split plugin config. +// +k8s:deepcopy-gen=true +type TrafficSplitConfigRule struct { + WeightedUpstreams []TrafficSplitConfigRuleWeightedUpstream `json:"weighted_upstreams"` +} + +// TrafficSplitConfigRuleWeightedUpstream is the weighted upstream config in +// the traffic split plugin rule. +// +k8s:deepcopy-gen=true +type TrafficSplitConfigRuleWeightedUpstream struct { + UpstreamID string `json:"upstream_id,omitempty"` + Upstream *Upstream `json:"upstream,omitempty"` + Weight int `json:"weight"` } // +k8s:deepcopy-gen=true diff --git a/api/adc/zz_generated.deepcopy.go b/api/adc/zz_generated.deepcopy.go index 8051a98bc..6f3fe4133 100644 --- a/api/adc/zz_generated.deepcopy.go +++ b/api/adc/zz_generated.deepcopy.go @@ -62,6 +62,21 @@ func (in *ClientClass) DeepCopy() *ClientClass { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ClientTLS) DeepCopyInto(out *ClientTLS) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ClientTLS. +func (in *ClientTLS) DeepCopy() *ClientTLS { + if in == nil { + return nil + } + out := new(ClientTLS) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Consumer) DeepCopyInto(out *Consumer) { *out = *in @@ -402,6 +417,70 @@ func (in *TLSClass) DeepCopy() *TLSClass { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *TrafficSplitConfig) DeepCopyInto(out *TrafficSplitConfig) { + *out = *in + if in.Rules != nil { + in, out := &in.Rules, &out.Rules + *out = make([]TrafficSplitConfigRule, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TrafficSplitConfig. +func (in *TrafficSplitConfig) DeepCopy() *TrafficSplitConfig { + if in == nil { + return nil + } + out := new(TrafficSplitConfig) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *TrafficSplitConfigRule) DeepCopyInto(out *TrafficSplitConfigRule) { + *out = *in + if in.WeightedUpstreams != nil { + in, out := &in.WeightedUpstreams, &out.WeightedUpstreams + *out = make([]TrafficSplitConfigRuleWeightedUpstream, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TrafficSplitConfigRule. +func (in *TrafficSplitConfigRule) DeepCopy() *TrafficSplitConfigRule { + if in == nil { + return nil + } + out := new(TrafficSplitConfigRule) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *TrafficSplitConfigRuleWeightedUpstream) DeepCopyInto(out *TrafficSplitConfigRuleWeightedUpstream) { + *out = *in + if in.Upstream != nil { + in, out := &in.Upstream, &out.Upstream + *out = new(Upstream) + (*in).DeepCopyInto(*out) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TrafficSplitConfigRuleWeightedUpstream. +func (in *TrafficSplitConfigRuleWeightedUpstream) DeepCopy() *TrafficSplitConfigRuleWeightedUpstream { + if in == nil { + return nil + } + out := new(TrafficSplitConfigRuleWeightedUpstream) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Upstream) DeepCopyInto(out *Upstream) { *out = *in @@ -426,6 +505,23 @@ func (in *Upstream) DeepCopyInto(out *Upstream) { *out = new(Timeout) **out = **in } + if in.Checks != nil { + in, out := &in.Checks, &out.Checks + *out = new(UpstreamHealthCheck) + (*in).DeepCopyInto(*out) + } + if in.TLS != nil { + in, out := &in.TLS, &out.TLS + *out = new(ClientTLS) + **out = **in + } + if in.DiscoveryArgs != nil { + in, out := &in.DiscoveryArgs, &out.DiscoveryArgs + *out = make(map[string]string, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Upstream. @@ -437,3 +533,139 @@ func (in *Upstream) DeepCopy() *Upstream { in.DeepCopyInto(out) return out } + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *UpstreamActiveHealthCheck) DeepCopyInto(out *UpstreamActiveHealthCheck) { + *out = *in + if in.HTTPRequestHeaders != nil { + in, out := &in.HTTPRequestHeaders, &out.HTTPRequestHeaders + *out = make([]string, len(*in)) + copy(*out, *in) + } + in.Healthy.DeepCopyInto(&out.Healthy) + in.Unhealthy.DeepCopyInto(&out.Unhealthy) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new UpstreamActiveHealthCheck. +func (in *UpstreamActiveHealthCheck) DeepCopy() *UpstreamActiveHealthCheck { + if in == nil { + return nil + } + out := new(UpstreamActiveHealthCheck) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *UpstreamActiveHealthCheckHealthy) DeepCopyInto(out *UpstreamActiveHealthCheckHealthy) { + *out = *in + in.UpstreamPassiveHealthCheckHealthy.DeepCopyInto(&out.UpstreamPassiveHealthCheckHealthy) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new UpstreamActiveHealthCheckHealthy. +func (in *UpstreamActiveHealthCheckHealthy) DeepCopy() *UpstreamActiveHealthCheckHealthy { + if in == nil { + return nil + } + out := new(UpstreamActiveHealthCheckHealthy) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *UpstreamActiveHealthCheckUnhealthy) DeepCopyInto(out *UpstreamActiveHealthCheckUnhealthy) { + *out = *in + in.UpstreamPassiveHealthCheckUnhealthy.DeepCopyInto(&out.UpstreamPassiveHealthCheckUnhealthy) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new UpstreamActiveHealthCheckUnhealthy. +func (in *UpstreamActiveHealthCheckUnhealthy) DeepCopy() *UpstreamActiveHealthCheckUnhealthy { + if in == nil { + return nil + } + out := new(UpstreamActiveHealthCheckUnhealthy) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *UpstreamHealthCheck) DeepCopyInto(out *UpstreamHealthCheck) { + *out = *in + if in.Active != nil { + in, out := &in.Active, &out.Active + *out = new(UpstreamActiveHealthCheck) + (*in).DeepCopyInto(*out) + } + if in.Passive != nil { + in, out := &in.Passive, &out.Passive + *out = new(UpstreamPassiveHealthCheck) + (*in).DeepCopyInto(*out) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new UpstreamHealthCheck. +func (in *UpstreamHealthCheck) DeepCopy() *UpstreamHealthCheck { + if in == nil { + return nil + } + out := new(UpstreamHealthCheck) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *UpstreamPassiveHealthCheck) DeepCopyInto(out *UpstreamPassiveHealthCheck) { + *out = *in + in.Healthy.DeepCopyInto(&out.Healthy) + in.Unhealthy.DeepCopyInto(&out.Unhealthy) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new UpstreamPassiveHealthCheck. +func (in *UpstreamPassiveHealthCheck) DeepCopy() *UpstreamPassiveHealthCheck { + if in == nil { + return nil + } + out := new(UpstreamPassiveHealthCheck) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *UpstreamPassiveHealthCheckHealthy) DeepCopyInto(out *UpstreamPassiveHealthCheckHealthy) { + *out = *in + if in.HTTPStatuses != nil { + in, out := &in.HTTPStatuses, &out.HTTPStatuses + *out = make([]int, len(*in)) + copy(*out, *in) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new UpstreamPassiveHealthCheckHealthy. +func (in *UpstreamPassiveHealthCheckHealthy) DeepCopy() *UpstreamPassiveHealthCheckHealthy { + if in == nil { + return nil + } + out := new(UpstreamPassiveHealthCheckHealthy) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *UpstreamPassiveHealthCheckUnhealthy) DeepCopyInto(out *UpstreamPassiveHealthCheckUnhealthy) { + *out = *in + if in.HTTPStatuses != nil { + in, out := &in.HTTPStatuses, &out.HTTPStatuses + *out = make([]int, len(*in)) + copy(*out, *in) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new UpstreamPassiveHealthCheckUnhealthy. +func (in *UpstreamPassiveHealthCheckUnhealthy) DeepCopy() *UpstreamPassiveHealthCheckUnhealthy { + if in == nil { + return nil + } + out := new(UpstreamPassiveHealthCheckUnhealthy) + in.DeepCopyInto(out) + return out +} diff --git a/api/v2/apisixupstream_types.go b/api/v2/apisixupstream_types.go index 3e0807a14..27c80c70c 100644 --- a/api/v2/apisixupstream_types.go +++ b/api/v2/apisixupstream_types.go @@ -36,19 +36,14 @@ type ApisixUpstreamSpec struct { PortLevelSettings []PortLevelSettings `json:"portLevelSettings,omitempty" yaml:"portLevelSettings,omitempty"` } -// ApisixUpstreamStatus defines the observed state of ApisixUpstream. -type ApisixUpstreamStatus = ApisixStatus - // +kubebuilder:object:root=true -// +kubebuilder:subresource:status // ApisixUpstream is the Schema for the apisixupstreams API. type ApisixUpstream struct { metav1.TypeMeta `json:",inline"` metav1.ObjectMeta `json:"metadata,omitempty"` - Spec ApisixUpstreamSpec `json:"spec,omitempty"` - Status ApisixUpstreamStatus `json:"status,omitempty"` + Spec ApisixUpstreamSpec `json:"spec,omitempty"` } // +kubebuilder:object:root=true @@ -86,7 +81,7 @@ type ApisixUpstreamConfig struct { // How many times that the proxy (Apache APISIX) should do when // errors occur (error, timeout or bad http status codes like 500, 502). // +kubebuilder:validation:Optional - Retries *int `json:"retries,omitempty" yaml:"retries,omitempty"` + Retries *int64 `json:"retries,omitempty" yaml:"retries,omitempty"` // Timeout settings for the read, send and connect to the upstream. // +kubebuilder:validation:Optional diff --git a/api/v2/shared_types.go b/api/v2/shared_types.go index 4738c5d83..bb255e35f 100644 --- a/api/v2/shared_types.go +++ b/api/v2/shared_types.go @@ -77,3 +77,51 @@ const ( // ScopeVariable means the route match expression subject is in variable. ScopeVariable = "Variable" ) + +const ( + // SchemeHTTP represents the HTTP protocol. + SchemeHTTP = "http" + // SchemeGRPC represents the GRPC protocol. + SchemeGRPC = "grpc" + // SchemeHTTPS represents the HTTPS protocol. + SchemeHTTPS = "https" + // SchemeGRPCS represents the GRPCS protocol. + SchemeGRPCS = "grpcs" + // SchemeTCP represents the TCP protocol. + SchemeTCP = "tcp" + // SchemeUDP represents the UDP protocol. + SchemeUDP = "udp" +) + +const ( + // HashOnVars means the hash scope is variable. + HashOnVars = "vars" + // HashOnVarsCombination means the hash scope is the + // variable combination. + HashOnVarsCombination = "vars_combinations" + // HashOnHeader means the hash scope is HTTP request + // headers. + HashOnHeader = "header" + // HashOnCookie means the hash scope is HTTP Cookie. + HashOnCookie = "cookie" + // HashOnConsumer means the hash scope is APISIX consumer. + HashOnConsumer = "consumer" + + // LbRoundRobin is the round robin load balancer. + LbRoundRobin = "roundrobin" + // LbConsistentHash is the consistent hash load balancer. + LbConsistentHash = "chash" + // LbEwma is the ewma load balancer. + LbEwma = "ewma" + // LbLeaseConn is the least connection load balancer. + LbLeastConn = "least_conn" +) + +const ( + // PassHostPass represents pass option for pass_host Upstream settings. + PassHostPass = "pass" + // PassHostPass represents node option for pass_host Upstream settings. + PassHostNode = "node" + // PassHostPass represents rewrite option for pass_host Upstream settings. + PassHostRewrite = "rewrite" +) diff --git a/api/v2/zz_generated.deepcopy.go b/api/v2/zz_generated.deepcopy.go index 917d1885e..e353bbf75 100644 --- a/api/v2/zz_generated.deepcopy.go +++ b/api/v2/zz_generated.deepcopy.go @@ -1208,7 +1208,6 @@ func (in *ApisixUpstream) DeepCopyInto(out *ApisixUpstream) { out.TypeMeta = in.TypeMeta in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) in.Spec.DeepCopyInto(&out.Spec) - in.Status.DeepCopyInto(&out.Status) } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ApisixUpstream. @@ -1239,7 +1238,7 @@ func (in *ApisixUpstreamConfig) DeepCopyInto(out *ApisixUpstreamConfig) { } if in.Retries != nil { in, out := &in.Retries, &out.Retries - *out = new(int) + *out = new(int64) **out = **in } if in.Timeout != nil { diff --git a/config/crd/bases/apisix.apache.org_apisixupstreams.yaml b/config/crd/bases/apisix.apache.org_apisixupstreams.yaml index e6371f293..c4efa840c 100644 --- a/config/crd/bases/apisix.apache.org_apisixupstreams.yaml +++ b/config/crd/bases/apisix.apache.org_apisixupstreams.yaml @@ -366,6 +366,7 @@ spec: description: |- How many times that the proxy (Apache APISIX) should do when errors occur (error, timeout or bad http status codes like 500, 502). + format: int64 type: integer scheme: description: |- @@ -431,6 +432,7 @@ spec: description: |- How many times that the proxy (Apache APISIX) should do when errors occur (error, timeout or bad http status codes like 500, 502). + format: int64 type: integer scheme: description: |- @@ -488,68 +490,6 @@ spec: the pass_host is set to rewrite type: string type: object - status: - description: ApisixStatus is the status report for Apisix ingress Resources - properties: - conditions: - items: - description: Condition contains details for one aspect of the current - state of this API Resource. - properties: - lastTransitionTime: - description: |- - lastTransitionTime is the last time the condition transitioned from one status to another. - This should be when the underlying condition changed. If that is not known, then using the time when the API field changed is acceptable. - format: date-time - type: string - message: - description: |- - message is a human readable message indicating details about the transition. - This may be an empty string. - maxLength: 32768 - type: string - observedGeneration: - description: |- - observedGeneration represents the .metadata.generation that the condition was set based upon. - For instance, if .metadata.generation is currently 12, but the .status.conditions[x].observedGeneration is 9, the condition is out of date - with respect to the current state of the instance. - format: int64 - minimum: 0 - type: integer - reason: - description: |- - reason contains a programmatic identifier indicating the reason for the condition's last transition. - Producers of specific condition types may define expected values and meanings for this field, - and whether the values are considered a guaranteed API. - The value should be a CamelCase string. - This field may not be empty. - maxLength: 1024 - minLength: 1 - pattern: ^[A-Za-z]([A-Za-z0-9_,:]*[A-Za-z0-9_])?$ - type: string - status: - description: status of the condition, one of True, False, Unknown. - enum: - - "True" - - "False" - - Unknown - type: string - type: - description: type of condition in CamelCase or in foo.example.com/CamelCase. - maxLength: 316 - pattern: ^([a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*/)?(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])$ - type: string - required: - - lastTransitionTime - - message - - reason - - status - - type - type: object - type: array - type: object type: object served: true storage: true - subresources: - status: {} diff --git a/internal/controller/apisixroute_controller.go b/internal/controller/apisixroute_controller.go index 261d7ee04..e71fc2a00 100644 --- a/internal/controller/apisixroute_controller.go +++ b/internal/controller/apisixroute_controller.go @@ -79,6 +79,9 @@ func (r *ApisixRouteReconciler) SetupWithManager(mgr ctrl.Manager) error { Watches(&corev1.Secret{}, handler.EnqueueRequestsFromMapFunc(r.listApisixRoutesForSecret), ). + Watches(&apiv2.ApisixUpstream{}, + handler.EnqueueRequestsFromMapFunc(r.listApisixRouteForApisixUpstream), + ). Named("apisixroute"). Complete(r) } @@ -250,6 +253,27 @@ func (r *ApisixRouteReconciler) processApisixRoute(ctx context.Context, tc *prov } tc.EndpointSlices[serviceNN] = endpoints.Items } + + for _, upstream := range http.Upstreams { + if upstream.Name == "" { + continue + } + var ups = apiv2.ApisixUpstream{ + ObjectMeta: metav1.ObjectMeta{ + Name: upstream.Name, + SelfLink: in.GetNamespace(), + }, + } + upsNN := utils.NamespacedName(&ups) + if err := r.Get(ctx, upsNN, &ups); err != nil { + if client.IgnoreNotFound(err) == nil { + r.Log.Error(err, "ApisixUpstream not found") + continue + } + return err + } + tc.Upstreams[utils.NamespacedNameKind(&ups)] = &ups + } } return nil @@ -341,6 +365,24 @@ func (r *ApisixRouteReconciler) listApisixRouteForGatewayProxy(ctx context.Conte return pkgutils.DedupComparable(requests) } +func (r *ApisixRouteReconciler) listApisixRouteForApisixUpstream(ctx context.Context, object client.Object) (requests []reconcile.Request) { + au, ok := object.(*apiv2.ApisixUpstream) + if !ok { + return nil + } + + var arList apiv2.ApisixRouteList + if err := r.List(ctx, &arList, client.MatchingFields{indexer.ApisixUpstreamRef: indexer.GenIndexKey(au.GetNamespace(), au.GetName())}); err != nil { + r.Log.Error(err, "failed to list ApisixUpstreams") + return nil + } + + for _, ar := range arList.Items { + requests = append(requests, reconcile.Request{NamespacedName: utils.NamespacedName(&ar)}) + } + return pkgutils.DedupComparable(requests) +} + func (r *ApisixRouteReconciler) matchesIngressController(obj client.Object) bool { ingressClass, ok := obj.(*networkingv1.IngressClass) if !ok { diff --git a/internal/controller/apisixupstream_controller.go b/internal/controller/apisixupstream_controller.go deleted file mode 100644 index 2dce86a06..000000000 --- a/internal/controller/apisixupstream_controller.go +++ /dev/null @@ -1,69 +0,0 @@ -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package controller - -import ( - "context" - - "github.com/go-logr/logr" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" - ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/client" - gatewayv1 "sigs.k8s.io/gateway-api/apis/v1" - - apiv2 "github.com/apache/apisix-ingress-controller/api/v2" -) - -// ApisixUpstreamReconciler reconciles a ApisixUpstream object -type ApisixUpstreamReconciler struct { - client.Client - Scheme *runtime.Scheme - Log logr.Logger -} - -// Reconcile FIXME: implement the reconcile logic (For now, it dose nothing other than directly accepting) -func (r *ApisixUpstreamReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - r.Log.Info("reconcile", "request", req.NamespacedName) - - var obj apiv2.ApisixUpstream - if err := r.Get(ctx, req.NamespacedName, &obj); err != nil { - r.Log.Error(err, "failed to get ApisixConsumer", "request", req.NamespacedName) - return ctrl.Result{}, err - } - - obj.Status.Conditions = []metav1.Condition{ - { - Type: string(gatewayv1.RouteConditionAccepted), - Status: metav1.ConditionTrue, - ObservedGeneration: obj.GetGeneration(), - LastTransitionTime: metav1.Now(), - Reason: string(gatewayv1.RouteReasonAccepted), - }, - } - - if err := r.Status().Update(ctx, &obj); err != nil { - r.Log.Error(err, "failed to update status", "request", req.NamespacedName) - return ctrl.Result{}, err - } - - return ctrl.Result{}, nil -} - -// SetupWithManager sets up the controller with the Manager. -func (r *ApisixUpstreamReconciler) SetupWithManager(mgr ctrl.Manager) error { - return ctrl.NewControllerManagedBy(mgr). - For(&apiv2.ApisixUpstream{}). - Named("apisixupstream"). - Complete(r) -} diff --git a/internal/controller/indexer/indexer.go b/internal/controller/indexer/indexer.go index fcc1d068a..c833e77a7 100644 --- a/internal/controller/indexer/indexer.go +++ b/internal/controller/indexer/indexer.go @@ -38,6 +38,7 @@ const ( ConsumerGatewayRef = "consumerGatewayRef" PolicyTargetRefs = "targetRefs" GatewayClassIndexRef = "gatewayClassRef" + ApisixUpstreamRef = "apisixUpstreamRef" ) func SetupIndexer(mgr ctrl.Manager) error { @@ -94,8 +95,9 @@ func setupConsumerIndexer(mgr ctrl.Manager) error { func setupApisixRouteIndexer(mgr ctrl.Manager) error { var indexers = map[string]func(client.Object) []string{ - ServiceIndexRef: ApisixRouteServiceIndexFunc, - SecretIndexRef: ApisixRouteRouteSecretIndexFunc, + ServiceIndexRef: ApisixRouteServiceIndexFunc, + SecretIndexRef: ApisixRouteSecretIndexFunc, + ApisixUpstreamRef: ApisixRouteApisixUpstreamIndexFunc, } for key, f := range indexers { if err := mgr.GetFieldIndexer().IndexField(context.Background(), &apiv2.ApisixRoute{}, key, f); err != nil { @@ -441,7 +443,7 @@ func ApisixRouteServiceIndexFunc(obj client.Object) (keys []string) { return } -func ApisixRouteRouteSecretIndexFunc(obj client.Object) (keys []string) { +func ApisixRouteSecretIndexFunc(obj client.Object) (keys []string) { ar := obj.(*apiv2.ApisixRoute) for _, http := range ar.Spec.HTTP { for _, plugin := range http.Plugins { @@ -460,6 +462,18 @@ func ApisixRouteRouteSecretIndexFunc(obj client.Object) (keys []string) { return } +func ApisixRouteApisixUpstreamIndexFunc(obj client.Object) (keys []string) { + ar := obj.(*apiv2.ApisixRoute) + for _, rule := range ar.Spec.HTTP { + for _, upstream := range rule.Upstreams { + if upstream.Name != "" { + keys = append(keys, GenIndexKey(ar.GetNamespace(), upstream.Name)) + } + } + } + return +} + func HTTPRouteExtensionIndexFunc(rawObj client.Object) []string { hr := rawObj.(*gatewayv1.HTTPRoute) keys := make([]string, 0, len(hr.Spec.Rules)) diff --git a/internal/provider/adc/translator/apisixupstream.go b/internal/provider/adc/translator/apisixupstream.go new file mode 100644 index 000000000..33ab6e99f --- /dev/null +++ b/internal/provider/adc/translator/apisixupstream.go @@ -0,0 +1,147 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package translator + +import ( + "cmp" + "errors" + + "github.com/apache/apisix-ingress-controller/api/adc" + apiv2 "github.com/apache/apisix-ingress-controller/api/v2" +) + +func (t *Translator) TranslateApisixUpstream(au *apiv2.ApisixUpstream) (ups *adc.Upstream, err error) { + ups = adc.NewDefaultUpstream() + for _, f := range []func(*apiv2.ApisixUpstream, *adc.Upstream) error{ + translateApisixUpstreamScheme, + translateApisixUpstreamLoadBalancer, + translateApisixUpstreamHealthCheck, + translateApisixUpstreamRetriesAndTimeout, + translateApisixUpstreamClientTLS, + translateApisixUpstreamPassHost, + translateApisixUpstreamDiscovery, + } { + if err = f(au, ups); err != nil { + return + } + } + + return +} + +func translateApisixUpstreamScheme(au *apiv2.ApisixUpstream, ups *adc.Upstream) error { + switch au.Spec.Scheme { + case apiv2.SchemeHTTP, apiv2.SchemeHTTPS, apiv2.SchemeGRPC, apiv2.SchemeGRPCS: + ups.Scheme = au.Spec.Scheme + default: + ups.Scheme = apiv2.SchemeHTTP + } + return nil +} + +func translateApisixUpstreamLoadBalancer(au *apiv2.ApisixUpstream, ups *adc.Upstream) error { + lb := au.Spec.LoadBalancer + if lb == nil || lb.Type == "" { + ups.Type = apiv2.LbRoundRobin + return nil + } + switch lb.Type { + case apiv2.LbRoundRobin, apiv2.LbLeastConn, apiv2.LbEwma: + ups.Type = adc.UpstreamType(lb.Type) + case apiv2.LbConsistentHash: + ups.Type = adc.UpstreamType(lb.Type) + ups.Key = lb.Key + switch lb.HashOn { + case apiv2.HashOnVars: + fallthrough + case apiv2.HashOnHeader: + fallthrough + case apiv2.HashOnCookie: + fallthrough + case apiv2.HashOnConsumer: + fallthrough + case apiv2.HashOnVarsCombination: + ups.HashOn = lb.HashOn + default: + return errors.New("invalid hashOn") + } + default: + return errors.New("invalid loadBalancer type") + } + return nil +} + +func translateApisixUpstreamHealthCheck(au *apiv2.ApisixUpstream, ups *adc.Upstream) error { + // todo: no field `.Checks` in adc.Upstream + return nil +} + +func translateApisixUpstreamRetriesAndTimeout(au *apiv2.ApisixUpstream, ups *adc.Upstream) error { + retries := au.Spec.Retries + timeout := au.Spec.Timeout + + if retries != nil && *retries < 0 { + return errors.New("invalid value retries") + } + ups.Retries = retries + + if timeout == nil { + return nil + } + if timeout.Connect.Duration < 0 { + return errors.New("invalid value timeout.connect") + } + if timeout.Read.Duration < 0 { + return errors.New("invalid value timeout.read") + } + if timeout.Send.Duration < 0 { + return errors.New("invalid value timeout.send") + } + + // Since the schema of timeout doesn't allow only configuring + // one or two items. Here we assign the default value first. + connTimeout := cmp.Or(timeout.Connect.Duration, apiv2.DefaultUpstreamTimeout) + readTimeout := cmp.Or(timeout.Read.Duration, apiv2.DefaultUpstreamTimeout) + sendTimeout := cmp.Or(timeout.Send.Duration, apiv2.DefaultUpstreamTimeout) + + ups.Timeout = &adc.Timeout{ + Connect: int(connTimeout.Seconds()), + Read: int(readTimeout.Seconds()), + Send: int(sendTimeout.Seconds()), + } + + return nil +} + +func translateApisixUpstreamClientTLS(au *apiv2.ApisixUpstream, ups *adc.Upstream) error { + // todo: no field `.TLS` in adc.Upstream + return nil +} + +func translateApisixUpstreamPassHost(au *apiv2.ApisixUpstream, ups *adc.Upstream) error { + switch passHost := au.Spec.PassHost; passHost { + case apiv2.PassHostPass, apiv2.PassHostNode, apiv2.PassHostRewrite: + ups.PassHost = passHost + default: + ups.PassHost = "" + } + + ups.UpstreamHost = au.Spec.UpstreamHost + + return nil +} + +func translateApisixUpstreamDiscovery(upstream *apiv2.ApisixUpstream, upstream2 *adc.Upstream) error { + // todo: no filed `.Discovery*` in adc.Upstream + return nil +} diff --git a/internal/provider/provider.go b/internal/provider/provider.go index 2521c2b05..829fd9415 100644 --- a/internal/provider/provider.go +++ b/internal/provider/provider.go @@ -22,6 +22,7 @@ import ( gatewayv1 "sigs.k8s.io/gateway-api/apis/v1" "github.com/apache/apisix-ingress-controller/api/v1alpha1" + apiv2 "github.com/apache/apisix-ingress-controller/api/v2" "github.com/apache/apisix-ingress-controller/internal/controller/status" "github.com/apache/apisix-ingress-controller/internal/types" ) @@ -47,6 +48,7 @@ type TranslateContext struct { BackendTrafficPolicies map[k8stypes.NamespacedName]*v1alpha1.BackendTrafficPolicy GatewayProxies map[types.NamespacedNameKind]v1alpha1.GatewayProxy ResourceParentRefs map[types.NamespacedNameKind][]types.NamespacedNameKind + Upstreams map[types.NamespacedNameKind]*apiv2.ApisixUpstream HTTPRoutePolicies []v1alpha1.HTTPRoutePolicy StatusUpdaters []status.Update @@ -62,5 +64,6 @@ func NewDefaultTranslateContext(ctx context.Context) *TranslateContext { BackendTrafficPolicies: make(map[k8stypes.NamespacedName]*v1alpha1.BackendTrafficPolicy), GatewayProxies: make(map[types.NamespacedNameKind]v1alpha1.GatewayProxy), ResourceParentRefs: make(map[types.NamespacedNameKind][]types.NamespacedNameKind), + Upstreams: make(map[types.NamespacedNameKind]*apiv2.ApisixUpstream), } } From 18955cd5c946cf011c751c48da4e5f830fb7961d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=82=9F=E7=A9=BA?= Date: Tue, 17 Jun 2025 21:22:32 +0800 Subject: [PATCH 2/9] feat: Add full support for ApisixUpstream in translator and e2e tests - Implement `translateApisixUpstream` to support ApisixUpstream translation. - Add handling for externalNodes with types Domain and Service. - Update ApisixRoute translator to integrate ApisixUpstream references. - Introduce `SchemeToPort` and `MatchHostDef` utility functions for validation and defaults. - Enable e2e tests for ApisixRoute referencing ApisixUpstream. --- api/v2/shared_types.go | 26 +++++ .../provider/adc/translator/apisixroute.go | 71 +++++++++++++- .../provider/adc/translator/apisixupstream.go | 96 ++++++++++++++++++- internal/utils/k8s.go | 12 +++ test/e2e/apisix/route.go | 43 ++++++++- 5 files changed, 238 insertions(+), 10 deletions(-) diff --git a/api/v2/shared_types.go b/api/v2/shared_types.go index bb255e35f..cda28a2f5 100644 --- a/api/v2/shared_types.go +++ b/api/v2/shared_types.go @@ -125,3 +125,29 @@ const ( // PassHostPass represents rewrite option for pass_host Upstream settings. PassHostRewrite = "rewrite" ) + +const ( + // ExternalTypeDomain type is a domain + // +k8s:deepcopy-gen=false + ExternalTypeDomain ApisixUpstreamExternalType = "Domain" + + // ExternalTypeService type is a K8s ExternalName service + // +k8s:deepcopy-gen=false + ExternalTypeService ApisixUpstreamExternalType = "Service" +) + +var schemeToPortMaps = map[string]int{ + SchemeHTTP: 80, + SchemeHTTPS: 443, + SchemeGRPC: 80, + SchemeGRPCS: 443, +} + +// SchemeToPort scheme converts to the default port +// ref https://github.com/apache/apisix/blob/c5fc10d9355a0c177a7532f01c77745ff0639a7f/apisix/upstream.lua#L167-L172 +func SchemeToPort(schema string) int { + if val, ok := schemeToPortMaps[schema]; ok { + return val + } + return 80 +} diff --git a/internal/provider/adc/translator/apisixroute.go b/internal/provider/adc/translator/apisixroute.go index bca66460f..b3e588acf 100644 --- a/internal/provider/adc/translator/apisixroute.go +++ b/internal/provider/adc/translator/apisixroute.go @@ -16,6 +16,7 @@ import ( "cmp" "encoding/json" "fmt" + "strconv" "github.com/pkg/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -27,11 +28,13 @@ import ( apiv2 "github.com/apache/apisix-ingress-controller/api/v2" "github.com/apache/apisix-ingress-controller/internal/controller/label" "github.com/apache/apisix-ingress-controller/internal/provider" + types2 "github.com/apache/apisix-ingress-controller/internal/types" "github.com/apache/apisix-ingress-controller/internal/utils" "github.com/apache/apisix-ingress-controller/pkg/id" pkgutils "github.com/apache/apisix-ingress-controller/pkg/utils" ) +//nolint:gocyclo func (t *Translator) TranslateApisixRoute(tctx *provider.TranslateContext, ar *apiv2.ApisixRoute) (result *TranslateResult, err error) { result = &TranslateResult{} for ruleIndex, rule := range ar.Spec.HTTP { @@ -113,8 +116,16 @@ func (t *Translator) TranslateApisixRoute(tctx *provider.TranslateContext, ar *a route.Timeout = timeout route.Uris = rule.Match.Paths route.Vars = vars + for key, value := range ar.GetObjectMeta().GetLabels() { + route.Labels[key] = value + } + + //nolint:staticcheck + if rule.PluginConfigName != "" { + // FIXME: handle PluginConfig + } - // translate to adc.Upstream + // translate backends var backendErr error for _, backend := range rule.Backends { var ( @@ -137,9 +148,61 @@ func (t *Translator) TranslateApisixRoute(tctx *provider.TranslateContext, ar *a upstream.Nodes = append(upstream.Nodes, upNodes...) } - //nolint:staticcheck - if len(rule.Backends) == 0 && len(rule.Upstreams) > 0 { - // FIXME: when the API ApisixUpstream is supported + var ( + apisixUpstreams []*apiv2.ApisixUpstream + adcUpstreams []*adc.Upstream + ) + for _, upstreamRef := range rule.Upstreams { + refKey := types2.NamespacedNameKind{ + Namespace: ar.GetNamespace(), + Name: upstreamRef.Name, + Kind: "ApisixUpstream", + } + apisixUpstream, ok := tctx.Upstreams[refKey] + if !ok { + continue + } + + // todo: translate external upstream + adcUpstream, err := t.translateApisixUpstream(tctx, apisixUpstream) + if err != nil { + t.Log.Error(err, "failed to translate ApisixUpstream", "ApisixUpstream", utils.NamespacedName(apisixUpstream)) + continue + } + + apisixUpstreams = append(apisixUpstreams, apisixUpstream) + adcUpstreams = append(adcUpstreams, adcUpstream) + } + + _ = apisixUpstreams + + // If no .http[].backends is used and only .http[].upstreams is used, the first valid upstream is used as service.upstream; + // Other upstreams are configured in the traffic-split plugin + if len(rule.Backends) == 0 && len(adcUpstreams) > 0 { + service.Upstream = adcUpstreams[0] + adcUpstreams = adcUpstreams[1:] + } + + var wups []adc.TrafficSplitConfigRuleWeightedUpstream + for _, adcUpstream := range adcUpstreams { + weight, err := strconv.Atoi(adcUpstream.Labels["meta_weight"]) + if err != nil { + t.Log.Error(err, "failed to parse meta_weight from upstream labels", "labels", adcUpstream.GetLabels()) + weight = apiv2.DefaultWeight + } + wups = append(wups, adc.TrafficSplitConfigRuleWeightedUpstream{ + Upstream: adcUpstream, + Weight: weight, + }) + } + if len(wups) > 0 { + route.Plugins["traffic-split"] = &adc.TrafficSplitConfig{ + Rules: []adc.TrafficSplitConfigRule{ + { + WeightedUpstreams: wups, + }, + }, + } } // translate to adc.Service diff --git a/internal/provider/adc/translator/apisixupstream.go b/internal/provider/adc/translator/apisixupstream.go index 33ab6e99f..6b22bf63f 100644 --- a/internal/provider/adc/translator/apisixupstream.go +++ b/internal/provider/adc/translator/apisixupstream.go @@ -14,13 +14,23 @@ package translator import ( "cmp" - "errors" + "fmt" + + "github.com/pkg/errors" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" "github.com/apache/apisix-ingress-controller/api/adc" apiv2 "github.com/apache/apisix-ingress-controller/api/v2" + "github.com/apache/apisix-ingress-controller/internal/provider" + "github.com/apache/apisix-ingress-controller/internal/utils" ) -func (t *Translator) TranslateApisixUpstream(au *apiv2.ApisixUpstream) (ups *adc.Upstream, err error) { +func (t *Translator) translateApisixUpstream(tctx *provider.TranslateContext, au *apiv2.ApisixUpstream) (ups *adc.Upstream, err error) { + if len(au.Spec.ExternalNodes) == 0 && au.Spec.Discovery == nil { + return nil, errors.Errorf("%s has empty externalNodes or discovery configuration", utils.NamespacedName(au)) + } + ups = adc.NewDefaultUpstream() for _, f := range []func(*apiv2.ApisixUpstream, *adc.Upstream) error{ translateApisixUpstreamScheme, @@ -35,6 +45,13 @@ func (t *Translator) TranslateApisixUpstream(au *apiv2.ApisixUpstream) (ups *adc return } } + for _, f := range []func(*provider.TranslateContext, *apiv2.ApisixUpstream, *adc.Upstream) error{ + translateApisixUpstreamExternalNodes, + } { + if err = f(tctx, au, ups); err != nil { + return + } + } return } @@ -145,3 +162,78 @@ func translateApisixUpstreamDiscovery(upstream *apiv2.ApisixUpstream, upstream2 // todo: no filed `.Discovery*` in adc.Upstream return nil } + +func translateApisixUpstreamExternalNodes(tctx *provider.TranslateContext, au *apiv2.ApisixUpstream, ups *adc.Upstream) error { + for _, node := range au.Spec.ExternalNodes { + switch node.Type { + case apiv2.ExternalTypeDomain: + if err := translateApisixUpstreamExternalNodesDomain(au, ups, node); err != nil { + return err + } + default: // apiv2.ExternalTypeService or default + if err := translateApisixUpstreamExternalNodesExternalName(tctx, au, ups, node); err != nil { + return err + } + } + } + + return nil +} +func translateApisixUpstreamExternalNodesDomain(au *apiv2.ApisixUpstream, ups *adc.Upstream, node apiv2.ApisixUpstreamExternalNode) error { + weight := apiv2.DefaultWeight + if node.Weight != nil { + weight = *node.Weight + } + + if !utils.MatchHostDef(node.Name) { + return fmt.Errorf("ApisixUpstream %s/%s ExternalNodes[]'s name %s as Domain must match lowercase RFC 1123 subdomain. "+ + "a lowercase RFC 1123 subdomain must consist of lower case alphanumeric characters, '-' or '.', and must start and end with an alphanumeric character", + au.Namespace, au.Name, node.Name) + } + + n := adc.UpstreamNode{ + Host: node.Name, + Weight: weight, + } + + if node.Port != nil { + n.Port = *node.Port + } else { + n.Port = apiv2.SchemeToPort(au.Spec.Scheme) + } + + ups.Nodes = append(ups.Nodes, n) + + return nil +} + +func translateApisixUpstreamExternalNodesExternalName(tctx *provider.TranslateContext, au *apiv2.ApisixUpstream, ups *adc.Upstream, node apiv2.ApisixUpstreamExternalNode) error { + serviceNN := types.NamespacedName{Namespace: au.GetNamespace(), Name: node.Name} + svc, ok := tctx.Services[serviceNN] + if !ok { + return errors.Errorf("service not found, service: %s", serviceNN) + } + + if svc.Spec.Type != corev1.ServiceTypeExternalName { + return errors.Errorf("ApisixUpstream %s ExternalNodes[] must refers to a ExternalName service: %s", utils.NamespacedName(au), node.Name) + } + + weight := apiv2.DefaultWeight + if node.Weight != nil { + weight = *node.Weight + } + n := adc.UpstreamNode{ + Host: svc.Spec.ExternalName, + Weight: weight, + } + + if node.Port != nil { + n.Port = *node.Port + } else { + n.Port = apiv2.SchemeToPort(au.Spec.Scheme) + } + + ups.Nodes = append(ups.Nodes, n) + + return nil +} diff --git a/internal/utils/k8s.go b/internal/utils/k8s.go index 7010a4655..547e7a88f 100644 --- a/internal/utils/k8s.go +++ b/internal/utils/k8s.go @@ -14,6 +14,7 @@ package utils import ( "net" + "regexp" k8stypes "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" @@ -47,3 +48,14 @@ func ValidateRemoteAddrs(remoteAddrs []string) error { } return nil } + +var hostDef = "^[a-z0-9]([-a-z0-9]*[a-z0-9])?(\\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*$" +var hostDefRegex = regexp.MustCompile(hostDef) + +// MatchHostDef checks that host matches host's shcema +// ref to : https://github.com/apache/apisix/blob/c5fc10d9355a0c177a7532f01c77745ff0639a7f/apisix/schema_def.lua#L40 +// ref to : https://github.com/kubernetes/kubernetes/blob/976a940f4a4e84fe814583848f97b9aafcdb083f/staging/src/k8s.io/apimachinery/pkg/util/validation/validation.go#L205 +// They define regex differently, but k8s's dns is more accurate +func MatchHostDef(host string) bool { + return hostDefRegex.MatchString(host) +} diff --git a/test/e2e/apisix/route.go b/test/e2e/apisix/route.go index 25e3863f7..1da56e661 100644 --- a/test/e2e/apisix/route.go +++ b/test/e2e/apisix/route.go @@ -296,10 +296,45 @@ spec: // So the case is pending for now }) - PIt("Test ApisixRoute reference ApisixUpstream", func() { - // This case depends on ApisixUpstream. - // ApisixUpstream is not implemented yet. - // So the case is pending for now. + It("Test ApisixRoute reference ApisixUpstream", func() { + const apisixRouteSpec = ` +apiVersion: apisix.apache.org/v2 +kind: ApisixRoute +metadata: + name: default +spec: + ingressClassName: apisix + http: + - name: rule0 + match: + paths: + - /* + upstreams: + - name: default-upstream +` + const apisixUpstreamSpec = ` +apiVersion: apisix.apache.org/v2 +kind: ApisixUpstream +metadata: + name: default-upstream +spec: + ingressClassName: apisix + externalNodes: + - type: Service + name: httpbin-service-e2e-test +` + By("create ApisixUpstream and ApisixRoute") + err := s.CreateResourceFromString(apisixUpstreamSpec) + Expect(err).ShouldNot(HaveOccurred(), "apply apisixUpstreamSpec") + + var apisxiRoute apiv2.ApisixRoute + applier.MustApplyAPIv2(types.NamespacedName{Namespace: s.Namespace(), Name: "default"}, &apisxiRoute, apisixRouteSpec) + + By("verify ApisixRoute and ApisixUpstream works") + request := func(path string) int { + return s.NewAPISIXClient().GET(path).WithHost("httpbin").Expect().Raw().StatusCode + } + Eventually(request).WithArguments("/get").WithTimeout(8 * time.Second).ProbeEvery(time.Second).Should(Equal(http.StatusOK)) }) }) }) From 0e2923f1916ac1d69690339a9080f20572e4c020 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=82=9F=E7=A9=BA?= Date: Wed, 18 Jun 2025 09:02:35 +0800 Subject: [PATCH 3/9] refactor: Extract HTTP rule processing logic into a reusable function - Moved HTTP rule handling to `processApisixRouteHTTPRule` for improved modularity. - Updated ApisixRoute controller to integrate the new function. - Enhanced indexing for ApisixRoute to include ApisixUpstream secrets and services. - Refactored e2e tests to validate updated ApisixRoute and ApisixUpstream logic. --- internal/controller/apisixroute_controller.go | 232 ++++++++++-------- internal/controller/indexer/indexer.go | 93 +++++-- .../provider/adc/translator/apisixroute.go | 29 ++- .../provider/adc/translator/apisixupstream.go | 4 +- internal/provider/provider.go | 4 +- test/e2e/apisix/route.go | 36 ++- 6 files changed, 260 insertions(+), 138 deletions(-) diff --git a/internal/controller/apisixroute_controller.go b/internal/controller/apisixroute_controller.go index 5375b9bf7..fe281db30 100644 --- a/internal/controller/apisixroute_controller.go +++ b/internal/controller/apisixroute_controller.go @@ -141,7 +141,7 @@ func (r *ApisixRouteReconciler) processApisixRoute(ctx context.Context, tc *prov var ( rules = make(map[string]struct{}) ) - for httpIndex, http := range in.Spec.HTTP { + for _, http := range in.Spec.HTTP { // check rule names if _, ok := rules[http.Name]; ok { return ReasonError{ @@ -151,126 +151,166 @@ func (r *ApisixRouteReconciler) processApisixRoute(ctx context.Context, tc *prov } rules[http.Name] = struct{}{} - // check secret - for _, plugin := range http.Plugins { - if !plugin.Enable || plugin.Config == nil || plugin.SecretRef == "" { - continue - } - var ( - secret = corev1.Secret{ - ObjectMeta: metav1.ObjectMeta{ - Name: plugin.SecretRef, - Namespace: in.Namespace, - }, - } - secretNN = utils.NamespacedName(&secret) - ) - if err := r.Get(ctx, secretNN, &secret); err != nil { - return ReasonError{ - Reason: string(apiv2.ConditionReasonInvalidSpec), - Message: fmt.Sprintf("failed to get Secret: %s", secretNN), - } - } - - tc.Secrets[utils.NamespacedName(&secret)] = &secret + if err := r.processApisixRouteHTTPRule(ctx, tc, in, http); err != nil { + r.Log.Error(err, "failed to process ApisixRoute http rule") + return err } + } + + return nil +} - // check vars - // todo: cache the result to tctx - if _, err := http.Match.NginxVars.ToVars(); err != nil { +func (r *ApisixRouteReconciler) processApisixRouteHTTPRule(ctx context.Context, tc *provider.TranslateContext, in *apiv2.ApisixRoute, rule apiv2.ApisixRouteHTTP) error { + // check secret + for _, plugin := range rule.Plugins { + if !plugin.Enable || plugin.Config == nil || plugin.SecretRef == "" { + continue + } + var ( + secret = corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: plugin.SecretRef, + Namespace: in.Namespace, + }, + } + secretNN = utils.NamespacedName(&secret) + ) + if err := r.Get(ctx, secretNN, &secret); err != nil { return ReasonError{ Reason: string(apiv2.ConditionReasonInvalidSpec), - Message: fmt.Sprintf(".spec.http[%d].match.exprs: %s", httpIndex, err.Error()), + Message: fmt.Sprintf("failed to get Secret: %s", secretNN), } } - // validate remote address - if err := utils.ValidateRemoteAddrs(http.Match.RemoteAddrs); err != nil { - return ReasonError{ - Reason: string(apiv2.ConditionReasonInvalidSpec), - Message: fmt.Sprintf(".spec.http[%d].match.remoteAddrs: %s", httpIndex, err.Error()), - } + tc.Secrets[utils.NamespacedName(&secret)] = &secret + } + + // check vars + // todo: cache the result to tctx + if _, err := rule.Match.NginxVars.ToVars(); err != nil { + return ReasonError{ + Reason: string(apiv2.ConditionReasonInvalidSpec), + Message: fmt.Sprintf(".spec.http[].match.exprs: %v", err), } + } - // process backend - var backends = make(map[types.NamespacedName]struct{}) - for _, backend := range http.Backends { - var ( - service = corev1.Service{ - ObjectMeta: metav1.ObjectMeta{ - Name: backend.ServiceName, - Namespace: in.Namespace, - }, - } - serviceNN = utils.NamespacedName(&service) - ) - if _, ok := backends[serviceNN]; ok { - return ReasonError{ - Reason: string(apiv2.ConditionReasonInvalidSpec), - Message: fmt.Sprintf("duplicate backend service: %s", serviceNN), - } - } - backends[serviceNN] = struct{}{} + // validate remote address + if err := utils.ValidateRemoteAddrs(rule.Match.RemoteAddrs); err != nil { + return ReasonError{ + Reason: string(apiv2.ConditionReasonInvalidSpec), + Message: fmt.Sprintf(".spec.http[].match.remoteAddrs: %v", err), + } + } - if err := r.Get(ctx, serviceNN, &service); err != nil { - if err := client.IgnoreNotFound(err); err == nil { - r.Log.Error(errors.New("service not found"), "Service", serviceNN) - continue - } - return err - } - if service.Spec.Type == corev1.ServiceTypeExternalName { - tc.Services[serviceNN] = &service - continue + // process backend + var backends = make(map[types.NamespacedName]struct{}) + for _, backend := range rule.Backends { + var ( + service = corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: backend.ServiceName, + Namespace: in.Namespace, + }, } - - if backend.ResolveGranularity == "service" && service.Spec.ClusterIP == "" { - r.Log.Error(errors.New("service has no ClusterIP"), "Service", serviceNN, "ResolveGranularity", backend.ResolveGranularity) - continue + serviceNN = utils.NamespacedName(&service) + ) + if _, ok := backends[serviceNN]; ok { + return ReasonError{ + Reason: string(apiv2.ConditionReasonInvalidSpec), + Message: fmt.Sprintf("duplicate backend service: %s", serviceNN), } + } + backends[serviceNN] = struct{}{} - if !slices.ContainsFunc(service.Spec.Ports, func(port corev1.ServicePort) bool { - return port.Port == int32(backend.ServicePort.IntValue()) - }) { - r.Log.Error(errors.New("port not found in service"), "Service", serviceNN, "port", backend.ServicePort.String()) + if err := r.Get(ctx, serviceNN, &service); err != nil { + if err := client.IgnoreNotFound(err); err == nil { + r.Log.Error(errors.New("service not found"), "Service", serviceNN) continue } + return err + } + if service.Spec.Type == corev1.ServiceTypeExternalName { tc.Services[serviceNN] = &service + continue + } - var endpoints discoveryv1.EndpointSliceList - if err := r.List(ctx, &endpoints, - client.InNamespace(service.Namespace), - client.MatchingLabels{ - discoveryv1.LabelServiceName: service.Name, - }, - ); err != nil { - return ReasonError{ - Reason: string(apiv2.ConditionReasonInvalidSpec), - Message: fmt.Sprintf("failed to list endpoint slices: %v", err), - } + if backend.ResolveGranularity == "service" && service.Spec.ClusterIP == "" { + r.Log.Error(errors.New("service has no ClusterIP"), "Service", serviceNN, "ResolveGranularity", backend.ResolveGranularity) + continue + } + + if !slices.ContainsFunc(service.Spec.Ports, func(port corev1.ServicePort) bool { + return port.Port == int32(backend.ServicePort.IntValue()) + }) { + r.Log.Error(errors.New("port not found in service"), "Service", serviceNN, "port", backend.ServicePort.String()) + continue + } + tc.Services[serviceNN] = &service + + var endpoints discoveryv1.EndpointSliceList + if err := r.List(ctx, &endpoints, + client.InNamespace(service.Namespace), + client.MatchingLabels{ + discoveryv1.LabelServiceName: service.Name, + }, + ); err != nil { + return ReasonError{ + Reason: string(apiv2.ConditionReasonInvalidSpec), + Message: fmt.Sprintf("failed to list endpoint slices: %v", err), } - tc.EndpointSlices[serviceNN] = endpoints.Items } + tc.EndpointSlices[serviceNN] = endpoints.Items + } - for _, upstream := range http.Upstreams { - if upstream.Name == "" { + for _, upstream := range rule.Upstreams { + if upstream.Name == "" { + continue + } + var ( + ups apiv2.ApisixUpstream + upsNN = types.NamespacedName{ + Namespace: in.GetNamespace(), + Name: upstream.Name, + } + ) + if err := r.Get(ctx, upsNN, &ups); err != nil { + r.Log.Error(err, "failed to get ApisixUpstream", "ApisixUpstream", upsNN) + if client.IgnoreNotFound(err) == nil { continue } - var ups = apiv2.ApisixUpstream{ - ObjectMeta: metav1.ObjectMeta{ - Name: upstream.Name, - SelfLink: in.GetNamespace(), - }, + return err + } + tc.Upstreams[upsNN] = &ups + + for _, node := range ups.Spec.ExternalNodes { + if node.Type == apiv2.ExternalTypeService { + var ( + service corev1.Service + serviceNN = types.NamespacedName{Namespace: ups.GetNamespace(), Name: node.Name} + ) + if err := r.Get(ctx, serviceNN, &service); err != nil { + r.Log.Error(err, "failed to get service in ApisixUpstream", "ApisixUpstream", upsNN, "Service", serviceNN) + if client.IgnoreNotFound(err) == nil { + continue + } + return err + } + tc.Services[utils.NamespacedName(&service)] = &service } - upsNN := utils.NamespacedName(&ups) - if err := r.Get(ctx, upsNN, &ups); err != nil { - if client.IgnoreNotFound(err) == nil { - r.Log.Error(err, "ApisixUpstream not found") - continue + } + + if ups.Spec.TLSSecret != nil && ups.Spec.TLSSecret.Name != "" { + var ( + secret corev1.Secret + secretNN = types.NamespacedName{Namespace: cmp.Or(ups.Spec.TLSSecret.Namespace, in.GetNamespace()), Name: ups.Spec.TLSSecret.Name} + ) + if err := r.Get(ctx, secretNN, &secret); err != nil { + r.Log.Error(err, "failed to get secret in ApisixUpstream", "ApisixUpstream", upsNN, "Secret", secretNN) + if client.IgnoreNotFound(err) != nil { + return err } - return err } - tc.Upstreams[utils.NamespacedNameKind(&ups)] = &ups + tc.Secrets[secretNN] = &secret } } diff --git a/internal/controller/indexer/indexer.go b/internal/controller/indexer/indexer.go index c833e77a7..455e4952c 100644 --- a/internal/controller/indexer/indexer.go +++ b/internal/controller/indexer/indexer.go @@ -13,6 +13,7 @@ package indexer import ( + "cmp" "context" networkingv1 "k8s.io/api/networking/v1" @@ -95,8 +96,8 @@ func setupConsumerIndexer(mgr ctrl.Manager) error { func setupApisixRouteIndexer(mgr ctrl.Manager) error { var indexers = map[string]func(client.Object) []string{ - ServiceIndexRef: ApisixRouteServiceIndexFunc, - SecretIndexRef: ApisixRouteSecretIndexFunc, + ServiceIndexRef: ApisixRouteServiceIndexFunc(mgr.GetClient()), + SecretIndexRef: ApisixRouteSecretIndexFunc(mgr.GetClient()), ApisixUpstreamRef: ApisixRouteApisixUpstreamIndexFunc, } for key, f := range indexers { @@ -430,36 +431,82 @@ func HTTPRouteServiceIndexFunc(rawObj client.Object) []string { return keys } -func ApisixRouteServiceIndexFunc(obj client.Object) (keys []string) { - ar := obj.(*apiv2.ApisixRoute) - for _, http := range ar.Spec.HTTP { - for _, backend := range http.Backends { - keys = append(keys, GenIndexKey(ar.GetNamespace(), backend.ServiceName)) +func ApisixRouteServiceIndexFunc(cli client.Client) func(client.Object) []string { + return func(obj client.Object) (keys []string) { + ar := obj.(*apiv2.ApisixRoute) + for _, http := range ar.Spec.HTTP { + // service reference in .backends + for _, backend := range http.Backends { + keys = append(keys, GenIndexKey(ar.GetNamespace(), backend.ServiceName)) + } + // service reference in .upstreams + for _, upstream := range http.Upstreams { + if upstream.Name == "" { + continue + } + var ( + au apiv2.ApisixUpstream + auNN = types.NamespacedName{ + Namespace: ar.GetNamespace(), + Name: upstream.Name, + } + ) + if err := cli.Get(context.Background(), auNN, &au); err != nil { + continue + } + for _, node := range au.Spec.ExternalNodes { + if node.Type == apiv2.ExternalTypeService && node.Name != "" { + keys = append(keys, GenIndexKey(au.GetNamespace(), node.Name)) + } + } + } } + for _, stream := range ar.Spec.Stream { + keys = append(keys, GenIndexKey(ar.GetNamespace(), stream.Backend.ServiceName)) + } + return keys } - for _, stream := range ar.Spec.Stream { - keys = append(keys, GenIndexKey(ar.GetNamespace(), stream.Backend.ServiceName)) - } - return } -func ApisixRouteSecretIndexFunc(obj client.Object) (keys []string) { - ar := obj.(*apiv2.ApisixRoute) - for _, http := range ar.Spec.HTTP { - for _, plugin := range http.Plugins { - if plugin.Enable && plugin.SecretRef != "" { - keys = append(keys, GenIndexKey(ar.GetNamespace(), plugin.SecretRef)) +func ApisixRouteSecretIndexFunc(cli client.Client) func(client.Object) []string { + return func(obj client.Object) (keys []string) { + ar := obj.(*apiv2.ApisixRoute) + for _, http := range ar.Spec.HTTP { + // secret reference in .plugins + for _, plugin := range http.Plugins { + if plugin.Enable && plugin.SecretRef != "" { + keys = append(keys, GenIndexKey(ar.GetNamespace(), plugin.SecretRef)) + } + } + // secret reference in .upstreams + for _, upstream := range http.Upstreams { + if upstream.Name == "" { + continue + } + var ( + au apiv2.ApisixUpstream + auNN = types.NamespacedName{ + Namespace: ar.GetNamespace(), + Name: upstream.Name, + } + ) + if err := cli.Get(context.Background(), auNN, &au); err != nil { + continue + } + if secret := au.Spec.TLSSecret; secret != nil && secret.Name != "" { + keys = append(keys, GenIndexKey(cmp.Or(secret.Namespace, au.GetNamespace()), secret.Name)) + } } } - } - for _, stream := range ar.Spec.Stream { - for _, plugin := range stream.Plugins { - if plugin.Enable && plugin.SecretRef != "" { - keys = append(keys, GenIndexKey(ar.GetNamespace(), plugin.SecretRef)) + for _, stream := range ar.Spec.Stream { + for _, plugin := range stream.Plugins { + if plugin.Enable && plugin.SecretRef != "" { + keys = append(keys, GenIndexKey(ar.GetNamespace(), plugin.SecretRef)) + } } } + return keys } - return } func ApisixRouteApisixUpstreamIndexFunc(obj client.Object) (keys []string) { diff --git a/internal/provider/adc/translator/apisixroute.go b/internal/provider/adc/translator/apisixroute.go index b3e588acf..319c1a261 100644 --- a/internal/provider/adc/translator/apisixroute.go +++ b/internal/provider/adc/translator/apisixroute.go @@ -18,6 +18,7 @@ import ( "fmt" "strconv" + "github.com/api7/gopkg/pkg/log" "github.com/pkg/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" @@ -28,7 +29,6 @@ import ( apiv2 "github.com/apache/apisix-ingress-controller/api/v2" "github.com/apache/apisix-ingress-controller/internal/controller/label" "github.com/apache/apisix-ingress-controller/internal/provider" - types2 "github.com/apache/apisix-ingress-controller/internal/types" "github.com/apache/apisix-ingress-controller/internal/utils" "github.com/apache/apisix-ingress-controller/pkg/id" pkgutils "github.com/apache/apisix-ingress-controller/pkg/utils" @@ -152,34 +152,41 @@ func (t *Translator) TranslateApisixRoute(tctx *provider.TranslateContext, ar *a apisixUpstreams []*apiv2.ApisixUpstream adcUpstreams []*adc.Upstream ) + data, _ := json.Marshal(rule.Upstreams) + log.Debugf(".http[].Upstreams: %s", data) for _, upstreamRef := range rule.Upstreams { - refKey := types2.NamespacedNameKind{ + upsNN := types.NamespacedName{ Namespace: ar.GetNamespace(), Name: upstreamRef.Name, - Kind: "ApisixUpstream", } - apisixUpstream, ok := tctx.Upstreams[refKey] + log.Debugf("try to get ApisixUpstream: %s", upsNN) + au, ok := tctx.Upstreams[upsNN] if !ok { + log.Debugf("failed to retrieve ApisixUpstream from tctx, ApisixUpstream: %s", upsNN) continue } - - // todo: translate external upstream - adcUpstream, err := t.translateApisixUpstream(tctx, apisixUpstream) + log.Debugf("try to translate the ApisixUpstream: %v", au) + adcUpstream, err := t.translateApisixUpstream(tctx, au) if err != nil { - t.Log.Error(err, "failed to translate ApisixUpstream", "ApisixUpstream", utils.NamespacedName(apisixUpstream)) + log.Debugf("failed to translate ApisixUpstream, ApisixUpstream: %v, err: %v", au, err) + t.Log.Error(err, "failed to translate ApisixUpstream", "ApisixUpstream", utils.NamespacedName(au)) continue } - apisixUpstreams = append(apisixUpstreams, apisixUpstream) + apisixUpstreams = append(apisixUpstreams, au) adcUpstreams = append(adcUpstreams, adcUpstream) } - _ = apisixUpstreams + _ = apisixUpstreams // todo: remove this + + data, _ = json.Marshal(adcUpstreams) + log.Debugf("len(rule.Backends): %v, adcUpstreams: %s", len(rule.Backends), string(data)) // If no .http[].backends is used and only .http[].upstreams is used, the first valid upstream is used as service.upstream; // Other upstreams are configured in the traffic-split plugin if len(rule.Backends) == 0 && len(adcUpstreams) > 0 { - service.Upstream = adcUpstreams[0] + log.Debugf("set first upstream as service.Upstream, first upstream: %v", adcUpstreams[0]) + upstream = adcUpstreams[0] adcUpstreams = adcUpstreams[1:] } diff --git a/internal/provider/adc/translator/apisixupstream.go b/internal/provider/adc/translator/apisixupstream.go index 6b22bf63f..68cf2da18 100644 --- a/internal/provider/adc/translator/apisixupstream.go +++ b/internal/provider/adc/translator/apisixupstream.go @@ -171,7 +171,7 @@ func translateApisixUpstreamExternalNodes(tctx *provider.TranslateContext, au *a return err } default: // apiv2.ExternalTypeService or default - if err := translateApisixUpstreamExternalNodesExternalName(tctx, au, ups, node); err != nil { + if err := translateApisixUpstreamExternalNodesService(tctx, au, ups, node); err != nil { return err } } @@ -207,7 +207,7 @@ func translateApisixUpstreamExternalNodesDomain(au *apiv2.ApisixUpstream, ups *a return nil } -func translateApisixUpstreamExternalNodesExternalName(tctx *provider.TranslateContext, au *apiv2.ApisixUpstream, ups *adc.Upstream, node apiv2.ApisixUpstreamExternalNode) error { +func translateApisixUpstreamExternalNodesService(tctx *provider.TranslateContext, au *apiv2.ApisixUpstream, ups *adc.Upstream, node apiv2.ApisixUpstreamExternalNode) error { serviceNN := types.NamespacedName{Namespace: au.GetNamespace(), Name: node.Name} svc, ok := tctx.Services[serviceNN] if !ok { diff --git a/internal/provider/provider.go b/internal/provider/provider.go index 829fd9415..805d3e6f9 100644 --- a/internal/provider/provider.go +++ b/internal/provider/provider.go @@ -46,9 +46,9 @@ type TranslateContext struct { PluginConfigs map[k8stypes.NamespacedName]*v1alpha1.PluginConfig Services map[k8stypes.NamespacedName]*corev1.Service BackendTrafficPolicies map[k8stypes.NamespacedName]*v1alpha1.BackendTrafficPolicy + Upstreams map[k8stypes.NamespacedName]*apiv2.ApisixUpstream GatewayProxies map[types.NamespacedNameKind]v1alpha1.GatewayProxy ResourceParentRefs map[types.NamespacedNameKind][]types.NamespacedNameKind - Upstreams map[types.NamespacedNameKind]*apiv2.ApisixUpstream HTTPRoutePolicies []v1alpha1.HTTPRoutePolicy StatusUpdaters []status.Update @@ -61,9 +61,9 @@ func NewDefaultTranslateContext(ctx context.Context) *TranslateContext { Secrets: make(map[k8stypes.NamespacedName]*corev1.Secret), PluginConfigs: make(map[k8stypes.NamespacedName]*v1alpha1.PluginConfig), Services: make(map[k8stypes.NamespacedName]*corev1.Service), + Upstreams: make(map[k8stypes.NamespacedName]*apiv2.ApisixUpstream), BackendTrafficPolicies: make(map[k8stypes.NamespacedName]*v1alpha1.BackendTrafficPolicy), GatewayProxies: make(map[types.NamespacedNameKind]v1alpha1.GatewayProxy), ResourceParentRefs: make(map[types.NamespacedNameKind][]types.NamespacedNameKind), - Upstreams: make(map[types.NamespacedNameKind]*apiv2.ApisixUpstream), } } diff --git a/test/e2e/apisix/route.go b/test/e2e/apisix/route.go index 1da56e661..18fefb146 100644 --- a/test/e2e/apisix/route.go +++ b/test/e2e/apisix/route.go @@ -312,7 +312,7 @@ spec: upstreams: - name: default-upstream ` - const apisixUpstreamSpec = ` + const apisixUpstreamSpec0 = ` apiVersion: apisix.apache.org/v2 kind: ApisixUpstream metadata: @@ -323,18 +323,46 @@ spec: - type: Service name: httpbin-service-e2e-test ` - By("create ApisixUpstream and ApisixRoute") - err := s.CreateResourceFromString(apisixUpstreamSpec) + const apisixUpstreamSpec1 = ` +apiVersion: apisix.apache.org/v2 +kind: ApisixUpstream +metadata: + name: default-upstream +spec: + ingressClassName: apisix + externalNodes: + - type: Service + name: alias-httpbin-service-e2e-test +` + const serviceSpec = ` +apiVersion: v1 +kind: Service +metadata: + name: alias-httpbin-service-e2e-test +spec: + type: ExternalName + externalName: httpbin-service-e2e-test +` + By("create Service, ApisixUpstream and ApisixRoute") + err := s.CreateResourceFromString(serviceSpec) + Expect(err).ShouldNot(HaveOccurred(), "apply service") + err = s.CreateResourceFromString(apisixUpstreamSpec0) Expect(err).ShouldNot(HaveOccurred(), "apply apisixUpstreamSpec") var apisxiRoute apiv2.ApisixRoute applier.MustApplyAPIv2(types.NamespacedName{Namespace: s.Namespace(), Name: "default"}, &apisxiRoute, apisixRouteSpec) - By("verify ApisixRoute and ApisixUpstream works") + By("verify that the ApisixUpstream reference a Service which is not ExternalName should not request OK") request := func(path string) int { return s.NewAPISIXClient().GET(path).WithHost("httpbin").Expect().Raw().StatusCode } + Eventually(request).WithArguments("/get").WithTimeout(8 * time.Second).ProbeEvery(time.Second).Should(Equal(http.StatusServiceUnavailable)) + + By("verify that ApisixUpstream reference a Service which is ExternalName should reqeust OK") + err = s.CreateResourceFromString(apisixUpstreamSpec1) + Expect(err).ShouldNot(HaveOccurred(), "update apisixUpstream") Eventually(request).WithArguments("/get").WithTimeout(8 * time.Second).ProbeEvery(time.Second).Should(Equal(http.StatusOK)) + }) }) }) From 8b967c1b855998c5ebf7748c18b96de00599da81 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=82=9F=E7=A9=BA?= Date: Wed, 18 Jun 2025 09:30:43 +0800 Subject: [PATCH 4/9] refactor: Clean up unused variables and update TODO comments in translators - Removed redundant `apisixUpstreams` and `adcUpstreams` variables in ApisixRoute translator logic for clarity. - Streamlined upstream handling and variable naming in traffic-split logic. - Updated TODO comments to reflect upcoming `.Checks`, `.TLS`, and `.Discovery*` implementations. --- docs/crd/api.md | 2 - internal/controller/apisixroute_controller.go | 1 - .../provider/adc/translator/apisixroute.go | 40 ++++++------------- .../provider/adc/translator/apisixupstream.go | 6 +-- internal/utils/k8s.go | 1 + 5 files changed, 17 insertions(+), 33 deletions(-) diff --git a/docs/crd/api.md b/docs/crd/api.md index a337c2db5..37032cdc8 100644 --- a/docs/crd/api.md +++ b/docs/crd/api.md @@ -1300,8 +1300,6 @@ _Appears in:_ - - #### ApisixTlsSpec diff --git a/internal/controller/apisixroute_controller.go b/internal/controller/apisixroute_controller.go index fe281db30..59ef4528d 100644 --- a/internal/controller/apisixroute_controller.go +++ b/internal/controller/apisixroute_controller.go @@ -186,7 +186,6 @@ func (r *ApisixRouteReconciler) processApisixRouteHTTPRule(ctx context.Context, } // check vars - // todo: cache the result to tctx if _, err := rule.Match.NginxVars.ToVars(); err != nil { return ReasonError{ Reason: string(apiv2.ConditionReasonInvalidSpec), diff --git a/internal/provider/adc/translator/apisixroute.go b/internal/provider/adc/translator/apisixroute.go index 319c1a261..ccf967b9f 100644 --- a/internal/provider/adc/translator/apisixroute.go +++ b/internal/provider/adc/translator/apisixroute.go @@ -149,64 +149,50 @@ func (t *Translator) TranslateApisixRoute(tctx *provider.TranslateContext, ar *a } var ( - apisixUpstreams []*apiv2.ApisixUpstream - adcUpstreams []*adc.Upstream + upstreams []*adc.Upstream ) - data, _ := json.Marshal(rule.Upstreams) - log.Debugf(".http[].Upstreams: %s", data) for _, upstreamRef := range rule.Upstreams { upsNN := types.NamespacedName{ Namespace: ar.GetNamespace(), Name: upstreamRef.Name, } - log.Debugf("try to get ApisixUpstream: %s", upsNN) au, ok := tctx.Upstreams[upsNN] if !ok { log.Debugf("failed to retrieve ApisixUpstream from tctx, ApisixUpstream: %s", upsNN) continue } - log.Debugf("try to translate the ApisixUpstream: %v", au) - adcUpstream, err := t.translateApisixUpstream(tctx, au) + upstream, err := t.translateApisixUpstream(tctx, au) if err != nil { - log.Debugf("failed to translate ApisixUpstream, ApisixUpstream: %v, err: %v", au, err) t.Log.Error(err, "failed to translate ApisixUpstream", "ApisixUpstream", utils.NamespacedName(au)) continue } - apisixUpstreams = append(apisixUpstreams, au) - adcUpstreams = append(adcUpstreams, adcUpstream) + upstreams = append(upstreams, upstream) } - _ = apisixUpstreams // todo: remove this - - data, _ = json.Marshal(adcUpstreams) - log.Debugf("len(rule.Backends): %v, adcUpstreams: %s", len(rule.Backends), string(data)) - // If no .http[].backends is used and only .http[].upstreams is used, the first valid upstream is used as service.upstream; // Other upstreams are configured in the traffic-split plugin - if len(rule.Backends) == 0 && len(adcUpstreams) > 0 { - log.Debugf("set first upstream as service.Upstream, first upstream: %v", adcUpstreams[0]) - upstream = adcUpstreams[0] - adcUpstreams = adcUpstreams[1:] + if len(rule.Backends) == 0 && len(upstreams) > 0 { + upstream = upstreams[0] + upstreams = upstreams[1:] } - var wups []adc.TrafficSplitConfigRuleWeightedUpstream - for _, adcUpstream := range adcUpstreams { - weight, err := strconv.Atoi(adcUpstream.Labels["meta_weight"]) + var weightedUpstreams []adc.TrafficSplitConfigRuleWeightedUpstream + for _, item := range upstreams { + weight, err := strconv.Atoi(item.Labels["meta_weight"]) if err != nil { - t.Log.Error(err, "failed to parse meta_weight from upstream labels", "labels", adcUpstream.GetLabels()) weight = apiv2.DefaultWeight } - wups = append(wups, adc.TrafficSplitConfigRuleWeightedUpstream{ - Upstream: adcUpstream, + weightedUpstreams = append(weightedUpstreams, adc.TrafficSplitConfigRuleWeightedUpstream{ + Upstream: item, Weight: weight, }) } - if len(wups) > 0 { + if len(weightedUpstreams) > 0 { route.Plugins["traffic-split"] = &adc.TrafficSplitConfig{ Rules: []adc.TrafficSplitConfigRule{ { - WeightedUpstreams: wups, + WeightedUpstreams: weightedUpstreams, }, }, } diff --git a/internal/provider/adc/translator/apisixupstream.go b/internal/provider/adc/translator/apisixupstream.go index 68cf2da18..34b6f24f6 100644 --- a/internal/provider/adc/translator/apisixupstream.go +++ b/internal/provider/adc/translator/apisixupstream.go @@ -99,7 +99,7 @@ func translateApisixUpstreamLoadBalancer(au *apiv2.ApisixUpstream, ups *adc.Upst } func translateApisixUpstreamHealthCheck(au *apiv2.ApisixUpstream, ups *adc.Upstream) error { - // todo: no field `.Checks` in adc.Upstream + // todo: handle `.Checks` in next PR return nil } @@ -141,7 +141,7 @@ func translateApisixUpstreamRetriesAndTimeout(au *apiv2.ApisixUpstream, ups *adc } func translateApisixUpstreamClientTLS(au *apiv2.ApisixUpstream, ups *adc.Upstream) error { - // todo: no field `.TLS` in adc.Upstream + // todo: handle `.TLS` in next PR return nil } @@ -159,7 +159,7 @@ func translateApisixUpstreamPassHost(au *apiv2.ApisixUpstream, ups *adc.Upstream } func translateApisixUpstreamDiscovery(upstream *apiv2.ApisixUpstream, upstream2 *adc.Upstream) error { - // todo: no filed `.Discovery*` in adc.Upstream + // todo: handle `.Discovery*` in next PR return nil } diff --git a/internal/utils/k8s.go b/internal/utils/k8s.go index 547e7a88f..4c42ad865 100644 --- a/internal/utils/k8s.go +++ b/internal/utils/k8s.go @@ -56,6 +56,7 @@ var hostDefRegex = regexp.MustCompile(hostDef) // ref to : https://github.com/apache/apisix/blob/c5fc10d9355a0c177a7532f01c77745ff0639a7f/apisix/schema_def.lua#L40 // ref to : https://github.com/kubernetes/kubernetes/blob/976a940f4a4e84fe814583848f97b9aafcdb083f/staging/src/k8s.io/apimachinery/pkg/util/validation/validation.go#L205 // They define regex differently, but k8s's dns is more accurate +// todo: validate by CRD definition func MatchHostDef(host string) bool { return hostDefRegex.MatchString(host) } From 1d2de259000cae49c27089e878f36e6448310399 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=82=9F=E7=A9=BA?= Date: Wed, 18 Jun 2025 09:32:54 +0800 Subject: [PATCH 5/9] resolve Copilot's comments --- internal/utils/k8s.go | 2 +- test/e2e/apisix/route.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/utils/k8s.go b/internal/utils/k8s.go index 4c42ad865..7cc3767d5 100644 --- a/internal/utils/k8s.go +++ b/internal/utils/k8s.go @@ -52,7 +52,7 @@ func ValidateRemoteAddrs(remoteAddrs []string) error { var hostDef = "^[a-z0-9]([-a-z0-9]*[a-z0-9])?(\\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*$" var hostDefRegex = regexp.MustCompile(hostDef) -// MatchHostDef checks that host matches host's shcema +// MatchHostDef checks that host matches host's schema // ref to : https://github.com/apache/apisix/blob/c5fc10d9355a0c177a7532f01c77745ff0639a7f/apisix/schema_def.lua#L40 // ref to : https://github.com/kubernetes/kubernetes/blob/976a940f4a4e84fe814583848f97b9aafcdb083f/staging/src/k8s.io/apimachinery/pkg/util/validation/validation.go#L205 // They define regex differently, but k8s's dns is more accurate diff --git a/test/e2e/apisix/route.go b/test/e2e/apisix/route.go index 18fefb146..2a08fcd74 100644 --- a/test/e2e/apisix/route.go +++ b/test/e2e/apisix/route.go @@ -358,7 +358,7 @@ spec: } Eventually(request).WithArguments("/get").WithTimeout(8 * time.Second).ProbeEvery(time.Second).Should(Equal(http.StatusServiceUnavailable)) - By("verify that ApisixUpstream reference a Service which is ExternalName should reqeust OK") + By("verify that ApisixUpstream reference a Service which is ExternalName should request OK") err = s.CreateResourceFromString(apisixUpstreamSpec1) Expect(err).ShouldNot(HaveOccurred(), "update apisixUpstream") Eventually(request).WithArguments("/get").WithTimeout(8 * time.Second).ProbeEvery(time.Second).Should(Equal(http.StatusOK)) From 229cd37e372e35b29248f1635571756c0df589c2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=82=9F=E7=A9=BA?= Date: Wed, 18 Jun 2025 17:39:17 +0800 Subject: [PATCH 6/9] feat: Enhance upstream and traffic-split handling in translator and tests - Refactor `NewDefaultUpstream` initialization for streamlined field ordering. - Update e2e framework's `GetServiceEndpoints` to accept `types.NamespacedName`. - Incorporate upstream weights into traffic-split logic, including default handling and labels. - Add comprehensive e2e tests for mixed backend and upstream scenarios. --- api/adc/types.go | 7 +- .../provider/adc/translator/apisixroute.go | 19 ++- test/e2e/apisix/route.go | 110 +++++++++++++++--- test/e2e/framework/k8s.go | 6 +- 4 files changed, 117 insertions(+), 25 deletions(-) diff --git a/api/adc/types.go b/api/adc/types.go index 48a26d629..d21c08e26 100644 --- a/api/adc/types.go +++ b/api/adc/types.go @@ -533,15 +533,14 @@ func NewDefaultService() *Service { func NewDefaultUpstream() *Upstream { return &Upstream{ - Type: Roundrobin, - Nodes: make(UpstreamNodes, 0), - Scheme: SchemeHTTP, Metadata: Metadata{ - Desc: "Created by apisix-ingress-controller, DO NOT modify it manually", Labels: map[string]string{ "managed-by": "apisix-ingress-controller", }, }, + Nodes: make(UpstreamNodes, 0), + Scheme: SchemeHTTP, + Type: Roundrobin, } } diff --git a/internal/provider/adc/translator/apisixroute.go b/internal/provider/adc/translator/apisixroute.go index ccf967b9f..a27cafece 100644 --- a/internal/provider/adc/translator/apisixroute.go +++ b/internal/provider/adc/translator/apisixroute.go @@ -166,6 +166,9 @@ func (t *Translator) TranslateApisixRoute(tctx *provider.TranslateContext, ar *a t.Log.Error(err, "failed to translate ApisixUpstream", "ApisixUpstream", utils.NamespacedName(au)) continue } + if upstreamRef.Weight != nil { + upstream.Labels["meta_weight"] = strconv.FormatInt(int64(*upstreamRef.Weight), 10) + } upstreams = append(upstreams, upstream) } @@ -178,6 +181,17 @@ func (t *Translator) TranslateApisixRoute(tctx *provider.TranslateContext, ar *a } var weightedUpstreams []adc.TrafficSplitConfigRuleWeightedUpstream + + // set the default upstream's weight in traffic-split + weight, err := strconv.Atoi(upstream.Labels["meta_weight"]) + if err != nil { + weight = apiv2.DefaultWeight + } + weightedUpstreams = append(weightedUpstreams, adc.TrafficSplitConfigRuleWeightedUpstream{ + Weight: weight, + }) + + // set others upstreams in traffic-split for _, item := range upstreams { weight, err := strconv.Atoi(item.Labels["meta_weight"]) if err != nil { @@ -189,7 +203,7 @@ func (t *Translator) TranslateApisixRoute(tctx *provider.TranslateContext, ar *a }) } if len(weightedUpstreams) > 0 { - route.Plugins["traffic-split"] = &adc.TrafficSplitConfig{ + service.Plugins["traffic-split"] = &adc.TrafficSplitConfig{ Rules: []adc.TrafficSplitConfigRule{ { WeightedUpstreams: weightedUpstreams, @@ -207,9 +221,6 @@ func (t *Translator) TranslateApisixRoute(tctx *provider.TranslateContext, ar *a service.Routes = []*adc.Route{route} if backendErr != nil && len(upstream.Nodes) == 0 { - if service.Plugins == nil { - service.Plugins = make(map[string]any) - } service.Plugins["fault-injection"] = map[string]any{ "abort": map[string]any{ "http_status": 500, diff --git a/test/e2e/apisix/route.go b/test/e2e/apisix/route.go index 2a08fcd74..f245d2ef8 100644 --- a/test/e2e/apisix/route.go +++ b/test/e2e/apisix/route.go @@ -35,20 +35,20 @@ var _ = Describe("Test ApisixRoute", func() { applier = framework.NewApplier(s.GinkgoT, s.K8sClient, s.CreateResourceFromString) ) - Context("Test ApisixRoute", func() { - BeforeEach(func() { - By("create GatewayProxy") - gatewayProxy := fmt.Sprintf(gatewayProxyYaml, s.Deployer.GetAdminEndpoint(), s.AdminKey()) - err := s.CreateResourceFromStringWithNamespace(gatewayProxy, "default") - Expect(err).NotTo(HaveOccurred(), "creating GatewayProxy") - time.Sleep(5 * time.Second) - - By("create IngressClass") - err = s.CreateResourceFromStringWithNamespace(ingressClassYaml, "") - Expect(err).NotTo(HaveOccurred(), "creating IngressClass") - time.Sleep(5 * time.Second) - }) + BeforeEach(func() { + By("create GatewayProxy") + gatewayProxy := fmt.Sprintf(gatewayProxyYaml, s.Deployer.GetAdminEndpoint(), s.AdminKey()) + err := s.CreateResourceFromStringWithNamespace(gatewayProxy, "default") + Expect(err).NotTo(HaveOccurred(), "creating GatewayProxy") + time.Sleep(5 * time.Second) + + By("create IngressClass") + err = s.CreateResourceFromStringWithNamespace(ingressClassYaml, "") + Expect(err).NotTo(HaveOccurred(), "creating IngressClass") + time.Sleep(5 * time.Second) + }) + Context("Test ApisixRoute", func() { It("Basic tests", func() { const apisixRouteSpec = ` apiVersion: apisix.apache.org/v2 @@ -295,8 +295,11 @@ spec: // ApisixUpstream is not implemented yet. // So the case is pending for now }) + }) - It("Test ApisixRoute reference ApisixUpstream", func() { + Context("Test ApisixRoute reference ApisixUpstream", func() { + + It("Test reference ApisixUpstream", func() { const apisixRouteSpec = ` apiVersion: apisix.apache.org/v2 kind: ApisixRoute @@ -310,7 +313,7 @@ spec: paths: - /* upstreams: - - name: default-upstream + - name: default-upstream ` const apisixUpstreamSpec0 = ` apiVersion: apisix.apache.org/v2 @@ -362,7 +365,84 @@ spec: err = s.CreateResourceFromString(apisixUpstreamSpec1) Expect(err).ShouldNot(HaveOccurred(), "update apisixUpstream") Eventually(request).WithArguments("/get").WithTimeout(8 * time.Second).ProbeEvery(time.Second).Should(Equal(http.StatusOK)) + }) + + It("Test a Mix of Backends and Upstreams", func() { + // apisixUpstreamSpec is an ApisixUpstream reference to the Service httpbin-service-e2e-test + const apisixUpstreamSpec = ` +apiVersion: apisix.apache.org/v2 +kind: ApisixUpstream +metadata: + name: default-upstream +spec: + ingressClassName: apisix + externalNodes: + - type: Domain + name: httpbin-service-e2e-test + passHost: node +` + // apisixRouteSpec is an ApisixUpstream uses a backend and reference an upstream. + // It contains a plugin response-rewrite that lets us know what upstream the gateway forwards the request to. + const apisixRouteSpec = ` +apiVersion: apisix.apache.org/v2 +kind: ApisixRoute +metadata: + name: default +spec: + ingressClassName: apisix + http: + - name: rule0 + match: + paths: + - /* + backends: + - serviceName: httpbin-service-e2e-test + servicePort: 80 + upstreams: + - name: default-upstream + plugins: + - name: response-rewrite + enable: true + config: + headers: + set: + "X-Upstream-Host": "$upstream_addr" +` + By("apply ApisixUpstream") + err := s.CreateResourceFromString(apisixUpstreamSpec) + Expect(err).ShouldNot(HaveOccurred(), "apply ApisixUpstream") + + By("apply ApisixRoute") + var apisixRoute apiv2.ApisixRoute + applier.MustApplyAPIv2(types.NamespacedName{Namespace: s.Namespace(), Name: "default"}, &apisixRoute, apisixRouteSpec) + + By("verify ApisixRoute works") + request := func(path string) int { + return s.NewAPISIXClient().GET(path).Expect().Raw().StatusCode + } + Eventually(request).WithArguments("/get").WithTimeout(8 * time.Second).ProbeEvery(time.Second).Should(Equal(http.StatusOK)) + + By("verify the backends and the upstreams work commonly") + // .backends -> service httpbin-service-e2e-test -> endpoints httpbin-service-e2e-test, so we will get the $upstream_addr as endpoint IP + // .upstreams -> service alias-httpbin-service-e2e-test -> service httpbin-service-e2e-test, so we will get the $upstream_addr as the service's ClusterIP + var upstreamAddrs = make(map[string]struct{}) + for range 10 { + upstreamAddr := s.NewAPISIXClient().GET("/get").Expect().Raw().Header.Get("X-Upstream-Host") + upstreamAddrs[upstreamAddr] = struct{}{} + } + + endpoints, err := s.GetServiceEndpoints(types.NamespacedName{Namespace: s.Namespace(), Name: "httpbin-service-e2e-test"}) + Expect(err).ShouldNot(HaveOccurred(), "get endpoints") + Expect(endpoints).Should(HaveLen(1)) + endpoint := net.JoinHostPort(endpoints[0], "80") + + service, err := s.GetServiceByName("httpbin-service-e2e-test") + Expect(err).ShouldNot(HaveOccurred(), "get service") + clusterIP := net.JoinHostPort(service.Spec.ClusterIP, "80") + Expect(upstreamAddrs).Should(HaveLen(2)) + Eventually(upstreamAddrs).Should(HaveKey(endpoint)) + Eventually(upstreamAddrs).Should(HaveKey(clusterIP)) }) }) }) diff --git a/test/e2e/framework/k8s.go b/test/e2e/framework/k8s.go index 3bb2ea6fd..61d9db7d8 100644 --- a/test/e2e/framework/k8s.go +++ b/test/e2e/framework/k8s.go @@ -15,6 +15,7 @@ package framework import ( "bufio" "bytes" + "cmp" "context" "encoding/json" "fmt" @@ -33,6 +34,7 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" applycorev1 "k8s.io/client-go/applyconfigurations/core/v1" applymetav1 "k8s.io/client-go/applyconfigurations/meta/v1" @@ -108,8 +110,8 @@ func (f *Framework) ensureServiceWithTimeout(name, namespace string, desiredEndp return nil } -func (f *Framework) GetServiceEndpoints(name string) ([]string, error) { - ep, err := f.clientset.CoreV1().Endpoints(_namespace).Get(f.Context, name, metav1.GetOptions{}) +func (f *Framework) GetServiceEndpoints(nn types.NamespacedName) ([]string, error) { + ep, err := f.clientset.CoreV1().Endpoints(cmp.Or(nn.Namespace, _namespace)).Get(f.Context, nn.Name, metav1.GetOptions{}) if err != nil { return nil, err } From 829174879bb273eb2cf4bae59a4e1ac2e5f4dfd6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=82=9F=E7=A9=BA?= Date: Wed, 18 Jun 2025 17:56:33 +0800 Subject: [PATCH 7/9] comment --- test/e2e/apisix/route.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/e2e/apisix/route.go b/test/e2e/apisix/route.go index f245d2ef8..6c08536a1 100644 --- a/test/e2e/apisix/route.go +++ b/test/e2e/apisix/route.go @@ -423,8 +423,8 @@ spec: Eventually(request).WithArguments("/get").WithTimeout(8 * time.Second).ProbeEvery(time.Second).Should(Equal(http.StatusOK)) By("verify the backends and the upstreams work commonly") - // .backends -> service httpbin-service-e2e-test -> endpoints httpbin-service-e2e-test, so we will get the $upstream_addr as endpoint IP - // .upstreams -> service alias-httpbin-service-e2e-test -> service httpbin-service-e2e-test, so we will get the $upstream_addr as the service's ClusterIP + // .backends -> Service httpbin-service-e2e-test -> Endpoint httpbin-service-e2e-test, so the $upstream_addr value we get is the Endpoint IP. + // .upstreams -> Service httpbin-service-e2e-test, so the $upstream_addr value we get is the Service ClusterIP. var upstreamAddrs = make(map[string]struct{}) for range 10 { upstreamAddr := s.NewAPISIXClient().GET("/get").Expect().Raw().Header.Get("X-Upstream-Host") From 403bf9b4c4a0d3f0b6e45b4bc3408b9facbe3bf5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=82=9F=E7=A9=BA?= Date: Wed, 18 Jun 2025 20:29:43 +0800 Subject: [PATCH 8/9] revert typo --- internal/controller/apisixroute_controller.go | 26 +++++++++---------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/internal/controller/apisixroute_controller.go b/internal/controller/apisixroute_controller.go index 07a2c8b42..4b52e530d 100644 --- a/internal/controller/apisixroute_controller.go +++ b/internal/controller/apisixroute_controller.go @@ -140,11 +140,11 @@ func (r *ApisixRouteReconciler) Reconcile(ctx context.Context, req ctrl.Request) return ctrl.Result{}, nil } -func (r *ApisixRouteReconciler) processApisixRoute(ctx context.Context, tc *provider.TranslateContext, ar *apiv2.ApisixRoute) error { +func (r *ApisixRouteReconciler) processApisixRoute(ctx context.Context, tc *provider.TranslateContext, in *apiv2.ApisixRoute) error { var ( rules = make(map[string]struct{}) ) - for httpIndex, http := range ar.Spec.HTTP { + for httpIndex, http := range in.Spec.HTTP { // check rule names if _, ok := rules[http.Name]; ok { return ReasonError{ @@ -156,13 +156,13 @@ func (r *ApisixRouteReconciler) processApisixRoute(ctx context.Context, tc *prov // check plugin config reference if http.PluginConfigName != "" { - if err := r.validatePluginConfig(ctx, tc, ar, http); err != nil { + if err := r.validatePluginConfig(ctx, tc, in, http); err != nil { return err } } // check secret - if err := r.validateSecrets(ctx, tc, ar, http); err != nil { + if err := r.validateSecrets(ctx, tc, in, http); err != nil { return err } @@ -183,11 +183,11 @@ func (r *ApisixRouteReconciler) processApisixRoute(ctx context.Context, tc *prov } // process backend - if err := r.validateBackends(ctx, tc, ar, http); err != nil { + if err := r.validateBackends(ctx, tc, in, http); err != nil { return err } // process upstreams - if err := r.validateUpstreams(ctx, tc, ar, http); err != nil { + if err := r.validateUpstreams(ctx, tc, in, http); err != nil { return err } } @@ -195,8 +195,8 @@ func (r *ApisixRouteReconciler) processApisixRoute(ctx context.Context, tc *prov return nil } -func (r *ApisixRouteReconciler) validatePluginConfig(ctx context.Context, tc *provider.TranslateContext, ar *apiv2.ApisixRoute, http apiv2.ApisixRouteHTTP) error { - pcNamespace := ar.Namespace +func (r *ApisixRouteReconciler) validatePluginConfig(ctx context.Context, tc *provider.TranslateContext, in *apiv2.ApisixRoute, http apiv2.ApisixRouteHTTP) error { + pcNamespace := in.Namespace if http.PluginConfigNamespace != "" { pcNamespace = http.PluginConfigNamespace } @@ -217,7 +217,7 @@ func (r *ApisixRouteReconciler) validatePluginConfig(ctx context.Context, tc *pr } // Check if ApisixPluginConfig has IngressClassName and if it matches - if ar.Spec.IngressClassName != pc.Spec.IngressClassName && pc.Spec.IngressClassName != "" { + if in.Spec.IngressClassName != pc.Spec.IngressClassName && pc.Spec.IngressClassName != "" { var pcIC networkingv1.IngressClass if err := r.Get(ctx, client.ObjectKey{Name: pc.Spec.IngressClassName}, &pcIC); err != nil { return ReasonError{ @@ -260,7 +260,7 @@ func (r *ApisixRouteReconciler) validatePluginConfig(ctx context.Context, tc *pr return nil } -func (r *ApisixRouteReconciler) validateSecrets(ctx context.Context, tc *provider.TranslateContext, ar *apiv2.ApisixRoute, http apiv2.ApisixRouteHTTP) error { +func (r *ApisixRouteReconciler) validateSecrets(ctx context.Context, tc *provider.TranslateContext, in *apiv2.ApisixRoute, http apiv2.ApisixRouteHTTP) error { for _, plugin := range http.Plugins { if !plugin.Enable || plugin.Config == nil || plugin.SecretRef == "" { continue @@ -269,7 +269,7 @@ func (r *ApisixRouteReconciler) validateSecrets(ctx context.Context, tc *provide secret = corev1.Secret{ ObjectMeta: metav1.ObjectMeta{ Name: plugin.SecretRef, - Namespace: ar.Namespace, + Namespace: in.Namespace, }, } secretNN = utils.NamespacedName(&secret) @@ -286,14 +286,14 @@ func (r *ApisixRouteReconciler) validateSecrets(ctx context.Context, tc *provide return nil } -func (r *ApisixRouteReconciler) validateBackends(ctx context.Context, tc *provider.TranslateContext, ar *apiv2.ApisixRoute, http apiv2.ApisixRouteHTTP) error { +func (r *ApisixRouteReconciler) validateBackends(ctx context.Context, tc *provider.TranslateContext, in *apiv2.ApisixRoute, http apiv2.ApisixRouteHTTP) error { var backends = make(map[types.NamespacedName]struct{}) for _, backend := range http.Backends { var ( service = corev1.Service{ ObjectMeta: metav1.ObjectMeta{ Name: backend.ServiceName, - Namespace: ar.Namespace, + Namespace: in.Namespace, }, } serviceNN = utils.NamespacedName(&service) From b34115f95aef8a649f2c7d283fe911420f3c127e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=82=9F=E7=A9=BA?= Date: Wed, 18 Jun 2025 23:51:27 +0800 Subject: [PATCH 9/9] feat: Add ApisixUpstream controller - Introduce a new ApisixUpstream controller for resource management. - Update CRDs with `subresources.status` and define ApisixUpstreamStatus schema. - Refactor ApisixPluginConfig reconciliation by removing unused logic and improving error handling. - Enhance e2e tests to validate ApisixRoute referencing ApisixUpstream. - Add copy logic for ApisixUpstream status in generated deepcopy functionality. --- api/v2/apisixupstream_types.go | 7 +- api/v2/zz_generated.deepcopy.go | 1 + .../apisix.apache.org_apisixupstreams.yaml | 62 ++++++ docs/crd/api.md | 2 + .../apisixpluginconfig_controller.go | 42 +--- .../controller/apisixupstream_controller.go | 192 ++++++++++++++++++ internal/manager/controllers.go | 6 + test/e2e/apisix/route.go | 13 +- 8 files changed, 281 insertions(+), 44 deletions(-) create mode 100644 internal/controller/apisixupstream_controller.go diff --git a/api/v2/apisixupstream_types.go b/api/v2/apisixupstream_types.go index 27c80c70c..552164218 100644 --- a/api/v2/apisixupstream_types.go +++ b/api/v2/apisixupstream_types.go @@ -36,14 +36,19 @@ type ApisixUpstreamSpec struct { PortLevelSettings []PortLevelSettings `json:"portLevelSettings,omitempty" yaml:"portLevelSettings,omitempty"` } +// ApisixUpstreamStatus defines the observed state of ApisixUpstream. +type ApisixUpstreamStatus = ApisixStatus + // +kubebuilder:object:root=true +// +kubebuilder:subresource:status // ApisixUpstream is the Schema for the apisixupstreams API. type ApisixUpstream struct { metav1.TypeMeta `json:",inline"` metav1.ObjectMeta `json:"metadata,omitempty"` - Spec ApisixUpstreamSpec `json:"spec,omitempty"` + Spec ApisixUpstreamSpec `json:"spec,omitempty"` + Status ApisixUpstreamStatus `json:"status,omitempty"` } // +kubebuilder:object:root=true diff --git a/api/v2/zz_generated.deepcopy.go b/api/v2/zz_generated.deepcopy.go index e353bbf75..c97bfe340 100644 --- a/api/v2/zz_generated.deepcopy.go +++ b/api/v2/zz_generated.deepcopy.go @@ -1208,6 +1208,7 @@ func (in *ApisixUpstream) DeepCopyInto(out *ApisixUpstream) { out.TypeMeta = in.TypeMeta in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) in.Spec.DeepCopyInto(&out.Spec) + in.Status.DeepCopyInto(&out.Status) } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ApisixUpstream. diff --git a/config/crd/bases/apisix.apache.org_apisixupstreams.yaml b/config/crd/bases/apisix.apache.org_apisixupstreams.yaml index c4efa840c..542a0c5f3 100644 --- a/config/crd/bases/apisix.apache.org_apisixupstreams.yaml +++ b/config/crd/bases/apisix.apache.org_apisixupstreams.yaml @@ -490,6 +490,68 @@ spec: the pass_host is set to rewrite type: string type: object + status: + description: ApisixStatus is the status report for Apisix ingress Resources + properties: + conditions: + items: + description: Condition contains details for one aspect of the current + state of this API Resource. + properties: + lastTransitionTime: + description: |- + lastTransitionTime is the last time the condition transitioned from one status to another. + This should be when the underlying condition changed. If that is not known, then using the time when the API field changed is acceptable. + format: date-time + type: string + message: + description: |- + message is a human readable message indicating details about the transition. + This may be an empty string. + maxLength: 32768 + type: string + observedGeneration: + description: |- + observedGeneration represents the .metadata.generation that the condition was set based upon. + For instance, if .metadata.generation is currently 12, but the .status.conditions[x].observedGeneration is 9, the condition is out of date + with respect to the current state of the instance. + format: int64 + minimum: 0 + type: integer + reason: + description: |- + reason contains a programmatic identifier indicating the reason for the condition's last transition. + Producers of specific condition types may define expected values and meanings for this field, + and whether the values are considered a guaranteed API. + The value should be a CamelCase string. + This field may not be empty. + maxLength: 1024 + minLength: 1 + pattern: ^[A-Za-z]([A-Za-z0-9_,:]*[A-Za-z0-9_])?$ + type: string + status: + description: status of the condition, one of True, False, Unknown. + enum: + - "True" + - "False" + - Unknown + type: string + type: + description: type of condition in CamelCase or in foo.example.com/CamelCase. + maxLength: 316 + pattern: ^([a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*/)?(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])$ + type: string + required: + - lastTransitionTime + - message + - reason + - status + - type + type: object + type: array + type: object type: object served: true storage: true + subresources: + status: {} diff --git a/docs/crd/api.md b/docs/crd/api.md index 37032cdc8..a337c2db5 100644 --- a/docs/crd/api.md +++ b/docs/crd/api.md @@ -1300,6 +1300,8 @@ _Appears in:_ + + #### ApisixTlsSpec diff --git a/internal/controller/apisixpluginconfig_controller.go b/internal/controller/apisixpluginconfig_controller.go index ba57cac01..332b0df2c 100644 --- a/internal/controller/apisixpluginconfig_controller.go +++ b/internal/controller/apisixpluginconfig_controller.go @@ -30,7 +30,6 @@ import ( "github.com/apache/apisix-ingress-controller/api/v1alpha1" apiv2 "github.com/apache/apisix-ingress-controller/api/v2" "github.com/apache/apisix-ingress-controller/internal/controller/status" - "github.com/apache/apisix-ingress-controller/internal/provider" "github.com/apache/apisix-ingress-controller/internal/utils" ) @@ -46,11 +45,7 @@ type ApisixPluginConfigReconciler struct { func (r *ApisixPluginConfigReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). For(&apiv2.ApisixPluginConfig{}). - WithEventFilter( - predicate.Or( - predicate.GenerationChangedPredicate{}, - ), - ). + WithEventFilter(predicate.GenerationChangedPredicate{}). Watches(&networkingv1.IngressClass{}, handler.EnqueueRequestsFromMapFunc(r.listApisixPluginConfigForIngressClass), builder.WithPredicates( @@ -67,23 +62,12 @@ func (r *ApisixPluginConfigReconciler) SetupWithManager(mgr ctrl.Manager) error func (r *ApisixPluginConfigReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { var pc apiv2.ApisixPluginConfig if err := r.Get(ctx, req.NamespacedName, &pc); err != nil { - if client.IgnoreNotFound(err) == nil { - pc.Namespace = req.Namespace - pc.Name = req.Name - pc.TypeMeta = metav1.TypeMeta{ - Kind: KindApisixPluginConfig, - APIVersion: apiv2.GroupVersion.String(), - } - - return ctrl.Result{}, nil - } - return ctrl.Result{}, err + return ctrl.Result{}, client.IgnoreNotFound(err) } var ( - tctx = provider.NewDefaultTranslateContext(ctx) - ic *networkingv1.IngressClass - err error + ic *networkingv1.IngressClass + err error ) defer func() { r.updateStatus(&pc, err) @@ -92,7 +76,7 @@ func (r *ApisixPluginConfigReconciler) Reconcile(ctx context.Context, req ctrl.R if ic, err = r.getIngressClass(&pc); err != nil { return ctrl.Result{}, err } - if err = r.processIngressClassParameters(ctx, tctx, &pc, ic); err != nil { + if err = r.processIngressClassParameters(ctx, &pc, ic); err != nil { return ctrl.Result{}, err } return ctrl.Result{}, nil @@ -174,15 +158,13 @@ func (r *ApisixPluginConfigReconciler) getDefaultIngressClass() (*networkingv1.I } // processIngressClassParameters processes the IngressClass parameters that reference GatewayProxy -func (r *ApisixPluginConfigReconciler) processIngressClassParameters(ctx context.Context, tc *provider.TranslateContext, pc *apiv2.ApisixPluginConfig, ingressClass *networkingv1.IngressClass) error { +func (r *ApisixPluginConfigReconciler) processIngressClassParameters(ctx context.Context, pc *apiv2.ApisixPluginConfig, ingressClass *networkingv1.IngressClass) error { if ingressClass == nil || ingressClass.Spec.Parameters == nil { return nil } var ( - ingressClassKind = utils.NamespacedNameKind(ingressClass) - pcKind = utils.NamespacedNameKind(pc) - parameters = ingressClass.Spec.Parameters + parameters = ingressClass.Spec.Parameters ) if parameters.APIGroup == nil || *parameters.APIGroup != v1alpha1.GroupVersion.Group || parameters.Kind != KindGatewayProxy { return nil @@ -197,15 +179,7 @@ func (r *ApisixPluginConfigReconciler) processIngressClassParameters(ctx context ns = &pc.Namespace } - if err := r.Get(ctx, client.ObjectKey{Namespace: *ns, Name: parameters.Name}, &gatewayProxy); err != nil { - r.Log.Error(err, "failed to get GatewayProxy", "namespace", *ns, "name", parameters.Name) - return err - } - - tc.GatewayProxies[ingressClassKind] = gatewayProxy - tc.ResourceParentRefs[pcKind] = append(tc.ResourceParentRefs[pcKind], ingressClassKind) - - return nil + return r.Get(ctx, client.ObjectKey{Namespace: *ns, Name: parameters.Name}, &gatewayProxy) } func (r *ApisixPluginConfigReconciler) updateStatus(pc *apiv2.ApisixPluginConfig, err error) { diff --git a/internal/controller/apisixupstream_controller.go b/internal/controller/apisixupstream_controller.go new file mode 100644 index 000000000..0da4fc196 --- /dev/null +++ b/internal/controller/apisixupstream_controller.go @@ -0,0 +1,192 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package controller + +import ( + "cmp" + "context" + + "github.com/go-logr/logr" + networkingv1 "k8s.io/api/networking/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + "github.com/apache/apisix-ingress-controller/api/v1alpha1" + apiv2 "github.com/apache/apisix-ingress-controller/api/v2" + "github.com/apache/apisix-ingress-controller/internal/controller/status" + "github.com/apache/apisix-ingress-controller/internal/utils" +) + +// ApisixUpstreamReconciler reconciles a ApisixUpstream object +type ApisixUpstreamReconciler struct { + client.Client + Scheme *runtime.Scheme + Log logr.Logger + Updater status.Updater +} + +// SetupWithManager sets up the controller with the Manager. +func (r *ApisixUpstreamReconciler) SetupWithManager(mgr ctrl.Manager) error { + return ctrl.NewControllerManagedBy(mgr). + For(&apiv2.ApisixUpstream{}). + WithEventFilter(predicate.GenerationChangedPredicate{}). + Watches(&networkingv1.IngressClass{}, + handler.EnqueueRequestsFromMapFunc(r.listApisixUpstreamForIngressClass), + builder.WithPredicates( + predicate.NewPredicateFuncs(r.matchesIngressController), + ), + ). + Watches(&v1alpha1.GatewayProxy{}, + handler.EnqueueRequestsFromMapFunc(r.listApisixUpstreamForGatewayProxy), + ). + Named("apisixupstream"). + Complete(r) +} + +func (r *ApisixUpstreamReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + var au apiv2.ApisixUpstream + if err := r.Get(ctx, req.NamespacedName, &au); err != nil { + return ctrl.Result{}, client.IgnoreNotFound(err) + } + + var ( + ic *networkingv1.IngressClass + err error + ) + defer func() { + r.updateStatus(&au, err) + }() + + if ic, err = r.getIngressClass(&au); err != nil { + return ctrl.Result{}, err + } + if err = r.processIngressClassParameters(ctx, &au, ic); err != nil { + return ctrl.Result{}, err + } + return ctrl.Result{}, nil +} + +func (r *ApisixUpstreamReconciler) listApisixUpstreamForIngressClass(ctx context.Context, object client.Object) (requests []reconcile.Request) { + ic, ok := object.(*networkingv1.IngressClass) + if !ok { + return nil + } + + isDefaultIngressClass := IsDefaultIngressClass(ic) + var auList apiv2.ApisixUpstreamList + if err := r.List(ctx, &auList); err != nil { + return nil + } + for _, pc := range auList.Items { + if pc.Spec.IngressClassName == ic.Name || (isDefaultIngressClass && pc.Spec.IngressClassName == "") { + requests = append(requests, reconcile.Request{NamespacedName: utils.NamespacedName(&pc)}) + } + } + return requests +} + +func (r *ApisixUpstreamReconciler) listApisixUpstreamForGatewayProxy(ctx context.Context, object client.Object) (requests []reconcile.Request) { + gp, ok := object.(*v1alpha1.GatewayProxy) + if !ok { + return nil + } + + var icList networkingv1.IngressClassList + if err := r.List(ctx, &icList); err != nil { + r.Log.Error(err, "failed to list ingress classes for gateway proxy", "gatewayproxy", gp.GetName()) + return nil + } + + for _, ic := range icList.Items { + requests = append(requests, r.listApisixUpstreamForIngressClass(ctx, &ic)...) + } + + return requests +} + +func (r *ApisixUpstreamReconciler) matchesIngressController(obj client.Object) bool { + ingressClass, ok := obj.(*networkingv1.IngressClass) + if !ok { + return false + } + return matchesController(ingressClass.Spec.Controller) +} + +func (r *ApisixUpstreamReconciler) getIngressClass(au *apiv2.ApisixUpstream) (*networkingv1.IngressClass, error) { + if au.Spec.IngressClassName == "" { + return r.getDefaultIngressClass() + } + + var ic networkingv1.IngressClass + if err := r.Get(context.Background(), client.ObjectKey{Name: au.Spec.IngressClassName}, &ic); err != nil { + return nil, err + } + return &ic, nil +} + +func (r *ApisixUpstreamReconciler) processIngressClassParameters(ctx context.Context, au *apiv2.ApisixUpstream, ic *networkingv1.IngressClass) error { + if ic == nil || ic.Spec.Parameters == nil { + return nil + } + + var ( + parameters = ic.Spec.Parameters + ) + if parameters.APIGroup == nil || *parameters.APIGroup != v1alpha1.GroupVersion.Group || parameters.Kind != KindGatewayProxy { + return nil + } + + // check if the parameters reference GatewayProxy + var ( + gp v1alpha1.GatewayProxy + ns = cmp.Or(parameters.Namespace, &au.Namespace) + ) + + return r.Get(ctx, client.ObjectKey{Namespace: *ns, Name: parameters.Name}, &gp) +} + +func (r *ApisixUpstreamReconciler) getDefaultIngressClass() (*networkingv1.IngressClass, error) { + var icList networkingv1.IngressClassList + if err := r.List(context.Background(), &icList); err != nil { + r.Log.Error(err, "failed to list ingress classes") + return nil, err + } + for _, ic := range icList.Items { + if IsDefaultIngressClass(&ic) && matchesController(ic.Spec.Controller) { + return &ic, nil + } + } + return nil, ReasonError{ + Reason: string(metav1.StatusReasonNotFound), + Message: "default ingress class not found or does not match the controller", + } +} + +func (r *ApisixUpstreamReconciler) updateStatus(au *apiv2.ApisixUpstream, err error) { + SetApisixCRDConditionAccepted(&au.Status, au.GetGeneration(), err) + r.Updater.Update(status.Update{ + NamespacedName: utils.NamespacedName(au), + Resource: &apiv2.ApisixUpstream{}, + Mutator: status.MutatorFunc(func(obj client.Object) client.Object { + cp := obj.(*apiv2.ApisixUpstream).DeepCopy() + cp.Status = au.Status + return cp + }), + }) +} diff --git a/internal/manager/controllers.go b/internal/manager/controllers.go index ac7fcf4f3..0a52499c2 100644 --- a/internal/manager/controllers.go +++ b/internal/manager/controllers.go @@ -140,5 +140,11 @@ func setupControllers(ctx context.Context, mgr manager.Manager, pro provider.Pro Log: ctrl.LoggerFrom(ctx).WithName("controllers").WithName("ApisixPluginConfig"), Updater: updater, }, + &controller.ApisixUpstreamReconciler{ + Client: mgr.GetClient(), + Scheme: mgr.GetScheme(), + Log: ctrl.LoggerFrom(ctx).WithName("controllers").WithName("ApisixUpstream"), + Updater: updater, + }, }, nil } diff --git a/test/e2e/apisix/route.go b/test/e2e/apisix/route.go index 6c08536a1..c7e645460 100644 --- a/test/e2e/apisix/route.go +++ b/test/e2e/apisix/route.go @@ -298,7 +298,6 @@ spec: }) Context("Test ApisixRoute reference ApisixUpstream", func() { - It("Test reference ApisixUpstream", func() { const apisixRouteSpec = ` apiVersion: apisix.apache.org/v2 @@ -349,8 +348,7 @@ spec: By("create Service, ApisixUpstream and ApisixRoute") err := s.CreateResourceFromString(serviceSpec) Expect(err).ShouldNot(HaveOccurred(), "apply service") - err = s.CreateResourceFromString(apisixUpstreamSpec0) - Expect(err).ShouldNot(HaveOccurred(), "apply apisixUpstreamSpec") + applier.MustApplyAPIv2(types.NamespacedName{Namespace: s.Namespace(), Name: "default-upstream"}, new(apiv2.ApisixUpstream), apisixUpstreamSpec0) var apisxiRoute apiv2.ApisixRoute applier.MustApplyAPIv2(types.NamespacedName{Namespace: s.Namespace(), Name: "default"}, &apisxiRoute, apisixRouteSpec) @@ -362,8 +360,7 @@ spec: Eventually(request).WithArguments("/get").WithTimeout(8 * time.Second).ProbeEvery(time.Second).Should(Equal(http.StatusServiceUnavailable)) By("verify that ApisixUpstream reference a Service which is ExternalName should request OK") - err = s.CreateResourceFromString(apisixUpstreamSpec1) - Expect(err).ShouldNot(HaveOccurred(), "update apisixUpstream") + applier.MustApplyAPIv2(types.NamespacedName{Namespace: s.Namespace(), Name: "default-upstream"}, new(apiv2.ApisixUpstream), apisixUpstreamSpec1) Eventually(request).WithArguments("/get").WithTimeout(8 * time.Second).ProbeEvery(time.Second).Should(Equal(http.StatusOK)) }) @@ -409,12 +406,10 @@ spec: "X-Upstream-Host": "$upstream_addr" ` By("apply ApisixUpstream") - err := s.CreateResourceFromString(apisixUpstreamSpec) - Expect(err).ShouldNot(HaveOccurred(), "apply ApisixUpstream") + applier.MustApplyAPIv2(types.NamespacedName{Namespace: s.Namespace(), Name: "default-upstream"}, new(apiv2.ApisixUpstream), apisixUpstreamSpec) By("apply ApisixRoute") - var apisixRoute apiv2.ApisixRoute - applier.MustApplyAPIv2(types.NamespacedName{Namespace: s.Namespace(), Name: "default"}, &apisixRoute, apisixRouteSpec) + applier.MustApplyAPIv2(types.NamespacedName{Namespace: s.Namespace(), Name: "default"}, new(apiv2.ApisixRoute), apisixRouteSpec) By("verify ApisixRoute works") request := func(path string) int {