|
| 1 | +"""Long-running support inbox triage agent. |
| 2 | +
|
| 3 | +Watches a UiPath Integration Services Outlook connection for emails whose |
| 4 | +subject matches the value passed in as agent input. Each match: |
| 5 | +
|
| 6 | +1. Resumes the suspended job with the enriched Microsoft Graph `Message` |
| 7 | + as the resume value of `WaitIntegrationEvent`. |
| 8 | +2. The LLM classifies the email into severity, category, a one-sentence |
| 9 | + summary, and a polite acknowledgement draft. |
| 10 | +3. The agent replies to the original email with the LLM-drafted |
| 11 | + acknowledgement (via Microsoft Graph, using the connection's OAuth token). |
| 12 | +4. The result is logged, transient state is cleared, and the agent loops |
| 13 | + back to suspend on the next matching email. |
| 14 | +
|
| 15 | +The graph has no terminal node — the agent stays SUSPENDED on the Outlook |
| 16 | +trigger forever, briefly waking to triage and reply to each matching email |
| 17 | +and then re-suspending. Cancel the job manually when you're done with it. |
| 18 | +
|
| 19 | +Demonstrates one suspend/resume primitive in a long-running agent: |
| 20 | +- `WaitIntegrationEvent` — suspend until an external IS connector event fires. |
| 21 | +""" |
| 22 | + |
| 23 | +import logging |
| 24 | +from enum import Enum |
| 25 | +from typing import Any, Optional |
| 26 | + |
| 27 | +import httpx |
| 28 | +from langchain_core.messages import HumanMessage, SystemMessage |
| 29 | +from langgraph.graph import START, StateGraph |
| 30 | +from langgraph.types import interrupt |
| 31 | +from pydantic import BaseModel, Field |
| 32 | +from uipath.platform import UiPath |
| 33 | +from uipath.platform.common import WaitIntegrationEvent |
| 34 | +from uipath_langchain.chat import UiPathChat |
| 35 | + |
| 36 | +logger = logging.getLogger(__name__) |
| 37 | + |
| 38 | +GRAPH_API_BASE = "https://graph.microsoft.com/v1.0" |
| 39 | +OUTLOOK_CONNECTOR = "uipath-microsoft-outlook365" |
| 40 | + |
| 41 | +# Placeholder connection key bound to a real connection via bindings.json. |
| 42 | +# `connections.retrieve_async` is decorated with @resource_override("connection", |
| 43 | +# resource_identifier="key"), so at run time the decorator inspects the binding |
| 44 | +# overwrite context and substitutes the deployer-selected connection's real key. |
| 45 | +OUTLOOK_CONNECTION_KEY = "<your-outlook-connection>" |
| 46 | + |
| 47 | + |
| 48 | +class Severity(str, Enum): |
| 49 | + P0_CRITICAL = "P0_critical" |
| 50 | + P1_HIGH = "P1_high" |
| 51 | + P2_NORMAL = "P2_normal" |
| 52 | + P3_LOW = "P3_low" |
| 53 | + |
| 54 | + |
| 55 | +class Category(str, Enum): |
| 56 | + BUG = "bug" |
| 57 | + FEATURE_REQUEST = "feature_request" |
| 58 | + HOWTO = "howto" |
| 59 | + BILLING = "billing" |
| 60 | + SPAM = "spam" |
| 61 | + OTHER = "other" |
| 62 | + |
| 63 | + |
| 64 | +class Triage(BaseModel): |
| 65 | + severity: Severity = Field( |
| 66 | + description=( |
| 67 | + "P0 = production outage / data loss, " |
| 68 | + "P1 = major workflow impact, " |
| 69 | + "P2 = normal request or single-user impact, " |
| 70 | + "P3 = low / cosmetic / general question." |
| 71 | + ) |
| 72 | + ) |
| 73 | + category: Category |
| 74 | + summary: str = Field(description="One-sentence summary in the customer's voice.") |
| 75 | + suggested_response: str = Field( |
| 76 | + description="Polite acknowledgement reply confirming receipt and next steps." |
| 77 | + ) |
| 78 | + |
| 79 | + |
| 80 | +class GraphInput(BaseModel): |
| 81 | + subject: str = Field( |
| 82 | + description="The exact email subject to watch for. The IS trigger filters incoming emails by this value." |
| 83 | + ) |
| 84 | + |
| 85 | + |
| 86 | +class GraphState(BaseModel): |
| 87 | + subject: str = "" |
| 88 | + email: Optional[dict[str, Any]] = None |
| 89 | + triage: Optional[Triage] = None |
| 90 | + reply_sent: Optional[bool] = None |
| 91 | + reply_body: Optional[str] = None |
| 92 | + triage_count: int = 0 |
| 93 | + |
| 94 | + |
| 95 | +llm = UiPathChat(model="gpt-4o-mini-2024-07-18") |
| 96 | + |
| 97 | + |
| 98 | +def _email_str(email: dict[str, Any], *path: str, default: str = "") -> str: |
| 99 | + current: Any = email |
| 100 | + for p in path: |
| 101 | + if not isinstance(current, dict): |
| 102 | + return default |
| 103 | + current = current.get(p) |
| 104 | + return current if isinstance(current, str) else default |
| 105 | + |
| 106 | + |
| 107 | +async def _send_outlook_reply(message_id: str, body: str) -> None: |
| 108 | + """Reply to an Outlook message via Microsoft Graph, using the OAuth token |
| 109 | + issued for the UiPath Outlook connection that received the trigger. |
| 110 | + """ |
| 111 | + sdk = UiPath() |
| 112 | + connection = await sdk.connections.retrieve_async(OUTLOOK_CONNECTION_KEY) |
| 113 | + if connection.id is None: |
| 114 | + raise RuntimeError( |
| 115 | + f"Outlook connection {OUTLOOK_CONNECTION_KEY!r} could not be resolved." |
| 116 | + ) |
| 117 | + |
| 118 | + token = await sdk.connections.retrieve_token_async(connection.id) |
| 119 | + |
| 120 | + async with httpx.AsyncClient(timeout=30) as client: |
| 121 | + response = await client.post( |
| 122 | + f"{GRAPH_API_BASE}/me/messages/{message_id}/reply", |
| 123 | + headers={ |
| 124 | + "Authorization": f"Bearer {token.access_token}", |
| 125 | + "Content-Type": "application/json", |
| 126 | + }, |
| 127 | + json={"comment": body}, |
| 128 | + ) |
| 129 | + response.raise_for_status() |
| 130 | + |
| 131 | + |
| 132 | +async def wait_for_email(state: GraphState) -> dict[str, Any]: |
| 133 | + sdk = UiPath() |
| 134 | + connection = await sdk.connections.retrieve_async(OUTLOOK_CONNECTION_KEY) |
| 135 | + folder_path = ( |
| 136 | + connection.folder.get("path") if isinstance(connection.folder, dict) else None |
| 137 | + ) |
| 138 | + logger.info( |
| 139 | + "Waiting for next email on '%s' (folder='%s') with subject=%r (triaged so far: %d)...", |
| 140 | + connection.name, |
| 141 | + folder_path, |
| 142 | + state.subject, |
| 143 | + state.triage_count, |
| 144 | + ) |
| 145 | + email = interrupt( |
| 146 | + WaitIntegrationEvent( |
| 147 | + connector=OUTLOOK_CONNECTOR, |
| 148 | + connection_name=connection.name or "", |
| 149 | + connection_folder_path=folder_path, |
| 150 | + operation="EMAIL_RECEIVED", |
| 151 | + object_name="Message", |
| 152 | + filter_expression=f"(subject=='{state.subject}')", |
| 153 | + ) |
| 154 | + ) |
| 155 | + sender = _email_str(email, "from", "emailAddress", "address", default="?") |
| 156 | + logger.info("Received email from %s: %s", sender, _email_str(email, "subject")) |
| 157 | + return {"email": email} |
| 158 | + |
| 159 | + |
| 160 | +async def triage_email(state: GraphState) -> dict[str, Any]: |
| 161 | + email = state.email or {} |
| 162 | + sender = _email_str(email, "from", "emailAddress", "address", default="unknown") |
| 163 | + subject = _email_str(email, "subject") |
| 164 | + body = _email_str(email, "bodyPreview") or _email_str(email, "body", "content") |
| 165 | + |
| 166 | + triage_llm = llm.with_structured_output(Triage) |
| 167 | + result: Triage = await triage_llm.ainvoke( |
| 168 | + [ |
| 169 | + SystemMessage( |
| 170 | + "You are a support triage assistant. Read the customer email and " |
| 171 | + "produce a structured triage result.\n\n" |
| 172 | + "Severity guidelines:\n" |
| 173 | + "- P0: production outage, data loss, or anything blocking critical work.\n" |
| 174 | + "- P1: major workflow impact; affects many users.\n" |
| 175 | + "- P2: normal request or single-user impact.\n" |
| 176 | + "- P3: low priority, cosmetic, or general question.\n\n" |
| 177 | + "Always draft a polite acknowledgement confirming receipt and " |
| 178 | + "setting expectations for next steps." |
| 179 | + ), |
| 180 | + HumanMessage(f"From: {sender}\nSubject: {subject}\n\n{body}"), |
| 181 | + ] |
| 182 | + ) |
| 183 | + logger.info( |
| 184 | + "Triage: severity=%s category=%s", |
| 185 | + result.severity.value, |
| 186 | + result.category.value, |
| 187 | + ) |
| 188 | + return {"triage": result} |
| 189 | + |
| 190 | + |
| 191 | +async def send_reply(state: GraphState) -> dict[str, Any]: |
| 192 | + email = state.email or {} |
| 193 | + triage = state.triage |
| 194 | + message_id = email.get("id") if isinstance(email, dict) else None |
| 195 | + body = triage.suggested_response if triage else None |
| 196 | + |
| 197 | + if not body: |
| 198 | + logger.warning("No reply body resolved — skipping send.") |
| 199 | + return {"reply_sent": False, "reply_body": None} |
| 200 | + |
| 201 | + if not message_id: |
| 202 | + logger.warning("Email payload had no 'id' field — cannot send reply.") |
| 203 | + return {"reply_sent": False, "reply_body": body} |
| 204 | + |
| 205 | + try: |
| 206 | + await _send_outlook_reply(message_id, body) |
| 207 | + logger.info("Reply sent.") |
| 208 | + return {"reply_sent": True, "reply_body": body} |
| 209 | + except Exception: |
| 210 | + logger.exception("Failed to send Outlook reply.") |
| 211 | + return {"reply_sent": False, "reply_body": body} |
| 212 | + |
| 213 | + |
| 214 | +async def finalize(state: GraphState) -> dict[str, Any]: |
| 215 | + triage = state.triage |
| 216 | + assert triage is not None |
| 217 | + email = state.email or {} |
| 218 | + sender = _email_str(email, "from", "emailAddress", "address", default="unknown") |
| 219 | + subject = _email_str(email, "subject") |
| 220 | + |
| 221 | + logger.info( |
| 222 | + "Triaged email #%d from %s (subject=%r, severity=%s, category=%s, reply_sent=%s)", |
| 223 | + state.triage_count + 1, |
| 224 | + sender, |
| 225 | + subject, |
| 226 | + triage.severity.value, |
| 227 | + triage.category.value, |
| 228 | + bool(state.reply_sent), |
| 229 | + ) |
| 230 | + return { |
| 231 | + "triage_count": state.triage_count + 1, |
| 232 | + "email": None, |
| 233 | + "triage": None, |
| 234 | + "reply_sent": None, |
| 235 | + "reply_body": None, |
| 236 | + } |
| 237 | + |
| 238 | + |
| 239 | +builder = StateGraph(GraphState, input_schema=GraphInput) |
| 240 | +builder.add_node("wait_for_email", wait_for_email) |
| 241 | +builder.add_node("triage_email", triage_email) |
| 242 | +builder.add_node("send_reply", send_reply) |
| 243 | +builder.add_node("finalize", finalize) |
| 244 | + |
| 245 | +builder.add_edge(START, "wait_for_email") |
| 246 | +builder.add_edge("wait_for_email", "triage_email") |
| 247 | +builder.add_edge("triage_email", "send_reply") |
| 248 | +builder.add_edge("send_reply", "finalize") |
| 249 | +builder.add_edge("finalize", "wait_for_email") |
| 250 | + |
| 251 | +graph = builder.compile() |
0 commit comments