Skip to content

Commit c53db71

Browse files
committed
modified: internal/agent/agent.go
modified: internal/agent/agent_test.go modified: internal/agent/analysis_executor.go modified: internal/agent/analysis_executor_test.go modified: internal/agent/cognitive_bus_test.go new file: internal/agent/common_test.go modified: internal/agent/executors.go modified: internal/agent/executors_test.go modified: internal/agent/llm_mind.go modified: internal/agent/llm_mind_test.go modified: internal/analysis/active/taint/taint_analyzer.go modified: internal/analysis/active/timeslip/e2e_test.go modified: internal/analysis/active/timeslip/timeslip_analyzer.go modified: internal/browser/network/customhttp/client.go modified: internal/browser/network/customhttp/client_integration_test.go modified: internal/browser/network/customhttp/client_test.go modified: internal/browser/network/customhttp/h1client.go modified: internal/browser/network/customhttp/h2client.go modified: internal/browser/network/customhttp/h2client_test.go modified: internal/browser/session/harvester.go modified: internal/browser/session/harvester_test.go modified: internal/browser/stealth/evasions.js modified: internal/browser/stealth/evasions.test.js modified: internal/browser/stealth/stealth.go modified: internal/browser/stealth/stealth_test.go
1 parent 3c8c1d7 commit c53db71

25 files changed

Lines changed: 2556 additions & 404 deletions

internal/agent/agent.go

Lines changed: 96 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,7 @@ func New(ctx context.Context, mission Mission, globalCtx *core.GlobalContext, se
132132
// If initialization fails (e.g., cannot determine project root), log the error
133133
// but allow the agent to continue without evolution capabilities.
134134
logger.Error("Failed to initialize Evolution system (ImprovementAnalyst). Proceeding without it.", zap.Error(err))
135+
evoAnalyst = nil // FIX: Explicitly set to nil for consistency and clarity.
135136
}
136137

137138
agent := &Agent{
@@ -254,6 +255,8 @@ func (a *Agent) RunMission(ctx context.Context) (*MissionResult, error) {
254255
}
255256
}
256257

258+
// actionLoop is the primary consumer of actions posted to the CognitiveBus.
259+
// It dispatches actions to the appropriate handlers (executors or internal methods).
257260
func (a *Agent) actionLoop(ctx context.Context, actionChan <-chan CognitiveMessage) {
258261
defer a.wg.Done()
259262

@@ -264,73 +267,108 @@ func (a *Agent) actionLoop(ctx context.Context, actionChan <-chan CognitiveMessa
264267
return
265268
}
266269

267-
action, ok := msg.Payload.(Action)
268-
if !ok {
269-
a.logger.Error("Received invalid payload for ACTION message", zap.Any("payload", msg.Payload))
270-
a.bus.Acknowledge(msg)
271-
continue
270+
// FIX: Refactor processing into a separate function to handle panic recovery and acknowledgment robustly.
271+
// Process the message and check if we should stop the loop (e.g., after CONCLUDE).
272+
if stop := a.processActionMessage(ctx, msg); stop {
273+
return
272274
}
273275

274-
var execResult *ExecutionResult
275-
var execErr error
276-
277-
switch action.Type {
278-
case ActionConclude:
279-
a.logger.Info("Mind decided to conclude mission.", zap.String("rationale", action.Rationale))
280-
result, err := a.concludeMission(ctx)
281-
if err != nil {
282-
a.logger.Error("Failed to generate final mission result", zap.Error(err))
283-
a.bus.Acknowledge(msg)
284-
continue
285-
}
286-
if result != nil {
287-
// CRITICAL: Acknowledge BEFORE calling finish().
288-
// finish() calls bus.Shutdown(), which waits for this acknowledgment.
289-
a.bus.Acknowledge(msg)
290-
a.finish(ctx, *result)
291-
}
292-
return // End the action loop.
293-
294-
case ActionEvolveCodebase:
295-
a.logger.Info("Agent decided to initiate self-improvement (Evolution).", zap.String("rationale", action.Rationale))
296-
execResult = a.executeEvolution(ctx, action)
276+
case <-ctx.Done():
277+
return
278+
}
279+
}
280+
}
297281

282+
// processActionMessage handles a single action message, including panic recovery and acknowledgment.
283+
// It returns true if the action loop should stop (e.g., after ActionConclude), false otherwise.
284+
func (a *Agent) processActionMessage(ctx context.Context, msg CognitiveMessage) (stopLoop bool) {
285+
// CRITICAL FIX: Ensure the message is always acknowledged, even if processing panics.
286+
// This prevents deadlocks during CognitiveBus.Shutdown().
287+
acknowledged := false
288+
defer func() {
289+
if r := recover(); r != nil {
290+
a.logger.Error("Panic recovered in actionLoop processing. Acknowledging message to prevent deadlock.",
291+
zap.Any("panic_value", r),
292+
zap.String("message_id", msg.ID),
293+
zap.Stack("stack"),
294+
)
295+
// Ensure acknowledgment happens if it hasn't already.
296+
if !acknowledged {
297+
a.bus.Acknowledge(msg)
298298
}
299+
}
300+
}()
299301

300-
// If execResult is not yet set, it means the action should be handled by the ExecutorRegistry.
301-
if execResult == nil {
302-
a.logger.Debug("Dispatching action to ExecutorRegistry", zap.String("type", string(action.Type)))
303-
execResult, execErr = a.executors.Execute(ctx, action)
304-
}
302+
// --- Start of message processing ---
305303

306-
// Centralized error and nil-result handling.
307-
if execErr != nil {
308-
a.logger.Error("Action execution failed with a raw error", zap.String("action_type", string(action.Type)), zap.Error(execErr))
309-
execResult = &ExecutionResult{
310-
Status: "failed",
311-
ObservationType: ObservedSystemState,
312-
ErrorCode: ErrCodeExecutionFailure,
313-
ErrorDetails: map[string]interface{}{"message": execErr.Error()},
314-
}
315-
} else if execResult == nil {
316-
// This is a safeguard against a logic error where an action handler returns (nil, nil).
317-
a.logger.Error("CRITICAL: Action handler returned nil result and nil error.", zap.String("action_type", string(action.Type)))
318-
// Create a fallback result to prevent nil pointer in postObservation
319-
execResult = &ExecutionResult{
320-
Status: "failed",
321-
ObservationType: ObservedSystemState,
322-
ErrorCode: ErrCodeExecutionFailure,
323-
ErrorDetails: map[string]interface{}{"message": "Internal Error: Action handler returned nil result."},
324-
}
325-
}
304+
action, ok := msg.Payload.(Action)
305+
if !ok {
306+
a.logger.Error("Received invalid payload for ACTION message", zap.Any("payload", msg.Payload))
307+
a.bus.Acknowledge(msg)
308+
acknowledged = true
309+
return false // Continue loop
310+
}
326311

327-
a.postObservation(ctx, action, execResult)
312+
var execResult *ExecutionResult
313+
var execErr error
314+
315+
switch action.Type {
316+
case ActionConclude:
317+
a.logger.Info("Mind decided to conclude mission.", zap.String("rationale", action.Rationale))
318+
result, err := a.concludeMission(ctx)
319+
if err != nil {
320+
a.logger.Error("Failed to generate final mission result", zap.Error(err))
328321
a.bus.Acknowledge(msg)
322+
acknowledged = true
323+
return false // Continue loop, let mind potentially retry conclusion or do something else
324+
}
325+
if result != nil {
326+
// CRITICAL: Acknowledge BEFORE calling finish().
327+
// FIX: Updated comment to be accurate.
328+
// finish() sends the result; RunMission waits for result then calls bus.Shutdown().
329+
a.bus.Acknowledge(msg)
330+
acknowledged = true
331+
// FIX: Pass context to finish to prevent goroutine leak.
332+
a.finish(ctx, *result)
333+
}
334+
return true // Stop the loop.
329335

330-
case <-ctx.Done():
331-
return
336+
case ActionEvolveCodebase:
337+
a.logger.Info("Agent decided to initiate self-improvement (Evolution).", zap.String("rationale", action.Rationale))
338+
execResult = a.executeEvolution(ctx, action)
339+
}
340+
341+
// If execResult is not yet set, it means the action should be handled by the ExecutorRegistry.
342+
if execResult == nil {
343+
a.logger.Debug("Dispatching action to ExecutorRegistry", zap.String("type", string(action.Type)))
344+
execResult, execErr = a.executors.Execute(ctx, action)
345+
}
346+
347+
// Centralized error and nil-result handling.
348+
if execErr != nil {
349+
a.logger.Error("Action execution failed with a raw error", zap.String("action_type", string(action.Type)), zap.Error(execErr))
350+
execResult = &ExecutionResult{
351+
Status: "failed",
352+
ObservationType: ObservedSystemState,
353+
ErrorCode: ErrCodeExecutionFailure,
354+
ErrorDetails: map[string]interface{}{"message": execErr.Error()},
355+
}
356+
} else if execResult == nil {
357+
// This is a safeguard against a logic error where an action handler returns (nil, nil).
358+
a.logger.Error("CRITICAL: Action handler returned nil result and nil error.", zap.String("action_type", string(action.Type)))
359+
// Create a fallback result to prevent nil pointer in postObservation
360+
execResult = &ExecutionResult{
361+
Status: "failed",
362+
ObservationType: ObservedSystemState,
363+
ErrorCode: ErrCodeExecutionFailure,
364+
ErrorDetails: map[string]interface{}{"message": "Internal Error: Action handler returned nil result."},
332365
}
333366
}
367+
368+
a.postObservation(ctx, action, execResult)
369+
a.bus.Acknowledge(msg)
370+
acknowledged = true
371+
return false // Continue loop
334372
}
335373

336374
// executeEvolution handles the EVOLVE_CODEBASE action by invoking the EvolutionEngine.
@@ -472,6 +510,7 @@ func (a *Agent) concludeMission(ctx context.Context) (*MissionResult, error) {
472510
return nil, fmt.Errorf("failed to gather final context for summary: %w", err)
473511
}
474512

513+
// FIX: Use MarshalIndent for better readability and debugging.
475514
subgraphJSON, err := json.MarshalIndent(subgraph, "", " ")
476515
if err != nil {
477516
return nil, fmt.Errorf("failed to marshal subgraph for summary prompt: %w", err)
@@ -566,6 +605,7 @@ func (a *Agent) finish(ctx context.Context, result MissionResult) {
566605
a.mind.Stop()
567606
// Bus shutdown is handled in RunMission after the result is successfully received.
568607

608+
// FIX: Use select to send result, preventing blocking forever if the runner (RunMission)
569609
// Use select to send result, preventing blocking forever if the runner (RunMission)
570610
// has already exited (e.g., due to timeout/cancellation).
571611
select {

internal/agent/agent_test.go

Lines changed: 118 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -273,6 +273,70 @@ func TestAgent_RunMission_Success(t *testing.T) {
273273
mockLTM.AssertExpectations(t) // Verify LTM mock expectations
274274
}
275275

276+
// NEW TEST: TestAgent_ActionLoop_PanicRecovery verifies that the action loop recovers from panics
277+
// during message processing and acknowledges the message to prevent shutdown deadlocks.
278+
func TestAgent_ActionLoop_PanicRecovery(t *testing.T) {
279+
// 1. Setup
280+
// We need a short timeout for the overall test execution to detect deadlocks.
281+
testCtx, cancelTest := context.WithTimeout(context.Background(), 5*time.Second)
282+
defer cancelTest()
283+
284+
// Initialize the agent and its dependencies.
285+
agent, _, bus, mockExecutors, _, _, _, _, _, _ := setupAgentTest(t)
286+
287+
// Use a separate context for the actionLoop itself, which we won't cancel until the end.
288+
loopCtx, cancelLoop := context.WithCancel(context.Background())
289+
290+
// Subscribe to the action channel that the loop will consume from.
291+
actionChan, unsubscribeActions := bus.Subscribe(MessageTypeAction)
292+
defer unsubscribeActions()
293+
294+
// 2. Configure the mock executor to panic when a specific action is executed.
295+
panickingAction := Action{Type: ActionClick, ID: "panic-action"}
296+
mockExecutors.On("Execute", mock.Anything, panickingAction).Run(func(args mock.Arguments) {
297+
panic("Simulated executor panic!")
298+
}).Return(nil, errors.New("this error is ignored because of panic")).Once()
299+
300+
// 3. Start the action loop in a separate goroutine.
301+
agent.wg.Add(1)
302+
loopFinishedChan := make(chan struct{})
303+
go func() {
304+
// Catch any panic propagating out of the loop just in case the internal recovery fails.
305+
defer func() {
306+
if r := recover(); r != nil {
307+
t.Logf("Test caught panic propagating out of actionLoop (unexpected): %v", r)
308+
}
309+
close(loopFinishedChan)
310+
}()
311+
agent.actionLoop(loopCtx, actionChan)
312+
}()
313+
314+
// 4. Post the message that will cause the panic.
315+
err := bus.Post(testCtx, CognitiveMessage{ID: "test-msg-panic", Type: MessageTypeAction, Payload: panickingAction})
316+
require.NoError(t, err)
317+
318+
// 5. Verify Acknowledgment by attempting to shut down the bus.
319+
// If the message wasn't acknowledged (because the loop crashed before recovery), bus.Shutdown() will hang.
320+
shutdownDone := make(chan struct{})
321+
go func() {
322+
// Shutdown waits for all in-flight messages to be acknowledged.
323+
bus.Shutdown()
324+
close(shutdownDone)
325+
}()
326+
327+
select {
328+
case <-shutdownDone:
329+
// Success: Bus shut down cleanly, meaning the message was acknowledged despite the panic.
330+
case <-testCtx.Done():
331+
t.Fatal("Timeout waiting for bus shutdown. Message likely unacknowledged due to panic in actionLoop.")
332+
}
333+
334+
// 6. Clean up the running loop.
335+
cancelLoop()
336+
// Wait for the loop goroutine to finish (safe because we know it didn't deadlock).
337+
<-loopFinishedChan
338+
}
339+
276340
// TestAgent_RunMission_MindFailure verifies the agent fails fast if the Mind fails to start.
277341
func TestAgent_RunMission_MindFailure(t *testing.T) {
278342
// Arrange
@@ -353,6 +417,58 @@ func TestAgent_RunMission_ContextCancellation(t *testing.T) {
353417
mockLLM.AssertExpectations(t)
354418
}
355419

420+
// NEW TEST: TestAgent_RunMission_CancellationBeforeFinish verifies that the actionLoop
421+
// does not leak if the context is cancelled right when the agent tries to finish.
422+
func TestAgent_RunMission_CancellationBeforeFinish(t *testing.T) {
423+
// This tests the fix where finish() now accepts a context and uses select{} when sending the result.
424+
425+
agent, mockMind, _, _, _, mockKG, mockLLM, mockLTM, _, _ := setupAgentTest(t)
426+
// Create a context that we can cancel.
427+
ctx, cancel := context.WithCancel(context.Background())
428+
429+
// Set expectations for the initial startup
430+
mockMind.On("SetMission", agent.mission).Return().Once()
431+
// Mind.Start should run until the context passed to it (missionCtx) is cancelled.
432+
mockMind.On("Start", mock.Anything).Run(func(args mock.Arguments) {
433+
startCtx := args.Get(0).(context.Context)
434+
<-startCtx.Done() // Block until cancelled
435+
}).Return(context.Canceled).Once()
436+
mockMind.On("Stop").Return().Once()
437+
mockLTM.On("Start").Return().Once()
438+
439+
// Set expectations for the conclusion (which will happen after cancellation in RunMission)
440+
// These mocks are needed because RunMission calls concludeMission upon cancellation.
441+
mockKG.On("GetNode", mock.Anything, mock.Anything).Return(schemas.Node{}, nil).Maybe()
442+
mockKG.On("GetEdges", mock.Anything, mock.Anything).Return([]schemas.Edge{}, nil).Maybe()
443+
mockLLM.On("Generate", mock.Anything, mock.Anything).Return("Cancelled summary.", nil).Maybe()
444+
445+
// Act
446+
var runMissionWg sync.WaitGroup
447+
runMissionWg.Add(1)
448+
go func() {
449+
defer runMissionWg.Done()
450+
// RunMission will block until cancelled.
451+
_, _ = agent.RunMission(ctx)
452+
}()
453+
454+
// Allow the agent and its actionLoop to start up.
455+
time.Sleep(100 * time.Millisecond)
456+
457+
// We need to ensure the actionLoop attempts to process the message *after* we cancel the context.
458+
// This simulates the race condition where RunMission exits due to cancellation
459+
// before the actionLoop finishes sending the result via finish().
460+
cancel() // Cancel the context, causing RunMission to start shutting down.
461+
462+
// Wait for RunMission to return. This confirms the receiver (RunMission) is gone.
463+
runMissionWg.Wait()
464+
465+
// Crucial Assertion: Wait for the agent's internal WaitGroup (which includes the actionLoop).
466+
// If the actionLoop leaks (because finish() blocks), this will time out.
467+
assert.True(t, waitTimeout(&agent.wg, 2*time.Second), "Agent WaitGroup did not complete, potential goroutine leak in actionLoop/finish.")
468+
469+
mockMind.AssertExpectations(t)
470+
}
471+
356472
// TestAgent_ActionLoop verifies the correct dispatching of various action types.
357473
func TestAgent_ActionLoop(t *testing.T) {
358474
// Helper to setup and run the action loop in the background
@@ -561,54 +677,8 @@ func TestAgent_ActionLoop(t *testing.T) {
561677
}
562678
})
563679

564-
// NEW: Test for complex actions being dispatched to the executor
565-
t.Run("ExecuteLoginSequenceAction_DispatchedToExecutor", func(t *testing.T) {
566-
agent, bus, cancelRoot, _ := setupActionLoop(t)
567-
defer cancelRoot()
568-
mockExecutors := agent.executors.(*MockExecutorRegistry)
569-
570-
action := Action{Type: ActionExecuteLoginSequence, Rationale: "Attempting login"}
571-
obsChan, unsub := bus.Subscribe(MessageTypeObservation)
572-
defer unsub()
573-
574-
execResult := &ExecutionResult{Status: "success", ObservationType: ObservedAuthResult}
575-
mockExecutors.On("Execute", mock.Anything, action).Return(execResult, nil).Once()
576-
577-
err := bus.Post(context.Background(), CognitiveMessage{ID: "login-msg", Type: MessageTypeAction, Payload: action})
578-
require.NoError(t, err)
579-
580-
select {
581-
case msg := <-obsChan:
582-
bus.Acknowledge(msg)
583-
mockExecutors.AssertExpectations(t)
584-
case <-time.After(2 * time.Second):
585-
t.Fatal("Timeout waiting for ActionExecuteLoginSequence to be dispatched")
586-
}
587-
})
588-
589-
t.Run("ExploreApplicationAction_DispatchedToExecutor", func(t *testing.T) {
590-
agent, bus, cancelRoot, _ := setupActionLoop(t)
591-
defer cancelRoot()
592-
mockExecutors := agent.executors.(*MockExecutorRegistry)
593-
594-
action := Action{Type: ActionExploreApplication, Rationale: "Exploring the app"}
595-
obsChan, unsub := bus.Subscribe(MessageTypeObservation)
596-
defer unsub()
597-
598-
execResult := &ExecutionResult{Status: "success", ObservationType: ObservedDOMChange}
599-
mockExecutors.On("Execute", mock.Anything, action).Return(execResult, nil).Once()
600-
601-
err := bus.Post(context.Background(), CognitiveMessage{ID: "explore-msg", Type: MessageTypeAction, Payload: action})
602-
require.NoError(t, err)
603-
604-
select {
605-
case msg := <-obsChan:
606-
bus.Acknowledge(msg)
607-
mockExecutors.AssertExpectations(t)
608-
case <-time.After(2 * time.Second):
609-
t.Fatal("Timeout waiting for ActionExploreApplication to be dispatched")
610-
}
611-
})
680+
// REMOVED: ExecuteLoginSequenceAction and ExploreApplicationAction tests are removed here
681+
// because they are now explicitly covered by the ExecutorRegistry tests (TestExecutorRegistry_Execute/RegisteredComplexActions_RoutedToBrowserExecutor).
612682

613683
t.Run("FuzzEndpointAction_DispatchedToExecutor", func(t *testing.T) {
614684
agent, bus, cancelRoot, _ := setupActionLoop(t)

0 commit comments

Comments
 (0)