Skip to content

Commit 2eb5b44

Browse files
Pritom14claude
andcommitted
feat(telemetry): instrument 12-stage onboarding funnel
Adds measurement-only telemetry across the install → activation → success → retention journey. Activation = first PR raised, success = first PR merged, retention magic number = 4 returns on distinct calendar days.

Backend:
- daemon/onboarding_cdc.go: CDC subscriber turns pr_created/pr_updated/pr_review_thread_resolved into per-session ao.session.pr_* events plus once-per-install ao.onboarding.first_pr_* milestones, gated by a durable file-based milestoneStore at <dataDir>/telemetry_milestones.json.
- daemon/onboarding_prereqs.go: boot-goroutine probe emits ao.onboarding.prereqs_checked (per-check booleans) and a one-shot prereqs_ready when git + tmux (POSIX) + claude|codex + gh auth all pass.
- lifecycle/manager.go: ApplyActivitySignal emits ao.session.first_agent_output once per spawn on the first authoritative activity signal.
- telemetry/posthog.go: allowlist entries for every new event.

Frontend:
- shared/telemetry.ts: launch-state (installDay, distinctActiveDays) persisted to <dataDir>/telemetry_app_launches.json; pure computeLaunchUpdate derives isFirstLaunch/isReturnDay/returnCount/isRetained/daysSinceInstall.
- renderer/lib/telemetry.ts: ao.app.active carries is_first_launch; ao.app.returned fires on new-day launches with return_count / is_retained / days_since_install.

Storage: shared distinct_id from telemetry_install_id keeps daemon + renderer events on one PostHog person. The milestoneStore file is daemon-owned and the launch-state file is renderer-owned, which avoids the install-id race.

Refs #432

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
1 parent 9ae0573 commit 2eb5b44

13 files changed

Lines changed: 964 additions & 9 deletions

File tree

backend/internal/adapters/telemetry/posthog.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,22 @@ var remotePayloadAllowlist = map[string]map[string]struct{}{
6767
"status": {},
6868
"status_family": {},
6969
},
70+
"ao.onboarding.first_pr_merged": {},
71+
"ao.onboarding.prereqs_checked": {
72+
"all_ok": {},
73+
"git_ok": {},
74+
"github_ok": {},
75+
"harness_ok": {},
76+
"runtime_ok": {},
77+
},
78+
"ao.onboarding.prereqs_ready": {},
79+
"ao.onboarding.first_pr_raised": {
80+
"state": {},
81+
},
82+
"ao.onboarding.first_pr_reviewed": {
83+
"decision": {},
84+
},
85+
"ao.onboarding.first_pr_revised": {},
7086
"ao.onboarding.first_project_added": {
7187
"has_git_remote": {},
7288
"kind": {},
@@ -80,6 +96,23 @@ var remotePayloadAllowlist = map[string]map[string]struct{}{
8096
"has_git_remote": {},
8197
"kind": {},
8298
},
99+
"ao.session.first_agent_output": {
100+
"harness": {},
101+
"state": {},
102+
},
103+
"ao.session.pr_merged": {
104+
"state": {},
105+
},
106+
"ao.session.pr_raised": {
107+
"ci": {},
108+
"mergeability": {},
109+
"review": {},
110+
"state": {},
111+
},
112+
"ao.session.pr_reviewed": {
113+
"decision": {},
114+
},
115+
"ao.session.pr_revised": {},
83116
"ao.session.spawn_failed": {
84117
"component": {},
85118
"duration_ms": {},

backend/internal/adapters/telemetry/posthog_test.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,51 @@ func TestPostHogSinkSanitizesPayloads(t *testing.T) {
134134
}
135135
}
136136

137+
func TestSanitizeRemotePayload_FunnelEventsAllowlisted(t *testing.T) {
138+
cases := []struct {
139+
event string
140+
payload map[string]any
141+
want map[string]any
142+
}{
143+
{
144+
event: "ao.session.first_agent_output",
145+
payload: map[string]any{"state": "active", "harness": "claude-code", "secret": "drop"},
146+
want: map[string]any{"state": "active", "harness": "claude-code"},
147+
},
148+
{
149+
event: "ao.session.pr_raised",
150+
payload: map[string]any{"state": "open", "ci": "pending", "review": "none", "mergeability": "unknown", "url": "https://x/y"},
151+
want: map[string]any{"state": "open", "ci": "pending", "review": "none", "mergeability": "unknown"},
152+
},
153+
{
154+
event: "ao.onboarding.first_pr_raised",
155+
payload: map[string]any{"state": "open", "url": "https://x/y"},
156+
want: map[string]any{"state": "open"},
157+
},
158+
{
159+
event: "ao.session.pr_reviewed",
160+
payload: map[string]any{"decision": "approved", "url": "https://x/y"},
161+
want: map[string]any{"decision": "approved"},
162+
},
163+
{
164+
event: "ao.onboarding.prereqs_checked",
165+
payload: map[string]any{"git_ok": true, "runtime_ok": true, "harness_ok": false, "github_ok": true, "all_ok": false, "path": "/Users/x"},
166+
want: map[string]any{"git_ok": true, "runtime_ok": true, "harness_ok": false, "github_ok": true, "all_ok": false},
167+
},
168+
}
169+
for _, tc := range cases {
170+
got := sanitizeRemotePayload(tc.event, tc.payload)
171+
if len(got) != len(tc.want) {
172+
t.Fatalf("%s: sanitized = %#v, want %#v", tc.event, got, tc.want)
173+
}
174+
for k, v := range tc.want {
175+
if got[k] != v {
176+
t.Fatalf("%s: key %q = %#v, want %#v", tc.event, k, got[k], v)
177+
}
178+
}
179+
}
180+
}
181+
137182
type roundTripClient func(*http.Request) (*http.Response, error)
138183

139184
func (f roundTripClient) Do(req *http.Request) (*http.Response, error) { return f(req) }

backend/internal/daemon/daemon.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,21 @@ func Run() error {
8282
return err
8383
}
8484

85+
// Onboarding funnel telemetry. A single durable milestone marker gates the
86+
// once-per-install events across the CDC subscriber and the prereqs probe.
87+
onboardingMilestones := newMilestoneStore(cfg.DataDir)
88+
// Turn live PR row changes into funnel telemetry (pr_raised = activation,
89+
// pr_merged = success) plus their once-per-install onboarding milestones.
90+
// Unsubscribe on shutdown so the broadcaster drops the closure.
91+
unsubscribeOnboarding := startOnboardingCDC(cdcPipe.Broadcaster, telemetrySink, onboardingMilestones, log)
92+
defer unsubscribeOnboarding()
93+
// Stage 4: probe local prereqs off the boot path and emit prereqs_checked /
94+
// prereqs_ready. The app supervisor starts the daemon on first launch, so
95+
// this seeds the funnel without a first-run wizard.
96+
go emitPrereqsTelemetry(ctx, telemetrySink, onboardingMilestones, func() bool {
97+
return onboardingMilestones.claimed("prereqs_ready")
98+
})
99+
85100
// Terminal streaming: the selected runtime (tmux on macOS/Linux, conpty on Windows) supplies the
86101
// attach Stream and liveness; the CDC broadcaster feeds the session-state channel. The manager
87102
// is handed to httpd, which mounts it at /mux. Raw PTY bytes never flow
backend/internal/daemon/onboarding_cdc.go

Lines changed: 206 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,206 @@
1+
package daemon
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"log/slog"
7+
"os"
8+
"path/filepath"
9+
"sort"
10+
"sync"
11+
"time"
12+
13+
"github.com/aoagents/agent-orchestrator/backend/internal/cdc"
14+
"github.com/aoagents/agent-orchestrator/backend/internal/domain"
15+
"github.com/aoagents/agent-orchestrator/backend/internal/ports"
16+
)
17+
18+
// milestoneStore claims one-time onboarding milestones durably. The set of
// claimed names is persisted to a JSON file so a milestone already reached in
// a prior daemon run is never re-emitted after a restart. Names are opaque
// keys (e.g. "first_pr_raised" or "pr_merged:<url>"). It is the funnel's
// once-per-install gate, the CDC analogue of the store-derived first-ness
// checks in the session service.
//
// All methods take the mutex, so one store may be shared between goroutines.
type milestoneStore struct {
	mu   sync.Mutex          // guards seen and serializes writes to path
	path string              // JSON file holding the sorted list of claimed names
	seen map[string]struct{} // in-memory set of claimed names
}

// newMilestoneStore returns a store backed by
// <dataDir>/telemetry_milestones.json, preloaded with any names found there.
// A missing, unreadable, or malformed file yields an empty store; the
// constructor never fails.
func newMilestoneStore(dataDir string) *milestoneStore {
	s := &milestoneStore{
		path: filepath.Join(dataDir, "telemetry_milestones.json"),
		seen: map[string]struct{}{},
	}
	data, err := os.ReadFile(s.path)
	if err != nil {
		return s
	}
	var names []string
	if err := json.Unmarshal(data, &names); err != nil {
		return s
	}
	for _, n := range names {
		s.seen[n] = struct{}{}
	}
	return s
}

// claimed reports whether name was already recorded, without claiming it.
func (s *milestoneStore) claimed(name string) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	_, ok := s.seen[name]
	return ok
}

// claim records name and returns true only the first time it is seen.
//
// Persistence is best effort: if the marker file cannot be written the claim
// still succeeds in memory for the lifetime of this process, and the error is
// dropped on purpose so telemetry bookkeeping never disturbs the caller.
func (s *milestoneStore) claim(name string) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, ok := s.seen[name]; ok {
		return false
	}
	s.seen[name] = struct{}{}
	_ = s.persistLocked() // best effort, see doc comment
	return true
}

// persistLocked writes the sorted set of claimed names to s.path. The caller
// must hold s.mu.
//
// The data is written to a temporary file in the same directory and then
// renamed over the target, so a crash or full disk mid-write cannot leave a
// truncated file behind. A truncated file would fail to parse on the next
// start and silently reset every milestone.
func (s *milestoneStore) persistLocked() error {
	names := make([]string, 0, len(s.seen))
	for n := range s.seen {
		names = append(names, n)
	}
	sort.Strings(names) // stable on-disk order regardless of map iteration
	data, err := json.Marshal(names)
	if err != nil {
		return err
	}
	// os.CreateTemp creates the file with mode 0600, matching the permissions
	// the marker file has always been written with.
	tmp, err := os.CreateTemp(filepath.Dir(s.path), filepath.Base(s.path)+".*.tmp")
	if err != nil {
		return err
	}
	tmpName := tmp.Name()
	_, writeErr := tmp.Write(data)
	closeErr := tmp.Close()
	if writeErr != nil || closeErr != nil {
		_ = os.Remove(tmpName) // do not leave a partial temp file behind
		if writeErr != nil {
			return writeErr
		}
		return closeErr
	}
	if err := os.Rename(tmpName, s.path); err != nil {
		_ = os.Remove(tmpName)
		return err
	}
	return nil
}
72+
73+
// prCDCPayload mirrors the JSON object that the pr_created / pr_updated
// triggers store in change_log (migration 0006). It decodes only the subset
// of fields the onboarding funnel reads; any other keys are ignored.
type prCDCPayload struct {
	// URL identifies the PR and serves as the per-PR dedup key.
	URL string `json:"url"`
	// Session is decoded from the trigger payload.
	Session string `json:"session"`
	// State is compared against the merged state to detect success.
	State string `json:"state"`
	// CI is forwarded as-is in the pr_raised telemetry payload.
	CI string `json:"ci"`
	// Review carries the review decision checked by the pr_reviewed path.
	Review string `json:"review"`
	// Mergeability is forwarded as-is in the pr_raised telemetry payload.
	Mergeability string `json:"mergeability"`
}
83+
84+
// startOnboardingCDC subscribes to the CDC broadcaster and turns PR row changes
85+
// into funnel telemetry: pr_created -> pr_raised (activation), pr_updated with
86+
// state=merged -> pr_merged (success), each paired with a once-per-install
87+
// onboarding milestone. The broadcaster only pushes live events, so this is a
88+
// best-effort live signal; the milestone marker keeps first-* events exactly
89+
// once across restarts. The subscriber callback must not block.
90+
func startOnboardingCDC(bcast *cdc.Broadcaster, sink ports.EventSink, milestones *milestoneStore, log *slog.Logger) func() {
91+
if bcast == nil || sink == nil || milestones == nil {
92+
return func() {}
93+
}
94+
return bcast.Subscribe(func(ev cdc.Event) {
95+
switch ev.Type {
96+
case cdc.EventPRCreated:
97+
emitPRRaised(sink, milestones, ev, log)
98+
case cdc.EventPRUpdated:
99+
emitPRMerged(sink, milestones, ev, log)
100+
emitPRReviewed(sink, milestones, ev, log)
101+
case cdc.EventPRReviewThreadResolved:
102+
emitPRRevised(sink, milestones, ev)
103+
}
104+
})
105+
}
106+
107+
func decodePRPayload(ev cdc.Event, log *slog.Logger) (prCDCPayload, bool) {
108+
var p prCDCPayload
109+
if err := json.Unmarshal(ev.Payload, &p); err != nil {
110+
if log != nil {
111+
log.Warn("onboarding cdc: decode pr payload", "type", ev.Type, "seq", ev.Seq, "err", err)
112+
}
113+
return prCDCPayload{}, false
114+
}
115+
return p, true
116+
}
117+
118+
func emitPRRaised(sink ports.EventSink, milestones *milestoneStore, ev cdc.Event, log *slog.Logger) {
119+
p, ok := decodePRPayload(ev, log)
120+
if !ok {
121+
return
122+
}
123+
payload := map[string]any{"state": p.State, "ci": p.CI, "review": p.Review, "mergeability": p.Mergeability}
124+
emitCDCTelemetry(sink, "ao.session.pr_raised", ev, payload)
125+
if milestones.claim("first_pr_raised") {
126+
emitCDCTelemetry(sink, "ao.onboarding.first_pr_raised", ev, map[string]any{"state": p.State})
127+
}
128+
}
129+
130+
func emitPRMerged(sink ports.EventSink, milestones *milestoneStore, ev cdc.Event, log *slog.Logger) {
131+
p, ok := decodePRPayload(ev, log)
132+
if !ok || p.State != string(domain.PRStateMerged) {
133+
return
134+
}
135+
// pr_updated fires on any tracked-field change, and a merged PR can still
136+
// emit later updates (CI/review). Dedup the merge fact per PR URL so
137+
// pr_merged is one event per PR.
138+
if p.URL != "" && !milestones.claim("pr_merged:"+p.URL) {
139+
return
140+
}
141+
emitCDCTelemetry(sink, "ao.session.pr_merged", ev, map[string]any{"state": p.State})
142+
if milestones.claim("first_pr_merged") {
143+
emitCDCTelemetry(sink, "ao.onboarding.first_pr_merged", ev, map[string]any{})
144+
}
145+
}
146+
147+
func emitPRReviewed(sink ports.EventSink, milestones *milestoneStore, ev cdc.Event, log *slog.Logger) {
148+
p, ok := decodePRPayload(ev, log)
149+
if !ok {
150+
return
151+
}
152+
if p.Review != string(domain.ReviewApproved) && p.Review != string(domain.ReviewChangesRequest) {
153+
return
154+
}
155+
// pr_updated fires on any tracked-field change; dedup per (PR, decision) so
156+
// each distinct human verdict is one pr_reviewed event.
157+
if p.URL != "" && !milestones.claim("pr_reviewed:"+p.URL+":"+p.Review) {
158+
return
159+
}
160+
emitCDCTelemetry(sink, "ao.session.pr_reviewed", ev, map[string]any{"decision": p.Review})
161+
if milestones.claim("first_pr_reviewed") {
162+
emitCDCTelemetry(sink, "ao.onboarding.first_pr_reviewed", ev, map[string]any{"decision": p.Review})
163+
}
164+
}
165+
166+
// prThreadPayload mirrors the JSON object written by the
// pr_review_thread_resolved trigger (migration 0004). A resolved review thread
// is the cleanest "agent addressed feedback" signal available without tracking
// review history.
type prThreadPayload struct {
	// PR identifies the pull request the thread belongs to.
	PR string `json:"pr"`
	// Thread identifies the resolved thread; together with PR it forms the
	// dedup key for the revision signal.
	Thread string `json:"thread"`
}
173+
174+
func emitPRRevised(sink ports.EventSink, milestones *milestoneStore, ev cdc.Event) {
175+
var p prThreadPayload
176+
if json.Unmarshal(ev.Payload, &p) != nil {
177+
return
178+
}
179+
// One revision signal per resolved thread.
180+
if p.Thread != "" && !milestones.claim("pr_revised:"+p.PR+":"+p.Thread) {
181+
return
182+
}
183+
emitCDCTelemetry(sink, "ao.session.pr_revised", ev, map[string]any{})
184+
if milestones.claim("first_pr_revised") {
185+
emitCDCTelemetry(sink, "ao.onboarding.first_pr_revised", ev, map[string]any{})
186+
}
187+
}
188+
189+
func emitCDCTelemetry(sink ports.EventSink, name string, ev cdc.Event, payload map[string]any) {
190+
out := ports.TelemetryEvent{
191+
Name: name,
192+
Source: "cdc",
193+
OccurredAt: time.Now().UTC(),
194+
Level: ports.TelemetryLevelInfo,
195+
Payload: payload,
196+
}
197+
if ev.ProjectID != "" {
198+
projectID := domain.ProjectID(ev.ProjectID)
199+
out.ProjectID = &projectID
200+
}
201+
if ev.SessionID != "" {
202+
sessionID := domain.SessionID(ev.SessionID)
203+
out.SessionID = &sessionID
204+
}
205+
sink.Emit(context.Background(), out)
206+
}

0 commit comments

Comments
 (0)