Skip to content

Commit 910effa

Browse files
committed
new: configure metrics attribute
Signed-off-by: Andrea Terzolo <andreaterzolo3@gmail.com>
1 parent 2b2a08b commit 910effa

5 files changed

Lines changed: 330 additions & 135 deletions

File tree

checks/net_pod_correlation.go

Lines changed: 117 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package checks
33
import (
44
"context"
55
"fmt"
6+
"slices"
67
"time"
78

89
"github.com/DataDog/datadog-agent/pkg/network"
@@ -19,7 +20,12 @@ import (
1920
"go.opentelemetry.io/otel/metric"
2021
)
2122

23+
// Attribute keys for pod correlation metrics
2224
const (
25+
// Please bump this every time you add a new attribute key
26+
// this is used by tests to ensure we don't forget to update the list of all keys
27+
numAttributeKeys = 11
28+
2329
// We export them so that we can reuse these fields in e2e tests
2430

2531
// DirectionKey is the direction of the connection (0=outgoing, 1=incoming)
@@ -52,6 +58,32 @@ const (
5258
RemoteLabelsKey = "remote.pod.label"
5359
)
5460

61+
var (
62+
// Keep this updated with all the possible keys
63+
AllAttributeKeys = []string{
64+
DirectionKey,
65+
LocalIPKey,
66+
LocalPortKey,
67+
LocalPodNameKey,
68+
LocalNSKey,
69+
LocalLabelsKey,
70+
RemoteIPKey,
71+
RemotePortKey,
72+
RemotePodNameKey,
73+
RemoteNSKey,
74+
RemoteLabelsKey,
75+
}
76+
77+
DefaultAttributeKeys = []string{
78+
DirectionKey,
79+
LocalPodNameKey,
80+
LocalNSKey,
81+
LocalLabelsKey,
82+
RemotePodNameKey,
83+
RemoteNSKey,
84+
}
85+
)
86+
5587
type storedConnection struct {
5688
conn *network.ConnectionStats
5789
protocolMetrics []*model.ConnectionMetric
@@ -65,26 +97,61 @@ type podCorrelationInfo struct {
6597
exportPartialCorrelation bool
6698
storedConnections []*storedConnection
6799
rootNSIno uint32
100+
attributesKeys []string
101+
}
102+
103+
func validateAttributeKeys(keys []string) ([]string, error) {
104+
if len(keys) == 0 {
105+
// return the default keys
106+
return DefaultAttributeKeys, nil
107+
}
108+
109+
// We want unique keys
110+
slices.Sort(keys)
111+
keys = slices.Compact(keys)
112+
113+
for _, key := range keys {
114+
if !slices.Contains(AllAttributeKeys, key) {
115+
return nil, fmt.Errorf("invalid attribute key: '%s'", key)
116+
}
117+
}
118+
return keys, nil
68119
}
69120

70121
func newPodCorrelationInfo(cfg *config.PodCorrelationConfig, logLevel string, rootNSIno uint32) (*podCorrelationInfo, error) {
122+
log.Infof("Pod correlation enabled (protocol_metrics: %v), (partial_correlation: %v)", cfg.ProtocolMetrics, cfg.PartialCorrelation)
123+
124+
attrs, err := validateAttributeKeys(cfg.AttributesKeys)
125+
if err != nil {
126+
return nil, err
127+
}
128+
129+
log.Info("Using the following attribute keys for pod correlated metrics: ", attrs)
130+
71131
podCorrelationInfo := &podCorrelationInfo{
72132
exportProtocolMetrics: cfg.ProtocolMetrics,
73133
exportPartialCorrelation: cfg.PartialCorrelation,
74134
storedConnections: make([]*storedConnection, 0),
75135
rootNSIno: rootNSIno,
136+
attributesKeys: attrs,
76137
}
77138

78-
// if we are in tests we don't want to start the informer
79-
if cfg.Exporter.Type != config.ExporterTypeManual {
139+
switch cfg.Exporter.Type {
140+
case config.ExporterTypeDisabled:
141+
// Used only in tests to disable exporting and avoid starting the informer
142+
case config.ExporterTypeManual:
143+
// Here we only want the exporter
144+
if podCorrelationInfo.metrics, err = telemetry.NewMetricsExporter(cfg.Exporter); err != nil {
145+
return nil, err
146+
}
147+
case config.ExporterTypeStdout, config.ExporterTypeOTLP:
148+
// Here we want the informer and the exporter
80149
if err := podCorrelationInfo.startKubernetesInformer(cfg, logLevel); err != nil {
81150
return nil, err
82151
}
83-
}
84-
85-
var err error
86-
if podCorrelationInfo.metrics, err = telemetry.NewMetricsExporter(cfg.Exporter); err != nil {
87-
return nil, err
152+
if podCorrelationInfo.metrics, err = telemetry.NewMetricsExporter(cfg.Exporter); err != nil {
153+
return nil, err
154+
}
88155
}
89156

90157
return podCorrelationInfo, nil
@@ -115,7 +182,7 @@ func (pi *podCorrelationInfo) startKubernetesInformer(cfg *config.PodCorrelation
115182
}
116183

117184
func (pi *podCorrelationInfo) generateConnectionMetrics(conn *network.ConnectionStats, srcPodInfo, dstPodInfo *kube.PodInfo) {
118-
attrs := attribute.NewSet(getMetricAttributes(conn, srcPodInfo, dstPodInfo)...)
185+
attrs := attribute.NewSet(pi.getMetricAttributes(conn, srcPodInfo, dstPodInfo)...)
119186

120187
pi.metrics.BytesRecv.Add(context.Background(), int64(conn.Last.RecvBytes), metric.WithAttributeSet(attrs))
121188
pi.metrics.BytesSent.Add(context.Background(), int64(conn.Last.SentBytes), metric.WithAttributeSet(attrs))
@@ -136,33 +203,53 @@ func addPrefixToLabels(prefix string, labels map[string]string) []attribute.KeyV
136203
return otelLabels
137204
}
138205

139-
func getMetricAttributes(conn *network.ConnectionStats, localPodInfo, remotePodInfo *kube.PodInfo) []attribute.KeyValue {
140-
attributes := []attribute.KeyValue{
141-
attribute.String(LocalIPKey, conn.ConnectionTuple.Source.String()),
142-
attribute.String(LocalPortKey, fmt.Sprintf("%d", conn.ConnectionTuple.SPort)),
143-
attribute.String(RemoteIPKey, conn.ConnectionTuple.Dest.String()),
144-
attribute.String(RemotePortKey, fmt.Sprintf("%d", conn.ConnectionTuple.DPort)),
145-
attribute.String(DirectionKey, connectionDirectionToString(conn.Direction)),
146-
}
147-
if localPodInfo != nil {
148-
attributes = append(attributes,
149-
attribute.String(LocalPodNameKey, localPodInfo.Name),
150-
attribute.String(LocalNSKey, localPodInfo.Namespace),
151-
)
152-
attributes = append(attributes, addPrefixToLabels(LocalLabelsKey, localPodInfo.Labels)...)
153-
}
154-
if remotePodInfo != nil {
155-
attributes = append(attributes,
156-
attribute.String(RemotePodNameKey, remotePodInfo.Name),
157-
attribute.String(RemoteNSKey, remotePodInfo.Namespace),
158-
)
159-
attributes = append(attributes, addPrefixToLabels(RemoteLabelsKey, remotePodInfo.Labels)...)
206+
func (pi *podCorrelationInfo) getMetricAttributes(conn *network.ConnectionStats, localPodInfo, remotePodInfo *kube.PodInfo) []attribute.KeyValue {
207+
attributes := make([]attribute.KeyValue, 0, len(pi.attributesKeys))
208+
for _, key := range pi.attributesKeys {
209+
switch key {
210+
case LocalIPKey:
211+
attributes = append(attributes, attribute.String(LocalIPKey, conn.ConnectionTuple.Source.String()))
212+
case LocalPortKey:
213+
attributes = append(attributes, attribute.String(LocalPortKey, fmt.Sprintf("%d", conn.ConnectionTuple.SPort)))
214+
case RemoteIPKey:
215+
attributes = append(attributes, attribute.String(RemoteIPKey, conn.ConnectionTuple.Dest.String()))
216+
case RemotePortKey:
217+
attributes = append(attributes, attribute.String(RemotePortKey, fmt.Sprintf("%d", conn.ConnectionTuple.DPort)))
218+
case DirectionKey:
219+
attributes = append(attributes, attribute.String(DirectionKey, connectionDirectionToString(conn.Direction)))
220+
case LocalPodNameKey:
221+
if localPodInfo != nil {
222+
attributes = append(attributes, attribute.String(LocalPodNameKey, localPodInfo.Name))
223+
}
224+
case LocalNSKey:
225+
if localPodInfo != nil {
226+
attributes = append(attributes, attribute.String(LocalNSKey, localPodInfo.Namespace))
227+
}
228+
case LocalLabelsKey:
229+
if localPodInfo != nil {
230+
attributes = append(attributes, addPrefixToLabels(LocalLabelsKey, localPodInfo.Labels)...)
231+
}
232+
case RemotePodNameKey:
233+
if remotePodInfo != nil {
234+
attributes = append(attributes, attribute.String(RemotePodNameKey, remotePodInfo.Name))
235+
}
236+
case RemoteNSKey:
237+
if remotePodInfo != nil {
238+
attributes = append(attributes, attribute.String(RemoteNSKey, remotePodInfo.Namespace))
239+
}
240+
case RemoteLabelsKey:
241+
if remotePodInfo != nil {
242+
attributes = append(attributes, addPrefixToLabels(RemoteLabelsKey, remotePodInfo.Labels)...)
243+
}
244+
default:
245+
panic(fmt.Sprintf("Unknown attribute key: %s", key))
246+
}
160247
}
161248
return attributes
162249
}
163250

164251
func (pi *podCorrelationInfo) generateProtocolMetrics(conn *network.ConnectionStats, srcPodInfo, dstPodInfo *kube.PodInfo, metrics []*model.ConnectionMetric) {
165-
attr := getMetricAttributes(conn, srcPodInfo, dstPodInfo)
252+
attr := pi.getMetricAttributes(conn, srcPodInfo, dstPodInfo)
166253

167254
for _, m := range metrics {
168255
// todo!: for now we only support postgres metrics.

checks/net_pod_correlation_test.go

Lines changed: 108 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ func TestPodCorrelation(t *testing.T) {
146146
metrics := rm.ScopeMetrics[0].Metrics
147147
sortOTELMetricsByName(metrics)
148148

149-
attrs := getMetricAttributes(&conn, &postgresClientPodInfo, &postgresServerPodInfo)
149+
attrs := pi.getMetricAttributes(&conn, &postgresClientPodInfo, &postgresServerPodInfo)
150150
assertInt64Metric(t, metrics[0], telemetry.ReceivedMetricName, metricdata.DataPoint[int64]{
151151
Value: postgresClientReceivedBytes,
152152
Attributes: attribute.NewSet(attrs...),
@@ -172,7 +172,7 @@ func TestPodCorrelation(t *testing.T) {
172172
metrics := rm.ScopeMetrics[0].Metrics
173173
sortOTELMetricsByName(metrics)
174174

175-
attrs := getMetricAttributes(&conn, &postgresServerPodInfo, &postgresClientPodInfo)
175+
attrs := pi.getMetricAttributes(&conn, &postgresServerPodInfo, &postgresClientPodInfo)
176176
assertInt64Metric(t, metrics[0], telemetry.ReceivedMetricName, metricdata.DataPoint[int64]{
177177
Value: postgresClientSentBytes,
178178
Attributes: attribute.NewSet(attrs...),
@@ -249,7 +249,7 @@ func TestPodCorrelation(t *testing.T) {
249249
metrics := rm.ScopeMetrics[0].Metrics
250250
sortOTELMetricsByName(metrics)
251251

252-
attrs := getMetricAttributes(&conn, &postgresServerPodInfo, nil)
252+
attrs := pi.getMetricAttributes(&conn, &postgresServerPodInfo, nil)
253253
assertInt64Metric(t, metrics[0], telemetry.ReceivedMetricName, metricdata.DataPoint[int64]{
254254
// The connection is incoming so they recv/sent are inverted.
255255
Value: 222,
@@ -276,7 +276,7 @@ func TestPodCorrelation(t *testing.T) {
276276
metrics := rm.ScopeMetrics[0].Metrics
277277
sortOTELMetricsByName(metrics)
278278

279-
attrs := getMetricAttributes(&conn, nil, &postgresClientPodInfo)
279+
attrs := pi.getMetricAttributes(&conn, nil, &postgresClientPodInfo)
280280
assertInt64Metric(t, metrics[0], telemetry.ReceivedMetricName, metricdata.DataPoint[int64]{
281281
// The connection is incoming so they recv/sent are inverted.
282282
Value: 222,
@@ -318,8 +318,9 @@ func TestPodCorrelation(t *testing.T) {
318318
Exporter: config.ExporterConfig{
319319
Type: config.ExporterTypeManual,
320320
},
321+
AttributesKeys: AllAttributeKeys,
321322
},
322-
"DEBUG",
323+
"debug",
323324
hostNs)
324325
require.NoError(t, err)
325326
// Overwrite the observer in the pod correlation struct
@@ -441,39 +442,120 @@ func TestGetMetricAttributes(t *testing.T) {
441442
}
442443

443444
tests := []struct {
444-
name string
445-
conn network.ConnectionStats
446-
want []attribute.KeyValue
447-
localPod *kube.PodInfo
448-
remotePod *kube.PodInfo
445+
name string
446+
conn network.ConnectionStats
447+
want []attribute.KeyValue
448+
localPod *kube.PodInfo
449+
remotePod *kube.PodInfo
450+
requiredAttributesKeys []string
449451
}{
450452
{
451-
name: "outgoing_both_pods",
452-
conn: outgoing,
453-
localPod: clientPod,
454-
remotePod: serverPod,
455-
want: allAttributes,
453+
name: "outgoing_both_pods",
454+
conn: outgoing,
455+
localPod: clientPod,
456+
remotePod: serverPod,
457+
requiredAttributesKeys: AllAttributeKeys,
458+
want: allAttributes,
456459
},
457460
{
458-
name: "outgoing_missing_dst_pod_only_client_attrs",
459-
conn: outgoing,
460-
localPod: clientPod,
461-
remotePod: nil,
462-
want: clientAttr,
461+
name: "outgoing_both_pods_limited_required_attrs",
462+
conn: outgoing,
463+
localPod: clientPod,
464+
remotePod: serverPod,
465+
requiredAttributesKeys: DefaultAttributeKeys,
466+
want: []attribute.KeyValue{
467+
attribute.String(LocalPodNameKey, clientPod.Name),
468+
attribute.String(LocalNSKey, clientPod.Namespace),
469+
attribute.String(localLabel1OTELKey, label1ValueClient),
470+
attribute.String(localLabel2OTELKey, label2ValueClient),
471+
attribute.String(RemotePodNameKey, serverPod.Name),
472+
attribute.String(RemoteNSKey, serverPod.Namespace),
473+
attribute.String(DirectionKey, connectionDirectionToString(outgoing.Direction)),
474+
},
475+
},
476+
{
477+
name: "outgoing_missing_dst_pod_only_client_attrs",
478+
conn: outgoing,
479+
localPod: clientPod,
480+
remotePod: nil,
481+
requiredAttributesKeys: AllAttributeKeys,
482+
want: clientAttr,
463483
},
464484
{
465-
name: "outgoing_missing_src_pod_only_server_attrs",
466-
conn: outgoing,
467-
localPod: nil,
468-
remotePod: serverPod,
469-
want: serverAttr,
485+
name: "outgoing_missing_src_pod_only_server_attrs",
486+
conn: outgoing,
487+
localPod: nil,
488+
remotePod: serverPod,
489+
requiredAttributesKeys: AllAttributeKeys,
490+
want: serverAttr,
470491
},
471492
}
472493

473494
for _, tt := range tests {
474495
t.Run(tt.name, func(t *testing.T) {
475-
got := getMetricAttributes(&tt.conn, tt.localPod, tt.remotePod)
496+
pi, err := newPodCorrelationInfo(
497+
&config.PodCorrelationConfig{
498+
Exporter: config.ExporterConfig{
499+
Type: config.ExporterTypeDisabled,
500+
},
501+
AttributesKeys: tt.requiredAttributesKeys,
502+
},
503+
"debug",
504+
0,
505+
)
506+
require.NoError(t, err)
507+
508+
got := pi.getMetricAttributes(&tt.conn, tt.localPod, tt.remotePod)
476509
require.ElementsMatch(t, tt.want, got)
477510
})
478511
}
479512
}
513+
514+
func TestAttributesKeysLen(t *testing.T) {
515+
require.Equal(t, numAttributeKeys, len(AllAttributeKeys), "Please update the AllAttributeKeys variable in net_pod_correlation.go if you added a new attribute key")
516+
}
517+
518+
func TestValidateAttributeKeys(t *testing.T) {
519+
tests := []struct {
520+
name string
521+
input []string
522+
expected []string
523+
}{
524+
{
525+
name: "valid_set",
526+
input: AllAttributeKeys,
527+
expected: AllAttributeKeys,
528+
},
529+
{
530+
name: "invalid_key",
531+
input: []string{LocalIPKey, "invalid.key"},
532+
expected: nil,
533+
},
534+
{
535+
name: "empty_set",
536+
input: []string{},
537+
expected: DefaultAttributeKeys,
538+
},
539+
{
540+
name: "duplicate_keys",
541+
input: []string{LocalIPKey, LocalIPKey, RemoteIPKey},
542+
expected: []string{LocalIPKey, RemoteIPKey},
543+
},
544+
}
545+
546+
for _, tt := range tests {
547+
t.Run(tt.name, func(t *testing.T) {
548+
got, err := validateAttributeKeys(tt.input)
549+
550+
if tt.expected == nil {
551+
require.Error(t, err)
552+
return
553+
}
554+
555+
require.NoError(t, err)
556+
require.ElementsMatch(t, tt.expected, got)
557+
558+
})
559+
}
560+
561+
}

0 commit comments

Comments
 (0)