Skip to content

Commit 373db39

Browse files
authored
Revert "feat: human-in-the-loop support with AskUserQuestion" (#896)
Reverts #871
1 parent 538ccbd commit 373db39

27 files changed

Lines changed: 47 additions & 1264 deletions

File tree

.gitignore

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,6 @@ dmypy.json
8888

8989
# Claude Code
9090
.claude/settings.local.json
91-
.claude/worktrees/
9291

9392
# mkdocs
9493
/site

Makefile

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -734,9 +734,9 @@ kind-port-forward: check-kubectl check-local-context ## Port-forward kind servic
734734
@echo ""
735735
@echo "$(COLOR_YELLOW)Press Ctrl+C to stop$(COLOR_RESET)"
736736
@echo ""
737-
@trap 'kill 0; echo ""; echo "$(COLOR_GREEN)✓$(COLOR_RESET) Port forwarding stopped"; exit 0' INT; \
738-
kubectl port-forward -n $(NAMESPACE) svc/frontend-service $(KIND_FWD_FRONTEND_PORT):3000 >/dev/null 2>&1 & \
739-
kubectl port-forward -n $(NAMESPACE) svc/backend-service $(KIND_FWD_BACKEND_PORT):8080 >/dev/null 2>&1 & \
737+
@trap 'echo ""; echo "$(COLOR_GREEN)✓$(COLOR_RESET) Port forwarding stopped"; exit 0' INT; \
738+
(kubectl port-forward -n ambient-code svc/frontend-service $(KIND_FWD_FRONTEND_PORT):3000 >/dev/null 2>&1 &); \
739+
(kubectl port-forward -n ambient-code svc/backend-service $(KIND_FWD_BACKEND_PORT):8080 >/dev/null 2>&1 &); \
740740
wait
741741

742742
dev-bootstrap: check-kubectl check-local-context ## Bootstrap developer workspace with API key and integrations

components/backend/handlers/sessions.go

Lines changed: 2 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,6 @@ var (
4141
GetGitHubToken func(context.Context, kubernetes.Interface, dynamic.Interface, string, string) (string, error)
4242
GetGitLabToken func(context.Context, kubernetes.Interface, string, string) (string, error)
4343
DeriveRepoFolderFromURL func(string) string
44-
// DeriveAgentStatusFromEvents derives agentStatus from the persisted event log.
45-
// Set by the websocket package at init to avoid circular imports.
46-
// sessionID should be namespace-qualified (e.g., "namespace/sessionName") to avoid cross-project collisions.
47-
DeriveAgentStatusFromEvents func(sessionID string) string
4844
// LEGACY: SendMessageToSession removed - AG-UI server uses HTTP/SSE instead of WebSocket
4945
)
5046

@@ -365,28 +361,6 @@ func parseStatus(status map[string]interface{}) *types.AgenticSessionStatus {
365361

366362
// V2 API Handlers - Multi-tenant session management
367363

368-
// enrichAgentStatus derives agentStatus from the persisted event log for
369-
// Running sessions. This is the source of truth — it replaces the stale
370-
// CR-cached value which was subject to goroutine race conditions.
371-
func enrichAgentStatus(session *types.AgenticSession) {
372-
if session.Status == nil || session.Status.Phase != "Running" {
373-
return
374-
}
375-
if DeriveAgentStatusFromEvents == nil {
376-
return
377-
}
378-
name, _ := session.Metadata["name"].(string)
379-
namespace, _ := session.Metadata["namespace"].(string)
380-
if name == "" || namespace == "" {
381-
return
382-
}
383-
// Use namespace-qualified key to avoid cross-project collisions in the event store
384-
sessionID := namespace + "/" + name
385-
if derived := DeriveAgentStatusFromEvents(sessionID); derived != "" {
386-
session.Status.AgentStatus = types.StringPtr(derived)
387-
}
388-
}
389-
390364
func ListSessions(c *gin.Context) {
391365
project := c.GetString("project")
392366

@@ -457,11 +431,6 @@ func ListSessions(c *gin.Context) {
457431
totalCount := len(sessions)
458432
paginatedSessions, hasMore, nextOffset := paginateSessions(sessions, params.Offset, params.Limit)
459433

460-
// Derive agentStatus from event log only for paginated sessions (performance optimization)
461-
for i := range paginatedSessions {
462-
enrichAgentStatus(&paginatedSessions[i])
463-
}
464-
465434
response := types.PaginatedResponse{
466435
Items: paginatedSessions,
467436
TotalCount: totalCount,
@@ -676,9 +645,9 @@ func CreateSession(c *gin.Context) {
676645
timeout = *req.Timeout
677646
}
678647

679-
// Generate unique name (millisecond timestamp for burst-creation safety)
648+
// Generate unique name (timestamp-based)
680649
// Note: Runner will create branch as "ambient/{session-name}"
681-
timestamp := time.Now().UnixMilli()
650+
timestamp := time.Now().Unix()
682651
name := fmt.Sprintf("session-%d", timestamp)
683652

684653
// Create the custom resource
@@ -934,9 +903,6 @@ func GetSession(c *gin.Context) {
934903
session.Status = parseStatus(status)
935904
}
936905

937-
// Derive agentStatus from event log (source of truth) for running sessions
938-
enrichAgentStatus(&session)
939-
940906
session.AutoBranch = ComputeAutoBranch(sessionName)
941907

942908
c.JSON(http.StatusOK, session)

components/backend/main.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,6 @@ func main() {
166166

167167
// Initialize websocket package
168168
websocket.StateBaseDir = server.StateBaseDir
169-
handlers.DeriveAgentStatusFromEvents = websocket.DeriveAgentStatus
170169

171170
// Normal server mode
172171
if err := server.Run(registerRoutes); err != nil {

components/backend/types/agui.go

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -69,13 +69,6 @@ const (
6969
EventTypeMeta = "META"
7070
)
7171

72-
// Agent status values derived from the AG-UI event stream.
73-
const (
74-
AgentStatusWorking = "working"
75-
AgentStatusIdle = "idle"
76-
AgentStatusWaitingInput = "waiting_input"
77-
)
78-
7972
// AG-UI Message Roles
8073
// See: https://docs.ag-ui.com/concepts/messages
8174
const (

components/backend/types/session.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@ type AgenticSessionStatus struct {
4242
StartTime *string `json:"startTime,omitempty"`
4343
CompletionTime *string `json:"completionTime,omitempty"`
4444
LastActivityTime *string `json:"lastActivityTime,omitempty"`
45-
AgentStatus *string `json:"agentStatus,omitempty"`
4645
StoppedReason *string `json:"stoppedReason,omitempty"`
4746
ReconciledRepos []ReconciledRepo `json:"reconciledRepos,omitempty"`
4847
ReconciledWorkflow *ReconciledWorkflow `json:"reconciledWorkflow,omitempty"`

components/backend/websocket/agui_proxy.go

Lines changed: 11 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -257,13 +257,10 @@ func HandleAGUIRunProxy(c *gin.Context) {
257257

258258
log.Printf("AGUI Proxy: run=%s session=%s/%s msgs=%d", truncID(runID), projectName, sessionName, len(rawMessages))
259259

260-
// Use namespace-qualified session ID to avoid cross-project collisions
261-
namespacedSessionID := projectName + "/" + sessionName
262-
263260
sessionLastSeen.Store(sessionName, time.Now())
264261

265262
// Store project→session mapping for activity tracking in persistStreamedEvent
266-
sessionProjectMap.Store(namespacedSessionID, projectName)
263+
sessionProjectMap.Store(sessionName, projectName)
267264

268265
// Resolve and cache the runner port for this session from the registry.
269266
cacheSessionPort(projectName, sessionName)
@@ -300,7 +297,7 @@ func HandleAGUIRunProxy(c *gin.Context) {
300297
runnerURL := getRunnerEndpoint(projectName, sessionName)
301298

302299
// Start background goroutine to proxy runner SSE → persist + broadcast
303-
go proxyRunnerStream(runnerURL, bodyBytes, sessionName, namespacedSessionID, runID, threadID)
300+
go proxyRunnerStream(runnerURL, bodyBytes, sessionName, runID, threadID)
304301

305302
// Return metadata immediately — events arrive via GET /agui/events
306303
c.JSON(http.StatusOK, gin.H{
@@ -312,22 +309,21 @@ func HandleAGUIRunProxy(c *gin.Context) {
312309
// proxyRunnerStream connects to the runner's SSE endpoint, reads events,
313310
// persists them, and publishes them to the live broadcast pipe. Runs in
314311
// a background goroutine so the POST /agui/run handler can return immediately.
315-
// namespacedSessionID is the namespace-qualified session ID (e.g., "namespace/sessionName") for event persistence.
316-
func proxyRunnerStream(runnerURL string, bodyBytes []byte, sessionName, namespacedSessionID, runID, threadID string) {
312+
func proxyRunnerStream(runnerURL string, bodyBytes []byte, sessionName, runID, threadID string) {
317313
log.Printf("AGUI Proxy: connecting to runner at %s", runnerURL)
318314
resp, err := connectToRunner(runnerURL, bodyBytes)
319315
if err != nil {
320316
log.Printf("AGUI Proxy: runner unavailable for %s: %v", sessionName, err)
321317
// Publish error events so GET /agui/events subscribers see the failure
322-
publishAndPersistErrorEvents(sessionName, namespacedSessionID, runID, threadID, "Runner is not available")
318+
publishAndPersistErrorEvents(sessionName, runID, threadID, "Runner is not available")
323319
return
324320
}
325321
defer resp.Body.Close()
326322

327323
if resp.StatusCode != http.StatusOK {
328324
body, _ := io.ReadAll(resp.Body)
329325
log.Printf("AGUI Proxy: runner returned %d: %s", resp.StatusCode, string(body))
330-
publishAndPersistErrorEvents(sessionName, namespacedSessionID, runID, threadID, fmt.Sprintf("Runner error: HTTP %d", resp.StatusCode))
326+
publishAndPersistErrorEvents(sessionName, runID, threadID, fmt.Sprintf("Runner error: HTTP %d", resp.StatusCode))
331327
return
332328
}
333329

@@ -347,7 +343,7 @@ func proxyRunnerStream(runnerURL string, bodyBytes []byte, sessionName, namespac
347343
// Persist every data event to JSONL
348344
if strings.HasPrefix(trimmed, "data: ") {
349345
jsonData := strings.TrimPrefix(trimmed, "data: ")
350-
persistStreamedEvent(namespacedSessionID, runID, threadID, jsonData)
346+
persistStreamedEvent(sessionName, runID, threadID, jsonData)
351347
}
352348

353349
// Publish raw SSE line to all GET /agui/events subscribers
@@ -360,15 +356,14 @@ func proxyRunnerStream(runnerURL string, bodyBytes []byte, sessionName, namespac
360356
// publishAndPersistErrorEvents generates RUN_STARTED + RUN_ERROR events,
361357
// persists them, and publishes to the live broadcast so subscribers get
362358
// notified of runner failures.
363-
// sessionName is used for broadcasting; namespacedSessionID is used for persistence.
364-
func publishAndPersistErrorEvents(sessionName, namespacedSessionID, runID, threadID, message string) {
359+
func publishAndPersistErrorEvents(sessionName, runID, threadID, message string) {
365360
// RUN_STARTED
366361
startEvt := map[string]interface{}{
367362
"type": "RUN_STARTED",
368363
"threadId": threadID,
369364
"runId": runID,
370365
}
371-
persistEvent(namespacedSessionID, startEvt)
366+
persistEvent(sessionName, startEvt)
372367
startData, _ := json.Marshal(startEvt)
373368
publishLine(sessionName, fmt.Sprintf("data: %s\n\n", startData))
374369

@@ -379,7 +374,7 @@ func publishAndPersistErrorEvents(sessionName, namespacedSessionID, runID, threa
379374
"threadId": threadID,
380375
"runId": runID,
381376
}
382-
persistEvent(namespacedSessionID, errEvt)
377+
persistEvent(sessionName, errEvt)
383378
errData, _ := json.Marshal(errEvt)
384379
publishLine(sessionName, fmt.Sprintf("data: %s\n\n", errData))
385380
}
@@ -441,19 +436,15 @@ func persistStreamedEvent(sessionID, runID, threadID, jsonData string) {
441436

442437
persistEvent(sessionID, event)
443438

444-
// Extract event type; projectName is derived from the
439+
// Update lastActivityTime on CR for activity events (debounced).
440+
// Extract event type to check; projectName is derived from the
445441
// sessionID-to-project mapping populated by HandleAGUIRunProxy.
446442
eventType, _ := event["type"].(string)
447-
448-
// Update lastActivityTime on CR for activity events (debounced).
449443
if isActivityEvent(eventType) {
450444
if projectName, ok := sessionProjectMap.Load(sessionID); ok {
451445
updateLastActivityTime(projectName.(string), sessionID, eventType == types.EventTypeRunStarted)
452446
}
453447
}
454-
455-
// agentStatus is derived at query time from the event log (DeriveAgentStatus).
456-
// No CR updates needed here — the persisted events ARE the source of truth.
457448
}
458449

459450
// ─── POST /agui/interrupt ────────────────────────────────────────────
@@ -954,16 +945,3 @@ func updateLastActivityTime(projectName, sessionName string, immediate bool) {
954945
}
955946
}()
956947
}
957-
958-
// isAskUserQuestionToolCall checks if a tool call name is the AskUserQuestion HITL tool.
959-
// Uses case-insensitive comparison after stripping non-alpha characters,
960-
// matching the frontend pattern in use-agent-status.ts.
961-
func isAskUserQuestionToolCall(name string) bool {
962-
var clean strings.Builder
963-
for _, r := range strings.ToLower(name) {
964-
if r >= 'a' && r <= 'z' {
965-
clean.WriteRune(r)
966-
}
967-
}
968-
return clean.String() == "askuserquestion"
969-
}

components/backend/websocket/agui_store.go

Lines changed: 0 additions & 105 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ package websocket
1111

1212
import (
1313
"ambient-code-backend/types"
14-
"bytes"
1514
"encoding/json"
1615
"fmt"
1716
"log"
@@ -195,110 +194,6 @@ func loadEvents(sessionID string) []map[string]interface{} {
195194
return events
196195
}
197196

198-
// DeriveAgentStatus reads a session's event log and returns the agent
199-
// status derived from the last significant events.
200-
//
201-
// sessionID should be namespace-qualified (e.g., "namespace/sessionName") to avoid cross-project collisions.
202-
// Returns "" if the status cannot be determined (no events, file missing, etc.).
203-
func DeriveAgentStatus(sessionID string) string {
204-
// sessionID is now namespace-qualified, e.g., "default/session-123"
205-
path := fmt.Sprintf("%s/sessions/%s/agui-events.jsonl", StateBaseDir, sessionID)
206-
207-
// Read only the tail of the file to avoid loading entire event log into memory.
208-
// 64KB is sufficient for recent lifecycle events (scanning backwards).
209-
const maxTailBytes = 64 * 1024
210-
211-
file, err := os.Open(path)
212-
if err != nil {
213-
return ""
214-
}
215-
defer file.Close()
216-
217-
stat, err := file.Stat()
218-
if err != nil {
219-
return ""
220-
}
221-
222-
fileSize := stat.Size()
223-
var data []byte
224-
225-
if fileSize <= maxTailBytes {
226-
// File is small, read it all
227-
data, err = os.ReadFile(path)
228-
if err != nil {
229-
return ""
230-
}
231-
} else {
232-
// File is large, seek to tail and read last N bytes
233-
offset := fileSize - maxTailBytes
234-
_, err = file.Seek(offset, 0)
235-
if err != nil {
236-
return ""
237-
}
238-
239-
data = make([]byte, maxTailBytes)
240-
n, err := file.Read(data)
241-
if err != nil {
242-
return ""
243-
}
244-
data = data[:n]
245-
246-
// Skip partial first line (we seeked into the middle of a line)
247-
if idx := bytes.IndexByte(data, '\n'); idx >= 0 {
248-
data = data[idx+1:]
249-
}
250-
}
251-
252-
lines := splitLines(data)
253-
254-
// Scan backwards. We only care about lifecycle and AskUserQuestion events.
255-
// RUN_STARTED → "working"
256-
// RUN_FINISHED / RUN_ERROR → "idle", unless same run had AskUserQuestion
257-
// TOOL_CALL_START (AskUserQuestion) → "waiting_input"
258-
var runEndRunID string // set when we hit RUN_FINISHED/RUN_ERROR and need to look deeper
259-
for i := len(lines) - 1; i >= 0; i-- {
260-
if len(lines[i]) == 0 {
261-
continue
262-
}
263-
var evt map[string]interface{}
264-
if err := json.Unmarshal(lines[i], &evt); err != nil {
265-
continue
266-
}
267-
evtType, _ := evt["type"].(string)
268-
269-
switch evtType {
270-
case types.EventTypeRunStarted:
271-
if runEndRunID != "" {
272-
// We were scanning for an AskUserQuestion but hit RUN_STARTED first → idle
273-
return types.AgentStatusIdle
274-
}
275-
return types.AgentStatusWorking
276-
277-
case types.EventTypeRunFinished, types.EventTypeRunError:
278-
if runEndRunID == "" {
279-
// First run-end seen; scan deeper within this run for AskUserQuestion
280-
runEndRunID, _ = evt["runId"].(string)
281-
}
282-
283-
case types.EventTypeToolCallStart:
284-
if runEndRunID != "" {
285-
// Only relevant if we're scanning within the ended run
286-
if evtRunID, _ := evt["runId"].(string); evtRunID != "" && evtRunID != runEndRunID {
287-
return types.AgentStatusIdle
288-
}
289-
}
290-
if toolName, _ := evt["toolCallName"].(string); isAskUserQuestionToolCall(toolName) {
291-
return types.AgentStatusWaitingInput
292-
}
293-
}
294-
}
295-
296-
if runEndRunID != "" {
297-
return types.AgentStatusIdle
298-
}
299-
return ""
300-
}
301-
302197
// ─── Compaction ──────────────────────────────────────────────────────
303198
//
304199
// Go port of @ag-ui/client compactEvents. Concatenates streaming deltas

0 commit comments

Comments
 (0)