Skip to content

Commit 3bfd81b

Browse files
committed
fix: eliminate 3 race conditions in parallel task execution
Race 1 — learnings.jsonl cherry-pick conflict: add .gitattributes entries with merge=union for memory/learnings.jsonl and memory/failures.jsonl, so git resolves concurrent appends automatically instead of reporting conflicts.
Race 2 — concurrent auditLog writes: add an auditMu sync.Mutex to Engine and hold it around the file open and write, so parallel goroutines never interleave audit log entries.
Race 3 — same-file cherry-pick conflicts: parse the 'Files:' line of each SESSION_PLAN.md task into planTask.Files, then group tasks into waves with groupTasksByFileOverlap. Waves run sequentially: a task that shares a declared file with a task already in the current wave starts a new wave, while tasks with non-overlapping files stay in the same wave and run in parallel.
1 parent 37c97d5 commit 3bfd81b

6 files changed

Lines changed: 96 additions & 11 deletions

File tree

.gitattributes

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
# Append-only JSONL files: use union merge so concurrent cherry-picks
2+
# never conflict — git keeps all lines from both sides.
3+
memory/learnings.jsonl merge=union
4+
memory/failures.jsonl merge=union

internal/evolution/engine.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"path/filepath"
1111
"strconv"
1212
"strings"
13+
"sync"
1314
"time"
1415

1516
iteragent "github.com/GrayCodeAI/iteragent"
@@ -73,6 +74,7 @@ type Engine struct {
7374
toolMap map[string]iteragent.Tool // cached at construction — avoids re-init per call
7475
tools []iteragent.Tool // cached tool slice for agent construction
7576
skills *iteragent.SkillSet // cached skills — loaded once per engine
77+
auditMu sync.Mutex // guards concurrent writes to audit.jsonl
7678
}
7779

7880
// generateTraceID creates a random hex trace ID for request correlation.
@@ -390,6 +392,7 @@ func (e *Engine) handlePRReviewAndMerge(ctx context.Context, p iteragent.Provide
390392
}
391393

392394
// auditLog appends a tool call or error to .iterate/audit.jsonl for debugging.
395+
// The mutex makes it safe to call concurrently from parallel task goroutines.
393396
func (e *Engine) auditLog(eventType, tool, detail string) {
394397
auditPath := filepath.Join(e.repoPath, ".iterate", "audit.jsonl")
395398
_ = os.MkdirAll(filepath.Dir(auditPath), 0o755)
@@ -400,14 +403,16 @@ func (e *Engine) auditLog(eventType, tool, detail string) {
400403
"tool": tool,
401404
}
402405
if detail != "" {
403-
// Truncate long details
404406
if len(detail) > 200 {
405407
detail = detail[:200] + "..."
406408
}
407409
entry["detail"] = detail
408410
}
409411

410412
data, _ := json.Marshal(entry)
413+
414+
e.auditMu.Lock()
415+
defer e.auditMu.Unlock()
411416
f, err := os.OpenFile(auditPath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644)
412417
if err != nil {
413418
return

internal/evolution/engine_extended_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -142,14 +142,14 @@ func TestWithTimeout_CreatesContext(t *testing.T) {
142142
if !ok {
143143
t.Error("expected deadline to be set")
144144
}
145-
if time.Until(deadline) > defaultPhaseTimeout {
146-
t.Error("deadline should be within default timeout")
145+
if time.Until(deadline) > timeoutImplement {
146+
t.Error("deadline should be within implement timeout")
147147
}
148148
}
149149

150150
func TestWithTimeout_DefaultTimeout(t *testing.T) {
151-
if defaultPhaseTimeout != 30*time.Minute {
152-
t.Errorf("expected 30 minutes, got %s", defaultPhaseTimeout)
151+
if timeoutImplement != 40*time.Minute {
152+
t.Errorf("expected 40 minutes, got %s", timeoutImplement)
153153
}
154154
}
155155

internal/evolution/git_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -469,8 +469,8 @@ func TestWithTimeout_Deadline(t *testing.T) {
469469
if !ok {
470470
t.Fatal("expected deadline")
471471
}
472-
if time.Until(deadline) > defaultPhaseTimeout {
473-
t.Error("deadline should be within default timeout")
472+
if time.Until(deadline) > timeoutImplement {
473+
t.Error("deadline should be within implement timeout")
474474
}
475475
}
476476

internal/evolution/parsing.go

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ type planTask struct {
1010
Number int
1111
Title string
1212
Description string
13+
Files []string // parsed from "Files: ..." line — used for parallel conflict detection
1314
}
1415

1516
func parseSessionPlanTasks(plan string) []planTask {
@@ -53,6 +54,15 @@ func parseSessionPlanTasks(plan string) []planTask {
5354

5455
if current != nil {
5556
descLines = append(descLines, line)
57+
// Parse "Files: a, b, c" line to populate task.Files.
58+
if strings.HasPrefix(strings.TrimSpace(line), "Files:") {
59+
raw := strings.TrimSpace(strings.TrimPrefix(strings.TrimSpace(line), "Files:"))
60+
for _, f := range strings.Split(raw, ",") {
61+
if f := strings.TrimSpace(f); f != "" {
62+
current.Files = append(current.Files, f)
63+
}
64+
}
65+
}
5666
}
5767
}
5868

@@ -64,6 +74,60 @@ func parseSessionPlanTasks(plan string) []planTask {
6474
return tasks
6575
}
6676

77+
// groupTasksByFileOverlap splits tasks into sequential waves.
78+
// Tasks within a wave have no overlapping declared files and can run in parallel.
79+
// Tasks that share a file with any task already in the current wave start a new wave.
80+
func groupTasksByFileOverlap(tasks []planTask) [][]planTask {
81+
var waves [][]planTask
82+
var currentWave []planTask
83+
waveFiles := map[string]bool{}
84+
85+
for _, task := range tasks {
86+
// If task declares no files, always start a new wave (safe default).
87+
if len(task.Files) == 0 {
88+
if len(currentWave) > 0 {
89+
waves = append(waves, currentWave)
90+
}
91+
waves = append(waves, []planTask{task})
92+
currentWave = nil
93+
waveFiles = map[string]bool{}
94+
continue
95+
}
96+
97+
// Check if any of this task's files are already claimed in the current wave.
98+
conflict := false
99+
for _, f := range task.Files {
100+
if waveFiles[f] {
101+
conflict = true
102+
break
103+
}
104+
}
105+
106+
if conflict {
107+
// Start a new wave.
108+
if len(currentWave) > 0 {
109+
waves = append(waves, currentWave)
110+
}
111+
currentWave = []planTask{task}
112+
waveFiles = map[string]bool{}
113+
for _, f := range task.Files {
114+
waveFiles[f] = true
115+
}
116+
} else {
117+
// Add to current wave.
118+
currentWave = append(currentWave, task)
119+
for _, f := range task.Files {
120+
waveFiles[f] = true
121+
}
122+
}
123+
}
124+
125+
if len(currentWave) > 0 {
126+
waves = append(waves, currentWave)
127+
}
128+
return waves
129+
}
130+
67131
type issueResponse struct {
68132
IssueNum int
69133
Status string

internal/evolution/phases.go

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -186,13 +186,25 @@ func (e *Engine) RunImplementPhase(ctx context.Context, p iteragent.Provider) er
186186
return nil
187187
}
188188

189-
// runTasksParallel runs all tasks concurrently (up to maxParallelTasks at once).
190-
// Each task runs in an isolated git worktree; successful commits are
191-
// cherry-picked back to the current branch in task order.
189+
// runTasksParallel groups tasks by file overlap into waves, then runs each wave
190+
// in parallel (up to maxParallelTasks concurrent). Tasks sharing declared files
191+
// are placed in separate waves and run sequentially to prevent cherry-pick conflicts.
192192
func (e *Engine) runTasksParallel(ctx context.Context, p iteragent.Provider, tasks []planTask, systemPrompt string, skills *iteragent.SkillSet, protectedWarning string) {
193+
waves := groupTasksByFileOverlap(tasks)
194+
e.logger.Info("task waves planned", "waves", len(waves), "tasks", len(tasks))
195+
196+
for waveIdx, wave := range waves {
197+
e.logger.Info("running wave", "wave", waveIdx+1, "tasks", len(wave))
198+
e.runWave(ctx, p, wave, systemPrompt, skills, protectedWarning)
199+
}
200+
}
201+
202+
// runWave runs all tasks in a single wave concurrently then cherry-picks their commits.
203+
// Tasks in a wave are guaranteed to have non-overlapping declared files.
204+
func (e *Engine) runWave(ctx context.Context, p iteragent.Provider, tasks []planTask, systemPrompt string, skills *iteragent.SkillSet, protectedWarning string) {
193205
type taskResult struct {
194206
task planTask
195-
commits []string // new commit hashes from worktree, in order
207+
commits []string
196208
success bool
197209
}
198210

0 commit comments

Comments
 (0)