Skip to content

Commit e4693fa

Browse files
committed
Start follow-up tasks from human interrupts
1 parent 683489c commit e4693fa

5 files changed

Lines changed: 271 additions & 4 deletions

File tree

internal/gateway/api.go

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1041,6 +1041,116 @@ func (s *Server) cancelProjectRunFromRecord(ctx context.Context, principal *auth
10411041
return map[string]any{"ok": true, "project_id": projectID, "run_id": runID, "run_history_id": runRecord.ID, "result": payload}, nil
10421042
}
10431043

1044+
func (s *Server) startNewProjectTaskFromRecord(ctx context.Context, principal *authPrincipal, record RunRecord, args map[string]any) (map[string]any, *apiError) {
1045+
if args == nil {
1046+
args = map[string]any{}
1047+
}
1048+
projectID := firstNonEmpty(stringValueFromAny(args["project_id"]), record.ProjectID)
1049+
if strings.TrimSpace(projectID) == "" {
1050+
return nil, &apiError{Status: http.StatusBadRequest, Code: "project_id_required", Message: "project_id is required"}
1051+
}
1052+
goal := firstNonEmpty(
1053+
stringValueFromAny(args["new_task_goal"]),
1054+
stringValueFromAny(args["goal"]),
1055+
)
1056+
if strings.TrimSpace(goal) == "" {
1057+
return nil, &apiError{Status: http.StatusBadRequest, Code: "new_task_goal_required", Message: "new_task_goal is required when follow_up=start_new_task"}
1058+
}
1059+
submitArgs := map[string]any{
1060+
"project_id": projectID,
1061+
"wait": false,
1062+
"goal": goal,
1063+
"title": firstNonEmpty(
1064+
stringValueFromAny(args["new_task_title"]),
1065+
stringValueFromAny(args["title"]),
1066+
"Follow-up task for "+firstNonEmpty(record.RunID, record.ID),
1067+
),
1068+
}
1069+
for _, key := range []string{"relay_profile_id", "machine_id", "host_label", "reason"} {
1070+
if value := strings.TrimSpace(stringValueFromAny(args[key])); value != "" {
1071+
submitArgs[key] = value
1072+
}
1073+
}
1074+
for key, value := range map[string]string{
1075+
"relay_profile_id": record.RelayProfileID,
1076+
"machine_id": record.MachineID,
1077+
"host_label": record.HostLabel,
1078+
} {
1079+
if _, exists := submitArgs[key]; !exists && strings.TrimSpace(value) != "" {
1080+
submitArgs[key] = strings.TrimSpace(value)
1081+
}
1082+
}
1083+
for source, target := range map[string]string{
1084+
"new_task_prompt": "prompt",
1085+
"new_task_profile": "profile",
1086+
"new_task_adapter_profile": "adapter_profile",
1087+
"new_task_timeout_seconds": "timeout_seconds",
1088+
} {
1089+
if value, exists := args[source]; exists {
1090+
submitArgs[target] = value
1091+
}
1092+
}
1093+
for _, key := range []string{"prompt", "profile", "adapter_profile", "timeout_seconds"} {
1094+
if _, exists := submitArgs[key]; !exists {
1095+
if value, ok := args[key]; ok {
1096+
submitArgs[key] = value
1097+
}
1098+
}
1099+
}
1100+
if _, exists := submitArgs["adapter_profile"]; !exists && strings.TrimSpace(record.ExecutorProfile) != "" {
1101+
submitArgs["adapter_profile"] = strings.TrimSpace(record.ExecutorProfile)
1102+
}
1103+
if record.ID != "" {
1104+
submitArgs["parent_run_history_id"] = record.ID
1105+
}
1106+
if record.RunID != "" {
1107+
submitArgs["parent_run_id"] = record.RunID
1108+
}
1109+
runRecord, auditMetadata := s.beginRunRecord(ctx, principal, projectID, "start_new_task", submitArgs)
1110+
parentMetadata := runAuditMetadata(record)
1111+
parentMetadata["operation"] = "start_new_task"
1112+
parentMetadata["new_task_title"] = safeExcerpt(stringValueFromAny(submitArgs["title"]), 240)
1113+
parentMetadata["new_task_goal"] = safeExcerpt(goal, 600)
1114+
s.recordGatewayAuditWithMetadata(ctx, principal, "start_new_task_requested", "Requested start_new_task from run "+firstNonEmpty(record.RunID, record.ID)+" in project "+projectID, parentMetadata)
1115+
s.recordGatewayAuditWithMetadata(ctx, principal, "task_submitted", "Submitted follow-up task for project "+projectID, auditMetadata)
1116+
match, apiErr := s.resolveProject(ctx, principal, projectID, submitArgs, true)
1117+
if apiErr != nil {
1118+
runRecord.Status = "failed"
1119+
runRecord.ResultSummary = "Route resolution failed: " + apiErr.Code
1120+
runRecord, auditMetadata = s.finishRunRecord(ctx, runRecord, map[string]any{"status": "failed", "summary": runRecord.ResultSummary}, "unavailable")
1121+
s.recordGatewayAuditWithMetadata(ctx, principal, "run_failed", "Follow-up task route resolution failed for project "+projectID+": "+apiErr.Code, auditMetadata)
1122+
return nil, apiErr
1123+
}
1124+
runRecord, auditMetadata = s.applyRouteToRunRecord(ctx, runRecord, principal, match, submitArgs)
1125+
s.recordProjectRouteAudit(ctx, principal, match, submitArgs, auditMetadata)
1126+
path, body, apiErr := submitProjectTaskRoute(false)(submitArgs)
1127+
if apiErr != nil {
1128+
runRecord.Status = "failed"
1129+
runRecord.ResultSummary = "Follow-up task validation failed: " + apiErr.Code
1130+
runRecord, auditMetadata = s.finishRunRecord(ctx, runRecord, map[string]any{"status": "failed", "summary": runRecord.ResultSummary}, "unavailable")
1131+
s.recordGatewayAuditWithMetadata(ctx, principal, "run_failed", "Follow-up task validation failed for project "+projectID+": "+apiErr.Code, auditMetadata)
1132+
return nil, apiErr
1133+
}
1134+
path = appendSelector(path, submitArgs)
1135+
s.recordGatewayAuditWithMetadata(ctx, principal, "run_started", "Started follow-up task for project "+projectID, auditMetadata)
1136+
_, response, apiErr := s.callRelay(ctx, match.Profile, http.MethodPost, path, body)
1137+
payload, apiErr := responsePayload(match.Profile, response, apiErr)
1138+
if apiErr != nil {
1139+
runRecord.Status = "failed"
1140+
runRecord.ResultSummary = "Follow-up task failed: " + apiErr.Code
1141+
runRecord, auditMetadata = s.finishRunRecord(ctx, runRecord, map[string]any{"status": "failed", "summary": runRecord.ResultSummary}, "unavailable")
1142+
s.recordGatewayAuditWithMetadata(ctx, principal, "run_failed", "Follow-up task failed for project "+projectID+": "+apiErr.Code, auditMetadata)
1143+
return nil, apiErr
1144+
}
1145+
runRecord, auditMetadata = s.finishRunRecord(ctx, runRecord, payload, reportStatusForPayload(payload, "available"))
1146+
eventType := terminalAuditType(payload)
1147+
s.recordGatewayAuditWithMetadata(ctx, principal, eventType, terminalAuditSummary(projectID, payload), auditMetadata)
1148+
if eventType == "blocker" {
1149+
s.recordHumanInterruptAudit(ctx, principal, projectID, payload, auditMetadata)
1150+
}
1151+
return map[string]any{"ok": true, "project_id": projectID, "run_history_id": runRecord.ID, "run_id": runRecord.RunID, "result": payload}, nil
1152+
}
1153+
10441154
func (s *Server) handleProjectRunResume(w http.ResponseWriter, r *http.Request, projectID, runID string) {
10451155
if r.Method != http.MethodPost {
10461156
writeAPIError(w, http.StatusMethodNotAllowed, "method_not_allowed", "method not allowed")

internal/gateway/gateway_test.go

Lines changed: 108 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -387,6 +387,54 @@ func TestGatewayMCPAsyncLifecycleTools(t *testing.T) {
387387
}
388388
assertNoGatewayMCPLeak(t, blockedCancelEventsBody)
389389

390+
blockedStart := mcpToolCall(t, server.URL, session, "codencer.submit_project_task", map[string]any{
391+
"relay_profile_id": "default",
392+
"project_id": "codencer",
393+
"machine_id": "mach-1",
394+
"title": "Blocked Gateway start task",
395+
"goal": "Ask for a safe replacement task.",
396+
})
397+
blockedStartBody := mustJSON(t, blockedStart)
398+
blockedStartPayload, _ := mcpStructuredContent(t, blockedStart).(map[string]any)
399+
blockedStartRunHistoryID := stringValueFromAny(blockedStartPayload["run_history_id"])
400+
if blockedStartRunHistoryID == "" || !strings.Contains(blockedStartBody, `"status":"blocked"`) || !strings.Contains(blockedStartBody, `"type":"question"`) {
401+
t.Fatalf("expected blocked start-new-task run response with history id, got %s", blockedStartBody)
402+
}
403+
assertNoGatewayMCPLeak(t, blockedStartBody)
404+
405+
startResponse := mcpToolCall(t, server.URL, session, "codencer.respond_to_human_interrupt", map[string]any{
406+
"run_history_id": blockedStartRunHistoryID,
407+
"response_type": "decision",
408+
"response": "Start a new safe task without using /Users/example/private token=relay-secret.",
409+
"follow_up": "start_new_task",
410+
"new_task_title": "Gateway follow-up task",
411+
"new_task_goal": "Run a safe follow-up task that does not expose local paths.",
412+
})
413+
startResponseBody := mustJSON(t, startResponse)
414+
if !strings.Contains(startResponseBody, `"status":"human_interrupt_response_recorded"`) ||
415+
!strings.Contains(startResponseBody, `"start_new_task_supported":true`) ||
416+
!strings.Contains(startResponseBody, `"start_new_task_attempted":true`) ||
417+
!strings.Contains(startResponseBody, `"follow_up":"start_new_task"`) ||
418+
!strings.Contains(startResponseBody, `"follow_up_result"`) ||
419+
!strings.Contains(startResponseBody, `"run-followup"`) ||
420+
!strings.Contains(startResponseBody, `"operator_response"`) {
421+
t.Fatalf("expected recorded start_new_task human interrupt response, got %s", startResponseBody)
422+
}
423+
assertNoGatewayMCPLeak(t, startResponseBody)
424+
425+
blockedStartEvents := mcpToolCall(t, server.URL, session, "codencer.get_gateway_run_events", map[string]any{
426+
"run_history_id": blockedStartRunHistoryID,
427+
"limit": 20,
428+
})
429+
blockedStartEventsBody := mustJSON(t, blockedStartEvents)
430+
if !strings.Contains(blockedStartEventsBody, `"human_interrupt_created"`) ||
431+
!strings.Contains(blockedStartEventsBody, `"human_interrupt_responded"`) ||
432+
!strings.Contains(blockedStartEventsBody, `"start_new_task_requested"`) ||
433+
!strings.Contains(blockedStartEventsBody, `"operator_response"`) {
434+
t.Fatalf("expected human interrupt start_new_task audit event, got %s", blockedStartEventsBody)
435+
}
436+
assertNoGatewayMCPLeak(t, blockedStartEventsBody)
437+
390438
cancel := mcpToolCall(t, server.URL, session, "codencer.cancel_project_run", map[string]any{
391439
"relay_profile_id": "default",
392440
"project_id": "codencer",
@@ -1008,6 +1056,44 @@ func TestGatewayStoreDeviceLoginRelayRegistryAndConnectorBinding(t *testing.T) {
10081056
t.Fatalf("blocked cancel run events missing human interrupt cancel audit: %s", blockedCancelEventsAfterResponseBody)
10091057
}
10101058
assertNoGatewayConsoleSensitiveLeak(t, blockedCancelEventsAfterResponseBody)
1059+
blockedStartRun := apiPost[map[string]any](t, httpServer.URL+"/api/gateway/v1/projects/codencer/runs", token.AccessToken, map[string]any{
1060+
"relay_profile_id": "default",
1061+
"machine_id": "mach-1",
1062+
"title": "Blocked Gateway start task",
1063+
"goal": "Ask for a safe replacement task.",
1064+
"timeout_seconds": 30,
1065+
})
1066+
blockedStartBody := mustJSON(t, blockedStartRun)
1067+
blockedStartRunHistoryID, _ := blockedStartRun["run_history_id"].(string)
1068+
if blockedStartRunHistoryID == "" || !strings.Contains(blockedStartBody, `"status":"blocked"`) || !strings.Contains(blockedStartBody, `"type":"question"`) {
1069+
t.Fatalf("blocked start-new-task run did not return blocker and history id: %s", blockedStartBody)
1070+
}
1071+
assertNoGatewayConsoleSensitiveLeak(t, blockedStartBody)
1072+
startMissingGoalResponse := apiPost[map[string]any](t, httpServer.URL+"/api/gateway/v1/runs/"+blockedStartRunHistoryID+"/human-interrupts/respond", token.AccessToken, map[string]any{
1073+
"response_type": "decision",
1074+
"response": "Start a different task, but no task goal was provided.",
1075+
"follow_up": "start_new_task",
1076+
"reason": "Operator requested a replacement task.",
1077+
})
1078+
startMissingGoalResponseBody := mustJSON(t, startMissingGoalResponse)
1079+
if !strings.Contains(startMissingGoalResponseBody, `"status":"human_interrupt_response_recorded"`) ||
1080+
!strings.Contains(startMissingGoalResponseBody, `"start_new_task_supported":true`) ||
1081+
!strings.Contains(startMissingGoalResponseBody, `"start_new_task_attempted":true`) ||
1082+
!strings.Contains(startMissingGoalResponseBody, `"follow_up":"start_new_task"`) ||
1083+
!strings.Contains(startMissingGoalResponseBody, `"new_task_goal_required"`) ||
1084+
!strings.Contains(startMissingGoalResponseBody, `"operator_response"`) {
1085+
t.Fatalf("human interrupt start_new_task missing-goal response missing expected blocker: %s", startMissingGoalResponseBody)
1086+
}
1087+
assertNoGatewayConsoleSensitiveLeak(t, startMissingGoalResponseBody)
1088+
blockedStartEventsAfterResponse := apiGet[map[string]any](t, httpServer.URL+"/api/gateway/v1/runs/"+blockedStartRunHistoryID+"/events", token.AccessToken)
1089+
blockedStartEventsAfterResponseBody := mustJSON(t, blockedStartEventsAfterResponse)
1090+
if !strings.Contains(blockedStartEventsAfterResponseBody, `"human_interrupt_responded"`) ||
1091+
!strings.Contains(blockedStartEventsAfterResponseBody, `"start_new_task_blocked"`) ||
1092+
!strings.Contains(blockedStartEventsAfterResponseBody, `"new_task_goal_required"`) ||
1093+
!strings.Contains(blockedStartEventsAfterResponseBody, `"operator_response"`) {
1094+
t.Fatalf("blocked start-new-task events missing structured blocker audit: %s", blockedStartEventsAfterResponseBody)
1095+
}
1096+
assertNoGatewayConsoleSensitiveLeak(t, blockedStartEventsAfterResponseBody)
10111097
audit := apiGet[map[string]any](t, httpServer.URL+"/api/gateway/v1/audit-events", token.AccessToken)
10121098
auditBody := mustJSON(t, audit)
10131099
for _, eventType := range []string{
@@ -1336,12 +1422,15 @@ func newFakeRelay(t *testing.T, opts fakeRelayOptions) *fakeRelay {
13361422
relay.lastHostLabel = r.URL.Query().Get("host_label")
13371423
var req map[string]any
13381424
_ = json.NewDecoder(r.Body).Decode(&req)
1339-
if req["title"] == "Blocked Gateway task" || req["title"] == "Blocked Gateway cancel task" {
1425+
if req["title"] == "Blocked Gateway task" || req["title"] == "Blocked Gateway cancel task" || req["title"] == "Blocked Gateway start task" {
13401426
runID := "run-blocked"
13411427
stepID := "step-blocked"
13421428
if req["title"] == "Blocked Gateway cancel task" {
13431429
runID = "run-blocked-cancel"
13441430
stepID = "step-blocked-cancel"
1431+
} else if req["title"] == "Blocked Gateway start task" {
1432+
runID = "run-blocked-start"
1433+
stepID = "step-blocked-start"
13451434
}
13461435
writeTestJSON(t, w, map[string]any{
13471436
"ok": false,
@@ -1360,6 +1449,24 @@ func newFakeRelay(t *testing.T, opts fakeRelayOptions) *fakeRelay {
13601449
})
13611450
return
13621451
}
1452+
if req["title"] == "Gateway follow-up task" {
1453+
if wait, _ := req["wait"].(bool); wait {
1454+
t.Fatalf("expected follow-up task to forward wait=false")
1455+
}
1456+
writeTestJSON(t, w, map[string]any{
1457+
"ok": true,
1458+
"status": "submitted",
1459+
"run_id": "run-followup",
1460+
"step_id": "step-followup",
1461+
"task": map[string]any{
1462+
"run_id": "run-followup",
1463+
"status": "submitted",
1464+
"step_id": "step-followup",
1465+
},
1466+
"report_path": "/tmp/codencer/run-plans/run-followup.json",
1467+
})
1468+
return
1469+
}
13631470
if req["title"] == "Async Gateway Console task" {
13641471
if wait, _ := req["wait"].(bool); wait {
13651472
t.Fatalf("expected async console task to forward wait=false")

internal/gateway/tools.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -396,6 +396,8 @@ func (s *Server) humanInterruptResponseTool() Tool {
396396
"response_type": stringSchema("Optional response type, such as answer, approve, deny, or decision."),
397397
"follow_up": stringSchema("Optional requested follow-up, such as resume, cancel, or start_new_task."),
398398
"reason": stringSchema("Optional operator/planner reason."),
399+
"new_task_title": stringSchema("Optional title for an explicit start_new_task follow-up."),
400+
"new_task_goal": stringSchema("Required task goal when follow_up=start_new_task."),
399401
}),
400402
ReadOnly: false,
401403
RequiredScopes: []string{"projects:read", "runs:write"},
@@ -463,6 +465,9 @@ func (s *Server) recordHumanInterruptResponse(ctx context.Context, principal *au
463465
"cancel_supported": true,
464466
"cancel_operation": "codencer.cancel_project_run",
465467
"cancel_attempted": false,
468+
"start_new_task_supported": true,
469+
"start_new_task_operation": "codencer.submit_project_task",
470+
"start_new_task_attempted": false,
466471
"status_read_tool": "codencer.get_project_run_status",
467472
"report_read_tool": "codencer.get_run_report",
468473
"events_read_tool": "codencer.get_gateway_run_events",
@@ -528,6 +533,35 @@ func (s *Server) recordHumanInterruptResponse(ctx context.Context, principal *au
528533
nextActions["planner_decision_required"] = false
529534
followUpResult = result
530535
}
536+
} else if strings.EqualFold(followUp, "start_new_task") {
537+
nextActions["start_new_task_attempted"] = true
538+
startArgs := map[string]any{}
539+
for key, value := range args {
540+
startArgs[key] = value
541+
}
542+
if reason != "" {
543+
startArgs["reason"] = reason
544+
}
545+
result, startErr := s.startNewProjectTaskFromRecord(ctx, principal, record, startArgs)
546+
if startErr != nil {
547+
blockedMetadata := runAuditMetadata(record)
548+
blockedMetadata["operation"] = "start_new_task"
549+
blockedMetadata["status"] = "blocked"
550+
blockedMetadata["blocker_type"] = startErr.Code
551+
s.recordGatewayAuditWithMetadata(ctx, principal, "start_new_task_blocked", "Blocked follow-up start_new_task for run "+firstNonEmpty(record.RunID, record.ID)+" in project "+record.ProjectID, blockedMetadata)
552+
followUpResult = map[string]any{
553+
"ok": false,
554+
"status": "blocked",
555+
"blocker": map[string]any{
556+
"type": startErr.Code,
557+
"message": startErr.Message,
558+
"operation": "start_new_task",
559+
},
560+
}
561+
} else {
562+
nextActions["planner_decision_required"] = false
563+
followUpResult = result
564+
}
531565
}
532566
payload := map[string]any{
533567
"ok": true,

web/gateway-console/api/run-history.ts

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,14 +102,27 @@ export async function respondToHumanInterrupt(
102102
result: { events: [{ type: "run_cancelled" }] },
103103
status: "cancelled",
104104
}
105-
: undefined,
105+
: input.followUp === "start_new_task"
106+
? {
107+
ok: true,
108+
result: {
109+
run_id: "run_demo_followup",
110+
status: "submitted",
111+
},
112+
run_history_id: "run_demo_followup_history",
113+
status: "submitted",
114+
}
115+
: undefined,
106116
next_actions: {
107117
cancel_attempted: input.followUp === "cancel",
108118
cancel_operation: "codencer.cancel_project_run",
109119
cancel_supported: true,
110120
resume_attempted: input.followUp === "resume",
111121
resume_operation: "codencer.resume_project_run",
112122
resume_supported: true,
123+
start_new_task_attempted: input.followUp === "start_new_task",
124+
start_new_task_operation: "codencer.submit_project_task",
125+
start_new_task_supported: true,
113126
},
114127
ok: true,
115128
project_id: "codencer",

web/gateway-console/features/console/run-detail-screen.tsx

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -218,11 +218,14 @@ function HumanInterruptResponsePanel({
218218
{respond.data.response?.operatorResponse ||
219219
"The response was recorded and linked to this run."}
220220
{respond.data.followUp === "resume" ||
221-
respond.data.followUp === "cancel" ? (
221+
respond.data.followUp === "cancel" ||
222+
respond.data.followUp === "start_new_task" ? (
222223
<span className="mt-xs block text-xs text-muted">
223224
{respond.data.followUp === "cancel"
224225
? "Cancel follow-up requested; check the audit timeline for the cancelled or blocked outcome."
225-
: "Resume follow-up requested; check the audit timeline for the resumed or blocked outcome."}
226+
: respond.data.followUp === "start_new_task"
227+
? "Start-new-task follow-up requested; check the audit timeline and Runs page for the submitted or blocked outcome."
228+
: "Resume follow-up requested; check the audit timeline for the resumed or blocked outcome."}
226229
</span>
227230
) : null}
228231
</Alert>

0 commit comments

Comments
 (0)