Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions apps/server/scripts/acp-mock-agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ const emitInterleavedAssistantToolCalls =
process.env.T3_ACP_EMIT_INTERLEAVED_ASSISTANT_TOOL_CALLS === "1";
const emitGenericToolPlaceholders = process.env.T3_ACP_EMIT_GENERIC_TOOL_PLACEHOLDERS === "1";
const emitAskQuestion = process.env.T3_ACP_EMIT_ASK_QUESTION === "1";
const emitXAiAskUserQuestion = process.env.T3_ACP_EMIT_XAI_ASK_USER_QUESTION === "1";
const failSetConfigOption = process.env.T3_ACP_FAIL_SET_CONFIG_OPTION === "1";
const exitOnSetConfigOption = process.env.T3_ACP_EXIT_ON_SET_CONFIG_OPTION === "1";
const promptResponseText = process.env.T3_ACP_PROMPT_RESPONSE_TEXT;
Expand Down Expand Up @@ -556,6 +557,39 @@ const program = Effect.gen(function* () {
return { stopReason: "end_turn" };
}

if (emitXAiAskUserQuestion) {
const result = yield* agent.client.extRequest("_x.ai/ask_user_question", {
method: "x.ai/ask_user_question",
params: {
sessionId: requestedSessionId,
toolCallId: "ask-user-question-tool-call-1",
questions: [
{
question: "Which scope should Grok use?",
options: [
{ label: "Workspace", description: "Use the current workspace" },
{ label: "Session", description: "Only use this session" },
],
},
],
mode: "default",
},
});
if (
typeof result !== "object" ||
result === null ||
!("outcome" in result) ||
result.outcome !== "accepted" ||
!("answers" in result) ||
typeof result.answers !== "object" ||
result.answers === null
) {
throw new Error("Expected _x.ai/ask_user_question response outcome.");
}

return { stopReason: "end_turn" };
}

yield* agent.client.sessionUpdate({
sessionId: requestedSessionId,
update: {
Expand Down
61 changes: 61 additions & 0 deletions apps/server/src/provider/Layers/GrokAdapter.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,67 @@ it.layer(grokAdapterTestLayer)("GrokAdapterLive", (it) => {
}),
);

it.effect("handles xAI ask_user_question extension requests", () =>
Effect.gen(function* () {
const threadId = ThreadId.make("grok-xai-ask-user-question");
const wrapperPath = yield* Effect.promise(() =>
makeMockGrokWrapper({ T3_ACP_EMIT_XAI_ASK_USER_QUESTION: "1" }),
);
const adapter = yield* makeTestAdapter(wrapperPath);
const requested =
yield* Deferred.make<Extract<ProviderRuntimeEvent, { type: "user-input.requested" }>>();
const resolved =
yield* Deferred.make<Extract<ProviderRuntimeEvent, { type: "user-input.resolved" }>>();

const eventsFiber = yield* Stream.runForEach(adapter.streamEvents, (event) => {
if (String(event.threadId) !== String(threadId)) {
return Effect.void;
}
if (event.type === "user-input.requested") {
return Deferred.succeed(requested, event).pipe(Effect.ignore);
}
if (event.type === "user-input.resolved") {
return Deferred.succeed(resolved, event).pipe(Effect.ignore);
}
return Effect.void;
}).pipe(Effect.forkChild);

yield* adapter.startSession({
threadId,
provider: ProviderDriverKind.make("grok"),
cwd: process.cwd(),
runtimeMode: "full-access",
});

const sendTurnFiber = yield* adapter
.sendTurn({ threadId, input: "ask before continuing", attachments: [] })
.pipe(Effect.forkChild);

const requestedEvent = yield* Deferred.await(requested);
assert.equal(requestedEvent.payload.questions.length, 1);
assert.equal(requestedEvent.payload.questions[0]?.id, "Which scope should Grok use?");
assert.equal(requestedEvent.payload.questions[0]?.question, "Which scope should Grok use?");
assert.equal(requestedEvent.raw?.method, "_x.ai/ask_user_question");

yield* adapter.respondToUserInput(
threadId,
ApprovalRequestId.make(String(requestedEvent.requestId)),
{
"Which scope should Grok use?": "Workspace",
},
);

const resolvedEvent = yield* Deferred.await(resolved);
assert.deepEqual(resolvedEvent.payload.answers, {
"Which scope should Grok use?": "Workspace",
});
yield* Fiber.join(sendTurnFiber);

yield* Fiber.interrupt(eventsFiber);
yield* adapter.stopSession(threadId);
}),
);

it.effect("continues streaming events when native notification logging fails", () =>
Effect.gen(function* () {
const threadId = ThreadId.make("grok-native-log-failure");
Expand Down
96 changes: 88 additions & 8 deletions apps/server/src/provider/Layers/GrokAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {
type ProviderApprovalDecision,
type ProviderRuntimeEvent,
type ProviderSession,
type ProviderUserInputAnswers,
ProviderDriverKind,
ProviderInstanceId,
RuntimeRequestId,
Expand Down Expand Up @@ -56,6 +57,11 @@ import {
makeGrokAcpRuntime,
resolveGrokAcpBaseModelId,
} from "../acp/GrokAcpSupport.ts";
import {
extractXAiAskUserQuestions,
makeXAiAskUserQuestionResponse,
XAiAskUserQuestionRequest,
} from "../acp/XAiAcpExtension.ts";
import { type GrokAdapterShape } from "../Services/GrokAdapter.ts";
import { type EventNdjsonLogger, makeEventNdjsonLogger } from "./EventNdjsonLogger.ts";

Expand All @@ -73,13 +79,17 @@ export interface GrokAdapterLiveOptions {
readonly environment?: NodeJS.ProcessEnv;
readonly nativeEventLogPath?: string;
readonly nativeEventLogger?: EventNdjsonLogger;
readonly instanceId?: typeof ProviderInstanceId.Type;
readonly instanceId?: ProviderInstanceId;
}

interface PendingApproval {
readonly decision: Deferred.Deferred<ProviderApprovalDecision>;
}

interface PendingUserInput {
readonly answers: Deferred.Deferred<ProviderUserInputAnswers>;
}

interface GrokSessionContext {
readonly threadId: ThreadId;
readonly acpSessionId: string;
Expand All @@ -88,6 +98,7 @@ interface GrokSessionContext {
readonly acp: AcpSessionRuntimeShape;
notificationFiber: Fiber.Fiber<void, never> | undefined;
readonly pendingApprovals: Map<ApprovalRequestId, PendingApproval>;
readonly pendingUserInputs: Map<ApprovalRequestId, PendingUserInput>;
turns: Array<{ id: TurnId; items: Array<unknown> }>;
lastPlanFingerprint: string | undefined;
activeTurnId: TurnId | undefined;
Expand All @@ -105,6 +116,16 @@ function settlePendingApprovalsAsCancelled(
);
}

function settlePendingUserInputsAsEmptyAnswers(
pendingUserInputs: ReadonlyMap<ApprovalRequestId, PendingUserInput>,
): Effect.Effect<void> {
return Effect.forEach(
Array.from(pendingUserInputs.values()),
(pending) => Deferred.succeed(pending.answers, {}).pipe(Effect.ignore),
{ discard: true },
);
}

function isRecord(value: unknown): value is Record<string, unknown> {
return typeof value === "object" && value !== null && !Array.isArray(value);
}
Expand Down Expand Up @@ -287,6 +308,7 @@ export function makeGrokAdapter(grokSettings: GrokSettings, options?: GrokAdapte
if (ctx.stopped) return;
ctx.stopped = true;
yield* settlePendingApprovalsAsCancelled(ctx.pendingApprovals);
yield* settlePendingUserInputsAsEmptyAnswers(ctx.pendingUserInputs);
if (ctx.notificationFiber) {
yield* Fiber.interrupt(ctx.notificationFiber);
}
Expand Down Expand Up @@ -329,6 +351,7 @@ export function makeGrokAdapter(grokSettings: GrokSettings, options?: GrokAdapte
}

const pendingApprovals = new Map<ApprovalRequestId, PendingApproval>();
const pendingUserInputs = new Map<ApprovalRequestId, PendingUserInput>();
const sessionScope = yield* Scope.make("sequential");
let sessionScopeTransferred = false;
yield* Effect.addFinalizer(() =>
Expand Down Expand Up @@ -363,6 +386,53 @@ export function makeGrokAdapter(grokSettings: GrokSettings, options?: GrokAdapte
),
);
const started = yield* Effect.gen(function* () {
yield* Effect.forEach(
["x.ai/ask_user_question", "_x.ai/ask_user_question"] as const,
(method) =>
acp.handleExtRequest(method, XAiAskUserQuestionRequest, (params) =>
mapAcpCallbackFailure(
Effect.gen(function* () {
yield* logNative(input.threadId, method, params);
const requestId = ApprovalRequestId.make(yield* randomUUIDv4);
const runtimeRequestId = RuntimeRequestId.make(requestId);
const answers = yield* Deferred.make<ProviderUserInputAnswers>();
pendingUserInputs.set(requestId, { answers });
yield* offerRuntimeEvent({
type: "user-input.requested",
...(yield* makeEventStamp()),
provider: PROVIDER,
threadId: input.threadId,
turnId: sessions.get(input.threadId)?.activeTurnId,
requestId: runtimeRequestId,
payload: { questions: extractXAiAskUserQuestions(params) },
raw: {
source: "acp.grok.extension",
method,
payload: params,
},
});
const resolved = yield* Deferred.await(answers);
pendingUserInputs.delete(requestId);
yield* offerRuntimeEvent({
type: "user-input.resolved",
...(yield* makeEventStamp()),
provider: PROVIDER,
threadId: input.threadId,
turnId: sessions.get(input.threadId)?.activeTurnId,
requestId: runtimeRequestId,
payload: { answers: resolved },
raw: {
source: "acp.grok.extension",
method,
payload: params,
},
});
return makeXAiAskUserQuestionResponse(resolved);
}),
),
),
{ discard: true },
);
yield* acp.handleRequestPermission((params) =>
mapAcpCallbackFailure(
Effect.gen(function* () {
Expand Down Expand Up @@ -470,6 +540,7 @@ export function makeGrokAdapter(grokSettings: GrokSettings, options?: GrokAdapte
acp,
notificationFiber: undefined,
pendingApprovals,
pendingUserInputs,
turns: [],
lastPlanFingerprint: undefined,
activeTurnId: undefined,
Expand Down Expand Up @@ -733,6 +804,7 @@ export function makeGrokAdapter(grokSettings: GrokSettings, options?: GrokAdapte
Effect.gen(function* () {
const ctx = yield* requireSession(threadId);
yield* settlePendingApprovalsAsCancelled(ctx.pendingApprovals);
yield* settlePendingUserInputsAsEmptyAnswers(ctx.pendingUserInputs);
yield* Effect.ignore(
ctx.acp.cancel.pipe(
Effect.mapError((error) =>
Expand Down Expand Up @@ -760,14 +832,22 @@ export function makeGrokAdapter(grokSettings: GrokSettings, options?: GrokAdapte
yield* Deferred.succeed(pending.decision, decision);
});

const respondToUserInput: GrokAdapterShape["respondToUserInput"] = (threadId, requestId) =>
const respondToUserInput: GrokAdapterShape["respondToUserInput"] = (
threadId,
requestId,
answers,
) =>
Effect.gen(function* () {
yield* requireSession(threadId);
return yield* new ProviderAdapterRequestError({
provider: PROVIDER,
method: "user-input/respond",
detail: `Grok has no pending user-input request: ${requestId}`,
});
const ctx = yield* requireSession(threadId);
const pending = ctx.pendingUserInputs.get(requestId);
if (!pending) {
return yield* new ProviderAdapterRequestError({
provider: PROVIDER,
method: "_x.ai/ask_user_question",
detail: `Unknown pending user-input request: ${requestId}`,
});
}
yield* Deferred.succeed(pending.answers, answers);
});

const readThread: GrokAdapterShape["readThread"] = (threadId) =>
Expand Down
Loading
Loading