Skip to content

Commit 6fad3f9

Browse files
fix(client): use scanner for streaming
1 parent e2e8207 commit 6fad3f9

1 file changed

Lines changed: 10 additions & 50 deletions

File tree

packages/ssestream/ssestream.go

Lines changed: 10 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,9 @@ func NewDecoder(res *http.Response) Decoder {
2929
if t, ok := decoderTypes[contentType]; ok {
3030
decoder = t(res.Body)
3131
} else {
32-
decoder = &eventStreamDecoder{rc: res.Body, rdr: bufio.NewReader(res.Body)}
32+
scn := bufio.NewScanner(res.Body)
33+
scn.Buffer(nil, bufio.MaxScanTokenSize<<2)
34+
decoder = &eventStreamDecoder{rc: res.Body, scn: scn}
3335
}
3436
return decoder
3537
}
@@ -49,48 +51,10 @@ type Event struct {
4951
type eventStreamDecoder struct {
5052
evt Event
5153
rc io.ReadCloser
52-
rdr *bufio.Reader
54+
scn *bufio.Scanner
5355
err error
5456
}
5557

56-
func line(r *bufio.Reader) ([]byte, error) {
57-
var overflow bytes.Buffer
58-
59-
// To prevent infinite loops, the failsafe stops when a line is
60-
// 100 times longer than the [io.Reader] default buffer size,
61-
// or after 20 failed attempts to find an end of line.
62-
for f := 0; f < 100; f++ {
63-
part, isPrefix, err := r.ReadLine()
64-
if err != nil {
65-
return nil, err
66-
}
67-
68-
// Happy case, the line fits in the default buffer.
69-
if !isPrefix && overflow.Len() == 0 {
70-
return part, nil
71-
}
72-
73-
// Overflow case, append to the buffer.
74-
if isPrefix || overflow.Len() > 0 {
75-
n, err := overflow.Write(part)
76-
if err != nil {
77-
return nil, err
78-
}
79-
80-
// Didn't find an end of line, heavily increment the failsafe.
81-
if n != r.Size() {
82-
f += 5
83-
}
84-
}
85-
86-
if !isPrefix {
87-
return overflow.Bytes(), nil
88-
}
89-
}
90-
91-
return nil, fmt.Errorf("ssestream: too many attempts to read a line")
92-
}
93-
9458
func (s *eventStreamDecoder) Next() bool {
9559
if s.err != nil {
9660
return false
@@ -99,16 +63,8 @@ func (s *eventStreamDecoder) Next() bool {
9963
event := ""
10064
data := bytes.NewBuffer(nil)
10165

102-
for {
103-
txt, err := line(s.rdr)
104-
if err == io.EOF {
105-
return false
106-
}
107-
108-
if err != nil {
109-
s.err = err
110-
break
111-
}
66+
for s.scn.Scan() {
67+
txt := s.scn.Bytes()
11268

11369
// Dispatch event on an empty line
11470
if len(txt) == 0 {
@@ -145,6 +101,10 @@ func (s *eventStreamDecoder) Next() bool {
145101
}
146102
}
147103

104+
if s.scn.Err() != nil {
105+
s.err = s.scn.Err()
106+
}
107+
148108
return false
149109
}
150110

0 commit comments

Comments
 (0)