@@ -15,8 +15,8 @@ use crate::error::{FoundryLocalError, Result};
1515
1616use super :: responses_types:: {
1717 DeleteResponseResult , FunctionToolDefinition , InputItemsListResponse , ListResponsesResult ,
18- ReasoningConfig , ResponseCreateRequest , ResponseInput , ResponseObject , StreamingEvent ,
19- TextConfig ,
18+ ReasoningConfig , ResponseCreateOptions , ResponseCreateRequest , ResponseInput , ResponseObject ,
19+ StreamingEvent , TextConfig ,
2020} ;
2121
2222// ============================================================================
@@ -159,7 +159,7 @@ impl ResponsesClient {
159159 pub async fn create (
160160 & self ,
161161 input : ResponseInput ,
162- options : Option < ResponseCreateRequest > ,
162+ options : Option < ResponseCreateOptions > ,
163163 ) -> Result < ResponseObject > {
164164 self . validate_input ( & input) ?;
165165 if let Some ( ref opts) = options {
@@ -193,7 +193,7 @@ impl ResponsesClient {
193193 pub async fn create_streaming (
194194 & self ,
195195 input : ResponseInput ,
196- options : Option < ResponseCreateRequest > ,
196+ options : Option < ResponseCreateOptions > ,
197197 ) -> Result < SseStream > {
198198 self . validate_input ( & input) ?;
199199 if let Some ( ref opts) = options {
@@ -340,13 +340,13 @@ impl ResponsesClient {
340340 fn build_request (
341341 & self ,
342342 input : ResponseInput ,
343- options : Option < ResponseCreateRequest > ,
343+ options : Option < ResponseCreateOptions > ,
344344 stream : bool ,
345345 ) -> Result < ResponseCreateRequest > {
346346 // Determine model: options override self.model_id
347347 let model = options
348348 . as_ref ( )
349- . map ( |o| o. model . clone ( ) )
349+ . and_then ( |o| o. model . clone ( ) )
350350 . filter ( |m| !m. trim ( ) . is_empty ( ) )
351351 . or_else ( || self . model_id . clone ( ) )
352352 . ok_or_else ( || FoundryLocalError :: Validation {
@@ -383,11 +383,9 @@ impl ResponsesClient {
383383
384384 // Apply per-call overrides
385385 if let Some ( opts) = options {
386- if ! opts. model . trim ( ) . is_empty ( ) {
387- req. model = opts . model ;
386+ if let Some ( m ) = opts. model . filter ( |m| !m . trim ( ) . is_empty ( ) ) {
387+ req. model = m ;
388388 }
389- // Only override input if the caller passed an options object with explicit input;
390- // in practice options.input will always be overwritten by the positional `input`.
391389 if let Some ( v) = opts. instructions {
392390 req. instructions = Some ( v) ;
393391 }
@@ -635,3 +633,90 @@ where
635633 }
636634 }
637635}
636+
637+ // ============================================================================
638+ // Inline tests
639+ // ============================================================================
640+ //
641+ // These tests live alongside `parse_sse_stream` so they exercise the real
642+ // implementation rather than reimplementing SSE framing in an external test
643+ // crate. Anything that only depends on public APIs lives in `tests/unit/`.
644+
645+ #[ cfg( test) ]
646+ mod tests {
647+ use super :: * ;
648+ use async_stream:: stream;
649+
650+ /// Drive `parse_sse_stream` from a hand-constructed byte stream and collect
651+ /// its yielded events.
652+ async fn collect_events ( chunks : Vec < & ' static str > ) -> Vec < StreamingEvent > {
653+ let byte_stream = stream ! {
654+ for chunk in chunks {
655+ yield Ok :: <Bytes , reqwest:: Error >( Bytes :: from_static( chunk. as_bytes( ) ) ) ;
656+ }
657+ } ;
658+
659+ let parsed = parse_sse_stream ( byte_stream) ;
660+ let mut parsed = std:: pin:: pin!( parsed) ;
661+
662+ let mut events = Vec :: new ( ) ;
663+ use tokio_stream:: StreamExt as _;
664+ while let Some ( event) = parsed. next ( ) . await {
665+ events. push ( event. expect ( "SSE event failed to parse" ) ) ;
666+ }
667+ events
668+ }
669+
670+ #[ tokio:: test]
671+ async fn parses_complete_event_block ( ) {
672+ let payload = "data: {\" type\" :\" response.output_text.delta\" ,\" item_id\" :\" i1\" ,\
673+ \" output_index\" :0,\" content_index\" :0,\" delta\" :\" Hi\" ,\" sequence_number\" :1}\n \n \
674+ data: [DONE]\n \n ";
675+
676+ let events = collect_events ( vec ! [ payload] ) . await ;
677+ assert_eq ! ( events. len( ) , 1 ) ;
678+ assert ! ( matches!(
679+ events[ 0 ] ,
680+ StreamingEvent :: OutputTextDelta { ref delta, .. } if delta == "Hi"
681+ ) ) ;
682+ }
683+
684+ #[ tokio:: test]
685+ async fn done_signal_terminates_stream ( ) {
686+ let payload = "data: [DONE]\n \n \
687+ data: {\" type\" :\" response.output_text.delta\" ,\" item_id\" :\" i1\" ,\
688+ \" output_index\" :0,\" content_index\" :0,\" delta\" :\" after-done\" ,\
689+ \" sequence_number\" :2}\n \n ";
690+
691+ let events = collect_events ( vec ! [ payload] ) . await ;
692+ assert ! ( events. is_empty( ) , "events after [DONE] must be ignored" ) ;
693+ }
694+
695+ #[ tokio:: test]
696+ async fn handles_event_split_across_chunks ( ) {
697+ // Split a single SSE block across two byte chunks to make sure the
698+ // parser buffers correctly.
699+ let part1 = "data: {\" type\" :\" response.output_text.delta\" ,\
700+ \" item_id\" :\" i1\" ,\" output_index\" :0,\" content_index\" :0,";
701+ let part2 = "\" delta\" :\" split\" ,\" sequence_number\" :3}\n \n data: [DONE]\n \n " ;
702+
703+ let events = collect_events ( vec ! [ part1, part2] ) . await ;
704+ assert_eq ! ( events. len( ) , 1 ) ;
705+ assert ! ( matches!(
706+ events[ 0 ] ,
707+ StreamingEvent :: OutputTextDelta { ref delta, .. } if delta == "split"
708+ ) ) ;
709+ }
710+
711+ #[ tokio:: test]
712+ async fn skips_event_lines_and_blank_blocks ( ) {
713+ let payload = "event: response.output_text.delta\n \
714+ data: {\" type\" :\" response.output_text.delta\" ,\" item_id\" :\" i1\" ,\
715+ \" output_index\" :0,\" content_index\" :0,\" delta\" :\" ok\" ,\" sequence_number\" :4}\n \n \
716+ \n \n \
717+ data: [DONE]\n \n ";
718+
719+ let events = collect_events ( vec ! [ payload] ) . await ;
720+ assert_eq ! ( events. len( ) , 1 ) ;
721+ }
722+ }
0 commit comments