-
Notifications
You must be signed in to change notification settings - Fork 65
Expand file tree
/
Copy pathmanager.py
More file actions
172 lines (148 loc) · 6.65 KB
/
manager.py
File metadata and controls
172 lines (148 loc) · 6.65 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
if __name__ == "__main__":
from src.utilities.start_work_functions import print_ascii_logo
print_ascii_logo()
from dotenv import find_dotenv, load_dotenv
from src.utilities.set_up_dotenv import set_up_env_manager, add_todoist_envs
import os
if not find_dotenv():
set_up_env_manager()
elif load_dotenv(find_dotenv()) and not os.getenv("TODOIST_API_KEY"):
add_todoist_envs()
from typing import TypedDict, Sequence
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage
from langchain_core.load import dumps
from langgraph.graph import StateGraph
from src.tools.tools_project_manager import add_task, modify_task, finish_project_planning, reorder_tasks
from src.tools.tools_coder_pipeline import (
prepare_list_dir_tool, prepare_see_file_tool, see_image,
ask_human_tool, retrieve_files_by_semantic_query
)
from src.tools.rag.index_file_descriptions import prompt_index_project_files
from src.utilities.util_functions import save_state_history_to_disk, join_paths, convert_image
from src.utilities.manager_utils import (
actualize_tasks_list_and_progress_description,
setup_todoist_project_if_needed,
get_manager_messages,
)
from src.utilities.langgraph_common_functions import (
call_model,
call_tool,
multiple_tools_msg,
no_tools_msg,
)
from src.utilities.start_project_functions import set_up_dot_clean_coder_dir
from src.utilities.llms import init_llms_medium_intelligence
from src.utilities.print_formatters import print_formatted
from src.tools.rag.retrieval import vdb_available
import json
import os
import uuid
class AgentState(TypedDict):
messages: Sequence[BaseMessage]
class Manager:
def __init__(self):
load_dotenv(find_dotenv())
self.work_dir = os.getenv("WORK_DIR")
# initial project setup
set_up_dot_clean_coder_dir(self.work_dir)
setup_todoist_project_if_needed()
prompt_index_project_files()
self.tools = self.prepare_tools()
self.llms = init_llms_medium_intelligence(tools=self.tools, run_name="Manager")
self.manager = self.setup_workflow()
self.saved_messages_path = join_paths(self.work_dir, ".clean_coder/manager_messages.json")
def call_model_manager(self, state):
save_state_history_to_disk(state, self.saved_messages_path)
state = call_model(state, self.llms)
state = self.cut_off_context(state)
ai_messages = [msg for msg in state["messages"] if msg.type == "ai"]
last_ai_message = ai_messages[-1]
# in case model will return empty message (that strange thing happens for 3.5 and 3.7 sonnet after task execution),
# we will replace it with tool call message to go with next task
if not last_ai_message.content and not last_ai_message.tool_calls:
state["messages"].pop()
state["messages"].append(AIMessage(
content="Proceeding with next task.",
tool_calls=[
{
"name": "finish_project_planning",
"args": {"dummy": "ok"},
"id": str(uuid.uuid4()),
"type": "tool_call",
}
]
))
state = call_tool(state, self.tools)
if len(last_ai_message.tool_calls) == 0:
state["messages"].append(HumanMessage(content=no_tools_msg))
for tool_call in last_ai_message.tool_calls:
if tool_call["name"] == "see_image":
state["messages"].append(HumanMessage(content=convert_image(tool_call["args"]["filename"])))
state = actualize_tasks_list_and_progress_description(state)
return state
# Logic for conditional edges
def after_agent_condition(self, state):
last_message = state["messages"][-1]
if last_message.content in (multiple_tools_msg, no_tools_msg):
return "agent"
return "tool"
# just functions
def cut_off_context(self, state):
"""
Trims the message history in the state to keep only the most recent context for the agent.
Keeps the last ~30 messages, starting from the most recent 'ai' message, and always includes the system message at the beginning.
"""
approx_nr_msgs_to_save = 30
if len(state["messages"]) > approx_nr_msgs_to_save:
last_messages = state["messages"][-approx_nr_msgs_to_save:]
# Find the index of the first 'ai' message from the end in the last 30 messages
ai_message_index_in_last_msgs = next(
(i for i, message in enumerate(last_messages) if message.type == "ai"), None
)
# Calculate the actual index of the 'ai' message in the original list
ai_message_index = len(state["messages"]) - approx_nr_msgs_to_save + ai_message_index_in_last_msgs
# Collect all messages starting from the 'ai' message
last_messages_excluding_system = [
msg for msg in state["messages"][ai_message_index:] if msg.type != "system"
]
system_message = state["messages"][0]
state["messages"] = [system_message] + last_messages_excluding_system
return state
def save_messages_to_disk(self, state):
# remove system message
messages = state["messages"][1:]
messages_string = dumps(messages)
with open(self.saved_messages_path, "w") as f:
json.dump(messages_string, f)
def prepare_tools(self):
list_dir = prepare_list_dir_tool(self.work_dir)
see_file = prepare_see_file_tool(self.work_dir)
tools = [
add_task,
modify_task,
reorder_tasks,
ask_human_tool,
finish_project_planning,
list_dir,
see_file,
see_image,
]
if vdb_available():
tools.append(retrieve_files_by_semantic_query)
return tools
# workflow definition
def setup_workflow(self):
manager_workflow = StateGraph(AgentState)
manager_workflow.add_node("agent", self.call_model_manager)
manager_workflow.set_entry_point("agent")
manager_workflow.add_edge("agent", "agent")
return manager_workflow.compile()
def run(self):
"""Start the Manager agent to plan and manage project tasks interactively."""
print_formatted("😀 Hello! I'm Manager agent. Let's plan your project together!", color="green")
messages = get_manager_messages(self.saved_messages_path)
inputs = {"messages": messages}
self.manager.invoke(inputs, {"recursion_limit": 1000})
if __name__ == "__main__":
manager_instance = Manager()
manager_instance.run()