Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions packages/core/src/process.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ export interface RunOptions {
readonly combineOutput?: boolean
readonly maxOutputBytes?: number
readonly maxErrorBytes?: number
readonly onOutputChunk?: (chunk: Uint8Array) => Effect.Effect<void>
readonly signal?: AbortSignal
readonly timeout?: Duration.Input
readonly stdin?: string | Uint8Array | Stream.Stream<Uint8Array, PlatformError>
Expand Down Expand Up @@ -118,9 +119,14 @@ const normalizeStdin = (
? Stream.make(input)
: input

export const collectStream = (stream: Stream.Stream<Uint8Array, PlatformError>, maxOutputBytes: number | undefined) =>
Stream.runFold(
stream,
export const collectStream = (
stream: Stream.Stream<Uint8Array, PlatformError>,
maxOutputBytes: number | undefined,
onChunk?: (chunk: Uint8Array) => Effect.Effect<void>,
) => {
const source = onChunk ? stream.pipe(Stream.tap((chunk) => onChunk(chunk))) : stream
return Stream.runFold(
source,
() => ({ chunks: [] as Uint8Array[], bytes: 0, truncated: false }),
(acc, chunk) => {
if (maxOutputBytes === undefined) {
Expand All @@ -135,6 +141,7 @@ export const collectStream = (stream: Stream.Stream<Uint8Array, PlatformError>,
return acc
},
).pipe(Effect.map((x) => ({ buffer: Buffer.concat(x.chunks), truncated: x.truncated })))
}

const layer = Layer.effect(
Service,
Expand All @@ -148,7 +155,7 @@ const layer = Layer.effect(
const handle = yield* spawner.spawn(command)
if (options?.combineOutput) {
const [output, exitCode] = yield* Effect.all(
[collectStream(handle.all, options.maxOutputBytes), handle.exitCode],
[collectStream(handle.all, options.maxOutputBytes, options.onOutputChunk), handle.exitCode],
{ concurrency: "unbounded" },
)
return {
Expand Down
18 changes: 18 additions & 0 deletions packages/core/src/session/runner/llm.ts
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,24 @@ const layer = Layer.effect(
agent: agent.id,
assistantMessageID,
call: event,
progress: (output) =>
Effect.gen(function* () {
yield* withPublication(
events.publish(SessionEvent.Tool.Progress, {
sessionID: session.id,
timestamp: yield* DateTime.now,
assistantMessageID,
callID: event.id,
structured:
typeof output.structured === "object" &&
output.structured !== null &&
!Array.isArray(output.structured)
? (output.structured as Record<string, unknown>)
: { value: output.structured },
content: output.content,
}),
)
}),
}),
).pipe(
Effect.flatMap((settlement) =>
Expand Down
13 changes: 12 additions & 1 deletion packages/core/src/tool/bash.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import { AppProcess } from "../process"
import { PermissionV2 } from "../permission"
import { PositiveInt } from "../schema"
import { ToolRegistry } from "./registry"
import { ShellProgress } from "./shell-progress"
import { Tool } from "./tool"
import { Tools } from "./tools"

Expand Down Expand Up @@ -163,11 +164,20 @@ const layer = Layer.effectDiscard(
forceKillAfter: Duration.seconds(3),
})
const timeout = input.timeout ?? DEFAULT_TIMEOUT_MS
const progress = ShellProgress.makeState()
const result = yield* appProcess
.run(command, {
combineOutput: true,
timeout: Duration.millis(timeout),
maxOutputBytes: MAX_CAPTURE_BYTES,
onOutputChunk: (chunk) => {
const snapshot = ShellProgress.observe(progress, Buffer.from(chunk).toString("utf8"))
if (!snapshot || !context.progress) return Effect.void
return context.progress({
structured: { ...snapshot, output: snapshot.frame },
content: [{ type: "text", text: snapshot.frame }],
})
},
})
.pipe(
Effect.catchTag("AppProcessError", (error) =>
Expand All @@ -183,7 +193,8 @@ const layer = Layer.effectDiscard(
}
}

const output = result.output?.toString("utf8") || "(no output)"
const cleaned = ShellProgress.cleanOutput(result.output?.toString("utf8") ?? "")
const output = cleaned.output || "(no output)"
const notice = result.outputTruncated
? "[output capture truncated at the in-memory safety limit]"
: undefined
Expand Down
2 changes: 2 additions & 0 deletions packages/core/src/tool/registry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ export type ExecuteInput = {
readonly agent: AgentV2.ID
readonly assistantMessageID: SessionMessage.ID
readonly call: ToolCall
readonly progress?: (output: ToolOutput) => Effect.Effect<void>
}

export interface Interface {
Expand Down Expand Up @@ -64,6 +65,7 @@ const registryLayer = Layer.effect(
agent: input.agent,
assistantMessageID: input.assistantMessageID,
toolCallID: input.call.id,
progress: input.progress,
}).pipe(
Effect.map((output) => ({ output })),
Effect.catchTag("LLM.ToolFailure", (failure) =>
Expand Down
144 changes: 144 additions & 0 deletions packages/core/src/tool/shell-progress.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
export * as ShellProgress from "./shell-progress"

export type Snapshot = {
readonly kind: "shell.progress"
readonly label?: string
readonly percent?: number
readonly current?: number
readonly total?: number
readonly rate?: string
readonly eta?: string
readonly elapsed?: string
readonly frame: string
readonly frames: number
readonly done?: boolean
}

export type State = {
frames: number
latest?: Snapshot
lastPublishedAt: number
lastPublishedKey?: string
}

const ANSI = /\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])/g
const BAR = /[|┃█▓▒░#=*>-]{3,}/g

export function makeState(): State {
return { frames: 0, lastPublishedAt: 0 }
}

export function stripControl(input: string) {
return input.replace(ANSI, "").replace(/[\u0000-\u0008\u000b\u000c\u000e-\u001f\u007f]/g, "")
}

const numberValue = (value: string | undefined): number | undefined => {
if (!value) return undefined
const suffix = value.at(-1)?.toLowerCase()
const base = Number.parseFloat(/[km]$/i.test(value) ? value.slice(0, -1) : value)
if (!Number.isFinite(base)) return undefined
if (suffix === "k") return base * 1_000
if (suffix === "m") return base * 1_000_000
return base
}

export function parseFrame(input: string): Omit<Snapshot, "kind" | "frames"> | undefined {
const frame = stripControl(input).trim()
if (!frame) return undefined

const percentMatch = frame.match(/(^|\s)(100|\d{1,2})(?:\.\d+)?%/)
const ratioMatch = frame.match(/(\d+(?:\.\d+)?[kKmM]?)\s*\/\s*(\d+(?:\.\d+)?[kKmM]?)/)
const rateMatch = frame.match(/\b(\d+(?:\.\d+)?\s*(?:it|B|KB|MB|GB)\/s)\b/i)
const etaMatch = frame.match(/(?:ETA|eta)[:=\s]+([0-9:.]+)/) ?? frame.match(/<([0-9:.]+),\s*[^\]]+\]/)
const elapsedMatch = frame.match(/\[([0-9:.]+)<[0-9:.]+,/) ?? frame.match(/(?:elapsed|Elapsed)[:=\s]+([0-9:.]+)/)

if (!percentMatch && !ratioMatch && !rateMatch) return undefined
if (!frame.includes("\r") && !percentMatch && !ratioMatch) return undefined

const percent = percentMatch ? Math.min(100, Math.max(0, Number.parseFloat(percentMatch[2]))) : undefined
const current = numberValue(ratioMatch?.[1])
const total = numberValue(ratioMatch?.[2])
const labelPrefix = frame.split(percentMatch?.[0] ?? ratioMatch?.[0] ?? rateMatch?.[0] ?? "", 1)[0]
const label =
labelPrefix
?.replace(BAR, " ")
.trim()
.replace(/[:|]+$/g, "") || undefined

return {
...(label ? { label } : {}),
...(percent === undefined ? {} : { percent }),
...(current === undefined ? {} : { current }),
...(total === undefined ? {} : { total }),
...(rateMatch ? { rate: rateMatch[1].replace(/\s+/g, "") } : {}),
...(etaMatch ? { eta: etaMatch[1] } : {}),
...(elapsedMatch ? { elapsed: elapsedMatch[1] } : {}),
frame,
...(percent === 100 || (current !== undefined && total !== undefined && current >= total) ? { done: true } : {}),
}
}

const key = (snapshot: Snapshot) =>
[snapshot.label, snapshot.percent, snapshot.current, snapshot.total, snapshot.rate, snapshot.eta, snapshot.done].join(
"\0",
)

export function observe(state: State, chunk: string, now = Date.now(), intervalMs = 750): Snapshot | undefined {
let next: Snapshot | undefined
for (const segment of chunk.split(/[\r\n]+/)) {
const parsed = parseFrame(segment)
if (!parsed) continue
state.frames += 1
next = { kind: "shell.progress", frames: state.frames, ...parsed }
state.latest = next
}
if (!next || state.frames < 2) return undefined
const nextKey = key(next)
if (next.done || nextKey !== state.lastPublishedKey || now - state.lastPublishedAt >= intervalMs) {
state.lastPublishedAt = now
state.lastPublishedKey = nextKey
return next
}
return undefined
}

const summary = (snapshot: Snapshot) => {
const parts = ["progress:"]
if (snapshot.label) parts.push(snapshot.label)
if (snapshot.percent !== undefined) parts.push(`${snapshot.percent}%`)
if (snapshot.current !== undefined && snapshot.total !== undefined)
parts.push(`${snapshot.current}/${snapshot.total}`)
if (snapshot.rate) parts.push(snapshot.rate)
if (snapshot.eta && !snapshot.done) parts.push(`ETA ${snapshot.eta}`)
return `[${parts.join(" ")}]`
}

export function cleanOutput(input: string) {
const lines: string[] = []
let frames = 0
let latest: Snapshot | undefined

for (const line of stripControl(input).replace(/\r\n/g, "\n").split("\n")) {
if (!line.includes("\r")) {
lines.push(line)
continue
}

const kept: string[] = []
for (const segment of line.split(/\r+/)) {
const parsed = parseFrame(segment)
if (parsed) {
frames += 1
latest = { kind: "shell.progress", frames, ...parsed }
continue
}
if (segment.trim()) kept.push(segment)
}
lines.push(...kept)
}

const raw = lines.join("\n")
const text = raw.replace(/\n{3,}/g, "\n\n").trim()
if (!latest || frames < 2) return { output: raw, frames, latest }
return { output: [summary(latest), text].filter(Boolean).join("\n"), frames, latest }
}
1 change: 1 addition & 0 deletions packages/core/src/tool/tool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ export interface Context {
readonly agent: AgentV2.ID
readonly assistantMessageID: SessionMessage.ID
readonly toolCallID: string
readonly progress?: (output: ToolOutput) => Effect.Effect<void>
}

export type SchemaType<A> = Schema.Codec<A, any, never, never>
Expand Down
50 changes: 48 additions & 2 deletions packages/core/test/tool-bash.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import path from "path"
import { describe, expect, test } from "bun:test"
import { Effect, Layer } from "effect"
import { ChildProcess } from "effect/unstable/process"
import { FSUtil } from "@opencode-ai/core/fs-util"
import { Config } from "@opencode-ai/core/config"
import { AppNodeBuilder } from "@opencode-ai/core/effect/app-node-builder"
import { LayerNode } from "@opencode-ai/core/effect/layer-node"
Expand All @@ -17,6 +16,7 @@ import { SessionV2 } from "@opencode-ai/core/session"
import { BashTool } from "@opencode-ai/core/tool/bash"
import { ToolRegistry } from "@opencode-ai/core/tool/registry"
import { ToolOutputStore } from "@opencode-ai/core/tool-output-store"
import type { ToolOutput } from "@opencode-ai/llm"
import { location } from "./fixture/location"
import { tmpdir } from "./fixture/tmpdir"
import { testEffect } from "./lib/effect"
Expand All @@ -42,6 +42,7 @@ let result: AppProcess.RunResult = {
stderrTruncated: false,
}
let runFailure: AppProcess.AppProcessError | undefined
let outputChunks: string[] = []
let afterPermission = (_input: PermissionV2.AssertInput): Effect.Effect<void> => Effect.void

const permission = Layer.succeed(
Expand All @@ -68,7 +69,9 @@ const appProcess = Layer.succeed(
Effect.suspend(() => {
if (command._tag !== "StandardCommand") throw new Error("expected standard command")
runs.push({ command: command.command, cwd: command.options.cwd, shell: command.options.shell, options })
return runFailure ? Effect.fail(runFailure) : Effect.succeed(result)
return Effect.forEach(outputChunks, (chunk) => options?.onOutputChunk?.(Buffer.from(chunk)) ?? Effect.void, {
discard: true,
}).pipe(Effect.andThen(runFailure ? Effect.fail(runFailure) : Effect.succeed(result)))
}),
} as unknown as AppProcess.Interface),
)
Expand All @@ -84,6 +87,7 @@ const reset = () => {
runs.length = 0
denyAction = undefined
runFailure = undefined
outputChunks = []
afterPermission = () => Effect.void
result = {
command: "mock",
Expand Down Expand Up @@ -198,6 +202,48 @@ describe("BashTool", () => {
),
)

it.live("publishes shell progress and cleans carriage-return frames from final output", () =>
Effect.acquireUseRelease(
Effect.promise(() => tmpdir()),
(tmp) => {
reset()
const raw =
"\r 0%| | 0/10 [00:00<?, ?it/s]\r 50%|█████ | 5/10 [00:01<00:01, 5.0it/s]\r100%|██████████| 10/10 [00:02<00:00, 5.0it/s]\ndone\n"
result = { ...result, output: Buffer.from(raw) }
outputChunks = [raw]
const progress: ToolOutput[] = []
return withTool(tmp.path, (registry) =>
registry.materialize().pipe(
Effect.flatMap((materialized) =>
materialized.settle({
...call({ command: "python train.py" }),
progress: (output) => Effect.sync(() => progress.push(output)),
}),
),
Effect.andThen((settled) =>
Effect.sync(() => {
expect(progress.at(-1)?.structured).toMatchObject({
kind: "shell.progress",
percent: 100,
done: true,
})
expect(settled.output?.content[0]).toMatchObject({
type: "text",
text: expect.stringContaining("[progress:"),
})
expect(settled.output?.content[0]).toMatchObject({
type: "text",
text: expect.not.stringContaining("█████"),
})
}),
),
),
)
},
(tmp) => Effect.promise(() => tmp[Symbol.asyncDispose]()),
),
)

it.live("rejects a workdir that stops being a directory during approval", () =>
Effect.acquireUseRelease(
Effect.promise(() => tmpdir()),
Expand Down
Loading
Loading