Skip to content

Commit 46daede

Browse files
authored
test(pty): migrate output isolation to Effect runner (anomalyco#27235)
1 parent 8249bae commit 46daede

1 file changed

Lines changed: 158 additions & 143 deletions

File tree

Lines changed: 158 additions & 143 deletions
Original file line numberDiff line numberDiff line change
@@ -1,147 +1,162 @@
1-
import { describe, expect, test } from "bun:test"
2-
import { AppRuntime } from "../../src/effect/app-runtime"
3-
import { Effect } from "effect"
4-
import { Instance } from "../../src/project/instance"
5-
import { WithInstance } from "../../src/project/with-instance"
1+
import { describe, expect } from "bun:test"
2+
import { Bus } from "../../src/bus"
3+
import { Config } from "../../src/config/config"
4+
import { Plugin } from "../../src/plugin"
65
import { Pty } from "../../src/pty"
7-
import { tmpdir } from "../fixture/fixture"
8-
import { setTimeout as sleep } from "node:timers/promises"
6+
import { Duration, Effect, Layer, Queue } from "effect"
7+
import { testEffect } from "../lib/effect"
8+
9+
type Socket = Parameters<Pty.Interface["connect"]>[1]
10+
11+
const it = testEffect(
12+
Pty.layer.pipe(
13+
Layer.provideMerge(Bus.layer),
14+
Layer.provideMerge(Config.defaultLayer),
15+
Layer.provideMerge(Plugin.defaultLayer),
16+
),
17+
)
18+
const ptyTest = process.platform === "win32" ? it.instance.skip : it.instance
19+
20+
const createPty = Effect.fn("PtyOutputIsolationTest.createPty")(function* (input: Pty.CreateInput) {
21+
const pty = yield* Pty.Service
22+
return yield* Effect.acquireRelease(pty.create(input), (info) => pty.remove(info.id).pipe(Effect.ignore))
23+
})
24+
25+
const decodeOutput = (data: string | Uint8Array | ArrayBuffer) =>
26+
typeof data === "string"
27+
? data
28+
: Buffer.from(data instanceof Uint8Array ? data : new Uint8Array(data)).toString("utf8")
29+
30+
const makeSocket = Effect.fn("PtyOutputIsolationTest.makeSocket")(function* (data: unknown) {
31+
const output = yield* Queue.unbounded<string>()
32+
const chunks: string[] = []
33+
const socket: Socket = {
34+
readyState: 1,
35+
data,
36+
send: (data) => {
37+
const text = decodeOutput(data)
38+
chunks.push(text)
39+
Queue.offerUnsafe(output, text)
40+
},
41+
close: () => {
42+
// no-op (simulate abrupt drop)
43+
},
44+
}
45+
46+
return { socket, output, chunks }
47+
})
48+
49+
const waitForOutput = (output: Queue.Queue<string>, text: string, duration: Duration.Input = "5 seconds") =>
50+
Effect.gen(function* () {
51+
let received = ""
52+
while (!received.includes(text)) {
53+
received += yield* Queue.take(output)
54+
}
55+
return received
56+
}).pipe(
57+
Effect.timeoutOrElse({
58+
duration,
59+
orElse: () => Effect.fail(new Error(`timeout waiting for output containing ${JSON.stringify(text)}`)),
60+
}),
61+
)
62+
63+
const waitForLeakedOutput = (output: Queue.Queue<string>, text: string) =>
64+
Effect.gen(function* () {
65+
let received = ""
66+
while (!received.includes(text)) {
67+
received += yield* Queue.take(output)
68+
}
69+
return received
70+
}).pipe(
71+
Effect.timeoutOrElse({
72+
duration: "100 millis",
73+
orElse: () => Effect.succeed(undefined),
74+
}),
75+
)
976

1077
describe("pty", () => {
11-
test("does not leak output when websocket objects are reused", async () => {
12-
await using dir = await tmpdir({ git: true })
13-
14-
await WithInstance.provide({
15-
directory: dir.path,
16-
fn: () =>
17-
AppRuntime.runPromise(
18-
Effect.gen(function* () {
19-
const pty = yield* Pty.Service
20-
const a = yield* pty.create({ command: "cat", title: "a" })
21-
const b = yield* pty.create({ command: "cat", title: "b" })
22-
try {
23-
const outA: string[] = []
24-
const outB: string[] = []
25-
26-
const ws = {
27-
readyState: 1,
28-
data: { events: { connection: "a" } },
29-
send: (data: unknown) => {
30-
outA.push(typeof data === "string" ? data : Buffer.from(data as Uint8Array).toString("utf8"))
31-
},
32-
close: () => {
33-
// no-op (simulate abrupt drop)
34-
},
35-
}
36-
37-
yield* pty.connect(a.id, ws as any)
38-
39-
ws.data = { events: { connection: "b" } }
40-
ws.send = (data: unknown) => {
41-
outB.push(typeof data === "string" ? data : Buffer.from(data as Uint8Array).toString("utf8"))
42-
}
43-
yield* pty.connect(b.id, ws as any)
44-
45-
outA.length = 0
46-
outB.length = 0
47-
48-
yield* pty.write(a.id, "AAA\n")
49-
yield* Effect.promise(() => sleep(100))
50-
51-
expect(outB.join("")).not.toContain("AAA")
52-
} finally {
53-
yield* pty.remove(a.id)
54-
yield* pty.remove(b.id)
55-
}
56-
}),
57-
),
58-
})
59-
})
60-
61-
test("does not leak output when Bun recycles websocket objects before re-connect", async () => {
62-
await using dir = await tmpdir({ git: true })
63-
64-
await WithInstance.provide({
65-
directory: dir.path,
66-
fn: () =>
67-
AppRuntime.runPromise(
68-
Effect.gen(function* () {
69-
const pty = yield* Pty.Service
70-
const a = yield* pty.create({ command: "cat", title: "a" })
71-
try {
72-
const outA: string[] = []
73-
const outB: string[] = []
74-
75-
const ws = {
76-
readyState: 1,
77-
data: { events: { connection: "a" } },
78-
send: (data: unknown) => {
79-
outA.push(typeof data === "string" ? data : Buffer.from(data as Uint8Array).toString("utf8"))
80-
},
81-
close: () => {
82-
// no-op (simulate abrupt drop)
83-
},
84-
}
85-
86-
yield* pty.connect(a.id, ws as any)
87-
outA.length = 0
88-
89-
ws.data = { events: { connection: "b" } }
90-
ws.send = (data: unknown) => {
91-
outB.push(typeof data === "string" ? data : Buffer.from(data as Uint8Array).toString("utf8"))
92-
}
93-
94-
yield* pty.write(a.id, "AAA\n")
95-
yield* Effect.promise(() => sleep(100))
96-
97-
expect(outB.join("")).not.toContain("AAA")
98-
} finally {
99-
yield* pty.remove(a.id)
100-
}
101-
}),
102-
),
103-
})
104-
})
105-
106-
test("treats in-place socket data mutation as the same connection", async () => {
107-
await using dir = await tmpdir({ git: true })
108-
109-
await WithInstance.provide({
110-
directory: dir.path,
111-
fn: () =>
112-
AppRuntime.runPromise(
113-
Effect.gen(function* () {
114-
const pty = yield* Pty.Service
115-
const a = yield* pty.create({ command: "cat", title: "a" })
116-
try {
117-
const out: string[] = []
118-
119-
const ctx = { connId: 1 }
120-
const ws = {
121-
readyState: 1,
122-
data: ctx,
123-
send: (data: unknown) => {
124-
out.push(typeof data === "string" ? data : Buffer.from(data as Uint8Array).toString("utf8"))
125-
},
126-
close: () => {
127-
// no-op
128-
},
129-
}
130-
131-
yield* pty.connect(a.id, ws as any)
132-
out.length = 0
133-
134-
ctx.connId = 2
135-
136-
yield* pty.write(a.id, "AAA\n")
137-
yield* Effect.promise(() => sleep(100))
138-
139-
expect(out.join("")).toContain("AAA")
140-
} finally {
141-
yield* pty.remove(a.id)
142-
}
143-
}),
144-
),
145-
})
146-
})
78+
ptyTest(
79+
"does not leak output when websocket objects are reused",
80+
() =>
81+
Effect.gen(function* () {
82+
const pty = yield* Pty.Service
83+
const a = yield* createPty({ command: "cat", title: "a" })
84+
const b = yield* createPty({ command: "cat", title: "b" })
85+
const connectionA = yield* makeSocket({ events: { connection: "a" } })
86+
const connectionB = { events: { connection: "b" } }
87+
88+
yield* pty.connect(a.id, connectionA.socket)
89+
90+
const outBQueue = yield* Queue.unbounded<string>()
91+
const outB: string[] = []
92+
connectionA.socket.data = connectionB
93+
connectionA.socket.send = (data) => {
94+
const text = decodeOutput(data)
95+
outB.push(text)
96+
Queue.offerUnsafe(outBQueue, text)
97+
}
98+
yield* pty.connect(b.id, connectionA.socket)
99+
100+
connectionA.chunks.length = 0
101+
outB.length = 0
102+
103+
yield* pty.write(a.id, "AAA\n")
104+
const verifyA = yield* makeSocket({ events: { connection: "verify-a" } })
105+
yield* pty.connect(a.id, verifyA.socket)
106+
yield* waitForOutput(verifyA.output, "AAA")
107+
108+
expect(outB.join("")).not.toContain("AAA")
109+
expect(yield* waitForLeakedOutput(outBQueue, "AAA")).toBeUndefined()
110+
}),
111+
{ git: true },
112+
)
113+
114+
ptyTest(
115+
"does not leak output when Bun recycles websocket objects before re-connect",
116+
() =>
117+
Effect.gen(function* () {
118+
const pty = yield* Pty.Service
119+
const a = yield* createPty({ command: "cat", title: "a" })
120+
const outA = yield* makeSocket({ events: { connection: "a" } })
121+
const outB = yield* Queue.unbounded<string>()
122+
123+
yield* pty.connect(a.id, outA.socket)
124+
outA.chunks.length = 0
125+
126+
const connectionB = { events: { connection: "b" } }
127+
outA.socket.data = connectionB
128+
outA.socket.send = (data) => {
129+
Queue.offerUnsafe(outB, decodeOutput(data))
130+
}
131+
132+
yield* pty.write(a.id, "AAA\n")
133+
const verifyA = yield* makeSocket({ events: { connection: "verify-a" } })
134+
yield* pty.connect(a.id, verifyA.socket)
135+
yield* waitForOutput(verifyA.output, "AAA")
136+
137+
expect(yield* waitForLeakedOutput(outB, "AAA")).toBeUndefined()
138+
}),
139+
{ git: true },
140+
)
141+
142+
ptyTest(
143+
"treats in-place socket data mutation as the same connection",
144+
() =>
145+
Effect.gen(function* () {
146+
const pty = yield* Pty.Service
147+
const a = yield* createPty({ command: "cat", title: "a" })
148+
const ctx = { connId: 1 }
149+
const out = yield* makeSocket(ctx)
150+
151+
yield* pty.connect(a.id, out.socket)
152+
out.chunks.length = 0
153+
154+
ctx.connId = 2
155+
156+
yield* pty.write(a.id, "AAA\n")
157+
158+
expect(yield* waitForOutput(out.output, "AAA")).toContain("AAA")
159+
}),
160+
{ git: true },
161+
)
147162
})

0 commit comments

Comments
 (0)