Skip to content

Commit a4214b2

Browse files
authored
Merge pull request #208 from datum-cloud/fix/connector-liveness-annotation
2 parents 6ca76df + 418b58e commit a4214b2

6 files changed

Lines changed: 393 additions & 146 deletions

File tree

api/v1alpha1/groupversion_info.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,22 @@ var (
2222
AddToScheme = SchemeBuilder.AddToScheme
2323
)
2424

25+
// UpstreamStatusAnnotation carries a verbatim copy of a resource's upstream
26+
// .status subresource down to edge member clusters.
27+
//
28+
// A resource's status is computed authoritatively in the Project control plane.
29+
// Karmada propagates a resource template's spec and metadata
30+
// (labels/annotations) to member clusters but NOT the status subresource, so a
31+
// member-cluster object never carries its upstream status. For resource types
32+
// whose downstream consumer needs that status (e.g. the edge extension server
33+
// reading Connector liveness), the replicator mirrors the full upstream status
34+
// JSON into this annotation — which Karmada DOES propagate — and the consumer
35+
// parses it back, falling back to the live status when the annotation is absent.
36+
//
37+
// The value is the resource's .status object marshalled to JSON verbatim; it is
38+
// resource-agnostic and carries no bespoke schema.
39+
const UpstreamStatusAnnotation = "networking.datumapis.com/upstream-status"
40+
2541
func addKnownTypes(scheme *runtime.Scheme) error {
2642
scheme.AddKnownTypes(GroupVersion,
2743
&Connector{},

internal/controller/gateway_resource_replicator_controller.go

Lines changed: 49 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ package controller
44

55
import (
66
"context"
7+
"encoding/json"
78
"fmt"
89
"strings"
910
"time"
@@ -35,6 +36,7 @@ import (
3536
mcreconcile "sigs.k8s.io/multicluster-runtime/pkg/reconcile"
3637
mcsource "sigs.k8s.io/multicluster-runtime/pkg/source"
3738

39+
networkingv1alpha1 "go.datum.net/network-services-operator/api/v1alpha1"
3840
"go.datum.net/network-services-operator/internal/config"
3941
downstreamclient "go.datum.net/network-services-operator/internal/downstreamclient"
4042
)
@@ -71,12 +73,13 @@ type replicationResourceConfig struct {
7173
statusTransform statusTransformFunc
7274
conditionHandlers conditionReasonHandlers
7375

74-
// mirrorStatusDownstream copies upstream status → downstream status after
75-
// each spec sync. Used for resource types (e.g. Connector) whose status is
76-
// authoritative in the upstream cluster and must be readable by consumers
77-
// in the downstream cluster (e.g. the extension server). When true,
78-
// skipUpstreamStatusSync is implicitly honoured as well.
79-
mirrorStatusDownstream bool
76+
// mirrorStatusToAnnotation, when true, copies the upstream resource's full
77+
// .status verbatim into the UpstreamStatusAnnotation on the downstream
78+
// object's metadata. Karmada propagates resource-template annotations (but
79+
// NOT the status subresource) to member clusters, so this is how an upstream
80+
// status reaches a downstream consumer on a member cluster. Resource-agnostic
81+
// and opt-in per type. Types that enable this should also set skipUpstreamStatusSync.
82+
mirrorStatusToAnnotation bool
8083

8184
// skipUpstreamStatusSync suppresses the normal downstream→upstream status
8285
// propagation. Set this for resource types where the upstream status is
@@ -146,14 +149,17 @@ func initReplicationResourceConfigs() map[string]replicationResourceConfig {
146149
}
147150
}
148151

149-
// Connector status (conditions + connectionDetails) is authoritative
150-
// upstream and must be readable by the extension server downstream so it
151-
// can determine whether a tunnel is online before injecting connector
152-
// cluster patches. Mirror status downstream; do not propagate back.
152+
// Connector status (Ready condition + connectionDetails) is authoritative
153+
// upstream and must be readable by the edge extension server so it can
154+
// determine whether a tunnel is online before injecting connector cluster
155+
// patches. The extension server runs on a member cluster, and Karmada does
156+
// not propagate the status subresource hub→member — only spec + metadata.
157+
// Mirror the upstream status into an annotation (which Karmada DOES
158+
// propagate) instead. Do not propagate status back upstream.
153159
connectorGVK := schema.GroupVersionKind{Group: groupNetworkingDatumAPIs, Version: versionV1Alpha1, Kind: KindConnector}
154160
configs[gvkKey(connectorGVK)] = replicationResourceConfig{
155-
mirrorStatusDownstream: true,
156-
skipUpstreamStatusSync: true,
161+
mirrorStatusToAnnotation: true,
162+
skipUpstreamStatusSync: true,
157163
}
158164

159165
return configs
@@ -278,8 +284,8 @@ func (r *GatewayResourceReplicatorReconciler) ensureDownstreamResource(
278284
) error {
279285
logger := log.FromContext(ctx)
280286

281-
// Time the full downstream sync (CreateOrUpdate + status mirror) per resource
282-
// kind so replication latency regressions per family are attributable.
287+
// Time the full downstream sync (CreateOrUpdate) per resource kind so
288+
// replication latency regressions per family are attributable.
283289
syncStart := time.Now()
284290
syncOutcome := "success"
285291
defer func() {
@@ -314,6 +320,16 @@ func (r *GatewayResourceReplicatorReconciler) ensureDownstreamResource(
314320
return fmt.Errorf("failed to set downstream controller reference: %w", err)
315321
}
316322

323+
// Mirror the upstream status into an annotation on the downstream
324+
// object's metadata. This happens within the same CreateOrUpdate write, so it
325+
// is persisted as ordinary metadata (which Karmada propagates to members)
326+
// — no status subresource write involved.
327+
if resource.replicationResourceConfig.mirrorStatusToAnnotation {
328+
if err := setUpstreamStatusAnnotation(downstreamObj, upstreamObj); err != nil {
329+
return err
330+
}
331+
}
332+
317333
return nil
318334
})
319335
if err != nil {
@@ -337,17 +353,6 @@ func (r *GatewayResourceReplicatorReconciler) ensureDownstreamResource(
337353
)
338354
}
339355

340-
// Mirror upstream status → downstream when configured (e.g. Connector).
341-
// This is the reverse of syncUpstreamStatus: upstream is authoritative and
342-
// the downstream copy must reflect it so local consumers (extension server)
343-
// can read liveness without reaching into upstream project namespaces.
344-
if resource.replicationResourceConfig.mirrorStatusDownstream {
345-
if err := r.mirrorUpstreamStatusToDownstream(ctx, resource.gvk.Kind, upstreamObj, downstreamObj, downstreamStrategy); err != nil {
346-
syncOutcome = syncOutcomeError
347-
return err
348-
}
349-
}
350-
351356
// Propagate downstream status → upstream for types where a downstream
352357
// controller (e.g. Envoy Gateway) writes acceptance conditions. Skip for
353358
// types whose status is owned by NSO's own upstream controllers.
@@ -361,69 +366,31 @@ func (r *GatewayResourceReplicatorReconciler) ensureDownstreamResource(
361366
return nil
362367
}
363368

364-
// mirrorUpstreamStatusToDownstream copies the upstream object's status
365-
// subresource to the corresponding downstream object. This is used for resource
366-
// types (currently Connector) where the upstream cluster holds the authoritative
367-
// status and downstream consumers need to read it locally.
368-
func (r *GatewayResourceReplicatorReconciler) mirrorUpstreamStatusToDownstream(
369-
ctx context.Context,
370-
resourceKind string,
371-
upstreamObj *unstructured.Unstructured,
372-
downstreamObj *unstructured.Unstructured,
373-
downstreamStrategy downstreamclient.ResourceStrategy,
374-
) error {
375-
logger := log.FromContext(ctx)
376-
377-
// Re-fetch the downstream to get the current resourceVersion needed for
378-
// the status subresource update.
379-
currentDownstream := downstreamObj.DeepCopy()
380-
if err := downstreamStrategy.GetClient().Get(ctx, client.ObjectKeyFromObject(currentDownstream), currentDownstream); err != nil {
381-
if apierrors.IsNotFound(err) {
382-
return nil
383-
}
384-
return fmt.Errorf("failed to fetch downstream %s/%s for status mirror: %w",
385-
currentDownstream.GetNamespace(), currentDownstream.GetName(), err)
386-
}
387-
388-
upstreamStatus, hasUpstreamStatus := upstreamObj.Object["status"]
389-
existingDownstreamStatus, hasExistingDownstreamStatus := currentDownstream.Object["status"]
390-
391-
// Nothing to sync: upstream has no status and downstream already has none.
392-
if !hasUpstreamStatus && !hasExistingDownstreamStatus {
369+
// setUpstreamStatusAnnotation copies the upstream resource's full .status
370+
// verbatim into the UpstreamStatusAnnotation on the downstream object's
371+
// metadata. It is the Karmada-friendly replacement for mirroring the status
372+
// subresource downstream: Karmada propagates a resource template's metadata
373+
// (annotations) to member clusters but not its status, so a downstream consumer
374+
// on a member cluster reads the upstream status from this annotation instead.
375+
// The copy is resource-agnostic — no field-level knowledge of any status shape.
376+
func setUpstreamStatusAnnotation(downstreamObj, upstreamObj *unstructured.Unstructured) error {
377+
statusRaw, ok := upstreamObj.Object[jsonKeyStatus]
378+
if !ok {
379+
// No upstream status yet — nothing to mirror.
393380
return nil
394381
}
395382

396-
// Already in sync: deep equality check avoids a spurious write.
397-
if hasUpstreamStatus && hasExistingDownstreamStatus &&
398-
apiequality.Semantic.DeepEqual(upstreamStatus, existingDownstreamStatus) {
399-
return nil
383+
raw, err := json.Marshal(statusRaw)
384+
if err != nil {
385+
return fmt.Errorf("failed to marshal upstream status: %w", err)
400386
}
401387

402-
if hasUpstreamStatus {
403-
currentDownstream.Object["status"] = runtime.DeepCopyJSONValue(upstreamStatus)
404-
} else {
405-
delete(currentDownstream.Object, "status")
388+
annotations := downstreamObj.GetAnnotations()
389+
if annotations == nil {
390+
annotations = make(map[string]string, 1)
406391
}
407-
408-
if err := downstreamStrategy.GetClient().Status().Update(ctx, currentDownstream); err != nil {
409-
if apierrors.IsNotFound(err) {
410-
return nil
411-
}
412-
// Count status-mirror failures per resource kind so flaky downstream API
413-
// server connectivity surfaces as a metric rather than only as an
414-
// incremented generic reconcile error counter.
415-
replicatorStatusMirrorErrorsTotal.WithLabelValues(resourceKind).Inc()
416-
return fmt.Errorf("failed to mirror upstream status to downstream %s/%s: %w",
417-
currentDownstream.GetNamespace(), currentDownstream.GetName(), err)
418-
}
419-
420-
logger.Info(
421-
"downstream status mirrored from upstream",
422-
"gvk", upstreamObj.GroupVersionKind().String(),
423-
jsonKeyNamespace, upstreamObj.GetNamespace(),
424-
"name", upstreamObj.GetName(),
425-
)
426-
392+
annotations[networkingv1alpha1.UpstreamStatusAnnotation] = string(raw)
393+
downstreamObj.SetAnnotations(annotations)
427394
return nil
428395
}
429396

0 commit comments

Comments
 (0)