Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion backend/internal/cdc/cdc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func TestE2E_StoreWriteToBroadcast(t *testing.T) {
if err := s.UpdateSession(ctx, r); err != nil { // -> session_updated (seq 2)
t.Fatal(err)
}
if err := s.UpsertPR(ctx, sqlite.PRRow{URL: "pr1", SessionID: string(r.ID), State: "open", UpdatedAt: r.UpdatedAt}); err != nil { // -> pr_created (seq 3)
if err := s.UpsertPR(ctx, domain.PRRow{URL: "pr1", SessionID: string(r.ID), UpdatedAt: r.UpdatedAt}); err != nil { // -> pr_created (seq 3)
t.Fatal(err)
}

Expand Down
14 changes: 6 additions & 8 deletions backend/internal/daemon/lifecycle_wiring.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,16 @@ import (
"github.com/aoagents/agent-orchestrator/backend/internal/ports"
"github.com/aoagents/agent-orchestrator/backend/internal/session"
"github.com/aoagents/agent-orchestrator/backend/internal/storage/sqlite"
"github.com/aoagents/agent-orchestrator/backend/internal/storage/sqlite/wiring"
)

// lifecycleStack owns the running LCM + reaper. The LCM is the sole writer of
// canonical transitions; the reaper is the OBSERVE-layer timer that probes live
// runtimes and reports facts back through it. Adapter is exposed so the Session
// runtimes and reports facts back through it. Store is exposed so the Session
// Manager construction in startSession can plug the same SessionStore + PRWriter
// instance the LCM already holds.
// instance the LCM already holds (*sqlite.Store satisfies both ports directly).
type lifecycleStack struct {
LCM *lifecycle.Manager
Adapter wiring.Adapter
Store *sqlite.Store
reaperDone <-chan struct{}
}

Expand All @@ -38,12 +37,11 @@ type lifecycleStack struct {
// - reaper.MapRegistry{} — empty runtime registry, so the reaper ticks
// escalations but probes nothing until the runtime plugins exist.
func startLifecycle(ctx context.Context, store *sqlite.Store, logger *slog.Logger) (*lifecycleStack, error) {
a := wiring.Adapter{Store: store}
renderer := notification.NewRenderer(store)
notifier := notification.NewEnqueuer(store, renderer, logger)
lcm := lifecycle.New(a, a, notifier, noopMessenger{})
lcm := lifecycle.New(store, store, notifier, noopMessenger{})
rp := reaper.New(lcm, reaper.MapRegistry{}, reaper.Config{Logger: logger})
return &lifecycleStack{LCM: lcm, Adapter: a, reaperDone: rp.Start(ctx)}, nil
return &lifecycleStack{LCM: lcm, Store: store, reaperDone: rp.Start(ctx)}, nil
}

// Stop waits for the reaper goroutine to exit (the caller must have cancelled the
Expand Down Expand Up @@ -88,7 +86,7 @@ func startSession(ctx context.Context, cfg config.Config, ls *lifecycleStack, lo
Runtime: runtime,
Agent: agent,
Workspace: ws,
Store: ls.Adapter,
Store: ls.Store,
Messenger: noopMessenger{},
Lifecycle: ls.LCM,
})
Expand Down
14 changes: 6 additions & 8 deletions backend/internal/daemon/wiring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"github.com/aoagents/agent-orchestrator/backend/internal/ports"
"github.com/aoagents/agent-orchestrator/backend/internal/session"
"github.com/aoagents/agent-orchestrator/backend/internal/storage/sqlite"
"github.com/aoagents/agent-orchestrator/backend/internal/storage/sqlite/wiring"
)

// TestWiring_WriteFlowsToBroadcaster exercises the real boot path end to end:
Expand All @@ -32,11 +31,10 @@ func TestWiring_WriteFlowsToBroadcaster(t *testing.T) {
}
defer store.Close()

a := wiring.Adapter{Store: store}
renderer := notification.NewRenderer(store)
logger := slog.New(slog.NewTextHandler(io.Discard, nil))
notifier := notification.NewEnqueuer(store, renderer, logger)
lcm := lifecycle.New(a, a, notifier, noopMessenger{})
lcm := lifecycle.New(store, store, notifier, noopMessenger{})

bcast := cdc.NewBroadcaster()
poller := cdc.NewPoller(cdcSource{store}, bcast, cdc.PollerConfig{})
Expand Down Expand Up @@ -123,13 +121,13 @@ func TestWiring_SessionManagerSharesLifecycleStoreAndLCM(t *testing.T) {

gotStore, gotLCM := inspectSessionDeps(t, sStack.SM)

// Store should be the exact wiring.Adapter the LCM was constructed with.
gotAdapter, ok := gotStore.(wiring.Adapter)
// Store should be the exact *sqlite.Store the LCM was constructed with.
gotSqlite, ok := gotStore.(*sqlite.Store)
if !ok {
t.Fatalf("SM.store is %T, want wiring.Adapter", gotStore)
t.Fatalf("SM.store is %T, want *sqlite.Store", gotStore)
}
if gotAdapter.Store != lcStack.Adapter.Store {
t.Fatalf("SM.store wraps a different *sqlite.Store than lcStack.Adapter")
if gotSqlite != lcStack.Store {
t.Fatalf("SM.store is a different *sqlite.Store than lcStack.Store")
}

// Lifecycle should be the exact *lifecycle.Manager pointer from startLifecycle.
Expand Down
47 changes: 47 additions & 0 deletions backend/internal/domain/pr.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package domain

import "time"

// The PR rows are the canonical shapes for the pr / pr_checks / pr_comment
// tables, shared by the PRWriter port and the sqlite store (the store maps them
// to/from the sqlc gen.* models). They are flat by design — these tables carry
// no nesting or derivation, so a single definition serves every layer.
//
// PRRow is the scalar facts of one tracked pull request (the pr table). A session
// can own several PRs; a PR belongs to one session. PRFacts is the read-model
// derived from these for display status; PRRow is what gets written.
type PRRow struct {
URL string
SessionID string
Number int
Draft bool
Merged bool
Closed bool
CI CIState
Review ReviewDecision
Mergeability Mergeability
UpdatedAt time.Time
}

// PRCheckRow is one CI check run — one row per check name per commit.
type PRCheckRow struct {
PRURL string
Name string
CommitHash string
Status string
URL string
LogTail string
CreatedAt time.Time
}

// PRComment is one review comment. Feedback is injected into the agent
// regardless of author, so there is no bot/human distinction.
type PRComment struct {
ID string
Author string
File string
Line int
Body string
Resolved bool
CreatedAt time.Time
}
115 changes: 9 additions & 106 deletions backend/internal/integration/lifecycle_sqlite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,99 +24,6 @@ import (
"github.com/aoagents/agent-orchestrator/backend/internal/storage/sqlite"
)

// ---- store adapter ----
//
// MIRROR OF backend/lifecycle_wiring.go's storeAdapter. The integration tests
// can't import package main, so the small set of methods that bridge
// *sqlite.Store to ports.SessionStore + ports.PRWriter is duplicated here.
// Function bodies are line-for-line identical to the production adapter so a
// future divergence shows up as a real diff in code review; the obvious
// follow-up is to extract the production adapter into a shared internal
// package — explicitly out of scope for this PR ("do NOT redesign anything").

type storeAdapter struct{ *sqlite.Store }

var (
_ ports.SessionStore = storeAdapter{}
_ ports.PRWriter = storeAdapter{}
)

func (a storeAdapter) PRFactsForSession(ctx context.Context, id domain.SessionID) (domain.PRFacts, error) {
rows, err := a.Store.ListPRsBySession(ctx, string(id))
if err != nil {
return domain.PRFacts{}, err
}
if len(rows) == 0 {
return domain.PRFacts{}, nil
}
pick := rows[0]
for _, r := range rows {
if r.State == "draft" || r.State == "open" {
pick = r
break
}
}
facts := domain.PRFacts{
URL: pick.URL, Number: int(pick.Number), Exists: true,
Draft: pick.State == "draft", Merged: pick.State == "merged", Closed: pick.State == "closed",
CI: domain.CIState(pick.CIState),
Review: domain.ReviewDecision(pick.ReviewDecision),
Mergeability: domain.Mergeability(pick.Mergeability),
}
comments, err := a.Store.ListPRComments(ctx, pick.URL)
if err != nil {
return domain.PRFacts{}, err
}
for _, c := range comments {
if !c.Resolved {
facts.ReviewComments = true
break
}
}
return facts, nil
}

func (a storeAdapter) WritePR(ctx context.Context, pr ports.PRRow, checks []ports.PRCheckRow, comments []ports.PRComment) error {
row := sqlite.PRRow{
URL: pr.URL, SessionID: pr.SessionID, Number: int64(pr.Number),
State: prState(pr),
ReviewDecision: string(pr.Review),
CIState: string(pr.CI),
Mergeability: string(pr.Mergeability),
UpdatedAt: pr.UpdatedAt,
}
checkRows := make([]sqlite.PRCheckRow, len(checks))
for i, c := range checks {
checkRows[i] = sqlite.PRCheckRow{
PRURL: c.PRURL, Name: c.Name, CommitHash: c.CommitHash,
Status: c.Status, URL: c.URL, LogTail: c.LogTail, CreatedAt: c.CreatedAt,
}
}
commentRows := make([]sqlite.PRCommentRow, len(comments))
for i, c := range comments {
commentRows[i] = sqlite.PRCommentRow{
PRURL: pr.URL, CommentID: c.ID, Author: c.Author, File: c.File,
Line: int64(c.Line), Body: c.Body, Resolved: c.Resolved, CreatedAt: c.CreatedAt,
}
}
return a.Store.WritePRObservation(ctx, row, checkRows, commentRows)
}

// prState mirrors the production helper of the same name in
// backend/lifecycle_wiring.go.
func prState(r ports.PRRow) string {
switch {
case r.Merged:
return "merged"
case r.Closed:
return "closed"
case r.Draft:
return "draft"
default:
return "open"
}
}

// ---- plugin fakes (minimal: only enough to drive SM through real LCM) ----

type stubRuntime struct {
Expand Down Expand Up @@ -197,7 +104,6 @@ func (n *captureNotifier) drain() []ports.Event {
type liveStack struct {
dataDir string
store *sqlite.Store
adapter storeAdapter
lcm *lifecycle.Manager
sm *session.Manager
notifier *captureNotifier
Expand All @@ -216,24 +122,22 @@ func openLiveStack(t *testing.T, dataDir string) *liveStack {
if err != nil {
t.Fatalf("open sqlite: %v", err)
}
adapter := storeAdapter{store}
notifier := &captureNotifier{}
messenger := &captureMessenger{}
lcm := lifecycle.New(adapter, adapter, notifier, messenger)
lcm := lifecycle.New(store, store, notifier, messenger)

wsRoot := t.TempDir()
sm := session.New(session.Deps{
Runtime: &stubRuntime{id: "h1", name: "tmux"},
Agent: stubAgent{},
Workspace: &stubWorkspace{root: wsRoot},
Store: adapter,
Store: store,
Messenger: messenger,
Lifecycle: lcm,
})
st := &liveStack{
dataDir: dataDir,
store: store,
adapter: adapter,
lcm: lcm,
sm: sm,
notifier: notifier,
Expand Down Expand Up @@ -272,11 +176,10 @@ func seedProject(t *testing.T, store *sqlite.Store, id string) {
}

func durableLifecycle(store *sqlite.Store, messenger ports.AgentMessenger) *lifecycle.Manager {
adapter := storeAdapter{store}
renderer := notification.NewRenderer(store)
logger := slog.New(slog.NewTextHandler(io.Discard, nil))
notifier := notification.NewEnqueuer(store, renderer, logger)
return lifecycle.New(adapter, adapter, notifier, messenger)
return lifecycle.New(store, store, notifier, messenger)
}

func durableRecord(project, issue, branch string) domain.SessionRecord {
Expand Down Expand Up @@ -347,7 +250,7 @@ func TestHappyPath_Spawn_PR_Kill(t *testing.T) {
if err := st.lcm.ApplyPRObservation(ctx, sess.ID, ports.PRObservation{
Fetched: true, URL: prURL, Number: 1,
CI: domain.CIPassing, Review: domain.ReviewNone, Mergeability: domain.MergeMergeable,
Checks: []ports.PRCheckRow{{
Checks: []domain.PRCheckRow{{
Name: "ci/build", CommitHash: "abc123", Status: "passed", CreatedAt: time.Now(),
}},
}); err != nil {
Expand All @@ -357,7 +260,7 @@ func TestHappyPath_Spawn_PR_Kill(t *testing.T) {
if err != nil || !ok {
t.Fatalf("get pr: ok=%v err=%v", ok, err)
}
if prRow.SessionID != string(sess.ID) || prRow.CIState != "passing" || prRow.State != "open" {
if prRow.SessionID != string(sess.ID) || prRow.CI != domain.CIPassing || prRow.Draft || prRow.Merged || prRow.Closed {
t.Fatalf("pr row wrong: %+v", prRow)
}

Expand Down Expand Up @@ -491,7 +394,7 @@ func TestCIFailureAndRecovery_NudgeThenClears(t *testing.T) {
if err := st.lcm.ApplyPRObservation(ctx, sess.ID, ports.PRObservation{
Fetched: true, URL: prURL, Number: 2,
CI: domain.CIFailing, Mergeability: domain.MergeUnstable,
Checks: []ports.PRCheckRow{{
Checks: []domain.PRCheckRow{{
Name: "ci/build", CommitHash: "c1", Status: "failed", LogTail: "panic: nil map", CreatedAt: time.Now(),
}},
}); err != nil {
Expand All @@ -507,7 +410,7 @@ func TestCIFailureAndRecovery_NudgeThenClears(t *testing.T) {

// Brake confirmation: only one failure so far, RecentCheckStatuses should
// reflect it.
history, err := st.adapter.RecentCheckStatuses(ctx, prURL, "ci/build", 3)
history, err := st.store.RecentCheckStatuses(ctx, prURL, "ci/build", 3)
if err != nil {
t.Fatalf("recent checks: %v", err)
}
Expand All @@ -521,7 +424,7 @@ func TestCIFailureAndRecovery_NudgeThenClears(t *testing.T) {
if err := st.lcm.ApplyPRObservation(ctx, sess.ID, ports.PRObservation{
Fetched: true, URL: prURL, Number: 2,
CI: domain.CIPassing, Mergeability: domain.MergeMergeable,
Checks: []ports.PRCheckRow{{
Checks: []domain.PRCheckRow{{
Name: "ci/build", CommitHash: "c2", Status: "passed", CreatedAt: time.Now(),
}},
}); err != nil {
Expand All @@ -537,7 +440,7 @@ func TestCIFailureAndRecovery_NudgeThenClears(t *testing.T) {

// And the pr row reflects the recovery in the canonical fact store.
prRow, ok, _ := st.store.GetPR(ctx, prURL)
if !ok || prRow.CIState != "passing" {
if !ok || prRow.CI != domain.CIPassing {
t.Fatalf("pr ci_state should be passing post-recovery: %+v", prRow)
}
}
Expand Down
6 changes: 3 additions & 3 deletions backend/internal/lifecycle/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,20 +180,20 @@ func (m *Manager) ApplyPRObservation(ctx context.Context, id domain.SessionID, o
// in one atomic store call. PR-table CDC is emitted by the DB triggers.
func (m *Manager) writePR(ctx context.Context, id domain.SessionID, o ports.PRObservation) error {
now := m.clock()
row := ports.PRRow{
row := domain.PRRow{
URL: o.URL, SessionID: string(id), Number: o.Number,
Draft: o.Draft, Merged: o.Merged, Closed: o.Closed,
CI: o.CI, Review: o.Review, Mergeability: o.Mergeability, UpdatedAt: now,
}
checks := make([]ports.PRCheckRow, len(o.Checks))
checks := make([]domain.PRCheckRow, len(o.Checks))
for i, c := range o.Checks {
c.PRURL = o.URL
if c.CreatedAt.IsZero() {
c.CreatedAt = now
}
checks[i] = c
}
comments := make([]ports.PRComment, len(o.Comments))
comments := make([]domain.PRComment, len(o.Comments))
for i, c := range o.Comments {
if c.CreatedAt.IsZero() {
c.CreatedAt = now
Expand Down
Loading
Loading