diff --git a/backend/internal/cdc/cdc_test.go b/backend/internal/cdc/cdc_test.go index d72370f4..52a0c574 100644 --- a/backend/internal/cdc/cdc_test.go +++ b/backend/internal/cdc/cdc_test.go @@ -78,7 +78,7 @@ func TestE2E_StoreWriteToBroadcast(t *testing.T) { if err := s.UpdateSession(ctx, r); err != nil { // -> session_updated (seq 2) t.Fatal(err) } - if err := s.UpsertPR(ctx, sqlite.PRRow{URL: "pr1", SessionID: string(r.ID), State: "open", UpdatedAt: r.UpdatedAt}); err != nil { // -> pr_created (seq 3) + if err := s.UpsertPR(ctx, domain.PRRow{URL: "pr1", SessionID: string(r.ID), UpdatedAt: r.UpdatedAt}); err != nil { // -> pr_created (seq 3) t.Fatal(err) } diff --git a/backend/internal/daemon/lifecycle_wiring.go b/backend/internal/daemon/lifecycle_wiring.go index 5a791054..e96b5564 100644 --- a/backend/internal/daemon/lifecycle_wiring.go +++ b/backend/internal/daemon/lifecycle_wiring.go @@ -16,17 +16,16 @@ import ( "github.com/aoagents/agent-orchestrator/backend/internal/ports" "github.com/aoagents/agent-orchestrator/backend/internal/session" "github.com/aoagents/agent-orchestrator/backend/internal/storage/sqlite" - "github.com/aoagents/agent-orchestrator/backend/internal/storage/sqlite/wiring" ) // lifecycleStack owns the running LCM + reaper. The LCM is the sole writer of // canonical transitions; the reaper is the OBSERVE-layer timer that probes live -// runtimes and reports facts back through it. Adapter is exposed so the Session +// runtimes and reports facts back through it. Store is exposed so the Session // Manager construction in startSession can plug the same SessionStore + PRWriter -// instance the LCM already holds. +// instance the LCM already holds (*sqlite.Store satisfies both ports directly). type lifecycleStack struct { LCM *lifecycle.Manager - Adapter wiring.Adapter + Store *sqlite.Store reaperDone <-chan struct{} } @@ -38,12 +37,11 @@ type lifecycleStack struct { // - reaper.MapRegistry{} — empty runtime registry, so the reaper ticks // escalations but probes nothing until the runtime plugins exist. func startLifecycle(ctx context.Context, store *sqlite.Store, logger *slog.Logger) (*lifecycleStack, error) { - a := wiring.Adapter{Store: store} renderer := notification.NewRenderer(store) notifier := notification.NewEnqueuer(store, renderer, logger) - lcm := lifecycle.New(a, a, notifier, noopMessenger{}) + lcm := lifecycle.New(store, store, notifier, noopMessenger{}) rp := reaper.New(lcm, reaper.MapRegistry{}, reaper.Config{Logger: logger}) - return &lifecycleStack{LCM: lcm, Adapter: a, reaperDone: rp.Start(ctx)}, nil + return &lifecycleStack{LCM: lcm, Store: store, reaperDone: rp.Start(ctx)}, nil } // Stop waits for the reaper goroutine to exit (the caller must have cancelled the @@ -88,7 +86,7 @@ func startSession(ctx context.Context, cfg config.Config, ls *lifecycleStack, lo Runtime: runtime, Agent: agent, Workspace: ws, - Store: ls.Adapter, + Store: ls.Store, Messenger: noopMessenger{}, Lifecycle: ls.LCM, }) diff --git a/backend/internal/daemon/wiring_test.go b/backend/internal/daemon/wiring_test.go index c2cfb721..f83be0dd 100644 --- a/backend/internal/daemon/wiring_test.go +++ b/backend/internal/daemon/wiring_test.go @@ -18,7 +18,6 @@ import ( "github.com/aoagents/agent-orchestrator/backend/internal/ports" "github.com/aoagents/agent-orchestrator/backend/internal/session" "github.com/aoagents/agent-orchestrator/backend/internal/storage/sqlite" - "github.com/aoagents/agent-orchestrator/backend/internal/storage/sqlite/wiring" ) // TestWiring_WriteFlowsToBroadcaster exercises the real boot path end to end: @@ -32,11 +31,10 @@ func TestWiring_WriteFlowsToBroadcaster(t *testing.T) { } defer store.Close() - a := wiring.Adapter{Store: store} renderer := notification.NewRenderer(store) logger := slog.New(slog.NewTextHandler(io.Discard, nil)) notifier := notification.NewEnqueuer(store, renderer, logger) - lcm := lifecycle.New(a, a, notifier, noopMessenger{}) + lcm := lifecycle.New(store, store, notifier, noopMessenger{}) bcast := cdc.NewBroadcaster() poller := cdc.NewPoller(cdcSource{store}, bcast, cdc.PollerConfig{}) @@ -123,13 +121,13 @@ func TestWiring_SessionManagerSharesLifecycleStoreAndLCM(t *testing.T) { gotStore, gotLCM := inspectSessionDeps(t, sStack.SM) - // Store should be the exact wiring.Adapter the LCM was constructed with. - gotAdapter, ok := gotStore.(wiring.Adapter) + // Store should be the exact *sqlite.Store the LCM was constructed with. + gotSqlite, ok := gotStore.(*sqlite.Store) if !ok { - t.Fatalf("SM.store is %T, want wiring.Adapter", gotStore) + t.Fatalf("SM.store is %T, want *sqlite.Store", gotStore) } - if gotAdapter.Store != lcStack.Adapter.Store { - t.Fatalf("SM.store wraps a different *sqlite.Store than lcStack.Adapter") + if gotSqlite != lcStack.Store { + t.Fatalf("SM.store is a different *sqlite.Store than lcStack.Store") } // Lifecycle should be the exact *lifecycle.Manager pointer from startLifecycle. diff --git a/backend/internal/domain/pr.go b/backend/internal/domain/pr.go new file mode 100644 index 00000000..77f94f27 --- /dev/null +++ b/backend/internal/domain/pr.go @@ -0,0 +1,47 @@ +package domain + +import "time" + +// The PR rows are the canonical shapes for the pr / pr_checks / pr_comment +// tables, shared by the PRWriter port and the sqlite store (the store maps them +// to/from the sqlc gen.* models). They are flat by design — these tables carry +// no nesting or derivation, so a single definition serves every layer. +// +// PRRow is the scalar facts of one tracked pull request (the pr table). A session +// can own several PRs; a PR belongs to one session. PRFacts is the read-model +// derived from these for display status; PRRow is what gets written. +type PRRow struct { + URL string + SessionID string + Number int + Draft bool + Merged bool + Closed bool + CI CIState + Review ReviewDecision + Mergeability Mergeability + UpdatedAt time.Time +} + +// PRCheckRow is one CI check run — one row per check name per commit. +type PRCheckRow struct { + PRURL string + Name string + CommitHash string + Status string + URL string + LogTail string + CreatedAt time.Time +} + +// PRComment is one review comment. Feedback is injected into the agent +// regardless of author, so there is no bot/human distinction. +type PRComment struct { + ID string + Author string + File string + Line int + Body string + Resolved bool + CreatedAt time.Time +} diff --git a/backend/internal/integration/lifecycle_sqlite_test.go b/backend/internal/integration/lifecycle_sqlite_test.go index c353bc6d..67b781fb 100644 --- a/backend/internal/integration/lifecycle_sqlite_test.go +++ b/backend/internal/integration/lifecycle_sqlite_test.go @@ -24,99 +24,6 @@ import ( "github.com/aoagents/agent-orchestrator/backend/internal/storage/sqlite" ) -// ---- store adapter ---- -// -// MIRROR OF backend/lifecycle_wiring.go's storeAdapter. The integration tests -// can't import package main, so the small set of methods that bridge -// *sqlite.Store to ports.SessionStore + ports.PRWriter is duplicated here. -// Function bodies are line-for-line identical to the production adapter so a -// future divergence shows up as a real diff in code review; the obvious -// follow-up is to extract the production adapter into a shared internal -// package — explicitly out of scope for this PR ("do NOT redesign anything"). - -type storeAdapter struct{ *sqlite.Store } - -var ( - _ ports.SessionStore = storeAdapter{} - _ ports.PRWriter = storeAdapter{} -) - -func (a storeAdapter) PRFactsForSession(ctx context.Context, id domain.SessionID) (domain.PRFacts, error) { - rows, err := a.Store.ListPRsBySession(ctx, string(id)) - if err != nil { - return domain.PRFacts{}, err - } - if len(rows) == 0 { - return domain.PRFacts{}, nil - } - pick := rows[0] - for _, r := range rows { - if r.State == "draft" || r.State == "open" { - pick = r - break - } - } - facts := domain.PRFacts{ - URL: pick.URL, Number: int(pick.Number), Exists: true, - Draft: pick.State == "draft", Merged: pick.State == "merged", Closed: pick.State == "closed", - CI: domain.CIState(pick.CIState), - Review: domain.ReviewDecision(pick.ReviewDecision), - Mergeability: domain.Mergeability(pick.Mergeability), - } - comments, err := a.Store.ListPRComments(ctx, pick.URL) - if err != nil { - return domain.PRFacts{}, err - } - for _, c := range comments { - if !c.Resolved { - facts.ReviewComments = true - break - } - } - return facts, nil -} - -func (a storeAdapter) WritePR(ctx context.Context, pr ports.PRRow, checks []ports.PRCheckRow, comments []ports.PRComment) error { - row := sqlite.PRRow{ - URL: pr.URL, SessionID: pr.SessionID, Number: int64(pr.Number), - State: prState(pr), - ReviewDecision: string(pr.Review), - CIState: string(pr.CI), - Mergeability: string(pr.Mergeability), - UpdatedAt: pr.UpdatedAt, - } - checkRows := make([]sqlite.PRCheckRow, len(checks)) - for i, c := range checks { - checkRows[i] = sqlite.PRCheckRow{ - PRURL: c.PRURL, Name: c.Name, CommitHash: c.CommitHash, - Status: c.Status, URL: c.URL, LogTail: c.LogTail, CreatedAt: c.CreatedAt, - } - } - commentRows := make([]sqlite.PRCommentRow, len(comments)) - for i, c := range comments { - commentRows[i] = sqlite.PRCommentRow{ - PRURL: pr.URL, CommentID: c.ID, Author: c.Author, File: c.File, - Line: int64(c.Line), Body: c.Body, Resolved: c.Resolved, CreatedAt: c.CreatedAt, - } - } - return a.Store.WritePRObservation(ctx, row, checkRows, commentRows) -} - -// prState mirrors the production helper of the same name in -// backend/lifecycle_wiring.go. -func prState(r ports.PRRow) string { - switch { - case r.Merged: - return "merged" - case r.Closed: - return "closed" - case r.Draft: - return "draft" - default: - return "open" - } -} - // ---- plugin fakes (minimal: only enough to drive SM through real LCM) ---- type stubRuntime struct { @@ -197,7 +104,6 @@ func (n *captureNotifier) drain() []ports.Event { type liveStack struct { dataDir string store *sqlite.Store - adapter storeAdapter lcm *lifecycle.Manager sm *session.Manager notifier *captureNotifier @@ -216,24 +122,22 @@ func openLiveStack(t *testing.T, dataDir string) *liveStack { if err != nil { t.Fatalf("open sqlite: %v", err) } - adapter := storeAdapter{store} notifier := &captureNotifier{} messenger := &captureMessenger{} - lcm := lifecycle.New(adapter, adapter, notifier, messenger) + lcm := lifecycle.New(store, store, notifier, messenger) wsRoot := t.TempDir() sm := session.New(session.Deps{ Runtime: &stubRuntime{id: "h1", name: "tmux"}, Agent: stubAgent{}, Workspace: &stubWorkspace{root: wsRoot}, - Store: adapter, + Store: store, Messenger: messenger, Lifecycle: lcm, }) st := &liveStack{ dataDir: dataDir, store: store, - adapter: adapter, lcm: lcm, sm: sm, notifier: notifier, @@ -272,11 +176,10 @@ func seedProject(t *testing.T, store *sqlite.Store, id string) { } func durableLifecycle(store *sqlite.Store, messenger ports.AgentMessenger) *lifecycle.Manager { - adapter := storeAdapter{store} renderer := notification.NewRenderer(store) logger := slog.New(slog.NewTextHandler(io.Discard, nil)) notifier := notification.NewEnqueuer(store, renderer, logger) - return lifecycle.New(adapter, adapter, notifier, messenger) + return lifecycle.New(store, store, notifier, messenger) } func durableRecord(project, issue, branch string) domain.SessionRecord { @@ -347,7 +250,7 @@ func TestHappyPath_Spawn_PR_Kill(t *testing.T) { if err := st.lcm.ApplyPRObservation(ctx, sess.ID, ports.PRObservation{ Fetched: true, URL: prURL, Number: 1, CI: domain.CIPassing, Review: domain.ReviewNone, Mergeability: domain.MergeMergeable, - Checks: []ports.PRCheckRow{{ + Checks: []domain.PRCheckRow{{ Name: "ci/build", CommitHash: "abc123", Status: "passed", CreatedAt: time.Now(), }}, }); err != nil { @@ -357,7 +260,7 @@ func TestHappyPath_Spawn_PR_Kill(t *testing.T) { if err != nil || !ok { t.Fatalf("get pr: ok=%v err=%v", ok, err) } - if prRow.SessionID != string(sess.ID) || prRow.CIState != "passing" || prRow.State != "open" { + if prRow.SessionID != string(sess.ID) || prRow.CI != domain.CIPassing || prRow.Draft || prRow.Merged || prRow.Closed { t.Fatalf("pr row wrong: %+v", prRow) } @@ -491,7 +394,7 @@ func TestCIFailureAndRecovery_NudgeThenClears(t *testing.T) { if err := st.lcm.ApplyPRObservation(ctx, sess.ID, ports.PRObservation{ Fetched: true, URL: prURL, Number: 2, CI: domain.CIFailing, Mergeability: domain.MergeUnstable, - Checks: []ports.PRCheckRow{{ + Checks: []domain.PRCheckRow{{ Name: "ci/build", CommitHash: "c1", Status: "failed", LogTail: "panic: nil map", CreatedAt: time.Now(), }}, }); err != nil { @@ -507,7 +410,7 @@ func TestCIFailureAndRecovery_NudgeThenClears(t *testing.T) { // Brake confirmation: only one failure so far, RecentCheckStatuses should // reflect it. - history, err := st.adapter.RecentCheckStatuses(ctx, prURL, "ci/build", 3) + history, err := st.store.RecentCheckStatuses(ctx, prURL, "ci/build", 3) if err != nil { t.Fatalf("recent checks: %v", err) } @@ -521,7 +424,7 @@ func TestCIFailureAndRecovery_NudgeThenClears(t *testing.T) { if err := st.lcm.ApplyPRObservation(ctx, sess.ID, ports.PRObservation{ Fetched: true, URL: prURL, Number: 2, CI: domain.CIPassing, Mergeability: domain.MergeMergeable, - Checks: []ports.PRCheckRow{{ + Checks: []domain.PRCheckRow{{ Name: "ci/build", CommitHash: "c2", Status: "passed", CreatedAt: time.Now(), }}, }); err != nil { @@ -537,7 +440,7 @@ func TestCIFailureAndRecovery_NudgeThenClears(t *testing.T) { // And the pr row reflects the recovery in the canonical fact store. prRow, ok, _ := st.store.GetPR(ctx, prURL) - if !ok || prRow.CIState != "passing" { + if !ok || prRow.CI != domain.CIPassing { t.Fatalf("pr ci_state should be passing post-recovery: %+v", prRow) } } diff --git a/backend/internal/lifecycle/manager.go b/backend/internal/lifecycle/manager.go index 438a76c6..dff0443d 100644 --- a/backend/internal/lifecycle/manager.go +++ b/backend/internal/lifecycle/manager.go @@ -180,12 +180,12 @@ func (m *Manager) ApplyPRObservation(ctx context.Context, id domain.SessionID, o // in one atomic store call. PR-table CDC is emitted by the DB triggers. func (m *Manager) writePR(ctx context.Context, id domain.SessionID, o ports.PRObservation) error { now := m.clock() - row := ports.PRRow{ + row := domain.PRRow{ URL: o.URL, SessionID: string(id), Number: o.Number, Draft: o.Draft, Merged: o.Merged, Closed: o.Closed, CI: o.CI, Review: o.Review, Mergeability: o.Mergeability, UpdatedAt: now, } - checks := make([]ports.PRCheckRow, len(o.Checks)) + checks := make([]domain.PRCheckRow, len(o.Checks)) for i, c := range o.Checks { c.PRURL = o.URL if c.CreatedAt.IsZero() { @@ -193,7 +193,7 @@ func (m *Manager) writePR(ctx context.Context, id domain.SessionID, o ports.PROb } checks[i] = c } - comments := make([]ports.PRComment, len(o.Comments)) + comments := make([]domain.PRComment, len(o.Comments)) for i, c := range o.Comments { if c.CreatedAt.IsZero() { c.CreatedAt = now diff --git a/backend/internal/lifecycle/manager_test.go b/backend/internal/lifecycle/manager_test.go index 4ae9aaaf..8adfd862 100644 --- a/backend/internal/lifecycle/manager_test.go +++ b/backend/internal/lifecycle/manager_test.go @@ -20,17 +20,17 @@ var ctx = context.Background() // write path and the read-back together. type fakeStore struct { sessions map[domain.SessionID]domain.SessionRecord - pr map[domain.SessionID]ports.PRRow - comments map[string][]ports.PRComment - checks []ports.PRCheckRow + pr map[domain.SessionID]domain.PRRow + comments map[string][]domain.PRComment + checks []domain.PRCheckRow num int } func newFakeStore() *fakeStore { return &fakeStore{ sessions: map[domain.SessionID]domain.SessionRecord{}, - pr: map[domain.SessionID]ports.PRRow{}, - comments: map[string][]ports.PRComment{}, + pr: map[domain.SessionID]domain.PRRow{}, + comments: map[string][]domain.PRComment{}, } } @@ -82,7 +82,7 @@ func (f *fakeStore) PRFactsForSession(_ context.Context, id domain.SessionID) (d } return facts, nil } -func (f *fakeStore) WritePR(_ context.Context, pr ports.PRRow, checks []ports.PRCheckRow, comments []ports.PRComment) error { +func (f *fakeStore) WritePR(_ context.Context, pr domain.PRRow, checks []domain.PRCheckRow, comments []domain.PRComment) error { f.pr[domain.SessionID(pr.SessionID)] = pr f.checks = append(f.checks, checks...) f.comments[pr.URL] = comments @@ -222,7 +222,7 @@ func TestPR_CIFailingNudgesAgentWithLogs(t *testing.T) { m, st, _, msg := newManager() st.sessions["mer-1"] = working("mer-1") - o := openPR(ports.PRObservation{CI: domain.CIFailing, Checks: []ports.PRCheckRow{{Name: "build", CommitHash: "c1", Status: "failed", LogTail: "boom"}}}) + o := openPR(ports.PRObservation{CI: domain.CIFailing, Checks: []domain.PRCheckRow{{Name: "build", CommitHash: "c1", Status: "failed", LogTail: "boom"}}}) if err := m.ApplyPRObservation(ctx, "mer-1", o); err != nil { t.Fatal(err) } @@ -236,7 +236,7 @@ func TestPR_CIBrakeEscalatesAfterThreeFails(t *testing.T) { st.sessions["mer-1"] = working("mer-1") for _, commit := range []string{"c1", "c2", "c3"} { - o := openPR(ports.PRObservation{CI: domain.CIFailing, Checks: []ports.PRCheckRow{{Name: "build", CommitHash: commit, Status: "failed", LogTail: "boom"}}}) + o := openPR(ports.PRObservation{CI: domain.CIFailing, Checks: []domain.PRCheckRow{{Name: "build", CommitHash: commit, Status: "failed", LogTail: "boom"}}}) if err := m.ApplyPRObservation(ctx, "mer-1", o); err != nil { t.Fatal(err) } @@ -255,7 +255,7 @@ func TestPR_ReviewCommentsInjectedRegardlessOfAuthor(t *testing.T) { o := openPR(ports.PRObservation{ Review: domain.ReviewChangesRequest, - Comments: []ports.PRComment{{ID: "1", Author: "greptileai", Body: "use a constant here"}}, + Comments: []domain.PRComment{{ID: "1", Author: "greptileai", Body: "use a constant here"}}, }) if err := m.ApplyPRObservation(ctx, "mer-1", o); err != nil { t.Fatal(err) diff --git a/backend/internal/ports/facts.go b/backend/internal/ports/facts.go index a3b3b397..01a78961 100644 --- a/backend/internal/ports/facts.go +++ b/backend/internal/ports/facts.go @@ -52,8 +52,8 @@ type PRObservation struct { CI domain.CIState Review domain.ReviewDecision Mergeability domain.Mergeability - Checks []PRCheckRow - Comments []PRComment + Checks []domain.PRCheckRow + Comments []domain.PRComment } // SpawnOutcome is what the Session Manager reports once a spawn is live: the @@ -65,42 +65,3 @@ type SpawnOutcome struct { AgentSessionID string Prompt string } - -// ---- store row DTOs (shared by the PRWriter port and its sqlite adapter) ---- - -// PRRow is the scalar PR facts row. -type PRRow struct { - URL string - SessionID string - Number int - Draft bool - Merged bool - Closed bool - CI domain.CIState - Review domain.ReviewDecision - Mergeability domain.Mergeability - UpdatedAt time.Time -} - -// PRCheckRow is one CI check run (one row per check name per commit). -type PRCheckRow struct { - PRURL string - Name string - CommitHash string - Status string - URL string - LogTail string - CreatedAt time.Time -} - -// PRComment is one review comment. Review feedback is injected into the agent -// regardless of author, so there is no bot/human distinction. -type PRComment struct { - ID string - Author string - File string - Line int - Body string - Resolved bool - CreatedAt time.Time -} diff --git a/backend/internal/ports/outbound.go b/backend/internal/ports/outbound.go index 79c20423..bc7321d3 100644 --- a/backend/internal/ports/outbound.go +++ b/backend/internal/ports/outbound.go @@ -28,7 +28,7 @@ type PRWriter interface { // WritePR persists a full PR observation — scalar facts, check runs, and the // replacement comment set — in one transaction, so the rows and the CDC // events they emit are all-or-nothing. - WritePR(ctx context.Context, pr PRRow, checks []PRCheckRow, comments []PRComment) error + WritePR(ctx context.Context, pr domain.PRRow, checks []domain.PRCheckRow, comments []domain.PRComment) error // RecentCheckStatuses reads the last `limit` runs of a check (the CI brake). RecentCheckStatuses(ctx context.Context, prURL, name string, limit int) ([]string, error) } diff --git a/backend/internal/storage/sqlite/pr_cdc_test.go b/backend/internal/storage/sqlite/pr_cdc_test.go index 8c8f7ea2..102e8b4f 100644 --- a/backend/internal/storage/sqlite/pr_cdc_test.go +++ b/backend/internal/storage/sqlite/pr_cdc_test.go @@ -5,6 +5,8 @@ import ( "strings" "testing" "time" + + "github.com/aoagents/agent-orchestrator/backend/internal/domain" ) // A check can change status on the same commit (in_progress -> failed) via @@ -19,13 +21,13 @@ func TestPRChecksCDC_EmitsOnInsertAndStatusUpdate(t *testing.T) { t.Fatal(err) } url := "https://example/pr/1" - if err := s.UpsertPR(ctx, PRRow{URL: url, SessionID: string(rec.ID), Number: 1}); err != nil { + if err := s.UpsertPR(ctx, domain.PRRow{URL: url, SessionID: string(rec.ID), Number: 1}); err != nil { t.Fatal(err) } now := time.Now() mustCheck := func(status string) { - if err := s.RecordCheck(ctx, PRCheckRow{PRURL: url, Name: "build", CommitHash: "c1", Status: status, CreatedAt: now}); err != nil { + if err := s.RecordCheck(ctx, domain.PRCheckRow{PRURL: url, Name: "build", CommitHash: "c1", Status: status, CreatedAt: now}); err != nil { t.Fatal(err) } } @@ -51,9 +53,9 @@ func TestPRChecksCDC_EmitsOnInsertAndStatusUpdate(t *testing.T) { } } -// WritePRObservation persists scalar facts, checks, and comments in one tx; all -// three should be queryable afterward. -func TestWritePRObservation_PersistsScalarsChecksAndComments(t *testing.T) { +// WritePR persists scalar facts, checks, and comments in one tx; all three +// should be queryable afterward. +func TestWritePR_PersistsScalarsChecksAndComments(t *testing.T) { s := newTestStore(t) ctx := context.Background() seedProject(t, s, "mer") @@ -64,18 +66,18 @@ func TestWritePRObservation_PersistsScalarsChecksAndComments(t *testing.T) { url := "https://example/pr/7" now := time.Now() - err = s.WritePRObservation(ctx, - PRRow{URL: url, SessionID: string(rec.ID), Number: 7, CIState: "failing", UpdatedAt: now}, - []PRCheckRow{{PRURL: url, Name: "build", CommitHash: "c1", Status: "failed", CreatedAt: now}}, - []PRCommentRow{{PRURL: url, CommentID: "1", Author: "reviewer", Body: "use a const", CreatedAt: now}}, + err = s.WritePR(ctx, + domain.PRRow{URL: url, SessionID: string(rec.ID), Number: 7, CI: domain.CIFailing, UpdatedAt: now}, + []domain.PRCheckRow{{PRURL: url, Name: "build", CommitHash: "c1", Status: "failed", CreatedAt: now}}, + []domain.PRComment{{ID: "1", Author: "reviewer", Body: "use a const", CreatedAt: now}}, ) if err != nil { t.Fatal(err) } pr, ok, err := s.GetPR(ctx, url) - if err != nil || !ok || pr.CIState != "failing" { - t.Fatalf("scalar facts not persisted: ok=%v ci=%q err=%v", ok, pr.CIState, err) + if err != nil || !ok || pr.CI != domain.CIFailing { + t.Fatalf("scalar facts not persisted: ok=%v ci=%q err=%v", ok, pr.CI, err) } if checks, _ := s.ListChecks(ctx, url); len(checks) != 1 || checks[0].Status != "failed" { t.Fatalf("check not persisted: %+v", checks) diff --git a/backend/internal/storage/sqlite/pr_facts.go b/backend/internal/storage/sqlite/pr_facts.go index d72f2978..c0c3068b 100644 --- a/backend/internal/storage/sqlite/pr_facts.go +++ b/backend/internal/storage/sqlite/pr_facts.go @@ -19,17 +19,15 @@ func (s *Store) PRFactsForSession(ctx context.Context, id domain.SessionID) (dom } pick := rows[0] for _, r := range rows { - if r.State == "draft" || r.State == "open" { + if !r.Merged && !r.Closed { // newest non-closed (draft or open) pick = r break } } facts := domain.PRFacts{ - URL: pick.URL, Number: int(pick.Number), Exists: true, - Draft: pick.State == "draft", Merged: pick.State == "merged", Closed: pick.State == "closed", - CI: domain.CIState(pick.CIState), - Review: domain.ReviewDecision(pick.ReviewDecision), - Mergeability: domain.Mergeability(pick.Mergeability), + URL: pick.URL, Number: pick.Number, Exists: true, + Draft: pick.Draft, Merged: pick.Merged, Closed: pick.Closed, + CI: pick.CI, Review: pick.Review, Mergeability: pick.Mergeability, } comments, err := s.ListPRComments(ctx, pick.URL) if err != nil { diff --git a/backend/internal/storage/sqlite/pr_store.go b/backend/internal/storage/sqlite/pr_store.go index 8b41396c..1d57b40d 100644 --- a/backend/internal/storage/sqlite/pr_store.go +++ b/backend/internal/storage/sqlite/pr_store.go @@ -5,123 +5,82 @@ import ( "database/sql" "errors" "fmt" - "time" + "github.com/aoagents/agent-orchestrator/backend/internal/domain" + "github.com/aoagents/agent-orchestrator/backend/internal/ports" "github.com/aoagents/agent-orchestrator/backend/internal/storage/sqlite/gen" ) -// PRRow is the scalar PR facts row (the pr table), keyed by normalized URL. One -// session can own many PRs; a PR belongs to one session (session_id FK). -type PRRow struct { - URL string - SessionID string - Number int64 - State string // draft | open | merged | closed - ReviewDecision string // none | approved | changes_requested | review_required - CIState string // unknown | pending | passing | failing - Mergeability string // unknown | mergeable | conflicting | blocked | unstable - UpdatedAt time.Time -} +// The pr / pr_checks / pr_comment rows are modelled by domain.PRRow / +// domain.PRCheckRow / domain.PRComment — flat tables, one shared type per table. +// This layer only maps those to/from the sqlc gen.* params: the bool PR state +// becomes the single pr.state column, empty enums default to their +// "nothing known yet" value (matching the CHECK constraints), and ints widen to +// int64. + +// Compile-time proof that *Store satisfies both ports it is wired into, so a +// drift between either interface and this implementation fails here at the point +// of definition rather than later at the call sites in lifecycle_wiring / tests. +var ( + _ ports.SessionStore = (*Store)(nil) + _ ports.PRWriter = (*Store)(nil) +) -// UpsertPR inserts or replaces the scalar PR facts for a PR URL. Empty enum -// fields default to their "nothing known yet" value so a partial row is valid -// against the CHECK constraints (matches the domain zero values none/unknown). -func (s *Store) UpsertPR(ctx context.Context, r PRRow) error { - r = r.withDefaults() +// UpsertPR inserts or replaces the scalar PR facts for a PR URL. +func (s *Store) UpsertPR(ctx context.Context, r domain.PRRow) error { s.writeMu.Lock() defer s.writeMu.Unlock() - return s.qw.UpsertPR(ctx, gen.UpsertPRParams{ - Url: r.URL, - SessionID: r.SessionID, - Number: r.Number, - PrState: r.State, - ReviewDecision: r.ReviewDecision, - CiState: r.CIState, - Mergeability: r.Mergeability, - UpdatedAt: r.UpdatedAt, - }) + return s.qw.UpsertPR(ctx, genPRParams(r)) } -// WritePRObservation persists a full PR observation — scalar facts, check runs, -// and the replacement comment set — in one write transaction, so the rows and -// the change_log events their triggers emit are committed all-or-nothing. The -// scalar PR upsert runs first so the checks'/comments' CDC triggers can resolve -// the session id from the pr row within the same transaction. -func (s *Store) WritePRObservation(ctx context.Context, pr PRRow, checks []PRCheckRow, comments []PRCommentRow) error { - pr = pr.withDefaults() +// WritePR persists a full PR observation — scalar facts, check runs, and the +// replacement comment set — in one write transaction, so the rows and the +// change_log events their triggers emit are committed all-or-nothing. The scalar +// PR upsert runs first so the checks'/comments' CDC triggers can resolve the +// session id from the pr row within the same transaction. +func (s *Store) WritePR(ctx context.Context, pr domain.PRRow, checks []domain.PRCheckRow, comments []domain.PRComment) error { s.writeMu.Lock() defer s.writeMu.Unlock() return s.inTx(ctx, "write pr observation", func(q *gen.Queries) error { - if err := q.UpsertPR(ctx, gen.UpsertPRParams{ - Url: pr.URL, SessionID: pr.SessionID, Number: pr.Number, - PrState: pr.State, ReviewDecision: pr.ReviewDecision, - CiState: pr.CIState, Mergeability: pr.Mergeability, UpdatedAt: pr.UpdatedAt, - }); err != nil { + if err := q.UpsertPR(ctx, genPRParams(pr)); err != nil { return err } for _, c := range checks { - if c.Status == "" { - c.Status = "unknown" - } - if err := q.UpsertPRCheck(ctx, gen.UpsertPRCheckParams{ - PrUrl: c.PRURL, Name: c.Name, CommitHash: c.CommitHash, - Status: c.Status, Url: c.URL, LogTail: c.LogTail, CreatedAt: c.CreatedAt, - }); err != nil { + if err := q.UpsertPRCheck(ctx, genCheckParams(c)); err != nil { return err } } if err := q.DeletePRComments(ctx, pr.URL); err != nil { return err } - for _, cm := range comments { - if err := q.UpsertPRComment(ctx, gen.UpsertPRCommentParams{ - PrUrl: pr.URL, CommentID: cm.CommentID, Author: cm.Author, File: cm.File, - Line: cm.Line, Body: cm.Body, Resolved: boolToInt(cm.Resolved), CreatedAt: cm.CreatedAt, - }); err != nil { - return fmt.Errorf("comment %q: %w", cm.CommentID, err) + for _, c := range comments { + if err := q.UpsertPRComment(ctx, genCommentParams(pr.URL, c)); err != nil { + return fmt.Errorf("comment %q: %w", c.ID, err) } } return nil }) } -// withDefaults fills empty enum fields with their "nothing known yet" value so a -// partial row satisfies the CHECK constraints (matches UpsertPR). -func (r PRRow) withDefaults() PRRow { - if r.State == "" { - r.State = "open" - } - if r.ReviewDecision == "" { - r.ReviewDecision = "none" - } - if r.CIState == "" { - r.CIState = "unknown" - } - if r.Mergeability == "" { - r.Mergeability = "unknown" - } - return r -} - // GetPR returns the PR facts for a URL, or ok=false if absent. -func (s *Store) GetPR(ctx context.Context, url string) (PRRow, bool, error) { +func (s *Store) GetPR(ctx context.Context, url string) (domain.PRRow, bool, error) { p, err := s.qr.GetPR(ctx, url) if errors.Is(err, sql.ErrNoRows) { - return PRRow{}, false, nil + return domain.PRRow{}, false, nil } if err != nil { - return PRRow{}, false, fmt.Errorf("get pr %s: %w", url, err) + return domain.PRRow{}, false, fmt.Errorf("get pr %s: %w", url, err) } return prRowFromGen(p), true, nil } // ListPRsBySession returns every PR owned by a session, newest first. -func (s *Store) ListPRsBySession(ctx context.Context, sessionID string) ([]PRRow, error) { +func (s *Store) ListPRsBySession(ctx context.Context, sessionID string) ([]domain.PRRow, error) { rows, err := s.qr.ListPRsBySession(ctx, sessionID) if err != nil { return nil, fmt.Errorf("list prs for %s: %w", sessionID, err) } - out := make([]PRRow, 0, len(rows)) + out := make([]domain.PRRow, 0, len(rows)) for _, p := range rows { out = append(out, prRowFromGen(p)) } @@ -135,49 +94,12 @@ func (s *Store) DeletePR(ctx context.Context, url string) error { return s.qw.DeletePR(ctx, url) } -func prRowFromGen(p gen.Pr) PRRow { - return PRRow{ - URL: p.Url, - SessionID: p.SessionID, - Number: p.Number, - State: p.PrState, - ReviewDecision: p.ReviewDecision, - CIState: p.CiState, - Mergeability: p.Mergeability, - UpdatedAt: p.UpdatedAt, - } -} - -// ---- pr_checks: CI run history ---- - -// PRCheckRow is one CI check run for a PR (one row per check name per commit). -type PRCheckRow struct { - PRURL string - Name string - CommitHash string - Status string // unknown | queued | in_progress | passed | failed | skipped | cancelled - URL string - LogTail string - CreatedAt time.Time -} - // RecordCheck upserts a CI check run. Re-polling the same (pr, name, commit) // updates the same row; a new commit creates a new row (a fresh agent attempt). -func (s *Store) RecordCheck(ctx context.Context, r PRCheckRow) error { - if r.Status == "" { - r.Status = "unknown" - } +func (s *Store) RecordCheck(ctx context.Context, r domain.PRCheckRow) error { s.writeMu.Lock() defer s.writeMu.Unlock() - return s.qw.UpsertPRCheck(ctx, gen.UpsertPRCheckParams{ - PrUrl: r.PRURL, - Name: r.Name, - CommitHash: r.CommitHash, - Status: r.Status, - Url: r.URL, - LogTail: r.LogTail, - CreatedAt: r.CreatedAt, - }) + return s.qw.UpsertPRCheck(ctx, genCheckParams(r)) } // RecentCheckStatuses returns the statuses of the last `limit` runs of a check, @@ -197,38 +119,21 @@ func (s *Store) RecentCheckStatuses(ctx context.Context, prURL, name string, lim } // ListChecks returns every recorded check run for a PR. -func (s *Store) ListChecks(ctx context.Context, prURL string) ([]PRCheckRow, error) { +func (s *Store) ListChecks(ctx context.Context, prURL string) ([]domain.PRCheckRow, error) { rows, err := s.qr.ListChecksByPR(ctx, prURL) if err != nil { return nil, fmt.Errorf("list checks %s: %w", prURL, err) } - out := make([]PRCheckRow, 0, len(rows)) + out := make([]domain.PRCheckRow, 0, len(rows)) for _, c := range rows { - out = append(out, PRCheckRow{ - PRURL: c.PrUrl, Name: c.Name, CommitHash: c.CommitHash, - Status: c.Status, URL: c.Url, LogTail: c.LogTail, CreatedAt: c.CreatedAt, - }) + out = append(out, checkRowFromGen(c)) } return out, nil } -// ---- pr_comment ---- - -// PRCommentRow is one review comment on a PR. -type PRCommentRow struct { - PRURL string - CommentID string - Author string - File string - Line int64 - Body string - Resolved bool - CreatedAt time.Time -} - // ReplacePRComments atomically replaces the full comment set for a PR (each SCM // fetch reports the current set, so a replace keeps it in sync). -func (s *Store) ReplacePRComments(ctx context.Context, prURL string, comments []PRCommentRow) error { +func (s *Store) ReplacePRComments(ctx context.Context, prURL string, comments []domain.PRComment) error { s.writeMu.Lock() defer s.writeMu.Unlock() return s.inTx(ctx, "replace pr comments", func(q *gen.Queries) error { @@ -236,17 +141,8 @@ func (s *Store) ReplacePRComments(ctx context.Context, prURL string, comments [] return err } for _, c := range comments { - if err := q.UpsertPRComment(ctx, gen.UpsertPRCommentParams{ - PrUrl: prURL, - CommentID: c.CommentID, - Author: c.Author, - File: c.File, - Line: c.Line, - Body: c.Body, - Resolved: boolToInt(c.Resolved), - CreatedAt: c.CreatedAt, - }); err != nil { - return fmt.Errorf("comment %q: %w", c.CommentID, err) + if err := q.UpsertPRComment(ctx, genCommentParams(prURL, c)); err != nil { + return fmt.Errorf("comment %q: %w", c.ID, err) } } return nil @@ -254,17 +150,97 @@ func (s *Store) ReplacePRComments(ctx context.Context, prURL string, comments [] } // ListPRComments returns a PR's review comments, oldest first. -func (s *Store) ListPRComments(ctx context.Context, prURL string) ([]PRCommentRow, error) { +func (s *Store) ListPRComments(ctx context.Context, prURL string) ([]domain.PRComment, error) { rows, err := s.qr.ListPRComments(ctx, prURL) if err != nil { return nil, fmt.Errorf("list pr comments %s: %w", prURL, err) } - out := make([]PRCommentRow, 0, len(rows)) + out := make([]domain.PRComment, 0, len(rows)) for _, c := range rows { - out = append(out, PRCommentRow{ - PRURL: c.PrUrl, CommentID: c.CommentID, Author: c.Author, File: c.File, - Line: c.Line, Body: c.Body, Resolved: c.Resolved != 0, CreatedAt: c.CreatedAt, - }) + out = append(out, commentFromGen(c)) } return out, nil } + +// ---- domain <-> gen mapping ---- + +// prState collapses the PR's bools into the single pr.state column value. +func prState(r domain.PRRow) string { + switch { + case r.Merged: + return "merged" + case r.Closed: + return "closed" + case r.Draft: + return "draft" + default: + return "open" + } +} + +func orDefault(v, def string) string { + if v == "" { + return def + } + return v +} + +func genPRParams(r domain.PRRow) gen.UpsertPRParams { + return gen.UpsertPRParams{ + Url: r.URL, + SessionID: r.SessionID, + Number: int64(r.Number), + PrState: prState(r), + ReviewDecision: orDefault(string(r.Review), "none"), + CiState: orDefault(string(r.CI), "unknown"), + Mergeability: orDefault(string(r.Mergeability), "unknown"), + UpdatedAt: r.UpdatedAt, + } +} + +func prRowFromGen(p gen.Pr) domain.PRRow { + return domain.PRRow{ + URL: p.Url, + SessionID: p.SessionID, + Number: int(p.Number), + Draft: p.PrState == "draft", + Merged: p.PrState == "merged", + Closed: p.PrState == "closed", + CI: domain.CIState(p.CiState), + Review: domain.ReviewDecision(p.ReviewDecision), + Mergeability: domain.Mergeability(p.Mergeability), + UpdatedAt: p.UpdatedAt, + } +} + +func genCheckParams(c domain.PRCheckRow) gen.UpsertPRCheckParams { + status := c.Status + if status == "" { + status = "unknown" + } + return gen.UpsertPRCheckParams{ + PrUrl: c.PRURL, Name: c.Name, CommitHash: c.CommitHash, + Status: status, Url: c.URL, LogTail: c.LogTail, CreatedAt: c.CreatedAt, + } +} + +func checkRowFromGen(c gen.PrCheck) domain.PRCheckRow { + return domain.PRCheckRow{ + PRURL: c.PrUrl, Name: c.Name, CommitHash: c.CommitHash, + Status: c.Status, URL: c.Url, LogTail: c.LogTail, CreatedAt: c.CreatedAt, + } +} + +func genCommentParams(prURL string, c domain.PRComment) gen.UpsertPRCommentParams { + return gen.UpsertPRCommentParams{ + PrUrl: prURL, CommentID: c.ID, Author: c.Author, File: c.File, + Line: int64(c.Line), Body: c.Body, Resolved: boolToInt(c.Resolved), CreatedAt: c.CreatedAt, + } +} + +func commentFromGen(c gen.PrComment) domain.PRComment { + return domain.PRComment{ + ID: c.CommentID, Author: c.Author, File: c.File, Line: int(c.Line), + Body: c.Body, Resolved: c.Resolved != 0, CreatedAt: c.CreatedAt, + } +} diff --git a/backend/internal/storage/sqlite/store_test.go b/backend/internal/storage/sqlite/store_test.go index 55165c41..832bcfa4 100644 --- a/backend/internal/storage/sqlite/store_test.go +++ b/backend/internal/storage/sqlite/store_test.go @@ -143,9 +143,9 @@ func TestPRCRUD(t *testing.T) { r, _ := s.CreateSession(ctx, sampleRecord("mer")) now := time.Now().UTC().Truncate(time.Second) - pr := PRRow{ - URL: "https://gh/pr/1", SessionID: string(r.ID), Number: 1, State: "open", - ReviewDecision: "review_required", CIState: "failing", Mergeability: "blocked", UpdatedAt: now, + pr := domain.PRRow{ + URL: "https://gh/pr/1", SessionID: string(r.ID), Number: 1, + Review: domain.ReviewRequired, CI: domain.CIFailing, Mergeability: domain.MergeBlocked, UpdatedAt: now, } if err := s.UpsertPR(ctx, pr); err != nil { t.Fatal(err) @@ -171,11 +171,11 @@ func TestPRChecksLoopBrakeQuery(t *testing.T) { seedProject(t, s, "mer") r, _ := s.CreateSession(ctx, sampleRecord("mer")) now := time.Now().UTC().Truncate(time.Second) - _ = s.UpsertPR(ctx, PRRow{URL: "pr1", SessionID: string(r.ID), State: "open", UpdatedAt: now}) + _ = s.UpsertPR(ctx, domain.PRRow{URL: "pr1", SessionID: string(r.ID), UpdatedAt: now}) // three consecutive failing runs of "build" (one per commit). for i := 1; i <= 3; i++ { - if err := s.RecordCheck(ctx, PRCheckRow{ + if err := s.RecordCheck(ctx, domain.PRCheckRow{ PRURL: "pr1", Name: "build", CommitHash: fmt.Sprintf("c%d", i), Status: "failed", CreatedAt: now.Add(time.Duration(i) * time.Second), }); err != nil { @@ -190,7 +190,7 @@ func TestPRChecksLoopBrakeQuery(t *testing.T) { t.Fatalf("recent statuses = %v, want 3x failed (loop brake would trip)", last3) } // a pass on a newer commit breaks the streak. - _ = s.RecordCheck(ctx, PRCheckRow{PRURL: "pr1", Name: "build", CommitHash: "c4", Status: "passed", CreatedAt: now.Add(4 * time.Second)}) + _ = s.RecordCheck(ctx, domain.PRCheckRow{PRURL: "pr1", Name: "build", CommitHash: "c4", Status: "passed", CreatedAt: now.Add(4 * time.Second)}) last3, _ = s.RecentCheckStatuses(ctx, "pr1", "build", 3) if last3[0] != "passed" { t.Fatalf("most recent should be passed, got %v", last3) @@ -203,17 +203,17 @@ func TestPRCommentsReplace(t *testing.T) { seedProject(t, s, "mer") r, _ := s.CreateSession(ctx, sampleRecord("mer")) now := time.Now().UTC().Truncate(time.Second) - _ = s.UpsertPR(ctx, PRRow{URL: "pr1", SessionID: string(r.ID), State: "open", UpdatedAt: now}) + _ = s.UpsertPR(ctx, domain.PRRow{URL: "pr1", SessionID: string(r.ID), UpdatedAt: now}) - _ = s.ReplacePRComments(ctx, "pr1", []PRCommentRow{ - {PRURL: "pr1", CommentID: "c1", Author: "a", File: "a.go", Line: 1, Body: "nit", CreatedAt: now}, - {PRURL: "pr1", CommentID: "c2", Author: "b", File: "b.go", Line: 2, Body: "bug", Resolved: true, CreatedAt: now.Add(time.Second)}, + _ = s.ReplacePRComments(ctx, "pr1", []domain.PRComment{ + {ID: "c1", Author: "a", File: "a.go", Line: 1, Body: "nit", CreatedAt: now}, + {ID: "c2", Author: "b", File: "b.go", Line: 2, Body: "bug", Resolved: true, CreatedAt: now.Add(time.Second)}, }) if list, _ := s.ListPRComments(ctx, "pr1"); len(list) != 2 { t.Fatalf("comments = %d, want 2", len(list)) } // replace with a smaller set drops the rest. - _ = s.ReplacePRComments(ctx, "pr1", []PRCommentRow{{PRURL: "pr1", CommentID: "c1", Body: "x", CreatedAt: now}}) + _ = s.ReplacePRComments(ctx, "pr1", []domain.PRComment{{ID: "c1", Body: "x", CreatedAt: now}}) if list, _ := s.ListPRComments(ctx, "pr1"); len(list) != 1 { t.Fatalf("after replace, comments = %d, want 1", len(list)) } @@ -231,7 +231,7 @@ func TestCDCTriggersPopulateChangeLog(t *testing.T) { r.Metadata.Prompt = "only metadata changed" _ = s.UpdateSession(ctx, r) // a PR insert logs too. - _ = s.UpsertPR(ctx, PRRow{URL: "pr1", SessionID: string(r.ID), State: "open", UpdatedAt: r.UpdatedAt}) + _ = s.UpsertPR(ctx, domain.PRRow{URL: "pr1", SessionID: string(r.ID), UpdatedAt: r.UpdatedAt}) evs, err := s.ReadChangeLogAfter(ctx, 0, 100) if err != nil { diff --git a/backend/internal/storage/sqlite/wiring/adapter.go b/backend/internal/storage/sqlite/wiring/adapter.go deleted file mode 100644 index 8a8d017d..00000000 --- a/backend/internal/storage/sqlite/wiring/adapter.go +++ /dev/null @@ -1,107 +0,0 @@ -// Package wiring bridges *sqlite.Store to the engine's outbound ports. It -// embeds the store (so the SessionStore reads/writes and PRWriter.RecentCheckStatuses -// promote directly) and supplies the PR conversions plus the PRFacts read-model -// that drives the derived display status. -// -// The adapter lives in its own package so the daemon's composition root and any -// in-process integration tests (e.g. backend/internal/integration) can share the -// same bridge instead of redefining it. -package wiring - -import ( - "context" - - "github.com/aoagents/agent-orchestrator/backend/internal/domain" - "github.com/aoagents/agent-orchestrator/backend/internal/ports" - "github.com/aoagents/agent-orchestrator/backend/internal/storage/sqlite" -) - -// Adapter wraps *sqlite.Store and implements ports.SessionStore + ports.PRWriter. -// The embedded *sqlite.Store promotes CreateSession / UpdateSession / GetSession -// / ListSessions / ListAllSessions and RecentCheckStatuses verbatim; the two -// methods defined here are the ones that need shape translation between the port -// types and the sqlite row types. -type Adapter struct{ *sqlite.Store } - -var ( - _ ports.SessionStore = Adapter{} - _ ports.PRWriter = Adapter{} -) - -// PRFactsForSession picks the PR that drives display status — the most-recently -// updated non-closed PR, else the most recent — and folds in whether it has -// unresolved review comments. -func (a Adapter) PRFactsForSession(ctx context.Context, id domain.SessionID) (domain.PRFacts, error) { - rows, err := a.Store.ListPRsBySession(ctx, string(id)) // newest first - if err != nil { - return domain.PRFacts{}, err - } - if len(rows) == 0 { - return domain.PRFacts{}, nil - } - pick := rows[0] - for _, r := range rows { - if r.State == "draft" || r.State == "open" { - pick = r - break - } - } - facts := domain.PRFacts{ - URL: pick.URL, Number: int(pick.Number), Exists: true, - Draft: pick.State == "draft", Merged: pick.State == "merged", Closed: pick.State == "closed", - CI: domain.CIState(pick.CIState), - Review: domain.ReviewDecision(pick.ReviewDecision), - Mergeability: domain.Mergeability(pick.Mergeability), - } - comments, err := a.Store.ListPRComments(ctx, pick.URL) - if err != nil { - return domain.PRFacts{}, err - } - for _, c := range comments { - if !c.Resolved { - facts.ReviewComments = true - break - } - } - return facts, nil -} - -func (a Adapter) WritePR(ctx context.Context, pr ports.PRRow, checks []ports.PRCheckRow, comments []ports.PRComment) error { - row := sqlite.PRRow{ - URL: pr.URL, SessionID: pr.SessionID, Number: int64(pr.Number), - State: prState(pr), - ReviewDecision: string(pr.Review), - CIState: string(pr.CI), - Mergeability: string(pr.Mergeability), - UpdatedAt: pr.UpdatedAt, - } - checkRows := make([]sqlite.PRCheckRow, len(checks)) - for i, c := range checks { - checkRows[i] = sqlite.PRCheckRow{ - PRURL: c.PRURL, Name: c.Name, CommitHash: c.CommitHash, - Status: c.Status, URL: c.URL, LogTail: c.LogTail, CreatedAt: c.CreatedAt, - } - } - commentRows := make([]sqlite.PRCommentRow, len(comments)) - for i, c := range comments { - commentRows[i] = sqlite.PRCommentRow{ - PRURL: pr.URL, CommentID: c.ID, Author: c.Author, File: c.File, - Line: int64(c.Line), Body: c.Body, Resolved: c.Resolved, CreatedAt: c.CreatedAt, - } - } - return a.Store.WritePRObservation(ctx, row, checkRows, commentRows) -} - -// prState collapses the PR's bools into the single pr.state column value. -func prState(r ports.PRRow) string { - switch { - case r.Merged: - return "merged" - case r.Closed: - return "closed" - case r.Draft: - return "draft" - default: - return "open" - } -}