Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions internal/extensionserver/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,19 @@ var (
},
)

// ConnectorOfflineRoutesTotal counts total user-facing forwarding routes
// rewritten to a tunnel-offline 503 direct_response across all hook
// invocations. Each increment = one route that previously targeted an
// endpoint-less offline-connector cluster and now returns a deterministic
// 503 (instead of Envoy's generic 503 no_healthy_upstream). Use rate()/sum()
// to derive per-build averages.
ConnectorOfflineRoutesTotal = promauto.NewCounter(
prometheus.CounterOpts{
Name: "nso_extension_connector_offline_routes_total",
Help: "Total user-facing forwarding routes rewritten to a tunnel-offline 503 direct_response across all hook invocations.",
},
)

// CacheSynced is 1 when the informer cache has synced and the extension server
// is accepting gRPC connections, 0 during startup or if the cache lost sync.
CacheSynced = promauto.NewGauge(
Expand Down
63 changes: 47 additions & 16 deletions internal/extensionserver/mutate/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,17 +72,19 @@ func ReplaceConnectorClusters(
}

// ApplyConnectorRoutes applies connector route mutations to a RouteConfiguration:
// - Online (replaced) connector found in a VH: prepend a CONNECT upgrade route
// targeting the replaced cluster and append info.TargetHost to VH domains.
// - Offline connector found in a VH: prepend a direct_response 503 CONNECT route.
// - Online (replaced) connector: prepend a CONNECT upgrade route targeting the
// replaced cluster and append info.TargetHost to VH domains.
// - Offline connector: prepend a CONNECT direct_response 503 (tunnel-control
// clients) and rewrite the user-facing forwarding routes to a 503
// direct_response (see the offline branch for why).
//
// Returns the number of VirtualHosts mutated.
// Returns the number of VirtualHosts mutated and the number of forwarding
// routes converted to a tunnel-offline direct_response.
func ApplyConnectorRoutes(
rc *routev3.RouteConfiguration,
idx *extcache.PolicyIndex,
replaced, offline map[string]*extcache.ConnectorInfo,
) (int, error) {
mutated := 0
) (mutated, converted int, err error) {
for _, vh := range rc.GetVirtualHosts() {
// Find any connector cluster referenced by routes in this VH.
var connectorCluster string
Expand All @@ -102,7 +104,6 @@ func ApplyConnectorRoutes(
}

var newRoute *routev3.Route
var err error

if info, ok := replaced[connectorCluster]; ok {
// Online: CONNECT route targeting the replaced cluster.
Expand All @@ -111,7 +112,7 @@ func ApplyConnectorRoutes(
connectorCluster,
)
if err != nil {
return mutated, fmt.Errorf("build connect route for %q: %w", vh.GetName(), err)
return mutated, converted, fmt.Errorf("build connect route for %q: %w", vh.GetName(), err)
}
// Prepend the CONNECT route (NSO inserts at /virtual_hosts/0/routes/0).
vh.Routes = append([]*routev3.Route{newRoute}, vh.Routes...)
Expand All @@ -120,17 +121,31 @@ func ApplyConnectorRoutes(
// NOT a synthetic .connector.local domain.
vh.Domains = appendUnique(vh.Domains, info.TargetHost)
} else {
// Offline: direct_response 503 CONNECT route ("Tunnel not online").
// Offline connect_matcher route for tunnel-control clients.
newRoute, err = buildOfflineRoute("connector-offline-" + sanitizeID(vh.GetName()))
if err != nil {
return mutated, fmt.Errorf("build offline route for %q: %w", vh.GetName(), err)
return mutated, converted, fmt.Errorf("build offline route for %q: %w", vh.GetName(), err)
}
vh.Routes = append([]*routev3.Route{newRoute}, vh.Routes...)
// No domain appended for offline connectors (no live tunnel).

// Route user traffic to a deterministic 503 instead of the
// endpoint-less offline cluster, which would yield a generic
// no_healthy_upstream plus retry/cluster-stat noise. Replacing only
// the Action oneof preserves each route's match/metadata; idempotent
// because direct_responses carry no cluster to re-match.
for _, rt := range vh.GetRoutes() {
if routeCluster(rt) != connectorCluster {
continue
}
if derr := setRouteDirectResponse(rt, 503, offlineResponseBody); derr != nil {
return mutated, converted, fmt.Errorf("convert offline forward route for %q: %w", vh.GetName(), derr)
}
converted++
}
}
mutated++
}
return mutated, nil
return mutated, converted, nil
}

// --- Internal helpers ---
Expand Down Expand Up @@ -234,18 +249,34 @@ func buildConnectRoute(name, cluster string) (*routev3.Route, error) {
return rt, nil
}

// buildOfflineRoute builds the offline CONNECT route (direct_response 503
// "Tunnel not online"). Mirrors buildOfflineRoute in the seed. The exact body
// offlineResponseBody is the inline body for tunnel-offline 503 responses,
// shared by both offline paths so they return an identical body. The exact
// string is part of the STATE.md metadata contract.
const offlineResponseBody = "Tunnel not online"

// buildOfflineRoute builds the offline CONNECT route (direct_response 503
// "Tunnel not online"). Mirrors buildOfflineRoute in the seed.
func buildOfflineRoute(name string) (*routev3.Route, error) {
j := fmt.Sprintf(`{
"name": %q,
"match": { "connect_matcher": {} },
"direct_response": { "status": 503, "body": { "inline_string": "Tunnel not online" } }
}`, name)
"direct_response": { "status": 503, "body": { "inline_string": %q } }
}`, name, offlineResponseBody)
rt := &routev3.Route{}
if err := protojson.Unmarshal([]byte(j), rt); err != nil {
return nil, fmt.Errorf("unmarshal offline route JSON: %w", err)
}
return rt, nil
}

// setRouteDirectResponse swaps a route's action for a direct_response. Only the
// Action oneof is replaced, so match/metadata/typed_per_filter_config survive.
func setRouteDirectResponse(rt *routev3.Route, status uint32, body string) error {
j := fmt.Sprintf(`{ "status": %d, "body": { "inline_string": %q } }`, status, body)
dr := &routev3.DirectResponseAction{}
if err := protojson.Unmarshal([]byte(j), dr); err != nil {
return fmt.Errorf("unmarshal direct_response action JSON: %w", err)
}
rt.Action = &routev3.Route_DirectResponse{DirectResponse: dr}
return nil
}
126 changes: 121 additions & 5 deletions internal/extensionserver/mutate/connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

clusterv3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
routev3 "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
"google.golang.org/protobuf/types/known/anypb"

extcache "go.datum.net/network-services-operator/internal/extensionserver/cache"
)
Expand Down Expand Up @@ -206,9 +207,10 @@ func TestApplyConnectorRoutes_Online_PrependsCONNECTRouteAndAppendsTargetHost(t
},
}

n, err := ApplyConnectorRoutes(rc, idx, replaced, offline)
n, converted, err := ApplyConnectorRoutes(rc, idx, replaced, offline)
require.NoError(t, err)
assert.Equal(t, 1, n, "one VH should be mutated")
assert.Equal(t, 0, converted, "online connector must not convert any forwarding routes")

vh := rc.VirtualHosts[0]
require.Len(t, vh.Routes, 2, "CONNECT route must be prepended; want 2 routes total")
Expand Down Expand Up @@ -252,7 +254,7 @@ func TestApplyConnectorRoutes_Online_TargetHostDeduplicated(t *testing.T) {
},
}

_, err := ApplyConnectorRoutes(rc, idx, replaced, offline)
_, _, err := ApplyConnectorRoutes(rc, idx, replaced, offline)
require.NoError(t, err)

// Domain must not be duplicated.
Expand Down Expand Up @@ -288,27 +290,139 @@ func TestApplyConnectorRoutes_Offline_Prepends503Route_NoDomain(t *testing.T) {
},
}

n, err := ApplyConnectorRoutes(rc, idx, replaced, offline)
n, converted, err := ApplyConnectorRoutes(rc, idx, replaced, offline)
require.NoError(t, err)
assert.Equal(t, 1, n, "offline VH must be mutated (503 route prepended)")
assert.Equal(t, 1, converted, "the user-facing forwarding route must be converted to a direct_response")

vh := rc.VirtualHosts[0]
require.Len(t, vh.Routes, 2, "503 direct_response route must be prepended")

// First route: the connect_matcher offline route for CONNECT clients.
offlineRoute := vh.Routes[0]
dr := offlineRoute.GetDirectResponse()
require.NotNil(t, dr, "first route must be a direct_response")
assert.NotNil(t, offlineRoute.GetMatch().GetConnectMatcher(),
"prepended offline route must keep its connect_matcher")
assert.Equal(t, uint32(503), dr.GetStatus(),
"offline route must return 503")
assert.Equal(t, "Tunnel not online", dr.GetBody().GetInlineString(),
"offline route body must be 'Tunnel not online' per STATE.md contract")

// Second route: the original user-facing forwarding route must now be a
// direct_response 503 too, NOT a cluster route to the empty cluster.
fwd := vh.Routes[1]
assert.Equal(t, "fwd", fwd.GetName(), "forwarding route identity preserved")
assert.Empty(t, routeCluster(fwd),
"forwarding route must no longer target the endpoint-less connector cluster")
fwdDR := fwd.GetDirectResponse()
require.NotNil(t, fwdDR, "forwarding route must be converted to a direct_response")
assert.Equal(t, uint32(503), fwdDR.GetStatus(), "converted forwarding route must return 503")
assert.Equal(t, "Tunnel not online", fwdDR.GetBody().GetInlineString(),
"converted forwarding route must reuse the offline body")

// No domain must be appended for offline connectors.
assert.NotContains(t, vh.Domains, testTargetHost,
"target host must NOT be appended for offline connector")
assert.Len(t, vh.Domains, 1, "domains must remain unchanged for offline connector")
}

// TestApplyConnectorRoutes_Offline_PreservesMatchAndConfig verifies that
// converting a forwarding route to a direct_response only replaces the Action
// oneof — the route's match and typed_per_filter_config survive — and that a
// co-located non-connector route in the same VH is untouched.
func TestApplyConnectorRoutes_Offline_PreservesMatchAndUntouchedRoute(t *testing.T) {
idx := connectorPolicyIndex(false)
clusterName := testClusterName()
offlineInfo := &extcache.ConnectorInfo{Online: false, TargetHost: testTargetHost, TargetPort: testTargetPort}
replaced := map[string]*extcache.ConnectorInfo{}
offline := map[string]*extcache.ConnectorInfo{clusterName: offlineInfo}

// Connector forwarding route carries a prefix match + typed_per_filter_config.
connRoute := routeTargeting(clusterName)
connRoute.Match = &routev3.RouteMatch{
PathSpecifier: &routev3.RouteMatch_Prefix{Prefix: "/"},
}
connRoute.TypedPerFilterConfig = map[string]*anypb.Any{
"envoy.filters.http.cors": {TypeUrl: "type.googleapis.com/example.Cfg"},
}

// A second route in the same VH targets an unrelated cluster.
otherRoute := routeTargeting("infra-cluster")
otherRoute.Name = "other"

rc := &routev3.RouteConfiguration{
VirtualHosts: []*routev3.VirtualHost{
{
Name: "vh",
Domains: []string{"app.local.test"},
Routes: []*routev3.Route{connRoute, otherRoute},
},
},
}

_, converted, err := ApplyConnectorRoutes(rc, idx, replaced, offline)
require.NoError(t, err)
assert.Equal(t, 1, converted, "only the connector forwarding route must be converted")

vh := rc.VirtualHosts[0]
require.Len(t, vh.Routes, 3, "connect_matcher route prepended to the two originals")

// Converted forwarding route: match + typed_per_filter_config preserved.
gotConn := vh.Routes[1]
require.NotNil(t, gotConn.GetDirectResponse(), "connector forwarding route must be a direct_response")
assert.Equal(t, "/", gotConn.GetMatch().GetPrefix(), "prefix match must be preserved")
assert.Contains(t, gotConn.GetTypedPerFilterConfig(), "envoy.filters.http.cors",
"typed_per_filter_config must be preserved on the converted route")

// The non-connector route must be completely untouched.
gotOther := vh.Routes[2]
assert.Equal(t, "other", gotOther.GetName())
assert.Equal(t, "infra-cluster", routeCluster(gotOther),
"non-connector route must still target its cluster")
assert.Nil(t, gotOther.GetDirectResponse(), "non-connector route must not be converted")
}

// TestApplyConnectorRoutes_Offline_Idempotent verifies a second pass does not
// double-apply: the connect_matcher route is prepended once more (it is keyed by
// VH name and matches no cluster), but no forwarding route is re-converted
// because converted routes are direct_responses and no longer target the cluster.
func TestApplyConnectorRoutes_Offline_Idempotent(t *testing.T) {
idx := connectorPolicyIndex(false)
clusterName := testClusterName()
offlineInfo := &extcache.ConnectorInfo{Online: false, TargetHost: testTargetHost, TargetPort: testTargetPort}
replaced := map[string]*extcache.ConnectorInfo{}
offline := map[string]*extcache.ConnectorInfo{clusterName: offlineInfo}

rc := &routev3.RouteConfiguration{
VirtualHosts: []*routev3.VirtualHost{
{
Name: "vh",
Domains: []string{"app.local.test"},
Routes: []*routev3.Route{routeTargeting(clusterName)},
},
},
}

_, converted1, err := ApplyConnectorRoutes(rc, idx, replaced, offline)
require.NoError(t, err)
assert.Equal(t, 1, converted1, "first pass converts the forwarding route")

// Second pass: the cluster is gone from all routes, so nothing converts.
_, converted2, err := ApplyConnectorRoutes(rc, idx, replaced, offline)
require.NoError(t, err)
assert.Equal(t, 0, converted2, "second pass must not re-convert any route")

// Every direct_response route still returns the offline 503 body.
for _, rt := range rc.VirtualHosts[0].Routes {
if dr := rt.GetDirectResponse(); dr != nil {
assert.Equal(t, uint32(503), dr.GetStatus())
assert.Equal(t, "Tunnel not online", dr.GetBody().GetInlineString())
}
assert.Empty(t, routeCluster(rt), "no route may target the offline connector cluster after conversion")
}
}

func TestApplyConnectorRoutes_NoConnector_VHUntouched(t *testing.T) {
idx := connectorPolicyIndex(true)

Expand All @@ -326,9 +440,10 @@ func TestApplyConnectorRoutes_NoConnector_VHUntouched(t *testing.T) {
},
}

n, err := ApplyConnectorRoutes(rc, idx, replaced, offline)
n, converted, err := ApplyConnectorRoutes(rc, idx, replaced, offline)
require.NoError(t, err)
assert.Equal(t, 0, n, "VH with non-connector cluster must not be mutated")
assert.Equal(t, 0, converted, "no forwarding routes converted when no connector present")
assert.Len(t, rc.VirtualHosts[0].Routes, 1, "route count must not change")
assert.Len(t, rc.VirtualHosts[0].Domains, 1, "domain list must not change")
}
Expand All @@ -340,7 +455,8 @@ func TestApplyConnectorRoutes_EmptyRouteConfiguration_NoOp(t *testing.T) {

rc := &routev3.RouteConfiguration{Name: "empty"}

n, err := ApplyConnectorRoutes(rc, idx, replaced, offline)
n, converted, err := ApplyConnectorRoutes(rc, idx, replaced, offline)
require.NoError(t, err)
assert.Equal(t, 0, n)
assert.Equal(t, 0, converted)
}
11 changes: 9 additions & 2 deletions internal/extensionserver/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ func (s *Server) PostTranslateModify(
localReplyCount int
tppCount int
vhCount int
offlineRtCount int
replaced map[string]*extcache.ConnectorInfo
connOffline map[string]*extcache.ConnectorInfo
)
Expand Down Expand Up @@ -236,7 +237,7 @@ func (s *Server) PostTranslateModify(

_, connRoutesSpan := tr.Start(mctx, "connector.routes")
for _, rc := range routes {
n, mutErr := mutate.ApplyConnectorRoutes(rc, idx, replaced, connOffline)
n, offlineRt, mutErr := mutate.ApplyConnectorRoutes(rc, idx, replaced, connOffline)
if mutErr != nil {
s.log.Error("apply connector routes", "route_config", rc.GetName(), "err", mutErr)
connRoutesSpan.RecordError(mutErr)
Expand All @@ -249,8 +250,12 @@ func (s *Server) PostTranslateModify(
return nil, mutErr
}
vhCount += n
offlineRtCount += offlineRt
}
connRoutesSpan.SetAttributes(attribute.Int("vhosts.connector_applied", vhCount))
connRoutesSpan.SetAttributes(
attribute.Int("vhosts.connector_applied", vhCount),
attribute.Int("routes.connector_offline", offlineRtCount),
)
connRoutesSpan.End()
mspan.End()

Expand All @@ -263,6 +268,7 @@ func (s *Server) PostTranslateModify(
extmetrics.WAFRouteMutationsTotal.Add(float64(tppCount))
extmetrics.ConnectorClustersTotal.Add(float64(len(replaced)))
extmetrics.ConnectorRoutesTotal.Add(float64(vhCount))
extmetrics.ConnectorOfflineRoutesTotal.Add(float64(offlineRtCount))

s.log.Info("PostTranslateModify",
"clusters", len(clusters),
Expand All @@ -274,6 +280,7 @@ func (s *Server) PostTranslateModify(
"clusters_replaced", len(replaced),
"clusters_offline", len(connOffline),
"vhosts_connector_applied", vhCount,
"connector_offline_routes", offlineRtCount,
)

return &pb.PostTranslateModifyResponse{
Expand Down
Loading