-
Notifications
You must be signed in to change notification settings - Fork 129
Expand file tree
/
Copy pathtriage.py
More file actions
195 lines (158 loc) · 6.89 KB
/
Copy pathtriage.py
File metadata and controls
195 lines (158 loc) · 6.89 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
"""
---
title: Medical Office Triage System
category: complex-agents
tags: [multi_agent, agent_transfer, medical, context_preservation, chat_history]
difficulty: advanced
description: Multi-agent medical triage system with specialized departments
demonstrates:
- Multiple specialized agents (triage, support, billing)
- Agent-to-agent transfer with context preservation
- Chat history truncation and management
- Shared userdata across agent transfers
- Room attribute updates for agent tracking
- YAML prompt loading for agent instructions
---
"""
import logging
from dataclasses import dataclass, field
from typing import Optional
from dotenv import load_dotenv
from livekit.agents import JobContext, WorkerOptions, cli
from livekit.agents.llm import function_tool
from livekit.agents.voice import Agent, AgentSession, RunContext
from livekit.plugins import cartesia, deepgram, openai, silero
from utils import load_prompt
logger = logging.getLogger("medical-office-triage")
logger.setLevel(logging.INFO)
load_dotenv()
@dataclass
class UserData:
"""Stores data and agents to be shared across the session"""
personas: dict[str, Agent] = field(default_factory=dict)
prev_agent: Optional[Agent] = None
ctx: Optional[JobContext] = None
def summarize(self) -> str:
return "User data: Medical office triage system"
RunContext_T = RunContext[UserData]
class BaseAgent(Agent):
async def on_enter(self) -> None:
agent_name = self.__class__.__name__
logger.info(f"Entering {agent_name}")
userdata: UserData = self.session.userdata
if userdata.ctx and userdata.ctx.room:
await userdata.ctx.room.local_participant.set_attributes({"agent": agent_name})
chat_ctx = self.chat_ctx.copy()
if userdata.prev_agent:
items_copy = self._truncate_chat_ctx(
userdata.prev_agent.chat_ctx.items, keep_function_call=True
)
existing_ids = {item.id for item in chat_ctx.items}
items_copy = [item for item in items_copy if item.id not in existing_ids]
chat_ctx.items.extend(items_copy)
chat_ctx.add_message(
role="system",
content=f"You are the {agent_name}. {userdata.summarize()}"
)
await self.update_chat_ctx(chat_ctx)
self.session.generate_reply()
def _truncate_chat_ctx(
self,
items: list,
keep_last_n_messages: int = 6,
keep_system_message: bool = False,
keep_function_call: bool = False,
) -> list:
"""Truncate the chat context to keep the last n messages."""
def _valid_item(item) -> bool:
if not keep_system_message and item.type == "message" and item.role == "system":
return False
if not keep_function_call and item.type in ["function_call", "function_call_output"]:
return False
return True
new_items = []
for item in reversed(items):
if _valid_item(item):
new_items.append(item)
if len(new_items) >= keep_last_n_messages:
break
new_items = new_items[::-1]
while new_items and new_items[0].type in ["function_call", "function_call_output"]:
new_items.pop(0)
return new_items
async def _transfer_to_agent(self, name: str, context: RunContext_T) -> Agent:
"""Transfer to another agent while preserving context"""
userdata = context.userdata
current_agent = context.session.current_agent
next_agent = userdata.personas[name]
userdata.prev_agent = current_agent
return next_agent
class TriageAgent(BaseAgent):
def __init__(self) -> None:
super().__init__(
instructions=load_prompt('triage_prompt.yaml'),
stt="assemblyai/universal-streaming",
llm="openai/gpt-4.1-mini",
tts="cartesia/sonic-2:6f84f4b8-58a2-430c-8c79-688dad597532",
vad=silero.VAD.load()
)
@function_tool
async def transfer_to_support(self, context: RunContext_T) -> Agent:
await self.session.say("I'll transfer you to our Patient Support team who can help with your medical services request.")
return await self._transfer_to_agent("support", context)
@function_tool
async def transfer_to_billing(self, context: RunContext_T) -> Agent:
await self.session.say("I'll transfer you to our Medical Billing department who can assist with your insurance and payment questions.")
return await self._transfer_to_agent("billing", context)
class SupportAgent(BaseAgent):
def __init__(self) -> None:
super().__init__(
instructions=load_prompt('support_prompt.yaml'),
stt=deepgram.STT(),
llm=openai.LLM(model="gpt-4o-mini"),
tts=cartesia.TTS(),
vad=silero.VAD.load()
)
@function_tool
async def transfer_to_triage(self, context: RunContext_T) -> Agent:
await self.session.say("I'll transfer you back to our Medical Office Triage agent who can better direct your inquiry.")
return await self._transfer_to_agent("triage", context)
@function_tool
async def transfer_to_billing(self, context: RunContext_T) -> Agent:
await self.session.say("I'll transfer you to our Medical Billing department for assistance with your insurance and payment questions.")
return await self._transfer_to_agent("billing", context)
class BillingAgent(BaseAgent):
def __init__(self) -> None:
super().__init__(
instructions=load_prompt('billing_prompt.yaml'),
stt=deepgram.STT(),
llm=openai.LLM(model="gpt-4o-mini"),
tts=cartesia.TTS(),
vad=silero.VAD.load()
)
@function_tool
async def transfer_to_triage(self, context: RunContext_T) -> Agent:
await self.session.say("I'll transfer you back to our Medical Office Triage agent who can better direct your inquiry.")
return await self._transfer_to_agent("triage", context)
@function_tool
async def transfer_to_support(self, context: RunContext_T) -> Agent:
await self.session.say("I'll transfer you to our Patient Support team who can help with your medical services request.")
return await self._transfer_to_agent("support", context)
async def entrypoint(ctx: JobContext):
userdata = UserData(ctx=ctx)
triage_agent = TriageAgent()
support_agent = SupportAgent()
billing_agent = BillingAgent()
# Register all agents in the userdata
userdata.personas.update({
"triage": triage_agent,
"support": support_agent,
"billing": billing_agent
})
session = AgentSession[UserData](userdata=userdata)
await session.start(
agent=triage_agent, # Start with the Medical Office Triage agent
room=ctx.room,
)
if __name__ == "__main__":
cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint))