Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion frontend/types/gotypes.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1589,7 +1589,8 @@ declare global {
"ai:backendtype"?: string;
"ai:local"?: boolean;
"wsh:cmd"?: string;
"wsh:haderror"?: boolean;
"wsh:errorcount"?: number;
"wsh:count"?: number;
"conn:conntype"?: string;
"conn:wsherrorcode"?: string;
"conn:errorcode"?: string;
Expand Down
42 changes: 42 additions & 0 deletions pkg/telemetry/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

const MaxTzNameLen = 50
const ActivityEventName = "app:activity"
const WshRunEventName = "wsh:run"

var cachedTosAgreedTs atomic.Int64

Expand Down Expand Up @@ -196,6 +197,44 @@ func updateActivityTEvent(ctx context.Context, tevent *telemetrydata.TEvent) err
})
}

// aggregates wsh:run events per (cmd, haderror) key within the current 1-hour bucket
func updateWshRunTEvent(ctx context.Context, tevent *telemetrydata.TEvent) error {
eventTs := time.Now().Truncate(time.Hour).Add(time.Hour)
incomingCount := tevent.Props.WshCount
if incomingCount <= 0 {
incomingCount = 1
}
return wstore.WithTx(ctx, func(tx *wstore.TxWrap) error {
uuidStr := tx.GetString(
`SELECT uuid FROM db_tevent WHERE ts = ? AND event = ? AND json_extract(props, '$."wsh:cmd"') IS ?`,
eventTs.UnixMilli(), WshRunEventName, tevent.Props.WshCmd,
)
if uuidStr != "" {
var curProps telemetrydata.TEventProps
rawProps := tx.GetString(`SELECT props FROM db_tevent WHERE uuid = ?`, uuidStr)
if rawProps != "" {
if err := json.Unmarshal([]byte(rawProps), &curProps); err != nil {
log.Printf("error unmarshalling wsh:run props: %v\n", err)
}
}
curCount := curProps.WshCount
if curCount <= 0 {
curCount = 1
}
curProps.WshCount = curCount + incomingCount
curProps.WshErrorCount += tevent.Props.WshErrorCount
tx.Exec(`UPDATE db_tevent SET props = ? WHERE uuid = ?`, dbutil.QuickJson(curProps), uuidStr)
} else {
newProps := tevent.Props
newProps.WshCount = incomingCount
tsLocal := utilfn.ConvertToWallClockPT(eventTs).Format(time.RFC3339)
tx.Exec(`INSERT INTO db_tevent (uuid, ts, tslocal, event, props) VALUES (?, ?, ?, ?, ?)`,
uuid.New().String(), eventTs.UnixMilli(), tsLocal, WshRunEventName, dbutil.QuickJson(newProps))
}
Comment on lines +207 to +233
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Race window can create duplicate hourly rows for the same wsh:cmd.

Line 208 does a read-before-write (SELECT uuid ...) and then conditionally inserts. Concurrent writers can both miss and both insert, breaking aggregation guarantees for a (ts, event, cmd) bucket.

A robust fix is to enforce a DB-level unique key for the bucket identity and switch this path to an atomic UPSERT that increments wsh:count/wsh:errorcount.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/telemetry/telemetry.go` around lines 207 - 233, The current
read-then-insert in the wstore.WithTx block (the SELECT uuid ... conditional
insert around WshRunEventName handling) allows a race where concurrent writers
both insert duplicate hourly rows; add a DB-level UNIQUE constraint for the
bucket identity used here (the tuple for the hourly bucket: ts, event, and the
wsh:cmd identity extracted from props) and replace the read-then-insert logic
with a single atomic UPSERT that updates the existing row's props to increment
WshCount and WshErrorCount (and merges props) on conflict; locate the branch
using tx.GetString(`SELECT uuid FROM db_tevent ...`) / tx.Exec(...) and change
it to an INSERT ... ON CONFLICT(...) DO UPDATE (or equivalent upsert supported
by the DB) that atomically increments telemetrydata.TEventProps.WshCount and
WshErrorCount and updates props via dbutil.QuickJson (or use JSON functions)
instead of separate SELECT/UPDATE.

return nil
})
}

func TruncateActivityTEventForShutdown(ctx context.Context) error {
nowTs := time.Now()
eventTs := nowTs.Truncate(time.Hour).Add(time.Hour)
Expand Down Expand Up @@ -259,6 +298,9 @@ func RecordTEvent(ctx context.Context, tevent *telemetrydata.TEvent) error {
if tevent.Event == ActivityEventName {
return updateActivityTEvent(ctx, tevent)
}
if tevent.Event == WshRunEventName {
return updateWshRunTEvent(ctx, tevent)
}
return insertTEvent(ctx, tevent)
}

Expand Down
5 changes: 3 additions & 2 deletions pkg/telemetry/telemetrydata/telemetrydata.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,9 @@ type TEventProps struct {
AiBackendType string `json:"ai:backendtype,omitempty"`
AiLocal bool `json:"ai:local,omitempty"`

WshCmd string `json:"wsh:cmd,omitempty"`
WshHadError bool `json:"wsh:haderror,omitempty"`
WshCmd string `json:"wsh:cmd,omitempty"`
WshErrorCount int `json:"wsh:errorcount,omitempty"`
WshCount int `json:"wsh:count,omitempty"`

ConnType string `json:"conn:conntype,omitempty"`
ConnWshErrorCode string `json:"conn:wsherrorcode,omitempty"`
Expand Down
5 changes: 3 additions & 2 deletions pkg/wshrpc/wshserver/wshserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -1334,7 +1334,8 @@ func (ws *WshServer) WshActivityCommand(ctx context.Context, data map[string]int
delete(data, key)
}
if strings.HasSuffix(key, "#error") {
props.WshHadError = true
props.WshCmd = strings.TrimSuffix(key, "#error")
props.WshErrorCount = 1
} else {
props.WshCmd = key
}
Expand All @@ -1344,7 +1345,7 @@ func (ws *WshServer) WshActivityCommand(ctx context.Context, data map[string]int
}
telemetry.GoUpdateActivityWrap(activityUpdate, "wsh-activity")
telemetry.GoRecordTEventWrap(&telemetrydata.TEvent{
Event: "wsh:run",
Event: telemetry.WshRunEventName,
Props: props,
})
return nil
Expand Down
Loading