|
1 | 1 | """Code related to blocklists.""" |
| 2 | +from dataclasses import dataclass |
2 | 3 | import requests |
3 | 4 | import logging |
4 | 5 |
|
5 | 6 | logger = logging.getLogger(__name__) |
6 | 7 |
|
7 | 8 |
|
8 | | -def _parse_block_list_from_url(url: str) -> list[str]: |
9 | | - block_list = requests.get(url).text.strip() |
10 | | - return [username.strip() for username in block_list.split("\n")] |
| 9 | +@dataclass |
| 10 | +class BlocklistData: |
| 11 | + users: list[str] |
| 12 | + etag: str | None |
| 13 | + |
| 14 | + |
| 15 | +def _parse_block_list_from_url(url: str, old_data: BlocklistData) -> BlocklistData: |
| 16 | + headers = {"If-None-Match": old_data.etag} if old_data.etag else {} |
| 17 | + response = requests.get(url, headers=headers) |
| 18 | + |
| 19 | + response.raise_for_status() |
| 20 | + |
| 21 | + if response.status_code == 304: |
| 22 | + return old_data |
| 23 | + |
| 24 | + block_list = [username.strip() for username in response.text.strip().split("\n")] |
| 25 | + |
| 26 | + return BlocklistData(block_list, response.headers.get("ETag")) |
11 | 27 |
|
12 | 28 |
|
13 | 29 | class OnlineBlocklist: |
14 | 30 | """Manage online blocklists.""" |
15 | 31 |
|
16 | 32 | def __init__(self, urls: list[str]) -> None: |
17 | 33 | """Initialize the OnlineBlockList class.""" |
18 | | - self.blocklist: dict[str, list[str]] = {url : [] for url in urls} |
19 | | - self.refresh() |
| 34 | + self.blocklist: dict[str, BlocklistData] = {url : BlocklistData([], None) for url in urls} |
| 35 | + for _ in range(5): |
| 36 | + self.refresh() |
20 | 37 |
|
21 | 38 | def refresh(self) -> None: |
22 | 39 | """Pull updated blocklists from the list of blocklist urls.""" |
23 | 40 | logger.info(f"Refreshing {len(self.blocklist)} online blocklists") |
24 | 41 |
|
25 | | - for url in self.blocklist.keys(): |
| 42 | + for url, data in self.blocklist.items(): |
26 | 43 | try: |
27 | | - self.blocklist[url] = _parse_block_list_from_url(url) |
| 44 | + self.blocklist[url] = _parse_block_list_from_url(url, data) |
28 | 45 | except Exception: |
29 | 46 | logger.warning(f"Failed to refresh online blocklist {url}") |
30 | 47 |
|
31 | 48 | def __contains__(self, item: str) -> bool: |
32 | 49 | """Check if an username is in the blocklist.""" |
33 | | - return any(item in blocklist for blocklist in self.blocklist) |
| 50 | + return any(item in blocklist.users for blocklist in self.blocklist.values()) |
0 commit comments