Skip to content

Commit b41729a

Browse files
authored
Merge pull request #689 from Yumiue/codex/gateway-plan-approval-rpc
feat(gateway): 接入计划审批 RPC
2 parents 542c15c + 216fac0 commit b41729a

29 files changed

Lines changed: 1374 additions & 17 deletions

docs/gateway-rpc-api.md

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -421,6 +421,41 @@ type ResolvePermissionParams struct {
421421

422422
---
423423

424+
## Method: gateway.approvePlan
425+
426+
- Stability: Stable
427+
- Auth Required: Yes
428+
- Request Schema:
429+
430+
```go
431+
type ApprovePlanParams struct {
432+
SessionID string `json:"session_id"` // MUST
433+
PlanID string `json:"plan_id"` // MUST
434+
Revision int `json:"revision"` // MUST > 0
435+
}
436+
```
437+
438+
- Response Schema:
439+
440+
```json
441+
{
442+
"type": "ack",
443+
"action": "approve_plan",
444+
"session_id": "session-1",
445+
"payload": {
446+
"plan_id": "plan-1",
447+
"revision": 2,
448+
"status": "approved"
449+
}
450+
}
451+
```
452+
453+
- Semantics:
454+
- 仅批准当前会话中匹配 `plan_id + revision``draft` 计划。
455+
- 成功后客户端可再调用 `gateway.run({ "mode": "build" })` 执行已批准计划。
456+
457+
---
458+
424459
## Method: gateway.userQuestionAnswer
425460

426461
- Stability: Beta

docs/reference/gateway-rpc-api.md

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -778,7 +778,44 @@ Observation:
778778

779779
---
780780

781-
## 15. wake.openUrl
781+
## 15. gateway.approvePlan
782+
783+
Method: `gateway.approvePlan`
784+
Stability: `Stable`
785+
Auth Required: `Yes`
786+
787+
Request Schema:
788+
789+
```go
790+
type ApprovePlanParams struct {
791+
SessionID string `json:"session_id"` // MUST
792+
PlanID string `json:"plan_id"` // MUST
793+
Revision int `json:"revision"` // MUST > 0
794+
}
795+
```
796+
797+
Response Schema:
798+
799+
```json
800+
{
801+
"type": "ack",
802+
"action": "approve_plan",
803+
"session_id": "session-1",
804+
"payload": {
805+
"plan_id": "plan-1",
806+
"revision": 2,
807+
"status": "approved"
808+
}
809+
}
810+
```
811+
812+
Semantics:
813+
1. Only the current session plan matching `plan_id + revision` and `draft` status can be approved.
814+
2. After success, clients can call `gateway.run` with `mode: "build"` to execute the approved plan.
815+
816+
---
817+
818+
## 16. wake.openUrl
782819

783820
Method: `wake.openUrl`
784821
Stability: `Experimental`
@@ -828,7 +865,7 @@ Observation:
828865

829866
---
830867

831-
## 16. gateway.event(服务端通知)
868+
## 17. gateway.event(服务端通知)
832869

833870
Method: `gateway.event`
834871
Stability: `Stable`

internal/cli/gateway_runtime_bridge.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -497,6 +497,37 @@ func (b *gatewayRuntimePortBridge) ResolvePermission(ctx context.Context, input
497497
})
498498
}
499499

500+
// ApprovePlan 将网关计划批准请求转换为 runtime 当前计划批准输入。
501+
func (b *gatewayRuntimePortBridge) ApprovePlan(
502+
ctx context.Context,
503+
input gateway.ApprovePlanInput,
504+
) (gateway.ApprovePlanResult, error) {
505+
if err := b.ensureRuntimeAccess(input.SubjectID); err != nil {
506+
return gateway.ApprovePlanResult{}, err
507+
}
508+
approver, ok := b.runtime.(agentruntime.PlanApprover)
509+
if !ok {
510+
return gateway.ApprovePlanResult{}, fmt.Errorf("gateway runtime bridge: runtime does not support plan approval")
511+
}
512+
sessionID := strings.TrimSpace(input.SessionID)
513+
planID := strings.TrimSpace(input.PlanID)
514+
if err := approver.ApproveCurrentPlan(ctx, agentruntime.ApproveCurrentPlanInput{
515+
SessionID: sessionID,
516+
PlanID: planID,
517+
Revision: input.Revision,
518+
}); err != nil {
519+
if agentruntime.IsPlanApprovalInvalidError(err) {
520+
return gateway.ApprovePlanResult{}, fmt.Errorf("%w: %v", gateway.ErrRuntimeInvalidAction, err)
521+
}
522+
return gateway.ApprovePlanResult{}, err
523+
}
524+
return gateway.ApprovePlanResult{
525+
PlanID: planID,
526+
Revision: input.Revision,
527+
Status: "approved",
528+
}, nil
529+
}
530+
500531
// ResolveUserQuestion 将网关 ask_user 回答转发到 runtime。
501532
func (b *gatewayRuntimePortBridge) ResolveUserQuestion(ctx context.Context, input gateway.UserQuestionAnswerInput) error {
502533
if err := b.ensureRuntimeAccess(input.SubjectID); err != nil {

internal/cli/gateway_runtime_bridge_test.go

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,12 @@ type runtimeStub struct {
8989
checkpointDiffErr error
9090
}
9191

92+
type runtimePlanApproverStub struct {
93+
*runtimeStub
94+
approveInput agentruntime.ApproveCurrentPlanInput
95+
approveErr error
96+
}
97+
9298
const testBridgeSubjectID = bridgeLocalSubjectID
9399

94100
func (s *runtimeStub) Submit(_ context.Context, input agentruntime.PrepareInput) error {
@@ -132,6 +138,14 @@ func (s *runtimeStub) ResolvePermission(_ context.Context, input agentruntime.Pe
132138
return s.permissionErr
133139
}
134140

141+
func (s *runtimePlanApproverStub) ApproveCurrentPlan(
142+
_ context.Context,
143+
input agentruntime.ApproveCurrentPlanInput,
144+
) error {
145+
s.approveInput = input
146+
return s.approveErr
147+
}
148+
135149
func (s *runtimeStub) ResolveUserQuestion(_ context.Context, input agentruntime.UserQuestionResolutionInput) error {
136150
s.userQuestionInput = input
137151
return s.userQuestionErr
@@ -1075,6 +1089,101 @@ func TestGatewayRuntimePortBridgeListSessionTodosAndSnapshot(t *testing.T) {
10751089
})
10761090
}
10771091

1092+
func TestGatewayRuntimePortBridgeApprovePlan(t *testing.T) {
1093+
runtimeSvc := &runtimePlanApproverStub{
1094+
runtimeStub: &runtimeStub{eventsCh: make(chan agentruntime.RuntimeEvent, 1)},
1095+
}
1096+
bridge, err := newGatewayRuntimePortBridge(context.Background(), runtimeSvc, testSessionStore)
1097+
if err != nil {
1098+
t.Fatalf("new bridge: %v", err)
1099+
}
1100+
t.Cleanup(func() { _ = bridge.Close() })
1101+
1102+
result, err := bridge.ApprovePlan(context.Background(), gateway.ApprovePlanInput{
1103+
SubjectID: testBridgeSubjectID,
1104+
SessionID: " session-1 ",
1105+
PlanID: " plan-1 ",
1106+
Revision: 3,
1107+
})
1108+
if err != nil {
1109+
t.Fatalf("approve_plan: %v", err)
1110+
}
1111+
if runtimeSvc.approveInput.SessionID != "session-1" || runtimeSvc.approveInput.PlanID != "plan-1" || runtimeSvc.approveInput.Revision != 3 {
1112+
t.Fatalf("approve input = %#v, want trimmed session/plan revision", runtimeSvc.approveInput)
1113+
}
1114+
if result.PlanID != "plan-1" || result.Revision != 3 || result.Status != "approved" {
1115+
t.Fatalf("approve result = %#v, want approved plan-1 revision 3", result)
1116+
}
1117+
}
1118+
1119+
func TestGatewayRuntimePortBridgeApprovePlanUnsupportedRuntime(t *testing.T) {
1120+
bridge, err := newGatewayRuntimePortBridge(
1121+
context.Background(),
1122+
&runtimeStub{eventsCh: make(chan agentruntime.RuntimeEvent, 1)},
1123+
testSessionStore,
1124+
)
1125+
if err != nil {
1126+
t.Fatalf("new bridge: %v", err)
1127+
}
1128+
t.Cleanup(func() { _ = bridge.Close() })
1129+
1130+
_, err = bridge.ApprovePlan(context.Background(), gateway.ApprovePlanInput{
1131+
SubjectID: testBridgeSubjectID,
1132+
SessionID: "session-1",
1133+
PlanID: "plan-1",
1134+
Revision: 1,
1135+
})
1136+
if err == nil || !strings.Contains(err.Error(), "runtime does not support plan approval") {
1137+
t.Fatalf("approve_plan unsupported error = %v", err)
1138+
}
1139+
}
1140+
1141+
func TestGatewayRuntimePortBridgeApprovePlanInvalidAction(t *testing.T) {
1142+
runtimeSvc := &runtimePlanApproverStub{
1143+
runtimeStub: &runtimeStub{eventsCh: make(chan agentruntime.RuntimeEvent, 1)},
1144+
approveErr: agentruntime.ErrPlanApprovalRevisionMismatch,
1145+
}
1146+
bridge, err := newGatewayRuntimePortBridge(context.Background(), runtimeSvc, testSessionStore)
1147+
if err != nil {
1148+
t.Fatalf("new bridge: %v", err)
1149+
}
1150+
t.Cleanup(func() { _ = bridge.Close() })
1151+
1152+
_, err = bridge.ApprovePlan(context.Background(), gateway.ApprovePlanInput{
1153+
SubjectID: testBridgeSubjectID,
1154+
SessionID: "session-1",
1155+
PlanID: "plan-1",
1156+
Revision: 1,
1157+
})
1158+
if !errors.Is(err, gateway.ErrRuntimeInvalidAction) {
1159+
t.Fatalf("approve_plan error = %v, want ErrRuntimeInvalidAction", err)
1160+
}
1161+
}
1162+
1163+
func TestGatewayRuntimePortBridgeApprovePlanAccessDenied(t *testing.T) {
1164+
runtimeSvc := &runtimePlanApproverStub{
1165+
runtimeStub: &runtimeStub{eventsCh: make(chan agentruntime.RuntimeEvent, 1)},
1166+
}
1167+
bridge, err := newGatewayRuntimePortBridge(context.Background(), runtimeSvc, testSessionStore)
1168+
if err != nil {
1169+
t.Fatalf("new bridge: %v", err)
1170+
}
1171+
t.Cleanup(func() { _ = bridge.Close() })
1172+
1173+
_, err = bridge.ApprovePlan(context.Background(), gateway.ApprovePlanInput{
1174+
SubjectID: "other-subject",
1175+
SessionID: "session-1",
1176+
PlanID: "plan-1",
1177+
Revision: 1,
1178+
})
1179+
if !errors.Is(err, gateway.ErrRuntimeAccessDenied) {
1180+
t.Fatalf("approve_plan error = %v, want ErrRuntimeAccessDenied", err)
1181+
}
1182+
if runtimeSvc.approveInput.SessionID != "" {
1183+
t.Fatalf("runtime approve should not be called, input = %#v", runtimeSvc.approveInput)
1184+
}
1185+
}
1186+
10781187
func TestGatewayRuntimePortBridgeLoadSessionNotFoundBranches(t *testing.T) {
10791188
t.Parallel()
10801189

internal/gateway/bootstrap.go

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1683,11 +1683,68 @@ func handleResolvePermissionFrame(ctx context.Context, frame MessageFrame, runti
16831683
}
16841684
}
16851685

1686+
// handleApprovePlanFrame 处理计划批准请求,并把能力收敛到可选 runtime 端口。
1687+
func handleApprovePlanFrame(ctx context.Context, frame MessageFrame, runtimePort RuntimePort) MessageFrame {
1688+
if runtimePort == nil {
1689+
return runtimePortUnavailableFrame(frame)
1690+
}
1691+
subjectID, subjectErr := requireAuthenticatedSubjectID(ctx)
1692+
if subjectErr != nil {
1693+
return errorFrame(frame, subjectErr)
1694+
}
1695+
approvalPort, approvalErr := requirePlanApprovalRuntimePort(runtimePort)
1696+
if approvalErr != nil {
1697+
return errorFrame(frame, approvalErr)
1698+
}
1699+
1700+
input, err := decodeApprovePlanPayload(frame.Payload)
1701+
if err != nil {
1702+
return errorFrame(frame, err)
1703+
}
1704+
input.SubjectID = subjectID
1705+
if input.SessionID == "" {
1706+
input.SessionID = strings.TrimSpace(frame.SessionID)
1707+
}
1708+
if input.SessionID == "" {
1709+
return errorFrame(frame, NewMissingRequiredFieldError("payload.session_id"))
1710+
}
1711+
if input.PlanID == "" {
1712+
return errorFrame(frame, NewMissingRequiredFieldError("payload.plan_id"))
1713+
}
1714+
if input.Revision <= 0 {
1715+
return errorFrame(frame, NewFrameError(ErrorCodeInvalidAction, "invalid approve_plan revision"))
1716+
}
1717+
1718+
callCtx, cancel := withRuntimeOperationTimeout(ctx)
1719+
defer cancel()
1720+
result, approveErr := approvalPort.ApprovePlan(callCtx, input)
1721+
if approveErr != nil {
1722+
return runtimeCallFailedFrame(callCtx, frame, approveErr, "approve_plan")
1723+
}
1724+
1725+
return MessageFrame{
1726+
Type: FrameTypeAck,
1727+
Action: FrameActionApprovePlan,
1728+
RequestID: frame.RequestID,
1729+
SessionID: input.SessionID,
1730+
Payload: result,
1731+
}
1732+
}
1733+
16861734
// runtimePortUnavailableFrame 在 runtime 未注入时返回统一错误。
16871735
func runtimePortUnavailableFrame(frame MessageFrame) MessageFrame {
16881736
return errorFrame(frame, NewFrameError(ErrorCodeInternalError, "runtime port is unavailable"))
16891737
}
16901738

1739+
// requirePlanApprovalRuntimePort 校验当前 runtime 端口是否支持计划批准能力。
1740+
func requirePlanApprovalRuntimePort(runtimePort RuntimePort) (PlanApprovalRuntimePort, *FrameError) {
1741+
approvalPort, ok := runtimePort.(PlanApprovalRuntimePort)
1742+
if !ok {
1743+
return nil, NewFrameError(ErrorCodeInternalError, "plan approval runtime port is unavailable")
1744+
}
1745+
return approvalPort, nil
1746+
}
1747+
16911748
// requireManagementRuntimePort 校验当前 runtime 端口是否支持管理面扩展能力。
16921749
func requireManagementRuntimePort(runtimePort RuntimePort) (ManagementRuntimePort, *FrameError) {
16931750
managementPort, ok := runtimePort.(ManagementRuntimePort)
@@ -1785,6 +1842,9 @@ func runtimeCallFailedFrame(ctx context.Context, frame MessageFrame, err error,
17851842
case errors.Is(err, ErrRuntimeResourceNotFound):
17861843
errorCode = ErrorCodeResourceNotFound
17871844
message = fmt.Sprintf("%s target not found", normalizedOperation)
1845+
case errors.Is(err, ErrRuntimeInvalidAction):
1846+
errorCode = ErrorCodeInvalidAction
1847+
message = fmt.Sprintf("%s invalid action", normalizedOperation)
17881848
case errors.Is(err, context.DeadlineExceeded):
17891849
errorCode = ErrorCodeTimeout
17901850
message = fmt.Sprintf("%s timed out", normalizedOperation)

0 commit comments

Comments
 (0)