-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy path__browser_sandbox_async_template.py
More file actions
179 lines (143 loc) · 5.71 KB
/
__browser_sandbox_async_template.py
File metadata and controls
179 lines (143 loc) · 5.71 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
"""浏览器沙箱高层API模板 / Browser Sandbox High-Level API Template
此模板用于生成浏览器沙箱资源的高级API代码。
This template is used to generate high-level API code for browser sandbox resources.
"""
import asyncio
import time # noqa: F401
from typing import Dict, Literal, Optional, overload, Tuple, Union
from agentrun.sandbox.api import BrowserDataAPI
from agentrun.sandbox.model import TemplateType
from agentrun.utils.config import Config
from agentrun.utils.log import logger
from .sandbox import Sandbox
class BrowserSandbox(Sandbox):
_template_type = TemplateType.BROWSER
_data_api: Optional["BrowserDataAPI"] = None
async def __aenter__(self):
# Poll health check asynchronously
max_retries = 60 # Maximum 60 seconds
retry_count = 0
logger.debug("Waiting for browser to be ready...")
while retry_count < max_retries:
retry_count += 1
try:
health = await self.check_health_async()
if health["status"] == "ok":
logger.debug(
f"✓ Browser is ready! (took {retry_count} seconds)"
)
return self
logger.debug(
f"[{retry_count}/{max_retries}] Health status:"
f" {health.get('code')} {health.get('message')}",
)
except Exception as e:
logger.error(
f"[{retry_count}/{max_retries}] Health check failed: {e}"
)
if retry_count < max_retries:
await asyncio.sleep(1)
raise RuntimeError(
f"Health check timeout after {max_retries} seconds. "
"Browser did not become ready in time."
)
async def __aexit__(self, exc_type, exc_val, exc_tb):
if self.sandbox_id is None:
raise ValueError("Sandbox ID is not set")
logger.debug(f"Deleting browser sandbox {self.sandbox_id}...")
await self.delete_async()
@property
def data_api(self):
if self._data_api is None:
if self.sandbox_id is None:
raise ValueError("Sandbox ID is not set")
self._data_api = BrowserDataAPI(
sandbox_id=self.sandbox_id, config=self._config
)
return self._data_api
async def check_health_async(self):
return await self.data_api.check_health_async()
@overload
def get_cdp_url(
self,
record: Optional[bool] = False,
*,
with_headers: Literal[True],
config: Optional[Config] = None,
) -> Tuple[str, Dict[str, str]]:
...
@overload
def get_cdp_url(
self,
record: Optional[bool] = False,
*,
with_headers: Literal[False] = False,
config: Optional[Config] = None,
) -> str:
...
def get_cdp_url(
self,
record: Optional[bool] = False,
*,
with_headers: bool = False,
config: Optional[Config] = None,
) -> Union[str, Tuple[str, Dict[str, str]]]:
"""Get CDP WebSocket URL for browser automation.
Args:
record: Whether to enable recording / 是否启用录制
with_headers: If True, return (url, headers) tuple with authentication headers.
当为 True 时,返回 (url, headers) 元组,包含鉴权头信息。
config: Optional config override / 可选的配置覆盖
"""
return self.data_api.get_cdp_url(record=record, with_headers=with_headers, config=config) # type: ignore[call-overload]
@overload
def get_vnc_url(
self,
record: Optional[bool] = False,
*,
with_headers: Literal[True],
config: Optional[Config] = None,
) -> Tuple[str, Dict[str, str]]:
...
@overload
def get_vnc_url(
self,
record: Optional[bool] = False,
*,
with_headers: Literal[False] = False,
config: Optional[Config] = None,
) -> str:
...
def get_vnc_url(
self,
record: Optional[bool] = False,
*,
with_headers: bool = False,
config: Optional[Config] = None,
) -> Union[str, Tuple[str, Dict[str, str]]]:
"""Get VNC WebSocket URL for live view.
Args:
record: Whether to enable recording / 是否启用录制
with_headers: If True, return (url, headers) tuple with authentication headers.
当为 True 时,返回 (url, headers) 元组,包含鉴权头信息。
config: Optional config override / 可选的配置覆盖
"""
return self.data_api.get_vnc_url(record=record, with_headers=with_headers, config=config) # type: ignore[call-overload]
def sync_playwright(self, record: Optional[bool] = False):
return self.data_api.sync_playwright(record=record)
def async_playwright(self, record: Optional[bool] = False):
return self.data_api.async_playwright(record=record)
async def list_recordings_async(self):
return await self.data_api.list_recordings_async()
async def download_recording_async(self, filename: str, save_path: str):
"""
Asynchronously download a recording video file and save it to local path.
Args:
filename: The name of the recording file to download
save_path: Local file path to save the downloaded video file (.mkv)
Returns:
Dictionary with 'saved_path' and 'size' keys
"""
return await self.data_api.download_recording_async(filename, save_path)
async def delete_recording_async(self, filename: str):
return await self.data_api.delete_recording_async(filename)