Skip to content

Commit 53849bd

Browse files
authored
fix(sync): publish events on injected project bus (anomalyco#27825)
1 parent e33912b commit 53849bd

2 files changed

Lines changed: 25 additions & 6 deletions

File tree

packages/opencode/src/sync/index.ts

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import { EventV2 } from "@opencode-ai/core/event"
1717
import { serviceUse } from "@/effect/service-use"
1818
import { InstanceState } from "@/effect/instance-state"
1919
import { RuntimeFlags } from "@/effect/runtime-flags"
20+
import { attachWith } from "@/effect/run-service"
2021

2122
// Keep `Event["data"]` mutable because projectors mutate the persisted shape
2223
// when writing to the database. Bus payloads (`Properties`) stay readonly —
@@ -75,6 +76,7 @@ export class Service extends Context.Service<Service, Interface>()("@opencode/Sy
7576
export const layer = Layer.effect(Service)(
7677
Effect.gen(function* () {
7778
const flags = yield* RuntimeFlags.Service
79+
const bus = yield* ProjectBus.Service
7880

7981
const replay: Interface["replay"] = Effect.fn("SyncEvent.replay")(function* (event, options) {
8082
const def = registry.get(event.type)
@@ -112,6 +114,7 @@ export const layer = Layer.effect(Service)(
112114
}
113115
: undefined
114116
process(def, event, {
117+
bus,
115118
publish,
116119
context,
117120
ownerID: options?.ownerID,
@@ -172,7 +175,7 @@ export const layer = Layer.effect(Service)(
172175
const seq = row?.seq != null ? row.seq + 1 : 0
173176

174177
const event = { id, seq, aggregateID: agg, data }
175-
process(def, event, { publish, context, experimentalWorkspaces: flags.experimentalWorkspaces })
178+
process(def, event, { bus, publish, context, experimentalWorkspaces: flags.experimentalWorkspaces })
176179
},
177180
{
178181
behavior: "immediate",
@@ -209,7 +212,7 @@ export const layer = Layer.effect(Service)(
209212
}),
210213
)
211214

212-
export const defaultLayer = layer.pipe(Layer.provide(RuntimeFlags.defaultLayer))
215+
export const defaultLayer = layer.pipe(Layer.provide([ProjectBus.defaultLayer, RuntimeFlags.defaultLayer]))
213216

214217
export const use = serviceUse(Service)
215218

@@ -303,7 +306,13 @@ function register(def: Definition) {
303306
function process<Def extends Definition>(
304307
def: Def,
305308
event: Event<Def>,
306-
options: { publish: boolean; context?: PublishContext; ownerID?: string; experimentalWorkspaces: boolean },
309+
options: {
310+
bus: ProjectBus.Interface
311+
publish: boolean
312+
context?: PublishContext
313+
ownerID?: string
314+
experimentalWorkspaces: boolean
315+
},
307316
) {
308317
if (projectors == null) {
309318
throw new Error("No projectors available. Call `SyncEvent.init` to install projectors")
@@ -348,7 +357,13 @@ function process<Def extends Definition>(
348357
}
349358

350359
const result = convertEvent(def.type, event.data)
351-
const publish = (data: unknown) => ProjectBus.publish(def, data as Properties<Def>, { id: event.id })
360+
const publish = (data: unknown) =>
361+
Effect.runPromise(
362+
attachWith(options.bus.publish(def, data as Properties<Def>, { id: event.id }), {
363+
instance: options.context?.instance,
364+
workspace: options.context?.workspace,
365+
}),
366+
)
352367
if (result instanceof Promise) {
353368
void result.then(publish)
354369
} else {

packages/opencode/test/sync/index.test.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,10 @@ import { RuntimeFlags } from "@/effect/runtime-flags"
1313

1414
const it = testEffect(
1515
Layer.mergeAll(
16-
SyncEvent.layer.pipe(Layer.provide(RuntimeFlags.layer({ experimentalWorkspaces: true }))),
16+
SyncEvent.layer.pipe(
17+
Layer.provide(RuntimeFlags.layer({ experimentalWorkspaces: true })),
18+
Layer.provideMerge(Bus.layer),
19+
),
1720
CrossSpawnSpawner.defaultLayer,
1821
),
1922
)
@@ -114,7 +117,8 @@ describe("SyncEvent", () => {
114117
const received = new Promise<void>((done) => {
115118
resolve = done
116119
})
117-
const dispose = Bus.subscribeAll((event) => {
120+
const bus = yield* Bus.Service
121+
const dispose = yield* bus.subscribeAllCallback((event) => {
118122
events.push(event)
119123
resolve()
120124
})

0 commit comments

Comments
 (0)