@@ -596,67 +596,89 @@ func (b *ByocAws) CreateUploadURL(ctx context.Context, req *defangv1.UploadURLRe
596596 }, nil
597597}
598598
599- func (b * ByocAws ) Query (ctx context.Context , req * defangv1.DebugRequest ) error {
599+ func (b * ByocAws ) QueryForDebug (ctx context.Context , req * defangv1.DebugRequest ) error {
600+ // tailRequest := &defangv1.TailRequest{
601+ // Etag: req.Etag,
602+ // Project: req.Project,
603+ // Services: req.Services,
604+ // Since: req.Since,
605+ // Until: req.Until,
606+ // }
607+
600608 // The LogStreamNamePrefix filter can only be used with one service name
601609 var service string
602610 if len (req .Services ) == 1 {
603611 service = req .Services [0 ]
604612 }
605613
606- since := b .cdStart // TODO: get start time from req.Etag
607- if since .IsZero () {
608- since = time .Now ().Add (- time .Hour ) // TODO: get start time from req
614+ start := b .cdStart // TODO: get start time from req.Etag
615+ if req .Since .IsValid () {
616+ start = req .Since .AsTime ()
617+ } else if start .IsZero () {
618+ start = time .Now ().Add (- time .Hour )
609619 }
610620
611- // get stack information
621+ end := time .Now ()
622+ if req .Until .IsValid () {
623+ end = req .Until .AsTime ()
624+ }
625+
626+ // get stack information (for log group ARN)
612627 err := b .driver .FillOutputs (ctx )
613628 if err != nil {
614629 return AnnotateAwsError (err )
615630 }
616631
617- const maxQuerySizePerLogGroup = 128 * 1024 // 128KB (to stay well below the 1MB gRPC payload limit)
618-
619632 // Gather logs from the CD task, kaniko, ECS events, and all services
633+ evtsChan , errsChan := ecs .QueryLogGroups (ctx , start , end , b .getLogGroupInputs (req .Etag , req .Project , service , "" , logs .LogTypeAll )... )
634+ if evtsChan == nil {
635+ return <- errsChan
636+ }
637+
638+ const maxQuerySizePerLogGroup = 128 * 1024 // 128KB per LogGroup (to stay well below the 1MB gRPC payload limit)
639+
620640 sb := strings.Builder {}
621- for _ , lgi := range b .getLogGroupInputs (req .Etag , req .Project , service , "" , logs .LogTypeAll ) {
622- parseECSEventRecords := strings .HasSuffix (lgi .LogGroupARN , "/ecs" )
623- if err := ecs .Query (ctx , lgi , since , time .Now (), func (logEvents []ecs.LogEvent ) error {
624- for _ , event := range logEvents {
625- msg := term .StripAnsi (* event .Message )
626- if parseECSEventRecords {
627- if event , err := ecs .ParseECSEvent ([]byte (msg )); err == nil {
628- // TODO: once we know the AWS deploymentId from TaskStateChangeEvent detail.startedBy, we can do a 2nd query to filter by deploymentId
629- if event .Etag () != "" && req .Etag != "" && event .Etag () != req .Etag {
630- continue
631- }
632- if event .Service () != "" && len (req .Services ) > 0 && ! slices .Contains (req .Services , event .Service ()) {
633- continue
634- }
635- // This matches the status messages in the Defang Playground Loki logs
636- sb .WriteString ("status=" )
637- sb .WriteString (event .Status ())
638- sb .WriteByte ('\n' )
641+ loop:
642+ for {
643+ select {
644+ case event , ok := <- evtsChan :
645+ if ! ok {
646+ break loop
647+ }
648+ parseECSEventRecords := strings .HasSuffix (* event .LogGroupIdentifier , "/ecs" )
649+ if parseECSEventRecords {
650+ if event , err := ecs .ParseECSEvent ([]byte (* event .Message )); err == nil {
651+ // TODO: once we know the AWS deploymentId from TaskStateChangeEvent detail.startedBy, we can do a 2nd query to filter by deploymentId
652+ if event .Etag () != "" && req .Etag != "" && event .Etag () != req .Etag {
639653 continue
640654 }
641- }
642- sb .WriteString (msg )
643- sb .WriteByte ('\n' )
644- if sb .Len () > maxQuerySizePerLogGroup {
645- return errors .New ("query result was truncated" )
655+ if event .Service () != "" && len (req .Services ) > 0 && ! slices .Contains (req .Services , event .Service ()) {
656+ continue
657+ }
658+ // This matches the status messages in the Defang Playground Loki logs
659+ sb .WriteString ("status=" )
660+ sb .WriteString (event .Status ())
661+ sb .WriteByte ('\n' )
662+ continue
646663 }
647664 }
648- return nil
649- }); err != nil {
665+ msg := term .StripAnsi (* event .Message )
666+ sb .WriteString (msg )
667+ sb .WriteByte ('\n' )
668+ if sb .Len () > maxQuerySizePerLogGroup { // FIXME: this limit was supposed to be per LogGroup
669+ term .Warn ("Query result was truncated" )
670+ break loop
671+ }
672+ case err := <- errsChan :
650673 term .Warn ("CloudWatch query error:" , AnnotateAwsError (err ))
651674 // continue reading other log groups
652675 }
653676 }
654-
655677 req .Logs = sb .String ()
656678 return nil
657679}
658680
659- func (b * ByocAws ) Follow (ctx context.Context , req * defangv1.TailRequest ) (client.ServerStream [defangv1.TailResponse ], error ) {
681+ func (b * ByocAws ) QueryLogs (ctx context.Context , req * defangv1.TailRequest ) (client.ServerStream [defangv1.TailResponse ], error ) {
660682 // FillOutputs is needed to get the CD task ARN or the LogGroup ARNs
661683 if err := b .driver .FillOutputs (ctx ); err != nil {
662684 return nil , AnnotateAwsError (err )
@@ -673,11 +695,11 @@ func (b *ByocAws) Follow(ctx context.Context, req *defangv1.TailRequest) (client
673695 // * Etag, service: tail that task/service
674696 var err error
675697 var taskArn ecs.TaskArn
676- var eventStream ecs.EventStream
698+ var tailStream ecs.LiveTailStream
677699 stopWhenCDTaskDone := false
678700 logType := logs .LogType (req .LogType )
679701 if etag != "" && ! pkg .IsValidRandomID (etag ) { // Assume invalid "etag" is a task ID
680- eventStream , err = b .driver .TailTaskID (ctx , etag )
702+ tailStream , err = b .driver .TailTaskID (ctx , etag )
681703 taskArn , _ = b .driver .GetTaskArn (etag )
682704 term .Debugf ("Tailing task %s" , * taskArn )
683705 etag = "" // no need to filter by etag
@@ -687,7 +709,14 @@ func (b *ByocAws) Follow(ctx context.Context, req *defangv1.TailRequest) (client
687709 if len (req .Services ) == 1 {
688710 service = req .Services [0 ]
689711 }
690- eventStream , err = ecs .TailLogGroups (ctx , req .Since .AsTime (), b .getLogGroupInputs (etag , req .Project , service , req .Pattern , logType )... )
712+ var start , end time.Time
713+ if req .Since .IsValid () {
714+ start = req .Since .AsTime ()
715+ }
716+ if req .Until .IsValid () {
717+ end = req .Until .AsTime ()
718+ }
719+ tailStream , err = ecs .QueryAndTailLogGroups (ctx , start , end , b .getLogGroupInputs (etag , req .Project , service , req .Pattern , logType )... )
691720 taskArn = b .cdTaskArn
692721 }
693722 if err != nil {
@@ -709,7 +738,7 @@ func (b *ByocAws) Follow(ctx context.Context, req *defangv1.TailRequest) (client
709738 }()
710739 }
711740
712- return newByocServerStream (ctx , eventStream , etag , req .GetServices (), b ), nil
741+ return newByocServerStream (ctx , tailStream , etag , req .GetServices (), b ), nil
713742}
714743
715744func (b * ByocAws ) makeLogGroupARN (name string ) string {
0 commit comments