Skip to content

Commit 148bc06

Browse files
authored
live view telemetry (#13)
<!-- CURSOR_SUMMARY --> > [!NOTE] > **Medium Risk** > Adds a new built-in plugin that asynchronously sends session connect/disconnect events over HTTP, introducing new outbound network behavior and concurrency/queueing paths (though gated by a config flag and tested). > > **Overview** > Adds a new built-in `telemetry` plugin that can **forward live-view session connect/disconnect events** to a configured HTTP endpoint (defaulting to `http://127.0.0.1:10001/telemetry/events`), including a computed `duration_ms` on disconnect. > > The plugin is **opt-in** via new flags `telemetry.enabled` and `telemetry.endpoint`, uses a bounded in-memory queue with a background worker (dropping events when saturated to avoid blocking session goroutines), and includes tests covering disabled behavior, event emission, and non-blocking behavior on endpoint failures. > > <sup>Reviewed by [Cursor Bugbot](https://cursor.com/bugbot) for commit c787d1c. Bugbot is set up for automated code reviews on this repo. Configure [here](https://www.cursor.com/dashboard/bugbot).</sup> <!-- /CURSOR_SUMMARY -->
1 parent 41de6d6 commit 148bc06

7 files changed

Lines changed: 422 additions & 1 deletion

File tree

server/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ require (
2121
github.com/rs/zerolog v1.34.0
2222
github.com/spf13/cobra v1.9.1
2323
github.com/spf13/viper v1.20.1
24+
github.com/stretchr/testify v1.10.0
2425
)
2526

2627
require (
@@ -57,7 +58,6 @@ require (
5758
github.com/spf13/afero v1.14.0 // indirect
5859
github.com/spf13/cast v1.9.2 // indirect
5960
github.com/spf13/pflag v1.0.7 // indirect
60-
github.com/stretchr/testify v1.10.0 // indirect
6161
github.com/subosito/gotenv v1.6.0 // indirect
6262
github.com/wlynxg/anet v0.0.5 // indirect
6363
golang.org/x/crypto v0.40.0 // indirect

server/internal/plugins/manager.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"github.com/m1k1o/neko/server/internal/plugins/chat"
1515
"github.com/m1k1o/neko/server/internal/plugins/filetransfer"
1616
"github.com/m1k1o/neko/server/internal/plugins/scaletozero"
17+
"github.com/m1k1o/neko/server/internal/plugins/telemetry"
1718
"github.com/m1k1o/neko/server/pkg/types"
1819
)
1920

@@ -49,6 +50,7 @@ func New(config *config.Plugins) *ManagerCtx {
4950
manager.plugins.addPlugin(filetransfer.NewPlugin())
5051
manager.plugins.addPlugin(chat.NewPlugin())
5152
manager.plugins.addPlugin(scaletozero.NewPlugin())
53+
manager.plugins.addPlugin(telemetry.NewPlugin())
5254

5355
return manager
5456
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package telemetry
2+
3+
import (
4+
"github.com/spf13/cobra"
5+
"github.com/spf13/viper"
6+
)
7+
8+
type Config struct {
9+
Enabled bool
10+
Endpoint string
11+
}
12+
13+
func (Config) Init(cmd *cobra.Command) error {
14+
cmd.PersistentFlags().Bool("telemetry.enabled", false, "forward live-view session connect/disconnect events to kernel-images-api")
15+
if err := viper.BindPFlag("telemetry.enabled", cmd.PersistentFlags().Lookup("telemetry.enabled")); err != nil {
16+
return err
17+
}
18+
19+
cmd.PersistentFlags().String("telemetry.endpoint", "http://127.0.0.1:10001/telemetry/events", "kernel-images-api telemetry publish endpoint")
20+
if err := viper.BindPFlag("telemetry.endpoint", cmd.PersistentFlags().Lookup("telemetry.endpoint")); err != nil {
21+
return err
22+
}
23+
24+
return nil
25+
}
26+
27+
func (c *Config) Set() {
28+
c.Enabled = viper.GetBool("telemetry.enabled")
29+
c.Endpoint = viper.GetString("telemetry.endpoint")
30+
}
Lines changed: 186 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
package telemetry
2+
3+
import (
4+
"bytes"
5+
"context"
6+
"encoding/json"
7+
"net/http"
8+
"sync"
9+
"time"
10+
11+
"github.com/m1k1o/neko/server/pkg/types"
12+
"github.com/rs/zerolog"
13+
"github.com/rs/zerolog/log"
14+
)
15+
16+
const (
17+
queueDepth = 256
18+
defaultHTTPTimeout = 5 * time.Second
19+
)
20+
21+
type Manager struct {
22+
logger zerolog.Logger
23+
config *Config
24+
sessions types.SessionManager
25+
httpClient *http.Client
26+
27+
mu sync.Mutex
28+
connectedAt map[string]time.Time
29+
30+
eventsCh chan eventPayload
31+
stopCh chan struct{}
32+
wg sync.WaitGroup
33+
}
34+
35+
func NewManager(sessions types.SessionManager, config *Config) *Manager {
36+
return &Manager{
37+
logger: log.With().Str("module", "telemetry").Logger(),
38+
config: config,
39+
sessions: sessions,
40+
httpClient: &http.Client{Timeout: defaultHTTPTimeout},
41+
connectedAt: make(map[string]time.Time),
42+
eventsCh: make(chan eventPayload, queueDepth),
43+
stopCh: make(chan struct{}),
44+
}
45+
}
46+
47+
func (m *Manager) Start() error {
48+
if !m.config.Enabled {
49+
return nil
50+
}
51+
m.logger.Info().Str("endpoint", m.config.Endpoint).Msg("plugin enabled")
52+
53+
m.wg.Add(1)
54+
go m.worker()
55+
56+
m.sessions.OnConnected(func(session types.Session) {
57+
m.handleConnect(session.ID())
58+
})
59+
m.sessions.OnDisconnected(func(session types.Session) {
60+
m.handleDisconnect(session.ID())
61+
})
62+
63+
return nil
64+
}
65+
66+
func (m *Manager) Shutdown() error {
67+
if !m.config.Enabled {
68+
return nil
69+
}
70+
close(m.stopCh)
71+
m.wg.Wait()
72+
return nil
73+
}
74+
75+
func (m *Manager) handleConnect(id string) {
76+
m.mu.Lock()
77+
m.connectedAt[id] = time.Now()
78+
m.mu.Unlock()
79+
80+
m.enqueue(eventPayload{
81+
Type: "live_view_connect",
82+
SourceEvent: "neko.session.connected",
83+
Data: map[string]any{"session_id": id},
84+
})
85+
}
86+
87+
func (m *Manager) handleDisconnect(id string) {
88+
m.mu.Lock()
89+
start, ok := m.connectedAt[id]
90+
delete(m.connectedAt, id)
91+
m.mu.Unlock()
92+
93+
var durationMs float64
94+
if ok {
95+
durationMs = float64(time.Since(start).Microseconds()) / 1000.0
96+
}
97+
98+
m.enqueue(eventPayload{
99+
Type: "live_view_disconnect",
100+
SourceEvent: "neko.session.disconnected",
101+
Data: map[string]any{"session_id": id, "duration_ms": durationMs},
102+
})
103+
}
104+
105+
func (m *Manager) enqueue(ev eventPayload) {
106+
select {
107+
case m.eventsCh <- ev:
108+
default:
109+
// Drop rather than block neko's session goroutines. A backed-up
110+
// kernel-images-api means we'd lose lifecycle pairs anyway.
111+
m.logger.Warn().Str("type", ev.Type).Msg("telemetry queue full; dropping event")
112+
}
113+
}
114+
115+
func (m *Manager) worker() {
116+
defer m.wg.Done()
117+
for {
118+
select {
119+
case <-m.stopCh:
120+
// Best-effort drain on shutdown so we don't lose paired
121+
// connect/disconnects when neko exits cleanly.
122+
for {
123+
select {
124+
case ev := <-m.eventsCh:
125+
m.publish(ev)
126+
default:
127+
return
128+
}
129+
}
130+
case ev := <-m.eventsCh:
131+
m.publish(ev)
132+
}
133+
}
134+
}
135+
136+
func (m *Manager) publish(ev eventPayload) {
137+
body := publishBody{
138+
Type: ev.Type,
139+
Category: "system",
140+
Source: publishSource{
141+
Kind: "local_process",
142+
Event: ev.SourceEvent,
143+
},
144+
Data: ev.Data,
145+
}
146+
raw, err := json.Marshal(body)
147+
if err != nil {
148+
m.logger.Warn().Err(err).Msg("marshal telemetry body failed")
149+
return
150+
}
151+
ctx, cancel := context.WithTimeout(context.Background(), defaultHTTPTimeout)
152+
defer cancel()
153+
req, err := http.NewRequestWithContext(ctx, http.MethodPost, m.config.Endpoint, bytes.NewReader(raw))
154+
if err != nil {
155+
m.logger.Warn().Err(err).Msg("build telemetry request failed")
156+
return
157+
}
158+
req.Header.Set("Content-Type", "application/json")
159+
resp, err := m.httpClient.Do(req)
160+
if err != nil {
161+
m.logger.Debug().Err(err).Str("type", ev.Type).Msg("telemetry POST failed")
162+
return
163+
}
164+
_ = resp.Body.Close()
165+
if resp.StatusCode/100 != 2 {
166+
m.logger.Debug().Int("status", resp.StatusCode).Str("type", ev.Type).Msg("telemetry POST non-2xx")
167+
}
168+
}
169+
170+
type eventPayload struct {
171+
Type string
172+
SourceEvent string
173+
Data map[string]any
174+
}
175+
176+
type publishSource struct {
177+
Kind string `json:"kind"`
178+
Event string `json:"event"`
179+
}
180+
181+
type publishBody struct {
182+
Type string `json:"type"`
183+
Category string `json:"category"`
184+
Source publishSource `json:"source"`
185+
Data map[string]any `json:"data"`
186+
}

0 commit comments

Comments
 (0)