Skip to content

Commit 4ed900a

Browse files
committed
fix: TFO-Agent logs native
1 parent d7986cc commit 4ed900a

11 files changed

Lines changed: 997 additions & 3 deletions

File tree

internal/agent/agent.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import (
3434
dockercollector "github.com/telemetryflow/telemetryflow-agent/internal/collector/docker"
3535
ebpfcollector "github.com/telemetryflow/telemetryflow-agent/internal/collector/ebpf"
3636
"github.com/telemetryflow/telemetryflow-agent/internal/collector/kubernetes"
37+
logcollector "github.com/telemetryflow/telemetryflow-agent/internal/collector/log"
3738
"github.com/telemetryflow/telemetryflow-agent/internal/collector/nodeexporter"
3839
"github.com/telemetryflow/telemetryflow-agent/internal/collector/scraper"
3940
"github.com/telemetryflow/telemetryflow-agent/internal/collector/system"
@@ -243,6 +244,16 @@ func New(cfg *config.Config, logger *zap.Logger) (*Agent, error) {
243244
collectors = append(collectors, sysCollector)
244245
}
245246

247+
// Add log collector if enabled (file tailing + journald)
248+
if cfg.Collector.Logs.Enabled {
249+
logCol := logcollector.NewLogCollector(cfg.Collector.Logs, agentID, logger)
250+
collectors = append(collectors, logCol)
251+
logger.Info("Log collector enabled",
252+
zap.Int("paths", len(cfg.Collector.Logs.Paths)),
253+
zap.Bool("journald", cfg.Collector.Logs.Journald.Enabled),
254+
)
255+
}
256+
246257
// Add Prometheus Scraper collector if enabled
247258
if cfg.Collector.PrometheusScraper.Enabled {
248259
scraperCfg := scraper.ScraperConfig{

internal/collector/kubernetes/kubernetes.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -311,6 +311,16 @@ func (k *KubernetesCollector) Collect(ctx context.Context) ([]collector.Metric,
311311
}
312312
}
313313

314+
// --- Node Logs ---
315+
if k.cfg.NodeLogs {
316+
nodeLogs, err := collectNodeLogs(ctx, k.clientset, k.cfg, k.logger)
317+
if err != nil {
318+
k.logger.Warn("Failed to collect node logs", zap.Error(err))
319+
} else {
320+
state.NodeLogs = nodeLogs
321+
}
322+
}
323+
314324
// --- Network (Kubelet Summary API) ---
315325
if k.cfg.Network {
316326
// Gather node names from already-collected state, or list them
Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
// Package kubernetes — node log collection from K8s nodes.
2+
//
3+
// Collects kubelet, kube-proxy, and containerd logs from each node via the
4+
// K8s API server node proxy endpoint:
5+
//
6+
// GET /api/v1/nodes/{name}/proxy/logs/{source}.log
7+
//
8+
// Falls back to reading from well-known host paths if mounted (DaemonSet mode):
9+
//
10+
// /var/log/syslog, /var/log/kubelet.log, /var/log/containers/kube-proxy-*
11+
//
12+
// TelemetryFlow Agent - Community Enterprise Observability Platform
13+
// Copyright (c) 2024-2026 TelemetryFlow. All rights reserved.
14+
// Open Source Software built by DevOpsCorner Indonesia.
15+
//
16+
// Licensed under the Apache License, Version 2.0 (the "License");
17+
// you may not use this file except in compliance with the License.
18+
// You may obtain a copy of the License at
19+
//
20+
// http://www.apache.org/licenses/LICENSE-2.0
21+
//
22+
// Unless required by applicable law or agreed to in writing, software
23+
// distributed under the License is distributed on an "AS IS" BASIS,
24+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
25+
// See the License for the specific language governing permissions and
26+
// limitations under the License.
27+
package kubernetes
28+
29+
import (
30+
"bufio"
31+
"context"
32+
"fmt"
33+
"io"
34+
"strings"
35+
"time"
36+
37+
"go.uber.org/zap"
38+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
39+
"k8s.io/client-go/kubernetes"
40+
)
41+
42+
// nodeLogSourcePaths maps source names to their log file paths on the node.
43+
var nodeLogSourcePaths = map[string][]string{
44+
"kubelet": {"kubelet.log", "journal/kubelet"},
45+
"kube-proxy": {"kube-proxy.log"},
46+
"containerd": {"containerd.log"},
47+
}
48+
49+
// collectNodeLogs retrieves recent log lines from each node for configured sources.
50+
// It uses the K8s API node proxy to access /var/log/ on each node.
51+
func collectNodeLogs(
52+
ctx context.Context,
53+
cs kubernetes.Interface,
54+
cfg Config,
55+
logger *zap.Logger,
56+
) ([]NodeLogEntry, error) {
57+
tailLines := cfg.NodeLogsTailLines
58+
if tailLines <= 0 {
59+
tailLines = 200
60+
}
61+
62+
sources := cfg.NodeLogSources
63+
if len(sources) == 0 {
64+
sources = []string{"kubelet", "kube-proxy", "containerd"}
65+
}
66+
67+
// List all nodes
68+
nodeList, err := cs.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
69+
if err != nil {
70+
return nil, fmt.Errorf("failed to list nodes: %w", err)
71+
}
72+
73+
var entries []NodeLogEntry
74+
now := time.Now()
75+
76+
for i := range nodeList.Items {
77+
node := &nodeList.Items[i]
78+
79+
for _, source := range sources {
80+
paths, ok := nodeLogSourcePaths[source]
81+
if !ok {
82+
paths = []string{source + ".log"}
83+
}
84+
85+
var lines []string
86+
var fetchErr error
87+
for _, logPath := range paths {
88+
lines, fetchErr = fetchNodeLogViaProxy(ctx, cs, node.Name, logPath, tailLines)
89+
if fetchErr == nil && len(lines) > 0 {
90+
break // Found logs at this path
91+
}
92+
}
93+
94+
if fetchErr != nil {
95+
logger.Debug("Failed to collect node logs via proxy",
96+
zap.String("node", node.Name),
97+
zap.String("source", source),
98+
zap.Error(fetchErr),
99+
)
100+
continue
101+
}
102+
103+
if len(lines) == 0 {
104+
continue
105+
}
106+
107+
entries = append(entries, NodeLogEntry{
108+
NodeName: node.Name,
109+
Source: source,
110+
Lines: lines,
111+
CollectedAt: now,
112+
})
113+
}
114+
}
115+
116+
return entries, nil
117+
}
118+
119+
// fetchNodeLogViaProxy reads log lines from a node using the K8s API proxy.
120+
// Path is relative to /var/log/ on the node (e.g., "kubelet.log").
121+
func fetchNodeLogViaProxy(
122+
ctx context.Context,
123+
cs kubernetes.Interface,
124+
nodeName string,
125+
logPath string,
126+
tailLines int64,
127+
) ([]string, error) {
128+
req := cs.CoreV1().RESTClient().
129+
Get().
130+
Resource("nodes").
131+
Name(nodeName).
132+
SubResource("proxy", "logs", logPath).
133+
Param("tailLines", fmt.Sprintf("%d", tailLines))
134+
135+
rc, err := req.Stream(ctx)
136+
if err != nil {
137+
return nil, fmt.Errorf("proxy stream to node %s/%s: %w", nodeName, logPath, err)
138+
}
139+
defer func() { _ = rc.Close() }()
140+
141+
return readLines(rc, tailLines), nil
142+
}
143+
144+
// readLines reads up to maxLines non-empty lines from a reader.
145+
func readLines(r io.Reader, maxLines int64) []string {
146+
scanner := bufio.NewScanner(r)
147+
// Increase buffer for long log lines (default 64KB)
148+
scanner.Buffer(make([]byte, 0, 64*1024), 256*1024)
149+
150+
var lines []string
151+
for scanner.Scan() {
152+
line := scanner.Text()
153+
if strings.TrimSpace(line) == "" {
154+
continue
155+
}
156+
lines = append(lines, line)
157+
if int64(len(lines)) >= maxLines {
158+
break
159+
}
160+
}
161+
return lines
162+
}

internal/collector/kubernetes/types.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ type ClusterState struct {
4545
HPAs []HPAState `json:"hpas,omitempty"`
4646
PDBs []PDBState `json:"pdbs,omitempty"`
4747
PodLogs []PodLogEntry `json:"pod_logs,omitempty"`
48+
NodeLogs []NodeLogEntry `json:"node_logs,omitempty"`
4849
ApiServerMetrics *ApiServerMetrics `json:"apiserver_metrics,omitempty"`
4950
CoreDNSMetrics *CoreDNSMetrics `json:"coredns_metrics,omitempty"`
5051
}
@@ -229,6 +230,14 @@ type PodLogEntry struct {
229230
CollectedAt time.Time `json:"collected_at"`
230231
}
231232

233+
// NodeLogEntry holds log lines from a K8s node system service (kubelet, kube-proxy, containerd).
234+
type NodeLogEntry struct {
235+
NodeName string `json:"node_name"`
236+
Source string `json:"source"` // kubelet, kube-proxy, containerd
237+
Lines []string `json:"lines,omitempty"`
238+
CollectedAt time.Time `json:"collected_at"`
239+
}
240+
232241
// ResourceQuotaUsage represents a single resource's used/hard values.
233242
type ResourceQuotaUsage struct {
234243
Used string `json:"used"`

0 commit comments

Comments
 (0)