Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ During normal plan review, an Archive sidebar tab provides the same browsing via
| `/api/external-annotations` | POST | Add external annotations (single or batch `{ annotations: [...] }`) |
| `/api/external-annotations` | PATCH | Update fields on a single annotation (`?id=`) |
| `/api/external-annotations` | DELETE | Remove by `?id=`, `?source=`, or clear all |
| `/api/agents/capabilities` | GET | Check available agent providers (claude, codex, tour) |
| `/api/agents/capabilities` | GET | Check available agent providers (claude, codex, tour, cursor, opencode) |
| `/api/agents/review-profiles` | GET | List launchable review profiles (enabled skills + builtin default) |
| `/api/agents/skills` | GET | List all discovered skills for the add-a-review picker (each flagged `enabled`) |
| `/api/agents/review-skills` | POST | Enable a skill as a review (body: `{ name }`); writes `review-skills.json` |
Expand Down
157 changes: 127 additions & 30 deletions apps/pi-extension/server/agent-jobs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@
*/

import type { IncomingMessage, ServerResponse } from "node:http";
import { spawn, execFileSync, type ChildProcess } from "node:child_process";
import { spawn, execFileSync, execFile, type ChildProcess } from "node:child_process";
import { promisify } from "node:util";

const execFileAsync = promisify(execFile);
import {
type AgentJobInfo,
type AgentJobEvent,
Expand All @@ -21,6 +24,12 @@ import {
AGENT_HEARTBEAT_INTERVAL_MS,
} from "../generated/agent-jobs.js";
import { formatClaudeLogEvent } from "../generated/claude-review.js";
import {
MARKER_ENGINES,
formatMarkerLogEvent,
type MarkerEngine,
type MarkerModel,
} from "../generated/marker-review.js";
import { json, parseBody } from "./helpers.js";

// ---------------------------------------------------------------------------
Expand All @@ -32,6 +41,16 @@ const JOBS = `${BASE}/jobs`;
const JOBS_STREAM = `${JOBS}/stream`;
const CAPABILITIES = `${BASE}/capabilities`;

// Provider ids whose argv is constructed exclusively on the server. A launch
// request for any of these must go through buildCommand; argv sent by the
// client is never executed, and a missing build result is a launch failure.
const SERVER_BUILT_PROVIDERS: ReadonlySet<string> = new Set<string>(
  ["claude", "codex", "tour", "cursor", "opencode"],
);

// ---------------------------------------------------------------------------
// which() helper for Node.js
// ---------------------------------------------------------------------------
Expand All @@ -54,7 +73,7 @@ export interface AgentJobHandlerOptions {
mode: "plan" | "review" | "annotate";
getServerUrl: () => string;
getCwd: () => string;
/** Server-side command builder for known providers (codex, claude, tour). */
/** Build the command server-side for a given provider. */
buildCommand?: (provider: string, config?: Record<string, unknown>) => Promise<{
command: string[];
outputPath?: string;
Expand Down Expand Up @@ -88,6 +107,27 @@ export interface AgentJobHandlerOptions {
onJobComplete?: (job: AgentJobInfo, meta: { outputPath?: string; stdout?: string; cwd?: string }) => void | Promise<void>;
}

/**
 * Ask a marker engine's CLI for its model catalog, on a best-effort basis.
 *
 * Runs `engine.binary` with `engine.modelsArgv` (5 second timeout, UTF-8
 * output) and hands the captured stdout to `engine.parseModels`. The process
 * is launched from this module (child_process execFile, per-runtime) instead
 * of from marker-review.ts, which has to remain Bun-free for the Pi vendor
 * build. The call is asynchronous so a slow or hanging CLI cannot stall the
 * event loop while /capabilities is being served.
 *
 * The catalog depends on the user's account and configuration, so it is never
 * hardcoded.
 *
 * @param engine Marker engine descriptor supplying the binary, the argv that
 *   lists models, and the stdout parser.
 * @returns The parsed models, or an empty array when the CLI cannot be run,
 *   times out, exits with an error, or its output cannot be parsed. Callers
 *   treat an empty array as "use the engine's default picker".
 */
async function discoverMarkerModels(engine: MarkerEngine): Promise<MarkerModel[]> {
  let catalog: MarkerModel[] = [];
  try {
    const listing = await execFileAsync(engine.binary, engine.modelsArgv, {
      encoding: "utf8",
      timeout: 5000,
    });
    catalog = engine.parseModels(listing.stdout);
  } catch {
    // Discovery is optional: any spawn, timeout, or parse failure simply
    // leaves the catalog empty.
  }
  return catalog;
}

export function createAgentJobHandler(options: AgentJobHandlerOptions) {
const { mode, getServerUrl, getCwd } = options;

Expand All @@ -103,11 +143,32 @@ export function createAgentJobHandler(options: AgentJobHandlerOptions) {
{ id: "codex", name: "Codex CLI", available: whichCmd("codex") },
{ id: "tour", name: "Code Tour", available: whichCmd("claude") || whichCmd("codex") },
];
const capabilitiesResponse: AgentCapabilities = {
mode,
providers: capabilities,
available: capabilities.some((c) => c.available),
};
// Register the marker engines (Cursor, OpenCode) as providers. Each one is
// reported as available only when the server runs in review mode AND the
// engine's binary resolves on PATH (NOTE: cursor's binary is `agent`).
// Model catalogs are intentionally NOT fetched here; they are discovered
// lazily (see buildCapabilitiesResponse) so a slow or unauthenticated
// `<binary> models` call cannot delay startup.
Object.values(MARKER_ENGINES).forEach(({ id, name, binary }) => {
  capabilities.push({ id, name, available: mode === "review" && whichCmd(binary) });
});

// Resolved model catalogs keyed by engine id. An empty array is a valid cached
// value: a failed discovery is remembered and not retried.
const markerModelsCache = new Map<string, MarkerModel[]>();
// Discoveries that are currently running, keyed by engine id. Concurrent
// /capabilities requests share the pending promise instead of each spawning
// their own `<binary> models` process before the cache has been filled.
const markerModelsInFlight = new Map<string, Promise<MarkerModel[]>>();

/**
 * Return the model catalog for a marker engine, running discovery at most once
 * per engine even when several callers ask before the first run has finished.
 */
function loadMarkerModels(engine: MarkerEngine): Promise<MarkerModel[]> {
  const cached = markerModelsCache.get(engine.id);
  if (cached) return Promise.resolve(cached);

  let pending = markerModelsInFlight.get(engine.id);
  if (!pending) {
    pending = discoverMarkerModels(engine)
      .then((models) => {
        markerModelsCache.set(engine.id, models);
        return models;
      })
      .finally(() => {
        // Settled either way: later callers read the cache (or retry if the
        // discovery rejected and nothing was cached).
        markerModelsInFlight.delete(engine.id);
      });
    markerModelsInFlight.set(engine.id, pending);
  }
  return pending;
}

/**
 * Build the /capabilities payload.
 *
 * Providers that are not marker engines, or that are unavailable, are returned
 * unchanged. Available marker engines are returned with a `models` list
 * attached, discovered lazily on first use and cached afterwards.
 *
 * @returns The server mode, the provider list, and whether at least one
 *   provider is available.
 */
async function buildCapabilitiesResponse(): Promise<AgentCapabilities> {
  const providers = await Promise.all(capabilities.map(async (c) => {
    const engine = MARKER_ENGINES[c.id as "cursor" | "opencode"];
    if (!engine || !c.available) return c;
    const models = await loadMarkerModels(engine);
    return { ...c, models };
  }));
  return { mode, providers, available: providers.some((p) => p.available) };
}

// --- SSE broadcasting ---
function broadcast(event: AgentJobEvent): void {
Expand Down Expand Up @@ -190,31 +251,47 @@ export function createAgentJobHandler(options: AgentJobHandlerOptions) {
if (spawnOptions?.cwd) jobOutputPaths.set(`${id}:cwd`, spawnOptions.cwd);
broadcast({ type: "job:started", job: { ...info } });

// --- Stdout capture (Claude JSONL streaming) ---
// --- Stdout capture (Claude/Cursor stream-json) ---
let stdoutBuf = "";
if (captureStdout && proc.stdout) {
// Turn one complete NDJSON/JSONL line into a live-log delta and broadcast it.
// Blank lines are ignored. `result` events are not forwarded here because
// they are handled in onJobComplete.
const emitLogLine = (line: string) => {
  if (line.trim() === "") return;

  const pushDelta = (text: string) => {
    broadcast({ type: "job:log", jobId: id, delta: text + '\n' });
  };

  // Claude JSONL is produced by the Claude provider and also by Tour jobs
  // running on the Claude engine.
  const usesClaudeStream = provider === "claude" || spawnOptions?.engine === "claude";
  if (usesClaudeStream) {
    const rendered = formatClaudeLogEvent(line);
    if (rendered !== null) pushDelta(rendered);
    return;
  }

  // Marker engines (Cursor, OpenCode) format their own stream events; the
  // engine's formatter decides what is readable (Cursor applies the
  // partial-output dedup rule; OpenCode reads text parts).
  const engineForProvider = MARKER_ENGINES[provider as "cursor" | "opencode"];
  if (engineForProvider) {
    const rendered = formatMarkerLogEvent(line, engineForProvider);
    if (rendered !== null) pushDelta(rendered);
    return;
  }

  // Any other provider: drop JSON `result` events, forward everything else
  // verbatim. Lines that are not JSON are forwarded as raw log text.
  let isResultEvent = false;
  try {
    isResultEvent = JSON.parse(line).type === 'result';
  } catch { /* not JSON — forward as raw log */ }
  if (isResultEvent) return;
  pushDelta(line);
};
// stream-json output is NDJSON and chunk boundaries are arbitrary —
// carry the trailing partial line until a later chunk completes it,
// otherwise records split across chunks are dropped from live logs.
let logLineCarry = "";
proc.stdout.on("data", (chunk: Buffer) => {
const text = chunk.toString();
stdoutBuf += text;

// Forward JSONL lines as log events
const lines = text.split('\n');
for (const line of lines) {
if (!line.trim()) continue;
// Tour jobs with the Claude engine also stream Claude JSONL.
if (provider === "claude" || spawnOptions?.engine === "claude") {
const formatted = formatClaudeLogEvent(line);
if (formatted !== null) {
broadcast({ type: "job:log", jobId: id, delta: formatted + '\n' });
}
continue;
}
try {
const event = JSON.parse(line);
if (event.type === 'result') continue;
} catch { /* not JSON — forward as raw log */ }
broadcast({ type: "job:log", jobId: id, delta: line + '\n' });
}
const lines = (logLineCarry + text).split('\n');
logLineCarry = lines.pop() ?? "";
for (const line of lines) emitLogLine(line);
});
proc.stdout.on("end", () => {
if (logLineCarry) emitLogLine(logLineCarry);
});
}

Expand Down Expand Up @@ -272,8 +349,15 @@ export function createAgentJobHandler(options: AgentJobHandlerOptions) {
stdout: captureStdout ? stdoutBuf : undefined,
cwd: jobCwd,
});
} catch {
// Result ingestion failure shouldn't prevent job completion broadcast
} catch (err) {
// Claude/Codex are fail-open; Cursor and OpenCode are fail-closed — an
// unexpected throw during prompt-enforced ingestion must fail the job,
// not pass it. (Their handlers normally fail by mutation and never
// throw; this guards future refactors.)
if (MARKER_ENGINES[provider as "cursor" | "opencode"]) {
entry.info.status = "failed";
entry.info.error = err instanceof Error ? err.message : `${provider} result ingestion failed`;
}
}
}
jobOutputPaths.delete(id);
Expand Down Expand Up @@ -350,7 +434,7 @@ export function createAgentJobHandler(options: AgentJobHandlerOptions) {
): Promise<boolean> {
// --- GET /api/agents/capabilities ---
if (url.pathname === CAPABILITIES && req.method === "GET") {
json(res, capabilitiesResponse);
json(res, await buildCapabilitiesResponse());
return true;
}

Expand Down Expand Up @@ -440,6 +524,19 @@ export function createAgentJobHandler(options: AgentJobHandlerOptions) {
return true;
}

// Fail-closed enforcement for server-owned providers: the command MUST
// be built server-side. Client-supplied argv is never spawned for these
// providers — a null/throwing builder becomes an error, not a fallback.
if (SERVER_BUILT_PROVIDERS.has(provider)) {
if (!options.buildCommand) {
json(res, { error: `Provider ${provider} requires server-built command` }, 400);
return true;
}
// Discard any client-supplied argv so a null build cleanly hits the
// `command.length === 0` guard below instead of falling through.
command = [];
}

// Try server-side command building for known providers
let captureStdout = false;
let stdinPrompt: string | undefined;
Expand Down
80 changes: 79 additions & 1 deletion apps/pi-extension/server/serverReview.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,15 @@ import {
transformClaudeFindings,
} from "../generated/claude-review.js";
import { createTourSession, TOUR_EMPTY_OUTPUT_ERROR } from "../generated/tour-review.js";
import {
MARKER_ENGINES,
composeMarkerReviewPrompt,
buildMarkerCommand,
parseMarkerStreamOutput,
transformMarkerFindings,
makeMarkerNonce,
extractMarkerNonce,
} from "../generated/marker-review.js";
import {
WorkspaceReviewSession,
type WorkspaceDiffType,
Expand Down Expand Up @@ -588,6 +597,24 @@ export async function startReviewServer(options: {
return { command, stdinPrompt, prompt, cwd, label: jobLabel, captureStdout: true, model, effort, prUrl: launchPrUrl, diffScope: launchDiffScope, diffContext, reviewProfileId: reviewProfile.id, reviewProfileLabel: reviewProfile.label };
}

// Marker engines (Cursor, OpenCode) — one branch, same shape as Claude.
// Neither CLI has a schema flag, so composeMarkerReviewPrompt ALWAYS
// appends the marker-block output contract (even for a custom profile —
// it's the only thing that makes their prose output parseable). The
// engine's buildArgv passes the prompt as the trailing positional arg and
// threads the spawn cwd (--workspace for Cursor, --dir for OpenCode).
// captureStdout is required: the marker block comes back on stdout NDJSON.
const markerEngine = MARKER_ENGINES[provider as "cursor" | "opencode"];
if (markerEngine) {
const model = typeof config?.model === "string" && config.model ? config.model : undefined;
// Per-job nonce embedded in the marker contract; recovered from job.prompt
// at parse time so echoed/quoted bare tags can't be mistaken for the payload.
const nonce = makeMarkerNonce();
const prompt = composeMarkerReviewPrompt(reviewProfile, userMessage, nonce);
const { command } = buildMarkerCommand(markerEngine, prompt, model, cwd);
return { command, prompt, cwd, label: jobLabel, captureStdout: true, model, prUrl: launchPrUrl, diffScope: launchDiffScope, diffContext, reviewProfileId: reviewProfile.id, reviewProfileLabel: reviewProfile.label };
}

return null;
},

Expand All @@ -612,7 +639,7 @@ export async function startReviewServer(options: {
// Map findings onto annotations and ingest. Shared by both engine branches;
// no-ops on an empty set so a clean (zero-finding) review stays "done".
const ingest = <T extends object>(transformed: readonly T[], logTag: string) => {
if (transformed.length === 0) return;
if (transformed.length === 0) return undefined;
const annotations = transformed.map((a) => ({
...a,
...jobPrContext,
Expand All @@ -621,6 +648,7 @@ export async function startReviewServer(options: {
}));
const result = externalAnnotations.addAnnotations({ annotations });
if ("error" in result) console.error(`[${logTag}] addAnnotations error:`, result.error);
return result;
};

if (job.provider === "codex") {
Expand Down Expand Up @@ -682,6 +710,56 @@ export async function startReviewServer(options: {
return;
}

// --- Marker path (Cursor, OpenCode) ---
// FAIL-CLOSED: marker output is prompt-enforced (no schema flag), so any
// missing/malformed/schema/transform/insertion failure must MUTATE the job
// to failed — NEVER throw (agent-jobs.ts swallows throws, silently leaving
// an exit-0 job marked done). Mirrors the Tour fail-closed pattern below.
// Findings carry nullable file/line, classified into line/whole-file/
// general by transformMarkerFindings — nothing is dropped (same as Claude).
const markerEngine = MARKER_ENGINES[job.provider as "cursor" | "opencode"];
if (markerEngine) {
// Recover the per-job nonce embedded in the prompt; without it no block
// can be trusted, so parse fails closed below.
const nonce = extractMarkerNonce(job.prompt ?? "");
const output = nonce && meta.stdout ? parseMarkerStreamOutput(meta.stdout, markerEngine, nonce) : null;
if (!output) {
job.status = "failed";
job.error = `${markerEngine.author} review output missing or unparseable (no valid marker JSON).`;
return;
}

// Derive the verdict from finding severities (like Claude) rather than
// trusting the model's free-form `correctness` string. Marker engines
// have no schema flag, so a model value like "not correct" would be
// stored verbatim and the detail panel (any string containing "correct"
// except "incorrect" → green) would invert the displayed result.
const hasImportant = output.findings.some((f) => f.severity === "important");
job.summary = {
correctness: hasImportant ? "Issues Found" : "Correct",
explanation: output.summary.explanation,
confidence: output.summary.confidence,
};

// Reuse the shared ingest() decoration; add a fail-closed check on result.
const result = ingest(
transformMarkerFindings(
output.findings,
job.source,
markerEngine.author,
cwd,
workspace ? (filePath) => workspace.normalizeAnnotationPath(filePath) : undefined,
),
`${markerEngine.id}-review`,
);
if (result && "error" in result) {
job.status = "failed";
job.error = `${markerEngine.author} annotation insertion failed: ${result.error}`;
return;
}
return;
}

if (job.provider === "tour") {
const { summary } = await tour.onJobComplete({ job, meta });
if (summary) {
Expand Down
2 changes: 1 addition & 1 deletion apps/pi-extension/vendor.sh
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ for f in feedback-templates prompts review-core diff-paths cli-pagination jj-cor
done

# Vendor review agent modules from packages/server/ — rewrite imports for generated/ layout
for f in agent-review-message codex-review claude-review path-utils review-skill-loader; do
for f in agent-review-message codex-review claude-review review-findings marker-review path-utils review-skill-loader; do
src="../../packages/server/$f.ts"
printf '// @generated — DO NOT EDIT. Source: packages/server/%s.ts\n' "$f" | cat - "$src" \
| sed 's|from "./vcs"|from "./review-core.js"|' \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ function ProviderPill({ provider, engine, model }: { provider: string; engine?:
const engineLabel = engine === 'codex' ? 'Codex' : 'Claude';
label = model && engine !== 'codex' ? `Tour · ${engineLabel} ${model.charAt(0).toUpperCase() + model.slice(1)}` : `Tour · ${engineLabel}`;
} else {
label = provider === 'claude' ? 'Claude' : provider === 'codex' ? 'Codex' : 'Shell';
label = provider === 'claude' ? 'Claude' : provider === 'codex' ? 'Codex' : provider === 'cursor' ? 'Cursor' : provider === 'opencode' ? 'OpenCode' : 'Shell';
}
return (
<span className={`text-[10px] font-semibold uppercase tracking-wider px-1.5 py-0.5 rounded ${
Expand Down
Loading