Skip to content

Commit 8feb4a3

Browse files
authored
feat(core): add background job service (#27033)
1 parent 8f05bbf commit 8feb4a3

4 files changed

Lines changed: 330 additions & 0 deletions

File tree

Lines changed: 200 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,200 @@
1+
import { InstanceState } from "@/effect/instance-state"
2+
import { Identifier } from "@/id/id"
3+
import { Cause, Clock, Context, Deferred, Effect, Fiber, Layer, Scope, SynchronizedRef } from "effect"
4+
5+
export type Status = "running" | "completed" | "error" | "cancelled"
6+
7+
export type Info = {
8+
id: string
9+
type: string
10+
title?: string
11+
status: Status
12+
started_at: number
13+
completed_at?: number
14+
output?: string
15+
error?: string
16+
metadata?: Record<string, unknown>
17+
}
18+
19+
type Active = {
20+
info: Info
21+
done: Deferred.Deferred<Info>
22+
fiber?: Fiber.Fiber<void, unknown>
23+
}
24+
25+
type State = {
26+
jobs: SynchronizedRef.SynchronizedRef<Map<string, Active>>
27+
scope: Scope.Scope
28+
}
29+
30+
type FinishResult = {
31+
info?: Info
32+
done?: Deferred.Deferred<Info>
33+
}
34+
35+
export type StartInput = {
36+
id?: string
37+
type: string
38+
title?: string
39+
metadata?: Record<string, unknown>
40+
run: Effect.Effect<string, unknown>
41+
}
42+
43+
export type WaitInput = {
44+
id: string
45+
timeout?: number
46+
}
47+
48+
export type WaitResult = {
49+
info?: Info
50+
timedOut: boolean
51+
}
52+
53+
export interface Interface {
54+
readonly list: () => Effect.Effect<Info[]>
55+
readonly get: (id: string) => Effect.Effect<Info | undefined>
56+
readonly start: (input: StartInput) => Effect.Effect<Info>
57+
readonly wait: (input: WaitInput) => Effect.Effect<WaitResult>
58+
readonly cancel: (id: string) => Effect.Effect<Info | undefined>
59+
}
60+
61+
export class Service extends Context.Service<Service, Interface>()("@opencode/BackgroundJob") {}
62+
63+
function snapshot(job: Active): Info {
64+
return {
65+
...job.info,
66+
...(job.info.metadata ? { metadata: { ...job.info.metadata } } : {}),
67+
}
68+
}
69+
70+
function errorText(error: unknown) {
71+
if (error instanceof Error) return error.message
72+
return String(error)
73+
}
74+
75+
export const layer = Layer.effect(
76+
Service,
77+
Effect.gen(function* () {
78+
const state = yield* InstanceState.make<State>(
79+
Effect.fn("BackgroundJob.state")(function* () {
80+
return {
81+
jobs: yield* SynchronizedRef.make(new Map()),
82+
scope: yield* Scope.Scope,
83+
}
84+
}),
85+
)
86+
87+
const finish = Effect.fn("BackgroundJob.finish")(function* (
88+
id: string,
89+
status: Exclude<Status, "running">,
90+
data?: { output?: string; error?: string },
91+
) {
92+
const completed_at = yield* Clock.currentTimeMillis
93+
const result = yield* SynchronizedRef.modify(
94+
(yield* InstanceState.get(state)).jobs,
95+
(jobs): readonly [FinishResult, Map<string, Active>] => {
96+
const job = jobs.get(id)
97+
if (!job) return [{}, jobs]
98+
if (job.info.status !== "running") return [{ info: snapshot(job) }, jobs]
99+
const next = {
100+
...job,
101+
fiber: undefined,
102+
info: {
103+
...job.info,
104+
status,
105+
completed_at,
106+
...(data?.output !== undefined ? { output: data.output } : {}),
107+
...(data?.error !== undefined ? { error: data.error } : {}),
108+
},
109+
}
110+
return [{ info: snapshot(next), done: job.done }, new Map(jobs).set(id, next)]
111+
},
112+
)
113+
if (result.info && result.done) yield* Deferred.succeed(result.done, result.info).pipe(Effect.ignore)
114+
return result.info
115+
})
116+
117+
const list: Interface["list"] = Effect.fn("BackgroundJob.list")(function* () {
118+
return Array.from((yield* SynchronizedRef.get((yield* InstanceState.get(state)).jobs)).values())
119+
.map(snapshot)
120+
.toSorted((a, b) => a.started_at - b.started_at)
121+
})
122+
123+
const get: Interface["get"] = Effect.fn("BackgroundJob.get")(function* (id) {
124+
const job = (yield* SynchronizedRef.get((yield* InstanceState.get(state)).jobs)).get(id)
125+
if (!job) return
126+
return snapshot(job)
127+
})
128+
129+
const start: Interface["start"] = Effect.fn("BackgroundJob.start")(function* (input) {
130+
return yield* Effect.uninterruptibleMask((restore) =>
131+
Effect.gen(function* () {
132+
const s = yield* InstanceState.get(state)
133+
const id = input.id ?? Identifier.ascending("job")
134+
const started_at = yield* Clock.currentTimeMillis
135+
const done = yield* Deferred.make<Info>()
136+
return yield* SynchronizedRef.modifyEffect(
137+
s.jobs,
138+
Effect.fnUntraced(function* (jobs) {
139+
const existing = jobs.get(id)
140+
if (existing?.info.status === "running") return [snapshot(existing), jobs] as const
141+
const fiber = yield* restore(input.run).pipe(
142+
Effect.matchCauseEffect({
143+
onSuccess: (output) => finish(id, "completed", { output }),
144+
onFailure: (cause) =>
145+
finish(id, Cause.hasInterruptsOnly(cause) ? "cancelled" : "error", {
146+
error: errorText(Cause.squash(cause)),
147+
}),
148+
}),
149+
Effect.asVoid,
150+
Effect.forkIn(s.scope, { startImmediately: true }),
151+
)
152+
const job = {
153+
info: {
154+
id,
155+
type: input.type,
156+
title: input.title,
157+
status: "running" as const,
158+
started_at,
159+
metadata: input.metadata,
160+
},
161+
done,
162+
fiber,
163+
}
164+
return [snapshot(job), new Map(jobs).set(id, job)] as const
165+
}),
166+
)
167+
}),
168+
)
169+
})
170+
171+
const wait: Interface["wait"] = Effect.fn("BackgroundJob.wait")(function* (input) {
172+
const job = (yield* SynchronizedRef.get((yield* InstanceState.get(state)).jobs)).get(input.id)
173+
if (!job) return { timedOut: false }
174+
if (job.info.status !== "running") return { info: snapshot(job), timedOut: false }
175+
if (input.timeout === undefined) return { info: yield* Deferred.await(job.done), timedOut: false }
176+
if (input.timeout <= 0) return { info: snapshot(job), timedOut: true }
177+
const info = yield* Deferred.await(job.done).pipe(Effect.timeoutOption(input.timeout))
178+
if (info._tag === "Some") return { info: info.value, timedOut: false }
179+
return { info: snapshot(job), timedOut: true }
180+
})
181+
182+
const cancel: Interface["cancel"] = Effect.fn("BackgroundJob.cancel")(function* (id) {
183+
const job = (yield* SynchronizedRef.get((yield* InstanceState.get(state)).jobs)).get(id)
184+
if (!job) return
185+
if (job.info.status !== "running") return snapshot(job)
186+
if (job.fiber) {
187+
yield* Fiber.interrupt(job.fiber).pipe(Effect.ignore)
188+
yield* Fiber.await(job.fiber).pipe(Effect.ignore)
189+
}
190+
const info = yield* finish(id, "cancelled")
191+
return info
192+
})
193+
194+
return Service.of({ list, get, start, wait, cancel })
195+
}),
196+
)
197+
198+
export const defaultLayer = layer
199+
200+
export * as BackgroundJob from "./job"

packages/opencode/src/effect/app-runtime.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ import { SyncEvent } from "@/sync"
5555
import { Npm } from "@opencode-ai/core/npm"
5656
import { memoMap } from "@opencode-ai/core/effect/memo-map"
5757
import { DataMigration } from "@/data-migration"
58+
import { BackgroundJob } from "@/background/job"
5859

5960
export const AppLayer = Layer.mergeAll(
6061
Npm.defaultLayer,
@@ -81,6 +82,7 @@ export const AppLayer = Layer.mergeAll(
8182
Todo.defaultLayer,
8283
Session.defaultLayer,
8384
SessionStatus.defaultLayer,
85+
BackgroundJob.defaultLayer,
8486
SessionRunState.defaultLayer,
8587
SessionProcessor.defaultLayer,
8688
SessionCompaction.defaultLayer,

packages/opencode/src/id/id.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { randomBytes } from "crypto"
22

33
const prefixes = {
4+
job: "job",
45
event: "evt",
56
session: "ses",
67
message: "msg",
Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
import { describe, expect } from "bun:test"
2+
import { Deferred, Effect } from "effect"
3+
import { BackgroundJob } from "@/background/job"
4+
import { testEffect } from "../lib/effect"
5+
6+
const it = testEffect(BackgroundJob.defaultLayer)
7+
8+
describe("background.job", () => {
9+
it.instance("tracks started jobs through completion", () =>
10+
Effect.gen(function* () {
11+
const jobs = yield* BackgroundJob.Service
12+
const latch = yield* Deferred.make<void>()
13+
const job = yield* jobs.start({
14+
type: "test",
15+
title: "test job",
16+
run: Deferred.await(latch).pipe(Effect.as("done")),
17+
})
18+
19+
expect(job.id.startsWith("job_")).toBe(true)
20+
expect(job.status).toBe("running")
21+
expect(job.title).toBe("test job")
22+
23+
yield* Deferred.succeed(latch, undefined)
24+
const done = yield* jobs.wait({ id: job.id })
25+
26+
expect(done.timedOut).toBe(false)
27+
expect(done.info?.status).toBe("completed")
28+
expect(done.info?.output).toBe("done")
29+
expect((yield* jobs.list()).map((item) => item.id)).toEqual([job.id])
30+
}),
31+
)
32+
33+
it.instance("returns a running snapshot when wait times out", () =>
34+
Effect.gen(function* () {
35+
const jobs = yield* BackgroundJob.Service
36+
const job = yield* jobs.start({
37+
type: "test",
38+
run: Effect.never,
39+
})
40+
41+
const result = yield* jobs.wait({ id: job.id, timeout: 1 })
42+
43+
expect(result.timedOut).toBe(true)
44+
expect(result.info?.status).toBe("running")
45+
}),
46+
)
47+
48+
it.instance("deduplicates concurrent starts for a running id", () =>
49+
Effect.gen(function* () {
50+
const jobs = yield* BackgroundJob.Service
51+
const started = yield* Deferred.make<void>()
52+
const id = "job_test"
53+
const [first, second] = yield* Effect.all(
54+
[
55+
jobs.start({
56+
id,
57+
type: "test",
58+
run: Deferred.succeed(started, undefined).pipe(Effect.andThen(Effect.never)),
59+
}),
60+
jobs.start({
61+
id,
62+
type: "test",
63+
run: Effect.fail(new Error("duplicate started")),
64+
}),
65+
],
66+
{ concurrency: "unbounded" },
67+
)
68+
69+
yield* Deferred.await(started)
70+
71+
expect(first.id).toBe(id)
72+
expect(second.id).toBe(id)
73+
expect(first.status).toBe("running")
74+
expect(second.status).toBe("running")
75+
expect((yield* jobs.list()).map((item) => item.id)).toEqual([id])
76+
77+
yield* jobs.cancel(id)
78+
}),
79+
)
80+
81+
it.instance("records failed jobs", () =>
82+
Effect.gen(function* () {
83+
const jobs = yield* BackgroundJob.Service
84+
const job = yield* jobs.start({
85+
type: "test",
86+
run: Effect.fail(new Error("boom")),
87+
})
88+
89+
const result = yield* jobs.wait({ id: job.id })
90+
91+
expect(result.info?.status).toBe("error")
92+
expect(result.info?.error).toBe("boom")
93+
}),
94+
)
95+
96+
it.instance("can cancel running jobs", () =>
97+
Effect.gen(function* () {
98+
const jobs = yield* BackgroundJob.Service
99+
const interrupted = yield* Deferred.make<void>()
100+
const job = yield* jobs.start({
101+
type: "test",
102+
run: Effect.never.pipe(Effect.ensuring(Deferred.succeed(interrupted, undefined))),
103+
})
104+
105+
const cancelled = yield* jobs.cancel(job.id)
106+
107+
expect(cancelled?.status).toBe("cancelled")
108+
yield* Deferred.await(interrupted).pipe(Effect.timeout("1 second"))
109+
expect((yield* jobs.get(job.id))?.status).toBe("cancelled")
110+
}),
111+
)
112+
113+
it.instance("returns immutable snapshots", () =>
114+
Effect.gen(function* () {
115+
const jobs = yield* BackgroundJob.Service
116+
const job = yield* jobs.start({
117+
type: "test",
118+
metadata: { value: "initial" },
119+
run: Effect.succeed("done"),
120+
})
121+
122+
if (job.metadata) job.metadata.value = "changed"
123+
124+
expect((yield* jobs.get(job.id))?.metadata?.value).toBe("initial")
125+
}),
126+
)
127+
})

0 commit comments

Comments
 (0)