|
18 | 18 | from collections import namedtuple |
19 | 19 | from collections.abc import Callable, Generator, Iterable, Iterator, Mapping |
20 | 20 | from contextlib import suppress |
| 21 | +from email.message import EmailMessage |
21 | 22 | from email.parser import HeaderParser |
| 23 | +from email.policy import HTTP |
22 | 24 | from email.utils import parsedate |
23 | 25 | from math import ceil |
24 | 26 | from pathlib import Path |
@@ -347,14 +349,40 @@ def parse_mimetype(mimetype: str) -> MimeType: |
347 | 349 | ) |
348 | 350 |
|
349 | 351 |
|
class EnsureOctetStream(EmailMessage):
    """Email message whose content type defaults to application/octet-stream.

    Meant to be used as the message factory of an ``email.parser`` parser
    when parsing an HTTP ``Content-Type`` header: a missing or malformed
    value yields ``application/octet-stream`` instead of the ``text/plain``
    default used for e-mail.
    """

    def __init__(self, policy: Any = None) -> None:
        """Create the message and install the HTTP default content type.

        :param policy: optional ``email.policy`` object forwarded to
            ``EmailMessage``; ``None`` keeps the ``EmailMessage`` default.
        """
        # Accepting ``policy`` lets the stdlib parser construct this class
        # with its own policy (it calls ``factory(policy=...)`` and only
        # falls back to a zero-argument call when that raises TypeError).
        # Without the parameter, the policy given to the parser is never
        # applied to the message object.
        super().__init__(policy=policy)
        # https://www.rfc-editor.org/rfc/rfc9110#section-8.3-5
        self.set_default_type("application/octet-stream")

    def get_content_type(self) -> str:
        """Return the lower-cased ``type/subtype`` of the Content-Type header.

        Simplified re-implementation of ``Message.get_content_type`` that
        returns the configured default type (``application/octet-stream``)
        rather than ``text/plain`` when the header is missing, empty, or
        does not contain exactly one ``/``.
        """
        # ``str()`` turns the header object into a plain string so the
        # declared return type holds.
        value = str(self.get("content-type", "")).lower()

        # Based on the implementation of _splitparam in the standard library:
        # everything after the first ";" is parameters, not the media type.
        ctype, _, _ = value.partition(";")
        ctype = ctype.strip()
        if ctype.count("/") != 1:
            return self.get_default_type()
        return ctype
| 375 | + |
| 376 | + |
350 | 377 | @functools.lru_cache(maxsize=56) |
351 | 378 | def parse_content_type(raw: str) -> tuple[str, MappingProxyType[str, str]]: |
352 | 379 | """Parse Content-Type header. |
353 | 380 |
|
354 | 381 | Returns a tuple of the parsed content type and a |
355 | | - MappingProxyType of parameters. |
| 382 | + MappingProxyType of parameters. The default returned value |
| 383 | + is `application/octet-stream` |
356 | 384 | """ |
357 | | - msg = HeaderParser().parsestr(f"Content-Type: {raw}") |
| 385 | + msg = HeaderParser(EnsureOctetStream, policy=HTTP).parsestr(f"Content-Type: {raw}") |
358 | 386 | content_type = msg.get_content_type() |
359 | 387 | params = msg.get_params(()) |
360 | 388 | content_dict = dict(params[1:]) # First element is content type again |
|
0 commit comments