Skip to content

Commit 85e622d

Browse files
authored
Refine context-based telemetry
1 parent 10f8973 commit 85e622d

28 files changed

+1823
-1318
lines changed

cli/k8s_client/client_factory.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -91,11 +91,6 @@ func CreateK8SClients(masterURL, kubeConfigPath, overrideNamespace string) (*Cli
9191
return NewMetricsTransport(
9292
rt,
9393
WithMetricsTransportTarget(ContextRequestTargetKubernetes),
94-
WithMetricsTransportTelemeters(
95-
// client-go/metrics is configured to track certain metrics, so we avoid duplicating those here.
96-
OutgoingAPIRequestDurationTelemeter,
97-
OutgoingAPIRequestInFlightTelemeter,
98-
),
9994
)
10095
})
10196

cli/k8s_client/metrics.go

Lines changed: 5 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright 2025 NetApp, Inc. All Rights Reserved.
1+
// Copyright 2026 NetApp, Inc. All Rights Reserved.
22

33
package k8sclient
44

@@ -14,7 +14,6 @@ import (
1414
"k8s.io/client-go/tools/metrics"
1515

1616
. "github.com/netapp/trident/logging"
17-
"github.com/netapp/trident/utils/errors"
1817
)
1918

2019
// registerK8sClientGoMetricsAdapter registers client-go metric adapters to feed Trident telemeters.
@@ -32,16 +31,7 @@ func registerK8sClientGoMetricsAdapter() {
3231
type rateLimiterLatencyAdapter struct{}
3332

3433
func (rateLimiterLatencyAdapter) Observe(ctx context.Context, verb string, u url.URL, latency time.Duration) {
35-
rec := NewContextBuilder(ctx).
36-
WithTarget(ContextRequestTargetKubernetes).
37-
WithAddress(u.Host).
38-
WithMethod(verb).
39-
WithDuration(latency).
40-
WithTelemetry(OutgoingAPIRequestLimitedDurationTelemeter).
41-
BuildTelemetry()
42-
43-
err := errors.TooManyRequestsError("client-go rate limiter wait %s", latency.String())
44-
rec(&err)
34+
CaptureOutgoingAPIRequestTokenDuration(ctx, ContextRequestTargetKubernetes, u.Host, verb, latency)
4535
}
4636

4737
// requestRetryAdapter plugs Trident telemeters into client-go request retry metrics.
@@ -50,20 +40,10 @@ func (rateLimiterLatencyAdapter) Observe(ctx context.Context, verb string, u url
5040
type requestRetryAdapter struct{}
5141

5242
func (requestRetryAdapter) IncrementRetry(ctx context.Context, code string, method string, host string) {
53-
rec := NewContextBuilder(ctx).
54-
WithTarget(ContextRequestTargetKubernetes).
55-
WithAddress(host).
56-
WithMethod(method).
57-
WithTelemetry(OutgoingAPIRequestRetryTotalTelemeter).
58-
BuildTelemetry()
59-
60-
var err error
61-
// Only retries triggered by errors are counted.
62-
if err = assertErrorForCode(code); err != nil {
63-
// Assign to the outer err so the deferred recorder observes the retry
64-
err = errors.WrapWithMustRetryError(err, "retry triggered after http status: %s", code)
43+
err := assertErrorForCode(code)
44+
if err != nil {
45+
CaptureOutgoingAPIRequestRetryTotal(ctx, ContextRequestTargetKubernetes, host, method)
6546
}
66-
rec(&err)
6747
}
6848

6949
// assertErrorForCode returns nil for 2xx/3xx HTTP status codes, error otherwise.

frontend/csi/controller_server.go

Lines changed: 10 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,11 @@
1-
// Copyright 2025 NetApp, Inc. All Rights Reserved.
1+
// Copyright 2026 NetApp, Inc. All Rights Reserved.
22

33
package csi
44

55
import (
66
"context"
77
"fmt"
88
"math"
9-
"net/http"
109
"reflect"
1110
"strconv"
1211
"strings"
@@ -34,20 +33,6 @@ import (
3433
func (p *Plugin) CreateVolume(
3534
ctx context.Context, req *csi.CreateVolumeRequest,
3635
) (res *csi.CreateVolumeResponse, err error) {
37-
ctx, rec := NewContextBuilder(ctx).
38-
WithWorkflow(WorkflowVolumeCreate).
39-
WithLayer(LogLayerCSIFrontend).
40-
WithSource(ContextSourceCSI).
41-
WithClient(ContextRequestClientCSIProvisioner).
42-
WithRoute(csi.Controller_CreateVolume_FullMethodName).
43-
WithMethod(http.MethodPost).
44-
WithTelemetry(
45-
IncomingAPIRequestInFlightTelemeter,
46-
IncomingAPIRequestDurationTelemeter,
47-
).
48-
BuildContextAndTelemetry()
49-
defer rec(&err)
50-
5136
fields := LogFields{"Method": "CreateVolume", "Type": "CSI_Controller", "name": req.Name}
5237
Logc(ctx).WithFields(fields).Debug(">>>> CreateVolume")
5338
defer Logc(ctx).WithFields(fields).Debug("<<<< CreateVolume")
@@ -316,20 +301,6 @@ func (p *Plugin) CreateVolume(
316301
func (p *Plugin) DeleteVolume(
317302
ctx context.Context, req *csi.DeleteVolumeRequest,
318303
) (res *csi.DeleteVolumeResponse, err error) {
319-
ctx, rec := NewContextBuilder(ctx).
320-
WithWorkflow(WorkflowVolumeDelete).
321-
WithLayer(LogLayerCSIFrontend).
322-
WithSource(ContextSourceCSI).
323-
WithClient(ContextRequestClientCSIProvisioner).
324-
WithRoute(csi.Controller_DeleteVolume_FullMethodName).
325-
WithMethod(http.MethodDelete).
326-
WithTelemetry(
327-
IncomingAPIRequestInFlightTelemeter,
328-
IncomingAPIRequestDurationTelemeter,
329-
).
330-
BuildContextAndTelemetry()
331-
defer rec(&err)
332-
333304
fields := LogFields{"Method": "DeleteVolume", "Type": "CSI_Controller"}
334305
Logc(ctx).WithFields(fields).Debug(">>>> DeleteVolume")
335306
defer Logc(ctx).WithFields(fields).Debug("<<<< DeleteVolume")
@@ -366,10 +337,7 @@ func stashIscsiTargetPortals(publishInfo map[string]string, volumePublishInfo *m
366337

367338
func (p *Plugin) ControllerPublishVolume(
368339
ctx context.Context, req *csi.ControllerPublishVolumeRequest,
369-
) (*csi.ControllerPublishVolumeResponse, error) {
370-
ctx = SetContextWorkflow(ctx, WorkflowControllerPublish)
371-
ctx = GenerateRequestContextForLayer(ctx, LogLayerCSIFrontend)
372-
340+
) (res *csi.ControllerPublishVolumeResponse, err error) {
373341
fields := LogFields{"Method": "ControllerPublishVolume", "Type": "CSI_Controller"}
374342
Logc(ctx).WithFields(fields).Debug(">>>> ControllerPublishVolume")
375343
defer Logc(ctx).WithFields(fields).Debug("<<<< ControllerPublishVolume")
@@ -524,10 +492,7 @@ func populatePublishInfoFromCSIPublishRequest(info *models.VolumePublishInfo, re
524492

525493
func (p *Plugin) ControllerUnpublishVolume(
526494
ctx context.Context, req *csi.ControllerUnpublishVolumeRequest,
527-
) (*csi.ControllerUnpublishVolumeResponse, error) {
528-
ctx = SetContextWorkflow(ctx, WorkflowControllerUnpublish)
529-
ctx = GenerateRequestContextForLayer(ctx, LogLayerCSIFrontend)
530-
495+
) (res *csi.ControllerUnpublishVolumeResponse, err error) {
531496
fields := LogFields{"Method": "ControllerUnpublishVolume", "Type": "CSI_Controller"}
532497
Logc(ctx).WithFields(fields).Debug(">>>> ControllerUnpublishVolume")
533498
defer Logc(ctx).WithFields(fields).Debug("<<<< ControllerUnpublishVolume")
@@ -581,10 +546,7 @@ func (p *Plugin) ControllerUnpublishVolume(
581546

582547
func (p *Plugin) ValidateVolumeCapabilities(
583548
ctx context.Context, req *csi.ValidateVolumeCapabilitiesRequest,
584-
) (*csi.ValidateVolumeCapabilitiesResponse, error) {
585-
ctx = SetContextWorkflow(ctx, WorkflowVolumeGetCapabilities)
586-
ctx = GenerateRequestContextForLayer(ctx, LogLayerCSIFrontend)
587-
549+
) (res *csi.ValidateVolumeCapabilitiesResponse, err error) {
588550
volumeID := req.GetVolumeId()
589551
if volumeID == "" {
590552
return nil, status.Error(codes.InvalidArgument, "no volume ID provided")
@@ -628,10 +590,7 @@ func (p *Plugin) ValidateVolumeCapabilities(
628590

629591
func (p *Plugin) ListVolumes(
630592
ctx context.Context, req *csi.ListVolumesRequest,
631-
) (*csi.ListVolumesResponse, error) {
632-
ctx = SetContextWorkflow(ctx, WorkflowVolumeList)
633-
ctx = GenerateRequestContextForLayer(ctx, LogLayerCSIFrontend)
634-
593+
) (res *csi.ListVolumesResponse, err error) {
635594
fields := LogFields{"Method": "ListVolumes", "Type": "CSI_Controller"}
636595
Logc(ctx).WithFields(fields).Trace(">>>> ListVolumes")
637596
defer Logc(ctx).WithFields(fields).Trace("<<<< ListVolumes")
@@ -710,10 +669,7 @@ func (p *Plugin) GetCapacity(_ context.Context, _ *csi.GetCapacityRequest) (*csi
710669

711670
func (p *Plugin) ControllerGetCapabilities(
712671
ctx context.Context, _ *csi.ControllerGetCapabilitiesRequest,
713-
) (*csi.ControllerGetCapabilitiesResponse, error) {
714-
ctx = SetContextWorkflow(ctx, WorkflowControllerGetCapabilities)
715-
ctx = GenerateRequestContextForLayer(ctx, LogLayerCSIFrontend)
716-
672+
) (res *csi.ControllerGetCapabilitiesResponse, err error) {
717673
fields := LogFields{"Method": "ControllerGetCapabilities", "Type": "CSI_Controller"}
718674
Logc(ctx).WithFields(fields).Trace(">>>> ControllerGetCapabilities")
719675
defer Logc(ctx).WithFields(fields).Trace("<<<< ControllerGetCapabilities")
@@ -723,10 +679,7 @@ func (p *Plugin) ControllerGetCapabilities(
723679

724680
func (p *Plugin) CreateSnapshot(
725681
ctx context.Context, req *csi.CreateSnapshotRequest,
726-
) (*csi.CreateSnapshotResponse, error) {
727-
ctx = SetContextWorkflow(ctx, WorkflowSnapshotCreate)
728-
ctx = GenerateRequestContextForLayer(ctx, LogLayerCSIFrontend)
729-
682+
) (res *csi.CreateSnapshotResponse, err error) {
730683
fields := LogFields{"Method": "CreateSnapshot", "Type": "CSI_Controller"}
731684
Logc(ctx).WithFields(fields).Debug(">>>> CreateSnapshot")
732685
defer Logc(ctx).WithFields(fields).Debug("<<<< CreateSnapshot")
@@ -802,10 +755,7 @@ func (p *Plugin) CreateSnapshot(
802755

803756
func (p *Plugin) DeleteSnapshot(
804757
ctx context.Context, req *csi.DeleteSnapshotRequest,
805-
) (*csi.DeleteSnapshotResponse, error) {
806-
ctx = SetContextWorkflow(ctx, WorkflowSnapshotDelete)
807-
ctx = GenerateRequestContextForLayer(ctx, LogLayerCSIFrontend)
808-
758+
) (res *csi.DeleteSnapshotResponse, err error) {
809759
fields := LogFields{"Method": "DeleteSnapshot", "Type": "CSI_Controller"}
810760
Logc(ctx).WithFields(fields).Debug(">>>> DeleteSnapshot")
811761
defer Logc(ctx).WithFields(fields).Debug("<<<< DeleteSnapshot")
@@ -842,10 +792,7 @@ func (p *Plugin) DeleteSnapshot(
842792

843793
func (p *Plugin) ListSnapshots(
844794
ctx context.Context, req *csi.ListSnapshotsRequest,
845-
) (*csi.ListSnapshotsResponse, error) {
846-
ctx = SetContextWorkflow(ctx, WorkflowSnapshotList)
847-
ctx = GenerateRequestContextForLayer(ctx, LogLayerCSIFrontend)
848-
795+
) (res *csi.ListSnapshotsResponse, err error) {
849796
fields := LogFields{"Method": "ListSnapshots", "Type": "CSI_Controller"}
850797
Logc(ctx).WithFields(fields).Trace(">>>> ListSnapshots")
851798
defer Logc(ctx).WithFields(fields).Trace("<<<< ListSnapshots")
@@ -973,10 +920,7 @@ func (p *Plugin) getListSnapshots(
973920

974921
func (p *Plugin) ControllerExpandVolume(
975922
ctx context.Context, req *csi.ControllerExpandVolumeRequest,
976-
) (*csi.ControllerExpandVolumeResponse, error) {
977-
ctx = SetContextWorkflow(ctx, WorkflowVolumeResize)
978-
ctx = GenerateRequestContextForLayer(ctx, LogLayerCSIFrontend)
979-
923+
) (res *csi.ControllerExpandVolumeResponse, err error) {
980924
fields := LogFields{"Method": "ControllerExpandVolume", "Type": "CSI_Controller"}
981925
Logc(ctx).WithFields(fields).Debug(">>>> ControllerExpandVolume")
982926
defer Logc(ctx).WithFields(fields).Debug("<<<< ControllerExpandVolume")

frontend/csi/group_controller_server.go

Lines changed: 5 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright 2025 NetApp, Inc. All Rights Reserved.
1+
// Copyright 2026 NetApp, Inc. All Rights Reserved.
22

33
package csi
44

@@ -23,10 +23,7 @@ var volumeGroupSnapshotBackoff = 10 * time.Second
2323

2424
func (p *Plugin) GroupControllerGetCapabilities(
2525
ctx context.Context, _ *csi.GroupControllerGetCapabilitiesRequest,
26-
) (*csi.GroupControllerGetCapabilitiesResponse, error) {
27-
ctx = SetContextWorkflow(ctx, WorkflowControllerGetCapabilities)
28-
ctx = GenerateRequestContextForLayer(ctx, LogLayerCSIFrontend)
29-
26+
) (res *csi.GroupControllerGetCapabilitiesResponse, err error) {
3027
fields := LogFields{"Method": "GroupControllerGetCapabilities", "Type": "CSI_Controller"}
3128
Logc(ctx).WithFields(fields).Trace(">>>> GroupControllerGetCapabilities")
3229
defer Logc(ctx).WithFields(fields).Trace("<<<< GroupControllerGetCapabilities")
@@ -36,10 +33,7 @@ func (p *Plugin) GroupControllerGetCapabilities(
3633

3734
func (p *Plugin) CreateVolumeGroupSnapshot(
3835
ctx context.Context, req *csi.CreateVolumeGroupSnapshotRequest,
39-
) (*csi.CreateVolumeGroupSnapshotResponse, error) {
40-
ctx = SetContextWorkflow(ctx, WorkflowGroupSnapshotCreate)
41-
ctx = GenerateRequestContextForLayer(ctx, LogLayerCSIFrontend)
42-
36+
) (res *csi.CreateVolumeGroupSnapshotResponse, err error) {
4337
fields := LogFields{
4438
"Method": "CreateVolumeGroupSnapshot",
4539
"Type": p.name,
@@ -135,10 +129,7 @@ func (p *Plugin) CreateVolumeGroupSnapshot(
135129

136130
func (p *Plugin) GetVolumeGroupSnapshot(
137131
ctx context.Context, req *csi.GetVolumeGroupSnapshotRequest,
138-
) (*csi.GetVolumeGroupSnapshotResponse, error) {
139-
ctx = SetContextWorkflow(ctx, WorkflowGroupSnapshotGet)
140-
ctx = GenerateRequestContextForLayer(ctx, LogLayerCSIFrontend)
141-
132+
) (res *csi.GetVolumeGroupSnapshotResponse, err error) {
142133
fields := LogFields{
143134
"Method": "GetVolumeGroupSnapshot",
144135
"Type": p.name,
@@ -192,10 +183,7 @@ func (p *Plugin) GetVolumeGroupSnapshot(
192183

193184
func (p *Plugin) DeleteVolumeGroupSnapshot(
194185
ctx context.Context, req *csi.DeleteVolumeGroupSnapshotRequest,
195-
) (*csi.DeleteVolumeGroupSnapshotResponse, error) {
196-
ctx = SetContextWorkflow(ctx, WorkflowGroupSnapshotDelete)
197-
ctx = GenerateRequestContextForLayer(ctx, LogLayerCSIFrontend)
198-
186+
) (res *csi.DeleteVolumeGroupSnapshotResponse, err error) {
199187
fields := LogFields{
200188
"Method": "DeleteVolumeGroupSnapshot",
201189
"Type": p.name,

frontend/csi/grpc.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright 2025 NetApp, Inc. All Rights Reserved.
1+
// Copyright 2026 NetApp, Inc. All Rights Reserved.
22

33
// Copyright 2017 The Kubernetes Authors.
44

@@ -104,7 +104,10 @@ func (s *nonBlockingGRPCServer) serve(
104104
}
105105

106106
opts := []grpc.ServerOption{
107-
grpc.UnaryInterceptor(logGRPC),
107+
// The first interceptor is always the outermost.
108+
// When CSI calls come in, the outermost interceptor is hit first.
109+
// The log gRPC and timeout interceptors should always be the first in the chain.
110+
grpc.ChainUnaryInterceptor(logGRPCInterceptor, timeoutInterceptor, incomingRequestMetricsInterceptor),
108111
}
109112
server := grpc.NewServer(opts...)
110113
s.server = server

frontend/csi/identity_server.go

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright 2025 NetApp, Inc. All Rights Reserved.
1+
// Copyright 2026 NetApp, Inc. All Rights Reserved.
22

33
package csi
44

@@ -15,17 +15,14 @@ import (
1515

1616
func (p *Plugin) Probe(
1717
ctx context.Context, req *csi.ProbeRequest,
18-
) (*csi.ProbeResponse, error) {
19-
ctx = SetContextWorkflow(ctx, WorkflowIdentityProbe)
20-
ctx = GenerateRequestContextForLayer(ctx, LogLayerCSIFrontend)
21-
18+
) (res *csi.ProbeResponse, err error) {
2219
fields := LogFields{"Method": "Probe", "Type": "CSI_Identity"}
2320
Logc(ctx).WithFields(fields).Trace(">>>> Probe")
2421
defer Logc(ctx).WithFields(fields).Trace("<<<< Probe")
2522

2623
// Ensure Trident bootstrapped OK. We only return an error if Trident bootstrapping
2724
// failed (i.e. unrecoverable), not if Trident is still initializing.
28-
_, err := p.orchestrator.GetVersion(ctx)
25+
_, err = p.orchestrator.GetVersion(ctx)
2926
if errors.IsBootstrapError(err) {
3027
return &csi.ProbeResponse{}, status.Error(codes.FailedPrecondition, err.Error())
3128
}
@@ -35,10 +32,7 @@ func (p *Plugin) Probe(
3532

3633
func (p *Plugin) GetPluginInfo(
3734
ctx context.Context, req *csi.GetPluginInfoRequest,
38-
) (*csi.GetPluginInfoResponse, error) {
39-
ctx = SetContextWorkflow(ctx, WorkflowIdentityGetInfo)
40-
ctx = GenerateRequestContextForLayer(ctx, LogLayerCSIFrontend)
41-
35+
) (res *csi.GetPluginInfoResponse, err error) {
4236
fields := LogFields{"Method": "GetPluginInfo", "Type": "CSI_Identity"}
4337
Logc(ctx).WithFields(fields).Trace(">>>> GetPluginInfo")
4438
defer Logc(ctx).WithFields(fields).Trace("<<<< GetPluginInfo")
@@ -51,10 +45,7 @@ func (p *Plugin) GetPluginInfo(
5145

5246
func (p *Plugin) GetPluginCapabilities(
5347
ctx context.Context, req *csi.GetPluginCapabilitiesRequest,
54-
) (*csi.GetPluginCapabilitiesResponse, error) {
55-
ctx = SetContextWorkflow(ctx, WorkflowIdentityGetCapabilities)
56-
ctx = GenerateRequestContextForLayer(ctx, LogLayerCSIFrontend)
57-
48+
) (res *csi.GetPluginCapabilitiesResponse, err error) {
5849
fields := LogFields{"Method": "GetPluginCapabilities", "Type": "CSI_Identity", "topologyInUse": p.topologyInUse}
5950
Logc(ctx).WithFields(fields).Trace(">>>> GetPluginCapabilities")
6051
defer Logc(ctx).WithFields(fields).Trace("<<<< GetPluginCapabilities")

0 commit comments

Comments
 (0)