-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpages.py
More file actions
256 lines (223 loc) · 8.71 KB
/
pages.py
File metadata and controls
256 lines (223 loc) · 8.71 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
import json
from datetime import datetime
from typing import Any, Dict, List, Union
from autosim.agent import Agent
from autosim.base import Action, BaseSimulator, Movie, BaseView
from autosim.utils import (
process_prompt_with_images,
)
from actions import *
from prompts import (
MOVIE_DETAIL_VIEW_PROMPT,
MOVIE_LIST_VIEW_PROMPT,
)
# Delimiter placed on both sides of each JSON-encoded image payload embedded in
# prompt text; the same marker is handed to process_prompt_with_images.
image_marker = "<IMAGE>"
class MovieListPage(BaseView):
    """View that shows one page of the movie catalogue.

    Every movie on the page is rendered as a short text card. The cards, the
    pagination state and the currently available actions are substituted into
    ``prompt_template`` to build the content shown to the user agent.
    """

    def __init__(
        self,
        movies: List[Movie],
        current_page: int,
        total_pages: int,
        name: str = "Movie List",
        prompt_template: str = MOVIE_LIST_VIEW_PROMPT,
    ):
        """Store the movies and pagination state for this page.

        Args:
            movies: Movies displayed on this page.
            current_page: Number of this page; ``get_actions`` treats 1 as
                the first page.
            total_pages: Number of pages available.
            name: Display name of the view.
            prompt_template: Template formatted with the ``movies``,
                ``current_page``, ``total_pages`` and ``actions`` fields.
        """
        self.name = name
        self.movies = movies
        self.current_page = current_page
        self.total_pages = total_pages
        self.prompt_template = prompt_template

    def format_movie_details(self, movie: Movie) -> str:
        """Render a single movie as a text card for the list view.

        The card optionally starts with a marker-wrapped JSON image payload
        (only when the movie has a poster path), followed by the title, ID,
        genres and rating. A movie that already carries a user rating gets a
        " (Viewed, Your Rating: ...)" suffix on its title line.

        Args:
            movie: The movie to render.

        Returns:
            The card text with leading/trailing whitespace stripped.
        """
        image_part = ""
        if movie.poster_path:
            payload = json.dumps({"image_url": movie.poster_path})
            image_part = f"{image_marker}{payload}{image_marker}\n"

        title_line = movie.title
        if movie.user_rating is not None:
            title_line += f" (Viewed, Your Rating: {movie.user_rating})"

        detail = (
            f"{image_part}"
            f"{title_line}\n"
            f"ID: {movie.id}\n"
            f"Genres: {', '.join(movie.genres)}\n"
            f"Rating: {movie.rating}\n"
        )
        return detail.strip()

    def get_actions(self) -> List[Action]:
        """Return the actions available on this page.

        Clicking a movie and exiting are always offered; the paging actions
        are only offered when there is a page to move to in that direction.
        """
        actions = [click_movie, exit_action]
        if self.current_page > 1:
            actions.append(previous_page)
        if self.current_page < self.total_pages:
            actions.append(next_page)
        return actions

    def get_content(self) -> Union[str, List[Dict[str, Any]]]:
        """Build the prompt content for this page.

        Returns:
            Whatever ``process_prompt_with_images`` produces for the
            formatted prompt.
        """
        cards = [self.format_movie_details(movie) for movie in self.movies]
        prompt = self.prompt_template.format(
            movies="\n\n".join(cards),
            current_page=self.current_page,
            total_pages=self.total_pages,
            actions=[action.to_dict() for action in self.get_actions()],
        )
        return process_prompt_with_images(prompt, image_marker=image_marker)

    def execute_action(self, action: Action, simulator: "BaseSimulator") -> BaseView:
        """Apply ``action`` and return the view that follows it.

        Args:
            action: The action chosen on this page.
            simulator: Simulator used to look up movies and pages; its
                ``current_page`` is updated on a page change.

        Returns:
            A ``MovieDetailPage`` for a movie click, otherwise a new
            ``MovieListPage`` for the page that was moved to.

        Raises:
            ValueError: If the action is not one handled here. This includes
                a paging action requested when there is no page in that
                direction, because the paging branches only match while a
                neighbouring page exists.
        """
        action_name = action.name
        action_args = action.parameters or {}
        page_number = self.current_page

        if action_name == next_page.name and self.current_page < self.total_pages:
            page_number += 1
        elif action_name == previous_page.name and self.current_page > 1:
            page_number -= 1
        elif action_name == click_movie.name:
            movie_id = action_args.get("movie_id")
            movie = simulator.get_movie_by_id(movie_id)
            return MovieDetailPage(movie=movie)
        else:
            raise ValueError(f"Invalid action: {action_name}")

        # Keep the simulator in sync so a later "back" from a detail page
        # returns to the page that was just navigated to.
        simulator.current_page = page_number
        return MovieListPage(
            movies=simulator.get_movies_in_page(page_number),
            current_page=page_number,
            total_pages=simulator.total_pages,
        )
class MovieDetailPage(BaseView):
    """View that shows the full details of a single movie."""

    def __init__(
        self,
        movie: Movie,
        name: str = "Movie Detail",
        prompt_template: str = MOVIE_DETAIL_VIEW_PROMPT,
    ):
        """Store the movie to display.

        Args:
            movie: The movie whose details are shown.
            name: Display name of the view.
            prompt_template: Template formatted with the ``movie_detail``
                and ``actions`` fields.
        """
        self.name = name
        self.movie = movie
        self.prompt_template = prompt_template

    def get_actions(self) -> List[Action]:
        """Return the actions available on the detail page."""
        return [back_action, watch_and_rate_movie, exit_action]

    def get_content(self) -> Union[str, List[Dict[str, Any]]]:
        """Build the prompt content for this movie.

        The detail text optionally starts with a marker-wrapped JSON image
        payload (only when the movie has a poster path) and then lists one
        field per line. The "Your Rating" line is included only when the
        movie already carries a user rating.

        Returns:
            Whatever ``process_prompt_with_images`` produces for the
            formatted prompt; annotated to match the list view, which
            returns the result of the same helper.
        """
        movie = self.movie

        image_part = ""
        if movie.poster_path:
            payload = json.dumps({"image_url": movie.poster_path})
            image_part = f"{image_marker}{payload}{image_marker}\n"

        lines = [
            f"{movie.title}",
            f"ID: {movie.id}",
            f"Genres: {', '.join(movie.genres)}",
            f"Rating: {movie.rating}",
        ]
        if movie.user_rating is not None:
            lines.append(f"Your Rating: {movie.user_rating}")
        lines.extend(
            [
                f"Vote Count: {movie.vote_count}",
                f"Release Date: {movie.release_date}",
                f"Director: {movie.directors}",
                f"Actors: {movie.actors}",
                f"Overview: {movie.overview}",
            ]
        )
        detail = image_part + "\n".join(lines)

        prompt = self.prompt_template.format(
            movie_detail=detail,
            actions=[action.to_dict() for action in self.get_actions()],
        )
        return process_prompt_with_images(prompt, image_marker=image_marker)

    def execute_action(self, action: Action, simulator: "BaseSimulator") -> BaseView:
        """Return to the list page the simulator is currently on.

        The ``action`` argument is not inspected: every action executed on
        this view leads back to the movie list.

        Args:
            action: The action chosen on this page (unused).
            simulator: Simulator supplying the current page number, its
                movies and the total page count.

        Returns:
            A ``MovieListPage`` for ``simulator.current_page``.
        """
        return MovieListPage(
            movies=simulator.get_movies_in_page(simulator.current_page),
            current_page=simulator.current_page,
            total_pages=simulator.total_pages,
        )
class MovieWebsite(BaseSimulator):
    """Simulator that lets a user agent browse a paginated movie catalogue."""

    def __init__(
        self,
        user: Agent,
        movies: List[Movie],
        n_movies_per_page: int = 5,
    ):
        """Set up the catalogue pages and the browsing state.

        Args:
            user: The agent that receives page content and chooses actions.
            movies: Every movie in the catalogue.
            n_movies_per_page: Maximum number of movies per page.

        Raises:
            ValueError: If ``n_movies_per_page`` is smaller than 1.
        """
        self.user = user
        self.movies = movies
        self.movie_pages = self._create_movie_pages(movies, n_movies_per_page)
        self.current_page = 1
        self.logs: List[Dict[str, Any]] = []

    @property
    def total_pages(self) -> int:
        """Number of pages the catalogue was split into."""
        return len(self.movie_pages)

    def _create_movie_pages(self, lst: List, n: int) -> List[List[Movie]]:
        """Chunk a list into consecutive sub-lists of at most ``n`` items.

        Raises:
            ValueError: If ``n`` is smaller than 1. A zero step already made
                ``range`` raise; a negative step silently produced no pages.
        """
        if n < 1:
            raise ValueError(f"Page size must be at least 1, got {n}.")
        return [lst[i : i + n] for i in range(0, len(lst), n)]

    def get_initial_page(self) -> BaseView:
        """Return the list view for the simulator's current page."""
        return MovieListPage(
            movies=self.get_movies_in_page(self.current_page),
            current_page=self.current_page,
            total_pages=self.total_pages,
        )

    def get_movie_by_id(self, movie_id: int) -> Movie:
        """Return the movie whose ``id`` equals ``movie_id``.

        Raises:
            ValueError: If no movie in the catalogue has that ID.
        """
        for movie in self.movies:
            if movie.id == movie_id:
                return movie
        raise ValueError(f"Movie with ID {movie_id} not found.")

    def update_movie_rating(self, movie_id: int, rating: float) -> None:
        """Set the user rating on the first movie matching ``movie_id``.

        An ID that matches no movie is ignored without raising.
        """
        for movie in self.movies:
            if movie.id == movie_id:
                movie.user_rating = rating
                break

    def get_movies_in_page(self, page_number: int) -> List[Movie]:
        """Return the movies on the given 1-based page.

        Raises:
            IndexError: If ``page_number`` is outside ``1..total_pages``.
                The lower bound is checked explicitly because page 0 or a
                negative page would otherwise wrap around to a page counted
                from the end of the list.
        """
        if not 1 <= page_number <= self.total_pages:
            raise IndexError(
                f"Page {page_number} is out of range (1-{self.total_pages})."
            )
        return self.movie_pages[page_number - 1]

    def log_action(self, action: Action) -> None:
        """Append a log entry for ``action``.

        The entry holds the current timestamp in ISO format, the action as a
        dict, and the last element of the user's ``token_usage``.
        """
        log_entry = {
            "datetime": datetime.now().isoformat(),
            "action": action.to_dict(),
            "token_usage": self.user.token_usage[-1],
        }
        self.logs.append(log_entry)

    def run(self) -> None:
        """Run the browse loop until a stop condition is met.

        Each iteration shows the current view to the user, obtains the chosen
        action, applies a rating if the action is a watch-and-rate, logs the
        action, and then either stops or moves to the next view.
        """
        page = self.get_initial_page()
        while True:
            content = page.get_content()
            self.user.receive(content)
            # Only the chosen action is used here; the reply messages are not.
            _, action = self.user.generate_reply(
                page=self.current_page, page_name=page.name
            )

            if action.name == watch_and_rate_movie.name:
                movie_id = action.parameters["movie_id"]
                rating = action.parameters["rating"]
                self.update_movie_rating(movie_id, rating)
            self.log_action(action)

            working_memory = self.user.working_memory
            fatigue_module = working_memory.fatigue_module
            cumulative_fatigue = fatigue_module.cumulative_fatigue
            max_fatigue = fatigue_module.max_fatigue
            working_memory_length = working_memory.memory_length
            average_matching_score = working_memory.average_match_score

            # Stop on an explicit exit, once fatigue reaches its maximum,
            # once memory holds 10+ entries with an average match score of
            # 3 or less, or unconditionally at 15+ memory entries.
            if (
                action.name == exit_action.name
                or cumulative_fatigue >= max_fatigue
                or (working_memory_length >= 10 and average_matching_score <= 3)
                or working_memory_length >= 15
            ):
                break

            page = page.execute_action(action, self)
            self.user.clear_chat_messages()