From 098a9737bf0d17fa063f9b93534e1251c7b98873 Mon Sep 17 00:00:00 2001 From: "hanzhi.421" Date: Wed, 5 Nov 2025 14:24:39 +0800 Subject: [PATCH 01/26] feat: ark litellm client --- veadk/agent.py | 24 +++++-- veadk/models/ark_llm.py | 138 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 156 insertions(+), 6 deletions(-) create mode 100644 veadk/models/ark_llm.py diff --git a/veadk/agent.py b/veadk/agent.py index dfc37db8..c687207f 100644 --- a/veadk/agent.py +++ b/veadk/agent.py @@ -151,6 +151,8 @@ class Agent(LlmAgent): tracers: list[BaseTracer] = [] + enable_responses: bool = False + run_processor: Optional[BaseRunProcessor] = Field(default=None, exclude=True) """Optional run processor for intercepting and processing agent execution flows. @@ -197,12 +199,22 @@ def model_post_init(self, __context: Any) -> None: logger.info(f"Model extra config: {self.model_extra_config}") if not self.model: - self.model = LiteLlm( - model=f"{self.model_provider}/{self.model_name}", - api_key=self.model_api_key, - api_base=self.model_api_base, - **self.model_extra_config, - ) + if self.enable_responses: + from veadk.models.ark_llm import ArkLlm + + self.model = ArkLlm( + model=f"{self.model_provider}/{self.model_name}", + api_key=self.model_api_key, + api_base=self.model_api_base, + **self.model_extra_config, + ) + else: + self.model = LiteLlm( + model=f"{self.model_provider}/{self.model_name}", + api_key=self.model_api_key, + api_base=self.model_api_base, + **self.model_extra_config, + ) logger.debug( f"LiteLLM client created with config: {self.model_extra_config}" ) diff --git a/veadk/models/ark_llm.py b/veadk/models/ark_llm.py new file mode 100644 index 00000000..1af091f3 --- /dev/null +++ b/veadk/models/ark_llm.py @@ -0,0 +1,138 @@ +import uuid +from datetime import datetime +from typing import Any, Dict, Union + +import litellm +from google.adk.models.lite_llm import ( + LiteLlm, + LiteLLMClient, +) +from litellm import Logging +from litellm import aresponses +from litellm.completion_extras.litellm_responses_transformation.transformation import ( + LiteLLMResponsesTransformationHandler, +) +from litellm.litellm_core_utils.get_litellm_params import get_litellm_params +from litellm.litellm_core_utils.get_llm_provider_logic import get_llm_provider +from litellm.litellm_core_utils.streaming_handler import CustomStreamWrapper +from litellm.types.llms.openai import ResponsesAPIResponse +from litellm.types.utils import ModelResponse, LlmProviders +from litellm.utils import get_optional_params, ProviderConfigManager +from pydantic import Field + +from veadk.utils.logger import get_logger + +# This will add functions to prompts if functions are provided. +litellm.add_function_to_prompt = True + +logger = get_logger(__name__) + + +class ArkLlmClient(LiteLLMClient): + def __init__(self): + super().__init__() + self.transformation_handler = LiteLLMResponsesTransformationHandler() + + async def acompletion( + self, model, messages, tools, **kwargs + ) -> Union[ModelResponse, CustomStreamWrapper]: + # 1.1. Get optional_params using get_optional_params function + optional_params = get_optional_params(model=model, **kwargs) + + # 1.2. Get litellm_params using get_litellm_params function + litellm_params = get_litellm_params(**kwargs) + + # 1.3. 
Get headers by merging kwargs headers and extra_headers + headers = kwargs.get("headers", None) or kwargs.get("extra_headers", None) + if headers is None: + headers = {} + if kwargs.get("extra_headers") is not None: + headers.update(kwargs.get("extra_headers")) + + # 1.4. Get logging_obj from kwargs or create new LiteLLMLoggingObj + logging_obj = kwargs.get("litellm_logging_obj", None) + if logging_obj is None: + logging_obj = Logging( + model=model, + messages=messages, + stream=kwargs.get("stream", False), + call_type="acompletion", + litellm_call_id=str(uuid.uuid4()), + function_id=str(uuid.uuid4()), + start_time=datetime.now(), + kwargs=kwargs, + ) + # 1.5. Convert Message to `llm_provider` format + _, custom_llm_provider, _, _ = get_llm_provider(model=model) + if custom_llm_provider is not None and custom_llm_provider in [ + provider.value for provider in LlmProviders + ]: + provider_config = ProviderConfigManager.get_provider_chat_config( + model=model, provider=LlmProviders(custom_llm_provider) + ) + if provider_config is not None: + messages = provider_config.translate_developer_role_to_system_role( + messages=messages + ) + + # 1.6 Transform request to responses api format + request_data = self.transformation_handler.transform_request( + model=model, + messages=messages, + optional_params=optional_params, + litellm_params=litellm_params, + headers=headers, + litellm_logging_obj=logging_obj, + client=kwargs.get("client"), + ) + + # 2. Call litellm.aresponses with the transformed request data + result = await aresponses( + **request_data, + ) + + # 3.1 Create model_response object + model_response = ModelResponse() + setattr(model_response, "usage", litellm.Usage()) + + # 3.2 Transform ResponsesAPIResponse to ModelResponses + if isinstance(result, ResponsesAPIResponse): + return self.transformation_handler.transform_response( + model=model, + raw_response=result, + model_response=model_response, + logging_obj=logging_obj, + request_data=request_data, + messages=messages, + optional_params=optional_params, + litellm_params=litellm_params, + encoding=kwargs.get("encoding"), + api_key=kwargs.get("api_key"), + json_mode=kwargs.get("json_mode"), + ) + else: + completion_stream = self.transformation_handler.get_model_response_iterator( + streaming_response=result, # type: ignore + sync_stream=True, + json_mode=kwargs.get("json_mode"), + ) + streamwrapper = CustomStreamWrapper( + completion_stream=completion_stream, + model=model, + custom_llm_provider=custom_llm_provider, + logging_obj=logging_obj, + ) + return streamwrapper + + +class ArkLlm(LiteLlm): + llm_client: ArkLlmClient = Field(default_factory=ArkLlmClient) + _additional_args: Dict[str, Any] = None + + def __init__(self, **kwargs): + super().__init__(**kwargs) + + # async def generate_content_async( + # self, llm_request: LlmRequest, stream: bool = False + # ) -> AsyncGenerator[LlmResponse, None]: + # pass From 92b170a2df041c5d1bbdeda560e79813b89e4bb2 Mon Sep 17 00:00:00 2001 From: "hanzhi.421" Date: Wed, 5 Nov 2025 15:30:48 +0800 Subject: [PATCH 02/26] feat: use the patch method to pass it --- veadk/agent.py | 2 + veadk/models/ark_llm.py | 208 ++++++++++++++++++++++++++++++++++++---- veadk/utils/patches.py | 64 +++++++++++++ 3 files changed, 258 insertions(+), 16 deletions(-) diff --git a/veadk/agent.py b/veadk/agent.py index c687207f..3d45ef3b 100644 --- a/veadk/agent.py +++ b/veadk/agent.py @@ -200,8 +200,10 @@ def model_post_init(self, __context: Any) -> None: if not self.model: if self.enable_responses: + from 
veadk.utils.patches import patch_google_adk_call_llm_async from veadk.models.ark_llm import ArkLlm + patch_google_adk_call_llm_async() self.model = ArkLlm( model=f"{self.model_provider}/{self.model_name}", api_key=self.model_api_key, diff --git a/veadk/models/ark_llm.py b/veadk/models/ark_llm.py index 1af091f3..874688ad 100644 --- a/veadk/models/ark_llm.py +++ b/veadk/models/ark_llm.py @@ -1,14 +1,23 @@ +import json import uuid from datetime import datetime -from typing import Any, Dict, Union +from typing import Any, Dict, Union, AsyncGenerator import litellm +from google.adk.models import LlmRequest, LlmResponse from google.adk.models.lite_llm import ( LiteLlm, LiteLLMClient, + _get_completion_inputs, + _model_response_to_chunk, + FunctionChunk, + TextChunk, + _message_to_generate_content_response, + UsageMetadataChunk, + _model_response_to_generate_content_response, ) -from litellm import Logging -from litellm import aresponses +from google.genai import types +from litellm import Logging, aresponses, ChatCompletionAssistantMessage from litellm.completion_extras.litellm_responses_transformation.transformation import ( LiteLLMResponsesTransformationHandler, ) @@ -16,7 +25,12 @@ from litellm.litellm_core_utils.get_llm_provider_logic import get_llm_provider from litellm.litellm_core_utils.streaming_handler import CustomStreamWrapper from litellm.types.llms.openai import ResponsesAPIResponse -from litellm.types.utils import ModelResponse, LlmProviders +from litellm.types.utils import ( + ModelResponse, + LlmProviders, + ChatCompletionMessageToolCall, + Function, +) from litellm.utils import get_optional_params, ProviderConfigManager from pydantic import Field @@ -28,6 +42,16 @@ logger = get_logger(__name__) +def _add_response_id_to_llm_response( + llm_response: LlmResponse, response: ModelResponse +) -> LlmResponse: + if not response.id.startswith("chatcmpl"): + if llm_response.custom_metadata is None: + llm_response.custom_metadata = {} + llm_response.custom_metadata["response_id"] = response["id"] + return llm_response + + class ArkLlmClient(LiteLLMClient): def __init__(self): super().__init__() @@ -37,7 +61,7 @@ async def acompletion( self, model, messages, tools, **kwargs ) -> Union[ModelResponse, CustomStreamWrapper]: # 1.1. Get optional_params using get_optional_params function - optional_params = get_optional_params(model=model, **kwargs) + optional_params = get_optional_params(model=model, tools=tools, **kwargs) # 1.2. Get litellm_params using get_litellm_params function litellm_params = get_litellm_params(**kwargs) @@ -74,8 +98,10 @@ async def acompletion( messages = provider_config.translate_developer_role_to_system_role( messages=messages ) - - # 1.6 Transform request to responses api format + # 1.6 Add response_id to llm_response + # Keep the header system-prompt and the user's messages + messages = messages[:1] + messages[-1:] + # 1.7 Transform request to responses api format request_data = self.transformation_handler.transform_request( model=model, messages=messages, @@ -87,7 +113,7 @@ async def acompletion( ) # 2. 
Call litellm.aresponses with the transformed request data - result = await aresponses( + raw_response = await aresponses( **request_data, ) @@ -96,10 +122,10 @@ async def acompletion( setattr(model_response, "usage", litellm.Usage()) # 3.2 Transform ResponsesAPIResponse to ModelResponses - if isinstance(result, ResponsesAPIResponse): - return self.transformation_handler.transform_response( + if isinstance(raw_response, ResponsesAPIResponse): + response = self.transformation_handler.transform_response( model=model, - raw_response=result, + raw_response=raw_response, model_response=model_response, logging_obj=logging_obj, request_data=request_data, @@ -110,9 +136,14 @@ async def acompletion( api_key=kwargs.get("api_key"), json_mode=kwargs.get("json_mode"), ) + # 3.2.1 Modify ModelResponse id + if raw_response and hasattr(raw_response, "id"): + response.id = raw_response.id + return response + else: completion_stream = self.transformation_handler.get_model_response_iterator( - streaming_response=result, # type: ignore + streaming_response=raw_response, # type: ignore sync_stream=True, json_mode=kwargs.get("json_mode"), ) @@ -132,7 +163,152 @@ class ArkLlm(LiteLlm): def __init__(self, **kwargs): super().__init__(**kwargs) - # async def generate_content_async( - # self, llm_request: LlmRequest, stream: bool = False - # ) -> AsyncGenerator[LlmResponse, None]: - # pass + async def generate_content_async( + self, llm_request: LlmRequest, stream: bool = False + ) -> AsyncGenerator[LlmResponse, None]: + """Generates content asynchronously. + + Args: + llm_request: LlmRequest, the request to send to the LiteLlm model. + stream: bool = False, whether to do streaming call. + + Yields: + LlmResponse: The model response. + """ + + self._maybe_append_user_content(llm_request) + # logger.debug(_build_request_log(llm_request)) + + messages, tools, response_format, generation_params = _get_completion_inputs( + llm_request + ) + + if "functions" in self._additional_args: + # LiteLLM does not support both tools and functions together. 
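# A hedged aside, not part of the patch: the "functions vs tools" conflict above
# only arises when the model object was built with the legacy `functions`
# argument in its extra kwargs. The constructor call and schema below are
# assumptions for illustration only.
from veadk.models.ark_llm import ArkLlm

legacy_model = ArkLlm(
    model="openai/doubao-seed-1-6",  # assumed provider/model identifier
    functions=[  # legacy function-calling schema kept in _additional_args
        {
            "name": "get_weather",
            "parameters": {
                "type": "object",
                "properties": {"city": {"type": "string"}},
            },
        }
    ],
)
# For such a model, the `tools` derived from the LlmRequest are discarded,
# which is what the next line does.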
+ tools = None + # ------------------------------------------------------ # + # get previous_response_id + previous_response_id = None + if llm_request.cache_metadata and llm_request.cache_metadata.cache_name: + previous_response_id = llm_request.cache_metadata.cache_name + # ------------------------------------------------------ # + completion_args = { + "model": self.model, + "messages": messages, + "tools": tools, + "response_format": response_format, + "previous_response_id": previous_response_id, # supply previous_response_id + } + completion_args.update(self._additional_args) + + if generation_params: + completion_args.update(generation_params) + + if stream: + text = "" + # Track function calls by index + function_calls = {} # index -> {name, args, id} + completion_args["stream"] = True + aggregated_llm_response = None + aggregated_llm_response_with_tool_call = None + usage_metadata = None + fallback_index = 0 + async for part in await self.llm_client.acompletion(**completion_args): + for chunk, finish_reason in _model_response_to_chunk(part): + if isinstance(chunk, FunctionChunk): + index = chunk.index or fallback_index + if index not in function_calls: + function_calls[index] = {"name": "", "args": "", "id": None} + + if chunk.name: + function_calls[index]["name"] += chunk.name + if chunk.args: + function_calls[index]["args"] += chunk.args + + # check if args is completed (workaround for improper chunk + # indexing) + try: + json.loads(function_calls[index]["args"]) + fallback_index += 1 + except json.JSONDecodeError: + pass + + function_calls[index]["id"] = ( + chunk.id or function_calls[index]["id"] or str(index) + ) + elif isinstance(chunk, TextChunk): + text += chunk.text + yield _message_to_generate_content_response( + ChatCompletionAssistantMessage( + role="assistant", + content=chunk.text, + ), + is_partial=True, + ) + elif isinstance(chunk, UsageMetadataChunk): + usage_metadata = types.GenerateContentResponseUsageMetadata( + prompt_token_count=chunk.prompt_tokens, + candidates_token_count=chunk.completion_tokens, + total_token_count=chunk.total_tokens, + ) + + if ( + finish_reason == "tool_calls" or finish_reason == "stop" + ) and function_calls: + tool_calls = [] + for index, func_data in function_calls.items(): + if func_data["id"]: + tool_calls.append( + ChatCompletionMessageToolCall( + type="function", + id=func_data["id"], + function=Function( + name=func_data["name"], + arguments=func_data["args"], + index=index, + ), + ) + ) + aggregated_llm_response_with_tool_call = ( + _message_to_generate_content_response( + ChatCompletionAssistantMessage( + role="assistant", + content=text, + tool_calls=tool_calls, + ) + ) + ) + text = "" + function_calls.clear() + elif finish_reason == "stop" and text: + aggregated_llm_response = _message_to_generate_content_response( + ChatCompletionAssistantMessage( + role="assistant", content=text + ) + ) + text = "" + + # waiting until streaming ends to yield the llm_response as litellm tends + # to send chunk that contains usage_metadata after the chunk with + # finish_reason set to tool_calls or stop. 
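# Standalone illustration (fragment values are assumptions) of the json.loads()
# completeness check used earlier in this loop: partial tool-call arguments fail
# to parse until the final fragment arrives, which is how a finished call is
# detected when chunk indices are unreliable.
import json

fragments = ['{"city": ', '"Beijing"}']  # assumed argument fragments from two chunks
buffer = ""
for fragment in fragments:
    buffer += fragment
    try:
        json.loads(buffer)        # succeeds only once the JSON document is complete
        print("arguments complete:", buffer)
    except json.JSONDecodeError:
        pass                      # still accumulating fragments
# The aggregated response itself is yielded only after the stream ends, as the
# comment above notes, because the usage chunk arrives last.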
+ if aggregated_llm_response: + if usage_metadata: + aggregated_llm_response.usage_metadata = usage_metadata + usage_metadata = None + yield aggregated_llm_response + + if aggregated_llm_response_with_tool_call: + if usage_metadata: + aggregated_llm_response_with_tool_call.usage_metadata = ( + usage_metadata + ) + yield aggregated_llm_response_with_tool_call + + else: + response = await self.llm_client.acompletion(**completion_args) + # ------------------------------------------------------ # + # Transport response id + # yield _model_response_to_generate_content_response(response) + llm_response = _model_response_to_generate_content_response(response) + yield _add_response_id_to_llm_response(llm_response, response) + # ------------------------------------------------------ # diff --git a/veadk/utils/patches.py b/veadk/utils/patches.py index 8df5c3a6..921ff5a0 100644 --- a/veadk/utils/patches.py +++ b/veadk/utils/patches.py @@ -16,6 +16,10 @@ import sys from typing import Callable +from google.adk.agents import InvocationContext +from google.adk.models import LlmRequest +from google.adk.models.cache_metadata import CacheMetadata + from veadk.tracing.telemetry.telemetry import ( trace_call_llm, trace_send_data, @@ -78,3 +82,63 @@ def patch_google_adk_telemetry() -> None: logger.debug( f"Patch {mod_name} {var_name} with {trace_functions[var_name]}" ) + + +# +# BaseLlmFlow._call_llm_async patch hook +# +def patch_google_adk_call_llm_async() -> None: + """Patch google.adk BaseLlmFlow._call_llm_async with a delegating wrapper. + + Current behavior: simply calls the original implementation and yields its results. + This provides a stable hook for later custom business logic without changing behavior now. + """ + # Prevent duplicate patches + if hasattr(patch_google_adk_call_llm_async, "_patched"): + logger.debug("BaseLlmFlow._call_llm_async already patched, skipping") + return + + try: + from google.adk.flows.llm_flows.base_llm_flow import BaseLlmFlow + + original_call_llm_async = BaseLlmFlow._call_llm_async + + async def patched_call_llm_async( + self, + invocation_context: InvocationContext, + llm_request: LlmRequest, + model_response_event, + ): + logger.debug( + "Patched BaseLlmFlow._call_llm_async invoked; delegating to original" + ) + events = invocation_context.session.events + if ( + events + and len(events) >= 2 + and events[-2].custom_metadata + and "response_id" in events[-2].custom_metadata + ): + previous_response_id = events[-2].custom_metadata["response_id"] + llm_request.cache_metadata = CacheMetadata( + cache_name=previous_response_id, + expire_time=0, + fingerprint="", + invocations_used=0, + cached_contents_count=0, + ) + + async for llm_response in original_call_llm_async( + self, invocation_context, llm_request, model_response_event + ): + # Currently, just pass through the original responses + yield llm_response + + BaseLlmFlow._call_llm_async = patched_call_llm_async + + # Marked as patched to prevent duplicate application + patch_google_adk_call_llm_async._patched = True + logger.info("Successfully patched BaseLlmFlow._call_llm_async") + + except ImportError as e: + logger.warning(f"Failed to patch BaseLlmFlow._call_llm_async: {e}") From d2b75fead3d017e663c2268e353637f1c7dfe8cd Mon Sep 17 00:00:00 2001 From: "hanzhi.421" Date: Wed, 5 Nov 2025 16:48:02 +0800 Subject: [PATCH 03/26] chore: organize the code --- veadk/models/ark_llm.py | 128 ++++++++++++++++++++++++---------------- 1 file changed, 78 insertions(+), 50 deletions(-) diff --git a/veadk/models/ark_llm.py 
b/veadk/models/ark_llm.py index 874688ad..036a19f2 100644 --- a/veadk/models/ark_llm.py +++ b/veadk/models/ark_llm.py @@ -60,20 +60,77 @@ def __init__(self): async def acompletion( self, model, messages, tools, **kwargs ) -> Union[ModelResponse, CustomStreamWrapper]: - # 1.1. Get optional_params using get_optional_params function + # 1 Modify messages + # Keep the header system-prompt and the user's messages + messages = messages[:1] + messages[-1:] + + # 2 Get request params + ( + request_data, + optional_params, + litellm_params, + logging_obj, + custom_llm_provider, + ) = self._get_request_data(model, messages, tools, **kwargs) + + # 3. Call litellm.aresponses with the transformed request data + raw_response = await aresponses( + **request_data, + ) + # 4. Transform ResponsesAPIResponse + # 4.1 Create model_response object + model_response = ModelResponse() + setattr(model_response, "usage", litellm.Usage()) + + # 4.2 Transform ResponsesAPIResponse to ModelResponses + if isinstance(raw_response, ResponsesAPIResponse): + response = self.transformation_handler.transform_response( + model=model, + raw_response=raw_response, + model_response=model_response, + logging_obj=logging_obj, + request_data=request_data, + messages=messages, + optional_params=optional_params, + litellm_params=litellm_params, + encoding=kwargs.get("encoding"), + api_key=kwargs.get("api_key"), + json_mode=kwargs.get("json_mode"), + ) + # 4.2.1 Modify ModelResponse id + if raw_response and hasattr(raw_response, "id"): + response.id = raw_response.id + return response + + else: + completion_stream = self.transformation_handler.get_model_response_iterator( + streaming_response=raw_response, # type: ignore + sync_stream=True, + json_mode=kwargs.get("json_mode"), + ) + streamwrapper = CustomStreamWrapper( + completion_stream=completion_stream, + model=model, + custom_llm_provider=custom_llm_provider, + logging_obj=logging_obj, + ) + return streamwrapper + + def _get_request_data(self, model, messages, tools, **kwargs) -> tuple: + # 1. Get optional_params using get_optional_params function optional_params = get_optional_params(model=model, tools=tools, **kwargs) - # 1.2. Get litellm_params using get_litellm_params function + # 2. Get litellm_params using get_litellm_params function litellm_params = get_litellm_params(**kwargs) - # 1.3. Get headers by merging kwargs headers and extra_headers + # 3. Get headers by merging kwargs headers and extra_headers headers = kwargs.get("headers", None) or kwargs.get("extra_headers", None) if headers is None: headers = {} if kwargs.get("extra_headers") is not None: headers.update(kwargs.get("extra_headers")) - # 1.4. Get logging_obj from kwargs or create new LiteLLMLoggingObj + # 4. Get logging_obj from kwargs or create new LiteLLMLoggingObj logging_obj = kwargs.get("litellm_logging_obj", None) if logging_obj is None: logging_obj = Logging( @@ -86,7 +143,7 @@ async def acompletion( start_time=datetime.now(), kwargs=kwargs, ) - # 1.5. Convert Message to `llm_provider` format + # 4. 
Convert Message to `llm_provider` format _, custom_llm_provider, _, _ = get_llm_provider(model=model) if custom_llm_provider is not None and custom_llm_provider in [ provider.value for provider in LlmProviders @@ -98,10 +155,8 @@ async def acompletion( messages = provider_config.translate_developer_role_to_system_role( messages=messages ) - # 1.6 Add response_id to llm_response - # Keep the header system-prompt and the user's messages - messages = messages[:1] + messages[-1:] - # 1.7 Transform request to responses api format + + # 5 Transform request to responses api format request_data = self.transformation_handler.transform_request( model=model, messages=messages, @@ -112,49 +167,22 @@ async def acompletion( client=kwargs.get("client"), ) - # 2. Call litellm.aresponses with the transformed request data - raw_response = await aresponses( - **request_data, + # 6 handler Missing field supply + if "extra_body" not in request_data and kwargs.get("extra_body"): + request_data["extra_body"] = kwargs.get("extra_body") + if "extra_query" not in request_data and kwargs.get("extra_query"): + request_data["extra_query"] = kwargs.get("extra_query") + if "extra_headers" not in request_data and kwargs.get("extra_headers"): + request_data["extra_headers"] = kwargs.get("extra_headers") + + return ( + request_data, + optional_params, + litellm_params, + logging_obj, + custom_llm_provider, ) - # 3.1 Create model_response object - model_response = ModelResponse() - setattr(model_response, "usage", litellm.Usage()) - - # 3.2 Transform ResponsesAPIResponse to ModelResponses - if isinstance(raw_response, ResponsesAPIResponse): - response = self.transformation_handler.transform_response( - model=model, - raw_response=raw_response, - model_response=model_response, - logging_obj=logging_obj, - request_data=request_data, - messages=messages, - optional_params=optional_params, - litellm_params=litellm_params, - encoding=kwargs.get("encoding"), - api_key=kwargs.get("api_key"), - json_mode=kwargs.get("json_mode"), - ) - # 3.2.1 Modify ModelResponse id - if raw_response and hasattr(raw_response, "id"): - response.id = raw_response.id - return response - - else: - completion_stream = self.transformation_handler.get_model_response_iterator( - streaming_response=raw_response, # type: ignore - sync_stream=True, - json_mode=kwargs.get("json_mode"), - ) - streamwrapper = CustomStreamWrapper( - completion_stream=completion_stream, - model=model, - custom_llm_provider=custom_llm_provider, - logging_obj=logging_obj, - ) - return streamwrapper - class ArkLlm(LiteLlm): llm_client: ArkLlmClient = Field(default_factory=ArkLlmClient) From 36030500336b73895ced49f9e69cb67959bfa4be Mon Sep 17 00:00:00 2001 From: "hanzhi.421" Date: Wed, 5 Nov 2025 18:03:15 +0800 Subject: [PATCH 04/26] feat: openai sdk for responses api --- veadk/models/ark_llm.py | 93 +++++++++++++++++++++++++++++++++++++++-- 1 file changed, 89 insertions(+), 4 deletions(-) diff --git a/veadk/models/ark_llm.py b/veadk/models/ark_llm.py index 036a19f2..b05cb991 100644 --- a/veadk/models/ark_llm.py +++ b/veadk/models/ark_llm.py @@ -4,6 +4,7 @@ from typing import Any, Dict, Union, AsyncGenerator import litellm +from openai import OpenAI from google.adk.models import LlmRequest, LlmResponse from google.adk.models.lite_llm import ( LiteLlm, @@ -17,7 +18,7 @@ _model_response_to_generate_content_response, ) from google.genai import types -from litellm import Logging, aresponses, ChatCompletionAssistantMessage +from litellm import Logging, ChatCompletionAssistantMessage from 
litellm.completion_extras.litellm_responses_transformation.transformation import ( LiteLLMResponsesTransformationHandler, ) @@ -41,6 +42,40 @@ logger = get_logger(__name__) +openai_supported_fields = [ + "stream", + "background", + "include", + "input", + "instructions", + "max_output_tokens", + "max_tool_calls", + "metadata", + "model", + "parallel_tool_calls", + "previous_response_id", + "prompt", + "prompt_cache_key", + "reasoning", + "safety_identifier", + "service_tier", + "store", + "stream", + "stream_options", + "temperature", + "text", + "tool_choice", + "tools", + "top_logprobs", + "top_p", + "truncation", + "user", + "extra_headers", + "extra_query", + "extra_body", + "timeout", +] + def _add_response_id_to_llm_response( llm_response: LlmResponse, response: ModelResponse @@ -52,6 +87,52 @@ def _add_response_id_to_llm_response( return llm_response +async def openai_response_async(request_data: dict): + # Filter out fields that are not supported by OpenAI SDK + filtered_request_data = { + key: value + for key, value in request_data.items() + if key in openai_supported_fields and value is not None + } + model_name, custom_llm_provider, _, _ = get_llm_provider( + model=request_data["model"] + ) + filtered_request_data["model"] = model_name # remove custom_llm_provider + + if ( + "tools" in filtered_request_data + and "extra_body" in filtered_request_data + and isinstance(filtered_request_data["extra_body"], dict) + and "caching" in filtered_request_data["extra_body"] + and isinstance(filtered_request_data["extra_body"]["caching"], dict) + and filtered_request_data["extra_body"]["caching"].get("type") == "enabled" + and "previous_response_id" in filtered_request_data + and filtered_request_data["previous_response_id"] is not None + ): + # Remove tools when caching is enabled and previous_response_id is present + del filtered_request_data["tools"] + + # Remove instructions when caching is enabled with specific configuration + if ( + "instructions" in filtered_request_data + and "extra_body" in filtered_request_data + and isinstance(filtered_request_data["extra_body"], dict) + and "caching" in filtered_request_data["extra_body"] + and isinstance(filtered_request_data["extra_body"]["caching"], dict) + and filtered_request_data["extra_body"]["caching"].get("type") == "enabled" + ): + # Remove instructions when caching is enabled + del filtered_request_data["instructions"] + + client = OpenAI( + base_url=request_data["api_base"], + api_key=request_data["api_key"], + ) + openai_response = client.responses.create(**filtered_request_data) + raw_response = ResponsesAPIResponse(**openai_response.model_dump()) + return raw_response + + class ArkLlmClient(LiteLLMClient): def __init__(self): super().__init__() @@ -74,9 +155,13 @@ async def acompletion( ) = self._get_request_data(model, messages, tools, **kwargs) # 3. Call litellm.aresponses with the transformed request data - raw_response = await aresponses( - **request_data, - ) + # Cannot be called directly; there is a litellm bug : + # https://github.com/BerriAI/litellm/issues/16267 + # raw_response = await aresponses( + # **request_data, + # ) + raw_response = await openai_response_async(request_data) + # 4. 
Transform ResponsesAPIResponse # 4.1 Create model_response object model_response = ModelResponse() From 15fb4807e88e1a8593904b56a6ce760da24be5ed Mon Sep 17 00:00:00 2001 From: "hanzhi.421" Date: Wed, 5 Nov 2025 19:23:11 +0800 Subject: [PATCH 05/26] feat: use callback instead patch --- veadk/agent.py | 9 ++++-- veadk/models/ark_llm.py | 37 ++++++++++++++++++++++-- veadk/utils/patches.py | 63 ----------------------------------------- 3 files changed, 41 insertions(+), 68 deletions(-) diff --git a/veadk/agent.py b/veadk/agent.py index 3d45ef3b..6c786bf4 100644 --- a/veadk/agent.py +++ b/veadk/agent.py @@ -36,6 +36,7 @@ from veadk.knowledgebase import KnowledgeBase from veadk.memory.long_term_memory import LongTermMemory from veadk.memory.short_term_memory import ShortTermMemory +from veadk.models.ark_llm import add_previous_response_id from veadk.processors import BaseRunProcessor, NoOpRunProcessor from veadk.prompts.agent_default_prompt import DEFAULT_DESCRIPTION, DEFAULT_INSTRUCTION from veadk.tracing.base_tracer import BaseTracer @@ -200,16 +201,20 @@ def model_post_init(self, __context: Any) -> None: if not self.model: if self.enable_responses: - from veadk.utils.patches import patch_google_adk_call_llm_async + # from veadk.utils.patches import patch_google_adk_call_llm_async from veadk.models.ark_llm import ArkLlm - patch_google_adk_call_llm_async() + # patch_google_adk_call_llm_async() self.model = ArkLlm( model=f"{self.model_provider}/{self.model_name}", api_key=self.model_api_key, api_base=self.model_api_base, **self.model_extra_config, ) + if not self.before_model_callback: + self.before_model_callback = add_previous_response_id + else: + self.before_model_callback.append(add_previous_response_id) else: self.model = LiteLlm( model=f"{self.model_provider}/{self.model_name}", diff --git a/veadk/models/ark_llm.py b/veadk/models/ark_llm.py index b05cb991..fcd53ebe 100644 --- a/veadk/models/ark_llm.py +++ b/veadk/models/ark_llm.py @@ -1,9 +1,11 @@ import json import uuid from datetime import datetime -from typing import Any, Dict, Union, AsyncGenerator +from typing import Any, Dict, Union, AsyncGenerator, Optional import litellm +from google.adk.agents.callback_context import CallbackContext +from google.adk.models.cache_metadata import CacheMetadata from openai import OpenAI from google.adk.models import LlmRequest, LlmResponse from google.adk.models.lite_llm import ( @@ -77,13 +79,20 @@ ] -def _add_response_id_to_llm_response( +def _add_response_data_to_llm_response( llm_response: LlmResponse, response: ModelResponse ) -> LlmResponse: + # add responses id if not response.id.startswith("chatcmpl"): if llm_response.custom_metadata is None: llm_response.custom_metadata = {} llm_response.custom_metadata["response_id"] = response["id"] + # add responses cache data + if response.get("usage", {}).get("prompt_tokens_details"): + if llm_response.usage_metadata: + llm_response.usage_metadata.cached_content_token_count = ( + response.get("usage", {}).get("prompt_tokens_details").cached_tokens + ) return llm_response @@ -423,5 +432,27 @@ async def generate_content_async( # Transport response id # yield _model_response_to_generate_content_response(response) llm_response = _model_response_to_generate_content_response(response) - yield _add_response_id_to_llm_response(llm_response, response) + yield _add_response_data_to_llm_response(llm_response, response) # ------------------------------------------------------ # + + +def add_previous_response_id( + callback_context: CallbackContext, llm_request: 
LlmRequest +) -> Optional[LlmResponse]: + invocation_context = callback_context._invocation_context + events = invocation_context.session.events + if ( + events + and len(events) >= 2 + and events[-2].custom_metadata + and "response_id" in events[-2].custom_metadata + ): + previous_response_id = events[-2].custom_metadata["response_id"] + llm_request.cache_metadata = CacheMetadata( + cache_name=previous_response_id, + expire_time=0, + fingerprint="", + invocations_used=0, + cached_contents_count=0, + ) + return diff --git a/veadk/utils/patches.py b/veadk/utils/patches.py index 921ff5a0..b894063c 100644 --- a/veadk/utils/patches.py +++ b/veadk/utils/patches.py @@ -16,9 +16,6 @@ import sys from typing import Callable -from google.adk.agents import InvocationContext -from google.adk.models import LlmRequest -from google.adk.models.cache_metadata import CacheMetadata from veadk.tracing.telemetry.telemetry import ( trace_call_llm, @@ -82,63 +79,3 @@ def patch_google_adk_telemetry() -> None: logger.debug( f"Patch {mod_name} {var_name} with {trace_functions[var_name]}" ) - - -# -# BaseLlmFlow._call_llm_async patch hook -# -def patch_google_adk_call_llm_async() -> None: - """Patch google.adk BaseLlmFlow._call_llm_async with a delegating wrapper. - - Current behavior: simply calls the original implementation and yields its results. - This provides a stable hook for later custom business logic without changing behavior now. - """ - # Prevent duplicate patches - if hasattr(patch_google_adk_call_llm_async, "_patched"): - logger.debug("BaseLlmFlow._call_llm_async already patched, skipping") - return - - try: - from google.adk.flows.llm_flows.base_llm_flow import BaseLlmFlow - - original_call_llm_async = BaseLlmFlow._call_llm_async - - async def patched_call_llm_async( - self, - invocation_context: InvocationContext, - llm_request: LlmRequest, - model_response_event, - ): - logger.debug( - "Patched BaseLlmFlow._call_llm_async invoked; delegating to original" - ) - events = invocation_context.session.events - if ( - events - and len(events) >= 2 - and events[-2].custom_metadata - and "response_id" in events[-2].custom_metadata - ): - previous_response_id = events[-2].custom_metadata["response_id"] - llm_request.cache_metadata = CacheMetadata( - cache_name=previous_response_id, - expire_time=0, - fingerprint="", - invocations_used=0, - cached_contents_count=0, - ) - - async for llm_response in original_call_llm_async( - self, invocation_context, llm_request, model_response_event - ): - # Currently, just pass through the original responses - yield llm_response - - BaseLlmFlow._call_llm_async = patched_call_llm_async - - # Marked as patched to prevent duplicate application - patch_google_adk_call_llm_async._patched = True - logger.info("Successfully patched BaseLlmFlow._call_llm_async") - - except ImportError as e: - logger.warning(f"Failed to patch BaseLlmFlow._call_llm_async: {e}") From 37bc9712bf9cdd8898af13753b620f6be4c73a1d Mon Sep 17 00:00:00 2001 From: "hanzhi.421" Date: Wed, 5 Nov 2025 19:24:27 +0800 Subject: [PATCH 06/26] feat: format patches --- veadk/utils/patches.py | 1 - 1 file changed, 1 deletion(-) diff --git a/veadk/utils/patches.py b/veadk/utils/patches.py index b894063c..8df5c3a6 100644 --- a/veadk/utils/patches.py +++ b/veadk/utils/patches.py @@ -16,7 +16,6 @@ import sys from typing import Callable - from veadk.tracing.telemetry.telemetry import ( trace_call_llm, trace_send_data, From 0ab4d40750a292cb9c681230106dbc07a96c97b8 Mon Sep 17 00:00:00 2001 From: "hanzhi.421" Date: Wed, 5 Nov 
2025 20:33:42 +0800 Subject: [PATCH 07/26] feat: system-prompt --- veadk/agent.py | 2 -- veadk/models/ark_llm.py | 44 +++++++++++++++++++++++++++++++++-------- 2 files changed, 36 insertions(+), 10 deletions(-) diff --git a/veadk/agent.py b/veadk/agent.py index 6c786bf4..b1ea66ce 100644 --- a/veadk/agent.py +++ b/veadk/agent.py @@ -201,10 +201,8 @@ def model_post_init(self, __context: Any) -> None: if not self.model: if self.enable_responses: - # from veadk.utils.patches import patch_google_adk_call_llm_async from veadk.models.ark_llm import ArkLlm - # patch_google_adk_call_llm_async() self.model = ArkLlm( model=f"{self.model_provider}/{self.model_name}", api_key=self.model_api_key, diff --git a/veadk/models/ark_llm.py b/veadk/models/ark_llm.py index fcd53ebe..7b415b03 100644 --- a/veadk/models/ark_llm.py +++ b/veadk/models/ark_llm.py @@ -108,8 +108,18 @@ async def openai_response_async(request_data: dict): ) filtered_request_data["model"] = model_name # remove custom_llm_provider + # Remove tools in subsequent rounds (when previous_response_id is present) if ( "tools" in filtered_request_data + and "previous_response_id" in filtered_request_data + and filtered_request_data["previous_response_id"] is not None + ): + # Remove tools in subsequent rounds regardless of caching status + del filtered_request_data["tools"] + + # Ensure thinking field consistency for cache usage + if ( + "thinking" in filtered_request_data and "extra_body" in filtered_request_data and isinstance(filtered_request_data["extra_body"], dict) and "caching" in filtered_request_data["extra_body"] @@ -118,20 +128,37 @@ async def openai_response_async(request_data: dict): and "previous_response_id" in filtered_request_data and filtered_request_data["previous_response_id"] is not None ): - # Remove tools when caching is enabled and previous_response_id is present - del filtered_request_data["tools"] + # For cache usage, thinking should be consistent with previous round + # If thinking is present but inconsistent, remove it to avoid cache miss + # Note: This is a placeholder - actual consistency check requires state tracking + pass - # Remove instructions when caching is enabled with specific configuration + # Ensure store field is true or default when caching is enabled if ( - "instructions" in filtered_request_data - and "extra_body" in filtered_request_data + "extra_body" in filtered_request_data and isinstance(filtered_request_data["extra_body"], dict) and "caching" in filtered_request_data["extra_body"] and isinstance(filtered_request_data["extra_body"]["caching"], dict) and filtered_request_data["extra_body"]["caching"].get("type") == "enabled" ): - # Remove instructions when caching is enabled - del filtered_request_data["instructions"] + # Set store to true when caching is enabled for writing + if "store" not in filtered_request_data: + filtered_request_data["store"] = True + elif filtered_request_data["store"] is False: + # Override false to true for cache writing + filtered_request_data["store"] = True + + # [NOTE] Due to the Volcano Ark settings, there is a conflict between the cache and the instructions field. + # If a system prompt is needed, it should be placed in the system role message within the input, instead of using the instructions parameter. 
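# Sketch of the resulting request body after this rewrite (field values below are
# assumptions; the structure mirrors the code that follows): the system prompt is
# carried as the first item of `input` instead of in `instructions`, so it can
# take part in Ark's response caching.
example_request_body = {
    "model": "doubao-seed-1-6",  # assumed model name
    "input": [
        {
            "content": [{"text": "You are a helpful agent.", "type": "input_text"}],
            "role": "system",
            "type": "message",
        },
        # ...the original input items follow here...
    ],
    "store": True,
    "extra_body": {"caching": {"type": "enabled"}},
}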
+ # https://www.volcengine.com/docs/82379/1585128 + instructions = filtered_request_data.pop("instructions", None) + filtered_request_data["input"] = [ + { + "content": [{"text": instructions, "type": "input_text"}], + "role": "system", + "type": "message", + } + ] + filtered_request_data["input"] client = OpenAI( base_url=request_data["api_base"], @@ -164,7 +191,8 @@ async def acompletion( ) = self._get_request_data(model, messages, tools, **kwargs) # 3. Call litellm.aresponses with the transformed request data - # Cannot be called directly; there is a litellm bug : + # [NOTE] Cannot be called directly; there is a litellm bug, + # Therefore, we cannot directly call litellm.aresponses: # https://github.com/BerriAI/litellm/issues/16267 # raw_response = await aresponses( # **request_data, From 13564de53c0914d64f172d2061b0d392dedaf732 Mon Sep 17 00:00:00 2001 From: "hanzhi.421" Date: Thu, 6 Nov 2025 11:09:31 +0800 Subject: [PATCH 08/26] fix: async openai --- veadk/models/ark_llm.py | 39 ++++++++++++++++++++++++--------------- 1 file changed, 24 insertions(+), 15 deletions(-) diff --git a/veadk/models/ark_llm.py b/veadk/models/ark_llm.py index 7b415b03..6aafa56c 100644 --- a/veadk/models/ark_llm.py +++ b/veadk/models/ark_llm.py @@ -6,7 +6,8 @@ import litellm from google.adk.agents.callback_context import CallbackContext from google.adk.models.cache_metadata import CacheMetadata -from openai import OpenAI +from litellm.responses.streaming_iterator import BaseResponsesAPIStreamingIterator +from openai import AsyncOpenAI from google.adk.models import LlmRequest, LlmResponse from google.adk.models.lite_llm import ( LiteLlm, @@ -96,7 +97,9 @@ def _add_response_data_to_llm_response( return llm_response -async def openai_response_async(request_data: dict): +async def openai_response_async( + request_data: dict, +) -> Union[ResponsesAPIResponse, BaseResponsesAPIStreamingIterator]: # Filter out fields that are not supported by OpenAI SDK filtered_request_data = { key: value @@ -108,7 +111,7 @@ async def openai_response_async(request_data: dict): ) filtered_request_data["model"] = model_name # remove custom_llm_provider - # Remove tools in subsequent rounds (when previous_response_id is present) + # [Note: Ark Limitations] Remove tools in subsequent rounds (when previous_response_id is present) if ( "tools" in filtered_request_data and "previous_response_id" in filtered_request_data @@ -117,7 +120,7 @@ async def openai_response_async(request_data: dict): # Remove tools in subsequent rounds regardless of caching status del filtered_request_data["tools"] - # Ensure thinking field consistency for cache usage + # [Note: Ark Limitations] Ensure thinking field consistency for cache usage if ( "thinking" in filtered_request_data and "extra_body" in filtered_request_data @@ -133,7 +136,7 @@ async def openai_response_async(request_data: dict): # Note: This is a placeholder - actual consistency check requires state tracking pass - # Ensure store field is true or default when caching is enabled + # [Note: Ark Limitations] Ensure store field is true or default when caching is enabled if ( "extra_body" in filtered_request_data and isinstance(filtered_request_data["extra_body"], dict) @@ -160,13 +163,20 @@ async def openai_response_async(request_data: dict): } ] + filtered_request_data["input"] - client = OpenAI( + client = AsyncOpenAI( base_url=request_data["api_base"], api_key=request_data["api_key"], ) - openai_response = client.responses.create(**filtered_request_data) - raw_response = 
ResponsesAPIResponse(**openai_response.model_dump()) - return raw_response + if "stream" in filtered_request_data and filtered_request_data["stream"]: + # For streaming responses, return the streaming iterator directly + # stream_response = await client.responses.create(**filtered_request_data) + # return stream_response + raise NotImplementedError("Not implemented streaming responses api") + else: + # For non-streaming responses, return the ResponsesAPIResponse + openai_response = await client.responses.create(**filtered_request_data) + raw_response = ResponsesAPIResponse(**openai_response.model_dump()) + return raw_response class ArkLlmClient(LiteLLMClient): @@ -194,18 +204,17 @@ async def acompletion( # [NOTE] Cannot be called directly; there is a litellm bug, # Therefore, we cannot directly call litellm.aresponses: # https://github.com/BerriAI/litellm/issues/16267 - # raw_response = await aresponses( + # raw_response = await litellm.aresponses( # **request_data, # ) raw_response = await openai_response_async(request_data) # 4. Transform ResponsesAPIResponse - # 4.1 Create model_response object - model_response = ModelResponse() - setattr(model_response, "usage", litellm.Usage()) - - # 4.2 Transform ResponsesAPIResponse to ModelResponses + # 4.1 Transform ResponsesAPIResponse to ModelResponses if isinstance(raw_response, ResponsesAPIResponse): + # 4.2 Create model_response object + model_response = ModelResponse() + setattr(model_response, "usage", litellm.Usage()) response = self.transformation_handler.transform_response( model=model, raw_response=raw_response, From 5b9e6d76d527948ef57e6dda665bb08c58ccf67e Mon Sep 17 00:00:00 2001 From: "hanzhi.421" Date: Sun, 9 Nov 2025 14:24:31 +0800 Subject: [PATCH 09/26] fix: acompletion to aresponse --- veadk/models/ark_llm.py | 346 +++++++------------------------------- veadk/models/transform.py | 202 ++++++++++++++++++++++ 2 files changed, 259 insertions(+), 289 deletions(-) create mode 100644 veadk/models/transform.py diff --git a/veadk/models/ark_llm.py b/veadk/models/ark_llm.py index 6aafa56c..8cf243bc 100644 --- a/veadk/models/ark_llm.py +++ b/veadk/models/ark_llm.py @@ -1,17 +1,14 @@ import json -import uuid -from datetime import datetime from typing import Any, Dict, Union, AsyncGenerator, Optional import litellm +import openai +from openai.types.responses import Response as OpenAITypeResponse, ResponseStreamEvent from google.adk.agents.callback_context import CallbackContext from google.adk.models.cache_metadata import CacheMetadata -from litellm.responses.streaming_iterator import BaseResponsesAPIStreamingIterator -from openai import AsyncOpenAI from google.adk.models import LlmRequest, LlmResponse from google.adk.models.lite_llm import ( LiteLlm, - LiteLLMClient, _get_completion_inputs, _model_response_to_chunk, FunctionChunk, @@ -21,23 +18,16 @@ _model_response_to_generate_content_response, ) from google.genai import types -from litellm import Logging, ChatCompletionAssistantMessage -from litellm.completion_extras.litellm_responses_transformation.transformation import ( - LiteLLMResponsesTransformationHandler, -) -from litellm.litellm_core_utils.get_litellm_params import get_litellm_params -from litellm.litellm_core_utils.get_llm_provider_logic import get_llm_provider -from litellm.litellm_core_utils.streaming_handler import CustomStreamWrapper -from litellm.types.llms.openai import ResponsesAPIResponse +from litellm import ChatCompletionAssistantMessage from litellm.types.utils import ( - ModelResponse, - LlmProviders, 
ChatCompletionMessageToolCall, Function, ) -from litellm.utils import get_optional_params, ProviderConfigManager from pydantic import Field +from veadk.models.transform import ( + CompletionToResponsesAPIHandler, +) from veadk.utils.logger import get_logger # This will add functions to prompts if functions are provided. @@ -45,279 +35,31 @@ logger = get_logger(__name__) -openai_supported_fields = [ - "stream", - "background", - "include", - "input", - "instructions", - "max_output_tokens", - "max_tool_calls", - "metadata", - "model", - "parallel_tool_calls", - "previous_response_id", - "prompt", - "prompt_cache_key", - "reasoning", - "safety_identifier", - "service_tier", - "store", - "stream", - "stream_options", - "temperature", - "text", - "tool_choice", - "tools", - "top_logprobs", - "top_p", - "truncation", - "user", - "extra_headers", - "extra_query", - "extra_body", - "timeout", -] - - -def _add_response_data_to_llm_response( - llm_response: LlmResponse, response: ModelResponse -) -> LlmResponse: - # add responses id - if not response.id.startswith("chatcmpl"): - if llm_response.custom_metadata is None: - llm_response.custom_metadata = {} - llm_response.custom_metadata["response_id"] = response["id"] - # add responses cache data - if response.get("usage", {}).get("prompt_tokens_details"): - if llm_response.usage_metadata: - llm_response.usage_metadata.cached_content_token_count = ( - response.get("usage", {}).get("prompt_tokens_details").cached_tokens - ) - return llm_response - - -async def openai_response_async( - request_data: dict, -) -> Union[ResponsesAPIResponse, BaseResponsesAPIStreamingIterator]: - # Filter out fields that are not supported by OpenAI SDK - filtered_request_data = { - key: value - for key, value in request_data.items() - if key in openai_supported_fields and value is not None - } - model_name, custom_llm_provider, _, _ = get_llm_provider( - model=request_data["model"] - ) - filtered_request_data["model"] = model_name # remove custom_llm_provider - - # [Note: Ark Limitations] Remove tools in subsequent rounds (when previous_response_id is present) - if ( - "tools" in filtered_request_data - and "previous_response_id" in filtered_request_data - and filtered_request_data["previous_response_id"] is not None - ): - # Remove tools in subsequent rounds regardless of caching status - del filtered_request_data["tools"] - - # [Note: Ark Limitations] Ensure thinking field consistency for cache usage - if ( - "thinking" in filtered_request_data - and "extra_body" in filtered_request_data - and isinstance(filtered_request_data["extra_body"], dict) - and "caching" in filtered_request_data["extra_body"] - and isinstance(filtered_request_data["extra_body"]["caching"], dict) - and filtered_request_data["extra_body"]["caching"].get("type") == "enabled" - and "previous_response_id" in filtered_request_data - and filtered_request_data["previous_response_id"] is not None - ): - # For cache usage, thinking should be consistent with previous round - # If thinking is present but inconsistent, remove it to avoid cache miss - # Note: This is a placeholder - actual consistency check requires state tracking - pass - - # [Note: Ark Limitations] Ensure store field is true or default when caching is enabled - if ( - "extra_body" in filtered_request_data - and isinstance(filtered_request_data["extra_body"], dict) - and "caching" in filtered_request_data["extra_body"] - and isinstance(filtered_request_data["extra_body"]["caching"], dict) - and 
filtered_request_data["extra_body"]["caching"].get("type") == "enabled" - ): - # Set store to true when caching is enabled for writing - if "store" not in filtered_request_data: - filtered_request_data["store"] = True - elif filtered_request_data["store"] is False: - # Override false to true for cache writing - filtered_request_data["store"] = True - - # [NOTE] Due to the Volcano Ark settings, there is a conflict between the cache and the instructions field. - # If a system prompt is needed, it should be placed in the system role message within the input, instead of using the instructions parameter. - # https://www.volcengine.com/docs/82379/1585128 - instructions = filtered_request_data.pop("instructions", None) - filtered_request_data["input"] = [ - { - "content": [{"text": instructions, "type": "input_text"}], - "role": "system", - "type": "message", - } - ] + filtered_request_data["input"] - - client = AsyncOpenAI( - base_url=request_data["api_base"], - api_key=request_data["api_key"], - ) - if "stream" in filtered_request_data and filtered_request_data["stream"]: - # For streaming responses, return the streaming iterator directly - # stream_response = await client.responses.create(**filtered_request_data) - # return stream_response - raise NotImplementedError("Not implemented streaming responses api") - else: - # For non-streaming responses, return the ResponsesAPIResponse - openai_response = await client.responses.create(**filtered_request_data) - raw_response = ResponsesAPIResponse(**openai_response.model_dump()) - return raw_response - - -class ArkLlmClient(LiteLLMClient): - def __init__(self): - super().__init__() - self.transformation_handler = LiteLLMResponsesTransformationHandler() - - async def acompletion( - self, model, messages, tools, **kwargs - ) -> Union[ModelResponse, CustomStreamWrapper]: - # 1 Modify messages - # Keep the header system-prompt and the user's messages - messages = messages[:1] + messages[-1:] - # 2 Get request params - ( - request_data, - optional_params, - litellm_params, - logging_obj, - custom_llm_provider, - ) = self._get_request_data(model, messages, tools, **kwargs) +class ArkLlmClient: + async def aresponse( + self, **kwargs + ) -> Union[OpenAITypeResponse, openai.AsyncStream[ResponseStreamEvent]]: + # 1. Get request params + api_base = kwargs.pop("api_base", None) + api_key = kwargs.pop("api_key", None) - # 3. Call litellm.aresponses with the transformed request data - # [NOTE] Cannot be called directly; there is a litellm bug, - # Therefore, we cannot directly call litellm.aresponses: - # https://github.com/BerriAI/litellm/issues/16267 - # raw_response = await litellm.aresponses( - # **request_data, - # ) - raw_response = await openai_response_async(request_data) - - # 4. 
Transform ResponsesAPIResponse - # 4.1 Transform ResponsesAPIResponse to ModelResponses - if isinstance(raw_response, ResponsesAPIResponse): - # 4.2 Create model_response object - model_response = ModelResponse() - setattr(model_response, "usage", litellm.Usage()) - response = self.transformation_handler.transform_response( - model=model, - raw_response=raw_response, - model_response=model_response, - logging_obj=logging_obj, - request_data=request_data, - messages=messages, - optional_params=optional_params, - litellm_params=litellm_params, - encoding=kwargs.get("encoding"), - api_key=kwargs.get("api_key"), - json_mode=kwargs.get("json_mode"), - ) - # 4.2.1 Modify ModelResponse id - if raw_response and hasattr(raw_response, "id"): - response.id = raw_response.id - return response - - else: - completion_stream = self.transformation_handler.get_model_response_iterator( - streaming_response=raw_response, # type: ignore - sync_stream=True, - json_mode=kwargs.get("json_mode"), - ) - streamwrapper = CustomStreamWrapper( - completion_stream=completion_stream, - model=model, - custom_llm_provider=custom_llm_provider, - logging_obj=logging_obj, - ) - return streamwrapper - - def _get_request_data(self, model, messages, tools, **kwargs) -> tuple: - # 1. Get optional_params using get_optional_params function - optional_params = get_optional_params(model=model, tools=tools, **kwargs) - - # 2. Get litellm_params using get_litellm_params function - litellm_params = get_litellm_params(**kwargs) - - # 3. Get headers by merging kwargs headers and extra_headers - headers = kwargs.get("headers", None) or kwargs.get("extra_headers", None) - if headers is None: - headers = {} - if kwargs.get("extra_headers") is not None: - headers.update(kwargs.get("extra_headers")) - - # 4. Get logging_obj from kwargs or create new LiteLLMLoggingObj - logging_obj = kwargs.get("litellm_logging_obj", None) - if logging_obj is None: - logging_obj = Logging( - model=model, - messages=messages, - stream=kwargs.get("stream", False), - call_type="acompletion", - litellm_call_id=str(uuid.uuid4()), - function_id=str(uuid.uuid4()), - start_time=datetime.now(), - kwargs=kwargs, - ) - # 4. Convert Message to `llm_provider` format - _, custom_llm_provider, _, _ = get_llm_provider(model=model) - if custom_llm_provider is not None and custom_llm_provider in [ - provider.value for provider in LlmProviders - ]: - provider_config = ProviderConfigManager.get_provider_chat_config( - model=model, provider=LlmProviders(custom_llm_provider) - ) - if provider_config is not None: - messages = provider_config.translate_developer_role_to_system_role( - messages=messages - ) - - # 5 Transform request to responses api format - request_data = self.transformation_handler.transform_request( - model=model, - messages=messages, - optional_params=optional_params, - litellm_params=litellm_params, - headers=headers, - litellm_logging_obj=logging_obj, - client=kwargs.get("client"), + # 2. 
Call openai responses + client = openai.AsyncOpenAI( + base_url=api_base, + api_key=api_key, ) - # 6 handler Missing field supply - if "extra_body" not in request_data and kwargs.get("extra_body"): - request_data["extra_body"] = kwargs.get("extra_body") - if "extra_query" not in request_data and kwargs.get("extra_query"): - request_data["extra_query"] = kwargs.get("extra_query") - if "extra_headers" not in request_data and kwargs.get("extra_headers"): - request_data["extra_headers"] = kwargs.get("extra_headers") - - return ( - request_data, - optional_params, - litellm_params, - logging_obj, - custom_llm_provider, - ) + raw_response = await client.responses.create(**kwargs) + return raw_response class ArkLlm(LiteLlm): llm_client: ArkLlmClient = Field(default_factory=ArkLlmClient) _additional_args: Dict[str, Any] = None + transform_handler: CompletionToResponsesAPIHandler = Field( + default_factory=CompletionToResponsesAPIHandler + ) def __init__(self, **kwargs): super().__init__(**kwargs) @@ -362,17 +104,22 @@ async def generate_content_async( if generation_params: completion_args.update(generation_params) + response_args = self.transform_handler.transform_request(**completion_args) if stream: text = "" # Track function calls by index function_calls = {} # index -> {name, args, id} - completion_args["stream"] = True + response_args["stream"] = True aggregated_llm_response = None aggregated_llm_response_with_tool_call = None usage_metadata = None fallback_index = 0 - async for part in await self.llm_client.acompletion(**completion_args): + raw_response = await self.llm_client.aresponse(**response_args) + stream_events = self.transform_handler.transform_streamable_response( + raw_response, model=self.model + ) + async for part in stream_events: for chunk, finish_reason in _model_response_to_chunk(part): if isinstance(chunk, FunctionChunk): index = chunk.index or fallback_index @@ -464,15 +211,36 @@ async def generate_content_async( yield aggregated_llm_response_with_tool_call else: - response = await self.llm_client.acompletion(**completion_args) - # ------------------------------------------------------ # - # Transport response id - # yield _model_response_to_generate_content_response(response) - llm_response = _model_response_to_generate_content_response(response) - yield _add_response_data_to_llm_response(llm_response, response) - # ------------------------------------------------------ # + raw_response = await self.llm_client.aresponse(**response_args) + yield self._openai_response_to_generate_content_response(raw_response) + + def _openai_response_to_generate_content_response( + self, raw_response: OpenAITypeResponse + ) -> LlmResponse: + """ + OpenAITypeResponse -> litellm.ModelResponse -> LlmResponse + """ + model_response = self.transform_handler.transform_response( + openai_response=raw_response, + ) + llm_response = _model_response_to_generate_content_response(model_response) + + if not model_response.id.startswith("chatcmpl"): + if llm_response.custom_metadata is None: + llm_response.custom_metadata = {} + llm_response.custom_metadata["response_id"] = model_response["id"] + # add responses cache data + if model_response.get("usage", {}).get("prompt_tokens_details"): + if llm_response.usage_metadata: + llm_response.usage_metadata.cached_content_token_count = ( + model_response.get("usage", {}) + .get("prompt_tokens_details") + .cached_tokens + ) + return llm_response +# before_model_callback def add_previous_response_id( callback_context: CallbackContext, llm_request: LlmRequest ) 
-> Optional[LlmResponse]: diff --git a/veadk/models/transform.py b/veadk/models/transform.py new file mode 100644 index 00000000..63967843 --- /dev/null +++ b/veadk/models/transform.py @@ -0,0 +1,202 @@ +import uuid +from datetime import datetime +from typing import Any, Dict, Optional, cast, List + +import litellm +from openai.types.responses import Response as OpenAITypeResponse +from litellm.completion_extras.litellm_responses_transformation.transformation import ( + LiteLLMResponsesTransformationHandler, +) +from litellm.litellm_core_utils.get_llm_provider_logic import get_llm_provider +from litellm.litellm_core_utils.streaming_handler import CustomStreamWrapper +from litellm.types.llms.openai import ResponsesAPIResponse +from litellm.types.utils import ( + ModelResponse, + LlmProviders, +) +from litellm.utils import ProviderConfigManager + +from veadk.utils.logger import get_logger + +# This will add functions to prompts if functions are provided. +litellm.add_function_to_prompt = True + +logger = get_logger(__name__) + + +openai_supported_fields = [ + "stream", + "background", + "include", + "input", + "instructions", + "max_output_tokens", + "max_tool_calls", + "metadata", + "model", + "parallel_tool_calls", + "previous_response_id", + "prompt", + "prompt_cache_key", + "reasoning", + "safety_identifier", + "service_tier", + "store", + "stream", + "stream_options", + "temperature", + "text", + "tool_choice", + "tools", + "top_logprobs", + "top_p", + "truncation", + "user", + "extra_headers", + "extra_query", + "extra_body", + "timeout", + # auth params + "api_key", + "api_base", +] + + +def ark_field_reorganization(request_data: dict) -> dict: + # [Note: Ark Limitations] tools and previous_response_id + # Remove tools in subsequent rounds (when previous_response_id is present) + if ( + "tools" in request_data + and "previous_response_id" in request_data + and request_data["previous_response_id"] is not None + ): + # Remove tools in subsequent rounds regardless of caching status + del request_data["tools"] + + # [Note: Ark Limitations] caching and store + # Ensure store field is true or default when caching is enabled + if ( + "extra_body" in request_data + and isinstance(request_data["extra_body"], dict) + and "caching" in request_data["extra_body"] + and isinstance(request_data["extra_body"]["caching"], dict) + and request_data["extra_body"]["caching"].get("type") == "enabled" + ): + # Set store to true when caching is enabled for writing + if "store" not in request_data: + request_data["store"] = True + elif request_data["store"] is False: + # Override false to true for cache writing + request_data["store"] = True + + # [NOTE Ark Limitations] instructions -> input (because of caching) + # Due to the Volcano Ark settings, there is a conflict between the cache and the instructions field. + # If a system prompt is needed, it should be placed in the system role message within the input, instead of using the instructions parameter. 
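+ # For illustration (hypothetical values): a request carrying instructions="Answer briefly"
+ # and input=[{"role": "user", ...}] is reorganized below into
+ # input=[{"role": "system", "type": "message",
+ #         "content": [{"text": "Answer briefly", "type": "input_text"}]},
+ #        {"role": "user", ...}],
+ # so no separate instructions field is sent to Ark.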
+ # https://www.volcengine.com/docs/82379/1585128 + instructions = request_data.pop("instructions", None) + request_data["input"] = [ + { + "content": [{"text": instructions, "type": "input_text"}], + "role": "system", + "type": "message", + } + ] + request_data["input"] + + return request_data + + +class CompletionToResponsesAPIHandler: + def __init__(self): + self.litellm_handler = LiteLLMResponsesTransformationHandler() + + def transform_request( + self, model: str, messages: list, tools: Optional[list], **kwargs + ): + messages = messages[:1] + messages[-1:] + # completion_request to responses api request + # 1. model and llm_custom_provider + model, custom_llm_provider, _, _ = get_llm_provider(model=model) + + # 2. input and instruction + if custom_llm_provider is not None and custom_llm_provider in [ + provider.value for provider in LlmProviders + ]: + provider_config = ProviderConfigManager.get_provider_chat_config( + model=model, provider=LlmProviders(custom_llm_provider) + ) + if provider_config is not None: + messages = provider_config.translate_developer_role_to_system_role( + messages=messages + ) + + input_items, instructions = ( + self.litellm_handler.convert_chat_completion_messages_to_responses_api( + messages + ) + ) + if tools is not None: + tools = self.litellm_handler._convert_tools_to_responses_format( + cast(List[Dict[str, Any]], tools) + ) + + response_args = { + "input": input_items, + "instructions": instructions, + "tools": tools, + "stream": kwargs.get("stream", False), + "model": model, + **kwargs, + } + result = { + key: value + for key, value in response_args.items() + if key in openai_supported_fields + } + + # Filter and reorganize scenarios that are not supported by some arks + return ark_field_reorganization(result) + + def transform_response(self, openai_response: OpenAITypeResponse) -> ModelResponse: + # responses api response to completion response + # get message + raw_response = ResponsesAPIResponse(**openai_response.model_dump()) + + model_response = ModelResponse() + setattr(model_response, "usage", litellm.Usage()) + response = self.litellm_handler.transform_response( + model=raw_response.model, + raw_response=raw_response, + model_response=model_response, + logging_obj=None, + request_data={}, + messages=[], + optional_params={}, + litellm_params={}, + encoding=None, + ) + if raw_response and hasattr(raw_response, "id"): + response.id = raw_response.id + return response + + def transform_streamable_response(self, result, model: str): + logging_obj = litellm.Logging( + model="doubao", + messages=None, + stream=True, + call_type="acompletion", + litellm_call_id=str(uuid.uuid4()), + function_id=str(uuid.uuid4()), + start_time=datetime.now(), + ) + completion_stream = self.litellm_handler.get_model_response_iterator( + streaming_response=result, + sync_stream=True, + json_mode=False, + ) + streamwrapper = CustomStreamWrapper( + completion_stream=completion_stream, + model=model, + custom_llm_provider="openai", + logging_obj=logging_obj, + ) + return streamwrapper From c6681642a040a4c747c44aaad4fd0e7f738e7d4d Mon Sep 17 00:00:00 2001 From: "hanzhi.421" Date: Sun, 9 Nov 2025 22:44:44 +0800 Subject: [PATCH 10/26] fix: stream type done but response.id --- pyproject.toml | 2 +- veadk/models/ark_llm.py | 17 ++-- .../models/{transform.py => ark_transform.py} | 97 +++++++++++++------ 3 files changed, 77 insertions(+), 39 deletions(-) rename veadk/models/{transform.py => ark_transform.py} (70%) diff --git a/pyproject.toml b/pyproject.toml index 
f3b83a25..16caf877 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,7 +16,7 @@ dependencies = [ "a2a-sdk>=0.3.0", # For Google Agent2Agent protocol "deprecated>=1.2.18", "google-adk>=1.10.0", # For basic agent architecture - "litellm>=1.74.3", # For model inference + "litellm>=1.79.3", # For model inference "loguru>=0.7.3", # For better logging "opentelemetry-exporter-otlp>=1.35.0", "opentelemetry-instrumentation-logging>=0.56b0", diff --git a/veadk/models/ark_llm.py b/veadk/models/ark_llm.py index 8cf243bc..bbe2f476 100644 --- a/veadk/models/ark_llm.py +++ b/veadk/models/ark_llm.py @@ -10,7 +10,6 @@ from google.adk.models.lite_llm import ( LiteLlm, _get_completion_inputs, - _model_response_to_chunk, FunctionChunk, TextChunk, _message_to_generate_content_response, @@ -25,7 +24,7 @@ ) from pydantic import Field -from veadk.models.transform import ( +from veadk.models.ark_transform import ( CompletionToResponsesAPIHandler, ) from veadk.utils.logger import get_logger @@ -116,11 +115,13 @@ async def generate_content_async( usage_metadata = None fallback_index = 0 raw_response = await self.llm_client.aresponse(**response_args) - stream_events = self.transform_handler.transform_streamable_response( - raw_response, model=self.model - ) - async for part in stream_events: - for chunk, finish_reason in _model_response_to_chunk(part): + async for part in raw_response: + for ( + chunk, + finish_reason, + ) in self.transform_handler.stream_event_to_chunk( + part, model=self.model + ): if isinstance(chunk, FunctionChunk): index = chunk.index or fallback_index if index not in function_calls: @@ -221,7 +222,7 @@ def _openai_response_to_generate_content_response( OpenAITypeResponse -> litellm.ModelResponse -> LlmResponse """ model_response = self.transform_handler.transform_response( - openai_response=raw_response, + openai_response=raw_response, stream=False ) llm_response = _model_response_to_generate_content_response(model_response) diff --git a/veadk/models/transform.py b/veadk/models/ark_transform.py similarity index 70% rename from veadk/models/transform.py rename to veadk/models/ark_transform.py index 63967843..439c85d5 100644 --- a/veadk/models/transform.py +++ b/veadk/models/ark_transform.py @@ -1,18 +1,31 @@ import uuid -from datetime import datetime -from typing import Any, Dict, Optional, cast, List +from typing import Any, Dict, Optional, cast, List, Generator, Tuple, Union import litellm -from openai.types.responses import Response as OpenAITypeResponse +from google.adk.models.lite_llm import ( + TextChunk, + FunctionChunk, + UsageMetadataChunk, + _model_response_to_chunk, +) +from openai.types.responses import ( + Response as OpenAITypeResponse, + ResponseStreamEvent, + ResponseTextDeltaEvent, +) +from openai.types.responses import ( + ResponseCompletedEvent, +) from litellm.completion_extras.litellm_responses_transformation.transformation import ( LiteLLMResponsesTransformationHandler, ) from litellm.litellm_core_utils.get_llm_provider_logic import get_llm_provider -from litellm.litellm_core_utils.streaming_handler import CustomStreamWrapper from litellm.types.llms.openai import ResponsesAPIResponse from litellm.types.utils import ( ModelResponse, LlmProviders, + Choices, + Message, ) from litellm.utils import ProviderConfigManager @@ -156,12 +169,13 @@ def transform_request( # Filter and reorganize scenarios that are not supported by some arks return ark_field_reorganization(result) - def transform_response(self, openai_response: OpenAITypeResponse) -> ModelResponse: - # responses 
api response to completion response - # get message + def transform_response( + self, openai_response: OpenAITypeResponse, stream: bool = False + ) -> ModelResponse: + # openai_type_response -> responses_api_response -> completion_response raw_response = ResponsesAPIResponse(**openai_response.model_dump()) - model_response = ModelResponse() + model_response = ModelResponse(stream=stream) setattr(model_response, "usage", litellm.Usage()) response = self.litellm_handler.transform_response( model=raw_response.model, @@ -178,25 +192,48 @@ def transform_response(self, openai_response: OpenAITypeResponse) -> ModelRespon response.id = raw_response.id return response - def transform_streamable_response(self, result, model: str): - logging_obj = litellm.Logging( - model="doubao", - messages=None, - stream=True, - call_type="acompletion", - litellm_call_id=str(uuid.uuid4()), - function_id=str(uuid.uuid4()), - start_time=datetime.now(), - ) - completion_stream = self.litellm_handler.get_model_response_iterator( - streaming_response=result, - sync_stream=True, - json_mode=False, - ) - streamwrapper = CustomStreamWrapper( - completion_stream=completion_stream, - model=model, - custom_llm_provider="openai", - logging_obj=logging_obj, - ) - return streamwrapper + def stream_event_to_chunk( + self, event: ResponseStreamEvent, model: str + ) -> Generator[ + Tuple[ + Optional[Union[TextChunk, FunctionChunk, UsageMetadataChunk]], + Optional[str], + ], + None, + None, + ]: + choices = [] + model_response = None + + if isinstance(event, ResponseTextDeltaEvent): + delta = Message(content=event.delta) + choices.append( + Choices(delta=delta, index=event.output_index, finish_reason=None) + ) + model_response = ModelResponse( + stream=True, choices=choices, model=model, id=str(uuid.uuid4()) + ) + elif isinstance(event, ResponseCompletedEvent): + pass + response = event.response + model_response = self.transform_response(response, stream=True) + model_response = fix_response(model_response) + else: + # Ignore other event types like ResponseOutputItemAddedEvent, etc. + pass + + if model_response: + yield from _model_response_to_chunk(model_response) + + +def fix_response(model_response: ModelResponse) -> ModelResponse: + """ + Fix the response to ensure some fields that cannot be transferred through direct conversion. 
+ """ + for i, choice in enumerate(model_response.choices): + if choice.message.tool_calls: + for idx, tool_call in enumerate(choice.message.tool_calls): + if not tool_call.get("index"): + model_response.choices[i].message.tool_calls[idx].index = 0 + + return model_response From ecdc6f6b11a95573435001d8164e5f1c3f78f0c6 Mon Sep 17 00:00:00 2001 From: "hanzhi.421" Date: Mon, 10 Nov 2025 10:22:00 +0800 Subject: [PATCH 11/26] fix: send response.id usage matadata --- veadk/models/ark_llm.py | 49 ++++++++++++---------------- veadk/models/ark_transform.py | 60 ++++++++++++++++++++++++++++++++--- 2 files changed, 76 insertions(+), 33 deletions(-) diff --git a/veadk/models/ark_llm.py b/veadk/models/ark_llm.py index bbe2f476..606aa91c 100644 --- a/veadk/models/ark_llm.py +++ b/veadk/models/ark_llm.py @@ -14,7 +14,6 @@ TextChunk, _message_to_generate_content_response, UsageMetadataChunk, - _model_response_to_generate_content_response, ) from google.genai import types from litellm import ChatCompletionAssistantMessage @@ -91,7 +90,6 @@ async def generate_content_async( previous_response_id = None if llm_request.cache_metadata and llm_request.cache_metadata.cache_name: previous_response_id = llm_request.cache_metadata.cache_name - # ------------------------------------------------------ # completion_args = { "model": self.model, "messages": messages, @@ -99,6 +97,7 @@ async def generate_content_async( "response_format": response_format, "previous_response_id": previous_response_id, # supply previous_response_id } + # ------------------------------------------------------ # completion_args.update(self._additional_args) if generation_params: @@ -117,6 +116,7 @@ async def generate_content_async( raw_response = await self.llm_client.aresponse(**response_args) async for part in raw_response: for ( + model_response, chunk, finish_reason, ) in self.transform_handler.stream_event_to_chunk( @@ -158,6 +158,14 @@ async def generate_content_async( candidates_token_count=chunk.completion_tokens, total_token_count=chunk.total_tokens, ) + # ------------------------------------------------------ # + if model_response.get("usage", {}).get("prompt_tokens_details"): + usage_metadata.cached_content_token_count = ( + model_response.get("usage", {}) + .get("prompt_tokens_details") + .cached_tokens + ) + # ------------------------------------------------------ # if ( finish_reason == "tool_calls" or finish_reason == "stop" @@ -185,6 +193,11 @@ async def generate_content_async( ) ) ) + self.transform_handler.adapt_responses_api( + model_response, + aggregated_llm_response_with_tool_call, + stream=True, + ) text = "" function_calls.clear() elif finish_reason == "stop" and text: @@ -193,6 +206,9 @@ async def generate_content_async( role="assistant", content=text ) ) + self.transform_handler.adapt_responses_api( + model_response, aggregated_llm_response, stream=True + ) text = "" # waiting until streaming ends to yield the llm_response as litellm tends @@ -213,32 +229,9 @@ async def generate_content_async( else: raw_response = await self.llm_client.aresponse(**response_args) - yield self._openai_response_to_generate_content_response(raw_response) - - def _openai_response_to_generate_content_response( - self, raw_response: OpenAITypeResponse - ) -> LlmResponse: - """ - OpenAITypeResponse -> litellm.ModelResponse -> LlmResponse - """ - model_response = self.transform_handler.transform_response( - openai_response=raw_response, stream=False - ) - llm_response = _model_response_to_generate_content_response(model_response) - - if not 
model_response.id.startswith("chatcmpl"): - if llm_response.custom_metadata is None: - llm_response.custom_metadata = {} - llm_response.custom_metadata["response_id"] = model_response["id"] - # add responses cache data - if model_response.get("usage", {}).get("prompt_tokens_details"): - if llm_response.usage_metadata: - llm_response.usage_metadata.cached_content_token_count = ( - model_response.get("usage", {}) - .get("prompt_tokens_details") - .cached_tokens - ) - return llm_response + yield self.transform_handler.openai_response_to_generate_content_response( + raw_response + ) # before_model_callback diff --git a/veadk/models/ark_transform.py b/veadk/models/ark_transform.py index 439c85d5..b27a95d5 100644 --- a/veadk/models/ark_transform.py +++ b/veadk/models/ark_transform.py @@ -2,11 +2,13 @@ from typing import Any, Dict, Optional, cast, List, Generator, Tuple, Union import litellm +from google.adk.models import LlmResponse from google.adk.models.lite_llm import ( TextChunk, FunctionChunk, UsageMetadataChunk, _model_response_to_chunk, + _model_response_to_generate_content_response, ) from openai.types.responses import ( Response as OpenAITypeResponse, @@ -192,16 +194,64 @@ def transform_response( response.id = raw_response.id return response + def openai_response_to_generate_content_response( + self, raw_response: OpenAITypeResponse + ) -> LlmResponse: + """ + OpenAITypeResponse -> litellm.ModelResponse -> LlmResponse + instead of `_model_response_to_generate_content_response`, + """ + # no stream response + model_response = self.transform_response( + openai_response=raw_response, stream=False + ) + llm_response = _model_response_to_generate_content_response(model_response) + + llm_response = self.adapt_responses_api( + model_response, + llm_response, + ) + return llm_response + + def adapt_responses_api( + self, + model_response: ModelResponse, + llm_response: LlmResponse, + stream: bool = False, + ): + """ + Adapt responses api. + """ + if not model_response.id.startswith("chatcmpl"): + if llm_response.custom_metadata is None: + llm_response.custom_metadata = {} + llm_response.custom_metadata["response_id"] = model_response["id"] + # add responses cache data + if not stream: + if model_response.get("usage", {}).get("prompt_tokens_details"): + if llm_response.usage_metadata: + llm_response.usage_metadata.cached_content_token_count = ( + model_response.get("usage", {}) + .get("prompt_tokens_details") + .cached_tokens + ) + return llm_response + def stream_event_to_chunk( self, event: ResponseStreamEvent, model: str ) -> Generator[ Tuple[ + ModelResponse, Optional[Union[TextChunk, FunctionChunk, UsageMetadataChunk]], Optional[str], ], None, None, ]: + """ + instead of using `_model_response_to_chunk`, + we use our own implementation to support the responses api. + """ choices = [] model_response = None @@ -214,21 +264,21 @@ def stream_event_to_chunk( stream=True, choices=choices, model=model, id=str(uuid.uuid4()) ) elif isinstance(event, ResponseCompletedEvent): - pass response = event.response model_response = self.transform_response(response, stream=True) - model_response = fix_response(model_response) + model_response = fix_model_response(model_response) else: # Ignore other event types like ResponseOutputItemAddedEvent, etc. 
pass if model_response: - yield from _model_response_to_chunk(model_response) + for chunk, finish_reason in _model_response_to_chunk(model_response): + yield model_response, chunk, finish_reason -def fix_response(model_response: ModelResponse) -> ModelResponse: +def fix_model_response(model_response: ModelResponse) -> ModelResponse: """ - Fix the response to ensure some fields that cannot be transferred through direct conversion. + fix: tool_call has no attribute `index` in `_model_response_to_chunk` """ for i, choice in enumerate(model_response.choices): if choice.message.tool_calls: From 102f88efb4d25b939c8560786b3a99f587526277 Mon Sep 17 00:00:00 2001 From: "hanzhi.421" Date: Mon, 10 Nov 2025 10:26:46 +0800 Subject: [PATCH 12/26] fix: add license header --- veadk/models/ark_llm.py | 16 ++++++++++++++++ veadk/models/ark_transform.py | 16 ++++++++++++++++ 2 files changed, 32 insertions(+) diff --git a/veadk/models/ark_llm.py b/veadk/models/ark_llm.py index 606aa91c..cb648f55 100644 --- a/veadk/models/ark_llm.py +++ b/veadk/models/ark_llm.py @@ -1,3 +1,19 @@ +# Copyright (c) 2025 Beijing Volcano Engine Technology Co., Ltd. and/or its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# adapted from Google ADK models adk-python/blob/main/src/google/adk/models/lite_llm.py at f1f44675e4a86b75e72cfd838efd8a0399f23e24 · google/adk-python + import json from typing import Any, Dict, Union, AsyncGenerator, Optional diff --git a/veadk/models/ark_transform.py b/veadk/models/ark_transform.py index b27a95d5..e7f599f4 100644 --- a/veadk/models/ark_transform.py +++ b/veadk/models/ark_transform.py @@ -1,3 +1,19 @@ +# Copyright (c) 2025 Beijing Volcano Engine Technology Co., Ltd. and/or its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +# adapted from Google ADK models adk-python/blob/main/src/google/adk/models/lite_llm.py at f1f44675e4a86b75e72cfd838efd8a0399f23e24 · google/adk-python + import uuid from typing import Any, Dict, Optional, cast, List, Generator, Tuple, Union From d3132ecc528e6128270444c4e4939ac8057e6002 Mon Sep 17 00:00:00 2001 From: "hanzhi.421" Date: Mon, 10 Nov 2025 12:28:35 +0800 Subject: [PATCH 13/26] fix: finish reason and chunk send --- veadk/models/ark_transform.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/veadk/models/ark_transform.py b/veadk/models/ark_transform.py index e7f599f4..820679b2 100644 --- a/veadk/models/ark_transform.py +++ b/veadk/models/ark_transform.py @@ -269,7 +269,6 @@ def stream_event_to_chunk( we use our own implementation to support the responses api. """ choices = [] - model_response = None if isinstance(event, ResponseTextDeltaEvent): delta = Message(content=event.delta) @@ -279,18 +278,23 @@ def stream_event_to_chunk( model_response = ModelResponse( stream=True, choices=choices, model=model, id=str(uuid.uuid4()) ) + for chunk, _ in _model_response_to_chunk(model_response): + # delta text, not finish + yield model_response, chunk, None elif isinstance(event, ResponseCompletedEvent): response = event.response model_response = self.transform_response(response, stream=True) model_response = fix_model_response(model_response) + + for chunk, finish_reason in _model_response_to_chunk(model_response): + if isinstance(chunk, TextChunk): + yield model_response, None, finish_reason + else: + yield model_response, chunk, finish_reason else: # Ignore other event types like ResponseOutputItemAddedEvent, etc. pass - if model_response: - for chunk, finish_reason in _model_response_to_chunk(model_response): - yield model_response, chunk, finish_reason - def fix_model_response(model_response: ModelResponse) -> ModelResponse: """ From c802b241c421257a8f77b207a248c9de9bd44c7f Mon Sep 17 00:00:00 2001 From: "hanzhi.421" Date: Mon, 10 Nov 2025 13:35:32 +0800 Subject: [PATCH 14/26] fix: without instruction bug --- veadk/models/ark_transform.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/veadk/models/ark_transform.py b/veadk/models/ark_transform.py index 820679b2..c7bf10c8 100644 --- a/veadk/models/ark_transform.py +++ b/veadk/models/ark_transform.py @@ -125,13 +125,14 @@ def ark_field_reorganization(request_data: dict) -> dict: # If a system prompt is needed, it should be placed in the system role message within the input, instead of using the instructions parameter. 
# https://www.volcengine.com/docs/82379/1585128 instructions = request_data.pop("instructions", None) - request_data["input"] = [ - { - "content": [{"text": instructions, "type": "input_text"}], - "role": "system", - "type": "message", - } - ] + request_data["input"] + if instructions: + request_data["input"] = [ + { + "content": [{"text": instructions, "type": "input_text"}], + "role": "system", + "type": "message", + } + ] + request_data["input"] return request_data From 8e4874d23a26b5147bb04e15cfb8c7e56b4bb797 Mon Sep 17 00:00:00 2001 From: "hanzhi.421" Date: Mon, 10 Nov 2025 14:11:50 +0800 Subject: [PATCH 15/26] fix: update google-adk version >=1.18 --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 16caf877..d770c684 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -15,7 +15,7 @@ dependencies = [ "pydantic-settings>=2.10.1", # Config management "a2a-sdk>=0.3.0", # For Google Agent2Agent protocol "deprecated>=1.2.18", - "google-adk>=1.10.0", # For basic agent architecture + "google-adk>=1.18.0", # For basic agent architecture "litellm>=1.79.3", # For model inference "loguru>=0.7.3", # For better logging "opentelemetry-exporter-otlp>=1.35.0", From 143ad50695a4dc2f6faf7100bad7227a7d008259 Mon Sep 17 00:00:00 2001 From: "hanzhi.421" Date: Mon, 10 Nov 2025 14:18:03 +0800 Subject: [PATCH 16/26] fix: back --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index d770c684..16caf877 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -15,7 +15,7 @@ dependencies = [ "pydantic-settings>=2.10.1", # Config management "a2a-sdk>=0.3.0", # For Google Agent2Agent protocol "deprecated>=1.2.18", - "google-adk>=1.18.0", # For basic agent architecture + "google-adk>=1.10.0", # For basic agent architecture "litellm>=1.79.3", # For model inference "loguru>=0.7.3", # For better logging "opentelemetry-exporter-otlp>=1.35.0", From 45a60702dea82e48adcdf180dbc346c9c549d213 Mon Sep 17 00:00:00 2001 From: "hanzhi.421" Date: Mon, 10 Nov 2025 14:36:20 +0800 Subject: [PATCH 17/26] fix: version and cache metadata --- pyproject.toml | 2 +- veadk/models/ark_llm.py | 23 ++++++++++++++++------- 2 files changed, 17 insertions(+), 8 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 16caf877..382177f6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -15,7 +15,7 @@ dependencies = [ "pydantic-settings>=2.10.1", # Config management "a2a-sdk>=0.3.0", # For Google Agent2Agent protocol "deprecated>=1.2.18", - "google-adk>=1.10.0", # For basic agent architecture + "google-adk>=1.15.0", # For basic agent architecture "litellm>=1.79.3", # For model inference "loguru>=0.7.3", # For better logging "opentelemetry-exporter-otlp>=1.35.0", diff --git a/veadk/models/ark_llm.py b/veadk/models/ark_llm.py index cb648f55..79a2057c 100644 --- a/veadk/models/ark_llm.py +++ b/veadk/models/ark_llm.py @@ -263,11 +263,20 @@ def add_previous_response_id( and "response_id" in events[-2].custom_metadata ): previous_response_id = events[-2].custom_metadata["response_id"] - llm_request.cache_metadata = CacheMetadata( - cache_name=previous_response_id, - expire_time=0, - fingerprint="", - invocations_used=0, - cached_contents_count=0, - ) + if "contents_count" in CacheMetadata.model_fields: # adk >= 1.17 + llm_request.cache_metadata = CacheMetadata( + cache_name=previous_response_id, + expire_time=0, + fingerprint="", + invocations_used=0, + contents_count=0, + ) + else: # 1.15 <= adk < 1.17 + 
llm_request.cache_metadata = CacheMetadata( + cache_name=previous_response_id, + expire_time=0, + fingerprint="", + invocations_used=0, + cached_contents_count=0, + ) return From bd6658d43e0becc4994c7d815c502ccb06c70b5a Mon Sep 17 00:00:00 2001 From: "hanzhi.421" Date: Tue, 11 Nov 2025 12:05:41 +0800 Subject: [PATCH 18/26] fix: multi-agent and multi llm_response scenario --- veadk/models/ark_llm.py | 7 +- veadk/models/ark_transform.py | 129 ++++++++++++++++++++++++---------- 2 files changed, 98 insertions(+), 38 deletions(-) diff --git a/veadk/models/ark_llm.py b/veadk/models/ark_llm.py index 79a2057c..17450395 100644 --- a/veadk/models/ark_llm.py +++ b/veadk/models/ark_llm.py @@ -245,9 +245,12 @@ async def generate_content_async( else: raw_response = await self.llm_client.aresponse(**response_args) - yield self.transform_handler.openai_response_to_generate_content_response( + for ( + llm_response + ) in self.transform_handler.openai_response_to_generate_content_response( raw_response - ) + ): + yield llm_response # before_model_callback diff --git a/veadk/models/ark_transform.py b/veadk/models/ark_transform.py index c7bf10c8..cc3a0078 100644 --- a/veadk/models/ark_transform.py +++ b/veadk/models/ark_transform.py @@ -30,6 +30,8 @@ Response as OpenAITypeResponse, ResponseStreamEvent, ResponseTextDeltaEvent, + ResponseOutputMessage, + ResponseFunctionToolCall, ) from openai.types.responses import ( ResponseCompletedEvent, @@ -144,7 +146,21 @@ def __init__(self): def transform_request( self, model: str, messages: list, tools: Optional[list], **kwargs ): - messages = messages[:1] + messages[-1:] + # Keep the first message and all consecutive user messages from the end + filtered_messages = messages[:1] + + # Collect all consecutive user messages from the end + user_messages_from_end = [] + for message in reversed(messages[1:]): # Skip the first message + if message.get("role") and message.get("role") in {"user", "tool"}: + user_messages_from_end.append(message) + else: + break # Stop when we encounter a non-user message + + # Reverse to maintain original order and add to filtered messages + filtered_messages.extend(reversed(user_messages_from_end)) + + messages = filtered_messages # completion_request to responses api request # 1. 
model and llm_custom_provider model, custom_llm_provider, _, _ = get_llm_provider(model=model) @@ -190,45 +206,51 @@ def transform_request( def transform_response( self, openai_response: OpenAITypeResponse, stream: bool = False - ) -> ModelResponse: + ) -> list[ModelResponse]: # openai_type_response -> responses_api_response -> completion_response - raw_response = ResponsesAPIResponse(**openai_response.model_dump()) - - model_response = ModelResponse(stream=stream) - setattr(model_response, "usage", litellm.Usage()) - response = self.litellm_handler.transform_response( - model=raw_response.model, - raw_response=raw_response, - model_response=model_response, - logging_obj=None, - request_data={}, - messages=[], - optional_params={}, - litellm_params={}, - encoding=None, - ) - if raw_response and hasattr(raw_response, "id"): - response.id = raw_response.id - return response + result_list = [] + raw_response_list = construct_responses_api_response(openai_response) + for raw_response in raw_response_list: + model_response = ModelResponse(stream=stream) + setattr(model_response, "usage", litellm.Usage()) + response = self.litellm_handler.transform_response( + model=raw_response.model, + raw_response=raw_response, + model_response=model_response, + logging_obj=None, + request_data={}, + messages=[], + optional_params={}, + litellm_params={}, + encoding=None, + ) + if raw_response and hasattr(raw_response, "id"): + response.id = raw_response.id + result_list.append(response) + + return result_list def openai_response_to_generate_content_response( self, raw_response: OpenAITypeResponse - ) -> LlmResponse: + ) -> list[LlmResponse]: """ OpenAITypeResponse -> litellm.ModelResponse -> LlmResponse instead of `_model_response_to_generate_content_response`, """ # no stream response - model_response = self.transform_response( + model_response_list = self.transform_response( openai_response=raw_response, stream=False ) - llm_response = _model_response_to_generate_content_response(model_response) + llm_response_list = [] + for model_response in model_response_list: + llm_response = _model_response_to_generate_content_response(model_response) - llm_response = self.adapt_responses_api( - model_response, - llm_response, - ) - return llm_response + llm_response = self.adapt_responses_api( + model_response, + llm_response, + ) + llm_response_list.append(llm_response) + return llm_response_list def adapt_responses_api( self, @@ -284,14 +306,15 @@ def stream_event_to_chunk( yield model_response, chunk, None elif isinstance(event, ResponseCompletedEvent): response = event.response - model_response = self.transform_response(response, stream=True) - model_response = fix_model_response(model_response) - - for chunk, finish_reason in _model_response_to_chunk(model_response): - if isinstance(chunk, TextChunk): - yield model_response, None, finish_reason - else: - yield model_response, chunk, finish_reason + model_response_list = self.transform_response(response, stream=True) + for model_response in model_response_list: + model_response = fix_model_response(model_response) + + for chunk, finish_reason in _model_response_to_chunk(model_response): + if isinstance(chunk, TextChunk): + yield model_response, None, finish_reason + else: + yield model_response, chunk, finish_reason else: # Ignore other event types like ResponseOutputItemAddedEvent, etc. 
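# (function-call argument delta events are among those skipped; tool calls are instead recovered from the final ResponseCompletedEvent handled above)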
pass @@ -308,3 +331,37 @@ def fix_model_response(model_response: ModelResponse) -> ModelResponse: model_response.choices[i].message.tool_calls[idx].index = 0 return model_response + + +def construct_responses_api_response( + openai_response: OpenAITypeResponse, +) -> list[ResponsesAPIResponse]: + output = openai_response.output + + # Check if we need to split the response + if len(output) >= 2: + # Check if output contains both ResponseOutputMessage and ResponseFunctionToolCall types + has_message = any(isinstance(item, ResponseOutputMessage) for item in output) + has_tool_call = any( + isinstance(item, ResponseFunctionToolCall) for item in output + ) + + if has_message and has_tool_call: + # Split into separate responses for each item + raw_response_list = [] + for item in output: + if isinstance(item, (ResponseOutputMessage, ResponseFunctionToolCall)): + raw_response_list.append( + ResponsesAPIResponse( + **{ + k: v + for k, v in openai_response.model_dump().items() + if k != "output" + }, + output=[item], + ) + ) + return raw_response_list + + # Otherwise, return the original response structure + return [ResponsesAPIResponse(**openai_response.model_dump())] From 8aa0ec78c9ceee9ab099fac8ba0963143b7acba0 Mon Sep 17 00:00:00 2001 From: "hanzhi.421" Date: Tue, 11 Nov 2025 18:49:07 +0800 Subject: [PATCH 19/26] fix: transport response_id by session state --- veadk/agent.py | 6 +++++- veadk/models/ark_llm.py | 18 +++++++++++++++++- 2 files changed, 22 insertions(+), 2 deletions(-) diff --git a/veadk/agent.py b/veadk/agent.py index b1ea66ce..5ec51403 100644 --- a/veadk/agent.py +++ b/veadk/agent.py @@ -36,7 +36,7 @@ from veadk.knowledgebase import KnowledgeBase from veadk.memory.long_term_memory import LongTermMemory from veadk.memory.short_term_memory import ShortTermMemory -from veadk.models.ark_llm import add_previous_response_id +from veadk.models.ark_llm import add_previous_response_id, add_response_id from veadk.processors import BaseRunProcessor, NoOpRunProcessor from veadk.prompts.agent_default_prompt import DEFAULT_DESCRIPTION, DEFAULT_INSTRUCTION from veadk.tracing.base_tracer import BaseTracer @@ -213,6 +213,10 @@ def model_post_init(self, __context: Any) -> None: self.before_model_callback = add_previous_response_id else: self.before_model_callback.append(add_previous_response_id) + if not self.after_tool_callback: + self.after_model_callback = add_response_id + else: + self.after_model_callback.append(add_response_id) else: self.model = LiteLlm( model=f"{self.model_provider}/{self.model_name}", diff --git a/veadk/models/ark_llm.py b/veadk/models/ark_llm.py index 17450395..b87b5412 100644 --- a/veadk/models/ark_llm.py +++ b/veadk/models/ark_llm.py @@ -259,13 +259,18 @@ def add_previous_response_id( ) -> Optional[LlmResponse]: invocation_context = callback_context._invocation_context events = invocation_context.session.events + agent_name = callback_context.agent_name + # read response_id if ( events and len(events) >= 2 and events[-2].custom_metadata and "response_id" in events[-2].custom_metadata ): - previous_response_id = events[-2].custom_metadata["response_id"] + previous_response_id = callback_context.state.get( + f"agent:{agent_name}:response_id" + ) + # previous_response_id = events[-2].custom_metadata["response_id"] if "contents_count" in CacheMetadata.model_fields: # adk >= 1.17 llm_request.cache_metadata = CacheMetadata( cache_name=previous_response_id, @@ -283,3 +288,14 @@ def add_previous_response_id( cached_contents_count=0, ) return + + +# after_model_callback +def 
add_response_id( + callback_context: CallbackContext, llm_response: LlmResponse +) -> Optional[LlmResponse]: + agent_name = callback_context.agent_name + + response_id = llm_response.custom_metadata["response_id"] + callback_context.state[f"agent:{agent_name}:response_id"] = response_id + return From 894d4c3bb0d34c281e26c8977efa95a4b03a2010 Mon Sep 17 00:00:00 2001 From: "hanzhi.421" Date: Tue, 11 Nov 2025 20:42:49 +0800 Subject: [PATCH 20/26] fix: multi-agent bug --- veadk/agent.py | 40 ++++++++++++++++++++++++------ veadk/models/ark_llm.py | 54 ++++++++++++----------------------------- 2 files changed, 48 insertions(+), 46 deletions(-) diff --git a/veadk/agent.py b/veadk/agent.py index 5ec51403..41ac4489 100644 --- a/veadk/agent.py +++ b/veadk/agent.py @@ -15,12 +15,13 @@ from __future__ import annotations import os -from typing import Optional, Union +from typing import Optional, Union, AsyncGenerator -from google.adk.agents import LlmAgent, RunConfig +from google.adk.agents import LlmAgent, RunConfig, InvocationContext from google.adk.agents.base_agent import BaseAgent from google.adk.agents.llm_agent import InstructionProvider, ToolUnion from google.adk.agents.run_config import StreamingMode +from google.adk.events import Event, EventActions from google.adk.models.lite_llm import LiteLlm from google.adk.runners import Runner from google.genai import types @@ -36,7 +37,7 @@ from veadk.knowledgebase import KnowledgeBase from veadk.memory.long_term_memory import LongTermMemory from veadk.memory.short_term_memory import ShortTermMemory -from veadk.models.ark_llm import add_previous_response_id, add_response_id +from veadk.models.ark_llm import add_previous_response_id from veadk.processors import BaseRunProcessor, NoOpRunProcessor from veadk.prompts.agent_default_prompt import DEFAULT_DESCRIPTION, DEFAULT_INSTRUCTION from veadk.tracing.base_tracer import BaseTracer @@ -212,11 +213,13 @@ def model_post_init(self, __context: Any) -> None: if not self.before_model_callback: self.before_model_callback = add_previous_response_id else: - self.before_model_callback.append(add_previous_response_id) - if not self.after_tool_callback: - self.after_model_callback = add_response_id - else: - self.after_model_callback.append(add_response_id) + if isinstance(self.before_model_callback, list): + self.before_model_callback.append(add_previous_response_id) + else: + self.before_model_callback = [ + self.before_model_callback, + add_previous_response_id, + ] else: self.model = LiteLlm( model=f"{self.model_provider}/{self.model_name}", @@ -259,6 +262,27 @@ def model_post_init(self, __context: Any) -> None: f"Agent: {self.model_dump(include={'name', 'model_name', 'model_api_base', 'tools'})}" ) + async def _run_async_impl( + self, ctx: InvocationContext + ) -> AsyncGenerator[Event, None]: + async for event in super()._run_async_impl(ctx): + agent_name = self.name + if ( + self.enable_responses + and event.custom_metadata + and event.custom_metadata.get("response_id") + ): + response_id = event.custom_metadata["response_id"] + yield Event( + invocation_id=ctx.invocation_id, + author=self.name, + actions=EventActions( + state_delta={f"agent:{agent_name}:response_id": response_id} + ), + branch=ctx.branch, + ) + yield event + async def _run( self, runner, diff --git a/veadk/models/ark_llm.py b/veadk/models/ark_llm.py index b87b5412..860d6dba 100644 --- a/veadk/models/ark_llm.py +++ b/veadk/models/ark_llm.py @@ -257,45 +257,23 @@ async def generate_content_async( def add_previous_response_id( callback_context: 
CallbackContext, llm_request: LlmRequest ) -> Optional[LlmResponse]: - invocation_context = callback_context._invocation_context - events = invocation_context.session.events agent_name = callback_context.agent_name # read response_id - if ( - events - and len(events) >= 2 - and events[-2].custom_metadata - and "response_id" in events[-2].custom_metadata - ): - previous_response_id = callback_context.state.get( - f"agent:{agent_name}:response_id" + previous_response_id = callback_context.state.get(f"agent:{agent_name}:response_id") + if "contents_count" in CacheMetadata.model_fields: # adk >= 1.17 + llm_request.cache_metadata = CacheMetadata( + cache_name=previous_response_id, + expire_time=0, + fingerprint="", + invocations_used=0, + contents_count=0, + ) + else: # 1.15 <= adk < 1.17 + llm_request.cache_metadata = CacheMetadata( + cache_name=previous_response_id, + expire_time=0, + fingerprint="", + invocations_used=0, + cached_contents_count=0, ) - # previous_response_id = events[-2].custom_metadata["response_id"] - if "contents_count" in CacheMetadata.model_fields: # adk >= 1.17 - llm_request.cache_metadata = CacheMetadata( - cache_name=previous_response_id, - expire_time=0, - fingerprint="", - invocations_used=0, - contents_count=0, - ) - else: # 1.15 <= adk < 1.17 - llm_request.cache_metadata = CacheMetadata( - cache_name=previous_response_id, - expire_time=0, - fingerprint="", - invocations_used=0, - cached_contents_count=0, - ) - return - - -# after_model_callback -def add_response_id( - callback_context: CallbackContext, llm_response: LlmResponse -) -> Optional[LlmResponse]: - agent_name = callback_context.agent_name - - response_id = llm_response.custom_metadata["response_id"] - callback_context.state[f"agent:{agent_name}:response_id"] = response_id return From b453d482f645878c9a3a4ddd63fb256bf5bae818 Mon Sep 17 00:00:00 2001 From: "hanzhi.421" Date: Wed, 12 Nov 2025 11:33:51 +0800 Subject: [PATCH 21/26] fix: remove before_model_callback --- veadk/agent.py | 41 ++++++--------- veadk/models/ark_llm.py | 46 +++++------------ veadk/models/ark_transform.py | 94 +++++++++++++++++++++++++++++++++-- 3 files changed, 116 insertions(+), 65 deletions(-) diff --git a/veadk/agent.py b/veadk/agent.py index 41ac4489..259294db 100644 --- a/veadk/agent.py +++ b/veadk/agent.py @@ -19,9 +19,10 @@ from google.adk.agents import LlmAgent, RunConfig, InvocationContext from google.adk.agents.base_agent import BaseAgent +from google.adk.agents.context_cache_config import ContextCacheConfig from google.adk.agents.llm_agent import InstructionProvider, ToolUnion from google.adk.agents.run_config import StreamingMode -from google.adk.events import Event, EventActions +from google.adk.events import Event from google.adk.models.lite_llm import LiteLlm from google.adk.runners import Runner from google.genai import types @@ -37,7 +38,6 @@ from veadk.knowledgebase import KnowledgeBase from veadk.memory.long_term_memory import LongTermMemory from veadk.memory.short_term_memory import ShortTermMemory -from veadk.models.ark_llm import add_previous_response_id from veadk.processors import BaseRunProcessor, NoOpRunProcessor from veadk.prompts.agent_default_prompt import DEFAULT_DESCRIPTION, DEFAULT_INSTRUCTION from veadk.tracing.base_tracer import BaseTracer @@ -155,6 +155,8 @@ class Agent(LlmAgent): enable_responses: bool = False + context_cache_config: Optional[ContextCacheConfig] = None + run_processor: Optional[BaseRunProcessor] = Field(default=None, exclude=True) """Optional run processor for intercepting and 
processing agent execution flows. @@ -210,16 +212,12 @@ def model_post_init(self, __context: Any) -> None: api_base=self.model_api_base, **self.model_extra_config, ) - if not self.before_model_callback: - self.before_model_callback = add_previous_response_id - else: - if isinstance(self.before_model_callback, list): - self.before_model_callback.append(add_previous_response_id) - else: - self.before_model_callback = [ - self.before_model_callback, - add_previous_response_id, - ] + if not self.context_cache_config: + self.context_cache_config = ContextCacheConfig( + cache_intervals=100, # maximum number + ttl_seconds=315360000, + min_tokens=0, + ) else: self.model = LiteLlm( model=f"{self.model_provider}/{self.model_name}", @@ -265,22 +263,11 @@ def model_post_init(self, __context: Any) -> None: async def _run_async_impl( self, ctx: InvocationContext ) -> AsyncGenerator[Event, None]: + if self.enable_responses: + if not ctx.context_cache_config: + ctx.context_cache_config = self.context_cache_config + async for event in super()._run_async_impl(ctx): - agent_name = self.name - if ( - self.enable_responses - and event.custom_metadata - and event.custom_metadata.get("response_id") - ): - response_id = event.custom_metadata["response_id"] - yield Event( - invocation_id=ctx.invocation_id, - author=self.name, - actions=EventActions( - state_delta={f"agent:{agent_name}:response_id": response_id} - ), - branch=ctx.branch, - ) yield event async def _run( diff --git a/veadk/models/ark_llm.py b/veadk/models/ark_llm.py index 860d6dba..e312df4c 100644 --- a/veadk/models/ark_llm.py +++ b/veadk/models/ark_llm.py @@ -15,13 +15,11 @@ # adapted from Google ADK models adk-python/blob/main/src/google/adk/models/lite_llm.py at f1f44675e4a86b75e72cfd838efd8a0399f23e24 · google/adk-python import json -from typing import Any, Dict, Union, AsyncGenerator, Optional +from typing import Any, Dict, Union, AsyncGenerator import litellm import openai from openai.types.responses import Response as OpenAITypeResponse, ResponseStreamEvent -from google.adk.agents.callback_context import CallbackContext -from google.adk.models.cache_metadata import CacheMetadata from google.adk.models import LlmRequest, LlmResponse from google.adk.models.lite_llm import ( LiteLlm, @@ -41,6 +39,7 @@ from veadk.models.ark_transform import ( CompletionToResponsesAPIHandler, + get_previous_response_id, ) from veadk.utils.logger import get_logger @@ -90,7 +89,7 @@ async def generate_content_async( Yields: LlmResponse: The model response. 
""" - + agent_name = llm_request.config.labels["adk_agent_name"] self._maybe_append_user_content(llm_request) # logger.debug(_build_request_log(llm_request)) @@ -105,7 +104,10 @@ async def generate_content_async( # get previous_response_id previous_response_id = None if llm_request.cache_metadata and llm_request.cache_metadata.cache_name: - previous_response_id = llm_request.cache_metadata.cache_name + previous_response_id = get_previous_response_id( + llm_request.cache_metadata, + agent_name, + ) completion_args = { "model": self.model, "messages": messages, @@ -210,6 +212,7 @@ async def generate_content_async( ) ) self.transform_handler.adapt_responses_api( + llm_request, model_response, aggregated_llm_response_with_tool_call, stream=True, @@ -223,7 +226,10 @@ async def generate_content_async( ) ) self.transform_handler.adapt_responses_api( - model_response, aggregated_llm_response, stream=True + llm_request, + model_response, + aggregated_llm_response, + stream=True, ) text = "" @@ -248,32 +254,6 @@ async def generate_content_async( for ( llm_response ) in self.transform_handler.openai_response_to_generate_content_response( - raw_response + llm_request, raw_response ): yield llm_response - - -# before_model_callback -def add_previous_response_id( - callback_context: CallbackContext, llm_request: LlmRequest -) -> Optional[LlmResponse]: - agent_name = callback_context.agent_name - # read response_id - previous_response_id = callback_context.state.get(f"agent:{agent_name}:response_id") - if "contents_count" in CacheMetadata.model_fields: # adk >= 1.17 - llm_request.cache_metadata = CacheMetadata( - cache_name=previous_response_id, - expire_time=0, - fingerprint="", - invocations_used=0, - contents_count=0, - ) - else: # 1.15 <= adk < 1.17 - llm_request.cache_metadata = CacheMetadata( - cache_name=previous_response_id, - expire_time=0, - fingerprint="", - invocations_used=0, - cached_contents_count=0, - ) - return diff --git a/veadk/models/ark_transform.py b/veadk/models/ark_transform.py index cc3a0078..4985f5ea 100644 --- a/veadk/models/ark_transform.py +++ b/veadk/models/ark_transform.py @@ -14,11 +14,13 @@ # adapted from Google ADK models adk-python/blob/main/src/google/adk/models/lite_llm.py at f1f44675e4a86b75e72cfd838efd8a0399f23e24 · google/adk-python +import json import uuid from typing import Any, Dict, Optional, cast, List, Generator, Tuple, Union import litellm -from google.adk.models import LlmResponse +from google.adk.models import LlmResponse, LlmRequest +from google.adk.models.cache_metadata import CacheMetadata from google.adk.models.lite_llm import ( TextChunk, FunctionChunk, @@ -139,6 +141,74 @@ def ark_field_reorganization(request_data: dict) -> dict: return request_data +def build_cache_metadata(agent_response_id: dict) -> CacheMetadata: + """Create a new CacheMetadata instance for agent response tracking. 
+ + Args: + agent_name: Name of the agent + response_id: Response ID to track + + Returns: + A new CacheMetadata instance with the agent-response mapping + """ + cache_name = json.dumps(agent_response_id) + if "contents_count" in CacheMetadata.model_fields: # adk >= 1.17 + cache_metadata = CacheMetadata( + cache_name=cache_name, + expire_time=0, + fingerprint="", + invocations_used=0, + contents_count=0, + ) + else: # 1.15 <= adk < 1.17 + cache_metadata = CacheMetadata( + cache_name=cache_name, + expire_time=0, + fingerprint="", + invocations_used=0, + cached_contents_count=0, + ) + return cache_metadata + + +def update_cache_metadata( + cache_metadata: CacheMetadata, + agent_name: str, + response_id: str, +) -> CacheMetadata: + """Update cache metadata by creating a new instance with updated cache_name. + + Since CacheMetadata is frozen, we cannot modify it directly. Instead, + we create a new instance with the updated cache_name field. + """ + try: + agent_response_id = json.loads(cache_metadata.cache_name) + agent_response_id[agent_name] = response_id + updated_cache_name = agent_response_id + + # Create a new CacheMetadata instance with updated cache_name + return build_cache_metadata(updated_cache_name) + except json.JSONDecodeError as e: + logger.warning( + f"Failed to update cache metadata. The cache_name is not a valid JSON string., {str(e)}" + ) + return cache_metadata + + +def get_previous_response_id( + cache_metadata: CacheMetadata, + agent_name: str, +): + try: + agent_response_id = json.loads(cache_metadata.cache_name) + return agent_response_id.get(agent_name, None) + except json.JSONDecodeError as e: + logger.warning( + f"Failed to get previous response id. The cache_name is not a valid JSON string., {str(e)}" + ) + return None + + class CompletionToResponsesAPIHandler: def __init__(self): self.litellm_handler = LiteLLMResponsesTransformationHandler() @@ -231,7 +301,7 @@ def transform_response( return result_list def openai_response_to_generate_content_response( - self, raw_response: OpenAITypeResponse + self, llm_request: LlmRequest, raw_response: OpenAITypeResponse ) -> list[LlmResponse]: """ OpenAITypeResponse -> litellm.ModelResponse -> LlmResponse @@ -246,6 +316,7 @@ def openai_response_to_generate_content_response( llm_response = _model_response_to_generate_content_response(model_response) llm_response = self.adapt_responses_api( + llm_request, model_response, llm_response, ) @@ -254,6 +325,7 @@ def openai_response_to_generate_content_response( def adapt_responses_api( self, + llm_request: LlmRequest, model_response: ModelResponse, llm_response: LlmResponse, stream: bool = False, @@ -262,9 +334,21 @@ def adapt_responses_api( Adapt responses api. 
""" if not model_response.id.startswith("chatcmpl"): - if llm_response.custom_metadata is None: - llm_response.custom_metadata = {} - llm_response.custom_metadata["response_id"] = model_response["id"] + # if llm_response.custom_metadata is None: + # llm_response.custom_metadata = {} + # llm_response.custom_metadata["response_id"] = model_response["id"] + previous_response_id = model_response["id"] + if not llm_request.cache_metadata: + llm_response.cache_metadata = build_cache_metadata( + {llm_request.config.labels["adk_agent_name"]: previous_response_id} + ) + else: + llm_response.cache_metadata = update_cache_metadata( + llm_request.cache_metadata, + llm_request.config.labels["adk_agent_name"], + previous_response_id, + ) + # add responses cache data if not stream: if model_response.get("usage", {}).get("prompt_tokens_details"): From 44a249e469fa6503e3e08c8ce8981767fd90d0b8 Mon Sep 17 00:00:00 2001 From: "hanzhi.421" Date: Wed, 12 Nov 2025 16:41:13 +0800 Subject: [PATCH 22/26] fix: clarify the transmission of response_id --- veadk/models/ark_llm.py | 9 +---- veadk/models/ark_transform.py | 66 ++++------------------------------- 2 files changed, 7 insertions(+), 68 deletions(-) diff --git a/veadk/models/ark_llm.py b/veadk/models/ark_llm.py index e312df4c..54295b7e 100644 --- a/veadk/models/ark_llm.py +++ b/veadk/models/ark_llm.py @@ -39,7 +39,6 @@ from veadk.models.ark_transform import ( CompletionToResponsesAPIHandler, - get_previous_response_id, ) from veadk.utils.logger import get_logger @@ -89,7 +88,6 @@ async def generate_content_async( Yields: LlmResponse: The model response. """ - agent_name = llm_request.config.labels["adk_agent_name"] self._maybe_append_user_content(llm_request) # logger.debug(_build_request_log(llm_request)) @@ -104,10 +102,7 @@ async def generate_content_async( # get previous_response_id previous_response_id = None if llm_request.cache_metadata and llm_request.cache_metadata.cache_name: - previous_response_id = get_previous_response_id( - llm_request.cache_metadata, - agent_name, - ) + previous_response_id = llm_request.cache_metadata.cache_name completion_args = { "model": self.model, "messages": messages, @@ -212,7 +207,6 @@ async def generate_content_async( ) ) self.transform_handler.adapt_responses_api( - llm_request, model_response, aggregated_llm_response_with_tool_call, stream=True, @@ -226,7 +220,6 @@ async def generate_content_async( ) ) self.transform_handler.adapt_responses_api( - llm_request, model_response, aggregated_llm_response, stream=True, diff --git a/veadk/models/ark_transform.py b/veadk/models/ark_transform.py index 4985f5ea..dc2301e5 100644 --- a/veadk/models/ark_transform.py +++ b/veadk/models/ark_transform.py @@ -14,7 +14,6 @@ # adapted from Google ADK models adk-python/blob/main/src/google/adk/models/lite_llm.py at f1f44675e4a86b75e72cfd838efd8a0399f23e24 · google/adk-python -import json import uuid from typing import Any, Dict, Optional, cast, List, Generator, Tuple, Union @@ -141,20 +140,18 @@ def ark_field_reorganization(request_data: dict) -> dict: return request_data -def build_cache_metadata(agent_response_id: dict) -> CacheMetadata: +def build_cache_metadata(response_id: str) -> CacheMetadata: """Create a new CacheMetadata instance for agent response tracking. 
Args: - agent_name: Name of the agent response_id: Response ID to track Returns: A new CacheMetadata instance with the agent-response mapping """ - cache_name = json.dumps(agent_response_id) if "contents_count" in CacheMetadata.model_fields: # adk >= 1.17 cache_metadata = CacheMetadata( - cache_name=cache_name, + cache_name=response_id, expire_time=0, fingerprint="", invocations_used=0, @@ -162,7 +159,7 @@ def build_cache_metadata(agent_response_id: dict) -> CacheMetadata: ) else: # 1.15 <= adk < 1.17 cache_metadata = CacheMetadata( - cache_name=cache_name, + cache_name=response_id, expire_time=0, fingerprint="", invocations_used=0, @@ -171,44 +168,6 @@ def build_cache_metadata(agent_response_id: dict) -> CacheMetadata: return cache_metadata -def update_cache_metadata( - cache_metadata: CacheMetadata, - agent_name: str, - response_id: str, -) -> CacheMetadata: - """Update cache metadata by creating a new instance with updated cache_name. - - Since CacheMetadata is frozen, we cannot modify it directly. Instead, - we create a new instance with the updated cache_name field. - """ - try: - agent_response_id = json.loads(cache_metadata.cache_name) - agent_response_id[agent_name] = response_id - updated_cache_name = agent_response_id - - # Create a new CacheMetadata instance with updated cache_name - return build_cache_metadata(updated_cache_name) - except json.JSONDecodeError as e: - logger.warning( - f"Failed to update cache metadata. The cache_name is not a valid JSON string., {str(e)}" - ) - return cache_metadata - - -def get_previous_response_id( - cache_metadata: CacheMetadata, - agent_name: str, -): - try: - agent_response_id = json.loads(cache_metadata.cache_name) - return agent_response_id.get(agent_name, None) - except json.JSONDecodeError as e: - logger.warning( - f"Failed to get previous response id. The cache_name is not a valid JSON string., {str(e)}" - ) - return None - - class CompletionToResponsesAPIHandler: def __init__(self): self.litellm_handler = LiteLLMResponsesTransformationHandler() @@ -316,7 +275,6 @@ def openai_response_to_generate_content_response( llm_response = _model_response_to_generate_content_response(model_response) llm_response = self.adapt_responses_api( - llm_request, model_response, llm_response, ) @@ -325,7 +283,6 @@ def openai_response_to_generate_content_response( def adapt_responses_api( self, - llm_request: LlmRequest, model_response: ModelResponse, llm_response: LlmResponse, stream: bool = False, @@ -334,21 +291,10 @@ def adapt_responses_api( Adapt responses api. 
""" if not model_response.id.startswith("chatcmpl"): - # if llm_response.custom_metadata is None: - # llm_response.custom_metadata = {} - # llm_response.custom_metadata["response_id"] = model_response["id"] previous_response_id = model_response["id"] - if not llm_request.cache_metadata: - llm_response.cache_metadata = build_cache_metadata( - {llm_request.config.labels["adk_agent_name"]: previous_response_id} - ) - else: - llm_response.cache_metadata = update_cache_metadata( - llm_request.cache_metadata, - llm_request.config.labels["adk_agent_name"], - previous_response_id, - ) - + llm_response.cache_metadata = build_cache_metadata( + previous_response_id, + ) # add responses cache data if not stream: if model_response.get("usage", {}).get("prompt_tokens_details"): From 50454977e89124006a8438fae4c97ab48f03be55 Mon Sep 17 00:00:00 2001 From: "hanzhi.421" Date: Sat, 29 Nov 2025 18:49:58 +0800 Subject: [PATCH 23/26] feat: enable persistent short-term memory to pass `response-id` --- veadk/agent.py | 14 +++++++++++++- veadk/memory/short_term_memory.py | 25 +++++++++++++++++++++++-- 2 files changed, 36 insertions(+), 3 deletions(-) diff --git a/veadk/agent.py b/veadk/agent.py index 2b3ab5bf..2c0c515c 100644 --- a/veadk/agent.py +++ b/veadk/agent.py @@ -29,7 +29,7 @@ from google.adk.agents.context_cache_config import ContextCacheConfig from google.adk.agents.llm_agent import InstructionProvider, ToolUnion from google.adk.agents.run_config import StreamingMode -from google.adk.events import Event +from google.adk.events import Event, EventActions from google.adk.models.lite_llm import LiteLlm from google.adk.runners import Runner from google.genai import types @@ -240,6 +240,18 @@ async def _run_async_impl( async for event in super()._run_async_impl(ctx): yield event + if self.enable_responses and event.cache_metadata: + # for persistent short-term memory with response api + session_state_event = Event( + invocation_id=event.invocation_id, + author=event.author, + actions=EventActions( + state_delta={ + "response_id": event.cache_metadata.cache_name, + } + ), + ) + yield session_state_event async def _run( self, diff --git a/veadk/memory/short_term_memory.py b/veadk/memory/short_term_memory.py index 6c69c750..259ea928 100644 --- a/veadk/memory/short_term_memory.py +++ b/veadk/memory/short_term_memory.py @@ -32,6 +32,7 @@ from veadk.memory.short_term_memory_backends.sqlite_backend import ( SQLiteSTMBackend, ) +from veadk.models.ark_transform import build_cache_metadata from veadk.utils.logger import get_logger logger = get_logger(__name__) @@ -41,14 +42,29 @@ def wrap_get_session_with_callbacks(obj, callback_fn: Callable): get_session_fn = getattr(obj, "get_session") @wraps(get_session_fn) - def wrapper(*args, **kwargs): - result = get_session_fn(*args, **kwargs) + async def wrapper(*args, **kwargs): + result = await get_session_fn(*args, **kwargs) callback_fn(result, *args, **kwargs) return result setattr(obj, "get_session", wrapper) +def enable_responses_api_for_session_service(result, *args, **kwargs): + if result and isinstance(result, Session): + if result.events: + for event in result.events: + if ( + event.actions + and event.actions.state_delta + and not event.cache_metadata + and "response_id" in event.actions.state_delta + ): + event.cache_metadata = build_cache_metadata( + response_id=event.actions.state_delta.get("response_id"), + ) + + class ShortTermMemory(BaseModel): """Short term memory for agent execution. 
@@ -170,6 +186,11 @@ def model_post_init(self, __context: Any) -> None: db_kwargs=self.db_kwargs, **self.backend_configs ).session_service + if self.backend != "local": + wrap_get_session_with_callbacks( + self._session_service, enable_responses_api_for_session_service + ) + if self.after_load_memory_callback: wrap_get_session_with_callbacks( self._session_service, self.after_load_memory_callback From 80a3b7225d8f7723f27c9f1135b70188ec09c28a Mon Sep 17 00:00:00 2001 From: "hanzhi.421" Date: Tue, 9 Dec 2025 09:20:14 +0800 Subject: [PATCH 24/26] feat: add package --- pyproject.toml | 5 ++++- veadk/agent.py | 24 ++++++++++++++++++++++++ 2 files changed, 28 insertions(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 6e4f5350..8061d5e5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -17,7 +17,7 @@ dependencies = [ "deprecated>=1.2.18", "google-adk>=1.10.0", # For basic agent architecture "google-adk<=1.19.0", # For basic agent architecture - "litellm>=1.79.3", # For model inference + "litellm>=1.74.3", # For model inference "loguru>=0.7.3", # For better logging "opentelemetry-exporter-otlp>=1.35.0", "opentelemetry-instrumentation-logging>=0.56b0", @@ -72,6 +72,9 @@ dev = [ "pytest-xdist>=3.8.0", ] +responses = [ + "litellm>=1.79.3" +] [dependency-groups] dev = [ diff --git a/veadk/agent.py b/veadk/agent.py index 2c0c515c..bbe50c57 100644 --- a/veadk/agent.py +++ b/veadk/agent.py @@ -158,6 +158,30 @@ def model_post_init(self, __context: Any) -> None: if not self.model: if self.enable_responses: + min_version = "1.79.3" + from packaging.version import InvalidVersion + from packaging.version import parse as parse_version + import pkg_resources + + try: + installed = parse_version( + pkg_resources.get_distribution("litellm").version + ) + except pkg_resources.DistributionNotFound: + raise ImportError( + "litellm installation not detected, please install it first: pip install litellm>=1.79.3" + ) from None + except InvalidVersion as e: + raise ValueError( + f"Invalid format of litellm version number:{e}" + ) from None + required = parse_version(min_version) + if installed < required: + raise ValueError( + "You have used `enable_responses=True`. 
If you want to use the `responses_api`, please install the relevant support:" + "\npip install veadk-python[responses]" + ) + from veadk.models.ark_llm import ArkLlm self.model = ArkLlm( From 8f08e5ab6004024b8e08774f6469d20e2360b943 Mon Sep 17 00:00:00 2001 From: "hanzhi.421" Date: Tue, 9 Dec 2025 14:09:04 +0800 Subject: [PATCH 25/26] fix: check litellm version --- veadk/agent.py | 24 ++---------------------- veadk/utils/misc.py | 32 ++++++++++++++++++++++++++++++++ 2 files changed, 34 insertions(+), 22 deletions(-) diff --git a/veadk/agent.py b/veadk/agent.py index bbe50c57..ce33ee1e 100644 --- a/veadk/agent.py +++ b/veadk/agent.py @@ -50,6 +50,7 @@ from veadk.tracing.base_tracer import BaseTracer from veadk.utils.logger import get_logger from veadk.utils.patches import patch_asyncio, patch_tracer +from veadk.utils.misc import check_litellm_version from veadk.version import VERSION patch_tracer() @@ -159,28 +160,7 @@ def model_post_init(self, __context: Any) -> None: if not self.model: if self.enable_responses: min_version = "1.79.3" - from packaging.version import InvalidVersion - from packaging.version import parse as parse_version - import pkg_resources - - try: - installed = parse_version( - pkg_resources.get_distribution("litellm").version - ) - except pkg_resources.DistributionNotFound: - raise ImportError( - "litellm installation not detected, please install it first: pip install litellm>=1.79.3" - ) from None - except InvalidVersion as e: - raise ValueError( - f"Invalid format of litellm version number:{e}" - ) from None - required = parse_version(min_version) - if installed < required: - raise ValueError( - "You have used `enable_responses=True`. If you want to use the `responses_api`, please install the relevant support:" - "\npip install veadk-python[responses]" - ) + check_litellm_version(min_version) from veadk.models.ark_llm import ArkLlm diff --git a/veadk/utils/misc.py b/veadk/utils/misc.py index 24866922..6c20cf0c 100644 --- a/veadk/utils/misc.py +++ b/veadk/utils/misc.py @@ -182,3 +182,35 @@ def get_agent_dir(): full_path = os.getcwd() return full_path + + +def check_litellm_version(min_version: str): + """ + Check if the installed litellm version meets the minimum requirement. + + Args: + min_version (str): The minimum required version of litellm. + """ + try: + from packaging.version import InvalidVersion + from packaging.version import parse as parse_version + import pkg_resources + + try: + installed = parse_version(pkg_resources.get_distribution("litellm").version) + except pkg_resources.DistributionNotFound: + raise ImportError( + "litellm installation not detected, please install it first: pip install litellm>=1.79.3" + ) from None + except InvalidVersion as e: + raise ValueError(f"Invalid format of litellm version number:{e}") from None + required = parse_version(min_version) + if installed < required: + raise ValueError( + "You have used `enable_responses=True`. If you want to use the `responses_api`, please install the relevant support:" + "\npip install veadk-python[responses]" + ) + except ImportError: + raise ImportError( + "packaging or pkg_resources not found. 
Please install them: pip install packaging setuptools" + ) From a85b890cb3883d90ece796bbe1831d5c37a36aa7 Mon Sep 17 00:00:00 2001 From: "hanzhi.421" Date: Wed, 17 Dec 2025 22:18:30 +0800 Subject: [PATCH 26/26] chore: litellm version for pyproject --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index b97ae00d..45c6e921 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,7 +16,7 @@ dependencies = [ "a2a-sdk==0.3.7", # For Google Agent2Agent protocol "deprecated==1.2.18", "google-adk==1.19.0", # For basic agent architecture - "litellm==1.74.3", # For model inference + "litellm>=1.74.3", # For model inference "loguru==0.7.3", # For better logging "opentelemetry-exporter-otlp==1.37.0", "opentelemetry-instrumentation-logging>=0.56b0",
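
Usage sketch for the series as a whole (a minimal, illustrative example, not part of any patch): it assumes the agent class defined in veadk/agent.py is exported as `Agent` and accepts keyword construction, and that `short_term_memory=` and `backend="mysql"` are valid ways to wire in a persistent ShortTermMemory; `check_litellm_version`, `enable_responses`, and the `responses` extra come from the patches above.

    # Requires the optional extra introduced in PATCH 24 (and the litellm
    # floor relaxed in PATCH 26):
    #   pip install veadk-python[responses]   # pulls litellm >= 1.79.3
    from veadk.agent import Agent
    from veadk.memory.short_term_memory import ShortTermMemory
    from veadk.utils.misc import check_litellm_version

    # Fails fast if litellm is missing or older than the responses-API minimum.
    check_litellm_version("1.79.3")

    # A non-"local" backend is assumed here so the response_id written to
    # EventActions.state_delta (PATCH 23) is restored into cache_metadata
    # when the session is reloaded.
    memory = ShortTermMemory(backend="mysql")  # backend name is illustrative

    agent = Agent(
        name="demo_agent",
        enable_responses=True,  # routes model calls through ArkLlm / the responses API
        # short_term_memory=memory,  # assumed keyword; verify against the Agent API
    )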