Skip to content

Commit 5c425cb

Browse files
committed
refactor(as400): reorganize logservice package and improve log processing logic
1 parent 18df2bf commit 5c425cb

File tree

1 file changed

+71
-104
lines changed

1 file changed

+71
-104
lines changed

as400/logservice/processor.go

Lines changed: 71 additions & 104 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
1-
package agent
1+
package logservice
22

33
import (
44
"context"
55
"errors"
6+
"os"
67
"strconv"
78
"strings"
89
"sync"
@@ -11,7 +12,6 @@ import (
1112
"github.com/google/uuid"
1213
"github.com/threatwinds/go-sdk/plugins"
1314

14-
<<<<<<<< HEAD:as400/logservice/processor.go
1515
"github.com/utmstack/UTMStack/as400/agent"
1616
"github.com/utmstack/UTMStack/as400/config"
1717
"github.com/utmstack/UTMStack/as400/conn"
@@ -21,13 +21,6 @@ import (
2121
"google.golang.org/grpc/codes"
2222
"google.golang.org/grpc/metadata"
2323
"google.golang.org/grpc/status"
24-
========
25-
"github.com/utmstack/UTMStack/agent/config"
26-
"github.com/utmstack/UTMStack/agent/database"
27-
"github.com/utmstack/UTMStack/agent/models"
28-
"github.com/utmstack/UTMStack/agent/utils"
29-
"github.com/utmstack/UTMStack/shared/fs"
30-
>>>>>>>> origin/v11:agent/agent/logprocessor.go
3124
)
3225

3326
type LogProcessor struct {
@@ -38,48 +31,31 @@ type LogProcessor struct {
3831
}
3932

4033
var (
41-
processor LogProcessor
42-
processorOnce sync.Once
43-
processorInitErr error
44-
LogQueue = make(chan *plugins.Log, 10000)
45-
timeCLeanLogs = 10 * time.Minute
46-
47-
// ErrAgentUninstalled is returned when the agent uninstalls itself due to invalid key
48-
ErrAgentUninstalled = errors.New("agent uninstalled due to invalid key")
34+
processor LogProcessor
35+
processorOnce sync.Once
36+
LogQueue = make(chan *plugins.Log)
37+
timeToSleep = 10 * time.Second
38+
timeCLeanLogs = 10 * time.Minute
4939
)
5040

51-
func GetLogProcessor() (*LogProcessor, error) {
41+
func GetLogProcessor() LogProcessor {
5242
processorOnce.Do(func() {
53-
db, err := database.GetDB()
54-
if err != nil {
55-
processorInitErr = err
56-
return
57-
}
5843
processor = LogProcessor{
59-
db: db,
44+
db: database.GetDB(),
6045
connErrWritten: false,
6146
ackErrWritten: false,
6247
sendErrWritten: false,
6348
}
6449
})
65-
if processorInitErr != nil {
66-
return nil, processorInitErr
67-
}
68-
return &processor, nil
50+
return processor
6951
}
7052

7153
func (l *LogProcessor) ProcessLogs(cnf *config.Config, ctx context.Context) {
7254
go l.CleanCountedLogs()
7355

7456
for {
75-
select {
76-
case <-ctx.Done():
77-
utils.Logger.Info("ProcessLogs stopping due to context cancellation")
78-
return
79-
default:
80-
}
81-
82-
connection, err := GetCorrelationConnection(cnf)
57+
ctxEof, cancelEof := context.WithCancel(context.Background())
58+
connection, err := conn.GetCorrelationConnection(cnf)
8359
if err != nil {
8460
if !l.connErrWritten {
8561
utils.Logger.ErrorF("error connecting to Correlation: %v", err)
@@ -90,27 +66,9 @@ func (l *LogProcessor) ProcessLogs(cnf *config.Config, ctx context.Context) {
9066
}
9167

9268
client := plugins.NewIntegrationClient(connection)
93-
<<<<<<<< HEAD:as400/logservice/processor.go
9469
plClient := createClient(client, ctx, cnf)
95-
========
96-
plClient, err := createClient(client, ctx)
97-
if err != nil {
98-
if errors.Is(err, ErrAgentUninstalled) {
99-
utils.Logger.Info("Agent uninstalled, stopping log processor")
100-
return
101-
}
102-
if errors.Is(err, context.Canceled) {
103-
utils.Logger.Info("ProcessLogs stopping due to context cancellation")
104-
return
105-
}
106-
utils.Logger.ErrorF("error creating client: %v", err)
107-
continue
108-
}
109-
>>>>>>>> origin/v11:agent/agent/logprocessor.go
11070
l.connErrWritten = false
11171

112-
// Create context only after successful client creation to avoid leaks
113-
ctxEof, cancelEof := context.WithCancel(context.Background())
11472
go l.handleAcknowledgements(plClient, ctxEof, cancelEof)
11573
l.processLogs(plClient, ctxEof, cancelEof)
11674
}
@@ -124,20 +82,38 @@ func (l *LogProcessor) handleAcknowledgements(plClient plugins.Integration_Proce
12482
default:
12583
ack, err := plClient.Recv()
12684
if err != nil {
127-
action := HandleGRPCStreamError(err, "failed to receive ack", &l.ackErrWritten)
128-
if action == ActionReconnect {
85+
if strings.Contains(err.Error(), "EOF") {
86+
time.Sleep(timeToSleep)
12987
cancel()
13088
return
13189
}
132-
continue
90+
st, ok := status.FromError(err)
91+
if ok && (st.Code() == codes.Unavailable || st.Code() == codes.Canceled) {
92+
if !l.ackErrWritten {
93+
utils.Logger.ErrorF("failed to receive ack: %v", err)
94+
l.ackErrWritten = true
95+
}
96+
time.Sleep(timeToSleep)
97+
cancel()
98+
return
99+
} else {
100+
if !l.ackErrWritten {
101+
utils.Logger.ErrorF("failed to receive ack: %v", err)
102+
l.ackErrWritten = true
103+
}
104+
time.Sleep(timeToSleep)
105+
continue
106+
}
133107
}
134108

135109
l.ackErrWritten = false
136110

111+
l.db.Lock()
137112
err = l.db.Update(&models.Log{}, "id", ack.LastId, "processed", true)
138113
if err != nil {
139114
utils.Logger.ErrorF("failed to update log: %v", err)
140115
}
116+
l.db.Unlock()
141117
}
142118
}
143119
}
@@ -149,28 +125,44 @@ func (l *LogProcessor) processLogs(plClient plugins.Integration_ProcessLogClient
149125
utils.Logger.Info("context done, exiting processLogs")
150126
return
151127
case newLog := <-LogQueue:
152-
if newLog.Id == "" {
153-
id, err := uuid.NewRandom()
154-
if err != nil {
155-
utils.Logger.ErrorF("failed to generate uuid: %v", err)
156-
continue
157-
}
128+
id, err := uuid.NewRandom()
129+
if err != nil {
130+
utils.Logger.ErrorF("failed to generate uuid: %v", err)
131+
continue
132+
}
158133

159-
newLog.Id = id.String()
160-
err = l.db.Create(&models.Log{ID: newLog.Id, Log: newLog.Raw, Type: newLog.DataType, CreatedAt: time.Now(), DataSource: newLog.DataSource, Processed: false})
161-
if err != nil {
162-
utils.Logger.ErrorF("failed to save log: %v :log: %s", err, newLog.Raw)
163-
}
134+
newLog.Id = id.String()
135+
l.db.Lock()
136+
err = l.db.Create(&models.Log{ID: newLog.Id, Log: newLog.Raw, Type: newLog.DataType, CreatedAt: time.Now(), DataSource: newLog.DataSource, Processed: false})
137+
if err != nil {
138+
utils.Logger.ErrorF("failed to save log: %v :log: %s", err, newLog.Raw)
164139
}
140+
l.db.Unlock()
165141

166-
err := plClient.Send(newLog)
142+
err = plClient.Send(newLog)
167143
if err != nil {
168-
action := HandleGRPCStreamError(err, "failed to send log", &l.sendErrWritten)
169-
if action == ActionReconnect {
144+
if strings.Contains(err.Error(), "EOF") {
145+
time.Sleep(timeToSleep)
170146
cancel()
171147
return
172148
}
173-
continue
149+
st, ok := status.FromError(err)
150+
if ok && (st.Code() == codes.Unavailable || st.Code() == codes.Canceled) {
151+
if !l.sendErrWritten {
152+
utils.Logger.ErrorF("failed to send log: %v :log: %s", err, newLog.Raw)
153+
l.sendErrWritten = true
154+
}
155+
time.Sleep(timeToSleep)
156+
cancel()
157+
return
158+
} else {
159+
if !l.sendErrWritten {
160+
utils.Logger.ErrorF("failed to send log: %v :log: %s", err, newLog.Raw)
161+
l.sendErrWritten = true
162+
}
163+
time.Sleep(timeToSleep)
164+
continue
165+
}
174166
}
175167
l.sendErrWritten = false
176168
}
@@ -186,13 +178,17 @@ func (l *LogProcessor) CleanCountedLogs() {
186178
utils.Logger.ErrorF("error getting data retention: %s", err)
187179
continue
188180
}
181+
l.db.Lock()
189182
_, err = l.db.DeleteOld(&models.Log{}, dataRetention)
190183
if err != nil {
191184
utils.Logger.ErrorF("error deleting old logs: %s", err)
192185
}
186+
l.db.Unlock()
193187

194188
unprocessed := make([]models.Log, 0, 10)
189+
l.db.Lock()
195190
found, err := l.db.Find(&unprocessed, "processed", false)
191+
l.db.Unlock()
196192
if err != nil {
197193
utils.Logger.ErrorF("error finding unprocessed logs: %s", err)
198194
continue
@@ -212,18 +208,10 @@ func (l *LogProcessor) CleanCountedLogs() {
212208
}
213209
}
214210

215-
<<<<<<<< HEAD:as400/logservice/processor.go
216211
func createClient(client plugins.IntegrationClient, ctx context.Context, cnf *config.Config) plugins.Integration_ProcessLogClient {
217-
========
218-
func createClient(client plugins.IntegrationClient, ctx context.Context) (plugins.Integration_ProcessLogClient, error) {
219-
>>>>>>>> origin/v11:agent/agent/logprocessor.go
220212
var connErrMsgWritten bool
221213
invalidKeyCounter := 0
222-
invalidKeyDelay := timeToSleep
223-
maxInvalidKeyDelay := 5 * time.Minute
224-
maxInvalidKeyAttempts := 100 // ~8+ hours with backoff before uninstall
225214
for {
226-
<<<<<<<< HEAD:as400/logservice/processor.go
227215
authCtx := metadata.AppendToOutgoingContext(ctx,
228216
"key", cnf.CollectorKey,
229217
"id", strconv.Itoa(int(cnf.CollectorID)),
@@ -237,30 +225,9 @@ func createClient(client plugins.IntegrationClient, ctx context.Context) (plugin
237225
utils.Logger.Info("Uninstalling collector: reason: collector has been removed from the panel...")
238226
_ = agent.UninstallAll()
239227
os.Exit(1)
240-
========
241-
select {
242-
case <-ctx.Done():
243-
return nil, ctx.Err()
244-
default:
245-
}
246-
247-
plClient, err := client.ProcessLog(ctx)
248-
if err != nil {
249-
if strings.Contains(err.Error(), "invalid agent key") {
250-
invalidKeyCounter++
251-
utils.Logger.ErrorF("invalid agent key (attempt %d/%d), retrying in %v", invalidKeyCounter, maxInvalidKeyAttempts, invalidKeyDelay)
252-
if invalidKeyCounter >= maxInvalidKeyAttempts {
253-
utils.Logger.ErrorF("uninstalling agent after %d consecutive invalid key errors", maxInvalidKeyAttempts)
254-
_ = UninstallAll()
255-
return nil, ErrAgentUninstalled
256-
>>>>>>>> origin/v11:agent/agent/logprocessor.go
257228
}
258-
time.Sleep(invalidKeyDelay)
259-
invalidKeyDelay = utils.IncrementReconnectDelay(invalidKeyDelay, maxInvalidKeyDelay)
260-
continue
261229
} else {
262230
invalidKeyCounter = 0
263-
invalidKeyDelay = timeToSleep
264231
}
265232
if !connErrMsgWritten {
266233
utils.Logger.ErrorF("failed to create input client: %v", err)
@@ -269,7 +236,7 @@ func createClient(client plugins.IntegrationClient, ctx context.Context) (plugin
269236
time.Sleep(timeToSleep)
270237
continue
271238
}
272-
return plClient, nil
239+
return plClient
273240
}
274241
}
275242

@@ -287,12 +254,12 @@ func SetDataRetention(retention string) error {
287254
return errors.New("retention must be greater than 0")
288255
}
289256

290-
return fs.WriteJSON(config.RetentionConfigFile, models.DataRetention{Retention: retentionInt})
257+
return utils.WriteJSON(config.RetentionConfigFile, models.DataRetention{Retention: retentionInt})
291258
}
292259

293260
func GetDataRetention() (int, error) {
294261
retention := models.DataRetention{}
295-
err := fs.ReadJSON(config.RetentionConfigFile, &retention)
262+
err := utils.ReadJson(config.RetentionConfigFile, &retention)
296263
if err != nil {
297264
return 0, err
298265
}

0 commit comments

Comments
 (0)