Skip to content

Commit 7f98b41

Browse files
committed
fix[agent-manager]: CPU overutilization fix
1 parent 47f2c8c commit 7f98b41

2 files changed

Lines changed: 63 additions & 8 deletions

File tree

agent-manager/agent/collector_imp.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ func InitCollectorService() {
5555
CollectorConfigsCache: make(map[uint][]*CollectorConfigGroup),
5656
CacheCollectorKey: make(map[uint]string),
5757
CollectorPendigConfigChan: make(chan *CollectorConfig, 1000),
58-
CollectorTypes: []enum.UTMModule{enum.IBM_AS_400},
58+
CollectorTypes: []enum.UTMModule{},
5959
DBConnection: database.GetDB(),
6060
}
6161
collectors := []models.Collector{}

agent-manager/agent/lastseen_imp.go

Lines changed: 62 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import (
44
"fmt"
55
"io"
66
"strconv"
7-
sync "sync"
7+
"sync"
88
"time"
99

1010
"github.com/utmstack/UTMStack/agent-manager/database"
@@ -76,30 +76,84 @@ func (s *LastSeenService) processPings() {
7676
s.CacheCollectorLastSeenMutex.Unlock()
7777
}
7878
}
79+
80+
utils.ALogger.Info("processPings goroutine ended")
7981
}
8082

8183
func (s *LastSeenService) flushLastSeenToDB() {
8284
ticker := time.NewTicker(30 * time.Second)
8385
for range ticker.C {
86+
8487
pings := []models.LastSeen{}
8588

89+
// Agent cache access
8690
s.CacheAgentLastSeenMutex.Lock()
91+
agentPings := make([]models.LastSeen, 0, len(s.CacheAgentLastSeen))
8792
for _, lastSeen := range s.CacheAgentLastSeen {
88-
pings = append(pings, lastSeen)
93+
agentPings = append(agentPings, lastSeen)
8994
}
9095
s.CacheAgentLastSeenMutex.Unlock()
96+
pings = append(pings, agentPings...)
9197

98+
// Collector cache access
9299
s.CacheCollectorLastSeenMutex.Lock()
100+
collectorPings := make([]models.LastSeen, 0, len(s.CacheCollectorLastSeen))
93101
for _, lastSeen := range s.CacheCollectorLastSeen {
94-
pings = append(pings, lastSeen)
102+
collectorPings = append(collectorPings, lastSeen)
95103
}
96104
s.CacheCollectorLastSeenMutex.Unlock()
105+
pings = append(pings, collectorPings...)
106+
107+
// Database operations
108+
dbOpsCount := len(pings)
109+
110+
if dbOpsCount == 0 {
111+
continue
112+
}
113+
114+
// Use parallel individual upserts for better performance
115+
const maxWorkers = 10
116+
workers := dbOpsCount
117+
if workers > maxWorkers {
118+
workers = maxWorkers
119+
}
97120

121+
pingChan := make(chan models.LastSeen, dbOpsCount)
122+
errorChan := make(chan error, dbOpsCount)
123+
var wg sync.WaitGroup
124+
125+
// Start workers
126+
for i := 0; i < workers; i++ {
127+
wg.Add(1)
128+
go func() {
129+
defer wg.Done()
130+
for ping := range pingChan {
131+
err := s.DBConnection.Upsert(&ping, "connector_id = ?", nil, ping.ConnectorID)
132+
if err != nil {
133+
utils.ALogger.ErrorF("failed to save LastSeen item for connector %d: %v", ping.ConnectorID, err)
134+
select {
135+
case errorChan <- err:
136+
default:
137+
}
138+
}
139+
}
140+
}()
141+
}
142+
143+
// Send pings to workers
98144
for _, ping := range pings {
99-
err := s.DBConnection.Upsert(&ping, "connector_id = ?", nil, ping.ConnectorID)
100-
if err != nil {
101-
utils.ALogger.ErrorF("failed to save LastSeen item: %v", err)
102-
}
145+
pingChan <- ping
146+
}
147+
close(pingChan)
148+
149+
// Wait for all workers to complete
150+
wg.Wait()
151+
close(errorChan)
152+
153+
// Count errors
154+
totalErrors := 0
155+
for range errorChan {
156+
totalErrors++
103157
}
104158
}
105159
}
@@ -122,6 +176,7 @@ func (s *LastSeenService) Ping(stream PingService_PingServer) error {
122176
if err != nil {
123177
return status.Error(codes.Internal, err.Error())
124178
}
179+
125180
LastSeenChannel <- models.LastSeen{
126181
ConnectorID: uint(idInt),
127182
ConnectorType: typ,

0 commit comments

Comments
 (0)