Skip to content

Commit 7bd14ea

Browse files
mjnoviceclaude
andcommitted
feat: add escalation memory cache and ingest to escalation tool
Adds memory integration to the escalation tool: - Before creating HITL task: escalation_search_async() checks for cached answer - Cache hit returns cached result immediately, skipping human escalation - After human resolution: escalation_ingest_async() persists outcome - Gated by isAgentMemoryEnabled + memorySpaceId on the escalation resource - Search settings (threshold, searchMode, fieldSettings) read from resource config - Span attributes (fromMemory, savedToMemory) for trace observability Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent bbf2e25 commit 7bd14ea

2 files changed

Lines changed: 357 additions & 1 deletion

File tree

src/uipath_langchain/agent/tools/escalation_tool.py

Lines changed: 234 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
"""Escalation tool creation for Action Center integration."""
22

3+
import json
4+
import logging
35
from enum import Enum
46
from typing import Any, Literal
57

@@ -23,7 +25,11 @@
2325
from uipath.platform.common import WaitEscalation
2426
from uipath.runtime.errors import UiPathErrorCategory
2527

26-
from uipath_langchain._utils import get_execution_folder_path
28+
from uipath_langchain._utils import (
29+
get_current_span_and_trace_ids,
30+
get_execution_folder_path,
31+
set_span_attribute,
32+
)
2733
from uipath_langchain._utils.durable_interrupt import durable_interrupt
2834
from uipath_langchain.agent.react.jsonschema_pydantic_converter import create_model
2935
from uipath_langchain.agent.tools.structured_tool_with_argument_properties import (
@@ -39,6 +45,8 @@
3945
sanitize_tool_name,
4046
)
4147

48+
_escalation_logger = logging.getLogger(__name__)
49+
4250

4351
class EscalationAction(str, Enum):
4452
"""Actions that can be taken after an escalation completes."""
@@ -161,6 +169,35 @@ def _parse_task_data(
161169
return filtered_fields
162170

163171

172+
def _get_escalation_memory_space_id(
173+
resource: AgentEscalationResourceConfig,
174+
) -> str | None:
175+
"""Resolve memory space ID from escalation resource extra fields."""
176+
if not resource.is_agent_memory_enabled:
177+
return None
178+
return getattr(resource, "memorySpaceId", None) or getattr(
179+
resource, "memory_space_id", None
180+
)
181+
182+
183+
def _get_escalation_memory_settings(
184+
resource: AgentEscalationResourceConfig,
185+
) -> dict[str, Any] | None:
186+
"""Extract memory settings from escalation resource properties.
187+
188+
Maps to EscalationResourceDefinition.Properties.Memory in the Temporal
189+
backend (backend/Common.Models/AgentExecution/ResourceDefinition.cs:96).
190+
"""
191+
if not resource.is_agent_memory_enabled:
192+
return None
193+
props = getattr(resource, "properties", None)
194+
if isinstance(props, dict):
195+
return props.get("memory")
196+
if props is not None:
197+
return getattr(props, "memory", None)
198+
return None
199+
200+
164201
def create_escalation_tool(
165202
resource: AgentEscalationResourceConfig,
166203
) -> StructuredTool:
@@ -178,6 +215,8 @@ class EscalationToolOutput(BaseModel):
178215
is_deleted: bool = False
179216

180217
_bts_context: dict[str, Any] = {}
218+
_memory_space_id: str | None = _get_escalation_memory_space_id(resource)
219+
_memory_settings: dict[str, Any] | None = _get_escalation_memory_settings(resource)
181220

182221
async def escalation_tool_fn(**kwargs: Any) -> dict[str, Any]:
183222
agent_input: dict[str, Any] = (
@@ -198,6 +237,17 @@ async def escalation_tool_fn(**kwargs: Any) -> dict[str, Any]:
198237

199238
serialized_data = input_model.model_validate(kwargs).model_dump(mode="json")
200239

240+
# --- Escalation memory: check cache before creating HITL task ---
241+
if _memory_space_id:
242+
cached_result = await _check_escalation_memory_cache(
243+
_memory_space_id,
244+
serialized_data,
245+
folder_path=folder_path,
246+
memory_settings=_memory_settings,
247+
)
248+
if cached_result is not None:
249+
return cached_result
250+
201251
@mockable(
202252
name=tool_name.lower(),
203253
description=resource.description,
@@ -234,6 +284,13 @@ async def create_escalation_task():
234284
return await create_escalation_task()
235285

236286
result = await escalate(**kwargs)
287+
# Extract completed_by_user before validation drops extra fields
288+
# Ref: EscalationToolExecutor.cs:514-516 — resolves ReviewedBy email
289+
_completed_by_user = (
290+
result.get("completed_by_user")
291+
if isinstance(result, dict)
292+
else getattr(result, "completed_by_user", None)
293+
)
237294
if isinstance(result, dict):
238295
result = TypeAdapter(EscalationToolOutput).validate_python(result)
239296

@@ -262,6 +319,23 @@ async def create_escalation_task():
262319
EscalationAction(outcome_str) if outcome_str else EscalationAction.CONTINUE
263320
)
264321

322+
# --- Escalation memory: persist outcome for future recall ---
323+
# Shape must match Temporal backend (EscalationToolExecutor.cs):
324+
# answer: new { taskResult.Output, taskResult.Outcome } (line 485)
325+
# attributes: new JsonObject { ["arguments"] = payload.Input.Arguments } (line 503)
326+
# spanId/traceId/userId: lines 522-526
327+
if _memory_space_id:
328+
span_id, trace_id = get_current_span_and_trace_ids()
329+
await _ingest_escalation_memory(
330+
_memory_space_id,
331+
answer=json.dumps({"output": escalation_output, "outcome": outcome}),
332+
attributes=json.dumps({"arguments": serialized_data}),
333+
span_id=span_id,
334+
trace_id=trace_id,
335+
user_id=_get_user_email(_completed_by_user),
336+
folder_path=folder_path,
337+
)
338+
265339
return {
266340
"action": escalation_action,
267341
"output": escalation_output,
@@ -333,3 +407,162 @@ async def escalation_wrapper(
333407
tool.set_tool_wrappers(awrapper=escalation_wrapper)
334408

335409
return tool
410+
411+
412+
# --- Escalation memory helpers ---
413+
414+
415+
async def _check_escalation_memory_cache(
416+
memory_space_id: str,
417+
serialized_input: dict[str, Any],
418+
folder_path: str | None = None,
419+
memory_settings: dict[str, Any] | None = None,
420+
) -> dict[str, Any] | None:
421+
"""Check escalation memory for a cached answer.
422+
423+
SearchSettings (threshold, searchMode) are read from the user's memory
424+
settings on the escalation resource, matching the Temporal backend's
425+
BuildMemorySearchRequest (EscalationToolExecutor.cs:714-747).
426+
result_count is always 1 for escalation memory.
427+
428+
Returns the cached result dict if found, None otherwise.
429+
"""
430+
431+
try:
432+
from uipath.platform.memory import (
433+
FieldSettings,
434+
MemorySearchRequest,
435+
SearchField,
436+
SearchMode,
437+
SearchSettings,
438+
)
439+
440+
# Read search settings from user's memory config (threshold, searchMode),
441+
# falling back to defaults. result_count is always 1 for escalation memory.
442+
# Ref: EscalationToolExecutor.cs BuildMemorySearchRequest (lines 740-743)
443+
threshold = 0.0
444+
search_mode = SearchMode.Hybrid
445+
field_settings_lookup: dict[str, dict[str, Any]] = {}
446+
if memory_settings:
447+
threshold = memory_settings.get("threshold", 0.0)
448+
mode_str = memory_settings.get("searchMode", "Hybrid")
449+
search_mode = (
450+
SearchMode(mode_str)
451+
if mode_str in SearchMode.__members__
452+
else SearchMode.Hybrid
453+
)
454+
for fs in memory_settings.get("fieldSettings", []):
455+
if isinstance(fs, dict) and "name" in fs:
456+
field_settings_lookup[fs["name"]] = fs
457+
458+
fields: list[SearchField] = []
459+
for k, v in serialized_input.items():
460+
if v is None:
461+
continue
462+
# When field settings are configured, only include fields with
463+
# configured weights (matching Temporal backend behavior)
464+
if field_settings_lookup and k not in field_settings_lookup:
465+
continue
466+
settings = FieldSettings()
467+
if k in field_settings_lookup:
468+
fs = field_settings_lookup[k]
469+
settings = FieldSettings(weight=fs.get("weight", 1.0))
470+
# key_path must be prefixed with field type (FieldBuilder.cs:15)
471+
fields.append(
472+
SearchField(
473+
key_path=["escalation-input", k],
474+
value=str(v),
475+
settings=settings,
476+
)
477+
)
478+
if not fields:
479+
return None
480+
481+
request = MemorySearchRequest(
482+
fields=fields,
483+
settings=SearchSettings(
484+
threshold=threshold,
485+
result_count=1,
486+
search_mode=search_mode,
487+
),
488+
)
489+
sdk = UiPath()
490+
folder_key = (
491+
sdk.folders.retrieve_folder_key(folder_path) if folder_path else None
492+
)
493+
response = await sdk.memory.escalation_search_async(
494+
memory_space_id=memory_space_id,
495+
request=request,
496+
folder_key=folder_key,
497+
)
498+
if response.results and response.results[0].answer:
499+
cached = response.results[0].answer
500+
_escalation_logger.info(
501+
"Escalation memory cache hit for space '%s'", memory_space_id
502+
)
503+
# Ref: EscalationToolWorkflow.cs:103 — span.Attributes.FromMemory = true
504+
set_span_attribute("fromMemory", True)
505+
return {
506+
"action": EscalationAction.CONTINUE,
507+
"output": cached.output,
508+
"outcome": cached.outcome,
509+
}
510+
except Exception:
511+
_escalation_logger.warning(
512+
"Escalation memory search failed for space '%s'",
513+
memory_space_id,
514+
exc_info=True,
515+
)
516+
517+
return None
518+
519+
520+
async def _ingest_escalation_memory(
521+
memory_space_id: str,
522+
answer: str,
523+
attributes: str,
524+
span_id: str,
525+
trace_id: str,
526+
user_id: str | None = None,
527+
folder_path: str | None = None,
528+
) -> None:
529+
"""Persist a resolved escalation outcome into memory.
530+
531+
Sets span attributes to track memory state (EscalationToolWorkflow.cs:131-133):
532+
fromMemory=false (result was not from cache), savedToMemory=true/false.
533+
"""
534+
535+
# Ref: EscalationToolWorkflow.cs:132 — span.Attributes.FromMemory = false
536+
set_span_attribute("fromMemory", False)
537+
538+
try:
539+
from uipath.platform.memory import EscalationMemoryIngestRequest
540+
541+
request = EscalationMemoryIngestRequest(
542+
span_id=span_id,
543+
trace_id=trace_id,
544+
answer=answer,
545+
attributes=attributes,
546+
user_id=user_id,
547+
)
548+
sdk = UiPath()
549+
folder_key = (
550+
sdk.folders.retrieve_folder_key(folder_path) if folder_path else None
551+
)
552+
await sdk.memory.escalation_ingest_async(
553+
memory_space_id=memory_space_id,
554+
request=request,
555+
folder_key=folder_key,
556+
)
557+
# Ref: EscalationToolExecutor.cs:543 — savedToMemory = true on success
558+
set_span_attribute("savedToMemory", True)
559+
_escalation_logger.info(
560+
"Ingested escalation outcome into memory space '%s'", memory_space_id
561+
)
562+
except Exception:
563+
set_span_attribute("savedToMemory", False)
564+
_escalation_logger.warning(
565+
"Failed to ingest escalation outcome into memory space '%s'",
566+
memory_space_id,
567+
exc_info=True,
568+
)

0 commit comments

Comments
 (0)