diff --git a/apps/server/scripts/acp-mock-agent.ts b/apps/server/scripts/acp-mock-agent.ts index 040c1ab6f4e..bc547bc2140 100644 --- a/apps/server/scripts/acp-mock-agent.ts +++ b/apps/server/scripts/acp-mock-agent.ts @@ -18,6 +18,7 @@ const emitInterleavedAssistantToolCalls = process.env.T3_ACP_EMIT_INTERLEAVED_ASSISTANT_TOOL_CALLS === "1"; const emitGenericToolPlaceholders = process.env.T3_ACP_EMIT_GENERIC_TOOL_PLACEHOLDERS === "1"; const emitAskQuestion = process.env.T3_ACP_EMIT_ASK_QUESTION === "1"; +const emitXAiAskUserQuestion = process.env.T3_ACP_EMIT_XAI_ASK_USER_QUESTION === "1"; const failSetConfigOption = process.env.T3_ACP_FAIL_SET_CONFIG_OPTION === "1"; const exitOnSetConfigOption = process.env.T3_ACP_EXIT_ON_SET_CONFIG_OPTION === "1"; const promptResponseText = process.env.T3_ACP_PROMPT_RESPONSE_TEXT; @@ -556,6 +557,39 @@ const program = Effect.gen(function* () { return { stopReason: "end_turn" }; } + if (emitXAiAskUserQuestion) { + const result = yield* agent.client.extRequest("_x.ai/ask_user_question", { + method: "x.ai/ask_user_question", + params: { + sessionId: requestedSessionId, + toolCallId: "ask-user-question-tool-call-1", + questions: [ + { + question: "Which scope should Grok use?", + options: [ + { label: "Workspace", description: "Use the current workspace" }, + { label: "Session", description: "Only use this session" }, + ], + }, + ], + mode: "default", + }, + }); + if ( + typeof result !== "object" || + result === null || + !("outcome" in result) || + result.outcome !== "accepted" || + !("answers" in result) || + typeof result.answers !== "object" || + result.answers === null + ) { + throw new Error("Expected _x.ai/ask_user_question response outcome."); + } + + return { stopReason: "end_turn" }; + } + yield* agent.client.sessionUpdate({ sessionId: requestedSessionId, update: { diff --git a/apps/server/src/provider/Layers/GrokAdapter.test.ts b/apps/server/src/provider/Layers/GrokAdapter.test.ts index c8c8fadbb2c..1d8d2da498d 100644 --- a/apps/server/src/provider/Layers/GrokAdapter.test.ts +++ b/apps/server/src/provider/Layers/GrokAdapter.test.ts @@ -274,6 +274,67 @@ it.layer(grokAdapterTestLayer)("GrokAdapterLive", (it) => { }), ); + it.effect("handles xAI ask_user_question extension requests", () => + Effect.gen(function* () { + const threadId = ThreadId.make("grok-xai-ask-user-question"); + const wrapperPath = yield* Effect.promise(() => + makeMockGrokWrapper({ T3_ACP_EMIT_XAI_ASK_USER_QUESTION: "1" }), + ); + const adapter = yield* makeTestAdapter(wrapperPath); + const requested = + yield* Deferred.make>(); + const resolved = + yield* Deferred.make>(); + + const eventsFiber = yield* Stream.runForEach(adapter.streamEvents, (event) => { + if (String(event.threadId) !== String(threadId)) { + return Effect.void; + } + if (event.type === "user-input.requested") { + return Deferred.succeed(requested, event).pipe(Effect.ignore); + } + if (event.type === "user-input.resolved") { + return Deferred.succeed(resolved, event).pipe(Effect.ignore); + } + return Effect.void; + }).pipe(Effect.forkChild); + + yield* adapter.startSession({ + threadId, + provider: ProviderDriverKind.make("grok"), + cwd: process.cwd(), + runtimeMode: "full-access", + }); + + const sendTurnFiber = yield* adapter + .sendTurn({ threadId, input: "ask before continuing", attachments: [] }) + .pipe(Effect.forkChild); + + const requestedEvent = yield* Deferred.await(requested); + assert.equal(requestedEvent.payload.questions.length, 1); + assert.equal(requestedEvent.payload.questions[0]?.id, "Which scope should Grok use?"); + assert.equal(requestedEvent.payload.questions[0]?.question, "Which scope should Grok use?"); + assert.equal(requestedEvent.raw?.method, "_x.ai/ask_user_question"); + + yield* adapter.respondToUserInput( + threadId, + ApprovalRequestId.make(String(requestedEvent.requestId)), + { + "Which scope should Grok use?": "Workspace", + }, + ); + + const resolvedEvent = yield* Deferred.await(resolved); + assert.deepEqual(resolvedEvent.payload.answers, { + "Which scope should Grok use?": "Workspace", + }); + yield* Fiber.join(sendTurnFiber); + + yield* Fiber.interrupt(eventsFiber); + yield* adapter.stopSession(threadId); + }), + ); + it.effect("continues streaming events when native notification logging fails", () => Effect.gen(function* () { const threadId = ThreadId.make("grok-native-log-failure"); diff --git a/apps/server/src/provider/Layers/GrokAdapter.ts b/apps/server/src/provider/Layers/GrokAdapter.ts index 753566e5987..d788fdf6c73 100644 --- a/apps/server/src/provider/Layers/GrokAdapter.ts +++ b/apps/server/src/provider/Layers/GrokAdapter.ts @@ -5,6 +5,7 @@ import { type ProviderApprovalDecision, type ProviderRuntimeEvent, type ProviderSession, + type ProviderUserInputAnswers, ProviderDriverKind, ProviderInstanceId, RuntimeRequestId, @@ -56,6 +57,11 @@ import { makeGrokAcpRuntime, resolveGrokAcpBaseModelId, } from "../acp/GrokAcpSupport.ts"; +import { + extractXAiAskUserQuestions, + makeXAiAskUserQuestionResponse, + XAiAskUserQuestionRequest, +} from "../acp/XAiAcpExtension.ts"; import { type GrokAdapterShape } from "../Services/GrokAdapter.ts"; import { type EventNdjsonLogger, makeEventNdjsonLogger } from "./EventNdjsonLogger.ts"; @@ -73,13 +79,17 @@ export interface GrokAdapterLiveOptions { readonly environment?: NodeJS.ProcessEnv; readonly nativeEventLogPath?: string; readonly nativeEventLogger?: EventNdjsonLogger; - readonly instanceId?: typeof ProviderInstanceId.Type; + readonly instanceId?: ProviderInstanceId; } interface PendingApproval { readonly decision: Deferred.Deferred; } +interface PendingUserInput { + readonly answers: Deferred.Deferred; +} + interface GrokSessionContext { readonly threadId: ThreadId; readonly acpSessionId: string; @@ -88,6 +98,7 @@ interface GrokSessionContext { readonly acp: AcpSessionRuntimeShape; notificationFiber: Fiber.Fiber | undefined; readonly pendingApprovals: Map; + readonly pendingUserInputs: Map; turns: Array<{ id: TurnId; items: Array }>; lastPlanFingerprint: string | undefined; activeTurnId: TurnId | undefined; @@ -105,6 +116,16 @@ function settlePendingApprovalsAsCancelled( ); } +function settlePendingUserInputsAsEmptyAnswers( + pendingUserInputs: ReadonlyMap, +): Effect.Effect { + return Effect.forEach( + Array.from(pendingUserInputs.values()), + (pending) => Deferred.succeed(pending.answers, {}).pipe(Effect.ignore), + { discard: true }, + ); +} + function isRecord(value: unknown): value is Record { return typeof value === "object" && value !== null && !Array.isArray(value); } @@ -287,6 +308,7 @@ export function makeGrokAdapter(grokSettings: GrokSettings, options?: GrokAdapte if (ctx.stopped) return; ctx.stopped = true; yield* settlePendingApprovalsAsCancelled(ctx.pendingApprovals); + yield* settlePendingUserInputsAsEmptyAnswers(ctx.pendingUserInputs); if (ctx.notificationFiber) { yield* Fiber.interrupt(ctx.notificationFiber); } @@ -329,6 +351,7 @@ export function makeGrokAdapter(grokSettings: GrokSettings, options?: GrokAdapte } const pendingApprovals = new Map(); + const pendingUserInputs = new Map(); const sessionScope = yield* Scope.make("sequential"); let sessionScopeTransferred = false; yield* Effect.addFinalizer(() => @@ -363,6 +386,53 @@ export function makeGrokAdapter(grokSettings: GrokSettings, options?: GrokAdapte ), ); const started = yield* Effect.gen(function* () { + yield* Effect.forEach( + ["x.ai/ask_user_question", "_x.ai/ask_user_question"] as const, + (method) => + acp.handleExtRequest(method, XAiAskUserQuestionRequest, (params) => + mapAcpCallbackFailure( + Effect.gen(function* () { + yield* logNative(input.threadId, method, params); + const requestId = ApprovalRequestId.make(yield* randomUUIDv4); + const runtimeRequestId = RuntimeRequestId.make(requestId); + const answers = yield* Deferred.make(); + pendingUserInputs.set(requestId, { answers }); + yield* offerRuntimeEvent({ + type: "user-input.requested", + ...(yield* makeEventStamp()), + provider: PROVIDER, + threadId: input.threadId, + turnId: sessions.get(input.threadId)?.activeTurnId, + requestId: runtimeRequestId, + payload: { questions: extractXAiAskUserQuestions(params) }, + raw: { + source: "acp.grok.extension", + method, + payload: params, + }, + }); + const resolved = yield* Deferred.await(answers); + pendingUserInputs.delete(requestId); + yield* offerRuntimeEvent({ + type: "user-input.resolved", + ...(yield* makeEventStamp()), + provider: PROVIDER, + threadId: input.threadId, + turnId: sessions.get(input.threadId)?.activeTurnId, + requestId: runtimeRequestId, + payload: { answers: resolved }, + raw: { + source: "acp.grok.extension", + method, + payload: params, + }, + }); + return makeXAiAskUserQuestionResponse(resolved); + }), + ), + ), + { discard: true }, + ); yield* acp.handleRequestPermission((params) => mapAcpCallbackFailure( Effect.gen(function* () { @@ -470,6 +540,7 @@ export function makeGrokAdapter(grokSettings: GrokSettings, options?: GrokAdapte acp, notificationFiber: undefined, pendingApprovals, + pendingUserInputs, turns: [], lastPlanFingerprint: undefined, activeTurnId: undefined, @@ -733,6 +804,7 @@ export function makeGrokAdapter(grokSettings: GrokSettings, options?: GrokAdapte Effect.gen(function* () { const ctx = yield* requireSession(threadId); yield* settlePendingApprovalsAsCancelled(ctx.pendingApprovals); + yield* settlePendingUserInputsAsEmptyAnswers(ctx.pendingUserInputs); yield* Effect.ignore( ctx.acp.cancel.pipe( Effect.mapError((error) => @@ -760,14 +832,22 @@ export function makeGrokAdapter(grokSettings: GrokSettings, options?: GrokAdapte yield* Deferred.succeed(pending.decision, decision); }); - const respondToUserInput: GrokAdapterShape["respondToUserInput"] = (threadId, requestId) => + const respondToUserInput: GrokAdapterShape["respondToUserInput"] = ( + threadId, + requestId, + answers, + ) => Effect.gen(function* () { - yield* requireSession(threadId); - return yield* new ProviderAdapterRequestError({ - provider: PROVIDER, - method: "user-input/respond", - detail: `Grok has no pending user-input request: ${requestId}`, - }); + const ctx = yield* requireSession(threadId); + const pending = ctx.pendingUserInputs.get(requestId); + if (!pending) { + return yield* new ProviderAdapterRequestError({ + provider: PROVIDER, + method: "_x.ai/ask_user_question", + detail: `Unknown pending user-input request: ${requestId}`, + }); + } + yield* Deferred.succeed(pending.answers, answers); }); const readThread: GrokAdapterShape["readThread"] = (threadId) => diff --git a/apps/server/src/provider/acp/XAiAcpExtension.ts b/apps/server/src/provider/acp/XAiAcpExtension.ts new file mode 100644 index 00000000000..5cbf5327783 --- /dev/null +++ b/apps/server/src/provider/acp/XAiAcpExtension.ts @@ -0,0 +1,159 @@ +import type { ProviderUserInputAnswers, UserInputQuestion } from "@t3tools/contracts"; +import * as Schema from "effect/Schema"; + +export const XAiAskUserQuestionRequest = Schema.Unknown; + +type UnknownRecord = Record; + +function trimmed(value: string | undefined): string | undefined { + const text = value?.trim(); + return text && text.length > 0 ? text : undefined; +} + +function isRecord(value: unknown): value is UnknownRecord { + return typeof value === "object" && value !== null && !Array.isArray(value); +} + +function stringField(record: UnknownRecord, keys: ReadonlyArray): string | undefined { + for (const key of keys) { + const value = record[key]; + if (typeof value === "string") { + const text = trimmed(value); + if (text) { + return text; + } + } + } + return undefined; +} + +function booleanField(record: UnknownRecord, keys: ReadonlyArray): boolean | undefined { + for (const key of keys) { + const value = record[key]; + if (typeof value === "boolean") { + return value; + } + } + return undefined; +} + +function arrayField(record: UnknownRecord, keys: ReadonlyArray): ReadonlyArray { + for (const key of keys) { + const value = record[key]; + if (Array.isArray(value)) { + return value; + } + } + return []; +} + +function nestedRecord( + record: UnknownRecord, + keys: ReadonlyArray, +): UnknownRecord | undefined { + for (const key of keys) { + const value = record[key]; + if (isRecord(value)) { + return value; + } + } + return undefined; +} + +function unwrapParams(params: unknown): UnknownRecord { + if (!isRecord(params)) { + return {}; + } + const request = nestedRecord(params, ["request"]); + const requestInput = request ? nestedRecord(request, ["input", "arguments", "args"]) : undefined; + return nestedRecord(params, ["input", "arguments", "args", "params"]) ?? requestInput ?? params; +} + +function extractOptionLabel(option: unknown): string | undefined { + return typeof option === "string" + ? trimmed(option) + : isRecord(option) + ? stringField(option, ["label", "value", "id", "text", "title", "name"]) + : undefined; +} + +function extractOptions(options: ReadonlyArray) { + const extracted = (options ?? []).flatMap((option) => { + const label = extractOptionLabel(option); + if (!label) { + return []; + } + const description = + typeof option === "string" + ? label + : isRecord(option) + ? (stringField(option, ["description", "detail", "subtitle"]) ?? label) + : label; + return [{ label, description }]; + }); + return extracted.length > 0 ? extracted : [{ label: "OK", description: "Continue" }]; +} + +function extractQuestion( + question: unknown, + fallbackTitle: string | undefined, + index: number, +): UserInputQuestion { + const record = isRecord(question) ? question : {}; + const nestedQuestion = nestedRecord(record, ["question"]); + const questionSource = nestedQuestion ?? record; + const questionText = + (typeof question === "string" ? trimmed(question) : undefined) ?? + stringField(questionSource, ["question", "prompt", "text", "content", "message"]) ?? + fallbackTitle ?? + `Question ${index + 1}`; + const id = stringField(questionSource, ["id", "questionId", "key"]) ?? questionText; + return { + id, + header: + stringField(questionSource, ["header", "title", "label"]) ?? fallbackTitle ?? "Question", + question: questionText, + multiSelect: + booleanField(questionSource, ["multiSelect", "allowMultiple", "allow_multiple"]) === true, + options: extractOptions(arrayField(questionSource, ["options", "choices", "answers"])), + }; +} + +export function extractXAiAskUserQuestions(params: unknown): ReadonlyArray { + const root = unwrapParams(params); + const title = stringField(root, ["title", "header", "toolTitle"]); + const questions = arrayField(root, ["questions", "items", "prompts"]); + if (questions.length > 0) { + return questions.map((question, index) => extractQuestion(question, title, index)); + } + const singleQuestion = nestedRecord(root, ["question"]) ?? root; + const singleQuestionOptions = arrayField(root, ["options", "choices", "answers"]); + const question = + singleQuestion === root || singleQuestionOptions.length === 0 + ? singleQuestion + : { ...singleQuestion, options: singleQuestionOptions }; + return [extractQuestion(question, title, 0)]; +} + +function answerValues(answer: unknown): ReadonlyArray { + if (Array.isArray(answer)) { + return answer.flatMap((entry) => { + const text = typeof entry === "string" ? trimmed(entry) : undefined; + return text ? [text] : []; + }); + } + const text = typeof answer === "string" ? trimmed(answer) : undefined; + return text ? [text] : []; +} + +export function makeXAiAskUserQuestionResponse(answers: ProviderUserInputAnswers): { + readonly outcome: "accepted"; + readonly answers: Record>; +} { + return { + outcome: "accepted", + answers: Object.fromEntries( + Object.entries(answers).map(([questionId, answer]) => [questionId, answerValues(answer)]), + ), + }; +}