Skip to content

Commit efa6f19

Browse files
Flush telemetry in a detached subprocess (#275)
1 parent f0994f4 commit efa6f19

14 files changed

Lines changed: 578 additions & 122 deletions

File tree

cmd/flush_telemetry.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package cmd
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"os"
7+
"time"
8+
9+
"github.com/localstack/lstk/internal/env"
10+
"github.com/localstack/lstk/internal/log"
11+
"github.com/localstack/lstk/internal/telemetry"
12+
"github.com/localstack/lstk/internal/tracing"
13+
)
14+
15+
// runFlushTelemetry handles the flusher subprocess. It bypasses the normal
16+
// Execute() boot path — no logger, keyring, telemetry client, or cobra tree —
17+
// so it writes nothing to disk and cannot spawn another flusher.
18+
func runFlushTelemetry(ctx context.Context, args []string) error {
19+
cfg := env.Init()
20+
if cfg.TracesEnabled {
21+
shutdown := tracing.Init(ctx, log.Nop())
22+
defer func() {
23+
// Fresh context: ctx may be cancelled and Shutdown would skip the flush.
24+
shutCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
25+
defer cancel()
26+
_ = shutdown(shutCtx)
27+
}()
28+
}
29+
30+
endpoint := ""
31+
for i := 0; i < len(args); i++ {
32+
if args[i] == "--endpoint" && i+1 < len(args) {
33+
endpoint = args[i+1]
34+
i++
35+
}
36+
}
37+
if endpoint == "" {
38+
return fmt.Errorf("missing --endpoint")
39+
}
40+
41+
ctx = tracing.ContextWithRemoteParent(ctx, os.Getenv)
42+
return telemetry.RunFlush(ctx, endpoint, os.Stdin)
43+
}

cmd/root.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,10 @@ func NewRootCmd(cfg *env.Env, tel *telemetry.Client, logger log.Logger) *cobra.C
9393
}
9494

9595
func Execute(ctx context.Context) error {
96+
if len(os.Args) > 1 && os.Args[1] == telemetry.FlushCommandName {
97+
return runFlushTelemetry(ctx, os.Args[2:])
98+
}
99+
96100
cfg := env.Init()
97101

98102
logger, cleanup, err := newLogger()

go.mod

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,9 @@ require (
2626
go.opentelemetry.io/otel v1.44.0
2727
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.44.0
2828
go.opentelemetry.io/otel/sdk v1.44.0
29+
go.opentelemetry.io/otel/trace v1.44.0
2930
go.uber.org/mock v0.6.0
31+
golang.org/x/sys v0.45.0
3032
golang.org/x/term v0.43.0
3133
gopkg.in/ini.v1 v1.67.2
3234
gotest.tools/v3 v3.5.2
@@ -82,11 +84,9 @@ require (
8284
go.opentelemetry.io/auto/sdk v1.2.1 // indirect
8385
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.44.0 // indirect
8486
go.opentelemetry.io/otel/metric v1.44.0 // indirect
85-
go.opentelemetry.io/otel/trace v1.44.0 // indirect
8687
go.opentelemetry.io/proto/otlp v1.10.0 // indirect
8788
go.yaml.in/yaml/v3 v3.0.4 // indirect
8889
golang.org/x/net v0.55.0 // indirect
89-
golang.org/x/sys v0.45.0 // indirect
9090
golang.org/x/text v0.37.0 // indirect
9191
google.golang.org/genproto/googleapis/api v0.0.0-20260526163538-3dc84a4a5aaa // indirect
9292
google.golang.org/genproto/googleapis/rpc v0.0.0-20260526163538-3dc84a4a5aaa // indirect

internal/container/telemetry_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ func newCapturingTelClient(t *testing.T) (*telemetry.Client, <-chan map[string]a
3333
w.WriteHeader(http.StatusOK)
3434
}))
3535
t.Cleanup(srv.Close)
36-
return telemetry.New(srv.URL, false), ch
36+
return telemetry.NewWithInProcessFlush(srv.URL), ch
3737
}
3838

3939
func TestStop_EmitsLifecycleStopEvent(t *testing.T) {

internal/telemetry/client.go

Lines changed: 34 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -1,37 +1,30 @@
11
package telemetry
22

33
import (
4-
"bytes"
54
"context"
6-
"encoding/json"
7-
"fmt"
8-
"net/http"
95
"os"
106
"runtime"
117
"sync"
128
"time"
139

1410
"github.com/google/uuid"
15-
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
16-
17-
"github.com/localstack/lstk/internal/version"
1811
)
1912

20-
func userAgent() string {
21-
return fmt.Sprintf("localstack lstk/%s (%s; %s)", version.Version(), runtime.GOOS, runtime.GOARCH)
22-
}
13+
// pendingCap bounds in-memory events; on overflow the oldest is dropped.
14+
const pendingCap = 64
2315

2416
type Client struct {
2517
enabled bool
2618
sessionID string
2719
machineID string
2820
authToken string
2921

30-
httpClient *http.Client
31-
endpoint string
22+
endpoint string
23+
flushFn func(ctx context.Context, endpoint string, events []eventBody)
3224

33-
events chan eventBody
34-
done chan struct{}
25+
mu sync.Mutex
26+
pending []eventBody
27+
traceCtx context.Context // last Emit ctx; carries the command span for trace propagation
3528
closeOnce sync.Once
3629
machineIDOnce sync.Once
3730
}
@@ -46,37 +39,23 @@ func New(endpoint string, disabled bool) *Client {
4639
if disabled {
4740
return &Client{enabled: false}
4841
}
49-
c := &Client{
42+
return &Client{
5043
enabled: true,
5144
sessionID: uuid.NewString(),
52-
// http.Client has no default timeout (zero means none). Without one, a
53-
// slow or unreachable endpoint would block the worker goroutine.
54-
httpClient: &http.Client{
55-
Timeout: 3 * time.Second,
56-
Transport: otelhttp.NewTransport(
57-
http.DefaultTransport,
58-
otelhttp.WithSpanNameFormatter(func(_ string, r *http.Request) string {
59-
return "telemetry " + r.Method + " " + r.URL.Path
60-
}),
61-
),
62-
},
63-
endpoint: endpoint,
64-
events: make(chan eventBody, 64),
65-
done: make(chan struct{}),
45+
endpoint: endpoint,
46+
flushFn: spawnDetachedFlusher,
47+
pending: make([]eventBody, 0, pendingCap),
6648
}
67-
go c.worker()
68-
return c
6949
}
7050

7151
type requestBody struct {
7252
Events []eventBody `json:"events"`
7353
}
7454

7555
type eventBody struct {
76-
ctx context.Context // not serialized; carries context to the worker
77-
Name string `json:"name"`
78-
Metadata eventMetadata `json:"metadata"`
79-
Payload any `json:"payload"`
56+
Name string `json:"name"`
57+
Metadata eventMetadata `json:"metadata"`
58+
Payload any `json:"payload"`
8059
}
8160

8261
type eventMetadata struct {
@@ -101,56 +80,41 @@ func (c *Client) Emit(ctx context.Context, name string, payload map[string]any)
10180
}
10281

10382
body := eventBody{
104-
ctx: context.WithoutCancel(ctx),
10583
Name: name,
10684
Metadata: eventMetadata{
10785
ClientTime: time.Now().UTC().Format("2006-01-02 15:04:05.000000"),
10886
SessionID: c.sessionID,
10987
},
11088
Payload: enriched,
11189
}
112-
select {
113-
case c.events <- body:
114-
default:
115-
}
116-
}
117-
118-
func (c *Client) worker() {
119-
defer close(c.done)
120-
for body := range c.events {
121-
c.send(body)
122-
}
123-
}
12490

125-
func (c *Client) send(body eventBody) {
126-
data, err := json.Marshal(requestBody{Events: []eventBody{body}})
127-
if err != nil {
128-
return
129-
}
130-
131-
req, err := http.NewRequestWithContext(body.ctx, http.MethodPost, c.endpoint, bytes.NewReader(data))
132-
if err != nil {
133-
return
134-
}
135-
req.Header.Set("Content-Type", "application/json")
136-
req.Header.Set("User-Agent", userAgent())
137-
138-
resp, err := c.httpClient.Do(req)
139-
if err != nil {
140-
return
91+
c.mu.Lock()
92+
if len(c.pending) >= pendingCap {
93+
c.pending = c.pending[1:]
14194
}
142-
_ = resp.Body.Close()
95+
c.pending = append(c.pending, body)
96+
c.traceCtx = context.WithoutCancel(ctx)
97+
c.mu.Unlock()
14398
}
14499

145-
// Close stops accepting new events, drains the event buffer, and blocks until
146-
// all pending HTTP requests have completed. Call it before process exit to
147-
// avoid dropping telemetry events.
100+
// Close hands pending events to a detached subprocess and returns immediately,
101+
// so analytics endpoint latency never delays command exit.
148102
func (c *Client) Close() {
149103
if !c.enabled {
150104
return
151105
}
152106
c.closeOnce.Do(func() {
153-
close(c.events)
154-
<-c.done
107+
c.mu.Lock()
108+
pending := c.pending
109+
traceCtx := c.traceCtx
110+
c.pending = nil
111+
c.mu.Unlock()
112+
if len(pending) == 0 {
113+
return
114+
}
115+
if traceCtx == nil {
116+
traceCtx = context.Background()
117+
}
118+
c.flushFn(traceCtx, c.endpoint, pending)
155119
})
156120
}

0 commit comments

Comments
 (0)