Skip to content

Commit 4f6e492

Browse files
stevenouclaude
andcommitted
Handle parsing for error when chunked
When the OpenAI API returns an error during streaming, the error JSON body may be split across multiple HTTP chunks. Previously, the code would immediately try to raise an error on the first chunk, which would fail if the JSON was incomplete. This change accumulates error chunks until a complete, parsable JSON object is formed, then raises the appropriate Faraday error. It also prevents the user proc from being called when an error status is received. Based on the approach from PR alexrudall#591. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 6c834c2 commit 4f6e492

2 files changed

Lines changed: 67 additions & 10 deletions

File tree

lib/openai/stream.rb

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ class Stream
66
def initialize(user_proc:, parser: EventStreamParser::Parser.new)
77
@user_proc = user_proc
88
@parser = parser
9+
@accumulated_error = ""
910

1011
# To be backwards compatible, we need to check how many arguments the user_proc takes.
1112
@user_proc_arity =
@@ -18,13 +19,16 @@ def initialize(user_proc:, parser: EventStreamParser::Parser.new)
1819
end
1920

2021
def call(chunk, _bytes, env = nil)
21-
handle_http_error(chunk: chunk, env: env) if env && env.status != 200
22-
23-
parser.feed(chunk) do |event, data|
24-
next if data == DONE
25-
26-
args = [JSON.parse(data), event].first(user_proc_arity)
27-
user_proc.call(*args)
22+
if env && env.status != 200
23+
@accumulated_error += chunk
24+
raise_error_when_ready(env)
25+
else
26+
parser.feed(chunk) do |event, data|
27+
next if data == DONE
28+
29+
args = [JSON.parse(data), event].first(user_proc_arity)
30+
user_proc.call(*args)
31+
end
2832
end
2933
end
3034

@@ -34,11 +38,14 @@ def to_proc
3438

3539
private
3640

37-
attr_reader :user_proc, :parser, :user_proc_arity
41+
attr_reader :user_proc, :parser, :user_proc_arity, :accumulated_error
42+
43+
def raise_error_when_ready(env)
44+
parsed_error = try_parse_json(accumulated_error)
45+
return if parsed_error.is_a?(String)
3846

39-
def handle_http_error(chunk:, env:)
4047
raise_error = Faraday::Response::RaiseError.new
41-
raise_error.on_complete(env.merge(body: try_parse_json(chunk)))
48+
raise_error.on_complete(env.merge(body: parsed_error))
4249
end
4350

4451
def try_parse_json(maybe_json)

spec/openai/client/stream_spec.rb

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,56 @@
9292
end
9393
end
9494

95+
context "with a HTTP error response with body containing JSON split across chunks" do
96+
let(:error_env) do
97+
Faraday::Env.from(
98+
method: :post,
99+
url: URI("http://example.com"),
100+
status: 400,
101+
request: {},
102+
response: Faraday::Response.new
103+
)
104+
end
105+
let(:expected_body) do
106+
{
107+
"error" => {
108+
"message" => "Test error",
109+
"type" => "test_error",
110+
"param" => nil,
111+
"code" => "test"
112+
}
113+
}
114+
end
115+
116+
it "raises an error" do
117+
json = expected_body.to_json
118+
# Split the JSON into two chunks in the middle
119+
chunks = [json[0..(json.length / 2)], json[((json.length / 2) + 1)..]]
120+
121+
expect do
122+
chunks.each do |chunk|
123+
stream.call(chunk, bytes, error_env)
124+
end
125+
end.to raise_error(Faraday::BadRequestError) do |e|
126+
expect(e.response).to include(status: 400)
127+
expect(e.response[:body]).to eq(expected_body)
128+
end
129+
end
130+
131+
it "does not call user proc on error" do
132+
expect(user_proc).not_to receive(:call)
133+
134+
json = expected_body.to_json
135+
chunks = [json[0..(json.length / 2)], json[((json.length / 2) + 1)..]]
136+
137+
expect do
138+
chunks.each do |chunk|
139+
stream.call(chunk, bytes, error_env)
140+
end
141+
end.to raise_error(Faraday::BadRequestError)
142+
end
143+
end
144+
95145
context "with a call method that only takes one argument" do
96146
let(:user_proc) { proc { |data| data } }
97147

0 commit comments

Comments
 (0)