Skip to content

Commit 78ed79d

Browse files
committed
feat: api trigger hitl
1 parent 866f907 commit 78ed79d

2 files changed

Lines changed: 113 additions & 8 deletions

File tree

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[project]
22
name = "uipath-llamaindex"
3-
version = "0.0.12"
3+
version = "0.0.13"
44
description = "UiPath LlamaIndex SDK"
55
readme = { file = "README.md", content-type = "text/markdown" }
66
requires-python = ">=3.10"

src/uipath_llamaindex/_cli/_runtime/_runtime.py

Lines changed: 112 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,29 @@
11
import json
22
import logging
3+
import os
4+
import pickle
5+
import uuid
36
from contextlib import suppress
4-
from typing import Optional
7+
from typing import Any, Optional
58

9+
from llama_index.core.workflow import (
10+
Context,
11+
HumanResponseEvent,
12+
InputRequiredEvent,
13+
JsonPickleSerializer,
14+
)
615
from openinference.instrumentation.llama_index import LlamaIndexInstrumentor
716
from opentelemetry import trace
817
from opentelemetry.sdk.trace import TracerProvider
918
from opentelemetry.sdk.trace.export import BatchSpanProcessor
1019
from uipath import UiPath
1120
from uipath._cli._runtime._contracts import (
21+
UiPathApiTrigger,
1222
UiPathBaseRuntime,
1323
UiPathErrorCategory,
24+
UiPathResumeTrigger,
1425
UiPathRuntimeResult,
26+
UiPathRuntimeStatus,
1527
)
1628

1729
from .._tracing._oteladapter import LlamaIndexExporter
@@ -55,19 +67,45 @@ async def execute(self) -> Optional[UiPathRuntimeResult]:
5567

5668
try:
5769
start_event_class = self.context.workflow._start_event_class
58-
5970
ev = start_event_class(**self.context.input_json)
6071

61-
handler = self.context.workflow.run(start_event=ev)
72+
ctx: Context = self._get_context()
73+
74+
handler = self.context.workflow.run(start_event=ev, ctx=ctx)
75+
76+
resume_trigger: UiPathResumeTrigger = None
6277

6378
async for event in handler.stream_events():
79+
if isinstance(event, InputRequiredEvent):
80+
resume_trigger = UiPathResumeTrigger(
81+
api_resume=UiPathApiTrigger(
82+
inbox_id=str(uuid.uuid4()), request=event.prefix
83+
)
84+
)
85+
break
6486
print(event)
6587

66-
output = await handler
88+
if resume_trigger is None:
89+
output = await handler
90+
self.context.result = UiPathRuntimeResult(
91+
output=self._serialize_object(output),
92+
status=UiPathRuntimeStatus.SUCCESSFUL,
93+
)
94+
else:
95+
self.context.result = UiPathRuntimeResult(
96+
output=self._serialize_object(output),
97+
status=UiPathRuntimeStatus.SUSPENDED,
98+
resume=resume_trigger,
99+
)
67100

68-
self.context.result = UiPathRuntimeResult(
69-
output=self._serialize_object(output)
70-
)
101+
if self.context.state_file:
102+
serializer = JsonPickleSerializer()
103+
ctx_dict = ctx.to_dict(serializer=serializer)
104+
ctx_dict["uipath_resume_trigger"] = (
105+
serializer.serialize(resume_trigger) if resume_trigger else None
106+
)
107+
with open(self.context.state_file, "wb") as f:
108+
pickle.dump(ctx_dict, f)
71109

72110
return self.context.result
73111

@@ -172,6 +210,73 @@ async def cleanup(self) -> None:
172210
"""Clean up all resources."""
173211
pass
174212

213+
async def _get_context(self) -> Context:
214+
"""
215+
Get the context for the LlamaIndex agent.
216+
217+
Returns:
218+
The context object for the LlamaIndex agent.
219+
"""
220+
logger.debug(f"Resumed: {self.context.resume} Input: {self.context.input_json}")
221+
222+
if not self.context.resume:
223+
return Context(self.context.workflow)
224+
225+
if not self.context.state_file or not os.path.exists(self.context.state_file):
226+
return Context(self.context.workflow)
227+
228+
serializer = JsonPickleSerializer()
229+
ctx: Context = None
230+
231+
with open(self.context.state_file, "rb") as f:
232+
loaded_ctx_dict = pickle.load(f)
233+
ctx = Context.from_dict(
234+
self.context.workflow,
235+
loaded_ctx_dict,
236+
serializer=serializer,
237+
)
238+
239+
if self.context.input_json:
240+
ctx.send_event(HumanResponseEvent(response=self.context.input_json))
241+
242+
resumed_trigger_data = loaded_ctx_dict["uipath_resume_trigger"]
243+
if resumed_trigger_data:
244+
resumed_trigger: UiPathResumeTrigger = serializer.deserialize(
245+
resumed_trigger_data, UiPathResumeTrigger
246+
)
247+
inbox_id = resumed_trigger.api_resume.inbox_id
248+
payload = await self._get_api_payload(inbox_id)
249+
ctx.send_event(HumanResponseEvent(response=payload))
250+
251+
return ctx
252+
253+
async def _get_api_payload(self, inbox_id: str) -> Any:
254+
"""
255+
Fetch payload data for API triggers.
256+
257+
Args:
258+
inbox_id: The Id of the inbox to fetch the payload for.
259+
260+
Returns:
261+
The value field from the API response payload, or None if an error occurs.
262+
"""
263+
try:
264+
response = self._uipath.api_client.request(
265+
"GET",
266+
f"/orchestrator_/api/JobTriggers/GetPayload/{inbox_id}",
267+
include_folder_headers=True,
268+
)
269+
data = response.json()
270+
return data.get("payload")
271+
except Exception as e:
272+
raise UiPathLlamaIndexRuntimeError(
273+
"API_CONNECTION_ERROR",
274+
"Failed to get trigger payload",
275+
f"Error fetching API trigger payload for inbox {inbox_id}: {str(e)}",
276+
UiPathErrorCategory.SYSTEM,
277+
response.status_code,
278+
) from e
279+
175280
def _serialize_object(self, obj):
176281
"""Recursively serializes an object and all its nested components."""
177282
# Handle Pydantic models

0 commit comments

Comments
 (0)