-
Notifications
You must be signed in to change notification settings - Fork 161
Expand file tree
/
Copy pathpython_entrypoint_stub.py
More file actions
128 lines (108 loc) · 4.4 KB
/
python_entrypoint_stub.py
File metadata and controls
128 lines (108 loc) · 4.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
"""
Example Python entrypoint for Rogue agent evaluation.
This file demonstrates how to create a Python entrypoint for testing
agents without A2A or MCP protocols. Rogue will dynamically import
this file and call the `call_agent` function.
To configure via the TUI:
1. Select "Python" as the protocol
2. Enter the path to your Python file
The call_agent function receives the full conversation history and
should return the agent's response as a string.
"""
from typing import Any, Optional
def call_agent(
    messages: list[dict[str, Any]],
    context_id: Optional[str] = None,
    **kwargs: Any,
) -> str:
    """
    Return the agent's reply to the latest turn of the conversation.

    Rogue imports this module and invokes this function on every turn,
    passing the full transcript so far. The stub below simply echoes the
    most recent user message; swap in your real agent logic here.

    Args:
        messages: Full conversation history as dicts with 'role'
            ('user' or 'assistant') and 'content' keys. The list grows
            by one entry per turn.
        context_id: Optional conversation identifier supplied by Rogue;
            handy for session tracking in stateful agents.
        **kwargs: Per-turn side-data from the multi-turn driver. Keys
            appear only on turns where the scenario's `available_kwargs`
            declared them and the driver attached them (e.g. a
            `file_path` on the upload turn); otherwise the dict is empty.

    Returns:
        The agent's response as a string.

    Note:
        - Either a sync or an async function is accepted (Rogue awaits
          async ones automatically).
        - Signal errors by raising an exception.
    """
    # --------------------------------------------------------------
    # Replace everything below with your actual agent logic!
    # --------------------------------------------------------------
    #
    # Example integration — OpenAI API:
    #     from openai import OpenAI
    #     client = OpenAI()
    #     response = client.chat.completions.create(
    #         model="gpt-4",
    #         messages=messages,
    #     )
    #     return response.choices[0].message.content
    #
    # Example integration — LangChain:
    #     from langchain_openai import ChatOpenAI
    #     from langchain_core.messages import HumanMessage, AIMessage
    #     chat = ChatOpenAI()
    #     lc_messages = [
    #         HumanMessage(content=m["content"]) if m["role"] == "user"
    #         else AIMessage(content=m["content"])
    #         for m in messages
    #     ]
    #     return chat.invoke(lc_messages).content
    #
    # Example integration — local model via transformers:
    #     from transformers import pipeline
    #     pipe = pipeline("text-generation", model="...")
    #     prompt = "\n".join(f"{m['role']}: {m['content']}" for m in messages)
    #     return pipe(prompt)[0]["generated_text"]

    # Empty history yields an empty echo rather than an IndexError.
    last_user_text = messages[-1]["content"] if messages else ""

    # Per-turn kwargs demo: when the driver attaches `file_path`, read
    # the file and report its size so upload scenarios can be exercised
    # end-to-end (reading, not just stat-ing, proves the file is usable).
    upload_path = kwargs.get("file_path")
    if not upload_path:
        # Plain echo — the default behavior for ordinary turns.
        return f"Echo: {last_user_text}"

    try:
        with open(upload_path, "rb") as handle:
            byte_count = len(handle.read())
    except OSError as exc:
        return f"Echo: {last_user_text} (file read error: {exc})"
    return f"Echo: {last_user_text} (uploaded {upload_path}, {byte_count} bytes)"
# Optional: Async version
# Rogue automatically detects and awaits async functions
async def call_agent_async(
    messages: list[dict[str, Any]],
    context_id: Optional[str] = None,
    **kwargs: Any,
) -> str:
    """
    Async variant of the entrypoint.

    Rogue detects and awaits coroutine functions automatically; rename
    this to `call_agent` to have it used instead of the sync version.
    Useful when your agent relies on async API clients.

    Args:
        messages: Conversation history dicts with 'role' and 'content'.
        context_id: Optional conversation ID for session tracking.
        **kwargs: Per-turn side-data from the driver (unused here).

    Returns:
        An echo of the latest user message, tagged with the session ID.
    """
    import asyncio

    # Stand-in for real async work (API call, DB lookup, ...).
    await asyncio.sleep(0.1)

    newest = messages[-1]["content"] if messages else ""
    # Surfacing context_id in the reply shows how it can drive
    # per-session behavior in a stateful agent.
    return f"Async Echo (session: {context_id}): {newest}"