diff --git a/agent/consul/discoverychain/gateway.go b/agent/consul/discoverychain/gateway.go index 3435233aafe..cec5334909b 100644 --- a/agent/consul/discoverychain/gateway.go +++ b/agent/consul/discoverychain/gateway.go @@ -10,6 +10,7 @@ import ( "strconv" "strings" + "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/configentry" "github.com/hashicorp/consul/agent/structs" ) @@ -24,6 +25,7 @@ type GatewayChainSynthesizer struct { hostname string matchesByHostname map[string][]hostnameMatch tcpRoutes []structs.TCPRouteConfigEntry + serviceRouters map[structs.ServiceName][]*structs.ServiceRoute } type hostnameMatch struct { @@ -113,7 +115,9 @@ func (l *GatewayChainSynthesizer) Synthesize(chains ...*structs.CompiledDiscover return nil, nil, fmt.Errorf("must provide at least one compiled discovery chain") } + l.serviceRouters = serviceRouterRulesFromChains(chains) services, set := l.synthesizeEntries() + resolverEntries := resolverEntriesFromChains(chains) if len(set) == 0 { // we can't actually compile a discovery chain, i.e. we're using a TCPRoute-based listener, instead, just return the ingresses @@ -125,6 +129,9 @@ func (l *GatewayChainSynthesizer) Synthesize(chains ...*structs.CompiledDiscover for i, service := range services { entries := set[i] + if len(resolverEntries) > 0 { + entries.AddResolvers(resolverEntries...) + } compiled, err := Compile(CompileRequest{ ServiceName: service.Name, @@ -192,6 +199,59 @@ func (l *GatewayChainSynthesizer) Synthesize(chains ...*structs.CompiledDiscover return services, compiledChains, nil } +// resolverEntriesFromChains builds minimal service-resolver entries that include +// subset definitions from the real compiled chains. This allows gateway +// synthesis to compile routes that reference service subsets. +func resolverEntriesFromChains(chains []*structs.CompiledDiscoveryChain) []*structs.ServiceResolverConfigEntry { + out := []*structs.ServiceResolverConfigEntry{} + seen := map[structs.ServiceID]*structs.ServiceResolverConfigEntry{} + + for _, chain := range chains { + if chain == nil { + continue + } + entMeta := acl.NewEnterpriseMetaWithPartition(chain.Partition, chain.Namespace) + sid := structs.NewServiceID(chain.ServiceName, &entMeta) + + entry, ok := seen[sid] + if !ok { + entry = &structs.ServiceResolverConfigEntry{ + Kind: structs.ServiceResolver, + Name: chain.ServiceName, + EnterpriseMeta: sid.EnterpriseMeta, + Subsets: make(map[string]structs.ServiceResolverSubset), + } + seen[sid] = entry + } + + for _, target := range chain.Targets { + if target == nil { + continue + } + if target.Service != chain.ServiceName { + continue + } + if target.Namespace != chain.Namespace || target.Partition != chain.Partition { + continue + } + if target.ServiceSubset == "" { + continue + } + if _, ok := entry.Subsets[target.ServiceSubset]; !ok { + entry.Subsets[target.ServiceSubset] = target.Subset + } + } + } + + for _, entry := range seen { + if len(entry.Subsets) == 0 { + continue + } + out = append(out, entry) + } + return out +} + // consolidateHTTPRoutes combines all rules into the shortest possible list of routes // with one route per hostname containing all rules for that hostname. func (l *GatewayChainSynthesizer) consolidateHTTPRoutes() []structs.HTTPRouteConfigEntry { @@ -290,7 +350,7 @@ func (l *GatewayChainSynthesizer) synthesizeEntries() ([]structs.IngressService, entries := []*configentry.DiscoveryChainSet{} for _, route := range l.consolidateHTTPRoutes() { - ingress, router, splitters, defaults := synthesizeHTTPRouteDiscoveryChain(route) + ingress, router, splitters, defaults := synthesizeHTTPRouteDiscoveryChain(route, l.serviceRouters) services = append(services, ingress) @@ -307,3 +367,39 @@ func (l *GatewayChainSynthesizer) synthesizeEntries() ([]structs.IngressService, return services, entries } + +func serviceRouterRulesFromChains(chains []*structs.CompiledDiscoveryChain) map[structs.ServiceName][]*structs.ServiceRoute { + out := make(map[structs.ServiceName][]*structs.ServiceRoute) + for _, chain := range chains { + if chain == nil { + continue + } + startNode := chain.Nodes[chain.StartNode] + if startNode == nil || !startNode.IsRouter() { + continue + } + routes := make([]*structs.ServiceRoute, 0, len(startNode.Routes)) + for _, route := range startNode.Routes { + if route == nil || route.Definition == nil { + continue + } + def := route.Definition.DeepCopy() + // If the route destination doesn't include a subset, but the next + // resolver target does, propagate it. This ensures default-subset + // behavior is preserved when composing gateway routes. + if def.Destination != nil && def.Destination.ServiceSubset == "" { + if node := chain.Nodes[route.NextNode]; node != nil && node.IsResolver() && node.Resolver != nil { + if target := chain.Targets[node.Resolver.Target]; target != nil && target.ServiceSubset != "" { + def.Destination.ServiceSubset = target.ServiceSubset + } + } + } + routes = append(routes, def) + } + if len(routes) == 0 { + continue + } + out[chain.CompoundServiceName()] = routes + } + return out +} diff --git a/agent/consul/discoverychain/gateway_httproute.go b/agent/consul/discoverychain/gateway_httproute.go index c4816e02744..b824c6a510c 100644 --- a/agent/consul/discoverychain/gateway_httproute.go +++ b/agent/consul/discoverychain/gateway_httproute.go @@ -5,10 +5,13 @@ package discoverychain import ( "fmt" + "strings" "github.com/hashicorp/consul/agent/structs" ) +const maxComposedRoutes = 256 + // compareHTTPRules implements the non-hostname order of precedence for routes specified by the K8s Gateway API spec. // https://gateway-api.sigs.k8s.io/v1alpha2/references/spec/#gateway.networking.k8s.io/v1alpha2.HTTPRouteRule // @@ -42,12 +45,12 @@ func httpServiceDefault(entry structs.ConfigEntry, meta map[string]string) *stru } } -func synthesizeHTTPRouteDiscoveryChain(route structs.HTTPRouteConfigEntry) (structs.IngressService, *structs.ServiceRouterConfigEntry, []*structs.ServiceSplitterConfigEntry, []*structs.ServiceConfigEntry) { +func synthesizeHTTPRouteDiscoveryChain(route structs.HTTPRouteConfigEntry, serviceRouters map[structs.ServiceName][]*structs.ServiceRoute) (structs.IngressService, *structs.ServiceRouterConfigEntry, []*structs.ServiceSplitterConfigEntry, []*structs.ServiceConfigEntry) { meta := route.GetMeta() splitters := []*structs.ServiceSplitterConfigEntry{} defaults := []*structs.ServiceConfigEntry{} - router, splits, upstreamDefaults := httpRouteToDiscoveryChain(route) + router, splits, upstreamDefaults := httpRouteToDiscoveryChain(route, serviceRouters) serviceDefault := httpServiceDefault(router, meta) defaults = append(defaults, serviceDefault) for _, split := range splits { @@ -68,7 +71,7 @@ func synthesizeHTTPRouteDiscoveryChain(route structs.HTTPRouteConfigEntry) (stru return ingress, router, splitters, defaults } -func httpRouteToDiscoveryChain(route structs.HTTPRouteConfigEntry) (*structs.ServiceRouterConfigEntry, []*structs.ServiceSplitterConfigEntry, []*structs.ServiceConfigEntry) { +func httpRouteToDiscoveryChain(route structs.HTTPRouteConfigEntry, serviceRouters map[structs.ServiceName][]*structs.ServiceRoute) (*structs.ServiceRouterConfigEntry, []*structs.ServiceSplitterConfigEntry, []*structs.ServiceConfigEntry) { router := &structs.ServiceRouterConfigEntry{ Kind: structs.ServiceRouter, Name: route.GetName(), @@ -123,6 +126,55 @@ func httpRouteToDiscoveryChain(route structs.HTTPRouteConfigEntry) (*structs.Ser Protocol: "http", EnterpriseMeta: service.EnterpriseMeta, }) + + applyHTTPRouteFilters(&destination, rule) + + httpMatches := rule.Matches + if len(httpMatches) == 0 { + httpMatches = []structs.HTTPMatch{{ + Path: structs.HTTPPathMatch{ + Match: structs.HTTPPathMatchPrefix, + Value: "/", + }, + }} + } + + serviceRouterRoutes := lookupServiceRouterRules(serviceRouters, service) + if shouldComposeServiceRouter(httpMatches, serviceRouterRoutes) { + for _, match := range httpMatches { + httpMatch := &structs.ServiceRouteMatch{HTTP: httpRouteMatchToServiceRouteHTTPMatch(match)} + composed := false + + for _, svcRoute := range serviceRouterRoutes { + mergedMatch, ok := mergeServiceRouteMatch(httpMatch, svcRoute.Match) + if !ok { + continue + } + mergedDest := mergeServiceRouteDestination(&destination, svcRoute.Destination) + router.Routes = append(router.Routes, structs.ServiceRoute{ + Match: mergedMatch, + Destination: mergedDest, + }) + composed = true + } + + if !composed { + router.Routes = append(router.Routes, structs.ServiceRoute{ + Match: httpMatch, + Destination: &destination, + }) + } + } + } else { + for _, match := range httpMatches { + router.Routes = append(router.Routes, structs.ServiceRoute{ + Match: &structs.ServiceRouteMatch{HTTP: httpRouteMatchToServiceRouteHTTPMatch(match)}, + Destination: &destination, + }) + } + } + + continue } else { // create a virtual service to split destination.Service = fmt.Sprintf("%s-%d", route.GetName(), idx) @@ -176,24 +228,7 @@ func httpRouteToDiscoveryChain(route structs.HTTPRouteConfigEntry) (*structs.Ser } } - if rule.Filters.RetryFilter != nil { - - destination.NumRetries = rule.Filters.RetryFilter.NumRetries - destination.RetryOnConnectFailure = rule.Filters.RetryFilter.RetryOnConnectFailure - - if len(rule.Filters.RetryFilter.RetryOn) > 0 { - destination.RetryOn = rule.Filters.RetryFilter.RetryOn - } - - if len(rule.Filters.RetryFilter.RetryOnStatusCodes) > 0 { - destination.RetryOnStatusCodes = rule.Filters.RetryFilter.RetryOnStatusCodes - } - } - - if rule.Filters.TimeoutFilter != nil { - destination.IdleTimeout = rule.Filters.TimeoutFilter.IdleTimeout - destination.RequestTimeout = rule.Filters.TimeoutFilter.RequestTimeout - } + applyHTTPRouteFilters(&destination, rule) // for each match rule a ServiceRoute is created for the service-router // if there are no rules a single route with the destination is set @@ -213,6 +248,350 @@ func httpRouteToDiscoveryChain(route structs.HTTPRouteConfigEntry) (*structs.Ser return router, splitters, defaults } +func applyHTTPRouteFilters(destination *structs.ServiceRouteDestination, rule structs.HTTPRouteRule) { + if rule.Filters.RetryFilter != nil { + destination.NumRetries = rule.Filters.RetryFilter.NumRetries + destination.RetryOnConnectFailure = rule.Filters.RetryFilter.RetryOnConnectFailure + + if len(rule.Filters.RetryFilter.RetryOn) > 0 { + destination.RetryOn = rule.Filters.RetryFilter.RetryOn + } + + if len(rule.Filters.RetryFilter.RetryOnStatusCodes) > 0 { + destination.RetryOnStatusCodes = rule.Filters.RetryFilter.RetryOnStatusCodes + } + } + + if rule.Filters.TimeoutFilter != nil { + destination.IdleTimeout = rule.Filters.TimeoutFilter.IdleTimeout + destination.RequestTimeout = rule.Filters.TimeoutFilter.RequestTimeout + } +} + +func lookupServiceRouterRules(serviceRouters map[structs.ServiceName][]*structs.ServiceRoute, service structs.HTTPService) []*structs.ServiceRoute { + if len(serviceRouters) == 0 { + return nil + } + return serviceRouters[service.ServiceName()] +} + +func shouldComposeServiceRouter(httpMatches []structs.HTTPMatch, serviceRoutes []*structs.ServiceRoute) bool { + if len(serviceRoutes) == 0 { + return false + } + if len(httpMatches) == 0 { + return true + } + if len(serviceRoutes) > maxComposedRoutes/len(httpMatches) { + return false + } + return true +} + +func mergeServiceRouteMatch(httpMatch, svcMatch *structs.ServiceRouteMatch) (*structs.ServiceRouteMatch, bool) { + if httpMatch == nil || httpMatch.IsEmpty() { + return cloneServiceRouteMatch(svcMatch), true + } + if svcMatch == nil || svcMatch.IsEmpty() { + return cloneServiceRouteMatch(httpMatch), true + } + + mergedHTTP, ok := mergeServiceRouteHTTPMatch(httpMatch.HTTP, svcMatch.HTTP) + if !ok { + return nil, false + } + return &structs.ServiceRouteMatch{HTTP: mergedHTTP}, true +} + +func mergeServiceRouteHTTPMatch(a, b *structs.ServiceRouteHTTPMatch) (*structs.ServiceRouteHTTPMatch, bool) { + if a == nil || a.IsEmpty() { + return cloneServiceRouteHTTPMatch(b), true + } + if b == nil || b.IsEmpty() { + return cloneServiceRouteHTTPMatch(a), true + } + + merged := cloneServiceRouteHTTPMatch(a) + if merged == nil { + merged = &structs.ServiceRouteHTTPMatch{} + } + + path, ok := mergePathMatch(a, b) + if !ok { + return nil, false + } + merged.PathExact = path.pathExact + merged.PathPrefix = path.pathPrefix + merged.PathRegex = path.pathRegex + merged.CaseInsensitive = a.CaseInsensitive && b.CaseInsensitive + + merged.Header = append(append([]structs.ServiceRouteHTTPMatchHeader{}, a.Header...), b.Header...) + merged.QueryParam = append(append([]structs.ServiceRouteHTTPMatchQueryParam{}, a.QueryParam...), b.QueryParam...) + + merged.Methods = mergeHTTPMethods(a.Methods, b.Methods) + if len(merged.Methods) == 0 && len(a.Methods) > 0 && len(b.Methods) > 0 { + return nil, false + } + + return merged, true +} + +type mergedPath struct { + pathExact string + pathPrefix string + pathRegex string +} + +func mergePathMatch(a, b *structs.ServiceRouteHTTPMatch) (mergedPath, bool) { + aPath, aOK := extractPathMatch(a) + bPath, bOK := extractPathMatch(b) + + if !aOK && !bOK { + return mergedPath{}, true + } + if !aOK { + return mergedPath{pathExact: b.PathExact, pathPrefix: b.PathPrefix, pathRegex: b.PathRegex}, true + } + if !bOK { + return mergedPath{pathExact: a.PathExact, pathPrefix: a.PathPrefix, pathRegex: a.PathRegex}, true + } + + switch aPath.kind { + case "exact": + switch bPath.kind { + case "exact": + if aPath.value != bPath.value { + return mergedPath{}, false + } + return mergedPath{pathExact: aPath.value}, true + case "prefix": + if strings.HasPrefix(aPath.value, bPath.value) { + return mergedPath{pathExact: aPath.value}, true + } + return mergedPath{}, false + case "regex": + if aPath.value == bPath.value { + return mergedPath{pathExact: aPath.value}, true + } + return mergedPath{}, false + } + case "prefix": + switch bPath.kind { + case "exact": + if strings.HasPrefix(bPath.value, aPath.value) { + return mergedPath{pathExact: bPath.value}, true + } + return mergedPath{}, false + case "prefix": + if strings.HasPrefix(aPath.value, bPath.value) { + return mergedPath{pathPrefix: aPath.value}, true + } + if strings.HasPrefix(bPath.value, aPath.value) { + return mergedPath{pathPrefix: bPath.value}, true + } + return mergedPath{}, false + case "regex": + if aPath.value == bPath.value { + return mergedPath{pathPrefix: aPath.value}, true + } + return mergedPath{}, false + } + case "regex": + switch bPath.kind { + case "regex": + if aPath.value != bPath.value { + return mergedPath{}, false + } + return mergedPath{pathRegex: aPath.value}, true + case "exact": + if aPath.value == bPath.value { + return mergedPath{pathExact: bPath.value}, true + } + return mergedPath{}, false + case "prefix": + if aPath.value == bPath.value { + return mergedPath{pathPrefix: bPath.value}, true + } + return mergedPath{}, false + } + } + + return mergedPath{}, false +} + +type pathMatch struct { + kind string + value string +} + +func extractPathMatch(m *structs.ServiceRouteHTTPMatch) (pathMatch, bool) { + if m == nil { + return pathMatch{}, false + } + switch { + case m.PathExact != "": + return pathMatch{kind: "exact", value: m.PathExact}, true + case m.PathPrefix != "": + return pathMatch{kind: "prefix", value: m.PathPrefix}, true + case m.PathRegex != "": + return pathMatch{kind: "regex", value: m.PathRegex}, true + default: + return pathMatch{}, false + } +} + +func mergeHTTPMethods(a, b []string) []string { + if len(a) == 0 { + return append([]string(nil), b...) + } + if len(b) == 0 { + return append([]string(nil), a...) + } + set := make(map[string]struct{}, len(a)) + for _, method := range a { + set[method] = struct{}{} + } + var out []string + for _, method := range b { + if _, ok := set[method]; ok { + out = append(out, method) + } + } + return out +} + +func mergeServiceRouteDestination(httpDest, svcDest *structs.ServiceRouteDestination) *structs.ServiceRouteDestination { + if svcDest == nil { + return cloneServiceRouteDestination(httpDest) + } + merged := cloneServiceRouteDestination(svcDest) + if httpDest == nil { + return merged + } + + if merged.Service == "" { + merged.Service = httpDest.Service + } + if merged.ServiceSubset == "" { + merged.ServiceSubset = httpDest.ServiceSubset + } + if merged.Namespace == "" { + merged.Namespace = httpDest.Namespace + } + if merged.Partition == "" { + merged.Partition = httpDest.Partition + } + + if httpDest.PrefixRewrite != "" { + merged.PrefixRewrite = httpDest.PrefixRewrite + } + if httpDest.RequestTimeout != 0 { + merged.RequestTimeout = httpDest.RequestTimeout + } + if httpDest.IdleTimeout != 0 { + merged.IdleTimeout = httpDest.IdleTimeout + } + if httpDest.NumRetries != 0 { + merged.NumRetries = httpDest.NumRetries + } + if httpDest.RetryOnConnectFailure { + merged.RetryOnConnectFailure = true + } + if len(httpDest.RetryOn) > 0 { + merged.RetryOn = httpDest.RetryOn + } + if len(httpDest.RetryOnStatusCodes) > 0 { + merged.RetryOnStatusCodes = httpDest.RetryOnStatusCodes + } + + merged.RequestHeaders = mergeHeaderModifiers(merged.RequestHeaders, httpDest.RequestHeaders) + merged.ResponseHeaders = mergeHeaderModifiers(merged.ResponseHeaders, httpDest.ResponseHeaders) + + return merged +} + +func mergeHeaderModifiers(base, overlay *structs.HTTPHeaderModifiers) *structs.HTTPHeaderModifiers { + if base == nil && overlay == nil { + return nil + } + if base == nil { + return cloneHeaderModifiers(overlay) + } + if overlay == nil { + return cloneHeaderModifiers(base) + } + + merged := &structs.HTTPHeaderModifiers{ + Add: make(map[string]string), + Set: make(map[string]string), + Remove: []string{}, + } + + for k, v := range base.Add { + merged.Add[k] = v + } + for k, v := range base.Set { + merged.Set[k] = v + } + merged.Remove = append(merged.Remove, base.Remove...) + + for k, v := range overlay.Add { + merged.Add[k] = v + } + for k, v := range overlay.Set { + merged.Set[k] = v + } + merged.Remove = append(merged.Remove, overlay.Remove...) + + return merged +} + +func cloneServiceRouteMatch(in *structs.ServiceRouteMatch) *structs.ServiceRouteMatch { + if in == nil { + return nil + } + return &structs.ServiceRouteMatch{HTTP: cloneServiceRouteHTTPMatch(in.HTTP)} +} + +func cloneServiceRouteHTTPMatch(in *structs.ServiceRouteHTTPMatch) *structs.ServiceRouteHTTPMatch { + if in == nil { + return nil + } + out := *in + out.Header = append([]structs.ServiceRouteHTTPMatchHeader(nil), in.Header...) + out.QueryParam = append([]structs.ServiceRouteHTTPMatchQueryParam(nil), in.QueryParam...) + out.Methods = append([]string(nil), in.Methods...) + return &out +} + +func cloneServiceRouteDestination(in *structs.ServiceRouteDestination) *structs.ServiceRouteDestination { + if in == nil { + return nil + } + out := *in + out.RequestHeaders = cloneHeaderModifiers(in.RequestHeaders) + out.ResponseHeaders = cloneHeaderModifiers(in.ResponseHeaders) + return &out +} + +func cloneHeaderModifiers(in *structs.HTTPHeaderModifiers) *structs.HTTPHeaderModifiers { + if in == nil { + return nil + } + out := &structs.HTTPHeaderModifiers{ + Add: make(map[string]string), + Set: make(map[string]string), + Remove: append([]string(nil), in.Remove...), + } + for k, v := range in.Add { + out.Add[k] = v + } + for k, v := range in.Set { + out.Set[k] = v + } + return out +} + func httpRouteFiltersToDestinationPrefixRewrite(rewrite *structs.URLRewrite) string { if rewrite == nil { return "" diff --git a/agent/proxycfg/api_gateway.go b/agent/proxycfg/api_gateway.go index e7b6d4dcb9a..5526f46c3c6 100644 --- a/agent/proxycfg/api_gateway.go +++ b/agent/proxycfg/api_gateway.go @@ -398,6 +398,9 @@ func (h *handlerAPIGateway) handleRouteConfigUpdate(ctx context.Context, u Updat partition: service.PartitionOrDefault(), datacenter: h.source.Datacenter, } + // Ensure discovery chain compilation is HTTP-like for HTTPRoutes so service + // routers/splitters are included when present. + watchOpts.cfg.Protocol = string(route.GetProtocol()) handler := &handlerUpstreams{handlerState: h.handlerState} if err := handler.watchDiscoveryChain(ctx, snap, watchOpts); err != nil { @@ -449,6 +452,8 @@ func (h *handlerAPIGateway) handleRouteConfigUpdate(ctx context.Context, u Updat partition: service.PartitionOrDefault(), datacenter: h.source.Datacenter, } + // Ensure discovery chain compilation matches the route protocol. + watchOpts.cfg.Protocol = string(route.GetProtocol()) handler := &handlerUpstreams{handlerState: h.handlerState} if err := handler.watchDiscoveryChain(ctx, snap, watchOpts); err != nil { diff --git a/agent/xds/xds_protocol_helpers_test.go b/agent/xds/xds_protocol_helpers_test.go index 723f192a0b8..2380b6fbbeb 100644 --- a/agent/xds/xds_protocol_helpers_test.go +++ b/agent/xds/xds_protocol_helpers_test.go @@ -34,6 +34,7 @@ import ( "github.com/hashicorp/consul/agent/connect" "github.com/hashicorp/consul/agent/grpc-external/limiter" + "github.com/hashicorp/consul/agent/netutil" "github.com/hashicorp/consul/agent/proxycfg" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/xds/response" @@ -188,6 +189,8 @@ func newTestServerDeltaScenario( ) *testServerScenario { mgr := newTestManager() envoy := NewTestEnvoy(t, proxyID, token) + originalGetAgentBindAddr := netutil.GetAgentBindAddrFunc + netutil.GetAgentBindAddrFunc = netutil.GetMockGetAgentBindAddrFunc("0.0.0.0") sink := metrics.NewInmemSink(1*time.Minute, 1*time.Minute) cfg := metrics.DefaultConfig("consul.xds.test") @@ -197,6 +200,7 @@ func newTestServerDeltaScenario( t.Cleanup(func() { envoy.Close() + netutil.GetAgentBindAddrFunc = originalGetAgentBindAddr sink := &metrics.BlackholeSink{} metrics.NewGlobal(cfg, sink) })