Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -176,11 +176,13 @@ const renderAgentMessages = (
<Body1 className={styles.agentName}>
{getAgentDisplayName(msg.agent)}
</Body1>
<Tag
appearance="brand"
>
AI Agent
</Tag>
{msg.agent_type !== AgentMessageType.SYSTEM_AGENT && msg.agent?.toLowerCase() !== 'system' && (
<Tag
appearance="brand"
>
AI Agent
</Tag>
)}
</div>
)}

Expand Down
47 changes: 34 additions & 13 deletions src/App/src/hooks/usePlanWebSocket.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import {
selectPlanApproved,
approvalRequestReceived,
planCompletedFinal,
planFailedFinal,
} from '@/store/slices/planSlice';
import {
setSubmittingChatDisableInput,
Expand Down Expand Up @@ -178,16 +179,18 @@ export function usePlanWebSocket({
WebsocketMessageType.FINAL_RESULT_MESSAGE,
(finalMessage: any) => {
if (!finalMessage) return;
const agentMessageData: AgentMessageData = {
agent: AgentType.GROUP_CHAT_MANAGER,
agent_type: AgentMessageType.AI_AGENT,
timestamp: Date.now(),
steps: [],
next_steps: [],
content: '\u{1F389}\u{1F389} ' + (finalMessage.data?.content || ''),
raw_data: finalMessage,
};
if (finalMessage?.data?.status === PlanStatus.COMPLETED) {
const messageStatus = finalMessage?.data?.status;

if (messageStatus === PlanStatus.COMPLETED) {
const agentMessageData: AgentMessageData = {
agent: AgentType.GROUP_CHAT_MANAGER,
agent_type: AgentMessageType.AI_AGENT,
timestamp: Date.now(),
steps: [],
next_steps: [],
content: '\u{1F389}\u{1F389} ' + (finalMessage.data?.content || ''),
raw_data: finalMessage,
};
dispatch(setShowBufferingText(true));
dispatch(addAgentMessage(agentMessageData));
dispatch(setSelectedTeam(planData?.team || null));
Expand All @@ -196,11 +199,29 @@ export function usePlanWebSocket({
scrollToBottom();
webSocketService.disconnect();
persistAgentMessage(agentMessageData, planData, dispatch, true, streamingMessageBuffer);
} else if (messageStatus === 'error') {
// Safety net: handle error status sent as FINAL_RESULT_MESSAGE
const errorContent = finalMessage.data?.content || 'An unexpected error occurred. Please try again later.';
const errorAgent: AgentMessageData = {
agent: 'system',
agent_type: AgentMessageType.SYSTEM_AGENT,
timestamp: Date.now(),
steps: [],
next_steps: [],
content: formatErrorMessage(errorContent),
raw_data: finalMessage || '',
};
dispatch(addAgentMessage(errorAgent));
dispatch(planFailedFinal());
dispatch(setShowBufferingText(false));
scrollToBottom();
showToast(errorContent, 'error');
webSocketService.disconnect();
}
},
);
return unsub;
}, [dispatch, scrollToBottom, planData, streamingMessageBuffer]);
}, [dispatch, scrollToBottom, planData, streamingMessageBuffer, formatErrorMessage, showToast]);

// ── ERROR_MESSAGE ─────────────────────────────────────────────
useEffect(() => {
Expand Down Expand Up @@ -231,11 +252,11 @@ export function usePlanWebSocket({
raw_data: errorMessage || '',
};
dispatch(addAgentMessage(errorAgent));
dispatch(setShowProcessingPlanSpinner(false));
dispatch(planFailedFinal());
dispatch(setShowBufferingText(false));
dispatch(setSubmittingChatDisableInput(false));
scrollToBottom();
showToast(errorContent, 'error');
webSocketService.disconnect();
Comment thread
NirajC-Microsoft marked this conversation as resolved.
},
);
return unsub;
Expand Down
9 changes: 9 additions & 0 deletions src/App/src/store/slices/planSlice.ts
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,14 @@ const planSlice = createSlice({
}
},

/** Single dispatch when an error occurs during plan execution */
planFailedFinal(state) {
state.showProcessingPlanSpinner = false;
Comment thread
NirajC-Microsoft marked this conversation as resolved.
if (state.planData?.plan) {
(state.planData as any).plan.overall_status = PlanStatus.FAILED;
}
Comment thread
NirajC-Microsoft marked this conversation as resolved.
},

/** Reset everything back to initial state (used when navigating to a new plan) */
resetPlan() {
return { ...initialState };
Expand Down Expand Up @@ -229,6 +237,7 @@ export const {
planApprovalRejected,
approvalRequestReceived,
planCompletedFinal,
planFailedFinal,
resetPlan,
} = planSlice.actions;

Expand Down
15 changes: 14 additions & 1 deletion src/backend/v4/api/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,20 @@ async def process_request(
try:

async def run_orchestration_task():
await OrchestrationManager().run_orchestration(user_id, input_task)
try:
await OrchestrationManager().run_orchestration(user_id, input_task, plan_id=plan_id)
except Exception as orch_error:
logger.error("Background orchestration failed for plan '%s': %s", plan_id, orch_error)
track_event_if_configured(
"Error_Orchestration_Failed",
{
"plan_id": plan_id,
"session_id": input_task.session_id,
"user_id": user_id,
"error": str(orch_error),
"error_type": type(orch_error).__name__,
},
)

background_tasks.add_task(run_orchestration_task)

Expand Down
45 changes: 40 additions & 5 deletions src/backend/v4/orchestration/orchestration_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
from v4.models.messages import WebsocketMessageType
from v4.orchestration.human_approval_manager import HumanApprovalMagenticManager
from v4.magentic_agents.magentic_agent_factory import MagenticAgentFactory
from common.database.database_factory import DatabaseFactory
from v4.models.models import PlanStatus
Comment thread
NirajC-Microsoft marked this conversation as resolved.


class OrchestrationManager:
Expand All @@ -47,6 +49,7 @@ class OrchestrationManager:

def __init__(self):
self.user_id: Optional[str] = None
self._plan_id: Optional[str] = None
self.logger = self.__class__.logger

def _extract_response_text(self, data) -> str:
Expand Down Expand Up @@ -293,10 +296,11 @@ async def get_current_or_new_orchestration(
# ---------------------------
# Execution
# ---------------------------
async def run_orchestration(self, user_id: str, input_task) -> None:
async def run_orchestration(self, user_id: str, input_task, plan_id: str = None) -> None:
"""
Execute the Magentic workflow for the provided user and task description.
"""
self._plan_id = plan_id
job_id = str(uuid.uuid4())
orchestration_config.set_approval_pending(job_id)
self.logger.info(
Expand Down Expand Up @@ -545,19 +549,50 @@ async def run_orchestration(self, user_id: str, input_task) -> None:
self.logger.error("Error attributes: %s", e.__dict__)
self.logger.info("=" * 50)

# Send error status to user
# Build a user-friendly error message
error_str = str(e)
if "Too Many Requests" in error_str or "429" in error_str:
user_error_message = (
"The service is currently experiencing high demand (rate limit exceeded). "
"Please wait a moment and try again."
)
elif "timeout" in error_str.lower():
user_error_message = (
"The request timed out while processing. Please try again."
)
elif "conflict" in error_str.lower() or "modified concurrently" in error_str.lower():
user_error_message = (
"A conflict occurred while processing your request. "
"The resource was modified by another operation. Please start a new task and try again."
)
else:
user_error_message = "An error occurred while processing your request. Please start a new task and try again."

# Update plan status to failed in the database
try:
if self._plan_id:
memory_store = await DatabaseFactory.get_database(user_id=user_id)
plan = await memory_store.get_plan_by_plan_id(plan_id=self._plan_id)
if plan:
plan.overall_status = PlanStatus.FAILED
await memory_store.update_plan(plan)
Comment thread
NirajC-Microsoft marked this conversation as resolved.
self.logger.info("Plan '%s' status updated to FAILED", self._plan_id)
except Exception as db_error:
self.logger.error("Failed to update plan status to FAILED: %s", db_error)

# Send error status to user via ERROR_MESSAGE type
try:
await connection_config.send_status_update_async(
{
"type": WebsocketMessageType.FINAL_RESULT_MESSAGE,
"type": WebsocketMessageType.ERROR_MESSAGE,
"data": {
"content": f"Error during orchestration: {str(e)}",
"content": user_error_message,
"status": "error",
"timestamp": asyncio.get_event_loop().time(),
},
},
user_id,
message_type=WebsocketMessageType.FINAL_RESULT_MESSAGE,
message_type=WebsocketMessageType.ERROR_MESSAGE,
)
except Exception as send_error:
self.logger.error("Failed to send error status: %s", send_error)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ class MockDatabaseBase:

sys.modules['common.database'] = Mock()
sys.modules['common.database.database_base'] = Mock(DatabaseBase=MockDatabaseBase)
sys.modules['common.database.database_factory'] = Mock(DatabaseFactory=Mock())

# Mock v4 modules
class MockTeamService:
Expand Down Expand Up @@ -315,9 +316,18 @@ def __init__(self):
class MockWebsocketMessageType:
"""Mock WebsocketMessageType."""
FINAL_RESULT_MESSAGE = "final_result_message"
ERROR_MESSAGE = "error_message"
AGENT_MESSAGE = "agent_message"

class MockPlanStatus:
"""Mock PlanStatus."""
FAILED = "failed"
COMPLETED = "completed"
IN_PROGRESS = "in_progress"

sys.modules['v4.models'] = Mock()
sys.modules['v4.models.messages'] = Mock(WebsocketMessageType=MockWebsocketMessageType)
sys.modules['v4.models.models'] = Mock(PlanStatus=MockPlanStatus)

# Mock v4.orchestration.human_approval_manager
class MockHumanApprovalMagenticManager:
Expand Down