|
1 | | -from typing import Union |
| 1 | +import webp |
| 2 | +from PIL import Image |
2 | 3 |
|
3 | | -import requests |
4 | | -import time |
5 | | -import magic |
6 | | -from django.core.files.uploadedfile import TemporaryUploadedFile, InMemoryUploadedFile |
7 | 4 |
|
8 | | -from files.exceptions import SelectelUploadError |
9 | | -from files.typings import UserFileInfo |
10 | | - |
11 | | -from procollab.settings import ( |
12 | | - DEBUG, |
13 | | - SELECTEL_ACCOUNT_ID, |
14 | | - SELECTEL_CONTAINER_NAME, |
15 | | - SELECTEL_CONTAINER_PASSWORD, |
16 | | - SELECTEL_CONTAINER_USERNAME, |
17 | | -) |
18 | | - |
19 | | - |
20 | | -class FileAPI: |
21 | | - # fixme: looks terrible |
22 | | - def __init__( |
23 | | - self, file: Union[TemporaryUploadedFile, InMemoryUploadedFile], user |
24 | | - ) -> None: |
25 | | - self.file = file # it's TemporaryUploadedFile, and it will be |
26 | | - # removed after first .close() call, so we must read this file only once |
27 | | - self.user = user |
28 | | - self.file_object = self.file.open(mode="rb") |
29 | | - |
30 | | - @staticmethod |
31 | | - def delete(url: str) -> int: |
32 | | - """Deletes file from selcdn""" |
33 | | - token = FileAPI._get_selectel_swift_token() |
34 | | - response = requests.delete(url, headers={"X-Auth-Token": token}) |
35 | | - return response.status_code |
36 | | - |
37 | | - def upload(self) -> tuple[str, UserFileInfo]: |
38 | | - url = self._upload_via_selectel_swift() |
39 | | - info = self.get_file_info(self.file) |
40 | | - self.file_object.close() |
41 | | - return url, info |
42 | | - |
43 | | - def get_file_info( |
44 | | - self, file: Union[TemporaryUploadedFile, InMemoryUploadedFile] |
45 | | - ) -> UserFileInfo: |
46 | | - name, ext = file.name.split(".") |
47 | | - return UserFileInfo( |
48 | | - size=file.size, name=name, extension=ext, mime_type=self.get_file_mime_type() |
49 | | - ) |
50 | | - |
51 | | - def get_file_mime_type(self): |
52 | | - if isinstance(self.file, InMemoryUploadedFile): |
53 | | - return magic.from_buffer(self.file_object.read(), mime=True) |
54 | | - else: |
55 | | - return magic.from_file(self.file.temporary_file_path(), mime=True) |
56 | | - |
57 | | - def _upload_via_selectel_swift(self) -> str: |
58 | | - token = self._get_selectel_swift_token() |
59 | | - url = self._generate_selectel_swift_file_url() |
60 | | - |
61 | | - requests.put( |
62 | | - url, |
63 | | - headers={ |
64 | | - "X-Auth-Token": token, |
65 | | - "Content-Type": self.file_object.content_type, |
66 | | - }, |
67 | | - data=self.file_object.read(), |
68 | | - ) |
69 | | - |
70 | | - return url |
71 | | - |
72 | | - def _generate_selectel_swift_link(sefl): |
73 | | - link = f"https://api.selcdn.ru/v1/SEL_{SELECTEL_ACCOUNT_ID}/{SELECTEL_CONTAINER_NAME}/" |
74 | | - if DEBUG: |
75 | | - link += "debug/" |
76 | | - return link |
77 | | - |
78 | | - @staticmethod |
79 | | - def _get_selectel_swift_token(): |
80 | | - """Returns auth token for selcdn""" |
81 | | - data = { |
82 | | - "auth": { |
83 | | - "identity": { |
84 | | - "methods": ["password"], |
85 | | - "password": { |
86 | | - "user": { |
87 | | - "id": SELECTEL_CONTAINER_USERNAME, |
88 | | - "password": SELECTEL_CONTAINER_PASSWORD, |
89 | | - } |
90 | | - }, |
91 | | - } |
92 | | - } |
93 | | - } |
94 | | - response = requests.post("https://api.selcdn.ru/v3/auth/tokens", json=data) |
95 | | - if response.status_code not in [200, 201]: |
96 | | - raise SelectelUploadError("Couldn't generate a token for selcdn") |
97 | | - return response.headers["x-subject-token"] |
98 | | - |
99 | | - def _get_file_extension(self) -> str: |
100 | | - if len(self.file.name.split(".")) > 1: |
101 | | - return "." + self.file.name.split(".")[1] |
102 | | - return "" |
103 | | - |
104 | | - def _generate_selectel_swift_file_url(self) -> str: |
105 | | - """ |
106 | | - Generates url for selcdn |
107 | | - Returns: |
108 | | - url: str looks like /hashedEmail/hashedFilename_hashedTime.extension |
109 | | - """ |
110 | | - link = self._generate_selectel_swift_link() |
111 | | - extension = self._get_file_extension() |
112 | | - return ( |
113 | | - link |
114 | | - + f"{abs(hash(self.user.email))}/{abs(hash(self.file.name))}_{abs(hash(time.time()))}{extension}" |
115 | | - ) |
| 5 | +def convert_image_to_webp(image, quality: int = 70): |
| 6 | + config = webp.WebPConfig.new(preset=webp.WebPPreset.PHOTO, quality=quality) |
| 7 | + pil_image = Image.open(image.file) |
| 8 | + webp_image = webp.WebPPicture.from_pil(pil_image) |
| 9 | + return webp_image.encode(config) |
0 commit comments