Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 33 additions & 10 deletions src/routes/sessions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,20 @@ const sessionHistoryQuerySchema = z.object({
status: z.string().optional(),
ownerKeyId: z.string().optional(),
});

/**
* Returns true when the caller is NOT a master key and NOT an admin.
* Used to gate ownership-based session filtering.
*/
function isNonAdminCaller(keyId: string | null | undefined, role: string | undefined): boolean {
return keyId !== 'master' && keyId !== null && keyId !== undefined && role !== 'admin';
}
/**
* Register all session-related REST routes on the Fastify instance.
*
* Provides: CRUD, listing, pagination, batch operations, health checks,
* event replay, and ACP event schema for session resources.
*/
export function registerSessionRoutes(app: FastifyInstance, ctx: RouteContext): void {
const {
sessions, auth, quotas, metrics, monitor, eventBus, channels,
Expand Down Expand Up @@ -164,7 +178,7 @@ export function registerSessionRoutes(app: FastifyInstance, ctx: RouteContext):
let history = Array.from(historyMap.values());
const callerKeyId = req.authKeyId;
const callerRole = getRequestRole(auth, req);
if (callerKeyId !== 'master' && callerKeyId !== null && callerKeyId !== undefined && callerRole !== 'admin') {
if (isNonAdminCaller(callerKeyId, callerRole)) {
history = history.filter(h => !h.ownerKeyId || h.ownerKeyId === callerKeyId);
}
if (ownerFilter) {
Expand Down Expand Up @@ -210,7 +224,7 @@ export function registerSessionRoutes(app: FastifyInstance, ctx: RouteContext):
let all = sessions.listSessions();
const callerKeyId = req.authKeyId;
const callerRole = getRequestRole(auth, req);
if (callerKeyId !== 'master' && callerKeyId !== null && callerKeyId !== undefined && callerRole !== 'admin') {
if (isNonAdminCaller(callerKeyId, callerRole)) {
all = all.filter(s => !s.ownerKeyId || s.ownerKeyId === callerKeyId);
}
// Issue #1944: Tenant scoping
Expand Down Expand Up @@ -239,7 +253,7 @@ export function registerSessionRoutes(app: FastifyInstance, ctx: RouteContext):
let all = sessions.listSessions();
const callerKeyId = req.authKeyId;
const callerRole = getRequestRole(auth, req);
if (callerKeyId !== 'master' && callerKeyId !== null && callerKeyId !== undefined && callerRole !== 'admin') {
if (isNonAdminCaller(callerKeyId, callerRole)) {
all = all.filter(s => !s.ownerKeyId || s.ownerKeyId === callerKeyId);
}
// Issue #1944: Tenant scoping
Expand Down Expand Up @@ -313,13 +327,13 @@ export function registerSessionRoutes(app: FastifyInstance, ctx: RouteContext):
return reply.status(200).send({ deleted, notFound, errors });
}));

// Backwards compat: /sessions (no prefix) returns raw array
/** @deprecated Use GET /v1/sessions instead. Kept for backward compatibility with pre-v1 clients. */
app.get('/sessions', async (req, reply) => {
if (!requireRole(auth, req, reply, 'admin', 'operator', 'viewer')) return;
let all = sessions.listSessions();
const callerKeyId = req.authKeyId;
const callerRole = getRequestRole(auth, req);
if (callerKeyId !== 'master' && callerKeyId !== null && callerKeyId !== undefined && callerRole !== 'admin') {
if (isNonAdminCaller(callerKeyId, callerRole)) {
all = all.filter(s => !s.ownerKeyId || s.ownerKeyId === callerKeyId);
}
// Issue #1944: Tenant scoping
Expand All @@ -328,7 +342,13 @@ export function registerSessionRoutes(app: FastifyInstance, ctx: RouteContext):
return all.map(s => redactSession(s as unknown as Record<string, unknown>));
});

// Create session (Issue #607: reuse idle session for same workDir)
/**
* Core handler for POST /v1/sessions.
*
* Creates a new session (or reuses an idle one for the same workDir when
* not in ACP mode). Validates workDir, enforces quotas, checks CC version,
* delivers the initial prompt, and emits audit/channel events.
*/
async function createSessionHandler(req: FastifyRequest, reply: FastifyReply, data: z.infer<typeof createSessionSchema>): Promise<unknown> {
if (!requirePermission(auth, req, reply, 'create')) return;
const { workDir, prompt, prd, resumeSessionId, claudeCommand, env, stallThresholdMs, permissionMode, autoApprove, parentId, memoryKeys, model, systemPrompt } = data;
Expand Down Expand Up @@ -424,7 +444,8 @@ export function registerSessionRoutes(app: FastifyInstance, ctx: RouteContext):
} catch (e) {
const auditLogger = getAuditLogger();
if (auditLogger) void auditLogger.log(resolveRequestAuditActor(auth, req, 'system'), 'session.acp.failed', `ACP runtime failed to start for workDir ${safeWorkDir}: ${(e as Error).message}`, undefined, req.tenantId);
return reply.status(500).send({ error: 'ACP runtime failed to start', details: (e as Error).message });
const acpErr = e instanceof Error ? e.message : String(e);
return reply.status(500).send({ error: 'ACP runtime failed to start — check claude CLI availability and ACP configuration', details: acpErr });
}
try {
session = await sessions.createSession({ id: acpResult.session.id, workDir: safeWorkDir, name, prd, resumeSessionId, claudeCommand, env: env as Record<string, string> | undefined, stallThresholdMs, permissionMode, autoApprove, parentId, ownerKeyId: req.authKeyId, tenantId: req.tenantId, model });
Expand Down Expand Up @@ -553,12 +574,13 @@ export function registerSessionRoutes(app: FastifyInstance, ctx: RouteContext):
await Promise.all(allSessions.map(async (s) => {
try {
results[s.id] = await sessions.getHealth(s.id);
} catch {
} catch (e) {
const errMsg = e instanceof Error ? e.message : 'Unknown error';
results[s.id] = {
alive: false, claudeRunning: false,
status: 'unknown', hasTranscript: false,
lastActivity: 0, lastActivityAgo: 0, sessionAge: 0,
details: 'Error fetching health',
details: `Health check failed: ${errMsg}`,
};
}
}));
Expand Down Expand Up @@ -601,7 +623,8 @@ export function registerSessionRoutes(app: FastifyInstance, ctx: RouteContext):
count: records.length,
};
} catch (e: unknown) {
return reply.status(500).send({ error: e instanceof Error ? e.message : String(e) });
const msg = e instanceof Error ? e.message : String(e);
return reply.status(500).send({ error: 'Failed to query event store', details: msg });
}
}));

Expand Down
52 changes: 52 additions & 0 deletions src/services/acp/backend.ts
Original file line number Diff line number Diff line change
Expand Up @@ -281,11 +281,19 @@ export class AcpBackend {
this.clientCapabilities = options.clientCapabilities ?? {};
}

/**
* Create a new ACP session: durable record → child process → initialize → session/new.
* @throws {AcpBackendLifecycleError} on handshake or session/new failure
*/
async createSession(input: AcpBackendCreateSessionInput): Promise<AcpBackendStartResult> {
const session = await this.sessionService.createSession(toCreateSessionInput(input));
return this.startNewRuntime(session, input.cwd, input.mcpServers, input.systemPrompt);
}

/**
* Resume an existing ACP session by spawning a fresh child process and calling session/resume.
* Requires an existing acpAgentSessionId on the session record.
*/
async resumeSession(input: AcpBackendResumeSessionInput): Promise<AcpBackendStartResult> {
const session = await this.sessionService.getSession(input.sessionId, scopeFromInput(input));
if (!session.acpAgentSessionId) {
Expand All @@ -296,6 +304,10 @@ export class AcpBackend {
return this.startResumeRuntime(session, input.cwd);
}

/**
* Load an existing ACP session into a fresh runtime, calling session/load
* to restore context. Used when reconnecting to a previously active session.
*/
async loadSession(input: AcpBackendLoadSessionInput): Promise<AcpBackendStartResult> {
const session = await this.sessionService.getSession(input.sessionId, scopeFromInput(input));
if (!session.acpAgentSessionId) {
Expand All @@ -306,6 +318,10 @@ export class AcpBackend {
return this.startLoadRuntime(session, input.cwd, input.mcpServers);
}

/**
* Send session/cancel to the ACP agent for the given session.
* The agent decides how to handle cancellation (stop current work, rollback, etc).
*/
async cancelSession(input: AcpBackendCancelSessionInput): Promise<AcpBackendCancelResult> {
const scope = scopeFromInput(input);
const session = await this.sessionService.getSession(input.sessionId, scope);
Expand All @@ -325,6 +341,10 @@ export class AcpBackend {
};
}

/**
* Approve a pending permission request from the ACP agent.
* Responds with the 'allow-once' option to the pending approval.
*/
async approveSession(input: AcpBackendApprovalInput): Promise<AcpBackendApprovalResult> {
const runtime = this.requireRuntime(input.sessionId);
await runtime.client.respond(input.approvalId, {
Expand All @@ -339,6 +359,10 @@ export class AcpBackend {
};
}

/**
* Reject a pending permission request from the ACP agent.
* Responds with the 'reject-once' option to the pending approval.
*/
async rejectSession(input: AcpBackendApprovalInput): Promise<AcpBackendApprovalResult> {
const runtime = this.requireRuntime(input.sessionId);
await runtime.client.respond(input.approvalId, {
Expand All @@ -353,10 +377,12 @@ export class AcpBackend {
};
}

/** Return the pending permission approval for a session, or null. */
getPendingApproval(sessionId: string): AcpPendingApproval | null {
return this.pendingApprovals.get(sessionId) ?? null;
}

/** Return the ACP client and agent capabilities for an active runtime, or undefined. */
getRuntime(sessionId: string): { client: AcpBackendClient; agentCapabilities?: AcpJsonValue } | undefined {
const runtime = this.runtimes.get(sessionId);
if (!runtime) return undefined;
Expand Down Expand Up @@ -415,6 +441,10 @@ export class AcpBackend {
}
}

/**
* Claim the driver seat for a session. Only one driver is allowed at a time.
* @throws {AcpBackendLifecycleError} if a driver is already claimed
*/
async claimDriver(input: AcpBackendClaimDriverInput): Promise<AcpBackendDriverResult> {
const scope = scopeFromInput(input);
await this.sessionService.getSession(input.sessionId, scope);
Expand All @@ -433,6 +463,10 @@ export class AcpBackend {
return { sessionId: input.sessionId, holderId: input.holderId, role: 'driver', fence, ttlMs: input.ttlMs };
}

/**
* Release the driver seat. The caller must be the current driver.
* @throws {AcpBackendLifecycleError} if not the current driver
*/
async releaseDriver(input: AcpBackendReleaseDriverInput): Promise<AcpBackendDriverResult> {
const scope = scopeFromInput(input);
await this.sessionService.getSession(input.sessionId, scope);
Expand All @@ -445,6 +479,10 @@ export class AcpBackend {
return { sessionId: input.sessionId, holderId: null, role: 'observer' };
}

/**
* Transfer the driver seat to another subscriber. The current driver's
* fence is incremented.
*/
async transferDriver(input: AcpBackendTransferDriverInput): Promise<AcpBackendDriverResult> {
const scope = scopeFromInput(input);
await this.sessionService.getSession(input.sessionId, scope);
Expand All @@ -458,6 +496,7 @@ export class AcpBackend {
return { sessionId: input.sessionId, holderId: input.targetSubscriberId, role: 'driver', fence };
}

/** Return current driver, observers, and active count for a session. */
getParticipants(sessionId: string, _scope: AcpSessionScope): AcpBackendParticipantsResult {
return (
this.participants.get(sessionId) ?? {
Expand All @@ -469,6 +508,10 @@ export class AcpBackend {
);
}

/**
* Dispatch an action from the action queue to the appropriate ACP runtime method.
* Handles: close, prompt, approve, reject, cancel. Throws for unimplemented types.
*/
async dispatchAction(action: AcpActionRecord): Promise<AcpBackendDispatchActionResult> {
if (action.actionType === 'close') {
const result = await this.shutdownSession(action);
Expand Down Expand Up @@ -507,6 +550,11 @@ export class AcpBackend {
}
}

/**
* Gracefully shut down an ACP runtime: transitions status, sends session/close,
* kills the child process, and cleans up internal state.
* No-op if no runtime exists for the session.
*/
async shutdownSession(input: AcpBackendShutdownSessionInput): Promise<AcpBackendShutdownResult> {
const scope = scopeFromInput(input);
const session = await this.sessionService.getSession(input.sessionId, scope);
Expand All @@ -520,6 +568,10 @@ export class AcpBackend {
return runtime.cleanupPromise;
}

/**
* Restart an ACP session: kill existing runtime, create fresh child process,
* and call session/resume. Includes configurable backoff delay.
*/
async restartSession(input: AcpBackendRestartSessionInput): Promise<AcpBackendRestartResult> {
const scope = scopeFromInput(input);
const verified = await this.sessionService.getSession(input.sessionId, scope);
Expand Down
Loading