Skip to content

Commit 47ac66b

Browse files
committed
Merge branch 'development' into staging
2 parents 34c4a2d + 84776b3 commit 47ac66b

5 files changed

Lines changed: 77 additions & 101 deletions

File tree

src/server/main/chat/prompts.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
STAGE_1_SYSTEM_PROMPT = """
22
You are an expert Triage AI. You have two primary responsibilities:
33
1. Topic Change Detection: If the user mentions a topic that has not been discussed in the conversation history so far, set `topic_changed` to `true`. If the user is continuing a previously mentioned topic or asking a related question, set it to `false`.
4-
2. Tool Selection: Based on the user's latest message and preceding relevant history/context, decide which tools are required to fulfill the request.
4+
2. Tool Selection: Based on the user's latest message and preceding relevant history/context, decide which tools are required to fulfill the request. If the topic hasn't changed, keep the previous tools in your `tools` list.
55
66
CRITICAL INSTRUCTIONS:
77
- `topic_changed` (boolean): Set to `true` if the latest user message mentions a topic that has never been mentioned in the conversation history.

src/server/mcp_hub/gmail/main.py

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -73,11 +73,20 @@ async def _execute_tool(ctx: Context, func, *args, **kwargs) -> Dict[str, Any]:
7373
# --- Sync Tool Implementations ---
7474

7575
def _send_email_sync(service, to: str, subject: str, body: str):
76-
message_raw = base64.urlsafe_b64encode(MIMEText(body).as_bytes()).decode()
77-
message_body = {"raw": message_raw, "to": to, "subject": subject}
76+
msg = MIMEText(body)
77+
msg["to"] = to
78+
msg["subject"] = subject
79+
80+
# Optional: add From header to avoid confusion
81+
msg["from"] = "me"
82+
83+
raw = base64.urlsafe_b64encode(msg.as_bytes()).decode()
84+
message_body = {"raw": raw}
85+
7886
service.users().messages().send(userId="me", body=message_body).execute()
7987
return {"message": "Email sent successfully."}
8088

89+
8190
def _reply_to_email_sync(service, message_id: str, body: str, reply_all: bool = False):
8291
original_msg = service.users().messages().get(userId="me", id=message_id, format="metadata", metadataHeaders=["subject", "from", "to", "cc", "message-id", "references"]).execute()
8392
headers = {h['name'].lower(): h['value'] for h in original_msg['payload']['headers']}
@@ -262,8 +271,11 @@ async def applyLabels(ctx: Context, message_id: str, label_ids: List[str]) -> Di
262271
async def createDraft(ctx: Context, to: str, subject: str, body: str) -> Dict[str, Any]:
263272
"""Create a new draft email that can be edited before sending."""
264273
def _sync(service, to, subject, body):
265-
message_raw = base64.urlsafe_b64encode(MIMEText(body).as_bytes()).decode()
266-
message = {"message": {"raw": message_raw, "to": to, "subject": subject}}
274+
msg = MIMEText(body)
275+
msg["to"] = to
276+
msg["subject"] = subject
277+
message_raw = base64.urlsafe_b64encode(msg.as_bytes()).decode()
278+
message = {"message": {"raw": message_raw}}
267279
draft = service.users().drafts().create(userId="me", body=message).execute()
268280
return {"draft_id": draft['id'], "message": "Draft created successfully."}
269281
return await _execute_tool(ctx, _sync, to=to, subject=subject, body=body)
@@ -371,8 +383,13 @@ async def removeLabels(ctx: Context, message_id: str, label_ids: List[str]) -> D
371383
async def updateDraft(ctx: Context, draft_id: str, to: Optional[str] = None, subject: Optional[str] = None, body: Optional[str] = None) -> Dict[str, Any]:
372384
"""Update an existing draft email with new content."""
373385
def _sync(service, draft_id, to, subject, body):
374-
message_raw = base64.urlsafe_b64encode(MIMEText(body).as_bytes()).decode()
375-
message = {"message": {"raw": message_raw, "to": to, "subject": subject}}
386+
msg = MIMEText(body)
387+
if to:
388+
msg["to"] = to
389+
if subject:
390+
msg["subject"] = subject
391+
message_raw = base64.urlsafe_b64encode(msg.as_bytes()).decode()
392+
message = {"message": {"raw": message_raw}}
376393
updated_draft = service.users().drafts().update(userId="me", id=draft_id, body=message).execute()
377394
return {"draft_id": updated_draft['id'], "message": "Draft updated."}
378395
return await _execute_tool(ctx, _sync, draft_id=draft_id, to=to, subject=subject, body=body)

src/server/workers/planner/llm.py

Lines changed: 1 addition & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -9,38 +9,6 @@
99

1010
logger = logging.getLogger(__name__)
1111

12-
def run_agent_with_fallback(system_message: str, function_list: list, messages: list):
13-
"""
14-
Initializes and runs a Qwen Assistant, trying a list of API keys in sequence if failures occur.
15-
This function is a generator that yields the results from the successful agent run.
16-
"""
17-
if not OPENAI_API_KEYS:
18-
raise ValueError("No OpenAI API keys configured.")
19-
20-
errors = []
21-
for i, key in enumerate(OPENAI_API_KEYS):
22-
llm_cfg = {
23-
'model': OPENAI_MODEL_NAME,
24-
'model_server': OPENAI_API_BASE_URL,
25-
'api_key': key,
26-
}
27-
28-
try:
29-
logger.info(f"Attempting to run agent with API key #{i+1}")
30-
bot = Assistant(llm=llm_cfg, system_message=system_message, function_list=function_list or [])
31-
32-
yield from bot.run(messages=messages)
33-
return # If the stream completes successfully, exit the generator.
34-
35-
except Exception as e:
36-
error_message = f"Agent run with API key #{i+1} failed: {e}"
37-
logger.warning(error_message, exc_info=True)
38-
errors.append(error_message)
39-
continue # Try the next key
40-
41-
# If the loop completes, all keys have failed
42-
raise Exception(f"All OpenAI API keys failed. Errors: {errors}")
43-
4412
def get_planner_agent(available_tools: dict, current_time_str: str, user_name: str, user_location: str, retrieved_context: dict = None):
4513
"""Initializes and returns a Qwen Assistant agent for planning."""
4614

@@ -64,23 +32,4 @@ def get_planner_agent(available_tools: dict, current_time_str: str, user_name: s
6432
return {
6533
"system_message": system_prompt,
6634
"function_list": []
67-
}
68-
69-
def get_question_generator_agent(
70-
original_context: dict,
71-
available_tools_for_prompt: dict,
72-
mcp_servers_for_agent: dict
73-
):
74-
"""Initializes a unified Qwen agent to verify context and generate clarifying questions."""
75-
original_context_str = json.dumps(original_context, indent=2, default=str)
76-
77-
system_prompt = prompts.QUESTION_GENERATOR_SYSTEM_PROMPT.format(
78-
original_context=original_context_str,
79-
)
80-
81-
tools_config = [{"mcpServers": mcp_servers_for_agent}]
82-
83-
return {
84-
"system_message": system_prompt,
85-
"function_list": tools_config
86-
}
35+
}

src/server/workers/planner/prompts.py

Lines changed: 25 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -71,32 +71,31 @@
7171
"""
7272

7373
QUESTION_GENERATOR_SYSTEM_PROMPT = """
74-
You are a highly intelligent context verification agent. Your primary function is to use a set of pre-selected, relevant tools to gather all necessary information about a task *before* deciding if you need to ask the user for clarification. Your goal is to be as autonomous as possible and only ask the user for information if it's a critical blocker that cannot be found with your tools.
75-
76-
**Your Goal:**
77-
1. Use the provided tools to search for context related to the task.
78-
2. Analyze the gathered context.
79-
3. If critical information is still missing, generate clarifying questions.
80-
4. If you have enough information to create a plan, you will indicate that no questions are needed.
81-
82-
**You have been given the following information:**
83-
**Original Context:** The raw information (e.g., email body) that triggered the task.
84-
```json
74+
You are a methodical Context Verification agent. Your sole purpose is to determine if enough information exists to fulfill a user's request. You will achieve this by first actively searching for information using your tools, and only then asking the user for clarification if necessary. You DO NOT perform the final task for the user.
75+
76+
**Your Mandated Workflow:**
77+
78+
**Step 1: Information Gathering (Tool Calls)**
79+
- Your first and primary action is to use the tools provided to you to find any missing information.
80+
- Analyze the user's request and the original context. Identify missing pieces of critical information (e.g., an email address, a document ID, a project name).
81+
- **You MUST call the appropriate tool to find this information.** For example, if the request is to 'email Sarthak', your first action MUST be to call `gpeople_server-search_contacts` with the query 'Sarthak'. If the request is about a document, use `gdrive_server-gdrive_search`.
82+
- If you believe you have enough information from the start, you can skip this step.
83+
84+
**Step 2: Analysis & Final Output**
85+
- After you have exhausted your tool usage or determined no tools are needed, you must make a final decision.
86+
- **Scenario A: Sufficient Information:** If you have gathered all necessary information to proceed with a plan, your final output MUST be the following JSON object and nothing else:
87+
`{{"clarifying_questions": []}}`
88+
- **Scenario B: Insufficient Information:** If critical information is still missing after you have tried to find it with your tools, your final output MUST be a JSON object containing a list of specific questions for the user. Follow this schema exactly:
89+
`{{"clarifying_questions": ["What is the email address for Sarthak Karandikar?", "What should be the subject of this test email?"]}}`
90+
91+
**CRITICAL RULES:**
92+
- Your response can only be one of two things: a tool call, or the final JSON object with `clarifying_questions`.
93+
- You are FORBIDDEN from performing the user's task (e.g., you cannot call `gmail_server-sendEmail`). Your job is only to verify context.
94+
- You are FORBIDDEN from outputting any JSON format other than the one specified for clarifying questions.
95+
- Do not include any text, explanations, or markdown formatting outside of your tool calls or the final JSON object.
96+
97+
**Original Context Provided for this Task:**
8598
{original_context}
86-
```
87-
88-
Output Requirements:
89-
Your response MUST be a single, valid JSON object that strictly adheres to the following schema. Do not include any other text or explanations.
9099
91-
JSON Schema:
92-
```json
93-
{{
94-
"clarifying_questions": [
95-
"A clear, concise question for the user.",
96-
"Another question if needed."
97-
]
98-
}}
99-
```
100-
- If you have enough information to proceed with planning, return an empty list: {{"clarifying_questions": []}}.
101-
- If you need more information, populate the list with your questions.
100+
ONLY RETURN THE JSON OBJECT WITH CLARIFYING QUESTIONS OR A TOOL CALL. NEVER RETURN A PLAN OR A TEXTUAL RESPONSE. NEVER TRY TO TALK TO THE USER. NEVER TRY TO DIRECTLY PERFORM THE TASK. YOUR ROLE IS ONLY TO PERFORM CONTEXT VERIFICATION AND CHECK IF YOU HAVE THE RELEVANT INFORMATION. IF YOU DON'T HAVE THE INFORMATION, RETURN THE JSON OBJECT. NEVER TRY TO SEND EMAILS. NEVER TRY TO CREATE DOCUMENTS. NEVER TRY TO PERFORM ANY TASKS.
102101
"""

src/server/workers/tasks.py

Lines changed: 27 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,14 @@
1818
from main.llm import run_agent_with_fallback as run_main_agent_with_fallback
1919
from main.db import MongoManager
2020
from workers.celery_app import celery_app
21-
from workers.planner.llm import get_planner_agent, get_question_generator_agent # noqa: E501
21+
from workers.planner.llm import get_planner_agent # noqa: E501
22+
from workers.planner.prompts import QUESTION_GENERATOR_SYSTEM_PROMPT
2223
from workers.proactive.main import run_proactive_pipeline_logic
2324
from workers.planner.db import PlannerMongoManager, get_all_mcp_descriptions # noqa: E501
2425
from workers.memory_agent_utils import get_memory_qwen_agent, get_db_manager as get_memory_db_manager # noqa: E501
2526
from workers.executor.tasks import execute_task_plan
2627
from main.vector_db import get_conversation_summaries_collection
2728
from main.chat.prompts import STAGE_1_SYSTEM_PROMPT
28-
from workers.planner.llm import run_agent_with_fallback as run_worker_agent_with_fallback
2929
from mcp_hub.memory.utils import cud_memory, initialize_embedding_model, initialize_agents
3030
from workers.utils.text_utils import clean_llm_output
3131

@@ -76,8 +76,7 @@ async def _select_relevant_tools(query: str, available_tools_map: Dict[str, str]
7676
return []
7777

7878
try:
79-
tools_description = "\n".join(f"- `{name}`: {desc}" for name, desc in available_tools_map.items())
80-
prompt = f"User Query: \"{query}\"\n\nAvailable External Tools (for selection):\n{tools_description}"
79+
prompt = f"The user is trying to perform the following task: \"{query}\" Choose the relevant tools needed to complete the task, as well as any tools where important information or context can be found related to the task. \n (For example, if the user is asking to perform a task using Gmail, you should definitely include gmail in the selected tools, but also include gpeople which can be used to find relevant contacts.)"
8180

8281
messages = [{'role': 'user', 'content': prompt}]
8382

@@ -91,11 +90,18 @@ def _run_selector_sync():
9190
return final_content_str
9291

9392
final_content_str = await asyncio.to_thread(_run_selector_sync)
94-
selected_tools = JsonExtractor.extract_valid_json(final_content_str)
95-
if isinstance(selected_tools, list):
96-
logger.info(f"Tool selector identified relevant tools for context search: {selected_tools}")
97-
return selected_tools
98-
return []
93+
cleaned_output = clean_llm_output(final_content_str)
94+
parsed_output = JsonExtractor.extract_valid_json(cleaned_output)
95+
selected_tools = []
96+
if isinstance(parsed_output, dict) and "topic_changed" in parsed_output and "tools" in parsed_output:
97+
selected_tools = parsed_output.get("tools", [])
98+
99+
# Separate into connected and disconnected
100+
connected_tools_selected = [tool for tool in selected_tools if tool in available_tools_map]
101+
102+
selected_tools = connected_tools_selected
103+
104+
return selected_tools
99105
except Exception as e:
100106
logger.error(f"Error during tool selection for context search: {e}", exc_info=True)
101107
return list(available_tools_map.keys())
@@ -368,21 +374,26 @@ async def get_clarifying_questions(user_id: str, task_description: str, topics:
368374

369375
logger.info(f"Context Verifier for user {user_id} will use tools: {list(mcp_servers_for_agent.keys())}")
370376

371-
agent_config = get_question_generator_agent(
372-
original_context=original_context,
373-
available_tools_for_prompt=available_tools_for_prompt,
374-
mcp_servers_for_agent=mcp_servers_for_agent
377+
"""Initializes a unified Qwen agent to verify context and generate clarifying questions."""
378+
original_context_str = json.dumps(original_context, indent=2, default=str)
379+
380+
system_prompt = QUESTION_GENERATOR_SYSTEM_PROMPT.format(
381+
original_context=original_context_str,
375382
)
383+
384+
tools_config = [{"mcpServers": mcp_servers_for_agent}]
376385

377-
user_prompt = f"Based on the task '{task_description}' and the provided context, please use your tools to find relevant information and then determine if any clarifying questions are necessary."
386+
user_prompt = f"User's task request: '{task_description}'"
378387
messages = [{'role': 'user', 'content': user_prompt}]
379388

380389
final_response_str = ""
381-
for chunk in run_worker_agent_with_fallback(system_message=agent_config["system_message"], function_list=agent_config["function_list"], messages=messages):
390+
for chunk in run_main_agent_with_fallback(system_message=system_prompt, function_list=tools_config, messages=messages):
382391
if isinstance(chunk, list) and chunk and chunk[-1].get("role") == "assistant":
383392
final_response_str = chunk[-1].get("content", "")
384393

394+
print ("RAW RESPONSE FROM QUESTION GENERATOR:", final_response_str)
385395
response_data = JsonExtractor.extract_valid_json(clean_llm_output(final_response_str))
396+
print ("PARSED RESPONSE DATA:", response_data)
386397
if response_data and isinstance(response_data.get("clarifying_questions"), list):
387398
return response_data["clarifying_questions"]
388399
else:
@@ -531,7 +542,7 @@ async def async_generate_plan(task_id: str, user_id: str):
531542
messages = [{'role': 'user', 'content': user_prompt_content}]
532543

533544
final_response_str = ""
534-
for chunk in run_worker_agent_with_fallback(system_message=agent_config["system_message"], function_list=agent_config["function_list"], messages=messages):
545+
for chunk in run_main_agent_with_fallback(system_message=agent_config["system_message"], function_list=agent_config["function_list"], messages=messages):
535546
if isinstance(chunk, list) and chunk and chunk[-1].get("role") == "assistant":
536547
final_response_str = chunk[-1].get("content", "")
537548

0 commit comments

Comments
 (0)