Skip to content

Commit bb41f0f

Browse files
committed
fix(toolcalling): address terminal stream review findings
Signed-off-by: Matej Kosec <mkosec@nvidia.com>
1 parent 5b12902 commit bb41f0f

3 files changed

Lines changed: 250 additions & 95 deletions

File tree

lib/llm/src/protocols/openai/chat_completions.rs

Lines changed: 43 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
use std::collections::HashMap;
55

6-
use dynamo_runtime::protocols::annotated::AnnotationsProvider;
6+
use dynamo_runtime::protocols::annotated::{Annotated, AnnotationsProvider};
77
use serde::{Deserialize, Serialize};
88
use utoipa::ToSchema;
99
use validator::Validate;
@@ -30,8 +30,9 @@ pub use delta::DeltaGenerator;
3030

3131
use dynamo_parsers::tool_calling::{ToolCallResponse, ToolCallResponseChunk};
3232
use dynamo_protocols::types::{
33-
ChatCompletionMessageToolCall, ChatCompletionMessageToolCallChunk, FunctionCall,
34-
FunctionCallStream, FunctionType,
33+
ChatChoiceStream, ChatCompletionMessageContent, ChatCompletionMessageToolCall,
34+
ChatCompletionMessageToolCallChunk, ChatCompletionStreamResponseDelta, FinishReason,
35+
FunctionCall, FunctionCallStream, FunctionType,
3536
};
3637

3738
/// Map a parser-native [`ToolCallResponse`] onto the protocol/wire
@@ -239,6 +240,45 @@ pub struct NvCreateChatCompletionStreamResponse {
239240
pub llm_metrics: Option<crate::protocols::common::metrics::LLMMetricAnnotation>,
240241
}
241242

243+
/// Build one synthetic stream choice from an existing response template.
244+
///
245+
/// Both streaming tool-call paths use this constructor when an engine omits a
246+
/// terminal choice. Accounting data belongs only on the usage chunk and must
247+
/// not be copied onto the synthetic choice.
248+
pub(super) fn stream_choice_chunk_from_template(
249+
template: &NvCreateChatCompletionStreamResponse,
250+
index: u32,
251+
content: Option<ChatCompletionMessageContent>,
252+
tool_calls: Option<Vec<ChatCompletionMessageToolCallChunk>>,
253+
finish_reason: Option<FinishReason>,
254+
) -> Annotated<NvCreateChatCompletionStreamResponse> {
255+
let mut response = template.clone();
256+
response.inner.usage = None;
257+
response.llm_metrics = None;
258+
#[allow(deprecated)]
259+
let choice = ChatChoiceStream {
260+
index,
261+
delta: ChatCompletionStreamResponseDelta {
262+
role: None,
263+
content,
264+
tool_calls,
265+
function_call: None,
266+
refusal: None,
267+
reasoning_content: None,
268+
},
269+
finish_reason,
270+
logprobs: None,
271+
};
272+
response.inner.choices = vec![choice];
273+
Annotated {
274+
data: Some(response),
275+
id: None,
276+
event: None,
277+
comment: None,
278+
error: None,
279+
}
280+
}
281+
242282
/// Implements `NvExtProvider` for `NvCreateChatCompletionRequest`,
243283
/// providing access to NVIDIA-specific extensions.
244284
impl NvExtProvider for NvCreateChatCompletionRequest {

lib/llm/src/protocols/openai/chat_completions/jail.rs

Lines changed: 138 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,12 @@ use dynamo_parsers::tool_calling::{
1717
};
1818
use dynamo_runtime::protocols::annotated::Annotated;
1919
use futures::{Stream, StreamExt};
20-
use std::collections::HashMap;
20+
use std::collections::{HashMap, HashSet};
2121
use uuid::Uuid;
2222

2323
use crate::utils::{MarkerMatcher, MatchResult};
2424

25-
use super::NvCreateChatCompletionStreamResponse;
25+
use super::{NvCreateChatCompletionStreamResponse, stream_choice_chunk_from_template};
2626

2727
fn is_harmony_parser(parser: Option<&str>) -> bool {
2828
parser == Some("harmony")
@@ -1451,45 +1451,6 @@ impl JailedStream {
14511451
false
14521452
}
14531453

1454-
/// Build a synthetic terminal chunk carrying `finish_reason: ToolCalls` for a
1455-
/// choice that emitted tool calls but never received a finish_reason from the
1456-
/// engine. Used by `fix_finish_reason` to satisfy the OpenAI stream ordering
1457-
/// requirement (the terminal chunk must carry a non-null `finish_reason`)
1458-
/// when the stream ends without one. The delta is empty; only the
1459-
/// `finish_reason` carries information.
1460-
fn synthesize_tool_calls_chunk(
1461-
template: &NvCreateChatCompletionStreamResponse,
1462-
index: u32,
1463-
) -> Annotated<NvCreateChatCompletionStreamResponse> {
1464-
let mut response = template.clone();
1465-
// A terminal finish chunk must not repeat accounting data copied from a
1466-
// usage-only template.
1467-
response.inner.usage = None;
1468-
response.llm_metrics = None;
1469-
#[allow(deprecated)]
1470-
let choice = dynamo_protocols::types::ChatChoiceStream {
1471-
index,
1472-
delta: dynamo_protocols::types::ChatCompletionStreamResponseDelta {
1473-
role: None,
1474-
content: None,
1475-
tool_calls: None,
1476-
function_call: None,
1477-
refusal: None,
1478-
reasoning_content: None,
1479-
},
1480-
finish_reason: Some(FinishReason::ToolCalls),
1481-
logprobs: None,
1482-
};
1483-
response.inner.choices = vec![choice];
1484-
Annotated {
1485-
data: Some(response),
1486-
id: None,
1487-
event: None,
1488-
comment: None,
1489-
error: None,
1490-
}
1491-
}
1492-
14931454
/// Post-processor that sets finish_reason to ToolCalls when tool calls were emitted
14941455
/// This should be called after apply() to fix the finish_reason for tool call chunks
14951456
fn fix_finish_reason<S>(
@@ -1507,13 +1468,14 @@ impl JailedStream {
15071468
let mut has_tool_calls_per_choice: HashMap<u32, bool> = HashMap::new();
15081469
// Choices that already received a finish_reason during the stream — used by
15091470
// the backstop below to avoid synthesizing a duplicate.
1510-
let mut terminated: std::collections::HashSet<u32> = std::collections::HashSet::new();
1471+
let mut terminated: HashSet<u32> = HashSet::new();
15111472
// Last response, kept (with choices cleared) as a template for a synthesized
15121473
// finish_reason chunk when the stream ended without one.
15131474
let mut template: Option<NvCreateChatCompletionStreamResponse> = None;
1514-
// Whether we have already emitted the synthesized terminal chunks (either
1515-
// before the usage-only chunk or at stream end), so we don't emit twice.
1516-
let mut synthesized = false;
1475+
// Choices for which this post-processor has already emitted a synthetic
1476+
// terminal chunk. Tracking this per choice allows a later tool-call choice
1477+
// to terminate even if an earlier empty-choices chunk emitted nothing.
1478+
let mut synthesized: HashSet<u32> = HashSet::new();
15171479

15181480
while let Some(mut response) = input_stream.next().await {
15191481
// Track if any choice emitted tool calls, and which already terminated.
@@ -1572,17 +1534,25 @@ impl JailedStream {
15721534
.data
15731535
.as_ref()
15741536
.is_some_and(|d| d.inner.choices.is_empty());
1575-
if is_empty_choices
1576-
&& !synthesized
1577-
&& let Some(template) = &template
1578-
{
1579-
for (index, _) in has_tool_calls_per_choice.iter().filter(|(_, has)| **has) {
1580-
if terminated.contains(index) {
1581-
continue;
1582-
}
1583-
yield Self::synthesize_tool_calls_chunk(template, *index);
1537+
if is_empty_choices && let Some(template) = &template {
1538+
let mut indices: Vec<_> = has_tool_calls_per_choice
1539+
.iter()
1540+
.filter_map(|(index, has)| {
1541+
(*has && !terminated.contains(index) && !synthesized.contains(index))
1542+
.then_some(*index)
1543+
})
1544+
.collect();
1545+
indices.sort_unstable();
1546+
for index in indices {
1547+
yield stream_choice_chunk_from_template(
1548+
template,
1549+
index,
1550+
None,
1551+
None,
1552+
Some(FinishReason::ToolCalls),
1553+
);
1554+
synthesized.insert(index);
15841555
}
1585-
synthesized = true;
15861556
}
15871557

15881558
yield response;
@@ -1597,12 +1567,24 @@ impl JailedStream {
15971567
// complete; without this they hang until their client-side timeout.
15981568
// Choices that never emitted tool calls are left alone — there
15991569
// is no signal to invent a finish_reason from for text-only output.
1600-
if !synthesized && let Some(template) = template {
1601-
for (index, _) in has_tool_calls_per_choice.iter().filter(|(_, has)| **has) {
1602-
if terminated.contains(index) {
1603-
continue;
1604-
}
1605-
yield Self::synthesize_tool_calls_chunk(&template, *index);
1570+
if let Some(template) = template {
1571+
let mut indices: Vec<_> = has_tool_calls_per_choice
1572+
.iter()
1573+
.filter_map(|(index, has)| {
1574+
(*has && !terminated.contains(index) && !synthesized.contains(index))
1575+
.then_some(*index)
1576+
})
1577+
.collect();
1578+
indices.sort_unstable();
1579+
for index in indices {
1580+
yield stream_choice_chunk_from_template(
1581+
&template,
1582+
index,
1583+
None,
1584+
None,
1585+
Some(FinishReason::ToolCalls),
1586+
);
1587+
synthesized.insert(index);
16061588
}
16071589
}
16081590
}
@@ -1918,6 +1900,40 @@ mod tests {
19181900
}
19191901
}
19201902

1903+
/// Build one data chunk whose choices have already emitted tool-call deltas.
1904+
fn tool_call_choices_chunk(indices: &[u32]) -> Annotated<NvCreateChatCompletionStreamResponse> {
1905+
let mut chunk = text_chunk("");
1906+
let data = chunk.data.as_mut().expect("tool-call response data");
1907+
#[allow(deprecated)]
1908+
{
1909+
data.inner.choices = indices
1910+
.iter()
1911+
.map(|index| ChatChoiceStream {
1912+
index: *index,
1913+
delta: ChatCompletionStreamResponseDelta {
1914+
role: Some(Role::Assistant),
1915+
content: None,
1916+
tool_calls: Some(vec![ChatCompletionMessageToolCallChunk {
1917+
index: 0,
1918+
id: Some(format!("call-{index}")),
1919+
r#type: Some(FunctionType::Function),
1920+
function: Some(FunctionCallStream {
1921+
name: Some(format!("tool_{index}")),
1922+
arguments: Some("{}".to_string()),
1923+
}),
1924+
}]),
1925+
function_call: None,
1926+
refusal: None,
1927+
reasoning_content: None,
1928+
},
1929+
finish_reason: None,
1930+
logprobs: None,
1931+
})
1932+
.collect();
1933+
}
1934+
chunk
1935+
}
1936+
19211937
fn heartbeat() -> Annotated<NvCreateChatCompletionStreamResponse> {
19221938
Annotated {
19231939
data: None,
@@ -2376,4 +2392,65 @@ mod tests {
23762392
"synthesized ToolCalls chunk must not repeat LLM metrics"
23772393
);
23782394
}
2395+
2396+
// An empty-choices chunk can precede tool deltas (for example, a metadata
2397+
// response). It must not disable later synthesis. When several choices then
2398+
// emit tool calls, their terminal chunks must be ordered by choice index.
2399+
#[tokio::test]
2400+
async fn jail_synthesizes_late_tool_choices_in_index_order() {
2401+
let chunks = vec![
2402+
usage_only_chunk(),
2403+
tool_call_choices_chunk(&[2, 0, 1]),
2404+
usage_only_chunk(),
2405+
];
2406+
2407+
let responses: Vec<_> =
2408+
JailedStream::fix_finish_reason(stream::iter(chunks), JailMode::MarkerBased, false)
2409+
.collect()
2410+
.await;
2411+
2412+
let usage_positions: Vec<_> = responses
2413+
.iter()
2414+
.enumerate()
2415+
.filter_map(|(position, response)| {
2416+
response
2417+
.data
2418+
.as_ref()
2419+
.is_some_and(|data| data.inner.choices.is_empty() && data.inner.usage.is_some())
2420+
.then_some(position)
2421+
})
2422+
.collect();
2423+
assert_eq!(
2424+
usage_positions.len(),
2425+
2,
2426+
"both empty-choices chunks must pass through"
2427+
);
2428+
2429+
let terminals: Vec<_> = responses
2430+
.iter()
2431+
.enumerate()
2432+
.flat_map(|(position, response)| {
2433+
response.data.iter().flat_map(move |data| {
2434+
data.inner.choices.iter().filter_map(move |choice| {
2435+
(choice.finish_reason == Some(FinishReason::ToolCalls))
2436+
.then_some((position, choice.index))
2437+
})
2438+
})
2439+
})
2440+
.collect();
2441+
assert_eq!(
2442+
terminals
2443+
.iter()
2444+
.map(|(_, index)| *index)
2445+
.collect::<Vec<_>>(),
2446+
vec![0, 1, 2],
2447+
"synthetic terminal chunks must be deterministic"
2448+
);
2449+
assert!(
2450+
terminals.iter().all(|(position, _)| {
2451+
usage_positions[0] < *position && *position < usage_positions[1]
2452+
}),
2453+
"terminal chunks must follow the early empty response and precede the final usage response"
2454+
);
2455+
}
23792456
}

0 commit comments

Comments
 (0)