|
1 | | -# Thunder/utils/file_properties.py |
2 | | - |
3 | | -import asyncio |
4 | | -from datetime import datetime as dt |
5 | | -from typing import Any, Optional |
6 | | - |
7 | | -from pyrogram.client import Client |
8 | | -from pyrogram.errors import FloodWait |
9 | | -from pyrogram.file_id import FileId |
10 | | -from pyrogram.types import Message |
11 | | - |
12 | | -from Thunder.server.exceptions import FileNotFound |
13 | | -from Thunder.utils.logger import logger |
14 | | - |
15 | | - |
16 | | -def get_media(message: Message) -> Optional[Any]: |
17 | | - for attr in ("audio", "document", "photo", "sticker", "animation", "video", "voice", "video_note"): |
18 | | - media = getattr(message, attr, None) |
19 | | - if media: |
20 | | - return media |
21 | | - return None |
22 | | - |
23 | | - |
24 | | -def get_uniqid(message: Message) -> Optional[str]: |
25 | | - media = get_media(message) |
26 | | - return getattr(media, 'file_unique_id', None) |
27 | | - |
28 | | - |
29 | | -def get_hash(media_msg: Message) -> str: |
30 | | - uniq_id = get_uniqid(media_msg) |
31 | | - return uniq_id[:6] if uniq_id else '' |
32 | | - |
33 | | - |
34 | | -def get_fsize(message: Message) -> int: |
35 | | - media = get_media(message) |
36 | | - return getattr(media, 'file_size', 0) if media else 0 |
37 | | - |
38 | | - |
39 | | -def parse_fid(message: Message) -> Optional[FileId]: |
40 | | - media = get_media(message) |
41 | | - if media and hasattr(media, 'file_id'): |
42 | | - try: |
43 | | - return FileId.decode(media.file_id) |
44 | | - except Exception: |
45 | | - return None |
46 | | - return None |
47 | | - |
48 | | - |
49 | | -def get_fname(msg: Message) -> str: |
50 | | - media = get_media(msg) |
51 | | - fname = getattr(media, 'file_name', None) if media else None |
52 | | - |
53 | | - if not fname: |
54 | | - ext = "bin" |
55 | | - if media: |
56 | | - media_types = { |
57 | | - "photo": "jpg", |
58 | | - "audio": "mp3", |
59 | | - "voice": "ogg", |
60 | | - "video": "mp4", |
61 | | - "animation": "mp4", |
62 | | - "video_note": "mp4", |
63 | | - "sticker": "webp" |
64 | | - } |
65 | | - |
66 | | - # Check which attribute type the message has |
67 | | - for attr, extension in media_types.items(): |
68 | | - if getattr(msg, attr, None) is not None: |
69 | | - ext = extension |
70 | | - break |
71 | | - |
72 | | - timestamp = dt.now().strftime("%Y%m%d%H%M%S") |
73 | | - fname = f"Thunder File To Link_{timestamp}.{ext}" |
74 | | - |
75 | | - return fname |
76 | | - |
77 | | - |
78 | | -async def get_fids(client: Client, chat_id: int, message_id: int) -> FileId: |
79 | | - try: |
80 | | - try: |
81 | | - msg = await client.get_messages(chat_id, message_id) |
82 | | - except FloodWait as e: |
83 | | - await asyncio.sleep(e.value) |
84 | | - msg = await client.get_messages(chat_id, message_id) |
85 | | - |
86 | | - if not msg or getattr(msg, 'empty', False): |
87 | | - raise FileNotFound("Message not found") |
88 | | - |
89 | | - media = get_media(msg) |
90 | | - if media: |
91 | | - if not hasattr(media, 'file_id') or not hasattr(media, 'file_unique_id'): |
92 | | - raise FileNotFound("Media metadata incomplete") |
93 | | - return FileId.decode(media.file_id) |
94 | | - |
95 | | - raise FileNotFound("No media in message") |
96 | | - |
97 | | - except Exception as e: |
98 | | - logger.error(f"Error in get_fids: {e}", exc_info=True) |
99 | | - raise FileNotFound(str(e)) |
| 1 | +# Thunder/utils/file_properties.py |
| 2 | + |
| 3 | +import asyncio |
| 4 | +from datetime import datetime as dt |
| 5 | +from typing import Any, Optional |
| 6 | + |
| 7 | +from pyrogram.client import Client |
| 8 | +from pyrogram.errors import FloodWait |
| 9 | +from pyrogram.file_id import FileId |
| 10 | +from pyrogram.types import Message |
| 11 | + |
| 12 | +from Thunder.server.exceptions import FileNotFound |
| 13 | +from Thunder.utils.logger import logger |
| 14 | + |
| 15 | + |
| 16 | +def get_media(message: Message) -> Optional[Any]: |
| 17 | + for attr in ("audio", "document", "photo", "sticker", "animation", "video", "voice", "video_note"): |
| 18 | + media = getattr(message, attr, None) |
| 19 | + if media: |
| 20 | + return media |
| 21 | + return None |
| 22 | + |
| 23 | + |
| 24 | +def get_uniqid(message: Message) -> Optional[str]: |
| 25 | + media = get_media(message) |
| 26 | + return getattr(media, 'file_unique_id', None) |
| 27 | + |
| 28 | + |
| 29 | +def get_hash(media_msg: Message) -> str: |
| 30 | + uniq_id = get_uniqid(media_msg) |
| 31 | + return uniq_id[:6] if uniq_id else '' |
| 32 | + |
| 33 | + |
| 34 | +def get_fsize(message: Message) -> int: |
| 35 | + media = get_media(message) |
| 36 | + return getattr(media, 'file_size', 0) if media else 0 |
| 37 | + |
| 38 | + |
| 39 | +def parse_fid(message: Message) -> Optional[FileId]: |
| 40 | + media = get_media(message) |
| 41 | + if media and hasattr(media, 'file_id'): |
| 42 | + try: |
| 43 | + return FileId.decode(media.file_id) |
| 44 | + except Exception: |
| 45 | + return None |
| 46 | + return None |
| 47 | + |
| 48 | + |
| 49 | +def get_fname(msg: Message) -> str: |
| 50 | + media = get_media(msg) |
| 51 | + fname = getattr(media, 'file_name', None) if media else None |
| 52 | + |
| 53 | + if not fname: |
| 54 | + ext = "bin" |
| 55 | + if media: |
| 56 | + media_types = { |
| 57 | + "photo": "jpg", |
| 58 | + "audio": "mp3", |
| 59 | + "voice": "ogg", |
| 60 | + "video": "mp4", |
| 61 | + "animation": "mp4", |
| 62 | + "video_note": "mp4", |
| 63 | + "sticker": "webp" |
| 64 | + } |
| 65 | + |
| 66 | + # Check which attribute type the message has |
| 67 | + for attr, extension in media_types.items(): |
| 68 | + if getattr(msg, attr, None) is not None: |
| 69 | + ext = extension |
| 70 | + break |
| 71 | + |
| 72 | + timestamp = dt.now().strftime("%Y%m%d%H%M%S") |
| 73 | + fname = f"Thunder File To Link_{timestamp}.{ext}" |
| 74 | + |
| 75 | + return fname |
| 76 | + |
| 77 | + |
| 78 | +async def get_fids(client: Client, chat_id: int, message_id: int) -> FileId: |
| 79 | + try: |
| 80 | + try: |
| 81 | + msg = await client.get_messages(chat_id, message_id) |
| 82 | + except FloodWait as e: |
| 83 | + await asyncio.sleep(e.value) |
| 84 | + msg = await client.get_messages(chat_id, message_id) |
| 85 | + |
| 86 | + if not msg or getattr(msg, 'empty', False): |
| 87 | + raise FileNotFound("Message not found") |
| 88 | + |
| 89 | + media = get_media(msg) |
| 90 | + if media: |
| 91 | + if not hasattr(media, 'file_id') or not hasattr(media, 'file_unique_id'): |
| 92 | + raise FileNotFound("Media metadata incomplete") |
| 93 | + return FileId.decode(media.file_id) |
| 94 | + |
| 95 | + raise FileNotFound("No media in message") |
| 96 | + |
| 97 | + except Exception as e: |
| 98 | + logger.error(f"Error in get_fids: {e}", exc_info=True) |
| 99 | + raise FileNotFound(str(e)) |
0 commit comments