Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 49 additions & 19 deletions agent/xds/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,17 +68,12 @@ func (s *ResourceGenerator) endpointsFromSnapshotConnectProxy(cfgSnap *proxycfg.
continue
}

var upstreamConfigMap map[string]interface{}
if upstream != nil {
upstreamConfigMap = upstream.Config
}

es, err := s.endpointsFromDiscoveryChain(
uid,
chain,
cfgSnap,
cfgSnap.Locality,
upstreamConfigMap,
upstream,
cfgSnap.ConnectProxy.WatchedUpstreamEndpoints[uid],
cfgSnap.ConnectProxy.WatchedGatewayEndpoints[uid],
false,
Expand Down Expand Up @@ -527,7 +522,7 @@ func (s *ResourceGenerator) endpointsFromSnapshotIngressGateway(cfgSnap *proxycf
cfgSnap.IngressGateway.DiscoveryChain[uid],
cfgSnap,
proxycfg.GatewayKey{Datacenter: cfgSnap.Datacenter, Partition: u.DestinationPartition},
u.Config,
&u,
cfgSnap.IngressGateway.WatchedUpstreamEndpoints[uid],
cfgSnap.IngressGateway.WatchedGatewayEndpoints[uid],
false,
Expand All @@ -549,8 +544,8 @@ func (s *ResourceGenerator) endpointsFromSnapshotAPIGateway(cfgSnap *proxycfg.Co
readyListeners := getReadyListeners(cfgSnap)

for _, readyListener := range readyListeners {
for _, u := range readyListener.upstreams {
uid := proxycfg.NewUpstreamID(&u)
for _, upstream := range readyListener.upstreams {
uid := proxycfg.NewUpstreamID(&upstream)

// If we've already created endpoints for this upstream, skip it. Multiple listeners may
// reference the same upstream, so we don't need to create duplicate endpoints in that case.
Expand All @@ -563,8 +558,8 @@ func (s *ResourceGenerator) endpointsFromSnapshotAPIGateway(cfgSnap *proxycfg.Co
uid,
cfgSnap.APIGateway.DiscoveryChain[uid],
cfgSnap,
proxycfg.GatewayKey{Datacenter: cfgSnap.Datacenter, Partition: u.DestinationPartition},
u.Config,
proxycfg.GatewayKey{Datacenter: cfgSnap.Datacenter, Partition: upstream.DestinationPartition},
&upstream,
cfgSnap.APIGateway.WatchedUpstreamEndpoints[uid],
cfgSnap.APIGateway.WatchedGatewayEndpoints[uid],
false,
Expand Down Expand Up @@ -621,9 +616,25 @@ func (s *ResourceGenerator) makeUpstreamLoadAssignmentForPeerService(
// If an upstream is configured with local mesh gw mode, we make a load assignment
// from the gateway endpoints instead of those of the upstreams.
if upstreamGatewayMode == structs.MeshGatewayModeLocal {
localGw, ok := cfgSnap.ConnectProxy.WatchedLocalGWEndpoints.Get(cfgSnap.Locality.String())
if !ok {
// local GW is not ready; return early

// This makeUpstreamLoadAssignmentForPeerService is a generic method
// and used by both connect-proxy and API-GW.
// So, whenever this method is invoked for API Gateway (api gateway peered in local mesh mode),
// below, localGw would be nil because the existing statement fetches endpoints from cfg.ConnectProxy.
//
// Fix: generate endpoints conditionally based on the kind of cfgSnap.

var localGatewayEndpoint structs.CheckServiceNodes
ready := false
if cfgSnap.Kind == structs.ServiceKindConnectProxy {
localGatewayEndpoint, ready = cfgSnap.ConnectProxy.WatchedLocalGWEndpoints.Get(cfgSnap.Locality.String())
}
if cfgSnap.Kind == structs.ServiceKindAPIGateway {
localGatewayEndpoint, ready = cfgSnap.APIGateway.WatchedLocalGWEndpoints.Get(cfgSnap.Locality.String())
}
if !ready {
// local mesh GW is not ready; skip load assignment; return early
s.Logger.Trace("local mesh GW is not ready; skipping load assignment", "cluster", clusterName)
return la, nil
}
la = makeLoadAssignment(
Expand All @@ -632,7 +643,7 @@ func (s *ResourceGenerator) makeUpstreamLoadAssignmentForPeerService(
clusterName,
nil,
[]loadAssignmentEndpointGroup{
{Endpoints: localGw},
{Endpoints: localGatewayEndpoint},
},
cfgSnap.Locality,
)
Expand All @@ -647,6 +658,7 @@ func (s *ResourceGenerator) makeUpstreamLoadAssignmentForPeerService(

endpoints, ok := upstreamsSnapshot.PeerUpstreamEndpoints.Get(uid)
if !ok {
s.Logger.Trace("skipping load assignment for peer instances with hostname as their address", "upstream", uid, "cluster", clusterName)
return nil, nil
}
la = makeLoadAssignment(
Expand All @@ -667,7 +679,7 @@ func (s *ResourceGenerator) endpointsFromDiscoveryChain(
chain *structs.CompiledDiscoveryChain,
cfgSnap *proxycfg.ConfigSnapshot,
gatewayKey proxycfg.GatewayKey,
upstreamConfigMap map[string]interface{},
upstream *structs.Upstream,
upstreamEndpoints map[string]structs.CheckServiceNodes,
gatewayEndpoints map[string]structs.CheckServiceNodes,
forMeshGateway bool,
Expand All @@ -679,8 +691,9 @@ func (s *ResourceGenerator) endpointsFromDiscoveryChain(
return nil, nil
}

if upstreamConfigMap == nil {
upstreamConfigMap = make(map[string]interface{}) // TODO:needed?
upstreamConfigMap := make(map[string]interface{})
if upstream != nil {
upstreamConfigMap = upstream.Config
}

var resources []proto.Message
Expand Down Expand Up @@ -712,8 +725,25 @@ func (s *ResourceGenerator) endpointsFromDiscoveryChain(
}
}

// This endpointsFromDiscoveryChain is a generic method
// and used by both connect-proxy and API-GW.
// So, whenever this method is invoked for API Gateway,
// Existing `GetUpstream` method is not defined on cfgSnap.APIGateway,
// and would return empty object for upstream,
// resulting, upstream.MeshGateway.Mode to be "" and
// finally mgwMode will remain to MeshGatewayModeDefault (& Default is always treated as remote mode in peering)
// which is incorrect because API GW can be configured in local mesh mode as well.
//
// Fix: generate mgwMode conditionally based on the kind of cfgSnap.

mgwMode := structs.MeshGatewayModeDefault
if upstream, _ := cfgSnap.ConnectProxy.GetUpstream(uid, &cfgSnap.ProxyID.EnterpriseMeta); upstream != nil {
if cfgSnap.Kind == structs.ServiceKindConnectProxy {
upstreamConfig, _ := cfgSnap.ConnectProxy.GetUpstream(uid, &cfgSnap.ProxyID.EnterpriseMeta)
if upstreamConfig != nil {
mgwMode = upstreamConfig.MeshGateway.Mode
}
}
if cfgSnap.Kind == structs.ServiceKindAPIGateway {
mgwMode = upstream.MeshGateway.Mode
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,29 @@
{
"nonce": "00000001",
"resources": [
{
"@type": "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment",
"clusterName": "c225dc1c~paymentService.default.paymentpeer.external.1c053652-8512-4373-90cf-5a7f6263a994.consul",
"endpoints": [
{
"lbEndpoints": [
{
"endpoint": {
"address": {
"socketAddress": {
"address": "10.45.1.1",
"portValue": 8443
}
}
},
"healthStatus": "HEALTHY",
"loadBalancingWeight": 1
}
]
}
]
}
],
"typeUrl": "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment",
"versionInfo": "00000001"
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
"endpoint": {
"address": {
"socketAddress": {
"address": "172.68.1.1",
"address": "10.45.1.1",
"portValue": 8443
}
}
Expand Down
Loading