|
20 | 20 | from collections import namedtuple |
21 | 21 | from collections.abc import Callable, Iterable, Iterator, Mapping |
22 | 22 | from contextlib import suppress |
| 23 | +from email.message import EmailMessage |
23 | 24 | from email.parser import HeaderParser |
| 25 | +from email.policy import HTTP |
24 | 26 | from email.utils import parsedate |
25 | 27 | from http.cookies import SimpleCookie |
26 | 28 | from math import ceil |
@@ -356,14 +358,40 @@ def parse_mimetype(mimetype: str) -> MimeType: |
356 | 358 | ) |
357 | 359 |
|
358 | 360 |
|
| 361 | +class EnsureOctetStream(EmailMessage): |
| 362 | + def __init__(self) -> None: |
| 363 | + super().__init__() |
| 364 | + # https://www.rfc-editor.org/rfc/rfc9110#section-8.3-5 |
| 365 | + self.set_default_type("application/octet-stream") |
| 366 | + |
| 367 | + def get_content_type(self) -> Any: |
| 368 | + """Re-implementation from Message |
| 369 | +
|
| 370 | + Returns application/octet-stream in place of plain/text when |
| 371 | + value is wrong. |
| 372 | +
|
| 373 | + The way this class is used guarantees that content-type will |
| 374 | + be present so simplify the checks wrt to the base implementation. |
| 375 | + """ |
| 376 | + value = self.get("content-type", "").lower() |
| 377 | + |
| 378 | + # Based on the implementation of _splitparam in the standard library |
| 379 | + ctype, _, _ = value.partition(";") |
| 380 | + ctype = ctype.strip() |
| 381 | + if ctype.count("/") != 1: |
| 382 | + return self.get_default_type() |
| 383 | + return ctype |
| 384 | + |
| 385 | + |
359 | 386 | @functools.lru_cache(maxsize=56) |
360 | 387 | def parse_content_type(raw: str) -> tuple[str, MappingProxyType[str, str]]: |
361 | 388 | """Parse Content-Type header. |
362 | 389 |
|
363 | 390 | Returns a tuple of the parsed content type and a |
364 | | - MappingProxyType of parameters. |
| 391 | + MappingProxyType of parameters. The default returned value |
| 392 | + is `application/octet-stream` |
365 | 393 | """ |
366 | | - msg = HeaderParser().parsestr(f"Content-Type: {raw}") |
| 394 | + msg = HeaderParser(EnsureOctetStream, policy=HTTP).parsestr(f"Content-Type: {raw}") |
367 | 395 | content_type = msg.get_content_type() |
368 | 396 | params = msg.get_params(()) |
369 | 397 | content_dict = dict(params[1:]) # First element is content type again |
|
0 commit comments