Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import { ProjectionCheckpointRepository } from "../src/persistence/Services/Proj
import { ProjectionPendingApprovalRepository } from "../src/persistence/Services/ProjectionPendingApprovals.ts";
import { makeAdapterRegistryMock } from "../src/provider/testUtils/providerAdapterRegistryMock.ts";
import { ProviderAdapterRegistry } from "../src/provider/Services/ProviderAdapterRegistry.ts";
import { makeProviderRegistryLayer } from "../src/provider/testUtils/providerRegistryMock.ts";
import { ProviderSessionDirectoryLive } from "../src/provider/Layers/ProviderSessionDirectory.ts";
import { ServerSettingsService } from "../src/serverSettings.ts";
import { makeProviderServiceLive } from "../src/provider/Layers/ProviderService.ts";
Expand Down Expand Up @@ -293,6 +294,7 @@ export const makeOrchestrationIntegrationHarness = (
Layer.provide(AnalyticsService.layerTest),
Layer.provide(providerEventLoggersLayer),
);
const providerRegistryLayer = makeProviderRegistryLayer();

const checkpointStoreLayer = CheckpointStoreLive.pipe(Layer.provide(VcsDriverRegistry.layer));
const projectionSnapshotQueryLayer = OrchestrationProjectionSnapshotQueryLive;
Expand Down Expand Up @@ -375,6 +377,7 @@ export const makeOrchestrationIntegrationHarness = (
const layer = Layer.empty.pipe(
Layer.provideMerge(runtimeServicesLayer),
Layer.provideMerge(orchestrationReactorLayer),
Layer.provideMerge(providerRegistryLayer),
Layer.provide(persistenceLayer),
Layer.provideMerge(RepositoryIdentityResolverLive),
Layer.provideMerge(ServerSettingsService.layerTest()),
Expand Down
86 changes: 83 additions & 3 deletions apps/server/scripts/acp-mock-agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,15 @@ const emitInterleavedAssistantToolCalls =
process.env.T3_ACP_EMIT_INTERLEAVED_ASSISTANT_TOOL_CALLS === "1";
const emitGenericToolPlaceholders = process.env.T3_ACP_EMIT_GENERIC_TOOL_PLACEHOLDERS === "1";
const emitAskQuestion = process.env.T3_ACP_EMIT_ASK_QUESTION === "1";
const emitXAiAskUserQuestion = process.env.T3_ACP_EMIT_XAI_ASK_USER_QUESTION === "1";
const failSetConfigOption = process.env.T3_ACP_FAIL_SET_CONFIG_OPTION === "1";
const exitOnSetConfigOption = process.env.T3_ACP_EXIT_ON_SET_CONFIG_OPTION === "1";
const promptResponseText = process.env.T3_ACP_PROMPT_RESPONSE_TEXT;
/**
 * Option ids used when the mock agent builds its permission request options.
 *
 * Each id can be overridden through the matching `T3_ACP_*_OPTION_ID`
 * environment variable; when a variable is not set the conventional id
 * ("allow-once", "allow-always", "reject-once") is used instead.
 */
const permissionOptionIds = (() => {
  const { env } = process;
  const allowOnce = env.T3_ACP_ALLOW_ONCE_OPTION_ID ?? "allow-once";
  const allowAlways = env.T3_ACP_ALLOW_ALWAYS_OPTION_ID ?? "allow-always";
  const rejectOnce = env.T3_ACP_REJECT_ONCE_OPTION_ID ?? "reject-once";
  return { allowOnce, allowAlways, rejectOnce };
})();
const sessionId = "mock-session-1";

let currentModeId = "ask";
Expand Down Expand Up @@ -237,6 +243,21 @@ function modeState(): AcpSchema.SessionModeState {
};
}

/** Models the mock agent reports as available for a session. */
const grokAcpModels: ReadonlyArray<AcpSchema.ModelInfo> = [
  { modelId: "grok-build", name: "Grok Build" },
  { modelId: "grok-mock-alt", name: "Grok Mock Alt" },
];

/**
 * Builds the session model state returned to the client.
 *
 * The module-level `currentModelId` is reported only when it matches an entry
 * of `grokAcpModels`; any other value is reported as "grok-build".
 *
 * @returns The current model id together with the full list of mock models.
 */
function modelState(): AcpSchema.SessionModelState {
  const knownModel = grokAcpModels.find((candidate) => candidate.modelId === currentModelId);
  return {
    currentModelId: knownModel === undefined ? "grok-build" : currentModelId,
    availableModels: grokAcpModels,
  };
}

const program = Effect.gen(function* () {
const agent = yield* EffectAcpAgent.AcpAgent;

Expand All @@ -257,6 +278,7 @@ const program = Effect.gen(function* () {
Effect.succeed({
sessionId,
modes: modeState(),
models: modelState(),
configOptions: configOptions(),
}),
);
Expand All @@ -273,11 +295,28 @@ const program = Effect.gen(function* () {
.pipe(
Effect.as({
modes: modeState(),
models: modelState(),
configOptions: configOptions(),
}),
),
);

// Register the handler for session model changes. A model id that is not
// listed in `grokAcpModels` is rejected with an invalid-params request error;
// a listed id becomes the new `currentModelId` and an empty result is returned.
yield* agent.handleSetSessionModel((request) =>
Effect.gen(function* () {
// Reject ids that are not present in grokAcpModels.
if (!grokAcpModels.some((model) => model.modelId === request.modelId)) {
return yield* AcpError.AcpRequestError.invalidParams(
`Unknown mock model id: ${request.modelId}`,
{
method: "session/set_model",
params: request,
},
);
}
// Remember the selection; modelState() reads currentModelId when it builds
// the session model state.
currentModelId = request.modelId;
return {};
}),
);

yield* agent.handleSetSessionConfigOption((request) =>
Effect.gen(function* () {
if (exitOnSetConfigOption) {
Expand Down Expand Up @@ -419,9 +458,13 @@ const program = Effect.gen(function* () {
],
},
options: [
{ optionId: "allow-once", name: "Allow once", kind: "allow_once" },
{ optionId: "allow-always", name: "Allow always", kind: "allow_always" },
{ optionId: "reject-once", name: "Reject", kind: "reject_once" },
{ optionId: permissionOptionIds.allowOnce, name: "Allow once", kind: "allow_once" },
{
optionId: permissionOptionIds.allowAlways,
name: "Allow always",
kind: "allow_always",
},
{ optionId: permissionOptionIds.rejectOnce, name: "Reject", kind: "reject_once" },
],
});

Expand Down Expand Up @@ -514,6 +557,43 @@ const program = Effect.gen(function* () {
return { stopReason: "end_turn" };
}

// When T3_ACP_EMIT_XAI_ASK_USER_QUESTION=1, send one ask-user-question
// extension request to the client and finish the turn from its response;
// every path through this branch either returns or throws.
if (emitXAiAskUserQuestion) {
// NOTE(review): the request name has a leading underscore while the payload's
// `method` field does not — presumably intentional; confirm against the client.
const result = yield* agent.client.extRequest("_x.ai/ask_user_question", {
method: "x.ai/ask_user_question",
params: {
sessionId: requestedSessionId,
toolCallId: "ask-user-question-tool-call-1",
questions: [
{
question: "Which scope should Grok use?",
multiSelect: null,
options: [
{ label: "Workspace", description: "Use the current workspace" },
{ label: "Session", description: "Only use this session" },
],
},
],
mode: "default",
},
});
// The response must be a non-null object that carries an `outcome` field.
if (typeof result !== "object" || result === null || !("outcome" in result)) {
throw new Error("Expected _x.ai/ask_user_question response outcome.");
}
// A cancelled question ends the turn without requiring any answers.
if (result.outcome === "cancelled") {
return { stopReason: "end_turn" };
}
// Any other outcome must be "accepted" and come with a non-null object
// `answers`; the contents of the answers are not inspected here.
if (
result.outcome !== "accepted" ||
!("answers" in result) ||
typeof result.answers !== "object" ||
result.answers === null
) {
throw new Error("Expected accepted _x.ai/ask_user_question response answers.");
}

return { stopReason: "end_turn" };
}

yield* agent.client.sessionUpdate({
sessionId: requestedSessionId,
update: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import {
ProviderService,
type ProviderServiceShape,
} from "../../provider/Services/ProviderService.ts";
import { makeProviderRegistryLayer } from "../../provider/testUtils/providerRegistryMock.ts";
import { TextGeneration, type TextGenerationShape } from "../../textGeneration/TextGeneration.ts";
import { RepositoryIdentityResolverLive } from "../../project/Layers/RepositoryIdentityResolver.ts";
import { OrchestrationEngineLive } from "./OrchestrationEngine.ts";
Expand Down Expand Up @@ -142,6 +143,7 @@ describe("ProviderCommandReactor", () => {
readonly baseDir?: string;
readonly threadModelSelection?: ModelSelection;
readonly sessionModelSwitch?: "unsupported" | "in-session";
readonly requiresNewThreadForModelChange?: boolean;
}) {
const now = "2026-01-01T00:00:00.000Z";
const baseDir = input?.baseDir ?? fs.mkdtempSync(path.join(os.tmpdir(), "t3code-reactor-"));
Expand Down Expand Up @@ -280,6 +282,14 @@ describe("ProviderCommandReactor", () => {
}),
),
);
// Single provider snapshot for the instance named by `modelSelection`. The
// `requiresNewThreadForModelChange` key is only present when the harness input
// sets it to true; otherwise the snapshot carries just the instance id.
const providerSnapshots = [
{
instanceId: modelSelection.instanceId,
...(input?.requiresNewThreadForModelChange === true
? { requiresNewThreadForModelChange: true }
: {}),
},
];

const unsupported = () => Effect.die(new Error("Unsupported provider call in test")) as never;
const service: ProviderServiceShape = {
Expand Down Expand Up @@ -335,6 +345,7 @@ describe("ProviderCommandReactor", () => {
Layer.provideMerge(orchestrationLayer),
Layer.provideMerge(projectionSnapshotLayer),
Layer.provideMerge(Layer.succeed(ProviderService, service)),
Layer.provideMerge(makeProviderRegistryLayer(providerSnapshots as never)),
Layer.provideMerge(
Layer.mock(GitWorkflowService)({
renameBranch,
Expand Down Expand Up @@ -879,6 +890,71 @@ describe("ProviderCommandReactor", () => {
});
});

it("rejects changing models after start when the provider requires a new thread", async () => {
  const harness = await createHarness({ requiresNewThreadForModelChange: true });
  const now = "2026-01-01T00:00:00.000Z";
  const threadId = ThreadId.make("thread-1");

  // Runs one orchestration command to completion.
  const dispatchCommand = (command: Parameters<typeof harness.engine.dispatch>[0]) =>
    Effect.runPromise(harness.engine.dispatch(command));

  // Looks up the turn-start failure activity recorded on the thread, if any.
  const findStartFailure = async () => {
    const { threads } = await harness.readModel();
    const thread = threads.find((entry) => entry.id === threadId);
    return thread?.activities.find(
      (activity) => activity.kind === "provider.turn.start.failed",
    );
  };

  // First turn: no model selection, so the thread starts on its own model.
  await dispatchCommand({
    type: "thread.turn.start",
    commandId: CommandId.make("cmd-turn-start-restricted-1"),
    threadId,
    message: {
      messageId: asMessageId("user-message-restricted-1"),
      role: "user",
      text: "first",
      attachments: [],
    },
    interactionMode: DEFAULT_PROVIDER_INTERACTION_MODE,
    runtimeMode: "approval-required",
    createdAt: now,
  });

  await waitFor(() => harness.sendTurn.mock.calls.length === 1);

  // Second turn: asks for a different model on the already-started thread.
  await dispatchCommand({
    type: "thread.turn.start",
    commandId: CommandId.make("cmd-turn-start-restricted-2"),
    threadId,
    message: {
      messageId: asMessageId("user-message-restricted-2"),
      role: "user",
      text: "second",
      attachments: [],
    },
    modelSelection: {
      instanceId: ProviderInstanceId.make("codex"),
      model: "gpt-5.1-codex",
    },
    interactionMode: DEFAULT_PROVIDER_INTERACTION_MODE,
    runtimeMode: "approval-required",
    createdAt: now,
  });

  await waitFor(async () => (await findStartFailure()) !== undefined);

  // The second turn must never reach the provider.
  expect(harness.sendTurn).toHaveBeenCalledTimes(1);
  const startFailure = await findStartFailure();
  expect(startFailure).toMatchObject({
    payload: {
      detail: expect.stringContaining("cannot switch models after the conversation has started"),
    },
  });
});

it("starts a first turn on the requested provider instance even when it differs from the thread model", async () => {
const harness = await createHarness({
threadModelSelection: { instanceId: ProviderInstanceId.make("codex"), model: "gpt-5-codex" },
Expand Down
48 changes: 48 additions & 0 deletions apps/server/src/orchestration/Layers/ProviderCommandReactor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import { ProviderAdapterRequestError } from "../../provider/Errors.ts";
import type { ProviderServiceError } from "../../provider/Errors.ts";
import { TextGeneration } from "../../textGeneration/TextGeneration.ts";
import { ProviderService } from "../../provider/Services/ProviderService.ts";
import { ProviderRegistry } from "../../provider/Services/ProviderRegistry.ts";
import { OrchestrationEngineService } from "../Services/OrchestrationEngine.ts";
import { ProjectionSnapshotQuery } from "../Services/ProjectionSnapshotQuery.ts";
import {
Expand Down Expand Up @@ -180,6 +181,7 @@ const make = Effect.gen(function* () {
const orchestrationEngine = yield* OrchestrationEngineService;
const projectionSnapshotQuery = yield* ProjectionSnapshotQuery;
const providerService = yield* ProviderService;
const providerRegistry = yield* ProviderRegistry;
const gitWorkflow = yield* GitWorkflowService;
const vcsStatusBroadcaster = yield* VcsStatusBroadcaster;
const textGeneration = yield* TextGeneration;
Expand Down Expand Up @@ -305,6 +307,38 @@ const make = Effect.gen(function* () {
.pipe(Effect.map(Option.getOrUndefined));
});

/**
 * Fails when a requested model change involves a provider instance that
 * requires a new thread for model changes.
 *
 * Succeeds without doing anything when no model selection was requested, or
 * when the requested instance id and model both equal the current selection.
 * Otherwise the provider snapshots are read from the provider registry; if the
 * snapshot for either the current or the requested instance has
 * `requiresNewThreadForModelChange` set to `true`, the effect fails with a
 * `ProviderAdapterRequestError` for method `thread.turn.start`.
 *
 * @param input.threadId Thread the turn belongs to; only used in the error detail.
 * @param input.currentModelSelection Selection the thread is currently using.
 * @param input.requestedModelSelection Selection requested for the next turn, if any.
 */
const rejectStartedThreadModelChangeIfRequired = Effect.fnUntraced(function* (input: {
  readonly threadId: ThreadId;
  readonly currentModelSelection: ModelSelection;
  readonly requestedModelSelection: ModelSelection | undefined;
}) {
  const { currentModelSelection, requestedModelSelection } = input;
  if (requestedModelSelection === undefined) {
    return;
  }
  const isSameSelection =
    currentModelSelection.instanceId === requestedModelSelection.instanceId &&
    currentModelSelection.model === requestedModelSelection.model;
  if (isSameSelection) {
    return;
  }

  const providers = yield* providerRegistry.getProviders;
  // First-match lookup per instance id: only the first snapshot with a given
  // instance id decides, and the flag must be exactly `true`.
  const requiresNewThreadFor = (instanceId: ModelSelection["instanceId"]): boolean =>
    providers.find((snapshot) => snapshot.instanceId === instanceId)
      ?.requiresNewThreadForModelChange === true;
  // The current instance is checked first; the requested one is only looked up
  // when the current instance does not carry the restriction.
  const requiresNewThread =
    requiresNewThreadFor(currentModelSelection.instanceId) ||
    requiresNewThreadFor(requestedModelSelection.instanceId);
  if (!requiresNewThread) {
    return;
  }

  return yield* new ProviderAdapterRequestError({
    provider: providerErrorLabelFromInstanceHint({
      instanceId: String(requestedModelSelection.instanceId),
      modelSelectionInstanceId: String(input.currentModelSelection.instanceId),
    }),
    method: "thread.turn.start",
    detail: `Thread '${input.threadId}' cannot switch models after the conversation has started. Start a new thread to use '${requestedModelSelection.model}'.`,
  });
});

const ensureSessionForThread = Effect.fn("ensureSessionForThread")(function* (
threadId: ThreadId,
createdAt: string,
Expand Down Expand Up @@ -384,6 +418,20 @@ const make = Effect.gen(function* () {
});
}
const preferredProvider: ProviderDriverKind = desiredDriverKind;
// Only threads that already have a session are checked for a restricted
// model change.
if (thread.session !== null) {
yield* rejectStartedThreadModelChangeIfRequired({
threadId,
currentModelSelection:
// When the active session reports a model, compare against that model on
// `currentInstanceId`; otherwise fall back to the thread's stored selection.
activeSession?.model !== undefined
? {
...thread.modelSelection,
instanceId: currentInstanceId,
model: activeSession.model,
}
: thread.modelSelection,
requestedModelSelection,
});
}
if (
thread.session !== null &&
requestedModelSelection !== undefined &&
Expand Down
Loading
Loading