diff --git a/backend/internal/adapters/workspace/gitworktree/commands.go b/backend/internal/adapters/workspace/gitworktree/commands.go index cc0339bf54..9b0a963a4d 100644 --- a/backend/internal/adapters/workspace/gitworktree/commands.go +++ b/backend/internal/adapters/workspace/gitworktree/commands.go @@ -107,6 +107,31 @@ func ignoredCountArgs(worktree string) []string { return []string{"-C", worktree, "status", "--ignored", "--porcelain"} } +// mergeBaseArgs returns the merge-base (fork point) of HEAD and ref in the +// worktree. Exit code 1 means no common ancestor. +func mergeBaseArgs(worktree, ref string) []string { + return []string{"-C", worktree, "merge-base", "HEAD", ref} +} + +// diffUnifiedZeroArgs diffs the worktree's current state (committed AND +// uncommitted tracked changes) against base with zero context lines, so the +// hunk headers report exactly the changed line ranges. --no-color and +// --no-ext-diff keep the output machine-parseable regardless of user git config. +func diffUnifiedZeroArgs(worktree, base string) []string { + return []string{"-C", worktree, "diff", "--unified=0", "--no-color", "--no-ext-diff", base} +} + +// baseBranchRefCandidates lists the refs to try, in order, when locating the +// base branch a session's worktree forked from. It mirrors baseRefCandidates' +// remote-then-local preference but targets the default/base branch only (never +// the session branch itself, which would make the merge-base degenerate). +func baseBranchRefCandidates(defaultBranch string) []string { + if strings.Contains(defaultBranch, "/") { + return []string{defaultBranch, "refs/heads/" + defaultBranch} + } + return []string{"origin/" + defaultBranch, "refs/heads/" + defaultBranch} +} + func baseRefCandidates(branch, defaultBranch string) []string { candidates := []string{"origin/" + branch} if strings.Contains(defaultBranch, "/") { diff --git a/backend/internal/adapters/workspace/gitworktree/diffparse_test.go b/backend/internal/adapters/workspace/gitworktree/diffparse_test.go new file mode 100644 index 0000000000..7440bc6cfb --- /dev/null +++ b/backend/internal/adapters/workspace/gitworktree/diffparse_test.go @@ -0,0 +1,100 @@ +package gitworktree + +import ( + "reflect" + "testing" + + "github.com/aoagents/agent-orchestrator/backend/internal/ports" +) + +func TestParseDiffChangedRegions_MultiHunk(t *testing.T) { + out := `diff --git a/config.go b/config.go +index 1111111..2222222 100644 +--- a/config.go ++++ b/config.go +@@ -10,2 +10,3 @@ func A() { +-old ++new ++new2 +@@ -40,0 +42,2 @@ func B() { ++added ++added2 +` + got := parseDiffChangedRegions(out) + want := map[string][]ports.LineRange{ + "config.go": {{Start: 10, End: 12}, {Start: 42, End: 43}}, + } + if !reflect.DeepEqual(got, want) { + t.Fatalf("got %+v, want %+v", got, want) + } +} + +func TestParseDiffChangedRegions_SingleLineNoCount(t *testing.T) { + // `@@ -5 +5 @@` (no comma) means a one-line change at line 5. + out := `--- a/x.go ++++ b/x.go +@@ -5 +5 @@ +-a ++b +` + got := parseDiffChangedRegions(out) + want := map[string][]ports.LineRange{"x.go": {{Start: 5, End: 5}}} + if !reflect.DeepEqual(got, want) { + t.Fatalf("got %+v, want %+v", got, want) + } +} + +func TestParseDiffChangedRegions_PureDeletionHunk(t *testing.T) { + // `+0,0` is a pure deletion; the new-side anchor is reported as a 1-line range. + out := `--- a/y.go ++++ b/y.go +@@ -7,3 +6,0 @@ +-gone1 +-gone2 +-gone3 +` + got := parseDiffChangedRegions(out) + want := map[string][]ports.LineRange{"y.go": {{Start: 6, End: 6}}} + if !reflect.DeepEqual(got, want) { + t.Fatalf("got %+v, want %+v", got, want) + } +} + +func TestParseDiffChangedRegions_DeletedFileKeepsPath(t *testing.T) { + // A deleted file has `+++ /dev/null`; the old path must still be recorded. + out := `diff --git a/dead.go b/dead.go +deleted file mode 100644 +--- a/dead.go ++++ /dev/null +@@ -1,2 +0,0 @@ +-line1 +-line2 +` + got := parseDiffChangedRegions(out) + if _, ok := got["dead.go"]; !ok { + t.Fatalf("deleted file path missing: %+v", got) + } +} + +func TestParseDiffChangedRegions_NewFile(t *testing.T) { + out := `diff --git a/new.go b/new.go +new file mode 100644 +--- /dev/null ++++ b/new.go +@@ -0,0 +1,3 @@ ++a ++b ++c +` + got := parseDiffChangedRegions(out) + want := map[string][]ports.LineRange{"new.go": {{Start: 1, End: 3}}} + if !reflect.DeepEqual(got, want) { + t.Fatalf("got %+v, want %+v", got, want) + } +} + +func TestParseDiffChangedRegions_Empty(t *testing.T) { + if got := parseDiffChangedRegions(""); len(got) != 0 { + t.Fatalf("empty diff should yield no regions, got %+v", got) + } +} diff --git a/backend/internal/adapters/workspace/gitworktree/parse.go b/backend/internal/adapters/workspace/gitworktree/parse.go index 5b2947ba9d..6935d29a4c 100644 --- a/backend/internal/adapters/workspace/gitworktree/parse.go +++ b/backend/internal/adapters/workspace/gitworktree/parse.go @@ -2,9 +2,98 @@ package gitworktree import ( "bufio" + "strconv" "strings" + + "github.com/aoagents/agent-orchestrator/backend/internal/ports" ) +// parseDiffChangedRegions parses `git diff --unified=0` output into a map of +// repo-relative path → changed line ranges in the new revision. It tracks the +// current file from the `+++ b/` header (falling back to the `--- a/` +// header for deletions where `+++` is /dev/null) and reads each `@@ -a,b +c,d @@` +// hunk header for the `+c,d` span. A hunk with count 0 (pure deletion) is +// recorded as a single-line range at the anchor position. Files that change with +// no parseable hunks (e.g. binary or mode-only changes) still appear with an +// empty range slice so callers can treat them as file-level overlaps. +func parseDiffChangedRegions(out string) map[string][]ports.LineRange { + regions := map[string][]ports.LineRange{} + var curPath string + var minusPath string + + s := bufio.NewScanner(strings.NewReader(out)) + s.Buffer(make([]byte, 0, 64*1024), 1024*1024) + for s.Scan() { + line := strings.TrimRight(s.Text(), "\r") + switch { + case strings.HasPrefix(line, "--- "): + minusPath = stripDiffPathPrefix(strings.TrimPrefix(line, "--- ")) + case strings.HasPrefix(line, "+++ "): + p := stripDiffPathPrefix(strings.TrimPrefix(line, "+++ ")) + if p == "" { + p = minusPath // deletion: +++ is /dev/null, use the old path. + } + curPath = p + if curPath != "" { + if _, ok := regions[curPath]; !ok { + regions[curPath] = []ports.LineRange{} + } + } + case strings.HasPrefix(line, "@@ ") && curPath != "": + if r, ok := parseHunkNewRange(line); ok { + regions[curPath] = append(regions[curPath], r) + } + } + } + if s.Err() != nil { + return regions + } + return regions +} + +// stripDiffPathPrefix removes git's a//b/ diff prefix and resolves /dev/null to +// the empty string. It does not attempt to unquote core.quotePath-escaped paths; +// those rare paths degrade to file-level overlaps, which is acceptable. +func stripDiffPathPrefix(p string) string { + p = strings.TrimSpace(p) + if p == "/dev/null" || p == "" { + return "" + } + if strings.HasPrefix(p, "a/") || strings.HasPrefix(p, "b/") { + return p[2:] + } + return p +} + +// parseHunkNewRange extracts the new-revision span from an `@@ -a,b +c,d @@` +// header. A missing count means 1; a zero count (insertion point / deletion) +// yields a single-line range at the anchor. +func parseHunkNewRange(line string) (ports.LineRange, bool) { + plus := strings.Index(line, "+") + if plus < 0 { + return ports.LineRange{}, false + } + rest := line[plus+1:] + if sp := strings.IndexAny(rest, " \t"); sp >= 0 { + rest = rest[:sp] + } + startStr, countStr, hasCount := strings.Cut(rest, ",") + start, err := strconv.Atoi(startStr) + if err != nil { + return ports.LineRange{}, false + } + count := 1 + if hasCount { + if c, err := strconv.Atoi(countStr); err == nil { + count = c + } + } + if count <= 0 { + return ports.LineRange{Start: start, End: start}, true + } + return ports.LineRange{Start: start, End: start + count - 1}, true +} + type worktreeRecord struct { Path string Branch string diff --git a/backend/internal/adapters/workspace/gitworktree/workspace.go b/backend/internal/adapters/workspace/gitworktree/workspace.go index 39bb01b9c1..92f41423ae 100644 --- a/backend/internal/adapters/workspace/gitworktree/workspace.go +++ b/backend/internal/adapters/workspace/gitworktree/workspace.go @@ -84,6 +84,7 @@ type Workspace struct { type commandRunner func(ctx context.Context, binary string, args ...string) ([]byte, error) var _ ports.Workspace = (*Workspace)(nil) +var _ ports.WorkspaceDiffer = (*Workspace)(nil) // New builds a gitworktree Workspace, validating that ManagedRoot and // RepoResolver are set and resolving the root to an absolute, symlink-free path. @@ -465,6 +466,48 @@ func (w *Workspace) existingWorktree(ctx context.Context, repo, path string, cfg return ports.WorkspaceInfo{}, false, nil } +// ChangedRegions implements ports.WorkspaceDiffer. It returns the files (and +// changed line ranges within them) the session's worktree has modified relative +// to where it forked from the base branch — committed branch work and +// uncommitted edits alike — which the convergence observer compares across +// sessions to detect edit collisions before they become merge conflicts. +func (w *Workspace) ChangedRegions(ctx context.Context, info ports.WorkspaceInfo) (map[string][]ports.LineRange, error) { + if info.Path == "" { + return nil, fmt.Errorf("%w: empty path", ErrUnsafePath) + } + path, err := w.validateManagedPath(info.Path) + if err != nil { + return nil, err + } + base := w.resolveDiffBase(ctx, path) + out, err := w.run(ctx, w.binary, diffUnifiedZeroArgs(path, base)...) + if err != nil { + return nil, fmt.Errorf("gitworktree: diff %q against %q: %w", path, base, err) + } + return parseDiffChangedRegions(string(out)), nil +} + +// resolveDiffBase picks the commit ChangedRegions diffs against: the merge-base +// of HEAD and the first reachable base-branch ref. It falls back to "HEAD" +// (uncommitted changes only) when no base branch ref resolves or the two share +// no common ancestor, so a brand-new or remoteless repo still reports live edits. +func (w *Workspace) resolveDiffBase(ctx context.Context, path string) string { + for _, ref := range baseBranchRefCandidates(w.defaultBranch) { + exists, err := w.refExists(ctx, path, ref) + if err != nil || !exists { + continue + } + out, err := w.run(ctx, w.binary, mergeBaseArgs(path, ref)...) + if err != nil { + continue + } + if mb := strings.TrimSpace(string(out)); mb != "" { + return mb + } + } + return "HEAD" +} + func (w *Workspace) addWorktree(ctx context.Context, repo, path, branch, baseBranch string) error { // Refuse early if the branch is already checked out in another worktree: // `git worktree add` will fail, but its stderr leaks through as an opaque diff --git a/backend/internal/cli/dto_drift_e2e_test.go b/backend/internal/cli/dto_drift_e2e_test.go index 743a398d7f..e626e06ff1 100644 --- a/backend/internal/cli/dto_drift_e2e_test.go +++ b/backend/internal/cli/dto_drift_e2e_test.go @@ -141,6 +141,10 @@ func (f *fakeProjectManager) Remove(context.Context, domain.ProjectID) (projects return projectsvc.RemoveResult{}, nil } +func (f *fakeProjectManager) Collisions(context.Context, domain.ProjectID) ([]projectsvc.Collision, error) { + return nil, nil +} + // startDriftTestDaemon stands up the real router+controllers backed by the // supplied fakes and points the CLI's run-file at it. The CLI discovers the // server purely via AO_RUN_FILE + the run-file port, so this is a genuine diff --git a/backend/internal/daemon/convergence_wiring.go b/backend/internal/daemon/convergence_wiring.go new file mode 100644 index 0000000000..0ea708796a --- /dev/null +++ b/backend/internal/daemon/convergence_wiring.go @@ -0,0 +1,35 @@ +package daemon + +// This file wires the cross-session collision (convergence) observer into daemon +// startup. The observer diffs each live session's worktree and detects when two +// parallel agents are editing overlapping code before either opens a PR, then +// nudges them through the same Lifecycle Manager the SCM lane uses. + +import ( + "context" + "log/slog" + "path/filepath" + + "github.com/aoagents/agent-orchestrator/backend/internal/adapters/workspace/gitworktree" + "github.com/aoagents/agent-orchestrator/backend/internal/config" + "github.com/aoagents/agent-orchestrator/backend/internal/lifecycle" + "github.com/aoagents/agent-orchestrator/backend/internal/observe/convergence" + "github.com/aoagents/agent-orchestrator/backend/internal/storage/sqlite" +) + +// startConvergenceObserver builds a read-only gitworktree differ over the same +// managed root and project→repo resolver the session workspace uses, then starts +// the convergence observer. A construction failure disables the lane (logged once) +// rather than failing daemon startup, mirroring startSCMObserver's posture. +func startConvergenceObserver(ctx context.Context, cfg config.Config, store *sqlite.Store, lcm *lifecycle.Manager, logger *slog.Logger) <-chan struct{} { + differ, err := gitworktree.New(gitworktree.Options{ + ManagedRoot: filepath.Join(cfg.DataDir, "worktrees"), + RepoResolver: projectRepoResolver{store: store}, + }) + if err != nil { + logger.Warn("convergence observer disabled: workspace differ setup failed", "err", err) + return closedDone() + } + observer := convergence.New(differ, store, lcm, convergence.Config{Logger: logger}) + return observer.Start(ctx) +} diff --git a/backend/internal/daemon/daemon.go b/backend/internal/daemon/daemon.go index 8918388157..40ef7f7128 100644 --- a/backend/internal/daemon/daemon.go +++ b/backend/internal/daemon/daemon.go @@ -106,6 +106,7 @@ func Run() error { // change_log -> poller -> broadcaster) and gives startSession the shared LCM. lcStack := startLifecycle(ctx, store, runtimeAdapter, messenger, notificationWriter, telemetrySink, log) lcStack.scmDone = startSCMObserver(ctx, store, lcStack.LCM, log) + lcStack.convergenceDone = startConvergenceObserver(ctx, cfg, store, lcStack.LCM, log) // Wire the controller-facing session service over the same store + LCM, the // selected runtime, a gitworktree workspace, the per-session agent resolver diff --git a/backend/internal/daemon/lifecycle_wiring.go b/backend/internal/daemon/lifecycle_wiring.go index c5391fe197..97c735e265 100644 --- a/backend/internal/daemon/lifecycle_wiring.go +++ b/backend/internal/daemon/lifecycle_wiring.go @@ -34,9 +34,10 @@ type lifecycleStack struct { // LCM is the Lifecycle Manager (the canonical write path). It is exposed so // startSession can share the same reducer the reaper drives, rather than // standing up a second store+LCM pair that would diverge under writes. - LCM *lifecycle.Manager - reaperDone <-chan struct{} - scmDone <-chan struct{} + LCM *lifecycle.Manager + reaperDone <-chan struct{} + scmDone <-chan struct{} + convergenceDone <-chan struct{} } // startLifecycle constructs the Lifecycle Manager over the store and starts the @@ -56,6 +57,9 @@ func (l *lifecycleStack) Stop() { if l.scmDone != nil { <-l.scmDone } + if l.convergenceDone != nil { + <-l.convergenceDone + } } // sessionLifecycle is the narrow surface of sessionmanager.Manager used for diff --git a/backend/internal/domain/collision.go b/backend/internal/domain/collision.go new file mode 100644 index 0000000000..6186f56422 --- /dev/null +++ b/backend/internal/domain/collision.go @@ -0,0 +1,67 @@ +package domain + +import "time" + +// CollisionSeverity grades how strongly two parallel sessions overlap in the +// code they are changing. The convergence observer computes it from each +// session's worktree diff before either has opened a PR. +type CollisionSeverity string + +const ( + // CollisionSoft means two sessions changed the same file(s) but their + // changed line ranges do not intersect — a likely-resolvable overlap worth + // surfacing but not worth interrupting an agent over. + CollisionSoft CollisionSeverity = "soft" + // CollisionHot means two sessions changed overlapping line ranges in the + // same file — a near-certain future merge conflict. Hot collisions drive the + // proactive agent nudge. + CollisionHot CollisionSeverity = "hot" +) + +// Valid reports whether s is a known severity. +func (s CollisionSeverity) Valid() bool { + switch s { + case CollisionSoft, CollisionHot: + return true + default: + return false + } +} + +// CollisionFile is one file two sessions both changed, with the line ranges +// (in the newer revision) that overlap when the collision is hot. For soft +// collisions Ranges is empty. +type CollisionFile struct { + Path string `json:"path"` + Ranges [][2]int `json:"ranges,omitempty"` +} + +// SessionCollision is the durable fact that two non-terminated worker sessions +// in the same project are concurrently editing overlapping code. SessionA and +// SessionB are stored in a stable lexical order (A < B) so each unordered pair +// has exactly one row. +type SessionCollision struct { + ProjectID ProjectID + SessionA SessionID + SessionB SessionID + Severity CollisionSeverity + // Files lists every overlapping path (for hot collisions, with the + // overlapping line ranges). It is what the agent nudge and the dashboard + // render. + Files []CollisionFile + // Signature is a content hash of (severity + files). The observer writes a + // row only when the signature changes, and lifecycle nudges only when a + // freshly hot signature appears, so a stable overlap is reported once. + Signature string + FirstSeenAt time.Time + UpdatedAt time.Time +} + +// DisplayNames is an optional, non-persisted enrichment the observer can attach +// before handing a collision to lifecycle so a nudge can name the peer session +// without an extra store read. Keyed by SessionID. +type CollisionWithNames struct { + SessionCollision + NameA string + NameB string +} diff --git a/backend/internal/httpd/apispec/openapi.yaml b/backend/internal/httpd/apispec/openapi.yaml index c194ffa0bd..70b808123b 100644 --- a/backend/internal/httpd/apispec/openapi.yaml +++ b/backend/internal/httpd/apispec/openapi.yaml @@ -489,6 +489,33 @@ paths: summary: Fetch one project; discriminates ok vs degraded tags: - projects + /api/v1/projects/{id}/collisions: + get: + operationId: listProjectCollisions + parameters: + - description: Project identifier (registry key). + in: path + name: id + required: true + schema: + description: Project identifier (registry key). + type: string + responses: + "200": + content: + application/json: + schema: + $ref: '#/components/schemas/ListProjectCollisionsResponse' + description: OK + "500": + content: + application/json: + schema: + $ref: '#/components/schemas/APIError' + description: Internal Server Error + summary: List the project's current cross-session edit collisions + tags: + - projects /api/v1/projects/{id}/config: put: operationId: setProjectConfig @@ -1548,6 +1575,56 @@ components: - sessionId - reason type: object + Collision: + properties: + files: + items: + $ref: '#/components/schemas/CollisionFile' + type: array + firstSeenAt: + format: date-time + type: string + sessionA: + type: string + sessionB: + type: string + severity: + enum: + - soft + - hot + type: string + updatedAt: + format: date-time + type: string + required: + - sessionA + - sessionB + - severity + - files + - firstSeenAt + - updatedAt + type: object + CollisionFile: + properties: + path: + type: string + ranges: + items: + $ref: '#/components/schemas/CollisionRange' + type: array + required: + - path + type: object + CollisionRange: + properties: + end: + type: integer + start: + type: integer + required: + - start + - end + type: object ControllersSessionView: properties: activity: @@ -1704,6 +1781,15 @@ components: required: - notifications type: object + ListProjectCollisionsResponse: + properties: + collisions: + items: + $ref: '#/components/schemas/Collision' + type: array + required: + - collisions + type: object ListProjectsResponse: properties: projects: diff --git a/backend/internal/httpd/apispec/specgen/build.go b/backend/internal/httpd/apispec/specgen/build.go index 21570e3582..a4c34f2d83 100644 --- a/backend/internal/httpd/apispec/specgen/build.go +++ b/backend/internal/httpd/apispec/specgen/build.go @@ -133,6 +133,7 @@ var schemaNames = map[string]string{ "DomainRoleOverride": "RoleOverride", // httpd/controllers (wire envelopes) "ControllersListProjectsResponse": "ListProjectsResponse", + "ControllersListProjectCollisionsResponse": "ListProjectCollisionsResponse", "ControllersProjectResponse": "ProjectResponse", "ControllersGetProjectResponse": "ProjectGetResponse", "ControllersProjectOrDegraded": "ProjectOrDegraded", @@ -204,6 +205,9 @@ var schemaNames = map[string]string{ "ProjectRemoveResult": "RemoveProjectResult", "ProjectSetConfigInput": "SetProjectConfigInput", "ProjectWorkspaceRepo": "WorkspaceRepo", + "ProjectCollision": "Collision", + "ProjectCollisionFile": "CollisionFile", + "ProjectCollisionRange": "CollisionRange", } // markRequestBodyRequired sets requestBody.required: true on the operation's @@ -425,7 +429,7 @@ func eventOperations() []operation { } } -// projectOperations declares the 4 canonical /projects operations. The set must +// projectOperations declares the canonical /projects operations. The set must // stay 1:1 with the routes ProjectsController.Register mounts — // TestRouteSpecParity fails the build otherwise. func projectOperations() []operation { @@ -459,6 +463,15 @@ func projectOperations() []operation { {http.StatusInternalServerError, envelope.APIError{}}, }, }, + { + method: http.MethodGet, path: "/api/v1/projects/{id}/collisions", id: "listProjectCollisions", tag: "projects", + summary: "List the project's current cross-session edit collisions", + pathParams: []any{controllers.ProjectIDParam{}}, + resps: []respUnit{ + {http.StatusOK, controllers.ListProjectCollisionsResponse{}}, + {http.StatusInternalServerError, envelope.APIError{}}, + }, + }, { method: http.MethodPut, path: "/api/v1/projects/{id}/config", id: "setProjectConfig", tag: "projects", summary: "Replace a project's per-project config", diff --git a/backend/internal/httpd/controllers/dto.go b/backend/internal/httpd/controllers/dto.go index ab859ef7a0..115d65daf1 100644 --- a/backend/internal/httpd/controllers/dto.go +++ b/backend/internal/httpd/controllers/dto.go @@ -31,6 +31,13 @@ type ListProjectsResponse struct { Projects []projectsvc.Summary `json:"projects"` } +// ListProjectCollisionsResponse is the body of +// GET /api/v1/projects/{id}/collisions: the project's current cross-session +// edit collisions detected by the convergence observer. +type ListProjectCollisionsResponse struct { + Collisions []projectsvc.Collision `json:"collisions"` +} + // ProjectResponse is the { project } body shared by POST /projects (201). type ProjectResponse struct { Project projectsvc.Project `json:"project"` diff --git a/backend/internal/httpd/controllers/projects.go b/backend/internal/httpd/controllers/projects.go index d3957a480c..c9136b93aa 100644 --- a/backend/internal/httpd/controllers/projects.go +++ b/backend/internal/httpd/controllers/projects.go @@ -27,6 +27,7 @@ func (c *ProjectsController) Register(r chi.Router) { r.Get("/projects", c.list) r.Post("/projects", c.add) r.Get("/projects/{id}", c.get) + r.Get("/projects/{id}/collisions", c.collisions) r.Put("/projects/{id}/config", c.setConfig) r.Delete("/projects/{id}", c.remove) } @@ -83,6 +84,22 @@ func (c *ProjectsController) get(w http.ResponseWriter, r *http.Request) { envelope.WriteJSON(w, http.StatusOK, resp) } +func (c *ProjectsController) collisions(w http.ResponseWriter, r *http.Request) { + if c.Mgr == nil { + apispec.NotImplemented(w, r, "GET", "/api/v1/projects/{id}/collisions") + return + } + got, err := c.Mgr.Collisions(r.Context(), projectID(r)) + if err != nil { + envelope.WriteError(w, r, err) + return + } + if got == nil { + got = []projectsvc.Collision{} + } + envelope.WriteJSON(w, http.StatusOK, ListProjectCollisionsResponse{Collisions: got}) +} + func (c *ProjectsController) setConfig(w http.ResponseWriter, r *http.Request) { if c.Mgr == nil { apispec.NotImplemented(w, r, "PUT", "/api/v1/projects/{id}/config") diff --git a/backend/internal/httpd/controllers/projects_collisions_test.go b/backend/internal/httpd/controllers/projects_collisions_test.go new file mode 100644 index 0000000000..404eb26dc9 --- /dev/null +++ b/backend/internal/httpd/controllers/projects_collisions_test.go @@ -0,0 +1,77 @@ +package controllers_test + +import ( + "context" + "encoding/json" + "io" + "log/slog" + "net/http" + "net/http/httptest" + "testing" + + "github.com/aoagents/agent-orchestrator/backend/internal/config" + "github.com/aoagents/agent-orchestrator/backend/internal/domain" + "github.com/aoagents/agent-orchestrator/backend/internal/httpd" + projectsvc "github.com/aoagents/agent-orchestrator/backend/internal/service/project" +) + +type collisionsManager struct { + projectsvc.Manager + got domain.ProjectID + out []projectsvc.Collision +} + +func (m *collisionsManager) Collisions(_ context.Context, id domain.ProjectID) ([]projectsvc.Collision, error) { + m.got = id + return m.out, nil +} + +func TestProjectsAPI_ListCollisions(t *testing.T) { + log := slog.New(slog.NewTextHandler(io.Discard, nil)) + mgr := &collisionsManager{out: []projectsvc.Collision{{ + SessionA: "p-1", + SessionB: "p-2", + Severity: "hot", + Files: []projectsvc.CollisionFile{{Path: "config.go", Ranges: []projectsvc.CollisionRange{{Start: 15, End: 20}}}}, + }}} + srv := httptest.NewServer(httpd.NewRouterWithControl(config.Config{}, log, nil, httpd.APIDeps{ + Projects: mgr, + }, httpd.ControlDeps{})) + t.Cleanup(srv.Close) + + body, status, headers := doRequest(t, srv, "GET", "/api/v1/projects/p/collisions", "") + assertJSON(t, headers) + if status != http.StatusOK { + t.Fatalf("status = %d, body = %s", status, body) + } + if mgr.got != "p" { + t.Fatalf("handler passed project id %q, want \"p\"", mgr.got) + } + var resp controllers_ListProjectCollisionsResponse + if err := json.Unmarshal(body, &resp); err != nil { + t.Fatalf("unmarshal: %v (%s)", err, body) + } + if len(resp.Collisions) != 1 || resp.Collisions[0].Severity != "hot" { + t.Fatalf("unexpected body: %+v", resp.Collisions) + } + if resp.Collisions[0].Files[0].Ranges[0].Start != 15 { + t.Fatalf("range round-trip failed: %+v", resp.Collisions[0].Files) + } +} + +// controllers_ListProjectCollisionsResponse mirrors the wire body so the test +// decodes the JSON without importing the controller's unexported shapes. +type controllers_ListProjectCollisionsResponse struct { + Collisions []projectsvc.Collision `json:"collisions"` +} + +func TestProjectsAPI_ListCollisions_StubWithoutManager(t *testing.T) { + log := slog.New(slog.NewTextHandler(io.Discard, nil)) + srv := httptest.NewServer(httpd.NewRouterWithControl(config.Config{}, log, nil, httpd.APIDeps{}, httpd.ControlDeps{})) + t.Cleanup(srv.Close) + + _, status, _ := doRequest(t, srv, "GET", "/api/v1/projects/p/collisions", "") + if status != http.StatusNotImplemented { + t.Fatalf("nil manager must return 501, got %d", status) + } +} diff --git a/backend/internal/lifecycle/reactions.go b/backend/internal/lifecycle/reactions.go index 3b115afe2f..385a8f40ee 100644 --- a/backend/internal/lifecycle/reactions.go +++ b/backend/internal/lifecycle/reactions.go @@ -189,6 +189,58 @@ func (m *Manager) ApplyPRObservation(ctx context.Context, id domain.SessionID, o return nil } +// collisionMaxNudge caps how many times each agent is told about one evolving +// cross-session collision so a churning overlap cannot spam its pane. +const collisionMaxNudge = 3 + +// ApplyCollision reacts to a hot cross-session edit collision detected by the +// convergence observer (two live sessions changing overlapping code before +// either opens a PR). It sends each live participant a coordinating nudge naming +// the peer session and the overlapping files so the agents can diverge before +// the overlap becomes a merge conflict. Dedup is keyed on recipient+peer and the +// collision signature: a steady overlap is announced once and only re-announced +// when its content changes. Unlike PR nudges, the dedup is in-memory only +// (prURL is empty), so the worst case after a daemon restart is one repeat — the +// same trade-off the PR-nudge path documents. +func (m *Manager) ApplyCollision(ctx context.Context, c domain.CollisionWithNames) error { + if m.messenger == nil || c.Severity != domain.CollisionHot { + return nil + } + if err := m.nudgeCollision(ctx, c.SessionA, c.SessionB, c.NameB, c.Signature, c.Files); err != nil { + return err + } + return m.nudgeCollision(ctx, c.SessionB, c.SessionA, c.NameA, c.Signature, c.Files) +} + +func (m *Manager) nudgeCollision(ctx context.Context, recipient, peer domain.SessionID, peerName, sig string, files []domain.CollisionFile) error { + rec, ok, err := m.store.GetSession(ctx, recipient) + if err != nil || !ok { + return err + } + if rec.IsTerminated { + return nil + } + key := "collision:" + string(recipient) + ":" + string(peer) + return m.sendOnce(ctx, recipient, "", key, sig, collisionMessage(peerName, files), collisionMaxNudge) +} + +func collisionMessage(peerName string, files []domain.CollisionFile) string { + const maxList = 5 + paths := make([]string, 0, len(files)) + for _, f := range files { + paths = append(paths, domain.SanitizeControlChars(f.Path)) + } + list := strings.Join(paths[:min(len(paths), maxList)], ", ") + if len(paths) > maxList { + list += fmt.Sprintf(", and %d more", len(paths)-maxList) + } + name := domain.SanitizeControlChars(peerName) + if strings.TrimSpace(name) == "" { + name = "another session" + } + return fmt.Sprintf("Heads up: another agent session (%s) is concurrently editing overlapping code in this repository: %s. Coordinate or diverge now to avoid a merge conflict before you both open PRs.", name, list) +} + // ApplyReviewResult reacts to a completed AO-internal review pass after the // review service has persisted the run result. It mirrors ApplyPRObservation: // no change_log reads, no review_run writes, only lifecycle side effects. diff --git a/backend/internal/lifecycle/reactions_collision_test.go b/backend/internal/lifecycle/reactions_collision_test.go new file mode 100644 index 0000000000..e6d7e92404 --- /dev/null +++ b/backend/internal/lifecycle/reactions_collision_test.go @@ -0,0 +1,110 @@ +package lifecycle + +import ( + "strings" + "testing" + + "github.com/aoagents/agent-orchestrator/backend/internal/domain" +) + +func hotCollision(a, b domain.SessionID, nameA, nameB, sig string, files ...domain.CollisionFile) domain.CollisionWithNames { + return domain.CollisionWithNames{ + SessionCollision: domain.SessionCollision{ + SessionA: a, + SessionB: b, + Severity: domain.CollisionHot, + Files: files, + Signature: sig, + }, + NameA: nameA, + NameB: nameB, + } +} + +func TestApplyCollision_NudgesBothLiveParticipants(t *testing.T) { + m, st, msg := newManager() + st.sessions["mer-1"] = working("mer-1") + st.sessions["mer-2"] = working("mer-2") + + c := hotCollision("mer-1", "mer-2", "alpha", "bravo", "sig1", + domain.CollisionFile{Path: "config.go", Ranges: [][2]int{{15, 20}}}) + if err := m.ApplyCollision(ctx, c); err != nil { + t.Fatalf("ApplyCollision: %v", err) + } + if len(msg.msgs) != 2 { + t.Fatalf("want 2 nudges (one per participant), got %d: %v", len(msg.msgs), msg.msgs) + } + // The message sent to mer-1 names the peer (bravo) and the overlapping file. + joined := strings.Join(msg.msgs, "\n") + if !strings.Contains(joined, "alpha") || !strings.Contains(joined, "bravo") { + t.Fatalf("nudges should name both peers; got %v", msg.msgs) + } + if !strings.Contains(joined, "config.go") { + t.Fatalf("nudges should name the overlapping file; got %v", msg.msgs) + } +} + +func TestApplyCollision_DedupesSameSignature(t *testing.T) { + m, st, msg := newManager() + st.sessions["mer-1"] = working("mer-1") + st.sessions["mer-2"] = working("mer-2") + c := hotCollision("mer-1", "mer-2", "alpha", "bravo", "sig1", + domain.CollisionFile{Path: "config.go"}) + + for i := 0; i < 3; i++ { + if err := m.ApplyCollision(ctx, c); err != nil { + t.Fatalf("ApplyCollision %d: %v", i, err) + } + } + if len(msg.msgs) != 2 { + t.Fatalf("same signature must nudge each participant once; got %d", len(msg.msgs)) + } +} + +func TestApplyCollision_RenudgesOnChangedSignature(t *testing.T) { + m, st, msg := newManager() + st.sessions["mer-1"] = working("mer-1") + st.sessions["mer-2"] = working("mer-2") + + if err := m.ApplyCollision(ctx, hotCollision("mer-1", "mer-2", "a", "b", "sig1", domain.CollisionFile{Path: "config.go"})); err != nil { + t.Fatal(err) + } + if err := m.ApplyCollision(ctx, hotCollision("mer-1", "mer-2", "a", "b", "sig2", domain.CollisionFile{Path: "config.go", Ranges: [][2]int{{1, 9}}})); err != nil { + t.Fatal(err) + } + if len(msg.msgs) != 4 { + t.Fatalf("changed signature should re-nudge both; want 4 total, got %d", len(msg.msgs)) + } +} + +func TestApplyCollision_SkipsTerminatedRecipient(t *testing.T) { + m, st, msg := newManager() + st.sessions["mer-1"] = working("mer-1") + term := working("mer-2") + term.IsTerminated = true + st.sessions["mer-2"] = term + + c := hotCollision("mer-1", "mer-2", "alpha", "bravo", "sig1", domain.CollisionFile{Path: "config.go"}) + if err := m.ApplyCollision(ctx, c); err != nil { + t.Fatalf("ApplyCollision: %v", err) + } + if len(msg.msgs) != 1 { + t.Fatalf("terminated recipient must be skipped; want 1 nudge, got %d", len(msg.msgs)) + } +} + +func TestApplyCollision_SoftIsNoOp(t *testing.T) { + m, st, msg := newManager() + st.sessions["mer-1"] = working("mer-1") + st.sessions["mer-2"] = working("mer-2") + soft := domain.CollisionWithNames{ + SessionCollision: domain.SessionCollision{SessionA: "mer-1", SessionB: "mer-2", Severity: domain.CollisionSoft, Signature: "s"}, + NameA: "a", NameB: "b", + } + if err := m.ApplyCollision(ctx, soft); err != nil { + t.Fatalf("ApplyCollision: %v", err) + } + if len(msg.msgs) != 0 { + t.Fatalf("soft collision must not nudge; got %d", len(msg.msgs)) + } +} diff --git a/backend/internal/observe/convergence/observer.go b/backend/internal/observe/convergence/observer.go new file mode 100644 index 0000000000..50b6848b2e --- /dev/null +++ b/backend/internal/observe/convergence/observer.go @@ -0,0 +1,322 @@ +// Package convergence implements the cross-session edit-collision observer. It +// is the proactive counterpart to the SCM observer's reactive merge-conflict +// handling: where the SCM lane learns about a conflict only after two agents +// have opened PRs that GitHub reports as conflicting, this lane watches the live +// worktree diffs of all parallel sessions in a project and detects that two +// agents are editing overlapping code BEFORE either opens a PR. +// +// The loop follows the repository's OBSERVE → UPDATE → DERIVE/ACT pipeline: +// OBSERVE each session's changed files/ranges via the workspace differ, UPDATE +// the durable session_collision facts, and ACT by nudging the colliding agents +// through lifecycle. It reuses observe.StartPollLoop for the polling goroutine. +package convergence + +import ( + "context" + "crypto/sha256" + "encoding/hex" + "fmt" + "log/slog" + "sort" + "strconv" + "strings" + "time" + + "github.com/aoagents/agent-orchestrator/backend/internal/domain" + "github.com/aoagents/agent-orchestrator/backend/internal/observe" + "github.com/aoagents/agent-orchestrator/backend/internal/ports" +) + +const ( + // DefaultTickInterval is how often the observer recomputes cross-session + // overlap. Worktree diffs are local git operations, so this is faster than + // the SCM observer's network-bound cadence but slow enough to stay cheap. + DefaultTickInterval = 20 * time.Second + // maxFilesPerCollision bounds the per-pair file list so a sweeping refactor + // in two sessions cannot produce an unbounded collision payload. + maxFilesPerCollision = 50 +) + +// Differ reports the files (and changed line ranges) a session's worktree has +// modified relative to its base. ports.WorkspaceDiffer satisfies it. +type Differ interface { + ChangedRegions(ctx context.Context, info ports.WorkspaceInfo) (map[string][]ports.LineRange, error) +} + +// Store supplies the live session set and persists collision facts. +type Store interface { + ListAllSessions(ctx context.Context) ([]domain.SessionRecord, error) + ListAllCollisions(ctx context.Context) ([]domain.SessionCollision, error) + UpsertCollision(ctx context.Context, c domain.SessionCollision) error + DeleteCollision(ctx context.Context, a, b domain.SessionID) error +} + +// Lifecycle is notified when a hot collision first appears (or its content +// changes) so it can send the coordinating agent nudge. It is optional; a nil +// Lifecycle disables nudging while still persisting facts. +type Lifecycle interface { + ApplyCollision(ctx context.Context, c domain.CollisionWithNames) error +} + +// Config holds optional observer knobs. Zero values use production defaults. +type Config struct { + Tick time.Duration + Clock func() time.Time + Logger *slog.Logger +} + +// Observer coordinates worktree diffing, overlap computation, persistence, and +// lifecycle nudges for cross-session edit collisions. +type Observer struct { + differ Differ + store Store + lifecycle Lifecycle + tick time.Duration + clock func() time.Time + logger *slog.Logger +} + +// New constructs an Observer, applying defaults for zero-valued cfg fields. +func New(differ Differ, store Store, lifecycle Lifecycle, cfg Config) *Observer { + o := &Observer{differ: differ, store: store, lifecycle: lifecycle, tick: cfg.Tick, clock: cfg.Clock, logger: cfg.Logger} + if o.tick <= 0 { + o.tick = DefaultTickInterval + } + if o.clock == nil { + o.clock = time.Now + } + if o.logger == nil { + o.logger = slog.Default() + } + return o +} + +// Start launches the polling loop and returns a channel closed when it exits. +func (o *Observer) Start(ctx context.Context) <-chan struct{} { + return observe.StartPollLoop(ctx, o.tick, o.Poll, o.logger, "convergence observer") +} + +// Poll runs one synchronous collision-detection cycle: diff every eligible +// session, compute pairwise overlaps per project, then reconcile the durable +// collision set (upsert changed pairs, delete pairs that no longer overlap) and +// nudge agents on hot collisions. +func (o *Observer) Poll(ctx context.Context) error { + now := o.clock().UTC() + sessions, err := o.store.ListAllSessions(ctx) + if err != nil { + return fmt.Errorf("convergence: list sessions: %w", err) + } + + byProject := map[domain.ProjectID][]domain.SessionRecord{} + for _, s := range sessions { + if !eligible(s) { + continue + } + byProject[s.ProjectID] = append(byProject[s.ProjectID], s) + } + + desired := map[pairKey]domain.CollisionWithNames{} + for project, group := range byProject { + if len(group) < 2 { + continue + } + o.collisionsForProject(ctx, project, group, now, desired) + } + + existing, err := o.store.ListAllCollisions(ctx) + if err != nil { + return fmt.Errorf("convergence: list collisions: %w", err) + } + o.reconcile(ctx, existing, desired) + return nil +} + +// collisionsForProject diffs each session in one project and records every +// overlapping pair into desired. +func (o *Observer) collisionsForProject(ctx context.Context, project domain.ProjectID, group []domain.SessionRecord, now time.Time, desired map[pairKey]domain.CollisionWithNames) { + regions := make(map[domain.SessionID]map[string][]ports.LineRange, len(group)) + for _, s := range group { + r, err := o.differ.ChangedRegions(ctx, workspaceInfo(s)) + if err != nil { + o.logger.Warn("convergence: diff failed", "session", string(s.ID), "err", err) + continue + } + if len(r) > 0 { + regions[s.ID] = r + } + } + + for i := 0; i < len(group); i++ { + for j := i + 1; j < len(group); j++ { + a, b := group[i], group[j] + ra, okA := regions[a.ID] + rb, okB := regions[b.ID] + if !okA || !okB { + continue + } + files, severity := overlap(ra, rb) + if len(files) == 0 { + continue + } + sa, sb, na, nb := canonicalPair(a, b) + c := domain.SessionCollision{ + ProjectID: project, + SessionA: sa, + SessionB: sb, + Severity: severity, + Files: files, + Signature: signature(severity, files), + FirstSeenAt: now, + UpdatedAt: now, + } + desired[pairKey{sa, sb}] = domain.CollisionWithNames{SessionCollision: c, NameA: na, NameB: nb} + } + } +} + +// reconcile makes the stored collision set match desired: upsert pairs that are +// new or whose signature changed (preserving the original FirstSeenAt), delete +// pairs that no longer overlap, and notify lifecycle for hot collisions whose +// content changed. +func (o *Observer) reconcile(ctx context.Context, existing []domain.SessionCollision, desired map[pairKey]domain.CollisionWithNames) { + prev := make(map[pairKey]domain.SessionCollision, len(existing)) + for _, c := range existing { + prev[pairKey{c.SessionA, c.SessionB}] = c + } + + for key, want := range desired { + old, had := prev[key] + if had && old.Signature == want.Signature { + continue // unchanged overlap: nothing to write or re-nudge. + } + if err := o.store.UpsertCollision(ctx, want.SessionCollision); err != nil { + o.logger.Error("convergence: upsert collision failed", "pair", key.String(), "err", err) + continue + } + if want.Severity == domain.CollisionHot && o.lifecycle != nil { + if err := o.lifecycle.ApplyCollision(ctx, want); err != nil { + o.logger.Error("convergence: lifecycle apply failed", "pair", key.String(), "err", err) + } + } + } + + for key := range prev { + if _, ok := desired[key]; ok { + continue + } + if err := o.store.DeleteCollision(ctx, key.a, key.b); err != nil { + o.logger.Error("convergence: delete collision failed", "pair", key.String(), "err", err) + } + } +} + +type pairKey struct { + a domain.SessionID + b domain.SessionID +} + +func (k pairKey) String() string { return string(k.a) + "|" + string(k.b) } + +// eligible reports whether a session participates in collision detection: a live +// (non-terminated) worker with a materialised worktree. Orchestrators and seed +// rows (empty workspace path) are excluded. +func eligible(s domain.SessionRecord) bool { + return s.Kind == domain.KindWorker && !s.IsTerminated && strings.TrimSpace(s.Metadata.WorkspacePath) != "" +} + +func workspaceInfo(s domain.SessionRecord) ports.WorkspaceInfo { + return ports.WorkspaceInfo{ + Path: s.Metadata.WorkspacePath, + Branch: s.Metadata.Branch, + SessionID: s.ID, + ProjectID: s.ProjectID, + } +} + +// canonicalPair orders two sessions by ID so an unordered pair maps to one row, +// returning the ordered IDs and their matching display names. +func canonicalPair(x, y domain.SessionRecord) (a, b domain.SessionID, nameA, nameB string) { + if x.ID <= y.ID { + return x.ID, y.ID, displayName(x), displayName(y) + } + return y.ID, x.ID, displayName(y), displayName(x) +} + +func displayName(s domain.SessionRecord) string { + if n := strings.TrimSpace(s.DisplayName); n != "" { + return n + } + return string(s.ID) +} + +// overlap computes the shared changed files between two sessions' region maps. +// A file both touched is "hot" when their changed line ranges intersect (or +// either side changed the whole file, e.g. a deletion) and "soft" otherwise. +// The pair's severity is hot if any shared file is hot. The returned files are +// sorted by path and capped at maxFilesPerCollision. +func overlap(a, b map[string][]ports.LineRange) ([]domain.CollisionFile, domain.CollisionSeverity) { + var paths []string + for p := range a { + if _, ok := b[p]; ok { + paths = append(paths, p) + } + } + if len(paths) == 0 { + return nil, "" + } + sort.Strings(paths) + if len(paths) > maxFilesPerCollision { + paths = paths[:maxFilesPerCollision] + } + + files := make([]domain.CollisionFile, 0, len(paths)) + severity := domain.CollisionSoft + for _, p := range paths { + inter := intersectRanges(a[p], b[p]) + hot := len(a[p]) == 0 || len(b[p]) == 0 || len(inter) > 0 + f := domain.CollisionFile{Path: p} + if hot { + severity = domain.CollisionHot + f.Ranges = inter + } + files = append(files, f) + } + return files, severity +} + +// intersectRanges returns the overlapping segments between two sets of line +// ranges, as [start,end] pairs sorted by start. +func intersectRanges(a, b []ports.LineRange) [][2]int { + var out [][2]int + for _, ra := range a { + for _, rb := range b { + start := max(ra.Start, rb.Start) + end := min(ra.End, rb.End) + if start <= end { + out = append(out, [2]int{start, end}) + } + } + } + sort.Slice(out, func(i, j int) bool { return out[i][0] < out[j][0] }) + return out +} + +// signature is a stable content hash of a collision's severity and overlapping +// files/ranges. The observer writes a row only when this changes, and lifecycle +// nudges only when a hot signature is new, so a steady overlap is reported once. +func signature(severity domain.CollisionSeverity, files []domain.CollisionFile) string { + var sb strings.Builder + sb.WriteString(string(severity)) + for _, f := range files { + sb.WriteByte('\n') + sb.WriteString(f.Path) + for _, r := range f.Ranges { + sb.WriteByte(':') + sb.WriteString(strconv.Itoa(r[0])) + sb.WriteByte('-') + sb.WriteString(strconv.Itoa(r[1])) + } + } + sum := sha256.Sum256([]byte(sb.String())) + return hex.EncodeToString(sum[:]) +} diff --git a/backend/internal/observe/convergence/observer_test.go b/backend/internal/observe/convergence/observer_test.go new file mode 100644 index 0000000000..7311f69d06 --- /dev/null +++ b/backend/internal/observe/convergence/observer_test.go @@ -0,0 +1,299 @@ +package convergence + +// These tests exercise the convergence observer's orchestration contract with +// fake differ, store, and lifecycle collaborators so overlap classification, +// signature-based dedup, reconciliation (upsert/delete), and hot-collision +// nudging stay independent of git and SQLite. + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/aoagents/agent-orchestrator/backend/internal/domain" + "github.com/aoagents/agent-orchestrator/backend/internal/ports" +) + +type fakeDiffer struct { + regions map[domain.SessionID]map[string][]ports.LineRange + err map[domain.SessionID]error +} + +func (f *fakeDiffer) ChangedRegions(_ context.Context, info ports.WorkspaceInfo) (map[string][]ports.LineRange, error) { + if f.err != nil { + if e := f.err[info.SessionID]; e != nil { + return nil, e + } + } + return f.regions[info.SessionID], nil +} + +type fakeStore struct { + mu sync.Mutex + sessions []domain.SessionRecord + current map[pairKey]domain.SessionCollision + upserts int + deletes int + upsertErr error +} + +func newFakeStore(sessions ...domain.SessionRecord) *fakeStore { + return &fakeStore{sessions: sessions, current: map[pairKey]domain.SessionCollision{}} +} + +func (s *fakeStore) ListAllSessions(context.Context) ([]domain.SessionRecord, error) { + return s.sessions, nil +} + +func (s *fakeStore) ListAllCollisions(context.Context) ([]domain.SessionCollision, error) { + s.mu.Lock() + defer s.mu.Unlock() + out := make([]domain.SessionCollision, 0, len(s.current)) + for _, c := range s.current { + out = append(out, c) + } + return out, nil +} + +func (s *fakeStore) UpsertCollision(_ context.Context, c domain.SessionCollision) error { + if s.upsertErr != nil { + return s.upsertErr + } + s.mu.Lock() + defer s.mu.Unlock() + k := pairKey{c.SessionA, c.SessionB} + if existing, ok := s.current[k]; ok { + c.FirstSeenAt = existing.FirstSeenAt // mirror the DB's preserve-on-conflict. + } + s.current[k] = c + s.upserts++ + return nil +} + +func (s *fakeStore) DeleteCollision(_ context.Context, a, b domain.SessionID) error { + s.mu.Lock() + defer s.mu.Unlock() + delete(s.current, pairKey{a, b}) + s.deletes++ + return nil +} + +type fakeLifecycle struct { + mu sync.Mutex + applied []domain.CollisionWithNames +} + +func (l *fakeLifecycle) ApplyCollision(_ context.Context, c domain.CollisionWithNames) error { + l.mu.Lock() + defer l.mu.Unlock() + l.applied = append(l.applied, c) + return nil +} + +func worker(id, project, name, path string) domain.SessionRecord { + return domain.SessionRecord{ + ID: domain.SessionID(id), + ProjectID: domain.ProjectID(project), + Kind: domain.KindWorker, + DisplayName: name, + Metadata: domain.SessionMetadata{WorkspacePath: path}, + } +} + +func rng(start, end int) ports.LineRange { return ports.LineRange{Start: start, End: end} } + +func newTestObserver(d Differ, s Store, l Lifecycle) *Observer { + return New(d, s, l, Config{Clock: func() time.Time { return time.Unix(1000, 0).UTC() }}) +} + +func TestPoll_HotCollisionPersistsAndNudges(t *testing.T) { + a := worker("p-1", "p", "alpha", "/wt/a") + b := worker("p-2", "p", "bravo", "/wt/b") + differ := &fakeDiffer{regions: map[domain.SessionID]map[string][]ports.LineRange{ + a.ID: {"config.go": {rng(10, 20)}}, + b.ID: {"config.go": {rng(15, 25)}}, // overlaps 15-20 → hot + }} + store := newFakeStore(a, b) + lc := &fakeLifecycle{} + o := newTestObserver(differ, store, lc) + + if err := o.Poll(context.Background()); err != nil { + t.Fatalf("Poll: %v", err) + } + + if store.upserts != 1 { + t.Fatalf("want 1 upsert, got %d", store.upserts) + } + got := store.current[pairKey{a.ID, b.ID}] + if got.Severity != domain.CollisionHot { + t.Fatalf("want hot, got %q", got.Severity) + } + if len(got.Files) != 1 || got.Files[0].Path != "config.go" { + t.Fatalf("unexpected files: %+v", got.Files) + } + if len(got.Files[0].Ranges) != 1 || got.Files[0].Ranges[0] != [2]int{15, 20} { + t.Fatalf("want overlap 15-20, got %+v", got.Files[0].Ranges) + } + if len(lc.applied) != 1 { + t.Fatalf("want 1 nudge, got %d", len(lc.applied)) + } + if lc.applied[0].NameA != "alpha" || lc.applied[0].NameB != "bravo" { + t.Fatalf("unexpected names: %+v", lc.applied[0]) + } +} + +func TestPoll_SoftCollisionPersistsWithoutNudge(t *testing.T) { + a := worker("p-1", "p", "alpha", "/wt/a") + b := worker("p-2", "p", "bravo", "/wt/b") + differ := &fakeDiffer{regions: map[domain.SessionID]map[string][]ports.LineRange{ + a.ID: {"config.go": {rng(1, 5)}}, + b.ID: {"config.go": {rng(50, 60)}}, // same file, disjoint ranges → soft + }} + store := newFakeStore(a, b) + lc := &fakeLifecycle{} + o := newTestObserver(differ, store, lc) + + if err := o.Poll(context.Background()); err != nil { + t.Fatalf("Poll: %v", err) + } + got := store.current[pairKey{a.ID, b.ID}] + if got.Severity != domain.CollisionSoft { + t.Fatalf("want soft, got %q", got.Severity) + } + if len(lc.applied) != 0 { + t.Fatalf("soft collision must not nudge; got %d", len(lc.applied)) + } +} + +func TestPoll_NoOverlapNoRow(t *testing.T) { + a := worker("p-1", "p", "alpha", "/wt/a") + b := worker("p-2", "p", "bravo", "/wt/b") + differ := &fakeDiffer{regions: map[domain.SessionID]map[string][]ports.LineRange{ + a.ID: {"a.go": {rng(1, 5)}}, + b.ID: {"b.go": {rng(1, 5)}}, + }} + store := newFakeStore(a, b) + o := newTestObserver(differ, store, &fakeLifecycle{}) + + if err := o.Poll(context.Background()); err != nil { + t.Fatalf("Poll: %v", err) + } + if len(store.current) != 0 { + t.Fatalf("want no collisions, got %d", len(store.current)) + } +} + +func TestPoll_StableOverlapWritesOnceThenDedupes(t *testing.T) { + a := worker("p-1", "p", "alpha", "/wt/a") + b := worker("p-2", "p", "bravo", "/wt/b") + differ := &fakeDiffer{regions: map[domain.SessionID]map[string][]ports.LineRange{ + a.ID: {"config.go": {rng(10, 20)}}, + b.ID: {"config.go": {rng(15, 25)}}, + }} + store := newFakeStore(a, b) + lc := &fakeLifecycle{} + o := newTestObserver(differ, store, lc) + + for i := 0; i < 3; i++ { + if err := o.Poll(context.Background()); err != nil { + t.Fatalf("Poll %d: %v", i, err) + } + } + if store.upserts != 1 { + t.Fatalf("stable overlap must upsert once, got %d", store.upserts) + } + if len(lc.applied) != 1 { + t.Fatalf("stable overlap must nudge once, got %d", len(lc.applied)) + } +} + +func TestPoll_ResolvedOverlapIsDeleted(t *testing.T) { + a := worker("p-1", "p", "alpha", "/wt/a") + b := worker("p-2", "p", "bravo", "/wt/b") + differ := &fakeDiffer{regions: map[domain.SessionID]map[string][]ports.LineRange{ + a.ID: {"config.go": {rng(10, 20)}}, + b.ID: {"config.go": {rng(15, 25)}}, + }} + store := newFakeStore(a, b) + o := newTestObserver(differ, store, &fakeLifecycle{}) + if err := o.Poll(context.Background()); err != nil { + t.Fatalf("Poll 1: %v", err) + } + if len(store.current) != 1 { + t.Fatalf("want 1 collision after first poll, got %d", len(store.current)) + } + + // Session b stops overlapping config.go. + differ.regions[b.ID] = map[string][]ports.LineRange{"other.go": {rng(1, 2)}} + if err := o.Poll(context.Background()); err != nil { + t.Fatalf("Poll 2: %v", err) + } + if len(store.current) != 0 { + t.Fatalf("resolved overlap must be deleted, got %d rows", len(store.current)) + } + if store.deletes != 1 { + t.Fatalf("want 1 delete, got %d", store.deletes) + } +} + +func TestPoll_TerminatedAndOrchestratorExcluded(t *testing.T) { + a := worker("p-1", "p", "alpha", "/wt/a") + b := worker("p-2", "p", "bravo", "/wt/b") + b.IsTerminated = true + c := worker("p-3", "p", "charlie", "/wt/c") + c.Kind = domain.KindOrchestrator + differ := &fakeDiffer{regions: map[domain.SessionID]map[string][]ports.LineRange{ + a.ID: {"config.go": {rng(10, 20)}}, + b.ID: {"config.go": {rng(10, 20)}}, + c.ID: {"config.go": {rng(10, 20)}}, + }} + store := newFakeStore(a, b, c) + o := newTestObserver(differ, store, &fakeLifecycle{}) + if err := o.Poll(context.Background()); err != nil { + t.Fatalf("Poll: %v", err) + } + if len(store.current) != 0 { + t.Fatalf("only one eligible session: want no collisions, got %d", len(store.current)) + } +} + +func TestPoll_WholeFileChangeIsHot(t *testing.T) { + a := worker("p-1", "p", "alpha", "/wt/a") + b := worker("p-2", "p", "bravo", "/wt/b") + differ := &fakeDiffer{regions: map[domain.SessionID]map[string][]ports.LineRange{ + a.ID: {"gone.go": {}}, // deletion / whole-file change, no ranges + b.ID: {"gone.go": {rng(1, 3)}}, // edit + }} + store := newFakeStore(a, b) + lc := &fakeLifecycle{} + o := newTestObserver(differ, store, lc) + if err := o.Poll(context.Background()); err != nil { + t.Fatalf("Poll: %v", err) + } + if got := store.current[pairKey{a.ID, b.ID}]; got.Severity != domain.CollisionHot { + t.Fatalf("whole-file change vs edit must be hot, got %q", got.Severity) + } + if len(lc.applied) != 1 { + t.Fatalf("want 1 nudge, got %d", len(lc.applied)) + } +} + +func TestPoll_DifferErrorSkipsSessionNotCrash(t *testing.T) { + a := worker("p-1", "p", "alpha", "/wt/a") + b := worker("p-2", "p", "bravo", "/wt/b") + differ := &fakeDiffer{ + regions: map[domain.SessionID]map[string][]ports.LineRange{ + b.ID: {"config.go": {rng(1, 5)}}, + }, + err: map[domain.SessionID]error{a.ID: context.DeadlineExceeded}, + } + store := newFakeStore(a, b) + o := newTestObserver(differ, store, &fakeLifecycle{}) + if err := o.Poll(context.Background()); err != nil { + t.Fatalf("Poll must tolerate a per-session diff error: %v", err) + } + if len(store.current) != 0 { + t.Fatalf("one session failed to diff: no pair possible, got %d", len(store.current)) + } +} diff --git a/backend/internal/ports/outbound.go b/backend/internal/ports/outbound.go index f88e1b7834..d9476f3340 100644 --- a/backend/internal/ports/outbound.go +++ b/backend/internal/ports/outbound.go @@ -141,6 +141,27 @@ type Workspace interface { ApplyPreserved(ctx context.Context, info WorkspaceInfo, ref string) error } +// LineRange is an inclusive [Start, End] span of changed lines in a file's new +// revision (1-based). A pure deletion is reported as a single-line range at the +// position the removed lines occupied. +type LineRange struct { + Start int + End int +} + +// WorkspaceDiffer reports the files (and changed line ranges within them) that a +// session's worktree has modified relative to its base. It is intentionally a +// capability distinct from the core Workspace lifecycle interface so that the +// only consumer — the convergence observer that detects cross-session edit +// collisions — depends on exactly what it uses and existing Workspace fakes are +// untouched. The returned map is keyed by repo-relative path; an entry with an +// empty range slice means the file changed but no line-level detail is +// available (e.g. a deletion or a binary file), which still counts as a +// file-level overlap. +type WorkspaceDiffer interface { + ChangedRegions(ctx context.Context, info WorkspaceInfo) (map[string][]LineRange, error) +} + // Workspace-level sentinels surfaced through Create/Restore/Destroy so callers // can map them to typed errors rather than collapsing every adapter failure // into an opaque 500. Adapters wrap these via fmt.Errorf("...: %w", sentinel). diff --git a/backend/internal/service/project/service.go b/backend/internal/service/project/service.go index af8593737f..aac029ab3d 100644 --- a/backend/internal/service/project/service.go +++ b/backend/internal/service/project/service.go @@ -5,6 +5,7 @@ import ( "os" "path/filepath" "regexp" + "sort" "strconv" "strings" "sync" @@ -35,6 +36,10 @@ type Manager interface { // Remove unregisters a project, stopping its sessions and reclaiming // managed workspaces. Remove(ctx context.Context, id domain.ProjectID) (RemoveResult, error) + + // Collisions returns the project's current cross-session edit collisions + // (the convergence observer's durable facts), newest activity first. + Collisions(ctx context.Context, id domain.ProjectID) ([]Collision, error) } // SessionTeardowner is the narrow session-service surface project removal @@ -454,6 +459,41 @@ func validateProjectID(id domain.ProjectID) error { return nil } +// Collisions returns the project's current cross-session edit collisions as +// wire read-models, ordered by most-recently-updated first so the freshest +// overlaps surface at the top of the dashboard. +func (m *Service) Collisions(ctx context.Context, id domain.ProjectID) ([]Collision, error) { + rows, err := m.store.ListCollisionsByProject(ctx, id) + if err != nil { + return nil, apierr.Internal("PROJECT_COLLISIONS_FAILED", "Failed to load collisions") + } + out := make([]Collision, 0, len(rows)) + for _, c := range rows { + out = append(out, collisionToWire(c)) + } + sort.Slice(out, func(i, j int) bool { return out[i].UpdatedAt.After(out[j].UpdatedAt) }) + return out, nil +} + +func collisionToWire(c domain.SessionCollision) Collision { + files := make([]CollisionFile, 0, len(c.Files)) + for _, f := range c.Files { + ranges := make([]CollisionRange, 0, len(f.Ranges)) + for _, r := range f.Ranges { + ranges = append(ranges, CollisionRange{Start: r[0], End: r[1]}) + } + files = append(files, CollisionFile{Path: f.Path, Ranges: ranges}) + } + return Collision{ + SessionA: c.SessionA, + SessionB: c.SessionB, + Severity: string(c.Severity), + Files: files, + FirstSeenAt: c.FirstSeenAt, + UpdatedAt: c.UpdatedAt, + } +} + // resolveSessionPrefix prefers an explicit per-project SessionPrefix and falls // back to the id-derived prefix. (Display only; session-id generation is // unchanged.) diff --git a/backend/internal/service/project/store.go b/backend/internal/service/project/store.go index 0f72b0ed28..d842b47c4a 100644 --- a/backend/internal/service/project/store.go +++ b/backend/internal/service/project/store.go @@ -16,4 +16,5 @@ type Store interface { UpsertWorkspaceProject(ctx context.Context, row domain.ProjectRecord, repos []domain.WorkspaceRepoRecord) error ListWorkspaceRepos(ctx context.Context, projectID string) ([]domain.WorkspaceRepoRecord, error) ArchiveProject(ctx context.Context, id string, at time.Time) (bool, error) + ListCollisionsByProject(ctx context.Context, project domain.ProjectID) ([]domain.SessionCollision, error) } diff --git a/backend/internal/service/project/types.go b/backend/internal/service/project/types.go index f33b338d58..859eaf0dbd 100644 --- a/backend/internal/service/project/types.go +++ b/backend/internal/service/project/types.go @@ -1,6 +1,10 @@ package project -import "github.com/aoagents/agent-orchestrator/backend/internal/domain" +import ( + "time" + + "github.com/aoagents/agent-orchestrator/backend/internal/domain" +) // Summary is the row shape returned by GET /api/v1/projects. type Summary struct { @@ -40,3 +44,30 @@ type WorkspaceRepo struct { RelativePath string `json:"relativePath"` Repo string `json:"repo"` } + +// Collision is the wire read-model for one cross-session edit collision detected +// by the convergence observer: two live sessions in the project are changing +// overlapping code before either has opened a PR. Severity is "hot" when their +// changed line ranges intersect (a near-certain future merge conflict) or "soft" +// when they only share files. +type Collision struct { + SessionA domain.SessionID `json:"sessionA"` + SessionB domain.SessionID `json:"sessionB"` + Severity string `json:"severity" enum:"soft,hot"` + Files []CollisionFile `json:"files"` + FirstSeenAt time.Time `json:"firstSeenAt"` + UpdatedAt time.Time `json:"updatedAt"` +} + +// CollisionFile is one file both sessions changed, with the overlapping line +// ranges (present only for hot collisions). +type CollisionFile struct { + Path string `json:"path"` + Ranges []CollisionRange `json:"ranges,omitempty"` +} + +// CollisionRange is an inclusive [Start, End] span of overlapping lines. +type CollisionRange struct { + Start int `json:"start"` + End int `json:"end"` +} diff --git a/backend/internal/storage/sqlite/gen/collisions.sql.go b/backend/internal/storage/sqlite/gen/collisions.sql.go new file mode 100644 index 0000000000..ae24343b48 --- /dev/null +++ b/backend/internal/storage/sqlite/gen/collisions.sql.go @@ -0,0 +1,188 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.31.1 +// source: collisions.sql + +package gen + +import ( + "context" + "time" + + "github.com/aoagents/agent-orchestrator/backend/internal/domain" +) + +const deleteCollision = `-- name: DeleteCollision :exec +DELETE FROM session_collision WHERE session_a = ? AND session_b = ? +` + +type DeleteCollisionParams struct { + SessionA domain.SessionID + SessionB domain.SessionID +} + +func (q *Queries) DeleteCollision(ctx context.Context, arg DeleteCollisionParams) error { + _, err := q.db.ExecContext(ctx, deleteCollision, arg.SessionA, arg.SessionB) + return err +} + +const listAllCollisions = `-- name: ListAllCollisions :many +SELECT project_id, session_a, session_b, severity, files, signature, first_seen_at, updated_at +FROM session_collision +ORDER BY project_id, session_a, session_b +` + +func (q *Queries) ListAllCollisions(ctx context.Context) ([]SessionCollision, error) { + rows, err := q.db.QueryContext(ctx, listAllCollisions) + if err != nil { + return nil, err + } + defer rows.Close() + items := []SessionCollision{} + for rows.Next() { + var i SessionCollision + if err := rows.Scan( + &i.ProjectID, + &i.SessionA, + &i.SessionB, + &i.Severity, + &i.Files, + &i.Signature, + &i.FirstSeenAt, + &i.UpdatedAt, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const listCollisionsByProject = `-- name: ListCollisionsByProject :many +SELECT project_id, session_a, session_b, severity, files, signature, first_seen_at, updated_at +FROM session_collision +WHERE project_id = ? +ORDER BY session_a, session_b +` + +func (q *Queries) ListCollisionsByProject(ctx context.Context, projectID domain.ProjectID) ([]SessionCollision, error) { + rows, err := q.db.QueryContext(ctx, listCollisionsByProject, projectID) + if err != nil { + return nil, err + } + defer rows.Close() + items := []SessionCollision{} + for rows.Next() { + var i SessionCollision + if err := rows.Scan( + &i.ProjectID, + &i.SessionA, + &i.SessionB, + &i.Severity, + &i.Files, + &i.Signature, + &i.FirstSeenAt, + &i.UpdatedAt, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const listCollisionsForSession = `-- name: ListCollisionsForSession :many +SELECT project_id, session_a, session_b, severity, files, signature, first_seen_at, updated_at +FROM session_collision +WHERE session_a = ? OR session_b = ? +ORDER BY session_a, session_b +` + +type ListCollisionsForSessionParams struct { + SessionA domain.SessionID + SessionB domain.SessionID +} + +func (q *Queries) ListCollisionsForSession(ctx context.Context, arg ListCollisionsForSessionParams) ([]SessionCollision, error) { + rows, err := q.db.QueryContext(ctx, listCollisionsForSession, arg.SessionA, arg.SessionB) + if err != nil { + return nil, err + } + defer rows.Close() + items := []SessionCollision{} + for rows.Next() { + var i SessionCollision + if err := rows.Scan( + &i.ProjectID, + &i.SessionA, + &i.SessionB, + &i.Severity, + &i.Files, + &i.Signature, + &i.FirstSeenAt, + &i.UpdatedAt, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const upsertCollision = `-- name: UpsertCollision :exec +INSERT INTO session_collision ( + project_id, session_a, session_b, severity, files, signature, first_seen_at, updated_at +) VALUES (?, ?, ?, ?, ?, ?, ?, ?) +ON CONFLICT (session_a, session_b) DO UPDATE SET + project_id = excluded.project_id, + severity = excluded.severity, + files = excluded.files, + signature = excluded.signature, + updated_at = excluded.updated_at +` + +type UpsertCollisionParams struct { + ProjectID domain.ProjectID + SessionA domain.SessionID + SessionB domain.SessionID + Severity domain.CollisionSeverity + Files string + Signature string + FirstSeenAt time.Time + UpdatedAt time.Time +} + +// Insert or refresh one ordered session pair. first_seen_at is preserved across +// updates so the dashboard can show how long an overlap has persisted; only +// severity/files/signature/updated_at move when the overlap content changes. +func (q *Queries) UpsertCollision(ctx context.Context, arg UpsertCollisionParams) error { + _, err := q.db.ExecContext(ctx, upsertCollision, + arg.ProjectID, + arg.SessionA, + arg.SessionB, + arg.Severity, + arg.Files, + arg.Signature, + arg.FirstSeenAt, + arg.UpdatedAt, + ) + return err +} diff --git a/backend/internal/storage/sqlite/gen/models.go b/backend/internal/storage/sqlite/gen/models.go index 1140f53ccf..0fc3beea26 100644 --- a/backend/internal/storage/sqlite/gen/models.go +++ b/backend/internal/storage/sqlite/gen/models.go @@ -183,6 +183,17 @@ type Session struct { PreviewRevision int64 } +type SessionCollision struct { + ProjectID domain.ProjectID + SessionA domain.SessionID + SessionB domain.SessionID + Severity domain.CollisionSeverity + Files string + Signature string + FirstSeenAt time.Time + UpdatedAt time.Time +} + type SessionWorktree struct { SessionID domain.SessionID RepoName string diff --git a/backend/internal/storage/sqlite/migrations/0020_session_collisions.sql b/backend/internal/storage/sqlite/migrations/0020_session_collisions.sql new file mode 100644 index 0000000000..fec3e4f7fc --- /dev/null +++ b/backend/internal/storage/sqlite/migrations/0020_session_collisions.sql @@ -0,0 +1,39 @@ +-- +goose Up +-- session_collision is the durable fact that two non-terminated worker sessions +-- in the same project are concurrently editing overlapping code, detected by the +-- convergence observer from each session's worktree diff BEFORE either opens a +-- PR. It is a derived-but-cached fact: the observer recomputes it every tick and +-- upserts/deletes rows so the table always reflects the current overlap set. +-- +-- session_a/session_b hold the pair in a stable lexical order (a < b) so an +-- unordered pair maps to exactly one row; the primary key enforces that. Rows +-- cascade-delete with either session and with the project, so terminating or +-- deleting a session cannot leave a dangling collision. +-- +-- No CDC trigger is attached: collisions surface to the UI through the derived +-- `colliding` flag on the session read and the dedicated collisions endpoint, +-- and to agents through the lifecycle nudge — none of which consume change_log. +-- +goose StatementBegin +CREATE TABLE session_collision ( + project_id TEXT NOT NULL REFERENCES projects (id) ON DELETE CASCADE, + session_a TEXT NOT NULL REFERENCES sessions (id) ON DELETE CASCADE, + session_b TEXT NOT NULL REFERENCES sessions (id) ON DELETE CASCADE, + severity TEXT NOT NULL CHECK (severity IN ('soft', 'hot')), + files TEXT NOT NULL CHECK (json_valid(files)), + signature TEXT NOT NULL, + first_seen_at TIMESTAMP NOT NULL, + updated_at TIMESTAMP NOT NULL, + PRIMARY KEY (session_a, session_b), + CHECK (session_a < session_b) +); + +CREATE INDEX idx_session_collision_project ON session_collision (project_id); +CREATE INDEX idx_session_collision_b ON session_collision (session_b); +-- +goose StatementEnd + +-- +goose Down +-- +goose StatementBegin +DROP INDEX IF EXISTS idx_session_collision_b; +DROP INDEX IF EXISTS idx_session_collision_project; +DROP TABLE IF EXISTS session_collision; +-- +goose StatementEnd diff --git a/backend/internal/storage/sqlite/queries/collisions.sql b/backend/internal/storage/sqlite/queries/collisions.sql new file mode 100644 index 0000000000..f1e43610d6 --- /dev/null +++ b/backend/internal/storage/sqlite/queries/collisions.sql @@ -0,0 +1,33 @@ +-- name: UpsertCollision :exec +-- Insert or refresh one ordered session pair. first_seen_at is preserved across +-- updates so the dashboard can show how long an overlap has persisted; only +-- severity/files/signature/updated_at move when the overlap content changes. +INSERT INTO session_collision ( + project_id, session_a, session_b, severity, files, signature, first_seen_at, updated_at +) VALUES (?, ?, ?, ?, ?, ?, ?, ?) +ON CONFLICT (session_a, session_b) DO UPDATE SET + project_id = excluded.project_id, + severity = excluded.severity, + files = excluded.files, + signature = excluded.signature, + updated_at = excluded.updated_at; + +-- name: DeleteCollision :exec +DELETE FROM session_collision WHERE session_a = ? AND session_b = ?; + +-- name: ListCollisionsByProject :many +SELECT project_id, session_a, session_b, severity, files, signature, first_seen_at, updated_at +FROM session_collision +WHERE project_id = ? +ORDER BY session_a, session_b; + +-- name: ListAllCollisions :many +SELECT project_id, session_a, session_b, severity, files, signature, first_seen_at, updated_at +FROM session_collision +ORDER BY project_id, session_a, session_b; + +-- name: ListCollisionsForSession :many +SELECT project_id, session_a, session_b, severity, files, signature, first_seen_at, updated_at +FROM session_collision +WHERE session_a = ? OR session_b = ? +ORDER BY session_a, session_b; diff --git a/backend/internal/storage/sqlite/store/collision_store.go b/backend/internal/storage/sqlite/store/collision_store.go new file mode 100644 index 0000000000..cc2e3dc138 --- /dev/null +++ b/backend/internal/storage/sqlite/store/collision_store.go @@ -0,0 +1,99 @@ +package store + +import ( + "context" + "encoding/json" + "fmt" + + "github.com/aoagents/agent-orchestrator/backend/internal/domain" + "github.com/aoagents/agent-orchestrator/backend/internal/storage/sqlite/gen" +) + +// UpsertCollision inserts or refreshes one ordered session-pair collision row. +// The caller is responsible for supplying SessionA < SessionB (the convergence +// observer canonicalises the pair before writing); the table CHECK enforces it. +func (s *Store) UpsertCollision(ctx context.Context, c domain.SessionCollision) error { + files, err := json.Marshal(c.Files) + if err != nil { + return fmt.Errorf("marshal collision files for %s/%s: %w", c.SessionA, c.SessionB, err) + } + s.writeMu.Lock() + defer s.writeMu.Unlock() + return s.qw.UpsertCollision(ctx, gen.UpsertCollisionParams{ + ProjectID: c.ProjectID, + SessionA: c.SessionA, + SessionB: c.SessionB, + Severity: c.Severity, + Files: string(files), + Signature: c.Signature, + FirstSeenAt: c.FirstSeenAt, + UpdatedAt: c.UpdatedAt, + }) +} + +// DeleteCollision removes one ordered session-pair row. It is a no-op when the +// pair is absent, which keeps the observer's per-tick reconciliation idempotent. +func (s *Store) DeleteCollision(ctx context.Context, a, b domain.SessionID) error { + s.writeMu.Lock() + defer s.writeMu.Unlock() + return s.qw.DeleteCollision(ctx, gen.DeleteCollisionParams{SessionA: a, SessionB: b}) +} + +// ListCollisionsByProject returns every collision row for one project. +func (s *Store) ListCollisionsByProject(ctx context.Context, project domain.ProjectID) ([]domain.SessionCollision, error) { + rows, err := s.qr.ListCollisionsByProject(ctx, project) + if err != nil { + return nil, fmt.Errorf("list collisions for %s: %w", project, err) + } + return mapCollisionRows(rows) +} + +// ListAllCollisions returns every collision row across all projects. +func (s *Store) ListAllCollisions(ctx context.Context) ([]domain.SessionCollision, error) { + rows, err := s.qr.ListAllCollisions(ctx) + if err != nil { + return nil, fmt.Errorf("list all collisions: %w", err) + } + return mapCollisionRows(rows) +} + +// ListCollisionsForSession returns every collision row that names the session on +// either side of the pair. +func (s *Store) ListCollisionsForSession(ctx context.Context, id domain.SessionID) ([]domain.SessionCollision, error) { + rows, err := s.qr.ListCollisionsForSession(ctx, gen.ListCollisionsForSessionParams{SessionA: id, SessionB: id}) + if err != nil { + return nil, fmt.Errorf("list collisions for session %s: %w", id, err) + } + return mapCollisionRows(rows) +} + +func mapCollisionRows(rows []gen.SessionCollision) ([]domain.SessionCollision, error) { + out := make([]domain.SessionCollision, 0, len(rows)) + for _, r := range rows { + c, err := collisionFromGen(r) + if err != nil { + return nil, err + } + out = append(out, c) + } + return out, nil +} + +func collisionFromGen(r gen.SessionCollision) (domain.SessionCollision, error) { + var files []domain.CollisionFile + if r.Files != "" { + if err := json.Unmarshal([]byte(r.Files), &files); err != nil { + return domain.SessionCollision{}, fmt.Errorf("unmarshal collision files for %s/%s: %w", r.SessionA, r.SessionB, err) + } + } + return domain.SessionCollision{ + ProjectID: r.ProjectID, + SessionA: r.SessionA, + SessionB: r.SessionB, + Severity: r.Severity, + Files: files, + Signature: r.Signature, + FirstSeenAt: r.FirstSeenAt, + UpdatedAt: r.UpdatedAt, + }, nil +} diff --git a/backend/internal/storage/sqlite/store/collision_store_test.go b/backend/internal/storage/sqlite/store/collision_store_test.go new file mode 100644 index 0000000000..cc1f5c3df8 --- /dev/null +++ b/backend/internal/storage/sqlite/store/collision_store_test.go @@ -0,0 +1,91 @@ +package store_test + +import ( + "context" + "reflect" + "testing" + "time" + + "github.com/aoagents/agent-orchestrator/backend/internal/domain" +) + +func TestCollisionStore_UpsertListDelete(t *testing.T) { + s := newTestStore(t) + ctx := context.Background() + seedProject(t, s, "mer") + a, _ := s.CreateSession(ctx, sampleRecord("mer")) + b, _ := s.CreateSession(ctx, sampleRecord("mer")) + + now := time.Now().UTC().Truncate(time.Second) + c := domain.SessionCollision{ + ProjectID: "mer", + SessionA: a.ID, + SessionB: b.ID, + Severity: domain.CollisionHot, + Files: []domain.CollisionFile{ + {Path: "config.go", Ranges: [][2]int{{15, 20}}}, + {Path: "main.go"}, + }, + Signature: "sig1", + FirstSeenAt: now, + UpdatedAt: now, + } + if a.ID >= b.ID { + c.SessionA, c.SessionB = b.ID, a.ID + } + + if err := s.UpsertCollision(ctx, c); err != nil { + t.Fatalf("upsert: %v", err) + } + + got, err := s.ListCollisionsByProject(ctx, "mer") + if err != nil { + t.Fatalf("list: %v", err) + } + if len(got) != 1 { + t.Fatalf("want 1 collision, got %d", len(got)) + } + if !reflect.DeepEqual(got[0].Files, c.Files) { + t.Fatalf("files round-trip mismatch: got %+v want %+v", got[0].Files, c.Files) + } + if got[0].Severity != domain.CollisionHot { + t.Fatalf("severity round-trip: got %q", got[0].Severity) + } + + // Upsert with a changed signature preserves first_seen_at, moves updated_at. + later := now.Add(time.Minute) + c.Signature = "sig2" + c.Severity = domain.CollisionSoft + c.FirstSeenAt = later // should be ignored by the DB on conflict + c.UpdatedAt = later + if err := s.UpsertCollision(ctx, c); err != nil { + t.Fatalf("re-upsert: %v", err) + } + got, _ = s.ListCollisionsByProject(ctx, "mer") + if len(got) != 1 { + t.Fatalf("re-upsert should not duplicate; got %d rows", len(got)) + } + if !got[0].FirstSeenAt.Equal(now) { + t.Fatalf("first_seen_at must be preserved across upsert: got %v want %v", got[0].FirstSeenAt, now) + } + if got[0].Signature != "sig2" || got[0].Severity != domain.CollisionSoft { + t.Fatalf("signature/severity not updated: %+v", got[0]) + } + + // ListCollisionsForSession finds the pair from either side. + bySession, err := s.ListCollisionsForSession(ctx, a.ID) + if err != nil { + t.Fatalf("list for session: %v", err) + } + if len(bySession) != 1 { + t.Fatalf("want 1 collision for session %s, got %d", a.ID, len(bySession)) + } + + if err := s.DeleteCollision(ctx, c.SessionA, c.SessionB); err != nil { + t.Fatalf("delete: %v", err) + } + got, _ = s.ListCollisionsByProject(ctx, "mer") + if len(got) != 0 { + t.Fatalf("want 0 after delete, got %d", len(got)) + } +} diff --git a/backend/sqlc.yaml b/backend/sqlc.yaml index 070b691606..147397d8ea 100644 --- a/backend/sqlc.yaml +++ b/backend/sqlc.yaml @@ -136,3 +136,19 @@ sql: go_type: import: "github.com/aoagents/agent-orchestrator/backend/internal/domain" type: "ReviewVerdict" + - column: "session_collision.project_id" + go_type: + import: "github.com/aoagents/agent-orchestrator/backend/internal/domain" + type: "ProjectID" + - column: "session_collision.session_a" + go_type: + import: "github.com/aoagents/agent-orchestrator/backend/internal/domain" + type: "SessionID" + - column: "session_collision.session_b" + go_type: + import: "github.com/aoagents/agent-orchestrator/backend/internal/domain" + type: "SessionID" + - column: "session_collision.severity" + go_type: + import: "github.com/aoagents/agent-orchestrator/backend/internal/domain" + type: "CollisionSeverity" diff --git a/frontend/src/api/schema.ts b/frontend/src/api/schema.ts index 9ee643dadf..1b86a1553a 100644 --- a/frontend/src/api/schema.ts +++ b/frontend/src/api/schema.ts @@ -178,6 +178,23 @@ export interface paths { patch?: never; trace?: never; }; + "/api/v1/projects/{id}/collisions": { + parameters: { + query?: never; + header?: never; + path?: never; + cookie?: never; + }; + /** List the project's current cross-session edit collisions */ + get: operations["listProjectCollisions"]; + put?: never; + post?: never; + delete?: never; + options?: never; + head?: never; + patch?: never; + trace?: never; + }; "/api/v1/projects/{id}/config": { parameters: { query?: never; @@ -532,6 +549,25 @@ export interface components { reason: string; sessionId: string; }; + Collision: { + files: components["schemas"]["CollisionFile"][]; + /** Format: date-time */ + firstSeenAt: string; + sessionA: string; + sessionB: string; + /** @enum {string} */ + severity: "soft" | "hot"; + /** Format: date-time */ + updatedAt: string; + }; + CollisionFile: { + path: string; + ranges?: components["schemas"]["CollisionRange"][]; + }; + CollisionRange: { + end: number; + start: number; + }; ControllersSessionView: { activity: components["schemas"]["DomainActivity"]; branch?: string; @@ -590,6 +626,9 @@ export interface components { ListNotificationsResponse: { notifications: components["schemas"]["NotificationResponse"][]; }; + ListProjectCollisionsResponse: { + collisions: components["schemas"]["Collision"][]; + }; ListProjectsResponse: { projects: components["schemas"]["ProjectSummary"][]; }; @@ -1568,6 +1607,38 @@ export interface operations { }; }; }; + listProjectCollisions: { + parameters: { + query?: never; + header?: never; + path: { + /** @description Project identifier (registry key). */ + id: string; + }; + cookie?: never; + }; + requestBody?: never; + responses: { + /** @description OK */ + 200: { + headers: { + [name: string]: unknown; + }; + content: { + "application/json": components["schemas"]["ListProjectCollisionsResponse"]; + }; + }; + /** @description Internal Server Error */ + 500: { + headers: { + [name: string]: unknown; + }; + content: { + "application/json": components["schemas"]["APIError"]; + }; + }; + }; + }; setProjectConfig: { parameters: { query?: never;