From 16e530071fcff3980b2e5ea039051d9e49631c6f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marc=20Schottst=C3=A4dt?= Date: Sun, 31 May 2026 15:37:21 +0200 Subject: [PATCH] fix(kubernetes): use pod stats for keepalive --- README.md | 2 +- apps/druid/adapters/cli/daemon.go | 3 - config/helm-charts/druid-cli/chart_test.go | 8 +- .../druid-cli/templates/deployment.yaml | 2 - .../helm-charts/druid-cli/templates/rbac.yaml | 3 + config/helm-charts/druid-cli/values.yaml | 3 - docs_md/kubernetes_keepalive.md | 19 +- go.mod | 29 +- go.sum | 32 +- internal/runtime/kubernetes/backend.go | 29 +- internal/runtime/kubernetes/config.go | 15 +- internal/runtime/kubernetes/config_test.go | 13 - internal/runtime/kubernetes/hubble.go | 95 ----- internal/runtime/kubernetes/keepalive.go | 49 ++- internal/runtime/kubernetes/ports.go | 76 ++-- internal/runtime/kubernetes/resources_test.go | 206 ++++++++--- internal/runtime/kubernetes/traffic.go | 330 ++++++++++++++++++ test/integration/docker/docker_cli_test.go | 12 + test/integration/internal/e2e/harness.go | 15 +- .../kubernetes/kubernetes_cli_test.go | 45 ++- 20 files changed, 644 insertions(+), 342 deletions(-) delete mode 100644 internal/runtime/kubernetes/hubble.go create mode 100644 internal/runtime/kubernetes/traffic.go diff --git a/README.md b/README.md index f10e3f50..eea87aad 100644 --- a/README.md +++ b/README.md @@ -67,7 +67,7 @@ There is also websocket support for stdout. TTY is also supported. Runtime selection is daemon-only: start the daemon with `druid daemon --runtime docker`, then use `druid` to create, run, and inspect scrolls without passing a runtime. Docker runtime state stays in SQLite under the runtime state directory. Scroll specs and runtime data live together in one runtime root. -Kubernetes runtime support is available with `druid daemon --runtime kubernetes` for in-cluster daemons or out-of-cluster daemons using kubeconfig. 
It stores daemon scroll state in ConfigMaps, materializes OCI artifacts through `druid worker pull` Jobs, and uses Cilium/Hubble Relay for port traffic presence. See `docs/kubernetes_runtime.md` for kubeconfig, RBAC, PVC, and Hubble setup. +Kubernetes runtime support is available with `druid daemon --runtime kubernetes` for in-cluster daemons or out-of-cluster daemons using kubeconfig. It stores daemon scroll state in ConfigMaps, materializes OCI artifacts through `druid worker pull` Jobs, and uses kubelet pod stats for procedure-level traffic checks. See `docs/kubernetes_runtime.md` for kubeconfig, RBAC, and PVC setup. ## Documentation diff --git a/apps/druid/adapters/cli/daemon.go b/apps/druid/adapters/cli/daemon.go index 525c55c0..7369d314 100644 --- a/apps/druid/adapters/cli/daemon.go +++ b/apps/druid/adapters/cli/daemon.go @@ -35,7 +35,6 @@ var k8sUIS3Region string var k8sUIS3Endpoint string var k8sUIS3Prefix string var k8sUIS3Secret string -var hubbleRelayAddr string var k8sKubeconfig string var runtimeListen string var runtimePublicListen string @@ -98,7 +97,6 @@ func init() { DaemonCommand.Flags().StringVar(&k8sUIS3Prefix, "k8s-ui-s3-prefix", "", "Optional S3 key prefix for UI packages (default: DRUID_K8S_UI_S3_PREFIX)") DaemonCommand.Flags().StringVar(&k8sUIS3Secret, "k8s-ui-s3-credentials-secret", "", "Kubernetes secret with AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY (default: DRUID_K8S_UI_S3_CREDENTIALS_SECRET)") DaemonCommand.Flags().StringVar(&k8sKubeconfig, "k8s-kubeconfig", "", "Kubernetes kubeconfig path for out-of-cluster runtime access (default: DRUID_K8S_KUBECONFIG, KUBECONFIG, or ~/.kube/config)") - DaemonCommand.Flags().StringVar(&hubbleRelayAddr, "hubble-relay-addr", "", "Hubble Relay gRPC address for Kubernetes port traffic (default: DRUID_HUBBLE_RELAY_ADDR or hubble-relay.kube-system.svc.cluster.local:80)") } func runRuntimeDaemon() error { @@ -111,7 +109,6 @@ func runRuntimeDaemon() error { StorageClass: k8sStorageClass, PullImage: 
k8sPullImage, RegistrySecret: k8sRegistrySecret, - HubbleRelayAddr: hubbleRelayAddr, Kubeconfig: k8sKubeconfig, UIS3Bucket: k8sUIS3Bucket, UIS3PublicBaseURL: k8sUIS3PublicBaseURL, diff --git a/config/helm-charts/druid-cli/chart_test.go b/config/helm-charts/druid-cli/chart_test.go index 9272b21a..ad3dc52f 100644 --- a/config/helm-charts/druid-cli/chart_test.go +++ b/config/helm-charts/druid-cli/chart_test.go @@ -30,7 +30,7 @@ func TestChartRendersDefaultAndCustomValues(t *testing.T) { `resources: ["secrets"]`, `resources: ["pods/attach"]`, `verbs: ["create"]`, - "hubble-relay.kube-system.svc.cluster.local:80", + `resources: ["nodes/proxy"]`, } { if !strings.Contains(defaultManifest, want) { t.Fatalf("default manifest does not contain %q", want) @@ -51,7 +51,6 @@ func TestChartRendersDefaultAndCustomValues(t *testing.T) { "--set", "runtime.pullImage=registry.local/druid-cli:e2e", "--set", "runtime.helperImage=busybox:1.36", "--set", "runtime.kubeconfigSecret.name=druid-kubeconfig", - "--set", "hubble.relayAddr=hubble.example:80", "--set", "networkPolicy.enabled=true", "--set", "ingress.enabled=true", "--set", "ingress.hosts[0].host=runtime.example.test", @@ -68,7 +67,6 @@ func TestChartRendersDefaultAndCustomValues(t *testing.T) { "value: \"busybox:1.36\"", "value: \"true\"", "value: /etc/druid/kubeconfig", - "hubble.example:80", "kind: NetworkPolicy", "kind: Ingress", "runtime.example.test", @@ -80,6 +78,10 @@ func TestChartRendersDefaultAndCustomValues(t *testing.T) { t.Fatalf("custom manifest does not contain %q", want) } } + removedTrafficEnv := "DRUID_" + "HU" + "BBLE_RELAY_ADDR" + if strings.Contains(defaultManifest, removedTrafficEnv) || strings.Contains(customManifest, removedTrafficEnv) { + t.Fatal("chart rendered removed traffic environment") + } } func helmTemplate(t *testing.T, args ...string) string { diff --git a/config/helm-charts/druid-cli/templates/deployment.yaml b/config/helm-charts/druid-cli/templates/deployment.yaml index 2daf2306..9410237a 
100644 --- a/config/helm-charts/druid-cli/templates/deployment.yaml +++ b/config/helm-charts/druid-cli/templates/deployment.yaml @@ -68,8 +68,6 @@ spec: value: {{ .Values.runtime.registrySecret | quote }} - name: DRUID_REGISTRY_PLAIN_HTTP value: {{ ternary "true" "false" .Values.runtime.registryPlainHTTP | quote }} - - name: DRUID_HUBBLE_RELAY_ADDR - value: {{ .Values.hubble.relayAddr | quote }} {{- if .Values.runtime.kubeconfigSecret.name }} - name: DRUID_K8S_KUBECONFIG value: /etc/druid/kubeconfig diff --git a/config/helm-charts/druid-cli/templates/rbac.yaml b/config/helm-charts/druid-cli/templates/rbac.yaml index a41f1091..a4de92cf 100644 --- a/config/helm-charts/druid-cli/templates/rbac.yaml +++ b/config/helm-charts/druid-cli/templates/rbac.yaml @@ -21,6 +21,9 @@ rules: - apiGroups: [""] resources: ["pods/attach"] verbs: ["create"] + - apiGroups: [""] + resources: ["nodes/proxy"] + verbs: ["get"] - apiGroups: ["apps"] resources: ["statefulsets"] verbs: ["get", "list", "watch", "create", "update", "patch", "delete"] diff --git a/config/helm-charts/druid-cli/values.yaml b/config/helm-charts/druid-cli/values.yaml index 6d794bef..b4db36c8 100644 --- a/config/helm-charts/druid-cli/values.yaml +++ b/config/helm-charts/druid-cli/values.yaml @@ -63,9 +63,6 @@ runtime: name: "" key: kubeconfig -hubble: - relayAddr: hubble-relay.kube-system.svc.cluster.local:80 - auth: enabled: false jwksUrl: "" diff --git a/docs_md/kubernetes_keepalive.md b/docs_md/kubernetes_keepalive.md index c66e7d4c..2901f226 100644 --- a/docs_md/kubernetes_keepalive.md +++ b/docs_md/kubernetes_keepalive.md @@ -5,26 +5,27 @@ sidebar_label: Kubernetes keepAliveTraffic ## Kubernetes keepAliveTraffic -Kubernetes runtimes use Hubble Relay to evaluate `keepAliveTraffic` on expected ports. +Kubernetes runtimes use kubelet pod network stats to evaluate `keepAliveTraffic` on running procedures. 
-When a running job procedure has an expected port with `keepAliveTraffic`, druid checks for matching Hubble flows over the configured window. If the full window has elapsed and no flow is observed, druid deletes that procedure job and records it as a clean stop. The command run mode is not changed; `restart` and `persistent` scheduling decide what runs next. +When a running job procedure has an expected port with `keepAliveTraffic`, druid samples that procedure pod's RX/TX bytes from `/api/v1/nodes/<node>/proxy/stats/summary`. If the full configured window has elapsed and the RX-byte delta is below every configured threshold, druid deletes that procedure job and records it as a clean stop. The command run mode is not changed; `restart` and `persistent` scheduling decide what runs next. Coldstarter procedures are not stopped by this rule. For Minecraft restart-mode scrolls, put `keepAliveTraffic` on the real runtime procedure's `main` expected port, not on the coldstarter procedure. -The current Hubble integration tracks flow presence. Use a minimum such as `1b/60m` to mean "at least one observed flow in the last 60 minutes". +Use values such as `10kb/5m` to mean "at least 10 KiB of pod RX traffic in the last 5 minutes". The metric is procedure-level: a single procedure pod can satisfy any of its configured keepalive expected ports. 
-Required daemon configuration: +Required Kubernetes RBAC: ``` -DRUID_HUBBLE_RELAY_ADDR=hubble-relay.kube-system.svc.cluster.local:80 +apiGroups: [""] +resources: ["nodes/proxy"] +verbs: ["get"] ``` Validation commands: ``` -kubectl -n kube-system get svc hubble-relay -kubectl -n kube-system rollout status deployment/hubble-relay -kubectl -n druid-system get deploy druid-cli -o jsonpath='{.spec.template.spec.containers[0].env[?(@.name=="DRUID_HUBBLE_RELAY_ADDR")].value}{"\n"}' +kubectl auth can-i get nodes --subresource=proxy --as=system:serviceaccount:druid-system:druid-cli +kubectl get --raw '/api/v1/nodes/<node>/proxy/stats/summary' | head ``` -If Hubble Relay is disabled or unavailable, druid does not stop any procedure for missing traffic and reports `hubble-relay-unavailable` in port status/logs. +After daemon restart, druid fails open until enough pod-stat samples exist to cover the configured window. If pod stats are unavailable or the active pod cannot be resolved, druid does not stop the procedure for missing traffic and reports `kubernetes-pod-stats-unavailable` in port status/logs. 
diff --git a/go.mod b/go.mod index 677481d3..a3462967 100644 --- a/go.mod +++ b/go.mod @@ -41,19 +41,17 @@ require ( github.com/valyala/bytebufferpool v1.0.0 // indirect github.com/valyala/fasthttp v1.65.0 // indirect go.uber.org/multierr v1.11.0 // indirect - golang.org/x/net v0.47.0 // indirect + golang.org/x/net v0.47.0 golang.org/x/sync v0.18.0 // indirect golang.org/x/sys v0.38.0 // indirect golang.org/x/text v0.31.0 // indirect - google.golang.org/protobuf v1.36.6 + google.golang.org/protobuf v1.36.6 // indirect ) require ( github.com/Microsoft/go-winio v0.6.2 // indirect github.com/apapsch/go-jsonmerge/v2 v2.0.0 // indirect - github.com/aws/aws-sdk-go-v2 v1.41.7 // indirect github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.10 // indirect - github.com/aws/aws-sdk-go-v2/config v1.32.18 // indirect github.com/aws/aws-sdk-go-v2/credentials v1.19.17 // indirect github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.23 // indirect github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.23 // indirect @@ -63,15 +61,13 @@ require ( github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.9.15 // indirect github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.23 // indirect github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.23 // indirect - github.com/aws/aws-sdk-go-v2/service/s3 v1.101.0 // indirect github.com/aws/aws-sdk-go-v2/service/signin v1.0.11 // indirect github.com/aws/aws-sdk-go-v2/service/sso v1.30.17 // indirect github.com/aws/aws-sdk-go-v2/service/ssooidc v1.36.0 // indirect github.com/aws/aws-sdk-go-v2/service/sts v1.42.1 // indirect github.com/aws/smithy-go v1.25.1 // indirect - github.com/cenkalti/backoff/v4 v4.3.0 // indirect - github.com/containerd/errdefs v0.3.0 // indirect github.com/containerd/errdefs/pkg v0.3.0 // indirect + github.com/containerd/log v0.1.0 // indirect github.com/cpuguy83/go-md2man/v2 v2.0.6 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect 
github.com/distribution/reference v0.6.0 // indirect @@ -90,6 +86,8 @@ require ( github.com/json-iterator/go v1.1.12 // indirect github.com/moby/docker-image-spec v1.3.1 // indirect github.com/moby/spdystream v0.5.0 // indirect + github.com/moby/sys/atomicwriter v0.1.0 // indirect + github.com/moby/term v0.5.2 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826 // indirect @@ -99,11 +97,12 @@ require ( github.com/ncruces/go-strftime v1.0.0 // indirect github.com/oasdiff/yaml v0.0.0-20250309154309-f31be36b4037 // indirect github.com/oasdiff/yaml3 v0.0.0-20250309153720-d2182401db90 // indirect + github.com/onsi/gomega v1.36.1 // indirect github.com/pelletier/go-toml/v2 v2.2.3 // indirect github.com/perimeterx/marshmallow v1.1.5 // indirect github.com/pkg/errors v0.9.1 // indirect + github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect - github.com/robfig/cron/v3 v3.0.1 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect github.com/sagikazarmark/locafero v0.7.0 // indirect github.com/sourcegraph/conc v0.3.0 // indirect @@ -112,16 +111,20 @@ require ( go.opentelemetry.io/auto/sdk v1.1.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.58.0 // indirect go.opentelemetry.io/otel v1.37.0 // indirect - go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.32.0 // indirect + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.32.0 // indirect go.opentelemetry.io/otel/metric v1.37.0 // indirect + go.opentelemetry.io/otel/sdk v1.36.0 // indirect go.opentelemetry.io/otel/trace v1.37.0 // indirect - go.uber.org/atomic v1.11.0 // indirect + go.opentelemetry.io/proto/otlp v1.7.0 // indirect go.yaml.in/yaml/v2 v2.4.2 // indirect + go.yaml.in/yaml/v3 v3.0.4 // indirect 
golang.org/x/exp v0.0.0-20241210194714-1829a127f884 // indirect golang.org/x/oauth2 v0.30.0 // indirect golang.org/x/term v0.37.0 // indirect golang.org/x/time v0.12.0 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20250603155806-513f23925822 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20250721164621-a45f3dfb1074 // indirect + google.golang.org/grpc v1.74.2 // indirect gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gotest.tools/v3 v3.5.2 // indirect @@ -139,7 +142,10 @@ require ( require ( github.com/MicahParks/keyfunc v1.9.0 - github.com/cilium/cilium v1.18.6 + github.com/aws/aws-sdk-go-v2 v1.41.7 + github.com/aws/aws-sdk-go-v2/config v1.32.18 + github.com/aws/aws-sdk-go-v2/service/s3 v1.101.0 + github.com/containerd/errdefs v0.3.0 github.com/docker/docker v28.3.3+incompatible github.com/docker/go-connections v0.5.0 github.com/getkin/kin-openapi v0.133.0 @@ -149,7 +155,6 @@ require ( github.com/otiai10/copy v1.14.0 github.com/yuin/gopher-lua v1.1.1 go.uber.org/mock v0.4.0 - google.golang.org/grpc v1.74.2 gopkg.in/yaml.v2 v2.4.0 k8s.io/api v0.33.4 k8s.io/apimachinery v0.33.4 diff --git a/go.sum b/go.sum index 245d5d66..e5bbfabf 100644 --- a/go.sum +++ b/go.sum @@ -11,6 +11,8 @@ github.com/andybalholm/brotli v1.2.0 h1:ukwgCxwYrmACq68yiUqwIWnGY0cTPox/M94sVwTo github.com/andybalholm/brotli v1.2.0/go.mod h1:rzTDkvFWvIrjDXZHkuS16NPggd91W3kUSvPlQ1pLaKY= github.com/apapsch/go-jsonmerge/v2 v2.0.0 h1:axGnT1gRIfimI7gJifB699GoE/oq+F2MU7Dml6nw9rQ= github.com/apapsch/go-jsonmerge/v2 v2.0.0/go.mod h1:lvDnEdqiQrp0O42VQGgmlKpxL1AP2+08jFMw88y4klk= +github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio= +github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs= github.com/aws/aws-sdk-go-v2 v1.41.7 h1:DWpAJt66FmnnaRIOT/8ASTucrvuDPZASqhhLey6tLY8= github.com/aws/aws-sdk-go-v2 v1.41.7/go.mod 
h1:4LAfZOPHNVNQEckOACQx60Y8pSRjIkNZQz1w92xpMJc= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.10 h1:gx1AwW1Iyk9Z9dD9F4akX5gnN3QZwUB20GGKH/I+Rho= @@ -50,8 +52,6 @@ github.com/aws/smithy-go v1.25.1/go.mod h1:YE2RhdIuDbA5E5bTdciG9KrW3+TiEONeUWCqx github.com/bmatcuk/doublestar v1.1.1/go.mod h1:UD6OnuiIn0yFxxA2le/rnRU1G4RaI4UvFv1sNto9p6w= github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= -github.com/cilium/cilium v1.18.6 h1:acz5aRKhZbarCO1flx2vCP9wBh+lDc02uJgdqiTsJbA= -github.com/cilium/cilium v1.18.6/go.mod h1:mzpKpkILwP24adE975fTVdAojyy6C1tq7TDa9qZCWyo= github.com/containerd/errdefs v0.3.0 h1:FSZgGOeK4yuT/+DnF07/Olde/q4KBoMsaamhXxIMDp4= github.com/containerd/errdefs v0.3.0/go.mod h1:+YBYIdtsnF4Iw6nWZhJcqGSg/dwvV7tyJ/kCkyJ2k+M= github.com/containerd/errdefs/pkg v0.3.0 h1:9IKJ06FvyNlexW690DXuQNx2KA2cUJXx151Xdx3ZPPE= @@ -60,7 +60,6 @@ github.com/containerd/log v0.1.0 h1:TCJt7ioM2cr/tfR8GPbGf9/VRAX8D2B4PjzCpfX540I= github.com/containerd/log v0.1.0/go.mod h1:VRRf09a7mHDIRezVKTRCrOq78v577GXq3bSa3EhrzVo= github.com/cpuguy83/go-md2man/v2 v2.0.6 h1:XJtiaUW6dEEqVuZiMTn1ldk455QWwEIsMIJlo5vtkx0= github.com/cpuguy83/go-md2man/v2 v2.0.6/go.mod h1:oOW0eioCTA6cOiMLiUPZOpcVxMig6NIQQ7OS05n1F4g= -github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= @@ -115,8 +114,6 @@ github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69 github.com/golang-jwt/jwt/v4 v4.4.2/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0= github.com/golang-jwt/jwt/v4 v4.5.0 h1:7cYmW1XlMY7h7ii7UhUyChSgS5wUJEnm9uZVTGqOWzg= 
github.com/golang-jwt/jwt/v4 v4.5.0/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0= -github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= -github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/google/gnostic-models v0.6.9 h1:MU/8wDLif2qCXZmzncUQ/BOfxWfthHi63KqpoNbWqVw= github.com/google/gnostic-models v0.6.9/go.mod h1:CiWsm0s6BSQd1hRn8/QmxqB6BesYcbSZxsz9b0KuDBw= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= @@ -125,7 +122,6 @@ github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e h1:ijClszYn+mADRFY17kjQEVQ1XRhq2/JR1M3sGqeJoxs= github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e/go.mod h1:boTsfXsheKC2y+lKOCMpSfarhxDeIzfZG1jqGcPl3cA= -github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674 h1:JeSE6pjso5THxAzdVpqr6/geYxZytqFMBCOtn/ujyeo= @@ -145,13 +141,8 @@ github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= -github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= -github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= -github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= github.com/kr/pretty v0.3.1 
h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= -github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= -github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/mailru/easyjson v0.9.0 h1:PrnmzHw7262yW8sTBwxi1PdJA3Iw/EKBa8psRf7d9a4= @@ -193,7 +184,6 @@ github.com/oasdiff/yaml v0.0.0-20250309154309-f31be36b4037 h1:G7ERwszslrBzRxj//J github.com/oasdiff/yaml v0.0.0-20250309154309-f31be36b4037/go.mod h1:2bpvgLBZEtENV5scfDFEtB/5+1M4hkQhDQrccEJ/qGw= github.com/oasdiff/yaml3 v0.0.0-20250309153720-d2182401db90 h1:bQx3WeLcUWy+RletIKwUIt4x3t8n2SxavmoclizMb8c= github.com/oasdiff/yaml3 v0.0.0-20250309153720-d2182401db90/go.mod h1:y5+oSEHCPT/DGrS++Wc/479ERge0zTFxaF8PbGKcg2o= -github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= github.com/onsi/ginkgo/v2 v2.21.0 h1:7rg/4f3rB88pb5obDgNZrNHrQ4e6WpjonchcpuBRnZM= github.com/onsi/ginkgo/v2 v2.21.0/go.mod h1:7Du3c42kxCUegi0IImZ1wUQzMBVecgIHjR1C+NkhLQo= github.com/onsi/gomega v1.36.1 h1:bJDPBO7ibjxcbHMgSCoo4Yj18UWbKDlLwX1x9sybDcw= @@ -210,7 +200,6 @@ github.com/pelletier/go-toml/v2 v2.2.3 h1:YmeHyLY8mFWbdkNWwpr+qIL2bEqT0o95WSdkNH github.com/pelletier/go-toml/v2 v2.2.3/go.mod h1:MfCQTFTvCcUyyvvwm1+G6H/jORL20Xlb6rzQu9GuUkc= github.com/perimeterx/marshmallow v1.1.5 h1:a2LALqQ1BlHM8PZblsDdidgv1mWi1DgC2UmX50IvK2s= github.com/perimeterx/marshmallow v1.1.5/go.mod h1:dsXbUu8CRzfYP5a87xpp0xq9S3u0Vchtcl8we9tYaXw= -github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0/go.mod 
h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= @@ -221,10 +210,6 @@ github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qq github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ= github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= -github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= -github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= -github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= -github.com/rogpeppe/go-internal v1.8.1/go.mod h1:JeRgkft04UBgHMgCIwADu4Pn6Mtm5d4nPKWu0nJ5d+o= github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk= @@ -250,14 +235,9 @@ github.com/spf13/viper v1.20.1 h1:ZMi+z/lvLyPSCoNtFCpqjy0S4kPbirhpTMwl8BkW9X4= github.com/spf13/viper v1.20.1/go.mod h1:P9Mdzt1zoHIG8m2eZQinpiBjo6kCmZSKBClNNqjJvu4= github.com/spkg/bom v0.0.0-20160624110644-59b7046e48ad/go.mod h1:qLr4V1qq6nMqFKkMo8ZTx3f+BZEkzsRUY10Xsm2mwU0= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= -github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= -github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.8.0/go.mod 
h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= -github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8= @@ -292,15 +272,10 @@ go.opentelemetry.io/otel/metric v1.37.0 h1:mvwbQS5m0tbmqML4NqK+e3aDiO02vsf/Wgbsd go.opentelemetry.io/otel/metric v1.37.0/go.mod h1:04wGrZurHYKOc+RKeye86GwKiTb9FKm1WHtO+4EVr2E= go.opentelemetry.io/otel/sdk v1.36.0 h1:b6SYIuLRs88ztox4EyrvRti80uXIFy+Sqzoh9kFULbs= go.opentelemetry.io/otel/sdk v1.36.0/go.mod h1:+lC+mTgD+MUWfjJubi2vvXWcVxyr9rmlshZni72pXeY= -go.opentelemetry.io/otel/sdk/metric v1.36.0 h1:r0ntwwGosWGaa0CrSt8cuNuTcccMXERFwHX4dThiPis= -go.opentelemetry.io/otel/sdk/metric v1.36.0/go.mod h1:qTNOhFDfKRwX0yXOqJYegL5WRaW376QbB7P4Pb0qva4= go.opentelemetry.io/otel/trace v1.37.0 h1:HLdcFNbRQBE2imdSEgm/kwqmQj1Or1l/7bW6mxVK7z4= go.opentelemetry.io/otel/trace v1.37.0/go.mod h1:TlgrlQ+PtQO5XFerSPUYG0JSgGyryXewPGyayAWSBS0= go.opentelemetry.io/proto/otlp v1.7.0 h1:jX1VolD6nHuFzOYso2E73H85i92Mv8JQYk0K9vz09os= go.opentelemetry.io/proto/otlp v1.7.0/go.mod h1:fSKjH6YJ7HDlwzltzyMj036AJ3ejJLCgCSHGj4efDDo= -go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= -go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= -go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/mock v0.4.0 h1:VcM4ZOtdbR4f6VXfiOpwpVJDL6lCReaZ6mw31wqh7KU= @@ -368,17 +343,14 @@ google.golang.org/grpc v1.74.2/go.mod h1:CtQ+BGjaAIXHs/5YS3i473GqwBBa1zGQNevxdeB google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY= 
google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= -gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/evanphx/json-patch.v4 v4.12.0 h1:n6jtcsulIzXPJaxegRbvFNNrZDjbij7ny3gmSPG+6V4= gopkg.in/evanphx/json-patch.v4 v4.12.0/go.mod h1:p8EYWUEYMpynmqDbY58zCKCFZw8pRWMG4EsWvDvM72M= gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc= gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= -gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gotest.tools/v3 v3.5.2 h1:7koQfIKdy+I8UTetycgUqXWSDwpgv193Ka+qRsmBY8Q= diff --git a/internal/runtime/kubernetes/backend.go b/internal/runtime/kubernetes/backend.go index 5bcce24b..f5f6e20c 100644 --- a/internal/runtime/kubernetes/backend.go +++ b/internal/runtime/kubernetes/backend.go @@ -21,7 +21,7 @@ type Backend struct { restConfig *rest.Config consoleManager ports.ConsoleManagerInterface config Config - hubble HubbleClient + statsReader nodeStatsReader jobLogRunner func(context.Context, *batchv1.Job) ([]byte, error) jobExitMu sync.Mutex jobExits map[string]recentJobExit @@ -40,7 +40,7 @@ const ( func New(config Config, consoleManager 
ports.ConsoleManagerInterface) (*Backend, error) { config = config.WithDefaults() - restConfig, namespace, source, inCluster, err := runtimeRESTConfig(config) + restConfig, namespace, source, _, err := runtimeRESTConfig(config) if err != nil { return nil, err } @@ -57,31 +57,17 @@ func New(config Config, consoleManager ports.ConsoleManagerInterface) (*Backend, return nil, fmt.Errorf("kubernetes API unavailable: %w", err) } logger.Log().Info("Using Kubernetes backend settings", zap.String("source", source), zap.String("namespace", config.Namespace)) - var hubble HubbleClient - if config.HubbleEnabled() { - hubble = NewHubbleRelayClient(config.HubbleRelayAddr) - } backend := &Backend{ client: client, restConfig: restConfig, consoleManager: consoleManager, config: config, - hubble: hubble, jobExits: make(map[string]recentJobExit), } + backend.statsReader = backend.readNodeStatsSummary if config.PullImage == "" { logger.Log().Warn("Kubernetes cluster materialization requires --k8s-pull-image or DRUID_K8S_PULL_IMAGE") } - if config.HubbleEnabled() && !inCluster && config.HubbleRelayAddr == defaultHubbleRelayAddr { - logger.Log().Warn("Default Hubble Relay address may not be reachable outside the cluster; set --hubble-relay-addr or port-forward Hubble Relay", zap.String("addr", config.HubbleRelayAddr)) - } - if config.HubbleEnabled() { - if err := backend.checkHubble(context.Background()); err != nil { - logger.Log().Warn("Hubble Relay unavailable; Kubernetes port traffic will degrade to Service/Endpoint status", zap.Error(err), zap.String("addr", config.HubbleRelayAddr)) - } - } else { - logger.Log().Info("Hubble Relay disabled; Kubernetes port traffic will use Service/Endpoint status", zap.String("addr", config.HubbleRelayAddr)) - } return backend, nil } @@ -142,15 +128,14 @@ func kubeconfigRESTConfig(config Config) (*rest.Config, string, string, error) { return restConfig, namespace, source, nil } -func NewWithClient(config Config, consoleManager 
ports.ConsoleManagerInterface, client k8sclient.Interface, hubble HubbleClient) *Backend { +func NewWithClient(config Config, consoleManager ports.ConsoleManagerInterface, client k8sclient.Interface) *Backend { config = config.WithDefaults() if config.Namespace == "" { config.Namespace = "default" } - if hubble == nil && config.HubbleEnabled() { - hubble = NewHubbleRelayClient(config.HubbleRelayAddr) - } - return &Backend{client: client, consoleManager: consoleManager, config: config, hubble: hubble, jobExits: make(map[string]recentJobExit)} + backend := &Backend{client: client, consoleManager: consoleManager, config: config, jobExits: make(map[string]recentJobExit)} + backend.statsReader = backend.readNodeStatsSummary + return backend } func (b *Backend) Name() string { diff --git a/internal/runtime/kubernetes/config.go b/internal/runtime/kubernetes/config.go index a4aaef51..50c350ed 100644 --- a/internal/runtime/kubernetes/config.go +++ b/internal/runtime/kubernetes/config.go @@ -7,8 +7,7 @@ import ( ) const ( - defaultHubbleRelayAddr = "hubble-relay.kube-system.svc.cluster.local:80" - defaultHelperImage = "busybox:1.36" + defaultHelperImage = "busybox:1.36" ) type Config struct { @@ -17,7 +16,6 @@ type Config struct { PullImage string RegistrySecret string RegistryPlainHTTP bool - HubbleRelayAddr string HelperImage string Kubeconfig string UIS3Bucket string @@ -47,12 +45,6 @@ func (c Config) WithDefaults() Config { if c.Kubeconfig == "" { c.Kubeconfig = os.Getenv("DRUID_K8S_KUBECONFIG") } - if c.HubbleRelayAddr == "" { - c.HubbleRelayAddr = os.Getenv("DRUID_HUBBLE_RELAY_ADDR") - } - if c.HubbleRelayAddr == "" { - c.HubbleRelayAddr = defaultHubbleRelayAddr - } if c.HelperImage == "" { c.HelperImage = os.Getenv("DRUID_K8S_HELPER_IMAGE") } @@ -80,11 +72,6 @@ func (c Config) WithDefaults() Config { return c } -func (c Config) HubbleEnabled() bool { - value := strings.ToLower(strings.TrimSpace(c.HubbleRelayAddr)) - return value != "disabled" && value != "disable" && 
value != "off" && value != "false" && value != "none" -} - func plainHTTPEnv(name string) bool { value := strings.ToLower(strings.TrimSpace(os.Getenv(name))) return value == "1" || value == "true" || value == "yes" diff --git a/internal/runtime/kubernetes/config_test.go b/internal/runtime/kubernetes/config_test.go index 08ea9cbb..624994dd 100644 --- a/internal/runtime/kubernetes/config_test.go +++ b/internal/runtime/kubernetes/config_test.go @@ -27,19 +27,6 @@ func TestConfigWithDefaultsReadsRegistryPlainHTTPEnv(t *testing.T) { } } -func TestConfigHubbleEnabled(t *testing.T) { - enabled := Config{HubbleRelayAddr: "hubble-relay.kube-system.svc.cluster.local:80"} - if !enabled.HubbleEnabled() { - t.Fatal("HubbleEnabled = false, want true") - } - - for _, value := range []string{"disabled", "disable", "off", "false", "none", " DISABLED "} { - if (Config{HubbleRelayAddr: value}).HubbleEnabled() { - t.Fatalf("HubbleEnabled(%q) = true, want false", value) - } - } -} - func TestKubeconfigRESTConfigUsesCurrentContextNamespace(t *testing.T) { kubeconfig := writeKubeconfig(t, "from-context") diff --git a/internal/runtime/kubernetes/hubble.go b/internal/runtime/kubernetes/hubble.go deleted file mode 100644 index 24d2ba86..00000000 --- a/internal/runtime/kubernetes/hubble.go +++ /dev/null @@ -1,95 +0,0 @@ -package kubernetes - -import ( - "context" - "errors" - "fmt" - "io" - "time" - - hubbleflow "github.com/cilium/cilium/api/v1/flow" - hubbleobserver "github.com/cilium/cilium/api/v1/observer" - "github.com/highcard-dev/daemon/internal/core/domain" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" - "google.golang.org/protobuf/types/known/timestamppb" -) - -type TrafficQuery struct { - Namespace string - ScrollID string - ProcedureName string - Port domain.Port - ExpectedPort domain.ExpectedPort - Window time.Duration -} - -type HubbleClient interface { - HasFlow(ctx context.Context, query TrafficQuery) (bool, error) -} - -type HubbleRelayClient 
struct { - addr string -} - -func NewHubbleRelayClient(addr string) *HubbleRelayClient { - return &HubbleRelayClient{addr: addr} -} - -func (c *HubbleRelayClient) HasFlow(ctx context.Context, query TrafficQuery) (bool, error) { - if c.addr == "" { - return false, fmt.Errorf("hubble relay address is required") - } - window := query.Window - if window <= 0 { - window = 5 * time.Minute - } - ctx, cancel := context.WithTimeout(ctx, 5*time.Second) - defer cancel() - conn, err := grpc.NewClient(c.addr, grpc.WithTransportCredentials(insecure.NewCredentials())) - if err != nil { - return false, err - } - defer conn.Close() - client := hubbleobserver.NewObserverClient(conn) - filter := &hubbleflow.FlowFilter{ - DestinationLabel: []string{ - labelManagedBy + "=druid", - labelScrollID + "=" + dnsLabel(query.ScrollID), - labelProcedure + "=" + dnsLabel(query.ProcedureName), - }, - DestinationPort: []string{fmt.Sprintf("%d", query.Port.Port)}, - } - if query.Namespace != "" { - filter.DestinationPod = []string{query.Namespace + "/"} - } - if protocol := normalizeProtocol(query.Port.Protocol); protocol != "" { - filter.Protocol = []string{protocol} - } - stream, err := client.GetFlows(ctx, &hubbleobserver.GetFlowsRequest{ - Since: timestamppb.New(time.Now().Add(-window)), - Whitelist: []*hubbleflow.FlowFilter{filter}, - }) - if err != nil { - return false, err - } - _, err = stream.Recv() - if err == nil { - return true, nil - } - if ctx.Err() != nil || errors.Is(err, io.EOF) { - return false, nil - } - return false, err -} - -func normalizeProtocol(protocol string) string { - switch protocol { - case "", "tcp", "TCP": - return "tcp" - case "udp", "UDP": - return "udp" - default: - return protocol - } -} diff --git a/internal/runtime/kubernetes/keepalive.go b/internal/runtime/kubernetes/keepalive.go index 657e0247..699e62df 100644 --- a/internal/runtime/kubernetes/keepalive.go +++ b/internal/runtime/kubernetes/keepalive.go @@ -14,7 +14,7 @@ import ( ) func (b *Backend) 
keepAliveTrafficIdleStopper(namespace string, root string, commandName string, procedureName string, procedure *domain.Procedure, globalPorts []domain.Port) jobIdleStopFunc { - if !b.config.HubbleEnabled() || b.hubble == nil || procedure == nil || coldstarterProcedure(procedureName, procedure) { + if procedure == nil || coldstarterProcedure(procedureName, procedure) { return nil } ports := portsByName(globalPorts) @@ -29,21 +29,19 @@ func (b *Backend) keepAliveTrafficIdleStopper(namespace string, root string, com return false, err } } - port, ok := ports[expectedPort.Name] - if !ok { + if _, ok := ports[expectedPort.Name]; !ok { return nil } thresholds = append(thresholds, keepAliveThreshold{ expectedPort: expectedPort, - port: port, + bytes: threshold.Bytes, window: threshold.Window, }) } if len(thresholds) == 0 { return nil } - _, scrollID, err := parseRef(root) - if err != nil { + if _, _, err := parseRef(root); err != nil { return func(context.Context, *batchv1.Job) (bool, error) { return false, err } @@ -53,32 +51,44 @@ func (b *Backend) keepAliveTrafficIdleStopper(namespace string, root string, com return false, nil } now := time.Now() + traffic, err := b.procedureTrafficForJob(ctx, namespace, job.Name, now) + if err != nil { + logger.Log().Warn("Kubernetes pod stats unavailable; keepAliveTraffic enforcement skipped", + zap.String("namespace", namespace), + zap.String("job", job.Name), + zap.String("command", commandName), + zap.String("procedure", procedureName), + zap.Error(err), + ) + return false, nil + } + if traffic == nil { + logger.Log().Warn("Active procedure pod unavailable; keepAliveTraffic enforcement skipped", + zap.String("namespace", namespace), + zap.String("job", job.Name), + zap.String("command", commandName), + zap.String("procedure", procedureName), + ) + return false, nil + } for _, threshold := range thresholds { if !keepAliveWindowElapsed(now, job.CreationTimestamp, threshold.window) { return false, nil } } for _, threshold := range 
thresholds { - hasTraffic, err := b.hubble.HasFlow(ctx, TrafficQuery{ - Namespace: namespace, - ScrollID: scrollID, - ProcedureName: procedureName, - Port: threshold.port, - ExpectedPort: threshold.expectedPort, - Window: threshold.window, - }) - if err != nil { - logger.Log().Warn("Hubble Relay unavailable; keepAliveTraffic enforcement skipped", + if !traffic.windowReady(threshold.window, now) { + logger.Log().Info("Kubernetes pod stats sample window warming; keepAliveTraffic enforcement skipped", zap.String("namespace", namespace), zap.String("job", job.Name), zap.String("command", commandName), zap.String("procedure", procedureName), zap.String("port", threshold.expectedPort.Name), - zap.Error(err), + zap.Duration("window", threshold.window), ) return false, nil } - if hasTraffic { + if traffic.rxDelta(threshold.window, now) >= threshold.bytes { return false, nil } } @@ -88,6 +98,7 @@ func (b *Backend) keepAliveTrafficIdleStopper(namespace string, root string, com zap.String("command", commandName), zap.String("procedure", procedureName), zap.Int("ports", len(thresholds)), + zap.Uint64("rx_bytes", traffic.rxBytes), ) if err := b.deleteJobAndWait(ctx, namespace, job.Name); err != nil { return false, err @@ -98,7 +109,7 @@ func (b *Backend) keepAliveTrafficIdleStopper(namespace string, root string, com type keepAliveThreshold struct { expectedPort domain.ExpectedPort - port domain.Port + bytes uint64 window time.Duration } diff --git a/internal/runtime/kubernetes/ports.go b/internal/runtime/kubernetes/ports.go index 39802225..bde2bf09 100644 --- a/internal/runtime/kubernetes/ports.go +++ b/internal/runtime/kubernetes/ports.go @@ -2,7 +2,6 @@ package kubernetes import ( "context" - "errors" "fmt" "sort" "strings" @@ -25,15 +24,7 @@ func (b *Backend) ExpectedPorts(root string, commands map[string]*domain.Command } portsByName := portsByName(globalPorts) statuses := []domain.RuntimePortStatus{} - hubbleEnabled := b.config.HubbleEnabled() - hubbleAvailable := false - 
if hubbleEnabled { - hubbleAvailable = true - if err := b.checkHubble(context.Background()); err != nil { - hubbleAvailable = false - logger.Log().Warn("Hubble Relay unavailable; Kubernetes port traffic unavailable", zap.Error(err)) - } - } + now := time.Now() for commandName, command := range commands { if command == nil { continue @@ -47,6 +38,7 @@ func (b *Backend) ExpectedPorts(root string, commands map[string]*domain.Command if procedure.Id != nil { procedureName = *procedure.Id } + traffic, trafficErr := b.procedureTrafficForSelector(context.Background(), namespace, serviceSelector(pvc, commandName, procedureName, "", nil), now) for _, expectedPort := range procedure.ExpectedPorts { port, ok := portsByName[expectedPort.Name] if !ok { @@ -64,44 +56,34 @@ func (b *Backend) ExpectedPorts(root string, commands map[string]*domain.Command serviceReady, hostPort := b.serviceReady(context.Background(), namespace, serviceName(root, serviceProcedure, expectedPort.Name)) status.Bound = serviceReady status.HostPort = hostPort - if !hubbleEnabled { + if trafficErr != nil { + status.Source = "kubernetes-pod-stats-unavailable" statuses = append(statuses, status) continue } - if !hubbleAvailable { - status.Source = "hubble-relay-unavailable" + if traffic == nil { statuses = append(statuses, status) continue } - window := 5 * time.Minute + status.Source = "kubernetes-pod-stats" + rxBytes := traffic.rxBytes + txBytes := traffic.txBytes + status.RXBytes = &rxBytes + status.TXBytes = &txBytes + status.LastActivityAt = traffic.lastActivityAt + delta := traffic.rxDelta(0, now) if expectedPort.KeepAliveTraffic != "" { threshold, err := domain.ParseKeepAliveTraffic(expectedPort.KeepAliveTraffic) if err != nil { return nil, err } - window = threshold.Window - status.TrafficWindow = threshold.Window.String() - } - traffic, err := b.hubble.HasFlow(context.Background(), TrafficQuery{ - Namespace: namespace, - ScrollID: pvc, - ProcedureName: procedureName, - Port: port, - ExpectedPort: 
expectedPort, - Window: window, - }) - if err != nil { - logger.Log().Warn("Hubble Relay query failed", zap.Error(err)) - status.Source = "hubble-relay-unavailable" - statuses = append(statuses, status) - continue - } - status.Source = "hubble-relay" - status.Traffic = traffic - if expectedPort.KeepAliveTraffic != "" { - trafficOK := traffic + delta = traffic.rxDelta(threshold.Window, now) + trafficOK := delta >= threshold.Bytes status.TrafficOK = &trafficOK + status.TrafficWindow = threshold.Window.String() } + status.Traffic = delta > 0 + status.TrafficBytes = &delta statuses = append(statuses, status) } } @@ -311,19 +293,6 @@ func endpointSlicesReady(slices []discoveryv1.EndpointSlice) bool { return false } -func (b *Backend) checkHubble(ctx context.Context) error { - if b.hubble == nil { - return errors.New("hubble client is not configured") - } - ctx, cancel := context.WithTimeout(ctx, 2*time.Second) - defer cancel() - _, err := b.hubble.HasFlow(ctx, TrafficQuery{Namespace: b.config.Namespace, Port: domain.Port{Port: 1, Protocol: "tcp"}}) - if err != nil && !strings.Contains(err.Error(), "context deadline") { - return err - } - return nil -} - func portsByName(ports []domain.Port) map[string]domain.Port { result := map[string]domain.Port{} for _, port := range ports { @@ -331,3 +300,14 @@ func portsByName(ports []domain.Port) map[string]domain.Port { } return result } + +func normalizeProtocol(protocol string) string { + switch strings.ToLower(protocol) { + case "", "tcp": + return "tcp" + case "udp": + return "udp" + default: + return protocol + } +} diff --git a/internal/runtime/kubernetes/resources_test.go b/internal/runtime/kubernetes/resources_test.go index cb2cf322..386b95bb 100644 --- a/internal/runtime/kubernetes/resources_test.go +++ b/internal/runtime/kubernetes/resources_test.go @@ -15,6 +15,7 @@ import ( discoveryv1 "k8s.io/api/discovery/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + 
"k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes/fake" "github.com/highcard-dev/daemon/internal/core/domain" @@ -22,17 +23,58 @@ import ( coreservices "github.com/highcard-dev/daemon/internal/core/services" ) -type fakeHubble struct { - hasFlow bool - err error +func setStatsReader(backend *Backend, nodeName string, summary *nodeStatsSummary, err error) { + backend.statsReader = func(ctx context.Context, node string) (*nodeStatsSummary, error) { + if err != nil { + return nil, err + } + if node != nodeName { + return nil, fmt.Errorf("unexpected node %s", node) + } + return summary, nil + } +} + +func podStats(namespace string, uid string, rx uint64, tx uint64) *nodeStatsSummary { + rxCopy := rx + txCopy := tx + return &nodeStatsSummary{Pods: []nodePodStats{{ + PodRef: struct { + Name string `json:"name"` + Namespace string `json:"namespace"` + UID string `json:"uid"` + }{Name: "runtime-pod", Namespace: namespace, UID: uid}, + Network: &nodeNetworkStats{RXBytes: &rxCopy, TXBytes: &txCopy}, + }}} +} + +func runningProcedurePod(namespace string, root string, command string, procedure string, attempt int, uid string, jobName string) *corev1.Pod { + labels := procedureTestLabels(root, command, procedure, attempt) + if jobName != "" { + labels["batch.kubernetes.io/job-name"] = jobName + } + return &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "runtime-pod", + Namespace: namespace, + UID: typesUID(uid), + Labels: labels, + }, + Spec: corev1.PodSpec{NodeName: "node-a"}, + Status: corev1.PodStatus{Phase: corev1.PodRunning}, + } } -func (f fakeHubble) HasFlow(context.Context, TrafficQuery) (bool, error) { - return f.hasFlow, f.err +func typesUID(value string) types.UID { + return types.UID(value) +} + +func ptrString(value string) *string { + return &value } func TestRootRefUsesRequestedNamespace(t *testing.T) { - backend := NewWithClient(Config{Namespace: "druid"}, coreservices.NewConsoleManager(coreservices.NewLogManager()), fake.NewSimpleClientset(), 
fakeHubble{}) + backend := NewWithClient(Config{Namespace: "druid"}, coreservices.NewConsoleManager(coreservices.NewLogManager()), fake.NewSimpleClientset()) if got, want := backend.RootRef("deployment-123", "games"), ref("games", dataPVCName("deployment-123")); got != want { t.Fatalf("RootRef = %s, want %s", got, want) } @@ -193,7 +235,7 @@ func TestWorkerPullJobSpecRunsDruidWorkerPull(t *testing.T) { func TestSpawnPullWorkerCreateUsesFinalPVCAndWorkerJob(t *testing.T) { client := fake.NewSimpleClientset() - backend := NewWithClient(Config{Namespace: "druid", PullImage: "druid-cli:test"}, coreservices.NewConsoleManager(coreservices.NewLogManager()), client, fakeHubble{}) + backend := NewWithClient(Config{Namespace: "druid", PullImage: "druid-cli:test"}, coreservices.NewConsoleManager(coreservices.NewLogManager()), client) var jobs []*batchv1.Job backend.jobLogRunner = func(ctx context.Context, job *batchv1.Job) ([]byte, error) { jobs = append(jobs, job.DeepCopy()) @@ -234,7 +276,7 @@ func TestDeleteFinishedJobRemovesJob(t *testing.T) { client := fake.NewSimpleClientset(&batchv1.Job{ ObjectMeta: metav1.ObjectMeta{Name: "finished", Namespace: "druid"}, }) - backend := NewWithClient(Config{Namespace: "druid"}, coreservices.NewConsoleManager(coreservices.NewLogManager()), client, fakeHubble{}) + backend := NewWithClient(Config{Namespace: "druid"}, coreservices.NewConsoleManager(coreservices.NewLogManager()), client) backend.deleteFinishedJob(context.Background(), "druid", "finished") @@ -251,7 +293,7 @@ func TestCreateFreshJobKeepsFailedJob(t *testing.T) { Status: corev1.ConditionTrue, }}}, }) - backend := NewWithClient(Config{Namespace: "druid"}, coreservices.NewConsoleManager(coreservices.NewLogManager()), client, fakeHubble{}) + backend := NewWithClient(Config{Namespace: "druid"}, coreservices.NewConsoleManager(coreservices.NewLogManager()), client) created, err := backend.createFreshJob(context.Background(), &batchv1.Job{ObjectMeta: metav1.ObjectMeta{Name: 
"failed", Namespace: "druid"}}) if err != nil { @@ -282,7 +324,7 @@ func TestCreateOrReuseProcedureJobRetainsFailedBaseAndCreatesRetry(t *testing.T) Status: corev1.ConditionTrue, }}}, }) - backend := NewWithClient(Config{Namespace: "druid"}, coreservices.NewConsoleManager(coreservices.NewLogManager()), client, fakeHubble{}) + backend := NewWithClient(Config{Namespace: "druid"}, coreservices.NewConsoleManager(coreservices.NewLogManager()), client) created, err := backend.createOrReuseProcedureJob(context.Background(), "druid", root, "start", "start.1", base, &domain.Procedure{Image: "alpine"}, nil) if err != nil { @@ -303,7 +345,7 @@ func TestCreateOrReuseProcedureJobUsesNextRetryAttempt(t *testing.T) { failedProcedureJob(root, base, "start", "start.1", 1), failedProcedureJob(root, base+"-r2", "start", "start.1", 2), ) - backend := NewWithClient(Config{Namespace: "druid"}, coreservices.NewConsoleManager(coreservices.NewLogManager()), client, fakeHubble{}) + backend := NewWithClient(Config{Namespace: "druid"}, coreservices.NewConsoleManager(coreservices.NewLogManager()), client) created, err := backend.createOrReuseProcedureJob(context.Background(), "druid", root, "start", "start.1", base, &domain.Procedure{Image: "alpine"}, nil) if err != nil { @@ -326,7 +368,7 @@ func TestCreateOrReuseProcedureJobReusesActiveAttempt(t *testing.T) { Status: batchv1.JobStatus{Active: 1}, } client := fake.NewSimpleClientset(active) - backend := NewWithClient(Config{Namespace: "druid"}, coreservices.NewConsoleManager(coreservices.NewLogManager()), client, fakeHubble{}) + backend := NewWithClient(Config{Namespace: "druid"}, coreservices.NewConsoleManager(coreservices.NewLogManager()), client) created, err := backend.createOrReuseProcedureJob(context.Background(), "druid", root, "start", "coldstart", base, &domain.Procedure{Image: "alpine"}, nil) if err != nil { @@ -365,7 +407,7 @@ func TestResumeRestartProcedureIndexDeletesSupersededActiveProcedure(t *testing. 
Status: batchv1.JobStatus{Active: 1}, } client := fake.NewSimpleClientset(coldstartJob, startJob) - backend := NewWithClient(Config{Namespace: "druid"}, coreservices.NewConsoleManager(coreservices.NewLogManager()), client, fakeHubble{}) + backend := NewWithClient(Config{Namespace: "druid"}, coreservices.NewConsoleManager(coreservices.NewLogManager()), client) resumeIndex, err := backend.resumeRestartProcedureIndex(context.Background(), root, "start", &domain.CommandInstructionSet{Procedures: []*domain.Procedure{ {Id: &coldstart}, @@ -397,7 +439,7 @@ func TestResumeRestartProcedureIndexKeepsOnlyActiveFirstProcedure(t *testing.T) Status: batchv1.JobStatus{Active: 1}, } client := fake.NewSimpleClientset(coldstartJob) - backend := NewWithClient(Config{Namespace: "druid"}, coreservices.NewConsoleManager(coreservices.NewLogManager()), client, fakeHubble{}) + backend := NewWithClient(Config{Namespace: "druid"}, coreservices.NewConsoleManager(coreservices.NewLogManager()), client) resumeIndex, err := backend.resumeRestartProcedureIndex(context.Background(), root, "start", &domain.CommandInstructionSet{Procedures: []*domain.Procedure{{Id: &coldstart}}}) if err != nil { @@ -422,7 +464,7 @@ func TestCreateOrReuseProcedureJobDeletesSucceededAttempt(t *testing.T) { }, Status: batchv1.JobStatus{Succeeded: 1}, }) - backend := NewWithClient(Config{Namespace: "druid"}, coreservices.NewConsoleManager(coreservices.NewLogManager()), client, fakeHubble{}) + backend := NewWithClient(Config{Namespace: "druid"}, coreservices.NewConsoleManager(coreservices.NewLogManager()), client) created, err := backend.createOrReuseProcedureJob(context.Background(), "druid", root, "install", "install", base, &domain.Procedure{Image: "alpine"}, nil) if err != nil { @@ -451,7 +493,7 @@ func TestWaitForJobReportsPodFailureReason(t *testing.T) { }}, }}}, } - backend := NewWithClient(Config{Namespace: "druid"}, coreservices.NewConsoleManager(coreservices.NewLogManager()), fake.NewSimpleClientset(job, pod), 
fakeHubble{}) + backend := NewWithClient(Config{Namespace: "druid"}, coreservices.NewConsoleManager(coreservices.NewLogManager()), fake.NewSimpleClientset(job, pod)) exitCode, err := backend.waitForJob(context.Background(), "druid", "failed-start") if err == nil { @@ -466,7 +508,7 @@ func TestWaitForJobReportsPodFailureReason(t *testing.T) { } func TestWaitForJobUsesRecentSuccessfulDeletion(t *testing.T) { - backend := NewWithClient(Config{Namespace: "druid"}, coreservices.NewConsoleManager(coreservices.NewLogManager()), fake.NewSimpleClientset(), fakeHubble{}) + backend := NewWithClient(Config{Namespace: "druid"}, coreservices.NewConsoleManager(coreservices.NewLogManager()), fake.NewSimpleClientset()) backend.recordJobExit("druid", "finished", 0) exitCode, err := backend.waitForJob(context.Background(), "druid", "finished") @@ -479,7 +521,7 @@ func TestWaitForJobUsesRecentSuccessfulDeletion(t *testing.T) { } func TestWaitForJobMissingWithoutRecentExitFails(t *testing.T) { - backend := NewWithClient(Config{Namespace: "druid"}, coreservices.NewConsoleManager(coreservices.NewLogManager()), fake.NewSimpleClientset(), fakeHubble{}) + backend := NewWithClient(Config{Namespace: "druid"}, coreservices.NewConsoleManager(coreservices.NewLogManager()), fake.NewSimpleClientset()) if _, err := backend.waitForJob(context.Background(), "druid", "missing"); !apierrors.IsNotFound(err) { t.Fatalf("waitForJob error = %v, want not found", err) @@ -557,7 +599,7 @@ func TestStartupContainerFailureReportsNonzeroTermination(t *testing.T) { func TestExpectedServicesUseRootNamespace(t *testing.T) { client := fake.NewSimpleClientset() - backend := NewWithClient(Config{Namespace: "druid-system"}, coreservices.NewConsoleManager(coreservices.NewLogManager()), client, fakeHubble{}) + backend := NewWithClient(Config{Namespace: "druid-system"}, coreservices.NewConsoleManager(coreservices.NewLogManager()), client) root := ref("games", dataPVCName("deployment-123")) procedure := 
&domain.Procedure{ExpectedPorts: []domain.ExpectedPort{{Name: "http"}}} @@ -575,7 +617,7 @@ func TestExpectedServicesUseRootNamespace(t *testing.T) { func TestRegistryConfigSecretUsesDruidClientConfigShape(t *testing.T) { client := fake.NewSimpleClientset() - backend := NewWithClient(Config{Namespace: "druid"}, coreservices.NewConsoleManager(coreservices.NewLogManager()), client, fakeHubble{}) + backend := NewWithClient(Config{Namespace: "druid"}, coreservices.NewConsoleManager(coreservices.NewLogManager()), client) secretName, cleanup, err := backend.createRegistryConfigSecret(context.Background(), "druid", "artifact", []domain.RegistryCredential{{ Host: "artifacts.druid.gg/user/scroll", Username: "robot$scroll", @@ -600,11 +642,18 @@ func TestRegistryConfigSecretUsesDruidClientConfigShape(t *testing.T) { } } -func TestExpectedPortsUsesHubbleFlowPresence(t *testing.T) { +func TestExpectedPortsUsesPodStatsTraffic(t *testing.T) { client := fake.NewSimpleClientset() - backend := NewWithClient(Config{Namespace: "druid"}, coreservices.NewConsoleManager(coreservices.NewLogManager()), client, fakeHubble{hasFlow: true}) + backend := NewWithClient(Config{Namespace: "druid"}, coreservices.NewConsoleManager(coreservices.NewLogManager()), client) root := ref("druid", "druid-static-web-data") procedureName := "start" + pod := runningProcedurePod("druid", root, "start", procedureName, 1, "pod-start-stats", "start-job") + if _, err := client.CoreV1().Pods("druid").Create(context.Background(), pod, metav1.CreateOptions{}); err != nil { + t.Fatal(err) + } + globalPodTrafficStore.record("pod-start-stats", 100, 50, time.Now().Add(-6*time.Minute)) + globalPodTrafficStore.record("pod-start-stats", 100, 50, time.Now().Add(-4*time.Minute)) + setStatsReader(backend, "node-a", podStats("druid", "pod-start-stats", 200, 75), nil) service, err := serviceSpec("druid", root, procedureName, serviceSelector(refPVCName(root), procedureName, procedureName, "http", map[string]int{"http": 1}), 
"http", domain.Port{Name: "http", Port: 80, Protocol: "tcp"}) if err != nil { t.Fatal(err) @@ -641,18 +690,23 @@ func TestExpectedPortsUsesHubbleFlowPresence(t *testing.T) { if !status.Bound || !status.Traffic || status.TrafficOK == nil || !*status.TrafficOK { t.Fatalf("status = %#v", status) } - if status.Source != "hubble-relay" { - t.Fatalf("source = %s, want hubble-relay", status.Source) + if status.Source != "kubernetes-pod-stats" { + t.Fatalf("source = %s, want kubernetes-pod-stats", status.Source) } - if status.RXBytes != nil || status.TXBytes != nil || status.TrafficBytes != nil { - t.Fatalf("byte counters should be nil for Kubernetes Hubble status: %#v", status) + if status.RXBytes == nil || *status.RXBytes != 200 || status.TXBytes == nil || *status.TXBytes != 75 || status.TrafficBytes == nil || *status.TrafficBytes != 100 { + t.Fatalf("byte counters = %#v", status) } } -func TestExpectedPortsDegradesWhenHubbleUnavailable(t *testing.T) { +func TestExpectedPortsDegradesWhenPodStatsUnavailable(t *testing.T) { client := fake.NewSimpleClientset() - backend := NewWithClient(Config{Namespace: "druid"}, coreservices.NewConsoleManager(coreservices.NewLogManager()), client, fakeHubble{err: errors.New("relay unavailable")}) + backend := NewWithClient(Config{Namespace: "druid"}, coreservices.NewConsoleManager(coreservices.NewLogManager()), client) root := ref("druid", "druid-static-web-data") + pod := runningProcedurePod("druid", root, "start", "start", 1, "pod-start-unavailable", "start-job") + if _, err := client.CoreV1().Pods("druid").Create(context.Background(), pod, metav1.CreateOptions{}); err != nil { + t.Fatal(err) + } + setStatsReader(backend, "node-a", nil, errors.New("stats unavailable")) service, err := serviceSpec("druid", root, "start", serviceSelector(refPVCName(root), "start", "start", "http", map[string]int{"http": 1}), "http", domain.Port{Name: "http", Port: 80, Protocol: "tcp"}) if err != nil { t.Fatal(err) @@ -674,7 +728,7 @@ func 
TestExpectedPortsDegradesWhenHubbleUnavailable(t *testing.T) { } statuses, err := backend.ExpectedPorts(root, map[string]*domain.CommandInstructionSet{ - "start": {Procedures: []*domain.Procedure{{ExpectedPorts: []domain.ExpectedPort{{Name: "http", KeepAliveTraffic: "1b/5m"}}}}}, + "start": {Procedures: []*domain.Procedure{{Id: ptrString("start"), ExpectedPorts: []domain.ExpectedPort{{Name: "http", KeepAliveTraffic: "1b/5m"}}}}}, }, []domain.Port{{Name: "http", Port: 80, Protocol: "tcp"}}) if err != nil { t.Fatal(err) @@ -683,19 +737,25 @@ func TestExpectedPortsDegradesWhenHubbleUnavailable(t *testing.T) { t.Fatalf("statuses = %#v", statuses) } status := statuses[0] - if status.Source != "hubble-relay-unavailable" { - t.Fatalf("source = %s, want hubble-relay-unavailable", status.Source) + if status.Source != "kubernetes-pod-stats-unavailable" { + t.Fatalf("source = %s, want kubernetes-pod-stats-unavailable", status.Source) } if status.Traffic || status.TrafficOK != nil { t.Fatalf("traffic should be unavailable: %#v", status) } } -func TestExpectedPortsSkipsHubbleWhenDisabled(t *testing.T) { +func TestExpectedPortsWithoutActivePodDoesNotBorrowTraffic(t *testing.T) { client := fake.NewSimpleClientset() - backend := NewWithClient(Config{Namespace: "druid", HubbleRelayAddr: "disabled"}, coreservices.NewConsoleManager(coreservices.NewLogManager()), client, nil) + backend := NewWithClient(Config{Namespace: "druid"}, coreservices.NewConsoleManager(coreservices.NewLogManager()), client) root := ref("druid", "druid-static-web-data") procedureName := "start" + unrelated := runningProcedurePod("druid", ref("druid", "other-scroll"), "start", procedureName, 1, "pod-other", "start-job") + if _, err := client.CoreV1().Pods("druid").Create(context.Background(), unrelated, metav1.CreateOptions{}); err != nil { + t.Fatal(err) + } + globalPodTrafficStore.record("pod-other", 100, 50, time.Now().Add(-6*time.Minute)) + setStatsReader(backend, "node-a", podStats("druid", "pod-other", 
200, 75), nil) service, err := serviceSpec("druid", root, procedureName, serviceSelector(refPVCName(root), procedureName, procedureName, "http", map[string]int{"http": 1}), "http", domain.Port{Name: "http", Port: 80, Protocol: "tcp"}) if err != nil { t.Fatal(err) @@ -733,12 +793,13 @@ func TestExpectedPortsSkipsHubbleWhenDisabled(t *testing.T) { t.Fatalf("bound = false, want true: %#v", status) } if status.Traffic || status.TrafficOK != nil { - t.Fatalf("traffic should be skipped when Hubble is disabled: %#v", status) + t.Fatalf("traffic should be absent without an active matching pod: %#v", status) } } func TestKeepAliveTrafficStopsIdleRunningProcedure(t *testing.T) { root := ref("druid", "druid-static-web-data") + pod := runningProcedurePod("druid", root, "start", "start", 1, "pod-idle-stop", "start-job") job := &batchv1.Job{ ObjectMeta: metav1.ObjectMeta{ Name: "start-job", @@ -748,8 +809,10 @@ func TestKeepAliveTrafficStopsIdleRunningProcedure(t *testing.T) { }, Status: batchv1.JobStatus{Active: 1}, } - client := fake.NewSimpleClientset(job) - backend := NewWithClient(Config{Namespace: "druid"}, coreservices.NewConsoleManager(coreservices.NewLogManager()), client, fakeHubble{hasFlow: false}) + client := fake.NewSimpleClientset(job, pod) + backend := NewWithClient(Config{Namespace: "druid"}, coreservices.NewConsoleManager(coreservices.NewLogManager()), client) + globalPodTrafficStore.record("pod-idle-stop", 100, 50, time.Now().Add(-2*time.Minute)) + setStatsReader(backend, "node-a", podStats("druid", "pod-idle-stop", 100, 50), nil) stopper := backend.keepAliveTrafficIdleStopper("druid", root, "start", "start", &domain.Procedure{ ExpectedPorts: []domain.ExpectedPort{{Name: "main", KeepAliveTraffic: "1b/1s"}}, }, []domain.Port{{Name: "main", Port: 25565, Protocol: "tcp"}}) @@ -771,6 +834,7 @@ func TestKeepAliveTrafficStopsIdleRunningProcedure(t *testing.T) { func TestKeepAliveTrafficKeepsProcedureWhenTrafficPresent(t *testing.T) { root := ref("druid", 
"druid-static-web-data") + pod := runningProcedurePod("druid", root, "start", "start", 1, "pod-traffic-present", "start-job") job := &batchv1.Job{ ObjectMeta: metav1.ObjectMeta{ Name: "start-job", @@ -780,8 +844,11 @@ func TestKeepAliveTrafficKeepsProcedureWhenTrafficPresent(t *testing.T) { }, Status: batchv1.JobStatus{Active: 1}, } - client := fake.NewSimpleClientset(job) - backend := NewWithClient(Config{Namespace: "druid"}, coreservices.NewConsoleManager(coreservices.NewLogManager()), client, fakeHubble{hasFlow: true}) + client := fake.NewSimpleClientset(job, pod) + backend := NewWithClient(Config{Namespace: "druid"}, coreservices.NewConsoleManager(coreservices.NewLogManager()), client) + globalPodTrafficStore.record("pod-traffic-present", 100, 50, time.Now().Add(-2*time.Minute)) + globalPodTrafficStore.record("pod-traffic-present", 100, 50, time.Now().Add(-500*time.Millisecond)) + setStatsReader(backend, "node-a", podStats("druid", "pod-traffic-present", 102, 55), nil) stopper := backend.keepAliveTrafficIdleStopper("druid", root, "start", "start", &domain.Procedure{ ExpectedPorts: []domain.ExpectedPort{{Name: "main", KeepAliveTraffic: "1b/1s"}}, }, []domain.Port{{Name: "main", Port: 25565, Protocol: "tcp"}}) @@ -798,8 +865,9 @@ func TestKeepAliveTrafficKeepsProcedureWhenTrafficPresent(t *testing.T) { } } -func TestKeepAliveTrafficKeepsProcedureWhenHubbleUnavailable(t *testing.T) { +func TestKeepAliveTrafficKeepsProcedureWhenPodStatsUnavailable(t *testing.T) { root := ref("druid", "druid-static-web-data") + pod := runningProcedurePod("druid", root, "start", "start", 1, "pod-stats-unavailable", "start-job") job := &batchv1.Job{ ObjectMeta: metav1.ObjectMeta{ Name: "start-job", @@ -809,8 +877,9 @@ func TestKeepAliveTrafficKeepsProcedureWhenHubbleUnavailable(t *testing.T) { }, Status: batchv1.JobStatus{Active: 1}, } - client := fake.NewSimpleClientset(job) - backend := NewWithClient(Config{Namespace: "druid"}, 
coreservices.NewConsoleManager(coreservices.NewLogManager()), client, fakeHubble{err: errors.New("relay unavailable")}) + client := fake.NewSimpleClientset(job, pod) + backend := NewWithClient(Config{Namespace: "druid"}, coreservices.NewConsoleManager(coreservices.NewLogManager()), client) + setStatsReader(backend, "node-a", nil, errors.New("stats unavailable")) stopper := backend.keepAliveTrafficIdleStopper("druid", root, "start", "start", &domain.Procedure{ ExpectedPorts: []domain.ExpectedPort{{Name: "main", KeepAliveTraffic: "1b/1s"}}, }, []domain.Port{{Name: "main", Port: 25565, Protocol: "tcp"}}) @@ -827,8 +896,37 @@ func TestKeepAliveTrafficKeepsProcedureWhenHubbleUnavailable(t *testing.T) { } } +func TestKeepAliveTrafficKeepsProcedureWhilePodStatsWarmUp(t *testing.T) { + root := ref("druid", "druid-static-web-data") + pod := runningProcedurePod("druid", root, "start", "start", 1, "pod-stats-warmup", "start-job") + job := &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: "start-job", + Namespace: "druid", + CreationTimestamp: metav1.NewTime(time.Now().Add(-2 * time.Minute)), + Labels: procedureTestLabels(root, "start", "start", 1), + }, + Status: batchv1.JobStatus{Active: 1}, + } + client := fake.NewSimpleClientset(job, pod) + backend := NewWithClient(Config{Namespace: "druid"}, coreservices.NewConsoleManager(coreservices.NewLogManager()), client) + setStatsReader(backend, "node-a", podStats("druid", "pod-stats-warmup", 100, 50), nil) + stopper := backend.keepAliveTrafficIdleStopper("druid", root, "start", "start", &domain.Procedure{ + ExpectedPorts: []domain.ExpectedPort{{Name: "main", KeepAliveTraffic: "1b/1s"}}, + }, []domain.Port{{Name: "main", Port: 25565, Protocol: "tcp"}}) + + stopped, err := stopper(context.Background(), job) + if err != nil { + t.Fatal(err) + } + if stopped { + t.Fatal("stopped = true, want false while pod stats sample window warms up") + } +} + func TestKeepAliveTrafficWaitsForFullWindowBeforeStopping(t *testing.T) { root := 
ref("druid", "druid-static-web-data") + pod := runningProcedurePod("druid", root, "start", "start", 1, "pod-full-window", "start-job") job := &batchv1.Job{ ObjectMeta: metav1.ObjectMeta{ Name: "start-job", @@ -838,8 +936,10 @@ func TestKeepAliveTrafficWaitsForFullWindowBeforeStopping(t *testing.T) { }, Status: batchv1.JobStatus{Active: 1}, } - client := fake.NewSimpleClientset(job) - backend := NewWithClient(Config{Namespace: "druid"}, coreservices.NewConsoleManager(coreservices.NewLogManager()), client, fakeHubble{hasFlow: false}) + client := fake.NewSimpleClientset(job, pod) + backend := NewWithClient(Config{Namespace: "druid"}, coreservices.NewConsoleManager(coreservices.NewLogManager()), client) + globalPodTrafficStore.record("pod-full-window", 100, 50, time.Now().Add(-2*time.Minute)) + setStatsReader(backend, "node-a", podStats("druid", "pod-full-window", 100, 50), nil) stopper := backend.keepAliveTrafficIdleStopper("druid", root, "start", "start", &domain.Procedure{ ExpectedPorts: []domain.ExpectedPort{{Name: "main", KeepAliveTraffic: "1b/1m"}}, }, []domain.Port{{Name: "main", Port: 25565, Protocol: "tcp"}}) @@ -857,7 +957,7 @@ func TestKeepAliveTrafficWaitsForFullWindowBeforeStopping(t *testing.T) { } func TestKeepAliveTrafficDoesNotStopColdstarter(t *testing.T) { - backend := NewWithClient(Config{Namespace: "druid"}, coreservices.NewConsoleManager(coreservices.NewLogManager()), fake.NewSimpleClientset(), fakeHubble{hasFlow: false}) + backend := NewWithClient(Config{Namespace: "druid"}, coreservices.NewConsoleManager(coreservices.NewLogManager()), fake.NewSimpleClientset()) stopper := backend.keepAliveTrafficIdleStopper("druid", ref("druid", "druid-static-web-data"), "start", "coldstart", &domain.Procedure{ Command: []string{"druid-coldstarter"}, ExpectedPorts: []domain.ExpectedPort{{Name: "main", KeepAliveTraffic: "1b/1s"}}, @@ -868,7 +968,7 @@ func TestKeepAliveTrafficDoesNotStopColdstarter(t *testing.T) { } func 
TestRoutingTargetsReturnStableBackendServices(t *testing.T) { - backend := NewWithClient(Config{Namespace: "druid"}, coreservices.NewConsoleManager(coreservices.NewLogManager()), fake.NewSimpleClientset(), fakeHubble{}) + backend := NewWithClient(Config{Namespace: "druid"}, coreservices.NewConsoleManager(coreservices.NewLogManager()), fake.NewSimpleClientset()) root := ref("druid", "druid-static-web-data") procedureID := "web" @@ -910,7 +1010,7 @@ func TestRoutingTargetsReturnStableBackendServices(t *testing.T) { } func TestRoutingTargetsCollapseColdstarterAndRuntimePort(t *testing.T) { - backend := NewWithClient(Config{Namespace: "druid"}, coreservices.NewConsoleManager(coreservices.NewLogManager()), fake.NewSimpleClientset(), fakeHubble{}) + backend := NewWithClient(Config{Namespace: "druid"}, coreservices.NewConsoleManager(coreservices.NewLogManager()), fake.NewSimpleClientset()) root := ref("druid", "druid-minecraft-data") coldstart := "coldstart" start := "start" @@ -944,7 +1044,7 @@ func TestRoutingTargetsCollapseColdstarterAndRuntimePort(t *testing.T) { func TestExpectedServiceForSharedPortMovesToActiveProcedure(t *testing.T) { client := fake.NewSimpleClientset() - backend := NewWithClient(Config{Namespace: "druid"}, coreservices.NewConsoleManager(coreservices.NewLogManager()), client, fakeHubble{}) + backend := NewWithClient(Config{Namespace: "druid"}, coreservices.NewConsoleManager(coreservices.NewLogManager()), client) root := ref("druid", "druid-minecraft-data") coldstart := &domain.Procedure{ExpectedPorts: []domain.ExpectedPort{{Name: "main"}}} start := &domain.Procedure{ExpectedPorts: []domain.ExpectedPort{{Name: "main"}}} @@ -976,7 +1076,7 @@ func TestExpectedServiceForSharedPortMovesToActiveProcedure(t *testing.T) { func TestRoutingTargetsUseCurrentServiceSelector(t *testing.T) { client := fake.NewSimpleClientset() - backend := NewWithClient(Config{Namespace: "druid"}, coreservices.NewConsoleManager(coreservices.NewLogManager()), client, 
fakeHubble{}) + backend := NewWithClient(Config{Namespace: "druid"}, coreservices.NewConsoleManager(coreservices.NewLogManager()), client) root := ref("druid", "druid-minecraft-data") coldstart := "coldstart" start := "start" @@ -1012,7 +1112,7 @@ func TestRoutingTargetsUseCurrentServiceSelector(t *testing.T) { func TestStopRuntimeDeletesWorkloadsButPreservesDataAndServices(t *testing.T) { client := fake.NewSimpleClientset() - backend := NewWithClient(Config{Namespace: "druid"}, coreservices.NewConsoleManager(coreservices.NewLogManager()), client, fakeHubble{}) + backend := NewWithClient(Config{Namespace: "druid"}, coreservices.NewConsoleManager(coreservices.NewLogManager()), client) root := ref("druid", "druid-static-web-data") labels := baseLabels("druid-static-web-data") labels[labelProcedure] = "web" @@ -1071,7 +1171,7 @@ func TestStopRuntimeDeletesWorkloadsButPreservesDataAndServices(t *testing.T) { func TestDeleteRuntimePurgesServicesAndDataWhenRequested(t *testing.T) { client := fake.NewSimpleClientset() - backend := NewWithClient(Config{Namespace: "druid"}, coreservices.NewConsoleManager(coreservices.NewLogManager()), client, fakeHubble{}) + backend := NewWithClient(Config{Namespace: "druid"}, coreservices.NewConsoleManager(coreservices.NewLogManager()), client) root := ref("druid", "druid-static-web-data") service, err := serviceSpec("druid", root, "web", serviceSelector(refPVCName(root), "web", "web", "http", map[string]int{"http": 1}), "http", domain.Port{Name: "http", Port: 8080, Protocol: "tcp"}) if err != nil { @@ -1113,7 +1213,7 @@ func TestBackupJobSpecUsesRuntimePVCAndRegistryEnv(t *testing.T) { } func TestSpawnPullWorkerRequiresPullImage(t *testing.T) { - backend := NewWithClient(Config{Namespace: "druid"}, coreservices.NewConsoleManager(coreservices.NewLogManager()), fake.NewSimpleClientset(), fakeHubble{}) + backend := NewWithClient(Config{Namespace: "druid"}, coreservices.NewConsoleManager(coreservices.NewLogManager()), 
fake.NewSimpleClientset()) err := backend.SpawnPullWorker(context.Background(), ports.RuntimeWorkerAction{ Mode: ports.RuntimeWorkerModeCreate, RuntimeID: "scroll", @@ -1129,7 +1229,7 @@ func TestSpawnPullWorkerRequiresPullImage(t *testing.T) { } func TestSpawnPullWorkerRejectsLocalArtifactPath(t *testing.T) { - backend := NewWithClient(Config{Namespace: "druid", PullImage: "druid-cli:test"}, coreservices.NewConsoleManager(coreservices.NewLogManager()), fake.NewSimpleClientset(), fakeHubble{}) + backend := NewWithClient(Config{Namespace: "druid", PullImage: "druid-cli:test"}, coreservices.NewConsoleManager(coreservices.NewLogManager()), fake.NewSimpleClientset()) err := backend.SpawnPullWorker(context.Background(), ports.RuntimeWorkerAction{ Mode: ports.RuntimeWorkerModeCreate, RuntimeID: "scroll", @@ -1146,7 +1246,7 @@ func TestSpawnPullWorkerRejectsLocalArtifactPath(t *testing.T) { func TestSignalDeletesPersistentStatefulSetAndPods(t *testing.T) { client := fake.NewSimpleClientset() - backend := NewWithClient(Config{Namespace: "druid"}, coreservices.NewConsoleManager(coreservices.NewLogManager()), client, fakeHubble{}) + backend := NewWithClient(Config{Namespace: "druid"}, coreservices.NewConsoleManager(coreservices.NewLogManager()), client) root := ref("druid", "druid-static-web-data") name := "static-web-start-0" labels := baseLabels("druid-static-web-data") diff --git a/internal/runtime/kubernetes/traffic.go b/internal/runtime/kubernetes/traffic.go new file mode 100644 index 00000000..69ae1855 --- /dev/null +++ b/internal/runtime/kubernetes/traffic.go @@ -0,0 +1,330 @@ +package kubernetes + +import ( + "context" + "encoding/json" + "fmt" + "sync" + "time" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" +) + +type nodeStatsReader func(context.Context, string) (*nodeStatsSummary, error) + +type nodeStatsSummary struct { + Pods []nodePodStats `json:"pods"` +} + +type nodePodStats struct { + PodRef 
struct { + Name string `json:"name"` + Namespace string `json:"namespace"` + UID string `json:"uid"` + } `json:"podRef"` + Network *nodeNetworkStats `json:"network,omitempty"` +} + +type nodeNetworkStats struct { + RXBytes *uint64 `json:"rxBytes,omitempty"` + TXBytes *uint64 `json:"txBytes,omitempty"` + Interfaces []nodeNetworkInterface `json:"interfaces,omitempty"` +} + +type nodeNetworkInterface struct { + RXBytes *uint64 `json:"rxBytes,omitempty"` + TXBytes *uint64 `json:"txBytes,omitempty"` +} + +type podTraffic struct { + podUID string + rxBytes uint64 + txBytes uint64 + lastDeltaRX uint64 + lastActivityAt *time.Time + samples []podTrafficSample +} + +type podTrafficSample struct { + at time.Time + rx uint64 + tx uint64 +} + +type podTrafficStore struct { + mu sync.Mutex + samples map[string][]podTrafficSample + lastActivityAt map[string]time.Time +} + +var globalPodTrafficStore = &podTrafficStore{ + samples: map[string][]podTrafficSample{}, + lastActivityAt: map[string]time.Time{}, +} + +func (s *podTrafficStore) record(podUID string, rxBytes uint64, txBytes uint64, now time.Time) podTraffic { + s.mu.Lock() + defer s.mu.Unlock() + + samples := s.samples[podUID] + var lastDeltaRX uint64 + if len(samples) > 0 && rxBytes >= samples[len(samples)-1].rx { + lastDeltaRX = rxBytes - samples[len(samples)-1].rx + if lastDeltaRX > 0 { + s.lastActivityAt[podUID] = now + } + } + samples = append(samples, podTrafficSample{at: now, rx: rxBytes, tx: txBytes}) + cutoff := now.Add(-24 * time.Hour) + keepFrom := 0 + for keepFrom < len(samples) && samples[keepFrom].at.Before(cutoff) { + keepFrom++ + } + samples = samples[keepFrom:] + s.samples[podUID] = samples + + var lastActivityAt *time.Time + if last, ok := s.lastActivityAt[podUID]; ok { + lastCopy := last + lastActivityAt = &lastCopy + } + + return podTraffic{ + podUID: podUID, + rxBytes: rxBytes, + txBytes: txBytes, + lastDeltaRX: lastDeltaRX, + lastActivityAt: lastActivityAt, + samples: append([]podTrafficSample(nil), 
samples...), + } +} + +func (t podTraffic) rxDelta(window time.Duration, now time.Time) uint64 { + if window <= 0 || len(t.samples) == 0 { + return t.lastDeltaRX + } + cutoff := now.Add(-window) + base := t.samples[0] + for _, sample := range t.samples { + if !sample.at.Before(cutoff) { + base = sample + break + } + } + if t.rxBytes < base.rx { + return 0 + } + return t.rxBytes - base.rx +} + +func (t podTraffic) windowReady(window time.Duration, now time.Time) bool { + if window <= 0 { + return len(t.samples) > 1 + } + if len(t.samples) < 2 { + return false + } + return !t.samples[0].at.After(now.Add(-window)) +} + +type procedureTraffic struct { + pods []podTraffic + rxBytes uint64 + txBytes uint64 + lastActivityAt *time.Time +} + +func (t procedureTraffic) rxDelta(window time.Duration, now time.Time) uint64 { + var total uint64 + for _, pod := range t.pods { + total += pod.rxDelta(window, now) + } + return total +} + +func (t procedureTraffic) windowReady(window time.Duration, now time.Time) bool { + if len(t.pods) == 0 { + return false + } + for _, pod := range t.pods { + if !pod.windowReady(window, now) { + return false + } + } + return true +} + +func (b *Backend) procedureTrafficForSelector(ctx context.Context, namespace string, selector map[string]string, now time.Time) (*procedureTraffic, error) { + pods, err := b.activePodsForSelector(ctx, namespace, selector) + if err != nil { + return nil, err + } + return b.procedureTrafficForPods(ctx, pods, now) +} + +func (b *Backend) procedureTrafficForJob(ctx context.Context, namespace string, jobName string, now time.Time) (*procedureTraffic, error) { + pods, err := b.activePodsForJob(ctx, namespace, jobName) + if err != nil { + return nil, err + } + return b.procedureTrafficForPods(ctx, pods, now) +} + +func (b *Backend) procedureTrafficForPods(ctx context.Context, pods []corev1.Pod, now time.Time) (*procedureTraffic, error) { + if len(pods) == 0 { + return nil, nil + } + result := &procedureTraffic{pods: 
make([]podTraffic, 0, len(pods))} + for _, pod := range pods { + traffic, err := b.podTraffic(ctx, pod, now) + if err != nil { + return nil, err + } + result.pods = append(result.pods, traffic) + result.rxBytes += traffic.rxBytes + result.txBytes += traffic.txBytes + if traffic.lastActivityAt != nil && (result.lastActivityAt == nil || traffic.lastActivityAt.After(*result.lastActivityAt)) { + last := *traffic.lastActivityAt + result.lastActivityAt = &last + } + } + return result, nil +} + +func (b *Backend) activePodsForSelector(ctx context.Context, namespace string, selector map[string]string) ([]corev1.Pod, error) { + if len(selector) == 0 { + return nil, nil + } + list, err := b.client.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{LabelSelector: labels.SelectorFromSet(selector).String()}) + if err != nil { + return nil, err + } + return activePodItems(list.Items), nil +} + +func (b *Backend) activePodsForJob(ctx context.Context, namespace string, jobName string) ([]corev1.Pod, error) { + pods, err := b.podsForJobLabel(ctx, namespace, "batch.kubernetes.io/job-name", jobName) + if err != nil { + return nil, err + } + if len(pods) > 0 { + return pods, nil + } + return b.podsForJobLabel(ctx, namespace, "job-name", jobName) +} + +func (b *Backend) podsForJobLabel(ctx context.Context, namespace string, label string, jobName string) ([]corev1.Pod, error) { + list, err := b.client.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{LabelSelector: labels.SelectorFromSet(labels.Set{label: jobName}).String()}) + if err != nil { + return nil, err + } + return activePodItems(list.Items), nil +} + +func activePodItems(items []corev1.Pod) []corev1.Pod { + pods := make([]corev1.Pod, 0, len(items)) + for _, pod := range items { + if pod.DeletionTimestamp != nil || pod.Spec.NodeName == "" || pod.UID == "" { + continue + } + if pod.Status.Phase != "" && pod.Status.Phase != corev1.PodRunning { + continue + } + pods = append(pods, pod) + } + return pods +} + +func (b 
*Backend) podTraffic(ctx context.Context, pod corev1.Pod, now time.Time) (podTraffic, error) { + if pod.Spec.NodeName == "" { + return podTraffic{}, fmt.Errorf("pod %s/%s has no assigned node", pod.Namespace, pod.Name) + } + if pod.UID == "" { + return podTraffic{}, fmt.Errorf("pod %s/%s has no UID", pod.Namespace, pod.Name) + } + if b.statsReader == nil { + return podTraffic{}, fmt.Errorf("kubernetes node stats reader is not configured") + } + summary, err := b.statsReader(ctx, pod.Spec.NodeName) + if err != nil { + return podTraffic{}, err + } + stats := summary.podByUID(pod.Namespace, string(pod.UID)) + if stats == nil { + return podTraffic{}, fmt.Errorf("pod %s/%s uid %s not found in node stats", pod.Namespace, pod.Name, pod.UID) + } + rxBytes, txBytes, ok := stats.networkBytes() + if !ok { + return podTraffic{}, fmt.Errorf("pod %s/%s uid %s has no network stats", pod.Namespace, pod.Name, pod.UID) + } + traffic := globalPodTrafficStore.record(string(pod.UID), rxBytes, txBytes, now) + return traffic, nil +} + +func (s *nodeStatsSummary) podByUID(namespace string, uid string) *nodePodStats { + if s == nil { + return nil + } + for idx := range s.Pods { + pod := &s.Pods[idx] + if pod.PodRef.Namespace == namespace && pod.PodRef.UID == uid { + return pod + } + } + return nil +} + +func (p *nodePodStats) networkBytes() (uint64, uint64, bool) { + if p == nil || p.Network == nil { + return 0, 0, false + } + if p.Network.RXBytes != nil || p.Network.TXBytes != nil { + var rx uint64 + var tx uint64 + if p.Network.RXBytes != nil { + rx = *p.Network.RXBytes + } + if p.Network.TXBytes != nil { + tx = *p.Network.TXBytes + } + return rx, tx, true + } + var rx uint64 + var tx uint64 + var found bool + for _, iface := range p.Network.Interfaces { + if iface.RXBytes != nil { + rx += *iface.RXBytes + found = true + } + if iface.TXBytes != nil { + tx += *iface.TXBytes + found = true + } + } + return rx, tx, found +} + +func (b *Backend) readNodeStatsSummary(ctx context.Context, 
nodeName string) (*nodeStatsSummary, error) { + if b.client == nil { + return nil, fmt.Errorf("kubernetes client is not configured") + } + raw, err := b.client.CoreV1().RESTClient(). + Get(). + Resource("nodes"). + Name(nodeName). + SubResource("proxy"). + Suffix("stats", "summary"). + Do(ctx). + Raw() + if err != nil { + return nil, err + } + var summary nodeStatsSummary + if err := json.Unmarshal(raw, &summary); err != nil { + return nil, err + } + return &summary, nil +} diff --git a/test/integration/docker/docker_cli_test.go b/test/integration/docker/docker_cli_test.go index 1ccf45ac..9b292886 100644 --- a/test/integration/docker/docker_cli_test.go +++ b/test/integration/docker/docker_cli_test.go @@ -59,6 +59,9 @@ func TestDockerBackendCLIComplexLifecycle(t *testing.T) { t.Fatalf("USER_ENV = %q, want fixture", env["USER_ENV"]) } + _ = e2e.RunClientJSON[[]e2e.RuntimePortStatus](t, bins, socket, "ports", created.ID) + _ = e2e.WaitHTTP(t, fmt.Sprintf("http://127.0.0.1:%d/index.txt", fixture.RoutePort)) + time.Sleep(500 * time.Millisecond) statuses := e2e.RunClientJSON[[]e2e.RuntimePortStatus](t, bins, socket, "ports", created.ID) assertPortBound(t, statuses, fixture) @@ -229,6 +232,15 @@ func assertPortBound(t *testing.T, statuses []e2e.RuntimePortStatus, fixture e2e if status.HostPort != fixture.RoutePort { t.Fatalf("host port = %d, want %d in status %#v", status.HostPort, fixture.RoutePort, status) } + if status.Source != "docker-container-stats" { + t.Fatalf("source = %s, want docker-container-stats in status %#v", status.Source, status) + } + if status.RXBytes == nil || status.TXBytes == nil || status.TrafficBytes == nil { + t.Fatalf("traffic counters missing in status %#v", status) + } + if *status.TrafficBytes == 0 || status.TrafficOK == nil || !*status.TrafficOK { + t.Fatalf("traffic status = %#v, want positive traffic and traffic_ok=true", status) + } return } } diff --git a/test/integration/internal/e2e/harness.go 
b/test/integration/internal/e2e/harness.go index ce9716bb..6265408b 100644 --- a/test/integration/internal/e2e/harness.go +++ b/test/integration/internal/e2e/harness.go @@ -41,11 +41,16 @@ type RuntimeScroll struct { } type RuntimePortStatus struct { - Name string `json:"name"` - Procedure string `json:"procedure"` - Port int `json:"port"` - Bound bool `json:"bound"` - HostPort int `json:"host_port"` + Name string `json:"name"` + Procedure string `json:"procedure"` + Port int `json:"port"` + Bound bool `json:"bound"` + HostPort int `json:"host_port"` + TrafficBytes *uint64 `json:"traffic_bytes"` + RXBytes *uint64 `json:"rx_bytes"` + TXBytes *uint64 `json:"tx_bytes"` + TrafficOK *bool `json:"traffic_ok"` + Source string `json:"source"` } type RuntimeRoutingTarget struct { diff --git a/test/integration/kubernetes/kubernetes_cli_test.go b/test/integration/kubernetes/kubernetes_cli_test.go index 913954d0..065e7541 100644 --- a/test/integration/kubernetes/kubernetes_cli_test.go +++ b/test/integration/kubernetes/kubernetes_cli_test.go @@ -49,7 +49,6 @@ func TestKubernetesBackendCLIComplexLifecycle(t *testing.T) { "--k8s-namespace", namespace, "--k8s-kubeconfig", kubeconfig, "--k8s-pull-image", workerImage, - "--hubble-relay-addr", "127.0.0.1:9", "--worker-callback-listen", fmt.Sprintf(":%d", callbackPort), "--worker-callback-url", fmt.Sprintf("http://%s:%d", containerHost, callbackPort), "--listen", fmt.Sprintf(":%d", managementPort), @@ -106,7 +105,17 @@ func TestKubernetesBackendCLIComplexLifecycle(t *testing.T) { t.Fatalf("webdav file = %q, want dev-write", got) } - statuses := e2e.RunClientJSON[[]e2e.RuntimePortStatus](t, bins, socket, "ports", created.ID) + _ = e2e.RunClientJSON[[]e2e.RuntimePortStatus](t, bins, socket, "ports", created.ID) + deadline := time.Now().Add(30 * time.Second) + var statuses []e2e.RuntimePortStatus + for time.Now().Before(deadline) { + _ = e2e.WaitHTTP(t, fmt.Sprintf("http://127.0.0.1:%d/index.txt", localPort)) + time.Sleep(1 * 
time.Second) + statuses = e2e.RunClientJSON[[]e2e.RuntimePortStatus](t, bins, socket, "ports", created.ID) + if status, ok := findRuntimePortStatus(statuses, fixture); ok && status.Source == "kubernetes-pod-stats" && status.RXBytes != nil && status.TXBytes != nil && status.TrafficBytes != nil && *status.TrafficBytes > 0 && status.TrafficOK != nil && *status.TrafficOK { + break + } + } assertKubernetesPort(t, statuses, fixture) if got := readPVCFile(t, namespace, pvc, "data/finite.txt"); !strings.Contains(got, "finite-ok") { @@ -273,18 +282,34 @@ func waitRuntimePodReady(t *testing.T, namespace string, pvc string) { func assertKubernetesPort(t *testing.T, statuses []e2e.RuntimePortStatus, fixture e2e.Fixture) { t.Helper() + status, ok := findRuntimePortStatus(statuses, fixture) + if !ok { + t.Fatalf("http port for %s not found in %#v", fixture.ServeProc, statuses) + } + if !status.Bound { + t.Fatalf("port status = %#v, want bound", status) + } + if status.Port != fixture.Port { + t.Fatalf("service port = %d, want %d in status %#v", status.Port, fixture.Port, status) + } + if status.Source != "kubernetes-pod-stats" { + t.Fatalf("source = %s, want kubernetes-pod-stats in status %#v", status.Source, status) + } + if status.RXBytes == nil || status.TXBytes == nil || status.TrafficBytes == nil { + t.Fatalf("traffic counters missing in status %#v", status) + } + if *status.TrafficBytes == 0 || status.TrafficOK == nil || !*status.TrafficOK { + t.Fatalf("traffic status = %#v, want positive traffic and traffic_ok=true", status) + } +} + +func findRuntimePortStatus(statuses []e2e.RuntimePortStatus, fixture e2e.Fixture) (e2e.RuntimePortStatus, bool) { for _, status := range statuses { if status.Name == "http" && status.Procedure == fixture.ServeProc { - if !status.Bound { - t.Fatalf("port status = %#v, want bound", status) - } - if status.Port != fixture.Port { - t.Fatalf("service port = %d, want %d in status %#v", status.Port, fixture.Port, status) - } - return + return 
status, true } } - t.Fatalf("http port for %s not found in %#v", fixture.ServeProc, statuses) + return e2e.RuntimePortStatus{}, false } func readPVCFile(t *testing.T, namespace string, pvc string, relativePath string) string {