Skip to content

Commit 68e6a7f

Browse files
shanturrekram1-node
authored andcommitted
feat(core): add session metadata support (anomalyco#23068)
Co-authored-by: Aiden Cline <aidenpcline@gmail.com>
1 parent 5f4d6e0 commit 68e6a7f

5 files changed

Lines changed: 204 additions & 4 deletions

File tree

Lines changed: 200 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,200 @@
1+
import { NotFoundError } from "@/storage/storage"
2+
import { eq } from "drizzle-orm"
3+
import { and } from "drizzle-orm"
4+
import { sql } from "drizzle-orm"
5+
import type { TxOrDb } from "@/storage/db"
6+
import { SyncEvent } from "@/sync"
7+
import * as Session from "./session"
8+
import { MessageV2 } from "./message-v2"
9+
import { SessionTable, MessageTable, PartTable } from "./session.sql"
10+
import { WorkspaceTable } from "@/control-plane/workspace.sql"
11+
import { Log } from "@opencode-ai/core/util/log"
12+
import nextProjectors from "./projectors-next"
13+
14+
const log = Log.create({ service: "session.projector" })
15+
16+
function foreign(err: unknown) {
17+
if (typeof err !== "object" || err === null) return false
18+
if ("code" in err && err.code === "SQLITE_CONSTRAINT_FOREIGNKEY") return true
19+
return "message" in err && typeof err.message === "string" && err.message.includes("FOREIGN KEY constraint failed")
20+
}
21+
22+
export type DeepPartial<T> = T extends object ? { [K in keyof T]?: DeepPartial<T[K]> | null } : T
23+
24+
type Usage = Pick<MessageV2.StepFinishPart, "cost" | "tokens">
25+
26+
function usage(part: MessageV2.Part | (typeof PartTable.$inferSelect)["data"]): Usage | undefined {
27+
if (part.type !== "step-finish") return undefined
28+
if (!("cost" in part) || !("tokens" in part)) return undefined
29+
return { cost: part.cost, tokens: part.tokens }
30+
}
31+
32+
function applyUsage(db: TxOrDb, sessionID: Session.Info["id"], value: Usage, sign = 1) {
33+
db.update(SessionTable)
34+
.set({
35+
cost: sql`${SessionTable.cost} + ${value.cost * sign}`,
36+
tokens_input: sql`${SessionTable.tokens_input} + ${value.tokens.input * sign}`,
37+
tokens_output: sql`${SessionTable.tokens_output} + ${value.tokens.output * sign}`,
38+
tokens_reasoning: sql`${SessionTable.tokens_reasoning} + ${value.tokens.reasoning * sign}`,
39+
tokens_cache_read: sql`${SessionTable.tokens_cache_read} + ${value.tokens.cache.read * sign}`,
40+
tokens_cache_write: sql`${SessionTable.tokens_cache_write} + ${value.tokens.cache.write * sign}`,
41+
time_updated: sql`${SessionTable.time_updated}`,
42+
})
43+
.where(eq(SessionTable.id, sessionID))
44+
.run()
45+
}
46+
47+
function grab<T extends object, K1 extends keyof T, X>(
48+
obj: T,
49+
field1: K1,
50+
cb?: (val: NonNullable<T[K1]>) => X,
51+
): X | undefined {
52+
if (obj == undefined || !(field1 in obj)) return undefined
53+
54+
const val = obj[field1]
55+
if (val && typeof val === "object" && cb) {
56+
return cb(val)
57+
}
58+
if (val === undefined) {
59+
throw new Error(
60+
"Session update failure: pass `null` to clear a field instead of `undefined`: " + JSON.stringify(obj),
61+
)
62+
}
63+
return val as X | undefined
64+
}
65+
66+
export function toPartialRow(info: DeepPartial<Session.Info>) {
67+
const obj = {
68+
id: grab(info, "id"),
69+
project_id: grab(info, "projectID"),
70+
workspace_id: grab(info, "workspaceID"),
71+
parent_id: grab(info, "parentID"),
72+
slug: grab(info, "slug"),
73+
directory: grab(info, "directory"),
74+
path: grab(info, "path"),
75+
title: grab(info, "title"),
76+
version: grab(info, "version"),
77+
share_url: grab(info, "share", (v) => grab(v, "url")),
78+
summary_additions: grab(info, "summary", (v) => grab(v, "additions")),
79+
summary_deletions: grab(info, "summary", (v) => grab(v, "deletions")),
80+
summary_files: grab(info, "summary", (v) => grab(v, "files")),
81+
summary_diffs: grab(info, "summary", (v) => grab(v, "diffs")),
82+
metadata: grab(info, "metadata"),
83+
cost: grab(info, "cost"),
84+
tokens_input: grab(info, "tokens", (v) => grab(v, "input")),
85+
tokens_output: grab(info, "tokens", (v) => grab(v, "output")),
86+
tokens_reasoning: grab(info, "tokens", (v) => grab(v, "reasoning")),
87+
tokens_cache_read: grab(info, "tokens", (v) => grab(v, "cache", (cache) => grab(cache, "read"))),
88+
tokens_cache_write: grab(info, "tokens", (v) => grab(v, "cache", (cache) => grab(cache, "write"))),
89+
revert: grab(info, "revert"),
90+
permission: grab(info, "permission"),
91+
time_created: grab(info, "time", (v) => grab(v, "created")),
92+
time_updated: grab(info, "time", (v) => grab(v, "updated")),
93+
time_compacting: grab(info, "time", (v) => grab(v, "compacting")),
94+
time_archived: grab(info, "time", (v) => grab(v, "archived")),
95+
}
96+
97+
return Object.fromEntries(Object.entries(obj).filter(([_, val]) => val !== undefined))
98+
}
99+
100+
export default [
101+
SyncEvent.project(Session.Event.Created, (db, data) => {
102+
db.insert(SessionTable)
103+
.values(Session.toRow(data.info as Session.Info))
104+
.run()
105+
106+
if (data.info.workspaceID) {
107+
db.update(WorkspaceTable).set({ time_used: Date.now() }).where(eq(WorkspaceTable.id, data.info.workspaceID)).run()
108+
}
109+
}),
110+
111+
SyncEvent.project(Session.Event.Updated, (db, data) => {
112+
const info = data.info
113+
const row = db
114+
.update(SessionTable)
115+
.set({ time_updated: sql`${SessionTable.time_updated}`, ...toPartialRow(info as Session.Patch) })
116+
.where(eq(SessionTable.id, data.sessionID))
117+
.returning()
118+
.get()
119+
if (!row) throw new NotFoundError({ message: `Session not found: ${data.sessionID}` })
120+
}),
121+
122+
SyncEvent.project(Session.Event.Deleted, (db, data) => {
123+
db.delete(SessionTable).where(eq(SessionTable.id, data.sessionID)).run()
124+
}),
125+
126+
SyncEvent.project(MessageV2.Event.Updated, (db, data) => {
127+
const time_created = data.info.time.created
128+
const { id, sessionID, ...rest } = data.info
129+
130+
try {
131+
db.insert(MessageTable)
132+
.values({
133+
id,
134+
session_id: sessionID,
135+
time_created,
136+
data: rest,
137+
})
138+
.onConflictDoUpdate({ target: MessageTable.id, set: { data: rest } })
139+
.run()
140+
} catch (err) {
141+
if (!foreign(err)) throw err
142+
log.warn("ignored late message update", { messageID: id, sessionID })
143+
}
144+
}),
145+
146+
SyncEvent.project(MessageV2.Event.Removed, (db, data) => {
147+
for (const row of db
148+
.select()
149+
.from(PartTable)
150+
.where(and(eq(PartTable.message_id, data.messageID), eq(PartTable.session_id, data.sessionID)))
151+
.all()) {
152+
const previous = usage(row.data)
153+
if (previous) applyUsage(db, data.sessionID, previous, -1)
154+
}
155+
db.delete(MessageTable)
156+
.where(and(eq(MessageTable.id, data.messageID), eq(MessageTable.session_id, data.sessionID)))
157+
.run()
158+
}),
159+
160+
SyncEvent.project(MessageV2.Event.PartRemoved, (db, data) => {
161+
const row = db
162+
.select()
163+
.from(PartTable)
164+
.where(and(eq(PartTable.id, data.partID), eq(PartTable.session_id, data.sessionID)))
165+
.get()
166+
const previous = row && usage(row.data)
167+
if (previous) applyUsage(db, data.sessionID, previous, -1)
168+
169+
db.delete(PartTable)
170+
.where(and(eq(PartTable.id, data.partID), eq(PartTable.session_id, data.sessionID)))
171+
.run()
172+
}),
173+
174+
SyncEvent.project(MessageV2.Event.PartUpdated, (db, data) => {
175+
const { id, messageID, sessionID, ...rest } = data.part
176+
const row = db.select().from(PartTable).where(eq(PartTable.id, id)).get()
177+
178+
try {
179+
db.insert(PartTable)
180+
.values({
181+
id,
182+
message_id: messageID,
183+
session_id: sessionID,
184+
time_created: data.time,
185+
data: rest,
186+
})
187+
.onConflictDoUpdate({ target: PartTable.id, set: { data: rest } })
188+
.run()
189+
const previous = row && usage(row.data)
190+
const next = usage(data.part)
191+
if (previous) applyUsage(db, row.session_id, previous, -1)
192+
if (next) applyUsage(db, sessionID, next)
193+
} catch (err) {
194+
if (!foreign(err)) throw err
195+
log.warn("ignored late part update", { partID: id, messageID, sessionID })
196+
}
197+
}),
198+
199+
...nextProjectors,
200+
]

packages/opencode/src/session/session.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -836,7 +836,7 @@ export const layer: Layer.Layer<
836836
})
837837

838838
const setMetadata = Effect.fn("Session.setMetadata")(function* (input: typeof SetMetadataInput.Type) {
839-
yield* patch(input.sessionID, { metadata: input.metadata, time: { updated: Date.now() } }).pipe(Effect.orDie)
839+
yield* patch(input.sessionID, { metadata: input.metadata, time: { updated: Date.now() } })
840840
})
841841

842842
const setPermission = Effect.fn("Session.setPermission")(function* (input: {

packages/opencode/test/server/session-actions.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { afterEach, describe, expect, mock } from "bun:test"
1+
import { afterEach, describe, expect, mock } from "bun:test"
22
import { Effect, Layer } from "effect"
33
import { Session as SessionNs } from "@/session/session"
44
import * as Log from "@opencode-ai/core/util/log"

packages/sdk/js/src/v2/gen/types.gen.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// This file is auto-generated by @hey-api/openapi-ts
1+
// This file is auto-generated by @hey-api/openapi-ts
22

33
export type ClientOptions = {
44
baseUrl: `${string}://${string}` | (string & {})

packages/sdk/openapi.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
{
1+
{
22
"openapi": "3.1.0",
33
"info": {
44
"title": "opencode",

0 commit comments

Comments
 (0)