@@ -17,12 +17,12 @@ use dynamo_parsers::tool_calling::{
1717} ;
1818use dynamo_runtime:: protocols:: annotated:: Annotated ;
1919use futures:: { Stream , StreamExt } ;
20- use std:: collections:: HashMap ;
20+ use std:: collections:: { HashMap , HashSet } ;
2121use uuid:: Uuid ;
2222
2323use crate :: utils:: { MarkerMatcher , MatchResult } ;
2424
25- use super :: NvCreateChatCompletionStreamResponse ;
25+ use super :: { NvCreateChatCompletionStreamResponse , stream_choice_chunk_from_template } ;
2626
2727fn is_harmony_parser ( parser : Option < & str > ) -> bool {
2828 parser == Some ( "harmony" )
@@ -1451,45 +1451,6 @@ impl JailedStream {
14511451 false
14521452 }
14531453
1454- /// Build a synthetic terminal chunk carrying `finish_reason: ToolCalls` for a
1455- /// choice that emitted tool calls but never received a finish_reason from the
1456- /// engine. Used by `fix_finish_reason` to satisfy the OpenAI stream ordering
1457- /// requirement (the terminal chunk must carry a non-null `finish_reason`)
1458- /// when the stream ends without one. The delta is empty; only the
1459- /// `finish_reason` carries information.
1460- fn synthesize_tool_calls_chunk (
1461- template : & NvCreateChatCompletionStreamResponse ,
1462- index : u32 ,
1463- ) -> Annotated < NvCreateChatCompletionStreamResponse > {
1464- let mut response = template. clone ( ) ;
1465- // A terminal finish chunk must not repeat accounting data copied from a
1466- // usage-only template.
1467- response. inner . usage = None ;
1468- response. llm_metrics = None ;
1469- #[ allow( deprecated) ]
1470- let choice = dynamo_protocols:: types:: ChatChoiceStream {
1471- index,
1472- delta : dynamo_protocols:: types:: ChatCompletionStreamResponseDelta {
1473- role : None ,
1474- content : None ,
1475- tool_calls : None ,
1476- function_call : None ,
1477- refusal : None ,
1478- reasoning_content : None ,
1479- } ,
1480- finish_reason : Some ( FinishReason :: ToolCalls ) ,
1481- logprobs : None ,
1482- } ;
1483- response. inner . choices = vec ! [ choice] ;
1484- Annotated {
1485- data : Some ( response) ,
1486- id : None ,
1487- event : None ,
1488- comment : None ,
1489- error : None ,
1490- }
1491- }
1492-
14931454 /// Post-processor that sets finish_reason to ToolCalls when tool calls were emitted
14941455 /// This should be called after apply() to fix the finish_reason for tool call chunks
14951456 fn fix_finish_reason < S > (
@@ -1507,13 +1468,14 @@ impl JailedStream {
15071468 let mut has_tool_calls_per_choice: HashMap <u32 , bool > = HashMap :: new( ) ;
15081469 // Choices that already received a finish_reason during the stream — used by
15091470 // the backstop below to avoid synthesizing a duplicate.
1510- let mut terminated: std :: collections :: HashSet <u32 > = std :: collections :: HashSet :: new( ) ;
1471+ let mut terminated: HashSet <u32 > = HashSet :: new( ) ;
15111472 // Last response, kept (with choices cleared) as a template for a synthesized
15121473 // finish_reason chunk when the stream ended without one.
15131474 let mut template: Option <NvCreateChatCompletionStreamResponse > = None ;
1514- // Whether we have already emitted the synthesized terminal chunks (either
1515- // before the usage-only chunk or at stream end), so we don't emit twice.
1516- let mut synthesized = false ;
1475+ // Choices for which this post-processor has already emitted a synthetic
1476+ // terminal chunk. Tracking this per choice allows a later tool-call choice
1477+ // to terminate even if an earlier empty-choices chunk emitted nothing.
1478+ let mut synthesized: HashSet <u32 > = HashSet :: new( ) ;
15171479
15181480 while let Some ( mut response) = input_stream. next( ) . await {
15191481 // Track if any choice emitted tool calls, and which already terminated.
@@ -1572,17 +1534,25 @@ impl JailedStream {
15721534 . data
15731535 . as_ref( )
15741536 . is_some_and( |d| d. inner. choices. is_empty( ) ) ;
1575- if is_empty_choices
1576- && !synthesized
1577- && let Some ( template) = & template
1578- {
1579- for ( index, _) in has_tool_calls_per_choice. iter( ) . filter( |( _, has) | * * has) {
1580- if terminated. contains( index) {
1581- continue ;
1582- }
1583- yield Self :: synthesize_tool_calls_chunk( template, * index) ;
1537+ if is_empty_choices && let Some ( template) = & template {
1538+ let mut indices: Vec <_> = has_tool_calls_per_choice
1539+ . iter( )
1540+ . filter_map( |( index, has) | {
1541+ ( * has && !terminated. contains( index) && !synthesized. contains( index) )
1542+ . then_some( * index)
1543+ } )
1544+ . collect( ) ;
1545+ indices. sort_unstable( ) ;
1546+ for index in indices {
1547+ yield stream_choice_chunk_from_template(
1548+ template,
1549+ index,
1550+ None ,
1551+ None ,
1552+ Some ( FinishReason :: ToolCalls ) ,
1553+ ) ;
1554+ synthesized. insert( index) ;
15841555 }
1585- synthesized = true ;
15861556 }
15871557
15881558 yield response;
@@ -1597,12 +1567,24 @@ impl JailedStream {
15971567 // complete; without this they hang until their client-side timeout.
15981568 // Choices that never emitted tool calls are left alone — there
15991569 // is no signal to invent a finish_reason from for text-only output.
1600- if !synthesized && let Some ( template) = template {
1601- for ( index, _) in has_tool_calls_per_choice. iter( ) . filter( |( _, has) | * * has) {
1602- if terminated. contains( index) {
1603- continue ;
1604- }
1605- yield Self :: synthesize_tool_calls_chunk( & template, * index) ;
1570+ if let Some ( template) = template {
1571+ let mut indices: Vec <_> = has_tool_calls_per_choice
1572+ . iter( )
1573+ . filter_map( |( index, has) | {
1574+ ( * has && !terminated. contains( index) && !synthesized. contains( index) )
1575+ . then_some( * index)
1576+ } )
1577+ . collect( ) ;
1578+ indices. sort_unstable( ) ;
1579+ for index in indices {
1580+ yield stream_choice_chunk_from_template(
1581+ & template,
1582+ index,
1583+ None ,
1584+ None ,
1585+ Some ( FinishReason :: ToolCalls ) ,
1586+ ) ;
1587+ synthesized. insert( index) ;
16061588 }
16071589 }
16081590 }
@@ -1918,6 +1900,40 @@ mod tests {
19181900 }
19191901 }
19201902
1903+ /// Build one data chunk whose choices have already emitted tool-call deltas.
1904+ fn tool_call_choices_chunk ( indices : & [ u32 ] ) -> Annotated < NvCreateChatCompletionStreamResponse > {
1905+ let mut chunk = text_chunk ( "" ) ;
1906+ let data = chunk. data . as_mut ( ) . expect ( "tool-call response data" ) ;
1907+ #[ allow( deprecated) ]
1908+ {
1909+ data. inner . choices = indices
1910+ . iter ( )
1911+ . map ( |index| ChatChoiceStream {
1912+ index : * index,
1913+ delta : ChatCompletionStreamResponseDelta {
1914+ role : Some ( Role :: Assistant ) ,
1915+ content : None ,
1916+ tool_calls : Some ( vec ! [ ChatCompletionMessageToolCallChunk {
1917+ index: 0 ,
1918+ id: Some ( format!( "call-{index}" ) ) ,
1919+ r#type: Some ( FunctionType :: Function ) ,
1920+ function: Some ( FunctionCallStream {
1921+ name: Some ( format!( "tool_{index}" ) ) ,
1922+ arguments: Some ( "{}" . to_string( ) ) ,
1923+ } ) ,
1924+ } ] ) ,
1925+ function_call : None ,
1926+ refusal : None ,
1927+ reasoning_content : None ,
1928+ } ,
1929+ finish_reason : None ,
1930+ logprobs : None ,
1931+ } )
1932+ . collect ( ) ;
1933+ }
1934+ chunk
1935+ }
1936+
19211937 fn heartbeat ( ) -> Annotated < NvCreateChatCompletionStreamResponse > {
19221938 Annotated {
19231939 data : None ,
@@ -2376,4 +2392,65 @@ mod tests {
23762392 "synthesized ToolCalls chunk must not repeat LLM metrics"
23772393 ) ;
23782394 }
2395+
2396+ // An empty-choices chunk can precede tool deltas (for example, a metadata
2397+ // response). It must not disable later synthesis. When several choices then
2398+ // emit tool calls, their terminal chunks must be ordered by choice index.
2399+ #[ tokio:: test]
2400+ async fn jail_synthesizes_late_tool_choices_in_index_order ( ) {
2401+ let chunks = vec ! [
2402+ usage_only_chunk( ) ,
2403+ tool_call_choices_chunk( & [ 2 , 0 , 1 ] ) ,
2404+ usage_only_chunk( ) ,
2405+ ] ;
2406+
2407+ let responses: Vec < _ > =
2408+ JailedStream :: fix_finish_reason ( stream:: iter ( chunks) , JailMode :: MarkerBased , false )
2409+ . collect ( )
2410+ . await ;
2411+
2412+ let usage_positions: Vec < _ > = responses
2413+ . iter ( )
2414+ . enumerate ( )
2415+ . filter_map ( |( position, response) | {
2416+ response
2417+ . data
2418+ . as_ref ( )
2419+ . is_some_and ( |data| data. inner . choices . is_empty ( ) && data. inner . usage . is_some ( ) )
2420+ . then_some ( position)
2421+ } )
2422+ . collect ( ) ;
2423+ assert_eq ! (
2424+ usage_positions. len( ) ,
2425+ 2 ,
2426+ "both empty-choices chunks must pass through"
2427+ ) ;
2428+
2429+ let terminals: Vec < _ > = responses
2430+ . iter ( )
2431+ . enumerate ( )
2432+ . flat_map ( |( position, response) | {
2433+ response. data . iter ( ) . flat_map ( move |data| {
2434+ data. inner . choices . iter ( ) . filter_map ( move |choice| {
2435+ ( choice. finish_reason == Some ( FinishReason :: ToolCalls ) )
2436+ . then_some ( ( position, choice. index ) )
2437+ } )
2438+ } )
2439+ } )
2440+ . collect ( ) ;
2441+ assert_eq ! (
2442+ terminals
2443+ . iter( )
2444+ . map( |( _, index) | * index)
2445+ . collect:: <Vec <_>>( ) ,
2446+ vec![ 0 , 1 , 2 ] ,
2447+ "synthetic terminal chunks must be deterministic"
2448+ ) ;
2449+ assert ! (
2450+ terminals. iter( ) . all( |( position, _) | {
2451+ usage_positions[ 0 ] < * position && * position < usage_positions[ 1 ]
2452+ } ) ,
2453+ "terminal chunks must follow the early empty response and precede the final usage response"
2454+ ) ;
2455+ }
23792456}
0 commit comments