Skip to content

Commit 43a727d

Browse files
Avoid bun memory leak bug from TransformStream
1 parent 699bbfd commit 43a727d

4 files changed

Lines changed: 83 additions & 129 deletions

File tree

apps/sim/executor/execution/block-executor.ts

Lines changed: 68 additions & 99 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import {
3434
type ExecutionContext,
3535
getNextExecutionOrder,
3636
type NormalizedBlockOutput,
37+
type StreamingExecution,
3738
} from '@/executor/types'
3839
import { streamingResponseFormatProcessor } from '@/executor/utils'
3940
import { buildBlockExecutionError, normalizeError } from '@/executor/utils/errors'
@@ -140,7 +141,7 @@ export class BlockExecutor {
140141

141142
let normalizedOutput: NormalizedBlockOutput
142143
if (isStreamingExecution) {
143-
const streamingExec = output as { stream: ReadableStream; execution: any }
144+
const streamingExec = output as StreamingExecution
144145

145146
if (ctx.onStream) {
146147
await this.handleStreamingExecution(
@@ -602,7 +603,7 @@ export class BlockExecutor {
602603
ctx: ExecutionContext,
603604
node: DAGNode,
604605
block: SerializedBlock,
605-
streamingExec: { stream: ReadableStream; execution: any },
606+
streamingExec: StreamingExecution,
606607
resolvedInputs: Record<string, any>,
607608
selectedOutputs: string[]
608609
): Promise<void> {
@@ -613,129 +614,97 @@ export class BlockExecutor {
613614
(block.config?.params as Record<string, any> | undefined)?.responseFormat ??
614615
(block.config as Record<string, any> | undefined)?.responseFormat
615616

616-
const stream = streamingExec.stream
617-
if (typeof stream.tee !== 'function') {
618-
await this.forwardStream(ctx, blockId, streamingExec, stream, responseFormat, selectedOutputs)
619-
return
620-
}
617+
const sourceReader = streamingExec.stream.getReader()
618+
const decoder = new TextDecoder()
619+
const accumulated: string[] = []
620+
let drainError: unknown
621621

622-
const [clientStream, executorStream] = stream.tee()
622+
const clientSource = new ReadableStream<Uint8Array>({
623+
async pull(controller) {
624+
try {
625+
const { done, value } = await sourceReader.read()
626+
if (done) {
627+
const tail = decoder.decode()
628+
if (tail) accumulated.push(tail)
629+
controller.close()
630+
return
631+
}
632+
accumulated.push(decoder.decode(value, { stream: true }))
633+
controller.enqueue(value)
634+
} catch (error) {
635+
drainError = error
636+
controller.error(error)
637+
}
638+
},
639+
async cancel(reason) {
640+
try {
641+
await sourceReader.cancel(reason)
642+
} catch {}
643+
},
644+
})
623645

624646
const processedClientStream = streamingResponseFormatProcessor.processStream(
625-
clientStream,
626-
blockId,
627-
selectedOutputs,
628-
responseFormat
629-
)
630-
631-
const clientStreamingExec = {
632-
...streamingExec,
633-
stream: processedClientStream,
634-
}
635-
636-
const executorConsumption = this.consumeExecutorStream(
637-
executorStream,
638-
streamingExec,
639-
blockId,
640-
responseFormat
641-
)
642-
643-
const clientConsumption = (async () => {
644-
try {
645-
await ctx.onStream?.(clientStreamingExec)
646-
} catch (error) {
647-
this.execLogger.error('Error in onStream callback', { blockId, error })
648-
// Cancel the client stream to release the tee'd buffer
649-
await processedClientStream.cancel().catch(() => {})
650-
}
651-
})()
652-
653-
await Promise.all([clientConsumption, executorConsumption])
654-
}
655-
656-
private async forwardStream(
657-
ctx: ExecutionContext,
658-
blockId: string,
659-
streamingExec: { stream: ReadableStream; execution: any },
660-
stream: ReadableStream,
661-
responseFormat: any,
662-
selectedOutputs: string[]
663-
): Promise<void> {
664-
const processedStream = streamingResponseFormatProcessor.processStream(
665-
stream,
647+
clientSource,
666648
blockId,
667649
selectedOutputs,
668650
responseFormat
669651
)
670652

671653
try {
672654
await ctx.onStream?.({
673-
...streamingExec,
674-
stream: processedStream,
655+
stream: processedClientStream,
656+
execution: streamingExec.execution,
675657
})
676658
} catch (error) {
677659
this.execLogger.error('Error in onStream callback', { blockId, error })
678-
await processedStream.cancel().catch(() => {})
679-
}
680-
}
681-
682-
private async consumeExecutorStream(
683-
stream: ReadableStream,
684-
streamingExec: { execution: any },
685-
blockId: string,
686-
responseFormat: any
687-
): Promise<void> {
688-
const reader = stream.getReader()
689-
const decoder = new TextDecoder()
690-
const chunks: string[] = []
691-
692-
try {
693-
while (true) {
694-
const { done, value } = await reader.read()
695-
if (done) break
696-
chunks.push(decoder.decode(value, { stream: true }))
697-
}
698-
const tail = decoder.decode()
699-
if (tail) chunks.push(tail)
700-
} catch (error) {
701-
this.execLogger.error('Error reading executor stream for block', { blockId, error })
660+
await processedClientStream.cancel().catch(() => {})
702661
} finally {
703662
try {
704-
await reader.cancel().catch(() => {})
663+
sourceReader.releaseLock()
705664
} catch {}
706665
}
707666

708-
const fullContent = chunks.join('')
709-
if (!fullContent) {
667+
if (drainError) {
668+
this.execLogger.error('Error reading stream for block', { blockId, error: drainError })
710669
return
711670
}
712671

713-
const executionOutput = streamingExec.execution?.output
714-
if (!executionOutput || typeof executionOutput !== 'object') {
715-
return
672+
const fullContent = accumulated.join('')
673+
if (fullContent) {
674+
const executionOutput = streamingExec.execution?.output
675+
if (executionOutput && typeof executionOutput === 'object') {
676+
let parsedForFormat = false
677+
if (responseFormat) {
678+
try {
679+
const parsed = JSON.parse(fullContent.trim())
680+
streamingExec.execution.output = {
681+
...parsed,
682+
tokens: executionOutput.tokens,
683+
toolCalls: executionOutput.toolCalls,
684+
providerTiming: executionOutput.providerTiming,
685+
cost: executionOutput.cost,
686+
model: executionOutput.model,
687+
}
688+
parsedForFormat = true
689+
} catch (error) {
690+
this.execLogger.warn('Failed to parse streamed content for response format', {
691+
blockId,
692+
error,
693+
})
694+
}
695+
}
696+
if (!parsedForFormat) {
697+
executionOutput.content = fullContent
698+
}
699+
}
716700
}
717701

718-
if (responseFormat) {
702+
if (streamingExec.onFullContent) {
719703
try {
720-
const parsed = JSON.parse(fullContent.trim())
721-
722-
streamingExec.execution.output = {
723-
...parsed,
724-
tokens: executionOutput.tokens,
725-
toolCalls: executionOutput.toolCalls,
726-
providerTiming: executionOutput.providerTiming,
727-
cost: executionOutput.cost,
728-
model: executionOutput.model,
729-
}
730-
return
704+
await streamingExec.onFullContent(fullContent)
731705
} catch (error) {
732-
this.execLogger.warn('Failed to parse streamed content for response format', {
733-
blockId,
734-
error,
735-
})
706+
this.execLogger.error('onFullContent callback failed', { blockId, error })
736707
}
737708
}
738-
739-
executionOutput.content = fullContent
740709
}
741710
}

apps/sim/executor/handlers/agent/agent-handler.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -958,8 +958,16 @@ export class AgentBlockHandler implements BlockHandler {
958958
streamingExec: StreamingExecution
959959
): StreamingExecution {
960960
return {
961-
stream: memoryService.wrapStreamForPersistence(streamingExec.stream, ctx, inputs),
961+
stream: streamingExec.stream,
962962
execution: streamingExec.execution,
963+
onFullContent: async (content: string) => {
964+
if (!content.trim()) return
965+
try {
966+
await memoryService.appendToMemory(ctx, inputs, { role: 'assistant', content })
967+
} catch (error) {
968+
logger.error('Failed to persist streaming response:', error)
969+
}
970+
},
963971
}
964972
}
965973

apps/sim/executor/handlers/agent/memory.ts

Lines changed: 0 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -111,35 +111,6 @@ export class Memory {
111111
})
112112
}
113113

114-
wrapStreamForPersistence(
115-
stream: ReadableStream<Uint8Array>,
116-
ctx: ExecutionContext,
117-
inputs: AgentInputs
118-
): ReadableStream<Uint8Array> {
119-
const chunks: string[] = []
120-
const decoder = new TextDecoder()
121-
122-
const transformStream = new TransformStream<Uint8Array, Uint8Array>({
123-
transform: (chunk, controller) => {
124-
controller.enqueue(chunk)
125-
const decoded = decoder.decode(chunk, { stream: true })
126-
chunks.push(decoded)
127-
},
128-
129-
flush: () => {
130-
const content = chunks.join('')
131-
if (content.trim()) {
132-
this.appendToMemory(ctx, inputs, {
133-
role: 'assistant',
134-
content,
135-
}).catch((error) => logger.error('Failed to persist streaming response:', error))
136-
}
137-
},
138-
})
139-
140-
return stream.pipeThrough(transformStream)
141-
}
142-
143114
private requireWorkspaceId(ctx: ExecutionContext): string {
144115
if (!ctx.workspaceId) {
145116
throw new Error('workspaceId is required for memory operations')

apps/sim/executor/types.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -359,6 +359,12 @@ export interface ExecutionResult {
359359
export interface StreamingExecution {
360360
stream: ReadableStream
361361
execution: ExecutionResult & { isStreaming?: boolean }
362+
/**
363+
* Invoked with the assembled response text after the stream drains. Lets agent
364+
* blocks persist the full response without interposing a TransformStream on a
365+
* fetch-backed source — that pattern amplifies memory on Bun via #28035.
366+
*/
367+
onFullContent?: (content: string) => void | Promise<void>
362368
}
363369

364370
export interface BlockExecutor {

0 commit comments

Comments
 (0)