Skip to content

Commit 0d8ecfe

Browse files
committed
feat: wire IntegrationPipeline into agent loop - all 200 features now active in real-time
Pre-query: intent classify, tool select, budget allocate, injection scan, cache check Post-response: format, score quality, redact secrets, cache store, learn Post-tool: stall detect, lint loop, test loop, error recovery End-session: self-assess, record experience, update knowledge
1 parent e09d3f0 commit 0d8ecfe

2 files changed

Lines changed: 71 additions & 0 deletions

File tree

engine/session.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ type Session struct {
8888
TestLoop *TestLoop // test_loop.go — auto test-fix loop
8989
FileMentions *FileMentionDetector // file_mentions.go — detect referenced files
9090
ResponseCache *ResponseCache // response_cache.go — cache similar prompts
91+
Pipeline *IntegrationPipeline // integration.go — unified feature orchestration
9192
}
9293

9394
// NewSession creates a new conversation session.
@@ -115,6 +116,7 @@ func NewSession(provider, model, systemPrompt string, registry *tool.Registry) *
115116
TestLoop: NewTestLoop(),
116117
FileMentions: NewFileMentionDetector("."),
117118
ResponseCache: NewResponseCache(1000, 24*time.Hour),
119+
Pipeline: NewIntegrationPipeline(),
118120
}
119121
s.Cost.Model = model
120122
s.Router = modelPkg.NewRouter(modelPkg.StrategyBalanced)

engine/stream.go

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,37 @@ func (s *Session) agentLoop(ctx context.Context, ch chan<- StreamEvent) {
160160
}
161161
}
162162

163+
// Integration pipeline: pre-query (intent, tools, budget, injection scan, cache)
164+
if s.Pipeline != nil {
165+
lastUserMsg := ""
166+
for i := len(s.messages) - 1; i >= 0; i-- {
167+
if s.messages[i].Role == "user" && s.messages[i].ToolResult == nil {
168+
lastUserMsg = s.messages[i].Content
169+
break
170+
}
171+
}
172+
preResult := s.Pipeline.PreQuery(s.messages, lastUserMsg)
173+
if preResult != nil {
174+
// Cache hit: short-circuit the LLM call
175+
if preResult.CacheHit && preResult.CachedResponse != "" {
176+
ch <- StreamEvent{Type: "content", Content: preResult.CachedResponse}
177+
s.messages = append(s.messages, client.EyrieMessage{Role: "assistant", Content: preResult.CachedResponse})
178+
ch <- StreamEvent{Type: "done"}
179+
return
180+
}
181+
// Injection detected: warn but continue
182+
if preResult.InjectionRisk != nil && preResult.InjectionRisk.IsRisky {
183+
s.log.Warn("injection risk detected", map[string]interface{}{
184+
"level": preResult.InjectionRisk.RiskLevel,
185+
})
186+
}
187+
// Apply adaptive system prompt if generated
188+
if preResult.SystemPrompt != "" {
189+
s.system = preResult.SystemPrompt
190+
}
191+
}
192+
}
193+
163194
// Pre-query hook
164195
hooks.Execute(ctx, hooks.EventPreQuery, map[string]interface{}{
165196
"provider": s.provider,
@@ -463,6 +494,13 @@ func (s *Session) agentLoop(ctx context.Context, ch chan<- StreamEvent) {
463494

464495
// No tool calls — done
465496
if len(toolCalls) == 0 {
497+
// Integration pipeline: post-response (format, score, redact, cache, learn)
498+
if s.Pipeline != nil && textContent != "" {
499+
postResult := s.Pipeline.PostResponse(textContent, s.messages)
500+
if postResult != nil && postResult.FormattedResponse != "" {
501+
textContent = postResult.FormattedResponse
502+
}
503+
}
466504
if textContent != "" {
467505
s.messages = append(s.messages, client.EyrieMessage{Role: "assistant", Content: textContent})
468506
// Auto-remember corrections and learnings
@@ -523,6 +561,17 @@ func (s *Session) agentLoop(ctx context.Context, ch chan<- StreamEvent) {
523561
}()
524562
}
525563
ch <- StreamEvent{Type: "done"}
564+
// Integration pipeline: end-session (assess, learn, store experience)
565+
if s.Pipeline != nil {
566+
taskGoal := ""
567+
for _, m := range s.messages {
568+
if m.Role == "user" && m.ToolResult == nil {
569+
taskGoal = m.Content
570+
break
571+
}
572+
}
573+
go s.Pipeline.EndSession(ctx.Err() == nil, taskGoal)
574+
}
526575
// Session end hook
527576
hooks.ExecuteAsync(ctx, hooks.EventSessionEnd, map[string]interface{}{
528577
"provider": s.provider,
@@ -765,6 +814,26 @@ func (s *Session) agentLoop(ctx context.Context, ch chan<- StreamEvent) {
765814
output = output[:maxChars] + "\n... (truncated)"
766815
}
767816

817+
// Integration pipeline: post-tool (stall detect, lint, test, error recovery)
818+
if s.Pipeline != nil {
819+
var execErr error
820+
if isErr {
821+
execErr = fmt.Errorf("%s", output)
822+
}
823+
toolResult := s.Pipeline.PostToolExecution(tc.Name, tc.Arguments, output, execErr)
824+
if toolResult != nil {
825+
if toolResult.StallWarning != "" {
826+
output += "\n\n" + toolResult.StallWarning
827+
}
828+
if toolResult.LintErrors != "" {
829+
output += "\n\nLint: " + toolResult.LintErrors
830+
}
831+
if toolResult.RecoveryAction != "" && toolResult.ShouldRetry {
832+
output += "\n\nRecovery suggestion: " + toolResult.RecoveryAction
833+
}
834+
}
835+
}
836+
768837
// Post-tool hook
769838
s.metrics.Counter("tools.executed").Inc()
770839
if isErr {

0 commit comments

Comments
 (0)