Skip to content

Commit cfc8706

Browse files
committed
fix: encryption issues
1 parent 46a255b commit cfc8706

File tree

4 files changed

+31
-23
lines changed

4 files changed

+31
-23
lines changed

src/server/mcp_hub/orchestrator/tools.py

Lines changed: 23 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -208,25 +208,31 @@ async def ask_user_clarification(ctx: Context, question: str, urgency: str = "no
208208
task_id = auth.get_task_id_from_context(ctx)
209209
db = state_manager.MongoManager()
210210
try:
211+
# --- FIX: Perform a read-modify-write to handle encryption correctly ---
212+
task = await db.get_task(task_id, user_id)
213+
if not task:
214+
raise ToolError(f"Task {task_id} not found.")
215+
216+
# `get_task` already decrypts, so `clarification_requests` is a list
217+
current_requests = task.get("clarification_requests", [])
218+
if not isinstance(current_requests, list):
219+
logger.warning(f"Task {task_id} had a non-list 'clarification_requests' field. Overwriting.")
220+
current_requests = []
221+
211222
request_id = str(uuid.uuid4())
212-
clarification_request = {
213-
"request_id": request_id,
214-
"question": question,
223+
current_requests.append({
224+
"request_id": request_id, "question": question,
215225
"asked_at": datetime.datetime.now(datetime.timezone.utc),
216-
"response": None,
217-
"responded_at": None,
218-
"status": "pending"
219-
}
220-
await db.tasks_collection.update_one(
221-
{"task_id": task_id, "user_id": user_id},
222-
{
223-
"$push": {"clarification_requests": clarification_request},
224-
"$set": {
225-
"orchestrator_state.current_state": "SUSPENDED",
226-
"status": "clarification_pending"
227-
}
228-
}
229-
)
226+
"response": None, "responded_at": None, "status": "pending"
227+
})
228+
229+
orchestrator_state = task.get("orchestrator_state", {})
230+
if not isinstance(orchestrator_state, dict): orchestrator_state = {}
231+
orchestrator_state["current_state"] = "SUSPENDED"
232+
233+
update_payload = {"clarification_requests": current_requests, "orchestrator_state": orchestrator_state, "status": "clarification_pending"}
234+
await db.update_task(task_id, user_id, update_payload)
235+
230236
await state_manager.add_execution_log(task_id, user_id, "clarification_requested", {"question": question}, reasoning)
231237

232238
await notify_user(

src/server/workers/executor/tasks.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
from celery import chord, group
2121
from main.llm import run_agent as run_main_agent, LLMProviderDownError
2222
from workers.utils.crypto import aes_decrypt, encrypt_doc, decrypt_doc
23+
from main.db import MongoManager
2324

2425
# Load environment variables for the worker from its own config
2526
from workers.executor.config import (MONGO_URI, MONGO_DB_NAME,
@@ -267,7 +268,7 @@ async def async_execute_task_plan(task_id: str, user_id: str, run_id: str):
267268
await update_task_run_status(db, task_id, run_id, "processing", user_id, block_id=block_id)
268269
await add_progress_update(db, task_id, run_id, user_id, {"type": "info", "content": "Executor has picked up the task and is starting execution."}, block_id=block_id)
269270

270-
user_profile = await db.user_profiles.find_one({"user_id": user_id})
271+
user_profile = await MongoManager().get_user_profile(user_id)
271272
personal_info = user_profile.get("userData", {}).get("personalInfo", {}) if user_profile else {}
272273
user_name = personal_info.get("name", "User")
273274
user_location_raw = personal_info.get("location", "Not specified")
@@ -757,7 +758,7 @@ async def add_sub_task_run_update(update_type: str, content: Any):
757758
await update_sub_task_safely(sub_task_id, {"status": "processing", "runs": [run_doc]})
758759

759760
# 2. Get user context and integrations
760-
user_profile = await db.user_profiles.find_one({"user_id": user_id})
761+
user_profile = await MongoManager().get_user_profile(user_id)
761762
user_integrations = user_profile.get("userData", {}).get("integrations", {}) if user_profile else {} # noqa
762763

763764
# 3. Configure tools for the sub-agent

src/server/workers/retention_tasks.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import logging
22
import random
33
import random
4+
from workers.utils.crypto import decrypt_doc
45
from datetime import datetime, timedelta, timezone
56

67
from workers.celery_app import celery_app
@@ -84,6 +85,9 @@ async def async_run_retention_campaigns():
8485

8586
async for user_profile in users_cursor:
8687
user_id = user_profile.get("user_id")
88+
# Decrypt the sensitive userData field before accessing it
89+
decrypt_doc(user_profile, ["userData"])
90+
8791
user_data = user_profile.get("userData", {})
8892
if not user_id:
8993
continue

src/server/workers/tasks.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -520,10 +520,7 @@ async def async_generate_plan(task_id: str, user_id: str):
520520

521521
original_context = task.get("original_context", {})
522522

523-
user_profile = await db_manager.user_profiles_collection.find_one(
524-
{"user_id": user_id},
525-
{"userData.personalInfo": 1, "userData.integrations": 1} # Projection to get only necessary data
526-
)
523+
user_profile = await db_manager.get_user_profile(user_id)
527524
if not user_profile:
528525
logger.error(f"User profile not found for user_id '{user_id}' associated with task {task_id}. Cannot generate plan.")
529526
return

0 commit comments

Comments
 (0)