@@ -11,6 +11,7 @@ import (
1111 "time"
1212
1313 "github.com/aws/aws-sdk-go-v2/aws"
14+ "github.com/aws/aws-sdk-go-v2/aws/retry"
1415 awsConfig "github.com/aws/aws-sdk-go-v2/config"
1516 "github.com/aws/aws-sdk-go-v2/credentials"
1617 "github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs"
@@ -136,9 +137,21 @@ func (p *AWSProcessor) createAWSSession() (aws.Config, error) {
136137 ctx , cancel := context .WithTimeout (context .Background (), 1 * time .Minute )
137138 defer cancel ()
138139
140+ adaptiveRetryer := retry .NewAdaptiveMode (func (ao * retry.AdaptiveModeOptions ) {
141+ ao .StandardOptions = append (ao .StandardOptions , func (so * retry.StandardOptions ) {
142+ so .MaxAttempts = 10 // Increment max attempts for throttling
143+ so .MaxBackoff = 30 * time .Second // Increase max backoff time
144+ })
145+ ao .RequestCost = 1
146+ ao .FailOnNoAttemptTokens = false // Allow retries even without tokens
147+ })
148+
139149 cfg , err := awsConfig .LoadDefaultConfig (ctx ,
140150 awsConfig .WithRegion (p .RegionName ),
141151 awsConfig .WithCredentialsProvider (credentials .NewStaticCredentialsProvider (p .AccessKey , p .SecretAccessKey , "" )),
152+ awsConfig .WithRetryer (func () aws.Retryer {
153+ return adaptiveRetryer
154+ }),
142155 )
143156 if err != nil {
144157 return aws.Config {}, catcher .Error ("cannot create AWS session" , err , nil )
@@ -147,13 +160,7 @@ func (p *AWSProcessor) createAWSSession() (aws.Config, error) {
147160 return cfg , nil
148161}
149162
150- func (p * AWSProcessor ) describeLogGroups () ([]string , error ) {
151- awsConfig , err := p .createAWSSession ()
152- if err != nil {
153- return nil , catcher .Error ("cannot create AWS session" , err , nil )
154- }
155-
156- cwl := cloudwatchlogs .NewFromConfig (awsConfig )
163+ func (p * AWSProcessor ) describeLogGroups (cwl * cloudwatchlogs.Client ) ([]string , error ) {
157164 var logGroups []string
158165 paginator := cloudwatchlogs .NewDescribeLogGroupsPaginator (cwl , & cloudwatchlogs.DescribeLogGroupsInput {})
159166
@@ -173,13 +180,7 @@ func (p *AWSProcessor) describeLogGroups() ([]string, error) {
173180 return logGroups , nil
174181}
175182
176- func (p * AWSProcessor ) describeLogStreams (logGroup string ) ([]string , error ) {
177- awsConfig , err := p .createAWSSession ()
178- if err != nil {
179- return nil , catcher .Error ("cannot create AWS session" , err , nil )
180- }
181-
182- cwl := cloudwatchlogs .NewFromConfig (awsConfig )
183+ func (p * AWSProcessor ) describeLogStreams (cwl * cloudwatchlogs.Client , logGroup string ) ([]string , error ) {
183184 var logStreams []string
184185 paginator := cloudwatchlogs .NewDescribeLogStreamsPaginator (cwl , & cloudwatchlogs.DescribeLogStreamsInput {
185186 LogGroupName : aws .String (logGroup ),
@@ -204,95 +205,37 @@ func (p *AWSProcessor) describeLogStreams(logGroup string) ([]string, error) {
204205}
205206
206207func (p * AWSProcessor ) getLogs (startTime , endTime time.Time ) ([]string , error ) {
207- // Retry logic for AWS session creation
208- maxRetries := 3
209- retryDelay := 2 * time .Second
210- var awsConfig aws.Config
211- var err error
212-
213- for retry := 0 ; retry < maxRetries ; retry ++ {
214- awsConfig , err = p .createAWSSession ()
215- if err == nil {
216- break
217- }
218-
219- _ = catcher .Error ("cannot create AWS session, retrying" , err , map [string ]any {
220- "retry" : retry + 1 ,
221- "maxRetries" : maxRetries ,
222- })
223-
224- if retry < maxRetries - 1 {
225- time .Sleep (retryDelay )
226- // Increase delay for next retry
227- retryDelay *= 2
228- }
229- }
230-
208+ awsConfig , err := p .createAWSSession ()
231209 if err != nil {
232- return nil , catcher .Error ("all retries failed when creating AWS session" , err , nil )
210+ return nil , catcher .Error ("cannot create AWS session" , err , nil )
233211 }
234212
235213 cwl := cloudwatchlogs .NewFromConfig (awsConfig )
236214
237- // Retry logic for describing log groups
238- retryDelay = 2 * time .Second
239- var logGroups []string
240-
241- for retry := 0 ; retry < maxRetries ; retry ++ {
242- logGroups , err = p .describeLogGroups ()
243- if err == nil {
244- break
245- }
246-
247- _ = catcher .Error ("cannot get log groups, retrying" , err , map [string ]any {
248- "retry" : retry + 1 ,
249- "maxRetries" : maxRetries ,
250- })
251-
252- if retry < maxRetries - 1 {
253- time .Sleep (retryDelay )
254- // Increase delay for next retry
255- retryDelay *= 2
256- }
257- }
258-
215+ logGroups , err := p .describeLogGroups (cwl )
259216 if err != nil {
260- return nil , catcher .Error ("all retries failed when getting log groups" , err , nil )
217+ return nil , catcher .Error ("cannot get log groups" , err , nil )
261218 }
262219
263220 transformedLogs := make ([]string , 0 , 10 )
264221 for _ , logGroup := range logGroups {
265- // Retry logic for describing log streams
266- retryDelay = 2 * time .Second
267- var logStreams []string
268-
269- for retry := 0 ; retry < maxRetries ; retry ++ {
270- logStreams , err = p .describeLogStreams (logGroup )
271- if err == nil {
272- break
273- }
274-
275- _ = catcher .Error ("cannot get log streams, retrying" , err , map [string ]any {
276- "retry" : retry + 1 ,
277- "maxRetries" : maxRetries ,
278- "logGroup" : logGroup ,
279- })
280-
281- if retry < maxRetries - 1 {
282- time .Sleep (retryDelay )
283- // Increase delay for next retry
284- retryDelay *= 2
285- }
286- }
222+ time .Sleep (500 * time .Millisecond )
287223
224+ logStreams , err := p .describeLogStreams (cwl , logGroup )
288225 if err != nil {
289- _ = catcher .Error ("all retries failed when getting log streams " , err , map [string ]any {
226+ _ = catcher .Error ("cannot get log streams, skipping log group " , err , map [string ]any {
290227 "logGroup" : logGroup ,
291228 })
292- continue // Skip this log group and try the next one
229+ continue
293230 }
294231
295- for _ , stream := range logStreams {
232+ for i , stream := range logStreams {
233+ if i > 0 && i % 5 == 0 {
234+ time .Sleep (2 * time .Second )
235+ } else if i > 0 {
236+ time .Sleep (300 * time .Millisecond )
237+ }
238+
296239 paginator := cloudwatchlogs .NewGetLogEventsPaginator (cwl , & cloudwatchlogs.GetLogEventsInput {
297240 LogGroupName : aws .String (logGroup ),
298241 LogStreamName : aws .String (stream ),
@@ -301,48 +244,30 @@ func (p *AWSProcessor) getLogs(startTime, endTime time.Time) ([]string, error) {
301244 StartFromHead : aws .Bool (true ),
302245 }, func (options * cloudwatchlogs.GetLogEventsPaginatorOptions ) {
303246 options .StopOnDuplicateToken = true
304- options .Limit = 10000
247+ options .Limit = 1000
305248 })
306249
250+ pageCount := 0
307251 for paginator .HasMorePages () {
308- // Retry logic for getting log events
309- retryDelay = 2 * time .Second
310- var page * cloudwatchlogs.GetLogEventsOutput
311-
312- for retry := 0 ; retry < maxRetries ; retry ++ {
313- ctx , cancel := context .WithTimeout (context .Background (), 10 * time .Minute )
314-
315- page , err = paginator .NextPage (ctx )
316- if err == nil {
317- cancel ()
318- break
319- }
320-
321- _ = catcher .Error ("cannot get logs, retrying" , err , map [string ]any {
322- "retry" : retry + 1 ,
323- "maxRetries" : maxRetries ,
324- "logGroup" : logGroup ,
325- "stream" : stream ,
326- })
327-
328- if retry < maxRetries - 1 {
329- time .Sleep (retryDelay )
330- // Increase delay for next retry
331- retryDelay *= 2
332- }
333- cancel ()
252+ if pageCount > 0 {
253+ time .Sleep (200 * time .Millisecond )
334254 }
255+ pageCount ++
256+
257+ ctx , cancel := context .WithTimeout (context .Background (), 2 * time .Minute )
258+ page , err := paginator .NextPage (ctx )
259+ cancel ()
335260
336261 if err != nil {
337- _ = catcher .Error ("all retries failed when getting logs " , err , map [string ]any {
262+ _ = catcher .Error ("cannot get log events, skipping stream " , err , map [string ]any {
338263 "logGroup" : logGroup ,
339264 "stream" : stream ,
340265 })
341- continue // Skip this page and try the next one
266+ break
342267 }
343268
344269 if page == nil {
345- continue
270+ break
346271 }
347272
348273 for _ , event := range page .Events {
0 commit comments