-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathruntime.py
More file actions
187 lines (159 loc) · 6.45 KB
/
runtime.py
File metadata and controls
187 lines (159 loc) · 6.45 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
"""Interactive runtime adapter for the plan-and-task example."""
from __future__ import annotations
import asyncio
import re
import re as _re
import unicodedata
from typing import TYPE_CHECKING
from ecs_agent.components import UserInputComponent
from ecs_agent.components.definitions import ConversationComponent, TerminalComponent
from ecs_agent.logging import get_logger
from ecs_agent.providers.protocol import LLMModel
from ecs_agent.systems import TerminalCleanupSystem
from ecs_agent.systems.user_input import UserInputSystem
from ecs_agent.types import (
CompletionResult,
Message,
ReasoningCompleteEvent,
UserInputRequestedEvent,
)
if TYPE_CHECKING:
from ecs_agent.core import World
from ecs_agent.types import EntityId
logger = get_logger(__name__)
_MAX_SLUG_LENGTH = 50
_SLUG_SEPARATOR = "-"
def slug_from_description(description: str) -> str:
"""Derive a URL-safe workflow ID slug from a natural language task description.
Returns an empty string if the description yields no usable tokens,
leaving the caller to decide the fallback.
"""
text = description.strip()
if not text:
return ""
normalized = unicodedata.normalize("NFKC", text)
cjk_range = re.compile(
r"[\u4e00-\u9fff\u3400-\u4dbf\u3040-\u309f\u30a0-\u30ff\uac00-\ud7af]"
)
has_cjk = bool(cjk_range.search(normalized))
if has_cjk:
allowed = re.sub(
r"[^\u4e00-\u9fff\u3400-\u4dbf\u3040-\u309f\u30a0-\u30ff\uac00-\ud7afa-z0-9\s]",
"",
normalized.lower(),
)
slug = re.sub(r"\s+", _SLUG_SEPARATOR, allowed).strip(_SLUG_SEPARATOR)
else:
lower = normalized.lower()
allowed = re.sub(r"[^a-z0-9\s]", " ", lower)
tokens = allowed.split()
slug = _SLUG_SEPARATOR.join(tokens[:6])
return slug[:_MAX_SLUG_LENGTH].rstrip(_SLUG_SEPARATOR)
_VALID_SLUG = _re.compile(r"^[a-z][a-z0-9-]*$")
async def derive_workflow_id_from_llm(description: str, model: LLMModel) -> str:
prompt = (
"Convert the following task description into a short, meaningful English "
"workflow identifier. Rules: lowercase letters, digits, and hyphens only; "
"2-6 words; max 50 characters; no spaces, no punctuation, no explanation. "
"Return ONLY the identifier, nothing else.\n\n"
f"Description: {description}"
)
try:
result = await model.complete(
[Message(role="user", content=prompt)],
stream=False,
)
if not isinstance(result, CompletionResult):
return slug_from_description(description)
raw = (result.message.content or "").strip().splitlines()[0].strip().lower()
normalized = _re.sub(r"[^a-z0-9]+", "-", raw).strip("-")
normalized = normalized[:_MAX_SLUG_LENGTH].rstrip("-")
if normalized and _VALID_SLUG.match(normalized):
logger.info(
"plan_task_workflow_id_derived",
method="llm",
slug=normalized,
)
return normalized
except Exception as exc:
logger.warning(
"plan_task_workflow_id_llm_failed",
exception=str(exc),
)
fallback = slug_from_description(description)
logger.info(
"plan_task_workflow_id_derived",
method="fallback",
slug=fallback,
)
return fallback
async def setup_interactive_input(
world: World,
agent_id: EntityId,
) -> None:
"""Wire interactive stdin into the ECS world for the plan-and-task example."""
last_printed_index: list[int] = [0]
async def provide_input(event: UserInputRequestedEvent) -> None:
loop = asyncio.get_running_loop()
conv = world.get_component(event.entity_id, ConversationComponent)
if conv is not None:
for msg in conv.messages[last_printed_index[0] :]:
if msg.role == "assistant" and msg.content:
print(f"\nAssistant: {msg.content}\n")
last_printed_index[0] = len(conv.messages)
while True:
lines: list[str] = []
prompt = event.prompt
try:
while True:
line = await loop.run_in_executor(None, input, prompt)
if not lines and line.lower().strip() in ("exit", "quit"):
logger.info(
"plan_task_user_exit",
entity_id=int(event.entity_id),
)
world.add_component(
event.entity_id,
TerminalComponent(reason="user_exit_command"),
)
if not event.input_future.done():
event.input_future.set_result(line)
return
if line == "":
break
lines.append(line)
prompt = "... "
except EOFError:
if not lines:
world.add_component(
event.entity_id,
TerminalComponent(reason="stdin_eof"),
)
if not event.input_future.done():
event.input_future.set_result("exit")
return
user_text = "\n".join(lines).strip()
if not user_text:
continue
if not event.input_future.done():
event.input_future.set_result(user_text)
return
async def on_reasoning_complete(event: ReasoningCompleteEvent) -> None:
if event.entity_id != agent_id:
return
logger.info(
"plan_task_reasoning_complete",
entity_id=int(agent_id),
)
world.add_component(agent_id, UserInputComponent(prompt="You> "))
world.event_bus.subscribe(UserInputRequestedEvent, provide_input)
world.event_bus.subscribe(ReasoningCompleteEvent, on_reasoning_complete)
world.register_system(
TerminalCleanupSystem(priority=1, clear_reasons=("reasoning_complete",)),
priority=1,
)
# Priority -15: run BEFORE UserPromptNormalizationSystem (-10) so the user
# message is in ConversationComponent when normalization and script handlers run.
world.register_system(UserInputSystem(priority=-15), priority=-15)
if world.get_component(agent_id, UserInputComponent) is None:
world.add_component(agent_id, UserInputComponent(prompt="You> "))