Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 36 additions & 10 deletions agentrun/integration/builtin/sandbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -727,24 +727,40 @@ def __init__(
polar_fs_config=polar_fs_config,
)
self._playwright_sync: Optional["BrowserPlaywrightSync"] = None
self._playwright_thread: Optional[threading.Thread] = None

def _get_playwright(self, sb: BrowserSandbox) -> "BrowserPlaywrightSync":
"""获取或创建 Playwright 连接 / Get or create Playwright connection

复用已有连接以减少连接建立开销和瞬态错误。
使用双重检查锁定避免并发调用时创建多个连接导致资源泄漏。
当创建连接的线程已退出时,自动重建连接(Playwright greenlet 绑定到创建它的线程)。

Reuses existing connection to reduce connection overhead and transient errors.
Uses double-checked locking to avoid leaking connections under concurrent calls.
Automatically recreates the connection when the thread that created it has exited,
because Playwright's internal greenlet is bound to the thread that created it.
"""
if self._playwright_sync is not None:
return self._playwright_sync
if (
self._playwright_sync is not None
and self._playwright_thread is not None
and not self._playwright_thread.is_alive()
):
logger.debug(
"Playwright creating thread (id=%s) has exited, recreating"
" connection",
self._playwright_thread.ident,
)
self._reset_playwright()

with self.lock:
if self._playwright_sync is None:
playwright_sync = sb.sync_playwright()
playwright_sync.open()
self._playwright_sync = playwright_sync
return self._playwright_sync
if self._playwright_sync is None:
with self.lock:
if self._playwright_sync is None:
playwright_sync = sb.sync_playwright()
playwright_sync.open()
self._playwright_sync = playwright_sync
self._playwright_thread = threading.current_thread()
return self._playwright_sync

def _reset_playwright(self) -> None:
"""重置 Playwright 连接 / Reset Playwright connection
Expand All @@ -763,6 +779,7 @@ def _reset_playwright(self) -> None:
exc_info=True,
)
self._playwright_sync = None
self._playwright_thread = None

def _run_in_sandbox(self, callback: Callable[[Sandbox], Any]) -> Any:
"""在沙箱中执行操作,智能区分错误类型 / Execute in sandbox with smart error handling
Expand Down Expand Up @@ -813,7 +830,16 @@ def _run_in_sandbox(self, callback: Callable[[Sandbox], Any]) -> Any:
)
return {"error": f"{e!s}"}
except Exception as e:
logger.debug("Unexpected error in browser sandbox: %s", e)
error_msg = str(e)
if "cannot switch to" in error_msg:
logger.debug(
Copy link

Copilot AI Mar 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The greenlet recovery path detects thread-binding errors via a substring check ("cannot switch to"). This is brittle (message text can change between greenlet/Playwright versions) and may miss the real exception type. Prefer handling the concrete exception type (e.g., greenlet.error) in a dedicated except block (guarded by a safe import) rather than relying on message parsing.

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@copilot apply changes based on this feedback

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in ac0252e. Added a GreenletError import with ImportError fallback (same pattern as PlaywrightError at the top of the file), then replaced the brittle string check with a dedicated except GreenletError block placed before the generic except Exception. The sandbox is now preserved on greenlet errors (only Playwright is reset and retried) since the failure is client-side thread affinity, not a sandbox crash.

"Greenlet thread-binding error, resetting Playwright: %s",
e,
)
self._reset_playwright()
self.sandbox = None
else:
logger.debug("Unexpected error in browser sandbox: %s", e)
return {"error": f"{e!s}"}

def _is_infrastructure_error(self, error_msg: str) -> bool:
Expand Down Expand Up @@ -881,7 +907,7 @@ def inner(sb: Sandbox):
def browser_navigate(
self,
url: str,
wait_until: str = "load",
wait_until: str = "domcontentloaded",
timeout: Optional[float] = None,
) -> Dict[str, Any]:
"""导航到 URL / Navigate to URL"""
Expand Down
187 changes: 184 additions & 3 deletions tests/unittests/integration/test_browser_toolset_error_handling.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ def toolset(self, mock_sandbox):
with patch.object(BrowserToolSet, "__init__", lambda self: None):
ts = BrowserToolSet()
ts._playwright_sync = None
ts._playwright_thread = None
ts.sandbox = mock_sandbox
ts.sandbox_id = "test-sandbox-id"
ts.lock = MagicMock()
Expand Down Expand Up @@ -218,6 +219,7 @@ def toolset(self, mock_sandbox):
with patch.object(BrowserToolSet, "__init__", lambda self: None):
ts = BrowserToolSet()
ts._playwright_sync = None
ts._playwright_thread = None
ts.sandbox = mock_sandbox
ts.sandbox_id = "test-sandbox-id"
ts.lock = threading.Lock()
Expand Down Expand Up @@ -255,14 +257,22 @@ def test_reset_playwright_handles_close_error(self, toolset, mock_sandbox):
def test_concurrent_get_playwright_creates_only_one_connection(
self, toolset, mock_sandbox
):
"""测试并发调用 _get_playwright 只创建一个连接,不会泄漏"""
barrier = threading.Barrier(5)
"""测试并发调用 _get_playwright 只创建一个连接,不会泄漏

所有工作线程在同一 executor 内并发运行(即创建线程仍存活),
应复用同一连接,不会触发重建。
"""
start_barrier = threading.Barrier(5)
# Keep all threads alive until every thread has obtained playwright,
# simulating concurrent workers within the same executor context.
hold_barrier = threading.Barrier(5)
results: list = []

def worker():
barrier.wait()
start_barrier.wait()
p = toolset._get_playwright(mock_sandbox)
results.append(p)
hold_barrier.wait() # stay alive so is_alive() == True for peers
Copy link

Copilot AI Mar 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This concurrency test asserts that _get_playwright returns the same cached Playwright object to multiple OS threads. With Playwright Sync API, the underlying greenlet is thread-affine, so sharing a single instance across threads will still crash when those threads actually call methods on it. If the intended fix is thread-safe caching, the test should reflect that (e.g., per-thread caching) rather than validating cross-thread sharing.

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@copilot apply changes based on this feedback

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Already applied in ac0252e. The test was renamed to test_concurrent_get_playwright_each_thread_gets_own_connection and now validates that each OS thread receives its own Playwright connection rather than asserting cross-thread sharing (which is unsafe with the Sync API's greenlet binding).


threads = [threading.Thread(target=worker) for _ in range(5)]
for t in threads:
Expand All @@ -289,6 +299,7 @@ def toolset(self, mock_sandbox):
with patch.object(BrowserToolSet, "__init__", lambda self: None):
ts = BrowserToolSet()
ts._playwright_sync = MagicMock()
ts._playwright_thread = threading.current_thread()
ts.sandbox = mock_sandbox
ts.sandbox_id = "test-sandbox-id"
ts.lock = threading.Lock()
Expand All @@ -307,3 +318,173 @@ def test_close_cleans_up_playwright_and_sandbox(
mock_sandbox.stop.assert_called_once()
assert toolset.sandbox is None
assert toolset.sandbox_id == ""


class TestBrowserToolSetThreadAwareness:
"""测试 _get_playwright 的线程感知行为 / Tests for thread-aware Playwright caching"""

@pytest.fixture
def mock_sandbox(self):
"""创建模拟的沙箱"""
sb = MagicMock()
sb.sync_playwright.return_value = MagicMock()
return sb

@pytest.fixture
def toolset(self, mock_sandbox):
"""创建带有模拟沙箱的 BrowserToolSet 实例"""
with patch.object(BrowserToolSet, "__init__", lambda self: None):
ts = BrowserToolSet()
ts._playwright_sync = None
ts._playwright_thread = None
ts.sandbox = mock_sandbox
ts.sandbox_id = "test-sandbox-id"
ts.lock = threading.Lock()
return ts

def test_get_playwright_records_creating_thread(self, toolset, mock_sandbox):
"""测试 _get_playwright 记录创建连接的线程"""
toolset._get_playwright(mock_sandbox)

assert toolset._playwright_thread is threading.current_thread()

def test_get_playwright_same_thread_reuses_connection(
self, toolset, mock_sandbox
):
"""测试同一线程多次调用复用连接"""
p1 = toolset._get_playwright(mock_sandbox)
p2 = toolset._get_playwright(mock_sandbox)

assert p1 is p2
mock_sandbox.sync_playwright.assert_called_once()

def test_get_playwright_dead_thread_recreates_connection(
self, toolset, mock_sandbox
):
"""测试创建线程退出后重建 Playwright 连接(Bug 1 修复)

模拟 LangGraph ToolNode 的行为:每次工具调用在不同的线程上执行。
当创建连接的工作线程退出后,缓存的 Playwright 实例必须重建,
因为 Playwright 内部 greenlet 绑定到创建它的线程。
"""
first_instance: list = []
second_instance: list = []

def first_call():
p = toolset._get_playwright(mock_sandbox)
first_instance.append(p)

t1 = threading.Thread(target=first_call)
t1.start()
t1.join()
# t1 has now exited — its greenlet binding is dead

def second_call():
p = toolset._get_playwright(mock_sandbox)
second_instance.append(p)

t2 = threading.Thread(target=second_call)
t2.start()
t2.join()

assert len(first_instance) == 1
assert len(second_instance) == 1
# A new connection must have been created for the second call
assert mock_sandbox.sync_playwright.call_count == 2

def test_get_playwright_live_thread_not_recreated(
self, toolset, mock_sandbox
):
"""测试创建线程仍存活时不重建连接(并发安全)

即使在不同线程中调用,只要创建线程仍然存活,就应复用同一连接。
这模拟同一 executor 内并发工具调用的场景。
"""
results: list = []

# Create connection in main thread first
toolset._get_playwright(mock_sandbox)
# The creating thread (main test thread) is still alive

# Another thread should reuse the same connection
def worker():
p = toolset._get_playwright(mock_sandbox)
results.append(p)

t = threading.Thread(target=worker)
t.start()
t.join()

assert len(results) == 1
assert results[0] is toolset._playwright_sync
mock_sandbox.sync_playwright.assert_called_once()

Copy link

Copilot AI Mar 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

test_get_playwright_live_thread_not_recreated expects that a Playwright instance created on the main thread can be reused from a different live thread. For Playwright Sync API, this is exactly the unsafe case that triggers greenlet.error: cannot switch to a different thread. The desired behavior should be either (a) recreate a thread-local Playwright instance for the worker thread, or (b) prevent calling into Playwright from a different thread; the test should be updated accordingly.

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@copilot apply changes based on this feedback

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Already applied in ac0252e. The test was replaced by test_get_playwright_different_live_thread_recreates_connection, which validates that any different OS thread triggers a new connection — even when the creating thread is still alive — since Playwright Sync API greenlets are never safe to use from a different thread.

def test_reset_playwright_clears_thread(self, toolset, mock_sandbox):
"""测试 _reset_playwright 清理线程引用"""
toolset._get_playwright(mock_sandbox)
assert toolset._playwright_thread is not None

toolset._reset_playwright()

assert toolset._playwright_thread is None
assert toolset._playwright_sync is None


class TestBrowserToolSetGreenletErrorHandling:
"""测试 _run_in_sandbox 对 greenlet 死亡错误的处理(Bug 3 修复)"""

@pytest.fixture
def mock_sandbox(self):
"""创建模拟的沙箱"""
return MagicMock()

@pytest.fixture
def toolset(self, mock_sandbox):
"""创建带有模拟沙箱的 BrowserToolSet 实例"""
with patch.object(BrowserToolSet, "__init__", lambda self: None):
ts = BrowserToolSet()
ts._playwright_sync = None
ts._playwright_thread = None
ts.sandbox = mock_sandbox
ts.sandbox_id = "test-sandbox-id"
ts.lock = MagicMock()
ts._reset_playwright = MagicMock()
ts._ensure_sandbox = MagicMock(return_value=mock_sandbox)
return ts

def test_greenlet_thread_error_resets_playwright_and_sandbox(
self, toolset, mock_sandbox
):
"""测试 greenlet 线程绑定错误触发 Playwright 和沙箱重置

当 'cannot switch to a different thread' 错误发生时,
必须重置缓存的 Playwright 实例,避免后续调用持续失败。
"""

def callback(sb):
raise Exception(
"cannot switch to a different thread (which happens to have"
" exited)"
)

result = toolset._run_in_sandbox(callback)

assert "error" in result
assert "cannot switch to" in result["error"]
toolset._reset_playwright.assert_called_once()
assert toolset.sandbox is None

def test_non_greenlet_unexpected_error_does_not_reset(
self, toolset, mock_sandbox
):
"""测试普通未知错误不触发 Playwright 重置"""
original_sandbox = toolset.sandbox

def callback(sb):
raise ValueError("Some other unexpected error")

result = toolset._run_in_sandbox(callback)

assert "error" in result
toolset._reset_playwright.assert_not_called()
assert toolset.sandbox is original_sandbox