@@ -26,6 +26,7 @@ const (
2626 defaultTenant = "ce66672c-e36d-4761-a8c8-90058fee1a24"
2727 urlCheckConnection = "https://sts.amazonaws.com"
2828 wait = 1 * time .Second
29+ targetLogGroup = "utmstack"
2930)
3031
3132func main () {
@@ -85,7 +86,7 @@ func main() {
8586func pull (startTime time.Time , endTime time.Time , group * config.ModuleGroup ) {
8687 agent := getAWSProcessor (group )
8788
88- logs , err := agent .getLogs (startTime , endTime )
89+ processedCount , err := agent .getLogs (startTime , endTime , group . GroupName )
8990 if err != nil {
9091 _ = catcher .Error ("cannot get logs" , err , map [string ]any {
9192 "startTime" : startTime ,
@@ -95,14 +96,10 @@ func pull(startTime time.Time, endTime time.Time, group *config.ModuleGroup) {
9596 return
9697 }
9798
98- for _ , log := range logs {
99- _ = plugins .EnqueueLog (& plugins.Log {
100- Id : uuid .NewString (),
101- TenantId : defaultTenant ,
102- DataType : "aws" ,
103- DataSource : group .GroupName ,
104- Timestamp : time .Now ().UTC ().Format (time .RFC3339Nano ),
105- Raw : log ,
99+ if processedCount > 0 {
100+ catcher .Info ("Successfully processed logs" , map [string ]any {
101+ "count" : processedCount ,
102+ "group" : group .GroupName ,
106103 })
107104 }
108105}
@@ -160,26 +157,6 @@ func (p *AWSProcessor) createAWSSession() (aws.Config, error) {
160157 return cfg , nil
161158}
162159
163- func (p * AWSProcessor ) describeLogGroups (cwl * cloudwatchlogs.Client ) ([]string , error ) {
164- var logGroups []string
165- paginator := cloudwatchlogs .NewDescribeLogGroupsPaginator (cwl , & cloudwatchlogs.DescribeLogGroupsInput {})
166-
167- ctx , cancel := context .WithTimeout (context .Background (), 1 * time .Minute )
168- defer cancel ()
169-
170- for paginator .HasMorePages () {
171- page , err := paginator .NextPage (ctx )
172- if err != nil {
173- return nil , catcher .Error ("cannot get log groups" , err , nil )
174- }
175- for _ , group := range page .LogGroups {
176- logGroups = append (logGroups , * group .LogGroupName )
177- }
178- }
179-
180- return logGroups , nil
181- }
182-
183160func (p * AWSProcessor ) describeLogStreams (cwl * cloudwatchlogs.Client , logGroup string ) ([]string , error ) {
184161 var logStreams []string
185162 paginator := cloudwatchlogs .NewDescribeLogStreamsPaginator (cwl , & cloudwatchlogs.DescribeLogStreamsInput {
@@ -204,80 +181,79 @@ func (p *AWSProcessor) describeLogStreams(cwl *cloudwatchlogs.Client, logGroup s
204181 return logStreams , nil
205182}
206183
207- func (p * AWSProcessor ) getLogs (startTime , endTime time.Time ) ([] string , error ) {
184+ func (p * AWSProcessor ) getLogs (startTime , endTime time.Time , dataSource string ) (int , error ) {
208185 awsConfig , err := p .createAWSSession ()
209186 if err != nil {
210- return nil , catcher .Error ("cannot create AWS session" , err , nil )
187+ return 0 , catcher .Error ("cannot create AWS session" , err , nil )
211188 }
212189
213190 cwl := cloudwatchlogs .NewFromConfig (awsConfig )
214191
215- logGroups , err := p .describeLogGroups (cwl )
192+ logStreams , err := p .describeLogStreams (cwl , targetLogGroup )
216193 if err != nil {
217- return nil , catcher .Error ("cannot get log groups" , err , nil )
194+ return 0 , catcher .Error ("cannot get log streams for 'utmstack'" , err , map [string ]any {
195+ "logGroup" : targetLogGroup ,
196+ })
218197 }
219198
220- transformedLogs := make ([]string , 0 , 10 )
221- for _ , logGroup := range logGroups {
222- time .Sleep (500 * time .Millisecond )
199+ processedCount := 0
223200
224- logStreams , err := p .describeLogStreams (cwl , logGroup )
225- if err != nil {
226- _ = catcher .Error ("cannot get log streams, skipping log group" , err , map [string ]any {
227- "logGroup" : logGroup ,
228- })
229- continue
201+ for i , stream := range logStreams {
202+ if i > 0 && i % 5 == 0 {
203+ time .Sleep (2 * time .Second )
204+ } else if i > 0 {
205+ time .Sleep (300 * time .Millisecond )
230206 }
231207
232- for i , stream := range logStreams {
233- if i > 0 && i % 5 == 0 {
234- time .Sleep (2 * time .Second )
235- } else if i > 0 {
236- time .Sleep (300 * time .Millisecond )
208+ paginator := cloudwatchlogs .NewGetLogEventsPaginator (cwl , & cloudwatchlogs.GetLogEventsInput {
209+ LogGroupName : aws .String (targetLogGroup ),
210+ LogStreamName : aws .String (stream ),
211+ StartTime : aws .Int64 (startTime .Unix () * 1000 ),
212+ EndTime : aws .Int64 (endTime .Unix () * 1000 ),
213+ StartFromHead : aws .Bool (true ),
214+ }, func (options * cloudwatchlogs.GetLogEventsPaginatorOptions ) {
215+ options .StopOnDuplicateToken = true
216+ options .Limit = 1000
217+ })
218+
219+ pageCount := 0
220+ for paginator .HasMorePages () {
221+ if pageCount > 0 {
222+ time .Sleep (200 * time .Millisecond )
223+ }
224+ pageCount ++
225+
226+ ctx , cancel := context .WithTimeout (context .Background (), 2 * time .Minute )
227+ page , err := paginator .NextPage (ctx )
228+ cancel ()
229+
230+ if err != nil {
231+ _ = catcher .Error ("cannot get log events, skipping stream" , err , map [string ]any {
232+ "logGroup" : targetLogGroup ,
233+ "stream" : stream ,
234+ })
235+ break
236+ }
237+
238+ if page == nil {
239+ break
237240 }
238241
239- paginator := cloudwatchlogs .NewGetLogEventsPaginator (cwl , & cloudwatchlogs.GetLogEventsInput {
240- LogGroupName : aws .String (logGroup ),
241- LogStreamName : aws .String (stream ),
242- StartTime : aws .Int64 (startTime .Unix () * 1000 ),
243- EndTime : aws .Int64 (endTime .Unix () * 1000 ),
244- StartFromHead : aws .Bool (true ),
245- }, func (options * cloudwatchlogs.GetLogEventsPaginatorOptions ) {
246- options .StopOnDuplicateToken = true
247- options .Limit = 1000
248- })
249-
250- pageCount := 0
251- for paginator .HasMorePages () {
252- if pageCount > 0 {
253- time .Sleep (200 * time .Millisecond )
254- }
255- pageCount ++
256-
257- ctx , cancel := context .WithTimeout (context .Background (), 2 * time .Minute )
258- page , err := paginator .NextPage (ctx )
259- cancel ()
260-
261- if err != nil {
262- _ = catcher .Error ("cannot get log events, skipping stream" , err , map [string ]any {
263- "logGroup" : logGroup ,
264- "stream" : stream ,
265- })
266- break
267- }
268-
269- if page == nil {
270- break
271- }
272-
273- for _ , event := range page .Events {
274- transformedLogs = append (transformedLogs , * event .Message )
275- }
242+ for _ , event := range page .Events {
243+ _ = plugins .EnqueueLog (& plugins.Log {
244+ Id : uuid .NewString (),
245+ TenantId : defaultTenant ,
246+ DataType : "aws" ,
247+ DataSource : dataSource ,
248+ Timestamp : time .Now ().UTC ().Format (time .RFC3339Nano ),
249+ Raw : * event .Message ,
250+ })
251+ processedCount ++
276252 }
277253 }
278254 }
279255
280- return transformedLogs , nil
256+ return processedCount , nil
281257}
282258
283259func connectionChecker (url string ) error {
0 commit comments