Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
682 changes: 560 additions & 122 deletions packages/client/src/promise/generated/types.ts

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions packages/core/src/database/migration.gen.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import { Effect } from "effect"
import type { DatabaseMigration } from "../migration"

export default {
id: "20260703200000_reset_v2_event_fragments",
up(tx) {
return Effect.gen(function* () {
yield* tx.run(`DELETE FROM \`session_input\`;`)
yield* tx.run(`DELETE FROM \`session_message\`;`)
yield* tx.run(`DELETE FROM \`event\`;`)
yield* tx.run(`DELETE FROM \`event_sequence\`;`)
})
},
} satisfies DatabaseMigration.Migration
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import { Effect } from "effect"
import type { DatabaseMigration } from "../migration"

export default {
id: "20260703210000_reset_v2_execution_errors",
up(tx) {
return Effect.gen(function* () {
yield* tx.run(`DELETE FROM \`session_input\`;`)
yield* tx.run(`DELETE FROM \`session_message\`;`)
yield* tx.run(`DELETE FROM \`event\`;`)
yield* tx.run(`DELETE FROM \`event_sequence\`;`)
})
},
} satisfies DatabaseMigration.Migration
45 changes: 38 additions & 7 deletions packages/core/src/permission.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,28 @@ export type AskResult = typeof AskResult.Type

export const Event = Permission.Event

export class RejectedError extends Schema.TaggedErrorClass<RejectedError>()("PermissionV2.RejectedError", {}) {}
export class RejectedError extends Schema.TaggedErrorClass<RejectedError>()("PermissionV2.RejectedError", {
permission: Schema.String,
resources: Schema.Array(Schema.String),
}) {
override get message() {
return `Permission rejected: ${this.permission}`
}
}

export class CorrectedError extends Schema.TaggedErrorClass<CorrectedError>()("PermissionV2.CorrectedError", {
feedback: Schema.String,
}) {}

export class DeniedError extends Schema.TaggedErrorClass<DeniedError>()("PermissionV2.DeniedError", {
rules: Permission.Ruleset,
}) {}
permission: Schema.String,
resources: Schema.Array(Schema.String),
}) {
override get message() {
return `Permission denied: ${this.permission}`
}
}

export class NotFoundError extends Schema.TaggedErrorClass<NotFoundError>()("PermissionV2.NotFoundError", {
requestID: ID,
Expand Down Expand Up @@ -119,9 +132,17 @@ const layer = Layer.effect(
const pending = new Map<ID, Pending>()

yield* Effect.addFinalizer(() =>
Effect.forEach(pending.values(), (item) => Deferred.fail(item.deferred, new RejectedError()), {
discard: true,
}).pipe(
Effect.forEach(
pending.values(),
(item) =>
Deferred.fail(
item.deferred,
new RejectedError({ permission: item.request.action, resources: [...item.request.resources] }),
),
{
discard: true,
},
).pipe(
Effect.ensuring(
Effect.sync(() => {
pending.clear()
Expand Down Expand Up @@ -201,6 +222,8 @@ const layer = Layer.effect(
if (result.effect === "deny") {
return yield* new DeniedError({
rules: relevant(input, result.rules),
permission: input.action,
resources: input.resources,
})
}
if (result.effect === "allow") return
Expand Down Expand Up @@ -230,7 +253,12 @@ const layer = Layer.effect(
if (input.reply === "reject") {
yield* Deferred.fail(
existing.deferred,
input.message ? new CorrectedError({ feedback: input.message }) : new RejectedError(),
input.message
? new CorrectedError({ feedback: input.message })
: new RejectedError({
permission: existing.request.action,
resources: [...existing.request.resources],
}),
)
pending.delete(input.requestID)
for (const [id, item] of pending) {
Expand All @@ -240,7 +268,10 @@ const layer = Layer.effect(
requestID: item.request.id,
reply: "reject",
})
yield* Deferred.fail(item.deferred, new RejectedError())
yield* Deferred.fail(
item.deferred,
new RejectedError({ permission: item.request.action, resources: [...item.request.resources] }),
)
pending.delete(id)
}
return
Expand Down
8 changes: 7 additions & 1 deletion packages/core/src/session/compaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,13 @@ const make = (dependencies: Dependencies) => {
.pipe(
Stream.runForEach((event) => {
if (LLMEvent.is.providerError(event)) failed = true
if (LLMEvent.is.textDelta(event)) chunks.push(event.text)
if (LLMEvent.is.textDelta(event)) {
chunks.push(event.text)
return dependencies.events.publish(SessionEvent.Compaction.Delta, {
sessionID: input.sessionID,
text: event.text,
})
}
return Effect.void
}),
Effect.as(true),
Expand Down
18 changes: 18 additions & 0 deletions packages/core/src/session/error.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { Schema } from "effect"
import { SessionMessage } from "./message"
import { SessionSchema } from "./schema"
import { SessionError } from "@opencode-ai/schema/session-error"

export class MessageDecodeError extends Schema.TaggedErrorClass<MessageDecodeError>()("Session.MessageDecodeError", {
sessionID: SessionSchema.ID,
Expand All @@ -10,3 +11,20 @@ export class MessageDecodeError extends Schema.TaggedErrorClass<MessageDecodeErr
return `Failed to decode message ${this.messageID} in session ${this.sessionID}`
}
}

export class StepFailedError extends Schema.TaggedErrorClass<StepFailedError>()("Session.StepFailedError", {
error: SessionError.Error,
}) {
override get message() {
return this.error.message
}
}

export class UserInterruptedError extends Schema.TaggedErrorClass<UserInterruptedError>()(
"Session.UserInterruptedError",
{},
) {
override get message() {
return "Session interrupted by user"
}
}
67 changes: 48 additions & 19 deletions packages/core/src/session/execution/local.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Cause, DateTime, Effect, Exit, Layer } from "effect"
import { Cause, Effect, Exit, Layer } from "effect"
import { EventV2 } from "../../event"
import { LocationServiceMap } from "../../location-service-map"
import { makeGlobalNode } from "../../effect/app-node"
Expand All @@ -8,6 +8,16 @@ import { SessionRunner } from "../runner"
import { SessionSchema } from "../schema"
import { SessionStore } from "../store"
import { SessionExecution } from "../execution"
import { toSessionError } from "../to-session-error"
import { UserInterruptedError } from "../error"

export function terminal(exit: Exit.Exit<void, SessionRunner.RunError>, reason?: "user" | "shutdown" | "superseded") {
if (Exit.isSuccess(exit)) return { type: "succeeded" as const }
if (Cause.hasInterrupts(exit.cause)) return { type: "interrupted" as const, reason: reason ?? "shutdown" }
const failure = Cause.squash(exit.cause)
if (failure instanceof UserInterruptedError) return { type: "interrupted" as const, reason: "user" as const }
return { type: "failed" as const, error: toSessionError(failure) }
}

/** Current-process routing for implicit-local Locations. Future remote placement belongs here. */
const layer = Layer.effect(
Expand All @@ -16,7 +26,23 @@ const layer = Layer.effect(
const store = yield* SessionStore.Service
const locations = yield* LocationServiceMap.Service
const events = yield* EventV2.Service
const coordinator = yield* SessionRunCoordinator.make<SessionSchema.ID, SessionRunner.RunError>({
const reportLifecycle = <A>(sessionID: SessionSchema.ID, effect: Effect.Effect<A>) =>
effect.pipe(
Effect.tapCause((cause) =>
Cause.hasInterruptsOnly(cause)
? Effect.void
: Effect.logError("Failed to publish Session execution lifecycle", cause).pipe(
Effect.annotateLogs({ sessionID }),
),
),
Effect.asVoid,
)
const coordinator = yield* SessionRunCoordinator.make<
SessionSchema.ID,
SessionRunner.RunError,
"user" | "shutdown" | "superseded"
>({
started: (sessionID) => reportLifecycle(sessionID, events.publish(SessionEvent.Execution.Started, { sessionID })),
drain: Effect.fnUntraced(function* (sessionID: SessionSchema.ID, force) {
const session = yield* store.get(sessionID)
if (!session) return yield* Effect.die(new Error(`Session not found: ${sessionID}`))
Expand All @@ -29,28 +55,31 @@ const layer = Layer.effect(
),
)
}),
// One ExecutionSettled per execution (busy period), covering every coalesced drain.
settled: (sessionID, exit) =>
Effect.gen(function* () {
const failure =
Exit.isFailure(exit) && !Cause.hasInterrupts(exit.cause) ? Cause.squash(exit.cause) : undefined
yield* events.publish(SessionEvent.ExecutionSettled, {
sessionID,
outcome: Exit.isSuccess(exit) ? "success" : Cause.hasInterrupts(exit.cause) ? "interrupted" : "failure",
error:
failure !== undefined
? { type: "unknown", message: failure instanceof Error ? failure.message : String(failure) }
: undefined,
})
}).pipe(
Effect.catchCause(() => Effect.void),
Effect.asVoid,
// One terminal observation per busy period, covering every coalesced drain.
settled: (sessionID, exit, reason) =>
reportLifecycle(
sessionID,
Effect.gen(function* () {
const outcome = terminal(exit, reason)
if (outcome.type === "succeeded") {
yield* events.publish(SessionEvent.Execution.Succeeded, { sessionID })
return
}
if (outcome.type === "interrupted") {
yield* events.publish(SessionEvent.Execution.Interrupted, { sessionID, reason: outcome.reason })
return
}
yield* events.publish(SessionEvent.Execution.Failed, {
sessionID,
error: outcome.error,
})
}),
),
})

return SessionExecution.Service.of({
active: coordinator.active,
interrupt: coordinator.interrupt,
interrupt: (sessionID) => coordinator.interrupt(sessionID, "user"),
resume: coordinator.run,
wake: coordinator.wake,
awaitIdle: coordinator.awaitIdle,
Expand Down
Loading
Loading