Skip to content

Commit 5ebc898

Browse files
committed
Wire project run cancel through Gateway routes
1 parent eb28d35 commit 5ebc898

10 files changed

Lines changed: 244 additions & 13 deletions

File tree

internal/connector/project_requests.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,10 @@ func (c *Client) handleProjectRequest(ctx context.Context, cfg *Config, request
4848
service := localexec.NewService()
4949
report, err := service.GetRun(ctx, localexec.RunOptions{BaseOptions: projectBaseOptions(cfg, project), RunID: tail[1]})
5050
return projectExecutionResponse(response, report, err)
51+
case len(tail) == 3 && tail[0] == "runs" && tail[2] == "cancel" && request.Method == http.MethodPost:
52+
service := localexec.NewService()
53+
report, err := service.CancelRun(ctx, localexec.RunOptions{BaseOptions: projectBaseOptions(cfg, project), RunID: tail[1]})
54+
return projectExecutionResponse(response, report, err)
5155
case len(tail) == 1 && tail[0] == "submit" && request.Method == http.MethodPost:
5256
opts, err := decodeProjectSubmit(projectBaseOptions(cfg, project), request.Body)
5357
if err != nil {

internal/connector/session_test.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,7 @@ func TestClientHandleProjectRequestStartsRunThroughLocalexec(t *testing.T) {
182182
}
183183

184184
var daemon *httptest.Server
185+
var cancelled atomic.Bool
185186
daemon = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
186187
switch r.URL.Path {
187188
case "/api/v1/instance":
@@ -193,6 +194,16 @@ func TestClientHandleProjectRequestStartsRunThroughLocalexec(t *testing.T) {
193194
}
194195
w.WriteHeader(http.StatusCreated)
195196
_ = json.NewEncoder(w).Encode(domain.Run{ID: "run-1", ProjectID: "proj", State: domain.RunStateRunning})
197+
case "/api/v1/runs/run-1":
198+
switch r.Method {
199+
case http.MethodPatch:
200+
cancelled.Store(true)
201+
_ = json.NewEncoder(w).Encode(domain.Run{ID: "run-1", ProjectID: "proj", State: domain.RunStateCancelled})
202+
case http.MethodGet:
203+
_ = json.NewEncoder(w).Encode(domain.Run{ID: "run-1", ProjectID: "proj", State: domain.RunStateCancelled})
204+
default:
205+
http.NotFound(w, r)
206+
}
196207
default:
197208
http.NotFound(w, r)
198209
}
@@ -230,6 +241,21 @@ func TestClientHandleProjectRequestStartsRunThroughLocalexec(t *testing.T) {
230241
if !strings.Contains(string(response.Body), `"run-1"`) || !strings.Contains(string(response.Body), `"started"`) {
231242
t.Fatalf("unexpected project response: %s", string(response.Body))
232243
}
244+
245+
response = client.handleRequest(context.Background(), relayproto.CommandRequest{
246+
RequestID: "req-project-cancel",
247+
Method: http.MethodPost,
248+
Path: "/codencer/v1/projects/proj/runs/run-1/cancel",
249+
})
250+
if response.StatusCode != http.StatusOK {
251+
t.Fatalf("expected ok cancel response, got %+v", response)
252+
}
253+
if !cancelled.Load() {
254+
t.Fatal("expected project cancel request to reach daemon run cancel endpoint")
255+
}
256+
if !strings.Contains(string(response.Body), `"run-1"`) || !strings.Contains(string(response.Body), `"cancelled"`) {
257+
t.Fatalf("unexpected project cancel response: %s", string(response.Body))
258+
}
233259
}
234260

235261
func TestClientHandleRequestRejectsUnsafeStepPath(t *testing.T) {

internal/gateway/api.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -784,6 +784,10 @@ func (s *Server) handleProjectByID(w http.ResponseWriter, r *http.Request) {
784784
s.handleProjectRunReportGet(w, r, projectID, parts[2])
785785
return
786786
}
787+
if len(parts) == 4 && parts[1] == "runs" && parts[3] == "cancel" {
788+
s.handleProjectRunCancel(w, r, projectID, parts[2])
789+
return
790+
}
787791
if len(parts) > 1 {
788792
writeAPIError(w, http.StatusNotFound, "route_not_found", "project route not found")
789793
return
@@ -907,6 +911,57 @@ func (s *Server) handleProjectRunReportGet(w http.ResponseWriter, r *http.Reques
907911
writeJSON(w, http.StatusOK, map[string]any{"ok": true, "project_id": projectID, "run_id": runID, "run_history_id": runRecord.ID, "result": payload})
908912
}
909913

914+
func (s *Server) handleProjectRunCancel(w http.ResponseWriter, r *http.Request, projectID, runID string) {
915+
if r.Method != http.MethodPost {
916+
writeAPIError(w, http.StatusMethodNotAllowed, "method_not_allowed", "method not allowed")
917+
return
918+
}
919+
principal, ok := s.authenticateConsoleAPI(w, r, []string{"projects:read", "runs:write"})
920+
if !ok {
921+
return
922+
}
923+
args := map[string]any{"project_id": projectID, "run_id": runID}
924+
var req map[string]any
925+
if r.Body != nil {
926+
if err := json.NewDecoder(r.Body).Decode(&req); err != nil && err != io.EOF {
927+
writeAPIError(w, http.StatusBadRequest, "malformed_request", err.Error())
928+
return
929+
}
930+
}
931+
for key, value := range req {
932+
args[key] = value
933+
}
934+
for _, key := range []string{"relay_profile_id", "machine_id", "host_label"} {
935+
if value := strings.TrimSpace(r.URL.Query().Get(key)); value != "" {
936+
args[key] = value
937+
}
938+
}
939+
match, apiErr := s.resolveProject(r.Context(), principal, projectID, args, true)
940+
if apiErr != nil {
941+
s.recordGatewayAuditWithMetadata(r.Context(), principal, "cancel_project_run_requested", "Run cancel route resolution failed for project "+projectID+": "+apiErr.Code, map[string]any{"project_id": projectID, "run_id": runID})
942+
writeAPIError(w, apiErr.Status, apiErr.Code, apiErr.Message)
943+
return
944+
}
945+
auditMetadata := projectLifecycleAuditMetadata(match, args)
946+
s.recordGatewayAuditWithMetadata(r.Context(), principal, "cancel_project_run_requested", "Requested cancel_project_run for run "+runID+" in project "+projectID, auditMetadata)
947+
path, body, apiErr := cancelProjectRunRoute(args)
948+
if apiErr != nil {
949+
writeAPIError(w, apiErr.Status, apiErr.Code, apiErr.Message)
950+
return
951+
}
952+
path = appendSelector(path, args)
953+
_, response, apiErr := s.callRelay(r.Context(), match.Profile, http.MethodPost, path, body)
954+
payload, apiErr := responsePayload(match.Profile, response, apiErr)
955+
if apiErr != nil {
956+
s.recordGatewayAuditWithMetadata(r.Context(), principal, "run_failed", "Run cancel failed for project "+projectID+": "+apiErr.Code, auditMetadata)
957+
writeAPIError(w, apiErr.Status, apiErr.Code, apiErr.Message)
958+
return
959+
}
960+
runRecord, auditMetadata := s.refreshRunRecordFromLifecyclePayload(r.Context(), principal, match, args, payload, reportStatusForPayload(payload, "available"))
961+
s.recordGatewayAuditWithMetadata(r.Context(), principal, terminalAuditType(payload), terminalAuditSummary(projectID, payload), auditMetadata)
962+
writeJSON(w, http.StatusOK, map[string]any{"ok": true, "project_id": projectID, "run_id": runID, "run_history_id": runRecord.ID, "result": payload})
963+
}
964+
910965
func (s *Server) handleAuditEvents(w http.ResponseWriter, r *http.Request) {
911966
if apiErr := s.requireStore(); apiErr != nil {
912967
writeAPIError(w, apiErr.Status, apiErr.Code, apiErr.Message)

internal/gateway/gateway_test.go

Lines changed: 63 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,11 @@ func TestGatewayMCPAsyncLifecycleTools(t *testing.T) {
235235
t.Fatalf("expected async start response, got %s", startedBody)
236236
}
237237
assertNoGatewayMCPLeak(t, startedBody)
238+
startedPayload, _ := mcpStructuredContent(t, started).(map[string]any)
239+
startedRunHistoryID := stringValueFromAny(startedPayload["run_history_id"])
240+
if startedRunHistoryID == "" {
241+
t.Fatalf("start response did not return run_history_id: %s", startedBody)
242+
}
238243

239244
asyncSubmit := mcpToolCall(t, server.URL, session, "codencer.submit_project_task", map[string]any{
240245
"relay_profile_id": "default",
@@ -291,13 +296,22 @@ func TestGatewayMCPAsyncLifecycleTools(t *testing.T) {
291296
"relay_profile_id": "default",
292297
"project_id": "codencer",
293298
"run_id": "run-async-gateway-test",
294-
"run_history_id": runHistoryID,
299+
"run_history_id": startedRunHistoryID,
295300
})
296301
cancelBody := mustJSON(t, cancel)
297-
if !strings.Contains(cancelBody, `"type":"unsupported_operation"`) || !strings.Contains(cancelBody, `"operation":"cancel_project_run"`) {
298-
t.Fatalf("expected structured cancel capability blocker, got %s", cancelBody)
302+
if !strings.Contains(cancelBody, `"status":"cancelled"`) || !strings.Contains(cancelBody, `"run_id":"run-async-gateway-test"`) {
303+
t.Fatalf("expected forwarded cancel response, got %s", cancelBody)
299304
}
300305
assertNoGatewayMCPLeak(t, cancelBody)
306+
cancelEvents := mcpToolCall(t, server.URL, session, "codencer.get_gateway_run_events", map[string]any{
307+
"run_history_id": startedRunHistoryID,
308+
"limit": 20,
309+
})
310+
cancelEventsBody := mustJSON(t, cancelEvents)
311+
if !strings.Contains(cancelEventsBody, `"cancel_project_run_requested"`) || !strings.Contains(cancelEventsBody, `"run_cancelled"`) {
312+
t.Fatalf("expected cancel lifecycle audit events, got %s", cancelEventsBody)
313+
}
314+
assertNoGatewayMCPLeak(t, cancelEventsBody)
301315

302316
resume := mcpToolCall(t, server.URL, session, "codencer.resume_project_run", map[string]any{
303317
"relay_profile_id": "default",
@@ -632,6 +646,17 @@ func TestGatewayStoreDeviceLoginRelayRegistryAndConnectorBinding(t *testing.T) {
632646
t.Fatalf("async report endpoint returned run_history_id=%v want %s: %s", asyncReport["run_history_id"], asyncRunHistoryID, asyncReportBody)
633647
}
634648
assertNoGatewayMCPLeak(t, asyncReportBody)
649+
cancelResult := apiPost[map[string]any](t, httpServer.URL+"/api/gateway/v1/projects/codencer/runs/run-async-console/cancel?relay_profile_id=default&machine_id=mach-1", token.AccessToken, map[string]any{
650+
"reason": "operator requested stop",
651+
})
652+
cancelResultBody := mustJSON(t, cancelResult)
653+
if !strings.Contains(cancelResultBody, `"status":"cancelled"`) || !strings.Contains(cancelResultBody, `"run_id":"run-async-console"`) {
654+
t.Fatalf("project run cancel endpoint did not cancel through relay: %s", cancelResultBody)
655+
}
656+
if cancelResult["run_history_id"] != asyncRunHistoryID {
657+
t.Fatalf("cancel endpoint returned run_history_id=%v want %s: %s", cancelResult["run_history_id"], asyncRunHistoryID, cancelResultBody)
658+
}
659+
assertNoGatewayMCPLeak(t, cancelResultBody)
635660
report := apiGet[map[string]any](t, httpServer.URL+"/api/gateway/v1/projects/codencer/runs/run-gateway-test/report?relay_profile_id=default&machine_id=mach-1", token.AccessToken)
636661
reportBody := mustJSON(t, report)
637662
if !strings.Contains(reportBody, `"status":"completed"`) || !strings.Contains(reportBody, `"run_id":"run-gateway-test"`) {
@@ -957,6 +982,24 @@ func newFakeRelay(t *testing.T, opts fakeRelayOptions) *fakeRelay {
957982
"report_path": "/tmp/codencer/run-plans/run-async-gateway-test.json",
958983
})
959984
})
985+
mux.HandleFunc("/api/v2/projects/codencer/runs/run-async-gateway-test/cancel", func(w http.ResponseWriter, r *http.Request) {
986+
requireRelayAuth(t, r)
987+
if r.Method != http.MethodPost {
988+
w.WriteHeader(http.StatusMethodNotAllowed)
989+
return
990+
}
991+
writeTestJSON(t, w, map[string]any{
992+
"ok": true,
993+
"run_id": "run-async-gateway-test",
994+
"status": "cancelled",
995+
"run": map[string]any{
996+
"id": "run-async-gateway-test",
997+
"state": "cancelled",
998+
},
999+
"repo_root": "/Users/example/codencer",
1000+
"report_path": "/tmp/codencer/run-plans/run-async-gateway-test.json",
1001+
})
1002+
})
9601003
mux.HandleFunc("/api/v2/projects/codencer/run-plan", func(w http.ResponseWriter, r *http.Request) {
9611004
requireRelayAuth(t, r)
9621005
relay.lastMachineID = r.URL.Query().Get("machine_id")
@@ -1066,6 +1109,23 @@ func newFakeRelay(t *testing.T, opts fakeRelayOptions) *fakeRelay {
10661109
}},
10671110
})
10681111
})
1112+
mux.HandleFunc("/api/v2/projects/codencer/runs/run-async-console/cancel", func(w http.ResponseWriter, r *http.Request) {
1113+
requireRelayAuth(t, r)
1114+
if r.Method != http.MethodPost {
1115+
w.WriteHeader(http.StatusMethodNotAllowed)
1116+
return
1117+
}
1118+
writeTestJSON(t, w, map[string]any{
1119+
"ok": true,
1120+
"run_id": "run-async-console",
1121+
"status": "cancelled",
1122+
"run": map[string]any{
1123+
"id": "run-async-console",
1124+
"state": "cancelled",
1125+
},
1126+
"report_path": "/Users/example/.codencer-live-test/artifacts/run-plans/run-async-console.json",
1127+
})
1128+
})
10691129
mux.HandleFunc("/api/v2/projects/codencer/reports/run-plans/run-gateway-test", func(w http.ResponseWriter, r *http.Request) {
10701130
requireRelayAuth(t, r)
10711131
writeTestJSON(t, w, map[string]any{

internal/gateway/run_history.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,10 @@ func (s *Server) finishRunRecord(ctx context.Context, record RunRecord, payload
7171
}
7272

7373
func (s *Server) refreshRunRecordFromReport(ctx context.Context, principal *authPrincipal, match relayProjectMatch, args map[string]any, payload any) (RunRecord, map[string]any) {
74+
return s.refreshRunRecordFromLifecyclePayload(ctx, principal, match, args, payload, reportStatusForPayload(payload, "completed"))
75+
}
76+
77+
func (s *Server) refreshRunRecordFromLifecyclePayload(ctx context.Context, principal *authPrincipal, match relayProjectMatch, args map[string]any, payload any, reportStatus string) (RunRecord, map[string]any) {
7478
if s.store == nil || principal == nil || principal.WorkspaceID == "" {
7579
return RunRecord{}, map[string]any{"project_id": match.Project.ProjectID, "run_id": stringArg(args, "run_id")}
7680
}
@@ -95,7 +99,7 @@ func (s *Server) refreshRunRecordFromReport(ctx context.Context, principal *auth
9599
}
96100
record, _ = s.applyRouteToRunRecord(ctx, record, principal, match, args)
97101
record.RunID = firstNonEmpty(record.RunID, runID)
98-
return s.finishRunRecord(ctx, record, payload, reportStatusForPayload(payload, "completed"))
102+
return s.finishRunRecord(ctx, record, payload, reportStatus)
99103
}
100104

101105
func applyPayloadToRunRecord(record *RunRecord, payload any, reportStatus string) {
@@ -117,7 +121,8 @@ func applyPayloadToRunRecord(record *RunRecord, payload any, reportStatus string
117121
}
118122
record.ResultSummary = firstNonEmpty(resultSummaryFromPayload(obj), record.ResultSummary)
119123
record.ResultDetails = firstNonEmpty(resultDetailsFromPayload(obj), record.ResultDetails, record.ResultSummary)
120-
if terminalAuditType(obj) == "run_completed" || terminalAuditType(obj) == "run_failed" || terminalAuditType(obj) == "blocker" {
124+
eventType := terminalAuditType(obj)
125+
if eventType == "run_completed" || eventType == "run_failed" || eventType == "blocker" || eventType == "run_cancelled" {
121126
record.CompletedAt = time.Now().UTC()
122127
}
123128
}

internal/gateway/tools.go

Lines changed: 59 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ func buildTools(server *Server) map[string]Tool {
155155
return "/api/v2/projects/" + projectID + "/reports/run-plans/" + runID, nil, nil
156156
}),
157157
"codencer.get_gateway_run_events": server.gatewayRunEventsTool(),
158-
"codencer.cancel_project_run": server.unsupportedProjectLifecycleTool("codencer.cancel_project_run", "Return an explicit capability blocker for project-run cancellation when the selected route cannot cancel safely.", "cancel_project_run"),
158+
"codencer.cancel_project_run": server.projectForwardTool("codencer.cancel_project_run", "Cancel a shared project run through the selected Gateway relay.", []string{"project_id", "run_id"}, cancelProjectRunRoute),
159159
"codencer.resume_project_run": server.unsupportedProjectLifecycleTool("codencer.resume_project_run", "Return an explicit capability blocker for project-run resume when the selected route cannot resume safely.", "resume_project_run"),
160160
"codencer.get_blocker": {
161161
Name: "codencer.get_blocker",
@@ -262,6 +262,10 @@ func (s *Server) projectForwardTool(name, description string, required []string,
262262
return ToolResult{}, apiErr
263263
}
264264
path = appendSelector(path, args)
265+
if name == "codencer.cancel_project_run" {
266+
auditMetadata = projectLifecycleAuditMetadata(match, args)
267+
s.recordGatewayAuditWithMetadata(ctx, principal, "cancel_project_run_requested", "Requested cancel_project_run for run "+requiredStringValue(args, "run_id")+" in project "+projectID, auditMetadata)
268+
}
265269
method := http.MethodGet
266270
if body != nil {
267271
method = http.MethodPost
@@ -275,6 +279,8 @@ func (s *Server) projectForwardTool(name, description string, required []string,
275279
eventType := "run_failed"
276280
if name == "codencer.get_run_report" {
277281
eventType = "report_read"
282+
} else if name == "codencer.cancel_project_run" {
283+
auditMetadata = projectLifecycleAuditMetadata(match, args)
278284
} else {
279285
runRecord.Status = "failed"
280286
runRecord.ResultSummary = "Gateway relay call failed: " + apiErr.Code
@@ -288,6 +294,10 @@ func (s *Server) projectForwardTool(name, description string, required []string,
288294
_ = record
289295
s.recordTerminalRunAuditOnce(ctx, principal, projectID, payload, metadata)
290296
s.recordGatewayAuditWithMetadata(ctx, principal, "report_read", "Read run report "+requiredStringValue(args, "run_id")+" for project "+projectID, metadata)
297+
} else if name == "codencer.cancel_project_run" {
298+
record, metadata := s.refreshRunRecordFromLifecyclePayload(ctx, principal, match, args, payload, reportStatusForPayload(payload, "available"))
299+
_ = record
300+
s.recordGatewayAuditWithMetadata(ctx, principal, terminalAuditType(payload), terminalAuditSummary(projectID, payload), metadata)
291301
} else if recordRun {
292302
runRecord, auditMetadata = s.finishRunRecord(ctx, runRecord, payload, reportStatusForPayload(payload, "available"))
293303
if obj, ok := payload.(map[string]any); ok && runRecord.ID != "" {
@@ -460,6 +470,28 @@ func selectedProjectLocation(project relayProject, args map[string]any) projectL
460470
return projectLocation{}
461471
}
462472

473+
func projectLifecycleAuditMetadata(match relayProjectMatch, args map[string]any) map[string]any {
474+
metadata := map[string]any{
475+
"project_id": match.Project.ProjectID,
476+
"run_id": requiredStringValue(args, "run_id"),
477+
"relay_profile_id": match.Profile.ID,
478+
}
479+
if runHistoryID := strings.TrimSpace(stringArg(args, "run_history_id")); runHistoryID != "" {
480+
metadata["run_history_id"] = runHistoryID
481+
}
482+
location := selectedProjectLocation(match.Project, args)
483+
for key, value := range map[string]string{
484+
"connector_id": location.ConnectorID,
485+
"machine_id": location.MachineID,
486+
"host_label": location.HostLabel,
487+
} {
488+
if strings.TrimSpace(value) != "" {
489+
metadata[key] = strings.TrimSpace(value)
490+
}
491+
}
492+
return metadata
493+
}
494+
463495
func resolvedExecutorLabel(project relayProject, args map[string]any) string {
464496
return firstNonEmpty(
465497
strings.TrimSpace(stringArg(args, "profile")),
@@ -491,6 +523,10 @@ func terminalAuditType(payload any) string {
491523
switch status {
492524
case "submitted", "started", "starting", "queued", "pending", "running", "in_progress", "validating":
493525
return "run_started"
526+
case "cancel_requested", "cancelling", "canceling":
527+
return "run_cancel_requested"
528+
case "cancelled", "canceled":
529+
return "run_cancelled"
494530
case "failed", "failed_adapter", "failed_bridge", "failed_validation", "timeout", "adapter_error", "bridge_error":
495531
return "run_failed"
496532
case "blocked", "question", "manual_approval_required", "needs_approval", "needs_manual_attention", "permission_request_required", "unsafe_action", "validation_failed":
@@ -510,6 +546,10 @@ func terminalAuditSummary(projectID string, payload any) string {
510546
outcome = "failed"
511547
case "blocker":
512548
outcome = "reached blocker"
549+
case "run_cancel_requested":
550+
outcome = "cancel requested"
551+
case "run_cancelled":
552+
outcome = "cancelled"
513553
}
514554
if runID == "" {
515555
return "Run " + outcome + " for project " + projectID
@@ -686,6 +726,24 @@ func getProjectRunRoute(args map[string]any) (string, []byte, *apiError) {
686726
return "/api/v2/projects/" + projectID + "/runs/" + runID, nil, nil
687727
}
688728

729+
func cancelProjectRunRoute(args map[string]any) (string, []byte, *apiError) {
730+
projectID, apiErr := requiredString(args, "project_id")
731+
if apiErr != nil {
732+
return "", nil, apiErr
733+
}
734+
runID, apiErr := requiredString(args, "run_id")
735+
if apiErr != nil {
736+
return "", nil, apiErr
737+
}
738+
payload := map[string]any{}
739+
copyOptional(payload, args, "reason")
740+
body, apiErr := jsonBody(payload)
741+
if apiErr != nil {
742+
return "", nil, apiErr
743+
}
744+
return "/api/v2/projects/" + projectID + "/runs/" + runID + "/cancel", body, nil
745+
}
746+
689747
func submitProjectTaskRoute(wait bool) func(map[string]any) (string, []byte, *apiError) {
690748
return func(args map[string]any) (string, []byte, *apiError) {
691749
projectID, apiErr := requiredString(args, "project_id")

0 commit comments

Comments
 (0)