@@ -5,44 +5,58 @@ import (
55 "errors"
66 "time"
77
8+ "github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs"
89 "github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs/types"
910)
1011
1112// QueryAndTailLogGroup queries the log group from the give start time and initiates a Live Tail session.
1213// This function also handles the case where the log group does not exist yet.
1314// The caller should call `Close()` on the returned EventStream when done.
14- func QueryAndTailLogGroup (ctx context.Context , lgi LogGroupInput , start time.Time ) (EventStream , error ) {
15+ func QueryAndTailLogGroup (ctx context.Context , lgi LogGroupInput , start , end time.Time ) (EventStream , error ) {
1516 ctx , cancel := context .WithCancel (ctx )
1617
1718 es := & eventStream {
1819 cancel : cancel ,
1920 ch : make (chan types.StartLiveTailResponseStream ),
2021 }
2122
22- // First call TailLogGroup once to check if the log group exists or we have another error
23- eventStream , err := TailLogGroup (ctx , lgi )
24- if err != nil {
25- var resourceNotFound * types.ResourceNotFoundException
26- if ! errors .As (err , & resourceNotFound ) {
27- return nil , err
23+ doTail := ! end .IsZero ()
24+
25+ var tailStream cloudwatchlogs.StartLiveTailResponseStreamReader
26+ if doTail {
27+ // First call TailLogGroup once to check if the log group exists or we have another error
28+ var err error
29+ tailStream , err = TailLogGroup (ctx , lgi )
30+ if err != nil {
31+ var resourceNotFound * types.ResourceNotFoundException
32+ if ! errors .As (err , & resourceNotFound ) {
33+ return nil , err
34+ }
35+ // Doesn't exist yet, continue to poll for it
2836 }
2937 }
3038
3139 // Start goroutine to wait for the log group to be created and then tail it
3240 go func () {
3341 defer close (es .ch )
3442
35- if eventStream == nil {
43+ if doTail {
3644 // If the log group does not exist yet, poll until it does
37- eventStream , err = pollTailLogGroup (ctx , lgi )
38- if err != nil {
39- es .err = err
40- return
45+ if tailStream == nil {
46+ var err error
47+ tailStream , err = pollTailLogGroup (ctx , lgi )
48+ if err != nil {
49+ es .err = err
50+ return
51+ }
4152 }
53+ defer tailStream .Close ()
4254 }
43- defer eventStream .Close ()
4455
4556 if ! start .IsZero () {
57+ if end .IsZero () {
58+ end = time .Now ()
59+ }
4660 // Query the logs between the start time and now
4761 if err := QueryLogGroup (ctx , lgi , start , time .Now (), func (events []LogEvent ) error {
4862 es .ch <- & types.StartLiveTailResponseStreamMemberSessionUpdate {
@@ -55,14 +69,17 @@ func QueryAndTailLogGroup(ctx context.Context, lgi LogGroupInput, start time.Tim
5569 }
5670 }
5771
58- es .pipeEvents (ctx , eventStream )
72+ if doTail {
73+ // Pipe the events from the tail stream to the internal channel
74+ es .pipeEvents (ctx , tailStream )
75+ }
5976 }()
6077
6178 return es , nil
6279}
6380
6481// pollTailLogGroup polls the log group and starts the Live Tail session once it's available
65- func pollTailLogGroup (ctx context.Context , lgi LogGroupInput ) (EventStream , error ) {
82+ func pollTailLogGroup (ctx context.Context , lgi LogGroupInput ) (cloudwatchlogs. StartLiveTailResponseStreamReader , error ) {
6683 ticker := time .NewTicker (time .Second )
6784 defer ticker .Stop ()
6885
0 commit comments