Skip to content

Commit 5d2f316

Browse files
committed
feat(workflows): implement support reply workflow with triage, drafting, and sending steps
- Added support_reply_deps.go to define dependencies and constants for the support reply workflow. - Implemented support_reply_test.go to cover various scenarios including happy paths, approval requirements, and error handling. - Created workflow.go to manage the workflow execution, including step results and context. - Developed workflow_test.go to validate the workflow registration, execution, and approval processes.
1 parent c749932 commit 5d2f316

98 files changed

Lines changed: 11069 additions & 64 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
package gateway
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"path/filepath"
7+
"time"
8+
9+
"github.com/grasberg/sofia/pkg/agent"
10+
"github.com/grasberg/sofia/pkg/autonomy"
11+
"github.com/grasberg/sofia/pkg/config"
12+
"github.com/grasberg/sofia/pkg/logger"
13+
"github.com/grasberg/sofia/pkg/memory"
14+
"github.com/grasberg/sofia/pkg/workflows"
15+
)
16+
17+
// startGitHubAutonomy builds the fix-github-issue workflow + poller and
18+
// runs the poller loop in a goroutine until ctx is cancelled. A no-op when
19+
// cfg.GitHubAutonomy.Enabled is false or Repos is empty — callers always
20+
// pass through so the "off by default" contract holds.
21+
func startGitHubAutonomy(
22+
ctx context.Context,
23+
cfg *config.Config,
24+
agentLoop *agent.AgentLoop,
25+
memDB *memory.MemoryDB,
26+
workspacePath string,
27+
) {
28+
raw := cfg.GitHubAutonomy
29+
if !raw.Enabled {
30+
return
31+
}
32+
ghCfg := config.DefaultGitHubAutonomy(raw)
33+
if len(ghCfg.Repos) == 0 {
34+
logger.WarnC("workflows", "github_autonomy enabled but no repos configured — poller not started")
35+
return
36+
}
37+
if memDB == nil {
38+
logger.ErrorC("workflows", "github_autonomy requires a memory DB; disabled")
39+
return
40+
}
41+
42+
workspaceRoot := ghCfg.WorkspaceRoot
43+
if workspaceRoot == "" {
44+
workspaceRoot = filepath.Join(workspacePath, "github-autonomy")
45+
}
46+
47+
deps := workflows.FixGitHubIssueDeps{
48+
Classifier: workflows.NewHeuristicIssueClassifier(),
49+
Cloner: workflows.NewShellCloner(),
50+
Tester: workflows.NewShellTester(),
51+
Fixer: workflows.NewNoopFixer(),
52+
Pusher: workflows.NewShellPusher(),
53+
PRCreator: workflows.NewShellPRCreator(),
54+
IssueCommenter: workflows.NewShellIssueCommenter(),
55+
BranchPrefix: ghCfg.BranchPrefix,
56+
WorkspaceRoot: workspaceRoot,
57+
UseFork: ghCfg.UseFork,
58+
Locale: locaeFromEmailCfg(cfg),
59+
}
60+
61+
registry := workflows.NewRegistry()
62+
if err := workflows.RegisterFixGitHubIssue(registry, deps); err != nil {
63+
logger.ErrorCF("workflows", "fix-github-issue registration failed",
64+
map[string]any{"error": err.Error()})
65+
return
66+
}
67+
68+
goalSink := workflows.NewGoalSinkAdapter(autonomy.NewGoalManager(memDB), memDB)
69+
gate := workflows.NewApprovalGateAdapter(agentLoop.GetApprovalGate())
70+
runner := workflows.NewRunner(registry, goalSink, gate)
71+
72+
lister := workflows.NewShellIssueLister()
73+
store := workflows.NewSemanticGraphProcessedStore(memDB)
74+
75+
poller, err := workflows.NewGitHubPoller(workflows.GitHubPollerConfig{
76+
Repos: ghCfg.Repos,
77+
Label: ghCfg.Label,
78+
MaxConcurrent: ghCfg.MaxConcurrent,
79+
BranchPrefix: ghCfg.BranchPrefix,
80+
CloneRoot: workspaceRoot,
81+
UseFork: ghCfg.UseFork,
82+
Locale: deps.Locale,
83+
}, lister, store, runner)
84+
if err != nil {
85+
logger.ErrorCF("workflows", "github poller construction failed",
86+
map[string]any{"error": err.Error()})
87+
return
88+
}
89+
90+
interval := time.Duration(ghCfg.PollMinutes) * time.Minute
91+
go poller.Start(ctx, interval)
92+
93+
logger.InfoCF("workflows", "fix-github-issue poller started",
94+
map[string]any{
95+
"repos": ghCfg.Repos,
96+
"label": ghCfg.Label,
97+
"poll_minutes": ghCfg.PollMinutes,
98+
"max_concurrent": ghCfg.MaxConcurrent,
99+
"use_fork": ghCfg.UseFork,
100+
"workspace": workspaceRoot,
101+
})
102+
103+
_ = fmt.Sprintf // reserved for future logging helpers
104+
}
105+
106+
// locaeFromEmailCfg returns the user's preferred locale — derived from the
107+
// email channel config which already carries it. Defaults to "sv" when the
108+
// email channel isn't configured.
109+
func locaeFromEmailCfg(cfg *config.Config) string {
110+
loc := cfg.Channels.Email.UserLocale
111+
if loc == "" {
112+
return "sv"
113+
}
114+
return loc
115+
}

cmd/sofia/internal/gateway/helpers.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,22 @@ func gatewayCmd(debug bool) error {
199199
memDB := agentLoop.GetMemoryDB()
200200
if memDB != nil {
201201
healthServer.RegisterCheck("database", health.DatabaseCheck(memDB))
202+
203+
// Wire persistent email dedupe into the email channel so polling is
204+
// idempotent across restarts. Also wire the support-reply workflow
205+
// when the email channel is configured as Autonomous=true.
206+
if emailCh, ok := channelManager.GetChannel("email"); ok {
207+
if ec, ok := emailCh.(*channels.EmailChannel); ok {
208+
ec.SetIngestedStore(memDB)
209+
210+
if ec.Config().Autonomous {
211+
if err := wireSupportReplyWorkflow(ec, agentLoop, memDB); err != nil {
212+
logger.ErrorCF("workflows", "support-reply wiring failed",
213+
map[string]any{"error": err.Error()})
214+
}
215+
}
216+
}
217+
}
202218
}
203219
dataDir := filepath.Dir(cfg.MemoryDBPath())
204220
healthServer.RegisterCheck("disk_space", health.DiskSpaceCheck(dataDir, 0))
@@ -239,6 +255,8 @@ func gatewayCmd(debug bool) error {
239255
fmt.Printf("✓ Web UI available at http://%s:%d\n", cfg.WebUI.Host, cfg.WebUI.Port)
240256
}
241257

258+
startGitHubAutonomy(ctx, cfg, agentLoop, memDB, cfg.WorkspacePath())
259+
242260
go agentLoop.Run(ctx)
243261

244262
sigChan := make(chan os.Signal, 1)
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
package gateway
2+
3+
import (
4+
"context"
5+
"fmt"
6+
7+
"github.com/grasberg/sofia/pkg/agent"
8+
"github.com/grasberg/sofia/pkg/autonomy"
9+
"github.com/grasberg/sofia/pkg/channels"
10+
"github.com/grasberg/sofia/pkg/logger"
11+
"github.com/grasberg/sofia/pkg/memory"
12+
"github.com/grasberg/sofia/pkg/workflows"
13+
)
14+
15+
// wireSupportReplyWorkflow assembles the runtime graph for the
16+
// "support-reply" workflow and installs an OnInbound handler on the email
17+
// channel so autonomous polls trigger it. No-ops gracefully when any piece
18+
// is missing (e.g. sender not configured).
19+
func wireSupportReplyWorkflow(
20+
ec *channels.EmailChannel,
21+
agentLoop *agent.AgentLoop,
22+
memDB *memory.MemoryDB,
23+
) error {
24+
if ec == nil {
25+
return fmt.Errorf("email channel is nil")
26+
}
27+
if memDB == nil {
28+
return fmt.Errorf("memory DB is nil")
29+
}
30+
31+
sender := ec.Sender()
32+
if sender == nil {
33+
return fmt.Errorf("email channel has no sender configured")
34+
}
35+
36+
cfg := ec.Config()
37+
38+
goalSink := workflows.NewGoalSinkAdapter(autonomy.NewGoalManager(memDB), memDB)
39+
gate := workflows.NewApprovalGateAdapter(agentLoop.GetApprovalGate())
40+
kb := workflows.NewKBAdapter(memDB)
41+
42+
archiver := channels.NewGmailArchiver(channels.GmailArchiverOptions{
43+
BinaryPath: cfg.GogBinary,
44+
Account: cfg.Username,
45+
MarkRead: cfg.MarkAsReadOnIngest,
46+
})
47+
48+
var classifier agent.RiskClassifier
49+
if ag := agentLoop.GetApprovalGate(); ag != nil {
50+
// Reuse the gate's own classifier so workflow risk_check and
51+
// ApprovalGate.RequiresApproval share one ruleset.
52+
classifier = ag
53+
}
54+
55+
deps := workflows.SupportReplyDeps{
56+
KBSearcher: kb,
57+
KBUpserter: kb,
58+
Sender: sender,
59+
Archiver: archiver,
60+
Classifier: classifier,
61+
DefaultLocale: cfg.UserLocale,
62+
AutoSendPriorityFloor: workflows.PriorityP3,
63+
}
64+
65+
registry := workflows.NewRegistry()
66+
if err := workflows.RegisterSupportReply(registry, deps); err != nil {
67+
return fmt.Errorf("register support-reply: %w", err)
68+
}
69+
70+
runner := workflows.NewRunner(registry, goalSink, gate)
71+
72+
ec.SetInboundHandler(func(email channels.IncomingEmail) {
73+
inputs := map[string]any{
74+
workflows.InputFrom: email.From,
75+
workflows.InputSubject: email.Subject,
76+
workflows.InputBody: email.Body,
77+
workflows.InputMessageID: email.MessageID,
78+
workflows.InputLocale: cfg.UserLocale,
79+
workflows.InputAgentID: "support",
80+
}
81+
_, err := runner.Run(
82+
context.Background(),
83+
workflows.WorkflowSupportReply,
84+
"support",
85+
fmt.Sprintf("Reply to %s: %s", email.From, truncateSubject(email.Subject)),
86+
inputs,
87+
)
88+
if err != nil {
89+
logger.WarnCF("workflows", "support-reply run ended with error",
90+
map[string]any{
91+
"from": email.From,
92+
"message_id": email.MessageID,
93+
"error": err.Error(),
94+
})
95+
}
96+
})
97+
98+
logger.InfoCF("workflows", "support-reply workflow wired",
99+
map[string]any{
100+
"account": cfg.Username,
101+
"locale": cfg.UserLocale,
102+
"mark_read": cfg.MarkAsReadOnIngest,
103+
"autonomous": cfg.Autonomous,
104+
})
105+
return nil
106+
}
107+
108+
// truncateSubject clips long subjects so goal names stay readable.
109+
func truncateSubject(s string) string {
110+
if len(s) <= 60 {
111+
return s
112+
}
113+
return s[:60] + "…"
114+
}

0 commit comments

Comments
 (0)