diff --git a/api/adc/types.go b/api/adc/types.go index d9b0dab6a..d21c08e26 100644 --- a/api/adc/types.go +++ b/api/adc/types.go @@ -206,6 +206,108 @@ type Upstream struct { Timeout *Timeout `json:"timeout,omitempty" yaml:"timeout,omitempty"` Type UpstreamType `json:"type,omitempty" yaml:"type,omitempty"` UpstreamHost string `json:"upstream_host,omitempty" yaml:"upstream_host,omitempty"` + + Checks *UpstreamHealthCheck `json:"checks,omitempty" yaml:"checks,omitempty"` + TLS *ClientTLS `json:"tls,omitempty" yaml:"tls,omitempty"` + // for Service Discovery + DiscoveryType string `json:"discovery_type,omitempty" yaml:"discovery_type,omitempty"` + DiscoveryArgs map[string]string `json:"discovery_args,omitempty" yaml:"discovery_args,omitempty"` +} + +// UpstreamHealthCheck defines the active and/or passive health check for an Upstream, +// with the upstream health check feature, pods can be kicked out or joined in quickly, +// if the feedback of Kubernetes liveness/readiness probe is long. +// +k8s:deepcopy-gen=true +type UpstreamHealthCheck struct { + Active *UpstreamActiveHealthCheck `json:"active" yaml:"active"` + Passive *UpstreamPassiveHealthCheck `json:"passive,omitempty" yaml:"passive,omitempty"` +} + +// ClientTLS is tls cert and key use in mTLS +// +k8s:deepcopy-gen=true +type ClientTLS struct { + Cert string `json:"client_cert,omitempty" yaml:"client_cert,omitempty"` + Key string `json:"client_key,omitempty" yaml:"client_key,omitempty"` +} + +// UpstreamActiveHealthCheck defines the active kind of upstream health check. +// +k8s:deepcopy-gen=true +type UpstreamActiveHealthCheck struct { + Type string `json:"type,omitempty" yaml:"type,omitempty"` + Timeout int `json:"timeout,omitempty" yaml:"timeout,omitempty"` + Concurrency int `json:"concurrency,omitempty" yaml:"concurrency,omitempty"` + Host string `json:"host,omitempty" yaml:"host,omitempty"` + Port int32 `json:"port,omitempty" yaml:"port,omitempty"` + HTTPPath string `json:"http_path,omitempty" yaml:"http_path,omitempty"` + HTTPSVerifyCert bool `json:"https_verify_certificate,omitempty" yaml:"https_verify_certificate,omitempty"` + HTTPRequestHeaders []string `json:"req_headers,omitempty" yaml:"req_headers,omitempty"` + Healthy UpstreamActiveHealthCheckHealthy `json:"healthy,omitempty" yaml:"healthy,omitempty"` + Unhealthy UpstreamActiveHealthCheckUnhealthy `json:"unhealthy,omitempty" yaml:"unhealthy,omitempty"` +} + +// UpstreamPassiveHealthCheck defines the passive kind of upstream health check. +// +k8s:deepcopy-gen=true +type UpstreamPassiveHealthCheck struct { + Type string `json:"type,omitempty" yaml:"type,omitempty"` + Healthy UpstreamPassiveHealthCheckHealthy `json:"healthy,omitempty" yaml:"healthy,omitempty"` + Unhealthy UpstreamPassiveHealthCheckUnhealthy `json:"unhealthy,omitempty" yaml:"unhealthy,omitempty"` +} + +// UpstreamActiveHealthCheckHealthy defines the conditions to judge whether +// an upstream node is healthy with the active manner. +// +k8s:deepcopy-gen=true +type UpstreamActiveHealthCheckHealthy struct { + UpstreamPassiveHealthCheckHealthy `json:",inline" yaml:",inline"` + + Interval int `json:"interval,omitempty" yaml:"interval,omitempty"` +} + +// UpstreamPassiveHealthCheckHealthy defines the conditions to judge whether +// an upstream node is healthy with the passive manner. +// +k8s:deepcopy-gen=true +type UpstreamPassiveHealthCheckHealthy struct { + HTTPStatuses []int `json:"http_statuses,omitempty" yaml:"http_statuses,omitempty"` + Successes int `json:"successes,omitempty" yaml:"successes,omitempty"` +} + +// UpstreamPassiveHealthCheckUnhealthy defines the conditions to judge whether +// an upstream node is unhealthy with the passive manager. +// +k8s:deepcopy-gen=true +type UpstreamPassiveHealthCheckUnhealthy struct { + HTTPStatuses []int `json:"http_statuses,omitempty" yaml:"http_statuses,omitempty"` + HTTPFailures int `json:"http_failures,omitempty" yaml:"http_failures,omitempty"` + TCPFailures int `json:"tcp_failures,omitempty" yaml:"tcp_failures,omitempty"` + Timeouts int `json:"timeouts,omitempty" yaml:"timeouts,omitempty"` +} + +// UpstreamActiveHealthCheckUnhealthy defines the conditions to judge whether +// an upstream node is unhealthy with the active manager. +// +k8s:deepcopy-gen=true +type UpstreamActiveHealthCheckUnhealthy struct { + UpstreamPassiveHealthCheckUnhealthy `json:",inline" yaml:",inline"` + + Interval int `json:"interval,omitempty" yaml:"interval,omitempty"` +} + +// TrafficSplitConfig is the config of traffic-split plugin. +// +k8s:deepcopy-gen=true +type TrafficSplitConfig struct { + Rules []TrafficSplitConfigRule `json:"rules"` +} + +// TrafficSplitConfigRule is the rule config in traffic-split plugin config. +// +k8s:deepcopy-gen=true +type TrafficSplitConfigRule struct { + WeightedUpstreams []TrafficSplitConfigRuleWeightedUpstream `json:"weighted_upstreams"` +} + +// TrafficSplitConfigRuleWeightedUpstream is the weighted upstream config in +// the traffic split plugin rule. +// +k8s:deepcopy-gen=true +type TrafficSplitConfigRuleWeightedUpstream struct { + UpstreamID string `json:"upstream_id,omitempty"` + Upstream *Upstream `json:"upstream,omitempty"` + Weight int `json:"weight"` } // +k8s:deepcopy-gen=true @@ -431,15 +533,14 @@ func NewDefaultService() *Service { func NewDefaultUpstream() *Upstream { return &Upstream{ - Type: Roundrobin, - Nodes: make(UpstreamNodes, 0), - Scheme: SchemeHTTP, Metadata: Metadata{ - Desc: "Created by apisix-ingress-controller, DO NOT modify it manually", Labels: map[string]string{ "managed-by": "apisix-ingress-controller", }, }, + Nodes: make(UpstreamNodes, 0), + Scheme: SchemeHTTP, + Type: Roundrobin, } } diff --git a/api/adc/zz_generated.deepcopy.go b/api/adc/zz_generated.deepcopy.go index 8051a98bc..6f3fe4133 100644 --- a/api/adc/zz_generated.deepcopy.go +++ b/api/adc/zz_generated.deepcopy.go @@ -62,6 +62,21 @@ func (in *ClientClass) DeepCopy() *ClientClass { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ClientTLS) DeepCopyInto(out *ClientTLS) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ClientTLS. +func (in *ClientTLS) DeepCopy() *ClientTLS { + if in == nil { + return nil + } + out := new(ClientTLS) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Consumer) DeepCopyInto(out *Consumer) { *out = *in @@ -402,6 +417,70 @@ func (in *TLSClass) DeepCopy() *TLSClass { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *TrafficSplitConfig) DeepCopyInto(out *TrafficSplitConfig) { + *out = *in + if in.Rules != nil { + in, out := &in.Rules, &out.Rules + *out = make([]TrafficSplitConfigRule, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TrafficSplitConfig. +func (in *TrafficSplitConfig) DeepCopy() *TrafficSplitConfig { + if in == nil { + return nil + } + out := new(TrafficSplitConfig) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *TrafficSplitConfigRule) DeepCopyInto(out *TrafficSplitConfigRule) { + *out = *in + if in.WeightedUpstreams != nil { + in, out := &in.WeightedUpstreams, &out.WeightedUpstreams + *out = make([]TrafficSplitConfigRuleWeightedUpstream, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TrafficSplitConfigRule. +func (in *TrafficSplitConfigRule) DeepCopy() *TrafficSplitConfigRule { + if in == nil { + return nil + } + out := new(TrafficSplitConfigRule) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *TrafficSplitConfigRuleWeightedUpstream) DeepCopyInto(out *TrafficSplitConfigRuleWeightedUpstream) { + *out = *in + if in.Upstream != nil { + in, out := &in.Upstream, &out.Upstream + *out = new(Upstream) + (*in).DeepCopyInto(*out) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TrafficSplitConfigRuleWeightedUpstream. +func (in *TrafficSplitConfigRuleWeightedUpstream) DeepCopy() *TrafficSplitConfigRuleWeightedUpstream { + if in == nil { + return nil + } + out := new(TrafficSplitConfigRuleWeightedUpstream) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Upstream) DeepCopyInto(out *Upstream) { *out = *in @@ -426,6 +505,23 @@ func (in *Upstream) DeepCopyInto(out *Upstream) { *out = new(Timeout) **out = **in } + if in.Checks != nil { + in, out := &in.Checks, &out.Checks + *out = new(UpstreamHealthCheck) + (*in).DeepCopyInto(*out) + } + if in.TLS != nil { + in, out := &in.TLS, &out.TLS + *out = new(ClientTLS) + **out = **in + } + if in.DiscoveryArgs != nil { + in, out := &in.DiscoveryArgs, &out.DiscoveryArgs + *out = make(map[string]string, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Upstream. @@ -437,3 +533,139 @@ func (in *Upstream) DeepCopy() *Upstream { in.DeepCopyInto(out) return out } + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *UpstreamActiveHealthCheck) DeepCopyInto(out *UpstreamActiveHealthCheck) { + *out = *in + if in.HTTPRequestHeaders != nil { + in, out := &in.HTTPRequestHeaders, &out.HTTPRequestHeaders + *out = make([]string, len(*in)) + copy(*out, *in) + } + in.Healthy.DeepCopyInto(&out.Healthy) + in.Unhealthy.DeepCopyInto(&out.Unhealthy) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new UpstreamActiveHealthCheck. +func (in *UpstreamActiveHealthCheck) DeepCopy() *UpstreamActiveHealthCheck { + if in == nil { + return nil + } + out := new(UpstreamActiveHealthCheck) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *UpstreamActiveHealthCheckHealthy) DeepCopyInto(out *UpstreamActiveHealthCheckHealthy) { + *out = *in + in.UpstreamPassiveHealthCheckHealthy.DeepCopyInto(&out.UpstreamPassiveHealthCheckHealthy) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new UpstreamActiveHealthCheckHealthy. +func (in *UpstreamActiveHealthCheckHealthy) DeepCopy() *UpstreamActiveHealthCheckHealthy { + if in == nil { + return nil + } + out := new(UpstreamActiveHealthCheckHealthy) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *UpstreamActiveHealthCheckUnhealthy) DeepCopyInto(out *UpstreamActiveHealthCheckUnhealthy) { + *out = *in + in.UpstreamPassiveHealthCheckUnhealthy.DeepCopyInto(&out.UpstreamPassiveHealthCheckUnhealthy) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new UpstreamActiveHealthCheckUnhealthy. +func (in *UpstreamActiveHealthCheckUnhealthy) DeepCopy() *UpstreamActiveHealthCheckUnhealthy { + if in == nil { + return nil + } + out := new(UpstreamActiveHealthCheckUnhealthy) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *UpstreamHealthCheck) DeepCopyInto(out *UpstreamHealthCheck) { + *out = *in + if in.Active != nil { + in, out := &in.Active, &out.Active + *out = new(UpstreamActiveHealthCheck) + (*in).DeepCopyInto(*out) + } + if in.Passive != nil { + in, out := &in.Passive, &out.Passive + *out = new(UpstreamPassiveHealthCheck) + (*in).DeepCopyInto(*out) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new UpstreamHealthCheck. +func (in *UpstreamHealthCheck) DeepCopy() *UpstreamHealthCheck { + if in == nil { + return nil + } + out := new(UpstreamHealthCheck) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *UpstreamPassiveHealthCheck) DeepCopyInto(out *UpstreamPassiveHealthCheck) { + *out = *in + in.Healthy.DeepCopyInto(&out.Healthy) + in.Unhealthy.DeepCopyInto(&out.Unhealthy) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new UpstreamPassiveHealthCheck. +func (in *UpstreamPassiveHealthCheck) DeepCopy() *UpstreamPassiveHealthCheck { + if in == nil { + return nil + } + out := new(UpstreamPassiveHealthCheck) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *UpstreamPassiveHealthCheckHealthy) DeepCopyInto(out *UpstreamPassiveHealthCheckHealthy) { + *out = *in + if in.HTTPStatuses != nil { + in, out := &in.HTTPStatuses, &out.HTTPStatuses + *out = make([]int, len(*in)) + copy(*out, *in) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new UpstreamPassiveHealthCheckHealthy. +func (in *UpstreamPassiveHealthCheckHealthy) DeepCopy() *UpstreamPassiveHealthCheckHealthy { + if in == nil { + return nil + } + out := new(UpstreamPassiveHealthCheckHealthy) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *UpstreamPassiveHealthCheckUnhealthy) DeepCopyInto(out *UpstreamPassiveHealthCheckUnhealthy) { + *out = *in + if in.HTTPStatuses != nil { + in, out := &in.HTTPStatuses, &out.HTTPStatuses + *out = make([]int, len(*in)) + copy(*out, *in) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new UpstreamPassiveHealthCheckUnhealthy. +func (in *UpstreamPassiveHealthCheckUnhealthy) DeepCopy() *UpstreamPassiveHealthCheckUnhealthy { + if in == nil { + return nil + } + out := new(UpstreamPassiveHealthCheckUnhealthy) + in.DeepCopyInto(out) + return out +} diff --git a/api/v2/apisixupstream_types.go b/api/v2/apisixupstream_types.go index 3e0807a14..27c80c70c 100644 --- a/api/v2/apisixupstream_types.go +++ b/api/v2/apisixupstream_types.go @@ -36,19 +36,14 @@ type ApisixUpstreamSpec struct { PortLevelSettings []PortLevelSettings `json:"portLevelSettings,omitempty" yaml:"portLevelSettings,omitempty"` } -// ApisixUpstreamStatus defines the observed state of ApisixUpstream. -type ApisixUpstreamStatus = ApisixStatus - // +kubebuilder:object:root=true -// +kubebuilder:subresource:status // ApisixUpstream is the Schema for the apisixupstreams API. type ApisixUpstream struct { metav1.TypeMeta `json:",inline"` metav1.ObjectMeta `json:"metadata,omitempty"` - Spec ApisixUpstreamSpec `json:"spec,omitempty"` - Status ApisixUpstreamStatus `json:"status,omitempty"` + Spec ApisixUpstreamSpec `json:"spec,omitempty"` } // +kubebuilder:object:root=true @@ -86,7 +81,7 @@ type ApisixUpstreamConfig struct { // How many times that the proxy (Apache APISIX) should do when // errors occur (error, timeout or bad http status codes like 500, 502). // +kubebuilder:validation:Optional - Retries *int `json:"retries,omitempty" yaml:"retries,omitempty"` + Retries *int64 `json:"retries,omitempty" yaml:"retries,omitempty"` // Timeout settings for the read, send and connect to the upstream. // +kubebuilder:validation:Optional diff --git a/api/v2/shared_types.go b/api/v2/shared_types.go index 4738c5d83..cda28a2f5 100644 --- a/api/v2/shared_types.go +++ b/api/v2/shared_types.go @@ -77,3 +77,77 @@ const ( // ScopeVariable means the route match expression subject is in variable. ScopeVariable = "Variable" ) + +const ( + // SchemeHTTP represents the HTTP protocol. + SchemeHTTP = "http" + // SchemeGRPC represents the GRPC protocol. + SchemeGRPC = "grpc" + // SchemeHTTPS represents the HTTPS protocol. + SchemeHTTPS = "https" + // SchemeGRPCS represents the GRPCS protocol. + SchemeGRPCS = "grpcs" + // SchemeTCP represents the TCP protocol. + SchemeTCP = "tcp" + // SchemeUDP represents the UDP protocol. + SchemeUDP = "udp" +) + +const ( + // HashOnVars means the hash scope is variable. + HashOnVars = "vars" + // HashOnVarsCombination means the hash scope is the + // variable combination. + HashOnVarsCombination = "vars_combinations" + // HashOnHeader means the hash scope is HTTP request + // headers. + HashOnHeader = "header" + // HashOnCookie means the hash scope is HTTP Cookie. + HashOnCookie = "cookie" + // HashOnConsumer means the hash scope is APISIX consumer. + HashOnConsumer = "consumer" + + // LbRoundRobin is the round robin load balancer. + LbRoundRobin = "roundrobin" + // LbConsistentHash is the consistent hash load balancer. + LbConsistentHash = "chash" + // LbEwma is the ewma load balancer. + LbEwma = "ewma" + // LbLeaseConn is the least connection load balancer. + LbLeastConn = "least_conn" +) + +const ( + // PassHostPass represents pass option for pass_host Upstream settings. + PassHostPass = "pass" + // PassHostPass represents node option for pass_host Upstream settings. + PassHostNode = "node" + // PassHostPass represents rewrite option for pass_host Upstream settings. + PassHostRewrite = "rewrite" +) + +const ( + // ExternalTypeDomain type is a domain + // +k8s:deepcopy-gen=false + ExternalTypeDomain ApisixUpstreamExternalType = "Domain" + + // ExternalTypeService type is a K8s ExternalName service + // +k8s:deepcopy-gen=false + ExternalTypeService ApisixUpstreamExternalType = "Service" +) + +var schemeToPortMaps = map[string]int{ + SchemeHTTP: 80, + SchemeHTTPS: 443, + SchemeGRPC: 80, + SchemeGRPCS: 443, +} + +// SchemeToPort scheme converts to the default port +// ref https://github.com/apache/apisix/blob/c5fc10d9355a0c177a7532f01c77745ff0639a7f/apisix/upstream.lua#L167-L172 +func SchemeToPort(schema string) int { + if val, ok := schemeToPortMaps[schema]; ok { + return val + } + return 80 +} diff --git a/api/v2/zz_generated.deepcopy.go b/api/v2/zz_generated.deepcopy.go index 917d1885e..e353bbf75 100644 --- a/api/v2/zz_generated.deepcopy.go +++ b/api/v2/zz_generated.deepcopy.go @@ -1208,7 +1208,6 @@ func (in *ApisixUpstream) DeepCopyInto(out *ApisixUpstream) { out.TypeMeta = in.TypeMeta in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) in.Spec.DeepCopyInto(&out.Spec) - in.Status.DeepCopyInto(&out.Status) } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ApisixUpstream. @@ -1239,7 +1238,7 @@ func (in *ApisixUpstreamConfig) DeepCopyInto(out *ApisixUpstreamConfig) { } if in.Retries != nil { in, out := &in.Retries, &out.Retries - *out = new(int) + *out = new(int64) **out = **in } if in.Timeout != nil { diff --git a/config/crd/bases/apisix.apache.org_apisixupstreams.yaml b/config/crd/bases/apisix.apache.org_apisixupstreams.yaml index e6371f293..c4efa840c 100644 --- a/config/crd/bases/apisix.apache.org_apisixupstreams.yaml +++ b/config/crd/bases/apisix.apache.org_apisixupstreams.yaml @@ -366,6 +366,7 @@ spec: description: |- How many times that the proxy (Apache APISIX) should do when errors occur (error, timeout or bad http status codes like 500, 502). + format: int64 type: integer scheme: description: |- @@ -431,6 +432,7 @@ spec: description: |- How many times that the proxy (Apache APISIX) should do when errors occur (error, timeout or bad http status codes like 500, 502). + format: int64 type: integer scheme: description: |- @@ -488,68 +490,6 @@ spec: the pass_host is set to rewrite type: string type: object - status: - description: ApisixStatus is the status report for Apisix ingress Resources - properties: - conditions: - items: - description: Condition contains details for one aspect of the current - state of this API Resource. - properties: - lastTransitionTime: - description: |- - lastTransitionTime is the last time the condition transitioned from one status to another. - This should be when the underlying condition changed. If that is not known, then using the time when the API field changed is acceptable. - format: date-time - type: string - message: - description: |- - message is a human readable message indicating details about the transition. - This may be an empty string. - maxLength: 32768 - type: string - observedGeneration: - description: |- - observedGeneration represents the .metadata.generation that the condition was set based upon. - For instance, if .metadata.generation is currently 12, but the .status.conditions[x].observedGeneration is 9, the condition is out of date - with respect to the current state of the instance. - format: int64 - minimum: 0 - type: integer - reason: - description: |- - reason contains a programmatic identifier indicating the reason for the condition's last transition. - Producers of specific condition types may define expected values and meanings for this field, - and whether the values are considered a guaranteed API. - The value should be a CamelCase string. - This field may not be empty. - maxLength: 1024 - minLength: 1 - pattern: ^[A-Za-z]([A-Za-z0-9_,:]*[A-Za-z0-9_])?$ - type: string - status: - description: status of the condition, one of True, False, Unknown. - enum: - - "True" - - "False" - - Unknown - type: string - type: - description: type of condition in CamelCase or in foo.example.com/CamelCase. - maxLength: 316 - pattern: ^([a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*/)?(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])$ - type: string - required: - - lastTransitionTime - - message - - reason - - status - - type - type: object - type: array - type: object type: object served: true storage: true - subresources: - status: {} diff --git a/docs/crd/api.md b/docs/crd/api.md index a337c2db5..37032cdc8 100644 --- a/docs/crd/api.md +++ b/docs/crd/api.md @@ -1300,8 +1300,6 @@ _Appears in:_ - - #### ApisixTlsSpec diff --git a/internal/controller/apisixroute_controller.go b/internal/controller/apisixroute_controller.go index 7b20957d3..4b52e530d 100644 --- a/internal/controller/apisixroute_controller.go +++ b/internal/controller/apisixroute_controller.go @@ -80,6 +80,9 @@ func (r *ApisixRouteReconciler) SetupWithManager(mgr ctrl.Manager) error { Watches(&corev1.Secret{}, handler.EnqueueRequestsFromMapFunc(r.listApisixRoutesForSecret), ). + Watches(&apiv2.ApisixUpstream{}, + handler.EnqueueRequestsFromMapFunc(r.listApisixRouteForApisixUpstream), + ). Watches(&apiv2.ApisixPluginConfig{}, handler.EnqueueRequestsFromMapFunc(r.listApisixRoutesForPluginConfig), ). @@ -164,7 +167,6 @@ func (r *ApisixRouteReconciler) processApisixRoute(ctx context.Context, tc *prov } // check vars - // todo: cache the result to tctx if _, err := http.Match.NginxVars.ToVars(); err != nil { return ReasonError{ Reason: string(apiv2.ConditionReasonInvalidSpec), @@ -184,6 +186,10 @@ func (r *ApisixRouteReconciler) processApisixRoute(ctx context.Context, tc *prov if err := r.validateBackends(ctx, tc, in, http); err != nil { return err } + // process upstreams + if err := r.validateUpstreams(ctx, tc, in, http); err != nil { + return err + } } return nil @@ -339,6 +345,63 @@ func (r *ApisixRouteReconciler) validateBackends(ctx context.Context, tc *provid } tc.EndpointSlices[serviceNN] = endpoints.Items } + + return nil +} + +func (r *ApisixRouteReconciler) validateUpstreams(ctx context.Context, tc *provider.TranslateContext, ar *apiv2.ApisixRoute, http apiv2.ApisixRouteHTTP) error { + for _, upstream := range http.Upstreams { + if upstream.Name == "" { + continue + } + var ( + ups apiv2.ApisixUpstream + upsNN = types.NamespacedName{ + Namespace: ar.GetNamespace(), + Name: upstream.Name, + } + ) + if err := r.Get(ctx, upsNN, &ups); err != nil { + r.Log.Error(err, "failed to get ApisixUpstream", "ApisixUpstream", upsNN) + if client.IgnoreNotFound(err) == nil { + continue + } + return err + } + tc.Upstreams[upsNN] = &ups + + for _, node := range ups.Spec.ExternalNodes { + if node.Type == apiv2.ExternalTypeService { + var ( + service corev1.Service + serviceNN = types.NamespacedName{Namespace: ups.GetNamespace(), Name: node.Name} + ) + if err := r.Get(ctx, serviceNN, &service); err != nil { + r.Log.Error(err, "failed to get service in ApisixUpstream", "ApisixUpstream", upsNN, "Service", serviceNN) + if client.IgnoreNotFound(err) == nil { + continue + } + return err + } + tc.Services[utils.NamespacedName(&service)] = &service + } + } + + if ups.Spec.TLSSecret != nil && ups.Spec.TLSSecret.Name != "" { + var ( + secret corev1.Secret + secretNN = types.NamespacedName{Namespace: cmp.Or(ups.Spec.TLSSecret.Namespace, ar.GetNamespace()), Name: ups.Spec.TLSSecret.Name} + ) + if err := r.Get(ctx, secretNN, &secret); err != nil { + r.Log.Error(err, "failed to get secret in ApisixUpstream", "ApisixUpstream", upsNN, "Secret", secretNN) + if client.IgnoreNotFound(err) != nil { + return err + } + } + tc.Secrets[secretNN] = &secret + } + } + return nil } @@ -454,6 +517,24 @@ func (r *ApisixRouteReconciler) listApisixRouteForGatewayProxy(ctx context.Conte return pkgutils.DedupComparable(requests) } +func (r *ApisixRouteReconciler) listApisixRouteForApisixUpstream(ctx context.Context, object client.Object) (requests []reconcile.Request) { + au, ok := object.(*apiv2.ApisixUpstream) + if !ok { + return nil + } + + var arList apiv2.ApisixRouteList + if err := r.List(ctx, &arList, client.MatchingFields{indexer.ApisixUpstreamRef: indexer.GenIndexKey(au.GetNamespace(), au.GetName())}); err != nil { + r.Log.Error(err, "failed to list ApisixUpstreams") + return nil + } + + for _, ar := range arList.Items { + requests = append(requests, reconcile.Request{NamespacedName: utils.NamespacedName(&ar)}) + } + return pkgutils.DedupComparable(requests) +} + func (r *ApisixRouteReconciler) matchesIngressController(obj client.Object) bool { ingressClass, ok := obj.(*networkingv1.IngressClass) if !ok { diff --git a/internal/controller/apisixupstream_controller.go b/internal/controller/apisixupstream_controller.go deleted file mode 100644 index 2dce86a06..000000000 --- a/internal/controller/apisixupstream_controller.go +++ /dev/null @@ -1,69 +0,0 @@ -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package controller - -import ( - "context" - - "github.com/go-logr/logr" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" - ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/client" - gatewayv1 "sigs.k8s.io/gateway-api/apis/v1" - - apiv2 "github.com/apache/apisix-ingress-controller/api/v2" -) - -// ApisixUpstreamReconciler reconciles a ApisixUpstream object -type ApisixUpstreamReconciler struct { - client.Client - Scheme *runtime.Scheme - Log logr.Logger -} - -// Reconcile FIXME: implement the reconcile logic (For now, it dose nothing other than directly accepting) -func (r *ApisixUpstreamReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - r.Log.Info("reconcile", "request", req.NamespacedName) - - var obj apiv2.ApisixUpstream - if err := r.Get(ctx, req.NamespacedName, &obj); err != nil { - r.Log.Error(err, "failed to get ApisixConsumer", "request", req.NamespacedName) - return ctrl.Result{}, err - } - - obj.Status.Conditions = []metav1.Condition{ - { - Type: string(gatewayv1.RouteConditionAccepted), - Status: metav1.ConditionTrue, - ObservedGeneration: obj.GetGeneration(), - LastTransitionTime: metav1.Now(), - Reason: string(gatewayv1.RouteReasonAccepted), - }, - } - - if err := r.Status().Update(ctx, &obj); err != nil { - r.Log.Error(err, "failed to update status", "request", req.NamespacedName) - return ctrl.Result{}, err - } - - return ctrl.Result{}, nil -} - -// SetupWithManager sets up the controller with the Manager. -func (r *ApisixUpstreamReconciler) SetupWithManager(mgr ctrl.Manager) error { - return ctrl.NewControllerManagedBy(mgr). - For(&apiv2.ApisixUpstream{}). - Named("apisixupstream"). - Complete(r) -} diff --git a/internal/controller/indexer/indexer.go b/internal/controller/indexer/indexer.go index 0655a610b..252931185 100644 --- a/internal/controller/indexer/indexer.go +++ b/internal/controller/indexer/indexer.go @@ -13,6 +13,7 @@ package indexer import ( + "cmp" "context" networkingv1 "k8s.io/api/networking/v1" @@ -38,6 +39,7 @@ const ( ConsumerGatewayRef = "consumerGatewayRef" PolicyTargetRefs = "targetRefs" GatewayClassIndexRef = "gatewayClassRef" + ApisixUpstreamRef = "apisixUpstreamRef" PluginConfigIndexRef = "pluginConfigRefs" ) @@ -96,8 +98,9 @@ func setupConsumerIndexer(mgr ctrl.Manager) error { func setupApisixRouteIndexer(mgr ctrl.Manager) error { var indexers = map[string]func(client.Object) []string{ - ServiceIndexRef: ApisixRouteServiceIndexFunc, - SecretIndexRef: ApisixRouteRouteSecretIndexFunc, + ServiceIndexRef: ApisixRouteServiceIndexFunc(mgr.GetClient()), + SecretIndexRef: ApisixRouteSecretIndexFunc(mgr.GetClient()), + ApisixUpstreamRef: ApisixRouteApisixUpstreamIndexFunc, PluginConfigIndexRef: ApisixRoutePluginConfigIndexFunc, } for key, f := range indexers { @@ -443,32 +446,90 @@ func HTTPRouteServiceIndexFunc(rawObj client.Object) []string { return keys } -func ApisixRouteServiceIndexFunc(obj client.Object) (keys []string) { - ar := obj.(*apiv2.ApisixRoute) - for _, http := range ar.Spec.HTTP { - for _, backend := range http.Backends { - keys = append(keys, GenIndexKey(ar.GetNamespace(), backend.ServiceName)) +func ApisixRouteServiceIndexFunc(cli client.Client) func(client.Object) []string { + return func(obj client.Object) (keys []string) { + ar := obj.(*apiv2.ApisixRoute) + for _, http := range ar.Spec.HTTP { + // service reference in .backends + for _, backend := range http.Backends { + keys = append(keys, GenIndexKey(ar.GetNamespace(), backend.ServiceName)) + } + // service reference in .upstreams + for _, upstream := range http.Upstreams { + if upstream.Name == "" { + continue + } + var ( + au apiv2.ApisixUpstream + auNN = types.NamespacedName{ + Namespace: ar.GetNamespace(), + Name: upstream.Name, + } + ) + if err := cli.Get(context.Background(), auNN, &au); err != nil { + continue + } + for _, node := range au.Spec.ExternalNodes { + if node.Type == apiv2.ExternalTypeService && node.Name != "" { + keys = append(keys, GenIndexKey(au.GetNamespace(), node.Name)) + } + } + } } + for _, stream := range ar.Spec.Stream { + keys = append(keys, GenIndexKey(ar.GetNamespace(), stream.Backend.ServiceName)) + } + return keys } - for _, stream := range ar.Spec.Stream { - keys = append(keys, GenIndexKey(ar.GetNamespace(), stream.Backend.ServiceName)) - } - return } -func ApisixRouteRouteSecretIndexFunc(obj client.Object) (keys []string) { - ar := obj.(*apiv2.ApisixRoute) - for _, http := range ar.Spec.HTTP { - for _, plugin := range http.Plugins { - if plugin.Enable && plugin.SecretRef != "" { - keys = append(keys, GenIndexKey(ar.GetNamespace(), plugin.SecretRef)) +func ApisixRouteSecretIndexFunc(cli client.Client) func(client.Object) []string { + return func(obj client.Object) (keys []string) { + ar := obj.(*apiv2.ApisixRoute) + for _, http := range ar.Spec.HTTP { + // secret reference in .plugins + for _, plugin := range http.Plugins { + if plugin.Enable && plugin.SecretRef != "" { + keys = append(keys, GenIndexKey(ar.GetNamespace(), plugin.SecretRef)) + } + } + // secret reference in .upstreams + for _, upstream := range http.Upstreams { + if upstream.Name == "" { + continue + } + var ( + au apiv2.ApisixUpstream + auNN = types.NamespacedName{ + Namespace: ar.GetNamespace(), + Name: upstream.Name, + } + ) + if err := cli.Get(context.Background(), auNN, &au); err != nil { + continue + } + if secret := au.Spec.TLSSecret; secret != nil && secret.Name != "" { + keys = append(keys, GenIndexKey(cmp.Or(secret.Namespace, au.GetNamespace()), secret.Name)) + } + } + } + for _, stream := range ar.Spec.Stream { + for _, plugin := range stream.Plugins { + if plugin.Enable && plugin.SecretRef != "" { + keys = append(keys, GenIndexKey(ar.GetNamespace(), plugin.SecretRef)) + } } } + return keys } - for _, stream := range ar.Spec.Stream { - for _, plugin := range stream.Plugins { - if plugin.Enable && plugin.SecretRef != "" { - keys = append(keys, GenIndexKey(ar.GetNamespace(), plugin.SecretRef)) +} + +func ApisixRouteApisixUpstreamIndexFunc(obj client.Object) (keys []string) { + ar := obj.(*apiv2.ApisixRoute) + for _, rule := range ar.Spec.HTTP { + for _, upstream := range rule.Upstreams { + if upstream.Name != "" { + keys = append(keys, GenIndexKey(ar.GetNamespace(), upstream.Name)) } } } diff --git a/internal/provider/adc/translator/apisixroute.go b/internal/provider/adc/translator/apisixroute.go index 6936cdda6..37ecf7f3b 100644 --- a/internal/provider/adc/translator/apisixroute.go +++ b/internal/provider/adc/translator/apisixroute.go @@ -16,7 +16,9 @@ import ( "cmp" "encoding/json" "fmt" + "strconv" + "github.com/api7/gopkg/pkg/log" "github.com/pkg/errors" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -54,13 +56,9 @@ func (t *Translator) translateHTTPRule(tctx *provider.TranslateContext, ar *apiv return nil, err } - route := t.buildRoute(ar, rule, plugins, timeout, vars) - upstream, backendErr := t.buildUpstream(tctx, ar, rule) - service := t.buildService(ar, rule, ruleIndex, route, upstream) - - if backendErr != nil && len(upstream.Nodes) == 0 { - t.addFaultInjectionPlugin(service) - } + service := t.buildService(ar, rule, ruleIndex) + t.buildRoute(ar, service, rule, plugins, timeout, vars) + t.buildUpstream(tctx, service, ar, rule) return service, nil } @@ -167,7 +165,7 @@ func (t *Translator) addAuthenticationPlugins(rule apiv2.ApisixRouteHTTP, plugin } } -func (t *Translator) buildRoute(ar *apiv2.ApisixRoute, rule apiv2.ApisixRouteHTTP, plugins adc.Plugins, timeout *adc.Timeout, vars adc.Vars) *adc.Route { +func (t *Translator) buildRoute(ar *apiv2.ApisixRoute, service *adc.Service, rule apiv2.ApisixRouteHTTP, plugins adc.Plugins, timeout *adc.Timeout, vars adc.Vars) { route := adc.NewDefaultRoute() route.Name = adc.ComposeRouteName(ar.Namespace, ar.Name, rule.Name) route.ID = id.GenID(route.Name) @@ -183,12 +181,20 @@ func (t *Translator) buildRoute(ar *apiv2.ApisixRoute, rule apiv2.ApisixRouteHTT route.Timeout = timeout route.Uris = rule.Match.Paths route.Vars = vars - return route + for key, value := range ar.GetObjectMeta().GetLabels() { + route.Labels[key] = value + } + + service.Routes = []*adc.Route{route} } -func (t *Translator) buildUpstream(tctx *provider.TranslateContext, ar *apiv2.ApisixRoute, rule apiv2.ApisixRouteHTTP) (*adc.Upstream, error) { - upstream := adc.NewDefaultUpstream() - var backendErr error +func (t *Translator) buildUpstream(tctx *provider.TranslateContext, service *adc.Service, ar *apiv2.ApisixRoute, rule apiv2.ApisixRouteHTTP) { + var ( + upstream = adc.NewDefaultUpstream() + upstreams = make([]*adc.Upstream, 0) + weightedUpstreams = make([]adc.TrafficSplitConfigRuleWeightedUpstream, 0) + backendErr error + ) for _, backend := range rule.Backends { var upNodes adc.UpstreamNodes @@ -208,22 +214,78 @@ func (t *Translator) buildUpstream(tctx *provider.TranslateContext, ar *apiv2.Ap upstream.Nodes = append(upstream.Nodes, upNodes...) } - //nolint:staticcheck - if len(rule.Backends) == 0 && len(rule.Upstreams) > 0 { - // FIXME: when the API ApisixUpstream is supported + for _, upstreamRef := range rule.Upstreams { + upsNN := types.NamespacedName{ + Namespace: ar.GetNamespace(), + Name: upstreamRef.Name, + } + au, ok := tctx.Upstreams[upsNN] + if !ok { + log.Debugf("failed to retrieve ApisixUpstream from tctx, ApisixUpstream: %s", upsNN) + continue + } + upstream, err := t.translateApisixUpstream(tctx, au) + if err != nil { + t.Log.Error(err, "failed to translate ApisixUpstream", "ApisixUpstream", utils.NamespacedName(au)) + continue + } + if upstreamRef.Weight != nil { + upstream.Labels["meta_weight"] = strconv.FormatInt(int64(*upstreamRef.Weight), 10) + } + + upstreams = append(upstreams, upstream) + } + + // If no .http[].backends is used and only .http[].upstreams is used, the first valid upstream is used as service.upstream; + // Other upstreams are configured in the traffic-split plugin + if len(rule.Backends) == 0 && len(upstreams) > 0 { + upstream = upstreams[0] + upstreams = upstreams[1:] + } + + // set the default upstream's weight in traffic-split + weight, err := strconv.Atoi(upstream.Labels["meta_weight"]) + if err != nil { + weight = apiv2.DefaultWeight + } + weightedUpstreams = append(weightedUpstreams, adc.TrafficSplitConfigRuleWeightedUpstream{ + Weight: weight, + }) + + // set others upstreams in traffic-split + for _, item := range upstreams { + weight, err := strconv.Atoi(item.Labels["meta_weight"]) + if err != nil { + weight = apiv2.DefaultWeight + } + weightedUpstreams = append(weightedUpstreams, adc.TrafficSplitConfigRuleWeightedUpstream{ + Upstream: item, + Weight: weight, + }) } - return upstream, backendErr + // set service + service.Upstream = upstream + if backendErr != nil && len(upstream.Nodes) == 0 { + t.addFaultInjectionPlugin(service) + } + if len(weightedUpstreams) > 0 { + service.Plugins["traffic-split"] = &adc.TrafficSplitConfig{ + Rules: []adc.TrafficSplitConfigRule{ + { + WeightedUpstreams: weightedUpstreams, + }, + }, + } + } } -func (t *Translator) buildService(ar *apiv2.ApisixRoute, rule apiv2.ApisixRouteHTTP, ruleIndex int, route *adc.Route, upstream *adc.Upstream) *adc.Service { +func (t *Translator) buildService(ar *apiv2.ApisixRoute, rule apiv2.ApisixRouteHTTP, ruleIndex int) *adc.Service { service := adc.NewDefaultService() service.Name = adc.ComposeServiceNameWithRule(ar.Namespace, ar.Name, fmt.Sprintf("%d", ruleIndex)) service.ID = id.GenID(service.Name) service.Labels = label.GenLabel(ar) service.Hosts = rule.Match.Hosts - service.Upstream = upstream - service.Routes = []*adc.Route{route} return service } diff --git a/internal/provider/adc/translator/apisixupstream.go b/internal/provider/adc/translator/apisixupstream.go new file mode 100644 index 000000000..34b6f24f6 --- /dev/null +++ b/internal/provider/adc/translator/apisixupstream.go @@ -0,0 +1,239 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package translator + +import ( + "cmp" + "fmt" + + "github.com/pkg/errors" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" + + "github.com/apache/apisix-ingress-controller/api/adc" + apiv2 "github.com/apache/apisix-ingress-controller/api/v2" + "github.com/apache/apisix-ingress-controller/internal/provider" + "github.com/apache/apisix-ingress-controller/internal/utils" +) + +func (t *Translator) translateApisixUpstream(tctx *provider.TranslateContext, au *apiv2.ApisixUpstream) (ups *adc.Upstream, err error) { + if len(au.Spec.ExternalNodes) == 0 && au.Spec.Discovery == nil { + return nil, errors.Errorf("%s has empty externalNodes or discovery configuration", utils.NamespacedName(au)) + } + + ups = adc.NewDefaultUpstream() + for _, f := range []func(*apiv2.ApisixUpstream, *adc.Upstream) error{ + translateApisixUpstreamScheme, + translateApisixUpstreamLoadBalancer, + translateApisixUpstreamHealthCheck, + translateApisixUpstreamRetriesAndTimeout, + translateApisixUpstreamClientTLS, + translateApisixUpstreamPassHost, + translateApisixUpstreamDiscovery, + } { + if err = f(au, ups); err != nil { + return + } + } + for _, f := range []func(*provider.TranslateContext, *apiv2.ApisixUpstream, *adc.Upstream) error{ + translateApisixUpstreamExternalNodes, + } { + if err = f(tctx, au, ups); err != nil { + return + } + } + + return +} + +func translateApisixUpstreamScheme(au *apiv2.ApisixUpstream, ups *adc.Upstream) error { + switch au.Spec.Scheme { + case apiv2.SchemeHTTP, apiv2.SchemeHTTPS, apiv2.SchemeGRPC, apiv2.SchemeGRPCS: + ups.Scheme = au.Spec.Scheme + default: + ups.Scheme = apiv2.SchemeHTTP + } + return nil +} + +func translateApisixUpstreamLoadBalancer(au *apiv2.ApisixUpstream, ups *adc.Upstream) error { + lb := au.Spec.LoadBalancer + if lb == nil || lb.Type == "" { + ups.Type = apiv2.LbRoundRobin + return nil + } + switch lb.Type { + case apiv2.LbRoundRobin, apiv2.LbLeastConn, apiv2.LbEwma: + ups.Type = adc.UpstreamType(lb.Type) + case apiv2.LbConsistentHash: + ups.Type = adc.UpstreamType(lb.Type) + ups.Key = lb.Key + switch lb.HashOn { + case apiv2.HashOnVars: + fallthrough + case apiv2.HashOnHeader: + fallthrough + case apiv2.HashOnCookie: + fallthrough + case apiv2.HashOnConsumer: + fallthrough + case apiv2.HashOnVarsCombination: + ups.HashOn = lb.HashOn + default: + return errors.New("invalid hashOn") + } + default: + return errors.New("invalid loadBalancer type") + } + return nil +} + +func translateApisixUpstreamHealthCheck(au *apiv2.ApisixUpstream, ups *adc.Upstream) error { + // todo: handle `.Checks` in next PR + return nil +} + +func translateApisixUpstreamRetriesAndTimeout(au *apiv2.ApisixUpstream, ups *adc.Upstream) error { + retries := au.Spec.Retries + timeout := au.Spec.Timeout + + if retries != nil && *retries < 0 { + return errors.New("invalid value retries") + } + ups.Retries = retries + + if timeout == nil { + return nil + } + if timeout.Connect.Duration < 0 { + return errors.New("invalid value timeout.connect") + } + if timeout.Read.Duration < 0 { + return errors.New("invalid value timeout.read") + } + if timeout.Send.Duration < 0 { + return errors.New("invalid value timeout.send") + } + + // Since the schema of timeout doesn't allow only configuring + // one or two items. Here we assign the default value first. + connTimeout := cmp.Or(timeout.Connect.Duration, apiv2.DefaultUpstreamTimeout) + readTimeout := cmp.Or(timeout.Read.Duration, apiv2.DefaultUpstreamTimeout) + sendTimeout := cmp.Or(timeout.Send.Duration, apiv2.DefaultUpstreamTimeout) + + ups.Timeout = &adc.Timeout{ + Connect: int(connTimeout.Seconds()), + Read: int(readTimeout.Seconds()), + Send: int(sendTimeout.Seconds()), + } + + return nil +} + +func translateApisixUpstreamClientTLS(au *apiv2.ApisixUpstream, ups *adc.Upstream) error { + // todo: handle `.TLS` in next PR + return nil +} + +func translateApisixUpstreamPassHost(au *apiv2.ApisixUpstream, ups *adc.Upstream) error { + switch passHost := au.Spec.PassHost; passHost { + case apiv2.PassHostPass, apiv2.PassHostNode, apiv2.PassHostRewrite: + ups.PassHost = passHost + default: + ups.PassHost = "" + } + + ups.UpstreamHost = au.Spec.UpstreamHost + + return nil +} + +func translateApisixUpstreamDiscovery(upstream *apiv2.ApisixUpstream, upstream2 *adc.Upstream) error { + // todo: handle `.Discovery*` in next PR + return nil +} + +func translateApisixUpstreamExternalNodes(tctx *provider.TranslateContext, au *apiv2.ApisixUpstream, ups *adc.Upstream) error { + for _, node := range au.Spec.ExternalNodes { + switch node.Type { + case apiv2.ExternalTypeDomain: + if err := translateApisixUpstreamExternalNodesDomain(au, ups, node); err != nil { + return err + } + default: // apiv2.ExternalTypeService or default + if err := translateApisixUpstreamExternalNodesService(tctx, au, ups, node); err != nil { + return err + } + } + } + + return nil +} +func translateApisixUpstreamExternalNodesDomain(au *apiv2.ApisixUpstream, ups *adc.Upstream, node apiv2.ApisixUpstreamExternalNode) error { + weight := apiv2.DefaultWeight + if node.Weight != nil { + weight = *node.Weight + } + + if !utils.MatchHostDef(node.Name) { + return fmt.Errorf("ApisixUpstream %s/%s ExternalNodes[]'s name %s as Domain must match lowercase RFC 1123 subdomain. "+ + "a lowercase RFC 1123 subdomain must consist of lower case alphanumeric characters, '-' or '.', and must start and end with an alphanumeric character", + au.Namespace, au.Name, node.Name) + } + + n := adc.UpstreamNode{ + Host: node.Name, + Weight: weight, + } + + if node.Port != nil { + n.Port = *node.Port + } else { + n.Port = apiv2.SchemeToPort(au.Spec.Scheme) + } + + ups.Nodes = append(ups.Nodes, n) + + return nil +} + +func translateApisixUpstreamExternalNodesService(tctx *provider.TranslateContext, au *apiv2.ApisixUpstream, ups *adc.Upstream, node apiv2.ApisixUpstreamExternalNode) error { + serviceNN := types.NamespacedName{Namespace: au.GetNamespace(), Name: node.Name} + svc, ok := tctx.Services[serviceNN] + if !ok { + return errors.Errorf("service not found, service: %s", serviceNN) + } + + if svc.Spec.Type != corev1.ServiceTypeExternalName { + return errors.Errorf("ApisixUpstream %s ExternalNodes[] must refers to a ExternalName service: %s", utils.NamespacedName(au), node.Name) + } + + weight := apiv2.DefaultWeight + if node.Weight != nil { + weight = *node.Weight + } + n := adc.UpstreamNode{ + Host: svc.Spec.ExternalName, + Weight: weight, + } + + if node.Port != nil { + n.Port = *node.Port + } else { + n.Port = apiv2.SchemeToPort(au.Spec.Scheme) + } + + ups.Nodes = append(ups.Nodes, n) + + return nil +} diff --git a/internal/provider/provider.go b/internal/provider/provider.go index 6aef882a3..db52a3826 100644 --- a/internal/provider/provider.go +++ b/internal/provider/provider.go @@ -47,6 +47,7 @@ type TranslateContext struct { ApisixPluginConfigs map[k8stypes.NamespacedName]*apiv2.ApisixPluginConfig Services map[k8stypes.NamespacedName]*corev1.Service BackendTrafficPolicies map[k8stypes.NamespacedName]*v1alpha1.BackendTrafficPolicy + Upstreams map[k8stypes.NamespacedName]*apiv2.ApisixUpstream GatewayProxies map[types.NamespacedNameKind]v1alpha1.GatewayProxy ResourceParentRefs map[types.NamespacedNameKind][]types.NamespacedNameKind HTTPRoutePolicies []v1alpha1.HTTPRoutePolicy @@ -62,6 +63,7 @@ func NewDefaultTranslateContext(ctx context.Context) *TranslateContext { PluginConfigs: make(map[k8stypes.NamespacedName]*v1alpha1.PluginConfig), ApisixPluginConfigs: make(map[k8stypes.NamespacedName]*apiv2.ApisixPluginConfig), Services: make(map[k8stypes.NamespacedName]*corev1.Service), + Upstreams: make(map[k8stypes.NamespacedName]*apiv2.ApisixUpstream), BackendTrafficPolicies: make(map[k8stypes.NamespacedName]*v1alpha1.BackendTrafficPolicy), GatewayProxies: make(map[types.NamespacedNameKind]v1alpha1.GatewayProxy), ResourceParentRefs: make(map[types.NamespacedNameKind][]types.NamespacedNameKind), diff --git a/internal/utils/k8s.go b/internal/utils/k8s.go index 7010a4655..7cc3767d5 100644 --- a/internal/utils/k8s.go +++ b/internal/utils/k8s.go @@ -14,6 +14,7 @@ package utils import ( "net" + "regexp" k8stypes "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" @@ -47,3 +48,15 @@ func ValidateRemoteAddrs(remoteAddrs []string) error { } return nil } + +var hostDef = "^[a-z0-9]([-a-z0-9]*[a-z0-9])?(\\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*$" +var hostDefRegex = regexp.MustCompile(hostDef) + +// MatchHostDef checks that host matches host's schema +// ref to : https://github.com/apache/apisix/blob/c5fc10d9355a0c177a7532f01c77745ff0639a7f/apisix/schema_def.lua#L40 +// ref to : https://github.com/kubernetes/kubernetes/blob/976a940f4a4e84fe814583848f97b9aafcdb083f/staging/src/k8s.io/apimachinery/pkg/util/validation/validation.go#L205 +// They define regex differently, but k8s's dns is more accurate +// todo: validate by CRD definition +func MatchHostDef(host string) bool { + return hostDefRegex.MatchString(host) +} diff --git a/test/e2e/apisix/route.go b/test/e2e/apisix/route.go index 25e3863f7..6c08536a1 100644 --- a/test/e2e/apisix/route.go +++ b/test/e2e/apisix/route.go @@ -35,20 +35,20 @@ var _ = Describe("Test ApisixRoute", func() { applier = framework.NewApplier(s.GinkgoT, s.K8sClient, s.CreateResourceFromString) ) - Context("Test ApisixRoute", func() { - BeforeEach(func() { - By("create GatewayProxy") - gatewayProxy := fmt.Sprintf(gatewayProxyYaml, s.Deployer.GetAdminEndpoint(), s.AdminKey()) - err := s.CreateResourceFromStringWithNamespace(gatewayProxy, "default") - Expect(err).NotTo(HaveOccurred(), "creating GatewayProxy") - time.Sleep(5 * time.Second) + BeforeEach(func() { + By("create GatewayProxy") + gatewayProxy := fmt.Sprintf(gatewayProxyYaml, s.Deployer.GetAdminEndpoint(), s.AdminKey()) + err := s.CreateResourceFromStringWithNamespace(gatewayProxy, "default") + Expect(err).NotTo(HaveOccurred(), "creating GatewayProxy") + time.Sleep(5 * time.Second) - By("create IngressClass") - err = s.CreateResourceFromStringWithNamespace(ingressClassYaml, "") - Expect(err).NotTo(HaveOccurred(), "creating IngressClass") - time.Sleep(5 * time.Second) - }) + By("create IngressClass") + err = s.CreateResourceFromStringWithNamespace(ingressClassYaml, "") + Expect(err).NotTo(HaveOccurred(), "creating IngressClass") + time.Sleep(5 * time.Second) + }) + Context("Test ApisixRoute", func() { It("Basic tests", func() { const apisixRouteSpec = ` apiVersion: apisix.apache.org/v2 @@ -295,11 +295,154 @@ spec: // ApisixUpstream is not implemented yet. // So the case is pending for now }) + }) - PIt("Test ApisixRoute reference ApisixUpstream", func() { - // This case depends on ApisixUpstream. - // ApisixUpstream is not implemented yet. - // So the case is pending for now. + Context("Test ApisixRoute reference ApisixUpstream", func() { + + It("Test reference ApisixUpstream", func() { + const apisixRouteSpec = ` +apiVersion: apisix.apache.org/v2 +kind: ApisixRoute +metadata: + name: default +spec: + ingressClassName: apisix + http: + - name: rule0 + match: + paths: + - /* + upstreams: + - name: default-upstream +` + const apisixUpstreamSpec0 = ` +apiVersion: apisix.apache.org/v2 +kind: ApisixUpstream +metadata: + name: default-upstream +spec: + ingressClassName: apisix + externalNodes: + - type: Service + name: httpbin-service-e2e-test +` + const apisixUpstreamSpec1 = ` +apiVersion: apisix.apache.org/v2 +kind: ApisixUpstream +metadata: + name: default-upstream +spec: + ingressClassName: apisix + externalNodes: + - type: Service + name: alias-httpbin-service-e2e-test +` + const serviceSpec = ` +apiVersion: v1 +kind: Service +metadata: + name: alias-httpbin-service-e2e-test +spec: + type: ExternalName + externalName: httpbin-service-e2e-test +` + By("create Service, ApisixUpstream and ApisixRoute") + err := s.CreateResourceFromString(serviceSpec) + Expect(err).ShouldNot(HaveOccurred(), "apply service") + err = s.CreateResourceFromString(apisixUpstreamSpec0) + Expect(err).ShouldNot(HaveOccurred(), "apply apisixUpstreamSpec") + + var apisxiRoute apiv2.ApisixRoute + applier.MustApplyAPIv2(types.NamespacedName{Namespace: s.Namespace(), Name: "default"}, &apisxiRoute, apisixRouteSpec) + + By("verify that the ApisixUpstream reference a Service which is not ExternalName should not request OK") + request := func(path string) int { + return s.NewAPISIXClient().GET(path).WithHost("httpbin").Expect().Raw().StatusCode + } + Eventually(request).WithArguments("/get").WithTimeout(8 * time.Second).ProbeEvery(time.Second).Should(Equal(http.StatusServiceUnavailable)) + + By("verify that ApisixUpstream reference a Service which is ExternalName should request OK") + err = s.CreateResourceFromString(apisixUpstreamSpec1) + Expect(err).ShouldNot(HaveOccurred(), "update apisixUpstream") + Eventually(request).WithArguments("/get").WithTimeout(8 * time.Second).ProbeEvery(time.Second).Should(Equal(http.StatusOK)) + }) + + It("Test a Mix of Backends and Upstreams", func() { + // apisixUpstreamSpec is an ApisixUpstream reference to the Service httpbin-service-e2e-test + const apisixUpstreamSpec = ` +apiVersion: apisix.apache.org/v2 +kind: ApisixUpstream +metadata: + name: default-upstream +spec: + ingressClassName: apisix + externalNodes: + - type: Domain + name: httpbin-service-e2e-test + passHost: node +` + // apisixRouteSpec is an ApisixUpstream uses a backend and reference an upstream. + // It contains a plugin response-rewrite that lets us know what upstream the gateway forwards the request to. + const apisixRouteSpec = ` +apiVersion: apisix.apache.org/v2 +kind: ApisixRoute +metadata: + name: default +spec: + ingressClassName: apisix + http: + - name: rule0 + match: + paths: + - /* + backends: + - serviceName: httpbin-service-e2e-test + servicePort: 80 + upstreams: + - name: default-upstream + plugins: + - name: response-rewrite + enable: true + config: + headers: + set: + "X-Upstream-Host": "$upstream_addr" +` + By("apply ApisixUpstream") + err := s.CreateResourceFromString(apisixUpstreamSpec) + Expect(err).ShouldNot(HaveOccurred(), "apply ApisixUpstream") + + By("apply ApisixRoute") + var apisixRoute apiv2.ApisixRoute + applier.MustApplyAPIv2(types.NamespacedName{Namespace: s.Namespace(), Name: "default"}, &apisixRoute, apisixRouteSpec) + + By("verify ApisixRoute works") + request := func(path string) int { + return s.NewAPISIXClient().GET(path).Expect().Raw().StatusCode + } + Eventually(request).WithArguments("/get").WithTimeout(8 * time.Second).ProbeEvery(time.Second).Should(Equal(http.StatusOK)) + + By("verify the backends and the upstreams work commonly") + // .backends -> Service httpbin-service-e2e-test -> Endpoint httpbin-service-e2e-test, so the $upstream_addr value we get is the Endpoint IP. + // .upstreams -> Service httpbin-service-e2e-test, so the $upstream_addr value we get is the Service ClusterIP. + var upstreamAddrs = make(map[string]struct{}) + for range 10 { + upstreamAddr := s.NewAPISIXClient().GET("/get").Expect().Raw().Header.Get("X-Upstream-Host") + upstreamAddrs[upstreamAddr] = struct{}{} + } + + endpoints, err := s.GetServiceEndpoints(types.NamespacedName{Namespace: s.Namespace(), Name: "httpbin-service-e2e-test"}) + Expect(err).ShouldNot(HaveOccurred(), "get endpoints") + Expect(endpoints).Should(HaveLen(1)) + endpoint := net.JoinHostPort(endpoints[0], "80") + + service, err := s.GetServiceByName("httpbin-service-e2e-test") + Expect(err).ShouldNot(HaveOccurred(), "get service") + clusterIP := net.JoinHostPort(service.Spec.ClusterIP, "80") + + Expect(upstreamAddrs).Should(HaveLen(2)) + Eventually(upstreamAddrs).Should(HaveKey(endpoint)) + Eventually(upstreamAddrs).Should(HaveKey(clusterIP)) }) }) }) diff --git a/test/e2e/framework/k8s.go b/test/e2e/framework/k8s.go index 3bb2ea6fd..61d9db7d8 100644 --- a/test/e2e/framework/k8s.go +++ b/test/e2e/framework/k8s.go @@ -15,6 +15,7 @@ package framework import ( "bufio" "bytes" + "cmp" "context" "encoding/json" "fmt" @@ -33,6 +34,7 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" applycorev1 "k8s.io/client-go/applyconfigurations/core/v1" applymetav1 "k8s.io/client-go/applyconfigurations/meta/v1" @@ -108,8 +110,8 @@ func (f *Framework) ensureServiceWithTimeout(name, namespace string, desiredEndp return nil } -func (f *Framework) GetServiceEndpoints(name string) ([]string, error) { - ep, err := f.clientset.CoreV1().Endpoints(_namespace).Get(f.Context, name, metav1.GetOptions{}) +func (f *Framework) GetServiceEndpoints(nn types.NamespacedName) ([]string, error) { + ep, err := f.clientset.CoreV1().Endpoints(cmp.Or(nn.Namespace, _namespace)).Get(f.Context, nn.Name, metav1.GetOptions{}) if err != nil { return nil, err }