Skip to content

Commit 6c56b53

Browse files
committed
feat: K8S API client integration
1 parent 8fddbd6 commit 6c56b53

19 files changed

Lines changed: 917 additions & 21 deletions

File tree

CHANGELOG.md

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,44 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
107107
- `workload_generations: true` — Deployment/StatefulSet observed vs desired generation drift
108108
- **`collectors.remote_write_receiver` config section**: Added to all config files (`tfo-agent.yaml`, `tfo-agent.default.yaml`, `tfo-agent-one-for-all.yaml`), default: `enabled: false`, `port: 9091`
109109
- **Apache License 2.0 headers**: Full license boilerplate + package documentation added to all 187 `.go` files across all packages; property-based test files previously missing headers now covered
110+
- **K8s NetworkPolicy Collector** (`internal/collector/kubernetes/network_policies.go`): Full NetworkPolicy resource collection with ingress/egress rule detail
111+
- Gathers all NetworkPolicy resources across namespaces; respects `shouldCollectNamespace` filter
112+
- Extracts policy types (Ingress/Egress), pod selectors, ingress/egress rule counts
113+
- Parses ingress rules: ports (protocol, port), peers (podSelector, namespaceSelector, IPBlock with CIDR and except ranges)
114+
- Parses egress rules: same structure as ingress — ports + to-peers with full IPBlock support
115+
- Emits `k8s.networkpolicy.count` gauge metric per namespace with cluster label
116+
- New data types: `NetworkPolicyState`, `NetworkPolicyRule`, `NetworkPolicyPort`, `NetworkPolicyPeer`, `NetworkPolicyIPBlock`
117+
- Added to `ClusterState.NetworkPolicies` for sync to TFO Platform
118+
- Configurable via `collectors.kubernetes.network_policies: true`
119+
- **Network Flow Exporter** (`internal/exporter/network_flows.go`): New exporter that batches and sends pod-to-pod network flow events to the TFO Platform
120+
- `NetworkFlowRecord` struct aligned with Cilium Hubble flow model: source/target namespace, pod, IP, port, labels, protocol, direction, verdict, bytes/packets, retransmits, RTT, HTTP status code, DNS query, external flag
121+
- `NetworkFlowExporter` with thread-safe buffer, periodic flush loop (default: 10s), configurable max batch size (default: 500)
122+
- POSTs `NetworkFlowBatch` to `/api/v2/monitoring/network-map/k8s/flows` with API key authentication headers
123+
- Graceful shutdown with final flush of remaining buffered flows
124+
- **Ingress Collector Separated** (`internal/collector/kubernetes/ingresses.go`): Extracted Ingress collection into its own file for clarity
125+
- Previously inlined; now a standalone `collectIngresses()` function returning `([]Metric, []IngressState, error)`
126+
- Collects alongside Services since they share networking context
127+
- **Services Collector Enhanced** (`internal/collector/kubernetes/services.go`): Expanded to return full `EndpointState` objects alongside `ServiceState`
128+
- `collectServices()` signature changed from `→ (metrics, []ServiceState, error)` to `→ (metrics, []ServiceState, []EndpointState, error)`
129+
- Pre-fetched endpoints now produce full `EndpointSubset` with ready/not-ready addresses, node names, target refs, and ports
130+
- Services now include `ServicePort` detail (name, protocol, port, target_port, node_port), external IPs from both spec and LoadBalancer status
131+
- Added describe-level fields: `SessionAffinity`, `ExternalTrafficPolicy`, `HealthCheckNodePort`, `LoadBalancerSourceRanges`
132+
- **Node Network Metrics Expanded** (`internal/collector/kubernetes/nodes.go`): Three new node-level network metrics from Kubelet summary
133+
- `k8s.node.network.receive_bytes` (Counter) — network bytes received per node
134+
- `k8s.node.network.transmit_bytes` (Counter) — network bytes transmitted per node
135+
- `k8s.node.network.receive_drop_total` (Counter) — network receive errors/drops per node
136+
- Also now tracks `totalRxDrop` and `totalTxDrop` across all node interfaces
137+
- **Pod QoS & Status Metrics** (`internal/collector/kubernetes/pods.go`): Two new pod-level metrics
138+
- `k8s.pod.qos_class` (Gauge, label: `qos_class`) — exposes pod QoS class (Guaranteed, Burstable, BestEffort)
139+
- `k8s.pod.status_reason` (Gauge, label: `reason`) — exposes pod status reason when present (Evicted, NodeLost, etc.)
140+
- **Test Exports** (`internal/collector/kubernetes/exports.go`): New file exposing internal parse functions for unit testing
141+
- `ParseApiServerMetricsExported()`, `ParseCoreDNSMetricsExported()`, `ParsePromLineExported()` — test-only wrappers
142+
- **K8s Unit Tests** (`tests/unit/domain/kubernetes/`): Four new test files for recently added sub-collectors
143+
- `apiserver_test.go` — API Server metrics parser validation
144+
- `coredns_test.go` — CoreDNS metrics parser validation
145+
- `ingresses_test.go` — Ingress collection with rules, TLS, LoadBalancer IPs
146+
- `services_test.go` — Service + Endpoint collection with describe-level fields
147+
- **Container Build Script** (`run-container.sh`): New unified container build/run script (221 lines) replacing the previous `run-build-container.sh`
110148

111149
### Fixed
112150

@@ -124,6 +162,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
124162
- **Helm chart path**: Renamed `deploy/helm/tfo-agent/``deploy/helm/telemetryflow-agent/` for naming consistency with other TelemetryFlow Helm charts
125163
- **Platform monolith configs** (`config/tfo-agent/`): All three deployment configs (`tfo-agent.yaml`, `tfo-agent.k8s.yaml`, `tfo-agent.container.yaml`) updated with KSM gap fields and `remote_write_receiver` section
126164
- **All config files updated** (`configs/`, `deploy/`): Added extended K8s metrics fields (`apiserver_metrics`, `coredns_metrics`, `container_extended_metrics`, `pv_io_stats`) to all config variants — `configs/tfo-agent.yaml`, `configs/tfo-agent.default.yaml`, `configs/tfo-agent-one-for-all.yaml`, `deploy/helm/values.yaml`, `deploy/helm/values-one-for-all.yaml`, `deploy/kubernetes/configmap.yaml`
165+
- **K8s config**: Added `network_policies: true` to all config files under `collectors.kubernetes`
166+
- **`ClusterState` type**: Added `NetworkPolicies []NetworkPolicyState` field for platform sync
167+
- **RBAC ClusterRole**: Added `networkpolicies` (networking.k8s.io) resource permission for NetworkPolicy collector
168+
169+
### Dependencies
170+
171+
- `google.golang.org/grpc`: Bumped from v1.79.1 to v1.79.3
127172

128173
## [1.1.8] - 2026-03-09
129174

@@ -540,7 +585,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
540585

541586
| Version | Date | OTEL SDK | Description |
542587
| ------- | ---------- | -------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
543-
| 1.1.9 | 2026-03-20 | v1.40.0 | K8s network resources (Services/Endpoints/Ingresses); API Server & CoreDNS metrics scrapers; Fluent Bit log collector; Prometheus Remote Write Receiver; KSM gap fields (5); license headers; eBPF build constraint fixes; Helm rename |
588+
| 1.1.9 | 2026-03-20 | v1.47.0 | K8s network resources (Services/Endpoints/Ingresses); NetworkPolicy collector + Network Flow Exporter; API Server & CoreDNS metrics scrapers; Fluent Bit log collector; Prometheus Remote Write Receiver; KSM gap fields (5); Pod QoS/status metrics; Node network rx/tx/drop metrics; 4 new K8s test files; license headers; eBPF build constraint fixes; Helm rename; gRPC v1.79.3 |
544589
| 1.1.8 | 2026-03-09 | v1.40.0 | HPA/PDB/pod-logs sub-collectors; Kubelet summary ephemeral + working set; Go 1.26 + security fixes; 17 collector docs |
545590
| 1.1.7 | 2026-03-08 | v1.40.0 | Stable agent identity via UUIDv5 host fingerprint; K8s provider detection (15 providers); fix SyncKubernetesState |
546591
| 1.1.6 | 2026-02-21 | v1.40.0 | Go 1.25.7, OTEL SDK v1.40.0, build-tag lint fixes, errcheck/staticcheck cleanup |

deploy/helm/telemetryflow-agent/templates/deployment-k8s.yaml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,13 +93,24 @@ spec:
9393
value: "true"
9494
- name: TELEMETRYFLOW_PROMETHEUS_PORT
9595
value: {{ .Values.ports.metrics | quote }}
96+
{{- if .Values.agentApi.enabled }}
97+
- name: TELEMETRYFLOW_AGENT_API_ENABLED
98+
value: "true"
99+
- name: TELEMETRYFLOW_AGENT_API_PORT
100+
value: {{ .Values.agentApi.port | default 8889 | quote }}
101+
{{- end }}
96102
{{- with .Values.extraEnv }}
97103
{{- toYaml . | nindent 12 }}
98104
{{- end }}
99105
ports:
100106
- containerPort: {{ .Values.ports.metrics }}
101107
name: metrics
102108
protocol: TCP
109+
{{- if .Values.agentApi.enabled }}
110+
- containerPort: {{ .Values.agentApi.port | default 8889 }}
111+
name: agent-api
112+
protocol: TCP
113+
{{- end }}
103114
# startupProbe: 90s budget for first-boot cluster auto-registration call
104115
startupProbe:
105116
httpGet:
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
{{- if .Values.agentApi.enabled }}
2+
apiVersion: v1
3+
kind: Service
4+
metadata:
5+
name: {{ include "tfo-agent.fullname" . }}-k8s-api
6+
namespace: {{ .Release.Namespace }}
7+
labels:
8+
{{- include "tfo-agent.k8sCollector.labels" . | nindent 4 }}
9+
app.kubernetes.io/component: k8s-api
10+
spec:
11+
type: ClusterIP
12+
selector:
13+
{{- include "tfo-agent.k8sCollector.selectorLabels" . | nindent 4 }}
14+
ports:
15+
- name: agent-api
16+
port: {{ .Values.agentApi.port | default 8889 }}
17+
targetPort: {{ .Values.agentApi.port | default 8889 }}
18+
protocol: TCP
19+
{{- end }}

deploy/helm/telemetryflow-agent/values.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,12 @@ ports:
3939
otlpHttp: 4318
4040
metrics: 8888
4141
health: 13133
42+
agentApi: 8889
43+
44+
# -- Agent HTTP API server (real-time K8s queries: pod log streaming)
45+
agentApi:
46+
enabled: true
47+
port: 8889
4248

4349
# -- ServiceAccount settings
4450
serviceAccount:

internal/agent/agent.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929

3030
"go.uber.org/zap"
3131

32+
agentapi "github.com/telemetryflow/telemetryflow-agent/internal/api"
3233
"github.com/telemetryflow/telemetryflow-agent/internal/collector"
3334
cadvisorcollector "github.com/telemetryflow/telemetryflow-agent/internal/collector/cadvisor"
3435
dockercollector "github.com/telemetryflow/telemetryflow-agent/internal/collector/docker"
@@ -58,6 +59,7 @@ type Agent struct {
5859
k8sCollector *kubernetes.KubernetesCollector // kept for registration retry
5960
collectors []collector.Collector
6061
prometheusServer *exporter.PrometheusServer
62+
agentAPIServer *agentapi.Server
6163

6264
// State
6365
mu sync.RWMutex
@@ -367,6 +369,23 @@ func New(cfg *config.Config, logger *zap.Logger) (*Agent, error) {
367369
)
368370
}
369371

372+
// Create Agent API server if enabled (for real-time K8s queries like pod log streaming)
373+
var apiServer *agentapi.Server
374+
if cfg.AgentAPI.Enabled && k8sCollector != nil {
375+
apiServer = agentapi.NewServer(
376+
agentapi.Config{
377+
Enabled: cfg.AgentAPI.Enabled,
378+
Port: cfg.AgentAPI.Port,
379+
APIKey: cfg.AgentAPI.APIKey,
380+
},
381+
k8sCollector.Clientset(),
382+
logger,
383+
)
384+
logger.Info("Agent API server enabled",
385+
zap.Int("port", cfg.AgentAPI.Port),
386+
)
387+
}
388+
370389
return &Agent{
371390
id: agentID,
372391
config: cfg,
@@ -377,6 +396,7 @@ func New(cfg *config.Config, logger *zap.Logger) (*Agent, error) {
377396
k8sCollector: k8sCollector,
378397
collectors: collectors,
379398
prometheusServer: promServer,
399+
agentAPIServer: apiServer,
380400
}, nil
381401
}
382402

@@ -425,6 +445,15 @@ func (a *Agent) Run(ctx context.Context) error {
425445
}()
426446
}
427447

448+
// Start Agent API server (pod log streaming)
449+
if a.agentAPIServer != nil {
450+
go func() {
451+
if err := a.agentAPIServer.Start(ctx); err != nil && err != context.Canceled {
452+
errChan <- fmt.Errorf("agent API server error: %w", err)
453+
}
454+
}()
455+
}
456+
428457
// Start heartbeat
429458
go func() {
430459
if err := a.heartbeat.Start(ctx); err != nil && err != context.Canceled {
@@ -536,6 +565,19 @@ func (a *Agent) shutdown() error {
536565
}()
537566
}
538567

568+
// Stop Agent API server
569+
if a.agentAPIServer != nil {
570+
wg.Add(1)
571+
go func() {
572+
defer wg.Done()
573+
if err := a.agentAPIServer.Stop(); err != nil {
574+
errMu.Lock()
575+
errs = append(errs, fmt.Errorf("agent API server stop: %w", err))
576+
errMu.Unlock()
577+
}
578+
}()
579+
}
580+
539581
// Stop heartbeat
540582
wg.Add(1)
541583
go func() {

internal/api/handlers.go

Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
package api
2+
3+
import (
4+
"bufio"
5+
"encoding/json"
6+
"fmt"
7+
"net/http"
8+
"strconv"
9+
10+
"go.uber.org/zap"
11+
corev1 "k8s.io/api/core/v1"
12+
)
13+
14+
// handleHealth returns a simple health check response.
15+
func (s *Server) handleHealth(w http.ResponseWriter, _ *http.Request) {
16+
w.Header().Set("Content-Type", "application/json")
17+
w.WriteHeader(http.StatusOK)
18+
_ = json.NewEncoder(w).Encode(map[string]string{"status": "ok"})
19+
}
20+
21+
// handlePodLogs streams pod container logs via SSE (Server-Sent Events).
22+
// Query params: container, follow, tailLines, sinceSeconds, previous, timestamps
23+
func (s *Server) handlePodLogs(w http.ResponseWriter, r *http.Request) {
24+
namespace := r.PathValue("namespace")
25+
podName := r.PathValue("pod")
26+
27+
if namespace == "" || podName == "" {
28+
http.Error(w, `{"error":"namespace and pod are required"}`, http.StatusBadRequest)
29+
return
30+
}
31+
32+
// Parse query parameters
33+
container := r.URL.Query().Get("container")
34+
follow := r.URL.Query().Get("follow") == "true"
35+
timestamps := r.URL.Query().Get("timestamps") != "false" // default true
36+
previous := r.URL.Query().Get("previous") == "true"
37+
38+
var tailLines *int64
39+
if tl := r.URL.Query().Get("tailLines"); tl != "" {
40+
if n, err := strconv.ParseInt(tl, 10, 64); err == nil {
41+
tailLines = &n
42+
}
43+
}
44+
// Default to 100 tail lines if not specified and not following
45+
if tailLines == nil {
46+
defaultTail := int64(100)
47+
tailLines = &defaultTail
48+
}
49+
50+
var sinceSeconds *int64
51+
if ss := r.URL.Query().Get("sinceSeconds"); ss != "" {
52+
if n, err := strconv.ParseInt(ss, 10, 64); err == nil {
53+
sinceSeconds = &n
54+
}
55+
}
56+
57+
// Build PodLogOptions
58+
opts := &corev1.PodLogOptions{
59+
Follow: follow,
60+
Timestamps: timestamps,
61+
Previous: previous,
62+
TailLines: tailLines,
63+
SinceSeconds: sinceSeconds,
64+
}
65+
if container != "" {
66+
opts.Container = container
67+
}
68+
69+
s.logger.Debug("Pod log request",
70+
zap.String("namespace", namespace),
71+
zap.String("pod", podName),
72+
zap.String("container", container),
73+
zap.Bool("follow", follow),
74+
)
75+
76+
// Get log stream from K8s API
77+
stream, err := s.clientset.CoreV1().Pods(namespace).GetLogs(podName, opts).Stream(r.Context())
78+
if err != nil {
79+
s.logger.Error("Failed to get pod log stream",
80+
zap.String("namespace", namespace),
81+
zap.String("pod", podName),
82+
zap.Error(err),
83+
)
84+
http.Error(w, fmt.Sprintf(`{"error":"failed to get pod logs: %s"}`, err.Error()), http.StatusInternalServerError)
85+
return
86+
}
87+
defer func() { _ = stream.Close() }()
88+
89+
if follow {
90+
// SSE streaming mode
91+
w.Header().Set("Content-Type", "text/event-stream")
92+
w.Header().Set("Cache-Control", "no-cache")
93+
w.Header().Set("Connection", "keep-alive")
94+
w.Header().Set("X-Accel-Buffering", "no") // Disable nginx buffering
95+
96+
flusher, ok := w.(http.Flusher)
97+
if !ok {
98+
http.Error(w, `{"error":"streaming not supported"}`, http.StatusInternalServerError)
99+
return
100+
}
101+
102+
scanner := bufio.NewScanner(stream)
103+
// Increase buffer size for long log lines (1MB)
104+
scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024)
105+
106+
for scanner.Scan() {
107+
select {
108+
case <-r.Context().Done():
109+
return
110+
default:
111+
}
112+
line := scanner.Text()
113+
_, _ = fmt.Fprintf(w, "data: %s\n\n", line)
114+
flusher.Flush()
115+
}
116+
117+
if err := scanner.Err(); err != nil {
118+
s.logger.Debug("Pod log stream ended", zap.Error(err))
119+
}
120+
} else {
121+
// Non-streaming mode: return all lines as JSON array
122+
w.Header().Set("Content-Type", "application/json")
123+
124+
var lines []string
125+
scanner := bufio.NewScanner(stream)
126+
scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024)
127+
128+
for scanner.Scan() {
129+
lines = append(lines, scanner.Text())
130+
}
131+
132+
_ = json.NewEncoder(w).Encode(map[string]interface{}{
133+
"namespace": namespace,
134+
"pod": podName,
135+
"container": container,
136+
"lines": lines,
137+
"count": len(lines),
138+
})
139+
}
140+
}

internal/api/middleware.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package api
2+
3+
import (
4+
"net/http"
5+
6+
"go.uber.org/zap"
7+
)
8+
9+
// authMiddleware validates API key from request headers.
10+
// Accepts X-TelemetryFlow-Key-ID or X-API-Key-ID headers.
11+
func (s *Server) authMiddleware(next http.Handler) http.Handler {
12+
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
13+
// Health endpoint is always public
14+
if r.URL.Path == "/api/v1/health" {
15+
next.ServeHTTP(w, r)
16+
return
17+
}
18+
19+
// Skip auth if no API key is configured (development mode)
20+
if s.config.APIKey == "" {
21+
next.ServeHTTP(w, r)
22+
return
23+
}
24+
25+
// Check API key from headers
26+
apiKey := r.Header.Get("X-TelemetryFlow-Key-ID")
27+
if apiKey == "" {
28+
apiKey = r.Header.Get("X-API-Key-ID")
29+
}
30+
if apiKey == "" {
31+
apiKey = r.Header.Get("Authorization")
32+
}
33+
34+
if apiKey != s.config.APIKey {
35+
s.logger.Debug("Agent API auth failed",
36+
zap.String("remote", r.RemoteAddr),
37+
zap.String("path", r.URL.Path),
38+
)
39+
http.Error(w, `{"error":"unauthorized"}`, http.StatusUnauthorized)
40+
return
41+
}
42+
43+
next.ServeHTTP(w, r)
44+
})
45+
}

0 commit comments

Comments
 (0)