Skip to content

Commit ca1a3fc

Browse files
author
catlog22
committed
fix(orchestrator): resolve high-priority issues from code review
1. Race condition fix: Removed frontend direct lockSession call in useOrchestratorExecution.ts - session locking now handled purely via backend WebSocket broadcast (CLI_SESSION_LOCKED) 2. WebSocket handlers: Added handleSessionLockedMessage and handleSessionUnlockedMessage to sessionManagerStore.ts 3. useWebSocket integration: Added case handlers for CLI_SESSION_LOCKED and CLI_SESSION_UNLOCKED messages 4. API input validation: Added validation for sessionConfig, stepTimeout, and errorStrategy in execute-in-session endpoint 5. Fixed wsBroadcast reference: Changed to broadcastToClients from context
1 parent b2c1d32 commit ca1a3fc

3 files changed

Lines changed: 122 additions & 23 deletions

File tree

ccw/frontend/src/hooks/useWebSocket.ts

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,10 @@ import { useExecutionStore } from '@/stores/executionStore';
99
import { useFlowStore } from '@/stores';
1010
import { useCliStreamStore } from '@/stores/cliStreamStore';
1111
import { useCliSessionStore } from '@/stores/cliSessionStore';
12+
import {
13+
handleSessionLockedMessage,
14+
handleSessionUnlockedMessage,
15+
} from '@/stores/sessionManagerStore';
1216
import {
1317
OrchestratorMessageSchema,
1418
type OrchestratorWebSocketMessage,
@@ -212,6 +216,30 @@ export function useWebSocket(options: UseWebSocketOptions = {}): UseWebSocketRet
212216
break;
213217
}
214218

219+
case 'CLI_SESSION_LOCKED': {
220+
const { sessionKey, reason, executionId, timestamp } = data.payload ?? {};
221+
if (typeof sessionKey === 'string') {
222+
handleSessionLockedMessage({
223+
sessionKey,
224+
reason: reason ?? 'Workflow execution',
225+
executionId,
226+
timestamp: timestamp ?? new Date().toISOString(),
227+
});
228+
}
229+
break;
230+
}
231+
232+
case 'CLI_SESSION_UNLOCKED': {
233+
const { sessionKey, timestamp } = data.payload ?? {};
234+
if (typeof sessionKey === 'string') {
235+
handleSessionUnlockedMessage({
236+
sessionKey,
237+
timestamp: timestamp ?? new Date().toISOString(),
238+
});
239+
}
240+
break;
241+
}
242+
215243
case 'CLI_OUTPUT': {
216244
const { executionId, chunkType, data: outputData, unit } = data.payload;
217245

ccw/frontend/src/stores/sessionManagerStore.ts

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,50 @@ const initialState: SessionManagerState = {
3535
/** Module-level worker reference. Worker objects are not serializable. */
3636
let _workerRef: Worker | null = null;
3737

38+
// ========== WebSocket Session Lock Message Handler ==========
39+
40+
/**
41+
* Handle CLI_SESSION_LOCKED WebSocket message from backend.
42+
* Updates session metadata to reflect locked state.
43+
*/
44+
export function handleSessionLockedMessage(payload: {
45+
sessionKey: string;
46+
reason: string;
47+
executionId?: string;
48+
timestamp: string;
49+
}): void {
50+
const store = useSessionManagerStore.getState();
51+
store.updateTerminalMeta(payload.sessionKey, {
52+
status: 'locked',
53+
isLocked: true,
54+
lockReason: payload.reason,
55+
lockedByExecutionId: payload.executionId,
56+
lockedAt: payload.timestamp,
57+
});
58+
}
59+
60+
/**
61+
* Handle CLI_SESSION_UNLOCKED WebSocket message from backend.
62+
* Updates session metadata to reflect unlocked state.
63+
*/
64+
export function handleSessionUnlockedMessage(payload: {
65+
sessionKey: string;
66+
timestamp: string;
67+
}): void {
68+
const store = useSessionManagerStore.getState();
69+
const existing = store.terminalMetas[payload.sessionKey];
70+
// Only unlock if currently locked
71+
if (existing?.isLocked) {
72+
store.updateTerminalMeta(payload.sessionKey, {
73+
status: 'active',
74+
isLocked: false,
75+
lockReason: undefined,
76+
lockedByExecutionId: undefined,
77+
lockedAt: undefined,
78+
});
79+
}
80+
}
81+
3882
// ========== Worker Message Handler ==========
3983

4084
function _handleWorkerMessage(event: MessageEvent<MonitorAlert>): void {

ccw/src/core/routes/orchestrator-routes.ts

Lines changed: 50 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1307,6 +1307,37 @@ export async function handleOrchestratorRoutes(ctx: RouteContext): Promise<boole
13071307
errorStrategy?: 'pause' | 'skip' | 'stop';
13081308
};
13091309

1310+
// Input validation
1311+
const validTools = ['claude', 'gemini', 'qwen', 'codex', 'opencode'];
1312+
const validShells = ['bash', 'pwsh', 'cmd'];
1313+
const validErrorStrategies = ['pause', 'skip', 'stop'];
1314+
1315+
if (sessionConfig) {
1316+
if (sessionConfig.tool && !validTools.includes(sessionConfig.tool)) {
1317+
return { success: false, error: `Invalid tool. Must be one of: ${validTools.join(', ')}`, status: 400 };
1318+
}
1319+
if (sessionConfig.preferredShell && !validShells.includes(sessionConfig.preferredShell)) {
1320+
return { success: false, error: `Invalid preferredShell. Must be one of: ${validShells.join(', ')}`, status: 400 };
1321+
}
1322+
if (sessionConfig.model && typeof sessionConfig.model !== 'string') {
1323+
return { success: false, error: 'model must be a string', status: 400 };
1324+
}
1325+
}
1326+
1327+
if (inputVariables && typeof inputVariables !== 'object') {
1328+
return { success: false, error: 'variables must be an object', status: 400 };
1329+
}
1330+
1331+
if (stepTimeout !== undefined) {
1332+
if (typeof stepTimeout !== 'number' || stepTimeout < 1000 || stepTimeout > 3600000) {
1333+
return { success: false, error: 'stepTimeout must be a number between 1000 and 3600000 (ms)', status: 400 };
1334+
}
1335+
}
1336+
1337+
if (!validErrorStrategies.includes(errorStrategy)) {
1338+
return { success: false, error: `Invalid errorStrategy. Must be one of: ${validErrorStrategies.join(', ')}`, status: 400 };
1339+
}
1340+
13101341
try {
13111342
// Verify flow exists
13121343
const flow = await readFlowStorage(workflowDir, flowId);
@@ -1355,31 +1386,27 @@ export async function handleOrchestratorRoutes(ctx: RouteContext): Promise<boole
13551386
broadcastExecutionStateUpdate(execution);
13561387

13571388
// Broadcast EXECUTION_STARTED to WebSocket clients
1358-
if (wsBroadcast) {
1359-
wsBroadcast({
1360-
type: 'EXECUTION_STARTED',
1361-
payload: {
1362-
executionId: execId,
1363-
flowId: flowId,
1364-
sessionKey: sessionKey,
1365-
stepName: flow.name,
1366-
timestamp: now
1367-
}
1368-
});
1369-
}
1389+
broadcastToClients({
1390+
type: 'EXECUTION_STARTED',
1391+
payload: {
1392+
executionId: execId,
1393+
flowId: flowId,
1394+
sessionKey: sessionKey,
1395+
stepName: flow.name,
1396+
timestamp: now
1397+
}
1398+
});
13701399

13711400
// Lock the session (via WebSocket broadcast for frontend to handle)
1372-
if (wsBroadcast) {
1373-
wsBroadcast({
1374-
type: 'CLI_SESSION_LOCKED',
1375-
payload: {
1376-
sessionKey: sessionKey,
1377-
reason: `Executing workflow: ${flow.name}`,
1378-
executionId: execId,
1379-
timestamp: now
1380-
}
1381-
});
1382-
}
1401+
broadcastToClients({
1402+
type: 'CLI_SESSION_LOCKED',
1403+
payload: {
1404+
sessionKey: sessionKey,
1405+
reason: `Executing workflow: ${flow.name}`,
1406+
executionId: execId,
1407+
timestamp: now
1408+
}
1409+
});
13831410

13841411
// TODO: Implement actual step-by-step execution in PTY session
13851412
// For now, mark as running and let the frontend handle the orchestration

0 commit comments

Comments
 (0)