-
Notifications
You must be signed in to change notification settings - Fork 1
fix(BrowserToolSet): Playwright greenlet thread-binding crash on cross-thread cache reuse #67
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
26c6c3a
ef0a926
d1bafaf
915e97a
ac0252e
213fc6c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -727,24 +727,40 @@ def __init__( | |
| polar_fs_config=polar_fs_config, | ||
| ) | ||
| self._playwright_sync: Optional["BrowserPlaywrightSync"] = None | ||
| self._playwright_thread: Optional[threading.Thread] = None | ||
|
|
||
| def _get_playwright(self, sb: BrowserSandbox) -> "BrowserPlaywrightSync": | ||
| """获取或创建 Playwright 连接 / Get or create Playwright connection | ||
|
|
||
| 复用已有连接以减少连接建立开销和瞬态错误。 | ||
| 使用双重检查锁定避免并发调用时创建多个连接导致资源泄漏。 | ||
| 当创建连接的线程已退出时,自动重建连接(Playwright greenlet 绑定到创建它的线程)。 | ||
|
|
||
| Reuses existing connection to reduce connection overhead and transient errors. | ||
| Uses double-checked locking to avoid leaking connections under concurrent calls. | ||
| Automatically recreates the connection when the thread that created it has exited, | ||
| because Playwright's internal greenlet is bound to the thread that created it. | ||
| """ | ||
| if self._playwright_sync is not None: | ||
| return self._playwright_sync | ||
| if ( | ||
| self._playwright_sync is not None | ||
| and self._playwright_thread is not None | ||
| and not self._playwright_thread.is_alive() | ||
| ): | ||
| logger.debug( | ||
| "Playwright creating thread (id=%s) has exited, recreating" | ||
| " connection", | ||
| self._playwright_thread.ident, | ||
| ) | ||
| self._reset_playwright() | ||
|
|
||
| with self.lock: | ||
| if self._playwright_sync is None: | ||
| playwright_sync = sb.sync_playwright() | ||
| playwright_sync.open() | ||
| self._playwright_sync = playwright_sync | ||
| return self._playwright_sync | ||
| if self._playwright_sync is None: | ||
| with self.lock: | ||
| if self._playwright_sync is None: | ||
| playwright_sync = sb.sync_playwright() | ||
| playwright_sync.open() | ||
| self._playwright_sync = playwright_sync | ||
| self._playwright_thread = threading.current_thread() | ||
| return self._playwright_sync | ||
|
|
||
| def _reset_playwright(self) -> None: | ||
| """重置 Playwright 连接 / Reset Playwright connection | ||
|
|
@@ -763,6 +779,7 @@ def _reset_playwright(self) -> None: | |
| exc_info=True, | ||
| ) | ||
| self._playwright_sync = None | ||
| self._playwright_thread = None | ||
|
|
||
| def _run_in_sandbox(self, callback: Callable[[Sandbox], Any]) -> Any: | ||
| """在沙箱中执行操作,智能区分错误类型 / Execute in sandbox with smart error handling | ||
|
|
@@ -813,7 +830,16 @@ def _run_in_sandbox(self, callback: Callable[[Sandbox], Any]) -> Any: | |
| ) | ||
| return {"error": f"{e!s}"} | ||
| except Exception as e: | ||
| logger.debug("Unexpected error in browser sandbox: %s", e) | ||
| error_msg = str(e) | ||
| if "cannot switch to" in error_msg: | ||
| logger.debug( | ||
|
||
| "Greenlet thread-binding error, resetting Playwright: %s", | ||
| e, | ||
| ) | ||
| self._reset_playwright() | ||
| self.sandbox = None | ||
| else: | ||
| logger.debug("Unexpected error in browser sandbox: %s", e) | ||
| return {"error": f"{e!s}"} | ||
OhYee marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| def _is_infrastructure_error(self, error_msg: str) -> bool: | ||
|
|
@@ -881,7 +907,7 @@ def inner(sb: Sandbox): | |
| def browser_navigate( | ||
| self, | ||
| url: str, | ||
| wait_until: str = "load", | ||
| wait_until: str = "domcontentloaded", | ||
| timeout: Optional[float] = None, | ||
| ) -> Dict[str, Any]: | ||
| """导航到 URL / Navigate to URL""" | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -91,6 +91,7 @@ def toolset(self, mock_sandbox): | |
| with patch.object(BrowserToolSet, "__init__", lambda self: None): | ||
| ts = BrowserToolSet() | ||
| ts._playwright_sync = None | ||
| ts._playwright_thread = None | ||
| ts.sandbox = mock_sandbox | ||
| ts.sandbox_id = "test-sandbox-id" | ||
| ts.lock = MagicMock() | ||
|
|
@@ -218,6 +219,7 @@ def toolset(self, mock_sandbox): | |
| with patch.object(BrowserToolSet, "__init__", lambda self: None): | ||
| ts = BrowserToolSet() | ||
| ts._playwright_sync = None | ||
| ts._playwright_thread = None | ||
| ts.sandbox = mock_sandbox | ||
| ts.sandbox_id = "test-sandbox-id" | ||
| ts.lock = threading.Lock() | ||
|
|
@@ -255,14 +257,22 @@ def test_reset_playwright_handles_close_error(self, toolset, mock_sandbox): | |
| def test_concurrent_get_playwright_creates_only_one_connection( | ||
| self, toolset, mock_sandbox | ||
| ): | ||
| """测试并发调用 _get_playwright 只创建一个连接,不会泄漏""" | ||
| barrier = threading.Barrier(5) | ||
| """测试并发调用 _get_playwright 只创建一个连接,不会泄漏 | ||
|
|
||
| 所有工作线程在同一 executor 内并发运行(即创建线程仍存活), | ||
| 应复用同一连接,不会触发重建。 | ||
| """ | ||
| start_barrier = threading.Barrier(5) | ||
| # Keep all threads alive until every thread has obtained playwright, | ||
| # simulating concurrent workers within the same executor context. | ||
| hold_barrier = threading.Barrier(5) | ||
| results: list = [] | ||
|
|
||
| def worker(): | ||
| barrier.wait() | ||
| start_barrier.wait() | ||
| p = toolset._get_playwright(mock_sandbox) | ||
| results.append(p) | ||
| hold_barrier.wait() # stay alive so is_alive() == True for peers | ||
|
||
|
|
||
| threads = [threading.Thread(target=worker) for _ in range(5)] | ||
| for t in threads: | ||
|
|
@@ -289,6 +299,7 @@ def toolset(self, mock_sandbox): | |
| with patch.object(BrowserToolSet, "__init__", lambda self: None): | ||
| ts = BrowserToolSet() | ||
| ts._playwright_sync = MagicMock() | ||
| ts._playwright_thread = threading.current_thread() | ||
| ts.sandbox = mock_sandbox | ||
| ts.sandbox_id = "test-sandbox-id" | ||
| ts.lock = threading.Lock() | ||
|
|
@@ -307,3 +318,173 @@ def test_close_cleans_up_playwright_and_sandbox( | |
| mock_sandbox.stop.assert_called_once() | ||
| assert toolset.sandbox is None | ||
| assert toolset.sandbox_id == "" | ||
|
|
||
|
|
||
| class TestBrowserToolSetThreadAwareness: | ||
| """测试 _get_playwright 的线程感知行为 / Tests for thread-aware Playwright caching""" | ||
|
|
||
| @pytest.fixture | ||
| def mock_sandbox(self): | ||
| """创建模拟的沙箱""" | ||
| sb = MagicMock() | ||
| sb.sync_playwright.return_value = MagicMock() | ||
| return sb | ||
|
|
||
| @pytest.fixture | ||
| def toolset(self, mock_sandbox): | ||
| """创建带有模拟沙箱的 BrowserToolSet 实例""" | ||
| with patch.object(BrowserToolSet, "__init__", lambda self: None): | ||
| ts = BrowserToolSet() | ||
| ts._playwright_sync = None | ||
| ts._playwright_thread = None | ||
| ts.sandbox = mock_sandbox | ||
| ts.sandbox_id = "test-sandbox-id" | ||
| ts.lock = threading.Lock() | ||
| return ts | ||
|
|
||
| def test_get_playwright_records_creating_thread(self, toolset, mock_sandbox): | ||
| """测试 _get_playwright 记录创建连接的线程""" | ||
| toolset._get_playwright(mock_sandbox) | ||
|
|
||
| assert toolset._playwright_thread is threading.current_thread() | ||
|
|
||
| def test_get_playwright_same_thread_reuses_connection( | ||
| self, toolset, mock_sandbox | ||
| ): | ||
| """测试同一线程多次调用复用连接""" | ||
| p1 = toolset._get_playwright(mock_sandbox) | ||
| p2 = toolset._get_playwright(mock_sandbox) | ||
|
|
||
| assert p1 is p2 | ||
| mock_sandbox.sync_playwright.assert_called_once() | ||
|
|
||
| def test_get_playwright_dead_thread_recreates_connection( | ||
| self, toolset, mock_sandbox | ||
| ): | ||
| """测试创建线程退出后重建 Playwright 连接(Bug 1 修复) | ||
|
|
||
| 模拟 LangGraph ToolNode 的行为:每次工具调用在不同的线程上执行。 | ||
| 当创建连接的工作线程退出后,缓存的 Playwright 实例必须重建, | ||
| 因为 Playwright 内部 greenlet 绑定到创建它的线程。 | ||
| """ | ||
| first_instance: list = [] | ||
| second_instance: list = [] | ||
|
|
||
| def first_call(): | ||
| p = toolset._get_playwright(mock_sandbox) | ||
| first_instance.append(p) | ||
|
|
||
| t1 = threading.Thread(target=first_call) | ||
| t1.start() | ||
| t1.join() | ||
| # t1 has now exited — its greenlet binding is dead | ||
|
|
||
| def second_call(): | ||
| p = toolset._get_playwright(mock_sandbox) | ||
| second_instance.append(p) | ||
|
|
||
| t2 = threading.Thread(target=second_call) | ||
| t2.start() | ||
| t2.join() | ||
|
|
||
| assert len(first_instance) == 1 | ||
| assert len(second_instance) == 1 | ||
| # A new connection must have been created for the second call | ||
| assert mock_sandbox.sync_playwright.call_count == 2 | ||
|
|
||
| def test_get_playwright_live_thread_not_recreated( | ||
| self, toolset, mock_sandbox | ||
| ): | ||
| """测试创建线程仍存活时不重建连接(并发安全) | ||
|
|
||
| 即使在不同线程中调用,只要创建线程仍然存活,就应复用同一连接。 | ||
| 这模拟同一 executor 内并发工具调用的场景。 | ||
| """ | ||
| results: list = [] | ||
|
|
||
| # Create connection in main thread first | ||
| toolset._get_playwright(mock_sandbox) | ||
| # The creating thread (main test thread) is still alive | ||
|
|
||
| # Another thread should reuse the same connection | ||
| def worker(): | ||
| p = toolset._get_playwright(mock_sandbox) | ||
| results.append(p) | ||
|
|
||
| t = threading.Thread(target=worker) | ||
| t.start() | ||
| t.join() | ||
|
|
||
| assert len(results) == 1 | ||
| assert results[0] is toolset._playwright_sync | ||
| mock_sandbox.sync_playwright.assert_called_once() | ||
|
|
||
|
||
| def test_reset_playwright_clears_thread(self, toolset, mock_sandbox): | ||
| """测试 _reset_playwright 清理线程引用""" | ||
| toolset._get_playwright(mock_sandbox) | ||
| assert toolset._playwright_thread is not None | ||
|
|
||
| toolset._reset_playwright() | ||
|
|
||
| assert toolset._playwright_thread is None | ||
| assert toolset._playwright_sync is None | ||
|
|
||
|
|
||
| class TestBrowserToolSetGreenletErrorHandling: | ||
| """测试 _run_in_sandbox 对 greenlet 死亡错误的处理(Bug 3 修复)""" | ||
|
|
||
| @pytest.fixture | ||
| def mock_sandbox(self): | ||
| """创建模拟的沙箱""" | ||
| return MagicMock() | ||
|
|
||
| @pytest.fixture | ||
| def toolset(self, mock_sandbox): | ||
| """创建带有模拟沙箱的 BrowserToolSet 实例""" | ||
| with patch.object(BrowserToolSet, "__init__", lambda self: None): | ||
| ts = BrowserToolSet() | ||
| ts._playwright_sync = None | ||
| ts._playwright_thread = None | ||
| ts.sandbox = mock_sandbox | ||
| ts.sandbox_id = "test-sandbox-id" | ||
| ts.lock = MagicMock() | ||
| ts._reset_playwright = MagicMock() | ||
| ts._ensure_sandbox = MagicMock(return_value=mock_sandbox) | ||
| return ts | ||
|
|
||
| def test_greenlet_thread_error_resets_playwright_and_sandbox( | ||
| self, toolset, mock_sandbox | ||
| ): | ||
| """测试 greenlet 线程绑定错误触发 Playwright 和沙箱重置 | ||
|
|
||
| 当 'cannot switch to a different thread' 错误发生时, | ||
| 必须重置缓存的 Playwright 实例,避免后续调用持续失败。 | ||
| """ | ||
|
|
||
| def callback(sb): | ||
| raise Exception( | ||
| "cannot switch to a different thread (which happens to have" | ||
| " exited)" | ||
| ) | ||
|
|
||
| result = toolset._run_in_sandbox(callback) | ||
|
|
||
| assert "error" in result | ||
| assert "cannot switch to" in result["error"] | ||
| toolset._reset_playwright.assert_called_once() | ||
| assert toolset.sandbox is None | ||
|
|
||
| def test_non_greenlet_unexpected_error_does_not_reset( | ||
| self, toolset, mock_sandbox | ||
| ): | ||
| """测试普通未知错误不触发 Playwright 重置""" | ||
| original_sandbox = toolset.sandbox | ||
|
|
||
| def callback(sb): | ||
| raise ValueError("Some other unexpected error") | ||
|
|
||
| result = toolset._run_in_sandbox(callback) | ||
|
|
||
| assert "error" in result | ||
| toolset._reset_playwright.assert_not_called() | ||
| assert toolset.sandbox is original_sandbox | ||
Uh oh!
There was an error while loading. Please reload this page.