Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions apps/server/src/codexAppServerManager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,12 @@ describe("classifyCodexStderrLine", () => {
expect(classifyCodexStderrLine(line)).toBeNull();
});

it("ignores redacted AuthRequired transport shutdown noise", () => {
const line =
'2026-04-10T17:26:09.835934Z ERROR rmcp::transport::worker: worker quit with fatal: Transport channel closed, when AuthRequired(AuthRequiredError { www_authenticate_header: "Bearer [REDACTED]" })';
expect(classifyCodexStderrLine(line)).toBeNull();
});

it("keeps unknown structured errors", () => {
const line = "2026-02-08T04:24:20.085687Z ERROR codex_core::runtime: unrecoverable failure";
expect(classifyCodexStderrLine(line)).toEqual({
Expand Down
2 changes: 1 addition & 1 deletion apps/server/src/codexAppServerManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ const BENIGN_ERROR_LOG_SNIPPETS = [
"state db record_discrepancy: find_thread_path_by_id_str_in_subdir, falling_back",
];
const BENIGN_STDERR_MESSAGE_SNIPPETS = [
'worker quit with fatal: Transport channel closed, when AuthRequired(AuthRequiredError { www_authenticate_header: "Bearer error=\\"invalid_request\\", error_description=\\"No access token was provided in this request\\", resource_metadata=\\"https://mcp.supabase.com/.well-known/oauth-protected-resource/mcp\\""',
"worker quit with fatal: Transport channel closed, when AuthRequired(AuthRequiredError",
];
const RECOVERABLE_THREAD_RESUME_ERROR_SNIPPETS = [
"not found",
Expand Down
213 changes: 115 additions & 98 deletions apps/server/src/orchestration/Layers/ProjectionOverviewQuery.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@ import {
toPersistenceSqlError,
type ProjectionRepositoryError,
} from "../../persistence/Errors.ts";
import { ProjectionOverviewQuery, type ProjectionOverviewQueryShape } from "../Services/ProjectionOverviewQuery.ts";
import {
ProjectionOverviewQuery,
type ProjectionOverviewQueryShape,
} from "../Services/ProjectionOverviewQuery.ts";
import { ProjectionState } from "../../persistence/Services/ProjectionState.ts";
import { ProjectionProject } from "../../persistence/Services/ProjectionProjects.ts";
import { ProjectionThread } from "../../persistence/Services/ProjectionThreads.ts";
Expand Down Expand Up @@ -75,7 +78,9 @@ const ProjectionThreadPlanSummaryRow = Schema.Struct({
const ProjectionStateDbRowSchema = ProjectionState;
const ProjectionThreadSessionDbRowSchema = ProjectionThreadSession;

function parseGithubRef(serialized: string | null): OrchestrationOverviewThread["githubRef"] | undefined {
function parseGithubRef(
serialized: string | null,
): OrchestrationOverviewThread["githubRef"] | undefined {
if (!serialized) return undefined;
try {
return JSON.parse(serialized) as OrchestrationOverviewThread["githubRef"];
Expand Down Expand Up @@ -153,13 +158,14 @@ function hasActionablePlan(
const matchingTurnPlan =
latestTurnId === null
? null
: [...plans]
: ([...plans]
.filter((plan) => plan.turnId === latestTurnId)
.toSorted(
(left, right) =>
left.updatedAt.localeCompare(right.updatedAt) || left.planId.localeCompare(right.planId),
left.updatedAt.localeCompare(right.updatedAt) ||
left.planId.localeCompare(right.planId),
)
.at(-1) ?? null;
.at(-1) ?? null);
if (matchingTurnPlan) {
return matchingTurnPlan.implementedAt === null;
}
Expand Down Expand Up @@ -315,109 +321,120 @@ const makeProjectionOverviewQuery = Effect.gen(function* () {
});

const getOverview: ProjectionOverviewQueryShape["getOverview"] = () =>
sql.withTransaction(
Effect.gen(function* () {
const [projectRows, threadRows, latestTurnRows, sessionRows, planRows, stateRows] =
yield* Effect.all([
listProjectRows(undefined),
listThreadRows(undefined),
listLatestTurnRows(undefined),
listSessionRows(undefined),
listPlanRows(undefined),
listProjectionStateRows(undefined),
]);
sql
.withTransaction(
Effect.gen(function* () {
const [projectRows, threadRows, latestTurnRows, sessionRows, planRows, stateRows] =
yield* Effect.all([
listProjectRows(undefined),
listThreadRows(undefined),
listLatestTurnRows(undefined),
listSessionRows(undefined),
listPlanRows(undefined),
listProjectionStateRows(undefined),
]);

const latestTurnByThread = new Map<string, OrchestrationLatestTurn>();
for (const row of latestTurnRows) {
if (!latestTurnByThread.has(row.threadId)) {
latestTurnByThread.set(row.threadId, toLatestTurn(row));
const latestTurnByThread = new Map<string, OrchestrationLatestTurn>();
for (const row of latestTurnRows) {
if (!latestTurnByThread.has(row.threadId)) {
latestTurnByThread.set(row.threadId, toLatestTurn(row));
}
}
}

const sessionByThread = new Map<string, OrchestrationSession>();
for (const row of sessionRows) {
sessionByThread.set(row.threadId, {
threadId: row.threadId,
status: row.status,
providerName: row.providerName,
runtimeMode: row.runtimeMode,
activeTurnId: row.activeTurnId,
lastError: row.lastError,
updatedAt: row.updatedAt,
});
}

const plansByThread = new Map<string, Array<Schema.Schema.Type<typeof ProjectionThreadPlanSummaryRow>>>();
for (const row of planRows) {
const plans = plansByThread.get(row.threadId) ?? [];
plans.push(row);
plansByThread.set(row.threadId, plans);
}
const sessionByThread = new Map<string, OrchestrationSession>();
for (const row of sessionRows) {
sessionByThread.set(row.threadId, {
threadId: row.threadId,
status: row.status,
providerName: row.providerName,
runtimeMode: row.runtimeMode,
activeTurnId: row.activeTurnId,
lastError: row.lastError,
updatedAt: row.updatedAt,
});
}

const projects: OrchestrationOverviewProject[] = projectRows.map((row) => ({
id: row.projectId,
title: row.title,
workspaceRoot: row.workspaceRoot,
defaultModel: row.defaultModel,
scripts: row.scripts,
activeThreadCount: row.activeThreadCount,
createdAt: row.createdAt,
updatedAt: row.updatedAt,
}));
const plansByThread = new Map<
string,
Array<Schema.Schema.Type<typeof ProjectionThreadPlanSummaryRow>>
>();
for (const row of planRows) {
const plans = plansByThread.get(row.threadId) ?? [];
plans.push(row);
plansByThread.set(row.threadId, plans);
}

const threads: OrchestrationOverviewThread[] = threadRows.map((row) => {
const latestTurn = latestTurnByThread.get(row.threadId) ?? null;
return {
id: row.threadId,
projectId: row.projectId,
const projects: OrchestrationOverviewProject[] = projectRows.map((row) => ({
id: row.projectId,
title: row.title,
model: row.model,
runtimeMode: row.runtimeMode,
interactionMode: row.interactionMode,
branch: row.branch,
worktreePath: row.worktreePath,
...(parseGithubRef(row.githubRef) ? { githubRef: parseGithubRef(row.githubRef) } : {}),
latestTurn,
session: sessionByThread.get(row.threadId) ?? null,
workspaceRoot: row.workspaceRoot,
defaultModel: row.defaultModel,
scripts: row.scripts,
activeThreadCount: row.activeThreadCount,
createdAt: row.createdAt,
updatedAt: row.updatedAt,
lastUserMessageAt: row.lastUserMessageAt,
pendingApprovalCount: row.pendingApprovalCount,
pendingUserInputCount: row.pendingUserInputCount,
hasActionablePlan: hasActionablePlan(plansByThread.get(row.threadId) ?? [], latestTurn),
};
});
}));

const updatedAtCandidates = [
...projectRows.map((row) => row.updatedAt),
...threadRows.map((row) => row.updatedAt),
...sessionRows.map((row) => row.updatedAt),
...stateRows.map((row) => row.updatedAt),
];
const threads: OrchestrationOverviewThread[] = threadRows.map((row) => {
const latestTurn = latestTurnByThread.get(row.threadId) ?? null;
return {
id: row.threadId,
projectId: row.projectId,
title: row.title,
model: row.model,
runtimeMode: row.runtimeMode,
interactionMode: row.interactionMode,
branch: row.branch,
worktreePath: row.worktreePath,
...(parseGithubRef(row.githubRef)
? { githubRef: parseGithubRef(row.githubRef) }
: {}),
latestTurn,
session: sessionByThread.get(row.threadId) ?? null,
createdAt: row.createdAt,
updatedAt: row.updatedAt,
lastUserMessageAt: row.lastUserMessageAt,
pendingApprovalCount: row.pendingApprovalCount,
pendingUserInputCount: row.pendingUserInputCount,
hasActionablePlan: hasActionablePlan(
plansByThread.get(row.threadId) ?? [],
latestTurn,
),
};
});

return Schema.decodeUnknownSync(OrchestrationOverviewSnapshot)({
snapshotSequence: computeSnapshotSequence(stateRows),
limits: {
maxProjects: MAX_PROJECTS,
maxThreadsPerProject: MAX_THREADS_PER_PROJECT,
},
projects,
threads,
updatedAt:
updatedAtCandidates.sort((left, right) => (left < right ? 1 : left > right ? -1 : 0))[0] ??
new Date(0).toISOString(),
});
}),
).pipe(
Effect.mapError((cause): ProjectionRepositoryError => {
if (Schema.isSchemaError(cause)) {
return toPersistenceDecodeError("ProjectionOverviewQuery.getOverview:decode")(cause);
}
return isPersistenceError(cause)
? cause
: toPersistenceSqlError("ProjectionOverviewQuery.getOverview:query")(cause);
}),
);
const updatedAtCandidates = [
...projectRows.map((row) => row.updatedAt),
...threadRows.map((row) => row.updatedAt),
...sessionRows.map((row) => row.updatedAt),
...stateRows.map((row) => row.updatedAt),
];

return Schema.decodeUnknownSync(OrchestrationOverviewSnapshot)({
snapshotSequence: computeSnapshotSequence(stateRows),
limits: {
maxProjects: MAX_PROJECTS,
maxThreadsPerProject: MAX_THREADS_PER_PROJECT,
},
projects,
threads,
updatedAt:
updatedAtCandidates.sort((left, right) =>
left < right ? 1 : left > right ? -1 : 0,
)[0] ?? new Date(0).toISOString(),
});
}),
)
.pipe(
Effect.mapError((cause): ProjectionRepositoryError => {
if (Schema.isSchemaError(cause)) {
return toPersistenceDecodeError("ProjectionOverviewQuery.getOverview:decode")(cause);
}
return isPersistenceError(cause)
? cause
: toPersistenceSqlError("ProjectionOverviewQuery.getOverview:query")(cause);
}),
);

return { getOverview } satisfies ProjectionOverviewQueryShape;
});
Expand Down
Loading
Loading