Skip to content

Commit 583a045

Browse files
committed
Fix async tool handling for compatibility with all LLMs.
1 parent dc5b94f commit 583a045

4 files changed

Lines changed: 50 additions & 10 deletions

File tree

src/pipecat/frames/frames.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1918,12 +1918,16 @@ class FunctionCallInProgressFrame(ControlFrame, UninterruptibleFrame):
19181918
tool_call_id: Unique identifier for this function call.
19191919
arguments: Arguments passed to the function.
19201920
cancel_on_interruption: Whether to cancel this call if interrupted.
1921+
is_async: Whether this function call runs asynchronously. When True,
1922+
the LLM continues the conversation immediately without waiting for
1923+
the result. The result is injected later via a developer message.
19211924
"""
19221925

19231926
function_name: str
19241927
tool_call_id: str
19251928
arguments: Any
19261929
cancel_on_interruption: bool = False
1930+
is_async: bool = False
19271931

19281932

19291933
@dataclass

src/pipecat/processors/aggregators/llm_response_universal.py

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1067,16 +1067,25 @@ async def _handle_function_call_result(self, frame: FunctionCallResultFrame):
10671067
)
10681068
return
10691069

1070+
in_progress_frame = self._function_calls_in_progress[frame.tool_call_id]
1071+
is_async = in_progress_frame.is_async if in_progress_frame else False
10701072
del self._function_calls_in_progress[frame.tool_call_id]
10711073

10721074
properties = frame.properties
10731075

1074-
# Update context with the function call result
1075-
if frame.result:
1076-
result = json.dumps(frame.result, ensure_ascii=False)
1077-
self._update_function_call_result(frame.function_name, frame.tool_call_id, result)
1076+
result = json.dumps(frame.result, ensure_ascii=False) if frame.result else "COMPLETED"
1077+
1078+
if is_async:
1079+
# For async function calls instead of updating the existing IN_PROGRESS tool message we inject
1080+
# a new developer message so the LLM is notified of the completed result.
1081+
self._context.add_message(
1082+
{
1083+
"role": "developer",
1084+
"content": f"Async function with id '{frame.tool_call_id}' completed with result: {result}",
1085+
}
1086+
)
10781087
else:
1079-
self._update_function_call_result(frame.function_name, frame.tool_call_id, "COMPLETED")
1088+
self._update_function_call_result(frame.function_name, frame.tool_call_id, result)
10801089

10811090
run_llm = False
10821091

@@ -1090,13 +1099,17 @@ async def _handle_function_call_result(self, frame: FunctionCallResultFrame):
10901099
run_llm = await self._maybe_append_image_to_context(image_frame)
10911100

10921101
# Run inference if the function call result requires it.
1093-
if frame.result:
1102+
if frame.result or is_async:
10941103
if properties and properties.run_llm is not None:
10951104
# If the tool call result has a run_llm property, use it.
10961105
run_llm = properties.run_llm
10971106
elif frame.run_llm is not None:
10981107
# If the frame is indicating we should run the LLM, do it.
10991108
run_llm = frame.run_llm
1109+
elif is_async:
1110+
# For async function calls, always run the LLM so it can
1111+
# respond to the completed result injected as a developer message.
1112+
run_llm = True
11001113
else:
11011114
# If this is the last function call in progress, run the LLM.
11021115
run_llm = not bool(self._function_calls_in_progress)

src/pipecat/services/llm_service.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,13 +124,17 @@ class FunctionCallRegistryItem:
124124
function_name: The name of the function (None for catch-all handler).
125125
handler: The handler for processing function call parameters.
126126
cancel_on_interruption: Whether to cancel the call on interruption.
127+
is_async: Whether this function call runs asynchronously. When True,
128+
the LLM continues the conversation immediately without waiting for
129+
the result. The result is injected later via a developer message.
127130
timeout_secs: Optional per-tool timeout in seconds. Overrides the global
128131
``function_call_timeout_secs`` for this specific function.
129132
"""
130133

131134
function_name: Optional[str]
132135
handler: FunctionCallHandler | "DirectFunctionWrapper"
133136
cancel_on_interruption: bool
137+
is_async: bool = False
134138
timeout_secs: Optional[float] = None
135139

136140

@@ -578,6 +582,7 @@ def register_function(
578582
handler: Any,
579583
*,
580584
cancel_on_interruption: bool = True,
585+
is_async: bool = False,
581586
timeout_secs: Optional[float] = None,
582587
):
583588
"""Register a function handler for LLM function calls.
@@ -589,6 +594,10 @@ def register_function(
589594
parameter.
590595
cancel_on_interruption: Whether to cancel this function call when an
591596
interruption occurs. Defaults to True.
597+
is_async: Whether this function call runs asynchronously. When True,
598+
the LLM continues the conversation immediately without waiting for
599+
the result. The result is injected later via a developer message.
600+
Defaults to False.
592601
timeout_secs: Optional per-tool timeout in seconds. Overrides the global
593602
``function_call_timeout_secs`` for this specific function. Defaults to
594603
None, which uses the global timeout.
@@ -599,6 +608,7 @@ def register_function(
599608
function_name=function_name,
600609
handler=handler,
601610
cancel_on_interruption=cancel_on_interruption,
611+
is_async=is_async,
602612
timeout_secs=timeout_secs,
603613
)
604614

@@ -607,6 +617,7 @@ def register_direct_function(
607617
handler: DirectFunction,
608618
*,
609619
cancel_on_interruption: bool = True,
620+
is_async: bool = False,
610621
timeout_secs: Optional[float] = None,
611622
):
612623
"""Register a direct function handler for LLM function calls.
@@ -619,6 +630,10 @@ def register_direct_function(
619630
handler: The direct function to register. Must follow DirectFunction protocol.
620631
cancel_on_interruption: Whether to cancel this function call when an
621632
interruption occurs. Defaults to True.
633+
is_async: Whether this function call runs asynchronously. When True,
634+
the LLM continues the conversation immediately without waiting for
635+
the result. The result is injected later via a developer message.
636+
Defaults to False.
622637
timeout_secs: Optional per-tool timeout in seconds. Overrides the global
623638
``function_call_timeout_secs`` for this specific function. Defaults to
624639
None, which uses the global timeout.
@@ -628,6 +643,7 @@ def register_direct_function(
628643
function_name=wrapper.name,
629644
handler=wrapper,
630645
cancel_on_interruption=cancel_on_interruption,
646+
is_async=is_async,
631647
timeout_secs=timeout_secs,
632648
)
633649

@@ -766,6 +782,7 @@ async def _run_function_call(self, runner_item: FunctionCallRunnerItem):
766782
tool_call_id=runner_item.tool_call_id,
767783
arguments=runner_item.arguments,
768784
cancel_on_interruption=item.cancel_on_interruption,
785+
is_async=item.is_async,
769786
)
770787

771788
timeout_task: Optional[asyncio.Task] = None

src/pipecat/utils/context/llm_context_summarization.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -389,9 +389,13 @@ def _get_earliest_function_call_not_resolved_in_range(
389389
390390
Scans messages from ``start_idx`` up to (but not including)
391391
``summary_end`` to identify tool calls whose responses either don't
392-
exist yet or fall in the kept portion of the context (>= summary_end).
392+
exist yet, fall in the kept portion of the context (>= summary_end),
393+
or are still marked as ``IN_PROGRESS`` (async calls whose results have
394+
not yet arrived).
395+
393396
This prevents summarizing tool call requests when their responses would
394-
remain in the kept context as orphans, which the OpenAI API rejects.
397+
remain in the kept context as orphans, which the OpenAI API rejects,
398+
and avoids summarizing async function calls before their results arrive.
395399
396400
Args:
397401
messages: List of messages to check.
@@ -428,11 +432,13 @@ def _get_earliest_function_call_not_resolved_in_range(
428432
if tool_call_id:
429433
pending_tool_calls[tool_call_id] = i
430434

431-
# Check for tool results
435+
# Check for tool results — treat IN_PROGRESS as still pending
436+
# (async function calls whose results have not yet arrived).
432437
if role == "tool":
433438
tool_call_id = msg.get("tool_call_id")
434439
if tool_call_id and tool_call_id in pending_tool_calls:
435-
pending_tool_calls.pop(tool_call_id)
440+
if msg.get("content") != "IN_PROGRESS":
441+
pending_tool_calls.pop(tool_call_id)
436442

437443
# If we have pending tool calls, return the earliest index
438444
if pending_tool_calls:

0 commit comments

Comments
 (0)