Skip to content

Commit 8b630ef

Browse files
committed
fix: Kubernetes Colector registration
1 parent e6649ae commit 8b630ef

4 files changed

Lines changed: 67 additions & 8 deletions

File tree

.env.example

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
# -----------------------------------------------------------------------------
88
# Build Configuration
99
# -----------------------------------------------------------------------------
10-
VERSION=1.1.4
10+
VERSION=1.1.7
1111
OTEL_SDK_VERSION=1.39.0
1212
GIT_COMMIT=unknown
1313
GIT_BRANCH=main

deploy/kubernetes/daemonset.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ spec:
5252
- sh
5353
- -c
5454
- |
55-
until wget -qO- http://tfo-backend.telemetryflow.svc.cluster.local:3100/api/v2/health >/dev/null 2>&1; do
55+
until wget -qO- http://tfo-backend.telemetryflow.svc.cluster.local:3100/health >/dev/null 2>&1; do
5656
echo "Waiting for tfo-backend..."; sleep 5
5757
done
5858
echo "tfo-backend is ready."
@@ -67,7 +67,7 @@ spec:
6767

6868
containers:
6969
- name: tfo-agent
70-
image: telemetryflow/telemetryflow-agent:latest
70+
image: telemetryflow/telemetryflow-agent:1.1.7
7171
imagePullPolicy: Always
7272

7373
args:

deploy/kubernetes/deployment-k8s.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ spec:
5959
- sh
6060
- -c
6161
- |
62-
until wget -qO- http://tfo-backend.telemetryflow.svc.cluster.local:3100/api/v2/health >/dev/null 2>&1; do
62+
until wget -qO- http://tfo-backend.telemetryflow.svc.cluster.local:3100/health >/dev/null 2>&1; do
6363
echo "Waiting for tfo-backend..."; sleep 5
6464
done
6565
echo "tfo-backend is ready."
@@ -74,7 +74,7 @@ spec:
7474

7575
containers:
7676
- name: tfo-agent
77-
image: telemetryflow/telemetryflow-agent:latest
77+
image: telemetryflow/telemetryflow-agent:1.1.7
7878
imagePullPolicy: Always
7979

8080
args:

internal/agent/agent.go

Lines changed: 62 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"context"
66
"fmt"
77
"os"
8+
"strings"
89
"sync"
910
"time"
1011

@@ -32,6 +33,7 @@ type Agent struct {
3233
client *api.Client
3334
heartbeat *exporter.Heartbeat
3435
k8sSync *exporter.KubernetesSync
36+
k8sCollector *kubernetes.KubernetesCollector // kept for registration retry
3537
collectors []collector.Collector
3638
prometheusServer *exporter.PrometheusServer
3739

@@ -126,7 +128,10 @@ func New(cfg *config.Config, logger *zap.Logger) (*Agent, error) {
126128

127129
// Auto-detect Kubernetes: enable collector + sync when running inside a K8s cluster.
128130
// KUBERNETES_SERVICE_HOST is injected by the kubelet into every pod.
129-
if !cfg.Collector.Kubernetes.Enabled && os.Getenv("KUBERNETES_SERVICE_HOST") != "" {
131+
// Skip auto-detection if K8s was explicitly disabled via TELEMETRYFLOW_K8S_ENABLED=false
132+
// (e.g. DaemonSet pods that only run node_exporter).
133+
k8sExplicitlyDisabled := strings.EqualFold(os.Getenv("TELEMETRYFLOW_K8S_ENABLED"), "false")
134+
if !cfg.Collector.Kubernetes.Enabled && !k8sExplicitlyDisabled && os.Getenv("KUBERNETES_SERVICE_HOST") != "" {
130135
cfg.Collector.Kubernetes.Enabled = true
131136
if !cfg.Collector.Kubernetes.SyncToBackend {
132137
cfg.Collector.Kubernetes.SyncToBackend = true
@@ -136,13 +141,15 @@ func New(cfg *config.Config, logger *zap.Logger) (*Agent, error) {
136141

137142
// Add Kubernetes collector if enabled
138143
var k8sSync *exporter.KubernetesSync
144+
var k8sCollector *kubernetes.KubernetesCollector
139145
if cfg.Collector.Kubernetes.Enabled {
140-
k8sCollector, err := kubernetes.NewKubernetesCollector(cfg.Collector.Kubernetes, logger)
146+
col, err := kubernetes.NewKubernetesCollector(cfg.Collector.Kubernetes, logger)
141147
if err != nil {
142148
logger.Warn("Failed to create Kubernetes collector, skipping",
143149
zap.Error(err),
144150
)
145151
} else {
152+
k8sCollector = col
146153
collectors = append(collectors, k8sCollector)
147154
logger.Info("Kubernetes collector enabled",
148155
zap.String("cluster", k8sCollector.ClusterName()),
@@ -158,7 +165,7 @@ func New(cfg *config.Config, logger *zap.Logger) (*Agent, error) {
158165
})
159166
regCancel()
160167
if regErr != nil {
161-
logger.Warn("Failed to auto-register Kubernetes cluster, sync disabled",
168+
logger.Warn("Failed to auto-register Kubernetes cluster, will retry in background",
162169
zap.Error(regErr),
163170
zap.String("cluster", k8sCollector.ClusterName()),
164171
)
@@ -233,6 +240,7 @@ func New(cfg *config.Config, logger *zap.Logger) (*Agent, error) {
233240
client: client,
234241
heartbeat: heartbeat,
235242
k8sSync: k8sSync,
243+
k8sCollector: k8sCollector,
236244
collectors: collectors,
237245
prometheusServer: promServer,
238246
}, nil
@@ -297,6 +305,57 @@ func (a *Agent) Run(ctx context.Context) error {
297305
errChan <- fmt.Errorf("kubernetes sync error: %w", err)
298306
}
299307
}()
308+
} else if a.k8sCollector != nil && a.config.Collector.Kubernetes.SyncToBackend {
309+
// Registration failed at startup — retry with exponential backoff in background.
310+
go func() {
311+
backoff := 15 * time.Second
312+
for {
313+
select {
314+
case <-ctx.Done():
315+
return
316+
case <-time.After(backoff):
317+
}
318+
regCtx, regCancel := context.WithTimeout(ctx, 30*time.Second)
319+
regResp, regErr := a.client.AgentRegisterCluster(regCtx, &api.AgentRegisterClusterRequest{
320+
Name: a.k8sCollector.ClusterName(),
321+
Provider: a.k8sCollector.ClusterProvider(),
322+
})
323+
regCancel()
324+
if regErr != nil {
325+
if backoff < 5*time.Minute {
326+
backoff *= 2
327+
}
328+
a.logger.Warn("K8s cluster registration retry failed",
329+
zap.Error(regErr),
330+
zap.Duration("next_retry", backoff),
331+
)
332+
continue
333+
}
334+
a.config.Collector.Kubernetes.ClusterID = regResp.ID
335+
a.logger.Info("Kubernetes cluster registered (retry succeeded)",
336+
zap.String("clusterID", regResp.ID),
337+
zap.String("name", regResp.Name),
338+
)
339+
syncInterval := a.config.Collector.Kubernetes.SyncInterval
340+
if syncInterval == 0 {
341+
syncInterval = 60 * time.Second
342+
}
343+
a.mu.Lock()
344+
a.k8sSync = exporter.NewKubernetesSync(exporter.KubernetesSyncConfig{
345+
ClusterID: regResp.ID,
346+
Interval: syncInterval,
347+
Collector: a.k8sCollector,
348+
Client: a.client,
349+
Logger: a.logger,
350+
})
351+
sync := a.k8sSync
352+
a.mu.Unlock()
353+
if err := sync.Start(ctx); err != nil && err != context.Canceled {
354+
a.logger.Error("Kubernetes sync stopped", zap.Error(err))
355+
}
356+
return
357+
}
358+
}()
300359
}
301360

302361
// Start collectors

0 commit comments

Comments
 (0)