|
1 | 1 | from __future__ import annotations |
2 | 2 |
|
3 | | -from pathlib import Path |
4 | 3 | from typing import TYPE_CHECKING, Optional |
5 | 4 |
|
6 | | -from hcaptcha_challenger.agents import AgentT |
| 5 | +from hcaptcha_challenger.agent import AgentV, AgentConfig |
| 6 | +from hcaptcha_challenger.models import ChallengeSignal |
7 | 7 |
|
8 | 8 | if TYPE_CHECKING: |
9 | 9 | from botright.extended_typing import BrowserContext, Page |
10 | 10 |
|
11 | | -tmp_dir = Path(__file__).parent.joinpath("tmp_dir") |
12 | | - |
13 | 11 |
|
14 | 12 | class hCaptcha: |
15 | 13 | def __init__(self, browser: BrowserContext, page: Page) -> None: |
16 | | - """ |
17 | | - Initialize an hCaptcha solver. |
18 | | -
|
19 | | - Args: |
20 | | - browser (BrowserContext): The Playwright browser context to use. |
21 | | - page (Page): The Playwright page where hCaptcha challenges will be solved. |
22 | | - """ |
23 | 14 | self.browser = browser |
24 | 15 | self.page = page |
25 | | - |
26 | | - self.retry_times = 8 |
27 | | - self.hcaptcha_agent = AgentT.from_page(page=page, tmp_dir=tmp_dir, self_supervised=True) |
| 16 | + self.hcaptcha_agent = AgentV(page=page, agent_config=AgentConfig()) |
28 | 17 |
|
29 | 18 | async def mock_captcha(self, rq_data: str) -> None: |
30 | | - """ |
31 | | - Mock hCaptcha requests by intercepting network requests to getcaptcha. |
32 | | -
|
33 | | - Args: |
34 | | - rq_data (str): The data required for mocking the hCaptcha request. |
35 | | -
|
36 | | - This method mocks the hCaptcha request and captures the generated hCaptcha token. |
37 | | - """ |
38 | | - |
39 | 19 | async def mock_json(route, request): |
40 | | - |
41 | | - payload = {**request.post_data_json, "rqdata": rq_data, "hl": "en"} if rq_data else request.post_data_json |
42 | | - response = await self.page.request.post(request.url, form=payload, headers=request.headers) |
| 20 | + payload = ( |
| 21 | + {**request.post_data_json, "rqdata": rq_data, "hl": "en"} |
| 22 | + if rq_data |
| 23 | + else request.post_data_json |
| 24 | + ) |
| 25 | + response = await self.page.request.post( |
| 26 | + request.url, form=payload, headers=request.headers |
| 27 | + ) |
43 | 28 | await route.fulfill(response=response) |
44 | 29 |
|
45 | 30 | await self.page.route("https://hcaptcha.com/getcaptcha/**", mock_json) |
46 | 31 |
|
47 | 32 | async def solve_hcaptcha(self, rq_data: Optional[str] = None) -> Optional[str]: |
48 | | - """ |
49 | | - Solve an hCaptcha challenge. |
50 | | -
|
51 | | - Args: |
52 | | - rq_data (Optional[str]): Additional data required for solving the hCaptcha challenge. |
53 | | -
|
54 | | - Returns: |
55 | | - Optional[str]: The hCaptcha token if successfully solved; otherwise, None. |
56 | | -
|
57 | | - This method captures the hCaptcha token by logging and mocking hCaptcha requests, then simulates clicking the |
58 | | - hCaptcha checkbox to solve the challenge. |
59 | | - """ |
60 | | - # Mocking Captcha Request |
61 | 33 | if rq_data: |
62 | 34 | await self.mock_captcha(rq_data) |
63 | | - # Clicking Captcha Checkbox |
64 | | - await self.hcaptcha_agent.handle_checkbox() |
65 | | - |
66 | | - for pth in range(1, self.retry_times): |
67 | | - result = await self.hcaptcha_agent.execute() |
68 | | - if result == self.hcaptcha_agent.status.CHALLENGE_BACKCALL: |
69 | | - await self.page.wait_for_timeout(500) |
70 | | - fl = self.page.frame_locator(self.hcaptcha_agent.HOOK_CHALLENGE) |
71 | | - await fl.locator("//div[@class='refresh button']").click() |
72 | | - elif result == self.hcaptcha_agent.status.CHALLENGE_SUCCESS: |
73 | | - if self.hcaptcha_agent.cr: |
74 | | - captcha_token: str = self.hcaptcha_agent.cr.generated_pass_UUID |
75 | | - return captcha_token |
76 | | - |
77 | | - return f"Exceeded maximum retry times of {self.retry_times}" |
78 | | - |
79 | | - async def get_hcaptcha(self, site_key: Optional[str] = "00000000-0000-0000-0000-000000000000", rq_data: Optional[str] = None) -> Optional[str]: |
80 | | - """ |
81 | | - Get an hCaptcha token for a specific site. |
82 | | -
|
83 | | - Args: |
84 | | - site_key (Optional[str]): The site key for the hCaptcha challenge (default is a demo site key). |
85 | | - rq_data (Optional[str]): Additional data required for solving the hCaptcha challenge. |
86 | | -
|
87 | | - Returns: |
88 | | - Optional[str]: The hCaptcha token if successfully obtained; otherwise, None. |
89 | | -
|
90 | | - This method opens a new page, navigates to a specified hCaptcha demo page with the given site key, and |
91 | | - solves the hCaptcha challenge to obtain the token. |
92 | | - """ |
| 35 | + await self.hcaptcha_agent.robotic_arm.click_checkbox() |
| 36 | + result = await self.hcaptcha_agent.wait_for_challenge() |
| 37 | + if result == ChallengeSignal.SUCCESS and self.hcaptcha_agent.cr_list: |
| 38 | + return self.hcaptcha_agent.cr_list[-1].generated_pass_UUID |
| 39 | + return None |
| 40 | + |
| 41 | + async def get_hcaptcha( |
| 42 | + self, |
| 43 | + site_key: str = "00000000-0000-0000-0000-000000000000", |
| 44 | + rq_data: Optional[str] = None, |
| 45 | + ) -> Optional[str]: |
93 | 46 | page = await self.browser.new_page() |
94 | 47 | await page.goto(f"https://accounts.hcaptcha.com/demo?sitekey={site_key}") |
95 | 48 | return await page.solve_hcaptcha(rq_data=rq_data) |
0 commit comments