Skip to content

Commit 28804fe

Browse files
AgentWrapperclaude
andcommitted
refactor(backend): collapse duplicate PR row types into one domain definition
Each PR-child table (pr / pr_checks / pr_comment) had three near-identical structs — gen.* (generated), sqlite.*Row, and ports.* — with wiring.Adapter copying field-by-field between them. Collapse to one shared definition per table in domain (PRRow / PRCheckRow / PRComment), used by both the PRWriter port and the sqlite store; gen.* stays sealed inside the storage layer. - *sqlite.Store now satisfies ports.SessionStore + ports.PRWriter directly, so the entire wiring.Adapter package is deleted (lifecycle.New(store, store)). - The bool PR state <-> single state column, int<->int64, and enum-default translation now lives only at the gen<->domain boundary in pr_store.go. - WritePRObservation renamed WritePR to match the port; the integration test and composition root drop their adapter copies. Net -280 lines, behaviour unchanged. go test -race ./... green. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
1 parent 83d1ea1 commit 28804fe

14 files changed

Lines changed: 233 additions & 466 deletions

File tree

backend/internal/cdc/cdc_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ func TestE2E_StoreWriteToBroadcast(t *testing.T) {
7878
if err := s.UpdateSession(ctx, r); err != nil { // -> session_updated (seq 2)
7979
t.Fatal(err)
8080
}
81-
if err := s.UpsertPR(ctx, sqlite.PRRow{URL: "pr1", SessionID: string(r.ID), State: "open", UpdatedAt: r.UpdatedAt}); err != nil { // -> pr_created (seq 3)
81+
if err := s.UpsertPR(ctx, domain.PRRow{URL: "pr1", SessionID: string(r.ID), UpdatedAt: r.UpdatedAt}); err != nil { // -> pr_created (seq 3)
8282
t.Fatal(err)
8383
}
8484

backend/internal/domain/pr.go

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package domain
2+
3+
import "time"
4+
5+
// The PR rows are the canonical shapes for the pr / pr_checks / pr_comment
6+
// tables, shared by the PRWriter port and the sqlite store (the store maps them
7+
// to/from the sqlc gen.* models). They are flat by design — these tables carry
8+
// no nesting or derivation, so a single definition serves every layer.
9+
//
10+
// PRRow is the scalar facts of one tracked pull request (the pr table). A session
11+
// can own several PRs; a PR belongs to one session. PRFacts is the read-model
12+
// derived from these for display status; PRRow is what gets written.
13+
type PRRow struct {
14+
URL string
15+
SessionID string
16+
Number int
17+
Draft bool
18+
Merged bool
19+
Closed bool
20+
CI CIState
21+
Review ReviewDecision
22+
Mergeability Mergeability
23+
UpdatedAt time.Time
24+
}
25+
26+
// PRCheckRow is one CI check run — one row per check name per commit.
27+
type PRCheckRow struct {
28+
PRURL string
29+
Name string
30+
CommitHash string
31+
Status string
32+
URL string
33+
LogTail string
34+
CreatedAt time.Time
35+
}
36+
37+
// PRComment is one review comment. Feedback is injected into the agent
38+
// regardless of author, so there is no bot/human distinction.
39+
type PRComment struct {
40+
ID string
41+
Author string
42+
File string
43+
Line int
44+
Body string
45+
Resolved bool
46+
CreatedAt time.Time
47+
}

backend/internal/integration/lifecycle_sqlite_test.go

Lines changed: 9 additions & 106 deletions
Original file line numberDiff line numberDiff line change
@@ -24,99 +24,6 @@ import (
2424
"github.com/aoagents/agent-orchestrator/backend/internal/storage/sqlite"
2525
)
2626

27-
// ---- store adapter ----
28-
//
29-
// MIRROR OF backend/lifecycle_wiring.go's storeAdapter. The integration tests
30-
// can't import package main, so the small set of methods that bridge
31-
// *sqlite.Store to ports.SessionStore + ports.PRWriter is duplicated here.
32-
// Function bodies are line-for-line identical to the production adapter so a
33-
// future divergence shows up as a real diff in code review; the obvious
34-
// follow-up is to extract the production adapter into a shared internal
35-
// package — explicitly out of scope for this PR ("do NOT redesign anything").
36-
37-
type storeAdapter struct{ *sqlite.Store }
38-
39-
var (
40-
_ ports.SessionStore = storeAdapter{}
41-
_ ports.PRWriter = storeAdapter{}
42-
)
43-
44-
func (a storeAdapter) PRFactsForSession(ctx context.Context, id domain.SessionID) (domain.PRFacts, error) {
45-
rows, err := a.Store.ListPRsBySession(ctx, string(id))
46-
if err != nil {
47-
return domain.PRFacts{}, err
48-
}
49-
if len(rows) == 0 {
50-
return domain.PRFacts{}, nil
51-
}
52-
pick := rows[0]
53-
for _, r := range rows {
54-
if r.State == "draft" || r.State == "open" {
55-
pick = r
56-
break
57-
}
58-
}
59-
facts := domain.PRFacts{
60-
URL: pick.URL, Number: int(pick.Number), Exists: true,
61-
Draft: pick.State == "draft", Merged: pick.State == "merged", Closed: pick.State == "closed",
62-
CI: domain.CIState(pick.CIState),
63-
Review: domain.ReviewDecision(pick.ReviewDecision),
64-
Mergeability: domain.Mergeability(pick.Mergeability),
65-
}
66-
comments, err := a.Store.ListPRComments(ctx, pick.URL)
67-
if err != nil {
68-
return domain.PRFacts{}, err
69-
}
70-
for _, c := range comments {
71-
if !c.Resolved {
72-
facts.ReviewComments = true
73-
break
74-
}
75-
}
76-
return facts, nil
77-
}
78-
79-
func (a storeAdapter) WritePR(ctx context.Context, pr ports.PRRow, checks []ports.PRCheckRow, comments []ports.PRComment) error {
80-
row := sqlite.PRRow{
81-
URL: pr.URL, SessionID: pr.SessionID, Number: int64(pr.Number),
82-
State: prState(pr),
83-
ReviewDecision: string(pr.Review),
84-
CIState: string(pr.CI),
85-
Mergeability: string(pr.Mergeability),
86-
UpdatedAt: pr.UpdatedAt,
87-
}
88-
checkRows := make([]sqlite.PRCheckRow, len(checks))
89-
for i, c := range checks {
90-
checkRows[i] = sqlite.PRCheckRow{
91-
PRURL: c.PRURL, Name: c.Name, CommitHash: c.CommitHash,
92-
Status: c.Status, URL: c.URL, LogTail: c.LogTail, CreatedAt: c.CreatedAt,
93-
}
94-
}
95-
commentRows := make([]sqlite.PRCommentRow, len(comments))
96-
for i, c := range comments {
97-
commentRows[i] = sqlite.PRCommentRow{
98-
PRURL: pr.URL, CommentID: c.ID, Author: c.Author, File: c.File,
99-
Line: int64(c.Line), Body: c.Body, Resolved: c.Resolved, CreatedAt: c.CreatedAt,
100-
}
101-
}
102-
return a.Store.WritePRObservation(ctx, row, checkRows, commentRows)
103-
}
104-
105-
// prState mirrors the production helper of the same name in
106-
// backend/lifecycle_wiring.go.
107-
func prState(r ports.PRRow) string {
108-
switch {
109-
case r.Merged:
110-
return "merged"
111-
case r.Closed:
112-
return "closed"
113-
case r.Draft:
114-
return "draft"
115-
default:
116-
return "open"
117-
}
118-
}
119-
12027
// ---- plugin fakes (minimal: only enough to drive SM through real LCM) ----
12128

12229
type stubRuntime struct {
@@ -197,7 +104,6 @@ func (n *captureNotifier) drain() []ports.Event {
197104
type liveStack struct {
198105
dataDir string
199106
store *sqlite.Store
200-
adapter storeAdapter
201107
lcm *lifecycle.Manager
202108
sm *session.Manager
203109
notifier *captureNotifier
@@ -216,24 +122,22 @@ func openLiveStack(t *testing.T, dataDir string) *liveStack {
216122
if err != nil {
217123
t.Fatalf("open sqlite: %v", err)
218124
}
219-
adapter := storeAdapter{store}
220125
notifier := &captureNotifier{}
221126
messenger := &captureMessenger{}
222-
lcm := lifecycle.New(adapter, adapter, notifier, messenger)
127+
lcm := lifecycle.New(store, store, notifier, messenger)
223128

224129
wsRoot := t.TempDir()
225130
sm := session.New(session.Deps{
226131
Runtime: &stubRuntime{id: "h1", name: "tmux"},
227132
Agent: stubAgent{},
228133
Workspace: &stubWorkspace{root: wsRoot},
229-
Store: adapter,
134+
Store: store,
230135
Messenger: messenger,
231136
Lifecycle: lcm,
232137
})
233138
st := &liveStack{
234139
dataDir: dataDir,
235140
store: store,
236-
adapter: adapter,
237141
lcm: lcm,
238142
sm: sm,
239143
notifier: notifier,
@@ -272,11 +176,10 @@ func seedProject(t *testing.T, store *sqlite.Store, id string) {
272176
}
273177

274178
func durableLifecycle(store *sqlite.Store, messenger ports.AgentMessenger) *lifecycle.Manager {
275-
adapter := storeAdapter{store}
276179
renderer := notification.NewRenderer(store)
277180
logger := slog.New(slog.NewTextHandler(io.Discard, nil))
278181
notifier := notification.NewEnqueuer(store, renderer, logger)
279-
return lifecycle.New(adapter, adapter, notifier, messenger)
182+
return lifecycle.New(store, store, notifier, messenger)
280183
}
281184

282185
func durableRecord(project, issue, branch string) domain.SessionRecord {
@@ -347,7 +250,7 @@ func TestHappyPath_Spawn_PR_Kill(t *testing.T) {
347250
if err := st.lcm.ApplyPRObservation(ctx, sess.ID, ports.PRObservation{
348251
Fetched: true, URL: prURL, Number: 1,
349252
CI: domain.CIPassing, Review: domain.ReviewNone, Mergeability: domain.MergeMergeable,
350-
Checks: []ports.PRCheckRow{{
253+
Checks: []domain.PRCheckRow{{
351254
Name: "ci/build", CommitHash: "abc123", Status: "passed", CreatedAt: time.Now(),
352255
}},
353256
}); err != nil {
@@ -357,7 +260,7 @@ func TestHappyPath_Spawn_PR_Kill(t *testing.T) {
357260
if err != nil || !ok {
358261
t.Fatalf("get pr: ok=%v err=%v", ok, err)
359262
}
360-
if prRow.SessionID != string(sess.ID) || prRow.CIState != "passing" || prRow.State != "open" {
263+
if prRow.SessionID != string(sess.ID) || prRow.CI != domain.CIPassing || prRow.Draft || prRow.Merged || prRow.Closed {
361264
t.Fatalf("pr row wrong: %+v", prRow)
362265
}
363266

@@ -491,7 +394,7 @@ func TestCIFailureAndRecovery_NudgeThenClears(t *testing.T) {
491394
if err := st.lcm.ApplyPRObservation(ctx, sess.ID, ports.PRObservation{
492395
Fetched: true, URL: prURL, Number: 2,
493396
CI: domain.CIFailing, Mergeability: domain.MergeUnstable,
494-
Checks: []ports.PRCheckRow{{
397+
Checks: []domain.PRCheckRow{{
495398
Name: "ci/build", CommitHash: "c1", Status: "failed", LogTail: "panic: nil map", CreatedAt: time.Now(),
496399
}},
497400
}); err != nil {
@@ -507,7 +410,7 @@ func TestCIFailureAndRecovery_NudgeThenClears(t *testing.T) {
507410

508411
// Brake confirmation: only one failure so far, RecentCheckStatuses should
509412
// reflect it.
510-
history, err := st.adapter.RecentCheckStatuses(ctx, prURL, "ci/build", 3)
413+
history, err := st.store.RecentCheckStatuses(ctx, prURL, "ci/build", 3)
511414
if err != nil {
512415
t.Fatalf("recent checks: %v", err)
513416
}
@@ -521,7 +424,7 @@ func TestCIFailureAndRecovery_NudgeThenClears(t *testing.T) {
521424
if err := st.lcm.ApplyPRObservation(ctx, sess.ID, ports.PRObservation{
522425
Fetched: true, URL: prURL, Number: 2,
523426
CI: domain.CIPassing, Mergeability: domain.MergeMergeable,
524-
Checks: []ports.PRCheckRow{{
427+
Checks: []domain.PRCheckRow{{
525428
Name: "ci/build", CommitHash: "c2", Status: "passed", CreatedAt: time.Now(),
526429
}},
527430
}); err != nil {
@@ -537,7 +440,7 @@ func TestCIFailureAndRecovery_NudgeThenClears(t *testing.T) {
537440

538441
// And the pr row reflects the recovery in the canonical fact store.
539442
prRow, ok, _ := st.store.GetPR(ctx, prURL)
540-
if !ok || prRow.CIState != "passing" {
443+
if !ok || prRow.CI != domain.CIPassing {
541444
t.Fatalf("pr ci_state should be passing post-recovery: %+v", prRow)
542445
}
543446
}

backend/internal/lifecycle/manager.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -180,20 +180,20 @@ func (m *Manager) ApplyPRObservation(ctx context.Context, id domain.SessionID, o
180180
// in one atomic store call. PR-table CDC is emitted by the DB triggers.
181181
func (m *Manager) writePR(ctx context.Context, id domain.SessionID, o ports.PRObservation) error {
182182
now := m.clock()
183-
row := ports.PRRow{
183+
row := domain.PRRow{
184184
URL: o.URL, SessionID: string(id), Number: o.Number,
185185
Draft: o.Draft, Merged: o.Merged, Closed: o.Closed,
186186
CI: o.CI, Review: o.Review, Mergeability: o.Mergeability, UpdatedAt: now,
187187
}
188-
checks := make([]ports.PRCheckRow, len(o.Checks))
188+
checks := make([]domain.PRCheckRow, len(o.Checks))
189189
for i, c := range o.Checks {
190190
c.PRURL = o.URL
191191
if c.CreatedAt.IsZero() {
192192
c.CreatedAt = now
193193
}
194194
checks[i] = c
195195
}
196-
comments := make([]ports.PRComment, len(o.Comments))
196+
comments := make([]domain.PRComment, len(o.Comments))
197197
for i, c := range o.Comments {
198198
if c.CreatedAt.IsZero() {
199199
c.CreatedAt = now

backend/internal/lifecycle/manager_test.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,17 +20,17 @@ var ctx = context.Background()
2020
// write path and the read-back together.
2121
type fakeStore struct {
2222
sessions map[domain.SessionID]domain.SessionRecord
23-
pr map[domain.SessionID]ports.PRRow
24-
comments map[string][]ports.PRComment
25-
checks []ports.PRCheckRow
23+
pr map[domain.SessionID]domain.PRRow
24+
comments map[string][]domain.PRComment
25+
checks []domain.PRCheckRow
2626
num int
2727
}
2828

2929
func newFakeStore() *fakeStore {
3030
return &fakeStore{
3131
sessions: map[domain.SessionID]domain.SessionRecord{},
32-
pr: map[domain.SessionID]ports.PRRow{},
33-
comments: map[string][]ports.PRComment{},
32+
pr: map[domain.SessionID]domain.PRRow{},
33+
comments: map[string][]domain.PRComment{},
3434
}
3535
}
3636

@@ -82,7 +82,7 @@ func (f *fakeStore) PRFactsForSession(_ context.Context, id domain.SessionID) (d
8282
}
8383
return facts, nil
8484
}
85-
func (f *fakeStore) WritePR(_ context.Context, pr ports.PRRow, checks []ports.PRCheckRow, comments []ports.PRComment) error {
85+
func (f *fakeStore) WritePR(_ context.Context, pr domain.PRRow, checks []domain.PRCheckRow, comments []domain.PRComment) error {
8686
f.pr[domain.SessionID(pr.SessionID)] = pr
8787
f.checks = append(f.checks, checks...)
8888
f.comments[pr.URL] = comments
@@ -222,7 +222,7 @@ func TestPR_CIFailingNudgesAgentWithLogs(t *testing.T) {
222222
m, st, _, msg := newManager()
223223
st.sessions["mer-1"] = working("mer-1")
224224

225-
o := openPR(ports.PRObservation{CI: domain.CIFailing, Checks: []ports.PRCheckRow{{Name: "build", CommitHash: "c1", Status: "failed", LogTail: "boom"}}})
225+
o := openPR(ports.PRObservation{CI: domain.CIFailing, Checks: []domain.PRCheckRow{{Name: "build", CommitHash: "c1", Status: "failed", LogTail: "boom"}}})
226226
if err := m.ApplyPRObservation(ctx, "mer-1", o); err != nil {
227227
t.Fatal(err)
228228
}
@@ -236,7 +236,7 @@ func TestPR_CIBrakeEscalatesAfterThreeFails(t *testing.T) {
236236
st.sessions["mer-1"] = working("mer-1")
237237

238238
for _, commit := range []string{"c1", "c2", "c3"} {
239-
o := openPR(ports.PRObservation{CI: domain.CIFailing, Checks: []ports.PRCheckRow{{Name: "build", CommitHash: commit, Status: "failed", LogTail: "boom"}}})
239+
o := openPR(ports.PRObservation{CI: domain.CIFailing, Checks: []domain.PRCheckRow{{Name: "build", CommitHash: commit, Status: "failed", LogTail: "boom"}}})
240240
if err := m.ApplyPRObservation(ctx, "mer-1", o); err != nil {
241241
t.Fatal(err)
242242
}
@@ -255,7 +255,7 @@ func TestPR_ReviewCommentsInjectedRegardlessOfAuthor(t *testing.T) {
255255

256256
o := openPR(ports.PRObservation{
257257
Review: domain.ReviewChangesRequest,
258-
Comments: []ports.PRComment{{ID: "1", Author: "greptileai", Body: "use a constant here"}},
258+
Comments: []domain.PRComment{{ID: "1", Author: "greptileai", Body: "use a constant here"}},
259259
})
260260
if err := m.ApplyPRObservation(ctx, "mer-1", o); err != nil {
261261
t.Fatal(err)

backend/internal/ports/facts.go

Lines changed: 2 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,8 @@ type PRObservation struct {
5252
CI domain.CIState
5353
Review domain.ReviewDecision
5454
Mergeability domain.Mergeability
55-
Checks []PRCheckRow
56-
Comments []PRComment
55+
Checks []domain.PRCheckRow
56+
Comments []domain.PRComment
5757
}
5858

5959
// SpawnOutcome is what the Session Manager reports once a spawn is live: the
@@ -65,42 +65,3 @@ type SpawnOutcome struct {
6565
AgentSessionID string
6666
Prompt string
6767
}
68-
69-
// ---- store row DTOs (shared by the PRWriter port and its sqlite adapter) ----
70-
71-
// PRRow is the scalar PR facts row.
72-
type PRRow struct {
73-
URL string
74-
SessionID string
75-
Number int
76-
Draft bool
77-
Merged bool
78-
Closed bool
79-
CI domain.CIState
80-
Review domain.ReviewDecision
81-
Mergeability domain.Mergeability
82-
UpdatedAt time.Time
83-
}
84-
85-
// PRCheckRow is one CI check run (one row per check name per commit).
86-
type PRCheckRow struct {
87-
PRURL string
88-
Name string
89-
CommitHash string
90-
Status string
91-
URL string
92-
LogTail string
93-
CreatedAt time.Time
94-
}
95-
96-
// PRComment is one review comment. Review feedback is injected into the agent
97-
// regardless of author, so there is no bot/human distinction.
98-
type PRComment struct {
99-
ID string
100-
Author string
101-
File string
102-
Line int
103-
Body string
104-
Resolved bool
105-
CreatedAt time.Time
106-
}

0 commit comments

Comments
 (0)