Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions backend/internal/adapters/workspace/gitworktree/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,31 @@ func ignoredCountArgs(worktree string) []string {
return []string{"-C", worktree, "status", "--ignored", "--porcelain"}
}

// mergeBaseArgs returns the merge-base (fork point) of HEAD and ref in the
// worktree. Exit code 1 means no common ancestor.
func mergeBaseArgs(worktree, ref string) []string {
return []string{"-C", worktree, "merge-base", "HEAD", ref}
}

// diffUnifiedZeroArgs diffs the worktree's current state (committed AND
// uncommitted tracked changes) against base with zero context lines, so the
// hunk headers report exactly the changed line ranges. --no-color and
// --no-ext-diff keep the output machine-parseable regardless of user git config.
func diffUnifiedZeroArgs(worktree, base string) []string {
return []string{"-C", worktree, "diff", "--unified=0", "--no-color", "--no-ext-diff", base}
}

// baseBranchRefCandidates lists the refs to try, in order, when locating the
// base branch a session's worktree forked from. It mirrors baseRefCandidates'
// remote-then-local preference but targets the default/base branch only (never
// the session branch itself, which would make the merge-base degenerate).
func baseBranchRefCandidates(defaultBranch string) []string {
if strings.Contains(defaultBranch, "/") {
return []string{defaultBranch, "refs/heads/" + defaultBranch}
}
return []string{"origin/" + defaultBranch, "refs/heads/" + defaultBranch}
}

func baseRefCandidates(branch, defaultBranch string) []string {
candidates := []string{"origin/" + branch}
if strings.Contains(defaultBranch, "/") {
Expand Down
100 changes: 100 additions & 0 deletions backend/internal/adapters/workspace/gitworktree/diffparse_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package gitworktree

import (
"reflect"
"testing"

"github.com/aoagents/agent-orchestrator/backend/internal/ports"
)

func TestParseDiffChangedRegions_MultiHunk(t *testing.T) {
out := `diff --git a/config.go b/config.go
index 1111111..2222222 100644
--- a/config.go
+++ b/config.go
@@ -10,2 +10,3 @@ func A() {
-old
+new
+new2
@@ -40,0 +42,2 @@ func B() {
+added
+added2
`
got := parseDiffChangedRegions(out)
want := map[string][]ports.LineRange{
"config.go": {{Start: 10, End: 12}, {Start: 42, End: 43}},
}
if !reflect.DeepEqual(got, want) {
t.Fatalf("got %+v, want %+v", got, want)
}
}

func TestParseDiffChangedRegions_SingleLineNoCount(t *testing.T) {
// `@@ -5 +5 @@` (no comma) means a one-line change at line 5.
out := `--- a/x.go
+++ b/x.go
@@ -5 +5 @@
-a
+b
`
got := parseDiffChangedRegions(out)
want := map[string][]ports.LineRange{"x.go": {{Start: 5, End: 5}}}
if !reflect.DeepEqual(got, want) {
t.Fatalf("got %+v, want %+v", got, want)
}
}

func TestParseDiffChangedRegions_PureDeletionHunk(t *testing.T) {
// `+0,0` is a pure deletion; the new-side anchor is reported as a 1-line range.
out := `--- a/y.go
+++ b/y.go
@@ -7,3 +6,0 @@
-gone1
-gone2
-gone3
`
got := parseDiffChangedRegions(out)
want := map[string][]ports.LineRange{"y.go": {{Start: 6, End: 6}}}
if !reflect.DeepEqual(got, want) {
t.Fatalf("got %+v, want %+v", got, want)
}
}

func TestParseDiffChangedRegions_DeletedFileKeepsPath(t *testing.T) {
// A deleted file has `+++ /dev/null`; the old path must still be recorded.
out := `diff --git a/dead.go b/dead.go
deleted file mode 100644
--- a/dead.go
+++ /dev/null
@@ -1,2 +0,0 @@
-line1
-line2
`
got := parseDiffChangedRegions(out)
if _, ok := got["dead.go"]; !ok {
t.Fatalf("deleted file path missing: %+v", got)
}
}

func TestParseDiffChangedRegions_NewFile(t *testing.T) {
out := `diff --git a/new.go b/new.go
new file mode 100644
--- /dev/null
+++ b/new.go
@@ -0,0 +1,3 @@
+a
+b
+c
`
got := parseDiffChangedRegions(out)
want := map[string][]ports.LineRange{"new.go": {{Start: 1, End: 3}}}
if !reflect.DeepEqual(got, want) {
t.Fatalf("got %+v, want %+v", got, want)
}
}

func TestParseDiffChangedRegions_Empty(t *testing.T) {
if got := parseDiffChangedRegions(""); len(got) != 0 {
t.Fatalf("empty diff should yield no regions, got %+v", got)
}
}
89 changes: 89 additions & 0 deletions backend/internal/adapters/workspace/gitworktree/parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,98 @@ package gitworktree

import (
"bufio"
"strconv"
"strings"

"github.com/aoagents/agent-orchestrator/backend/internal/ports"
)

// parseDiffChangedRegions parses `git diff --unified=0` output into a map of
// repo-relative path → changed line ranges in the new revision. It tracks the
// current file from the `+++ b/<path>` header (falling back to the `--- a/<path>`
// header for deletions where `+++` is /dev/null) and reads each `@@ -a,b +c,d @@`
// hunk header for the `+c,d` span. A hunk with count 0 (pure deletion) is
// recorded as a single-line range at the anchor position. Files that change with
// no parseable hunks (e.g. binary or mode-only changes) still appear with an
// empty range slice so callers can treat them as file-level overlaps.
func parseDiffChangedRegions(out string) map[string][]ports.LineRange {
regions := map[string][]ports.LineRange{}
var curPath string
var minusPath string

s := bufio.NewScanner(strings.NewReader(out))
s.Buffer(make([]byte, 0, 64*1024), 1024*1024)
for s.Scan() {
line := strings.TrimRight(s.Text(), "\r")
switch {
case strings.HasPrefix(line, "--- "):
minusPath = stripDiffPathPrefix(strings.TrimPrefix(line, "--- "))
case strings.HasPrefix(line, "+++ "):
p := stripDiffPathPrefix(strings.TrimPrefix(line, "+++ "))
if p == "" {
p = minusPath // deletion: +++ is /dev/null, use the old path.
}
curPath = p
if curPath != "" {
if _, ok := regions[curPath]; !ok {
regions[curPath] = []ports.LineRange{}
}
}
case strings.HasPrefix(line, "@@ ") && curPath != "":
if r, ok := parseHunkNewRange(line); ok {
regions[curPath] = append(regions[curPath], r)
}
}
}
if s.Err() != nil {
return regions
}
return regions
}

// stripDiffPathPrefix removes git's a//b/ diff prefix and resolves /dev/null to
// the empty string. It does not attempt to unquote core.quotePath-escaped paths;
// those rare paths degrade to file-level overlaps, which is acceptable.
func stripDiffPathPrefix(p string) string {
p = strings.TrimSpace(p)
if p == "/dev/null" || p == "" {
return ""
}
if strings.HasPrefix(p, "a/") || strings.HasPrefix(p, "b/") {
return p[2:]
}
return p
}

// parseHunkNewRange extracts the new-revision span from an `@@ -a,b +c,d @@`
// header. A missing count means 1; a zero count (insertion point / deletion)
// yields a single-line range at the anchor.
func parseHunkNewRange(line string) (ports.LineRange, bool) {
plus := strings.Index(line, "+")
if plus < 0 {
return ports.LineRange{}, false
}
rest := line[plus+1:]
if sp := strings.IndexAny(rest, " \t"); sp >= 0 {
rest = rest[:sp]
}
startStr, countStr, hasCount := strings.Cut(rest, ",")
start, err := strconv.Atoi(startStr)
if err != nil {
return ports.LineRange{}, false
}
count := 1
if hasCount {
if c, err := strconv.Atoi(countStr); err == nil {
count = c
}
}
if count <= 0 {
return ports.LineRange{Start: start, End: start}, true
}
return ports.LineRange{Start: start, End: start + count - 1}, true
}

type worktreeRecord struct {
Path string
Branch string
Expand Down
43 changes: 43 additions & 0 deletions backend/internal/adapters/workspace/gitworktree/workspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ type Workspace struct {
type commandRunner func(ctx context.Context, binary string, args ...string) ([]byte, error)

var _ ports.Workspace = (*Workspace)(nil)
var _ ports.WorkspaceDiffer = (*Workspace)(nil)

// New builds a gitworktree Workspace, validating that ManagedRoot and
// RepoResolver are set and resolving the root to an absolute, symlink-free path.
Expand Down Expand Up @@ -465,6 +466,48 @@ func (w *Workspace) existingWorktree(ctx context.Context, repo, path string, cfg
return ports.WorkspaceInfo{}, false, nil
}

// ChangedRegions implements ports.WorkspaceDiffer. It returns the files (and
// changed line ranges within them) the session's worktree has modified relative
// to where it forked from the base branch — committed branch work and
// uncommitted edits alike — which the convergence observer compares across
// sessions to detect edit collisions before they become merge conflicts.
func (w *Workspace) ChangedRegions(ctx context.Context, info ports.WorkspaceInfo) (map[string][]ports.LineRange, error) {
if info.Path == "" {
return nil, fmt.Errorf("%w: empty path", ErrUnsafePath)
}
path, err := w.validateManagedPath(info.Path)
if err != nil {
return nil, err
}
base := w.resolveDiffBase(ctx, path)
out, err := w.run(ctx, w.binary, diffUnifiedZeroArgs(path, base)...)
if err != nil {
return nil, fmt.Errorf("gitworktree: diff %q against %q: %w", path, base, err)
}
return parseDiffChangedRegions(string(out)), nil
}

// resolveDiffBase picks the commit ChangedRegions diffs against: the merge-base
// of HEAD and the first reachable base-branch ref. It falls back to "HEAD"
// (uncommitted changes only) when no base branch ref resolves or the two share
// no common ancestor, so a brand-new or remoteless repo still reports live edits.
func (w *Workspace) resolveDiffBase(ctx context.Context, path string) string {
for _, ref := range baseBranchRefCandidates(w.defaultBranch) {
exists, err := w.refExists(ctx, path, ref)
if err != nil || !exists {
continue
}
out, err := w.run(ctx, w.binary, mergeBaseArgs(path, ref)...)
if err != nil {
continue
}
if mb := strings.TrimSpace(string(out)); mb != "" {
return mb
}
}
return "HEAD"
}

func (w *Workspace) addWorktree(ctx context.Context, repo, path, branch, baseBranch string) error {
// Refuse early if the branch is already checked out in another worktree:
// `git worktree add` will fail, but its stderr leaks through as an opaque
Expand Down
4 changes: 4 additions & 0 deletions backend/internal/cli/dto_drift_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,10 @@ func (f *fakeProjectManager) Remove(context.Context, domain.ProjectID) (projects
return projectsvc.RemoveResult{}, nil
}

func (f *fakeProjectManager) Collisions(context.Context, domain.ProjectID) ([]projectsvc.Collision, error) {
return nil, nil
}

// startDriftTestDaemon stands up the real router+controllers backed by the
// supplied fakes and points the CLI's run-file at it. The CLI discovers the
// server purely via AO_RUN_FILE + the run-file port, so this is a genuine
Expand Down
35 changes: 35 additions & 0 deletions backend/internal/daemon/convergence_wiring.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package daemon

// This file wires the cross-session collision (convergence) observer into daemon
// startup. The observer diffs each live session's worktree and detects when two
// parallel agents are editing overlapping code before either opens a PR, then
// nudges them through the same Lifecycle Manager the SCM lane uses.

import (
"context"
"log/slog"
"path/filepath"

"github.com/aoagents/agent-orchestrator/backend/internal/adapters/workspace/gitworktree"
"github.com/aoagents/agent-orchestrator/backend/internal/config"
"github.com/aoagents/agent-orchestrator/backend/internal/lifecycle"
"github.com/aoagents/agent-orchestrator/backend/internal/observe/convergence"
"github.com/aoagents/agent-orchestrator/backend/internal/storage/sqlite"
)

// startConvergenceObserver builds a read-only gitworktree differ over the same
// managed root and project→repo resolver the session workspace uses, then starts
// the convergence observer. A construction failure disables the lane (logged once)
// rather than failing daemon startup, mirroring startSCMObserver's posture.
func startConvergenceObserver(ctx context.Context, cfg config.Config, store *sqlite.Store, lcm *lifecycle.Manager, logger *slog.Logger) <-chan struct{} {
differ, err := gitworktree.New(gitworktree.Options{
ManagedRoot: filepath.Join(cfg.DataDir, "worktrees"),
RepoResolver: projectRepoResolver{store: store},
})
if err != nil {
logger.Warn("convergence observer disabled: workspace differ setup failed", "err", err)
return closedDone()
}
observer := convergence.New(differ, store, lcm, convergence.Config{Logger: logger})
return observer.Start(ctx)
}
1 change: 1 addition & 0 deletions backend/internal/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ func Run() error {
// change_log -> poller -> broadcaster) and gives startSession the shared LCM.
lcStack := startLifecycle(ctx, store, runtimeAdapter, messenger, notificationWriter, telemetrySink, log)
lcStack.scmDone = startSCMObserver(ctx, store, lcStack.LCM, log)
lcStack.convergenceDone = startConvergenceObserver(ctx, cfg, store, lcStack.LCM, log)

// Wire the controller-facing session service over the same store + LCM, the
// selected runtime, a gitworktree workspace, the per-session agent resolver
Expand Down
10 changes: 7 additions & 3 deletions backend/internal/daemon/lifecycle_wiring.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,10 @@ type lifecycleStack struct {
// LCM is the Lifecycle Manager (the canonical write path). It is exposed so
// startSession can share the same reducer the reaper drives, rather than
// standing up a second store+LCM pair that would diverge under writes.
LCM *lifecycle.Manager
reaperDone <-chan struct{}
scmDone <-chan struct{}
LCM *lifecycle.Manager
reaperDone <-chan struct{}
scmDone <-chan struct{}
convergenceDone <-chan struct{}
}

// startLifecycle constructs the Lifecycle Manager over the store and starts the
Expand All @@ -56,6 +57,9 @@ func (l *lifecycleStack) Stop() {
if l.scmDone != nil {
<-l.scmDone
}
if l.convergenceDone != nil {
<-l.convergenceDone
}
}

// sessionLifecycle is the narrow surface of sessionmanager.Manager used for
Expand Down
Loading