diff --git a/src/open_responses_server/models/responses_models.py b/src/open_responses_server/models/responses_models.py index 5197985..0513eb3 100644 --- a/src/open_responses_server/models/responses_models.py +++ b/src/open_responses_server/models/responses_models.py @@ -77,7 +77,7 @@ class ToolCallArgumentsDelta(BaseModel): class ToolCallArgumentsDone(BaseModel): type: str = "response.function_call_arguments.done" - id: str + item_id: str output_index: int arguments: str @@ -102,6 +102,23 @@ class ResponseInProgress(BaseModel): type: str = "response.in_progress" response: ResponseModel +class OutputItemAdded(BaseModel): + type: str = "response.output_item.added" + output_index: int + item: Dict + +class OutputItemDone(BaseModel): + type: str = "response.output_item.done" + output_index: int + item: Dict + +class OutputTextDone(BaseModel): + type: str = "response.output_text.done" + item_id: str + output_index: int + content_index: int + text: str + class ResponseCompleted(BaseModel): type: str = "response.completed" - response: ResponseModel \ No newline at end of file + response: ResponseModel \ No newline at end of file diff --git a/src/open_responses_server/responses_service.py b/src/open_responses_server/responses_service.py index 6080a74..cd161f7 100644 --- a/src/open_responses_server/responses_service.py +++ b/src/open_responses_server/responses_service.py @@ -1,21 +1,52 @@ import json import uuid import time +from collections import OrderedDict from typing import Dict, List, Any from open_responses_server.common.config import logger, MAX_CONVERSATION_HISTORY from open_responses_server.common.mcp_manager import mcp_manager, serialize_tool_result from open_responses_server.models.responses_models import ( ResponseModel, ResponseCreated, ResponseInProgress, ResponseCompleted, - ToolCallsCreated, ToolCallArgumentsDelta, ToolCallArgumentsDone, OutputTextDelta + ToolCallsCreated, ToolCallArgumentsDelta, ToolCallArgumentsDone, OutputTextDelta, + OutputItemAdded, OutputItemDone, OutputTextDone ) # Global dictionary to store conversation history by response ID conversation_history: Dict[str, List[Dict[str, Any]]] = {} +# Cache reasoning_content (CoT) keyed by tool call_id for passback. +# Keep a bounded insertion-ordered cache so recent tool-call chains can feed +# reasoning back into the next request without unbounded growth. +reasoning_content_cache: OrderedDict[str, str] = OrderedDict() + def current_timestamp() -> int: return int(time.time()) + +def _stringify_tool_output(output: Any) -> str: + """Normalize tool output into the string payload chat.completions expects.""" + if output is None: + return "" + if isinstance(output, str): + return output + try: + return serialize_tool_result(output) + except TypeError: + return str(output) + + +def _cache_reasoning_content(call_id: str, reasoning_content: str, max_entries: int = 200) -> None: + """Store reasoning content by call_id and evict oldest entries when bounded.""" + if not call_id or not reasoning_content: + return + + reasoning_content_cache[call_id] = reasoning_content + reasoning_content_cache.move_to_end(call_id) + + while len(reasoning_content_cache) > max_entries: + reasoning_content_cache.popitem(last=False) + def validate_message_sequence(messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """ Validate and fix the message sequence to ensure tool messages have preceding assistant messages with tool_calls. @@ -116,105 +147,143 @@ def convert_responses_to_chat_completions(request_data: dict) -> dict: # Check for previous tool responses in the input if "input" in request_data and request_data["input"]: - user_message = {"role": "user", "content": ""} - logger.info(f"Processing input messages {request_data['input']}") + logger.info(f"Processing {len(request_data['input'])} input items") for i, item in enumerate(request_data["input"]): if isinstance(item, dict): - if item.get("type") == "message" and item.get("role") == "user": - # Add user message - content = "" - if "content" in item: - for j, content_item in enumerate(item["content"]): - if isinstance(content_item, dict) and content_item.get("type") == "input_text": - content += content_item.get("text", "") - elif isinstance(content_item, dict) and content_item.get("type") == "text": - content += content_item.get("text", "") - elif isinstance(content_item, str): - content += content_item - user_message = {"role": "user", "content": content} - messages.append(user_message) - # Log user message content for context - logger.info(f"User message: {content[:100]}...") - - elif item.get("type") == "function_call_output": - # Add tool output - log tool usage - logger.info(f"[TOOL-OUTPUT-PROCESSING] Processing function_call_output: call_id={item.get('call_id')}, output={item.get('output', '')[:50]}...") - logger.info(f"[TOOL-OUTPUT-PROCESSING] Full item: {json.dumps(item, indent=2)}") - - # Check if we have a corresponding assistant message with a tool call first - call_id = item.get("call_id") + item_type = item.get("type") + item_role = item.get("role") + + # Handle message items + if item_type == "message": + if item_role == "user": + content = "" + if "content" in item: + for content_item in item["content"]: + if isinstance(content_item, dict) and content_item.get("type") in ("input_text", "text"): + content += content_item.get("text", "") + elif isinstance(content_item, str): + content += content_item + messages.append({"role": "user", "content": content}) + logger.info(f"User message: {content[:100]}...") + + elif item_role == "developer": + # Developer messages → system role in chat completions + content = "" + if "content" in item: + for content_item in item["content"]: + if isinstance(content_item, dict) and content_item.get("type") in ("input_text", "text"): + content += content_item.get("text", "") + elif isinstance(content_item, str): + content += content_item + if content: + # Check if system message already exists + has_system = any(msg.get("role") == "system" for msg in messages) + if has_system: + # Append to existing system message + for msg in messages: + if msg.get("role") == "system": + msg["content"] += "\n" + content + break + else: + messages.append({"role": "system", "content": content}) + logger.info(f"Developer message (as system): {content[:100]}...") + + elif item_role == "assistant": + content = "" + if "content" in item and isinstance(item["content"], list): + for content_item in item["content"]: + if isinstance(content_item, dict) and content_item.get("type") == "output_text": + content += content_item.get("text", "") + if content: + messages.append({"role": "assistant", "content": content}) + logger.info(f"Assistant message: {content[:100]}...") + + # Handle function_call items (assistant's tool calls sent back by client) + elif item_type == "function_call": + call_id = item.get("call_id", item.get("id", f"call_{uuid.uuid4().hex}")) + tool_name = item.get("name", "") + arguments = item.get("arguments", "{}") + + # Look up cached reasoning_content for CoT passback + cached_reasoning = reasoning_content_cache.get(call_id, "") + if cached_reasoning: + logger.info(f"[INPUT] function_call: name={tool_name} call_id={call_id} +reasoning={len(cached_reasoning)} chars") + else: + logger.info(f"[INPUT] function_call: name={tool_name} call_id={call_id}") + + # Group consecutive function_calls into one assistant message + # Check if the last message is an assistant with tool_calls + if messages and messages[-1].get("role") == "assistant" and "tool_calls" in messages[-1]: + messages[-1]["tool_calls"].append({ + "id": call_id, + "type": "function", + "function": {"name": tool_name, "arguments": arguments} + }) + # Merge reasoning: use longest (first call's reasoning covers all) + if cached_reasoning and not messages[-1].get("reasoning_content"): + messages[-1]["reasoning_content"] = cached_reasoning + else: + assistant_msg = { + "role": "assistant", + "content": None, + "tool_calls": [{ + "id": call_id, + "type": "function", + "function": {"name": tool_name, "arguments": arguments} + }] + } + if cached_reasoning: + assistant_msg["reasoning_content"] = cached_reasoning + messages.append(assistant_msg) + + # Handle function_call_output items (tool results) + elif item_type == "function_call_output": + call_id = item.get("call_id") or item.get("id") or f"call_{uuid.uuid4().hex}" + output = _stringify_tool_output(item.get("output", "")) + logger.info(f"[INPUT] function_call_output: call_id={call_id} output_len={len(str(output))}") + + # Check if we have a corresponding assistant message with a matching tool call has_matching_tool_call = False - - # Look for a matching tool call in the existing messages for msg in messages: if msg.get("role") == "assistant" and "tool_calls" in msg: for tool_call in msg["tool_calls"]: if tool_call.get("id") == call_id: has_matching_tool_call = True break - - # Debug: Log messages structure for debugging - logger.info(f"[TOOL-OUTPUT-PROCESSING] Messages so far: {len(messages)} messages") - for i, msg in enumerate(messages): - logger.info(f"[TOOL-OUTPUT-PROCESSING] Message {i}: role={msg.get('role')}, has_tool_calls={'tool_calls' in msg}") - if msg.get("role") == "tool": - logger.info(f"[TOOL-OUTPUT-PROCESSING] Tool message {i}: call_id={msg.get('tool_call_id')}") - + if has_matching_tool_call: - # Only add the tool response if we found a matching tool call - tool_message = { + messages.append({ "role": "tool", "tool_call_id": call_id, - "content": item.get("output", "") - } - messages.append(tool_message) - logger.info(f"[TOOL-OUTPUT-PROCESSING] Added tool response for existing tool call {call_id}") + "content": output + }) + logger.info(f"[INPUT] Added tool response for call_id={call_id}") else: - # If no matching tool call, we need to add an assistant message with the tool call first - # as this could be from a previous conversation + # Fallback: create synthetic assistant + tool message tool_name = item.get("name", "unknown_tool") - - # Validate we have required fields - if not tool_name or tool_name == "unknown_tool": - logger.error(f"[TOOL-OUTPUT-PROCESSING] Cannot create tool call without tool name. Item: {item}") - continue - - # Create an assistant message with a tool call - assistant_message = { - "role": "assistant", - "content": None, - "tool_calls": [{ - "id": call_id, - "type": "function", - "function": { - "name": tool_name, - "arguments": item.get("arguments", "{}") - } - }] - } - messages.append(assistant_message) - - # Then add the tool response - tool_message = { - "role": "tool", - "tool_call_id": call_id, - "content": item.get("output", "") - } - messages.append(tool_message) - logger.info(f"[TOOL-OUTPUT-PROCESSING] Added assistant message with tool call and corresponding tool response for {tool_name}") - elif item.get("type") == "message" and item.get("role") == "assistant": - # Handle assistant messages from previous conversations - content = "" - if "content" in item and isinstance(item["content"], list): - for content_item in item["content"]: - if isinstance(content_item, dict) and content_item.get("type") == "output_text": - content += content_item.get("text", "") - - if content: - messages.append({"role": "assistant", "content": content}) - logger.info(f"Added assistant message: {content[:100]}...") + if tool_name and tool_name != "unknown_tool": + messages.append({ + "role": "assistant", + "content": None, + "tool_calls": [{ + "id": call_id, + "type": "function", + "function": { + "name": tool_name, + "arguments": item.get("arguments", "{}") + } + }] + }) + messages.append({ + "role": "tool", + "tool_call_id": call_id, + "content": output + }) + logger.info(f"[INPUT] Created synthetic assistant+tool for {tool_name} call_id={call_id}") + else: + logger.warning(f"[INPUT] Skipping orphaned function_call_output: call_id={call_id}, no tool name") + elif isinstance(item, str): - # Simple string input messages.append({"role": "user", "content": item}) logger.info(f"User message (string): {item[:100]}...") @@ -324,6 +393,7 @@ async def process_chat_completions_stream(response, chat_request=None): tool_call_counter = 0 message_id = f"msg_{uuid.uuid4().hex}" output_text_content = "" # Track the full text content for logging + reasoning_content = "" # Accumulate reasoning/CoT from model for passback request_start_time = time.time() last_chunk_time = request_start_time logger.info(f"[STREAM-START] response_id={response_id} message_id={message_id}") @@ -353,6 +423,35 @@ async def process_chat_completions_stream(response, chat_request=None): yield f"data: {json.dumps(in_progress_event.dict())}\n\n" chunk_counter = 0 + + def ensure_tool_call_added(tool_call: Dict[str, Any]) -> str | None: + """Emit output_item.added once per tool call after its name becomes available.""" + tool_name = tool_call["function"]["name"] + if tool_call.get("added_emitted") or not tool_name: + return None + + logger.info(f"Tool call created: {tool_name}") + is_mcp = mcp_manager.is_mcp_tool(tool_name) + logger.info(f"[TOOL-CALL-CREATED] Tool '{tool_name}': is_mcp={is_mcp}, status=in_progress") + + fc_item = { + "arguments": "", + "call_id": tool_call["id"], + "name": tool_name, + "type": "function_call", + "id": tool_call["id"], + "status": "in_progress" + } + response_obj.output.append(fc_item) + tool_call["added_emitted"] = True + + item_added_event = OutputItemAdded( + output_index=tool_call["output_index"], + item=fc_item + ) + logger.info(f"Emitting output_item.added for '{tool_name}'") + return f"data: {json.dumps(item_added_event.dict())}\n\n" + try: async for chunk in response.aiter_lines(): chunk_counter += 1 @@ -378,15 +477,26 @@ async def process_chat_completions_stream(response, chat_request=None): # If we haven't already completed the response, do it now if response_obj.status != "completed": - # If no output, add empty message - if not response_obj.output: - response_obj.output.append({ - "id": message_id, - "type": "message", - "role": "assistant", - "content": [{"type": "output_text", "text": f"{output_text_content}\n\n" or "Done"}] - }) - + final_text = output_text_content or "" + + # Emit text closing events if we had text content + if final_text: + yield f"data: {json.dumps({'type': 'response.output_text.done', 'item_id': message_id, 'output_index': 0, 'content_index': 0, 'text': final_text})}\n\n" + yield f"data: {json.dumps({'type': 'response.content_part.done', 'item_id': message_id, 'output_index': 0, 'content_index': 0, 'part': {'type': 'output_text', 'text': final_text, 'annotations': []}})}\n\n" + + final_msg_item = { + "id": message_id, + "type": "message", + "role": "assistant", + "status": "completed", + "content": [{"type": "output_text", "text": final_text, "annotations": []}] + } + + # Emit output_item.done if we have text + if final_text: + yield f"data: {json.dumps({'type': 'response.output_item.done', 'output_index': 0, 'item': final_msg_item})}\n\n" + + response_obj.output = [final_msg_item] if final_text else response_obj.output response_obj.status = "completed" completed_event = ResponseCompleted( type="response.completed", @@ -480,80 +590,64 @@ async def process_chat_completions_stream(response, chat_request=None): # Initialize tool call if not exists if index not in tool_calls: + tool_call_id = tool_delta.get("id", f"call_{uuid.uuid4().hex}") tool_calls[index] = { - "id": tool_delta.get("id", f"call_{uuid.uuid4().hex}"), + "id": tool_call_id, "type": tool_delta.get("type", "function"), "function": { "name": tool_delta.get("function", {}).get("name", ""), - "arguments": tool_delta.get("function", {}).get("arguments", ""), + "arguments": "", }, - "item_id": f"tool_call_{uuid.uuid4().hex}", - "output_index": tool_call_counter + "output_index": tool_call_counter, + "added_emitted": False, } - - # If we got a tool name, emit the created event - if "function" in tool_delta and "name" in tool_delta["function"]: - tool_call = tool_calls[index] - tool_call["function"]["name"] = tool_delta["function"]["name"] - # Log tool call creation - logger.info(f"Tool call created: {tool_call['function']['name']}") - - # Check if this is an MCP tool or a user-defined tool - is_mcp = mcp_manager.is_mcp_tool(tool_call["function"]["name"]) - tool_status = "in_progress" if is_mcp else "ready" - - logger.info(f"[TOOL-CALL-CREATED] Tool '{tool_call['function']['name']}': is_mcp={is_mcp}, status={tool_status}") - - # Add the tool call to the response output in Responses API format - response_obj.output.append({ - "arguments": tool_call["function"]["arguments"], - "call_id": tool_call["id"], - "name": tool_call["function"]["name"], - "type": "function_call", - "id": tool_call["id"], - "status": tool_status - }) - - # Emit the in_progress event - in_progress_event = ResponseInProgress( - type="response.in_progress", - response=response_obj - ) - - logger.info(f"Emitting {in_progress_event}") - yield f"data: {json.dumps(in_progress_event.dict())}\n\n" + tool_call_counter += 1 + + tool_call = tool_calls[index] - tool_call_counter += 1 + if "function" in tool_delta and "name" in tool_delta["function"]: + tool_call["function"]["name"] = tool_delta["function"]["name"] + item_added_payload = ensure_tool_call_added(tool_call) + if item_added_payload: + yield item_added_payload # Process function arguments if present if "function" in tool_delta and "arguments" in tool_delta["function"]: arg_fragment = tool_delta["function"]["arguments"] tool_calls[index]["function"]["arguments"] += arg_fragment - + # Emit delta event args_event = ToolCallArgumentsDelta( type="response.function_call_arguments.delta", - item_id=tool_calls[index]["item_id"], + item_id=tool_calls[index]["id"], output_index=tool_calls[index]["output_index"], delta=arg_fragment ) - + yield f"data: {json.dumps(args_event.dict())}\n\n" # Handle content (text) elif "content" in delta and delta["content"] is not None: content_delta = delta["content"] output_text_content += content_delta - - # Create a new message if it doesn't exist - if not response_obj.output: - response_obj.output.append({ + + # On first text chunk, emit output_item.added + content_part.added + if not response_obj.output or not any( + o.get("type") == "message" for o in response_obj.output + ): + msg_item = { "id": message_id, "type": "message", "role": "assistant", - "content": [{"type": "output_text", "text": output_text_content or "(No update)"}] - }) - + "status": "in_progress", + "content": [] + } + response_obj.output.append(msg_item) + # output_item.added + yield f"data: {json.dumps({'type': 'response.output_item.added', 'output_index': 0, 'item': msg_item})}\n\n" + # content_part.added + yield f"data: {json.dumps({'type': 'response.content_part.added', 'item_id': message_id, 'output_index': 0, 'content_index': 0, 'part': {'type': 'output_text', 'text': '', 'annotations': []}})}\n\n" + # Emit text delta event text_event = OutputTextDelta( type="response.output_text.delta", @@ -562,9 +656,12 @@ async def process_chat_completions_stream(response, chat_request=None): content_index=0, delta=content_delta ) - yield f"data: {json.dumps(text_event.dict())}\n\n" - + + # Accumulate reasoning_content (CoT) from model for passback + if "reasoning_content" in delta and delta["reasoning_content"] is not None: + reasoning_content += delta["reasoning_content"] + if "finish_reason" in choice and choice["finish_reason"] is not None: logger.info(f"Received finish_reason: {choice['finish_reason']}") @@ -618,7 +715,7 @@ async def process_chat_completions_stream(response, chat_request=None): else: # For non-MCP tools, send the function call back to the client in Responses API format logger.info(f"[TOOL-EXECUTE] Forwarding non-MCP tool call to client: {tool_name}") - + # Include the function call in the response response_obj.output.append({ "id": tool_call["id"], @@ -626,16 +723,21 @@ async def process_chat_completions_stream(response, chat_request=None): "name": tool_name, "arguments": tool_call["function"]["arguments"], "call_id": tool_call["id"], - "status": "ready" + "status": "completed" }) - + + # Cache reasoning for CoT passback + if reasoning_content: + _cache_reasoning_content(tool_call["id"], reasoning_content) + logger.info(f"[COT-PASSBACK] Cached reasoning ({len(reasoning_content)} chars) for call_id={tool_call['id']}") + # After tool handling, complete the response response_obj.status = "completed" completed_event = ResponseCompleted( type="response.completed", response=response_obj ) - + # Save conversation history if we have chat_request available if chat_request: # Get the existing messages from the request @@ -654,8 +756,12 @@ async def process_chat_completions_stream(response, chat_request=None): } }] } + # Preserve reasoning_content for CoT passback on tool call turns + if reasoning_content: + assistant_message["reasoning_content"] = reasoning_content + logger.info(f"[COT-PASSBACK] Stored {len(reasoning_content)} chars of reasoning_content in history") messages.append(assistant_message) - + # Add the tool response for immediate tools if mcp_manager.is_mcp_tool(tool_name): # For MCP tools, also add the tool response @@ -694,50 +800,67 @@ async def process_chat_completions_stream(response, chat_request=None): logger.info(f"[TOOL-CALLS-FINISH] Tool '{tool_call['function']['name']}': is_mcp={is_mcp}") + # Emit the arguments.done event (same for MCP and non-MCP) + done_event = ToolCallArgumentsDone( + type="response.function_call_arguments.done", + item_id=tool_call["id"], + output_index=tool_call["output_index"], + arguments=tool_call["function"]["arguments"] + ) + logger.info(f"Emitting arguments.done for '{tool_call['function']['name']}'") + yield f"data: {json.dumps(done_event.dict())}\n\n" + + # Update the function_call item in output: set final arguments and status + for output_item in response_obj.output: + if output_item.get("id") == tool_call["id"] and output_item.get("type") == "function_call": + output_item["arguments"] = tool_call["function"]["arguments"] + output_item["status"] = "completed" + break + + # Emit response.output_item.done with completed status + done_fc_item = { + "arguments": tool_call["function"]["arguments"], + "call_id": tool_call["id"], + "name": tool_call["function"]["name"], + "type": "function_call", + "id": tool_call["id"], + "status": "completed" + } + item_done_event = OutputItemDone( + output_index=tool_call["output_index"], + item=done_fc_item + ) + logger.info(f"Emitting output_item.done for '{tool_call['function']['name']}'") + yield f"data: {json.dumps(item_done_event.dict())}\n\n" + # For MCP tools, execute them immediately if is_mcp: logger.info(f"[TOOL-CALLS-FINISH] Executing MCP tool '{tool_call['function']['name']}'") - - # Parse the arguments JSON + try: args = json.loads(tool_call["function"]["arguments"]) except Exception: args = {} - - # Execute MCP tool + try: result = await mcp_manager.execute_mcp_tool(tool_call["function"]["name"], args) - logger.info(f"[TOOL-CALLS-FINISH] ✓ MCP tool '{tool_call['function']['name']}' executed successfully") - logger.debug(f"[TOOL-CALLS-FINISH] MCP tool result: {result}") + logger.info(f"[TOOL-CALLS-FINISH] MCP tool '{tool_call['function']['name']}' executed successfully") except Exception as e: result = {"error": str(e)} - logger.error(f"[TOOL-CALLS-FINISH] ✗ MCP tool '{tool_call['function']['name']}' failed: {e}") - - # Emit the arguments.done event - done_event = ToolCallArgumentsDone( - type="response.function_call_arguments.done", - id=tool_call["item_id"], - output_index=tool_call["output_index"], - arguments=tool_call["function"]["arguments"] - ) - logger.info(f"Emitting {done_event}") - yield f"data: {json.dumps(done_event.dict())}\n\n" - - # Add the tool execution result to the response + logger.error(f"[TOOL-CALLS-FINISH] MCP tool '{tool_call['function']['name']}' failed: {e}") + response_obj.output.append({ "id": tool_call["id"], "type": "function_call_output", "call_id": tool_call["id"], "output": serialize_tool_result(result) }) - - # Convert result to JSON for text delta + try: text = serialize_tool_result(result) except TypeError: text = serialize_tool_result(str(result)) - - # Emit text delta with the result + text_event = OutputTextDelta( type="response.output_text.delta", item_id=tool_call["id"], @@ -746,44 +869,17 @@ async def process_chat_completions_stream(response, chat_request=None): delta=text ) yield f"data: {json.dumps(text_event.dict())}\n\n" - logger.info(f"[TOOL-CALLS-FINISH] Added function_call_output for MCP tool '{tool_call['function']['name']}'") - else: - # For non-MCP tools, emit arguments.done and leave them in ready state for client - logger.info(f"[TOOL-CALLS-FINISH] Keeping non-MCP tool '{tool_call['function']['name']}' in ready state for client") - - done_event = ToolCallArgumentsDone( - type="response.function_call_arguments.done", - id=tool_call["item_id"], - output_index=tool_call["output_index"], - arguments=tool_call["function"]["arguments"] - ) - logger.info(f"Emitting {done_event}") - yield f"data: {json.dumps(done_event.dict())}\n\n" - - # Update response object for non-MCP tools - # Find any existing entry for this tool call and update args - found = False - for output_item in response_obj.output: - if output_item.get("id") == tool_call["id"] and output_item.get("type") == "function_call": - output_item["arguments"] = tool_call["function"]["arguments"] - found = True - logger.info(f"[TOOL-CALLS-FINISH] Updated existing function_call entry for '{tool_call['function']['name']}'") - break - - # If not found, add it - if not found: - response_obj.output.append({ - "id": tool_call["id"], - "type": "function_call", - "name": tool_call["function"]["name"], - "arguments": tool_call["function"]["arguments"], - "call_id": tool_call["id"], - "status": "ready" - }) - logger.info(f"[TOOL-CALLS-FINISH] Added new function_call entry for '{tool_call['function']['name']}'") - + logger.info(f"[TOOL-CALLS-FINISH] Non-MCP tool '{tool_call['function']['name']}' completed, client will execute") + + # Cache reasoning_content keyed by call_ids for CoT passback + # When Codex CLI sends these call_ids back, we inject the reasoning + if reasoning_content: + for tc in tool_calls.values(): + _cache_reasoning_content(tc["id"], reasoning_content) + logger.info(f"[COT-PASSBACK] Cached reasoning ({len(reasoning_content)} chars) for {len(tool_calls)} call_ids") + # After processing all tool calls, complete the response response_obj.status = "completed" completed_event = ResponseCompleted( @@ -809,8 +905,12 @@ async def process_chat_completions_stream(response, chat_request=None): } } for tool_call in tool_calls.values()] } + # Preserve reasoning_content for CoT passback on tool call turns + if reasoning_content: + assistant_message["reasoning_content"] = reasoning_content + logger.info(f"[COT-PASSBACK] Stored {len(reasoning_content)} chars of reasoning_content in history") messages.append(assistant_message) - + # Add tool responses for executed MCP tools for tool_call in tool_calls.values(): if mcp_manager.is_mcp_tool(tool_call["function"]["name"]): @@ -847,30 +947,52 @@ async def process_chat_completions_stream(response, chat_request=None): # If the finish reason is "stop", emit the completed event if choice["finish_reason"] == "stop": logger.info("Received stop finish reason") - # If we have any text content, add it to the output - if not response_obj.output: - response_obj.output.append({ + + final_text = output_text_content or "" + has_message_output = any( + output_item.get("type") == "message" + for output_item in response_obj.output + ) + + if not has_message_output: + added_msg_item = { "id": message_id, "type": "message", "role": "assistant", - "content": [{"type": "output_text", "text": f"{output_text_content}\n\n" or "Done"}] - }) - - # Log complete output text - logger.info(f"Response completed with text: {output_text_content[:100]}...\n\n") - - response_obj.status = "completed" - response_obj.output= [{ + "status": "in_progress", + "content": [] + } + yield f"data: {json.dumps({'type': 'response.output_item.added', 'output_index': 0, 'item': added_msg_item})}\n\n" + yield f"data: {json.dumps({'type': 'response.content_part.added', 'item_id': message_id, 'output_index': 0, 'content_index': 0, 'part': {'type': 'output_text', 'text': '', 'annotations': []}})}\n\n" + + # Emit text closing events: output_text.done, content_part.done, output_item.done + if final_text: + # output_text.done + yield f"data: {json.dumps({'type': 'response.output_text.done', 'item_id': message_id, 'output_index': 0, 'content_index': 0, 'text': final_text})}\n\n" + # content_part.done + yield f"data: {json.dumps({'type': 'response.content_part.done', 'item_id': message_id, 'output_index': 0, 'content_index': 0, 'part': {'type': 'output_text', 'text': final_text, 'annotations': []}})}\n\n" + + # Build the final message item + final_msg_item = { "id": message_id, "type": "message", "role": "assistant", - "content": [{"type": "output_text", "text": output_text_content or "(No update)"}] - }] + "status": "completed", + "content": [{"type": "output_text", "text": final_text, "annotations": []}] + } + + # output_item.done + yield f"data: {json.dumps({'type': 'response.output_item.done', 'output_index': 0, 'item': final_msg_item})}\n\n" + + logger.info(f"Response completed with text: {final_text[:100]}...") + + response_obj.status = "completed" + response_obj.output = [final_msg_item] completed_event = ResponseCompleted( type="response.completed", response=response_obj ) - + # Save conversation history if we have chat_request available if chat_request: # Get the existing messages from the request @@ -879,7 +1001,7 @@ async def process_chat_completions_stream(response, chat_request=None): # Add the assistant response to the conversation history messages.append({ "role": "assistant", - "content": output_text_content or "(No update)" + "content": final_text }) # Store in conversation history @@ -919,4 +1041,4 @@ async def process_chat_completions_stream(response, chat_request=None): response=response_obj ) - yield f"data: {json.dumps(completed_event.dict())}\n\n" \ No newline at end of file + yield f"data: {json.dumps(completed_event.dict())}\n\n" diff --git a/tests/test_responses_service.py b/tests/test_responses_service.py index d893913..859ef65 100644 --- a/tests/test_responses_service.py +++ b/tests/test_responses_service.py @@ -16,6 +16,8 @@ validate_message_sequence, process_chat_completions_stream, conversation_history, + reasoning_content_cache, + _cache_reasoning_content, ) @@ -31,6 +33,15 @@ def parse_sse(raw: str) -> dict: return json.loads(text) +@pytest.fixture(autouse=True) +def clear_global_response_state(): + conversation_history.clear() + reasoning_content_cache.clear() + yield + conversation_history.clear() + reasoning_content_cache.clear() + + # =================================================================== # 1. validate_message_sequence # =================================================================== @@ -132,6 +143,28 @@ def test_multiple_tool_calls_same_assistant(self): assert len(result) == 4 +class TestReasoningContentCache: + """Tests for reasoning_content cache eviction behavior.""" + + def test_cache_reasoning_content_evicts_oldest_entries(self): + """Cache should keep only the most recent bounded entries.""" + _cache_reasoning_content("call_1", "r1", max_entries=2) + _cache_reasoning_content("call_2", "r2", max_entries=2) + _cache_reasoning_content("call_3", "r3", max_entries=2) + + assert list(reasoning_content_cache.keys()) == ["call_2", "call_3"] + + def test_cache_reasoning_content_refreshes_existing_key(self): + """Reinserting an existing call_id should move it to the newest position.""" + _cache_reasoning_content("call_1", "r1", max_entries=2) + _cache_reasoning_content("call_2", "r2", max_entries=2) + _cache_reasoning_content("call_1", "r1-new", max_entries=2) + _cache_reasoning_content("call_3", "r3", max_entries=2) + + assert list(reasoning_content_cache.keys()) == ["call_1", "call_3"] + assert reasoning_content_cache["call_1"] == "r1-new" + + # =================================================================== # 2. convert_responses_to_chat_completions # =================================================================== @@ -285,6 +318,56 @@ def test_function_call_output_without_matching_creates_pair(self): assert len(tool_msgs) >= 1 assert tool_msgs[0]["tool_call_id"] == "call_new" + def test_function_call_output_falls_back_to_id_field(self): + """function_call_output should accept id when call_id is absent.""" + conversation_history["prev_fc_id"] = [ + {"role": "user", "content": "do something"}, + { + "role": "assistant", + "content": None, + "tool_calls": [ + {"id": "call_from_id", "type": "function", "function": {"name": "my_tool", "arguments": "{}"}}, + ], + }, + ] + req = { + "model": "m", + "previous_response_id": "prev_fc_id", + "input": [ + { + "type": "function_call_output", + "id": "call_from_id", + "name": "my_tool", + "output": "tool result here", + } + ], + } + result = convert_responses_to_chat_completions(req) + tool_msgs = [m for m in result["messages"] if m.get("role") == "tool"] + assert len(tool_msgs) == 1 + assert tool_msgs[0]["tool_call_id"] == "call_from_id" + assert tool_msgs[0]["content"] == "tool result here" + + def test_function_call_output_normalizes_non_string_output(self): + """function_call_output content should be stringified for chat.completions.""" + req = { + "model": "m", + "input": [ + { + "type": "function_call_output", + "call_id": "call_structured", + "name": "new_tool", + "output": {"ok": True, "items": [1, 2]}, + } + ], + } + result = convert_responses_to_chat_completions(req) + tool_msgs = [m for m in result["messages"] if m.get("role") == "tool"] + assert len(tool_msgs) == 1 + assert tool_msgs[0]["tool_call_id"] == "call_structured" + assert isinstance(tool_msgs[0]["content"], str) + assert json.loads(tool_msgs[0]["content"]) == {"ok": True, "items": [1, 2]} + def test_function_call_output_without_tool_name_skipped(self): """function_call_output without a tool name is skipped (continues).""" req = { @@ -656,12 +739,12 @@ async def test_tool_calls_finish_with_non_mcp_tool( done_evts = [e for e in events if e["type"] == "response.function_call_arguments.done"] assert len(done_evts) >= 1 - # The tool call in the completed response should have status "ready" + # The tool call in the completed response should have status "completed" completed = [e for e in events if e["type"] == "response.completed"] assert len(completed) == 1 fc_items = [o for o in completed[0]["response"]["output"] if o.get("type") == "function_call"] assert len(fc_items) >= 1 - assert fc_items[0]["status"] == "ready" + assert fc_items[0]["status"] == "completed" async def test_function_call_finish_with_mcp_tool( self, mock_stream_response, mock_mcp_manager_fixture @@ -719,7 +802,7 @@ async def test_function_call_finish_with_non_mcp_tool( fc_items = [o for o in completed[0]["response"]["output"] if o.get("type") == "function_call"] assert len(fc_items) >= 1 assert fc_items[0]["name"] == "client_tool" - assert fc_items[0]["status"] == "ready" + assert fc_items[0]["status"] == "completed" async def test_conversation_history_saved_on_stop(self, mock_stream_response): """Conversation history is saved when finish_reason is 'stop'.""" @@ -863,7 +946,7 @@ async def test_tool_call_arguments_delta_events( async def test_tool_calls_created_event_emitted( self, mock_stream_response, mock_mcp_manager_fixture ): - """When a tool call is first seen, an in_progress event is emitted.""" + """When a tool call is first seen, an output_item.added event is emitted.""" mock_mcp = mock_mcp_manager_fixture mock_mcp.is_mcp_tool.return_value = False @@ -877,10 +960,47 @@ async def test_tool_calls_created_event_emitted( events = [parse_sse(e) async for e in process_chat_completions_stream(mock_resp, chat_req)] - # Should have in_progress event for the tool call - in_progress = [e for e in events if e["type"] == "response.in_progress"] - # At least 2: one initial, one when tool call is created - assert len(in_progress) >= 2 + # Should have output_item.added event for the tool call + item_added = [e for e in events if e["type"] == "response.output_item.added"] + assert len(item_added) >= 1 + assert item_added[0]["item"]["type"] == "function_call" + assert item_added[0]["item"]["status"] == "in_progress" + + # Should also have output_item.done event + item_done = [e for e in events if e["type"] == "response.output_item.done"] + assert len(item_done) >= 1 + assert item_done[0]["item"]["status"] == "completed" + + async def test_tool_calls_name_arrives_later_keep_unique_output_indexes( + self, mock_stream_response, mock_mcp_manager_fixture + ): + """Tool calls with delayed function names should still get unique output indexes.""" + mock_mcp = mock_mcp_manager_fixture + mock_mcp.is_mcp_tool.return_value = False + + lines = [ + 'data: {"choices":[{"delta":{"tool_calls":[{"index":0,"id":"call_a","type":"function","function":{"arguments":""}},{"index":1,"id":"call_b","type":"function","function":{"name":"tool_b","arguments":""}}]},"index":0}],"model":"m"}', + 'data: {"choices":[{"delta":{"tool_calls":[{"index":0,"function":{"name":"tool_a","arguments":"{}"}},{"index":1,"function":{"arguments":"{}"}}]},"index":0}]}', + 'data: {"choices":[{"delta":{},"finish_reason":"tool_calls","index":0}]}', + 'data: [DONE]', + ] + mock_resp = mock_stream_response(lines) + chat_req = {"messages": [{"role": "user", "content": "hi"}]} + + events = [parse_sse(e) async for e in process_chat_completions_stream(mock_resp, chat_req)] + + item_added = [e for e in events if e["type"] == "response.output_item.added"] + item_done = [e for e in events if e["type"] == "response.output_item.done" and e["item"]["type"] == "function_call"] + + assert len(item_added) >= 2 + assert len(item_done) >= 2 + + added_by_call = {e["item"]["id"]: e["output_index"] for e in item_added if e["item"]["type"] == "function_call"} + done_by_call = {e["item"]["id"]: e["output_index"] for e in item_done} + + assert added_by_call["call_a"] != added_by_call["call_b"] + assert done_by_call["call_a"] == added_by_call["call_a"] + assert done_by_call["call_b"] == added_by_call["call_b"] async def test_function_call_legacy_created_event( self, mock_stream_response, mock_mcp_manager_fixture @@ -1030,7 +1150,7 @@ async def test_response_id_format(self, mock_stream_response): assert created[0]["response"]["id"].startswith("resp_") async def test_stop_with_no_output_adds_empty_message(self, mock_stream_response): - """Stop finish_reason with empty output_text adds message with fallback text.""" + """Stop finish_reason with empty output_text adds message with empty text.""" lines = [ 'data: {"choices":[{"delta":{},"finish_reason":"stop","index":0}],"model":"m"}', 'data: [DONE]', @@ -1042,5 +1162,39 @@ async def test_stop_with_no_output_adds_empty_message(self, mock_stream_response assert len(completed) >= 1 output = completed[0]["response"]["output"] assert len(output) >= 1 - # Should have the fallback "(No update)" text - assert output[0]["content"][0]["text"] == "(No update)" + assert output[0]["content"][0]["text"] == "" + + async def test_stop_with_no_output_saves_empty_history_message(self, mock_stream_response): + """Conversation history should match the empty assistant output on stop-without-text.""" + lines = [ + 'data: {"choices":[{"delta":{},"finish_reason":"stop","index":0}],"model":"m"}', + 'data: [DONE]', + ] + mock_resp = mock_stream_response(lines) + chat_req = {"messages": [{"role": "user", "content": "hi"}]} + + _events = [parse_sse(e) async for e in process_chat_completions_stream(mock_resp, chat_req)] + + assert len(conversation_history) == 1 + saved_key = list(conversation_history.keys())[0] + saved_msgs = conversation_history[saved_key] + assistant_msgs = [m for m in saved_msgs if m["role"] == "assistant"] + assert len(assistant_msgs) == 1 + assert assistant_msgs[0]["content"] == "" + + async def test_stop_with_no_output_emits_added_before_done(self, mock_stream_response): + """Empty stop responses should still emit a valid message item lifecycle.""" + lines = [ + 'data: {"choices":[{"delta":{},"finish_reason":"stop","index":0}],"model":"m"}', + 'data: [DONE]', + ] + mock_resp = mock_stream_response(lines) + + events = [parse_sse(e) async for e in process_chat_completions_stream(mock_resp)] + event_types = [e["type"] for e in events] + + assert "response.output_item.added" in event_types + assert "response.content_part.added" in event_types + assert "response.output_item.done" in event_types + assert event_types.index("response.output_item.added") < event_types.index("response.output_item.done") + assert event_types.index("response.content_part.added") < event_types.index("response.output_item.done")