Skip to content

Commit 1b15efe

Browse files
committed
new: query conntrack when the ebpf one is expired
Signed-off-by: Andrea Terzolo <andreaterzolo3@gmail.com>
1 parent f1ce8aa commit 1b15efe

6 files changed

Lines changed: 105 additions & 36 deletions

File tree

checks/net_linux.go

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import (
99
"time"
1010

1111
"github.com/DataDog/datadog-agent/pkg/network/tracer"
12-
"github.com/DataDog/datadog-agent/pkg/util/kernel"
1312
"github.com/StackVista/stackstate-process-agent/config"
1413
"github.com/StackVista/stackstate-process-agent/model"
1514
"github.com/StackVista/stackstate-process-agent/pkg/pods"
@@ -35,17 +34,8 @@ func (c *ConnectionsCheck) Init(cfg *config.AgentConfig, _ *model.SystemInfo) er
3534
}
3635

3736
if cfg.NetworkTracer.PodCorrelation.Enabled {
38-
rootHandle, err := kernel.GetRootNetNamespace(kernel.ProcFSRoot())
39-
if err != nil {
40-
return fmt.Errorf("Failed to get root net namespace: %v", err)
41-
}
42-
43-
ino, err := kernel.GetInoForNs(rootHandle)
44-
if err != nil {
45-
return fmt.Errorf("Failed to get inode for root net namespace: %v", err)
46-
}
47-
48-
c.podCorrelation, err = newPodCorrelationInfo(&cfg.NetworkTracer.PodCorrelation, cfg.LogLevel, ino)
37+
cfg.NetworkTracer.PodCorrelation.ObserverLogLevel = cfg.LogLevel
38+
c.podCorrelation, err = newPodCorrelationInfo(&cfg.NetworkTracer.PodCorrelation)
4939
if err != nil {
5040
return err
5141
}

checks/net_pod_correlation.go

Lines changed: 97 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,14 @@ package checks
33
import (
44
"context"
55
"fmt"
6+
"net/netip"
67
"slices"
78
"time"
89

910
"github.com/DataDog/datadog-agent/pkg/network"
11+
"github.com/DataDog/datadog-agent/pkg/network/netlink"
1012
"github.com/DataDog/datadog-agent/pkg/process/util"
13+
"github.com/DataDog/datadog-agent/pkg/util/kernel"
1114
"github.com/DataDog/sketches-go/ddsketch"
1215
"github.com/StackVista/stackstate-process-agent/config"
1316
"github.com/StackVista/stackstate-process-agent/model"
@@ -18,6 +21,7 @@ import (
1821
"go.opentelemetry.io/obi/pkg/kubecache/meta"
1922
"go.opentelemetry.io/otel/attribute"
2023
"go.opentelemetry.io/otel/metric"
24+
"golang.org/x/sys/unix"
2125
)
2226

2327
// Attribute keys for pod correlation metrics
@@ -98,6 +102,7 @@ type podCorrelationInfo struct {
98102
storedConnections []*storedConnection
99103
rootNSIno uint32
100104
attributesKeys []string
105+
rootCtrk *netlink.Conntrack
101106
}
102107

103108
func validateAttributeKeys(keys []string) ([]string, error) {
@@ -118,7 +123,7 @@ func validateAttributeKeys(keys []string) ([]string, error) {
118123
return keys, nil
119124
}
120125

121-
func newPodCorrelationInfo(cfg *config.PodCorrelationConfig, logLevel string, rootNSIno uint32) (*podCorrelationInfo, error) {
126+
func newPodCorrelationInfo(cfg *config.PodCorrelationConfig) (*podCorrelationInfo, error) {
122127
log.Infof("Pod correlation enabled (protocol_metrics: %v), (partial_correlation: %v)", cfg.ProtocolMetrics, cfg.PartialCorrelation)
123128

124129
attrs, err := validateAttributeKeys(cfg.AttributesKeys)
@@ -132,21 +137,41 @@ func newPodCorrelationInfo(cfg *config.PodCorrelationConfig, logLevel string, ro
132137
exportProtocolMetrics: cfg.ProtocolMetrics,
133138
exportPartialCorrelation: cfg.PartialCorrelation,
134139
storedConnections: make([]*storedConnection, 0),
135-
rootNSIno: rootNSIno,
136140
attributesKeys: attrs,
137141
}
138142

139143
switch cfg.Exporter.Type {
140144
case config.ExporterTypeDisabled:
141145
// Used only in tests to disable exporting and avoid starting the informer
146+
podCorrelationInfo.rootNSIno = 0
147+
podCorrelationInfo.rootCtrk = nil
142148
case config.ExporterTypeManual:
143-
// Here we only want the exporter
149+
// Used only in tests to query the exporter
150+
podCorrelationInfo.rootNSIno = 0
151+
podCorrelationInfo.rootCtrk = nil
144152
if podCorrelationInfo.metrics, err = telemetry.NewMetricsExporter(cfg.Exporter); err != nil {
145153
return nil, err
146154
}
147155
case config.ExporterTypeStdout, config.ExporterTypeOTLP:
156+
// We create the root netns handle and the conntrack table only in production use cases
157+
rootHandle, err := kernel.GetRootNetNamespace(kernel.ProcFSRoot())
158+
if err != nil {
159+
return nil, fmt.Errorf("Failed to get root net namespace: %v", err)
160+
}
161+
162+
podCorrelationInfo.rootNSIno, err = kernel.GetInoForNs(rootHandle)
163+
if err != nil {
164+
return nil, fmt.Errorf("Failed to get inode for root net namespace: %v", err)
165+
}
166+
167+
ctrk, err := netlink.NewConntrack(rootHandle)
168+
if err != nil {
169+
return nil, fmt.Errorf("Failed to create conntrack for root namespace: %v", err)
170+
}
171+
podCorrelationInfo.rootCtrk = &ctrk
172+
148173
// Here we want the informer and the exporter
149-
if err := podCorrelationInfo.startKubernetesInformer(cfg, logLevel); err != nil {
174+
if err := podCorrelationInfo.startKubernetesInformer(cfg); err != nil {
150175
return nil, err
151176
}
152177
if podCorrelationInfo.metrics, err = telemetry.NewMetricsExporter(cfg.Exporter); err != nil {
@@ -157,14 +182,14 @@ func newPodCorrelationInfo(cfg *config.PodCorrelationConfig, logLevel string, ro
157182
return podCorrelationInfo, nil
158183
}
159184

160-
func (pi *podCorrelationInfo) startKubernetesInformer(cfg *config.PodCorrelationConfig, logLevel string) error {
185+
func (pi *podCorrelationInfo) startKubernetesInformer(cfg *config.PodCorrelationConfig) error {
161186
log.Info("starting kubernetes informer for pod correlated metrics")
162187
informerCfg := kube.InformerConfig{
163188
KubeConfigPath: cfg.KubeConfigPath,
164189
SyncTimeout: 30 * time.Second,
165190
ResyncPeriod: 5 * time.Minute,
166191
MetaCacheAddr: cfg.RemoteCacheAddr,
167-
LogLevel: logLevel,
192+
LogLevel: cfg.ObserverLogLevel,
168193
}
169194

170195
var err error
@@ -281,6 +306,69 @@ func (pi *podCorrelationInfo) generateProtocolMetrics(conn *network.ConnectionSt
281306
}
282307
}
283308

309+
func conntrackOriginTCP(connTuple *network.ConnectionTuple) netlink.ConTuple {
310+
return netlink.ConTuple{
311+
Src: netip.AddrPortFrom(connTuple.Source.Addr, connTuple.SPort),
312+
Dst: netip.AddrPortFrom(connTuple.Dest.Addr, connTuple.DPort),
313+
Proto: unix.IPPROTO_TCP,
314+
}
315+
}
316+
317+
func (pi *podCorrelationInfo) tryClusterIpResolution(conn *network.ConnectionStats) *kube.PodInfo {
318+
// if we arrive here dstPodInfo is nil and the connection is OUTGOING
319+
var dstPodInfo *kube.PodInfo
320+
// First we try the resolution using the translation info attached to the connection
321+
// this comes from the ebpf conntrack cache, that could be expired
322+
if conn.IPTranslation != nil && conn.IPTranslation.ReplSrcIP.IsValid() {
323+
dstPodInfo = pi.observer.ResolvePodByIP(conn.IPTranslation.ReplSrcIP, conn.Duration)
324+
// we need to replace also the remote IP and port from ClusterIP to Pod
325+
if dstPodInfo != nil {
326+
conn.ConnectionTuple.Dest = conn.IPTranslation.ReplSrcIP
327+
conn.ConnectionTuple.DPort = conn.IPTranslation.ReplSrcPort
328+
return dstPodInfo
329+
}
330+
}
331+
332+
// dstPodInfo is still nil...
333+
334+
// if the conntrack ebpf cache is expired, we try to query the conntrack table in the root netns if present. At the moment we don't define it for tests
335+
if pi.rootCtrk == nil {
336+
return nil
337+
}
338+
339+
// we convert the tuple in the conntrack format
340+
// we need only the origin field.
341+
origin := netlink.Con{Origin: conntrackOriginTCP(&conn.ConnectionTuple)}
342+
reply, err := (*pi.rootCtrk).Get(&origin)
343+
// if there is no entry we don't return an error we return an empty reply
344+
if err != nil {
345+
log.Warnf("Failed to query conntrack table in root netns for connection %v: %v", conn, err)
346+
return nil
347+
}
348+
349+
// if there are no entries we obtain an empty netlink.Con{}
350+
// we check if the port is 0 to understand if it is empty
351+
if reply.Reply.Src.Port() == 0 {
352+
return nil
353+
}
354+
355+
// Example of a conntrack entry that converts a ClusterIP to Pod IP:
356+
// key: src(10.42.0.10:35926) -> dst(10.43.168.100:5432) --- value: src(10.42.0.9:5432) -> dst(10.42.0.10:35926)
357+
// if there is not natting we are not interested
358+
if conn.ConnectionTuple.Dest.Addr == reply.Reply.Src.Addr() {
359+
return nil
360+
}
361+
362+
// it is possible that we have the translated IP in the reply src field
363+
dstPodInfo = pi.observer.ResolvePodByIP(util.Address{Addr: reply.Reply.Src.Addr()}, conn.Duration)
364+
if dstPodInfo != nil {
365+
log.Debugf("Resolved pod info for connection %v -> %v", conn.ConnectionTuple.Dest, reply.Reply.Src.Addr())
366+
conn.ConnectionTuple.Dest = util.Address{Addr: reply.Reply.Src.Addr()}
367+
conn.ConnectionTuple.DPort = reply.Reply.Src.Port()
368+
}
369+
return dstPodInfo
370+
}
371+
284372
func (pi *podCorrelationInfo) exportOTELMetrics(conn *network.ConnectionStats, metrics []*model.ConnectionMetric) {
285373
// 1. Pod -> Pod (INCOMING and OUTGOING)
286374
// 2. Pod -> Pod HostNetwork (OUTGOING)
@@ -294,14 +382,9 @@ func (pi *podCorrelationInfo) exportOTELMetrics(conn *network.ConnectionStats, m
294382
return
295383
}
296384

297-
if conn.Direction == network.OUTGOING {
298-
// We try the resolution
299-
if dstPodInfo == nil && conn.IPTranslation != nil && conn.IPTranslation.ReplSrcIP.IsValid() {
300-
dstPodInfo = pi.observer.ResolvePodByIP(conn.IPTranslation.ReplSrcIP, conn.Duration)
301-
// we need to replace also the remote IP and port from ClusterIP to Pod
302-
conn.ConnectionTuple.Dest = conn.IPTranslation.ReplSrcIP
303-
conn.ConnectionTuple.DPort = conn.IPTranslation.ReplSrcPort
304-
}
385+
// if we have the destination pod there is no need to try to resolve it
386+
if conn.Direction == network.OUTGOING && dstPodInfo == nil {
387+
dstPodInfo = pi.tryClusterIpResolution(conn)
305388
}
306389

307390
// if one of the 2 is nil we need to check if we want to export partial correlation

checks/net_pod_correlation_test.go

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,6 @@ func TestPodCorrelation(t *testing.T) {
7373
hostIP = util.AddressFromString("192.168.1.7")
7474
localhostIP = util.AddressFromString("127.0.0.1")
7575
randomLocalHostPort = uint16(46734)
76-
77-
hostNs = uint32(1)
7876
)
7977

8078
defaultPostgresOutgoingConnection := network.ConnectionStats{
@@ -362,10 +360,9 @@ func TestPodCorrelation(t *testing.T) {
362360
Exporter: config.ExporterConfig{
363361
Type: config.ExporterTypeManual,
364362
},
365-
AttributesKeys: AllAttributeKeys,
366-
},
367-
"debug",
368-
hostNs)
363+
AttributesKeys: AllAttributeKeys,
364+
ObserverLogLevel: "debug",
365+
})
369366
require.NoError(t, err)
370367
// Overwrite the observer in the pod correlation struct
371368
pi.observer = obs
@@ -544,8 +541,6 @@ func TestGetMetricAttributes(t *testing.T) {
544541
},
545542
AttributesKeys: tt.requiredAttributesKeys,
546543
},
547-
"debug",
548-
0,
549544
)
550545
require.NoError(t, err)
551546

config/config.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,8 @@ type PodCorrelationConfig struct {
7575
AttributesKeys []string
7676
// Interval to consider a connection short-lived
7777
ShortLivedConnectionsInterval time.Duration
78+
// Logger level for k8s observer
79+
ObserverLogLevel string
7880
}
7981

8082
// NetworkTracerConfig contains some[1] of the network tracer configuration options

k8s-deployment/yaml/postgres.yaml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,6 @@ spec:
6060
#
6161
#
6262
# For 90000 seconds we send a select query every 1 second on the same PostgreSQL connection.
63-
# Then we start again with a new connection.
6463
#
6564
#
6665
- |

pkg/kube/informer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,6 @@ func GetInformer(cfg InformerConfig) (meta.Notifier, error) {
8080
log.Infof("Using remote K8s cache service at '%s'", cfg.MetaCacheAddr)
8181
return initRemoteInformerCacheClient(context.Background(), cfg.MetaCacheAddr, cfg.SyncTimeout)
8282
}
83-
log.Info("Using local K8s informers")
83+
log.Infof("Using local K8s informers with kubeconfig path: '%s'", cfg.KubeConfigPath)
8484
return initLocalInformers(context.Background(), cfg)
8585
}

0 commit comments

Comments
 (0)