-
Notifications
You must be signed in to change notification settings - Fork 109
Expand file tree
/
Copy pathcontext.py
More file actions
281 lines (237 loc) · 11.4 KB
/
context.py
File metadata and controls
281 lines (237 loc) · 11.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
import asyncio
import os
import weakref
from playwright.async_api import BrowserContext, Page
from .page import StagehandPage
class StagehandContext:
    """Wrap a Playwright ``BrowserContext`` and keep one ``StagehandPage`` per page.

    State kept by the wrapper:

    * ``page_map`` -- weak-keyed mapping from Playwright pages to their
      ``StagehandPage`` wrappers.
    * ``frame_id_map`` -- mapping from root frame IDs to ``StagehandPage``
      wrappers, maintained from CDP ``Page.frameNavigated`` events.
    * ``active_stagehand_page`` -- the wrapper currently treated as active.

    Attribute lookups that are not satisfied by this class are forwarded to
    the wrapped context (see ``__getattr__``).

    Annotations naming Playwright/Stagehand types are written as strings so
    they are not evaluated when the class body is executed.
    """

    # Upper bound, in seconds, for a page-switch handler to acquire
    # ``stagehand._page_switch_lock`` and finish its work.
    _PAGE_SWITCH_TIMEOUT_SECONDS = 30

    def __init__(self, context: "BrowserContext", stagehand):
        """Store the wrapped context and the owning stagehand client.

        Args:
            context: The Playwright browser context to wrap.
            stagehand: The client object; this class uses its ``logger``,
                ``_page_switch_lock`` and (if present) ``_set_active_page``.
        """
        self._context = context
        self.stagehand = stagehand
        # Use a weak key dictionary to map Playwright Pages to our StagehandPage wrappers
        self.page_map = weakref.WeakKeyDictionary()
        self.active_stagehand_page = None
        # Map frame IDs to StagehandPage instances
        self.frame_id_map = {}
        # Strong references to fire-and-forget tasks. The event loop only
        # keeps weak references, so an unreferenced task may be collected
        # before it finishes.
        self._background_tasks = set()

    def _run_in_background(self, coro):
        """Schedule *coro* as a task and keep it referenced until it is done.

        Must be called while an event loop is running (it is used from
        synchronous Playwright event handlers).

        Returns:
            The created ``asyncio.Task``.
        """
        task = asyncio.create_task(coro)
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)
        return task

    async def new_page(self) -> "StagehandPage":
        """Open a new page in the wrapped context, wrap it and make it active."""
        pw_page = await self._context.new_page()
        stagehand_page = await self.create_stagehand_page(pw_page)
        self.set_active_page(stagehand_page)
        return stagehand_page

    async def create_stagehand_page(self, pw_page: "Page") -> "StagehandPage":
        """Create and register a ``StagehandPage`` wrapper for *pw_page*.

        The init script is injected, the wrapper is stored in ``page_map``
        and frame tracking is attached before the wrapper is returned.
        """
        stagehand_page = StagehandPage(pw_page, self.stagehand, self)
        await self.inject_custom_scripts(pw_page)
        self.page_map[pw_page] = stagehand_page
        # Initialize frame tracking for this page
        await self._attach_frame_navigated_listener(pw_page, stagehand_page)
        return stagehand_page

    async def inject_custom_scripts(self, pw_page: "Page"):
        """Add ``domScripts.js`` (located next to this module) as an init script.

        If the file cannot be read, the error is logged and a placeholder
        comment is injected instead so page creation still proceeds.
        """
        script_path = os.path.join(os.path.dirname(__file__), "domScripts.js")
        try:
            # Decode explicitly as UTF-8 rather than the platform default.
            with open(script_path, encoding="utf-8") as f:
                script = f.read()
        except (OSError, UnicodeError) as e:
            self.stagehand.logger.error(f"Error reading domScripts.js: {e}")
            script = "/* fallback injection script */"
        await pw_page.add_init_script(script)

    async def get_stagehand_page(self, pw_page: "Page") -> "StagehandPage":
        """Return the wrapper for *pw_page*, creating it on first use."""
        stagehand_page = self.page_map.get(pw_page)
        if stagehand_page is None:
            stagehand_page = await self.create_stagehand_page(pw_page)
        return stagehand_page

    async def get_stagehand_pages(self) -> list:
        """Return ``StagehandPage`` wrappers for every page in the context."""
        return [
            await self.get_stagehand_page(pw_page) for pw_page in self._context.pages
        ]

    def set_active_page(self, stagehand_page: "StagehandPage"):
        """Record *stagehand_page* as active and notify the stagehand client.

        The client is only notified when it exposes ``_set_active_page``;
        otherwise a debug message is logged.
        """
        self.active_stagehand_page = stagehand_page
        # Update the active page in the stagehand client
        if hasattr(self.stagehand, "_set_active_page"):
            self.stagehand._set_active_page(stagehand_page)
            self.stagehand.logger.debug(
                f"Set active page to: {stagehand_page.url}", category="context"
            )
        else:
            self.stagehand.logger.debug(
                "Stagehand does not have _set_active_page method", category="context"
            )

    def get_active_page(self) -> "StagehandPage | None":
        """Return the active wrapper, or ``None`` when no page is active."""
        return self.active_stagehand_page

    def register_frame_id(self, frame_id: str, page: "StagehandPage"):
        """Register a frame ID to StagehandPage mapping."""
        self.frame_id_map[frame_id] = page

    def unregister_frame_id(self, frame_id: str):
        """Unregister a frame ID from the mapping; unknown IDs are ignored."""
        self.frame_id_map.pop(frame_id, None)

    def get_stagehand_page_by_frame_id(self, frame_id: str) -> "StagehandPage | None":
        """Get StagehandPage by frame ID, or ``None`` if it is not registered."""
        return self.frame_id_map.get(frame_id)

    @classmethod
    async def init(cls, context: "BrowserContext", stagehand):
        """Build a ``StagehandContext`` and wire it to *context*.

        Wrappers are created for pages that already exist, the first existing
        page (if any) becomes active, and a ``"page"`` listener is installed
        so later pages (popups, new tabs from window.open, etc.) are wrapped.

        Returns:
            The initialized ``StagehandContext`` instance.
        """
        instance = cls(context, stagehand)
        # Pre-initialize StagehandPages for any existing pages
        stagehand.logger.debug(
            f"Found {len(instance._context.pages)} existing pages", category="context"
        )
        for pw_page in instance._context.pages:
            await instance.create_stagehand_page(pw_page)
        if instance._context.pages:
            first_page = instance._context.pages[0]
            stagehand_page = await instance.get_stagehand_page(first_page)
            instance.set_active_page(stagehand_page)

        # Add event listener for new pages (popups, new tabs from window.open, etc.)
        def handle_page_event(pw_page):
            # Playwright expects sync handler, so we schedule the async work
            # and keep the task referenced until it completes.
            instance._run_in_background(instance._handle_new_page(pw_page))

        context.on("page", handle_page_event)
        return instance

    async def _handle_new_page(self, pw_page: "Page"):
        """
        Handle new pages created by the browser (popups, window.open, etc.).
        Uses the page switch lock to prevent race conditions with ongoing operations.

        Failures (including a timeout) are logged rather than raised.
        """
        try:
            # Use wait_for for Python 3.10 compatibility (timeout prevents indefinite blocking)
            async def handle_with_lock():
                async with self.stagehand._page_switch_lock:
                    self.stagehand.logger.debug(
                        f"Creating StagehandPage for new page with URL: {pw_page.url}",
                        category="context",
                    )
                    stagehand_page = await self.create_stagehand_page(pw_page)
                    self.set_active_page(stagehand_page)
                    self.stagehand.logger.debug(
                        "New page detected and initialized", category="context"
                    )

            await asyncio.wait_for(
                handle_with_lock(), timeout=self._PAGE_SWITCH_TIMEOUT_SECONDS
            )
        except asyncio.TimeoutError:
            self.stagehand.logger.error(
                f"Timeout waiting for page switch lock when handling new page: {pw_page.url}",
                category="context",
            )
        except Exception as e:
            self.stagehand.logger.error(
                f"Failed to initialize new page: {str(e)}", category="context"
            )

    async def _handle_page_close(self, closing_page: "StagehandPage"):
        """
        Handle page close events and update the active page if needed.
        Uses the page switch lock to prevent race conditions with ongoing operations.

        Nothing changes unless *closing_page* is still the active page. In
        that case the first remaining page with a known wrapper becomes
        active, or the active page is cleared when no pages remain.
        Failures (including a timeout) are logged rather than raised.
        """
        try:

            async def handle_with_lock():
                async with self.stagehand._page_switch_lock:
                    if self.active_stagehand_page is not closing_page:
                        return
                    remaining_pages = self._context.pages
                    if remaining_pages:
                        first_remaining = remaining_pages[0]
                        new_active = self.page_map.get(first_remaining)
                        if new_active:
                            self.set_active_page(new_active)
                            self.stagehand.logger.debug(
                                f"Active page closed, switching to: {first_remaining.url}",
                                category="context",
                            )
                        else:
                            self.stagehand.logger.debug(
                                "Could not find StagehandPage wrapper for remaining page",
                                category="context",
                            )
                    else:
                        self.active_stagehand_page = None
                        self.stagehand.logger.debug(
                            "Active page closed and no pages remaining",
                            category="context",
                        )

            await asyncio.wait_for(
                handle_with_lock(), timeout=self._PAGE_SWITCH_TIMEOUT_SECONDS
            )
        except asyncio.TimeoutError:
            self.stagehand.logger.error(
                "Timeout waiting for page switch lock when handling page close",
                category="context",
            )
        except Exception as e:
            self.stagehand.logger.error(
                f"Failed to handle page close: {str(e)}", category="context"
            )

    def __getattr__(self, name):
        """Forward unknown attribute lookups to the underlying BrowserContext.

        ``pages`` is special-cased: instead of the context's page list, an
        async callable returning ``StagehandPage`` wrappers is returned.
        ``new_page`` needs no handling here because the class defines it, so
        this hook is never reached for that name.

        Raises:
            AttributeError: If the wrapped context has no such attribute, or
                if ``_context`` itself is not set yet.
        """
        # __getattr__ only runs when normal lookup fails. If ``_context`` is
        # the missing name, reading ``self._context`` below would re-enter
        # this method forever, so fail fast instead.
        if name == "_context":
            raise AttributeError(
                f"{type(self).__name__!r} object has no attribute {name!r}"
            )
        attr = getattr(self._context, name)
        if name == "pages":
            # Return StagehandPage objects rather than raw Playwright pages.
            return self.get_stagehand_pages
        return attr

    async def _attach_frame_navigated_listener(
        self, pw_page: "Page", stagehand_page: "StagehandPage"
    ):
        """
        Attach CDP listener for frame navigation events to track frame IDs.
        This mirrors the TypeScript implementation's frame tracking.

        Also registers a one-shot ``close`` listener on *pw_page* that drops
        the page's frame ID and, if the page was active, schedules
        ``_handle_page_close``. Any failure while attaching is logged rather
        than raised.
        """

        def on_frame_navigated(params):
            """Handle Page.frameNavigated events"""
            frame = params.get("frame", {})
            frame_id = frame.get("id")
            parent_id = frame.get("parentId")
            # Only track root frames (no parent)
            if parent_id or not frame_id:
                return
            # Skip if it's the same frame ID
            old_id = stagehand_page.frame_id
            if frame_id == old_id:
                return
            # Unregister old frame ID if exists
            if old_id:
                self.unregister_frame_id(old_id)
            # Register new frame ID
            self.register_frame_id(frame_id, stagehand_page)
            stagehand_page.update_root_frame_id(frame_id)
            self.stagehand.logger.debug(
                f"Frame navigated from {old_id} to {frame_id}",
                category="context",
            )

        def on_page_close():
            if stagehand_page.frame_id:
                self.unregister_frame_id(stagehand_page.frame_id)
            if self.active_stagehand_page is stagehand_page:
                # Keep the task referenced so it is not collected mid-flight.
                self._run_in_background(self._handle_page_close(stagehand_page))

        try:
            # Register the close handler first so active-page bookkeeping
            # still works when the CDP setup below fails.
            pw_page.once("close", on_page_close)

            # Create CDP session for the page
            cdp_session = await self._context.new_cdp_session(pw_page)
            await cdp_session.send("Page.enable")
            # Get the current root frame ID
            frame_tree = await cdp_session.send("Page.getFrameTree")
            root_frame_id = frame_tree.get("frameTree", {}).get("frame", {}).get("id")
            if root_frame_id:
                # Initialize the page with its frame ID
                stagehand_page.update_root_frame_id(root_frame_id)
                self.register_frame_id(root_frame_id, stagehand_page)
            # Register the event listener
            cdp_session.on("Page.frameNavigated", on_frame_navigated)
        except Exception as e:
            self.stagehand.logger.error(
                f"Failed to attach frame navigation listener: {str(e)}",
                category="context",
            )