Skip to content

Commit ae2c145

Browse files
feat: add initial messages mapper
1 parent cb77535 commit ae2c145

3 files changed

Lines changed: 267 additions & 19 deletions

File tree

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
from uipath_llamaindex.runtime.chat.messages import UiPathChatMessagesMapper
2+
3+
__all__ = ["UiPathChatMessagesMapper"]
Lines changed: 228 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,228 @@
1+
import logging
2+
from datetime import datetime, timezone
3+
from uuid import uuid4
4+
5+
from llama_index.core.agent.workflow.workflow_events import (
6+
AgentOutput,
7+
AgentStream,
8+
ToolCall,
9+
ToolCallResult,
10+
)
11+
from uipath.core.chat import (
12+
UiPathConversationContentPartChunkEvent,
13+
UiPathConversationContentPartEndEvent,
14+
UiPathConversationContentPartEvent,
15+
UiPathConversationContentPartStartEvent,
16+
UiPathConversationMessageEndEvent,
17+
UiPathConversationMessageEvent,
18+
UiPathConversationMessageStartEvent,
19+
UiPathConversationToolCallEndEvent,
20+
UiPathConversationToolCallEvent,
21+
UiPathConversationToolCallStartEvent,
22+
)
23+
24+
logger = logging.getLogger(__name__)
25+
26+
27+
class UiPathChatMessagesMapper:
28+
"""Stateful mapper that converts LlamaIndex agent events to UiPath message events.
29+
30+
Maintains state across events to properly track:
31+
- The current AI message ID (generated per agent turn, since LlamaIndex doesn't provide one)
32+
- Pending tool calls per message ID for correct message_end timing
33+
"""
34+
35+
def __init__(self, runtime_id: str) -> None:
36+
self.runtime_id = runtime_id
37+
self._current_message_id: str | None = None
38+
# message_id -> set of tool_ids still pending completion
39+
self._pending_tool_calls: dict[str, set[str]] = {}
40+
# tool_id -> message_id for correlating ToolCallResult with its parent AI message
41+
self._tool_id_to_message_id: dict[str, str] = {}
42+
43+
def get_timestamp(self) -> str:
44+
"""Format current time as ISO 8601 UTC with milliseconds: 2025-01-04T10:30:00.123Z"""
45+
return (
46+
datetime.now(timezone.utc)
47+
.isoformat(timespec="milliseconds")
48+
.replace("+00:00", "Z")
49+
)
50+
51+
def get_content_part_id(self, message_id: str) -> str:
52+
return f"chunk-{message_id}-0"
53+
54+
async def map_event(
55+
self,
56+
event: AgentStream | AgentOutput | ToolCall | ToolCallResult,
57+
) -> list[UiPathConversationMessageEvent] | None:
58+
"""Convert a LlamaIndex agent event into UiPath conversation message events.
59+
60+
Returns a list of events to emit, or None if the event should be skipped.
61+
"""
62+
if isinstance(event, AgentStream):
63+
return self._map_agent_stream(event)
64+
65+
if isinstance(event, AgentOutput):
66+
return self._map_agent_output(event)
67+
68+
# ToolCall start is handled via AgentOutput to have the message_id available
69+
if isinstance(event, ToolCall):
70+
return None
71+
72+
if isinstance(event, ToolCallResult):
73+
return self._map_tool_call_result(event)
74+
75+
return None
76+
77+
def _map_agent_stream(
78+
self, event: AgentStream
79+
) -> list[UiPathConversationMessageEvent] | None:
80+
events: list[UiPathConversationMessageEvent] = []
81+
82+
# First stream chunk of a new agent turn: generate a fresh message ID
83+
if self._current_message_id is None:
84+
self._current_message_id = str(uuid4())
85+
events.append(self._create_message_start_event(self._current_message_id))
86+
87+
if event.delta:
88+
events.append(
89+
self._create_content_chunk_event(self._current_message_id, event.delta)
90+
)
91+
92+
return events if events else None
93+
94+
def _map_agent_output(
95+
self, event: AgentOutput
96+
) -> list[UiPathConversationMessageEvent] | None:
97+
message_id = self._current_message_id
98+
# Reset for the next turn regardless of outcome
99+
self._current_message_id = None
100+
101+
if message_id is None:
102+
return None
103+
104+
events: list[UiPathConversationMessageEvent] = []
105+
106+
if event.tool_calls:
107+
# Emit a tool_call_start event for each tool call and track them as pending
108+
pending: set[str] = set()
109+
for tool_call in event.tool_calls:
110+
self._tool_id_to_message_id[tool_call.tool_id] = message_id
111+
pending.add(tool_call.tool_id)
112+
events.append(
113+
self._create_tool_call_start_event(
114+
message_id=message_id,
115+
tool_call_id=tool_call.tool_id,
116+
tool_name=tool_call.tool_name,
117+
input=tool_call.tool_kwargs,
118+
)
119+
)
120+
self._pending_tool_calls[message_id] = pending
121+
# message_end will be emitted once the last ToolCallResult comes in
122+
else:
123+
# No tool calls: this is the final text response, close the message now
124+
events.append(self._create_message_end_event(message_id))
125+
126+
return events if events else None
127+
128+
def _map_tool_call_result(
129+
self, event: ToolCallResult
130+
) -> list[UiPathConversationMessageEvent] | None:
131+
message_id = self._tool_id_to_message_id.pop(event.tool_id, None)
132+
if message_id is None:
133+
logger.warning(
134+
"ToolCallResult received for unknown tool_id '%s' — skipping.",
135+
event.tool_id,
136+
)
137+
return None
138+
139+
output = event.tool_output.content if event.tool_output else None
140+
141+
events: list[UiPathConversationMessageEvent] = [
142+
self._create_tool_call_end_event(
143+
message_id=message_id,
144+
tool_call_id=event.tool_id,
145+
output=output,
146+
)
147+
]
148+
149+
# Close the message once all tool calls for it have completed
150+
pending = self._pending_tool_calls.get(message_id)
151+
if pending is not None:
152+
pending.discard(event.tool_id)
153+
if not pending:
154+
del self._pending_tool_calls[message_id]
155+
events.append(self._create_message_end_event(message_id))
156+
157+
return events
158+
159+
# ── Factory helpers ────────────────────────────────────────────────────────
160+
161+
def _create_message_start_event(
162+
self, message_id: str
163+
) -> UiPathConversationMessageEvent:
164+
return UiPathConversationMessageEvent(
165+
message_id=message_id,
166+
start=UiPathConversationMessageStartEvent(
167+
role="assistant", timestamp=self.get_timestamp()
168+
),
169+
content_part=UiPathConversationContentPartEvent(
170+
content_part_id=self.get_content_part_id(message_id),
171+
start=UiPathConversationContentPartStartEvent(mime_type="text/plain"),
172+
),
173+
)
174+
175+
def _create_content_chunk_event(
176+
self, message_id: str, text: str
177+
) -> UiPathConversationMessageEvent:
178+
return UiPathConversationMessageEvent(
179+
message_id=message_id,
180+
content_part=UiPathConversationContentPartEvent(
181+
content_part_id=self.get_content_part_id(message_id),
182+
chunk=UiPathConversationContentPartChunkEvent(data=text),
183+
),
184+
)
185+
186+
def _create_message_end_event(
187+
self, message_id: str
188+
) -> UiPathConversationMessageEvent:
189+
return UiPathConversationMessageEvent(
190+
message_id=message_id,
191+
end=UiPathConversationMessageEndEvent(),
192+
content_part=UiPathConversationContentPartEvent(
193+
content_part_id=self.get_content_part_id(message_id),
194+
end=UiPathConversationContentPartEndEvent(),
195+
),
196+
)
197+
198+
def _create_tool_call_start_event(
199+
self, message_id: str, tool_call_id: str, tool_name: str, input: dict
200+
) -> UiPathConversationMessageEvent:
201+
return UiPathConversationMessageEvent(
202+
message_id=message_id,
203+
tool_call=UiPathConversationToolCallEvent(
204+
tool_call_id=tool_call_id,
205+
start=UiPathConversationToolCallStartEvent(
206+
tool_name=tool_name,
207+
timestamp=self.get_timestamp(),
208+
input=input,
209+
),
210+
),
211+
)
212+
213+
def _create_tool_call_end_event(
214+
self, message_id: str, tool_call_id: str, output: str | None
215+
) -> UiPathConversationMessageEvent:
216+
return UiPathConversationMessageEvent(
217+
message_id=message_id,
218+
tool_call=UiPathConversationToolCallEvent(
219+
tool_call_id=tool_call_id,
220+
end=UiPathConversationToolCallEndEvent(
221+
timestamp=self.get_timestamp(),
222+
output=output,
223+
),
224+
),
225+
)
226+
227+
228+
__all__ = ["UiPathChatMessagesMapper"]

packages/uipath-llamaindex/src/uipath_llamaindex/runtime/runtime.py

Lines changed: 36 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,12 @@
22

33
import asyncio
44
import json
5+
import logging
6+
import os
57
from typing import Any, AsyncGenerator, cast
68
from uuid import uuid4
79

810
from llama_index.core.agent.workflow.workflow_events import (
9-
AgentInput,
1011
AgentOutput,
1112
AgentStream,
1213
ToolCall,
@@ -47,9 +48,12 @@
4748
UiPathLlamaIndexErrorCode,
4849
UiPathLlamaIndexRuntimeError,
4950
)
51+
from uipath_llamaindex.runtime.chat import UiPathChatMessagesMapper
5052
from uipath_llamaindex.runtime.schema import get_entrypoints_schema, get_workflow_schema
5153
from uipath_llamaindex.runtime.storage import SqliteResumableStorage
5254

55+
logger = logging.getLogger(__name__)
56+
5357

5458
class UiPathLlamaIndexRuntime:
5559
"""
@@ -168,28 +172,35 @@ async def _run_workflow(
168172

169173
event_stream = handler.stream_events(expose_internal=True)
170174
suspended_event: InputRequiredEvent | None = None
175+
chat = UiPathChatMessagesMapper(runtime_id=self.runtime_id)
171176

177+
raw_chunks: list[dict] = []
178+
mapped_chunks: list[dict] = []
172179
is_resumed: bool = False
173180
async for event in event_stream:
181+
if not isinstance(event, BreakpointEvent):
182+
raw_chunks.append(
183+
{"type": type(event).__name__, "data": json.loads(serialize_json(event))}
184+
)
174185
node_name = self._get_node_name(event)
175-
if stream_events:
176-
if isinstance(
177-
event,
178-
(AgentOutput, AgentInput, AgentStream, ToolCall, ToolCallResult),
179-
):
180-
message_event = UiPathRuntimeMessageEvent(
181-
payload=json.loads(serialize_json(event)),
182-
node_name=node_name,
183-
execution_id=self.runtime_id,
184-
)
185-
yield message_event
186-
elif not isinstance(event, BreakpointEvent):
187-
state_event = UiPathRuntimeStateEvent(
188-
payload=json.loads(serialize_json(event)),
189-
node_name=node_name,
190-
execution_id=self.runtime_id,
191-
)
192-
yield state_event
186+
if isinstance(event, (AgentOutput, AgentStream, ToolCall, ToolCallResult)):
187+
try:
188+
mapped_events = await chat.map_event(event)
189+
except Exception as e:
190+
logger.warning("Error mapping agent event: %s", e)
191+
mapped_events = None
192+
if mapped_events:
193+
for mapped_event in mapped_events:
194+
mapped_chunks.append(mapped_event.model_dump(by_alias=True, exclude_none=True))
195+
if stream_events:
196+
yield UiPathRuntimeMessageEvent(payload=mapped_event)
197+
elif stream_events and not isinstance(event, BreakpointEvent):
198+
state_event = UiPathRuntimeStateEvent(
199+
payload=json.loads(serialize_json(event)),
200+
node_name=node_name,
201+
execution_id=self.runtime_id,
202+
)
203+
yield state_event
193204

194205
if isinstance(event, BreakpointEvent):
195206
# Check if we should actually pause at this breakpoint
@@ -215,6 +226,12 @@ async def _run_workflow(
215226
suspended_event = event
216227
break
217228

229+
for filename, data in [("raw_chunks.json", raw_chunks), ("mapped_chunks.json", mapped_chunks)]:
230+
output_path = os.path.abspath(filename)
231+
with open(output_path, "w") as f:
232+
json.dump(data, f, indent=2)
233+
print(f"{filename} written to: {output_path}")
234+
218235
if suspended_event is not None:
219236
await asyncio.sleep(0) # Yield control to event loop
220237
await self._save_context()

0 commit comments

Comments
 (0)