Skip to content

Commit 6697220

Browse files
authored
fix: Support stripping initial UTF-8 BOM header (#114)
1 parent 1cdf14a commit 6697220

File tree

3 files changed

+80
-3
lines changed

3 files changed

+80
-3
lines changed

contract-tests/src/bin/sse-test-api/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ struct Event {
8383
async fn status() -> impl Responder {
8484
web::Json(Status {
8585
capabilities: vec![
86+
"bom".to_string(),
8687
"comments".to_string(),
8788
"headers".to_string(),
8889
"last-event-id".to_string(),

eventsource-client/src/client.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use std::{
1818
fmt::{self, Debug, Formatter},
1919
future::Future,
2020
io::ErrorKind,
21-
pin::{pin, Pin},
21+
pin::Pin,
2222
str::FromStr,
2323
task::{Context, Poll},
2424
time::{Duration, Instant},

eventsource-client/src/event_parser.rs

Lines changed: 78 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,32 @@ fn parse_value(value: &[u8]) -> Result<&str> {
142142
from_utf8(value).map_err(|e| Error::InvalidLine(format!("malformed value: {e:?}")))
143143
}
144144

145+
// A state machine for handling the BOM header.
146+
#[derive(Debug)]
147+
enum BomHeaderState {
148+
Parsing(Vec<u8>),
149+
Consumed,
150+
}
151+
152+
const BOM_HEADER: &[u8] = b"\xEF\xBB\xBF";
153+
154+
// Try to consume the BOM header from the given bytes.
155+
// If the BOM header is found, return the remaining bytes, otherwise return the origin buffer.
156+
// Return `None` if we cannot determine whether the BOM header is present.
157+
fn try_consume_bom_header(buf: &[u8]) -> Option<&[u8]> {
158+
if buf.len() < BOM_HEADER.len() {
159+
if BOM_HEADER.starts_with(buf) {
160+
None
161+
} else {
162+
Some(buf)
163+
}
164+
} else if buf.starts_with(BOM_HEADER) {
165+
Some(&buf[BOM_HEADER.len()..])
166+
} else {
167+
Some(buf)
168+
}
169+
}
170+
145171
#[pin_project]
146172
#[must_use = "streams do nothing unless polled"]
147173
pub struct EventParser {
@@ -159,6 +185,8 @@ pub struct EventParser {
159185
/// the last-seen event ID; events without an ID will take on this value until it is updated.
160186
last_event_id: Option<String>,
161187
sse: VecDeque<SSE>,
188+
/// state machine for handling the BOM header
189+
bom_header_state: BomHeaderState,
162190
}
163191

164192
impl EventParser {
@@ -170,6 +198,7 @@ impl EventParser {
170198
event_data: None,
171199
last_event_id: None,
172200
sse: VecDeque::with_capacity(3),
201+
bom_header_state: BomHeaderState::Parsing(Vec::new()),
173202
}
174203
}
175204

@@ -187,6 +216,24 @@ impl EventParser {
187216

188217
pub fn process_bytes(&mut self, bytes: Bytes) -> Result<()> {
189218
trace!("Parsing bytes {bytes:?}");
219+
220+
// According to the SSE spec, a BOM header may be present at the beginning of the stream,
221+
// which must be stripped before the message processing.
222+
let bytes_to_process =
223+
if let BomHeaderState::Parsing(header_buf) = &mut self.bom_header_state {
224+
header_buf.extend_from_slice(&bytes);
225+
if let Some(rest) = try_consume_bom_header(header_buf) {
226+
let owned_rest = rest.to_vec();
227+
self.bom_header_state = BomHeaderState::Consumed;
228+
// Once the BOM header is consumed, we can process the rest of the bytes.
229+
Bytes::from_owner(owned_rest)
230+
} else {
231+
return Ok(());
232+
}
233+
} else {
234+
bytes
235+
};
236+
190237
// We get bytes from the underlying stream in chunks. Decoding a chunk has two phases:
191238
// decode the chunk into lines, and decode the lines into events.
192239
//
@@ -196,8 +243,7 @@ impl EventParser {
196243
// (empty-line-terminated) before returning it. So we buffer lines between poll()
197244
// invocations, and begin by processing any incomplete events from previous invocations,
198245
// before requesting new input from the underlying stream and processing that.
199-
200-
self.decode_and_buffer_lines(bytes);
246+
self.decode_and_buffer_lines(bytes_to_process);
201247
self.parse_complete_lines_into_event()?;
202248

203249
Ok(())
@@ -723,6 +769,36 @@ mod tests {
723769
.unwrap_or_else(|_| panic!("couldn't read {name}"))
724770
}
725771

772+
#[test]
773+
fn test_event_parser_with_bom_header_split_across_chunks() {
774+
let mut parser = EventParser::new();
775+
// First chunk: partial BOM
776+
assert!(parser
777+
.process_bytes(Bytes::from(b"\xEF\xBB".as_slice()))
778+
.is_ok());
779+
assert!(parser.get_event().is_none());
780+
// Second chunk: rest of BOM + data
781+
assert!(parser
782+
.process_bytes(Bytes::from(b"\xBFdata: hello\n\n".as_slice()))
783+
.is_ok());
784+
assert_eq!(parser.get_event(), Some(event("message", "hello")));
785+
assert!(parser.get_event().is_none());
786+
}
787+
788+
#[test]
789+
fn test_event_parser_second_bom_should_fail() {
790+
let mut parser = EventParser::new();
791+
// First event with BOM - should succeed
792+
assert!(parser
793+
.process_bytes(Bytes::from(b"\xEF\xBB\xBFdata: first\n\n".as_slice()))
794+
.is_ok());
795+
assert_eq!(parser.get_event(), Some(event("message", "first")));
796+
797+
// Second event with BOM - should fail (only first message can have BOM)
798+
let result = parser.process_bytes(Bytes::from(b"\xEF\xBB\xBFdata: second\n\n".as_slice()));
799+
assert!(result.is_err());
800+
}
801+
726802
proptest! {
727803
#[test]
728804
fn test_decode_and_buffer_lines_does_not_crash(next in "(\r\n|\r|\n)*event: [^\n\r:]*(\r\n|\r|\n)", previous in "(\r\n|\r|\n)*event: [^\n\r:]*(\r\n|\r|\n)") {

0 commit comments

Comments
 (0)