11import { afterEach , describe , expect } from "bun:test"
22import { AppFileSystem } from "@opencode-ai/core/filesystem"
33import { parsePatch } from "diff"
4- import { Deferred , Effect , Layer , Stream } from "effect"
4+ import { Deferred , Effect , Layer } from "effect"
55import { CrossSpawnSpawner } from "@opencode-ai/core/cross-spawn-spawner"
66import fs from "fs/promises"
77import path from "path"
@@ -50,13 +50,26 @@ const nextBranchUpdate = Effect.fn("VcsTest.nextBranchUpdate")(function* () {
5050 const bus = yield * Bus . Service
5151 const updated = yield * Deferred . make < string | undefined > ( )
5252
53- yield * Stream . runForEach ( bus . subscribe ( Vcs . Event . BranchUpdated ) , ( evt ) =>
54- Deferred . succeed ( updated , evt . properties . branch ) ,
55- ) . pipe ( Effect . forkScoped )
53+ const off = yield * bus . subscribeCallback ( Vcs . Event . BranchUpdated , ( evt ) => {
54+ Effect . runSync ( Deferred . succeed ( updated , evt . properties . branch ) )
55+ } )
56+ yield * Effect . addFinalizer ( ( ) => Effect . sync ( off ) )
5657
5758 return updated
5859} )
5960
61+ const publishHeadChangeUntil = Effect . fn ( "VcsTest.publishHeadChangeUntil" ) ( function * (
62+ pending : Deferred . Deferred < string | undefined > ,
63+ head : string ,
64+ ) {
65+ const bus = yield * Bus . Service
66+ for ( let i = 0 ; i < 50 ; i ++ ) {
67+ yield * bus . publish ( FileWatcher . Event . Updated , { file : head , event : "change" } )
68+ if ( yield * Deferred . isDone ( pending ) ) return
69+ yield * Effect . sleep ( "10 millis" )
70+ }
71+ } )
72+
6073// ---------------------------------------------------------------------------
6174// Tests
6275// ---------------------------------------------------------------------------
@@ -99,11 +112,10 @@ describe("Vcs", () => {
99112 const vcs = yield * init ( )
100113 yield * vcs . branch ( )
101114 const pending = yield * nextBranchUpdate ( )
102- const bus = yield * Bus . Service
103115
104116 const head = path . join ( test . directory , ".git" , "HEAD" )
105117 yield * write ( head , `ref: refs/heads/${ branch } \n` )
106- yield * bus . publish ( FileWatcher . Event . Updated , { file : head , event : "change" } )
118+ yield * publishHeadChangeUntil ( pending , head )
107119
108120 const updated = yield * Deferred . await ( pending ) . pipe ( Effect . timeout ( "2 seconds" ) )
109121 expect ( updated ) . toBe ( branch )
@@ -122,11 +134,10 @@ describe("Vcs", () => {
122134 const vcs = yield * init ( )
123135 yield * vcs . branch ( )
124136 const pending = yield * nextBranchUpdate ( )
125- const bus = yield * Bus . Service
126137
127138 const head = path . join ( test . directory , ".git" , "HEAD" )
128139 yield * write ( head , `ref: refs/heads/${ branch } \n` )
129- yield * bus . publish ( FileWatcher . Event . Updated , { file : head , event : "change" } )
140+ yield * publishHeadChangeUntil ( pending , head )
130141 yield * Deferred . await ( pending ) . pipe ( Effect . timeout ( "2 seconds" ) )
131142
132143 const current = yield * vcs . branch ( )
0 commit comments