Skip to content

Commit 1bf620e

Browse files
authored
Parallelize bare repo fetching and update existing worktrees on render (#20)
* Parallelize bare repo fetching and update existing worktrees on render * Fix data race in mockRunner for parallel render tests
1 parent 29b8534 commit 1bf620e

3 files changed

Lines changed: 293 additions & 72 deletions

File tree

internal/git/git.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ type Runner interface {
2121
BranchExists(ctx context.Context, bareRepo, branch string) (bool, error)
2222
DefaultBranch(ctx context.Context, bareRepo string) (string, error)
2323
EnsureRemoteRef(ctx context.Context, bareRepo, branch string) error
24+
ResetBranch(ctx context.Context, worktreePath, ref string) error
2425
IsClean(ctx context.Context, worktreePath string) (bool, error)
2526
Rebase(ctx context.Context, worktreePath, onto string) error
2627
RebaseAbort(ctx context.Context, worktreePath string) error
@@ -156,6 +157,13 @@ func (r *RealRunner) EnsureRemoteRef(ctx context.Context, bareRepo, branch strin
156157
"+refs/heads/"+branch+":refs/remotes/origin/"+branch)
157158
}
158159

160+
// ResetBranch resets the current branch in a worktree to the given ref.
161+
// This is used to fast-forward existing worktrees to the latest remote state.
162+
func (r *RealRunner) ResetBranch(ctx context.Context, worktreePath, ref string) error {
163+
r.log().Debug("resetting branch", "path", worktreePath, "ref", ref)
164+
return r.run(ctx, "-C", worktreePath, "reset", "--hard", ref)
165+
}
166+
159167
// IsClean returns true if the worktree has no uncommitted changes.
160168
func (r *RealRunner) IsClean(ctx context.Context, worktreePath string) (bool, error) {
161169
r.log().Debug("checking worktree cleanliness", "path", worktreePath)

internal/workspace/workspace.go

Lines changed: 158 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"log/slog"
99
"os"
1010
"path/filepath"
11+
"sync"
1112
"time"
1213

1314
"github.com/milldr/flow/internal/agents"
@@ -178,7 +179,18 @@ func (s *Service) Resolve(idOrName string) ([]Info, error) {
178179
return matches, nil
179180
}
180181

182+
// repoRenderContext holds pre-computed paths for rendering a single repo.
183+
type repoRenderContext struct {
184+
index int
185+
repo state.Repo
186+
repoPath string
187+
barePath string
188+
worktreePath string
189+
}
190+
181191
// Render materializes a workspace: ensures bare clones and creates worktrees.
192+
// Bare repos are fetched in parallel to ensure we always have the latest remote
193+
// state before creating or updating worktrees.
182194
// progress is called with status messages for each repo.
183195
func (s *Service) Render(ctx context.Context, id string, progress func(msg string)) error {
184196
st, err := s.Find(id)
@@ -193,75 +205,47 @@ func (s *Service) Render(ctx context.Context, id string, progress func(msg strin
193205
wsDir := s.Config.WorkspacePath(id)
194206
total := len(st.Spec.Repos)
195207

208+
// Build render contexts for all repos
209+
repos := make([]repoRenderContext, total)
196210
for i, repo := range st.Spec.Repos {
197-
repoPath := state.RepoPath(repo)
198-
barePath := s.Config.BareRepoPath(repo.URL)
199-
worktreePath := filepath.Join(wsDir, repoPath)
211+
repos[i] = repoRenderContext{
212+
index: i,
213+
repo: repo,
214+
repoPath: state.RepoPath(repo),
215+
barePath: s.Config.BareRepoPath(repo.URL),
216+
worktreePath: filepath.Join(wsDir, state.RepoPath(repo)),
217+
}
218+
}
200219

201-
progress(fmt.Sprintf("[%d/%d] %s", i+1, total, repo.URL))
220+
// Phase 1: Clone and fetch all bare repos in parallel.
221+
// This ensures every bare clone has the latest remote refs before we
222+
// create or update any worktrees.
223+
fetchErrs := make([]error, total)
224+
var wg sync.WaitGroup
225+
for i := range repos {
226+
wg.Add(1)
227+
go func(rc *repoRenderContext) {
228+
defer wg.Done()
229+
fetchErrs[rc.index] = s.ensureBareRepo(ctx, rc)
230+
}(&repos[i])
231+
}
232+
wg.Wait()
202233

203-
// Ensure bare clone exists
204-
if _, err := os.Stat(barePath); os.IsNotExist(err) {
205-
s.log().Debug("bare clone not found, cloning", "url", repo.URL, "dest", barePath)
206-
if err := os.MkdirAll(filepath.Dir(barePath), 0o755); err != nil {
207-
return err
208-
}
209-
if err := s.Git.BareClone(ctx, repo.URL, barePath); err != nil {
210-
return fmt.Errorf("cloning %s: %w", repo.URL, err)
211-
}
212-
// Fetch after bare clone to create remote tracking refs
213-
// (bare clones don't create refs/remotes/origin/* by default).
214-
if err := s.Git.Fetch(ctx, barePath); err != nil {
215-
return fmt.Errorf("fetching %s: %w", repo.URL, err)
216-
}
217-
} else {
218-
s.log().Debug("bare clone exists, fetching", "url", repo.URL, "path", barePath)
219-
if err := s.Git.Fetch(ctx, barePath); err != nil {
220-
return fmt.Errorf("fetching %s: %w", repo.URL, err)
221-
}
234+
// Check for fetch errors — fail fast on any clone/fetch failure
235+
for i, err := range fetchErrs {
236+
if err != nil {
237+
return fmt.Errorf("%s: %w", repos[i].repo.URL, err)
222238
}
239+
}
223240

224-
// Create worktree if it doesn't exist
225-
if _, err := os.Stat(worktreePath); os.IsNotExist(err) {
226-
exists, err := s.Git.BranchExists(ctx, barePath, repo.Branch)
227-
if err != nil {
228-
return fmt.Errorf("checking branch for %s: %w", repo.URL, err)
229-
}
241+
// Phase 2: Create or update worktrees (sequential — progress messages
242+
// are order-dependent and worktree operations are fast).
243+
for i := range repos {
244+
rc := &repos[i]
245+
progress(fmt.Sprintf("[%d/%d] %s", rc.index+1, total, rc.repo.URL))
230246

231-
if exists {
232-
s.log().Debug("creating worktree from existing branch", "path", worktreePath, "branch", repo.Branch)
233-
if err := s.Git.AddWorktree(ctx, barePath, worktreePath, repo.Branch); err != nil {
234-
return fmt.Errorf("creating worktree for %s: %w", repo.URL, err)
235-
}
236-
progress(fmt.Sprintf(" └── %s (%s) ✓", repoPath, repo.Branch))
237-
} else {
238-
var baseBranch string
239-
if repo.Base != "" {
240-
baseBranch = repo.Base
241-
} else {
242-
baseBranch, err = s.Git.DefaultBranch(ctx, barePath)
243-
if err != nil {
244-
return fmt.Errorf("getting default branch for %s: %w", repo.URL, err)
245-
}
246-
}
247-
// Ensure the remote tracking ref exists for the base branch so
248-
// origin/{baseBranch} resolves (especially when baseBranch differs
249-
// from the default branch, which Fetch only creates refs for).
250-
if err := s.Git.EnsureRemoteRef(ctx, barePath, baseBranch); err != nil {
251-
return fmt.Errorf("ensuring remote ref for %s: %w", repo.URL, err)
252-
}
253-
// Use the remote ref to ensure we branch from the latest fetched state,
254-
// not a potentially stale local branch ref in the bare repo.
255-
startPoint := "origin/" + baseBranch
256-
s.log().Debug("creating worktree with new branch", "path", worktreePath, "branch", repo.Branch, "from", startPoint)
257-
if err := s.Git.AddWorktreeNewBranch(ctx, barePath, worktreePath, repo.Branch, startPoint); err != nil {
258-
return fmt.Errorf("creating worktree for %s: %w", repo.URL, err)
259-
}
260-
progress(fmt.Sprintf(" └── %s (%s, new branch from %s) ✓", repoPath, repo.Branch, baseBranch))
261-
}
262-
} else {
263-
s.log().Debug("worktree already exists, skipping", "path", worktreePath)
264-
progress(fmt.Sprintf(" └── %s (%s) exists", repoPath, repo.Branch))
247+
if err := s.ensureWorktree(ctx, rc, progress); err != nil {
248+
return err
265249
}
266250
}
267251

@@ -273,6 +257,117 @@ func (s *Service) Render(ctx context.Context, id string, progress func(msg strin
273257
return nil
274258
}
275259

260+
// ensureBareRepo clones (if needed) and fetches a bare repository.
261+
func (s *Service) ensureBareRepo(ctx context.Context, rc *repoRenderContext) error {
262+
if _, err := os.Stat(rc.barePath); os.IsNotExist(err) {
263+
s.log().Debug("bare clone not found, cloning", "url", rc.repo.URL, "dest", rc.barePath)
264+
if err := os.MkdirAll(filepath.Dir(rc.barePath), 0o755); err != nil {
265+
return err
266+
}
267+
if err := s.Git.BareClone(ctx, rc.repo.URL, rc.barePath); err != nil {
268+
return fmt.Errorf("cloning: %w", err)
269+
}
270+
}
271+
272+
s.log().Debug("fetching bare repo", "url", rc.repo.URL, "path", rc.barePath)
273+
if err := s.Git.Fetch(ctx, rc.barePath); err != nil {
274+
return fmt.Errorf("fetching: %w", err)
275+
}
276+
return nil
277+
}
278+
279+
// ensureWorktree creates a new worktree or updates an existing one to the
280+
// latest remote state.
281+
func (s *Service) ensureWorktree(ctx context.Context, rc *repoRenderContext, progress func(msg string)) error {
282+
if _, err := os.Stat(rc.worktreePath); os.IsNotExist(err) {
283+
return s.createWorktree(ctx, rc, progress)
284+
}
285+
return s.updateWorktree(ctx, rc, progress)
286+
}
287+
288+
// createWorktree creates a new worktree, either from an existing branch or
289+
// by creating a new branch from the base.
290+
func (s *Service) createWorktree(ctx context.Context, rc *repoRenderContext, progress func(msg string)) error {
291+
exists, err := s.Git.BranchExists(ctx, rc.barePath, rc.repo.Branch)
292+
if err != nil {
293+
return fmt.Errorf("checking branch for %s: %w", rc.repo.URL, err)
294+
}
295+
296+
if exists {
297+
s.log().Debug("creating worktree from existing branch", "path", rc.worktreePath, "branch", rc.repo.Branch)
298+
if err := s.Git.AddWorktree(ctx, rc.barePath, rc.worktreePath, rc.repo.Branch); err != nil {
299+
return fmt.Errorf("creating worktree for %s: %w", rc.repo.URL, err)
300+
}
301+
progress(fmt.Sprintf(" └── %s (%s) ✓", rc.repoPath, rc.repo.Branch))
302+
return nil
303+
}
304+
305+
baseBranch, err := s.resolveBaseBranch(ctx, rc)
306+
if err != nil {
307+
return err
308+
}
309+
310+
if err := s.Git.EnsureRemoteRef(ctx, rc.barePath, baseBranch); err != nil {
311+
return fmt.Errorf("ensuring remote ref for %s: %w", rc.repo.URL, err)
312+
}
313+
314+
startPoint := "origin/" + baseBranch
315+
s.log().Debug("creating worktree with new branch", "path", rc.worktreePath, "branch", rc.repo.Branch, "from", startPoint)
316+
if err := s.Git.AddWorktreeNewBranch(ctx, rc.barePath, rc.worktreePath, rc.repo.Branch, startPoint); err != nil {
317+
return fmt.Errorf("creating worktree for %s: %w", rc.repo.URL, err)
318+
}
319+
progress(fmt.Sprintf(" └── %s (%s, new branch from %s) ✓", rc.repoPath, rc.repo.Branch, baseBranch))
320+
return nil
321+
}
322+
323+
// updateWorktree resets an existing worktree to the latest remote ref for its
324+
// branch so that re-rendering always picks up new upstream commits.
325+
func (s *Service) updateWorktree(ctx context.Context, rc *repoRenderContext, progress func(msg string)) error {
326+
// Ensure the remote tracking ref exists for this branch so we can
327+
// check if the branch exists on the remote.
328+
if err := s.Git.EnsureRemoteRef(ctx, rc.barePath, rc.repo.Branch); err != nil {
329+
// Branch doesn't exist on remote — this is a local-only feature
330+
// branch. Leave it alone.
331+
s.log().Debug("worktree exists, no remote branch to update from", "path", rc.worktreePath, "branch", rc.repo.Branch)
332+
progress(fmt.Sprintf(" └── %s (%s) exists", rc.repoPath, rc.repo.Branch))
333+
return nil
334+
}
335+
336+
// Check if the worktree is clean before resetting
337+
clean, err := s.Git.IsClean(ctx, rc.worktreePath)
338+
if err != nil {
339+
return fmt.Errorf("checking worktree status for %s: %w", rc.repo.URL, err)
340+
}
341+
342+
if !clean {
343+
s.log().Debug("worktree is dirty, skipping update", "path", rc.worktreePath)
344+
progress(fmt.Sprintf(" └── %s (%s) exists (dirty, skipped update)", rc.repoPath, rc.repo.Branch))
345+
return nil
346+
}
347+
348+
// Reset to the latest remote ref
349+
ref := "origin/" + rc.repo.Branch
350+
s.log().Debug("updating worktree to latest remote", "path", rc.worktreePath, "ref", ref)
351+
if err := s.Git.ResetBranch(ctx, rc.worktreePath, ref); err != nil {
352+
return fmt.Errorf("updating worktree for %s: %w", rc.repo.URL, err)
353+
}
354+
355+
progress(fmt.Sprintf(" └── %s (%s) updated ✓", rc.repoPath, rc.repo.Branch))
356+
return nil
357+
}
358+
359+
// resolveBaseBranch returns the base branch for creating new feature branches.
360+
func (s *Service) resolveBaseBranch(ctx context.Context, rc *repoRenderContext) (string, error) {
361+
if rc.repo.Base != "" {
362+
return rc.repo.Base, nil
363+
}
364+
baseBranch, err := s.Git.DefaultBranch(ctx, rc.barePath)
365+
if err != nil {
366+
return "", fmt.Errorf("getting default branch for %s: %w", rc.repo.URL, err)
367+
}
368+
return baseBranch, nil
369+
}
370+
276371
// Sync fetches and rebases worktrees onto their base branches.
277372
// It continues through failures — one repo failing doesn't block others.
278373
func (s *Service) Sync(ctx context.Context, id string, progress func(msg string)) error {

0 commit comments

Comments
 (0)