Skip to content

Commit cb061d2

Browse files
mishushakov and claude authored
fix(sdk): correct command/PTY stream handling in Python and JS SDKs (#1441)
## Summary

Fixes three command/PTY streaming issues in the Python and JS SDKs:

- **Multibyte UTF-8 corruption (JS + Python sync/async):** stdout/stderr were decoded per-chunk, so a UTF-8 character split across two stream chunks turned into replacement characters. Each handle now keeps a persistent incremental decoder per stream (`codecs.getincrementaldecoder` in Python, a shared `TextDecoder` with `{ stream: true }` in JS) and flushes any incomplete trailing bytes to `�` on the end event, preserving the existing broken-UTF-8 behavior.
- **`commands.list()` optionals (Python):** now returns `None` instead of `""` for unset proto3-optional `tag` and `cwd` fields, matching the declared `Optional[str]` types and the JS SDK.
- **Leaked connections (Python):** command/PTY/watch streams are now closed when stream setup fails, instead of abandoning the generator (and its pooled HTTP connection) until GC.

## Usage example

```python
# Split multibyte output is now decoded correctly instead of returning "ð\x9f\x98\x80"-style garbage
result = sandbox.commands.run("printf '😀'")
assert result.stdout == "😀"

# Unset fields are None rather than ""
proc = sandbox.commands.list()[0]
assert proc.tag is None  # previously ""
```

## Testing

- New unit tests for incremental/trailing UTF-8 decoding (Python sync + async, JS).
- Live command/PTY/watch integration suites pass.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Co-authored-by: Claude Fable 5 <noreply@anthropic.com>
1 parent 8c72291 commit cb061d2

13 files changed

Lines changed: 588 additions & 63 deletions

File tree

.changeset/great-pots-deny.md

Lines changed: 8 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,8 @@
1+
---
2+
"e2b": patch
3+
---
4+
5+
Fix command and PTY streaming issues:
6+
7+
- Decode stdout/stderr incrementally so multibyte UTF-8 characters split across command stream chunks are no longer corrupted
8+
- Avoid mutating the caller's `envs` object when applying default `TERM`/`LANG`/`LC_ALL` values in `pty.create()`

.changeset/heavy-pugs-smile.md

Lines changed: 10 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,10 @@
1+
---
2+
"@e2b/python-sdk": patch
3+
---
4+
5+
Fix several command and PTY streaming issues:
6+
7+
- Decode stdout/stderr incrementally so multibyte UTF-8 characters split across stream chunks are no longer corrupted
8+
- Return `None` instead of empty strings for unset `tag` and `cwd` fields in `commands.list()`
9+
- Close command/PTY/watch stream connections when establishing the stream fails, instead of leaking pooled connections
10+
- Avoid mutating the caller's `envs` dict when applying default `TERM`/`LANG`/`LC_ALL` values in `pty.create()`

packages/js-sdk/src/sandbox/commands/commandHandle.ts

Lines changed: 81 additions & 30 deletions
Original file line number | Diff line number | Diff line change
@@ -91,6 +91,9 @@ export class CommandHandle
9191
private _stdout = ''
9292
private _stderr = ''
9393

94+
private readonly stdoutDecoder = new TextDecoder()
95+
private readonly stderrDecoder = new TextDecoder()
96+
9497
private result?: CommandResult
9598
private iterationError?: Error
9699

@@ -235,41 +238,89 @@ export class CommandHandle
235238
await this.handleCloseStdin(opts)
236239
}
237240

241+
/**
242+
* Flush any bytes still buffered in the stream decoders.
243+
*
244+
* Incomplete trailing UTF-8 sequences are emitted as replacement
245+
* characters, matching the per-chunk decoding behavior.
246+
*/
247+
private *flushDecoders(): Generator<
248+
[Stdout, null, null] | [null, Stderr, null]
249+
> {
250+
const stdoutRest = this.stdoutDecoder.decode()
251+
if (stdoutRest) {
252+
this._stdout += stdoutRest
253+
yield [stdoutRest as Stdout, null, null]
254+
}
255+
const stderrRest = this.stderrDecoder.decode()
256+
if (stderrRest) {
257+
this._stderr += stderrRest
258+
yield [null, stderrRest as Stderr, null]
259+
}
260+
}
261+
238262
private async *iterateEvents(): AsyncGenerator<
239263
[Stdout, null, null] | [null, Stderr, null] | [null, null, PtyOutput]
240264
> {
241-
for await (const event of this.events) {
242-
const e = event?.event?.event
243-
let out: string | undefined
244-
245-
switch (e?.case) {
246-
case 'data':
247-
switch (e.value.output.case) {
248-
case 'stdout':
249-
out = new TextDecoder().decode(e.value.output.value)
250-
this._stdout += out
251-
yield [out as Stdout, null, null]
252-
break
253-
case 'stderr':
254-
out = new TextDecoder().decode(e.value.output.value)
255-
this._stderr += out
256-
yield [null, out as Stderr, null]
257-
break
258-
case 'pty':
259-
yield [null, null, e.value.output.value as PtyOutput]
260-
break
261-
}
262-
break
263-
case 'end':
264-
this.result = {
265-
exitCode: e.value.exitCode,
266-
error: e.value.error,
267-
stdout: this.stdout,
268-
stderr: this.stderr,
265+
try {
266+
for await (const event of this.events) {
267+
const e = event?.event?.event
268+
let out: string | undefined
269+
270+
switch (e?.case) {
271+
case 'data':
272+
switch (e.value.output.case) {
273+
case 'stdout':
274+
out = this.stdoutDecoder.decode(e.value.output.value, {
275+
stream: true,
276+
})
277+
if (out) {
278+
this._stdout += out
279+
yield [out as Stdout, null, null]
280+
}
281+
break
282+
case 'stderr':
283+
out = this.stderrDecoder.decode(e.value.output.value, {
284+
stream: true,
285+
})
286+
if (out) {
287+
this._stderr += out
288+
yield [null, out as Stderr, null]
289+
}
290+
break
291+
case 'pty':
292+
yield [null, null, e.value.output.value as PtyOutput]
293+
break
294+
}
295+
break
296+
case 'end': {
297+
yield* this.flushDecoders()
298+
this.result = {
299+
exitCode: e.value.exitCode,
300+
error: e.value.error,
301+
stdout: this.stdout,
302+
stderr: this.stderr,
303+
}
304+
break
269305
}
270-
break
306+
}
307+
// TODO: Handle empty events like in python SDK
271308
}
272-
// TODO: Handle empty events like in python SDK
309+
} catch (e) {
310+
// The stream raised before an `end` event (e.g. disconnect or RPC
311+
// failure). Flush any bytes still buffered in the decoders so incomplete
312+
// trailing sequences surface as replacement characters instead of being
313+
// silently dropped, then re-raise so the error is still surfaced.
314+
yield* this.flushDecoders()
315+
throw e
316+
}
317+
318+
// If the stream closed without an `end` event (e.g. disconnect or a
319+
// dropped connection), flush any bytes still buffered in the decoders so
320+
// incomplete trailing sequences surface as replacement characters instead
321+
// of being silently dropped.
322+
if (this.result === undefined) {
323+
yield* this.flushDecoders()
273324
}
274325
}
275326

packages/js-sdk/src/sandbox/commands/pty.ts

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -106,7 +106,7 @@ export class Pty {
106106
async create(opts: PtyCreateOpts) {
107107
const requestTimeoutMs =
108108
opts?.requestTimeoutMs ?? this.connectionConfig.requestTimeoutMs
109-
const envs = opts?.envs ?? {}
109+
const envs = { ...(opts?.envs ?? {}) }
110110
envs.TERM = envs.TERM ?? 'xterm-256color'
111111
envs.LANG = envs.LANG ?? 'C.UTF-8'
112112
envs.LC_ALL = envs.LC_ALL ?? 'C.UTF-8'

packages/js-sdk/tests/sandbox/commands/commandHandle.test.ts

Lines changed: 163 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -52,6 +52,36 @@ function createEvents(kind: EventKind): AsyncIterable<any> {
5252
return events()
5353
}
5454

55+
function dataEvent(kind: 'stdout' | 'stderr', value: Uint8Array) {
56+
return {
57+
event: {
58+
event: {
59+
case: 'data',
60+
value: {
61+
output: {
62+
case: kind,
63+
value,
64+
},
65+
},
66+
},
67+
},
68+
}
69+
}
70+
71+
function endEvent(exitCode = 0) {
72+
return {
73+
event: {
74+
event: {
75+
case: 'end',
76+
value: {
77+
exitCode,
78+
error: undefined,
79+
},
80+
},
81+
},
82+
}
83+
}
84+
5585
describe('CommandHandle', () => {
5686
it.each<EventKind>(['stdout', 'stderr', 'pty'])(
5787
'wait awaits async %s callbacks',
@@ -94,4 +124,137 @@ describe('CommandHandle', () => {
94124
expect(waitResolved).toBe(true)
95125
}
96126
)
127+
128+
it('decodes multibyte characters split across chunks', async () => {
129+
const emojiBytes = new TextEncoder().encode('😀')
130+
131+
async function* events() {
132+
yield dataEvent(
133+
'stdout',
134+
new Uint8Array([
135+
...new TextEncoder().encode('a'),
136+
...emojiBytes.slice(0, 2),
137+
])
138+
)
139+
yield dataEvent(
140+
'stdout',
141+
new Uint8Array([
142+
...emojiBytes.slice(2),
143+
...new TextEncoder().encode('b'),
144+
])
145+
)
146+
yield dataEvent('stderr', emojiBytes.slice(0, 3))
147+
yield dataEvent('stderr', emojiBytes.slice(3))
148+
yield endEvent()
149+
}
150+
151+
const stdoutChunks: string[] = []
152+
const handle = new CommandHandle(
153+
1,
154+
() => {},
155+
async () => true,
156+
events(),
157+
(out) => {
158+
stdoutChunks.push(out)
159+
}
160+
)
161+
162+
const result = await handle.wait()
163+
164+
expect(result.stdout).toBe('a😀b')
165+
expect(result.stderr).toBe('😀')
166+
expect(result.stdout).not.toContain('�')
167+
expect(result.stderr).not.toContain('�')
168+
expect(stdoutChunks.join('')).toBe('a😀b')
169+
})
170+
171+
it('replaces incomplete trailing utf-8 sequences at the end of the stream', async () => {
172+
const emojiBytes = new TextEncoder().encode('😀')
173+
174+
async function* events() {
175+
yield dataEvent(
176+
'stdout',
177+
new Uint8Array([
178+
...new TextEncoder().encode('a'),
179+
...emojiBytes.slice(0, 2),
180+
])
181+
)
182+
yield endEvent()
183+
}
184+
185+
const handle = new CommandHandle(
186+
1,
187+
() => {},
188+
async () => true,
189+
events()
190+
)
191+
192+
const result = await handle.wait()
193+
194+
expect(result.stdout).toBe('a�')
195+
})
196+
197+
it('flushes incomplete trailing utf-8 sequences when the stream closes without an end event', async () => {
198+
const emojiBytes = new TextEncoder().encode('😀')
199+
200+
async function* events() {
201+
yield dataEvent(
202+
'stdout',
203+
new Uint8Array([
204+
...new TextEncoder().encode('a'),
205+
...emojiBytes.slice(0, 2),
206+
])
207+
)
208+
}
209+
210+
const stdoutChunks: string[] = []
211+
const handle = new CommandHandle(
212+
1,
213+
() => {},
214+
async () => true,
215+
events(),
216+
(out) => {
217+
stdoutChunks.push(out)
218+
}
219+
)
220+
221+
// No end event arrives, so wait() rejects, but the buffered bytes must
222+
// still be flushed to the stdout callback as a replacement character.
223+
await expect(handle.wait()).rejects.toThrow()
224+
225+
expect(stdoutChunks.join('')).toBe('a�')
226+
})
227+
228+
it('flushes incomplete trailing utf-8 sequences when the stream errors', async () => {
229+
const emojiBytes = new TextEncoder().encode('😀')
230+
231+
async function* events() {
232+
yield dataEvent(
233+
'stdout',
234+
new Uint8Array([
235+
...new TextEncoder().encode('a'),
236+
...emojiBytes.slice(0, 2),
237+
])
238+
)
239+
throw new Error('stream died')
240+
}
241+
242+
const stdoutChunks: string[] = []
243+
const handle = new CommandHandle(
244+
1,
245+
() => {},
246+
async () => true,
247+
events(),
248+
(out) => {
249+
stdoutChunks.push(out)
250+
}
251+
)
252+
253+
// The stream errors before an end event arrives, so wait() rejects, but the
254+
// buffered bytes must still be flushed to the stdout callback as a
255+
// replacement character.
256+
await expect(handle.wait()).rejects.toThrow()
257+
258+
expect(stdoutChunks.join('')).toBe('a�')
259+
})
97260
})

packages/python-sdk/e2b/sandbox_async/commands/command.py

Lines changed: 10 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -67,11 +67,11 @@ async def list(
6767
return [
6868
ProcessInfo(
6969
pid=p.pid,
70-
tag=p.tag,
70+
tag=p.tag if p.HasField("tag") else None,
7171
cmd=p.config.cmd,
7272
args=list(p.config.args),
7373
envs=dict(p.config.envs),
74-
cwd=p.config.cwd,
74+
cwd=p.config.cwd if p.config.HasField("cwd") else None,
7575
)
7676
for p in res.processes
7777
]
@@ -325,6 +325,10 @@ async def _start(
325325
check_health=self._check_health,
326326
)
327327
except Exception as e:
328+
try:
329+
await events.aclose()
330+
except Exception:
331+
pass
328332
raise await ahandle_rpc_exception_with_health(e, self._check_health)
329333

330334
async def connect(
@@ -384,4 +388,8 @@ async def connect(
384388
check_health=self._check_health,
385389
)
386390
except Exception as e:
391+
try:
392+
await events.aclose()
393+
except Exception:
394+
pass
387395
raise await ahandle_rpc_exception_with_health(e, self._check_health)

0 commit comments

Comments
 (0)