Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cli/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ require (
github.com/sony/gobreaker v1.0.0
github.com/spf13/cobra v1.10.2
github.com/stretchr/testify v1.11.1
golang.org/x/sync v0.20.0
golang.org/x/text v0.37.0
golang.org/x/time v0.15.0
google.golang.org/protobuf v1.36.11
Expand Down
69 changes: 57 additions & 12 deletions cli/src/internal/dashboard/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/jongio/azd-app/cli/src/internal/constants"
"github.com/jongio/azd-app/cli/src/internal/service"
"github.com/jongio/azd-app/cli/src/internal/serviceinfo"
"golang.org/x/sync/errgroup"
)

// Client speaks Connect over HTTP to a running dashboard process.
Expand Down Expand Up @@ -168,7 +169,7 @@ func (c *Client) StreamLogs(ctx context.Context, serviceName string, logs chan<-

// GetAzureLogs fetches buffered Azure logs for the given services. The proto
// request tails one service at a time; multi-service callers are served by
// issuing one RPC per service and merging results. Client-side `since`
// issuing concurrent RPCs per service and merging results. Client-side `since`
// filtering matches the legacy REST behavior (since_seconds is not a
// 1:1 replacement because the server clamps it differently).
func (c *Client) GetAzureLogs(ctx context.Context, services []string, tail int, since time.Time) ([]service.LogEntry, error) {
Expand All @@ -182,30 +183,74 @@ func (c *Client) GetAzureLogs(ctx context.Context, services []string, tail int,
serviceList = []string{""}
}

var all []service.LogEntry
for _, svc := range serviceList {
// Fast path: single service needs no goroutine overhead
if len(serviceList) == 1 {
resp, err := c.azure.GetAzureLogs(ctx, connect.NewRequest(&v1.GetAzureLogsRequest{
Service: svc,
Service: serviceList[0],
Tail: int32(tail),
}))
if err != nil {
return nil, err
}
all := make([]service.LogEntry, 0, len(resp.Msg.GetEntries()))
for _, p := range resp.Msg.GetEntries() {
all = append(all, protoToLogEntry(p))
}
return filterSince(all, since), nil
}

// Parallel path: launch one goroutine per service
type result struct {
entries []service.LogEntry
err error
}
results := make([]result, len(serviceList))

g, gctx := errgroup.WithContext(ctx)
for i, svc := range serviceList {
i, svc := i, svc
g.Go(func() error {
resp, err := c.azure.GetAzureLogs(gctx, connect.NewRequest(&v1.GetAzureLogsRequest{
Service: svc,
Tail: int32(tail),
}))
if err != nil {
results[i] = result{err: err}
return err
}
entries := make([]service.LogEntry, 0, len(resp.Msg.GetEntries()))
for _, p := range resp.Msg.GetEntries() {
entries = append(entries, protoToLogEntry(p))
}
results[i] = result{entries: entries}
return nil
})
}

if !since.IsZero() {
filtered := all[:0]
for _, entry := range all {
if !entry.Timestamp.Before(since) {
filtered = append(filtered, entry)
}
if err := g.Wait(); err != nil {
return nil, err
}

var all []service.LogEntry
for _, r := range results {
all = append(all, r.entries...)
}

return filterSince(all, since), nil
}

// filterSince returns entries at or after since. Returns all if since is zero.
func filterSince(all []service.LogEntry, since time.Time) []service.LogEntry {
if since.IsZero() {
return all
}
filtered := all[:0]
for _, entry := range all {
if !entry.Timestamp.Before(since) {
filtered = append(filtered, entry)
}
all = filtered
}
return all, nil
return filtered
}

// GetAzureStatus mirrors the legacy service.AzureStatus shape logs.go /
Expand Down
19 changes: 15 additions & 4 deletions cli/src/internal/rpc/azure_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,7 @@ type localAzureLogRing struct {
mu sync.Mutex
buf []*v1.LogEntry
cap int
head int // index of oldest entry when ring is full
dropped int64
notify chan struct{}
}
Expand All @@ -355,10 +356,13 @@ func newAzureLogRing(capacity int) *localAzureLogRing {
func (r *localAzureLogRing) push(e *v1.LogEntry) {
r.mu.Lock()
if len(r.buf) >= r.cap {
r.buf = append(r.buf[:0], r.buf[1:]...)
// Ring full: overwrite oldest via index, O(1).
r.buf[r.head] = e
r.head = (r.head + 1) % r.cap
r.dropped++
} else {
r.buf = append(r.buf, e)
}
r.buf = append(r.buf, e)
r.mu.Unlock()
select {
case r.notify <- struct{}{}:
Expand All @@ -372,9 +376,16 @@ func (r *localAzureLogRing) drain() ([]*v1.LogEntry, int64) {
if len(r.buf) == 0 {
return nil, r.dropped
}
out := make([]*v1.LogEntry, len(r.buf))
copy(out, r.buf)
n := len(r.buf)
out := make([]*v1.LogEntry, n)
if r.head == 0 || n < r.cap {
copy(out, r.buf)
} else {
copy(out, r.buf[r.head:])
copy(out[n-r.head:], r.buf[:r.head])
}
r.buf = r.buf[:0]
r.head = 0
return out, r.dropped
}

Expand Down
29 changes: 20 additions & 9 deletions cli/src/internal/rpc/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,7 @@ type localLogRing struct {
mu sync.Mutex
buf []*v1.LogEntry
cap int
head int // index of the oldest entry when ring is full
dropped int64
notify chan struct{}
}
Expand All @@ -609,12 +610,13 @@ func newLocalLogRing(capacity int) *localLogRing {
func (r *localLogRing) push(e *v1.LogEntry) {
r.mu.Lock()
if len(r.buf) >= r.cap {
// Drop oldest. Allocation is O(cap) once; subsequent pushes
// reuse the trimmed backing array up to cap.
r.buf = append(r.buf[:0], r.buf[1:]...)
// Ring full: overwrite oldest via index, O(1).
r.buf[r.head] = e
r.head = (r.head + 1) % r.cap
r.dropped++
} else {
r.buf = append(r.buf, e)
}
r.buf = append(r.buf, e)
r.mu.Unlock()

select {
Expand All @@ -625,18 +627,27 @@ func (r *localLogRing) push(e *v1.LogEntry) {
}
}

// drain returns all entries currently in the ring plus the cumulative
// dropped count. Resets the ring; the caller compares dropped against
// its previously-observed value to compute the delta.
// drain returns all entries currently in the ring in insertion order plus
// the cumulative dropped count. Resets the ring; the caller compares
// dropped against its previously-observed value to compute the delta.
func (r *localLogRing) drain() ([]*v1.LogEntry, int64) {
r.mu.Lock()
defer r.mu.Unlock()
if len(r.buf) == 0 {
return nil, r.dropped
}
out := make([]*v1.LogEntry, len(r.buf))
copy(out, r.buf)
n := len(r.buf)
out := make([]*v1.LogEntry, n)
if r.head == 0 || n < r.cap {
// Not wrapped yet or head is at start - simple copy
copy(out, r.buf)
} else {
// Ring has wrapped: linearize from head
copy(out, r.buf[r.head:])
copy(out[n-r.head:], r.buf[:r.head])
}
r.buf = r.buf[:0]
r.head = 0
return out, r.dropped
}

Expand Down
Loading
Loading